Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA output: codec, acks, and retry configuration #945

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- [#848](https://github.com/influxdata/telegraf/issues/848): Provide option to omit host tag from telegraf agent.
- [#928](https://github.com/influxdata/telegraf/pull/928): Deprecating the statsd "convert_names" options, expose separator config.
- [#919](https://github.com/influxdata/telegraf/pull/919): ipmi_sensor input plugin. Thanks @ebookbug!
- [#945](https://github.com/influxdata/telegraf/pull/945): KAFKA output: codec, acks, and retry configuration. Thanks @framiere!

### Bugfixes
- [#890](https://github.com/influxdata/telegraf/issues/890): Create TLS config even if only ssl_ca is provided.
Expand Down
29 changes: 25 additions & 4 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ type Kafka struct {
Topic string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
CompressionCodec int
// RequiredAcks Tag
RequiredAcks int
// MaxRetry Tag
MaxRetry int

// Legacy SSL config options
// TLS client certificate
Expand Down Expand Up @@ -53,6 +59,21 @@ var sampleConfig = `
## ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"

## CompressionCodec represents the various compression codecs recognized by Kafka in messages.
## 0 : No compression
## 1 : Gzip compression
## 2 : Snappy compression
compression_codec = 0

## RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding
## 0 : the producer never waits for an acknowledgement from the broker. This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
## 1 : the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
## -1 : the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
required_acks = -1

## The total number of times to retry sending a message
max_retry = 3

## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
Expand All @@ -73,10 +94,10 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) {

func (k *Kafka) Connect() error {
config := sarama.NewConfig()
// Wait for all in-sync replicas to ack the message
config.Producer.RequiredAcks = sarama.WaitForAll
// Retry up to 10 times to produce the message
config.Producer.Retry.Max = 10

config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks)
config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec)
config.Producer.Retry.Max = k.MaxRetry

// Legacy support ssl config
if k.Certificate != "" {
Expand Down