Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add producer interceptor interface #145

Merged
merged 7 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ type Writer interface {
}

// producer is the default Producer implementation. It delegates writes to
// the underlying Writer and runs the configured interceptors before each
// produce call.
type producer struct {
	w            Writer
	interceptors []ProducerInterceptor
}

func NewProducer(cfg *ProducerConfig) (Producer, error) {
func NewProducer(cfg *ProducerConfig, interceptors ...ProducerInterceptor) (Producer, error) {
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(cfg.Writer.Brokers...),
Topic: cfg.Writer.Topic,
Expand Down Expand Up @@ -51,7 +52,7 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) {
kafkaWriter.Transport = transport
}

p := &producer{w: kafkaWriter}
p := &producer{w: kafkaWriter, interceptors: interceptors}

if cfg.DistributedTracingEnabled {
otelWriter, err := NewOtelProducer(cfg, kafkaWriter)
Expand All @@ -64,18 +65,33 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) {
return p, nil
}

func (c *producer) Produce(ctx context.Context, message Message) error {
return c.w.WriteMessages(ctx, message.toKafkaMessage())
func (p *producer) Produce(ctx context.Context, message Message) error {
if len(p.interceptors) > 0 {
p.executeInterceptors(ctx, &message)
}

return p.w.WriteMessages(ctx, message.toKafkaMessage())
}

func (c *producer) ProduceBatch(ctx context.Context, messages []Message) error {
func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error {
kafkaMessages := make([]kafka.Message, 0, len(messages))
for i := range messages {
if len(p.interceptors) > 0 {
p.executeInterceptors(ctx, &messages[i])
}

kafkaMessages = append(kafkaMessages, messages[i].toKafkaMessage())
}
return c.w.WriteMessages(ctx, kafkaMessages...)

return p.w.WriteMessages(ctx, kafkaMessages...)
}

// executeInterceptors invokes each configured interceptor's OnProduce hook,
// giving it a chance to mutate the message before it is written.
func (p *producer) executeInterceptors(ctx context.Context, message *Message) {
	for i := range p.interceptors {
		p.interceptors[i].OnProduce(ProducerInterceptorContext{Context: ctx, Message: message})
	}
}

func (c *producer) Close() error {
return c.w.Close()
func (p *producer) Close() error {
return p.w.Close()
}
14 changes: 14 additions & 0 deletions producer_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kafka

import (
	"context"
)

// ProducerInterceptorContext carries the data handed to a ProducerInterceptor:
// the context of the produce call and a pointer to the message about to be
// written, so implementations can mutate it (e.g. append headers) in place.
// NOTE(review): storing a context.Context in a struct deviates from the usual
// Go guideline; here it is a short-lived argument bundle, not retained state.
type ProducerInterceptorContext struct {
	Context context.Context
	Message *Message
}

// ProducerInterceptor is a hook invoked for every message before it is
// written. Implementations may modify ctx.Message in place.
type ProducerInterceptor interface {
	OnProduce(ctx ProducerInterceptorContext)
}
51 changes: 50 additions & 1 deletion producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"github.com/gofiber/fiber/v2/utils"

"github.com/segmentio/kafka-go"
)

Expand All @@ -20,6 +22,26 @@ func Test_producer_Produce_Successfully(t *testing.T) {
}
}

func Test_producer_Produce_interceptor_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
msg := Message{Headers: make([]Header, 0)}
msg.Headers = append(msg.Headers, kafka.Header{
Key: "x-correlation-id",
Value: []byte(utils.UUIDv4()),
})
interceptor := newMockProducerInterceptor()

p := producer{w: mw, interceptors: interceptor}

// When
err := p.Produce(context.Background(), msg)
// Then
ademekici marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
t.Fatalf("Producing err %s", err.Error())
}
}

func Test_producer_ProduceBatch_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
Expand All @@ -33,6 +55,20 @@ func Test_producer_ProduceBatch_Successfully(t *testing.T) {
}
}

// Test_producer_ProduceBatch_interceptor_Successfully verifies that
// ProduceBatch succeeds when interceptors are configured on the producer.
func Test_producer_ProduceBatch_interceptor_Successfully(t *testing.T) {
	// Given
	writer := &mockWriter{}
	p := producer{w: writer, interceptors: newMockProducerInterceptor()}

	// When
	err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}})

	// Then
	if err != nil {
		t.Fatalf("Batch Producing err %s", err.Error())
	}
}

