From c14b096b48c2750f63d048da27543acd9f990488 Mon Sep 17 00:00:00 2001 From: marqc Date: Mon, 8 Jul 2019 18:22:21 +0200 Subject: [PATCH] [ingester] Allow configurable kafka protocol version (#1640) * Configurable kafka protocol version for msg consuming by jaeger ingester Signed-off-by: Chodor Marek * rename kafka.consumer.version to kafka.consumer.protcol-version Signed-off-by: Chodor Marek --- cmd/ingester/app/builder/builder.go | 1 + cmd/ingester/app/flags.go | 7 +++++++ cmd/ingester/app/flags_test.go | 2 ++ pkg/kafka/consumer/config.go | 17 +++++++++++++---- 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index 279823fe943..4d2e86d100c 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -55,6 +55,7 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit Topic: options.Topic, GroupID: options.GroupID, ClientID: options.ClientID, + ProtocolVersion: options.ProtocolVersion, AuthenticationConfig: options.AuthenticationConfig, } saramaConsumer, err := consumerConfig.NewConsumer() diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 9eba6832865..3d45337f84f 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -41,6 +41,8 @@ const ( SuffixGroupID = ".group-id" // SuffixClientID is a suffix for the client-id flag SuffixClientID = ".client-id" + // SuffixProtocolVersion Kafka protocol version - must be supported by kafka server + SuffixProtocolVersion = ".protocol-version" // SuffixEncoding is a suffix for the encoding flag SuffixEncoding = ".encoding" // SuffixDeadlockInterval is a suffix for deadlock detecor flag @@ -91,6 +93,10 @@ func AddFlags(flagSet *flag.FlagSet) { KafkaConsumerConfigPrefix+SuffixClientID, DefaultClientID, "The Consumer Client ID that ingester will use") + flagSet.String( + KafkaConsumerConfigPrefix+SuffixProtocolVersion, + "", + "Kafka protocol version - must be supported by kafka server") flagSet.String( KafkaConsumerConfigPrefix+SuffixEncoding, DefaultEncoding, @@ -113,6 +119,7 @@ func (o *Options) InitFromViper(v *viper.Viper) { o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic) o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID) o.ClientID = v.GetString(KafkaConsumerConfigPrefix + SuffixClientID) + o.ProtocolVersion = v.GetString(KafkaConsumerConfigPrefix + SuffixProtocolVersion) o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding) o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism) diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index 00c178668e3..fab2d5ed2f1 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -33,6 +33,7 @@ func TestOptionsWithFlags(t *testing.T) { "--kafka.consumer.group-id=group1", "--kafka.consumer.client-id=client-id1", "--kafka.consumer.encoding=json", + "--kafka.consumer.protocol-version=1.0.0", "--ingester.parallelism=5", "--ingester.deadlockInterval=2m", }) @@ -42,6 +43,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers) assert.Equal(t, "group1", o.GroupID) assert.Equal(t, "client-id1", o.ClientID) + assert.Equal(t, "1.0.0", o.ProtocolVersion) assert.Equal(t, 5, o.Parallelism) assert.Equal(t, 2*time.Minute, o.DeadlockInterval) assert.Equal(t, kafka.EncodingJSON, o.Encoding) diff --git a/pkg/kafka/consumer/config.go b/pkg/kafka/consumer/config.go index 509800b0bdb..caa21818de0 100644 --- a/pkg/kafka/consumer/config.go +++ b/pkg/kafka/consumer/config.go @@ -17,6 +17,7 @@ package consumer import ( "io" + "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" "github.com/jaegertracing/jaeger/pkg/kafka/auth" @@ -36,10 +37,11 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka consumer type Configuration struct { - Brokers []string - Topic string - GroupID string - ClientID string + Brokers []string + Topic string + GroupID string + ClientID string + ProtocolVersion string Consumer auth.AuthenticationConfig } @@ -49,6 +51,13 @@ func (c *Configuration) NewConsumer() (Consumer, error) { saramaConfig := cluster.NewConfig() saramaConfig.Group.Mode = cluster.ConsumerModePartitions saramaConfig.ClientID = c.ClientID + if len(c.ProtocolVersion) > 0 { + ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) + if err != nil { + return nil, err + } + saramaConfig.Config.Version = ver + } c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config) return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig) }