Skip to content

Commit

Permalink
feat: add non transactional retry feature for only handling failed messages
Browse files Browse the repository at this point in the history

* feat: add non transactional retry feature for only handling failed messages

* chore: add tests

* chore: lint

* chore: lint

* chore: add integration tests

* chore: fix unit test

* chore: fix integration test

* chore: fix integration test

* feat: little changes

* feat: little changes

---------

Co-authored-by: Abdulsametileri <sametileri07@gmail.com>
  • Loading branch information
oguzyildirim and Abdulsametileri authored Nov 15, 2023
1 parent a92ffd8 commit aa507a1
Show file tree
Hide file tree
Showing 15 changed files with 244 additions and 55 deletions.
54 changes: 49 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,55 @@ After running `docker-compose up` command, you can run any application you want.
fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
return nil
}

</details>

<details>
<summary>With Disabling Transactional Retry</summary>

func main() {
consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "standart-topic",
GroupID: "standart-cg",
},
LogLevel: kafka.LogLevelDebug,
RetryEnabled: true,
TransactionalRetry: kafka.NewBoolPtr(false),
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "retry-topic",
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
BatchConfiguration: kafka.BatchConfiguration{
MessageGroupLimit: 1000,
MessageGroupDuration: time.Second,
BatchConsumeFn: batchConsumeFn,
},
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()
}

func batchConsumeFn(messages []kafka.Message) error {
// you can add custom error handling here & flag messages
for i := range messages {
if i%2 == 0 {
messages[i].IsFailed = true
}
}

    // you must return an error here to retry only the flagged (failed) messages
    return errors.New("err")
}

</details>

#### With Distributed Tracing Support

