Skip to content

Commit

Permalink
Add kafka protocol version option to collector (#1658)
Browse files Browse the repository at this point in the history
* Configurable kafka protocol version for msg consuming by jaeger ingester

Signed-off-by: Chodor Marek <marek.chodor@grupawp.pl>

* rename kafka.consumer.version to kafka.consumer.protcol-version

Signed-off-by: Chodor Marek <marek.chodor@grupawp.pl>

* Configurable kafka protocol/server version in producer

Signed-off-by: Chodor Marek <marek.chodor@grupawp.pl>

* remove "(experimental)" annotation from kafka.producer settings

Signed-off-by: Chodor Marek <marek.chodor@grupawp.pl>
  • Loading branch information
marqc authored and yurishkuro committed Jul 12, 2019
1 parent c14b096 commit 035fb77
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 11 deletions.
10 changes: 9 additions & 1 deletion pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ type Builder interface {

// Configuration describes the configuration properties needed to create a Kafka producer
type Configuration struct {
Brokers []string
Brokers []string
ProtocolVersion string
auth.AuthenticationConfig
}

Expand All @@ -36,5 +37,12 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Return.Successes = true
c.AuthenticationConfig.SetConfiguration(saramaConfig)
if len(c.ProtocolVersion) > 0 {
ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion)
if err != nil {
return nil, err
}
saramaConfig.Version = ver
}
return sarama.NewAsyncProducer(c.Brokers, saramaConfig)
}
26 changes: 16 additions & 10 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ const (
// EncodingZipkinThrift is used for spans encoded as Zipkin Thrift.
EncodingZipkinThrift = "zipkin-thrift"

configPrefix = "kafka.producer"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixEncoding = ".encoding"
defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = EncodingProto
configPrefix = "kafka.producer"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixProtocolVersion = ".protocol-version"
suffixEncoding = ".encoding"
defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = EncodingProto
)

var (
Expand All @@ -59,15 +60,19 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
configPrefix+suffixBrokers,
defaultBroker,
"(experimental) The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
flagSet.String(
configPrefix+suffixTopic,
defaultTopic,
"(experimental) The name of the kafka topic")
"The name of the kafka topic")
flagSet.String(
configPrefix+suffixProtocolVersion,
"",
"Kafka protocol version - must be supported by kafka server")
flagSet.String(
configPrefix+suffixEncoding,
defaultEncoding,
fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
fmt.Sprintf(`Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
)
auth.AddFlags(configPrefix, flagSet)
}
Expand All @@ -78,6 +83,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
authenticationOptions.InitFromViper(configPrefix, v)
opt.config = producer.Configuration{
Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","),
ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion),
AuthenticationConfig: authenticationOptions,
}
opt.topic = v.GetString(configPrefix + suffixTopic)
Expand Down

0 comments on commit 035fb77

Please sign in to comment.