Add early encoding support to the Logstash output #38841

Draft: wants to merge 72 commits into base: main
72 commits
4ff1d75
unbuffer memory queue input channel
faec Feb 28, 2024
5627633
remove queue entry ids
faec Feb 28, 2024
6a27aeb
move ack callbacks to a dedicated worker goroutine
faec Feb 29, 2024
3871376
extra work to keep acknowledgments flowing
faec Feb 29, 2024
18ab493
add eager handling of deletions to the queue runloop
faec Feb 29, 2024
7ae8929
move eager deletion before the check to unblock the push channel
faec Feb 29, 2024
3f5f4ce
remove callback worker and runloop mods
faec Mar 2, 2024
d027a6f
add API support for preencoding beats events
faec Mar 2, 2024
016d0f3
add error check
faec Mar 2, 2024
15b3f7c
re-add entry ids
faec Mar 2, 2024
98f1ed9
encode events in goroutines
faec Mar 3, 2024
d51f6bd
switch to encoder factories to support parallel encoding
faec Mar 3, 2024
8add438
remember to close the channel
faec Mar 3, 2024
3cb57b4
encode in the pipeline client
faec Mar 4, 2024
c0f3b22
free original event data when encoding is done
faec Mar 5, 2024
bd1df4c
mark pre-encoded data correctly
faec Mar 5, 2024
63064a6
add early delete patch
faec Mar 5, 2024
e81e2d1
Merge branch 'main' of github.com:elastic/beats into memqueue-encode-…
faec Mar 22, 2024
8a3592e
disentangling the experimental scaffolding from the final api
faec Mar 22, 2024
65e0d9f
move the encoding datatypes into the queue package
faec Mar 22, 2024
2fd6081
Add early encoding hooks in the memory queue
faec Mar 22, 2024
eab04ff
pass encoder factory through during memqueue creation
faec Mar 22, 2024
b487665
add fallback encoding to client workers if queue doesn't support earl…
faec Mar 22, 2024
8ee228e
Remove proxy queue implementation
faec Mar 22, 2024
bd97879
Use the memory queue instead of the proxy queue in the shipper output
faec Mar 22, 2024
8e4321f
Merge branch 'delete-proxy-queue' into memqueue-encode-client
faec Mar 22, 2024
fc1914b
remove early-encoding hooks from pipeline clients
faec Mar 22, 2024
3119457
Change how encoded event sizes are reported
faec Mar 22, 2024
562e5bb
remove encoding hooks from output workers and eventConsumer
faec Mar 23, 2024
a19d91d
Cleanups
faec Mar 23, 2024
6d0a33b
revert unrelated changes
faec Mar 23, 2024
0d9c948
cleanup
faec Mar 23, 2024
64ab52f
revert unrelated change
faec Mar 23, 2024
1a591f0
revert unrelated change
faec Mar 23, 2024
cae64f7
revert no-op change
faec Mar 23, 2024
ec7080d
revert unneeded change
faec Mar 23, 2024
ebf2dc0
reworking dead letter index handling
faec Mar 23, 2024
22d8f62
Clean up non-indexable policy handling in the ES output
faec Mar 25, 2024
565f7a6
Merge branch 'main' of github.com:elastic/beats into dead-letter-inde…
faec Mar 25, 2024
70026ad
Merge branch 'dead-letter-index-cleanup' into queue-early-encode
faec Mar 25, 2024
354d8d0
Merge branch 'main' of github.com:elastic/beats into queue-early-encode
faec Mar 26, 2024
df65a98
Merge branch 'main' of github.com:elastic/beats into queue-early-encode
faec Apr 5, 2024
c1da549
update integration tests
faec Apr 5, 2024
e989ac4
Merge branch 'main' of github.com:elastic/beats into queue-early-encode
faec Apr 5, 2024
485b684
remove old code
faec Apr 5, 2024
e75930e
update documentation comments
faec Apr 5, 2024
1fa7052
update more comments
faec Apr 5, 2024
c5851f9
make check
faec Apr 5, 2024
7befab7
small cleanups
faec Apr 5, 2024
50496fc
cleanups
faec Apr 5, 2024
176087f
update more integration tests
faec Apr 6, 2024
92b4522
add artificial panic to track down unupdated tests
faec Apr 6, 2024
5249d7b
fix function call, linter errors
faec Apr 8, 2024
02b0f6f
Merge branch 'main' of github.com:elastic/beats into queue-early-encode
faec Apr 8, 2024
b05d252
lint
faec Apr 8, 2024
46321e7
lint
faec Apr 8, 2024
4084d49
lint
faec Apr 8, 2024
403586b
working on tests
faec Apr 8, 2024
ebf3441
Finish test
faec Apr 8, 2024
6e581b2
Merge branch 'main' of github.com:elastic/beats into queue-early-encode
faec Apr 8, 2024
70c7ba4
remove futile nolint tags
faec Apr 8, 2024
faaaa02
make check
faec Apr 8, 2024
c61ce5f
add comments
faec Apr 8, 2024
8eedfd9
add comments
faec Apr 8, 2024
ed614fc
Merge branch 'benchmark-vanilla' into queue-early-encode
faec Apr 9, 2024
5cc9ebb
update logstash tests
faec Apr 10, 2024
732fcd8
update changelog
faec Apr 10, 2024
75f5e73
remove debug panic from the output
faec Apr 10, 2024
5fe0d35
Merge branch 'main' of github.com:elastic/beats into queue-early-encode
faec Apr 10, 2024
333365f
support early encoding in the logstash output
faec Apr 10, 2024
5be7689
fix data copying
faec Apr 10, 2024
a981a7b
Merge branch 'main' of github.com:elastic/beats into logstash-early-e…
faec Apr 10, 2024
9 changes: 3 additions & 6 deletions libbeat/outputs/logstash/async.go
@@ -77,12 +77,10 @@ func newAsyncClient(
 		log.Warn(`The async Logstash client does not support the "ttl" option`)
 	}

-	enc := makeLogstashEventEncoder(log, beat, config.EscapeHTML, config.Index)
-
 	queueSize := config.Pipelining - 1
 	timeout := config.Timeout
 	compressLvl := config.CompressionLevel
-	clientFactory := makeClientFactory(queueSize, timeout, enc, compressLvl)
+	clientFactory := makeClientFactory(queueSize, timeout, compressLvl)

 	var err error
 	c.client, err = clientFactory(c.Client)
@@ -104,12 +102,11 @@ func newAsyncClient(
 func makeClientFactory(
 	queueSize int,
 	timeout time.Duration,
-	enc func(interface{}) ([]byte, error),
 	compressLvl int,
 ) func(net.Conn) (*v2.AsyncClient, error) {
 	return func(conn net.Conn) (*v2.AsyncClient, error) {
 		return v2.NewAsyncClientWithConn(conn, queueSize,
-			v2.JSONEncoder(enc),
+			v2.JSONEncoder(logstashEventUnwrapper),
 			v2.Timeout(timeout),
 			v2.CompressionLevel(compressLvl),
 		)
@@ -216,7 +213,7 @@ func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error {
 	}
 	window := make([]interface{}, len(events))
 	for i := range events {
-		window[i] = &events[i].Content
+		window[i] = events[i].EncodedEvent
 	}
 	ref.count.Inc()
 	return client.Send(ref.callback, window)
73 changes: 64 additions & 9 deletions libbeat/outputs/logstash/enc.go
@@ -18,24 +18,79 @@
 package logstash

 import (
-	"strings"
+	"fmt"

 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/outputs/codec/json"
+	"github.com/elastic/beats/v7/libbeat/publisher"
+	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 	"github.com/elastic/elastic-agent-libs/logp"
 )

-func makeLogstashEventEncoder(log *logp.Logger, info beat.Info, escapeHTML bool, index string) func(interface{}) ([]byte, error) {
+type eventEncoder struct {
+	log   *logp.Logger
+	enc   *json.Encoder
+	index string
+}
+
+type encodedEvent struct {
+	encoding []byte
+	err      error
+}
+
+func newEventEncoderFactory(
+	log *logp.Logger,
+	info beat.Info,
+	escapeHTML bool,
+	index string,
+) queue.EncoderFactory {
+	return func() queue.Encoder {
+		return newEventEncoder(log, info, escapeHTML, index)
+	}
+}
+
+func newEventEncoder(
+	log *logp.Logger,
+	info beat.Info,
+	escapeHTML bool,
+	index string,
+) queue.Encoder {
 	enc := json.New(info.Version, json.Config{
 		Pretty:     false,
 		EscapeHTML: escapeHTML,
 	})
-	index = strings.ToLower(index)
-	return func(event interface{}) (d []byte, err error) {
-		d, err = enc.Encode(index, event.(*beat.Event))
-		if err != nil {
-			log.Debugf("Failed to encode event: %v", event)
-		}
-		return
+	return &eventEncoder{
+		log:   log,
+		enc:   enc,
+		index: index,
 	}
 }
+
+func (e *eventEncoder) EncodeEntry(entry queue.Entry) (queue.Entry, int) {
+	pubEvent, ok := entry.(publisher.Event)
+	if !ok {
+		// Currently all queue entries are publisher.Events but let's be cautious.
+		return entry, 0
+	}
+	encoding, err := e.enc.Encode(e.index, &pubEvent.Content)
+	if err != nil {
+		e.log.Debugf("Failed to encode event: %v", pubEvent.Content)
+	}
+
+	copiedEncoding := make([]byte, len(encoding))
+	copy(copiedEncoding, encoding)
+	pubEvent.EncodedEvent = &encodedEvent{
+		encoding: copiedEncoding,
+		err:      err,
+	}
+	pubEvent.Content = beat.Event{}
+	return pubEvent, len(encoding)
+}
+
+func logstashEventUnwrapper(event interface{}) ([]byte, error) {
+	encoded, ok := event.(*encodedEvent)
+	if !ok {
+		return nil, fmt.Errorf("event is wrong type (expected *encodedEvent)")
+	}
+	return encoded.encoding, encoded.err
+}
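The wrap/unwrap pattern in enc.go can be sketched in isolation with the standard library only. This is a minimal illustration under stated assumptions, not the libbeat implementation: `event`, `reusingEncoder`, `encodeEntry`, and `unwrap` are hypothetical stand-ins for `publisher.Event`, the JSON codec, `EncodeEntry`, and `logstashEventUnwrapper`. The shared buffer in `reusingEncoder` is an assumption meant to show why the diff copies the encoded bytes before storing them (the "fix data copying" commit suggests the encoder's output may be reused by later calls).

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// event is a hypothetical stand-in for publisher.Event.
type event struct {
	Content      map[string]interface{} // raw fields, released once encoded
	EncodedEvent interface{}            // set by encodeEntry
}

// encodedEvent mirrors the struct in the diff: the serialized bytes plus any
// encoding error, so the error can surface later, at send time.
type encodedEvent struct {
	encoding []byte
	err      error
}

// reusingEncoder is a hypothetical encoder whose returned slice is backed by
// an internal buffer that the next Encode call overwrites.
type reusingEncoder struct{ buf bytes.Buffer }

func (r *reusingEncoder) Encode(v interface{}) ([]byte, error) {
	data, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	r.buf.Reset()
	r.buf.Write(data)
	return r.buf.Bytes(), nil
}

// encodeEntry serializes the event once, copies the bytes out of the
// encoder's buffer, and drops the raw content so it can be garbage collected.
func encodeEntry(enc *reusingEncoder, e event) (event, int) {
	data, err := enc.Encode(e.Content)
	copied := make([]byte, len(data))
	copy(copied, data)
	e.EncodedEvent = &encodedEvent{encoding: copied, err: err}
	e.Content = nil
	return e, len(copied)
}

// unwrap is what the sending side calls in place of a real encoder: it only
// hands back the bytes (and error) produced earlier.
func unwrap(v interface{}) ([]byte, error) {
	enc, ok := v.(*encodedEvent)
	if !ok {
		return nil, errors.New("event is wrong type (expected *encodedEvent)")
	}
	return enc.encoding, enc.err
}

func main() {
	enc := &reusingEncoder{}
	first, size := encodeEntry(enc, event{Content: map[string]interface{}{"message": "hello"}})
	second, _ := encodeEntry(enc, event{Content: map[string]interface{}{"message": "world"}})

	a, _ := unwrap(first.EncodedEvent)
	b, _ := unwrap(second.EncodedEvent)
	fmt.Println(string(a), size) // {"message":"hello"} 19
	fmt.Println(string(b))       // {"message":"world"}
}
```

Without the `copy`, the first event's bytes would alias the encoder's buffer and could be overwritten when the second event is encoded.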
9 changes: 8 additions & 1 deletion libbeat/outputs/logstash/logstash.go
@@ -21,6 +21,7 @@ import (
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/outputs"
 	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/transport"
 	"github.com/elastic/elastic-agent-libs/transport/tlscommon"
 )
@@ -63,6 +64,12 @@ func makeLogstash(
 		Stats: observer,
 	}

+	encoderFactory := newEventEncoderFactory(
+		logp.NewLogger("logstash"),
+		beat,
+		lsConfig.EscapeHTML,
+		lsConfig.Index)
+
 	clients := make([]outputs.NetworkClient, len(hosts))
 	for i, host := range hosts {
 		var client outputs.NetworkClient
@@ -85,5 +92,5 @@ func makeLogstash(
 		clients[i] = client
 	}

-	return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, lsConfig.MaxRetries, nil, clients)
+	return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, lsConfig.MaxRetries, encoderFactory, clients)
 }
5 changes: 2 additions & 3 deletions libbeat/outputs/logstash/sync.go
@@ -61,9 +61,8 @@ func newSyncClient(
 	}

 	var err error
-	enc := makeLogstashEventEncoder(log, beat, config.EscapeHTML, config.Index)
 	c.client, err = v2.NewSyncClientWithConn(conn,
-		v2.JSONEncoder(enc),
+		v2.JSONEncoder(logstashEventUnwrapper),
 		v2.Timeout(config.Timeout),
 		v2.CompressionLevel(config.CompressionLevel),
 	)
@@ -196,7 +195,7 @@ func (c *syncClient) publishWindowed(events []publisher.Event) (int, error) {
 func (c *syncClient) sendEvents(events []publisher.Event) (int, error) {
 	window := make([]interface{}, len(events))
 	for i := range events {
-		window[i] = &events[i].Content
+		window[i] = events[i].EncodedEvent
 	}
 	return c.client.Send(window)
 }