Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #8399 to 6.x: Add support for kafka 2.0.0 #8467

Merged
merged 4 commits into from
Oct 1, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Make kubernetes autodiscover ignore events with empty container IDs {pull}7971[7971]
- Implement CheckConfig in RunnerFactory to make autodiscover check configs {pull}7961[7961]
- Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770]
- Support for Kafka 2.0.0 in kafka output {pull}8399[8399]

*Auditbeat*

Expand All @@ -113,6 +114,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Added 'died' PID state to process_system metricset on system module {pull}8275[8275]
- Add `metrics` metricset to MongoDB module. {pull}7611[7611]
- Added `ccr` metricset to Elasticsearch module. {pull}8335[8335]
- Support for Kafka 2.0.0 {pull}8399[8399]

*Packetbeat*

Expand Down
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2175,8 +2175,8 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
--------------------------------------------------------------------
Dependency: github.com/Shopify/sarama
Version: v1.17.0/enh/offset-replica-id
Revision: d1575e4abe04acbbe8ac766320585cdf271dd189
Version: =v1.18.0/enh/offset-replica-id
Revision: 0143592836b090a1b481def4d902cfb3c5c05ae5
License type (autodetected): MIT
./vendor/github.com/Shopify/sarama/LICENSE:
--------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@

package kafka

import "github.com/Shopify/sarama"
import (
"fmt"

"github.com/Shopify/sarama"
)

// Version is a kafka version
type Version string

// TODO: remove me.
// Compat version overwrite for missing versions in sarama
Expand All @@ -31,8 +38,6 @@ var (
v1_1_1 = parseKafkaVersion("1.1.1")

kafkaVersions = map[string]sarama.KafkaVersion{
"": sarama.V1_0_0_0,

"0.8.2.0": sarama.V0_8_2_0,
"0.8.2.1": sarama.V0_8_2_1,
"0.8.2.2": sarama.V0_8_2_2,
Expand Down Expand Up @@ -68,6 +73,10 @@ var (
"1.1.1": v1_1_1,
"1.1": v1_1_1,
"1": v1_1_1,

"2.0.0": sarama.V2_0_0_0,
"2.0": sarama.V2_0_0_0,
"2": sarama.V2_0_0_0,
}
)

Expand All @@ -78,3 +87,29 @@ func parseKafkaVersion(s string) sarama.KafkaVersion {
}
return v
}

// Validate that a kafka version is among the possible options
func (v *Version) Validate() error {
if _, ok := kafkaVersions[string(*v)]; !ok {
return fmt.Errorf("unknown/unsupported kafka vesion '%v'", *v)
}

return nil
}

// Unpack a kafka version
func (v *Version) Unpack(s string) error {
tmp := Version(s)
if err := tmp.Validate(); err != nil {
return err
}

*v = tmp
return nil
}

// Get a sarama kafka version
func (v Version) Get() (sarama.KafkaVersion, bool) {
kv, ok := kafkaVersions[string(v)]
return kv, ok
}
2 changes: 1 addition & 1 deletion libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ NOTE: Events bigger than <<kafka-max_message_bytes,`max_message_bytes`>> will be

==== Compatibility

This output works with all Kafka in between 0.11 and 1.1.1. Older versions
This output works with all Kafka versions in between 0.11 and 2.0.0. Older versions
might work as well, but are not supported.

