blacklist.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "math"
  6. "strconv"
  7. "time"
  8. "go-common/app/job/main/growup/model"
  9. "go-common/library/log"
  10. "go-common/library/xstr"
  11. )
  // Reason codes stored on a blacklist entry to say why the archive was added.
  const (
  	// Codes 0-2 are not used in this file and stay commented out:
  	// _all = 0
  	// _stopIncome = 1
  	// _breachRecord = 2

  	// _porder is the reason set by getNewPorder.
  	_porder = 3
  	// _executeOrder is the reason set by getNewExecuteOrder.
  	_executeOrder = 4
  )
  20. // InitBlacklistMID init av_black_list mid
  21. func (s *Service) InitBlacklistMID(c context.Context) (err error) {
  22. blacklist, err := s.listBlacklist(c, "mid = 0")
  23. if err != nil {
  24. log.Error("s.listBlacklist error(%v)", err)
  25. return
  26. }
  27. avIDs := make([]int64, 0)
  28. for _, b := range blacklist {
  29. if b.MID == 0 {
  30. avIDs = append(avIDs, b.AvID)
  31. }
  32. }
  33. if len(avIDs) == 0 {
  34. return
  35. }
  36. m, err := s.GetAvsMID(c, avIDs)
  37. if err != nil {
  38. log.Error("GetAvsMID error(%v)", err)
  39. return
  40. }
  41. for i := 0; i < len(blacklist); i++ {
  42. blacklist[i].MID = m[blacklist[i].AvID]
  43. }
  44. _, err = s.updateBlacklistBatch(c, blacklist)
  45. return
  46. }
  47. func (s *Service) listBlacklist(c context.Context, query string) (list []*model.Blacklist, err error) {
  48. from, limit := 0, 2000
  49. var b []*model.Blacklist
  50. for {
  51. b, err = s.dao.ListBlacklist(c, query, from, limit)
  52. if err != nil {
  53. return
  54. }
  55. list = append(list, b...)
  56. if len(b) < limit {
  57. break
  58. }
  59. from += len(b)
  60. }
  61. return
  62. }
  63. // UpdateBlacklist update blacklist
  64. func (s *Service) UpdateBlacklist(c context.Context) (err error) {
  65. defer func() {
  66. GetTaskService().SetTaskStatus(c, TaskBlacklist, time.Now().AddDate(0, 0, -1).Format(_layout), err)
  67. }()
  68. blacklist := make([]*model.Blacklist, 0)
  69. porders, err := s.getNewPorder(c)
  70. if err != nil {
  71. log.Error("s.getNewPorder error(%v)", err)
  72. return
  73. }
  74. log.Info("Get new porder %d", len(porders))
  75. blacklist = append(blacklist, porders...)
  76. executeOrders, err := s.getNewExecuteOrder(c)
  77. if err != nil {
  78. log.Error("s.getNewExecuteOrder error(%v)", err)
  79. return
  80. }
  81. log.Info("Get new execute order %d", len(executeOrders))
  82. blacklist = append(blacklist, executeOrders...)
  83. count, err := s.updateBlacklistBatch(c, blacklist)
  84. if err != nil {
  85. log.Error("s.updateBlacklistBatch error(%v)", err)
  86. return
  87. }
  88. log.Info("Add %d list into blacklist", count)
  89. return
  90. }
  91. func (s *Service) updateBlacklistBatch(c context.Context, blacklist []*model.Blacklist) (count int64, err error) {
  92. ups, err := s.getHasSignUpInfo(c)
  93. if err != nil {
  94. log.Error("s.dao.GetHasSignUpInfo error(%v)", err)
  95. return
  96. }
  97. for i := 0; i < len(blacklist); i++ {
  98. if nickname, ok := ups[blacklist[i].MID]; ok {
  99. blacklist[i].HasSigned = 1
  100. blacklist[i].Nickname = nickname
  101. }
  102. }
  103. return s.dao.AddBlacklistBatch(c, blacklist)
  104. }
  105. func (s *Service) getHasSignUpInfo(c context.Context) (m map[int64]string, err error) {
  106. m = make(map[int64]string)
  107. offset, limit := 0, 2000
  108. for {
  109. err = s.dao.GetHasSignUpInfo(c, offset, limit, m)
  110. if err != nil {
  111. log.Error("s.dao.GetHasSignUpInfo error(%v)", err)
  112. return
  113. }
  114. offset += limit
  115. if len(m) < offset {
  116. break
  117. }
  118. }
  119. return
  120. }
  121. func (s *Service) getNewPorder(c context.Context) (blacklist []*model.Blacklist, err error) {
  122. beginTime, err := s.dao.GetLastCtime(c, _porder)
  123. if err != nil {
  124. log.Error("s.dao.GetLastCtime error(%v)", err)
  125. return
  126. }
  127. if beginTime != 0 {
  128. beginTime -= 10 * 60 // pre 10min
  129. }
  130. endTime := time.Now().Unix()
  131. porders, err := s.getPorder(beginTime, endTime)
  132. if err != nil {
  133. log.Error("get Porder error(%v)", err)
  134. return
  135. }
  136. // get porder mid
  137. avIds := []int64{}
  138. for _, b := range porders {
  139. avIds = append(avIds, b.AID)
  140. }
  141. m, err := s.GetAvsMID(c, avIds)
  142. if err != nil {
  143. log.Error("s.dao.GetAvsMID error(%v)", err)
  144. return
  145. }
  146. blacklist = make([]*model.Blacklist, len(porders))
  147. for i := 0; i < len(porders); i++ {
  148. blacklist[i] = &model.Blacklist{
  149. AvID: porders[i].AID,
  150. MID: m[porders[i].AID],
  151. Reason: _porder,
  152. }
  153. }
  154. return
  155. }
  156. // GetAvsMID get avs mid from api
  157. func (s *Service) GetAvsMID(c context.Context, avs []int64) (avsMap map[int64]int64, err error) {
  158. avsMap = make(map[int64]int64)
  159. if len(avs) == 0 {
  160. return
  161. }
  162. start, limit := 0, 10
  163. if limit > len(avs) {
  164. limit = len(avs)
  165. }
  166. for start+limit <= len(avs) {
  167. if err = s.getAvsMID(c, avs[start:start+limit], avsMap); err != nil {
  168. return
  169. }
  170. start += limit
  171. if start < len(avs) && start+limit > len(avs) {
  172. limit = len(avs) - start
  173. }
  174. }
  175. log.Info("Get avs(%d) from archiveURL", len(avsMap))
  176. return
  177. }
  178. func (s *Service) getAvsMID(c context.Context, avs []int64, avsMap map[int64]int64) (err error) {
  179. params := map[string]string{"aids": xstr.JoinInts(avs)}
  180. body, err := s.HTTPClient("GET", s.conf.Host.Archives, params, time.Now().Unix())
  181. if err != nil {
  182. log.Error("s.HTTPClient error(%v)", err)
  183. return
  184. }
  185. res := model.ArchiveRes{}
  186. err = json.Unmarshal(body, &res)
  187. if err != nil {
  188. log.Error("json.Unmarshal body %s error(%v)", string(body), err)
  189. return
  190. }
  191. for _, archive := range res.Data {
  192. avsMap[archive.AID] = archive.Owner.MID
  193. }
  194. return
  195. }
  196. func (s *Service) getPorder(begin, end int64) (porders []*model.Porder, err error) {
  197. params := map[string]string{
  198. "begin": strconv.FormatInt(begin, 10),
  199. "end": strconv.FormatInt(end, 10),
  200. }
  201. body, err := s.HTTPClient("GET", s.conf.Host.Porder, params, time.Now().UnixNano()/int64(math.Pow(10, 6)))
  202. if err != nil {
  203. log.Error("s.HTTPClient error(%v)", err)
  204. return
  205. }
  206. res := model.PorderRes{}
  207. err = json.Unmarshal(body, &res)
  208. if err != nil {
  209. log.Error("json.Unmarshal body %s error(%v)", string(body), err)
  210. return
  211. }
  212. porders = res.Data
  213. return
  214. }
  215. func (s *Service) getNewExecuteOrder(c context.Context) (blacklist []*model.Blacklist, err error) {
  216. beginTime, err := s.dao.GetLastCtime(c, _executeOrder)
  217. if err != nil {
  218. log.Error("s.dao.GetLastCtime error(%v)", err)
  219. return
  220. }
  221. if beginTime != 0 {
  222. beginTime -= 10 * 60 // pre 10min
  223. }
  224. executeOrders, err := s.dao.GetExecuteOrder(c, time.Unix(beginTime, 0), time.Now())
  225. if err != nil {
  226. log.Error("s.dao.GetExecuteOrder error(%v)", err)
  227. return
  228. }
  229. blacklist = make([]*model.Blacklist, len(executeOrders))
  230. for i := 0; i < len(executeOrders); i++ {
  231. blacklist[i] = &model.Blacklist{
  232. AvID: executeOrders[i].AvID,
  233. MID: executeOrders[i].MID,
  234. Reason: _executeOrder,
  235. }
  236. }
  237. return
  238. }