Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 9dd2b88

Browse files
committed
cleaner way to define and manage the kafkaStats and its memory layout
1 parent d6f13a2 commit 9dd2b88

File tree

3 files changed

+43
-30
lines changed

3 files changed

+43
-30
lines changed

input/kafkamdm/kafkamdm.go

+14-26
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ var consumerMaxWaitTime time.Duration
6060
var consumerMaxProcessingTime time.Duration
6161
var netMaxOpenRequests int
6262
var offsetDuration time.Duration
63-
var kafkaStats map[int32]*KafkaStats
63+
var kafkaStats stats.Kafka
6464

6565
func ConfigSetup() {
6666
inKafkaMdm := flag.NewFlagSet("kafka-mdm-in", flag.ExitOnError)
@@ -158,25 +158,13 @@ func ConfigProcess(instance string) {
158158
cluster.Manager.SetPartitions(partitions)
159159
}
160160

161-
// initialize our offset metrics
162-
kafkaStats = make(map[int32]*KafkaStats)
163-
for _, part := range partitions {
164-
ks := KafkaStats{}
165-
// metric input.kafka-mdm.partition.%d.offset is the current offset for the partition (%d) that we have consumed.
166-
stats.NewGauge64Existing(fmt.Sprintf("input.kafka-mdm.partition.%d.offset", part), &ks.offset)
167-
// metric input.kafka-mdm.partition.%d.log_size is the current size of the kafka partition (%d), aka the newest available offset.
168-
stats.NewGauge64Existing(fmt.Sprintf("input.kafka-mdm.partition.%d.log_size", part), &ks.logSize)
169-
// metric input.kafka-mdm.partition.%d.lag is how many messages (metrics) there are in the kafka partition (%d) that we have not yet consumed.
170-
stats.NewGauge64Existing(fmt.Sprintf("input.kafka-mdm.partition.%d.lag", part), &ks.lag)
171-
kafkaStats[part] = &ks
172-
}
173-
}
161+
// the extra empty newlines are because metrics2docs doesn't recognize the comments properly otherwise
162+
// metric input.kafka-mdm.partition.%d.offset is the current offset for the partition (%d) that we have consumed.
163+
164+
// metric input.kafka-mdm.partition.%d.log_size is the current size of the kafka partition (%d), aka the newest available offset.
174165

175-
// KafkaStats is a set of per-partition stats to track the health of our consumer
176-
type KafkaStats struct {
177-
offset stats.Gauge64
178-
logSize stats.Gauge64
179-
lag stats.Gauge64
166+
// metric input.kafka-mdm.partition.%d.lag is how many messages (metrics) there are in the kafka partition (%d) that we have not yet consumed.
167+
kafkaStats = stats.NewKafka("input.kafka-mdm", partitions)
180168
}
181169

182170
func New() *KafkaMdm {
@@ -284,9 +272,9 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
284272
}
285273
}
286274

287-
kafkaStats.offset.Set(int(currentOffset))
288-
kafkaStats.logSize.Set(int(newest))
289-
kafkaStats.lag.Set(int(newest - currentOffset))
275+
kafkaStats.Offset.Set(int(currentOffset))
276+
kafkaStats.LogSize.Set(int(newest))
277+
kafkaStats.Lag.Set(int(newest - currentOffset))
290278
go k.trackStats(topic, partition)
291279

292280
log.Infof("kafkamdm: consuming from %s:%d from offset %d", topic, partition, currentOffset)
@@ -308,7 +296,7 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
308296
}
309297
log.Debugf("kafkamdm: received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key)
310298
k.handleMsg(msg.Value, partition)
311-
kafkaStats.offset.Set(int(msg.Offset))
299+
kafkaStats.Offset.Set(int(msg.Offset))
312300
case <-k.shutdown:
313301
pc.Close()
314302
log.Infof("kafkamdm: consumer for %s:%d ended.", topic, partition)
@@ -359,16 +347,16 @@ func (k *KafkaMdm) trackStats(topic string, partition int32) {
359347
ticker.Stop()
360348
return
361349
case ts := <-ticker.C:
362-
currentOffset := int64(kafkaStats.offset.Peek())
350+
currentOffset := int64(kafkaStats.Offset.Peek())
363351
k.lagMonitor.StoreOffset(partition, currentOffset, ts)
364352
newest, err := k.tryGetOffset(topic, partition, sarama.OffsetNewest, 1, 0)
365353
if err != nil {
366354
log.Errorf("kafkamdm: %s", err.Error())
367355
continue
368356
}
369-
kafkaStats.logSize.Set(int(newest))
357+
kafkaStats.LogSize.Set(int(newest))
370358
lag := int(newest - currentOffset)
371-
kafkaStats.lag.Set(lag)
359+
kafkaStats.Lag.Set(lag)
372360
k.lagMonitor.StoreLag(partition, lag)
373361
}
374362
}

stats/gauge64.go

-4
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ func NewGauge64(name string) *Gauge64 {
1212
return registry.getOrAdd(name, &u).(*Gauge64)
1313
}
1414

15-
func NewGauge64Existing(name string, g *Gauge64) *Gauge64 {
16-
return registry.getOrAdd(name, g).(*Gauge64)
17-
}
18-
1915
// Inc atomically increments the gauge by one.
func (g *Gauge64) Inc() {
	atomic.AddUint64((*uint64)(g), 1)
}

stats/kafka.go

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package stats
2+
3+
import "strconv"
4+
5+
// Kafka tracks the health of a consumer
6+
type Kafka map[int32]*KafkaPartition
7+
8+
func NewKafka(prefix string, partitions []int32) Kafka {
9+
k := make(map[int32]*KafkaPartition)
10+
for _, part := range partitions {
11+
k[part] = NewKafkaPartition(prefix + ".partition." + strconv.Itoa(int(part)))
12+
}
13+
return k
14+
}
15+
// KafkaPartition tracks the health of a partition consumer.
type KafkaPartition struct {
	// Offset is the current offset for the partition that we have consumed.
	Offset Gauge64
	// LogSize is the current size of the kafka partition, aka the newest available offset.
	LogSize Gauge64
	// Lag is how many messages (metrics) there are in the kafka partition that we have not yet consumed.
	Lag Gauge64
}
22+
23+
func NewKafkaPartition(prefix string) *KafkaPartition {
24+
k := KafkaPartition{}
25+
registry.getOrAdd(prefix+".offset", &k.Offset)
26+
registry.getOrAdd(prefix+".log_size", &k.LogSize)
27+
registry.getOrAdd(prefix+".lag", &k.Lag)
28+
return &k
29+
}

0 commit comments

Comments
 (0)