123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887 |
- package service
- import (
- "bytes"
- "context"
- "fmt"
- "strconv"
- "time"
- "go-common/app/job/main/growup/model"
- "go-common/library/database/sql"
- "go-common/library/log"
- )
- const (
- _updateUpAccount = "UPDATE up_account SET withdraw_date_version = '%s', total_unwithdraw_income=total_income - total_withdraw_income - exchange_income where withdraw_date_version = '%s' AND is_deleted = 0"
- _updateTagAdjust = "UPDATE tag_info SET adjust_type = 1 WHERE id = %d AND adjust_type = 0"
- // fix data
- _txUpdateAvIncome = "UPDATE av_income SET total_income = total_income %s, income = income %s WHERE av_id = %d AND mid = %d AND date = '%s'"
- _txUpdateAvIncomeStatis = "UPDATE av_income_statis SET total_income = total_income %s WHERE av_id = %d AND mid = %d"
- _txUpdateAvIncomeStatisIncome = "UPDATE av_income_%s_statis SET income = income %s WHERE category_id = %d AND cdate = '%s'"
- _txUpAccount = "UPDATE up_account SET total_income = total_income %s, total_unwithdraw_income = total_unwithdraw_income %s WHERE mid = %d AND is_deleted = 0"
- _txUpIncomeTable = "UPDATE %s SET av_income = av_income %s, total_income = total_income %s, income = income %s WHERE mid = %d AND date = '%s'"
- _txUpIncomeStatis = "UPDATE up_income_statis SET total_income = total_income %s WHERE mid = %d"
- _txUpIncomeDailyStatis = "UPDATE up_income_daily_statis set income = income %s WHERE cdate = '%s'"
- _txTagInsertUpIncomeDate = "INSERT INTO %s(mid,income,total_income,date) VALUES(%d,%d,%d,'%s') ON DUPLICATE KEY UPDATE total_income=VALUES(total_income),income=VALUES(income)"
- _txTagInsertUpIncomeStatis = "INSERT INTO up_income_statis(mid, total_income) VALUES(%d,%d) ON DUPLICATE KEY UPDATE total_income=VALUES(total_income)"
- _txTagInsertUpAccount = "INSERT INTO up_account(mid,has_sign_contract,state,total_income,total_unwithdraw_income,withdraw_date_version) VALUES(%d,1,1,%d,%d,'%s') ON DUPLICATE KEY UPDATE total_income=VALUES(total_income),total_unwithdraw_income=VALUES(total_unwithdraw_income)"
- _txTagUpIncomeDailyStatis = "UPDATE up_income_daily_statis SET ups = ups + %d WHERE cdate = '%s' AND money_section = %d"
- _txUpAvStatis = "INSERT INTO up_av_statis(mid,weekly_date,weekly_av_ids,monthly_date,monthly_av_ids) VALUES(%d,'2018-05-28','%s', '2018-05-01', '%s') ON DUPLICATE KEY UPDATE mid=VALUES(mid)"
- _txAddUpIncomeStatis = "UPDATE up_income_statis SET total_income = total_income + %d WHERE mid = %d"
- _txAddAvIncomeStatis = "UPDATE av_income_statis SET total_income = total_income + %d WHERE av_id = %d"
- _txAddUpIncome = "UPDATE %s SET total_income = total_income + %d, income = income + %d, av_income = av_income + %d WHERE mid = %d AND date = '%s'"
- _txAddAvIncome = "UPDATE av_income SET total_income = total_income + %d, income = income + %d WHERE av_id = %d AND date = '%s'"
- _txAddUpAccount = "UPDATE up_account SET total_income = total_income + %d, total_unwithdraw_income = total_unwithdraw_income + %d WHERE mid = %d"
- _txUpdateAccountType = "UPDATE up_info_video SET account_type=%d WHERE mid=%d"
- _txUpdateUpAccountMoney = "UPDATE up_account SET total_income = total_income + %d, total_unwithdraw_income = total_unwithdraw_income + %d WHERE mid = %d LIMIT 1"
- _txUpdateUpBaseIncome = "UPDATE up_income SET base_income=%d WHERE mid=%d AND date='%s'"
- _txInUpInfoPGCSQL = "INSERT INTO up_info_bgm(mid,nickname,fans,account_type,account_state,sign_type) values(%d,%s,%d,%d,%d,%d) ON DUPLICATE KEY UPDATE account_type=VALUES(account_type)"
- _txDelAvBreachSQL = "DELETE FROM av_breach_record WHERE id = %d LIMIT 1"
- _txUpTotalIncomeSQL = "UPDATE %s SET total_income = total_income - %d WHERE mid = %d AND date = '%s' LIMIT 1"
- _txColumnTagSQL = "UPDATE %s SET tag_id = %d WHERE date = '2018-08-19' AND tag_id in (%s) AND inc_charge > 0"
- _txUpdateBgmBaseIncome = "UPDATE up_income SET bgm_base_income=bgm_income WHERE mid=%d AND date='%s' AND bgm_base_income=0"
- _txDelData = "DELETE FROM %s LIMIT %d"
- )
- // DelDataLimit del up_bill
- func (s *Service) DelDataLimit(c context.Context, table string, count int64) (err error) {
- if table == "" {
- return
- }
- return s.txUpdateSQL(c, fmt.Sprintf(_txDelData, table, count), count)
- }
- // FixBgmBaseIncome fix bgm base income
- func (s *Service) FixBgmBaseIncome(c context.Context, mid int64, date string) (err error) {
- sql := fmt.Sprintf(_txUpdateBgmBaseIncome, mid, date)
- return s.txUpdateSQL(c, sql, 1)
- }
- // FixBaseIncome fix income
- func (s *Service) FixBaseIncome(c context.Context, base int64, mid int64, date string) (err error) {
- sql := fmt.Sprintf(_txUpdateUpBaseIncome, base, mid, date)
- return s.txUpdateSQL(c, sql, 1)
- }
- // FixIncome fix income
- func (s *Service) FixIncome(c context.Context) (err error) {
- date := time.Date(2018, 9, 10, 0, 0, 0, 0, time.Local)
- total, err := s.getAvIncome(c, date)
- if err != nil {
- log.Error("s.getAvIncome error(%v)", err)
- return
- }
- avIncomes := make([]*model.IncomeInfo, 0)
- for _, av := range total {
- if av.UploadTime.Unix() >= date.Unix() {
- avIncomes = append(avIncomes, av)
- }
- }
- if len(avIncomes) != 1768 {
- err = fmt.Errorf("get av_income(%d) != 1768", len(avIncomes))
- return
- }
- var tx *sql.Tx
- tx, err = s.dao.BeginTran(c)
- if err != nil {
- log.Error("s.dao.BeginTran error(%v)", err)
- return
- }
- for _, av := range avIncomes {
- // av_income_statis
- err = s.updateSQL(tx, fmt.Sprintf(_txAddAvIncomeStatis, av.Income, av.AVID), 1)
- if err != nil {
- log.Error("s.UpdateSQL(%s) error(%v)", _txAddAvIncomeStatis, err)
- return
- }
- // up_income_statis
- err = s.updateSQL(tx, fmt.Sprintf(_txAddUpIncomeStatis, av.Income, av.MID), 1)
- if err != nil {
- log.Error("s.UpdateSQL(%s) error(%v)", _txAddUpIncomeStatis, err)
- return
- }
- // av_income
- err = s.updateSQL(tx, fmt.Sprintf(_txAddAvIncome, av.Income, av.Income, av.AVID, "2018-09-10"), 1)
- if err != nil {
- log.Error("s.UpdateSQL(%s) error(%v)", _txAddAvIncome, err)
- return
- }
- // up_income
- err = s.updateSQL(tx, fmt.Sprintf(_txAddUpIncome, "up_income", av.Income, av.Income, av.Income, av.MID, "2018-09-10"), 1)
- if err != nil {
- log.Error("s.UpdateSQL(%s) error(%v)", _txAddUpIncome, err)
- return
- }
- // up_income_weekly
- err = s.updateSQL(tx, fmt.Sprintf(_txAddUpIncome, "up_income_weekly", av.Income, av.Income, av.Income, av.MID, "2018-09-10"), 1)
- if err != nil {
- log.Error("s.UpdateSQL(%s) error(%v)", _txAddUpIncome, err)
- return
- }
- // up_income_monthly
- err = s.updateSQL(tx, fmt.Sprintf(_txAddUpIncome, "up_income_monthly", av.Income, av.Income, av.Income, av.MID, "2018-09-01"), 1)
- if err != nil {
- log.Error("s.UpdateSQL(%s) error(%v)", _txAddUpIncome, err)
- return
- }
- // up_account
- err = s.updateSQL(tx, fmt.Sprintf(_txAddUpAccount, av.Income, av.Income, av.MID), 1)
- if err != nil {
- log.Error("s.UpdateSQL(%s) error(%v)", _txAddUpAccount, err)
- return
- }
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit error")
- }
- return
- }
- // FixUpAvStatis fix up_av_statis
- func (s *Service) FixUpAvStatis(c context.Context, count int) (err error) {
- upIncome, err := s.GetUpIncome(c, "up_income", "2018-05-31")
- if err != nil {
- log.Error("s.GetUpIncome error(%v)", err)
- return
- }
- tx, err := s.dao.BeginTran(c)
- if err != nil {
- log.Error("s.dao.BeginTran error(%v)", err)
- return
- }
- // 301
- addCount := 0
- for _, up := range upIncome {
- if up.TotalIncome == 12700 && up.Income == 12700 {
- err = s.updateSQL(tx, fmt.Sprintf(_txUpAvStatis, up.MID, "", ""), 0)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- addCount++
- }
- }
- if count != addCount {
- err = fmt.Errorf("需要添加的record不匹配 %d:%d", count, addCount)
- tx.Rollback()
- return
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit error")
- }
- return
- }
- // FixUpIncome fix up_income
- func (s *Service) FixUpIncome(c context.Context, date string, tagID int64, addCount, needAddIncome int) (err error) {
- upIncome, err := s.GetUpIncome(c, "up_income", date)
- if err != nil {
- log.Error("s.GetUpIncome error(%v)", err)
- return
- }
- upChargeRatio, err := s.dao.GetUpChargeRatio(c, tagID)
- if err != nil {
- log.Error("s.dao.GetUpChargeRatio error(%v)", err)
- return
- }
- for _, up := range upIncome {
- if _, ok := upChargeRatio[up.MID]; ok {
- delete(upChargeRatio, up.MID)
- }
- }
- if len(upChargeRatio) != addCount {
- err = fmt.Errorf("需要调节的up主数量不匹配 %d:%d", len(upChargeRatio), addCount)
- return
- }
- mids := make([]int64, 0)
- for mid := range upChargeRatio {
- mids = append(mids, mid)
- }
- upIncomeStatis, err := s.dao.GetUpIncomeStatis(c, mids)
- if err != nil {
- log.Error("s.dao.GetUpIncomeStatis error(%v)", err)
- return
- }
- upIncomeWeek, err := s.dao.GetUpIncomeDate(c, mids, "up_income_weekly", "2018-05-28")
- if err != nil {
- log.Error("s.dao.GetUpIncomeDate error(%v)", err)
- return
- }
- upIncomeMonth, err := s.dao.GetUpIncomeDate(c, mids, "up_income_monthly", "2018-05-01")
- if err != nil {
- log.Error("s.dao.GetUpIncomeDate error(%v)", err)
- return
- }
- tx, err := s.dao.BeginTran(c)
- if err != nil {
- log.Error("s.dao.BeginTran error(%v)", err)
- return
- }
- // start add
- var totalAddIncome int64
- for mid, ratio := range upChargeRatio {
- totalIncome := upIncomeStatis[mid]
- weekIncome := upIncomeWeek[mid]
- monthIncome := upIncomeMonth[mid]
- // up_income
- err = s.insertIntoSQL(tx, fmt.Sprintf(_txTagInsertUpIncomeDate, "up_income", mid, ratio, totalIncome+ratio, date), 1)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // up_income_weekly todo
- weekDate := "2018-05-28"
- err = s.insertIntoSQL(tx, fmt.Sprintf(_txTagInsertUpIncomeDate, "up_income_weekly", mid, ratio+weekIncome, totalIncome+ratio, weekDate), 0)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // up_income_monthly todo
- monthDate := "2018-05-01"
- err = s.insertIntoSQL(tx, fmt.Sprintf(_txTagInsertUpIncomeDate, "up_income_monthly", mid, ratio+monthIncome, totalIncome+ratio, monthDate), 0)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // up_income_statis
- err = s.insertIntoSQL(tx, fmt.Sprintf(_txTagInsertUpIncomeStatis, mid, ratio+totalIncome), 0)
- if err != nil {
- log.Error("s.insertIntoSQL error(%v)", err)
- return
- }
- // up_account
- err = s.insertIntoSQL(tx, fmt.Sprintf(_txTagInsertUpAccount, mid, ratio+totalIncome, ratio+totalIncome, "2018-04"), 0)
- if err != nil {
- log.Error("s.insertIntoSQL error(%v)", err)
- return
- }
- totalAddIncome += ratio
- }
- // up_income_daily_statis
- if totalAddIncome != int64(needAddIncome) {
- err = fmt.Errorf("需要调节的up主总收入不匹配 %d:%d", totalAddIncome, needAddIncome)
- tx.Rollback()
- return
- }
- add := fmt.Sprintf(" + %d", totalAddIncome)
- err = s.updateSQL(tx, fmt.Sprintf(_txUpIncomeDailyStatis, add, date), 12)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- err = s.updateSQL(tx, fmt.Sprintf(_txTagUpIncomeDailyStatis, len(upChargeRatio), date, 3), 1)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit error")
- }
- return
- }
- func (s *Service) getAllAvRatio(c context.Context, limit int64) (rs map[int64]*model.AvChargeRatio, err error) {
- rs = make(map[int64]*model.AvChargeRatio)
- var id int64
- for {
- var ros map[int64]*model.AvChargeRatio
- ros, id, err = s.dao.AvChargeRatio(c, id, limit)
- if err != nil {
- return
- }
- if len(ros) == 0 {
- break
- }
- for k, v := range ros {
- rs[k] = v
- }
- }
- return
- }
- // UpdateTagIncome update tag_Info income
- func (s *Service) UpdateTagIncome(c context.Context, date string) (err error) {
- avRatio, err := s.getAllAvRatio(c, 2000)
- if err != nil {
- log.Error("s.getAllAvRatio error(%v)", err)
- return
- }
- log.Info("get avratios:%d", len(avRatio))
- updateDate := time.Date(2018, 6, 24, 0, 0, 0, 0, time.Local)
- avIncome, err := s.getAvIncome(c, updateDate)
- if err != nil {
- log.Error("s.getAvIncome error(%v)", err)
- return
- }
- log.Info("get av_income:%d ", len(avIncome))
- err = s.updateIncome(c, avIncome, avRatio)
- if err != nil {
- log.Error("s.updateIncome error(%v)", err)
- return
- }
- return
- }
- // GetTrueAvsIncome get true av_income
- func (s *Service) GetTrueAvsIncome(c context.Context, mids []int64, date string) (avs map[int64]*model.Patch, err error) {
- avs = make(map[int64]*model.Patch)
- for _, mid := range mids {
- var av map[int64]*model.Patch
- av, err = s.AvIncomes(c, mid, date)
- if err != nil {
- return
- }
- for key, val := range av {
- avs[key] = val
- }
- }
- return
- }
- func (s *Service) updateIncome(c context.Context, avIncome []*model.IncomeInfo, avRatio map[int64]*model.AvChargeRatio) (err error) {
- trueAvs := make([]*model.IncomeInfo, 0)
- uploadTime := time.Date(2018, 6, 24, 0, 0, 0, 0, time.Local)
- for _, av := range avIncome {
- if !uploadTime.After(av.UploadTime) {
- if _, ok := avRatio[av.AVID]; !ok {
- trueAvs = append(trueAvs, av)
- }
- }
- }
- if len(trueAvs) != 1856 {
- err = fmt.Errorf("实际被作用稿件(%d) != 1856", len(trueAvs))
- return
- }
- tx, err := s.dao.BeginTran(c)
- if err != nil {
- log.Error("s.dao.BeginTran error(%v)", err)
- return
- }
- for _, av := range trueAvs {
- var incIncome, categoryID int64
- incIncome, categoryID, err = s.dao.AvDailyIncCharge(c, av.AVID)
- if err != nil {
- log.Error("s.dao.AvDailyIncCharge avid(%d) error(%v)", av.MID, err)
- return
- }
- err = s.TxUpdateIncome(tx, av.AVID, av.MID, categoryID, incIncome)
- if err != nil {
- log.Error("ERROR(%v) avid(%d), mid(%d)", err, av.AVID, av.MID)
- return
- }
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit error")
- }
- return
- }
- // TxUpdateIncome update creative income
- func (s *Service) TxUpdateIncome(tx *sql.Tx, avID, mid, categoryID int64, incIncome int64) (err error) {
- if incIncome <= 0 {
- return
- }
- var incIncomeStr string
- if incIncome > 0 {
- incIncomeStr = fmt.Sprintf("+ %d", incIncome)
- }
- date := "2018-06-24"
- // av_income
- avIncomeSQL := fmt.Sprintf(_txUpdateAvIncome, incIncomeStr, incIncomeStr, avID, mid, date)
- err = s.updateSQL(tx, avIncomeSQL, 1)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // av_income_statis
- avIncomeStatisSQL := fmt.Sprintf(_txUpdateAvIncomeStatis, incIncomeStr, avID, mid)
- err = s.updateSQL(tx, avIncomeStatisSQL, 1)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // av_income_daily_statis
- avIncomeDailyStatisIncome := fmt.Sprintf(_txUpdateAvIncomeStatisIncome, "daily", incIncomeStr, categoryID, date)
- err = s.updateSQL(tx, avIncomeDailyStatisIncome, 12)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // av_income_weekly_statis
- avIncomeWeeklyStatisIncome := fmt.Sprintf(_txUpdateAvIncomeStatisIncome, "weekly", incIncomeStr, categoryID, "2018-06-18")
- err = s.updateSQL(tx, avIncomeWeeklyStatisIncome, 12)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // av_income_monthly_statis
- avIncomeMonthlyStatisIncome := fmt.Sprintf(_txUpdateAvIncomeStatisIncome, "monthly", incIncomeStr, categoryID, "2018-06-01")
- err = s.updateSQL(tx, avIncomeMonthlyStatisIncome, 12)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // up_account
- upAcccountSQL := fmt.Sprintf(_txUpAccount, incIncomeStr, incIncomeStr, mid)
- err = s.updateSQL(tx, upAcccountSQL, 1)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // up_income_statis
- upIncomeStatisSQL := fmt.Sprintf(_txUpIncomeStatis, incIncomeStr, mid)
- err = s.updateSQL(tx, upIncomeStatisSQL, 1)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // up_income_daily_statis
- upIncomeDailyStatisSQL := fmt.Sprintf(_txUpIncomeDailyStatis, incIncomeStr, date)
- err = s.updateSQL(tx, upIncomeDailyStatisSQL, 12)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // up_income
- upIncomeSQL := fmt.Sprintf(_txUpIncomeTable, "up_income", incIncomeStr, incIncomeStr, incIncomeStr, mid, date)
- err = s.updateSQL(tx, upIncomeSQL, 1)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // up_income_weekly
- upIncomeWeeklySQL := fmt.Sprintf(_txUpIncomeTable, "up_income_weekly", incIncomeStr, incIncomeStr, incIncomeStr, mid, "2018-06-18")
- err = s.updateSQL(tx, upIncomeWeeklySQL, 1)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // up_income_monthly
- upIncomeMonthlySQL := fmt.Sprintf(_txUpIncomeTable, "up_income_monthly", incIncomeStr, incIncomeStr, incIncomeStr, mid, "2018-06-01")
- err = s.updateSQL(tx, upIncomeMonthlySQL, 1)
- if err != nil {
- log.Error("s.UpdateSQL error(%v)", err)
- return
- }
- // up_av_statis 不需要修改
- // up_income_withdraw 不需要修改
- return
- }
- // UpdateWithdraw update up_account withdraw
- func (s *Service) UpdateWithdraw(c context.Context, oldDate, newDate string, count int64) (err error) {
- sql := fmt.Sprintf(_updateUpAccount, newDate, oldDate)
- return s.txUpdateSQL(c, sql, count)
- }
- // UpdateTagAdjust update tag adjust_type
- func (s *Service) UpdateTagAdjust(c context.Context, id int64) (err error) {
- sql := fmt.Sprintf(_updateTagAdjust, id)
- return s.txUpdateSQL(c, sql, 1)
- }
- // UpdateAccountType update account type
- func (s *Service) UpdateAccountType(c context.Context, mid int64, accType int) (err error) {
- sql := fmt.Sprintf(_txUpdateAccountType, accType, mid)
- return s.txUpdateSQL(c, sql, 1)
- }
- // UpdateUpAccountMoney update up_account
- func (s *Service) UpdateUpAccountMoney(c context.Context, mid int64, total, unwithdraw int64) (err error) {
- sql := fmt.Sprintf(_txUpdateUpAccountMoney, total, unwithdraw, mid)
- return s.txUpdateSQL(c, sql, 1)
- }
- // SyncUpPGC sync pgc up from up_info_video to up_info_column
- func (s *Service) SyncUpPGC(c context.Context) (err error) {
- ups, err := s.getAllUps(c, 2000)
- if err != nil {
- log.Error("s.getAllUps error(%v)", err)
- return
- }
- tx, err := s.dao.BeginTran(c)
- if err != nil {
- log.Error("s.dao.BeginTran error(%v)", err)
- return
- }
- for _, up := range ups {
- if up.AccountType == 2 {
- sql := fmt.Sprintf(_txInUpInfoPGCSQL, up.MID, "\""+up.Nickname+"\"", up.Fans, 2, 1, 2)
- err = s.insertIntoSQL(tx, sql, 0)
- if err != nil {
- log.Error("s.UpdateSQL(%s) error(%v)", _txInUpInfoPGCSQL, err)
- return
- }
- }
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit error")
- }
- return
- }
- // FixAvBreach fix av_breach_record data
- func (s *Service) FixAvBreach(c context.Context, mid int64, date string, count int) (err error) {
- breachs, err := s.dao.GetAvBreach(c, date, date)
- if err != nil {
- log.Error("s.dao.GetAvBreach error(%v)", err)
- return
- }
- tx, err := s.dao.BeginTran(c)
- if err != nil {
- log.Error("s.dao.BeginTran error(%v)", err)
- return
- }
- avMap := make(map[int64]bool)
- var delCount int
- for _, b := range breachs {
- if b.MID != mid {
- continue
- }
- if avMap[b.AvID] {
- err = s.updateSQL(tx, fmt.Sprintf(_txDelAvBreachSQL, b.ID), 1)
- if err != nil {
- log.Error("s.UpdateSQL(%s) error(%v)", _txDelAvBreachSQL, err)
- return
- }
- delCount++
- } else {
- avMap[b.AvID] = true
- }
- }
- if count != delCount {
- tx.Rollback()
- log.Error("delete count error %d %d", count, delCount)
- err = fmt.Errorf("delete count error")
- return
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit error")
- }
- return
- }
- // FixUpTotalIncome fix up_income total income
- func (s *Service) FixUpTotalIncome(c context.Context, table, date string, count int) (err error) {
- upIncome, err := s.GetUpIncome(c, "up_income", date)
- if err != nil {
- log.Error("s.GetUpIncome error(%v)", err)
- return
- }
- tx, err := s.dao.BeginTran(c)
- if err != nil {
- log.Error("s.dao.BeginTran error(%v)", err)
- return
- }
- trueCount := 0
- for _, up := range upIncome {
- err = s.updateSQL(tx, fmt.Sprintf(_txUpTotalIncomeSQL, table, up.Income, up.MID, date), 1)
- if err != nil {
- log.Error("s.UpdateSQL(%s) error(%v)", _txUpTotalIncomeSQL, err)
- return
- }
- trueCount++
- }
- if count != trueCount {
- tx.Rollback()
- log.Error("count error %d %d", count, trueCount)
- err = fmt.Errorf(" count error")
- return
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit error")
- }
- return
- }
- // UpdateColumnTag update column tag
- func (s *Service) UpdateColumnTag(c context.Context, table string, oldTag string, newTag int, count int64) (err error) {
- sql := fmt.Sprintf(_txColumnTagSQL, table, newTag, oldTag)
- return s.txUpdateSQL(c, sql, count)
- }
- func (s *Service) txUpdateSQL(c context.Context, sql string, count int64) (err error) {
- tx, err := s.dao.BeginTran(c)
- if err != nil {
- log.Error("s.dao.BeginTran error(%v)", err)
- return
- }
- err = s.updateSQL(tx, sql, count)
- if err != nil {
- log.Error("s.UpdateSQL(%s) error(%v)", sql, err)
- return
- }
- if err = tx.Commit(); err != nil {
- log.Error("tx.Commit error")
- }
- return
- }
- func (s *Service) updateSQL(tx *sql.Tx, stmt string, count int64) error {
- rows, err := s.dao.UpdateDate(tx, stmt)
- if err != nil {
- tx.Rollback()
- log.Error("s.dao.UpdateDate (%s) error(%v)", stmt, err)
- return err
- }
- if count == 0 && rows <= 1 {
- return nil
- }
- if rows != count {
- tx.Rollback()
- return fmt.Errorf("%s : rows(%d) != count(%d) error", stmt, rows, count)
- }
- return nil
- }
- func (s *Service) insertIntoSQL(tx *sql.Tx, stmt string, count int) error {
- rows, err := s.dao.UpdateDate(tx, stmt)
- if err != nil {
- tx.Rollback()
- log.Error("s.dao.UpdateDate (%s) error(%v)", stmt, err)
- return err
- }
- if count == 0 {
- if rows > 2 {
- tx.Rollback()
- return fmt.Errorf("rows(%d) error", rows)
- }
- } else if rows != int64(count) {
- tx.Rollback()
- return fmt.Errorf("rows(%d) != count(%d) error", rows, count)
- }
- return nil
- }
- // SyncAvBaseIncome sync base_income to av_base_income by mid_date
- func (s *Service) SyncAvBaseIncome(c context.Context, table string) (err error) {
- data, err := s.avBaseIncomes(c, table)
- if err != nil {
- return
- }
- err = s.batchUpdateUpIncome(c, data, table)
- if err != nil {
- log.Error("batch update av base income error(%v)", err)
- }
- return
- }
- func (s *Service) avBaseIncomes(c context.Context, table string) (data []*model.AvBaseIncome, err error) {
- var id int64
- for {
- var abs []*model.AvBaseIncome
- abs, id, err = s.dao.GetAvBaseIncome(c, table, id, 2000)
- if err != nil {
- return
- }
- if len(abs) == 0 {
- break
- }
- for _, ab := range abs {
- if ab.AvBaseIncome > 0 {
- data = append(data, ab)
- }
- }
- }
- return
- }
- func (s *Service) batchUpdateUpIncome(c context.Context, us []*model.AvBaseIncome, table string) (err error) {
- var (
- buff = make([]*model.AvBaseIncome, 2000)
- buffEnd = 0
- )
- for _, u := range us {
- buff[buffEnd] = u
- buffEnd++
- if buffEnd >= 2000 {
- values := avBaseIncomeValues(buff[:buffEnd])
- buffEnd = 0
- _, err = s.dao.BatchUpdateUpIncome(c, table, values)
- if err != nil {
- return
- }
- }
- }
- if buffEnd > 0 {
- values := avBaseIncomeValues(buff[:buffEnd])
- buffEnd = 0
- _, err = s.dao.BatchUpdateUpIncome(c, table, values)
- }
- return
- }
- func avBaseIncomeValues(us []*model.AvBaseIncome) (values string) {
- var buf bytes.Buffer
- for _, u := range us {
- buf.WriteString("(")
- buf.WriteString(strconv.FormatInt(u.MID, 10))
- buf.WriteByte(',')
- buf.WriteString("'" + u.Date.Time().Format(_layout) + "'")
- buf.WriteByte(',')
- buf.WriteString(strconv.FormatInt(u.AvBaseIncome, 10))
- buf.WriteString(")")
- buf.WriteByte(',')
- }
- if buf.Len() > 0 {
- buf.Truncate(buf.Len() - 1)
- }
- values = buf.String()
- buf.Reset()
- return
- }
- // SyncCreditScore sync credit score
- func (s *Service) SyncCreditScore(c context.Context) (err error) {
- m := make(map[int64]int)
- am, err := s.getCreditScore(c, "video")
- if err != nil {
- return
- }
- for mid, score := range am {
- m[mid] = score
- }
- cm, err := s.getCreditScore(c, "column")
- if err != nil {
- return
- }
- for mid, score := range cm {
- m[mid] = score
- }
- return s.batchInsertCreditScore(c, m)
- }
- func (s *Service) batchInsertCreditScore(c context.Context, m map[int64]int) (err error) {
- batch := make(map[int64]int)
- for mid, score := range m {
- batch[mid] = score
- if len(batch) == 2000 {
- values := creditScoreValues(batch)
- _, err = s.dao.SyncCreditScore(c, values)
- if err != nil {
- return
- }
- batch = make(map[int64]int)
- }
- }
- if len(batch) > 0 {
- values := creditScoreValues(batch)
- _, err = s.dao.SyncCreditScore(c, values)
- }
- return
- }
- func creditScoreValues(m map[int64]int) (values string) {
- var buf bytes.Buffer
- for mid, score := range m {
- buf.WriteString("(")
- buf.WriteString(strconv.FormatInt(mid, 10))
- buf.WriteByte(',')
- buf.WriteString(strconv.Itoa(score))
- buf.WriteString(")")
- buf.WriteByte(',')
- }
- if buf.Len() > 0 {
- buf.Truncate(buf.Len() - 1)
- }
- values = buf.String()
- buf.Reset()
- return
- }
- func (s *Service) getCreditScore(c context.Context, table string) (m map[int64]int, err error) {
- m = make(map[int64]int)
- var id int64
- for {
- var sm map[int64]int
- sm, id, err = s.dao.GetCreditScore(c, table, id, 2000)
- if err != nil {
- return
- }
- if len(sm) == 0 {
- break
- }
- for k, v := range sm {
- m[k] = v
- }
- }
- return
- }
- // FixBgmIncomeStatis fix bgm income statis
- func (s *Service) FixBgmIncomeStatis(c context.Context) (err error) {
- total, err := s.dao.GetBGMIncome(c)
- if err != nil {
- return
- }
- for sid, income := range total {
- _, err = s.dao.InsertBGMIncomeStatis(c, sid, income)
- if err != nil {
- return
- }
- }
- return
- }
|