diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 56bf83702fb..37fc00bd926 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -141,6 +141,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Raise up logging level to warning when attempting to configure beats with unknown fields from autodiscovered events/environments
 - elasticsearch output now supports `idle_connection_timeout`. {issue}35616[35615] {pull}36843[36843]
 - Update to Go 1.21.9. {pull}38727[38727]
+- Enable early event encoding in the Elasticsearch output, improving CPU and memory use. {pull}38572[38572]
 
 *Auditbeat*
 
diff --git a/libbeat/esleg/eslegclient/enc.go b/libbeat/esleg/eslegclient/enc.go
index 644a2b7d8cc..27e409f9172 100644
--- a/libbeat/esleg/eslegclient/enc.go
+++ b/libbeat/esleg/eslegclient/enc.go
@@ -109,6 +109,13 @@ func (b *jsonEncoder) Marshal(obj interface{}) error {
 	return b.AddRaw(obj)
 }
 
+// RawEncoding is used to wrap objects that have already been json-encoded,
+// so the encoder knows to append them directly instead of treating them
+// like a string.
+type RawEncoding struct {
+	Encoding []byte
+}
+
 func (b *jsonEncoder) AddRaw(obj interface{}) error {
 	var err error
 	switch v := obj.(type) {
@@ -116,6 +123,8 @@ func (b *jsonEncoder) AddRaw(obj interface{}) error {
 		err = b.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
 	case *beat.Event:
 		err = b.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
+	case RawEncoding:
+		_, err = b.buf.Write(v.Encoding)
 	default:
 		err = b.folder.Fold(obj)
 	}
@@ -199,6 +208,8 @@ func (g *gzipEncoder) AddRaw(obj interface{}) error {
 		err = g.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
 	case *beat.Event:
 		err = g.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
+	case RawEncoding:
+		_, err = g.gzip.Write(v.Encoding)
 	default:
 		err = g.folder.Fold(obj)
 	}
diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go
index b81bf336348..f723bf818c9 100644
--- a/libbeat/outputs/console/console.go
+++ b/libbeat/outputs/console/console.go
@@ -85,7 +85,7 @@ func makeConsole(
 		}
 	}
 
-	return outputs.Success(config.Queue, config.BatchSize, 0, c)
+	return outputs.Success(config.Queue, config.BatchSize, 0, nil, c)
 }
 
 func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) {
diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go
index 936bbea8ca4..504aac710af 100644
--- a/libbeat/outputs/elasticsearch/client.go
+++ b/libbeat/outputs/elasticsearch/client.go
@@ -299,60 +299,57 @@ func (client *Client) bulkEncodePublishRequest(version version.V, data []publish
 	okEvents := data[:0]
 	bulkItems := []interface{}{}
 	for i := range data {
-		event := &data[i].Content
+		if data[i].EncodedEvent == nil {
+			client.log.Error("Elasticsearch output received unencoded publisher.Event")
+			continue
+		}
+		event := data[i].EncodedEvent.(*encodedEvent)
+		if event.err != nil {
+			// This means there was an error when encoding the event and it isn't
+			// ingestable, so report the error and continue.
+			client.log.Error(event.err)
+			continue
+		}
 		meta, err := client.createEventBulkMeta(version, event)
 		if err != nil {
 			client.log.Errorf("Failed to encode event meta data: %+v", err)
 			continue
 		}
-		if opType := events.GetOpType(*event); opType == events.OpTypeDelete {
+		if event.opType == events.OpTypeDelete {
 			// We don't include the event source in a bulk DELETE
 			bulkItems = append(bulkItems, meta)
 		} else {
-			bulkItems = append(bulkItems, meta, event)
+			// Wrap the encoded event in a RawEncoding so the Elasticsearch client
+			// knows not to re-encode it
+			bulkItems = append(bulkItems, meta, eslegclient.RawEncoding{Encoding: event.encoding})
 		}
 		okEvents = append(okEvents, data[i])
 	}
 	return okEvents, bulkItems
 }
 
-func (client *Client) createEventBulkMeta(version version.V, event *beat.Event) (interface{}, error) {
+func (client *Client) createEventBulkMeta(version version.V, event *encodedEvent) (interface{}, error) {
 	eventType := ""
 	if version.Major < 7 {
 		eventType = defaultEventType
 	}
 
-	pipeline, err := client.getPipeline(event)
-	if err != nil {
-		err := fmt.Errorf("failed to select pipeline: %w", err)
-		return nil, err
-	}
-
-	index, err := client.getIndex(event)
-	if err != nil {
-		err := fmt.Errorf("failed to select event index: %w", err)
-		return nil, err
-	}
-
-	id, _ := events.GetMetaStringValue(*event, events.FieldMetaID)
-	opType := events.GetOpType(*event)
-
 	meta := eslegclient.BulkMeta{
-		Index:    index,
+		Index:    event.index,
 		DocType:  eventType,
-		Pipeline: pipeline,
-		ID:       id,
+		Pipeline: event.pipeline,
+		ID:       event.id,
 	}
 
-	if opType == events.OpTypeDelete {
-		if id != "" {
+	if event.opType == events.OpTypeDelete {
+		if event.id != "" {
 			return eslegclient.BulkDeleteAction{Delete: meta}, nil
 		} else {
 			return nil, fmt.Errorf("%s %s requires _id", events.FieldMetaOpType, events.OpTypeDelete)
 		}
 	}
-	if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) {
-		if opType == events.OpTypeIndex {
+	if event.id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) {
+		if event.opType == events.OpTypeIndex {
 			return eslegclient.BulkIndexAction{Index: meta}, nil
 		}
 		return eslegclient.BulkCreateAction{Create: meta}, nil
@@ -360,17 +357,7 @@ func (client *Client) createEventBulkMeta(version version.V, event *beat.Event)
 	return eslegclient.BulkIndexAction{Index: meta}, nil
 }
 
-func (client *Client) getIndex(event *beat.Event) (string, error) {
-	// If this event has been dead-lettered, override its index
-	if event.Meta != nil {
-		if deadLetter, _ := event.Meta.HasKey(dead_letter_marker_field); deadLetter {
-			return client.deadLetterIndex, nil
-		}
-	}
-	return client.indexSelector.Select(event)
-}
-
-func (client *Client) getPipeline(event *beat.Event) (string, error) {
+func getPipeline(event *beat.Event, defaultSelector *outil.Selector) (string, error) {
 	if event.Meta != nil {
 		pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline)
 		if errors.Is(err, mapstr.ErrKeyNotFound) {
@@ -383,8 +370,8 @@ func (client *Client) getPipeline(event *beat.Event) (string, error) {
 		return strings.ToLower(pipeline), nil
 	}
 
-	if client.pipelineSelector != nil {
-		return client.pipelineSelector.Select(event)
+	if defaultSelector != nil {
+		return defaultSelector.Select(event)
 	}
 	return "", nil
 }
@@ -427,8 +414,8 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat
 			stats.tooMany++
 		} else {
 			// hard failure, apply policy action
-			result, _ := data[i].Content.Meta.HasKey(dead_letter_marker_field)
-			if result {
+			encodedEvent := data[i].EncodedEvent.(*encodedEvent)
+			if encodedEvent.deadLetter {
 				stats.nonIndexable++
 				client.log.Errorf("Can't deliver to dead letter index event (status=%v). Enable debug logs to view the event and cause.", status)
 				client.log.Debugf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg)
@@ -436,18 +423,7 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat
 			} else if client.deadLetterIndex != "" {
 				client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Enable debug logs to view the event and cause.", status)
 				client.log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg)
-				if data[i].Content.Meta == nil {
-					data[i].Content.Meta = mapstr.M{
-						dead_letter_marker_field: true,
-					}
-				} else {
-					data[i].Content.Meta[dead_letter_marker_field] = true
-				}
-				data[i].Content.Fields = mapstr.M{
-					"message":       data[i].Content.Fields.String(),
-					"error.type":    status,
-					"error.message": string(msg),
-				}
+				client.setDeadLetter(encodedEvent, status, string(msg))
 			} else { // drop
 				stats.nonIndexable++
 				client.log.Warnf("Cannot index event (status=%v): dropping event! Enable debug logs to view the event and cause.", status)
@@ -465,6 +441,20 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat
 	return failed, stats
 }
 
+func (client *Client) setDeadLetter(
+	encodedEvent *encodedEvent, errType int, errMsg string,
+) {
+	encodedEvent.deadLetter = true
+	encodedEvent.index = client.deadLetterIndex
+	deadLetterReencoding := mapstr.M{
+		"@timestamp":    encodedEvent.timestamp,
+		"message":       string(encodedEvent.encoding),
+		"error.type":    errType,
+		"error.message": errMsg,
+	}
+	encodedEvent.encoding = []byte(deadLetterReencoding.String())
+}
+
 func (client *Client) Connect() error {
 	return client.conn.Connect()
 }
diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go
index 7a8a06becca..56567931ee4 100644
--- a/libbeat/outputs/elasticsearch/client_integration_test.go
+++ b/libbeat/outputs/elasticsearch/client_integration_test.go
@@ -22,11 +22,7 @@ package elasticsearch
 import (
 	"context"
 	"fmt"
-	"io/ioutil"
 	"math/rand"
-	"net/http"
-	"net/http/httptest"
-	"net/url"
 	"testing"
 	"time"
 
@@ -85,15 +81,15 @@ func testPublishEvent(t *testing.T, index string, cfg map[string]interface{}) {
 	output, client := connectTestEsWithStats(t, cfg, index)
 
 	// drop old index preparing test
-	client.conn.Delete(index, "", "", nil)
+	_, _, _ = client.conn.Delete(index, "", "", nil)
 
-	batch := outest.NewBatch(beat.Event{
+	batch := encodeBatch(client, outest.NewBatch(beat.Event{
 		Timestamp: time.Now(),
 		Fields: mapstr.M{
 			"type":    "libbeat",
 			"message": "Test message from libbeat",
 		},
-	})
+	}))
 
 	err := output.Publish(context.Background(), batch)
 	if err != nil {
@@ -131,7 +127,7 @@ func TestClientPublishEventWithPipeline(t *testing.T) {
 		"index":    index,
 		"pipeline": "%{[pipeline]}",
 	})
-	client.conn.Delete(index, "", "", nil)
+	_, _, _ = client.conn.Delete(index, "", "", nil)
 
 	// Check version
 	if client.conn.GetVersion().Major < 5 {
@@ -139,7 +135,8 @@ func TestClientPublishEventWithPipeline(t *testing.T) {
 	}
 
 	publish := func(event beat.Event) {
-		err := output.Publish(context.Background(), outest.NewBatch(event))
+		batch := encodeBatch(client, outest.NewBatch(event))
+		err := output.Publish(context.Background(), batch)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -167,7 +164,7 @@ func TestClientPublishEventWithPipeline(t *testing.T) {
 		},
 	}
 
-	client.conn.DeletePipeline(pipeline, nil)
+	_, _, _ = client.conn.DeletePipeline(pipeline, nil)
 	_, resp, err := client.conn.CreatePipeline(pipeline, nil, pipelineBody)
 	if err != nil {
 		t.Fatal(err)
@@ -217,10 +214,10 @@ func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) {
 			},
 		},
 	})
-	client.conn.Delete(index, "", "", nil)
-	client.conn.Delete(deadletterIndex, "", "", nil)
+	_, _, _ = client.conn.Delete(index, "", "", nil)
+	_, _, _ = client.conn.Delete(deadletterIndex, "", "", nil)
 
-	err := output.Publish(context.Background(), outest.NewBatch(beat.Event{
+	batch := encodeBatch(client, outest.NewBatch(beat.Event{
 		Timestamp: time.Now(),
 		Fields: mapstr.M{
 			"type":      "libbeat",
@@ -228,18 +225,19 @@ func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) {
 			"testfield": 0,
 		},
 	}))
+	err := output.Publish(context.Background(), batch)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	batch := outest.NewBatch(beat.Event{
+	batch = encodeBatch(client, outest.NewBatch(beat.Event{
 		Timestamp: time.Now(),
 		Fields: mapstr.M{
 			"type":      "libbeat",
 			"message":   "Test message 2",
 			"testfield": "foo0",
 		},
-	})
+	}))
 	err = output.Publish(context.Background(), batch)
 	if err == nil {
 		t.Fatal("Expecting mapping conflict")
@@ -277,14 +275,15 @@ func TestClientBulkPublishEventsWithPipeline(t *testing.T) {
 		"index":    index,
 		"pipeline": "%{[pipeline]}",
 	})
-	client.conn.Delete(index, "", "", nil)
+	_, _, _ = client.conn.Delete(index, "", "", nil)
 
 	if client.conn.GetVersion().Major < 5 {
 		t.Skip("Skipping tests as pipeline not available in <5.x releases")
 	}
 
 	publish := func(events ...beat.Event) {
-		err := output.Publish(context.Background(), outest.NewBatch(events...))
+		batch := encodeBatch(client, outest.NewBatch(events...))
+		err := output.Publish(context.Background(), batch)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -312,7 +311,7 @@ func TestClientBulkPublishEventsWithPipeline(t *testing.T) {
 		},
 	}
 
-	client.conn.DeletePipeline(pipeline, nil)
+	_, _, _ = client.conn.DeletePipeline(pipeline, nil)
 	_, resp, err := client.conn.CreatePipeline(pipeline, nil, pipelineBody)
 	if err != nil {
 		t.Fatal(err)
@@ -354,14 +353,14 @@ func TestClientPublishTracer(t *testing.T) {
 		"index": index,
 	})
 
-	client.conn.Delete(index, "", "", nil)
+	_, _, _ = client.conn.Delete(index, "", "", nil)
 
-	batch := outest.NewBatch(beat.Event{
+	batch := encodeBatch(client, outest.NewBatch(beat.Event{
 		Timestamp: time.Now(),
 		Fields: mapstr.M{
 			"message": "Hello world",
 		},
-	})
+	}))
 
 	tx, spans, _ := apmtest.WithTransaction(func(ctx context.Context) {
 		err := output.Publish(ctx, batch)
@@ -434,7 +433,7 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu
 	client := randomClient(output).(clientWrap).Client().(*Client)
 
 	// Load version number
-	client.Connect()
+	_ = client.Connect()
 
 	return client, client
 }
@@ -475,32 +474,3 @@ func randomClient(grp outputs.Group) outputs.NetworkClient {
 	client := grp.Clients[rand.Intn(L)]
 	return client.(outputs.NetworkClient)
 }
-
-// startTestProxy starts a proxy that redirects all connections to the specified URL
-func startTestProxy(t *testing.T, redirectURL string) *httptest.Server {
-	t.Helper()
-
-	realURL, err := url.Parse(redirectURL)
-	require.NoError(t, err)
-
-	proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		req := r.Clone(context.Background())
-		req.RequestURI = ""
-		req.URL.Scheme = realURL.Scheme
-		req.URL.Host = realURL.Host
-
-		resp, err := http.DefaultClient.Do(req)
-		require.NoError(t, err)
-		defer resp.Body.Close()
-
-		body, err := ioutil.ReadAll(resp.Body)
-		require.NoError(t, err)
-
-		for _, header := range []string{"Content-Encoding", "Content-Type"} {
-			w.Header().Set(header, resp.Header.Get(header))
-		}
-		w.WriteHeader(resp.StatusCode)
-		w.Write(body)
-	}))
-	return proxy
-}
diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go
index 12493e28d02..28033ff3cb2 100644
--- a/libbeat/outputs/elasticsearch/client_test.go
+++ b/libbeat/outputs/elasticsearch/client_test.go
@@ -20,7 +20,9 @@
 package elasticsearch
 
 import (
+	"bytes"
 	"context"
+	"encoding/json"
 	"fmt"
 	"io"
 	"net/http"
@@ -50,12 +52,6 @@ import (
 	libversion "github.com/elastic/elastic-agent-libs/version"
 )
 
-type testIndexSelector struct{}
-
-func (testIndexSelector) Select(event *beat.Event) (string, error) {
-	return "test", nil
-}
-
 type batchMock struct {
 	events []publisher.Event
 	ack    bool
@@ -117,20 +113,20 @@ func TestPublish(t *testing.T) {
 		client := makePublishTestClient(t, esMock.URL)
 
 		// Try publishing a batch that can be split
-		batch := &batchMock{
+		batch := encodeBatch(client, &batchMock{
 			events:   []publisher.Event{event1},
 			canSplit: true,
-		}
+		})
 		err := client.Publish(ctx, batch)
 
 		assert.NoError(t, err, "Publish should split the batch without error")
 		assert.True(t, batch.didSplit, "batch should be split")
 
 		// Try publishing a batch that cannot be split
-		batch = &batchMock{
+		batch = encodeBatch(client, &batchMock{
 			events:   []publisher.Event{event1},
 			canSplit: false,
-		}
+		})
 		err = client.Publish(ctx, batch)
 
 		assert.NoError(t, err, "Publish should drop the batch without error")
@@ -145,9 +141,9 @@ func TestPublish(t *testing.T) {
 		defer esMock.Close()
 		client := makePublishTestClient(t, esMock.URL)
 
		batch := encodeBatch(client, &batchMock{
 			events: []publisher.Event{event1, event2},
		})
 
 		err := client.Publish(ctx, batch)
 
@@ -171,7 +167,7 @@ func TestPublish(t *testing.T) {
 		// test results directly without atomics/mutexes.
 		done := false
 		retryCount := 0
-		batch := pipeline.NewBatchForTesting(
+		batch := encodeBatch(client, pipeline.NewBatchForTesting(
 			[]publisher.Event{event1, event2, event3},
 			func(b publisher.Batch) {
 				// The retry function sends the batch back through Publish.
@@ -179,11 +175,13 @@ func TestPublish(t *testing.T) {
 				// first and then back to Publish when an output worker was
 				// available.
 				retryCount++
+				// We shouldn't need to re-encode the events since that was done
+				// before the initial Publish call
 				err := client.Publish(ctx, b)
 				assert.NoError(t, err, "Publish should return without error")
 			},
 			func() { done = true },
-		)
+		))
 
 		err := client.Publish(ctx, batch)
 		assert.NoError(t, err, "Publish should return without error")
@@ -220,7 +218,7 @@ func TestPublish(t *testing.T) {
 		// test results directly without atomics/mutexes.
 		done := false
 		retryCount := 0
-		batch := pipeline.NewBatchForTesting(
+		batch := encodeBatch(client, pipeline.NewBatchForTesting(
 			[]publisher.Event{event1, event2, event3},
 			func(b publisher.Batch) {
 				// The retry function sends the batch back through Publish.
@@ -232,7 +230,7 @@ func TestPublish(t *testing.T) {
 				assert.NoError(t, err, "Publish should return without error")
 			},
 			func() { done = true },
-		)
+		))
 
 		err := client.Publish(ctx, batch)
 		assert.NoError(t, err, "Publish should return without error")
@@ -308,24 +306,25 @@ func TestCollectPublishFailDeadLetterQueue(t *testing.T) {
 	)
 	assert.NoError(t, err)
+	parseError := `{
+	  "root_cause" : [
+		{
+		  "type" : "mapper_parsing_exception",
+		  "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'"
+		}
+	  ],
+	  "type" : "mapper_parsing_exception",
+	  "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'",
+	  "caused_by" : {
+		"type" : "illegal_argument_exception",
+		"reason" : "For input string: \"bar1\""
+	  }
+	}`
 	response := []byte(`
 	{
 	  "items": [
 		{"create": {"status": 200}},
 		{"create": {
-		  "error" : {
-			"root_cause" : [
-			  {
-				"type" : "mapper_parsing_exception",
-				"reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'"
-			  }
-			],
-			"type" : "mapper_parsing_exception",
-			"reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'",
-			"caused_by" : {
-			  "type" : "illegal_argument_exception",
-			  "reason" : "For input string: \"bar1\""
-			}
-		  },
+		  "error" : ` + parseError + `,
 		  "status" : 400
 		}
 	  },
@@ -334,24 +333,18 @@ func TestCollectPublishFailDeadLetterQueue(t *testing.T) {
 	`)
 
 	event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}}
+	event2 := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 2}}}
 	eventFail := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": "bar1"}}}
-	events := []publisher.Event{event, eventFail, event}
+	events := encodeEvents(client, []publisher.Event{event, eventFail, event2})
 
 	res, stats := client.bulkCollectPublishFails(response, events)
 	assert.Equal(t, 1, len(res))
 	if len(res) == 1 {
-		expected := publisher.Event{
-			Content: beat.Event{
-				Fields: mapstr.M{
-					"message":       "{\"bar\":\"bar1\"}",
-					"error.type":    400,
-					"error.message": "{\n\t\t\t\"root_cause\" : [\n\t\t\t {\n\t\t\t\t\"type\" : \"mapper_parsing_exception\",\n\t\t\t\t\"reason\" : \"failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'\"\n\t\t\t }\n\t\t\t],\n\t\t\t\"type\" : \"mapper_parsing_exception\",\n\t\t\t\"reason\" : \"failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'\",\n\t\t\t\"caused_by\" : {\n\t\t\t \"type\" : \"illegal_argument_exception\",\n\t\t\t \"reason\" : \"For input string: \\\"bar1\\\"\"\n\t\t\t}\n\t\t }",
-				},
-				Meta: mapstr.M{
-					dead_letter_marker_field: true,
-				},
-			},
-		}
+		expected := encodeEvent(client, eventFail)
+		encodedEvent := expected.EncodedEvent.(*encodedEvent)
+		// Mark the encoded event with the expected error
+		client.setDeadLetter(encodedEvent, 400, parseError)
+
 		assert.Equal(t, expected, res[0])
 	}
 	assert.Equal(t, bulkResultStats{acked: 2, fails: 1, nonIndexable: 0}, stats)
@@ -394,7 +387,7 @@ func TestCollectPublishFailDrop(t *testing.T) {
 
 	event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}}
 	eventFail := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": "bar1"}}}
-	events := []publisher.Event{event, eventFail, event}
+	events := encodeEvents(client, []publisher.Event{event, eventFail, event})
 
 	res, stats := client.bulkCollectPublishFails(response, events)
 	assert.Equal(t, 0, len(res))
@@ -419,7 +412,7 @@ func TestCollectPublishFailAll(t *testing.T) {
 	`)
 
 	event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}}
-	events := []publisher.Event{event, event, event}
+	events := encodeEvents(client, []publisher.Event{event, event, event})
 
 	res, stats := client.bulkCollectPublishFails(response, events)
 	assert.Equal(t, 3, len(res))
@@ -468,7 +461,7 @@ func TestCollectPipelinePublishFail(t *testing.T) {
 	}`)
 
 	event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}}
-	events := []publisher.Event{event}
+	events := encodeEvents(client, []publisher.Event{event})
 
 	res, _ := client.bulkCollectPublishFails(response, events)
 	assert.Equal(t, 1, len(res))
@@ -494,7 +487,7 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) {
 	`)
 
 	event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 1}}}
-	events := []publisher.Event{event, event, event}
+	events := encodeEvents(client, []publisher.Event{event, event, event})
 
 	for i := 0; i < b.N; i++ {
 		res, _ := client.bulkCollectPublishFails(response, events)
@@ -523,7 +516,7 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) {
 
 	event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 1}}}
 	eventFail := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}}
-	events := []publisher.Event{event, eventFail, event}
+	events := encodeEvents(client, []publisher.Event{event, eventFail, event})
 
 	for i := 0; i < b.N; i++ {
 		res, _ := client.bulkCollectPublishFails(response, events)
@@ -551,7 +544,7 @@ func BenchmarkCollectPublishFailAll(b *testing.B) {
 	`)
 
 	event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}}
-	events := []publisher.Event{event, event, event}
+	events := encodeEvents(client, []publisher.Event{event, event, event})
 
 	for i := 0; i < b.N; i++ {
 		res, _ := client.bulkCollectPublishFails(response, events)
@@ -608,7 +601,7 @@ func TestClientWithHeaders(t *testing.T) {
 		"message": "Test message from libbeat",
 	}}
 
-	batch := outest.NewBatch(event, event, event)
+	batch := encodeBatch(client, outest.NewBatch(event, event, event))
 	err = client.Publish(context.Background(), batch)
 	assert.NoError(t, err)
 	assert.Equal(t, 2, requestCount)
@@ -650,16 +643,6 @@ func TestBulkEncodeEvents(t *testing.T) {
 			index, pipeline, err := buildSelectors(im, info, cfg)
 			require.NoError(t, err)
 
-			events := make([]publisher.Event, len(test.events))
-			for i, fields := range test.events {
-				events[i] = publisher.Event{
-					Content: beat.Event{
-						Timestamp: time.Now(),
-						Fields:    fields,
-					},
-				}
-			}
-
 			client, err := NewClient(
 				clientSettings{
 					observer:         outputs.NewNilObserver(),
@@ -670,6 +653,17 @@ func TestBulkEncodeEvents(t *testing.T) {
 			)
 			assert.NoError(t, err)
 
+			events := make([]publisher.Event, len(test.events))
+			for i, fields := range test.events {
+				events[i] = publisher.Event{
+					Content: beat.Event{
+						Timestamp: time.Now(),
+						Fields:    fields,
+					},
+				}
+			}
+			encodeEvents(client, events)
+
 			encoded, bulkItems := client.bulkEncodePublishRequest(*libversion.MustNew(test.version), events)
 			assert.Equal(t, len(events), len(encoded), "all events should have been encoded")
 			assert.Equal(t, 2*len(events), len(bulkItems), "incomplete bulk")
@@ -717,6 +711,15 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) {
 	index, pipeline, err := buildSelectors(im, info, cfg)
 	require.NoError(t, err)
 
+	client, _ := NewClient(
+		clientSettings{
+			observer:         outputs.NewNilObserver(),
+			indexSelector:    index,
+			pipelineSelector: pipeline,
+		},
+		nil,
+	)
+
 	events := make([]publisher.Event, len(cases))
 	for i, fields := range cases {
 		meta := mapstr.M{
@@ -735,15 +738,7 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) {
 			},
 		}
 	}
-
-	client, _ := NewClient(
-		clientSettings{
-			observer:         outputs.NewNilObserver(),
-			indexSelector:    index,
-			pipelineSelector: pipeline,
-		},
-		nil,
-	)
+	encodeEvents(client, events)
 
 	encoded, bulkItems := client.bulkEncodePublishRequest(*libversion.MustNew(version.GetDefaultVersion()), events)
 	require.Equal(t, len(events)-1, len(encoded), "all events should have been encoded")
@@ -841,7 +836,7 @@ func TestPublishEventsWithBulkFiltering(t *testing.T) {
 		client := makePublishTestClient(t, esMock.URL, nil)
 
 		// Try publishing a batch that can be split
-		events := []publisher.Event{event1}
+		events := encodeEvents(client, []publisher.Event{event1})
 		evt, err := client.publishEvents(ctx, events)
 		require.NoError(t, err)
 		require.Equal(t, len(recParams), len(expectedFilteringParams))
@@ -872,7 +867,7 @@ func TestPublishEventsWithBulkFiltering(t *testing.T) {
 		client := makePublishTestClient(t, esMock.URL, configParams)
 
 		// Try publishing a batch that can be split
-		events := []publisher.Event{event1}
+		events := encodeEvents(client, []publisher.Event{event1})
 		evt, err := client.publishEvents(ctx, events)
 		require.NoError(t, err)
 		require.Equal(t, len(recParams), len(expectedFilteringParams)+len(configParams))
@@ -914,29 +909,36 @@ func TestPublishEventsWithBulkFiltering(t *testing.T) {
 		client := makePublishTestClient(t, esMock.URL, nil)
 
 		// Try publishing a batch that can be split
-		events := []publisher.Event{event1}
+		events := encodeEvents(client, []publisher.Event{event1})
 		_, err := client.publishEvents(ctx, events)
 		require.NoError(t, err)
 		require.Equal(t, len(recParams), 1)
 	})
 }
 
-func TestGetIndex(t *testing.T) {
+func TestSetDeadLetter(t *testing.T) {
 	dead_letter_index := "dead_index"
 	client := &Client{
 		deadLetterIndex: dead_letter_index,
 		indexSelector:   testIndexSelector{},
 	}
 
-	event := &beat.Event{
-		Meta: make(map[string]interface{}),
+	e := &encodedEvent{
+		index: "original_index",
+	}
+	errType := 123
+	errStr := "test error string"
+	client.setDeadLetter(e, errType, errStr)
+
+	assert.True(t, e.deadLetter, "setDeadLetter should set the event's deadLetter flag")
+	assert.Equal(t, dead_letter_index, e.index, "setDeadLetter should overwrite the event's original index")
+
+	var errFields struct {
+		ErrType    int    `json:"error.type"`
+		ErrMessage string `json:"error.message"`
 	}
-	index, err := client.getIndex(event)
-	require.NoError(t, err, "getIndex call must succeed")
-	assert.Equal(t, "test", index, "Event with no dead letter marker should use the client's index selector")
-
-	event.Meta[dead_letter_marker_field] = true
-	index, err = client.getIndex(event)
-	require.NoError(t, err, "getIndex call must succeed")
-	assert.Equal(t, dead_letter_index, index, "Event with dead letter marker should use the client's dead letter index")
+	err := json.NewDecoder(bytes.NewReader(e.encoding)).Decode(&errFields)
+	require.NoError(t, err, "json decoding of encoded event should succeed")
+	assert.Equal(t, errType, errFields.ErrType, "encoded error.type should match value in setDeadLetter")
+	assert.Equal(t, errStr, errFields.ErrMessage, "encoded error.message should match value in setDeadLetter")
 }
diff --git a/libbeat/outputs/elasticsearch/dead_letter_index.go b/libbeat/outputs/elasticsearch/dead_letter_index.go
index d2788205691..43541831478 100644
--- a/libbeat/outputs/elasticsearch/dead_letter_index.go
+++ b/libbeat/outputs/elasticsearch/dead_letter_index.go
@@ -25,9 +25,8 @@ import (
 )
 
 const (
-	dead_letter_marker_field = "deadlettered"
-	drop                     = "drop"
-	dead_letter_index        = "dead_letter_index"
+	drop              = "drop"
+	dead_letter_index = "dead_letter_index"
 )
 
 func deadLetterIndexForConfig(config *config.C) (string, error) {
diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go
index 616e08e08f6..9bc8498afe4 100644
--- a/libbeat/outputs/elasticsearch/elasticsearch.go
+++ b/libbeat/outputs/elasticsearch/elasticsearch.go
@@ -35,7 +35,7 @@ const logSelector = "elasticsearch"
 
 func makeES(
 	im outputs.IndexManager,
-	beat beat.Info,
+	beatInfo beat.Info,
 	observer outputs.Observer,
 	cfg *config.C,
 ) (outputs.Group, error) {
@@ -46,7 +46,7 @@ func makeES(
 		}
 	}
 
-	index, pipeline, err := buildSelectors(im, beat, cfg)
+	indexSelector, pipelineSelector, err := buildSelectors(im, beatInfo, cfg)
 	if err != nil {
 		return outputs.Fail(err)
 	}
@@ -94,6 +94,9 @@ func makeES(
 		params = nil
 	}
 
+	encoderFactory := newEventEncoderFactory(
+		esConfig.EscapeHTML, indexSelector, pipelineSelector)
+
 	clients := make([]outputs.NetworkClient, len(hosts))
 	for i, host := range hosts {
 		esURL, err := common.MakeURL(esConfig.Protocol, esConfig.Path, host, 9200)
@@ -106,7 +109,7 @@ func makeES(
 			client, err = NewClient(clientSettings{
 				connection: eslegclient.ConnectionSettings{
 					URL:      esURL,
-					Beatname: beat.Beat,
+					Beatname: beatInfo.Beat,
 					Kerberos: esConfig.Kerberos,
 					Username: esConfig.Username,
 					Password: esConfig.Password,
@@ -119,8 +122,8 @@ func makeES(
 					Transport:        esConfig.Transport,
 					IdleConnTimeout:  esConfig.Transport.IdleConnTimeout,
 				},
-				indexSelector:    index,
-				pipelineSelector: pipeline,
+				indexSelector:    indexSelector,
+				pipelineSelector: pipelineSelector,
 				observer:         observer,
 				deadLetterIndex:  deadLetterIndex,
 			}, &connectCallbackRegistry)
@@ -132,7 +135,7 @@ func makeES(
 		clients[i] = client
 	}
 
-	return outputs.SuccessNet(esConfig.Queue, esConfig.LoadBalance, esConfig.BulkMaxSize, esConfig.MaxRetries, clients)
+	return outputs.SuccessNet(esConfig.Queue, esConfig.LoadBalance, esConfig.BulkMaxSize, esConfig.MaxRetries, encoderFactory, clients)
 }
 
 func buildSelectors(
diff --git a/libbeat/outputs/elasticsearch/elasticsearch_test.go b/libbeat/outputs/elasticsearch/elasticsearch_test.go
index 7950bdd6323..25902801cbb 100644
--- a/libbeat/outputs/elasticsearch/elasticsearch_test.go
+++ b/libbeat/outputs/elasticsearch/elasticsearch_test.go
@@ -20,7 +20,7 @@ package elasticsearch
 import (
 	"testing"
 
-	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
@@ -120,21 +120,13 @@ func TestPipelineSelection(t *testing.T) {
 		test := _test
 		t.Run(name, func(t *testing.T) {
 			selector, err := buildPipelineSelector(config.MustNewConfigFrom(test.cfg))
-			assert.NoError(t, err)
-
-			client, err := NewClient(
-				clientSettings{
-					pipelineSelector: &selector,
-				},
-				nil,
-			)
-			assert.NoError(t, err)
+			require.NoError(t, err)
 
 			if err != nil {
 				t.Fatalf("Failed to parse configuration: %v", err)
 			}
 
-			got, err := client.getPipeline(&test.event)
+			got, err := getPipeline(&test.event, &selector)
 			if err != nil {
 				t.Fatalf("Failed to create pipeline name: %v", err)
 			}
diff --git a/libbeat/outputs/elasticsearch/event_encoder.go b/libbeat/outputs/elasticsearch/event_encoder.go
new file mode 100644
index 00000000000..0441695d53c
--- /dev/null
+++ b/libbeat/outputs/elasticsearch/event_encoder.go
@@ -0,0 +1,138 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package elasticsearch
+
+import (
+	"bytes"
+	"fmt"
+	"time"
+
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/libbeat/beat/events"
+	"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
+	"github.com/elastic/beats/v7/libbeat/outputs"
+	"github.com/elastic/beats/v7/libbeat/outputs/outil"
+	"github.com/elastic/beats/v7/libbeat/publisher"
+	"github.com/elastic/beats/v7/libbeat/publisher/queue"
+)
+
+type eventEncoder struct {
+	buf              *bytes.Buffer
+	enc              eslegclient.BodyEncoder
+	pipelineSelector *outil.Selector
+	indexSelector    outputs.IndexSelector
+}
+
+type encodedEvent struct {
+	// If err is set, the event couldn't be encoded, and other fields should
+	// not be relied on.
+	err error
+
+	// If deadLetter is true, this event produced an ingestion error on a
+	// previous attempt, and is now being retried as a bare event with all
+	// contents included as a raw string in the "message" field.
+	deadLetter bool
+
+	// timestamp is the timestamp from the source beat.Event. It's only used
+	// when reencoding for the dead letter index, so it isn't strictly needed
+	// but it avoids deserializing the encoded event to recover one field if
+	// there's an ingestion error.
+	timestamp time.Time
+
+	id       string
+	opType   events.OpType
+	pipeline string
+	index    string
+	encoding []byte
+}
+
+func newEventEncoderFactory(
+	escapeHTML bool,
+	indexSelector outputs.IndexSelector,
+	pipelineSelector *outil.Selector,
+) queue.EncoderFactory {
+	return func() queue.Encoder {
+		return newEventEncoder(escapeHTML, indexSelector, pipelineSelector)
+	}
+}
+
+func newEventEncoder(escapeHTML bool,
+	indexSelector outputs.IndexSelector,
+	pipelineSelector *outil.Selector,
+) queue.Encoder {
+	buf := bytes.NewBuffer(nil)
+	enc := eslegclient.NewJSONEncoder(buf, escapeHTML)
+	return &eventEncoder{
+		buf:              buf,
+		enc:              enc,
+		pipelineSelector: pipelineSelector,
+		indexSelector:    indexSelector,
+	}
+}
+
+func (pe *eventEncoder) EncodeEntry(entry queue.Entry) (queue.Entry, int) {
+	e, ok := entry.(publisher.Event)
+	if !ok {
+		// Currently all queue entries are publisher.Events but let's be cautious.
+		return entry, 0
+	}
+
+	encodedEvent := pe.encodeRawEvent(&e.Content)
+	e.EncodedEvent = encodedEvent
+	e.Content = beat.Event{}
+	return e, len(encodedEvent.encoding)
+}
+
+// Note: we can't early-encode the bulk metadata that goes with an event,
+// because it depends on the upstream Elasticsearch version and thus requires
+// a live client connection. However, benchmarks show that even for a known
+// version, encoding the bulk metadata and the event together gives slightly
+// worse performance, so there's no reason to try optimizing around this
+// dependency.
+func (pe *eventEncoder) encodeRawEvent(e *beat.Event) *encodedEvent {
+	opType := events.GetOpType(*e)
+	pipeline, err := getPipeline(e, pe.pipelineSelector)
+	if err != nil {
+		return &encodedEvent{err: fmt.Errorf("failed to select event pipeline: %w", err)}
+	}
+	var index string
+	if pe.indexSelector != nil {
+		index, err = pe.indexSelector.Select(e)
+		if err != nil {
+			return &encodedEvent{err: fmt.Errorf("failed to select event index: %w", err)}
+		}
+	}
+
+	id, _ := events.GetMetaStringValue(*e, events.FieldMetaID)
+
+	err = pe.enc.Marshal(e)
+	if err != nil {
+		return &encodedEvent{err: fmt.Errorf("failed to encode event for output: %w", err)}
+	}
+	bufBytes := pe.buf.Bytes()
+	bytes := make([]byte, len(bufBytes))
+	copy(bytes, bufBytes)
+	return &encodedEvent{
+		id:        id,
+		timestamp: e.Timestamp,
+		opType:    opType,
+		pipeline:  pipeline,
+		index:     index,
+		encoding:  bytes,
+	}
+}
diff --git a/libbeat/outputs/elasticsearch/event_encoder_test.go b/libbeat/outputs/elasticsearch/event_encoder_test.go
new file mode 100644
index 00000000000..a3aef08ca23
--- /dev/null
+++ b/libbeat/outputs/elasticsearch/event_encoder_test.go
@@ -0,0 +1,142 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package elasticsearch
+
+import (
+	"encoding/json"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/libbeat/beat/events"
+	"github.com/elastic/beats/v7/libbeat/publisher"
+	"github.com/elastic/elastic-agent-libs/mapstr"
+)
+
+type testIndexSelector struct{}
+
+func (testIndexSelector) Select(event *beat.Event) (string, error) {
+	return "test", nil
+}
+
+func TestEncodeEntry(t *testing.T) {
+	indexSelector := testIndexSelector{}
+
+	encoder := newEventEncoder(true, indexSelector, nil)
+
+	timestamp := time.Date(1980, time.January, 1, 0, 0, 0, 0, time.UTC)
+	pubEvent := publisher.Event{
+		Content: beat.Event{
+			Timestamp: timestamp,
+			Fields: mapstr.M{
+				"test_field":   "test_value",
+				"number_field": 5,
+				"nested": mapstr.M{
+					"nested_field": "nested_value",
+				},
+			},
+			Meta: mapstr.M{
+				events.FieldMetaOpType:   "create",
+				events.FieldMetaPipeline: "TEST_PIPELINE",
+				events.FieldMetaID:       "test_id",
+			},
+		},
+	}
+
+	encoded, encodedSize := encoder.EncodeEntry(pubEvent)
+	encPubEvent, ok := encoded.(publisher.Event)
+
+	// Check the resulting publisher.Event
+	require.True(t, ok, "EncodeEntry must return a publisher.Event")
+	require.NotNil(t, encPubEvent.EncodedEvent, "EncodeEntry must set EncodedEvent")
+	assert.Nil(t, encPubEvent.Content.Fields, "EncodeEntry should clear event.Content")
+
+	// Check the inner encodedEvent
+	encBeatEvent, ok := encPubEvent.EncodedEvent.(*encodedEvent)
+	require.True(t, ok, "EncodeEntry should set EncodedEvent to a *encodedEvent")
+	require.Equal(t, len(encBeatEvent.encoding), encodedSize, "Reported size should match encoded buffer")
+
+	// Check event metadata
+	assert.Equal(t, "test_id", encBeatEvent.id, "Event id should match original metadata")
+	assert.Equal(t, "test", encBeatEvent.index, "Event should have the index set by its selector")
+	assert.Equal(t, "test_pipeline", encBeatEvent.pipeline, "Event pipeline should match original metadata")
+	assert.Equal(t, timestamp, encBeatEvent.timestamp, "encodedEvent.timestamp should match the original event")
+	assert.Equal(t, events.OpTypeCreate, encBeatEvent.opType, "encoded opType should match the original metadata")
+	assert.False(t, encBeatEvent.deadLetter, "encoded event shouldn't have deadLetter flag set")
+
+	// Check encoded fields
+	var eventContent struct {
+		Timestamp   time.Time `json:"@timestamp"`
+		TestField   string    `json:"test_field"`
+		NumberField int       `json:"number_field"`
+		Nested      struct {
+			NestedField string `json:"nested_field"`
+		} `json:"nested"`
+	}
+	err := json.Unmarshal(encBeatEvent.encoding, &eventContent)
+	require.NoError(t, err, "encoding should contain valid json")
+	assert.Equal(t, timestamp, eventContent.Timestamp, "Encoded timestamp should match original")
+	assert.Equal(t, "test_value", eventContent.TestField, "Encoded field should match original")
+	assert.Equal(t, 5, eventContent.NumberField, "Encoded field should match original")
+	assert.Equal(t, "nested_value", eventContent.Nested.NestedField, "Encoded field should match original")
+}
+
+// encodeBatch encodes a publisher.Batch so it can be provided to
+// Client.Publish and other helpers.
+// This modifies the batch in place, but also returns its input batch
+// to allow for easy chaining while creating test batches.
+func encodeBatch[B publisher.Batch](client *Client, batch B) B {
+	encodeEvents(client, batch.Events())
+	return batch
+}
+
+// A test helper to encode an event array for an Elasticsearch client.
+// This isn't particularly efficient since it creates a new encoder object
+// for every set of events, but it's much easier and the difference is
+// negligible for any non-benchmark tests.
+// This modifies the slice in place, but also returns its input slice
+// to allow for easy chaining while creating test events.
+func encodeEvents(client *Client, events []publisher.Event) []publisher.Event {
+	encoder := newEventEncoder(
+		client.conn.EscapeHTML,
+		client.indexSelector,
+		client.pipelineSelector,
+	)
+	for i := range events {
+		// Skip encoding if there's already encoded data present
+		if events[i].EncodedEvent == nil {
+			encoded, _ := encoder.EncodeEntry(events[i])
+			event := encoded.(publisher.Event)
+			events[i] = event
+		}
+	}
+	return events
+}
+
+func encodeEvent(client *Client, event publisher.Event) publisher.Event {
+	encoder := newEventEncoder(
+		client.conn.EscapeHTML,
+		client.indexSelector,
+		client.pipelineSelector,
+	)
+	encoded, _ := encoder.EncodeEntry(event)
+	return encoded.(publisher.Event)
+}
diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go
index 34c57f29791..d14bd99d69a 100644
--- a/libbeat/outputs/fileout/file.go
+++ b/libbeat/outputs/fileout/file.go
@@ -66,7 +66,7 @@ func makeFileout(
 		return outputs.Fail(err)
 	}
 
-	return outputs.Success(foConfig.Queue, -1, 0, fo)
+	return outputs.Success(foConfig.Queue, -1, 0, nil, fo)
 }
 
 func (out *fileOutput) init(beat beat.Info, c fileOutConfig) error {
diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go
index d004bd16ba3..cb23823a95a 100644
--- a/libbeat/outputs/kafka/kafka.go
+++ b/libbeat/outputs/kafka/kafka.go
@@ -84,7 +84,7 @@ func makeKafka(
 	if kConfig.MaxRetries < 0 {
 		retry = -1
 	}
-	return outputs.Success(kConfig.Queue, kConfig.BulkMaxSize, retry, client)
+	return outputs.Success(kConfig.Queue, kConfig.BulkMaxSize, retry, nil, client)
 }
 
 // buildTopicSelector builds the topic selector for standalone Beat and when
diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go
index 072ec049f6f..c4c51ae5437 100644
--- a/libbeat/outputs/logstash/logstash.go
+++ b/libbeat/outputs/logstash/logstash.go
@@ -85,5 +85,5 @@ func makeLogstash(
 		clients[i] = client
 	}
 
-	return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, lsConfig.MaxRetries, clients)
+	return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, lsConfig.MaxRetries, nil, clients)
 }
diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go
index 2cfbcd03974..442145835df 100644
--- a/libbeat/outputs/logstash/logstash_integration_test.go
+++ b/libbeat/outputs/logstash/logstash_integration_test.go
@@ -38,6 +38,8 @@ import (
 	_ "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
 	"github.com/elastic/beats/v7/libbeat/outputs/outest"
 	"github.com/elastic/beats/v7/libbeat/outputs/outil"
+	"github.com/elastic/beats/v7/libbeat/publisher"
+	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 	conf "github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent-libs/mapstr"
 	"github.com/elastic/elastic-agent-libs/transport/httpcommon"
@@ -61,6 +63,7 @@ type esConnection struct {
 type testOutputer struct {
 	outputs.NetworkClient
 	*esConnection
+	encoder queue.Encoder
 }
 
 type esSource interface {
@@ -161,7 +164,7 @@ func newTestLogstashOutput(t *testing.T, test string, tls bool) *testOutputer {
 	index := testLogstashIndex(test)
 	connection := esConnect(t, index)
 
-	return &testOutputer{output, connection}
+	return &testOutputer{output, connection, nil}
 }
 
 func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer {
@@ -201,6 +204,9 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer {
 	es := &testOutputer{}
 	es.NetworkClient = grp.Clients[0].(outputs.NetworkClient)
 	es.esConnection = connection
+	// The Elasticsearch output requires events to be encoded
+	// before calling Publish, so create an event encoder.
+	es.encoder = grp.EncoderFactory()
 	es.Connect()
 
 	return es
@@ -552,12 +558,13 @@ func checkEvent(t *testing.T, ls, es map[string]interface{}) {
 }
 
 func (t *testOutputer) PublishEvent(event beat.Event) {
-	t.Publish(context.Background(), outest.NewBatch(event))
+	batch := encodeBatch(t.encoder, outest.NewBatch(event))
+	t.Publish(context.Background(), batch)
 }
 
 func (t *testOutputer) BulkPublish(events []beat.Event) bool {
 	ok := false
-	batch := outest.NewBatch(events...)
+	batch := encodeBatch(t.encoder, outest.NewBatch(events...))
 
 	var wg sync.WaitGroup
 	wg.Add(1)
@@ -570,3 +577,26 @@ func (t *testOutputer) BulkPublish(events []beat.Event) bool {
 	wg.Wait()
 	return ok
 }
+
+// encodeBatch encodes a publisher.Batch so it can be provided to
+// Client.Publish and other helpers.
+// This modifies the batch in place, but also returns its input batch
+// to allow for easy chaining while creating test batches.
+func encodeBatch[B publisher.Batch](encoder queue.Encoder, batch B) B {
+	if encoder != nil {
+		encodeEvents(encoder, batch.Events())
+	}
+	return batch
+}
+
+func encodeEvents(encoder queue.Encoder, events []publisher.Event) []publisher.Event {
+	for i := range events {
+		// Skip encoding if there's already encoded data present
+		if events[i].EncodedEvent == nil {
+			encoded, _ := encoder.EncodeEntry(events[i])
+			event := encoded.(publisher.Event)
+			events[i] = event
+		}
+	}
+	return events
+}
diff --git a/libbeat/outputs/output_reg.go b/libbeat/outputs/output_reg.go
index 3d2675c2ce2..fdd8e22a663 100644
--- a/libbeat/outputs/output_reg.go
+++ b/libbeat/outputs/output_reg.go
@@ -59,6 +59,22 @@ type Group struct {
 	BatchSize    int
 	Retry        int
 	QueueFactory queue.QueueFactory
+
+	// If the output supports early encoding (where events are converted to their
+	// output-serialized form before entering the queue) it should provide an
+	// encoder factory here. Events will be processed using the resulting encoders
+	// before being returned from the queue. This can provide significant cpu and
+	// memory savings for outputs that support it.
+	// - Each encoder will be accessed from only one goroutine at a time.
+	// - Encoders should add the event's output-serialized form, along with any
+	//   metadata needed to handle a Publish call, to the EncodedEvent field of
+	//   the underlying publisher.Event.
+	// - Encoders should clear the Content field of the underlying publisher.Event
+	//   so memory can be reclaimed for the unencoded version.
+	// - If there is a fatal error in encoding, provide a non-nil EncodedEvent
+	//   and clear Content anyway. Metadata about the error should be saved in
+	//   EncodedEvent and reported when Publish is called.
+	EncoderFactory queue.EncoderFactory
 }
 
 // RegisterType registers a new output type.
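The `EncoderFactory` field above is how an output advertises early-encoding support to the pipeline. The `queue` package's side of the contract is not part of this diff; inferred from the call sites (the queue factories gaining a `queue.EncoderFactory` parameter and the `EncodeEntry` calls in the reader loop), the types have roughly this shape:

```go
package queue

// Entry is an item stored in a queue; in practice this is a
// publisher.Event, so an empty-interface alias is assumed here.
type Entry interface{}

// Encoder converts an entry into its output-serialized form before it is
// handed to output workers. EncodeEntry returns the (possibly rewritten)
// entry along with its encoded size in bytes.
type Encoder interface {
	EncodeEntry(Entry) (Entry, int)
}

// EncoderFactory creates an Encoder. The pipeline creates one encoder per
// disk queue reader loop, or per producer for the memory queue, so a single
// Encoder instance is only ever used from one goroutine at a time.
type EncoderFactory func() Encoder
```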
diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go
index 9814d6abee7..d0cba1e7061 100644
--- a/libbeat/outputs/redis/redis.go
+++ b/libbeat/outputs/redis/redis.go
@@ -165,7 +165,7 @@ func makeRedis(
 		clients[i] = newBackoffClient(client, rConfig.Backoff.Init, rConfig.Backoff.Max)
 	}
 
-	return outputs.SuccessNet(rConfig.Queue, rConfig.LoadBalance, rConfig.BulkMaxSize, rConfig.MaxRetries, clients)
+	return outputs.SuccessNet(rConfig.Queue, rConfig.LoadBalance, rConfig.BulkMaxSize, rConfig.MaxRetries, nil, clients)
 }
 
 func buildKeySelector(cfg *config.C) (outil.Selector, error) {
diff --git a/libbeat/outputs/util.go b/libbeat/outputs/util.go
index cab6b99aebe..8b3d96fcaa5 100644
--- a/libbeat/outputs/util.go
+++ b/libbeat/outputs/util.go
@@ -35,7 +35,7 @@ func Fail(err error) (Group, error) { return Group{}, err }
 // instances. The first argument is expected to contain a queue
 // config.Namespace. The queue config is passed to assign the queue
 // factory when elastic-agent reloads the output.
-func Success(cfg config.Namespace, batchSize, retry int, clients ...Client) (Group, error) {
+func Success(cfg config.Namespace, batchSize, retry int, encoderFactory queue.EncoderFactory, clients ...Client) (Group, error) {
 	var q queue.QueueFactory
 	if cfg.IsSet() && cfg.Config().Enabled() {
 		switch cfg.Name() {
@@ -59,10 +59,11 @@ func Success(cfg config.Namespace, batchSize, retry int, clients ...Client) (Gro
 		}
 	}
 	return Group{
-		Clients:      clients,
-		BatchSize:    batchSize,
-		Retry:        retry,
-		QueueFactory: q,
+		Clients:        clients,
+		BatchSize:      batchSize,
+		Retry:          retry,
+		QueueFactory:   q,
+		EncoderFactory: encoderFactory,
 	}, nil
 }
 
@@ -79,12 +80,12 @@ func NetworkClients(netclients []NetworkClient) []Client {
 // The first argument is expected to contain a queue config.Namespace.
 // The queue config is passed to assign the queue factory when
 // elastic-agent reloads the output.
-func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, netclients []NetworkClient) (Group, error) {
+func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, encoderFactory queue.EncoderFactory, netclients []NetworkClient) (Group, error) {
 	if !loadbalance {
-		return Success(cfg, batchSize, retry, NewFailoverClient(netclients))
+		return Success(cfg, batchSize, retry, encoderFactory, NewFailoverClient(netclients))
 	}
 
 	clients := NetworkClients(netclients)
-	return Success(cfg, batchSize, retry, clients...)
+	return Success(cfg, batchSize, retry, encoderFactory, clients...)
 }
diff --git a/libbeat/publisher/event.go b/libbeat/publisher/event.go
index 83dbb22f777..77ab6716f99 100644
--- a/libbeat/publisher/event.go
+++ b/libbeat/publisher/event.go
@@ -68,6 +68,12 @@ type Event struct {
 	Content beat.Event
 	Flags   EventFlags
 	Cache   EventCache
+
+	// If the output provides an early encoder for incoming events,
+	// it should store the encoded form in EncodedEvent and clear Content
+	// to free the unencoded data. The updated event will be provided to
+	// output workers when calling Publish.
+	EncodedEvent interface{}
 }
 
 // EventFlags provides additional flags/option types for used with the outputs.
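Taken together, the `Success`/`SuccessNet` signature change and the new `publisher.Event.EncodedEvent` field are all a third-party output would need to opt in. Below is a minimal sketch of such an encoder, following the contract documented on `outputs.Group` above. The `example*` names are hypothetical and JSON is just a stand-in for whatever serialization a real output speaks; in this PR only the Elasticsearch output (`event_encoder.go`) implements the contract:

```go
package example

import (
	"encoding/json"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/publisher"
	"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// exampleEncoded is the output-private form stored in EncodedEvent.
type exampleEncoded struct {
	data []byte
	err  error // non-nil if encoding failed; reported at Publish time
}

type exampleEncoder struct{}

func (exampleEncoder) EncodeEntry(entry queue.Entry) (queue.Entry, int) {
	e, ok := entry.(publisher.Event)
	if !ok {
		return entry, 0
	}
	data, err := json.Marshal(e.Content.Fields)
	// Even on a fatal encoding error, set EncodedEvent and clear Content
	// so the unencoded event can be reclaimed and the error reported later.
	e.EncodedEvent = &exampleEncoded{data: data, err: err}
	e.Content = beat.Event{}
	return e, len(data)
}

// exampleEncoderFactory could be passed to outputs.Success or
// outputs.SuccessNet where the other outputs in this diff pass nil.
func exampleEncoderFactory() queue.Encoder { return exampleEncoder{} }
```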
diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go
index 015d8f70c9d..4ed45d25628 100644
--- a/libbeat/publisher/pipeline/client_test.go
+++ b/libbeat/publisher/pipeline/client_test.go
@@ -131,7 +131,7 @@ func TestClient(t *testing.T) {
 			Events:        5,
 			MaxGetRequest: 1,
 			FlushTimeout:  time.Millisecond,
-		}, 5)
+		}, 5, nil)
 
 		// model a processor that we're going to make produce errors after
 		p := &testProcessor{}
@@ -243,7 +243,7 @@ func TestClientWaitClose(t *testing.T) {
 	}
 
 	logp.TestingSetup()
-	q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}, 0)
+	q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}, 0, nil)
 	pipeline := makePipeline(Settings{}, q)
 	defer pipeline.Close()
 
diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go
index 1c480c01bce..bb75c9619c5 100644
--- a/libbeat/publisher/pipeline/controller.go
+++ b/libbeat/publisher/pipeline/controller.go
@@ -267,11 +267,11 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) {
 		factory = c.queueFactory
 	}
 
-	queue, err := factory(logger, c.onACK, c.inputQueueSize)
+	queue, err := factory(logger, c.onACK, c.inputQueueSize, outGrp.EncoderFactory)
 	if err != nil {
 		logger.Errorf("queue creation failed, falling back to default memory queue, check your queue configuration")
 		s, _ := memqueue.SettingsForUserConfig(nil)
-		queue = memqueue.NewQueue(logger, c.onACK, s, c.inputQueueSize)
+		queue = memqueue.NewQueue(logger, c.onACK, s, c.inputQueueSize, outGrp.EncoderFactory)
 	}
 	c.queue = queue
 
@@ -295,11 +295,11 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) {
 // a producer for a nonexistent queue.
 type emptyProducer struct{}
 
-func (emptyProducer) Publish(_ interface{}) (queue.EntryID, bool) {
+func (emptyProducer) Publish(_ queue.Entry) (queue.EntryID, bool) {
 	return 0, false
 }
 
-func (emptyProducer) TryPublish(_ interface{}) (queue.EntryID, bool) {
+func (emptyProducer) TryPublish(_ queue.Entry) (queue.EntryID, bool) {
 	return 0, false
 }
 
diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go
index 7384e5f7128..6834af2c7f3 100644
--- a/libbeat/publisher/pipeline/controller_test.go
+++ b/libbeat/publisher/pipeline/controller_test.go
@@ -189,7 +189,7 @@ func TestOutputQueueFactoryTakesPrecedence(t *testing.T) {
 
 func TestFailedQueueFactoryRevertsToDefault(t *testing.T) {
 	defaultSettings, _ := memqueue.SettingsForUserConfig(nil)
-	failedFactory := func(_ *logp.Logger, _ func(int), _ int) (queue.Queue, error) {
+	failedFactory := func(_ *logp.Logger, _ func(int), _ int, _ queue.EncoderFactory) (queue.Queue, error) {
 		return nil, fmt.Errorf("This queue creation intentionally failed")
 	}
 	controller := outputController{
diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go
index cf03163750e..37bf437395c 100644
--- a/libbeat/publisher/pipeline/pipeline.go
+++ b/libbeat/publisher/pipeline/pipeline.go
@@ -255,7 +255,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
 	producerCfg := queue.ProducerConfig{}
 
 	if client.eventWaitGroup != nil || cfg.ClientListener != nil {
-		producerCfg.OnDrop = func(event interface{}) {
+		producerCfg.OnDrop = func(event queue.Entry) {
 			publisherEvent, _ := event.(publisher.Event)
 			if cfg.ClientListener != nil {
 				cfg.ClientListener.DroppedOnPublish(publisherEvent.Content)
diff --git a/libbeat/publisher/pipeline/pipeline_test.go b/libbeat/publisher/pipeline/pipeline_test.go
index 1278f5196ab..5a236acc9c0 100644
--- a/libbeat/publisher/pipeline/pipeline_test.go
+++ b/libbeat/publisher/pipeline/pipeline_test.go
@@ -32,7 +32,7 @@ type testQueue struct {
 }
 
 type testProducer struct {
-	publish func(try bool, event interface{}) (queue.EntryID, bool)
+	publish func(try bool, event queue.Entry) (queue.EntryID, bool)
 	cancel  func() int
 }
 
@@ -72,14 +72,14 @@ func (q *testQueue) Get(sz int) (queue.Batch, error) {
 	return nil, nil
 }
 
-func (p *testProducer) Publish(event interface{}) (queue.EntryID, bool) {
+func (p *testProducer) Publish(event queue.Entry) (queue.EntryID, bool) {
 	if p.publish != nil {
 		return p.publish(false, event)
 	}
 	return 0, false
 }
 
-func (p *testProducer) TryPublish(event interface{}) (queue.EntryID, bool) {
+func (p *testProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) {
 	if p.publish != nil {
 		return p.publish(true, event)
 	}
@@ -118,7 +118,7 @@ func makeTestQueue() queue.Queue {
 			var producer *testProducer
 			p := blockingProducer(cfg)
 			producer = &testProducer{
-				publish: func(try bool, event interface{}) (queue.EntryID, bool) {
+				publish: func(try bool, event queue.Entry) (queue.EntryID, bool) {
 					if try {
 						return p.TryPublish(event)
 					}
@@ -150,7 +150,7 @@ func blockingProducer(_ queue.ProducerConfig) queue.Producer {
 	waiting := atomic.MakeInt(0)
 
 	return &testProducer{
-		publish: func(_ bool, _ interface{}) (queue.EntryID, bool) {
+		publish: func(_ bool, _ queue.Entry) (queue.EntryID, bool) {
 			waiting.Inc()
 			<-sig
 			return 0, false
diff --git a/libbeat/publisher/pipeline/stress/out.go b/libbeat/publisher/pipeline/stress/out.go
index d1014b8d782..03ea06d3be8 100644
--- a/libbeat/publisher/pipeline/stress/out.go
+++ b/libbeat/publisher/pipeline/stress/out.go
@@ -67,7 +67,7 @@ func makeTestOutput(_ outputs.IndexManager, beat beat.Info, observer outputs.Obs
 		clients[i] = client
 	}
 
-	return outputs.Success(config.Queue, config.BulkMaxSize, config.Retry, clients...)
+	return outputs.Success(config.Queue, config.BulkMaxSize, config.Retry, nil, clients...)
 }
 
 func (*testOutput) Close() error { return nil }
diff --git a/libbeat/publisher/pipeline/ttl_batch_test.go b/libbeat/publisher/pipeline/ttl_batch_test.go
index 5e277d5042c..4c5207acbb0 100644
--- a/libbeat/publisher/pipeline/ttl_batch_test.go
+++ b/libbeat/publisher/pipeline/ttl_batch_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/elastic/beats/v7/libbeat/publisher"
+	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 )
 
 func TestBatchSplitRetry(t *testing.T) {
@@ -128,7 +129,7 @@ func (b *mockQueueBatch) Count() int {
 func (b *mockQueueBatch) Done() {
 }
 
-func (b *mockQueueBatch) Entry(i int) interface{} {
+func (b *mockQueueBatch) Entry(i int) queue.Entry {
 	return fmt.Sprintf("event %v", i)
 }
 
diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go
index 8bd2a23276c..1ac91e57ce1 100644
--- a/libbeat/publisher/queue/diskqueue/benchmark_test.go
+++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go
@@ -100,7 +100,7 @@ func setup(b *testing.B, encrypt bool, compress bool, protobuf bool) (*diskQueue
 	}
 	s.UseCompression = compress
 	s.UseProtobuf = protobuf
-	q, err := NewQueue(logp.L(), nil, s)
+	q, err := NewQueue(logp.L(), nil, s, nil)
 	if err != nil {
 		panic(err)
 	}
@@ -118,7 +118,7 @@ func setup(b *testing.B, encrypt bool, compress bool, protobuf bool) (*diskQueue
 
 func publishEvents(p queue.Producer, num int, protobuf bool) {
 	for i := 0; i < num; i++ {
-		var e interface{}
+		var e queue.Entry
 		if protobuf {
 			e = makeMessagesEvent()
 		} else {
diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go
index 3515d0d2820..0ebdcef5ad3 100644
--- a/libbeat/publisher/queue/diskqueue/consumer.go
+++ b/libbeat/publisher/queue/diskqueue/consumer.go
@@ -86,7 +86,7 @@ func (batch *diskQueueBatch) Count() int {
 	return len(batch.frames)
 }
 
-func (batch *diskQueueBatch) Entry(i int) interface{} {
+func (batch *diskQueueBatch) Entry(i int) queue.Entry {
 	return batch.frames[i].event
 }
 
diff --git a/libbeat/publisher/queue/diskqueue/frames.go b/libbeat/publisher/queue/diskqueue/frames.go
index f0bd7d3b0b6..2043c5b649b 100644
--- a/libbeat/publisher/queue/diskqueue/frames.go
+++ b/libbeat/publisher/queue/diskqueue/frames.go
@@ -17,6 +17,8 @@
 
 package diskqueue
 
+import "github.com/elastic/beats/v7/libbeat/publisher/queue"
+
 // Every data frame read from the queue is assigned a unique sequential
 // integer, which is used to keep track of which frames have been
 // acknowledged.
@@ -52,7 +54,7 @@ type readFrame struct {
 	id frameID
 
 	// The event decoded from the data frame.
-	event interface{}
+	event queue.Entry
 
 	// How much space this frame occupied on disk (before deserialization),
 	// including the frame header / footer.
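The diskqueue and memqueue hunks that follow mostly rename the queues' payload type from `interface{}` to `queue.Entry`. Assuming `Entry` remains an empty-interface alias, as the unchanged call sites suggest, producer code is unaffected by the rename; a short sketch:

```go
package example

import (
	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/publisher"
	"github.com/elastic/beats/v7/libbeat/publisher/queue"
	"github.com/elastic/elastic-agent-libs/mapstr"
)

// publishOne shows a producer call site: a publisher.Event satisfies
// queue.Entry directly, exactly as it satisfied interface{} before.
func publishOne(q queue.Queue) (queue.EntryID, bool) {
	p := q.Producer(queue.ProducerConfig{})
	return p.Publish(publisher.Event{
		Content: beat.Event{Fields: mapstr.M{"message": "hello"}},
	})
}
```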
diff --git a/libbeat/publisher/queue/diskqueue/producer.go b/libbeat/publisher/queue/diskqueue/producer.go index 7471c2b4701..69725c62ccc 100644 --- a/libbeat/publisher/queue/diskqueue/producer.go +++ b/libbeat/publisher/queue/diskqueue/producer.go @@ -49,16 +49,16 @@ type producerWriteRequest struct { // diskQueueProducer implementation of the queue.Producer interface // -func (producer *diskQueueProducer) Publish(event interface{}) (queue.EntryID, bool) { +func (producer *diskQueueProducer) Publish(event queue.Entry) (queue.EntryID, bool) { return 0, producer.publish(event, true) } -func (producer *diskQueueProducer) TryPublish(event interface{}) (queue.EntryID, bool) { +func (producer *diskQueueProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) { return 0, producer.publish(event, false) } func (producer *diskQueueProducer) publish( - event interface{}, shouldBlock bool, + event queue.Entry, shouldBlock bool, ) bool { if producer.cancelled { return false diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go index 74fff3fea64..5c04f9a0385 100644 --- a/libbeat/publisher/queue/diskqueue/queue.go +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -110,8 +110,9 @@ func FactoryForSettings(settings Settings) queue.QueueFactory { logger *logp.Logger, ackCallback func(eventCount int), inputQueueSize int, + encoderFactory queue.EncoderFactory, ) (queue.Queue, error) { - return NewQueue(logger, ackCallback, settings) + return NewQueue(logger, ackCallback, settings, encoderFactory) } } @@ -121,6 +122,7 @@ func NewQueue( logger *logp.Logger, writeToDiskCallback func(eventCount int), settings Settings, + encoderFactory queue.EncoderFactory, ) (*diskQueue, error) { logger = logger.Named("diskqueue") logger.Debugf( @@ -212,6 +214,11 @@ func NewQueue( activeFrameCount -= int(nextReadPosition.frameIndex) logger.Infof("Found %d existing events on queue start", activeFrameCount) + var encoder queue.Encoder + if encoderFactory != nil { + encoder = encoderFactory() + } + queue := &diskQueue{ logger: logger, settings: settings, @@ -225,7 +232,7 @@ func NewQueue( acks: newDiskQueueACKs(logger, nextReadPosition, positionFile), - readerLoop: newReaderLoop(settings), + readerLoop: newReaderLoop(settings, encoder), writerLoop: newWriterLoop(logger, writeToDiskCallback, settings), deleterLoop: newDeleterLoop(settings), diff --git a/libbeat/publisher/queue/diskqueue/queue_test.go b/libbeat/publisher/queue/diskqueue/queue_test.go index c0b780ffb38..f6a4c406ed3 100644 --- a/libbeat/publisher/queue/diskqueue/queue_test.go +++ b/libbeat/publisher/queue/diskqueue/queue_test.go @@ -89,7 +89,7 @@ func TestMetrics(t *testing.T) { // lower max segment size so we can get multiple segments settings.MaxSegmentSize = 100 - testQueue, err := NewQueue(logp.L(), nil, settings) + testQueue, err := NewQueue(logp.L(), nil, settings, nil) require.NoError(t, err) defer testQueue.Close() @@ -124,7 +124,7 @@ func makeTestQueue() queuetest.QueueFactory { } settings := DefaultSettings() settings.Path = dir - queue, _ := NewQueue(logp.L(), nil, settings) + queue, _ := NewQueue(logp.L(), nil, settings, nil) return testQueue{ diskQueue: queue, teardown: func() { diff --git a/libbeat/publisher/queue/diskqueue/reader_loop.go b/libbeat/publisher/queue/diskqueue/reader_loop.go index 644e378f301..0dae48732b3 100644 --- a/libbeat/publisher/queue/diskqueue/reader_loop.go +++ b/libbeat/publisher/queue/diskqueue/reader_loop.go @@ -21,6 +21,8 @@ import ( "encoding/binary" "fmt" "io" + + 
"github.com/elastic/beats/v7/libbeat/publisher/queue" ) // startPosition and endPosition are absolute byte offsets into the segment @@ -67,16 +69,22 @@ type readerLoop struct { // The helper object to deserialize binary blobs from the queue into // publisher.Event objects that can be returned in a readFrame. decoder *eventDecoder + + // If set, this encoding helper is called on events after loading + // them from disk, to convert them to their final output serialization + // format. + outputEncoder queue.Encoder } -func newReaderLoop(settings Settings) *readerLoop { +func newReaderLoop(settings Settings, outputEncoder queue.Encoder) *readerLoop { return &readerLoop{ settings: settings, - requestChan: make(chan readerLoopRequest, 1), - responseChan: make(chan readerLoopResponse), - output: make(chan *readFrame, settings.ReadAheadLimit), - decoder: newEventDecoder(), + requestChan: make(chan readerLoopRequest, 1), + responseChan: make(chan readerLoopResponse), + output: make(chan *readFrame, settings.ReadAheadLimit), + decoder: newEventDecoder(), + outputEncoder: outputEncoder, } } @@ -124,6 +132,10 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon frame.segment = request.segment frame.id = nextFrameID nextFrameID++ + // If an output encoder is configured, apply it now + if rl.outputEncoder != nil { + frame.event, _ = rl.outputEncoder.EncodeEntry(frame.event) + } // We've read the frame, try sending it to the output channel. select { case rl.output <- frame: diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 1455745961c..23569f02150 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -54,6 +54,9 @@ type broker struct { // wait group for queue workers (runLoop and ackLoop) wg sync.WaitGroup + // The factory used to create an event encoder when creating a producer + encoderFactory queue.EncoderFactory + /////////////////////////// // api channels @@ -113,7 +116,7 @@ type Settings struct { } type queueEntry struct { - event interface{} + event queue.Entry id queue.EntryID producer *ackProducer @@ -147,8 +150,9 @@ func FactoryForSettings(settings Settings) queue.QueueFactory { logger *logp.Logger, ackCallback func(eventCount int), inputQueueSize int, + encoderFactory queue.EncoderFactory, ) (queue.Queue, error) { - return NewQueue(logger, ackCallback, settings, inputQueueSize), nil + return NewQueue(logger, ackCallback, settings, inputQueueSize, encoderFactory), nil } } @@ -160,8 +164,9 @@ func NewQueue( ackCallback func(eventCount int), settings Settings, inputQueueSize int, + encoderFactory queue.EncoderFactory, ) *broker { - b := newQueue(logger, ackCallback, settings, inputQueueSize) + b := newQueue(logger, ackCallback, settings, inputQueueSize, encoderFactory) // Start the queue workers b.wg.Add(2) @@ -186,6 +191,7 @@ func newQueue( ackCallback func(eventCount int), settings Settings, inputQueueSize int, + encoderFactory queue.EncoderFactory, ) *broker { chanSize := AdjustInputQueueSize(inputQueueSize, settings.Events) @@ -213,6 +219,8 @@ func newQueue( buf: make([]queueEntry, settings.Events), + encoderFactory: encoderFactory, + // broker API channels pushChan: make(chan pushRequest, chanSize), getChan: make(chan getRequest), @@ -249,7 +257,14 @@ func (b *broker) BufferConfig() queue.BufferConfig { } func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer { - return newProducer(b, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel) + // If we were 
given an encoder factory to allow producers to encode + // events for output before they enter the queue, then create an + // encoder for the new producer. + var encoder queue.Encoder + if b.encoderFactory != nil { + encoder = b.encoderFactory() + } + return newProducer(b, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel, encoder) } func (b *broker) Get(count int) (queue.Batch, error) { @@ -398,7 +413,7 @@ func (b *batch) rawEntry(i int) *queueEntry { } // Return the event referenced by the i-th element of this batch -func (b *batch) Entry(i int) interface{} { +func (b *batch) Entry(i int) queue.Entry { return b.rawEntry(i).event } diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go index ae93a5df0d5..95b5e0eba90 100644 --- a/libbeat/publisher/queue/memqueue/internal_api.go +++ b/libbeat/publisher/queue/memqueue/internal_api.go @@ -22,7 +22,11 @@ import "github.com/elastic/beats/v7/libbeat/publisher/queue" // producer -> broker API type pushRequest struct { - event interface{} + event queue.Entry + + // The event's encoded size in bytes if the configured output supports + // early encoding, 0 otherwise. + eventSize int // The producer that generated this event, or nil if this producer does // not require ack callbacks. diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 5ddea468e4c..55f15a8cc86 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -40,6 +40,7 @@ type openState struct { done chan struct{} queueDone <-chan struct{} events chan pushRequest + encoder queue.Encoder } // producerID stores the order of events within a single producer, so multiple @@ -50,19 +51,20 @@ type producerID uint64 type produceState struct { cb ackHandler - dropCB func(interface{}) + dropCB func(queue.Entry) cancelled bool lastACK producerID } type ackHandler func(count int) -func newProducer(b *broker, cb ackHandler, dropCB func(interface{}), dropOnCancel bool) queue.Producer { +func newProducer(b *broker, cb ackHandler, dropCB func(queue.Entry), dropOnCancel bool, encoder queue.Encoder) queue.Producer { openState := openState{ log: b.logger, done: make(chan struct{}), queueDone: b.ctx.Done(), events: b.pushChan, + encoder: encoder, } if cb != nil { @@ -74,18 +76,18 @@ func newProducer(b *broker, cb ackHandler, dropCB func(interface{}), dropOnCance return &forgetfulProducer{broker: b, openState: openState} } -func (p *forgetfulProducer) makePushRequest(event interface{}) pushRequest { +func (p *forgetfulProducer) makePushRequest(event queue.Entry) pushRequest { resp := make(chan queue.EntryID, 1) return pushRequest{ event: event, resp: resp} } -func (p *forgetfulProducer) Publish(event interface{}) (queue.EntryID, bool) { +func (p *forgetfulProducer) Publish(event queue.Entry) (queue.EntryID, bool) { return p.openState.publish(p.makePushRequest(event)) } -func (p *forgetfulProducer) TryPublish(event interface{}) (queue.EntryID, bool) { +func (p *forgetfulProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) { return p.openState.tryPublish(p.makePushRequest(event)) } @@ -94,7 +96,7 @@ func (p *forgetfulProducer) Cancel() int { return 0 } -func (p *ackProducer) makePushRequest(event interface{}) pushRequest { +func (p *ackProducer) makePushRequest(event queue.Entry) pushRequest { resp := make(chan queue.EntryID, 1) return pushRequest{ event: event, @@ -105,7 +107,7 @@ func (p *ackProducer) makePushRequest(event queue.Entry) pushRequest {
resp: resp} } -func (p *ackProducer) Publish(event interface{}) (queue.EntryID, bool) { +func (p *ackProducer) Publish(event queue.Entry) (queue.EntryID, bool) { id, published := p.openState.publish(p.makePushRequest(event)) if published { p.producedCount++ @@ -113,7 +115,7 @@ func (p *ackProducer) Publish(event interface{}) (queue.EntryID, bool) { return id, published } -func (p *ackProducer) TryPublish(event interface{}) (queue.EntryID, bool) { +func (p *ackProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) { id, published := p.openState.tryPublish(p.makePushRequest(event)) if published { p.producedCount++ @@ -143,6 +145,11 @@ func (st *openState) Close() { } func (st *openState) publish(req pushRequest) (queue.EntryID, bool) { + // If we were given an encoder callback for incoming events, apply it before + // sending the entry to the queue. + if st.encoder != nil { + req.event, req.eventSize = st.encoder.EncodeEntry(req.event) + } select { case st.events <- req: // The events channel is buffered, which means we may successfully @@ -166,6 +173,11 @@ func (st *openState) publish(req pushRequest) (queue.EntryID, bool) { } func (st *openState) tryPublish(req pushRequest) (queue.EntryID, bool) { + // If we were given an encoder callback for incoming events, apply it before + // sending the entry to the queue. + if st.encoder != nil { + req.event, req.eventSize = st.encoder.EncodeEntry(req.event) + } select { case st.events <- req: // The events channel is buffered, which means we may successfully diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 637e7ccd4fb..df2d16d0dec 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -92,12 +92,12 @@ func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) { Events: 2, // Queue size MaxGetRequest: 1, // make sure the queue won't buffer events FlushTimeout: time.Millisecond, - }, 0) + }, 0, nil) p := q.Producer(queue.ProducerConfig{ // We do not read from the queue, so the callbacks are never called ACK: func(count int) {}, - OnDrop: func(e interface{}) {}, + OnDrop: func(e queue.Entry) {}, DropOnCancel: false, }) @@ -164,13 +164,13 @@ func TestProducerClosePreservesEventCount(t *testing.T) { Events: 3, // Queue size MaxGetRequest: 2, FlushTimeout: 10 * time.Millisecond, - }, 1) + }, 1, nil) p := q.Producer(queue.ProducerConfig{ ACK: func(count int) { activeEvents.Add(-int64(count)) }, - OnDrop: func(e interface{}) { + OnDrop: func(e queue.Entry) { //activeEvents.Add(-1) }, DropOnCancel: false, @@ -263,7 +263,7 @@ func TestQueueMetricsBuffer(t *testing.T) { } func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, testName string) { - testQueue := NewQueue(nil, nil, settings, 0) + testQueue := NewQueue(nil, nil, settings, 0, nil) defer testQueue.Close() // Send events to queue @@ -307,7 +307,7 @@ func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.Queu Events: sz, MaxGetRequest: minEvents, FlushTimeout: flushTimeout, - }, 0) + }, 0, nil) } } @@ -418,22 +418,22 @@ func TestEntryIDs(t *testing.T) { } t.Run("acking in forward order with directEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0) + testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0, nil) testForward(testQueue) }) t.Run("acking in reverse order with directEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := 
NewQueue(nil, nil, Settings{Events: 1000}, 0) + testQueue := NewQueue(nil, nil, Settings{Events: 1000}, 0, nil) testBackward(testQueue) }) t.Run("acking in forward order with bufferedEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000, MaxGetRequest: 2, FlushTimeout: time.Microsecond}, 0) + testQueue := NewQueue(nil, nil, Settings{Events: 1000, MaxGetRequest: 2, FlushTimeout: time.Microsecond}, 0, nil) testForward(testQueue) }) t.Run("acking in reverse order with bufferedEventLoop reports the right event IDs", func(t *testing.T) { - testQueue := NewQueue(nil, nil, Settings{Events: 1000, MaxGetRequest: 2, FlushTimeout: time.Microsecond}, 0) + testQueue := NewQueue(nil, nil, Settings{Events: 1000, MaxGetRequest: 2, FlushTimeout: time.Microsecond}, 0, nil) testBackward(testQueue) }) } @@ -447,7 +447,7 @@ func TestBatchFreeEntries(t *testing.T) { // 4. Make sure only events 6-10 are nil // 5. Call FreeEntries on the first batch // 6. Make sure all events are nil - testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0) + testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0, nil) producer := testQueue.Producer(queue.ProducerConfig{}) for i := 0; i < queueSize; i++ { _, ok := producer.Publish(i) diff --git a/libbeat/publisher/queue/memqueue/runloop_test.go b/libbeat/publisher/queue/memqueue/runloop_test.go index 9b3a467647a..d25537265ea 100644 --- a/libbeat/publisher/queue/memqueue/runloop_test.go +++ b/libbeat/publisher/queue/memqueue/runloop_test.go @@ -42,9 +42,9 @@ func TestFlushSettingsDoNotBlockFullBatches(t *testing.T) { MaxGetRequest: 500, FlushTimeout: 10 * time.Second, }, - 10) + 10, nil) - producer := newProducer(broker, nil, nil, false) + producer := newProducer(broker, nil, nil, false, nil) rl := broker.runLoop for i := 0; i < 100; i++ { // Pair each publish call with an iteration of the run loop so we @@ -81,9 +81,9 @@ func TestFlushSettingsBlockPartialBatches(t *testing.T) { MaxGetRequest: 500, FlushTimeout: 10 * time.Second, }, - 10) + 10, nil) - producer := newProducer(broker, nil, nil, false) + producer := newProducer(broker, nil, nil, false, nil) rl := broker.runLoop for i := 0; i < 100; i++ { // Pair each publish call with an iteration of the run loop so we diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 101a3290117..e691c2888f6 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -25,6 +25,12 @@ import ( "github.com/elastic/elastic-agent-libs/opt" ) +// Entry is a placeholder type for the objects contained by the queue, which +// can be anything (but right now is always a publisher.Event). We could just +// use interface{} everywhere, but this makes the API's intentions clearer +// and reduces accidental type mismatches. +type Entry interface{} + // Metrics is a set of basic, user-friendly metrics that report the current state of the queue. These metrics are meant to be relatively generic and high-level, and when reported directly, can be comprehensible to a user.
type Metrics struct { //EventCount is the total events currently in the queue @@ -74,7 +80,14 @@ type Queue interface { Metrics() (Metrics, error) } -type QueueFactory func(logger *logp.Logger, ack func(eventCount int), inputQueueSize int) (Queue, error) +// If encoderFactory is provided, then the resulting queue must use it to +// encode queued events before returning them. +type QueueFactory func( + logger *logp.Logger, + ack func(eventCount int), + inputQueueSize int, + encoderFactory EncoderFactory, +) (Queue, error) // BufferConfig returns the pipelines buffering settings, // for the pipeline to use. @@ -98,7 +111,7 @@ type ProducerConfig struct { // the queue. Currently this can only happen when a Publish call is sent // to the memory queue's request channel but the producer is cancelled // before it reaches the queue buffer. - OnDrop func(interface{}) + OnDrop func(Entry) // DropOnCancel is a hint to the queue to drop events if the producer disconnects // via Cancel. @@ -110,35 +123,49 @@ type EntryID uint64 // Producer is an interface to be used by the pipelines client to forward // events to a queue. type Producer interface { - // Publish adds an event to the queue, blocking if necessary, and returns + // Publish adds an entry to the queue, blocking if necessary, and returns // the new entry's id and true on success. - Publish(event interface{}) (EntryID, bool) + Publish(entry Entry) (EntryID, bool) - // TryPublish adds an event to the queue if doing so will not block the + // TryPublish adds an entry to the queue if doing so will not block the // caller, otherwise it immediately returns. The reasons a publish attempt // might block are defined by the specific queue implementation and its // configuration. If the event was successfully added, returns true with // the event's assigned ID, and false otherwise. - TryPublish(event interface{}) (EntryID, bool) + TryPublish(entry Entry) (EntryID, bool) // Cancel closes this Producer endpoint. If the producer is configured to - // drop its events on Cancel, the number of dropped events is returned. + // drop its entries on Cancel, the number of dropped entries is returned. // Note: A queue may still send ACK signals even after Cancel is called on // the originating Producer. The pipeline client must accept and // discard these ACKs. Cancel() int } -// Batch of events to be returned to Consumers. The `Done` method will tell the -// queue that the batch has been consumed and its events can be discarded. +// Batch of entries (usually publisher.Event) to be returned to Consumers. +// The `Done` method will tell the queue that the batch has been consumed and +// its entries can be acknowledged and discarded. type Batch interface { Count() int - Entry(i int) interface{} - // Release the internal references to the contained events. + Entry(i int) Entry + // Release the internal references to the contained events, if + // supported (the disk queue does not yet implement it). // Count() and Entry() cannot be used after this call. - // This is only guaranteed to release references when using the - // proxy queue, where it is used to avoid keeping multiple copies - // of events that have already been queued by the shipper. FreeEntries() Done() } + +// Outputs can provide an EncoderFactory to enable early encoding, in which +// case the queue will run the given encoder on events before they reach +// consumers. 
+// Encoders are provided as factories so each worker goroutine can have its own encoder. +type EncoderFactory func() Encoder + +type Encoder interface { + // Return the encoded form of the entry that the output workers can use, + // and the in-memory size of the encoded buffer. + // EncodeEntry should return a valid Entry when given one, even if the + // encoding fails. In that case, the returned Entry should contain the + // metadata needed to report the error when the entry is consumed. + EncodeEntry(Entry) (Entry, int) +}
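
For illustration, here is a minimal sketch of what an output-side implementation of these hooks could look like. It is not code from this change: the names encodedEntry, jsonEncoder, and newJSONEncoderFactory are hypothetical, and only queue.Entry, queue.Encoder, queue.EncoderFactory, and the extra EncoderFactory argument to outputs.Success (passed as nil by the stress output above) are taken from the diff.

package example

import (
	"encoding/json"

	"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// encodedEntry is a hypothetical wrapper for a pre-encoded event. Per the
// EncodeEntry contract above, a failed encoding must still yield a valid
// Entry, carrying the error so it can be reported when the entry is consumed.
type encodedEntry struct {
	encoding []byte
	err      error
}

// jsonEncoder is a hypothetical queue.Encoder that serializes entries to JSON
// as they enter the queue, so output workers receive ready-to-send buffers
// instead of raw events.
type jsonEncoder struct{}

func (e *jsonEncoder) EncodeEntry(entry queue.Entry) (queue.Entry, int) {
	buf, err := json.Marshal(entry)
	if err != nil {
		// Nothing was encoded, so the reported size is 0; the error travels
		// inside the returned entry rather than aborting the publish.
		return encodedEntry{err: err}, 0
	}
	return encodedEntry{encoding: buf}, len(buf)
}

// newJSONEncoderFactory returns a queue.EncoderFactory. The memory queue
// creates one encoder per producer (see broker.Producer above), so
// EncodeEntry needs no internal locking.
func newJSONEncoderFactory() queue.EncoderFactory {
	return func() queue.Encoder { return &jsonEncoder{} }
}

An output opting into early encoding would then pass its factory where the stress output above passes nil, along the lines of outputs.Success(config.Queue, config.BulkMaxSize, config.Retry, newJSONEncoderFactory(), clients...).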