fanout.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package fanout
  2. import (
  3. "context"
  4. "errors"
  5. "runtime"
  6. "sync"
  7. "go-common/library/log"
  8. "go-common/library/net/metadata"
  9. "go-common/library/net/trace"
  10. "go-common/library/stat/prom"
  11. )
  12. var (
  13. // ErrFull chan full.
  14. ErrFull = errors.New("fanout: chan full")
  15. stats = prom.BusinessInfoCount
  16. traceTags = []trace.Tag{
  17. trace.Tag{Key: trace.TagSpanKind, Value: "background"},
  18. trace.Tag{Key: trace.TagComponent, Value: "sync/pipeline/fanout"},
  19. }
  20. )
  // options carries the tunables of a Fanout. New seeds it with defaults and
  // then lets each Option overwrite individual fields.
  type options struct {
  	// worker is how many proc goroutines New starts;
  	// buffer is the capacity of the task channel New allocates.
  	worker, buffer int
  }
  25. // Option fanout option
  26. type Option func(*options)
  27. // Worker specifies the worker of fanout
  28. func Worker(n int) Option {
  29. if n <= 0 {
  30. panic("fanout: worker should > 0")
  31. }
  32. return func(o *options) {
  33. o.worker = n
  34. }
  35. }
  36. // Buffer specifies the buffer of fanout
  37. func Buffer(n int) Option {
  38. if n <= 0 {
  39. panic("fanout: buffer should > 0")
  40. }
  41. return func(o *options) {
  42. o.buffer = n
  43. }
  44. }
  // item is one queued task: the callback together with the context it is
  // invoked with by a worker.
  type item struct {
  	ctx context.Context
  	f   func(ctx context.Context)
  }
  49. // Fanout async consume data from chan.
  50. type Fanout struct {
  51. name string
  52. ch chan item
  53. options *options
  54. waiter sync.WaitGroup
  55. ctx context.Context
  56. cancel func()
  57. }
  58. // New new a fanout struct.
  59. func New(name string, opts ...Option) *Fanout {
  60. if name == "" {
  61. name = "fanout"
  62. }
  63. o := &options{
  64. worker: 1,
  65. buffer: 1024,
  66. }
  67. for _, op := range opts {
  68. op(o)
  69. }
  70. c := &Fanout{
  71. ch: make(chan item, o.buffer),
  72. name: name,
  73. options: o,
  74. }
  75. c.ctx, c.cancel = context.WithCancel(context.Background())
  76. c.waiter.Add(o.worker)
  77. for i := 0; i < o.worker; i++ {
  78. go c.proc()
  79. }
  80. return c
  81. }
  82. func (c *Fanout) proc() {
  83. defer c.waiter.Done()
  84. for {
  85. select {
  86. case t := <-c.ch:
  87. wrapFunc(t.f)(t.ctx)
  88. stats.State(c.name+"_channel", int64(len(c.ch)))
  89. case <-c.ctx.Done():
  90. return
  91. }
  92. }
  93. }
  94. func wrapFunc(f func(c context.Context)) (res func(context.Context)) {
  95. res = func(ctx context.Context) {
  96. defer func() {
  97. if r := recover(); r != nil {
  98. buf := make([]byte, 64*1024)
  99. buf = buf[:runtime.Stack(buf, false)]
  100. log.Error("panic in fanout proc, err: %s, stack: %s", r, buf)
  101. }
  102. }()
  103. f(ctx)
  104. if tr, ok := trace.FromContext(ctx); ok {
  105. tr.Finish(nil)
  106. }
  107. }
  108. return
  109. }
  110. // Do save a callback func.
  111. func (c *Fanout) Do(ctx context.Context, f func(ctx context.Context)) (err error) {
  112. if f == nil || c.ctx.Err() != nil {
  113. return c.ctx.Err()
  114. }
  115. nakeCtx := metadata.WithContext(ctx)
  116. if tr, ok := trace.FromContext(ctx); ok {
  117. tr = tr.Fork("", "Fanout:Do").SetTag(traceTags...)
  118. nakeCtx = trace.NewContext(nakeCtx, tr)
  119. }
  120. select {
  121. case c.ch <- item{f: f, ctx: nakeCtx}:
  122. default:
  123. err = ErrFull
  124. }
  125. stats.State(c.name+"_channel", int64(len(c.ch)))
  126. return
  127. }
  128. // Close close fanout
  129. func (c *Fanout) Close() error {
  130. if err := c.ctx.Err(); err != nil {
  131. return err
  132. }
  133. c.cancel()
  134. c.waiter.Wait()
  135. return nil
  136. }