Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA output: codec, acks, and retry configuration #945

Closed
wants to merge 3 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 81 additions & 4 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package kafka
import (
"crypto/tls"
"fmt"
"strconv"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand All @@ -19,6 +21,12 @@ type Kafka struct {
Topic string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
CompressionCodec string
// RequiredAcks Tag
RequiredAcks string
// MaxRetry Tag
MaxRetry string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make MaxRetry an int directly


// Legacy SSL config options
// TLS client certificate
Expand Down Expand Up @@ -53,6 +61,21 @@ var sampleConfig = `
## ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"

## CompressionCodec represents the various compression codecs recognized by Kafka in messages.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these should be indented by two spaces, not a tab

## "none" : No compression
## "gzip" : Gzip compression
## "snappy" : Snappy compression
# compression_codec = "none"

## RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding
## "none" : the producer never waits for an acknowledgement from the broker. This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
## "leader" : the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
## "leader_and_replicas" : the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
# required_acks = "leader_and_replicas"

## The total number of times to retry sending a message
# max_retry = "3"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_retry = 3


## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
Expand All @@ -71,12 +94,66 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
k.serializer = serializer
}

func requiredAcks(value string) (sarama.RequiredAcks, error) {
switch strings.ToLower(value) {
case "none":
return sarama.NoResponse, nil
case "leader":
return sarama.WaitForLocal, nil
case "", "leader_and_replicas":
return sarama.WaitForAll, nil
default:
return 0, fmt.Errorf("Failed to recognize required_acks: %s", value)
}
}

func compressionCodec(value string) (sarama.CompressionCodec, error) {
switch strings.ToLower(value) {
case "gzip":
return sarama.CompressionGZIP, nil
case "snappy":
return sarama.CompressionSnappy, nil
case "", "none":
return sarama.CompressionNone, nil
default:
return 0, fmt.Errorf("Failed to recognize compression_codec: %s", value)
}
}

func maxRetry(value string) (int, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make MaxRetry an int and remove this parsing function

if value == "" {
return 3, nil
}
maxRetry, err := strconv.Atoi(value)
if err != nil {
return -1, fmt.Errorf("Failed to parse max_retry: %s", value)
}
if maxRetry < 0 {
return -1, fmt.Errorf("max_retry is %s but it should not be negative", value)
}
return maxRetry, nil
}

func (k *Kafka) Connect() error {
config := sarama.NewConfig()
// Wait for all in-sync replicas to ack the message
config.Producer.RequiredAcks = sarama.WaitForAll
// Retry up to 10 times to produce the message
config.Producer.Retry.Max = 10

requiredAcks, err := requiredAcks(k.RequiredAcks)
if err != nil {
return err
}
config.Producer.RequiredAcks = requiredAcks

compressionCodec, err := compressionCodec(k.CompressionCodec)
if err != nil {
return err
}
config.Producer.Compression = compressionCodec

maxRetry, err := maxRetry(k.MaxRetry)
if err != nil {
return err
}
config.Producer.Retry.Max = maxRetry

// Legacy support ssl config
if k.Certificate != "" {
Expand Down