up_bill.go 8.8 KB


  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "math/rand"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "go-common/app/job/main/growup/model"
  12. "go-common/app/job/main/growup/model/income"
  13. "go-common/library/log"
  14. )
  // _dbLimit is the page size this file passes to the paginated readers
  // (signed, upSignedAvs, getAvIncomeStatis and getUpQuality).
  var _dbLimit = 2000

  // _dbBatchSize is the maximum number of bills upBillDBStore hands to a single
  // batch insert.
  var _dbBatchSize = 2000
  19. // CreativeUpBill creative up bill
  20. func (s *Service) CreativeUpBill(c context.Context, startDate, endDate time.Time) (err error) {
  21. // up_info_video
  22. ups, err := s.signed(c, int64(_dbLimit))
  23. if err != nil {
  24. log.Error("s.signed error(%v)", err)
  25. return
  26. }
  27. upBills := handleUps(ups, endDate)
  28. // up_income
  29. upIncome, err := s.getUpIncomeByDate(c, "up_income", startDate, endDate)
  30. if err != nil {
  31. log.Error("s.getUpIncomeByDate error(%v)", err)
  32. return
  33. }
  34. handleUpIncome(upBills, upIncome)
  35. // up_signed_avs
  36. upAvs, err := s.upSignedAvs(c, _dbLimit)
  37. if err != nil {
  38. log.Error("s.upSignedAvs error(%v)", err)
  39. return
  40. }
  41. for _, up := range upBills {
  42. up.AvCount = upAvs[up.MID]
  43. }
  44. // av_income_statis
  45. avs, err := s.getAvIncomeStatis(c, int64(_dbLimit), endDate)
  46. if err != nil {
  47. log.Error("s.getAvIncomeStatis error(%v)", err)
  48. return
  49. }
  50. handleAvIncomeStatis(upBills, avs)
  51. upQuality, err := s.getUpQuality(c, int(endDate.Day()), _dbLimit)
  52. if err != nil {
  53. log.Error("s.getUpQualities error(%v)", err)
  54. return
  55. }
  56. if len(upQuality) == 0 {
  57. err = fmt.Errorf("Error: get 0 ups from up_quality_info_%d", endDate.Day())
  58. return
  59. }
  60. handleUpQuality(upBills, upQuality)
  61. handleUpBills(upBills)
  62. // insert
  63. err = s.upBillDBStore(c, upBills)
  64. if err != nil {
  65. log.Error("s.upBillDBStore error(%v)", err)
  66. }
  67. return
  68. }
  // randStr returns a pseudo-randomly chosen element of strs.
  // It returns "" when strs is empty.
  func randStr(strs []string) string {
  	if len(strs) == 0 {
  		// rand.Intn panics when its argument is not positive.
  		return ""
  	}
  	// rand.Intn(n) already yields a value in [0, n), so no modulo is needed.
  	return strs[rand.Intn(len(strs))]
  }
  72. func handleUpBills(upBills map[int64]*model.UpBill) {
  73. for _, up := range upBills {
  74. titles := []string{}
  75. shareItem := ""
  76. switch {
  77. case up.TotalIncome >= 500000:
  78. titles = append(titles, "掘金小能手")
  79. shareItem = randStr([]string{"98亿手办", "圣地巡礼机票"})
  80. case up.TotalIncome >= 100000:
  81. shareItem = randStr([]string{"BML现场门票", "老婆的演唱会门票", "购物车里的“老婆”"})
  82. case up.TotalIncome >= 50000:
  83. shareItem = randStr([]string{"超大堆小电视抱枕", "肥宅快乐桶吃到吐", "老婆的应援周边"})
  84. case up.TotalIncome >= 10000:
  85. shareItem = randStr([]string{"一堆“2233”挂件", "N个月大会员", "一暑假肥宅快乐水"})
  86. case up.TotalIncome < 10000:
  87. shareItem = randStr([]string{"创作补给餐", "自我打call棒", "承包几部番剧", "老婆的海报"})
  88. }
  89. if up.AvCount >= 30 {
  90. titles = append(titles, "B站劳模")
  91. }
  92. if up.Fans >= 10000 {
  93. titles = append(titles, "万人迷")
  94. }
  95. if up.TotalPlayCount >= 500000 {
  96. titles = append(titles, "流量王")
  97. }
  98. if len(titles) == 0 {
  99. titles = []string{"社会人", "快乐肥宅", "9percent"}
  100. }
  101. up.Title = randStr(titles)
  102. up.ShareItems = shareItem
  103. }
  104. }
  105. func handleUps(ups map[int64]*model.UpInfoVideo, end time.Time) (upBills map[int64]*model.UpBill) {
  106. upBills = make(map[int64]*model.UpBill)
  107. for _, up := range ups {
  108. upBills[up.MID] = &model.UpBill{
  109. MID: up.MID,
  110. SignedAt: up.SignedAt.Time().Format(_layout),
  111. Fans: up.Fans,
  112. TotalPlayCount: up.TotalPlayCount,
  113. EndAt: end.Format(_layout),
  114. }
  115. }
  116. return
  117. }
  118. func handleUpIncome(upBills map[int64]*model.UpBill, upIncome []*model.UpIncome) {
  119. for _, up := range upIncome {
  120. upB, ok := upBills[up.MID]
  121. if !ok {
  122. continue
  123. }
  124. if up.Date.Time().Format(_layout) >= upB.SignedAt {
  125. upB.TotalIncome += up.Income
  126. if upB.FirstTime == "" {
  127. upB.FirstIncome = up.Income
  128. upB.FirstTime = up.Date.Time().Format(_layout)
  129. }
  130. if upB.MaxIncome < up.Income {
  131. upB.MaxIncome = up.Income
  132. upB.MaxTime = up.Date.Time().Format(_layout)
  133. }
  134. }
  135. }
  136. }
  137. func handleAvIncomeStatis(upBills map[int64]*model.UpBill, avs map[int64]*income.AvIncomeStat) {
  138. for _, av := range avs {
  139. upB, ok := upBills[av.MID]
  140. if !ok {
  141. continue
  142. }
  143. if av.CTime.Time().Format(_layout) < upB.SignedAt {
  144. continue
  145. }
  146. income := av.TotalIncome
  147. if income > upB.AvMaxIncome {
  148. upB.AvMaxIncome = income
  149. upB.AvID = av.AvID
  150. }
  151. }
  152. }
  153. func handleUpQuality(upBills map[int64]*model.UpBill, upQualities []*model.UpQuality) {
  154. total := len(upQualities)
  155. sort.Slice(upQualities, func(i, j int) bool {
  156. return upQualities[i].Quality > upQualities[j].Quality
  157. })
  158. for i := 0; i < len(upQualities); i++ {
  159. mid := upQualities[i].MID
  160. if _, ok := upBills[mid]; ok {
  161. upBills[mid].QualityValue = upQualities[i].Quality
  162. rank := i
  163. for rank > 0 && upQualities[rank].Quality == upQualities[rank-1].Quality {
  164. rank--
  165. }
  166. upBills[mid].DefeatNum = (10000 * (total - rank)) / total
  167. }
  168. }
  169. }
  170. func (s *Service) signed(c context.Context, limit int64) (m map[int64]*model.UpInfoVideo, err error) {
  171. var id int64
  172. m = make(map[int64]*model.UpInfoVideo)
  173. for {
  174. var us map[int64]*model.UpInfoVideo
  175. id, us, err = s.dao.UpInfoVideo(c, id, limit)
  176. if err != nil {
  177. return
  178. }
  179. for k, v := range us {
  180. if v.AccountState == 3 && v.IsDeleted == 0 {
  181. m[k] = v
  182. }
  183. }
  184. if len(us) < _dbLimit {
  185. break
  186. }
  187. }
  188. return
  189. }
  190. func (s *Service) getUpIncomeByDate(c context.Context, table string, start, end time.Time) (ups []*model.UpIncome, err error) {
  191. ups = make([]*model.UpIncome, 0)
  192. end = end.AddDate(0, 0, 1)
  193. for start.Before(end) {
  194. var up []*model.UpIncome
  195. up, err = s.GetUpIncome(c, table, start.Format("2006-01-02"))
  196. if err != nil {
  197. return
  198. }
  199. ups = append(ups, up...)
  200. start = start.AddDate(0, 0, 1)
  201. }
  202. return
  203. }
  204. func (s *Service) getAvIncomeStatis(c context.Context, limit int64, endDate time.Time) (m map[int64]*income.AvIncomeStat, err error) {
  205. m = make(map[int64]*income.AvIncomeStat)
  206. var id int64
  207. for {
  208. var am map[int64]*income.AvIncomeStat
  209. am, id, err = s.income.AvIncomeStat(c, id, limit)
  210. if err != nil {
  211. return
  212. }
  213. for avID, stat := range am {
  214. if stat.CTime.Time().Before(endDate.AddDate(0, 0, 1)) {
  215. m[avID] = stat
  216. }
  217. }
  218. if len(am) < int(limit) {
  219. break
  220. }
  221. }
  222. return
  223. }
  224. func (s *Service) upSignedAvs(c context.Context, limit int) (upAvs map[int64]int64, err error) {
  225. upAvs = make(map[int64]int64)
  226. var id int64
  227. for {
  228. var ups map[int64]int64
  229. ups, id, err = s.dao.ListUpSignedAvs(c, id, limit)
  230. if err != nil {
  231. return
  232. }
  233. for mid, avCount := range ups {
  234. upAvs[mid] = avCount
  235. }
  236. if len(ups) < limit {
  237. break
  238. }
  239. }
  240. return
  241. }
  242. func (s *Service) getUpQuality(c context.Context, day, limit int) (ups []*model.UpQuality, err error) {
  243. ups = make([]*model.UpQuality, 0)
  244. var (
  245. id int64
  246. table string
  247. up []*model.UpQuality
  248. )
  249. if day < 10 {
  250. table = fmt.Sprintf("up_quality_info_0%d", day)
  251. } else {
  252. table = fmt.Sprintf("up_quality_info_%d", day)
  253. }
  254. for {
  255. up, id, err = s.dao.GetUpQuality(c, table, id, limit)
  256. if err != nil {
  257. return
  258. }
  259. ups = append(ups, up...)
  260. if len(up) < limit {
  261. break
  262. }
  263. }
  264. return
  265. }
  266. func (s *Service) upBillDBStore(c context.Context, upBill map[int64]*model.UpBill) (err error) {
  267. var (
  268. buff = make([]*model.UpBill, _dbBatchSize)
  269. buffEnd = 0
  270. )
  271. for _, u := range upBill {
  272. buff[buffEnd] = u
  273. buffEnd++
  274. if buffEnd >= _dbBatchSize {
  275. _, err = s.upBillBatchInsert(c, buff[:buffEnd])
  276. if err != nil {
  277. return
  278. }
  279. buffEnd = 0
  280. }
  281. }
  282. if buffEnd > 0 {
  283. _, err = s.upBillBatchInsert(c, buff[:buffEnd])
  284. if err != nil {
  285. return
  286. }
  287. buffEnd = 0
  288. }
  289. return
  290. }
  291. func assembleUpBill(upBill []*model.UpBill) (vals string) {
  292. var buf bytes.Buffer
  293. for _, row := range upBill {
  294. buf.WriteString("(")
  295. buf.WriteString(strconv.FormatInt(row.MID, 10))
  296. buf.WriteByte(',')
  297. buf.WriteString(strconv.FormatInt(row.FirstIncome, 10))
  298. buf.WriteByte(',')
  299. buf.WriteString(strconv.FormatInt(row.MaxIncome, 10))
  300. buf.WriteByte(',')
  301. buf.WriteString(strconv.FormatInt(row.TotalIncome, 10))
  302. buf.WriteByte(',')
  303. buf.WriteString(strconv.FormatInt(row.AvCount, 10))
  304. buf.WriteByte(',')
  305. buf.WriteString(strconv.FormatInt(row.AvMaxIncome, 10))
  306. buf.WriteByte(',')
  307. buf.WriteString(strconv.FormatInt(row.AvID, 10))
  308. buf.WriteByte(',')
  309. buf.WriteString(strconv.FormatInt(row.QualityValue, 10))
  310. buf.WriteByte(',')
  311. buf.WriteString(strconv.Itoa(row.DefeatNum))
  312. buf.WriteByte(',')
  313. buf.WriteString("\"" + strings.Replace(row.Title, "\"", "\\\"", -1) + "\"")
  314. buf.WriteByte(',')
  315. buf.WriteString("'" + row.ShareItems + "'")
  316. buf.WriteByte(',')
  317. buf.WriteString("'" + row.FirstTime + "'")
  318. buf.WriteByte(',')
  319. buf.WriteString("'" + row.MaxTime + "'")
  320. buf.WriteByte(',')
  321. buf.WriteString("'" + row.SignedAt + "'")
  322. buf.WriteByte(',')
  323. buf.WriteString("'" + row.EndAt + "'")
  324. buf.WriteString(")")
  325. buf.WriteByte(',')
  326. }
  327. if buf.Len() > 0 {
  328. buf.Truncate(buf.Len() - 1)
  329. }
  330. vals = buf.String()
  331. buf.Reset()
  332. return
  333. }
  334. func (s *Service) upBillBatchInsert(c context.Context, upBill []*model.UpBill) (rows int64, err error) {
  335. vals := assembleUpBill(upBill)
  336. rows, err = s.dao.InsertUpBillBatch(c, vals)
  337. return
  338. }