func Test_producer_Close_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
Expand All @@ -48,10 +84,23 @@ func Test_producer_Close_Successfully(t *testing.T) {

type mockWriter struct{}

func (m *mockWriter) WriteMessages(_ context.Context, _ ...kafka.Message) error {
func (m *mockWriter) WriteMessages(_ context.Context, msg ...kafka.Message) error {
return nil
}

func (m *mockWriter) Close() error {
return nil
}

// mockProducerInterceptor appends a fixed "test" header to every message it
// intercepts, so tests can observe that the hook ran.
type mockProducerInterceptor struct{}

func (i *mockProducerInterceptor) OnProduce(ctx ProducerInterceptorContext) {
	header := kafka.Header{Key: "test", Value: []byte("test")}
	ctx.Message.Headers = append(ctx.Message.Headers, header)
}

// newMockProducerInterceptor returns a one-element interceptor slice for use
// in tests.
func newMockProducerInterceptor() []ProducerInterceptor {
	return []ProducerInterceptor{&mockProducerInterceptor{}}
}
151 changes: 124 additions & 27 deletions test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,99 @@ import (
func Test_Should_Produce_Successfully(t *testing.T) {
// Given
t.Parallel()
topic := "produce-topic"
brokerAddress := "localhost:9092"

producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}},
Transport: &kafka.TransportConfig{
MetadataTopics: []string{
topic,
t.Run("without interceptor", func(t *testing.T) {
//Given

topic := "produce-topic"
producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}},
Transport: &kafka.TransportConfig{
MetadataTopics: []string{
topic,
},
},
},
})
})

// When
err := producer.Produce(context.Background(), kafka.Message{
Key: []byte("1"),
Value: []byte(`foo`),
// When
err := producer.Produce(context.Background(), kafka.Message{
Key: []byte("1"),
Value: []byte(`foo`),
})

// Then
if err != nil {
t.Fatalf("Error while producing err %s", err.Error())
}
})

// Then
if err != nil {
t.Fatalf("Error while producing err %s", err.Error())
}
t.Run("with interceptor", func(t *testing.T) {
// Given
topic := "produce-interceptor-topic"
consumerGroup := "produce-topic-cg"
interceptor := newMockProducerInterceptor()

producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}},
Transport: &kafka.TransportConfig{
MetadataTopics: []string{
topic,
},
},
}, interceptor...)

// When
err := producer.Produce(context.Background(), kafka.Message{
Key: []byte("1"),
Value: []byte(`foo`),
})

messageCh := make(chan *kafka.Message)

consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup},
ConsumeFn: func(message *kafka.Message) error {
messageCh <- message
return nil
},
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()

// Then

if err != nil {
t.Fatalf("Error while producing err %s", err.Error())
}

actual := <-messageCh
if string(actual.Value) != "foo" {
t.Fatalf("Value does not equal %s", actual.Value)
}
if string(actual.Key) != "1" {
t.Fatalf("Key does not equal %s", actual.Key)
}
if len(actual.Headers) != 1 {
t.Fatalf("Header size does not equal %d", len(actual.Headers))
}
if string(actual.Headers[0].Key) != xSourceAppKey {
t.Fatalf("Header key does not equal %s", actual.Headers[0].Key)
}
if string(actual.Headers[0].Value) != xSourceAppValue {
t.Fatalf("Header value does not equal %s", actual.Headers[0].Value)
}
})
}

func Test_Should_Batch_Produce_Successfully(t *testing.T) {
// Given
t.Parallel()
topic := "batch-produce-topic"
brokerAddress := "localhost:9092"

producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}})

// When
msgs := []kafka.Message{
{
Key: []byte("1"),
Expand All @@ -59,13 +118,33 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) {
},
}

// When
err := producer.ProduceBatch(context.Background(), msgs)
t.Run("without interceptor", func(t *testing.T) {
producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}})

// Then
if err != nil {
t.Fatalf("Error while producing err %s", err.Error())
}
// When
err := producer.ProduceBatch(context.Background(), msgs)

// Then
if err != nil {
t.Fatalf("Error while producing err %s", err.Error())
}
})

t.Run("with interceptor", func(t *testing.T) {
interceptors := newMockProducerInterceptor()

producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, interceptors...)

// When
err := producer.ProduceBatch(context.Background(), msgs)

// Then
if err != nil {
t.Fatalf("Error while producing err %s", err.Error())
}
})
}

func Test_Should_Consume_Message_Successfully(t *testing.T) {
Expand Down Expand Up @@ -563,3 +642,21 @@ func assertEventually(t *testing.T, condition func() bool, waitFor time.Duration
}
}
}

// mockProducerInterceptor stamps every produced message with a fixed
// source-app header so integration tests can verify interceptor execution.
type mockProducerInterceptor struct{}

const (
	xSourceAppKey   = "x-source-app"
	xSourceAppValue = "kafka-konsumer"
)

func (i *mockProducerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
	header := kafka.Header{Key: xSourceAppKey, Value: []byte(xSourceAppValue)}
	ctx.Message.Headers = append(ctx.Message.Headers, header)
}

// newMockProducerInterceptor builds the interceptor slice that the
// integration tests pass to kafka.NewProducer.
func newMockProducerInterceptor() []kafka.ProducerInterceptor {
	mock := &mockProducerInterceptor{}
	return []kafka.ProducerInterceptor{mock}
}
Loading