feat: add preBatch feature (#83)
* feat: add preBatch feature

* chore: fix test

* chore: add integration test

* chore: add preBatch unit test

* chore: add consumer base fetch err unit test

* chore: EOF log from error to warn

* chore: fix transactionalRetry integration test flakiness

* chore: dummy commit for pipeline

* chore: fix modules

* chore: fix lint

* chore: readme typo fix
A.Samet İleri authored Dec 18, 2023
1 parent 9f103dd commit cab3410
Showing 13 changed files with 285 additions and 31 deletions.
20 changes: 11 additions & 9 deletions README.md
@@ -212,26 +212,26 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap

| config | description | default |
|--------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|-----------------------------|
| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#ReaderConfig) | |
| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/kafka-go#ReaderConfig) | |
| `consumeFn`                                       | Kafka consumer function; if retry is enabled, it is also used to consume retriable messages                                            |                             |
| `logLevel`                                        | Describes log level; valid options are `debug`, `info`, `warn`, and `error`                                                            | info                        |
| `concurrency`                                     | Number of goroutines used as listeners                                                                                                 | 1                           |
| `retryEnabled`                                    | Whether the retry/exception consumer is enabled                                                                                        | false                       |
| `transactionalRetry`                              | Set to false to apply the exception/retry strategy only to failed messages                                                             | true                        |
| `commitInterval`                                  | Indicates the interval at which offsets are committed to the broker                                                                    | 1s                          |
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | |
| `messageGroupDuration` | Maximum time to wait for a batch | |
| `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | no timeout |
| `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | not enabled |
| `transport.DialTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | 5s |
| `transport.IdleTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | 30s |
| `transport.MetadataTTL ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | 6s |
| `transport.MetadataTopics ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | all topics in cluster |
| `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | no timeout |
| `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | not enabled |
| `transport.DialTimeout`                           | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                  | 5s                          |
| `transport.IdleTimeout`                           | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                  | 30s                         |
| `transport.MetadataTTL`                           | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                  | 6s                          |
| `transport.MetadataTopics`                        | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                  | all topics in cluster       |
| `distributedTracingEnabled`                       | Enables OpenTelemetry support for consume and produce operations                                                                       | false                       |
| `distributedTracingConfiguration.TracerProvider` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTracerProvider() |
| `distributedTracingConfiguration.Propagator` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTextMapPropagator() |
| `retryConfiguration.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | |
| `retryConfiguration.clientId`                     | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                  |                             |
| `retryConfiguration.startTimeCron`                | Cron expression defining when the retry consumer ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer#configurations)) starts |                             |
| `retryConfiguration.workDuration`                 | Duration for which the exception consumer actively consumes messages                                                                   |                             |
| `retryConfiguration.topic` | Retry/Exception topic names | |
@@ -243,6 +243,8 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `retryConfiguration.sasl.username`                | SCRAM or PLAIN username                                                                                                                |                             |
| `retryConfiguration.sasl.password`                | SCRAM or PLAIN password                                                                                                                |                             |
| `batchConfiguration.messageGroupLimit`            | Maximum number of messages in a batch                                                                                                  |                             |
| `batchConfiguration.batchConsumeFn`               | Kafka batch consumer function; if retry is enabled, it is also used to consume retriable messages                                      |                             |
| `batchConfiguration.preBatchFn`                   | Transforms messages before batch consuming starts (see the sketch after this table)                                                    |                             |
| `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" |
| `tls.intermediateCAPath`                          | Same as rootCA; use together with rootCAPath if you need to specify two root CAs                                                       | ""                          |
| `sasl.authType` | `SCRAM` or `PLAIN` | |
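For quick orientation, here is a minimal sketch of wiring `preBatchFn` into a batch consumer, modeled on the example added later in this commit; the broker address, topic, and group names are placeholders, and the even-index filter is purely illustrative:

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"time"

	"github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	cfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"}, // placeholder broker
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		BatchConfiguration: &kafka.BatchConfiguration{
			MessageGroupLimit: 1000,
			BatchConsumeFn: func(messages []*kafka.Message) error {
				fmt.Printf("consumed %d messages\n", len(messages))
				return nil
			},
			// preBatchFn runs once per accumulated batch, before the
			// batch is chunked and dispatched to the workers. Keeping
			// every other message is a stand-in for a real transform.
			PreBatchFn: func(messages []*kafka.Message) []*kafka.Message {
				result := make([]*kafka.Message, 0, len(messages))
				for i, m := range messages {
					if i%2 == 0 {
						result = append(result, m)
					}
				}
				return result
			},
		},
		MessageGroupDuration: time.Second,
	}

	consumer, err := kafka.NewConsumer(cfg)
	if err != nil {
		panic(err)
	}
	defer consumer.Stop()

	consumer.Consume()

	// Block until interrupted, as the shipped example does.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	<-c
}
```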
12 changes: 10 additions & 2 deletions batch_consumer.go
@@ -11,7 +11,8 @@ import (
type batchConsumer struct {
*base

consumeFn func([]*Message) error
consumeFn BatchConsumeFn
preBatchFn PreBatchFn

messageGroupLimit int
}
@@ -25,6 +26,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
c := batchConsumer{
base: consumerBase,
consumeFn: cfg.BatchConfiguration.BatchConsumeFn,
preBatchFn: cfg.BatchConfiguration.PreBatchFn,
messageGroupLimit: cfg.BatchConfiguration.MessageGroupLimit,
}

@@ -47,9 +49,11 @@ func (b *batchConsumer) GetMetric() *ConsumerMetric {

func (b *batchConsumer) Consume() {
go b.subprocesses.Start()

b.wg.Add(1)
go b.startConsume()

b.wg.Add(b.concurrency)
b.setupConcurrentWorkers()

b.wg.Add(1)
@@ -92,7 +96,6 @@ func (b *batchConsumer) startBatch() {

func (b *batchConsumer) setupConcurrentWorkers() {
for i := 0; i < b.concurrency; i++ {
b.wg.Add(1)
go func() {
defer b.wg.Done()
for messages := range b.batchConsumingStream {
@@ -125,6 +128,11 @@ func chunkMessages(allMessages *[]*Message, chunkSize int) [][]*Message {
func (b *batchConsumer) consume(allMessages *[]*Message, commitMessages *[]kafka.Message) {
chunks := chunkMessages(allMessages, b.messageGroupLimit)

if b.preBatchFn != nil {
preBatchResult := b.preBatchFn(*allMessages)
chunks = chunkMessages(&preBatchResult, b.messageGroupLimit)
}

// Send the messages to process
for _, chunk := range chunks {
b.batchConsumingStream <- chunk
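The important detail in `consume` above: when `preBatchFn` is set, it receives the entire accumulated slice, and its (typically smaller) result is re-chunked by `messageGroupLimit` before the chunks reach the workers. A self-contained illustration of that re-chunking effect, with ints standing in for messages and a toy dedup as the pre-batch step (neither is the library's actual code):

```go
package main

import "fmt"

// chunk splits a slice into groups of at most n, mirroring what the
// library's chunkMessages helper does for messages.
func chunk(items []int, n int) [][]int {
	var out [][]int
	for len(items) > 0 {
		end := n
		if len(items) < n {
			end = len(items)
		}
		out = append(out, items[:end])
		items = items[end:]
	}
	return out
}

// dedup keeps the first occurrence of each value, standing in for a
// preBatchFn that shrinks the batch.
func dedup(xs []int) []int {
	seen := map[int]bool{}
	var out []int
	for _, x := range xs {
		if !seen[x] {
			seen[x] = true
			out = append(out, x)
		}
	}
	return out
}

func main() {
	all := []int{1, 1, 1, 2, 3}            // five accumulated messages
	fmt.Println(len(chunk(all, 2)))        // 3 chunks without pre-batch
	fmt.Println(len(chunk(dedup(all), 2))) // 2 chunks after pre-batch
}
```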
60 changes: 59 additions & 1 deletion batch_consumer_test.go
@@ -53,7 +53,7 @@ func Test_batchConsumer_startBatch(t *testing.T) {
close(bc.base.incomingMessageStream)
}()

bc.base.wg.Add(1)
bc.base.wg.Add(1 + bc.base.concurrency)

// When
bc.setupConcurrentWorkers()
@@ -68,6 +68,64 @@
}
}

func Test_batchConsumer_startBatch_with_preBatch(t *testing.T) {
// Given
var numberOfBatch int

mc := mockReader{}
bc := batchConsumer{
base: &base{
incomingMessageStream: make(chan *Message, 1),
batchConsumingStream: make(chan []*Message, 1),
singleConsumingStream: make(chan *Message, 1),
messageProcessedStream: make(chan struct{}, 1),
metric: &ConsumerMetric{},
wg: sync.WaitGroup{},
messageGroupDuration: 500 * time.Millisecond,
r: &mc,
concurrency: 1,
},
messageGroupLimit: 2,
consumeFn: func(messages []*Message) error {
numberOfBatch++
return nil
},
preBatchFn: func(messages []*Message) []*Message {
return messages[:1]
},
}
go func() {
// Simulate messageGroupLimit
bc.base.incomingMessageStream <- &Message{}
bc.base.incomingMessageStream <- &Message{}

time.Sleep(1 * time.Second)

// Simulate messageGroupDuration
bc.base.incomingMessageStream <- &Message{}
bc.base.incomingMessageStream <- &Message{}

time.Sleep(1 * time.Second)

// Return from startBatch
close(bc.base.incomingMessageStream)
}()

bc.base.wg.Add(1 + bc.base.concurrency)

// When
bc.setupConcurrentWorkers()
bc.startBatch()

// Then
if numberOfBatch != 2 {
t.Fatalf("Number of batch group must equal to 2")
}
if bc.metric.TotalProcessedMessagesCounter != 2 {
t.Fatalf("Total Processed Message Counter must equal to 2")
}
}

func Test_batchConsumer_process(t *testing.T) {
t.Run("When_Processing_Is_Successful", func(t *testing.T) {
// Given
5 changes: 2 additions & 3 deletions consumer_base.go
@@ -126,6 +126,7 @@ func (c *base) startConsume() {
for {
select {
case <-c.quit:
close(c.incomingMessageStream)
return
default:
m := kafkaMessagePool.Get().(*kafka.Message)
@@ -134,7 +135,7 @@
if c.context.Err() != nil {
continue
}
c.logger.Errorf("Message could not read, err %s", err.Error())
c.logger.Warnf("Message could not read, err %s", err.Error())
continue
}

@@ -159,9 +160,7 @@ func (c *base) Stop() error {
c.subprocesses.Stop()
c.cancelFn()
c.quit <- struct{}{}
close(c.incomingMessageStream)
c.wg.Wait()

err = c.r.Close()
})

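This change moves `close(c.incomingMessageStream)` out of `Stop()` and into the `quit` branch of `startConsume`, so the goroutine that sends on the stream is also the only one that closes it; closing from `Stop()` while `startConsume` might still be sending risks a panic on send to a closed channel. A minimal, self-contained sketch of that ownership pattern (no library types involved):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	stream := make(chan int)
	quit := make(chan struct{})
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; ; i++ {
			select {
			case <-quit:
				close(stream) // safe: the sole sender closes the channel
				return
			case stream <- i:
			}
		}
	}()

	for v := range stream {
		fmt.Println(v)
		if v == 3 {
			quit <- struct{}{} // ask the producer to stop and close
		}
	}
	wg.Wait()
}
```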
4 changes: 4 additions & 0 deletions consumer_base_test.go
@@ -5,6 +5,7 @@ import (
"errors"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"

@@ -26,6 +27,9 @@

// When
go b.startConsume()

// Ensure some time passes
time.Sleep(3 * time.Second)
b.quit <- struct{}{}

// Then
3 changes: 3 additions & 0 deletions consumer_config.go
@@ -17,6 +17,8 @@ type ReaderConfig kafka.ReaderConfig

type BatchConsumeFn func([]*Message) error

type PreBatchFn func([]*Message) []*Message

type ConsumeFn func(*Message) error

type DialConfig struct {
@@ -124,6 +126,7 @@ type RetryConfiguration struct {

type BatchConfiguration struct {
BatchConsumeFn BatchConsumeFn
PreBatchFn PreBatchFn
MessageGroupLimit int
}

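The new `PreBatchFn` type is a plain slice-in, slice-out transform. As a complement to the id/version dedup in the example below, here is a hypothetical helper that keeps only the last message seen per Kafka key; treat the `Key` field access as an assumption about the `Message` struct:

```go
package prebatch // hypothetical package name

import "github.com/Trendyol/kafka-konsumer/v2"

// lastPerKey satisfies kafka.PreBatchFn: it keeps only the most recent
// message for each Kafka key, shrinking the batch before chunking.
func lastPerKey(messages []*kafka.Message) []*kafka.Message {
	last := make(map[string]*kafka.Message, len(messages))
	order := make([]string, 0, len(messages))
	for _, m := range messages {
		k := string(m.Key)
		if _, ok := last[k]; !ok {
			order = append(order, k) // remember first-seen order of keys
		}
		last[k] = m // later messages overwrite earlier ones
	}
	result := make([]*kafka.Message, 0, len(last))
	for _, k := range order {
		result = append(result, last[k])
	}
	return result
}
```

Wired up, this would sit in `BatchConfiguration{PreBatchFn: lastPerKey, ...}` exactly like `preBatch` in the example.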
90 changes: 90 additions & 0 deletions examples/with-kafka-batch-consumer-with-prebatch/main.go
@@ -0,0 +1,90 @@
package main

import (
"encoding/json"
"fmt"
"github.com/Trendyol/kafka-konsumer/v2"
"os"
"os/signal"
"time"
)

func main() {
consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "standart-topic",
GroupID: "standart-cg",
},
BatchConfiguration: &kafka.BatchConfiguration{
MessageGroupLimit: 1000,
BatchConsumeFn: batchConsumeFn,
PreBatchFn: preBatch,
},
RetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "retry-topic",
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
MessageGroupDuration: time.Second,
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()

fmt.Println("Consumer started...!")
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
}

type MessageValue struct {
Id int `json:"id"`
Version int `json:"version"`
Payload string `json:"payload"`
KafkaMessage *kafka.Message `json:"-"`
}

func preBatch(messages []*kafka.Message) []*kafka.Message {
latestMessageById := getLatestMessageByID(messages)

return convertLatestMessageList(latestMessageById)
}

func getLatestMessageByID(messages []*kafka.Message) map[int]MessageValue {
latestMessageById := make(map[int]MessageValue, len(messages))
for i := range messages {
var mv MessageValue
if err := json.Unmarshal(messages[i].Value, &mv); err != nil {
continue // skip malformed payloads instead of deduping on zero values
}
mv.KafkaMessage = messages[i]

val, ok := latestMessageById[mv.Id]
if !ok {
latestMessageById[mv.Id] = mv
} else if mv.Version > val.Version {
latestMessageById[mv.Id] = mv
}
}
return latestMessageById
}

func convertLatestMessageList(latestMessageById map[int]MessageValue) []*kafka.Message {
result := make([]*kafka.Message, 0, len(latestMessageById))
for _, latestMessage := range latestMessageById {
result = append(result, latestMessage.KafkaMessage)
}
return result
}

// In order to load topic with data, use:
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/with-kafka-batch-consumer-with-prebatch/testdata/messages.txt
func batchConsumeFn(messages []*kafka.Message) error {
fmt.Printf("length of %d messages, first message value is %s \n", len(messages), messages[0].Value)

return nil
}
5 changes: 5 additions & 0 deletions examples/with-kafka-batch-consumer-with-prebatch/testdata/messages.txt
@@ -0,0 +1,5 @@
{ "id": 1, "version": 1, "payload": "foo" }
{ "id": 1, "version": 2, "payload": "foobar" }
{ "id": 1, "version": 3, "payload": "foobarfoobar" }
{ "id": 2, "version": 1, "payload": "x" }
{ "id": 3, "version": 2, "payload": "y" }
4 changes: 2 additions & 2 deletions go.mod
@@ -7,8 +7,9 @@ require (
github.com/Trendyol/otel-kafka-konsumer v0.0.7
github.com/ansrivas/fiberprometheus/v2 v2.6.1
github.com/gofiber/fiber/v2 v2.50.0
github.com/google/go-cmp v0.6.0
github.com/prometheus/client_golang v1.16.0
github.com/segmentio/kafka-go v0.4.46
github.com/segmentio/kafka-go v0.4.47
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel/trace v1.19.0
go.uber.org/zap v1.24.0
@@ -22,7 +23,6 @@
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gofiber/adaptor/v2 v2.2.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
5 changes: 2 additions & 3 deletions go.sum
@@ -28,7 +28,6 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
@@ -64,8 +63,8 @@ github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/segmentio/kafka-go v0.4.46 h1:Sx8/kvtY+/G8nM0roTNnFezSJj3bT2sW0Xy/YY3CgBI=
github.com/segmentio/kafka-go v0.4.46/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
2 changes: 1 addition & 1 deletion test/integration/go.mod
@@ -6,7 +6,7 @@ replace github.com/Trendyol/kafka-konsumer/v2 => ../..

require (
github.com/Trendyol/kafka-konsumer/v2 v2.0.4
github.com/segmentio/kafka-go v0.4.46
github.com/segmentio/kafka-go v0.4.47
)

require (