Skip to content

Commit b473b0a

Browse files
committed
remove kafka offset tracker
There is no use case for it. See grafana#1016.
1 parent e407846 commit b473b0a

File tree

8 files changed

+28
-229
lines changed

8 files changed

+28
-229
lines changed

cmd/mt-kafka-mdm-sniff-out-of-order/main.go

-2
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,6 @@ func main() {
204204

205205
// config may have had it disabled
206206
inKafkaMdm.Enabled = true
207-
// important: we don't want to share the same offset tracker as the mdm input of MT itself
208-
inKafkaMdm.DataDir = "/tmp/" + instance
209207

210208
inKafkaMdm.ConfigProcess(instance)
211209

cmd/mt-kafka-mdm-sniff/main.go

-2
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,6 @@ func main() {
151151

152152
// config may have had it disabled
153153
inKafkaMdm.Enabled = true
154-
// important: we don't want to share the same offset tracker as the mdm input of MT itself
155-
inKafkaMdm.DataDir = "/tmp/" + instance
156154

157155
inKafkaMdm.ConfigProcess(instance)
158156

input/kafkamdm/kafkamdm.go

+2-31
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,14 @@ var topics []string
5555
var partitionStr string
5656
var partitions []int32
5757
var offsetStr string
58-
var DataDir string
5958
var config *sarama.Config
6059
var channelBufferSize int
6160
var consumerFetchMin int
6261
var consumerFetchDefault int
6362
var consumerMaxWaitTime time.Duration
6463
var consumerMaxProcessingTime time.Duration
6564
var netMaxOpenRequests int
66-
var offsetMgr *kafka.OffsetMgr
6765
var offsetDuration time.Duration
68-
var offsetCommitInterval time.Duration
6966
var partitionOffset map[int32]*stats.Gauge64
7067
var partitionLogSize map[int32]*stats.Gauge64
7168
var partitionLag map[int32]*stats.Gauge64
@@ -77,10 +74,8 @@ func ConfigSetup() {
7774
inKafkaMdm.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be be given multiple times as a comma-separated list)")
7875
inKafkaMdm.StringVar(&kafkaVersionStr, "kafka-version", "0.10.0.0", "Kafka version in semver format. All brokers must be this version or newer.")
7976
inKafkaMdm.StringVar(&topicStr, "topics", "mdm", "kafka topic (may be given multiple times as a comma-separated list)")
80-
inKafkaMdm.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
77+
inKafkaMdm.StringVar(&offsetStr, "offset", "newest", "Set the offset to start consuming from. Can be oldest, newest or a time duration")
8178
inKafkaMdm.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's")
82-
inKafkaMdm.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
83-
inKafkaMdm.StringVar(&DataDir, "data-dir", "", "Directory to store partition offsets index")
8479
inKafkaMdm.IntVar(&channelBufferSize, "channel-buffer-size", 1000, "The number of metrics to buffer in internal and external channels")
8580
inKafkaMdm.IntVar(&consumerFetchMin, "consumer-fetch-min", 1, "The minimum number of message bytes to fetch in a request")
8681
inKafkaMdm.IntVar(&consumerFetchDefault, "consumer-fetch-default", 32768, "The default number of message bytes to fetch in a request")
@@ -100,9 +95,6 @@ func ConfigProcess(instance string) {
10095
log.Fatalf("kafkamdm: invalid kafka-version. %s", err)
10196
}
10297

103-
if offsetCommitInterval == 0 {
104-
log.Fatal("kafkamdm: offset-commit-interval must be greater then 0")
105-
}
10698
if consumerMaxWaitTime == 0 {
10799
log.Fatal("kafkamdm: consumer-max-wait-time must be greater then 0")
108100
}
@@ -111,7 +103,6 @@ func ConfigProcess(instance string) {
111103
}
112104

113105
switch offsetStr {
114-
case "last":
115106
case "oldest":
116107
case "newest":
117108
default:
@@ -121,10 +112,6 @@ func ConfigProcess(instance string) {
121112
}
122113
}
123114

124-
offsetMgr, err = kafka.NewOffsetMgr(DataDir)
125-
if err != nil {
126-
log.Fatalf("kafkamdm: couldnt create offsetMgr. %s", err)
127-
}
128115
brokers = strings.Split(brokerStr, ",")
129116
topics = strings.Split(topicStr, ",")
130117

@@ -222,12 +209,6 @@ func (k *KafkaMdm) Start(handler input.Handler, cancel context.CancelFunc) error
222209
offset = sarama.OffsetOldest
223210
case "newest":
224211
offset = sarama.OffsetNewest
225-
case "last":
226-
offset, err = offsetMgr.Last(topic, partition)
227-
if err != nil {
228-
log.Errorf("kafkamdm: Failed to get %q duration offset for %s:%d. %q", offsetStr, topic, partition, err)
229-
return err
230-
}
231212
default:
232213
offset, err = k.client.GetOffset(topic, partition, time.Now().Add(-1*offsetDuration).UnixNano()/int64(time.Millisecond))
233214
if err != nil {
@@ -315,26 +296,20 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
315296
return
316297
}
317298
messages := pc.Messages()
318-
ticker := time.NewTicker(offsetCommitInterval)
299+
ticker := time.NewTicker(5 * time.Second)
319300
for {
320301
select {
321302
case msg, ok := <-messages:
322303
// https://github.com/Shopify/sarama/wiki/Frequently-Asked-Questions#why-am-i-getting-a-nil-message-from-the-sarama-consumer
323304
if !ok {
324305
log.Errorf("kafkamdm: kafka consumer for %s:%d has shutdown. stop consuming", topic, partition)
325-
if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil {
326-
log.Errorf("kafkamdm: failed to commit offset for %s:%d, %s", topic, partition, err)
327-
}
328306
k.cancel()
329307
return
330308
}
331309
log.Debugf("kafkamdm: received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key)
332310
k.handleMsg(msg.Value, partition)
333311
currentOffset = msg.Offset
334312
case ts := <-ticker.C:
335-
if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil {
336-
log.Errorf("kafkamdm: failed to commit offset for %s:%d, %s", topic, partition, err)
337-
}
338313
k.lagMonitor.StoreOffset(partition, currentOffset, ts)
339314
newest, err := k.tryGetOffset(topic, partition, sarama.OffsetNewest, 1, 0)
340315
if err != nil {
@@ -351,9 +326,6 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
351326
}
352327
case <-k.stopConsuming:
353328
pc.Close()
354-
if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil {
355-
log.Errorf("kafkamdm: failed to commit offset for %s:%d, %s", topic, partition, err)
356-
}
357329
log.Infof("kafkamdm: consumer for %s:%d ended.", topic, partition)
358330
return
359331
}
@@ -391,7 +363,6 @@ func (k *KafkaMdm) Stop() {
391363
close(k.stopConsuming)
392364
k.wg.Wait()
393365
k.client.Close()
394-
offsetMgr.Close()
395366
}
396367

397368
func (k *KafkaMdm) MaintainPriority() {

input/kafkamdm/lag_monitor.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ func newLagLogger(size int) *lagLogger {
2222
// Store saves the current value, potentially overwriting an old value
2323
// if needed.
2424
// Note: negative values are ignored. We rely on previous data - if any - in such case.
25-
// negative values can happen when:
26-
// - kafka had to recover, and a previous offset loaded from offsetMgr was bigger than current offset
27-
// - a rollover of the offset counter
25+
// negative values can happen upon a rollover of the offset counter
2826
func (l *lagLogger) Store(lag int) {
2927
if lag < 0 {
3028
return

kafka/offsetMgr.go

-131
This file was deleted.

mdata/notifierKafka/cfg.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@ var brokerStr string
2020
var brokers []string
2121
var topic string
2222
var offsetStr string
23-
var dataDir string
2423
var config *sarama.Config
2524
var offsetDuration time.Duration
26-
var offsetCommitInterval time.Duration
2725
var partitionStr string
2826
var partitions []int32
2927
var bootTimeOffsets map[int32]int64
@@ -46,9 +44,7 @@ func init() {
4644
fs.StringVar(&kafkaVersionStr, "kafka-version", "0.10.0.0", "Kafka version in semver format. All brokers must be this version or newer.")
4745
fs.StringVar(&topic, "topic", "metricpersist", "kafka topic")
4846
fs.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's. This should match the partitions used for kafka-mdm-in")
49-
fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
50-
fs.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index")
51-
fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
47+
fs.StringVar(&offsetStr, "offset", "newest", "Set the offset to start consuming from. Can be oldest, newest or a time duration")
5248
fs.StringVar(&backlogProcessTimeoutStr, "backlog-process-timeout", "60s", "Maximum time backlog processing can block during metrictank startup.")
5349
globalconf.Register("kafka-cluster", fs)
5450
}
@@ -64,7 +60,6 @@ func ConfigProcess(instance string) {
6460
}
6561

6662
switch offsetStr {
67-
case "last":
6863
case "oldest":
6964
case "newest":
7065
default:

0 commit comments

Comments
 (0)