statsd.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package statsd
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "strings"
  8. "time"
  9. "go-common/library/log"
  10. )
  11. const (
  12. _quit = ""
  13. _size = 1400
  14. )
  15. // Config statsd config.
  16. type Config struct {
  17. Project string
  18. Addr string
  19. ChanSize int
  20. }
  21. // Statsd statsd struct.
  22. type Statsd struct {
  23. // project.hostname.api
  24. // Make sure no '/' in the api.
  25. c *Config
  26. business string
  27. r *rand.Rand
  28. stats chan string
  29. }
  30. // New new a statsd struct.
  31. func New(c *Config) (s *Statsd) {
  32. s = new(Statsd)
  33. s.c = c
  34. s.business = fmt.Sprintf("%s", c.Project)
  35. // init rand
  36. s.r = rand.New(rand.NewSource(time.Now().Unix()))
  37. // init stat channel
  38. s.stats = make(chan string, c.ChanSize)
  39. go s.writeproc()
  40. return
  41. }
  42. // send data to udp statsd daemon
  43. func (s *Statsd) send(data string, rate float32) {
  44. if rate < 1 && s.r != nil {
  45. if s.r.Float32() < rate {
  46. return
  47. }
  48. }
  49. select {
  50. case s.stats <- data:
  51. default:
  52. log.Warn("Statsd stat channel is full")
  53. }
  54. }
  55. // writeproc write data into connection.
  56. func (s *Statsd) writeproc() {
  57. var (
  58. err error
  59. l int
  60. stat string
  61. conn net.Conn
  62. buf bytes.Buffer
  63. tick = time.Tick(1 * time.Second)
  64. )
  65. for {
  66. select {
  67. case stat = <-s.stats:
  68. if stat == _quit {
  69. if conn != nil {
  70. conn.Close()
  71. }
  72. return
  73. }
  74. case <-tick:
  75. if l = buf.Len(); l > 0 {
  76. conn.Write(buf.Bytes()[:l-1])
  77. buf.Reset()
  78. }
  79. continue
  80. }
  81. if conn == nil {
  82. if conn, err = net.Dial("udp", s.c.Addr); err != nil {
  83. log.Error("net.Dial('udp', %s) error(%v)", s.c.Addr, err)
  84. time.Sleep(time.Second)
  85. continue
  86. }
  87. }
  88. if l = buf.Len(); l+len(stat) >= _size {
  89. conn.Write(buf.Bytes()[:l-1])
  90. buf.Reset()
  91. }
  92. buf.WriteString(stat)
  93. buf.WriteByte('\n')
  94. }
  95. }
  96. // Close close the connection.
  97. func (s *Statsd) Close() {
  98. s.stats <- _quit
  99. }
  100. // Timing log timing information (in milliseconds) without sampling
  101. func (s *Statsd) Timing(name string, time int64, extra ...string) {
  102. val := formatTiming(s.business, name, time, extra...)
  103. s.send(val, 1)
  104. }
  105. // Incr increments one stat counter without sampling
  106. func (s *Statsd) Incr(name string, extra ...string) {
  107. val := formatIncr(s.business, name, extra...)
  108. s.send(val, 1)
  109. }
  110. // State set state
  111. func (s *Statsd) State(stat string, val int64, extra ...string) {
  112. return
  113. }
  114. func formatIncr(business, name string, extra ...string) string {
  115. ss := []string{business, name}
  116. ss = append(ss, extra...)
  117. return strings.Join(ss, ".") + ":1|c"
  118. }
  119. func formatTiming(business, name string, time int64, extra ...string) string {
  120. ss := []string{business, name}
  121. ss = append(ss, extra...)
  122. return strings.Join(ss, ".") + fmt.Sprintf(":%d|ms", time)
  123. }