Expand Down Expand Up @@ -147,6 +194,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info |
| `concurrency` | Number of goroutines used at listeners | 1 |
| `retryEnabled` | Retry/Exception consumer is working or not | false |
| `transactionalRetry` | Set false if you want the exception/retry strategy to retry only failed messages | true |
| `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s |
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | |
Expand Down Expand Up @@ -192,8 +240,4 @@ Kafka Konsumer offers an API that handles exposing several metrics.
| Metric Name | Description | Value Type |
|---------------------------------------------------------|---------------------------------------------|------------|
| kafka_konsumer_processed_messages_total_current | Total number of processed messages. | Counter |
| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter |
| kafka_konsumer_processed_batch_messages_total_current | Total number of processed batch messages. | Counter |
| kafka_konsumer_unprocessed_batch_messages_total_current | Total number of unprocessed batch messages. | Counter |

**NOTE:** `kafka_konsumer_processed_batch_messages_total_current` and `kafka_konsumer_unprocessed_batch_messages_total_current` will be deprecated in the next releases. Please use `kafka_konsumer_processed_messages_total_current` and `kafka_konsumer_unprocessed_messages_total_current` instead.
| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter |
14 changes: 11 additions & 3 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,24 @@ func (b *batchConsumer) process(messages []*Message) {

if consumeErr != nil {
b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error())
// Try to process same messages again
// Try to process same messages again for resolving transient network errors etc.
if consumeErr = b.consumeFn(messages); consumeErr != nil {
b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic)
b.metric.TotalUnprocessedMessagesCounter += int64(len(messages))
}

if consumeErr != nil && b.retryEnabled {
cronsumerMessages := make([]kcronsumer.Message, 0, len(messages))
for i := range messages {
cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
if b.transactionalRetry {
for i := range messages {
cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
}
} else {
for i := range messages {
if messages[i].IsFailed {
cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
}
}
}

if produceErr := b.base.cronsumer.ProduceBatch(cronsumerMessages); produceErr != nil {
Expand Down
46 changes: 41 additions & 5 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func Test_batchConsumer_process(t *testing.T) {
t.Run("When_Processing_Is_Successful", func(t *testing.T) {
// Given
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}},
base: &base{metric: &ConsumerMetric{}, transactionalRetry: true},
consumeFn: func([]*Message) error {
return nil
},
Expand All @@ -84,7 +84,7 @@ func Test_batchConsumer_process(t *testing.T) {
// Given
gotOnlyOneTimeException := true
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
base: &base{metric: &ConsumerMetric{}, transactionalRetry: true, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(messages []*Message) error {
if gotOnlyOneTimeException {
gotOnlyOneTimeException = false
Expand All @@ -108,7 +108,7 @@ func Test_batchConsumer_process(t *testing.T) {
t.Run("When_Re-processing_Is_Failed_And_Retry_Disabled", func(t *testing.T) {
// Given
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
base: &base{metric: &ConsumerMetric{}, transactionalRetry: true, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
Expand All @@ -129,7 +129,10 @@ func Test_batchConsumer_process(t *testing.T) {
// Given
mc := mockCronsumer{}
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
base: &base{
metric: &ConsumerMetric{}, transactionalRetry: true,
logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc,
},
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
Expand All @@ -150,8 +153,41 @@ func Test_batchConsumer_process(t *testing.T) {
// Given
mc := mockCronsumer{wantErr: true}
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
base: &base{
metric: &ConsumerMetric{}, transactionalRetry: true,
logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc,
},
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
}

// When
bc.process([]*Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
}
})
t.Run("When_Transactional_Retry_Disabled", func(t *testing.T) {
// Given
mc := &mockCronsumer{wantErr: true}
bc := batchConsumer{
base: &base{
metric: &ConsumerMetric{},
logger: NewZapLogger(LogLevelDebug),
retryEnabled: true,
transactionalRetry: false,
cronsumer: mc,
},
consumeFn: func(messages []*Message) error {
messages[0].IsFailed = true
messages[1].IsFailed = true

return errors.New("error case")
},
}
Expand Down
29 changes: 0 additions & 29 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ type metricCollector struct {

totalUnprocessedMessagesCounter *prometheus.Desc
totalProcessedMessagesCounter *prometheus.Desc
// Deprecated: it will be removed next releases
totalUnprocessedBatchMessagesCounter *prometheus.Desc
// Deprecated: it will be removed next releases
totalProcessedBatchMessagesCounter *prometheus.Desc
}

func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) {
Expand All @@ -37,19 +33,6 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
float64(s.consumerMetric.TotalUnprocessedMessagesCounter),
[]string{}...,
)
ch <- prometheus.MustNewConstMetric(
s.totalProcessedBatchMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.TotalProcessedMessagesCounter),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.totalUnprocessedBatchMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.TotalUnprocessedMessagesCounter),
[]string{}...,
)
}

func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector {
Expand All @@ -68,18 +51,6 @@ func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector {
[]string{},
nil,
),
totalProcessedBatchMessagesCounter: prometheus.NewDesc(
prometheus.BuildFQName(Name, "processed_batch_messages_total", "current"),
"Total number of processed batch messages.",
[]string{},
nil,
),
totalUnprocessedBatchMessagesCounter: prometheus.NewDesc(
prometheus.BuildFQName(Name, "unprocessed_batch_messages_total", "current"),
"Total number of unprocessed batch messages.",
[]string{},
nil,
),
}
}

Expand Down
2 changes: 2 additions & 0 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type base struct {
concurrency int
once sync.Once
retryEnabled bool
transactionalRetry bool
distributedTracingEnabled bool
propagator propagation.TextMapPropagator
}
Expand Down Expand Up @@ -73,6 +74,7 @@ func newBase(cfg *ConsumerConfig) (*base, error) {
quit: make(chan struct{}),
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
transactionalRetry: *cfg.TransactionalRetry,
distributedTracingEnabled: cfg.DistributedTracingEnabled,
logger: log,
subprocesses: newSubProcesses(),
Expand Down
8 changes: 8 additions & 0 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ConsumerConfig struct {
Concurrency int
RetryEnabled bool
APIEnabled bool
TransactionalRetry *bool
}

func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
Expand Down Expand Up @@ -193,4 +194,11 @@ func (cfg *ConsumerConfig) setDefaults() {
cfg.DistributedTracingConfiguration.TracerProvider = otel.GetTracerProvider()
}
}
if cfg.TransactionalRetry == nil {
cfg.TransactionalRetry = NewBoolPtr(true)
}
}

// NewBoolPtr returns a pointer to a bool holding the given value.
// It is a convenience helper for optional boolean config fields such
// as ConsumerConfig.TransactionalRetry.
func NewBoolPtr(value bool) *bool {
	b := value
	return &b
}
3 changes: 3 additions & 0 deletions consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ func TestConsumerConfig_validate(t *testing.T) {
if cfg.Reader.CommitInterval != time.Second {
t.Fatalf("Reader Commit Interval default value must equal to 1s")
}
if *cfg.TransactionalRetry != true {
t.Fatal("Default Transactional Retry is true")
}
})
t.Run("Set_Defaults_When_Distributed_Tracing_Enabled", func(t *testing.T) {
// Given
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion examples/with-kafka-batch-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func main() {
}

// In order to load topic with data, use:
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/with-kafka-batch-consumer/load.txt
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt
func batchConsumeFn(messages []*kafka.Message) error {
fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
return nil
Expand Down
58 changes: 58 additions & 0 deletions examples/with-kafka-transactional-retry-disabled/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package main

import (
"errors"
"fmt"
"github.com/Trendyol/kafka-konsumer"
"os"
"os/signal"
"time"
)

// main demonstrates a batch consumer with transactional retry disabled
// (TransactionalRetry set to false): on a consume error, only the messages
// flagged with IsFailed are produced to the exception/retry topic.
func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		// Messages are delivered to batchConsumeFn in groups of up to
		// MessageGroupLimit, or whatever arrives within MessageGroupDuration.
		BatchConfiguration: &kafka.BatchConfiguration{
			MessageGroupLimit:    1000,
			MessageGroupDuration: time.Second,
			BatchConsumeFn:       batchConsumeFn,
		},
		RetryEnabled: true,
		// false => only messages marked IsFailed are retried, instead of
		// re-sending the whole batch to the retry topic.
		TransactionalRetry: kafka.NewBoolPtr(false),
		RetryConfiguration: kafka.RetryConfiguration{
			Brokers:       []string{"localhost:29092"},
			Topic:         "retry-topic",
			StartTimeCron: "*/5 * * * *",
			WorkDuration:  4 * time.Minute,
			MaxRetry:      3,
		},
	}

	// NOTE(review): the error from NewConsumer is ignored; acceptable for an
	// example, but real code should handle it.
	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()

	fmt.Println("Consumer started...!")
	// Block until an interrupt signal arrives so the consumer keeps running.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	<-c
}

// In order to load topic with data, use:
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt

// batchConsumeFn flags every even-indexed message as failed and returns an
// error, so that with transactional retry disabled only the flagged messages
// are sent to the retry topic.
func batchConsumeFn(messages []*kafka.Message) error {
	// Flag alternating messages to demonstrate partial-failure handling.
	for idx := 0; idx < len(messages); idx += 2 {
		messages[idx].IsFailed = true
	}

	// you must return error here to retry only failed messages
	return errors.New("err")
}
15 changes: 10 additions & 5 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ type Message struct {
Partition int
Offset int64
HighWaterMark int64
Key []byte
Value []byte
Headers []Header
WriterData interface{}
Time time.Time

// IsFailed Is only used on transactional retry disabled
IsFailed bool

Key []byte
Value []byte
Headers []Header
WriterData interface{}
Time time.Time

// Context To enable distributed tracing support
Context context.Context
}
Expand Down
4 changes: 0 additions & 4 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,4 @@ package kafka
type ConsumerMetric struct {
TotalUnprocessedMessagesCounter int64
TotalProcessedMessagesCounter int64
// Deprecated
TotalUnprocessedBatchMessagesCounter int64
// Deprecated
TotalProcessedBatchMessagesCounter int64
}
2 changes: 1 addition & 1 deletion test/integration/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
)

require (
github.com/Trendyol/kafka-cronsumer v1.3.4 // indirect
github.com/Trendyol/kafka-cronsumer v1.4.4 // indirect
github.com/Trendyol/otel-kafka-konsumer v0.0.5 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions test/integration/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/Trendyol/kafka-cronsumer v1.3.4 h1:H1PmXfNtzCQm6pYsERUHlSTaib/WaICES+GJvl2RX8U=
github.com/Trendyol/kafka-cronsumer v1.3.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.4.4 h1:RfTpVyvxf+FjLxOJIHQXr6zrMjtba6PGUAYXLoGnVuE=
github.com/Trendyol/kafka-cronsumer v1.4.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4=
github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down
Loading

0 comments on commit aa507a1

Please sign in to comment.