Skip to content

Commit

Permalink
Ignore 404 status code for the purpose of backoff.
Browse files Browse the repository at this point in the history
Issue #705.
  • Loading branch information
rwynn committed Dec 19, 2023
1 parent 0e34d06 commit 46d2663
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions monstache.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,18 +565,17 @@ func (ic *indexClient) afterBulk() func(int64, []elastic.BulkableRequest, *elast
if failed := response.Failed(); failed != nil {
backoff := false
for _, item := range failed {
if item.Status == 409 {
if item.Status == http.StatusConflict {
// ignore version conflict since this simply means the doc
// is already in the index
continue
}
backoff = true
json, err := json.Marshal(item)
if err != nil {
errorLog.Printf("Unable to marshal bulk response item: %s", err)
} else {
errorLog.Printf("Bulk response item: %s", string(json))
logFailedResponseItem(item)
if item.Status == http.StatusNotFound {
// status not found should not initiate back off
continue
}
backoff = true
}
if backoff {
wait := ic.backoffDuration()
Expand All @@ -591,6 +590,14 @@ func (ic *indexClient) afterBulk() func(int64, []elastic.BulkableRequest, *elast
}
}

func logFailedResponseItem(item *elastic.BulkResponseItem) {
if encoded, err := json.Marshal(item); err == nil {
errorLog.Printf("Bulk response item: %s", string(encoded))
} else {
errorLog.Printf("Unable to marshal bulk response item: %s", err)
}
}

func (ic *indexClient) backoffDuration() time.Duration {
consecutiveErrors := int(ic.bulkErrs.Load())
wait, ok := ic.bulkBackoff.Next(consecutiveErrors)
Expand Down

0 comments on commit 46d2663

Please sign in to comment.