Update to sarama v1.10.0 #2190

Merged
merged 2 commits on Aug 10, 2016
7 changes: 6 additions & 1 deletion CHANGELOG.asciidoc
@@ -53,7 +53,12 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
- Configurable redis `keys` using filters and format strings. {pull}2169[2169]
- Add format string support to `output.kafka.topic`. {pull}2188[2188]
- Add `output.kafka.topics` for more advanced kafka topic selection per event. {pull}2188[2188]

- Add support for kafka 0.10.
- Add support for kafka 0.10. {pull}2190[2190]
- Add SASL/PLAIN authentication support to kafka output. {pull}2190[2190]
- Make Kafka metadata update configurable. {pull}2190[2190]
- Add kafka version setting (optional) enabling kafka broker version support. {pull}2190[2190]
- Add kafka message timestamp if at least version 0.10 is configured. {pull}2190[2190]

*Metricbeat*

21 changes: 21 additions & 0 deletions filebeat/filebeat.full.yml
@@ -487,6 +487,27 @@ output.elasticsearch:
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Authentication details. Password is required if username is set.
#username: ''
#password: ''

# Kafka version filebeat is assumed to run against. Defaults to the oldest
# supported stable version (currently version 0.8.2.0)
#version: 0.8.2

# Metadata update configuration. The metadata contains leader information
# that decides which broker to use when publishing.
#metadata:
# Max metadata request retry attempts when the cluster is in the middle of a
# leader election. Defaults to 3 retries.
#retry.max: 3

# Waiting time between retries during leader elections. Default is 250ms.
#retry.backoff: 250ms

# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

4 changes: 3 additions & 1 deletion glide.yaml
@@ -59,11 +59,13 @@ import:
- package: github.com/miekg/dns
version: 5d001d020961ae1c184f9f8152fdc73810481677
- package: github.com/Shopify/sarama
version: v1.9.0
version: v1.10.0
- package: github.com/klauspost/crc32
version: v1.0
- package: github.com/golang/snappy
version: d9eb7a3d35ec988b8585d4a0068e462c27d28380
- package: github.com/eapache/go-xerial-snappy
version: bb955e01b9346ac19dc29eb16586c90ded99a98c
- package: github.com/eapache/queue
version: ded5959c0d4e360646dc9e9908cff48666781367
- package: github.com/eapache/go-resiliency
21 changes: 21 additions & 0 deletions libbeat/_meta/config.full.yml
@@ -261,6 +261,27 @@ output.elasticsearch:
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Authentication details. Password is required if username is set.
#username: ''
#password: ''

# Kafka version beatname is assumed to run against. Defaults to the oldest
# supported stable version (currently version 0.8.2.0)
#version: 0.8.2

# Metadata update configuration. The metadata contains leader information
# that decides which broker to use when publishing.
#metadata:
# Max metadata request retry attempts when the cluster is in the middle of a
# leader election. Defaults to 3 retries.
#retry.max: 3

# Waiting time between retries during leader elections. Default is 250ms.
#retry.backoff: 250ms

# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

29 changes: 29 additions & 0 deletions libbeat/docs/outputconfig.asciidoc
@@ -541,6 +541,24 @@ The default value is true.
The list of Kafka broker addresses from where to fetch the cluster metadata.
The cluster metadata contain the actual Kafka brokers events are published to.

===== version

Kafka version ${beatname_lc} is assumed to run against. Defaults to the oldest
supported stable version (currently version 0.8.2.0).

Event timestamps will be added if version 0.10.0.0 or newer is enabled.

Valid values are `0.8.2.0`, `0.8.2.1`, `0.8.2.2`, `0.8.2`, `0.8`, `0.9.0.0`,
`0.9.0.1`, `0.9.0`, `0.9`, `0.10.0.0`, `0.10.0`, and `0.10`.
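
As a minimal illustration, a sketch of the setting in a configuration file (the
broker address is a placeholder):

[source,yaml]
------------------------------------------------------------------------------
output.kafka:
  hosts: ["localhost:9092"]
  # 0.10.0 enables 0.10 features such as message timestamps
  version: 0.10.0
------------------------------------------------------------------------------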

===== username

The username for connecting to Kafka. If the username is configured, the password must be configured as well. Only SASL/PLAIN is supported.

===== password

The password for connecting to Kafka.
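
As an illustration, a sketch enabling SASL/PLAIN authentication (host name and
credentials are placeholders):

[source,yaml]
------------------------------------------------------------------------------
output.kafka:
  hosts: ["kafka1:9092"]
  # the password is required whenever a username is set
  username: beats
  password: changeme
------------------------------------------------------------------------------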

===== topic

The Kafka topic used for produced events. The setting can be a format string
@@ -554,6 +572,17 @@ The configurable ClientID used for logging, debugging, and auditing purposes. Th

The number of concurrent load-balanced Kafka output workers.

===== metadata

Kafka metadata update settings. The metadata contains information about
brokers, topics, partitions, and active leaders used for publishing.

*`refresh_frequency`*:: Metadata refresh interval. Defaults to 10 minutes.

*`retry.max`*:: Total number of metadata update retries while the cluster is in the middle of a leader election. The default is 3.

*`retry.backoff`*:: Waiting time between retries during leader elections. The default is 250ms.
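
As an illustration, a sketch that spells out the documented defaults (the
broker address is a placeholder):

[source,yaml]
------------------------------------------------------------------------------
output.kafka:
  hosts: ["localhost:9092"]
  metadata:
    # retry metadata requests up to 3 times, waiting 250ms between attempts
    retry.max: 3
    retry.backoff: 250ms
    # refresh cluster metadata every 10 minutes
    refresh_frequency: 10m
------------------------------------------------------------------------------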

===== max_retries

The number of times to retry publishing an event after a publishing failure.
20 changes: 17 additions & 3 deletions libbeat/outputs/kafka/client.go
@@ -108,16 +108,30 @@ func (c *client) AsyncPublishEvents(
for _, event := range events {
topic, err := c.topic.Select(event)

var ts time.Time

// message timestamps have been added to kafka with version 0.10.0.0
if c.config.Version.IsAtLeast(sarama.V0_10_0_0) {
if tsRaw, ok := event["@timestamp"]; ok {
if tmp, ok := tsRaw.(common.Time); ok {
ts = time.Time(tmp)
} else if tmp, ok := tsRaw.(time.Time); ok {
ts = tmp
}
}
}

jsonEvent, err := json.Marshal(event)
if err != nil {
ref.done()
continue
}

msg := &sarama.ProducerMessage{
Metadata: ref,
Topic: topic,
Value: sarama.ByteEncoder(jsonEvent),
Metadata: ref,
Topic: topic,
Value: sarama.ByteEncoder(jsonEvent),
Timestamp: ts,
}

ch <- msg
40 changes: 36 additions & 4 deletions libbeat/outputs/kafka/config.go
@@ -14,30 +14,54 @@ type kafkaConfig struct {
TLS *outputs.TLSConfig `config:"tls"`
Timeout time.Duration `config:"timeout" validate:"min=1"`
Worker int `config:"worker" validate:"min=1"`
Metadata metaConfig `config:"metadata"`
KeepAlive time.Duration `config:"keep_alive" validate:"min=0"`
MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"`
RequiredACKs *int `config:"required_acks" validate:"min=-1"`
BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
Compression string `config:"compression"`
Version string `config:"version"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
ClientID string `config:"client_id"`
ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
Username string `config:"username"`
Password string `config:"password"`
}

type metaConfig struct {
Retry metaRetryConfig `config:"retry"`
RefreshFreq time.Duration `config:"refresh_frequency" validate:"min=0"`
}

type metaRetryConfig struct {
Max int `config:"max" validate:"min=0"`
Backoff time.Duration `config:"backoff" validate:"min=0"`
}

var (
defaultConfig = kafkaConfig{
Hosts: nil,
TLS: nil,
Timeout: 30 * time.Second,
Worker: 1,
Hosts: nil,
TLS: nil,
Timeout: 30 * time.Second,
Worker: 1,
Metadata: metaConfig{
Member: What will happen if these settings are used with an older version of Kafka?

Author: The sarama lib supports Kafka 0.8 - 0.10. Settings should be valid for all Kafka versions; the lib might internally ignore settings if they do not apply to the version being used. AFAIK metadata update and publisher settings should have the same effect across Kafka versions.

Retry: metaRetryConfig{
Max: 3,
Backoff: 250 * time.Millisecond,
},
RefreshFreq: 10 * time.Minute,
},
KeepAlive: 0,
MaxMessageBytes: nil, // use library default
RequiredACKs: nil, // use library default
BrokerTimeout: 10 * time.Second,
Compression: "gzip",
Version: "",
MaxRetries: 3,
ClientID: "beats",
ChanBufferSize: 256,
Username: "",
Password: "",
}
)

@@ -50,5 +74,13 @@ func (c *kafkaConfig) Validate() error {
return fmt.Errorf("compression mode '%v' unknown", c.Compression)
}

if _, ok := kafkaVersions[c.Version]; !ok {
return fmt.Errorf("unknown/unsupported kafka version '%v'", c.Version)
}

if c.Username != "" && c.Password == "" {
return fmt.Errorf("password must be set when username is configured")
}

return nil
}
39 changes: 37 additions & 2 deletions libbeat/outputs/kafka/kafka.go
@@ -53,6 +53,25 @@ var (
"gzip": sarama.CompressionGZIP,
"snappy": sarama.CompressionSnappy,
}

kafkaVersions = map[string]sarama.KafkaVersion{
Member: At first I thought something was wrong below, as some "entries" seemed to be missing, because the sorting on the right is not based on the version. I would probably sort based on the values on the right side, but that is more my personal preference. Just wanted to note it, as I was confused at first.

Author: Changed the ordering.

"": sarama.V0_8_2_0,

"0.8.2.0": sarama.V0_8_2_0,
"0.8.2.1": sarama.V0_8_2_1,
"0.8.2.2": sarama.V0_8_2_2,
"0.8.2": sarama.V0_8_2_2,
"0.8": sarama.V0_8_2_2,

"0.9.0.0": sarama.V0_9_0_0,
"0.9.0.1": sarama.V0_9_0_1,
"0.9.0": sarama.V0_9_0_1,
"0.9": sarama.V0_9_0_1,

"0.10.0.0": sarama.V0_10_0_0,
"0.10.0": sarama.V0_10_0_0,
"0.10": sarama.V0_10_0_0,
}
)

// New instantiates a new kafka output instance.
@@ -211,8 +230,16 @@ func newKafkaConfig(config *kafkaConfig) (*sarama.Config, error) {
k.Net.TLS.Enable = tls != nil
k.Net.TLS.Config = tls

// TODO: configure metadata level properties
// use lib defaults
if config.Username != "" {
k.Net.SASL.Enable = true
k.Net.SASL.User = config.Username
k.Net.SASL.Password = config.Password
}

// configure metadata update properties
k.Metadata.Retry.Max = config.Metadata.Retry.Max
k.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
k.Metadata.RefreshFrequency = config.Metadata.RefreshFreq

// configure producer API properties
if config.MaxMessageBytes != nil {
@@ -237,6 +264,7 @@ func newKafkaConfig(config *kafkaConfig) (*sarama.Config, error) {
retryMax = 1000
}
k.Producer.Retry.Max = retryMax
// TODO: k.Producer.Retry.Backoff = ?
Member: Should this stay in?

Author: Yes. It is a sarama-internal config with a default of 100ms that somewhat overlaps with libbeat functionality (not critical, though). I added the TODO so I can reconsider a parameter another time instead of hiding the fact that something might be "incomplete".


// configure per broker go channel buffering
k.ChannelBufferSize = config.ChanBufferSize
@@ -247,5 +275,12 @@ func newKafkaConfig(config *kafkaConfig) (*sarama.Config, error) {
logp.Err("Invalid kafka configuration: %v", err)
return nil, err
}

version, ok := kafkaVersions[config.Version]
if !ok {
return nil, fmt.Errorf("Unknown/unsupported kafka version: %v", config.Version)
}
k.Version = version

return k, nil
}
21 changes: 21 additions & 0 deletions metricbeat/metricbeat.full.yml
@@ -409,6 +409,27 @@ output.elasticsearch:
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Authentication details. Password is required if username is set.
#username: ''
#password: ''

# Kafka version metricbeat is assumed to run against. Defaults to the oldest
# supported stable version (currently version 0.8.2.0)
#version: 0.8.2

# Metadata update configuration. The metadata contains leader information
# that decides which broker to use when publishing.
#metadata:
# Max metadata request retry attempts when the cluster is in the middle of a
# leader election. Defaults to 3 retries.
#retry.max: 3

# Waiting time between retries during leader elections. Default is 250ms.
#retry.backoff: 250ms

# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

21 changes: 21 additions & 0 deletions packetbeat/packetbeat.full.yml
@@ -679,6 +679,27 @@ output.elasticsearch:
# using any event field. To set the topic from document type use `%{[type]}`.
#topic: beats

# Authentication details. Password is required if username is set.
#username: ''
#password: ''

# Kafka version packetbeat is assumed to run against. Defaults to the oldest
# supported stable version (currently version 0.8.2.0)
#version: 0.8.2

# Metadata update configuration. The metadata contains leader information
# that decides which broker to use when publishing.
#metadata:
# Max metadata request retry attempts when the cluster is in the middle of a
# leader election. Defaults to 3 retries.
#retry.max: 3

# Waiting time between retries during leader elections. Default is 250ms.
#retry.backoff: 250ms

# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

19 changes: 19 additions & 0 deletions vendor/github.com/Shopify/sarama/.github/ISSUE_TEMPLATE.md
