Skip to content

Commit

Permalink
Back off indexing when bulk indexing errors occur.
Browse files Browse the repository at this point in the history
  • Loading branch information
rwynn committed Dec 1, 2023
1 parent 4c1413a commit af7f1cf
Showing 1 changed file with 84 additions and 45 deletions.
129 changes: 84 additions & 45 deletions monstache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"text/template"
"time"
Expand Down Expand Up @@ -166,6 +167,9 @@ type indexClient struct {
directReadsPending bool
externalShutdown bool
rwmutex sync.RWMutex
bulkErrs atomic.Int64
bulkBackoff elastic.Backoff
bulkBackoffC chan struct{}
}

type sigHandler struct {
Expand Down Expand Up @@ -549,27 +553,59 @@ func (config *configOptions) ignoreCollectionForDirectReads(col string) bool {
return strings.HasPrefix(col, "system.")
}

func afterBulk(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
if response == nil || !response.Errors {
return
}
if failed := response.Failed(); failed != nil {
for _, item := range failed {
if item.Status == 409 {
// ignore version conflict since this simply means the doc
// is already in the index
continue
func (ic *indexClient) afterBulk() func(int64, []elastic.BulkableRequest, *elastic.BulkResponse, error) {
return func(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
if response == nil || !response.Errors {
ic.bulkErrs.Store(0)
return
}
if failed := response.Failed(); failed != nil {
backoff := false
for _, item := range failed {
if item.Status == 409 {
// ignore version conflict since this simply means the doc
// is already in the index
continue
}
backoff = true
json, err := json.Marshal(item)
if err != nil {
errorLog.Printf("Unable to marshal bulk response item: %s", err)
} else {
errorLog.Printf("Bulk response item: %s", string(json))
}
}
json, err := json.Marshal(item)
if err != nil {
errorLog.Printf("Unable to marshal bulk response item: %s", err)
} else {
errorLog.Printf("Bulk response item: %s", string(json))
if backoff {
infoLog.Println("Backing off after bulk indexing failures")
ic.bulkErrs.Add(1)
// signal the event loop to pause pulling new events for a duration
ic.bulkBackoffC <- struct{}{}
// pause the bulk worker for a duration
ic.backoff()
}
}
}
}

// backoff blocks the calling goroutine for a pause chosen by the client's
// bulk backoff policy. The current value of the consecutive bulk error
// counter is passed to the policy as the retry number. The pause ends early
// if the process receives SIGINT or SIGTERM, so a shutdown request is not
// held up for the remainder of the wait.
func (ic *indexClient) backoff() {
	delay, proceed := ic.bulkBackoff.Next(int(ic.bulkErrs.Load()))
	if !proceed {
		// NOTE(review): when the policy declines to supply a duration this
		// returns without pausing at all — confirm that is the intended
		// behavior once the policy's limit has been reached.
		return
	}

	pause := time.NewTimer(delay)
	defer pause.Stop()

	// Listen for termination signals only for the duration of this pause;
	// the registration is removed again on return.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(interrupt)

	// Whichever happens first ends the pause; no further action is needed
	// in either case.
	select {
	case <-interrupt:
	case <-pause.C:
	}
}

func (config *configOptions) parseElasticsearchVersion(number string) (err error) {
if number == "" {
err = errors.New("Elasticsearch version cannot be blank")
Expand All @@ -593,7 +629,8 @@ func (config *configOptions) parseElasticsearchVersion(number string) (err error
return
}

func (config *configOptions) newBulkProcessor(client *elastic.Client) (bulk *elastic.BulkProcessor, err error) {
func (ic *indexClient) newBulkProcessor(client *elastic.Client) (bulk *elastic.BulkProcessor, err error) {
config := ic.config
bulkService := client.BulkProcessor().Name("monstache")
bulkService.Workers(config.ElasticMaxConns)
bulkService.Stats(config.Stats)
Expand All @@ -602,18 +639,18 @@ func (config *configOptions) newBulkProcessor(client *elastic.Client) (bulk *ela
if config.ElasticRetry == false {
bulkService.Backoff(&elastic.StopBackoff{})
}
bulkService.After(afterBulk)
bulkService.After(ic.afterBulk())
bulkService.FlushInterval(time.Duration(config.ElasticMaxSeconds) * time.Second)
return bulkService.Do(context.Background())
}

func (config *configOptions) newStatsBulkProcessor(client *elastic.Client) (bulk *elastic.BulkProcessor, err error) {
func (ic *indexClient) newStatsBulkProcessor(client *elastic.Client) (bulk *elastic.BulkProcessor, err error) {
bulkService := client.BulkProcessor().Name("monstache-stats")
bulkService.Workers(1)
bulkService.Stats(false)
bulkService.BulkActions(-1)
bulkService.BulkSize(-1)
bulkService.After(afterBulk)
bulkService.After(ic.afterBulk())
bulkService.FlushInterval(time.Duration(5) * time.Second)
return bulkService.Do(context.Background())
}
Expand Down Expand Up @@ -1342,11 +1379,11 @@ func parseIndexMeta(op *gtm.Op) (meta *indexingMeta) {

func (ic *indexClient) addFileContent(op *gtm.Op) (err error) {
op.Data["file"] = ""
var gridByteBuffer bytes.Buffer
var fileContentBuilder strings.Builder
db, bucketName :=
ic.mongo.Database(op.GetDatabase()),
strings.SplitN(op.GetCollection(), ".", 2)[0]
encoder := base64.NewEncoder(base64.StdEncoding, &gridByteBuffer)
encoder := base64.NewEncoder(base64.StdEncoding, &fileContentBuilder)
opts := &options.BucketOptions{}
opts.SetName(bucketName)
var bucket *gridfs.Bucket
Expand All @@ -1366,7 +1403,7 @@ func (ic *indexClient) addFileContent(op *gtm.Op) (err error) {
if err = encoder.Close(); err != nil {
return
}
op.Data["file"] = string(gridByteBuffer.Bytes())
op.Data["file"] = fileContentBuilder.String()
return
}

Expand Down Expand Up @@ -4373,14 +4410,13 @@ func (ic *indexClient) setupFileIndexing() {
}

func (ic *indexClient) setupBulk() {
config := ic.config
bulk, err := config.newBulkProcessor(ic.client)
bulk, err := ic.newBulkProcessor(ic.client)
if err != nil {
errorLog.Fatalf("Unable to start bulk processor: %s", err)
}
var bulkStats *elastic.BulkProcessor
if config.IndexStats {
bulkStats, err = config.newStatsBulkProcessor(ic.client)
if ic.config.IndexStats {
bulkStats, err = ic.newStatsBulkProcessor(ic.client)
if err != nil {
errorLog.Fatalf("Unable to start stats bulk processor: %s", err)
}
Expand Down Expand Up @@ -4977,6 +5013,8 @@ func (ic *indexClient) eventLoop() {
ic.sigH.clientStartedC <- ic
for {
select {
case <-ic.bulkBackoffC:
ic.backoff()
case timeout := <-ic.doneC:
ic.enabled = false
ic.shutdown(timeout)
Expand Down Expand Up @@ -5179,7 +5217,6 @@ func (ic *indexClient) saveTimestampFromServerStatus() {
} else {
ic.processErr(err)
}
return
}

func (ic *indexClient) saveTimestampFromReplStatus() {
Expand Down Expand Up @@ -5297,24 +5334,26 @@ func main() {
elasticClient := buildElasticClient(config)

ic := &indexClient{
config: config,
mongo: mongoClient,
client: elasticClient,
fileWg: &sync.WaitGroup{},
indexWg: &sync.WaitGroup{},
processWg: &sync.WaitGroup{},
relateWg: &sync.WaitGroup{},
opsConsumed: make(chan bool),
closeC: make(chan bool),
doneC: make(chan int),
enabled: true,
indexC: make(chan *gtm.Op),
processC: make(chan *gtm.Op),
fileC: make(chan *gtm.Op),
relateC: make(chan *gtm.Op, config.RelateBuffer),
statusReqC: make(chan *statusRequest),
sigH: sh,
tokens: bson.M{},
config: config,
mongo: mongoClient,
client: elasticClient,
fileWg: &sync.WaitGroup{},
indexWg: &sync.WaitGroup{},
processWg: &sync.WaitGroup{},
relateWg: &sync.WaitGroup{},
opsConsumed: make(chan bool),
closeC: make(chan bool),
doneC: make(chan int),
enabled: true,
indexC: make(chan *gtm.Op),
processC: make(chan *gtm.Op),
fileC: make(chan *gtm.Op),
relateC: make(chan *gtm.Op, config.RelateBuffer),
statusReqC: make(chan *statusRequest),
sigH: sh,
tokens: bson.M{},
bulkBackoffC: make(chan struct{}),
bulkBackoff: elastic.NewExponentialBackoff(1*time.Minute, 1*time.Hour),
}

ic.run()
Expand Down

0 comments on commit af7f1cf

Please sign in to comment.