Skip to content

Commit

Permalink
[ingester] Allow configurable kafka protocol version (#1640)
Browse files Browse the repository at this point in the history
* Configurable kafka protocol version for msg consuming by jaeger ingester

Signed-off-by: Chodor Marek <marek.chodor@grupawp.pl>

* rename kafka.consumer.version to kafka.consumer.protcol-version

Signed-off-by: Chodor Marek <marek.chodor@grupawp.pl>
  • Loading branch information
marqc authored and yurishkuro committed Jul 8, 2019
1 parent 65c8e56 commit c14b096
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 4 deletions.
1 change: 1 addition & 0 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
Topic: options.Topic,
GroupID: options.GroupID,
ClientID: options.ClientID,
ProtocolVersion: options.ProtocolVersion,
AuthenticationConfig: options.AuthenticationConfig,
}
saramaConsumer, err := consumerConfig.NewConsumer()
Expand Down
7 changes: 7 additions & 0 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
SuffixGroupID = ".group-id"
// SuffixClientID is a suffix for the client-id flag
SuffixClientID = ".client-id"
// SuffixProtocolVersion Kafka protocol version - must be supported by kafka server
SuffixProtocolVersion = ".protocol-version"
// SuffixEncoding is a suffix for the encoding flag
SuffixEncoding = ".encoding"
// SuffixDeadlockInterval is a suffix for deadlock detecor flag
Expand Down Expand Up @@ -91,6 +93,10 @@ func AddFlags(flagSet *flag.FlagSet) {
KafkaConsumerConfigPrefix+SuffixClientID,
DefaultClientID,
"The Consumer Client ID that ingester will use")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixProtocolVersion,
"",
"Kafka protocol version - must be supported by kafka server")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixEncoding,
DefaultEncoding,
Expand All @@ -113,6 +119,7 @@ func (o *Options) InitFromViper(v *viper.Viper) {
o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic)
o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID)
o.ClientID = v.GetString(KafkaConsumerConfigPrefix + SuffixClientID)
o.ProtocolVersion = v.GetString(KafkaConsumerConfigPrefix + SuffixProtocolVersion)
o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding)

o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
Expand Down
2 changes: 2 additions & 0 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--kafka.consumer.group-id=group1",
"--kafka.consumer.client-id=client-id1",
"--kafka.consumer.encoding=json",
"--kafka.consumer.protocol-version=1.0.0",
"--ingester.parallelism=5",
"--ingester.deadlockInterval=2m",
})
Expand All @@ -42,6 +43,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, "client-id1", o.ClientID)
assert.Equal(t, "1.0.0", o.ProtocolVersion)
assert.Equal(t, 5, o.Parallelism)
assert.Equal(t, 2*time.Minute, o.DeadlockInterval)
assert.Equal(t, kafka.EncodingJSON, o.Encoding)
Expand Down
17 changes: 13 additions & 4 deletions pkg/kafka/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package consumer
import (
"io"

"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"

"github.com/jaegertracing/jaeger/pkg/kafka/auth"
Expand All @@ -36,10 +37,11 @@ type Builder interface {

// Configuration describes the configuration properties needed to create a Kafka consumer
type Configuration struct {
Brokers []string
Topic string
GroupID string
ClientID string
Brokers []string
Topic string
GroupID string
ClientID string
ProtocolVersion string
Consumer
auth.AuthenticationConfig
}
Expand All @@ -49,6 +51,13 @@ func (c *Configuration) NewConsumer() (Consumer, error) {
saramaConfig := cluster.NewConfig()
saramaConfig.Group.Mode = cluster.ConsumerModePartitions
saramaConfig.ClientID = c.ClientID
if len(c.ProtocolVersion) > 0 {
ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion)
if err != nil {
return nil, err
}
saramaConfig.Config.Version = ver
}
c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config)
return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig)
}

0 comments on commit c14b096

Please sign in to comment.