Skip to content

Commit

Permalink
Merge pull request #189 from checkr/zz/add-kafka-version
Browse files Browse the repository at this point in the history
Add kafka version config
  • Loading branch information
zhouzhuojie authored Nov 13, 2018
2 parents 4b64006 + 8e84a94 commit f28b839
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var Config = struct {
RecorderType string `env:"FLAGR_RECORDER_TYPE" envDefault:"kafka"`

// Kafka related configurations for data records logging (Flagr Metrics)
RecorderKafkaVersion string `env:"FLAGR_RECORDER_KAFKA_VERSION" envDefault:"0.8.2.0"`
RecorderKafkaBrokers string `env:"FLAGR_RECORDER_KAFKA_BROKERS" envDefault:":9092"`
RecorderKafkaCertFile string `env:"FLAGR_RECORDER_KAFKA_CERTFILE" envDefault:""`
RecorderKafkaKeyFile string `env:"FLAGR_RECORDER_KAFKA_KEYFILE" envDefault:""`
Expand Down
9 changes: 9 additions & 0 deletions pkg/handler/data_recorder_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ var (
saramaNewAsyncProducer = sarama.NewAsyncProducer
)

func mustParseKafkaVersion() sarama.KafkaVersion {
v, err := sarama.ParseKafkaVersion(config.Config.RecorderKafkaVersion)
if err != nil {
panic(err)
}
return v
}

// NewKafkaRecorder creates a new Kafka recorder
var NewKafkaRecorder = func() DataRecorder {
cfg := sarama.NewConfig()
Expand All @@ -39,6 +47,7 @@ var NewKafkaRecorder = func() DataRecorder {
cfg.Producer.RequiredAcks = sarama.WaitForLocal
cfg.Producer.Retry.Max = config.Config.RecorderKafkaRetryMax
cfg.Producer.Flush.Frequency = config.Config.RecorderKafkaFlushFrequency
cfg.Version = mustParseKafkaVersion()

brokerList := strings.Split(config.Config.RecorderKafkaBrokers, ",")
producer, err := saramaNewAsyncProducer(brokerList, cfg)
Expand Down

0 comments on commit f28b839

Please sign in to comment.