feat: add preBatch feature #83

Merged
merged 12 commits into from
Dec 18, 2023
20 changes: 11 additions & 9 deletions README.md
@@ -212,26 +212,26 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap

| config | description | default |
|--------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|-----------------------------|
| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#ReaderConfig) | |
| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/kafka-go#ReaderConfig) | |
| `consumeFn`                                      | Kafka consumer function; if retry is enabled, it is also used to consume retriable messages                                             |                             |
| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info |
| `concurrency` | Number of goroutines used at listeners | 1 |
| `retryEnabled` | Retry/Exception consumer is working or not | false |
| `transactionalRetry` | Set false if you want to use exception/retry strategy to only failed messages | true |
| `commitInterval`                                 | Indicates the interval at which offsets are committed to the broker                                                                     | 1s                          |
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | |
| `messageGroupDuration` | Maximum time to wait for a batch | |
| `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | no timeout |
| `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | not enabled |
| `transport.DialTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | 5s |
| `transport.IdleTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | 30s |
| `transport.MetadataTTL ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | 6s |
| `transport.MetadataTopics ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | all topics in cluster |
| `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | no timeout |
| `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | not enabled |
| `transport.DialTimeout `                         | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                   | 5s                          |
| `transport.IdleTimeout `                         | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                   | 30s                         |
| `transport.MetadataTTL `                         | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                   | 6s                          |
| `transport.MetadataTopics `                      | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                   | all topics in cluster       |
| `distributedTracingEnabled`                      | Indicates OpenTelemetry support on/off for consume and produce operations                                                               | false                       |
| `distributedTracingConfiguration.TracerProvider` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTracerProvider() |
| `distributedTracingConfiguration.Propagator` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTextMapPropagator() |
| `retryConfiguration.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Transport) | |
| `retryConfiguration.clientId`                    | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport)                                                                   |                             |
| `retryConfiguration.startTimeCron` | Cron expression when retry consumer ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer#configurations)) starts to work at | |
| `retryConfiguration.workDuration` | Work duration exception consumer actively consuming messages | |
| `retryConfiguration.topic` | Retry/Exception topic names | |
@@ -243,6 +243,8 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | |
| `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | |
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | |
| `batchConfiguration.batchConsumeFn`              | Kafka batch consumer function; if retry is enabled, it is also used to consume retriable messages                                       |                             |
| `batchConfiguration.preBatch`                    | Function to transform messages before batch consuming starts                                                                            |                             |
| `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" |
| `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" |
| `sasl.authType` | `SCRAM` or `PLAIN` | |
12 changes: 10 additions & 2 deletions batch_consumer.go
@@ -11,7 +11,8 @@ import (
type batchConsumer struct {
*base

consumeFn func([]*Message) error
consumeFn BatchConsumeFn
Author comment: We already defined the `func([]*Message) error` alias for this.

preBatchFn PreBatchFn

messageGroupLimit int
}
@@ -25,6 +26,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
c := batchConsumer{
base: consumerBase,
consumeFn: cfg.BatchConfiguration.BatchConsumeFn,
preBatchFn: cfg.BatchConfiguration.PreBatchFn,
messageGroupLimit: cfg.BatchConfiguration.MessageGroupLimit,
}

@@ -47,9 +49,11 @@ func (b *batchConsumer) GetMetric() *ConsumerMetric {

func (b *batchConsumer) Consume() {
go b.subprocesses.Start()

b.wg.Add(1)
go b.startConsume()

b.wg.Add(b.concurrency)
Author comment: For more readability, I extracted the wg.Add call into the setupConcurrentWorkers method here.

b.setupConcurrentWorkers()

b.wg.Add(1)
@@ -92,7 +96,6 @@ func (b *batchConsumer) startBatch() {

func (b *batchConsumer) setupConcurrentWorkers() {
for i := 0; i < b.concurrency; i++ {
b.wg.Add(1)
go func() {
defer b.wg.Done()
for messages := range b.batchConsumingStream {
Expand Down Expand Up @@ -125,6 +128,11 @@ func chunkMessages(allMessages *[]*Message, chunkSize int) [][]*Message {
func (b *batchConsumer) consume(allMessages *[]*Message, commitMessages *[]kafka.Message) {
chunks := chunkMessages(allMessages, b.messageGroupLimit)

if b.preBatchFn != nil {
Author comment: Note that I only changed chunks, not allMessages, because allMessages is used for commitMessages; mutating it caused offset/lag issues.

preBatchResult := b.preBatchFn(*allMessages)
chunks = chunkMessages(&preBatchResult, b.messageGroupLimit)
}

// Send the messages to process
for _, chunk := range chunks {
b.batchConsumingStream <- chunk
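The chunk-then-send flow above (with the optional preBatch transform applied first) can be sketched in a self-contained form — `Message`, `chunk`, and the filter below are simplified stand-ins for illustration, not the library's actual types:

```go
package main

import "fmt"

// Message stands in for the library's message type (simplified for this sketch).
type Message struct{ Value string }

// chunk mirrors the chunkMessages helper above: split a slice into
// groups of at most n messages.
func chunk(msgs []*Message, n int) [][]*Message {
	var chunks [][]*Message
	for i := 0; i < len(msgs); i += n {
		end := i + n
		if end > len(msgs) {
			end = len(msgs)
		}
		chunks = append(chunks, msgs[i:end])
	}
	return chunks
}

func main() {
	msgs := []*Message{{"a"}, {"b"}, {"c"}, {"d"}, {"e"}}

	// Hypothetical preBatch hook: transform (here, filter) the slice
	// before chunking, as consume does when preBatchFn is set.
	preBatch := func(in []*Message) []*Message {
		out := make([]*Message, 0, len(in))
		for _, m := range in {
			if m.Value != "b" { // illustrative filter
				out = append(out, m)
			}
		}
		return out
	}

	for _, c := range chunk(preBatch(msgs), 2) {
		fmt.Println(len(c)) // chunk sizes after the preBatch transform
	}
}
```

When the hook is nil, the original slice is chunked unchanged, matching the nil guard in the diff above.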
3 changes: 1 addition & 2 deletions consumer_base.go
@@ -126,6 +126,7 @@ func (c *base) startConsume() {
for {
select {
case <-c.quit:
close(c.incomingMessageStream)
Author comment: Before, we closed this channel in the Stop() method, but that is not the correct way to manage channels. If a goroutine writes to a channel, it should be that goroutine's responsibility to close the channel, too.

return
default:
m := kafkaMessagePool.Get().(*kafka.Message)
@@ -159,9 +160,7 @@ func (c *base) Stop() error {
c.subprocesses.Stop()
c.cancelFn()
c.quit <- struct{}{}
close(c.incomingMessageStream)
c.wg.Wait()

err = c.r.Close()
})

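The ownership rule from that review comment — the goroutine that writes to a channel is the one that closes it, while Stop() only signals quit — can be shown as a minimal, self-contained sketch (names and the shutdown flow are illustrative, not the library's internals):

```go
package main

import "fmt"

// run models the pattern in the diff: the producer goroutine owns the
// stream channel and closes it on shutdown; the caller only signals quit.
func run() int {
	stream := make(chan int)
	quit := make(chan struct{})

	// Producer: it writes to stream, so it alone closes stream.
	go func() {
		defer close(stream) // closed by the writer, never by the stopper
		for i := 0; ; i++ {
			select {
			case <-quit:
				return
			case stream <- i:
			}
		}
	}()

	// Consumer: drains stream until the producer closes it.
	consumed := 0
	for v := range stream {
		consumed++
		if v == 2 {
			quit <- struct{}{} // request shutdown; producer closes stream
		}
	}
	return consumed
}

func main() {
	fmt.Println(run()) // 3 values consumed before shutdown
}
```

Because the producer closes `stream`, the consumer's `range` loop terminates naturally and the stopper never races a close against an in-flight send.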
3 changes: 2 additions & 1 deletion consumer_base_test.go
@@ -3,10 +3,11 @@ package kafka
import (
"context"
"errors"
"github.com/google/go-cmp/cmp"
"sync"
"testing"

"github.com/google/go-cmp/cmp"

"github.com/segmentio/kafka-go"
)

3 changes: 3 additions & 0 deletions consumer_config.go
@@ -17,6 +17,8 @@ type ReaderConfig kafka.ReaderConfig

type BatchConsumeFn func([]*Message) error

type PreBatchFn func([]*Message) []*Message

type ConsumeFn func(*Message) error

type DialConfig struct {
@@ -124,6 +126,7 @@ type RetryConfiguration struct {

type BatchConfiguration struct {
BatchConsumeFn BatchConsumeFn
PreBatchFn PreBatchFn
MessageGroupLimit int
}

90 changes: 90 additions & 0 deletions examples/with-kafka-batch-consumer-with-prebatch/main.go
@@ -0,0 +1,90 @@
package main

import (
"encoding/json"
"fmt"
"github.com/Trendyol/kafka-konsumer/v2"
"os"
"os/signal"
"time"
)

func main() {
consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "standart-topic",
GroupID: "standart-cg",
},
BatchConfiguration: &kafka.BatchConfiguration{
MessageGroupLimit: 1000,
BatchConsumeFn: batchConsumeFn,
PreBatchFn: preBatch,
},
RetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "retry-topic",
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
MessageGroupDuration: time.Second,
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()

fmt.Println("Consumer started...!")
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
}

type MessageValue struct {
Id int `json:"id"`
Version int `json:"version"`
Payload string `json:"payload"`
KafkaMessage *kafka.Message `json:"-"`
}

func preBatch(messages []*kafka.Message) []*kafka.Message {
latestMessageById := getLatestMessageByID(messages)

return convertLatestMessageList(latestMessageById)
}

func getLatestMessageByID(messages []*kafka.Message) map[int]MessageValue {
latestMessageById := make(map[int]MessageValue, len(messages))
for i := range messages {
var mv MessageValue
if err := json.Unmarshal(messages[i].Value, &mv); err != nil {
continue // skip malformed messages
}
mv.KafkaMessage = messages[i]

val, ok := latestMessageById[mv.Id]
if !ok {
latestMessageById[mv.Id] = mv
} else if mv.Version > val.Version {
latestMessageById[mv.Id] = mv
}
}
return latestMessageById
}

func convertLatestMessageList(latestMessageById map[int]MessageValue) []*kafka.Message {
result := make([]*kafka.Message, 0, len(latestMessageById))
for _, latestMessage := range latestMessageById {
result = append(result, latestMessage.KafkaMessage)
}
return result
}

// In order to load topic with data, use:
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/with-kafka-batch-consumer-with-prebatch/testdata/messages.txt
func batchConsumeFn(messages []*kafka.Message) error {
fmt.Printf("length of %d messages, first message value is %s \n", len(messages), messages[0].Value)

return nil
}
@@ -0,0 +1,5 @@
{ "id": 1, "version": 1, "payload": "foo" }
{ "id": 1, "version": 2, "payload": "foobar" }
{ "id": 1, "version": 3, "payload": "foobarfoobar" }
{ "id": 2, "version": 1, "payload": "x" }
{ "id": 3, "version": 2, "payload": "y" }
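Fed these five lines, the preBatch example above keeps only the highest version per id (versions 3, 1, and 2 for ids 1, 2, and 3). A self-contained sketch of that deduplication, with the type trimmed to the testdata fields:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MessageValue mirrors the fields used in the testdata lines.
type MessageValue struct {
	Id      int    `json:"id"`
	Version int    `json:"version"`
	Payload string `json:"payload"`
}

// latestPerID keeps only the highest-version value for each id,
// like the getLatestMessageByID helper in the example above.
func latestPerID(lines []string) map[int]MessageValue {
	latest := make(map[int]MessageValue, len(lines))
	for _, l := range lines {
		var mv MessageValue
		if err := json.Unmarshal([]byte(l), &mv); err != nil {
			continue // skip malformed lines
		}
		if cur, ok := latest[mv.Id]; !ok || mv.Version > cur.Version {
			latest[mv.Id] = mv
		}
	}
	return latest
}

func main() {
	lines := []string{
		`{ "id": 1, "version": 1, "payload": "foo" }`,
		`{ "id": 1, "version": 2, "payload": "foobar" }`,
		`{ "id": 1, "version": 3, "payload": "foobarfoobar" }`,
		`{ "id": 2, "version": 1, "payload": "x" }`,
		`{ "id": 3, "version": 2, "payload": "y" }`,
	}
	latest := latestPerID(lines)
	fmt.Println(len(latest), latest[1].Payload) // 3 unique ids; id 1 resolves to "foobarfoobar"
}
```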
2 changes: 1 addition & 1 deletion go.mod
@@ -8,7 +8,7 @@ require (
github.com/ansrivas/fiberprometheus/v2 v2.6.1
github.com/gofiber/fiber/v2 v2.50.0
github.com/prometheus/client_golang v1.16.0
github.com/segmentio/kafka-go v0.4.46
github.com/segmentio/kafka-go v0.4.47
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel/trace v1.19.0
go.uber.org/zap v1.24.0
2 changes: 2 additions & 0 deletions go.sum
@@ -66,6 +66,8 @@ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/segmentio/kafka-go v0.4.46 h1:Sx8/kvtY+/G8nM0roTNnFezSJj3bT2sW0Xy/YY3CgBI=
github.com/segmentio/kafka-go v0.4.46/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=