diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index c866c4d957f..4e254e12b1a 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -27,7 +27,8 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka producer type Configuration struct { - Brokers []string + Brokers []string + ProtocolVersion string auth.AuthenticationConfig } @@ -36,5 +37,12 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig := sarama.NewConfig() saramaConfig.Producer.Return.Successes = true c.AuthenticationConfig.SetConfiguration(saramaConfig) + if len(c.ProtocolVersion) > 0 { + ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) + if err != nil { + return nil, err + } + saramaConfig.Version = ver + } return sarama.NewAsyncProducer(c.Brokers, saramaConfig) } diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 136449d6e14..8b8ff0760fa 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -33,13 +33,14 @@ const ( // EncodingZipkinThrift is used for spans encoded as Zipkin Thrift. EncodingZipkinThrift = "zipkin-thrift" - configPrefix = "kafka.producer" - suffixBrokers = ".brokers" - suffixTopic = ".topic" - suffixEncoding = ".encoding" - defaultBroker = "127.0.0.1:9092" - defaultTopic = "jaeger-spans" - defaultEncoding = EncodingProto + configPrefix = "kafka.producer" + suffixBrokers = ".brokers" + suffixTopic = ".topic" + suffixProtocolVersion = ".protocol-version" + suffixEncoding = ".encoding" + defaultBroker = "127.0.0.1:9092" + defaultTopic = "jaeger-spans" + defaultEncoding = EncodingProto ) var ( @@ -59,15 +60,19 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { flagSet.String( configPrefix+suffixBrokers, defaultBroker, - "(experimental) The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'") + "The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'") flagSet.String( configPrefix+suffixTopic, defaultTopic, - "(experimental) The name of the kafka topic") + "The name of the kafka topic") + flagSet.String( + configPrefix+suffixProtocolVersion, + "", + "Kafka protocol version - must be supported by kafka server") flagSet.String( configPrefix+suffixEncoding, defaultEncoding, - fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto), + fmt.Sprintf(`Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto), ) auth.AddFlags(configPrefix, flagSet) } @@ -78,6 +83,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) { authenticationOptions.InitFromViper(configPrefix, v) opt.config = producer.Configuration{ Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","), + ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion), AuthenticationConfig: authenticationOptions, } opt.topic = v.GetString(configPrefix + suffixTopic)