service.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/job/live/push-search/conf"
  5. "go-common/app/job/live/push-search/dao"
  6. accountApi "go-common/app/service/main/account/api"
  7. "go-common/library/queue/databus"
  8. "sync"
  9. )
  10. const (
  11. _tableArchive = "ap_room"
  12. )
  13. // Service struct
  14. type Service struct {
  15. c *conf.Config
  16. dao *dao.Dao
  17. binLogMergeChan []chan *message
  18. attentionMergeChan []chan *message
  19. unameMergeChan []chan *message
  20. waiter *sync.WaitGroup
  21. waiterChan *sync.WaitGroup
  22. AccountClient accountApi.AccountClient
  23. }
  24. type message struct {
  25. next *message
  26. data *databus.Message
  27. object interface{}
  28. done bool
  29. }
  30. // New init
  31. func New(c *conf.Config) (s *Service) {
  32. dao.InitAPI()
  33. s = &Service{
  34. c: c,
  35. dao: dao.New(c),
  36. binLogMergeChan: make([]chan *message, c.Group.RoomInfo.Num),
  37. attentionMergeChan: make([]chan *message, c.Group.Attention.Num),
  38. unameMergeChan: make([]chan *message, c.Group.UserInfo.Num),
  39. waiter: new(sync.WaitGroup),
  40. waiterChan: new(sync.WaitGroup),
  41. }
  42. accountClient, err := accountApi.NewClient(nil)
  43. if err != nil {
  44. panic(err)
  45. }
  46. s.AccountClient = accountClient
  47. //ap room 表 binlog qps 高, hash roomId 并行
  48. for i := 0; i < c.Group.RoomInfo.Num; i++ {
  49. ch := make(chan *message, c.Group.RoomInfo.Chan)
  50. s.binLogMergeChan[i] = ch
  51. s.waiterChan.Add(1)
  52. go s.roomInfoNotifyHandleProc(ch)
  53. }
  54. for i := 0; i < c.Group.Attention.Num; i++ {
  55. ch := make(chan *message, c.Group.Attention.Chan)
  56. s.attentionMergeChan[i] = ch
  57. s.waiterChan.Add(1)
  58. go s.attentionNotifyHandleProc(ch)
  59. }
  60. for i := 0; i < c.Group.UserInfo.Num; i++ {
  61. ch := make(chan *message, c.Group.UserInfo.Chan)
  62. s.unameMergeChan[i] = ch
  63. s.waiterChan.Add(1)
  64. go s.unameNotifyHandleProc(ch)
  65. }
  66. s.waiter.Add(1)
  67. go s.roomInfoNotifyConsumeProc()
  68. s.waiter.Add(1)
  69. go s.attentionNotifyConsumeProc()
  70. s.waiter.Add(1)
  71. go s.unameNotifyConsumeProc()
  72. return s
  73. }
  74. // Ping Service
  75. func (s *Service) Ping(c context.Context) (err error) {
  76. return s.dao.Ping(c)
  77. }
  78. // Close Service
  79. func (s *Service) Close() {
  80. //databus chan close
  81. s.dao.Close()
  82. s.waiter.Wait()
  83. //task goroutine close
  84. for _, ch := range s.binLogMergeChan {
  85. close(ch)
  86. }
  87. for _, ch := range s.attentionMergeChan {
  88. close(ch)
  89. }
  90. for _, ch := range s.unameMergeChan {
  91. close(ch)
  92. }
  93. s.waiterChan.Wait()
  94. s.dao.PushSearchDataBus.Close()
  95. }