Skip to content

Commit

Permalink
rename kafka.consumer.version to kafka.consumer.protcol-version
Browse files Browse the repository at this point in the history
Signed-off-by: Chodor Marek <marek.chodor@grupawp.pl>
  • Loading branch information
Chodor Marek committed Jul 8, 2019
1 parent 8cd8860 commit 123f92e
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
Topic: options.Topic,
GroupID: options.GroupID,
ClientID: options.ClientID,
Version: options.Version,
ProtocolVersion: options.ProtocolVersion,
AuthenticationConfig: options.AuthenticationConfig,
}
saramaConsumer, err := consumerConfig.NewConsumer()
Expand Down
10 changes: 5 additions & 5 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ const (
SuffixGroupID = ".group-id"
// SuffixClientID is a suffix for the client-id flag
SuffixClientID = ".client-id"
// SuffixVersion Kafka protocol version - must be supported by kafka server
SuffixVersion = ".version"
// SuffixProtocolVersion Kafka protocol version - must be supported by kafka server
SuffixProtocolVersion = ".protocol-version"
// SuffixEncoding is a suffix for the encoding flag
SuffixEncoding = ".encoding"
// SuffixDeadlockInterval is a suffix for deadlock detecor flag
Expand Down Expand Up @@ -94,9 +94,9 @@ func AddFlags(flagSet *flag.FlagSet) {
DefaultClientID,
"The Consumer Client ID that ingester will use")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixVersion,
KafkaConsumerConfigPrefix+SuffixProtocolVersion,
"",
"Kafka version - must be supported by kafka server")
"Kafka protocol version - must be supported by kafka server")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixEncoding,
DefaultEncoding,
Expand All @@ -119,7 +119,7 @@ func (o *Options) InitFromViper(v *viper.Viper) {
o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic)
o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID)
o.ClientID = v.GetString(KafkaConsumerConfigPrefix + SuffixClientID)
o.Version = v.GetString(KafkaConsumerConfigPrefix + SuffixVersion)
o.ProtocolVersion = v.GetString(KafkaConsumerConfigPrefix + SuffixProtocolVersion)
o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding)

o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
Expand Down
4 changes: 2 additions & 2 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--kafka.consumer.group-id=group1",
"--kafka.consumer.client-id=client-id1",
"--kafka.consumer.encoding=json",
"--kafka.consumer.version=1.0.0",
"--kafka.consumer.protcol-version=1.0.0",
"--ingester.parallelism=5",
"--ingester.deadlockInterval=2m",
})
Expand All @@ -43,7 +43,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, "client-id1", o.ClientID)
assert.Equal(t, "1.0.0", o.Version)
assert.Equal(t, "1.0.0", o.ProtocolVersion)
assert.Equal(t, 5, o.Parallelism)
assert.Equal(t, 2*time.Minute, o.DeadlockInterval)
assert.Equal(t, kafka.EncodingJSON, o.Encoding)
Expand Down
14 changes: 7 additions & 7 deletions pkg/kafka/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ type Builder interface {

// Configuration describes the configuration properties needed to create a Kafka consumer
type Configuration struct {
Brokers []string
Topic string
GroupID string
ClientID string
Version string
Brokers []string
Topic string
GroupID string
ClientID string
ProtocolVersion string
Consumer
auth.AuthenticationConfig
}
Expand All @@ -51,8 +51,8 @@ func (c *Configuration) NewConsumer() (Consumer, error) {
saramaConfig := cluster.NewConfig()
saramaConfig.Group.Mode = cluster.ConsumerModePartitions
saramaConfig.ClientID = c.ClientID
if len(c.Version) > 0 {
ver, err := sarama.ParseKafkaVersion(c.Version)
if len(c.ProtocolVersion) > 0 {
ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 123f92e

Please sign in to comment.