-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for kafka 2.0.0 #8399
Conversation
libbeat/common/kafka/version.go
Outdated
return nil | ||
} | ||
|
||
func (v *Version) Get() (sarama.KafkaVersion, bool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method Version.Get should have comment or be unexported
libbeat/common/kafka/version.go
Outdated
return nil | ||
} | ||
|
||
func (v *Version) Unpack(s string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method Version.Unpack should have comment or be unexported
libbeat/common/kafka/version.go
Outdated
@@ -78,3 +88,26 @@ func parseKafkaVersion(s string) sarama.KafkaVersion { | |||
} | |||
return v | |||
} | |||
|
|||
func (v *Version) Validate() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method Version.Validate should have comment or be unexported
libbeat/common/kafka/version.go
Outdated
"github.com/Shopify/sarama" | ||
) | ||
|
||
type Version struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported type Version should have comment or be unexported
libbeat/common/kafka/version.go
Outdated
// Version is a kafka version | ||
type Version struct { | ||
String string | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could have the same features of the validation by using type Version string
without having to have a String Field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, changed
libbeat/outputs/kafka/config.go
Outdated
@@ -96,7 +97,7 @@ func defaultConfig() kafkaConfig { | |||
BrokerTimeout: 10 * time.Second, | |||
Compression: "gzip", | |||
CompressionLevel: 4, | |||
Version: "1.0.0", | |||
Version: kafka.Version{String: "1.0.0"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to have the default previous behavior, I was looking into that.
@@ -4,7 +4,7 @@ ENV KAFKA_HOME /kafka | |||
# The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it | |||
ENV KAFKA_ADVERTISED_HOST kafka | |||
ENV KAFKA_LOGS_DIR="/kafka-logs" | |||
ENV KAFKA_VERSION 1.1.1 | |||
ENV KAFKA_VERSION 2.0.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for updating this!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be still pending to test the output with multiple versions, but I leave this for a future change, maybe related with #7957
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor changes for the Version type.
Add support to kafka 2.0.0 in kafka output and metricbeat module. Merge kafka versioning helpers of output and metricbeat module. Set version in kafka module configuration of metricbeat system tests (cherry picked from commit 1bfd445)
…8467) Test metricbeat kafka module with kafka 2.0.0 Upgrade sarama client to 1.18.0. Add support to kafka 2.0.0 in kafka output and metricbeat module. Merge kafka versioning helpers of output and metricbeat module. Set version in kafka module configuration of metricbeat system tests (cherry picked from commits 7635731, 9749f3d and 1bfd445)
Related to #8330 and #7992, fixes #6823