feat: add preBatch feature #83

Merged: 12 commits, Dec 18, 2023
20 changes: 11 additions & 9 deletions README.md
@@ -212,26 +212,26 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap

| config | description | default |
|--------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|-----------------------------|
| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#ReaderConfig) | |
| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/kafka-go#ReaderConfig) | |
| `consumeFn`                                        | Kafka consumer function; if retry is enabled, it is also used to consume retriable messages                                            |                             |
| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info |
| `concurrency` | Number of goroutines used at listeners | 1 |
| `retryEnabled`                                     | Enables the retry/exception consumer                                                                                                   | false                       |
| `transactionalRetry`                               | Set to false to apply the exception/retry strategy only to failed messages                                                             | true                        |
| `commitInterval`                                   | Interval at which offsets are committed to the broker                                                                                  | 1s                          |
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | |
| `messageGroupDuration` | Maximum time to wait for a batch | |
| `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | no timeout |
| `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | not enabled |
| `transport.DialTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | 5s |
| `transport.IdleTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | 30s |
| `transport.MetadataTTL ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | 6s |
| `transport.MetadataTopics ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | all topics in cluster |
| `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | no timeout |
| `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | not enabled |
| `transport.DialTimeout `                          | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                   | 5s                          |
| `transport.IdleTimeout `                          | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                   | 30s                         |
| `transport.MetadataTTL `                          | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                   | 6s                          |
| `transport.MetadataTopics `                       | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                   | all topics in cluster       |
| `distributedTracingEnabled`                       | Enables OpenTelemetry support for consume and produce operations                                                                        | false                       |
| `distributedTracingConfiguration.TracerProvider` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTracerProvider() |
| `distributedTracingConfiguration.Propagator` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTextMapPropagator() |
| `retryConfiguration.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | |
| `retryConfiguration.clientId`                      | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                   |                             |
| `retryConfiguration.startTimeCron`                 | Cron expression defining when the retry consumer ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer#configurations)) starts working |                             |
| `retryConfiguration.workDuration`                  | Duration for which the exception consumer actively consumes messages                                                                   |                             |
| `retryConfiguration.topic` | Retry/Exception topic names | |
@@ -243,6 +243,8 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | |
| `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | |
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | |
| `batchConfiguration.batchConsumeFn`                | Kafka batch consumer function; if retry is enabled, it is also used to consume retriable messages                                      |                             |
| `batchConfiguration.preBatchFn`                    | Function used to transform messages before batch consuming starts                                                                      |                             |
| `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" |
| `tls.intermediateCAPath`                           | Same as rootCA; use it together with rootCAPath if you need to specify two root CAs                                                    | ""                          |
| `sasl.authType` | `SCRAM` or `PLAIN` | |
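Putting the new batch options together: below is a minimal configuration sketch based only on the fields shown in this README and in the example added by this PR. The broker address, topic, and consumer-group names are placeholders.

```go
package main

import (
	"os"
	"os/signal"
	"time"

	"github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	cfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"}, // placeholder broker
			Topic:   "example-topic",             // placeholder topic
			GroupID: "example-cg",                // placeholder consumer group
		},
		BatchConfiguration: &kafka.BatchConfiguration{
			MessageGroupLimit: 1000,
			// BatchConsumeFn receives a whole chunk of messages at once.
			BatchConsumeFn: func(messages []*kafka.Message) error {
				return nil
			},
			// PreBatchFn runs on the collected group before it is chunked,
			// e.g. to deduplicate or filter messages.
			PreBatchFn: func(messages []*kafka.Message) []*kafka.Message {
				return messages
			},
		},
		MessageGroupDuration: time.Second,
	}

	consumer, err := kafka.NewConsumer(cfg)
	if err != nil {
		panic(err)
	}
	defer consumer.Stop()

	consumer.Consume()

	// Block until interrupted, as in the full example further down in this PR.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	<-c
}
```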
12 changes: 10 additions & 2 deletions batch_consumer.go
@@ -11,7 +11,8 @@
type batchConsumer struct {
*base

consumeFn func([]*Message) error
consumeFn BatchConsumeFn
Member Author: We already defined the func([]*Message) error alias (BatchConsumeFn) for this.

preBatchFn PreBatchFn

messageGroupLimit int
}
@@ -25,6 +26,7 @@
c := batchConsumer{
base: consumerBase,
consumeFn: cfg.BatchConfiguration.BatchConsumeFn,
preBatchFn: cfg.BatchConfiguration.PreBatchFn,

Codecov: added line batch_consumer.go#L29 was not covered by tests.
messageGroupLimit: cfg.BatchConfiguration.MessageGroupLimit,
}

@@ -47,9 +49,11 @@

func (b *batchConsumer) Consume() {
go b.subprocesses.Start()

b.wg.Add(1)
go b.startConsume()

b.wg.Add(b.concurrency)

Codecov: added line batch_consumer.go#L56 was not covered by tests.
Member Author: For readability, I extracted the wg.Add out of setupConcurrentWorkers and moved it up to here.

b.setupConcurrentWorkers()

b.wg.Add(1)
@@ -92,7 +96,6 @@

func (b *batchConsumer) setupConcurrentWorkers() {
for i := 0; i < b.concurrency; i++ {
b.wg.Add(1)
go func() {
defer b.wg.Done()
for messages := range b.batchConsumingStream {
Expand Down Expand Up @@ -125,6 +128,11 @@
func (b *batchConsumer) consume(allMessages *[]*Message, commitMessages *[]kafka.Message) {
chunks := chunkMessages(allMessages, b.messageGroupLimit)

if b.preBatchFn != nil {
Member Author: Note that I only changed chunks, not allMessages, because allMessages is used for commitMessages; mutating it caused offset/lag issues.

preBatchResult := b.preBatchFn(*allMessages)
chunks = chunkMessages(&preBatchResult, b.messageGroupLimit)
}

// Send the messages to process
for _, chunk := range chunks {
b.batchConsumingStream <- chunk
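As the comment above notes, the preBatch hook only replaces the slice that gets chunked, while the untouched allMessages slice still drives offset commits. For illustration, here is a rough sketch of what a chunkMessages-style helper could look like; this is an assumption about an unexported helper, not the repository's actual implementation.

```go
// chunkMessages splits messages into groups of at most limit entries,
// preserving order. Sketch only; the real helper may differ.
func chunkMessages(messages *[]*Message, limit int) [][]*Message {
	all := *messages
	if limit <= 0 {
		return [][]*Message{all}
	}
	chunks := make([][]*Message, 0, (len(all)+limit-1)/limit)
	for start := 0; start < len(all); start += limit {
		end := start + limit
		if end > len(all) {
			end = len(all)
		}
		chunks = append(chunks, all[start:end])
	}
	return chunks
}
```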
60 changes: 59 additions & 1 deletion batch_consumer_test.go
@@ -53,7 +53,7 @@ func Test_batchConsumer_startBatch(t *testing.T) {
close(bc.base.incomingMessageStream)
}()

bc.base.wg.Add(1)
bc.base.wg.Add(1 + bc.base.concurrency)

// When
bc.setupConcurrentWorkers()
Expand All @@ -68,6 +68,64 @@ func Test_batchConsumer_startBatch(t *testing.T) {
}
}

func Test_batchConsumer_startBatch_with_preBatch(t *testing.T) {
// Given
var numberOfBatch int

mc := mockReader{}
bc := batchConsumer{
base: &base{
incomingMessageStream: make(chan *Message, 1),
batchConsumingStream: make(chan []*Message, 1),
singleConsumingStream: make(chan *Message, 1),
messageProcessedStream: make(chan struct{}, 1),
metric: &ConsumerMetric{},
wg: sync.WaitGroup{},
messageGroupDuration: 500 * time.Millisecond,
r: &mc,
concurrency: 1,
},
messageGroupLimit: 2,
consumeFn: func(messages []*Message) error {
numberOfBatch++
return nil
},
preBatchFn: func(messages []*Message) []*Message {
return messages[:1]
},
}
go func() {
// Simulate messageGroupLimit
bc.base.incomingMessageStream <- &Message{}
bc.base.incomingMessageStream <- &Message{}

time.Sleep(1 * time.Second)

// Simulate messageGroupDuration
bc.base.incomingMessageStream <- &Message{}
bc.base.incomingMessageStream <- &Message{}

time.Sleep(1 * time.Second)

// Return from startBatch
close(bc.base.incomingMessageStream)
}()

bc.base.wg.Add(1 + bc.base.concurrency)

// When
bc.setupConcurrentWorkers()
bc.startBatch()

// Then
if numberOfBatch != 2 {
t.Fatalf("Number of batch group must equal to 2")
}
if bc.metric.TotalProcessedMessagesCounter != 2 {
t.Fatalf("Total Processed Message Counter must equal to 2")
}
}

func Test_batchConsumer_process(t *testing.T) {
t.Run("When_Processing_Is_Successful", func(t *testing.T) {
// Given
5 changes: 2 additions & 3 deletions consumer_base.go
@@ -126,6 +126,7 @@ func (c *base) startConsume() {
for {
select {
case <-c.quit:
close(c.incomingMessageStream)
Member Author: Previously we closed this channel in the Stop() method, but that is not the correct way to manage channels. The goroutine that writes to a channel should also be the one responsible for closing it.

return
default:
m := kafkaMessagePool.Get().(*kafka.Message)
@@ -134,7 +135,7 @@
if c.context.Err() != nil {
continue
}
c.logger.Errorf("Message could not read, err %s", err.Error())
c.logger.Warnf("Message could not read, err %s", err.Error())
continue
}

@@ -159,9 +160,7 @@ func (c *base) Stop() error {
c.subprocesses.Stop()
c.cancelFn()
c.quit <- struct{}{}
close(c.incomingMessageStream)
c.wg.Wait()

err = c.r.Close()
})

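To illustrate the channel-ownership point from the review comment above, here is a small self-contained sketch (generic types, not this repository's) in which the goroutine that writes to the stream is also the one that closes it, mirroring the quit-then-close flow of startConsume and Stop.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	quit := make(chan struct{})
	stream := make(chan int)
	var wg sync.WaitGroup

	// Producer: the only writer to stream, so it is also responsible for closing it.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; ; i++ {
			select {
			case <-quit:
				close(stream) // writer-side close, analogous to startConsume closing incomingMessageStream
				return
			case stream <- i:
			}
		}
	}()

	// Consumer: drains the stream until the producer closes it.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for v := range stream {
			fmt.Println(v)
		}
	}()

	quit <- struct{}{} // signal shutdown, as Stop does; the producer closes stream and the range ends
	wg.Wait()
}
```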
4 changes: 4 additions & 0 deletions consumer_base_test.go
@@ -5,6 +5,7 @@ import (
"errors"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"

@@ -26,6 +27,9 @@ func Test_base_startConsume(t *testing.T) {

// When
go b.startConsume()

// Ensure some time passes
time.Sleep(3 * time.Second)
b.quit <- struct{}{}

// Then
3 changes: 3 additions & 0 deletions consumer_config.go
@@ -17,6 +17,8 @@ type ReaderConfig kafka.ReaderConfig

type BatchConsumeFn func([]*Message) error

type PreBatchFn func([]*Message) []*Message

type ConsumeFn func(*Message) error

type DialConfig struct {
@@ -124,6 +126,7 @@ type RetryConfiguration struct {

type BatchConfiguration struct {
BatchConsumeFn BatchConsumeFn
PreBatchFn PreBatchFn
MessageGroupLimit int
}

90 changes: 90 additions & 0 deletions examples/with-kafka-batch-consumer-with-prebatch/main.go
@@ -0,0 +1,90 @@
package main

import (
"encoding/json"
"fmt"
"github.com/Trendyol/kafka-konsumer/v2"
"os"
"os/signal"
"time"
)

func main() {
consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "standart-topic",
GroupID: "standart-cg",
},
BatchConfiguration: &kafka.BatchConfiguration{
MessageGroupLimit: 1000,
BatchConsumeFn: batchConsumeFn,
PreBatchFn: preBatch,
},
RetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "retry-topic",
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
MessageGroupDuration: time.Second,
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()

fmt.Println("Consumer started...!")
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
}

type MessageValue struct {
Id int `json:"id"`
Version int `json:"version"`
Payload string `json:"payload"`
KafkaMessage *kafka.Message `json:"-"`
}

func preBatch(messages []*kafka.Message) []*kafka.Message {
latestMessageById := getLatestMessageByID(messages)

return convertLatestMessageList(latestMessageById)
}

func getLatestMessageByID(messages []*kafka.Message) map[int]MessageValue {
latestMessageById := make(map[int]MessageValue, len(messages))
for i := range messages {
var mv MessageValue
json.Unmarshal(messages[i].Value, &mv)
mv.KafkaMessage = messages[i]

val, ok := latestMessageById[mv.Id]
if !ok {
latestMessageById[mv.Id] = mv
} else if mv.Version > val.Version {
latestMessageById[mv.Id] = mv
}
}
return latestMessageById
}

func convertLatestMessageList(latestMessageById map[int]MessageValue) []*kafka.Message {
result := make([]*kafka.Message, 0, len(latestMessageById))
for _, latestMessage := range latestMessageById {
result = append(result, latestMessage.KafkaMessage)
}
return result
}

// In order to load topic with data, use:
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/with-kafka-batch-consumer-with-prebatch/testdata/messages.txt
func batchConsumeFn(messages []*kafka.Message) error {
fmt.Printf("length of %d messages, first message value is %s \n", len(messages), messages[0].Value)

return nil
}
5 changes: 5 additions & 0 deletions examples/with-kafka-batch-consumer-with-prebatch/testdata/messages.txt
@@ -0,0 +1,5 @@
{ "id": 1, "version": 1, "payload": "foo" }
{ "id": 1, "version": 2, "payload": "foobar" }
{ "id": 1, "version": 3, "payload": "foobarfoobar" }
{ "id": 2, "version": 1, "payload": "x" }
{ "id": 3, "version": 2, "payload": "y" }
4 changes: 2 additions & 2 deletions go.mod
@@ -7,8 +7,9 @@ require (
github.com/Trendyol/otel-kafka-konsumer v0.0.7
github.com/ansrivas/fiberprometheus/v2 v2.6.1
github.com/gofiber/fiber/v2 v2.50.0
github.com/google/go-cmp v0.6.0
github.com/prometheus/client_golang v1.16.0
github.com/segmentio/kafka-go v0.4.46
github.com/segmentio/kafka-go v0.4.47
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel/trace v1.19.0
go.uber.org/zap v1.24.0
@@ -22,7 +23,6 @@
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gofiber/adaptor/v2 v2.2.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
5 changes: 2 additions & 3 deletions go.sum
@@ -28,7 +28,6 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
@@ -64,8 +63,8 @@ github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/segmentio/kafka-go v0.4.46 h1:Sx8/kvtY+/G8nM0roTNnFezSJj3bT2sW0Xy/YY3CgBI=
github.com/segmentio/kafka-go v0.4.46/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
2 changes: 1 addition & 1 deletion test/integration/go.mod
@@ -6,7 +6,7 @@ replace github.com/Trendyol/kafka-konsumer/v2 => ../..

require (
github.com/Trendyol/kafka-konsumer/v2 v2.0.4
github.com/segmentio/kafka-go v0.4.46
github.com/segmentio/kafka-go v0.4.47
)

require (