data.go 25 KB


  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "strconv"
  7. "time"
  8. "go-common/app/job/main/growup/model"
  9. "go-common/library/database/sql"
  10. "go-common/library/log"
  11. )
  12. const (
  13. _updateUpAccount = "UPDATE up_account SET withdraw_date_version = '%s', total_unwithdraw_income=total_income - total_withdraw_income - exchange_income where withdraw_date_version = '%s' AND is_deleted = 0"
  14. _updateTagAdjust = "UPDATE tag_info SET adjust_type = 1 WHERE id = %d AND adjust_type = 0"
  15. // fix data
  16. _txUpdateAvIncome = "UPDATE av_income SET total_income = total_income %s, income = income %s WHERE av_id = %d AND mid = %d AND date = '%s'"
  17. _txUpdateAvIncomeStatis = "UPDATE av_income_statis SET total_income = total_income %s WHERE av_id = %d AND mid = %d"
  18. _txUpdateAvIncomeStatisIncome = "UPDATE av_income_%s_statis SET income = income %s WHERE category_id = %d AND cdate = '%s'"
  19. _txUpAccount = "UPDATE up_account SET total_income = total_income %s, total_unwithdraw_income = total_unwithdraw_income %s WHERE mid = %d AND is_deleted = 0"
  20. _txUpIncomeTable = "UPDATE %s SET av_income = av_income %s, total_income = total_income %s, income = income %s WHERE mid = %d AND date = '%s'"
  21. _txUpIncomeStatis = "UPDATE up_income_statis SET total_income = total_income %s WHERE mid = %d"
  22. _txUpIncomeDailyStatis = "UPDATE up_income_daily_statis set income = income %s WHERE cdate = '%s'"
  23. _txTagInsertUpIncomeDate = "INSERT INTO %s(mid,income,total_income,date) VALUES(%d,%d,%d,'%s') ON DUPLICATE KEY UPDATE total_income=VALUES(total_income),income=VALUES(income)"
  24. _txTagInsertUpIncomeStatis = "INSERT INTO up_income_statis(mid, total_income) VALUES(%d,%d) ON DUPLICATE KEY UPDATE total_income=VALUES(total_income)"
  25. _txTagInsertUpAccount = "INSERT INTO up_account(mid,has_sign_contract,state,total_income,total_unwithdraw_income,withdraw_date_version) VALUES(%d,1,1,%d,%d,'%s') ON DUPLICATE KEY UPDATE total_income=VALUES(total_income),total_unwithdraw_income=VALUES(total_unwithdraw_income)"
  26. _txTagUpIncomeDailyStatis = "UPDATE up_income_daily_statis SET ups = ups + %d WHERE cdate = '%s' AND money_section = %d"
  27. _txUpAvStatis = "INSERT INTO up_av_statis(mid,weekly_date,weekly_av_ids,monthly_date,monthly_av_ids) VALUES(%d,'2018-05-28','%s', '2018-05-01', '%s') ON DUPLICATE KEY UPDATE mid=VALUES(mid)"
  28. _txAddUpIncomeStatis = "UPDATE up_income_statis SET total_income = total_income + %d WHERE mid = %d"
  29. _txAddAvIncomeStatis = "UPDATE av_income_statis SET total_income = total_income + %d WHERE av_id = %d"
  30. _txAddUpIncome = "UPDATE %s SET total_income = total_income + %d, income = income + %d, av_income = av_income + %d WHERE mid = %d AND date = '%s'"
  31. _txAddAvIncome = "UPDATE av_income SET total_income = total_income + %d, income = income + %d WHERE av_id = %d AND date = '%s'"
  32. _txAddUpAccount = "UPDATE up_account SET total_income = total_income + %d, total_unwithdraw_income = total_unwithdraw_income + %d WHERE mid = %d"
  33. _txUpdateAccountType = "UPDATE up_info_video SET account_type=%d WHERE mid=%d"
  34. _txUpdateUpAccountMoney = "UPDATE up_account SET total_income = total_income + %d, total_unwithdraw_income = total_unwithdraw_income + %d WHERE mid = %d LIMIT 1"
  35. _txUpdateUpBaseIncome = "UPDATE up_income SET base_income=%d WHERE mid=%d AND date='%s'"
  36. _txInUpInfoPGCSQL = "INSERT INTO up_info_bgm(mid,nickname,fans,account_type,account_state,sign_type) values(%d,%s,%d,%d,%d,%d) ON DUPLICATE KEY UPDATE account_type=VALUES(account_type)"
  37. _txDelAvBreachSQL = "DELETE FROM av_breach_record WHERE id = %d LIMIT 1"
  38. _txUpTotalIncomeSQL = "UPDATE %s SET total_income = total_income - %d WHERE mid = %d AND date = '%s' LIMIT 1"
  39. _txColumnTagSQL = "UPDATE %s SET tag_id = %d WHERE date = '2018-08-19' AND tag_id in (%s) AND inc_charge > 0"
  40. _txUpdateBgmBaseIncome = "UPDATE up_income SET bgm_base_income=bgm_income WHERE mid=%d AND date='%s' AND bgm_base_income=0"
  41. _txDelData = "DELETE FROM %s LIMIT %d"
  42. )
  43. // DelDataLimit del up_bill
  44. func (s *Service) DelDataLimit(c context.Context, table string, count int64) (err error) {
  45. if table == "" {
  46. return
  47. }
  48. return s.txUpdateSQL(c, fmt.Sprintf(_txDelData, table, count), count)
  49. }
  50. // FixBgmBaseIncome fix bgm base income
  51. func (s *Service) FixBgmBaseIncome(c context.Context, mid int64, date string) (err error) {
  52. sql := fmt.Sprintf(_txUpdateBgmBaseIncome, mid, date)
  53. return s.txUpdateSQL(c, sql, 1)
  54. }
  55. // FixBaseIncome fix income
  56. func (s *Service) FixBaseIncome(c context.Context, base int64, mid int64, date string) (err error) {
  57. sql := fmt.Sprintf(_txUpdateUpBaseIncome, base, mid, date)
  58. return s.txUpdateSQL(c, sql, 1)
  59. }
  60. // FixIncome fix income
  61. func (s *Service) FixIncome(c context.Context) (err error) {
  62. date := time.Date(2018, 9, 10, 0, 0, 0, 0, time.Local)
  63. total, err := s.getAvIncome(c, date)
  64. if err != nil {
  65. log.Error("s.getAvIncome error(%v)", err)
  66. return
  67. }
  68. avIncomes := make([]*model.IncomeInfo, 0)
  69. for _, av := range total {
  70. if av.UploadTime.Unix() >= date.Unix() {
  71. avIncomes = append(avIncomes, av)
  72. }
  73. }
  74. if len(avIncomes) != 1768 {
  75. err = fmt.Errorf("get av_income(%d) != 1768", len(avIncomes))
  76. return
  77. }
  78. var tx *sql.Tx
  79. tx, err = s.dao.BeginTran(c)
  80. if err != nil {
  81. log.Error("s.dao.BeginTran error(%v)", err)
  82. return
  83. }
  84. for _, av := range avIncomes {
  85. // av_income_statis
  86. err = s.updateSQL(tx, fmt.Sprintf(_txAddAvIncomeStatis, av.Income, av.AVID), 1)
  87. if err != nil {
  88. log.Error("s.UpdateSQL(%s) error(%v)", _txAddAvIncomeStatis, err)
  89. return
  90. }
  91. // up_income_statis
  92. err = s.updateSQL(tx, fmt.Sprintf(_txAddUpIncomeStatis, av.Income, av.MID), 1)
  93. if err != nil {
  94. log.Error("s.UpdateSQL(%s) error(%v)", _txAddUpIncomeStatis, err)
  95. return
  96. }
  97. // av_income
  98. err = s.updateSQL(tx, fmt.Sprintf(_txAddAvIncome, av.Income, av.Income, av.AVID, "2018-09-10"), 1)
  99. if err != nil {
  100. log.Error("s.UpdateSQL(%s) error(%v)", _txAddAvIncome, err)
  101. return
  102. }
  103. // up_income
  104. err = s.updateSQL(tx, fmt.Sprintf(_txAddUpIncome, "up_income", av.Income, av.Income, av.Income, av.MID, "2018-09-10"), 1)
  105. if err != nil {
  106. log.Error("s.UpdateSQL(%s) error(%v)", _txAddUpIncome, err)
  107. return
  108. }
  109. // up_income_weekly
  110. err = s.updateSQL(tx, fmt.Sprintf(_txAddUpIncome, "up_income_weekly", av.Income, av.Income, av.Income, av.MID, "2018-09-10"), 1)
  111. if err != nil {
  112. log.Error("s.UpdateSQL(%s) error(%v)", _txAddUpIncome, err)
  113. return
  114. }
  115. // up_income_monthly
  116. err = s.updateSQL(tx, fmt.Sprintf(_txAddUpIncome, "up_income_monthly", av.Income, av.Income, av.Income, av.MID, "2018-09-01"), 1)
  117. if err != nil {
  118. log.Error("s.UpdateSQL(%s) error(%v)", _txAddUpIncome, err)
  119. return
  120. }
  121. // up_account
  122. err = s.updateSQL(tx, fmt.Sprintf(_txAddUpAccount, av.Income, av.Income, av.MID), 1)
  123. if err != nil {
  124. log.Error("s.UpdateSQL(%s) error(%v)", _txAddUpAccount, err)
  125. return
  126. }
  127. }
  128. if err = tx.Commit(); err != nil {
  129. log.Error("tx.Commit error")
  130. }
  131. return
  132. }
  133. // FixUpAvStatis fix up_av_statis
  134. func (s *Service) FixUpAvStatis(c context.Context, count int) (err error) {
  135. upIncome, err := s.GetUpIncome(c, "up_income", "2018-05-31")
  136. if err != nil {
  137. log.Error("s.GetUpIncome error(%v)", err)
  138. return
  139. }
  140. tx, err := s.dao.BeginTran(c)
  141. if err != nil {
  142. log.Error("s.dao.BeginTran error(%v)", err)
  143. return
  144. }
  145. // 301
  146. addCount := 0
  147. for _, up := range upIncome {
  148. if up.TotalIncome == 12700 && up.Income == 12700 {
  149. err = s.updateSQL(tx, fmt.Sprintf(_txUpAvStatis, up.MID, "", ""), 0)
  150. if err != nil {
  151. log.Error("s.UpdateSQL error(%v)", err)
  152. return
  153. }
  154. addCount++
  155. }
  156. }
  157. if count != addCount {
  158. err = fmt.Errorf("需要添加的record不匹配 %d:%d", count, addCount)
  159. tx.Rollback()
  160. return
  161. }
  162. if err = tx.Commit(); err != nil {
  163. log.Error("tx.Commit error")
  164. }
  165. return
  166. }
  167. // FixUpIncome fix up_income
  168. func (s *Service) FixUpIncome(c context.Context, date string, tagID int64, addCount, needAddIncome int) (err error) {
  169. upIncome, err := s.GetUpIncome(c, "up_income", date)
  170. if err != nil {
  171. log.Error("s.GetUpIncome error(%v)", err)
  172. return
  173. }
  174. upChargeRatio, err := s.dao.GetUpChargeRatio(c, tagID)
  175. if err != nil {
  176. log.Error("s.dao.GetUpChargeRatio error(%v)", err)
  177. return
  178. }
  179. for _, up := range upIncome {
  180. if _, ok := upChargeRatio[up.MID]; ok {
  181. delete(upChargeRatio, up.MID)
  182. }
  183. }
  184. if len(upChargeRatio) != addCount {
  185. err = fmt.Errorf("需要调节的up主数量不匹配 %d:%d", len(upChargeRatio), addCount)
  186. return
  187. }
  188. mids := make([]int64, 0)
  189. for mid := range upChargeRatio {
  190. mids = append(mids, mid)
  191. }
  192. upIncomeStatis, err := s.dao.GetUpIncomeStatis(c, mids)
  193. if err != nil {
  194. log.Error("s.dao.GetUpIncomeStatis error(%v)", err)
  195. return
  196. }
  197. upIncomeWeek, err := s.dao.GetUpIncomeDate(c, mids, "up_income_weekly", "2018-05-28")
  198. if err != nil {
  199. log.Error("s.dao.GetUpIncomeDate error(%v)", err)
  200. return
  201. }
  202. upIncomeMonth, err := s.dao.GetUpIncomeDate(c, mids, "up_income_monthly", "2018-05-01")
  203. if err != nil {
  204. log.Error("s.dao.GetUpIncomeDate error(%v)", err)
  205. return
  206. }
  207. tx, err := s.dao.BeginTran(c)
  208. if err != nil {
  209. log.Error("s.dao.BeginTran error(%v)", err)
  210. return
  211. }
  212. // start add
  213. var totalAddIncome int64
  214. for mid, ratio := range upChargeRatio {
  215. totalIncome := upIncomeStatis[mid]
  216. weekIncome := upIncomeWeek[mid]
  217. monthIncome := upIncomeMonth[mid]
  218. // up_income
  219. err = s.insertIntoSQL(tx, fmt.Sprintf(_txTagInsertUpIncomeDate, "up_income", mid, ratio, totalIncome+ratio, date), 1)
  220. if err != nil {
  221. log.Error("s.UpdateSQL error(%v)", err)
  222. return
  223. }
  224. // up_income_weekly todo
  225. weekDate := "2018-05-28"
  226. err = s.insertIntoSQL(tx, fmt.Sprintf(_txTagInsertUpIncomeDate, "up_income_weekly", mid, ratio+weekIncome, totalIncome+ratio, weekDate), 0)
  227. if err != nil {
  228. log.Error("s.UpdateSQL error(%v)", err)
  229. return
  230. }
  231. // up_income_monthly todo
  232. monthDate := "2018-05-01"
  233. err = s.insertIntoSQL(tx, fmt.Sprintf(_txTagInsertUpIncomeDate, "up_income_monthly", mid, ratio+monthIncome, totalIncome+ratio, monthDate), 0)
  234. if err != nil {
  235. log.Error("s.UpdateSQL error(%v)", err)
  236. return
  237. }
  238. // up_income_statis
  239. err = s.insertIntoSQL(tx, fmt.Sprintf(_txTagInsertUpIncomeStatis, mid, ratio+totalIncome), 0)
  240. if err != nil {
  241. log.Error("s.insertIntoSQL error(%v)", err)
  242. return
  243. }
  244. // up_account
  245. err = s.insertIntoSQL(tx, fmt.Sprintf(_txTagInsertUpAccount, mid, ratio+totalIncome, ratio+totalIncome, "2018-04"), 0)
  246. if err != nil {
  247. log.Error("s.insertIntoSQL error(%v)", err)
  248. return
  249. }
  250. totalAddIncome += ratio
  251. }
  252. // up_income_daily_statis
  253. if totalAddIncome != int64(needAddIncome) {
  254. err = fmt.Errorf("需要调节的up主总收入不匹配 %d:%d", totalAddIncome, needAddIncome)
  255. tx.Rollback()
  256. return
  257. }
  258. add := fmt.Sprintf(" + %d", totalAddIncome)
  259. err = s.updateSQL(tx, fmt.Sprintf(_txUpIncomeDailyStatis, add, date), 12)
  260. if err != nil {
  261. log.Error("s.UpdateSQL error(%v)", err)
  262. return
  263. }
  264. err = s.updateSQL(tx, fmt.Sprintf(_txTagUpIncomeDailyStatis, len(upChargeRatio), date, 3), 1)
  265. if err != nil {
  266. log.Error("s.UpdateSQL error(%v)", err)
  267. return
  268. }
  269. if err = tx.Commit(); err != nil {
  270. log.Error("tx.Commit error")
  271. }
  272. return
  273. }
  274. func (s *Service) getAllAvRatio(c context.Context, limit int64) (rs map[int64]*model.AvChargeRatio, err error) {
  275. rs = make(map[int64]*model.AvChargeRatio)
  276. var id int64
  277. for {
  278. var ros map[int64]*model.AvChargeRatio
  279. ros, id, err = s.dao.AvChargeRatio(c, id, limit)
  280. if err != nil {
  281. return
  282. }
  283. if len(ros) == 0 {
  284. break
  285. }
  286. for k, v := range ros {
  287. rs[k] = v
  288. }
  289. }
  290. return
  291. }
  292. // UpdateTagIncome update tag_Info income
  293. func (s *Service) UpdateTagIncome(c context.Context, date string) (err error) {
  294. avRatio, err := s.getAllAvRatio(c, 2000)
  295. if err != nil {
  296. log.Error("s.getAllAvRatio error(%v)", err)
  297. return
  298. }
  299. log.Info("get avratios:%d", len(avRatio))
  300. updateDate := time.Date(2018, 6, 24, 0, 0, 0, 0, time.Local)
  301. avIncome, err := s.getAvIncome(c, updateDate)
  302. if err != nil {
  303. log.Error("s.getAvIncome error(%v)", err)
  304. return
  305. }
  306. log.Info("get av_income:%d ", len(avIncome))
  307. err = s.updateIncome(c, avIncome, avRatio)
  308. if err != nil {
  309. log.Error("s.updateIncome error(%v)", err)
  310. return
  311. }
  312. return
  313. }
  314. // GetTrueAvsIncome get true av_income
  315. func (s *Service) GetTrueAvsIncome(c context.Context, mids []int64, date string) (avs map[int64]*model.Patch, err error) {
  316. avs = make(map[int64]*model.Patch)
  317. for _, mid := range mids {
  318. var av map[int64]*model.Patch
  319. av, err = s.AvIncomes(c, mid, date)
  320. if err != nil {
  321. return
  322. }
  323. for key, val := range av {
  324. avs[key] = val
  325. }
  326. }
  327. return
  328. }
  329. func (s *Service) updateIncome(c context.Context, avIncome []*model.IncomeInfo, avRatio map[int64]*model.AvChargeRatio) (err error) {
  330. trueAvs := make([]*model.IncomeInfo, 0)
  331. uploadTime := time.Date(2018, 6, 24, 0, 0, 0, 0, time.Local)
  332. for _, av := range avIncome {
  333. if !uploadTime.After(av.UploadTime) {
  334. if _, ok := avRatio[av.AVID]; !ok {
  335. trueAvs = append(trueAvs, av)
  336. }
  337. }
  338. }
  339. if len(trueAvs) != 1856 {
  340. err = fmt.Errorf("实际被作用稿件(%d) != 1856", len(trueAvs))
  341. return
  342. }
  343. tx, err := s.dao.BeginTran(c)
  344. if err != nil {
  345. log.Error("s.dao.BeginTran error(%v)", err)
  346. return
  347. }
  348. for _, av := range trueAvs {
  349. var incIncome, categoryID int64
  350. incIncome, categoryID, err = s.dao.AvDailyIncCharge(c, av.AVID)
  351. if err != nil {
  352. log.Error("s.dao.AvDailyIncCharge avid(%d) error(%v)", av.MID, err)
  353. return
  354. }
  355. err = s.TxUpdateIncome(tx, av.AVID, av.MID, categoryID, incIncome)
  356. if err != nil {
  357. log.Error("ERROR(%v) avid(%d), mid(%d)", err, av.AVID, av.MID)
  358. return
  359. }
  360. }
  361. if err = tx.Commit(); err != nil {
  362. log.Error("tx.Commit error")
  363. }
  364. return
  365. }
  366. // TxUpdateIncome update creative income
  367. func (s *Service) TxUpdateIncome(tx *sql.Tx, avID, mid, categoryID int64, incIncome int64) (err error) {
  368. if incIncome <= 0 {
  369. return
  370. }
  371. var incIncomeStr string
  372. if incIncome > 0 {
  373. incIncomeStr = fmt.Sprintf("+ %d", incIncome)
  374. }
  375. date := "2018-06-24"
  376. // av_income
  377. avIncomeSQL := fmt.Sprintf(_txUpdateAvIncome, incIncomeStr, incIncomeStr, avID, mid, date)
  378. err = s.updateSQL(tx, avIncomeSQL, 1)
  379. if err != nil {
  380. log.Error("s.UpdateSQL error(%v)", err)
  381. return
  382. }
  383. // av_income_statis
  384. avIncomeStatisSQL := fmt.Sprintf(_txUpdateAvIncomeStatis, incIncomeStr, avID, mid)
  385. err = s.updateSQL(tx, avIncomeStatisSQL, 1)
  386. if err != nil {
  387. log.Error("s.UpdateSQL error(%v)", err)
  388. return
  389. }
  390. // av_income_daily_statis
  391. avIncomeDailyStatisIncome := fmt.Sprintf(_txUpdateAvIncomeStatisIncome, "daily", incIncomeStr, categoryID, date)
  392. err = s.updateSQL(tx, avIncomeDailyStatisIncome, 12)
  393. if err != nil {
  394. log.Error("s.UpdateSQL error(%v)", err)
  395. return
  396. }
  397. // av_income_weekly_statis
  398. avIncomeWeeklyStatisIncome := fmt.Sprintf(_txUpdateAvIncomeStatisIncome, "weekly", incIncomeStr, categoryID, "2018-06-18")
  399. err = s.updateSQL(tx, avIncomeWeeklyStatisIncome, 12)
  400. if err != nil {
  401. log.Error("s.UpdateSQL error(%v)", err)
  402. return
  403. }
  404. // av_income_monthly_statis
  405. avIncomeMonthlyStatisIncome := fmt.Sprintf(_txUpdateAvIncomeStatisIncome, "monthly", incIncomeStr, categoryID, "2018-06-01")
  406. err = s.updateSQL(tx, avIncomeMonthlyStatisIncome, 12)
  407. if err != nil {
  408. log.Error("s.UpdateSQL error(%v)", err)
  409. return
  410. }
  411. // up_account
  412. upAcccountSQL := fmt.Sprintf(_txUpAccount, incIncomeStr, incIncomeStr, mid)
  413. err = s.updateSQL(tx, upAcccountSQL, 1)
  414. if err != nil {
  415. log.Error("s.UpdateSQL error(%v)", err)
  416. return
  417. }
  418. // up_income_statis
  419. upIncomeStatisSQL := fmt.Sprintf(_txUpIncomeStatis, incIncomeStr, mid)
  420. err = s.updateSQL(tx, upIncomeStatisSQL, 1)
  421. if err != nil {
  422. log.Error("s.UpdateSQL error(%v)", err)
  423. return
  424. }
  425. // up_income_daily_statis
  426. upIncomeDailyStatisSQL := fmt.Sprintf(_txUpIncomeDailyStatis, incIncomeStr, date)
  427. err = s.updateSQL(tx, upIncomeDailyStatisSQL, 12)
  428. if err != nil {
  429. log.Error("s.UpdateSQL error(%v)", err)
  430. return
  431. }
  432. // up_income
  433. upIncomeSQL := fmt.Sprintf(_txUpIncomeTable, "up_income", incIncomeStr, incIncomeStr, incIncomeStr, mid, date)
  434. err = s.updateSQL(tx, upIncomeSQL, 1)
  435. if err != nil {
  436. log.Error("s.UpdateSQL error(%v)", err)
  437. return
  438. }
  439. // up_income_weekly
  440. upIncomeWeeklySQL := fmt.Sprintf(_txUpIncomeTable, "up_income_weekly", incIncomeStr, incIncomeStr, incIncomeStr, mid, "2018-06-18")
  441. err = s.updateSQL(tx, upIncomeWeeklySQL, 1)
  442. if err != nil {
  443. log.Error("s.UpdateSQL error(%v)", err)
  444. return
  445. }
  446. // up_income_monthly
  447. upIncomeMonthlySQL := fmt.Sprintf(_txUpIncomeTable, "up_income_monthly", incIncomeStr, incIncomeStr, incIncomeStr, mid, "2018-06-01")
  448. err = s.updateSQL(tx, upIncomeMonthlySQL, 1)
  449. if err != nil {
  450. log.Error("s.UpdateSQL error(%v)", err)
  451. return
  452. }
  453. // up_av_statis 不需要修改
  454. // up_income_withdraw 不需要修改
  455. return
  456. }
  457. // UpdateWithdraw update up_account withdraw
  458. func (s *Service) UpdateWithdraw(c context.Context, oldDate, newDate string, count int64) (err error) {
  459. sql := fmt.Sprintf(_updateUpAccount, newDate, oldDate)
  460. return s.txUpdateSQL(c, sql, count)
  461. }
  462. // UpdateTagAdjust update tag adjust_type
  463. func (s *Service) UpdateTagAdjust(c context.Context, id int64) (err error) {
  464. sql := fmt.Sprintf(_updateTagAdjust, id)
  465. return s.txUpdateSQL(c, sql, 1)
  466. }
  467. // UpdateAccountType update account type
  468. func (s *Service) UpdateAccountType(c context.Context, mid int64, accType int) (err error) {
  469. sql := fmt.Sprintf(_txUpdateAccountType, accType, mid)
  470. return s.txUpdateSQL(c, sql, 1)
  471. }
  472. // UpdateUpAccountMoney update up_account
  473. func (s *Service) UpdateUpAccountMoney(c context.Context, mid int64, total, unwithdraw int64) (err error) {
  474. sql := fmt.Sprintf(_txUpdateUpAccountMoney, total, unwithdraw, mid)
  475. return s.txUpdateSQL(c, sql, 1)
  476. }
  477. // SyncUpPGC sync pgc up from up_info_video to up_info_column
  478. func (s *Service) SyncUpPGC(c context.Context) (err error) {
  479. ups, err := s.getAllUps(c, 2000)
  480. if err != nil {
  481. log.Error("s.getAllUps error(%v)", err)
  482. return
  483. }
  484. tx, err := s.dao.BeginTran(c)
  485. if err != nil {
  486. log.Error("s.dao.BeginTran error(%v)", err)
  487. return
  488. }
  489. for _, up := range ups {
  490. if up.AccountType == 2 {
  491. sql := fmt.Sprintf(_txInUpInfoPGCSQL, up.MID, "\""+up.Nickname+"\"", up.Fans, 2, 1, 2)
  492. err = s.insertIntoSQL(tx, sql, 0)
  493. if err != nil {
  494. log.Error("s.UpdateSQL(%s) error(%v)", _txInUpInfoPGCSQL, err)
  495. return
  496. }
  497. }
  498. }
  499. if err = tx.Commit(); err != nil {
  500. log.Error("tx.Commit error")
  501. }
  502. return
  503. }
  504. // FixAvBreach fix av_breach_record data
  505. func (s *Service) FixAvBreach(c context.Context, mid int64, date string, count int) (err error) {
  506. breachs, err := s.dao.GetAvBreach(c, date, date)
  507. if err != nil {
  508. log.Error("s.dao.GetAvBreach error(%v)", err)
  509. return
  510. }
  511. tx, err := s.dao.BeginTran(c)
  512. if err != nil {
  513. log.Error("s.dao.BeginTran error(%v)", err)
  514. return
  515. }
  516. avMap := make(map[int64]bool)
  517. var delCount int
  518. for _, b := range breachs {
  519. if b.MID != mid {
  520. continue
  521. }
  522. if avMap[b.AvID] {
  523. err = s.updateSQL(tx, fmt.Sprintf(_txDelAvBreachSQL, b.ID), 1)
  524. if err != nil {
  525. log.Error("s.UpdateSQL(%s) error(%v)", _txDelAvBreachSQL, err)
  526. return
  527. }
  528. delCount++
  529. } else {
  530. avMap[b.AvID] = true
  531. }
  532. }
  533. if count != delCount {
  534. tx.Rollback()
  535. log.Error("delete count error %d %d", count, delCount)
  536. err = fmt.Errorf("delete count error")
  537. return
  538. }
  539. if err = tx.Commit(); err != nil {
  540. log.Error("tx.Commit error")
  541. }
  542. return
  543. }
  544. // FixUpTotalIncome fix up_income total income
  545. func (s *Service) FixUpTotalIncome(c context.Context, table, date string, count int) (err error) {
  546. upIncome, err := s.GetUpIncome(c, "up_income", date)
  547. if err != nil {
  548. log.Error("s.GetUpIncome error(%v)", err)
  549. return
  550. }
  551. tx, err := s.dao.BeginTran(c)
  552. if err != nil {
  553. log.Error("s.dao.BeginTran error(%v)", err)
  554. return
  555. }
  556. trueCount := 0
  557. for _, up := range upIncome {
  558. err = s.updateSQL(tx, fmt.Sprintf(_txUpTotalIncomeSQL, table, up.Income, up.MID, date), 1)
  559. if err != nil {
  560. log.Error("s.UpdateSQL(%s) error(%v)", _txUpTotalIncomeSQL, err)
  561. return
  562. }
  563. trueCount++
  564. }
  565. if count != trueCount {
  566. tx.Rollback()
  567. log.Error("count error %d %d", count, trueCount)
  568. err = fmt.Errorf(" count error")
  569. return
  570. }
  571. if err = tx.Commit(); err != nil {
  572. log.Error("tx.Commit error")
  573. }
  574. return
  575. }
  576. // UpdateColumnTag update column tag
  577. func (s *Service) UpdateColumnTag(c context.Context, table string, oldTag string, newTag int, count int64) (err error) {
  578. sql := fmt.Sprintf(_txColumnTagSQL, table, newTag, oldTag)
  579. return s.txUpdateSQL(c, sql, count)
  580. }
  581. func (s *Service) txUpdateSQL(c context.Context, sql string, count int64) (err error) {
  582. tx, err := s.dao.BeginTran(c)
  583. if err != nil {
  584. log.Error("s.dao.BeginTran error(%v)", err)
  585. return
  586. }
  587. err = s.updateSQL(tx, sql, count)
  588. if err != nil {
  589. log.Error("s.UpdateSQL(%s) error(%v)", sql, err)
  590. return
  591. }
  592. if err = tx.Commit(); err != nil {
  593. log.Error("tx.Commit error")
  594. }
  595. return
  596. }
  597. func (s *Service) updateSQL(tx *sql.Tx, stmt string, count int64) error {
  598. rows, err := s.dao.UpdateDate(tx, stmt)
  599. if err != nil {
  600. tx.Rollback()
  601. log.Error("s.dao.UpdateDate (%s) error(%v)", stmt, err)
  602. return err
  603. }
  604. if count == 0 && rows <= 1 {
  605. return nil
  606. }
  607. if rows != count {
  608. tx.Rollback()
  609. return fmt.Errorf("%s : rows(%d) != count(%d) error", stmt, rows, count)
  610. }
  611. return nil
  612. }
  613. func (s *Service) insertIntoSQL(tx *sql.Tx, stmt string, count int) error {
  614. rows, err := s.dao.UpdateDate(tx, stmt)
  615. if err != nil {
  616. tx.Rollback()
  617. log.Error("s.dao.UpdateDate (%s) error(%v)", stmt, err)
  618. return err
  619. }
  620. if count == 0 {
  621. if rows > 2 {
  622. tx.Rollback()
  623. return fmt.Errorf("rows(%d) error", rows)
  624. }
  625. } else if rows != int64(count) {
  626. tx.Rollback()
  627. return fmt.Errorf("rows(%d) != count(%d) error", rows, count)
  628. }
  629. return nil
  630. }
  631. // SyncAvBaseIncome sync base_income to av_base_income by mid_date
  632. func (s *Service) SyncAvBaseIncome(c context.Context, table string) (err error) {
  633. data, err := s.avBaseIncomes(c, table)
  634. if err != nil {
  635. return
  636. }
  637. err = s.batchUpdateUpIncome(c, data, table)
  638. if err != nil {
  639. log.Error("batch update av base income error(%v)", err)
  640. }
  641. return
  642. }
  643. func (s *Service) avBaseIncomes(c context.Context, table string) (data []*model.AvBaseIncome, err error) {
  644. var id int64
  645. for {
  646. var abs []*model.AvBaseIncome
  647. abs, id, err = s.dao.GetAvBaseIncome(c, table, id, 2000)
  648. if err != nil {
  649. return
  650. }
  651. if len(abs) == 0 {
  652. break
  653. }
  654. for _, ab := range abs {
  655. if ab.AvBaseIncome > 0 {
  656. data = append(data, ab)
  657. }
  658. }
  659. }
  660. return
  661. }
  662. func (s *Service) batchUpdateUpIncome(c context.Context, us []*model.AvBaseIncome, table string) (err error) {
  663. var (
  664. buff = make([]*model.AvBaseIncome, 2000)
  665. buffEnd = 0
  666. )
  667. for _, u := range us {
  668. buff[buffEnd] = u
  669. buffEnd++
  670. if buffEnd >= 2000 {
  671. values := avBaseIncomeValues(buff[:buffEnd])
  672. buffEnd = 0
  673. _, err = s.dao.BatchUpdateUpIncome(c, table, values)
  674. if err != nil {
  675. return
  676. }
  677. }
  678. }
  679. if buffEnd > 0 {
  680. values := avBaseIncomeValues(buff[:buffEnd])
  681. buffEnd = 0
  682. _, err = s.dao.BatchUpdateUpIncome(c, table, values)
  683. }
  684. return
  685. }
  686. func avBaseIncomeValues(us []*model.AvBaseIncome) (values string) {
  687. var buf bytes.Buffer
  688. for _, u := range us {
  689. buf.WriteString("(")
  690. buf.WriteString(strconv.FormatInt(u.MID, 10))
  691. buf.WriteByte(',')
  692. buf.WriteString("'" + u.Date.Time().Format(_layout) + "'")
  693. buf.WriteByte(',')
  694. buf.WriteString(strconv.FormatInt(u.AvBaseIncome, 10))
  695. buf.WriteString(")")
  696. buf.WriteByte(',')
  697. }
  698. if buf.Len() > 0 {
  699. buf.Truncate(buf.Len() - 1)
  700. }
  701. values = buf.String()
  702. buf.Reset()
  703. return
  704. }
  705. // SyncCreditScore sync credit score
  706. func (s *Service) SyncCreditScore(c context.Context) (err error) {
  707. m := make(map[int64]int)
  708. am, err := s.getCreditScore(c, "video")
  709. if err != nil {
  710. return
  711. }
  712. for mid, score := range am {
  713. m[mid] = score
  714. }
  715. cm, err := s.getCreditScore(c, "column")
  716. if err != nil {
  717. return
  718. }
  719. for mid, score := range cm {
  720. m[mid] = score
  721. }
  722. return s.batchInsertCreditScore(c, m)
  723. }
  724. func (s *Service) batchInsertCreditScore(c context.Context, m map[int64]int) (err error) {
  725. batch := make(map[int64]int)
  726. for mid, score := range m {
  727. batch[mid] = score
  728. if len(batch) == 2000 {
  729. values := creditScoreValues(batch)
  730. _, err = s.dao.SyncCreditScore(c, values)
  731. if err != nil {
  732. return
  733. }
  734. batch = make(map[int64]int)
  735. }
  736. }
  737. if len(batch) > 0 {
  738. values := creditScoreValues(batch)
  739. _, err = s.dao.SyncCreditScore(c, values)
  740. }
  741. return
  742. }
  743. func creditScoreValues(m map[int64]int) (values string) {
  744. var buf bytes.Buffer
  745. for mid, score := range m {
  746. buf.WriteString("(")
  747. buf.WriteString(strconv.FormatInt(mid, 10))
  748. buf.WriteByte(',')
  749. buf.WriteString(strconv.Itoa(score))
  750. buf.WriteString(")")
  751. buf.WriteByte(',')
  752. }
  753. if buf.Len() > 0 {
  754. buf.Truncate(buf.Len() - 1)
  755. }
  756. values = buf.String()
  757. buf.Reset()
  758. return
  759. }
  760. func (s *Service) getCreditScore(c context.Context, table string) (m map[int64]int, err error) {
  761. m = make(map[int64]int)
  762. var id int64
  763. for {
  764. var sm map[int64]int
  765. sm, id, err = s.dao.GetCreditScore(c, table, id, 2000)
  766. if err != nil {
  767. return
  768. }
  769. if len(sm) == 0 {
  770. break
  771. }
  772. for k, v := range sm {
  773. m[k] = v
  774. }
  775. }
  776. return
  777. }
  778. // FixBgmIncomeStatis fix bgm income statis
  779. func (s *Service) FixBgmIncomeStatis(c context.Context) (err error) {
  780. total, err := s.dao.GetBGMIncome(c)
  781. if err != nil {
  782. return
  783. }
  784. for sid, income := range total {
  785. _, err = s.dao.InsertBGMIncomeStatis(c, sid, income)
  786. if err != nil {
  787. return
  788. }
  789. }
  790. return
  791. }