Skip to content

Commit

Permalink
Add support for kafka 2.0.0 (elastic#8399)
Browse files Browse the repository at this point in the history
Add support to kafka 2.0.0 in kafka output and metricbeat module.
Merge kafka versioning helpers of output and metricbeat module.
Set version in kafka module configuration of metricbeat system tests

(cherry picked from commit 1bfd445)
  • Loading branch information
jsoriano committed Sep 27, 2018
1 parent fe318a5 commit a400d41
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 100 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Make kubernetes autodiscover ignore events with empty container IDs {pull}7971[7971]
- Implement CheckConfig in RunnerFactory to make autodiscover check configs {pull}7961[7961]
- Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770]
- Support for Kafka 2.0.0 in kafka output {pull}8399[8399]

*Auditbeat*

Expand All @@ -113,6 +114,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Added 'died' PID state to process_system metricset on system module {pull}8275[8275]
- Add `metrics` metricset to MongoDB module. {pull}7611[7611]
- Added `ccr` metricset to Elasticsearch module. {pull}8335[8335]
- Support for Kafka 2.0.0 {pull}8399[8399]

*Packetbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@

package kafka

import "github.com/Shopify/sarama"
import (
"fmt"

"github.com/Shopify/sarama"
)

// Version is a kafka version
type Version string

// TODO: remove me.
// Compat version overwrite for missing versions in sarama
Expand All @@ -31,8 +38,6 @@ var (
v1_1_1 = parseKafkaVersion("1.1.1")

kafkaVersions = map[string]sarama.KafkaVersion{
"": sarama.V1_0_0_0,

"0.8.2.0": sarama.V0_8_2_0,
"0.8.2.1": sarama.V0_8_2_1,
"0.8.2.2": sarama.V0_8_2_2,
Expand Down Expand Up @@ -68,6 +73,10 @@ var (
"1.1.1": v1_1_1,
"1.1": v1_1_1,
"1": v1_1_1,

"2.0.0": sarama.V2_0_0_0,
"2.0": sarama.V2_0_0_0,
"2": sarama.V2_0_0_0,
}
)

Expand All @@ -78,3 +87,29 @@ func parseKafkaVersion(s string) sarama.KafkaVersion {
}
return v
}

// Validate that a kafka version is among the possible options
func (v *Version) Validate() error {
if _, ok := kafkaVersions[string(*v)]; !ok {
return fmt.Errorf("unknown/unsupported kafka vesion '%v'", *v)
}

return nil
}

// Unpack a kafka version
func (v *Version) Unpack(s string) error {
tmp := Version(s)
if err := tmp.Validate(); err != nil {
return err
}

*v = tmp
return nil
}

// Get a sarama kafka version
func (v Version) Get() (sarama.KafkaVersion, bool) {
kv, ok := kafkaVersions[string(v)]
return kv, ok
}
2 changes: 1 addition & 1 deletion libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ NOTE: Events bigger than <<kafka-max_message_bytes,`max_message_bytes`>> will be

==== Compatibility

This output works with all Kafka in between 0.11 and 1.1.1. Older versions
This output works with all Kafka versions in between 0.11 and 2.0.0. Older versions
might work as well, but are not supported.

