Skip to content

Commit

Permalink
refactor: migrate to auto commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdulsametileri committed Sep 6, 2023
1 parent e1afd0b commit 3d81c14
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 180 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,8 @@ Kafka Konsumer offers an API that handles exposing several metrics.
| Metric Name | Description | Value Type |
|---------------------------------------------------------|---------------------------------------------|------------|
| kafka_konsumer_processed_messages_total_current | Total number of processed messages. | Counter |
| kafka_konsumer_processed_batch_messages_total_current | Total number of processed batch messages. | Counter |
| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter |
| kafka_konsumer_processed_batch_messages_total_current | Total number of processed batch messages. | Counter |
| kafka_konsumer_unprocessed_batch_messages_total_current | Total number of unprocessed batch messages. | Counter |

**NOTE:** `kafka_konsumer_processed_batch_messages_total_current` and `kafka_konsumer_unprocessed_batch_messages_total_current` will
be deprecated in the next releases.
Please use `kafka_konsumer_processed_messages_total_current` and `kafka_konsumer_unprocessed_messages_total_current` instead.
**NOTE:** `kafka_konsumer_processed_batch_messages_total_current` and `kafka_konsumer_unprocessed_batch_messages_total_current` will be deprecated in the next releases. Please use `kafka_konsumer_processed_messages_total_current` and `kafka_konsumer_unprocessed_messages_total_current` instead.
12 changes: 3 additions & 9 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"time"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
"github.com/segmentio/kafka-go"
)

type batchConsumer struct {
Expand Down Expand Up @@ -55,8 +54,6 @@ func (b *batchConsumer) Consume() {
b.wg.Add(1)
go b.startBatch()
}

go b.handleCommit()
}

func (b *batchConsumer) startBatch() {
Expand Down Expand Up @@ -98,6 +95,8 @@ func (b *batchConsumer) process(messages []Message) {

// Try to process same message again
if consumeErr = b.consumeFn(messages); consumeErr != nil {
b.metric.TotalUnprocessedMessagesCounter += int64(len(messages))

b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic)

cronsumerMessages := make([]kcronsumer.Message, 0, len(messages))
Expand All @@ -111,10 +110,5 @@ func (b *batchConsumer) process(messages []Message) {
}
}

segmentioMessages := make([]kafka.Message, 0, len(messages))
for i := range messages {
segmentioMessages = append(segmentioMessages, kafka.Message(messages[i]))
}

b.commitReq <- segmentioMessages
b.metric.TotalProcessedMessagesCounter += int64(len(messages))
}
6 changes: 2 additions & 4 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kafka

import (
kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
"github.com/segmentio/kafka-go"
)

type consumer struct {
Expand Down Expand Up @@ -50,8 +49,6 @@ func (c *consumer) Consume() {
}
}()
}

go c.handleCommit()
}

func (c *consumer) process(message Message) {
Expand All @@ -61,6 +58,7 @@ func (c *consumer) process(message Message) {

// Try to process same message again
if consumeErr = c.consumeFn(message); consumeErr != nil {
c.metric.TotalUnprocessedMessagesCounter++
c.logger.Warnf("Consume Function Again Err %s, message is sending to exception/retry topic %s", consumeErr.Error(), c.retryTopic)

retryableMsg := message.toRetryableMessage(c.retryTopic)
Expand All @@ -71,5 +69,5 @@ func (c *consumer) process(message Message) {
}
}

c.commitReq <- []kafka.Message{kafka.Message(message)}
c.metric.TotalProcessedMessagesCounter++
}
32 changes: 1 addition & 31 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type base struct {
context context.Context
messageCh chan Message
quit chan struct{}
commitReq chan []kafka.Message
cancelFn context.CancelFunc
r *kafka.Reader
retryTopic string
Expand Down Expand Up @@ -57,7 +56,6 @@ func newBase(cfg *ConsumerConfig) (*base, error) {
metric: &ConsumerMetric{},
messageCh: make(chan Message, cfg.Concurrency),
quit: make(chan struct{}),
commitReq: make(chan []kafka.Message),
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
logger: log,
Expand Down Expand Up @@ -97,7 +95,7 @@ func (c *base) startConsume() {
case <-c.quit:
return
default:
message, err := c.r.FetchMessage(c.context)
message, err := c.r.ReadMessage(c.context)
if err != nil {
if c.context.Err() != nil {
continue
Expand All @@ -111,33 +109,6 @@ func (c *base) startConsume() {
}
}

func (c *base) handleCommit() {
// it is used for tracking the latest committed offsets by topic => partition => offset
offsets := offsetStash{}

for msgs := range c.commitReq {
// Extract messages which needed to commit
willBeCommitted := offsets.IgnoreAlreadyCommittedMessages(msgs)
if len(willBeCommitted) == 0 {
continue
}

commitErr := c.r.CommitMessages(context.Background(), willBeCommitted...)
if commitErr != nil {
c.logger.Error("Error Committing messages %s", commitErr.Error())
c.metric.TotalUnprocessedMessagesCounter++
continue
}

// Update the latest offsets with recently committed messages
offsets.Update(willBeCommitted)

c.metric.TotalProcessedMessagesCounter++

c.logger.Debug(offsets)
}
}

func (c *base) WithLogger(logger LoggerInterface) {
c.logger = logger
}
Expand All @@ -151,7 +122,6 @@ func (c *base) Stop() error {
c.quit <- struct{}{}
close(c.messageCh)
c.wg.Wait()
close(c.commitReq)
err = c.r.Close()
})

Expand Down
46 changes: 0 additions & 46 deletions offset_manager.go

This file was deleted.

86 changes: 0 additions & 86 deletions offset_manager_test.go

This file was deleted.

0 comments on commit 3d81c14

Please sign in to comment.