Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libbeat] Implement early event encoding for the Elasticsearch output #38572

Merged
merged 69 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
4ff1d75
unbuffer memory queue input channel
faec Feb 28, 2024
5627633
remove queue entry ids
faec Feb 28, 2024
6a27aeb
move ack callbacks to a dedicated worker goroutine
faec Feb 29, 2024
3871376
extra work to keep acknowledgments flowing
faec Feb 29, 2024
18ab493
add eager handling of deletions to the queue runloop
faec Feb 29, 2024
7ae8929
move eager deletion before the check to unblock the push channel
faec Feb 29, 2024
3f5f4ce
remove callback worker and runloop mods
faec Mar 2, 2024
d027a6f
add API support for preencoding beats events
faec Mar 2, 2024
016d0f3
add error check
faec Mar 2, 2024
15b3f7c
re-add entry ids
faec Mar 2, 2024
98f1ed9
encode events in goroutines
faec Mar 3, 2024
d51f6bd
switch to encoder factories to support parallel encoding
faec Mar 3, 2024
8add438
remember to close the channel
faec Mar 3, 2024
3cb57b4
encode in the pipeline client
faec Mar 4, 2024
c0f3b22
free original event data when encoding is done
faec Mar 5, 2024
bd1df4c
mark pre-encoded data correctly
faec Mar 5, 2024
63064a6
add early delete patch
faec Mar 5, 2024
e81e2d1
Merge branch 'main' of github.com:elastic/beats into memqueue-encode-…
faec Mar 22, 2024
8a3592e
disentangling the experimental scaffolding from the final api
faec Mar 22, 2024
65e0d9f
move the encoding datatypes into the queue package
faec Mar 22, 2024
2fd6081
Add early encoding hooks in the memory queue
faec Mar 22, 2024
eab04ff
pass encoder factory through during memqueue creation
faec Mar 22, 2024
b487665
add fallback encoding to client workers if queue doesn't support earl…
faec Mar 22, 2024
8ee228e
Remove proxy queue implementation
faec Mar 22, 2024
bd97879
Use the memory queue instead of the proxy queue in the shipper output
faec Mar 22, 2024
8e4321f
Merge branch 'delete-proxy-queue' into memqueue-encode-client
faec Mar 22, 2024
fc1914b
remove early-encoding hooks from pipeline clients
faec Mar 22, 2024
3119457
Change how encoded event sizes are reported
faec Mar 22, 2024
562e5bb
remove encoding hooks from output workers and eventConsumer
faec Mar 23, 2024
a19d91d
Cleanups
faec Mar 23, 2024
6d0a33b
revert unrelated changes
faec Mar 23, 2024
0d9c948
cleanup
faec Mar 23, 2024
64ab52f
revert unrelated change
faec Mar 23, 2024
1a591f0
revert unrelated change
faec Mar 23, 2024
cae64f7
revert no-op change
faec Mar 23, 2024
ec7080d
revert unneeded change
faec Mar 23, 2024
ebf2dc0
reworking dead letter index handling
faec Mar 23, 2024
22d8f62
Clean up non-indexable policy handilng in the ES output
faec Mar 25, 2024
565f7a6
Merge branch 'main' of github.com:elastic/beats into dead-letter-inde…
faec Mar 25, 2024
70026ad
Merge branch 'dead-letter-index-cleanup' into queue-early-encode
faec Mar 25, 2024
354d8d0
Merge branch 'main' of github.com:elastic/beats into queue-early-encode
faec Mar 26, 2024
df65a98
Merge branch 'main' of github.com:elastic/beats into queue-early-encode
faec Apr 5, 2024
c1da549
update integration tests
faec Apr 5, 2024
e989ac4
Merge branch 'main' of github.com:elastic/beats into queue-early-encode
faec Apr 5, 2024
485b684
remove old code
faec Apr 5, 2024
e75930e
update documentation comments
faec Apr 5, 2024
1fa7052
update more comments
faec Apr 5, 2024
c5851f9
make check
faec Apr 5, 2024
7befab7
small cleanups
faec Apr 5, 2024
50496fc
cleanups
faec Apr 5, 2024
176087f
update more integration tests
faec Apr 6, 2024
92b4522
add artificial panic to track down unupdated tests
faec Apr 6, 2024
5249d7b
fix function call, linter errors
faec Apr 8, 2024
02b0f6f
Merge branch 'main' of github.com:elastic/beats into queue-early-encode
faec Apr 8, 2024
b05d252
lint
faec Apr 8, 2024
46321e7
lint
faec Apr 8, 2024
4084d49
lint
faec Apr 8, 2024
403586b
working on tests
faec Apr 8, 2024
ebf3441
Finish test
faec Apr 8, 2024
6e581b2
Merge branch 'main' of github.com:elastic/beats into queue-early-encode
faec Apr 8, 2024
70c7ba4
remove futile nolint tags
faec Apr 8, 2024
faaaa02
make check
faec Apr 8, 2024
c61ce5f
add comments
faec Apr 8, 2024
8eedfd9
add comments
faec Apr 8, 2024
ed614fc
Merge branch 'benchmark-vanilla' into queue-early-encode
faec Apr 9, 2024
5cc9ebb
update logstash tests
faec Apr 10, 2024
732fcd8
update changelog
faec Apr 10, 2024
75f5e73
remove debug panic from the output
faec Apr 10, 2024
5fe0d35
Merge branch 'main' of github.com:elastic/beats into queue-early-encode
faec Apr 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Raise up logging level to warning when attempting to configure beats with unknown fields from autodiscovered events/environments
- elasticsearch output now supports `idle_connection_timeout`. {issue}35616[35615] {pull}36843[36843]
- Update to Go 1.21.9. {pulk}38727[38727]
- Enable early event encoding in the Elasticsearch output, improving cpu and memory use {pull}38572[38572]

