Skip to content
This repository has been archived by the owner on Sep 21, 2023. It is now read-only.

Kafka Output #147

Merged
merged 31 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a9f5924
wip
robbavey Oct 18, 2022
9c3238a
Initial import of kafka from libbeat
robbavey Oct 18, 2022
86905bf
Stub output
robbavey Oct 18, 2022
0d3e65f
Something compiles
robbavey Oct 18, 2022
1424f70
Base "working" kafka output
robbavey Oct 19, 2022
650dab6
Add rudimentary key support, and fix timestamps
robbavey Oct 20, 2022
97b9050
Checkpoint before converting messages.Event to beats.Event
robbavey Oct 25, 2022
2dd711d
json encoder for content
robbavey Oct 25, 2022
f27651a
tidy
robbavey Oct 26, 2022
d84b99f
Move kafka output config into it's own space
robbavey Oct 26, 2022
99899b5
Fix up license headers
robbavey Oct 26, 2022
6505e9b
Test with SyncProducer
robbavey Oct 27, 2022
ae2ccc2
Merge remote-tracking branch 'upstream/main' into feature/kafka-output
robbavey Oct 27, 2022
be43076
Add enabled flag to kafka output
robbavey Oct 27, 2022
07761f6
Fixes for automated tests
robbavey Oct 27, 2022
1de9908
Formatting for linter
robbavey Oct 27, 2022
e82ec7c
Fix broken config
robbavey Oct 27, 2022
0d7c60b
goimports fixes
robbavey Oct 28, 2022
b9c9ef1
Fixes to make linter happy
robbavey Oct 28, 2022
706aafa
Add unit tests for config
robbavey Oct 28, 2022
dd324de
Add license header to test file
robbavey Oct 28, 2022
c76901a
Formatting
robbavey Oct 28, 2022
c3706b2
Check return code of validate in tests
robbavey Oct 28, 2022
94d1ffc
Use AsyncProducer
robbavey Oct 31, 2022
a81e1da
Update license
robbavey Oct 31, 2022
5ac7ac9
Linter check
robbavey Oct 31, 2022
eca83ab
Use errors.Is to pacify linter
robbavey Nov 1, 2022
0bcb268
Use elastic fork of sarama library, and remove unused code
robbavey Nov 8, 2022
a5a4c6e
Update NOTICE.txt
robbavey Nov 8, 2022
fa3d152
Fix conflicts with 'upstream/main'
robbavey Nov 8, 2022
20abe54
Added link to issue to Kerberos TODO comment
robbavey Nov 8, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
522 changes: 470 additions & 52 deletions NOTICE.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func readConfig(unpacker rawUnpacker) (config ShipperConfig, err error) {
Monitor: monitoring.DefaultConfig(),
Queue: queue.DefaultConfig(),
Server: server.DefaultConfig(),
Output: output.DefaultConfig(),
}

err = unpacker(&config)
Expand Down
4 changes: 4 additions & 0 deletions controller/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/elastic/elastic-agent-shipper/monitoring"
"github.com/elastic/elastic-agent-shipper/output"
"github.com/elastic/elastic-agent-shipper/output/elasticsearch"
"github.com/elastic/elastic-agent-shipper/output/kafka"
"github.com/elastic/elastic-agent-shipper/queue"
"github.com/elastic/elastic-agent-shipper/server"

Expand Down Expand Up @@ -209,6 +210,9 @@ func outputFromConfig(config output.Config, queue *queue.Queue) (Output, error)
if config.Elasticsearch != nil {
return elasticsearch.NewElasticSearch(config.Elasticsearch, queue), nil
}
if config.Kafka != nil && config.Kafka.Enabled {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily something to fix in this PR, but if a user defines multiple outputs it looks like we will just arbitrarily pick one with an undocumented precedence.

It would be better if we could return an error to the user indicating that multiple outputs are unsupported. Probably the best way to do this is to report the shipper state as FAILED when the agent gives it an expected configuration with multiple outputs.

If someone has a better idea, let me know. Otherwise I'll write up an issue for us to implement this.

return kafka.NewKafka(config.Kafka, queue), nil
}
if config.Console != nil && config.Console.Enabled {
return output.NewConsole(queue), nil
}
Expand Down
12 changes: 11 additions & 1 deletion elastic-agent-shipper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ output:
enabled: true

elasticsearch:
# To use the elasticsearch output, disable the console output and
# To use the elasticsearch output, disable the other outputs and
# set elasticsearch.enabled to true.
enabled: false
hosts: ["https://localhost:9200"]
Expand All @@ -133,3 +133,13 @@ output:
# These options may be convenient for local testing:
#allow_older_versions: true
#ssl.verification_mode: none

kafka:
# To use the kafka output, disable the other outputs and
# set kafka.enabled to true.
enabled: false
hosts: ["localhost:9092", "localhost:9093"]
topic: '%{[metricset][name]}'
partition.round_robin:
reachable_only: true
key: '%{[metricset][name]}'
17 changes: 16 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
)

require (
github.com/eapache/go-resiliency v1.2.0
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220810153818-dd118efed5a5
github.com/elastic/elastic-agent-client/v7 v7.0.0-20221102171927-bc376a4e0f9f
github.com/elastic/elastic-agent-shipper-client v0.4.0
Expand All @@ -32,6 +33,8 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elastic/go-licenser v0.4.1 // indirect
github.com/elastic/go-structform v0.0.10 // indirect
github.com/elastic/go-sysinfo v1.8.1 // indirect
Expand All @@ -40,28 +43,37 @@ require (
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gobuffalo/here v0.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/licenseclassifier v0.0.0-20200402202327-879cb1424de0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jcchavezs/porto v0.4.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/karrick/godirwalk v1.15.8 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/markbates/pkger v0.17.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect
github.com/sergi/go-diff v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/xdg/scram v1.0.3 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
go.elastic.co/apm/module/apmelasticsearch/v2 v2.0.0 // indirect
go.elastic.co/apm/module/apmhttp/v2 v2.0.0 // indirect
go.elastic.co/apm/v2 v2.1.0 // indirect
Expand All @@ -86,7 +98,10 @@ require (
howett.net/plist v1.0.0 // indirect
)

require github.com/Shopify/sarama v1.27.0

replace (
github.com/Shopify/sarama => github.com/elastic/sarama v1.19.1-0.20220310193331-ebc2b0d8eef3
github.com/dop251/goja => github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20
github.com/dop251/goja_nodejs => github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6
)
Loading