
Commit 718029b

associate partitions with consumer threads
1 parent 136bfe7 commit 718029b

12 files changed, +105 -53 lines changed

docker/docker-chaos/metrictank.ini (+4)

@@ -188,6 +188,8 @@ metrics-buffer-max = 100ms
 channel-buffer-size = 1000
 # The minimum number of message bytes to fetch in a request
 fetch-min = 1
+# Initial maximum number of bytes per topic+partition to request when fetching messages from the broker
+fetch-message-max = 32768
 # The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
 max-wait = 1s
 # Time to wait between attempts to fetch metadata
@@ -302,6 +304,8 @@ channel-buffer-size = 1000
 backlog-process-timeout = 60s
 # The minimum number of message bytes to fetch in a request
 fetch-min = 1
+# Initial maximum number of bytes per topic+partition to request when fetching messages from the broker
+fetch-message-max = 32768
 # The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
 max-wait = 1s
 # Time to wait between attempts to fetch metadata

docker/docker-cluster/metrictank.ini (+4)

@@ -188,6 +188,8 @@ metrics-buffer-max = 100ms
 channel-buffer-size = 1000
 # The minimum number of message bytes to fetch in a request
 fetch-min = 1
+# Initial maximum number of bytes per topic+partition to request when fetching messages from the broker
+fetch-message-max = 32768
 # The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
 max-wait = 1s
 # Time to wait between attempts to fetch metadata
@@ -302,6 +304,8 @@ channel-buffer-size = 1000
 backlog-process-timeout = 60s
 # The minimum number of message bytes to fetch in a request
 fetch-min = 1
+# Initial maximum number of bytes per topic+partition to request when fetching messages from the broker
+fetch-message-max = 32768
 # The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
 max-wait = 1s
 # Time to wait between attempts to fetch metadata

docker/docker-dev-custom-cfg-kafka/metrictank.ini (+4)

@@ -188,6 +188,8 @@ metrics-buffer-max = 100ms
 channel-buffer-size = 1000
 # The minimum number of message bytes to fetch in a request
 fetch-min = 1
+# Initial maximum number of bytes per topic+partition to request when fetching messages from the broker
+fetch-message-max = 32768
 # The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
 max-wait = 1s
 # Time to wait between attempts to fetch metadata
@@ -302,6 +304,8 @@ channel-buffer-size = 1000
 backlog-process-timeout = 60s
 # The minimum number of message bytes to fetch in a request
 fetch-min = 1
+# Initial maximum number of bytes per topic+partition to request when fetching messages from the broker
+fetch-message-max = 32768
 # The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
 max-wait = 1s
 # Time to wait between attempts to fetch metadata

docs/config.md (+4)

@@ -236,6 +236,8 @@ metrics-buffer-max = 100ms
 channel-buffer-size = 1000
 # The minimum number of message bytes to fetch in a request
 fetch-min = 1
+# Initial maximum number of bytes per topic+partition to request when fetching messages from the broker
+fetch-message-max = 32768
 # The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
 max-wait = 1s
 # Time to wait between attempts to fetch metadata
@@ -359,6 +361,8 @@ channel-buffer-size = 1000
 backlog-process-timeout = 60s
 # The minimum number of message bytes to fetch in a request
 fetch-min = 1
+# Initial maximum number of bytes per topic+partition to request when fetching messages from the broker
+fetch-message-max = 32768
 # The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
 max-wait = 1s
 # Time to wait between attempts to fetch metadata

input/kafkamdm/kafkamdm.go (+1)

@@ -53,6 +53,7 @@ func ConfigSetup() {
 	inKafkaMdm.DurationVar(&clientConf.BufferMax, "metrics-buffer-max", time.Millisecond*100, "Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers")
 	inKafkaMdm.IntVar(&clientConf.ChannelBufferSize, "channel-buffer-size", 1000, "Maximum number of messages allowed on the producer queue")
 	inKafkaMdm.IntVar(&clientConf.FetchMin, "fetch-min", 1, "Minimum number of bytes the broker responds with. If fetch.wait.max.ms expires the accumulated data will be sent to the client regardless of this setting")
+	inKafkaMdm.IntVar(&clientConf.FetchMessageMax, "fetch-message-max", 32768, "Initial maximum number of bytes per topic+partition to request when fetching messages from the broker.")
 	inKafkaMdm.DurationVar(&clientConf.MaxWait, "max-wait", time.Millisecond*100, "Maximum time the broker may wait to fill the response with fetch.min.bytes")
 	inKafkaMdm.DurationVar(&clientConf.MetadataBackoffTime, "metadata-backoff-time", time.Millisecond*500, "Time to wait between attempts to fetch metadata")
 	inKafkaMdm.IntVar(&clientConf.MetadataRetries, "metadata-retries", 5, "Number of retries to fetch metadata in case of failure")

kafka/consumer.go (+72 -50)

@@ -16,17 +16,18 @@ import (
 var LogLevel int
 
 type Consumer struct {
-	conf             ClientConf
-	wg               sync.WaitGroup
-	consumer         *confluent.Consumer
-	Partitions       []int32
-	currentOffsets   map[int32]*int64
-	bootTimeOffsets  map[int32]int64
-	partitionOffset  map[int32]*stats.Gauge64
-	partitionLogSize map[int32]*stats.Gauge64
-	partitionLag     map[int32]*stats.Gauge64
-	LagMonitor       *LagMonitor
-	stopChan         chan struct{}
+	conf               ClientConf
+	wg                 sync.WaitGroup
+	consumer           *confluent.Consumer
+	partitionConsumers map[int32]*confluent.Consumer
+	Partitions         []int32
+	currentOffsets     map[int32]*int64
+	bootTimeOffsets    map[int32]int64
+	partitionOffset    map[int32]*stats.Gauge64
+	partitionLogSize   map[int32]*stats.Gauge64
+	partitionLag       map[int32]*stats.Gauge64
+	LagMonitor         *LagMonitor
+	stopChan           chan struct{}
 }
 
 type ClientConf struct {
@@ -41,6 +42,7 @@ type ClientConf struct {
 	BufferMax          time.Duration
 	ChannelBufferSize  int
 	FetchMin           int
+	FetchMessageMax    int
 	NetMaxOpenRequests int
 	MaxWait            time.Duration
 	SessionTimeout     time.Duration
@@ -63,47 +65,49 @@ func (c *ClientConf) OffsetIsValid() bool {
 	return true
 }
 
-func (c *ClientConf) GetConfluentConfig() *confluent.ConfigMap {
-	return GetConfig(c.Broker, c.ClientID, "snappy", c.BatchNumMessages, int(c.BufferMax/time.Millisecond), c.ChannelBufferSize, c.FetchMin, c.NetMaxOpenRequests, int(c.MaxWait/time.Millisecond), int(c.SessionTimeout/time.Millisecond))
-}
-
 func NewConfig() *ClientConf {
 	return &ClientConf{
 		GaugePrefix: "default.kafka.partition",
 		BatchNumMessages: 10000,
 		BufferMax: time.Millisecond * 100,
 		ChannelBufferSize: 1000000,
 		FetchMin: 1,
-		NetMaxOpenRequests: 100,
-		MaxWait: time.Millisecond * 100,
+		FetchMessageMax: 32768,
+		MaxWait: time.Second * 1,
 		SessionTimeout: time.Second * 30,
+		NetMaxOpenRequests: 1000,
 		MetadataRetries: 5,
 		MetadataBackoffTime: time.Millisecond * 500,
 		MetadataTimeout: time.Second * 10,
 		LagCollectionInterval: time.Second * 5,
 	}
 }
 
+func (c *ClientConf) GetConfluentConfig(clientId string) *confluent.ConfigMap {
+	conf := GetConfig(c.Broker, "snappy", c.BatchNumMessages, int(c.BufferMax/time.Millisecond), c.ChannelBufferSize, c.FetchMin, c.FetchMessageMax, c.NetMaxOpenRequests, int(c.MaxWait/time.Millisecond), int(c.SessionTimeout/time.Millisecond))
+	conf.SetKey("group.id", clientId)
+	conf.SetKey("retries", 10)
+	return conf
+}
+
 func NewConsumer(conf *ClientConf) (*Consumer, error) {
 	if len(conf.Topics) < 1 {
 		return nil, fmt.Errorf("kafka-consumer: Requiring at least 1 topic")
 	}
 
-	clientConf := conf.GetConfluentConfig()
-	clientConf.SetKey("retries", 10)
-	clientConf.SetKey("enable.partition.eof", false)
-	clientConf.SetKey("enable.auto.offset.store", false)
-	clientConf.SetKey("enable.auto.commit", false)
-	clientConf.SetKey("go.events.channel.enable", true)
-	clientConf.SetKey("go.application.rebalance.enable", true)
-	consumer, err := confluent.NewConsumer(clientConf)
-	if err != nil {
-		return nil, err
+	getConsumer := func(clientId string) (*confluent.Consumer, error) {
+		clientConf := conf.GetConfluentConfig(clientId)
+		clientConf.SetKey("enable.partition.eof", false)
+		clientConf.SetKey("enable.auto.offset.store", false)
+		clientConf.SetKey("enable.auto.commit", false)
+		clientConf.SetKey("go.events.channel.enable", true)
+		clientConf.SetKey("go.events.channel.size", 100000)
+		clientConf.SetKey("go.application.rebalance.enable", true)
+		return confluent.NewConsumer(clientConf)
 	}
 
 	c := Consumer{
 		conf: *conf,
-		consumer: consumer,
 		currentOffsets: make(map[int32]*int64),
 		bootTimeOffsets: make(map[int32]int64),
 		partitionOffset: make(map[int32]*stats.Gauge64),
@@ -112,6 +116,12 @@ func NewConsumer(conf *ClientConf) (*Consumer, error) {
 		stopChan: make(chan struct{}),
 	}
 
+	var err error
+	c.consumer, err = getConsumer(conf.ClientID + "-metadata")
+	if err != nil {
+		return nil, err
+	}
+
 	availParts, err := GetPartitions(c.consumer, c.conf.Topics, c.conf.MetadataRetries, int(c.conf.MetadataTimeout/time.Millisecond), c.conf.MetadataBackoffTime)
 	if err != nil {
 		return nil, err
@@ -135,8 +145,14 @@ func NewConsumer(conf *ClientConf) (*Consumer, error) {
 		}
 	}
 
+	c.partitionConsumers = make(map[int32]*confluent.Consumer, len(c.Partitions))
 	for _, part := range c.Partitions {
-		_, offset, err := c.consumer.QueryWatermarkOffsets(c.conf.Topics[0], part, int(c.conf.MetadataTimeout/time.Millisecond))
+		c.partitionConsumers[part], err = getConsumer(fmt.Sprintf("%s-partition-%d", conf.ClientID, part))
+		if err != nil {
+			return nil, err
+		}
+
+		_, offset, err := c.partitionConsumers[part].QueryWatermarkOffsets(c.conf.Topics[0], part, int(c.conf.MetadataTimeout/time.Millisecond))
 		if err != nil {
 			return nil, fmt.Errorf("Failed to get newest offset for topic %s part %d: %s", c.conf.Topics[0], part, err)
 		}
@@ -164,8 +180,8 @@ func (c *Consumer) Start(processBacklog *sync.WaitGroup) error {
 
 	go c.monitorLag(processBacklog)
 
-	for range c.Partitions {
-		go c.consume()
+	for _, partition := range c.Partitions {
+		go c.consume(partition)
 	}
 
 	return nil
@@ -202,13 +218,18 @@ func (c *Consumer) StartAndAwaitBacklog(backlogProcessTimeout time.Duration) err
 	return nil
 }
 
-func (c *Consumer) consume() {
+func (c *Consumer) consume(partition int32) {
 	c.wg.Add(1)
 	defer c.wg.Done()
 
-	var ok bool
 	var offsetPtr *int64
-	events := c.consumer.Events()
+	var ok bool
+	if offsetPtr, ok = c.currentOffsets[partition]; !ok || offsetPtr == nil {
+		log.Fatal(3, "kafka-consumer: Failed to get currentOffset for partition %d", partition)
+	}
+
+	log.Info("kafka-consumer: Consumer started for partition %d of topics %+v", partition, c.conf.Topics)
+	events := c.partitionConsumers[partition].Events()
 	for {
 		select {
 		case ev := <-events:
@@ -221,23 +242,18 @@ func (c *Consumer) consume()
 				log.Info("kafka-consumer: Revoked partitions: %+v", e)
 			case *confluent.Message:
 				tp := e.TopicPartition
-				if LogLevel < 2 {
-					log.Debug("kafka-consumer: Received message: Topic %s, Partition: %d, Offset: %d, Key: %x", tp.Topic, tp.Partition, tp.Offset, e.Key)
-				}
-
-				if offsetPtr, ok = c.currentOffsets[tp.Partition]; !ok || offsetPtr == nil {
-					log.Error(3, "kafka-consumer: Received message of unexpected partition: %s:%d", tp.Topic, tp.Partition)
+				if tp.Partition != partition {
+					fmt.Println("received unexpected partition")
 					continue
 				}
-
 				c.conf.MessageHandler(e.Value, tp.Partition)
 				atomic.StoreInt64(offsetPtr, int64(tp.Offset))
 			case *confluent.Error:
 				log.Error(3, "kafka-consumer: Kafka consumer error: %s", e.String())
 				return
 			}
 		case <-c.stopChan:
-			log.Info("kafka-consumer: Consumer ended.")
+			log.Info("kafka-consumer: Consumer ended for partition %d topics %+v", partition, c.conf.Topics)
 			return
 		}
 	}
@@ -302,12 +318,14 @@ func (c *Consumer) monitorLag(processBacklog *sync.WaitGroup) {
 func (c *Consumer) startConsumer() error {
 	var offset confluent.Offset
 	var err error
-	var topicPartitions confluent.TopicPartitions
 	c.currentOffsets = make(map[int32]*int64, len(c.Partitions))
 
-	for i, topic := range c.conf.Topics {
-		for _, partition := range c.Partitions {
-			var currentOffset int64
+	for _, partition := range c.Partitions {
+		var currentOffset int64
+		var topicPartitions confluent.TopicPartitions
+		c.currentOffsets[partition] = &currentOffset
+
+		for _, topic := range c.conf.Topics {
 			switch c.conf.StartAtOffset {
 			case "oldest":
 				currentOffset, err = c.tryGetOffset(topic, partition, int64(confluent.OffsetBeginning), 3, time.Second)
@@ -345,14 +363,15 @@ func (c *Consumer) startConsumer() error {
 				Partition: partition,
 				Offset: offset,
 			})
+		}
 
-			if i == 0 {
-				c.currentOffsets[partition] = &currentOffset
-			}
+		err := c.partitionConsumers[partition].Assign(topicPartitions)
+		if err != nil {
+			return err
 		}
 	}
 
-	return c.consumer.Assign(topicPartitions)
+	return nil
 }
 
 func (c *Consumer) tryGetOffset(topic string, partition int32, offsetI int64, attempts int, sleep time.Duration) (int64, error) {
@@ -403,4 +422,7 @@ func (c *Consumer) Stop() {
 	close(c.stopChan)
 	c.wg.Wait()
 	c.consumer.Close()
+	for i := range c.partitionConsumers {
+		c.partitionConsumers[i].Close()
+	}
 }
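Taken together, the kafka/consumer.go changes replace the single shared consumer (and its pool of identical consume goroutines) with one confluent consumer per partition, each drained by its own consume(partition) goroutine, while a separate "-metadata" consumer is kept for partition discovery and lag monitoring. The standalone sketch below illustrates that per-partition pattern in isolation; the broker address, topic name, partition list, and group ids are placeholder values for illustration, not metrictank's actual configuration, and error handling is reduced to panics.

package main

import (
	"fmt"
	"sync"

	confluent "github.com/confluentinc/confluent-kafka-go/kafka"
)

// newPartitionConsumer builds a consumer configured roughly like the
// getConsumer helper in the diff above: events channel enabled, manual
// offset handling. All values here are illustrative only.
func newPartitionConsumer(groupID string) (*confluent.Consumer, error) {
	return confluent.NewConsumer(&confluent.ConfigMap{
		"bootstrap.servers":               "localhost:9092", // placeholder broker
		"group.id":                        groupID,
		"enable.auto.commit":              false,
		"go.events.channel.enable":        true,
		"go.application.rebalance.enable": true,
	})
}

func main() {
	topic := "mdm" // placeholder topic name
	partitions := []int32{0, 1, 2}
	var wg sync.WaitGroup

	for _, p := range partitions {
		c, err := newPartitionConsumer(fmt.Sprintf("example-partition-%d", p))
		if err != nil {
			panic(err)
		}
		// Pin this consumer to exactly one partition, starting from the beginning,
		// the same way startConsumer() calls Assign on each partition consumer.
		if err := c.Assign([]confluent.TopicPartition{
			{Topic: &topic, Partition: p, Offset: confluent.OffsetBeginning},
		}); err != nil {
			panic(err)
		}

		wg.Add(1)
		go func(p int32, c *confluent.Consumer) {
			defer wg.Done()
			// Each partition is drained by its own goroutine, mirroring consume(partition).
			for ev := range c.Events() {
				if msg, ok := ev.(*confluent.Message); ok {
					fmt.Printf("partition %d offset %v: %d bytes\n",
						p, msg.TopicPartition.Offset, len(msg.Value))
				}
			}
		}(p, c)
	}
	wg.Wait() // runs until the process is killed
}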

kafka/utils.go (+2 -2)

@@ -4,12 +4,12 @@ import (
 	confluent "github.com/confluentinc/confluent-kafka-go/kafka"
 )
 
-func GetConfig(broker, client, compression string, batchNumMessages, bufferMaxMs, bufferSize, fetchMin, maxOpenRequests, maxWaitMs, sessionTimeoutMs int) *confluent.ConfigMap {
+func GetConfig(broker, compression string, batchNumMessages, bufferMaxMs, bufferSize, fetchMin, fetchMessageMax, maxOpenRequests, maxWaitMs, sessionTimeoutMs int) *confluent.ConfigMap {
 	return &confluent.ConfigMap{
 		"bootstrap.servers": broker,
 		"compression.codec": "snappy",
-		"group.id": client,
 		"fetch.min.bytes": fetchMin,
+		"fetch.message.max.bytes": fetchMessageMax,
 		"fetch.wait.max.ms": maxWaitMs,
 		"max.in.flight.requests.per.connection": maxOpenRequests,
 		"queue.buffering.max.messages": bufferSize,

mdata/notifierKafka/cfg.go (+1)

@@ -31,6 +31,7 @@ func init() {
 	fs.DurationVar(&clientConf.BufferMax, "metrics-buffer-max", time.Millisecond*100, "Delay to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers")
 	fs.IntVar(&clientConf.ChannelBufferSize, "channel-buffer-size", 1000000, "Maximum number of messages allowed on the producer queue")
 	fs.IntVar(&clientConf.FetchMin, "fetch-min", 1, "Minimum number of bytes the broker responds with. If fetch.wait.max.ms expires the accumulated data will be sent to the client regardless of this setting")
+	fs.IntVar(&clientConf.FetchMessageMax, "fetch-message-max", 32768, "Initial maximum number of bytes per topic+partition to request when fetching messages from the broker.")
 	fs.DurationVar(&clientConf.MaxWait, "max-wait", time.Millisecond*100, "Maximum time the broker may wait to fill the response with fetch.min.bytes")
 	fs.DurationVar(&clientConf.MetadataBackoffTime, "metadata-backoff-time", time.Millisecond*500, "Time to wait between attempts to fetch metadata")
 	fs.IntVar(&clientConf.MetadataRetries, "metadata-retries", 5, "Number of retries to fetch metadata in case of failure")
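The fetch-message-max flags registered here and in input/kafkamdm/kafkamdm.go feed ClientConf, which GetConfig (kafka/utils.go above) turns into librdkafka settings; note that group.id is no longer part of that shared map and is instead set per client by GetConfluentConfig. As a rough, hypothetical illustration of what a partition consumer ends up with under the NewConfig defaults, showing only the keys visible in the utils.go hunk plus the two keys GetConfluentConfig adds (broker and group id are placeholders):

package main

import (
	"fmt"

	confluent "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Hypothetical effective settings for one partition consumer; not literal
	// output of the code above, just the keys it is shown to set.
	conf := confluent.ConfigMap{
		"bootstrap.servers":                     "localhost:9092", // c.Broker (placeholder)
		"compression.codec":                     "snappy",
		"fetch.min.bytes":                       1,
		"fetch.message.max.bytes":               32768, // new in this commit
		"fetch.wait.max.ms":                     1000,  // MaxWait = 1s
		"max.in.flight.requests.per.connection": 1000,  // NetMaxOpenRequests
		"queue.buffering.max.messages":          1000000,
		"group.id":                              "example-partition-0", // now set per consumer
		"retries":                               10,
	}
	fmt.Println(conf)
}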

mdata/notifierKafka/notifierKafka.go (+1 -1)

@@ -34,7 +34,7 @@ type NotifierKafka struct {
 
 func New(instance string, metrics mdata.Metrics, idx idx.MetricIndex) *NotifierKafka {
 	clientConf.ClientID = instance + "-notifier-producer"
-	producer, err := confluent.NewProducer(clientConf.GetConfluentConfig())
+	producer, err := confluent.NewProducer(clientConf.GetConfluentConfig("notifier"))
 
 	if err != nil {
 		log.Fatal(2, "kafka-cluster failed to initialize producer: %s", err)

metrictank-sample.ini (+4)

@@ -191,6 +191,8 @@ metrics-buffer-max = 100ms
 channel-buffer-size = 1000
 # The minimum number of message bytes to fetch in a request
 fetch-min = 1
+# Initial maximum number of bytes per topic+partition to request when fetching messages from the broker
+fetch-message-max = 32768
 # The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
 max-wait = 1s
 # Time to wait between attempts to fetch metadata
@@ -305,6 +307,8 @@ channel-buffer-size = 1000
 backlog-process-timeout = 60s
 # The minimum number of message bytes to fetch in a request
 fetch-min = 1
+# Initial maximum number of bytes per topic+partition to request when fetching messages from the broker
+fetch-message-max = 32768
 # The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
 max-wait = 1s
 # Time to wait between attempts to fetch metadata

scripts/config/metrictank-docker.ini (+4)

@@ -188,6 +188,8 @@ metrics-buffer-max = 100ms
 channel-buffer-size = 1000
 # The minimum number of message bytes to fetch in a request
 fetch-min = 1
+# Initial maximum number of bytes per topic+partition to request when fetching messages from the broker
+fetch-message-max = 32768
 # The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
 max-wait = 1s
 # Time to wait between attempts to fetch metadata
@@ -302,6 +304,8 @@ channel-buffer-size = 1000
 backlog-process-timeout = 60s
 # The minimum number of message bytes to fetch in a request
 fetch-min = 1
+# Initial maximum number of bytes per topic+partition to request when fetching messages from the broker
+fetch-message-max = 32768
 # The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
 max-wait = 1s
 # Time to wait between attempts to fetch metadata