*Auditbeat*

Expand Down
11 changes: 11 additions & 0 deletions libbeat/esleg/eslegclient/enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,22 @@ func (b *jsonEncoder) Marshal(obj interface{}) error {
return b.AddRaw(obj)
}

// RawEncoding is used to wrap objects that have already been json-encoded,
// so the encoder knows to append them directly instead of treating them
// like a string.
type RawEncoding struct {
Encoding []byte
}

func (b *jsonEncoder) AddRaw(obj interface{}) error {
var err error
switch v := obj.(type) {
case beat.Event:
err = b.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
case *beat.Event:
err = b.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
case RawEncoding:
_, err = b.buf.Write(v.Encoding)
default:
err = b.folder.Fold(obj)
}
Expand Down Expand Up @@ -199,6 +208,8 @@ func (g *gzipEncoder) AddRaw(obj interface{}) error {
err = g.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
case *beat.Event:
err = g.folder.Fold(event{Timestamp: v.Timestamp, Fields: v.Fields})
case RawEncoding:
_, err = g.gzip.Write(v.Encoding)
default:
err = g.folder.Fold(obj)
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func makeConsole(
}
}

return outputs.Success(config.Queue, config.BatchSize, 0, c)
return outputs.Success(config.Queue, config.BatchSize, 0, nil, c)
}

