main.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. // BulkProcessor runs a bulk processing job that fills an index
  5. // given certain criteria like flush interval etc.
  6. //
  7. // Example
  8. //
  9. // bulk_processor -url=http://127.0.0.1:9200/bulk-processor-test?sniff=false -n=100000 -flush-interval=1s
  10. //
  11. package main
  12. import (
  13. "context"
  14. "flag"
  15. "fmt"
  16. "log"
  17. "math/rand"
  18. "os"
  19. "os/signal"
  20. "sync/atomic"
  21. "syscall"
  22. "time"
  23. "github.com/google/uuid"
  24. elastic "gopkg.in/olivere/elastic.v5"
  25. "gopkg.in/olivere/elastic.v5/config"
  26. )
  27. func main() {
  28. var (
  29. url = flag.String("url", "http://localhost:9200/bulk-processor-test", "Elasticsearch URL")
  30. numWorkers = flag.Int("num-workers", 4, "Number of workers")
  31. n = flag.Int64("n", -1, "Number of documents to process (-1 for unlimited)")
  32. flushInterval = flag.Duration("flush-interval", 1*time.Second, "Flush interval")
  33. bulkActions = flag.Int("bulk-actions", 0, "Number of bulk actions before committing")
  34. bulkSize = flag.Int("bulk-size", 0, "Size of bulk requests before committing")
  35. )
  36. flag.Parse()
  37. log.SetFlags(0)
  38. rand.Seed(time.Now().UnixNano())
  39. // Parse configuration from URL
  40. cfg, err := config.Parse(*url)
  41. if err != nil {
  42. log.Fatal(err)
  43. }
  44. // Create an Elasticsearch client from the parsed config
  45. client, err := elastic.NewClientFromConfig(cfg)
  46. if err != nil {
  47. log.Fatal(err)
  48. }
  49. // Drop old index
  50. exists, err := client.IndexExists(cfg.Index).Do(context.Background())
  51. if err != nil {
  52. log.Fatal(err)
  53. }
  54. if exists {
  55. _, err = client.DeleteIndex(cfg.Index).Do(context.Background())
  56. if err != nil {
  57. log.Fatal(err)
  58. }
  59. }
  60. // Create processor
  61. bulkp := elastic.NewBulkProcessorService(client).
  62. Name("bulk-test-processor").
  63. Stats(true).
  64. Backoff(elastic.StopBackoff{}).
  65. FlushInterval(*flushInterval).
  66. Workers(*numWorkers)
  67. if *bulkActions > 0 {
  68. bulkp = bulkp.BulkActions(*bulkActions)
  69. }
  70. if *bulkSize > 0 {
  71. bulkp = bulkp.BulkSize(*bulkSize)
  72. }
  73. p, err := bulkp.Do(context.Background())
  74. if err != nil {
  75. log.Fatal(err)
  76. }
  77. var created int64
  78. errc := make(chan error, 1)
  79. go func() {
  80. c := make(chan os.Signal, 1)
  81. signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
  82. <-c
  83. errc <- nil
  84. }()
  85. go func() {
  86. defer func() {
  87. if err := p.Close(); err != nil {
  88. errc <- err
  89. }
  90. }()
  91. type Doc struct {
  92. Timestamp time.Time `json:"@timestamp"`
  93. }
  94. for {
  95. current := atomic.AddInt64(&created, 1)
  96. if *n > 0 && current >= *n {
  97. errc <- nil
  98. return
  99. }
  100. r := elastic.NewBulkIndexRequest().
  101. Index(cfg.Index).
  102. Type("doc").
  103. Id(uuid.New().String()).
  104. Doc(Doc{Timestamp: time.Now()})
  105. p.Add(r)
  106. time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond)
  107. }
  108. }()
  109. go func() {
  110. t := time.NewTicker(1 * time.Second)
  111. defer t.Stop()
  112. for range t.C {
  113. stats := p.Stats()
  114. written := atomic.LoadInt64(&created)
  115. var queued int64
  116. for _, w := range stats.Workers {
  117. queued += w.Queued
  118. }
  119. fmt.Printf("Queued=%5d Written=%8d Succeeded=%8d Failed=%8d Comitted=%6d Flushed=%6d\n",
  120. queued,
  121. written,
  122. stats.Succeeded,
  123. stats.Failed,
  124. stats.Committed,
  125. stats.Flushed,
  126. )
  127. }
  128. }()
  129. if err := <-errc; err != nil {
  130. log.Fatal(err)
  131. }
  132. }