==== Configuration options
Expand Down
11 changes: 6 additions & 5 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/common/kafka"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
Expand All @@ -48,7 +49,7 @@ type kafkaConfig struct {
BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
Compression string `config:"compression"`
CompressionLevel int `config:"compression_level"`
Version string `config:"version"`
Version kafka.Version `config:"version"`
BulkMaxSize int `config:"bulk_max_size"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
ClientID string `config:"client_id"`
Expand Down Expand Up @@ -96,7 +97,7 @@ func defaultConfig() kafkaConfig {
BrokerTimeout: 10 * time.Second,
Compression: "gzip",
CompressionLevel: 4,
Version: "1.0.0",
Version: kafka.Version("1.0.0"),
MaxRetries: 3,
ClientID: "beats",
ChanBufferSize: 256,
Expand All @@ -114,8 +115,8 @@ func (c *kafkaConfig) Validate() error {
return fmt.Errorf("compression mode '%v' unknown", c.Compression)
}

if _, ok := kafkaVersions[c.Version]; !ok {
return fmt.Errorf("unknown/unsupported kafka version '%v'", c.Version)
if err := c.Version.Validate(); err != nil {
return err
}

if c.Username != "" && c.Password == "" {
Expand Down Expand Up @@ -200,7 +201,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
// configure client ID
k.ClientID = config.ClientID

version, ok := kafkaVersions[config.Version]
version, ok := config.Version.Get()
if !ok {
return nil, fmt.Errorf("Unknown/unsupported kafka version: %v", config.Version)
}
Expand Down
7 changes: 0 additions & 7 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ import (
"github.com/elastic/beats/libbeat/outputs/outil"
)

type kafka struct {
config kafkaConfig
topic outil.Selector

partitioner sarama.PartitionerConstructor
}

const (
defaultWaitRetry = 1 * time.Second

Expand Down
12 changes: 10 additions & 2 deletions metricbeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,20 @@ services:
kafka:
build:
context: ./module/kafka/_meta
dockerfile: Dockerfile.1.1.0
args:
KAFKA_VERSION: 2.0.0

kafka_1_1_0:
build:
context: ./module/kafka/_meta
args:
KAFKA_VERSION: 1.1.0

kafka_0_10_2:
build:
context: ./module/kafka/_meta
dockerfile: Dockerfile.0.10.2
args:
KAFKA_VERSION: 0.10.2.1

kibana:
build: ./module/kibana/_meta
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/docs/modules/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The default metricsets are `consumergroup` and `partition`.
[float]
=== Compability

This module is tested with Kafka 0.10.2 and 1.1.0.
This module is tested with Kafka 0.10.2.1, 1.1.0 and 2.0.0.


[float]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
FROM debian:stretch

ARG KAFKA_VERSION=2.0.0

ENV KAFKA_HOME /kafka
# The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it
ENV KAFKA_LOGS_DIR="/kafka-logs"
ENV KAFKA_VERSION 1.1.0
ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true"
ENV TERM=linux

Expand Down
25 changes: 0 additions & 25 deletions metricbeat/module/kafka/_meta/Dockerfile.0.10.2

This file was deleted.

2 changes: 1 addition & 1 deletion metricbeat/module/kafka/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ The default metricsets are `consumergroup` and `partition`.
[float]
=== Compability

This module is tested with Kafka 0.10.2 and 1.1.0.
This module is tested with Kafka 0.10.2.1, 1.1.0 and 2.0.0.
10 changes: 8 additions & 2 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ import (
"github.com/Shopify/sarama"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/kafka"
)

// Version returns a kafka version from its string representation
func Version(version string) kafka.Version {
return kafka.Version(version)
}

// Broker provides functionality for communicating with a single kafka broker
type Broker struct {
broker *sarama.Broker
Expand All @@ -50,7 +56,7 @@ type BrokerSettings struct {
Backoff time.Duration
TLS *tls.Config
Username, Password string
Version Version
Version kafka.Version
}

type GroupDescription struct {
Expand Down Expand Up @@ -83,7 +89,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
cfg.Net.SASL.User = user
cfg.Net.SASL.Password = settings.Password
}
cfg.Version = settings.Version.get()
cfg.Version, _ = settings.Version.Get()

return &Broker{
broker: sarama.NewBroker(host),
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kafka/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
Password: config.Password,

// consumer groups API requires at least 0.9.0.0
Version: kafka.Version{String: "0.9.0.0"},
Version: kafka.Version("0.9.0.0"),
}

return &MetricSet{
Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/kafka/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
TLS: tls,
Username: config.Username,
Password: config.Password,
Version: kafka.Version("0.8.2.0"),
}

return &MetricSet{
Expand Down
79 changes: 0 additions & 79 deletions metricbeat/module/kafka/version.go

This file was deleted.

10 changes: 9 additions & 1 deletion metricbeat/tests/system/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

class KafkaTest(metricbeat.BaseTest):
COMPOSE_SERVICES = ['kafka']
VERSION = "2.0.0"

@unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test")
def test_partition(self):
Expand All @@ -20,7 +21,8 @@ def test_partition(self):
"name": "kafka",
"metricsets": ["partition"],
"hosts": self.get_hosts(),
"period": "1s"
"period": "1s",
"version": self.VERSION,
}])
proc = self.start_beat()
self.wait_until(lambda: self.output_lines() > 0, max_timeout=20)
Expand All @@ -45,5 +47,11 @@ def get_hosts(self):
os.getenv('KAFKA_PORT', '9092')]


class Kafka_1_1_0_Test(KafkaTest):
COMPOSE_SERVICES = ['kafka_1_1_0']
VERSION = "1.1.0"


class Kafka_0_10_2_Test(KafkaTest):
COMPOSE_SERVICES = ['kafka_0_10_2']
VERSION = "0.10.2"
Loading