From aa507a19757b3bc4776d8ee18780c5bdd055b189 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?O=C4=9Fuzhan=20Y=C4=B1ld=C4=B1r=C4=B1m?= Date: Wed, 15 Nov 2023 08:58:33 +0300 Subject: [PATCH] feat: add non transactional retry feature for only handling failed messages * feat: add non transactional retry feature for only handling failed messages * chore: add tests * chore: lint * chore: lint * chore: add integration tests * chore: fix unit test * chore: fix integration test * chore: fix integration test * feat: little changes * feat: little changes --------- Co-authored-by: Abdulsametileri --- README.md | 54 +++++++++++++++-- batch_consumer.go | 14 ++++- batch_consumer_test.go | 46 +++++++++++++-- collector.go | 29 ---------- consumer_base.go | 2 + consumer_config.go | 8 +++ consumer_config_test.go | 3 + .../{with-kafka-batch-consumer => }/load.txt | 0 examples/with-kafka-batch-consumer/main.go | 2 +- .../main.go | 58 +++++++++++++++++++ message.go | 15 +++-- metric.go | 4 -- test/integration/go.mod | 2 +- test/integration/go.sum | 4 +- test/integration/integration_test.go | 58 +++++++++++++++++++ 15 files changed, 244 insertions(+), 55 deletions(-) rename examples/{with-kafka-batch-consumer => }/load.txt (100%) create mode 100644 examples/with-kafka-transactional-retry-disabled/main.go diff --git a/README.md b/README.md index 7e49f20..bfb3fa6 100644 --- a/README.md +++ b/README.md @@ -117,8 +117,55 @@ After running `docker-compose up` command, you can run any application you want. fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value) return nil } + +
+ With Disabling Transactional Retry + + func main() { + consumerCfg := &kafka.ConsumerConfig{ + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: "standart-topic", + GroupID: "standart-cg", + }, + LogLevel: kafka.LogLevelDebug, + RetryEnabled: true, + TransactionalRetry: kafka.NewBoolPtr(false), + RetryConfiguration: kafka.RetryConfiguration{ + Brokers: []string{"localhost:29092"}, + Topic: "retry-topic", + StartTimeCron: "*/1 * * * *", + WorkDuration: 50 * time.Second, + MaxRetry: 3, + }, + BatchConfiguration: kafka.BatchConfiguration{ + MessageGroupLimit: 1000, + MessageGroupDuration: time.Second, + BatchConsumeFn: batchConsumeFn, + }, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + } + + func batchConsumeFn(messages []*kafka.Message) error { + // you can add custom error handling here & flag messages + for i := range messages { + if i%2 == 0 { + messages[i].IsFailed = true + } + } + + // you must return error here to retry only failed messages + return errors.New("err") + } + +
#### With Distributed Tracing Support @@ -147,6 +194,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap | `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info | | `concurrency` | Number of goroutines used at listeners | 1 | | `retryEnabled` | Retry/Exception consumer is working or not | false | +| `transactionalRetry` | Set false if you want to use the exception/retry strategy for only the failed messages | true | | `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s | | `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | | | `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | | @@ -192,8 +240,4 @@ Kafka Konsumer offers an API that handles exposing several metrics. | Metric Name | Description | Value Type | |---------------------------------------------------------|---------------------------------------------|------------| | kafka_konsumer_processed_messages_total_current | Total number of processed messages. | Counter | -| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter | -| kafka_konsumer_processed_batch_messages_total_current | Total number of processed batch messages. | Counter | -| kafka_konsumer_unprocessed_batch_messages_total_current | Total number of unprocessed batch messages. | Counter | - -**NOTE:** `kafka_konsumer_processed_batch_messages_total_current` and `kafka_konsumer_unprocessed_batch_messages_total_current` will be deprecated in the next releases. Please use `kafka_konsumer_processed_messages_total_current` and `kafka_konsumer_unprocessed_messages_total_current` instead. \ No newline at end of file +| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. 
| Counter | \ No newline at end of file diff --git a/batch_consumer.go b/batch_consumer.go index 8a3ecf8..5ea7255 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -93,7 +93,7 @@ func (b *batchConsumer) process(messages []*Message) { if consumeErr != nil { b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error()) - // Try to process same messages again + // Try to process same messages again for resolving transient network errors etc. if consumeErr = b.consumeFn(messages); consumeErr != nil { b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic) b.metric.TotalUnprocessedMessagesCounter += int64(len(messages)) @@ -101,8 +101,16 @@ func (b *batchConsumer) process(messages []*Message) { if consumeErr != nil && b.retryEnabled { cronsumerMessages := make([]kcronsumer.Message, 0, len(messages)) - for i := range messages { - cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic)) + if b.transactionalRetry { + for i := range messages { + cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic)) + } + } else { + for i := range messages { + if messages[i].IsFailed { + cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic)) + } + } } if produceErr := b.base.cronsumer.ProduceBatch(cronsumerMessages); produceErr != nil { diff --git a/batch_consumer_test.go b/batch_consumer_test.go index ccd5918..555f750 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -63,7 +63,7 @@ func Test_batchConsumer_process(t *testing.T) { t.Run("When_Processing_Is_Successful", func(t *testing.T) { // Given bc := batchConsumer{ - base: &base{metric: &ConsumerMetric{}}, + base: &base{metric: &ConsumerMetric{}, transactionalRetry: true}, consumeFn: func([]*Message) error { return nil }, @@ -84,7 +84,7 @@ func Test_batchConsumer_process(t *testing.T) { // Given 
gotOnlyOneTimeException := true bc := batchConsumer{ - base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)}, + base: &base{metric: &ConsumerMetric{}, transactionalRetry: true, logger: NewZapLogger(LogLevelDebug)}, consumeFn: func(messages []*Message) error { if gotOnlyOneTimeException { gotOnlyOneTimeException = false @@ -108,7 +108,7 @@ func Test_batchConsumer_process(t *testing.T) { t.Run("When_Re-processing_Is_Failed_And_Retry_Disabled", func(t *testing.T) { // Given bc := batchConsumer{ - base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)}, + base: &base{metric: &ConsumerMetric{}, transactionalRetry: true, logger: NewZapLogger(LogLevelDebug)}, consumeFn: func(messages []*Message) error { return errors.New("error case") }, @@ -129,7 +129,10 @@ func Test_batchConsumer_process(t *testing.T) { // Given mc := mockCronsumer{} bc := batchConsumer{ - base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc}, + base: &base{ + metric: &ConsumerMetric{}, transactionalRetry: true, + logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc, + }, consumeFn: func(messages []*Message) error { return errors.New("error case") }, @@ -150,8 +153,41 @@ func Test_batchConsumer_process(t *testing.T) { // Given mc := mockCronsumer{wantErr: true} bc := batchConsumer{ - base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc}, + base: &base{ + metric: &ConsumerMetric{}, transactionalRetry: true, + logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc, + }, + consumeFn: func(messages []*Message) error { + return errors.New("error case") + }, + } + + // When + bc.process([]*Message{{}, {}, {}}) + + // Then + if bc.metric.TotalProcessedMessagesCounter != 0 { + t.Fatalf("Total Processed Message Counter must equal to 0") + } + if bc.metric.TotalUnprocessedMessagesCounter != 3 { + t.Fatalf("Total 
Unprocessed Message Counter must equal to 3") + } + }) + t.Run("When_Transactional_Retry_Disabled", func(t *testing.T) { + // Given + mc := &mockCronsumer{wantErr: true} + bc := batchConsumer{ + base: &base{ + metric: &ConsumerMetric{}, + logger: NewZapLogger(LogLevelDebug), + retryEnabled: true, + transactionalRetry: false, + cronsumer: mc, + }, consumeFn: func(messages []*Message) error { + messages[0].IsFailed = true + messages[1].IsFailed = true + return errors.New("error case") }, } diff --git a/collector.go b/collector.go index 4b11555..50f1d0a 100644 --- a/collector.go +++ b/collector.go @@ -13,10 +13,6 @@ type metricCollector struct { totalUnprocessedMessagesCounter *prometheus.Desc totalProcessedMessagesCounter *prometheus.Desc - // Deprecated: it will be removed next releases - totalUnprocessedBatchMessagesCounter *prometheus.Desc - // Deprecated: it will be removed next releases - totalProcessedBatchMessagesCounter *prometheus.Desc } func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) { @@ -37,19 +33,6 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) { float64(s.consumerMetric.TotalUnprocessedMessagesCounter), []string{}..., ) - ch <- prometheus.MustNewConstMetric( - s.totalProcessedBatchMessagesCounter, - prometheus.CounterValue, - float64(s.consumerMetric.TotalProcessedMessagesCounter), - []string{}..., - ) - - ch <- prometheus.MustNewConstMetric( - s.totalUnprocessedBatchMessagesCounter, - prometheus.CounterValue, - float64(s.consumerMetric.TotalUnprocessedMessagesCounter), - []string{}..., - ) } func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector { @@ -68,18 +51,6 @@ func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector { []string{}, nil, ), - totalProcessedBatchMessagesCounter: prometheus.NewDesc( - prometheus.BuildFQName(Name, "processed_batch_messages_total", "current"), - "Total number of processed batch messages.", - []string{}, - nil, - ), - 
totalUnprocessedBatchMessagesCounter: prometheus.NewDesc( - prometheus.BuildFQName(Name, "unprocessed_batch_messages_total", "current"), - "Total number of unprocessed batch messages.", - []string{}, - nil, - ), } } diff --git a/consumer_base.go b/consumer_base.go index 1ff1365..51b6a8c 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -46,6 +46,7 @@ type base struct { concurrency int once sync.Once retryEnabled bool + transactionalRetry bool distributedTracingEnabled bool propagator propagation.TextMapPropagator } @@ -73,6 +74,7 @@ func newBase(cfg *ConsumerConfig) (*base, error) { quit: make(chan struct{}), concurrency: cfg.Concurrency, retryEnabled: cfg.RetryEnabled, + transactionalRetry: *cfg.TransactionalRetry, distributedTracingEnabled: cfg.DistributedTracingEnabled, logger: log, subprocesses: newSubProcesses(), diff --git a/consumer_config.go b/consumer_config.go index 79ce01a..d73976c 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -44,6 +44,7 @@ type ConsumerConfig struct { Concurrency int RetryEnabled bool APIEnabled bool + TransactionalRetry *bool } func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { @@ -193,4 +194,11 @@ func (cfg *ConsumerConfig) setDefaults() { cfg.DistributedTracingConfiguration.TracerProvider = otel.GetTracerProvider() } } + if cfg.TransactionalRetry == nil { + cfg.TransactionalRetry = NewBoolPtr(true) + } +} + +func NewBoolPtr(value bool) *bool { + return &value } diff --git a/consumer_config_test.go b/consumer_config_test.go index 6404d42..0821846 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -23,6 +23,9 @@ func TestConsumerConfig_validate(t *testing.T) { if cfg.Reader.CommitInterval != time.Second { t.Fatalf("Reader Commit Interval default value must equal to 1s") } + if *cfg.TransactionalRetry != true { + t.Fatal("Default Transactional Retry is true") + } }) t.Run("Set_Defaults_When_Distributed_Tracing_Enabled", func(t *testing.T) { // Given diff --git 
a/examples/with-kafka-batch-consumer/load.txt b/examples/load.txt similarity index 100% rename from examples/with-kafka-batch-consumer/load.txt rename to examples/load.txt diff --git a/examples/with-kafka-batch-consumer/main.go b/examples/with-kafka-batch-consumer/main.go index bbd550f..ed24f17 100644 --- a/examples/with-kafka-batch-consumer/main.go +++ b/examples/with-kafka-batch-consumer/main.go @@ -42,7 +42,7 @@ func main() { } // In order to load topic with data, use: -// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/with-kafka-batch-consumer/load.txt +// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt func batchConsumeFn(messages []*kafka.Message) error { fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value) return nil diff --git a/examples/with-kafka-transactional-retry-disabled/main.go b/examples/with-kafka-transactional-retry-disabled/main.go new file mode 100644 index 0000000..f1783c7 --- /dev/null +++ b/examples/with-kafka-transactional-retry-disabled/main.go @@ -0,0 +1,58 @@ +package main + +import ( + "errors" + "fmt" + "github.com/Trendyol/kafka-konsumer" + "os" + "os/signal" + "time" +) + +func main() { + consumerCfg := &kafka.ConsumerConfig{ + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: "standart-topic", + GroupID: "standart-cg", + }, + BatchConfiguration: &kafka.BatchConfiguration{ + MessageGroupLimit: 1000, + MessageGroupDuration: time.Second, + BatchConsumeFn: batchConsumeFn, + }, + RetryEnabled: true, + TransactionalRetry: kafka.NewBoolPtr(false), + RetryConfiguration: kafka.RetryConfiguration{ + Brokers: []string{"localhost:29092"}, + Topic: "retry-topic", + StartTimeCron: "*/5 * * * *", + WorkDuration: 4 * time.Minute, + MaxRetry: 3, + }, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + fmt.Println("Consumer started...!") + c := make(chan os.Signal, 1) + 
signal.Notify(c, os.Interrupt) + <-c +} + +// In order to load topic with data, use: +// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt +func batchConsumeFn(messages []*kafka.Message) error { + // you can add custom error handling here & flag messages + for i := range messages { + if i%2 == 0 { + messages[i].IsFailed = true + } + } + + // you must return error here to retry only failed messages + return errors.New("err") +} diff --git a/message.go b/message.go index 1119eae..363e536 100644 --- a/message.go +++ b/message.go @@ -16,11 +16,16 @@ type Message struct { Partition int Offset int64 HighWaterMark int64 - Key []byte - Value []byte - Headers []Header - WriterData interface{} - Time time.Time + + // IsFailed Is only used on transactional retry disabled + IsFailed bool + + Key []byte + Value []byte + Headers []Header + WriterData interface{} + Time time.Time + // Context To enable distributed tracing support Context context.Context } diff --git a/metric.go b/metric.go index 1852238..61afb8f 100644 --- a/metric.go +++ b/metric.go @@ -3,8 +3,4 @@ package kafka type ConsumerMetric struct { TotalUnprocessedMessagesCounter int64 TotalProcessedMessagesCounter int64 - // Deprecated - TotalUnprocessedBatchMessagesCounter int64 - // Deprecated - TotalProcessedBatchMessagesCounter int64 } diff --git a/test/integration/go.mod b/test/integration/go.mod index e70f23e..83752eb 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -10,7 +10,7 @@ require ( ) require ( - github.com/Trendyol/kafka-cronsumer v1.3.4 // indirect + github.com/Trendyol/kafka-cronsumer v1.4.4 // indirect github.com/Trendyol/otel-kafka-konsumer v0.0.5 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect diff --git a/test/integration/go.sum b/test/integration/go.sum index 7d89770..1921925 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -1,5 +1,5 @@ 
-github.com/Trendyol/kafka-cronsumer v1.3.4 h1:H1PmXfNtzCQm6pYsERUHlSTaib/WaICES+GJvl2RX8U= -github.com/Trendyol/kafka-cronsumer v1.3.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.4.4 h1:RfTpVyvxf+FjLxOJIHQXr6zrMjtba6PGUAYXLoGnVuE= +github.com/Trendyol/kafka-cronsumer v1.4.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4= github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index e33756b..b674867 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -154,6 +154,64 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) { } } +func Test_Should_Batch_Retry_Only_Failed_Messages_When_Transactional_Retry_Is_Disabled(t *testing.T) { + // Given + topic := "nontransactional-cronsumer-topic" + consumerGroup := "nontransactional-cronsumer-cg" + brokerAddress := "localhost:9092" + + retryTopic := "retry-topic" + + _, cleanUp := createTopicAndWriteMessages(t, topic, []segmentio.Message{ + {Topic: topic, Partition: 0, Offset: 1, Key: []byte("1"), Value: []byte(`foo1`)}, + {Topic: topic, Partition: 0, Offset: 2, Key: []byte("2"), Value: []byte(`foo2`)}, + {Topic: topic, Partition: 0, Offset: 3, Key: []byte("3"), Value: []byte(`foo3`)}, + {Topic: topic, Partition: 0, Offset: 4, Key: []byte("4"), Value: []byte(`foo4`)}, + {Topic: topic, Partition: 0, Offset: 5, Key: []byte("5"), Value: []byte(`foo5`)}, + }) + defer cleanUp() + + retryConn, cleanUpThisToo := createTopicAndWriteMessages(t, retryTopic, nil) + defer cleanUpThisToo() + + consumerCfg := &kafka.ConsumerConfig{ + TransactionalRetry: kafka.NewBoolPtr(false), + Reader: kafka.ReaderConfig{Brokers: 
[]string{brokerAddress}, Topic: topic, GroupID: consumerGroup}, + RetryEnabled: true, + RetryConfiguration: kafka.RetryConfiguration{ + Brokers: []string{brokerAddress}, + Topic: retryTopic, + StartTimeCron: "*/1 * * * *", + WorkDuration: 50 * time.Second, + MaxRetry: 3, + LogLevel: "error", + }, + BatchConfiguration: &kafka.BatchConfiguration{ + MessageGroupLimit: 100, + MessageGroupDuration: time.Second, + BatchConsumeFn: func(messages []*kafka.Message) error { + messages[1].IsFailed = true + return errors.New("err") + }, + }, + LogLevel: kafka.LogLevelError, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + // Then + var expectedOffset int64 = 1 + conditionFunc := func() bool { + lastOffset, _ := retryConn.ReadLastOffset() + return lastOffset == expectedOffset + } + + assertEventually(t, conditionFunc, 45*time.Second, time.Second) +} + func Test_Should_Integrate_With_Kafka_Cronsumer_Successfully(t *testing.T) { // Given topic := "cronsumer-topic"