income.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "sort"
  7. "strconv"
  8. "time"
  9. "go-common/app/job/main/growup/model"
  10. "go-common/library/database/sql"
  11. "go-common/library/log"
  12. )
  13. const (
  14. _layout = "2006-01-02"
  15. )
  16. // InsertTagIncome insert up_tag_income.
  17. func (s *Service) InsertTagIncome(c context.Context, date time.Time) (err error) {
  18. infos, err := s.getTagAvInfo(c, date)
  19. if err != nil {
  20. log.Error("s.InsertTagIncome getTagAVInfo error(%v)", err)
  21. return
  22. }
  23. tx, err := s.dao.BeginTran(c)
  24. if err != nil {
  25. log.Error("s.InsertTagIncome dao.BeginTran error(%v)", err)
  26. return
  27. }
  28. if err = s.insertTagIncome(c, tx, infos); err != nil {
  29. tx.Rollback()
  30. log.Error("s.InsertTagIncome insertTagIncome error(%v)", err)
  31. return
  32. }
  33. if err = s.updateTagInfo(tx, infos); err != nil {
  34. tx.Rollback()
  35. log.Error("s.InsertTagIncome updateTagInfo error(%v)", err)
  36. return
  37. }
  38. if err = s.updateTagUpInfo(tx, infos); err != nil {
  39. tx.Rollback()
  40. log.Error("s.InsertTagIncome updateTagUpInfo error(%v)", err)
  41. return
  42. }
  43. if err = tx.Commit(); err != nil {
  44. log.Error("s.InsertTagIncome tx.Commit error")
  45. return
  46. }
  47. return
  48. }
  49. func (s *Service) getTagAvInfo(c context.Context, date time.Time) (infos []*model.TagAvIncome, err error) {
  50. var (
  51. from, limit int64
  52. av, avs []*model.ActivityAVInfo
  53. )
  54. from, limit = 0, 3000
  55. for {
  56. av, err = s.dao.GetAvTagRatio(c, from, limit)
  57. if err != nil {
  58. log.Error("s.getTagAvInfo dao.GetAvTagRatio error(%v)", err)
  59. return
  60. }
  61. avs = append(avs, av...)
  62. if int64(len(av)) < limit {
  63. break
  64. }
  65. from = av[len(av)-1].MID
  66. }
  67. for _, a := range avs {
  68. var income *model.TagAvIncome
  69. income, err = s.dao.GetAvIncomeInfo(c, a.AVID, date)
  70. if err != nil {
  71. log.Error("s.GetAvIncomes dao.GetAvIncomeInfo error(%v)", err)
  72. return
  73. }
  74. if income == nil {
  75. continue
  76. }
  77. income.TagID = a.TagID
  78. infos = append(infos, income)
  79. }
  80. return
  81. }
  82. func (s *Service) getTagAVLatestTotalIncome(c context.Context, avID, tagID int64) (totalIncome int, err error) {
  83. infos, err := s.dao.GetTagAvTotalIncome(c, tagID, avID)
  84. if err != nil {
  85. log.Error("s.getTagAVLatestTotalIncome dao.GetTagAvTotalIncome error(%v)", err)
  86. return
  87. }
  88. for _, info := range infos {
  89. if int(info.TotalIncome) > totalIncome {
  90. totalIncome = int(info.TotalIncome)
  91. }
  92. }
  93. return
  94. }
  95. func (s *Service) insertTagIncome(c context.Context, tx *sql.Tx, infos []*model.TagAvIncome) (err error) {
  96. var buf bytes.Buffer
  97. var cnt, totalIncome int
  98. var rows, totalRows int64
  99. for _, info := range infos {
  100. totalIncome, err = s.getTagAVLatestTotalIncome(c, info.TagID, info.AVID)
  101. if err != nil {
  102. log.Error("s.insertTagIncome dao.GetTagAvTotalIncome error(%v)", err)
  103. return
  104. }
  105. buf.WriteString("(")
  106. buf.WriteString(strconv.FormatInt(info.TagID, 10))
  107. buf.WriteString(",")
  108. buf.WriteString(strconv.FormatInt(info.MID, 10))
  109. buf.WriteString(",")
  110. buf.WriteString(strconv.FormatInt(info.AVID, 10))
  111. buf.WriteString(",")
  112. buf.WriteString(strconv.Itoa(info.Income))
  113. buf.WriteString(",")
  114. buf.WriteString(strconv.Itoa(totalIncome + info.Income))
  115. buf.WriteString(",")
  116. buf.WriteString("'")
  117. buf.WriteString(strconv.Itoa(info.Date.Year()))
  118. buf.WriteString("-")
  119. if int(info.Date.Month()) < 10 {
  120. buf.WriteString("0")
  121. }
  122. buf.WriteString(strconv.Itoa(int(info.Date.Month())))
  123. buf.WriteString("-")
  124. if info.Date.Day() < 10 {
  125. buf.WriteString("0")
  126. }
  127. buf.WriteString(strconv.Itoa(info.Date.Day()))
  128. buf.WriteString("'")
  129. buf.WriteString("),")
  130. cnt++
  131. if cnt%1000 == 0 {
  132. buf.Truncate(buf.Len() - 1)
  133. rows, err = s.dao.TxInsertTagIncome(tx, buf.String())
  134. if err != nil {
  135. log.Error("s.InsertTagIncome dao.TxInsertTagIncome error(%v)", err)
  136. return
  137. }
  138. totalRows += rows
  139. buf.Reset()
  140. }
  141. }
  142. if buf.Len() > 0 {
  143. buf.Truncate(buf.Len() - 1)
  144. rows, err = s.dao.TxInsertTagIncome(tx, buf.String())
  145. if err != nil {
  146. log.Error("s.InsertTagIncome dao.TxInsertTagIncome error(%v)", err)
  147. return
  148. }
  149. totalRows += rows
  150. }
  151. log.Info("s.InsertTagIncome insert up_tag_income (%d) rows", totalRows)
  152. return
  153. }
  154. // updateTagInfo update tag_info total_income.
  155. func (s *Service) updateTagInfo(tx *sql.Tx, infos []*model.TagAvIncome) (err error) {
  156. tim := make(map[int64]int) // key-value: tag_id-total av income.
  157. for _, info := range infos {
  158. tim[info.TagID] += info.Income
  159. }
  160. for k, v := range tim {
  161. query := "total_income = total_income + "
  162. query += strconv.Itoa(v)
  163. _, err = s.dao.TxUpdateTagInfo(tx, k, query)
  164. if err != nil {
  165. log.Error("s.updateTagInfo dao.UpdateTagInfo error(%v)", err)
  166. return
  167. }
  168. }
  169. return
  170. }
  171. // updateTagUpInfo update tag_up_info totalIncome.
  172. func (s *Service) updateTagUpInfo(tx *sql.Tx, infos []*model.TagAvIncome) (err error) {
  173. utm := make(map[int64]*model.TagAvIncome) // key-value: mid-totalIncome
  174. for _, info := range infos {
  175. _, ok := utm[info.MID]
  176. if !ok {
  177. a := &model.TagAvIncome{TagID: info.TagID, MID: info.MID, TotalIncome: info.Income}
  178. utm[info.MID] = a
  179. } else {
  180. utm[info.MID].TotalIncome += info.Income
  181. }
  182. }
  183. cnt := 0
  184. var buf bytes.Buffer
  185. for _, v := range utm {
  186. buf.WriteString("(")
  187. buf.WriteString(strconv.FormatInt(v.TagID, 10))
  188. buf.WriteString(",")
  189. buf.WriteString(strconv.FormatInt(v.MID, 10))
  190. buf.WriteString(",")
  191. buf.WriteString(strconv.Itoa(v.TotalIncome))
  192. buf.WriteString("),")
  193. cnt++
  194. if cnt%2000 == 0 {
  195. buf.Truncate(buf.Len() - 1)
  196. _, err = s.dao.TxUpdateTagUpInfo(tx, buf.String())
  197. if err != nil {
  198. log.Error("s.updateTagUpInfo dao.UpdateTagUpInfo error(%v)", err)
  199. return
  200. }
  201. buf.Reset()
  202. }
  203. }
  204. if buf.Len() > 0 {
  205. buf.Truncate(buf.Len() - 1)
  206. _, err = s.dao.TxUpdateTagUpInfo(tx, buf.String())
  207. if err != nil {
  208. log.Error("s.updateTagUpInfo dao.UpdateTagUpInfo error(%v)", err)
  209. return
  210. }
  211. }
  212. return
  213. }
  214. // GetAvIncomeStatis get av monthly income
  215. func (s *Service) GetAvIncomeStatis(c context.Context, date string) error {
  216. d, _ := time.Parse(_layout, date)
  217. endTime := d.AddDate(0, 1, 0).Format(_layout)
  218. avs, err := s.GetAvIncome(c)
  219. if err != nil {
  220. log.Error("s.GetAvIncome error(%v)", err)
  221. return err
  222. }
  223. log.Info("GetAvIncomeStatis get %d avs", len(avs))
  224. avsMap := make(map[int64]*model.AvIncome)
  225. avIncomeStatis(avsMap, avs, date, endTime)
  226. data := make([]*model.AvIncome, 0)
  227. for _, av := range avsMap {
  228. data = append(data, av)
  229. }
  230. sort.Slice(data, func(i, j int) bool {
  231. return data[i].Income > data[j].Income
  232. })
  233. log.Info("GetAvIncomeStatis calculate success: %d", len(data))
  234. return s.batchSend(data)
  235. }
  236. func (s *Service) batchSend(data []*model.AvIncome) error {
  237. fileNo, start, batchSize := 0, 0, 50000
  238. for {
  239. if start+batchSize >= len(data) {
  240. batchSize = len(data) - start
  241. }
  242. if batchSize <= 0 {
  243. break
  244. }
  245. records := formatAvIncome(data[start : start+batchSize])
  246. filename := fmt.Sprintf("av_statis0%d", fileNo)
  247. err := WriteCSV(records, filename)
  248. if err != nil {
  249. return err
  250. }
  251. err = s.email.SendMailAttach(filename, "稿件月收入", []string{"shaozhenyu@bilibili.com"})
  252. if err != nil {
  253. return err
  254. }
  255. fileNo++
  256. start += batchSize
  257. }
  258. return nil
  259. }
  260. func avIncomeStatis(avsMap map[int64]*model.AvIncome, avs []*model.AvIncome, fromTime, toTime string) {
  261. for _, av := range avs {
  262. d := av.Date.Time().Format(_layout)
  263. if d < fromTime || d >= toTime {
  264. continue
  265. }
  266. if _, ok := avsMap[av.AvID]; ok {
  267. avsMap[av.AvID].Income += av.Income
  268. if avsMap[av.AvID].Date < av.Date {
  269. avsMap[av.AvID].TotalIncome = av.TotalIncome
  270. avsMap[av.AvID].Date = av.Date
  271. }
  272. } else {
  273. avsMap[av.AvID] = av
  274. }
  275. }
  276. }
  277. // GetAvIncome get av_income
  278. func (s *Service) GetAvIncome(c context.Context) (avs []*model.AvIncome, err error) {
  279. limit := 2000
  280. var id int64
  281. for {
  282. av, err := s.dao.ListAvIncome(c, id, limit)
  283. if err != nil {
  284. return avs, err
  285. }
  286. avs = append(avs, av...)
  287. if len(av) < limit {
  288. break
  289. }
  290. id = av[len(av)-1].ID
  291. }
  292. return
  293. }
  294. // GetUpIncomeStatis get up statis
  295. func (s *Service) GetUpIncomeStatis(c context.Context, date string, hasWithdraw int) (err error) {
  296. var upAccount []*model.UpAccount
  297. if hasWithdraw == 1 {
  298. upAccount, err = s.getUpIncomeStatisAfterWithdraw(c, date)
  299. if err != nil {
  300. log.Error("s.getUpIncomeStatisAfterWithdraw error(%v)", err)
  301. return err
  302. }
  303. } else {
  304. upAccount, err = s.getUpIncomeStatisBeforeWithdraw(c, date)
  305. if err != nil {
  306. log.Error("s.getUpIncomeStatisBeforeWithdraw error(%v)", err)
  307. return err
  308. }
  309. }
  310. upa := make(map[int64]*model.UpAccount)
  311. mids := make([]int64, len(upAccount))
  312. for i := 0; i < len(upAccount); i++ {
  313. mids[i] = upAccount[i].MID
  314. upa[upAccount[i].MID] = upAccount[i]
  315. }
  316. upNick, err := s.GetUpNickname(c, mids)
  317. if err != nil {
  318. log.Error("s.GetUpNickname error(%v)", err)
  319. return
  320. }
  321. upIncome, err := s.GetUpIncome(c, "up_income_monthly", date)
  322. if err != nil {
  323. log.Error("s.GetUpIncome error(%v)", err)
  324. return
  325. }
  326. upIncomeStatis(upa, upIncome, upNick)
  327. data := []*model.UpAccount{}
  328. for _, up := range upa {
  329. data = append(data, up)
  330. }
  331. sort.Slice(data, func(i, j int) bool {
  332. return data[i].MonthIncome > data[j].MonthIncome
  333. })
  334. d, _ := time.Parse(_layout, date)
  335. records := formatUpAccount(data, int(d.Month()))
  336. err = WriteCSV(records, "up_statis.csv")
  337. if err != nil {
  338. log.Error("WriteCSV error(%v)", err)
  339. return
  340. }
  341. return s.email.SendMailAttach("up_statis.csv", "up主月结算", []string{"shaozhenyu@bilibili.com"})
  342. }
  343. func (s *Service) getUpIncomeStatisAfterWithdraw(c context.Context, date string) (upAccount []*model.UpAccount, err error) {
  344. d, _ := time.Parse(_layout, date)
  345. withdrawDateStr := d.Format("2006-01")
  346. ctime := d.AddDate(0, 1, 1).Format(_layout)
  347. upAccount, err = s.GetUpAccount(c, withdrawDateStr, ctime)
  348. if err != nil {
  349. log.Error("s.GetUpAccount error(%v)", err)
  350. return
  351. }
  352. upWithdraw, err := s.GetUpWithdraw(c, withdrawDateStr)
  353. if err != nil {
  354. log.Error("s.GetUpWithdraw error(%v)", err)
  355. return
  356. }
  357. for _, up := range upAccount {
  358. up.TotalUnwithdrawIncome = upWithdraw[up.MID]
  359. }
  360. return
  361. }
  362. func (s *Service) getUpIncomeStatisBeforeWithdraw(c context.Context, date string) (upAccount []*model.UpAccount, err error) {
  363. d, _ := time.Parse(_layout, date)
  364. withdrawDateStr := d.AddDate(0, -1, 0).Format("2006-01")
  365. ctime := d.AddDate(0, 1, 1).Format(_layout)
  366. return s.GetUpAccount(c, withdrawDateStr, ctime)
  367. }
  368. func upIncomeStatis(upa map[int64]*model.UpAccount, upIncome []*model.UpIncome, upNick map[int64]string) {
  369. for _, income := range upIncome {
  370. if _, ok := upa[income.MID]; ok {
  371. upa[income.MID].AvCount = income.AvCount
  372. upa[income.MID].MonthIncome = income.Income
  373. upa[income.MID].Nickname = upNick[income.MID]
  374. }
  375. }
  376. }
  377. // GetUpAccount get up_account
  378. func (s *Service) GetUpAccount(c context.Context, date, ctime string) (ups []*model.UpAccount, err error) {
  379. offset, limit := 0, 2000
  380. for {
  381. up, err := s.dao.ListUpAccount(c, date, ctime, offset, limit)
  382. if err != nil {
  383. return ups, err
  384. }
  385. ups = append(ups, up...)
  386. if len(up) < limit {
  387. break
  388. }
  389. offset += limit
  390. }
  391. return
  392. }
  393. // GetUpIncome get up_income
  394. func (s *Service) GetUpIncome(c context.Context, table, date string) (ups []*model.UpIncome, err error) {
  395. ups = make([]*model.UpIncome, 0)
  396. var id int64
  397. limit := 2000
  398. for {
  399. var up []*model.UpIncome
  400. up, err = s.dao.ListUpIncome(c, table, date, id, limit)
  401. if err != nil {
  402. return
  403. }
  404. ups = append(ups, up...)
  405. if len(up) < limit {
  406. break
  407. }
  408. id = up[len(up)-1].ID
  409. }
  410. return
  411. }
  412. // GetUpWithdraw get up_income_withdraw
  413. func (s *Service) GetUpWithdraw(c context.Context, date string) (ups map[int64]int64, err error) {
  414. ups = make(map[int64]int64)
  415. offset, limit := 0, 2000
  416. for {
  417. up, err := s.dao.ListUpWithdraw(c, date, offset, limit)
  418. if err != nil {
  419. return ups, err
  420. }
  421. for mid, income := range up {
  422. ups[mid] = income
  423. }
  424. if len(up) < limit {
  425. break
  426. }
  427. offset += limit
  428. }
  429. return
  430. }
  431. // GetUpNickname get up nickname
  432. func (s *Service) GetUpNickname(c context.Context, mids []int64) (upNick map[int64]string, err error) {
  433. upNick = make(map[int64]string)
  434. offset, limit := 0, 2000
  435. for {
  436. if offset+limit > len(mids) {
  437. limit = len(mids) - offset
  438. }
  439. if limit <= 0 {
  440. break
  441. }
  442. err = s.dao.ListUpNickname(c, mids[offset:offset+limit], upNick)
  443. if err != nil {
  444. log.Error("s.dao.ListUpNickname error(%v)", err)
  445. return
  446. }
  447. offset += limit
  448. }
  449. return
  450. }