From 0cb4babab526a98200762f6ffdfdb26735c9abba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florent=20Rami=C3=A8re?= Date: Thu, 31 Mar 2016 11:14:20 +0200 Subject: [PATCH 1/3] Add compression/acks/retry conf to Kafka output plugin The following configuration is now possible ## CompressionCodec represents the various compression codecs recognized by Kafka in messages. ## "none" : No compression ## "gzip" : Gzip compression ## "snappy" : Snappy compression # compression_codec = "none" ## RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding ## "none" : the producer never waits for an acknowledgement from the broker. This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails). ## "leader" : the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost). ## "leader_and_replicas" : the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains. # required_acks = "leader_and_replicas" ## The total number of times to retry sending a message # max_retry = "3" --- plugins/outputs/kafka/kafka.go | 85 ++++++++++++++++++++++++++++++++-- 1 file changed, 81 insertions(+), 4 deletions(-) diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 8dea2b2a182ad..2bba2e77e5a49 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -3,6 +3,8 @@ package kafka import ( "crypto/tls" "fmt" + "strconv" + "strings" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -19,6 +21,12 @@ type Kafka struct { Topic string // Routing Key Tag RoutingTag string `toml:"routing_tag"` + // Compression Codec Tag + CompressionCodec string + // RequiredAcks Tag + RequiredAcks string + // MaxRetry Tag + MaxRetry string // Legacy SSL config options // TLS client certificate @@ -53,6 +61,21 @@ var sampleConfig = ` ## ie, if this tag exists, it's value will be used as the routing key routing_tag = "host" + ## CompressionCodec represents the various compression codecs recognized by Kafka in messages. + ## "none" : No compression + ## "gzip" : Gzip compression + ## "snappy" : Snappy compression + # compression_codec = "none" + + ## RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding + ## "none" : the producer never waits for an acknowledgement from the broker. This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails). + ## "leader" : the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost). + ## "leader_and_replicas" : the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains. + # required_acks = "leader_and_replicas" + + ## The total number of times to retry sending a message + # max_retry = "3" + ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" # ssl_cert = "/etc/telegraf/cert.pem" @@ -71,12 +94,66 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) { k.serializer = serializer } +func requiredAcks(value string) (sarama.RequiredAcks, error) { + switch strings.ToLower(value) { + case "none": + return sarama.NoResponse, nil + case "leader": + return sarama.WaitForLocal, nil + case "", "leader_and_replicas": + return sarama.WaitForAll, nil + default: + return 0, fmt.Errorf("Failed to recognize required_acks: %s", value) + } +} + +func compressionCodec(value string) (sarama.CompressionCodec, error) { + switch strings.ToLower(value) { + case "gzip": + return sarama.CompressionGZIP, nil + case "snappy": + return sarama.CompressionSnappy, nil + case "", "none": + return sarama.CompressionNone, nil + default: + return 0, fmt.Errorf("Failed to recognize compression_codec: %s", value) + } +} + +func maxRetry(value string) (int, error) { + if value == "" { + return 3, nil + } + maxRetry, err := strconv.Atoi(value) + if err != nil { + return -1, fmt.Errorf("Failed to parse max_retry: %s", value) + } + if maxRetry < 0 { + return -1, fmt.Errorf("max_retry is %s but it should not be negative", value) + } + return maxRetry, nil +} + func (k *Kafka) Connect() error { config := sarama.NewConfig() - // Wait for all in-sync replicas to ack the message - config.Producer.RequiredAcks = sarama.WaitForAll - // Retry up to 10 times to produce the message - config.Producer.Retry.Max = 10 + + requiredAcks, err := requiredAcks(k.RequiredAcks) + if err != nil { + return err + } + config.Producer.RequiredAcks = requiredAcks + + compressionCodec, err := compressionCodec(k.CompressionCodec) + if err != nil { + return err + } + config.Producer.Compression = compressionCodec + + maxRetry, err := maxRetry(k.MaxRetry) + if err != nil { + return err + } + config.Producer.Retry.Max = maxRetry // Legacy support ssl config if k.Certificate != "" { From e918bfbbec94afd30f12aceec48dc5ea39e9ace7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florent=20Rami=C3=A8re?= Date: Thu, 31 Mar 2016 17:27:14 +0200 Subject: [PATCH 2/3] Use numerical codes instead of symbolic ones --- plugins/outputs/kafka/kafka.go | 92 +++++++--------------------------- 1 file changed, 18 insertions(+), 74 deletions(-) diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 2bba2e77e5a49..3cecfeeab3462 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -3,8 +3,6 @@ package kafka import ( "crypto/tls" "fmt" - "strconv" - "strings" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -22,11 +20,11 @@ type Kafka struct { // Routing Key Tag RoutingTag string `toml:"routing_tag"` // Compression Codec Tag - CompressionCodec string + CompressionCodec int // RequiredAcks Tag - RequiredAcks string + RequiredAcks int // MaxRetry Tag - MaxRetry string + MaxRetry int // Legacy SSL config options // TLS client certificate @@ -61,20 +59,20 @@ var sampleConfig = ` ## ie, if this tag exists, it's value will be used as the routing key routing_tag = "host" - ## CompressionCodec represents the various compression codecs recognized by Kafka in messages. - ## "none" : No compression - ## "gzip" : Gzip compression - ## "snappy" : Snappy compression - # compression_codec = "none" + ## CompressionCodec represents the various compression codecs recognized by Kafka in messages. + ## 0 : No compression + ## 1 : Gzip compression + ## 2 : Snappy compression + compression_codec = 0 - ## RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding - ## "none" : the producer never waits for an acknowledgement from the broker. This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails). - ## "leader" : the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost). - ## "leader_and_replicas" : the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains. - # required_acks = "leader_and_replicas" + ## RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding + ## 0 : the producer never waits for an acknowledgement from the broker. This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails). + ## 1 : the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost). + ## -1 : the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains. + required_acks = -1 - ## The total number of times to retry sending a message - # max_retry = "3" + ## The total number of times to retry sending a message + max_retry = 3 ## Optional SSL Config # ssl_ca = "/etc/telegraf/ca.pem" @@ -94,66 +92,12 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) { k.serializer = serializer } -func requiredAcks(value string) (sarama.RequiredAcks, error) { - switch strings.ToLower(value) { - case "none": - return sarama.NoResponse, nil - case "leader": - return sarama.WaitForLocal, nil - case "", "leader_and_replicas": - return sarama.WaitForAll, nil - default: - return 0, fmt.Errorf("Failed to recognize required_acks: %s", value) - } -} - -func compressionCodec(value string) (sarama.CompressionCodec, error) { - switch strings.ToLower(value) { - case "gzip": - return sarama.CompressionGZIP, nil - case "snappy": - return sarama.CompressionSnappy, nil - case "", "none": - return sarama.CompressionNone, nil - default: - return 0, fmt.Errorf("Failed to recognize compression_codec: %s", value) - } -} - -func maxRetry(value string) (int, error) { - if value == "" { - return 3, nil - } - maxRetry, err := strconv.Atoi(value) - if err != nil { - return -1, fmt.Errorf("Failed to parse max_retry: %s", value) - } - if maxRetry < 0 { - return -1, fmt.Errorf("max_retry is %s but it should not be negative", value) - } - return maxRetry, nil -} - func (k *Kafka) Connect() error { config := sarama.NewConfig() - requiredAcks, err := requiredAcks(k.RequiredAcks) - if err != nil { - return err - } - config.Producer.RequiredAcks = requiredAcks - - compressionCodec, err := compressionCodec(k.CompressionCodec) - if err != nil { - return err - } - config.Producer.Compression = compressionCodec - - maxRetry, err := maxRetry(k.MaxRetry) - if err != nil { - return err - } - config.Producer.Retry.Max = maxRetry + config.Producer.RequiredAcks = sarama.RequiredAcks(k.RequiredAcks) + config.Producer.Compression = sarama.CompressionCodec(k.CompressionCodec) + config.Producer.Retry.Max = k.MaxRetry // Legacy support ssl config if k.Certificate != "" { From 3d8a1e8fdbe1aba0388fd3d8dc2996401f3d65ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florent=20Rami=C3=A8re?= Date: Thu, 31 Mar 2016 17:30:39 +0200 Subject: [PATCH 3/3] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 08c4b6ceba10a..f43aca1611064 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - [#848](https://github.com/influxdata/telegraf/issues/848): Provide option to omit host tag from telegraf agent. - [#928](https://github.com/influxdata/telegraf/pull/928): Deprecating the statsd "convert_names" options, expose separator config. - [#919](https://github.com/influxdata/telegraf/pull/919): ipmi_sensor input plugin. Thanks @ebookbug! +- [#945](https://github.com/influxdata/telegraf/pull/945): KAFKA output: codec, acks, and retry configuration. Thanks @framiere! ### Bugfixes - [#890](https://github.com/influxdata/telegraf/issues/890): Create TLS config even if only ssl_ca is provided.