binlogvr.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/job/bbq/video/model"
  7. videov1 "go-common/app/service/bbq/video/api/grpc/v1"
  8. "go-common/library/log"
  9. )
  10. //videoRepositorySub video_repository subscription .
  11. func (s *Service) videoRepositoryBinlogSub() {
  12. msgs := s.videoRep.Messages()
  13. for {
  14. var err error
  15. msg, ok := <-msgs
  16. if !ok {
  17. log.Info("video_repository databus Consumer exit")
  18. return
  19. }
  20. res := &model.DatabusRes{}
  21. log.Infov(context.Background(), log.KV("log", fmt.Sprintf("canal message %s", string(msg.Value))))
  22. if err = json.Unmarshal(msg.Value, &res); err != nil {
  23. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  24. msg.Commit()
  25. continue
  26. }
  27. if res.Table != "video_repository" || (res.Action != "update" && res.Action != "insert") {
  28. msg.Commit()
  29. continue
  30. }
  31. //unserialize databus struct
  32. var vNew, vOld *model.VideoRepRaw
  33. if res.Action == "insert" || res.Action == "update" {
  34. if err = json.Unmarshal(res.New, &vNew); err != nil {
  35. log.Error("video unmarshal err(%v) data[%s]", err, string(res.New))
  36. msg.Commit()
  37. continue
  38. }
  39. }
  40. if res.Action == "update" {
  41. if err = json.Unmarshal(res.Old, &vOld); err != nil {
  42. log.Error("video unmarshal err(%v) data[%s]", err, string(res.Old))
  43. msg.Commit()
  44. continue
  45. }
  46. }
  47. if res.Action == "insert" {
  48. for i := 0; i < _retryTimes; i++ {
  49. if err = s.PepareResource(vNew); err == nil {
  50. break
  51. }
  52. }
  53. }
  54. msg.Commit()
  55. }
  56. }
  57. //PepareResource ...
  58. func (s *Service) PepareResource(vNew *model.VideoRepRaw) (err error) {
  59. var (
  60. ctx = context.Background()
  61. SVID int64
  62. row *model.VideoRepRaw
  63. )
  64. //bbq/cms video not trans to bvc
  65. if vNew.From == model.VideoFromBBQ || vNew.From == model.VideoFromCMS {
  66. return
  67. }
  68. if row, err = s.dao.RawVideoByID(ctx, vNew.ID); err != nil {
  69. return
  70. }
  71. if row.SVID > 0 {
  72. SVID = row.SVID
  73. } else {
  74. req := &videov1.CreateIDRequest{
  75. Mid: vNew.MID,
  76. }
  77. var rep *videov1.CreateIDResponse
  78. if rep, err = s.dao.VideoClient.CreateID(ctx, req); err != nil {
  79. log.Error("Numbering device return err:%v", err)
  80. return
  81. }
  82. if err = s.dao.UpdateSvid(context.Background(), vNew.ID, rep.NewId); err != nil {
  83. return
  84. }
  85. SVID = rep.NewId
  86. }
  87. reqBvc := &videov1.BVideoTransRequset{
  88. SVID: SVID,
  89. CID: vNew.CID,
  90. }
  91. log.Info("bvc trans commit req:%v", reqBvc)
  92. if _, err = s.dao.VideoClient.BVCTransCommit(ctx, reqBvc); err != nil {
  93. log.Error("BVCTransCommit err :%v,req:%v", err, reqBvc)
  94. }
  95. s.dao.UpdateSyncStatus(ctx, SVID, model.SourceRequest)
  96. return
  97. }