func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) {
Expand Down
96 changes: 43 additions & 53 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,78 +299,65 @@ func (client *Client) bulkEncodePublishRequest(version version.V, data []publish
okEvents := data[:0]
bulkItems := []interface{}{}
for i := range data {
event := &data[i].Content
if data[i].EncodedEvent == nil {
client.log.Error("Elasticsearch output received unencoded publisher.Event")
continue
}
event := data[i].EncodedEvent.(*encodedEvent)
if event.err != nil {
// This means there was an error when encoding the event and it isn't
// ingestable, so report the error and continue.
client.log.Error(event.err)
continue
}
meta, err := client.createEventBulkMeta(version, event)
if err != nil {
client.log.Errorf("Failed to encode event meta data: %+v", err)
continue
}
if opType := events.GetOpType(*event); opType == events.OpTypeDelete {
if event.opType == events.OpTypeDelete {
// We don't include the event source in a bulk DELETE
bulkItems = append(bulkItems, meta)
} else {
bulkItems = append(bulkItems, meta, event)
// Wrap the encoded event in a RawEncoding so the Elasticsearch client
// knows not to re-encode it
bulkItems = append(bulkItems, meta, eslegclient.RawEncoding{Encoding: event.encoding})
}
okEvents = append(okEvents, data[i])
}
return okEvents, bulkItems
}

func (client *Client) createEventBulkMeta(version version.V, event *beat.Event) (interface{}, error) {
func (client *Client) createEventBulkMeta(version version.V, event *encodedEvent) (interface{}, error) {
eventType := ""
if version.Major < 7 {
eventType = defaultEventType
}

pipeline, err := client.getPipeline(event)
if err != nil {
err := fmt.Errorf("failed to select pipeline: %w", err)
return nil, err
}

index, err := client.getIndex(event)
if err != nil {
err := fmt.Errorf("failed to select event index: %w", err)
return nil, err
}

id, _ := events.GetMetaStringValue(*event, events.FieldMetaID)
opType := events.GetOpType(*event)

meta := eslegclient.BulkMeta{
Index: index,
Index: event.index,
DocType: eventType,
Pipeline: pipeline,
ID: id,
Pipeline: event.pipeline,
ID: event.id,
}

if opType == events.OpTypeDelete {
if id != "" {
if event.opType == events.OpTypeDelete {
if event.id != "" {
return eslegclient.BulkDeleteAction{Delete: meta}, nil
} else {
return nil, fmt.Errorf("%s %s requires _id", events.FieldMetaOpType, events.OpTypeDelete)
}
}
if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) {
if opType == events.OpTypeIndex {
if event.id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) {
if event.opType == events.OpTypeIndex {
return eslegclient.BulkIndexAction{Index: meta}, nil
}
return eslegclient.BulkCreateAction{Create: meta}, nil
}
return eslegclient.BulkIndexAction{Index: meta}, nil
}

func (client *Client) getIndex(event *beat.Event) (string, error) {
// If this event has been dead-lettered, override its index
if event.Meta != nil {
if deadLetter, _ := event.Meta.HasKey(dead_letter_marker_field); deadLetter {
return client.deadLetterIndex, nil
}
}
return client.indexSelector.Select(event)
}

func (client *Client) getPipeline(event *beat.Event) (string, error) {
func getPipeline(event *beat.Event, defaultSelector *outil.Selector) (string, error) {
if event.Meta != nil {
pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline)
if errors.Is(err, mapstr.ErrKeyNotFound) {
Expand All @@ -383,8 +370,8 @@ func (client *Client) getPipeline(event *beat.Event) (string, error) {
return strings.ToLower(pipeline), nil
}

if client.pipelineSelector != nil {
return client.pipelineSelector.Select(event)
if defaultSelector != nil {
return defaultSelector.Select(event)
}
return "", nil
}
Expand Down Expand Up @@ -427,27 +414,16 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat
stats.tooMany++
} else {
// hard failure, apply policy action
result, _ := data[i].Content.Meta.HasKey(dead_letter_marker_field)
if result {
encodedEvent := data[i].EncodedEvent.(*encodedEvent)
if encodedEvent.deadLetter {
stats.nonIndexable++
client.log.Errorf("Can't deliver to dead letter index event (status=%v). Enable debug logs to view the event and cause.", status)
client.log.Debugf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg)
// poison pill - this will clog the pipeline if the underlying failure is non transient.
} else if client.deadLetterIndex != "" {
client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Enable debug logs to view the event and cause.", status)
client.log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg)
if data[i].Content.Meta == nil {
data[i].Content.Meta = mapstr.M{
dead_letter_marker_field: true,
}
} else {
data[i].Content.Meta[dead_letter_marker_field] = true
}
data[i].Content.Fields = mapstr.M{
"message": data[i].Content.Fields.String(),
"error.type": status,
"error.message": string(msg),
}
client.setDeadLetter(encodedEvent, status, string(msg))
} else { // drop
stats.nonIndexable++
client.log.Warnf("Cannot index event (status=%v): dropping event! Enable debug logs to view the event and cause.", status)
Expand All @@ -465,6 +441,20 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat
return failed, stats
}

func (client *Client) setDeadLetter(
encodedEvent *encodedEvent, errType int, errMsg string,
) {
encodedEvent.deadLetter = true
encodedEvent.index = client.deadLetterIndex
deadLetterReencoding := mapstr.M{
"@timestamp": encodedEvent.timestamp,
"message": string(encodedEvent.encoding),
"error.type": errType,
"error.message": errMsg,
}
encodedEvent.encoding = []byte(deadLetterReencoding.String())
}

func (client *Client) Connect() error {
return client.conn.Connect()
}
Expand Down
72 changes: 21 additions & 51 deletions libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@
import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

Expand Down Expand Up @@ -85,15 +81,15 @@
output, client := connectTestEsWithStats(t, cfg, index)

// drop old index preparing test
client.conn.Delete(index, "", "", nil)
_, _, _ = client.conn.Delete(index, "", "", nil)

batch := outest.NewBatch(beat.Event{
batch := encodeBatch(client, outest.NewBatch(beat.Event{

Check failure on line 86 in libbeat/outputs/elasticsearch/client_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

cannot infer B (libbeat/outputs/elasticsearch/event_encoder_test.go:106:18) (typecheck)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This linter error (and the similar ones below) is mistaken, there is no typecheck error here.

What's worse, if I add a nolint directive to skip it, it gives an error because the directive is "unused", so something is maybe wrong with the linter config...

Timestamp: time.Now(),
Fields: mapstr.M{
"type": "libbeat",
"message": "Test message from libbeat",
},
})
}))

err := output.Publish(context.Background(), batch)
if err != nil {
Expand Down Expand Up @@ -131,15 +127,16 @@
"index": index,
"pipeline": "%{[pipeline]}",
})
client.conn.Delete(index, "", "", nil)
_, _, _ = client.conn.Delete(index, "", "", nil)

// Check version
if client.conn.GetVersion().Major < 5 {
t.Skip("Skipping tests as pipeline not available in <5.x releases")
}

publish := func(event beat.Event) {
err := output.Publish(context.Background(), outest.NewBatch(event))
batch := encodeBatch(client, outest.NewBatch(event))

Check failure on line 138 in libbeat/outputs/elasticsearch/client_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

cannot infer B (libbeat/outputs/elasticsearch/event_encoder_test.go:106:18) (typecheck)
err := output.Publish(context.Background(), batch)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -167,7 +164,7 @@
},
}

client.conn.DeletePipeline(pipeline, nil)
_, _, _ = client.conn.DeletePipeline(pipeline, nil)
_, resp, err := client.conn.CreatePipeline(pipeline, nil, pipelineBody)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -217,29 +214,30 @@
},
},
})
client.conn.Delete(index, "", "", nil)
client.conn.Delete(deadletterIndex, "", "", nil)
_, _, _ = client.conn.Delete(index, "", "", nil)
_, _, _ = client.conn.Delete(deadletterIndex, "", "", nil)

err := output.Publish(context.Background(), outest.NewBatch(beat.Event{
batch := encodeBatch(client, outest.NewBatch(beat.Event{

Check failure on line 220 in libbeat/outputs/elasticsearch/client_integration_test.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

cannot infer B (libbeat/outputs/elasticsearch/event_encoder_test.go:106:18) (typecheck)
Timestamp: time.Now(),
Fields: mapstr.M{
"type": "libbeat",
"message": "Test message 1",
"testfield": 0,
},
}))
err := output.Publish(context.Background(), batch)
if err != nil {
t.Fatal(err)
}

batch := outest.NewBatch(beat.Event{
batch = encodeBatch(client, outest.NewBatch(beat.Event{
Timestamp: time.Now(),
Fields: mapstr.M{
"type": "libbeat",
"message": "Test message 2",
"testfield": "foo0",
},
})
}))
err = output.Publish(context.Background(), batch)
if err == nil {
t.Fatal("Expecting mapping conflict")
Expand Down Expand Up @@ -277,14 +275,15 @@
"index": index,
"pipeline": "%{[pipeline]}",
})
client.conn.Delete(index, "", "", nil)
_, _, _ = client.conn.Delete(index, "", "", nil)

if client.conn.GetVersion().Major < 5 {
t.Skip("Skipping tests as pipeline not available in <5.x releases")
}

publish := func(events ...beat.Event) {
err := output.Publish(context.Background(), outest.NewBatch(events...))
batch := encodeBatch(client, outest.NewBatch(events...))
err := output.Publish(context.Background(), batch)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -312,7 +311,7 @@
},
}

client.conn.DeletePipeline(pipeline, nil)
_, _, _ = client.conn.DeletePipeline(pipeline, nil)
_, resp, err := client.conn.CreatePipeline(pipeline, nil, pipelineBody)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -354,14 +353,14 @@
"index": index,
})

