From a9905127203441679c75adb64553a7201293c3a4 Mon Sep 17 00:00:00 2001 From: Felix Geller Date: Sat, 27 Aug 2016 16:22:53 +1200 Subject: [PATCH] adds leading 0 to versions and missing config. --- common.go | 10 +++++----- consume.go | 14 +++++++++++++- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/common.go b/common.go index 3e90ae2..3a75077 100644 --- a/common.go +++ b/common.go @@ -13,15 +13,15 @@ var ( func kafkaVersion(s string) sarama.KafkaVersion { switch s { - case "v8.2.0": + case "v0.8.2.0": return sarama.V0_8_2_0 - case "v8.2.1": + case "v0.8.2.1": return sarama.V0_8_2_1 - case "v8.2.2": + case "v0.8.2.2": return sarama.V0_8_2_2 - case "v9.0.0": + case "v0.9.0.0": return sarama.V0_9_0_0 - case "v9.0.1": + case "v0.9.0.1": return sarama.V0_9_0_1 default: return sarama.V0_10_0_0 diff --git a/consume.go b/consume.go index 413b0c4..e087cc2 100644 --- a/consume.go +++ b/consume.go @@ -330,7 +330,19 @@ func consumeCommand() command { if config.consume.verbose { sarama.Logger = log.New(os.Stderr, "", log.LstdFlags) } - consumer, err := sarama.NewConsumer(config.consume.brokers, nil) + + conf := sarama.NewConfig() + conf.Version = config.consume.version + u, err := user.Current() + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to read current user err=%v", err) + } + conf.ClientID = "kt-consume-" + u.Username + if config.consume.verbose { + fmt.Fprintf(os.Stderr, "sarama client configuration %#v\n", conf) + } + + consumer, err := sarama.NewConsumer(config.consume.brokers, conf) if err != nil { fmt.Fprintf(os.Stderr, "Failed to create consumer err=%v\n", err) os.Exit(1)