video.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "strconv"
  6. "time"
  7. "go-common/library/log"
  8. xtime "go-common/library/time"
  9. "golang.org/x/sync/errgroup"
  10. "go-common/app/job/main/growup/model"
  11. )
  12. // account_type, 0: All; 1: UGC; 2: PGC
  13. // o: old state; n: new state
  14. func (s *Service) expiredCheck(accType int, o int, n int, table string) (err error) {
  15. c := context.TODO()
  16. var m map[int64]xtime.Time
  17. if accType == 0 {
  18. m, err = s.dao.MIDsByState(c, o, table)
  19. if err != nil {
  20. log.Error("s.dao.MIDsByState error(%v)", err)
  21. return
  22. }
  23. }
  24. if accType == 1 || accType == 2 {
  25. m, err = s.dao.MIDsByStateType(c, accType, o, table)
  26. if err != nil {
  27. log.Error("s.dao.MIDsByStateType error(%v)", err)
  28. return
  29. }
  30. }
  31. var freed []int64
  32. now := time.Now().Unix()
  33. for mid, exp := range m {
  34. if now > int64(exp) {
  35. freed = append(freed, mid)
  36. }
  37. }
  38. if len(freed) == 0 {
  39. log.Info("no mid cooldown")
  40. return
  41. }
  42. _, err = s.dao.UpdateAccountState(c, n, freed, table)
  43. return
  44. }
  45. // UpdateUpInfo update up_info_video
  46. func (s *Service) UpdateUpInfo(c context.Context) (err error) {
  47. ch := make(chan []int64, 100)
  48. var group errgroup.Group
  49. group.Go(func() (err error) {
  50. defer close(ch)
  51. return s.mids(c, ch)
  52. })
  53. group.Go(func() (err error) {
  54. return s.update(c, ch)
  55. })
  56. return group.Wait()
  57. }
  58. func (s *Service) mids(c context.Context, ch chan []int64) (err error) {
  59. var id int64
  60. var ms []int64
  61. for {
  62. id, ms, err = s.dao.MIDs(c, id, 2000)
  63. if err != nil {
  64. return
  65. }
  66. if len(ms) == 0 {
  67. break
  68. }
  69. ch <- ms
  70. }
  71. return
  72. }
  73. func (s *Service) update(c context.Context, ch chan []int64) (err error) {
  74. for mids := range ch {
  75. var bs []*model.UpBaseInfo
  76. bs, err = s.dao.GetUpBaseInfo(c, mids)
  77. if err != nil {
  78. return
  79. }
  80. // batch update bs
  81. values := baseInfoValues(bs)
  82. _, err = s.dao.UpdateUpInfo(c, values)
  83. if err != nil {
  84. return
  85. }
  86. }
  87. return
  88. }
  89. func baseInfoValues(bs []*model.UpBaseInfo) (values string) {
  90. var buf bytes.Buffer
  91. for _, b := range bs {
  92. buf.WriteString("(")
  93. buf.WriteString(strconv.FormatInt(b.MID, 10))
  94. buf.WriteByte(',')
  95. buf.WriteString(strconv.Itoa(b.Fans))
  96. buf.WriteByte(',')
  97. buf.WriteString(strconv.Itoa(b.TotalPlayCount))
  98. buf.WriteByte(',')
  99. buf.WriteString(strconv.Itoa(b.OriginalArchiveCount))
  100. buf.WriteByte(',')
  101. buf.WriteString(strconv.Itoa(b.Avs))
  102. buf.WriteString(")")
  103. buf.WriteByte(',')
  104. }
  105. if buf.Len() > 0 {
  106. buf.Truncate(buf.Len() - 1)
  107. }
  108. values = buf.String()
  109. buf.Reset()
  110. return
  111. }