client.conn.Delete(index, "", "", nil)
_, _, _ = client.conn.Delete(index, "", "", nil)

batch := outest.NewBatch(beat.Event{
batch := encodeBatch(client, outest.NewBatch(beat.Event{
Timestamp: time.Now(),
Fields: mapstr.M{
"message": "Hello world",
},
})
}))

tx, spans, _ := apmtest.WithTransaction(func(ctx context.Context) {
err := output.Publish(ctx, batch)
Expand Down Expand Up @@ -434,7 +433,7 @@
client := randomClient(output).(clientWrap).Client().(*Client)

// Load version number
client.Connect()
_ = client.Connect()

return client, client
}
Expand Down Expand Up @@ -475,32 +474,3 @@
client := grp.Clients[rand.Intn(L)]
return client.(outputs.NetworkClient)
}

// startTestProxy starts a proxy that redirects all connections to the specified URL
func startTestProxy(t *testing.T, redirectURL string) *httptest.Server {
t.Helper()

realURL, err := url.Parse(redirectURL)
require.NoError(t, err)

proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
req := r.Clone(context.Background())
req.RequestURI = ""
req.URL.Scheme = realURL.Scheme
req.URL.Host = realURL.Host

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)

for _, header := range []string{"Content-Encoding", "Content-Type"} {
w.Header().Set(header, resp.Header.Get(header))
}
w.WriteHeader(resp.StatusCode)
w.Write(body)
}))
return proxy
}
Loading
Loading