Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable kafka protocol version for msg consuming by jaeger ingester #1640

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
Topic: options.Topic,
GroupID: options.GroupID,
ClientID: options.ClientID,
ProtocolVersion: options.ProtocolVersion,
AuthenticationConfig: options.AuthenticationConfig,
}
saramaConsumer, err := consumerConfig.NewConsumer()
Expand Down
7 changes: 7 additions & 0 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
SuffixGroupID = ".group-id"
// SuffixClientID is a suffix for the client-id flag
SuffixClientID = ".client-id"
// SuffixProtocolVersion Kafka protocol version - must be supported by kafka server
SuffixProtocolVersion = ".protocol-version"
// SuffixEncoding is a suffix for the encoding flag
SuffixEncoding = ".encoding"
// SuffixDeadlockInterval is a suffix for deadlock detecor flag
Expand Down Expand Up @@ -91,6 +93,10 @@ func AddFlags(flagSet *flag.FlagSet) {
KafkaConsumerConfigPrefix+SuffixClientID,
DefaultClientID,
"The Consumer Client ID that ingester will use")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixProtocolVersion,
"",
"Kafka protocol version - must be supported by kafka server")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixEncoding,
DefaultEncoding,
Expand All @@ -113,6 +119,7 @@ func (o *Options) InitFromViper(v *viper.Viper) {
o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic)
o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID)
o.ClientID = v.GetString(KafkaConsumerConfigPrefix + SuffixClientID)
o.ProtocolVersion = v.GetString(KafkaConsumerConfigPrefix + SuffixProtocolVersion)
o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding)

o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
Expand Down
2 changes: 2 additions & 0 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--kafka.consumer.group-id=group1",
"--kafka.consumer.client-id=client-id1",
"--kafka.consumer.encoding=json",
"--kafka.consumer.protocol-version=1.0.0",
"--ingester.parallelism=5",
"--ingester.deadlockInterval=2m",
})
Expand All @@ -42,6 +43,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, "client-id1", o.ClientID)
assert.Equal(t, "1.0.0", o.ProtocolVersion)
assert.Equal(t, 5, o.Parallelism)
assert.Equal(t, 2*time.Minute, o.DeadlockInterval)
assert.Equal(t, kafka.EncodingJSON, o.Encoding)
Expand Down
17 changes: 13 additions & 4 deletions pkg/kafka/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package consumer
import (
"io"

"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"

"github.com/jaegertracing/jaeger/pkg/kafka/auth"
Expand All @@ -36,10 +37,11 @@ type Builder interface {

// Configuration describes the configuration properties needed to create a Kafka consumer
type Configuration struct {
Brokers []string
Topic string
GroupID string
ClientID string
Brokers []string
Topic string
GroupID string
ClientID string
ProtocolVersion string
Consumer
auth.AuthenticationConfig
}
Expand All @@ -49,6 +51,13 @@ func (c *Configuration) NewConsumer() (Consumer, error) {
saramaConfig := cluster.NewConfig()
saramaConfig.Group.Mode = cluster.ConsumerModePartitions
saramaConfig.ClientID = c.ClientID
if len(c.ProtocolVersion) > 0 {
ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion)
if err != nil {
return nil, err
}
saramaConfig.Config.Version = ver
}
c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config)
return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig)
}