==== Configuration options
Expand Down
11 changes: 6 additions & 5 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/common/kafka"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
Expand All @@ -48,7 +49,7 @@ type kafkaConfig struct {
BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
Compression string `config:"compression"`
CompressionLevel int `config:"compression_level"`
Version string `config:"version"`
Version kafka.Version `config:"version"`
BulkMaxSize int `config:"bulk_max_size"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
ClientID string `config:"client_id"`
Expand Down Expand Up @@ -96,7 +97,7 @@ func defaultConfig() kafkaConfig {
BrokerTimeout: 10 * time.Second,
Compression: "gzip",
CompressionLevel: 4,
Version: "1.0.0",
Version: kafka.Version("1.0.0"),
MaxRetries: 3,
ClientID: "beats",
ChanBufferSize: 256,
Expand All @@ -114,8 +115,8 @@ func (c *kafkaConfig) Validate() error {
return fmt.Errorf("compression mode '%v' unknown", c.Compression)
}

if _, ok := kafkaVersions[c.Version]; !ok {
return fmt.Errorf("unknown/unsupported kafka version '%v'", c.Version)
if err := c.Version.Validate(); err != nil {
return err
}

if c.Username != "" && c.Password == "" {
Expand Down Expand Up @@ -200,7 +201,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
// configure client ID
k.ClientID = config.ClientID

version, ok := kafkaVersions[config.Version]
version, ok := config.Version.Get()
if !ok {
return nil, fmt.Errorf("Unknown/unsupported kafka version: %v", config.Version)
}
Expand Down
7 changes: 0 additions & 7 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ import (
"github.com/elastic/beats/libbeat/outputs/outil"
)

type kafka struct {
config kafkaConfig
topic outil.Selector

partitioner sarama.PartitionerConstructor
}

const (
defaultWaitRetry = 1 * time.Second

Expand Down
10 changes: 8 additions & 2 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ import (
"github.com/Shopify/sarama"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/kafka"
)

// Version returns a kafka version from its string representation
func Version(version string) kafka.Version {
return kafka.Version(version)
}

// Broker provides functionality for communicating with a single kafka broker
type Broker struct {
broker *sarama.Broker
Expand All @@ -50,7 +56,7 @@ type BrokerSettings struct {
Backoff time.Duration
TLS *tls.Config
Username, Password string
Version Version
Version kafka.Version
}

type GroupDescription struct {
Expand Down Expand Up @@ -83,7 +89,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
cfg.Net.SASL.User = user
cfg.Net.SASL.Password = settings.Password
}
cfg.Version = settings.Version.get()
cfg.Version, _ = settings.Version.Get()

return &Broker{
broker: sarama.NewBroker(host),
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kafka/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
Password: config.Password,

// consumer groups API requires at least 0.9.0.0
Version: kafka.Version{String: "0.9.0.0"},
Version: kafka.Version("0.9.0.0"),
}

return &MetricSet{
Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/kafka/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
TLS: tls,
Username: config.Username,
Password: config.Password,
Version: kafka.Version("0.8.2.0"),
}

return &MetricSet{
Expand Down
79 changes: 0 additions & 79 deletions metricbeat/module/kafka/version.go

This file was deleted.

6 changes: 5 additions & 1 deletion metricbeat/tests/system/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

class KafkaTest(metricbeat.BaseTest):
COMPOSE_SERVICES = ['kafka']
VERSION = "2.0.0"

@unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test")
def test_partition(self):
Expand All @@ -20,7 +21,8 @@ def test_partition(self):
"name": "kafka",
"metricsets": ["partition"],
"hosts": self.get_hosts(),
"period": "1s"
"period": "1s",
"version": self.VERSION,
}])
proc = self.start_beat()
self.wait_until(lambda: self.output_lines() > 0, max_timeout=20)
Expand All @@ -47,7 +49,9 @@ def get_hosts(self):

class Kafka_1_1_0_Test(KafkaTest):
COMPOSE_SERVICES = ['kafka_1_1_0']
VERSION = "1.1.0"


class Kafka_0_10_2_Test(KafkaTest):
COMPOSE_SERVICES = ['kafka_0_10_2']
VERSION = "0.10.2"
2 changes: 1 addition & 1 deletion testing/environments/docker/kafka/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ENV KAFKA_HOME /kafka
# The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it
ENV KAFKA_ADVERTISED_HOST kafka
ENV KAFKA_LOGS_DIR="/kafka-logs"
ENV KAFKA_VERSION 1.1.1
ENV KAFKA_VERSION 2.0.0
ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true"
ENV TERM=linux

Expand Down

0 comments on commit a400d41

Please sign in to comment.