From 410a2a8853724743ad2c96a3ad3010903dcd2586 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Thu, 9 Dec 2021 16:15:32 +0100 Subject: [PATCH 1/3] Drop event batch when get HTTP status 413 from ES To prevent infinite loops when having `http.max_content_length` set too low or `bulk_max_size` too high we now handle this status code separately and drop the whole event batch producing a detailed error message on the console. --- libbeat/outputs/elasticsearch/client.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index deab29c3dcd..0d9d619b9f5 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -38,6 +38,8 @@ import ( "github.com/elastic/beats/v7/libbeat/testing" ) +var errPayloadTooLarge = errors.New("the bulk payload is too large for the server. Consider to adjust `http.max_content_length` parameter in Elasticsearch or `bulk_max_size` in the beat. The batch has been dropped") + // Client is an elasticsearch client. type Client struct { conn eslegclient.Connection @@ -180,9 +182,13 @@ func (client *Client) Clone() *Client { func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error { events := batch.Events() rest, err := client.publishEvents(ctx, events) - if len(rest) == 0 { + + switch { + case err == errPayloadTooLarge: + batch.Drop() + case len(rest) == 0: batch.ACK() - } else { + default: batch.RetryEvents(rest) } return err @@ -220,7 +226,11 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) } status, result, sendErr := client.conn.Bulk(ctx, "", "", nil, bulkItems) + if sendErr != nil { + if status == http.StatusRequestEntityTooLarge { + sendErr = errPayloadTooLarge + } err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", sendErr)) err.Send() client.log.Error(err) From cef34d2d3fa4d53d2bb1e33878e4dee320407ca6 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Thu, 9 Dec 2021 17:16:24 +0100 Subject: [PATCH 2/3] Add changelog entry --- CHANGELOG-developer.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index e640b5a0334..dcb2a5665f4 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -68,6 +68,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Remove `event.dataset` (ECS) annotion from `libbeat.logp`. {issue}27404[27404] - Errors should be thrown as errors. Metricsets inside Metricbeat will now throw errors as the `error` log level. {pull}27804[27804] - Avoid panicking in `add_fields` processor when input event.Fields is a nil map. {pull}28219[28219] +- Drop event batch when get HTTP status 413 from Elasticsearch to avoid infinite loop {issue}14350[14350] {pull}29368[29368] ==== Added From e5344c50a4d3a0ed9603fa85b9fcd61aa5e5b7f9 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 14 Dec 2021 11:49:21 +0100 Subject: [PATCH 3/3] Add test for the HTTP status codes --- libbeat/outputs/elasticsearch/client_test.go | 97 ++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 2a03d10481d..0a2ca672a7c 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -44,6 +44,103 @@ import ( "github.com/elastic/beats/v7/libbeat/version" ) +type testIndexSelector struct{} + +func (testIndexSelector) Select(event *beat.Event) (string, error) { + return "test", nil +} + +type batchMock struct { + // we embed the interface so we are able to implement the interface partially, + // only functions needed for tests are implemented + // if you use a function that is not implemented in the mock it will panic + publisher.Batch + events []publisher.Event + ack bool + drop bool + retryEvents []publisher.Event +} + +func (bm batchMock) Events() []publisher.Event { + return bm.events +} +func (bm *batchMock) ACK() { + bm.ack = true +} +func (bm *batchMock) Drop() { + bm.drop = true +} +func (bm *batchMock) RetryEvents(events []publisher.Event) { + bm.retryEvents = events +} + +func TestPublishStatusCode(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 1}}} + events := []publisher.Event{event} + + t.Run("returns pre-defined error and drops batch when 413", func(t *testing.T) { + esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusRequestEntityTooLarge) + w.Write([]byte("Request failed to get to the server (status code: 413)")) // actual response from ES + })) + defer esMock.Close() + + client, err := NewClient( + ClientSettings{ + ConnectionSettings: eslegclient.ConnectionSettings{ + URL: esMock.URL, + }, + Index: testIndexSelector{}, + }, + nil, + ) + assert.NoError(t, err) + + event := publisher.Event{Content: beat.Event{Fields: common.MapStr{"field": 1}}} + events := []publisher.Event{event} + batch := &batchMock{ + events: events, + } + + err = client.Publish(ctx, batch) + + assert.Error(t, err) + assert.Equal(t, errPayloadTooLarge, err, "should be a pre-defined error") + assert.True(t, batch.drop, "should must be dropped") + }) + + t.Run("retries the batch if bad HTTP status", func(t *testing.T) { + esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer esMock.Close() + + client, err := NewClient( + ClientSettings{ + ConnectionSettings: eslegclient.ConnectionSettings{ + URL: esMock.URL, + }, + Index: testIndexSelector{}, + }, + nil, + ) + assert.NoError(t, err) + + batch := &batchMock{ + events: events, + } + + err = client.Publish(ctx, batch) + + assert.Error(t, err) + assert.False(t, batch.ack, "should not be acknowledged") + assert.Len(t, batch.retryEvents, len(events), "all events should be in retry") + }) +} + func TestCollectPublishFailsNone(t *testing.T) { client, err := NewClient( ClientSettings{