auth.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package service
  2. import (
  3. "encoding/base64"
  4. "encoding/hex"
  5. "encoding/json"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "go-common/app/job/main/passport/model"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. )
  13. func (s *Service) authBinLogconsumeproc() {
  14. mergeNum := s.c.Group.AuthBinLog.Num
  15. var (
  16. err error
  17. n int
  18. msgs = s.authBinLog.Messages()
  19. )
  20. for {
  21. msg, ok := <-msgs
  22. if !ok {
  23. log.Error("s.authBinLogconsumeproc closed")
  24. return
  25. }
  26. // marked head to first commit
  27. m := &message{data: msg}
  28. if n, err = strconv.Atoi(msg.Key); err != nil {
  29. log.Error("strconv.Atoi(%s) error(%v)", msg.Key, err)
  30. continue
  31. }
  32. s.authBinLogMu.Lock()
  33. if s.authBinLogHead == nil {
  34. s.authBinLogHead = m
  35. s.authBinLogLast = m
  36. } else {
  37. s.authBinLogLast.next = m
  38. s.authBinLogLast = m
  39. }
  40. s.authBinLogMu.Unlock()
  41. // use specify goroutine to merge messages
  42. s.authBinLogMergeChans[n%mergeNum] <- m
  43. log.Info("authBinLogconsumeproc key:%s partition:%d offset:%d", msg.Key, msg.Partition, msg.Offset)
  44. }
  45. }
  46. func (s *Service) authBinLogcommitproc() {
  47. commits := make(map[int32]*databus.Message, s.c.Group.AuthBinLog.Size)
  48. for {
  49. done := <-s.authBinLogDoneChan
  50. // merge partitions to commit offset
  51. for _, d := range done {
  52. d.done = true
  53. }
  54. s.mu.Lock()
  55. for ; s.authBinLogHead != nil && s.authBinLogHead.done; s.authBinLogHead = s.authBinLogHead.next {
  56. commits[s.authBinLogHead.data.Partition] = s.authBinLogHead.data
  57. }
  58. s.mu.Unlock()
  59. for k, m := range commits {
  60. log.Info("authBinLogcommitproc committed, key:%s partition:%d offset:%d", m.Key, m.Partition, m.Offset)
  61. m.Commit()
  62. delete(commits, k)
  63. }
  64. }
  65. }
  66. func (s *Service) authBinLogmergeproc(c chan *message) {
  67. var (
  68. err error
  69. max = s.c.Group.AuthBinLog.Size
  70. merges = make([]*model.AuthToken, 0, max)
  71. marked = make([]*message, 0, max)
  72. ticker = time.NewTicker(time.Duration(s.c.Group.AuthBinLog.Ticker))
  73. )
  74. for {
  75. select {
  76. case msg, ok := <-c:
  77. if !ok {
  78. log.Error("s.authBinLogmergeproc closed")
  79. return
  80. }
  81. bmsg := &model.BMsg{}
  82. if err = json.Unmarshal(msg.data.Value, bmsg); err != nil {
  83. log.Error("json.Unmarshal(%s) error(%v)", string(msg.data.Value), err)
  84. continue
  85. }
  86. if bmsg.Action == "delete" && strings.HasPrefix(bmsg.Table, "user_token_") {
  87. t := &model.AuthToken{}
  88. if err = json.Unmarshal(bmsg.New, t); err != nil {
  89. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  90. continue
  91. }
  92. merges = append(merges, t)
  93. }
  94. marked = append(marked, msg)
  95. if len(marked) < max && len(merges) < max {
  96. continue
  97. }
  98. case <-ticker.C:
  99. }
  100. if len(merges) > 0 {
  101. s.cleanAuthTokens(merges)
  102. merges = make([]*model.AuthToken, 0, max)
  103. }
  104. if len(marked) > 0 {
  105. s.authBinLogDoneChan <- marked
  106. marked = make([]*message, 0, max)
  107. }
  108. }
  109. }
  110. // cleanTokens by auth .
  111. func (s *Service) cleanAuthTokens(authTokens []*model.AuthToken) {
  112. for _, authToken := range authTokens {
  113. var (
  114. bytes []byte
  115. err error
  116. )
  117. if bytes, err = base64.StdEncoding.DecodeString(authToken.Token); err != nil {
  118. log.Error("cleanAuthTokens base64 decode err %v", err)
  119. continue
  120. }
  121. token := hex.EncodeToString(bytes)
  122. log.Info("auth binlog clear cleanAuthTokens,msg is (%+v)", authToken)
  123. t := &model.AccessInfo{
  124. Mid: authToken.Mid,
  125. AppID: int32(authToken.AppID),
  126. Token: token,
  127. Expires: authToken.Expires,
  128. }
  129. s.cleanToken(t)
  130. }
  131. }