Add support for kafka 2.0.0 #8399

Merged: 8 commits, Sep 24, 2018

Changes from 6 commits
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
@@ -104,6 +104,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Report configured queue type. {pull}8091[8091]
- Added the `add_process_metadata` processor to enrich events with process information. {pull}6789[6789]
- Report number of open file handles on Windows. {pull}8329[8329]
- Support for Kafka 2.0.0 in kafka output {pull}8399[8399]

*Auditbeat*

@@ -132,6 +133,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Allow TCP helper to support delimiters and graphite module to accept multiple metrics in a single payload. {pull}8278[8278]
- Added 'died' PID state to process_system metricset on system module {pull}8275[8275]
- Added `ccr` metricset to Elasticsearch module. {pull}8335[8335]
- Support for Kafka 2.0.0 {pull}8399[8399]

*Packetbeat*

@@ -17,7 +17,16 @@

package kafka

import "github.com/Shopify/sarama"
import (
"fmt"

"github.com/Shopify/sarama"
)

// Version is a kafka version
type Version struct {
	String string
}

exported type Version should have comment or be unexported

Contributor:
We could get the same validation behavior by using type Version string, without needing a String field.

Member Author:
Agree, changed
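
For reference, a rough sketch of what the suggested string-based type could look like, reusing the kafkaVersions table in this file (illustrative only, not necessarily the exact code of the later commits):

// Version is a kafka version (string-based variant).
type Version string

// Validate checks that the version is a known/supported kafka version.
func (v Version) Validate() error {
	if _, ok := kafkaVersions[string(v)]; !ok {
		return fmt.Errorf("unknown/unsupported kafka version '%v'", v)
	}
	return nil
}

// Unpack lets the config loader fill the value from a plain string setting.
func (v *Version) Unpack(s string) error {
	tmp := Version(s)
	if err := tmp.Validate(); err != nil {
		return err
	}
	*v = tmp
	return nil
}

// Get returns the matching sarama version, if known.
func (v Version) Get() (sarama.KafkaVersion, bool) {
	kv, ok := kafkaVersions[string(v)]
	return kv, ok
}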


// TODO: remove me.
// Compat version overwrite for missing versions in sarama
@@ -31,8 +40,6 @@ var (
v1_1_1 = parseKafkaVersion("1.1.1")

kafkaVersions = map[string]sarama.KafkaVersion{
"": sarama.V1_0_0_0,

"0.8.2.0": sarama.V0_8_2_0,
"0.8.2.1": sarama.V0_8_2_1,
"0.8.2.2": sarama.V0_8_2_2,
@@ -68,6 +75,10 @@ var (
"1.1.1": v1_1_1,
"1.1": v1_1_1,
"1": v1_1_1,

"2.0.0": sarama.V2_0_0_0,
"2.0": sarama.V2_0_0_0,
"2": sarama.V2_0_0_0,
}
)

@@ -78,3 +89,29 @@ func parseKafkaVersion(s string) sarama.KafkaVersion {
}
return v
}

// Validate that a kafka version is among the possible options
func (v *Version) Validate() error {
	if _, ok := kafkaVersions[v.String]; !ok {
		return fmt.Errorf("unknown/unsupported kafka version '%v'", v.String)
	}

	return nil
}

exported method Version.Validate should have comment or be unexported

// Unpack a kafka version
func (v *Version) Unpack(s string) error {
	tmp := Version{s}
	if err := tmp.Validate(); err != nil {
		return err
	}

	*v = tmp
	return nil
}

exported method Version.Unpack should have comment or be unexported

// Get a sarama kafka version
func (v *Version) Get() (sarama.KafkaVersion, bool) {
	kv, ok := kafkaVersions[v.String]
	return kv, ok
}

exported method Version.Get should have comment or be unexported
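
As an illustration, here is a minimal standalone sketch of how the new type might be exercised directly (a hypothetical snippet, not part of this diff):

package main

import (
	"fmt"

	"github.com/elastic/beats/libbeat/common/kafka"
)

func main() {
	var v kafka.Version
	// Unpack validates against the supported-versions table, so an
	// unsupported value is rejected here rather than at connect time.
	if err := v.Unpack("2.0.0"); err != nil {
		panic(err)
	}
	if saramaVersion, ok := v.Get(); ok {
		fmt.Println("configuring sarama for kafka", saramaVersion)
	}
}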
2 changes: 1 addition & 1 deletion libbeat/docs/outputconfig.asciidoc
@@ -674,7 +674,7 @@ NOTE: Events bigger than <<kafka-max_message_bytes,`max_message_bytes`>> will be

==== Compatibility

This output works with all Kafka in between 0.11 and 1.1.1. Older versions
This output works with all Kafka versions between 0.11 and 2.0.0. Older versions
might work as well, but are not supported.

==== Configuration options
11 changes: 6 additions & 5 deletions libbeat/outputs/kafka/config.go
@@ -27,6 +27,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/common/kafka"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
@@ -48,7 +49,7 @@ type kafkaConfig struct {
BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
Compression string `config:"compression"`
CompressionLevel int `config:"compression_level"`
Version string `config:"version"`
Version kafka.Version `config:"version"`
BulkMaxSize int `config:"bulk_max_size"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
ClientID string `config:"client_id"`
@@ -96,7 +97,7 @@ func defaultConfig() kafkaConfig {
BrokerTimeout: 10 * time.Second,
Compression: "gzip",
CompressionLevel: 4,
Version: "1.0.0",
Version: kafka.Version{String: "1.0.0"},
Contributor:
+1 to keeping the previous behavior as the default; I was looking into that.

MaxRetries: 3,
ClientID: "beats",
ChanBufferSize: 256,
@@ -114,8 +115,8 @@ func (c *kafkaConfig) Validate() error {
return fmt.Errorf("compression mode '%v' unknown", c.Compression)
}

if _, ok := kafkaVersions[c.Version]; !ok {
return fmt.Errorf("unknown/unsupported kafka version '%v'", c.Version)
if err := c.Version.Validate(); err != nil {
return err
}

if c.Username != "" && c.Password == "" {
@@ -200,7 +201,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
// configure client ID
k.ClientID = config.ClientID

version, ok := kafkaVersions[config.Version]
version, ok := config.Version.Get()
if !ok {
return nil, fmt.Errorf("Unknown/unsupported kafka version: %v", config.Version)
}
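
For context, a rough sketch of how a user-supplied version setting flows into the new field (a hypothetical test, assuming libbeat's common.NewConfigFrom/Unpack helpers and go-ucfg's usual string-unpacker hook):

package kafka_test

import (
	"testing"

	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/common/kafka"
)

func TestVersionSettingUnpacks(t *testing.T) {
	raw, err := common.NewConfigFrom(map[string]interface{}{
		"version": "2.0.0",
	})
	if err != nil {
		t.Fatal(err)
	}

	settings := struct {
		Version kafka.Version `config:"version"`
	}{}
	// Unpack calls kafka.Version.Unpack, which in turn calls Validate,
	// so an unsupported value fails at config load time instead of at
	// connect time.
	if err := raw.Unpack(&settings); err != nil {
		t.Fatal(err)
	}
}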
7 changes: 0 additions & 7 deletions libbeat/outputs/kafka/kafka.go
@@ -33,13 +33,6 @@ import (
"github.com/elastic/beats/libbeat/outputs/outil"
)

type kafka struct {
config kafkaConfig
topic outil.Selector

partitioner sarama.PartitionerConstructor
}

const (
defaultWaitRetry = 1 * time.Second

10 changes: 8 additions & 2 deletions metricbeat/module/kafka/broker.go
@@ -29,8 +29,14 @@ import (
"github.com/Shopify/sarama"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/kafka"
)

// Version returns a kafka version from its string representation
func Version(version string) kafka.Version {
return kafka.Version{String: version}
}
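
A note on this helper: the metricset packages import this module package under the name kafka, so they can build the shared libbeat type with a plain kafka.Version("x.y.z") call. A hypothetical call site might look like this (illustrative sketch only, not part of this diff):

package partition // hypothetical example package

import "github.com/elastic/beats/metricbeat/module/kafka"

// brokerSettings shows the shape of a call site after this change.
func brokerSettings() kafka.BrokerSettings {
	return kafka.BrokerSettings{
		// same version string the partition metricset uses below
		Version: kafka.Version("0.8.2.0"),
	}
}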

// Broker provides functionality for communicating with a single kafka broker
type Broker struct {
broker *sarama.Broker
@@ -50,7 +56,7 @@ type BrokerSettings struct {
Backoff time.Duration
TLS *tls.Config
Username, Password string
Version Version
Version kafka.Version
}

type GroupDescription struct {
@@ -83,7 +89,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
cfg.Net.SASL.User = user
cfg.Net.SASL.Password = settings.Password
}
cfg.Version = settings.Version.get()
cfg.Version, _ = settings.Version.Get()

return &Broker{
broker: sarama.NewBroker(host),
2 changes: 1 addition & 1 deletion metricbeat/module/kafka/consumergroup/consumergroup.go
@@ -86,7 +86,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
Password: config.Password,

// consumer groups API requires at least 0.9.0.0
Version: kafka.Version{String: "0.9.0.0"},
Version: kafka.Version("0.9.0.0"),
}

return &MetricSet{
1 change: 1 addition & 0 deletions metricbeat/module/kafka/partition/partition.go
@@ -83,6 +83,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
TLS: tls,
Username: config.Username,
Password: config.Password,
Version: kafka.Version("0.8.2.0"),
}

return &MetricSet{
79 changes: 0 additions & 79 deletions metricbeat/module/kafka/version.go

This file was deleted.

6 changes: 5 additions & 1 deletion metricbeat/tests/system/test_kafka.py
@@ -7,6 +7,7 @@

class KafkaTest(metricbeat.BaseTest):
COMPOSE_SERVICES = ['kafka']
VERSION = "2.0.0"

@unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test")
def test_partition(self):
@@ -20,7 +21,8 @@ def test_partition(self):
"name": "kafka",
"metricsets": ["partition"],
"hosts": self.get_hosts(),
"period": "1s"
"period": "1s",
"version": self.VERSION,
}])
proc = self.start_beat()
self.wait_until(lambda: self.output_lines() > 0, max_timeout=20)
@@ -47,7 +49,9 @@ def get_hosts(self):

class Kafka_1_1_0_Test(KafkaTest):
COMPOSE_SERVICES = ['kafka_1_1_0']
VERSION = "1.1.0"


class Kafka_0_10_2_Test(KafkaTest):
COMPOSE_SERVICES = ['kafka_0_10_2']
VERSION = "0.10.2"
2 changes: 1 addition & 1 deletion testing/environments/docker/kafka/Dockerfile
@@ -4,7 +4,7 @@ ENV KAFKA_HOME /kafka
# The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it
ENV KAFKA_ADVERTISED_HOST kafka
ENV KAFKA_LOGS_DIR="/kafka-logs"
ENV KAFKA_VERSION 1.1.1
ENV KAFKA_VERSION 2.0.0
Contributor:
Thanks for updating this!

Member Author:
Testing the output against multiple versions is still pending, but I'll leave that for a future change, maybe related to #7957.

ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true"
ENV TERM=linux
