From 4777bc62d10993988ec10fffdf800bb204d0c20a Mon Sep 17 00:00:00 2001 From: neil-xie <104041627+neil-xie@users.noreply.github.com> Date: Thu, 4 Jan 2024 10:57:41 -0800 Subject: [PATCH] Add unit test for consumer impl (#5573) * Add unit test for consumer impl * A little refactoring --- .../{consumerImpl.go => consumer_impl.go} | 11 +- common/messaging/kafka/consumer_impl_test.go | 216 ++++++++++++++++++ 2 files changed, 221 insertions(+), 6 deletions(-) rename common/messaging/kafka/{consumerImpl.go => consumer_impl.go} (98%) create mode 100644 common/messaging/kafka/consumer_impl_test.go diff --git a/common/messaging/kafka/consumerImpl.go b/common/messaging/kafka/consumer_impl.go similarity index 98% rename from common/messaging/kafka/consumerImpl.go rename to common/messaging/kafka/consumer_impl.go index 683492f3892..81e4fd0cb43 100644 --- a/common/messaging/kafka/consumerImpl.go +++ b/common/messaging/kafka/consumer_impl.go @@ -185,7 +185,7 @@ func (h *consumerHandlerImpl) getCurrentSession() sarama.ConsumerGroupSession { return h.currentSession } -func (h *consumerHandlerImpl) completeMessage(message *messageImpl, isAck bool) { +func (h *consumerHandlerImpl) completeMessage(message *messageImpl, isAck bool) error { h.RLock() defer h.RUnlock() @@ -215,9 +215,10 @@ func (h *consumerHandlerImpl) completeMessage(message *messageImpl, isAck bool) h.logger.Error("Failed to complete an message that hasn't been added to the partition", tag.KafkaPartition(message.Partition()), tag.KafkaOffset(message.Offset())) - return + return err } h.currentSession.MarkOffset(h.topic, message.Partition(), ackLevel+1, "") + return nil } // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited @@ -265,16 +266,14 @@ func (m *messageImpl) Ack() error { if m.isFromPreviousSession() { return nil } - m.handler.completeMessage(m, true) - return nil + return m.handler.completeMessage(m, true) } func (m *messageImpl) Nack() error { if m.isFromPreviousSession() { return nil } - m.handler.completeMessage(m, false) - return nil + return m.handler.completeMessage(m, false) } func (m *messageImpl) isFromPreviousSession() bool { diff --git a/common/messaging/kafka/consumer_impl_test.go b/common/messaging/kafka/consumer_impl_test.go new file mode 100644 index 00000000000..8ae22535881 --- /dev/null +++ b/common/messaging/kafka/consumer_impl_test.go @@ -0,0 +1,216 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package kafka + +import ( + "context" + "testing" + + "github.com/Shopify/sarama" + "github.com/Shopify/sarama/mocks" + "github.com/stretchr/testify/assert" + "github.com/uber-go/tally" + + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/metrics" +) + +func TestNewConsumer(t *testing.T) { + mockProducer := mocks.NewSyncProducer(t, nil) + group := "tests" + mockBroker := initMockBroker(t, group) + defer mockBroker.Close() + brokerAddr := []string{mockBroker.Addr()} + kafkaConfig := &config.KafkaConfig{ + Clusters: map[string]config.ClusterConfig{ + "test-cluster": { + Brokers: brokerAddr, + }, + }, + Topics: map[string]config.TopicConfig{ + "test-topic": { + Cluster: "test-cluster", + }, + "test-topic-dlq": { + Cluster: "test-cluster", + }, + }, + Applications: map[string]config.TopicList{ + "test-app": { + Topic: "test-topic", + DLQTopic: "test-topic-dlq", + }, + }, + } + topic := "test-topic" + consumerName := "test-consumer" + metricsClient := metrics.NewClient(tally.NoopScope, metrics.History) + logger := testlogger.New(t) + kafkaProducer := NewKafkaProducer(topic, mockProducer, logger) + consumer, err := newKafkaConsumer(kafkaProducer, kafkaConfig, topic, consumerName, + nil, metricsClient, logger) + assert.NoError(t, err, "An error was not expected but got %v", err) + assert.NotNil(t, consumer, "Expected consumer but got nil") + + err = consumer.Start() + assert.NoError(t, err) + + consumer.Stop() +} + +func TestNewConsumerHandlerImpl(t *testing.T) { + topic := "test-topic" + metricsClient := metrics.NewClient(tally.NoopScope, metrics.History) + logger := testlogger.New(t) + mockProducer := mocks.NewSyncProducer(t, nil) + kafkaProducer := NewKafkaProducer(topic, mockProducer, logger) + msgChan := make(chan messaging.Message, 1) + consumerHandler := newConsumerHandlerImpl(kafkaProducer, topic, msgChan, metricsClient, logger) + + assert.NotNil(t, consumerHandler) + assert.Equal(t, kafkaProducer, consumerHandler.dlqProducer) + assert.Equal(t, topic, consumerHandler.topic) + // Close the channel at the end of the test + close(msgChan) +} + +// test multiple methods related to messageImpl since setup is repeated +func TestMessageImpl(t *testing.T) { + topic := "test-topic" + metricsClient := metrics.NewClient(tally.NoopScope, metrics.History) + logger := testlogger.New(t) + mockProducer := mocks.NewSyncProducer(t, nil) + kafkaProducer := NewKafkaProducer(topic, mockProducer, logger) + msgChan := make(chan messaging.Message, 1) + consumerHandler := newConsumerHandlerImpl(kafkaProducer, topic, msgChan, metricsClient, logger) + consumerGroupSession := NewMockConsumerGroupSession(int32(1)) + partition := int32(100) + offset := int64(0) + msgImpl := &messageImpl{ + saramaMsg: &sarama.ConsumerMessage{ + Topic: topic, + Partition: partition, + Offset: offset, + }, + session: consumerGroupSession, + handler: consumerHandler, + logger: logger, + } + + // Ack message that is from a previous session + err := msgImpl.handler.Setup(NewMockConsumerGroupSession(int32(2))) + assert.NoError(t, err) + err = msgImpl.Ack() + assert.NoError(t, err) + + // normal case + err = msgImpl.handler.Setup(NewMockConsumerGroupSession(int32(1))) + assert.NoError(t, err) + msgImpl.handler.manager.AddMessage(partition, offset) + err = msgImpl.Ack() + assert.NoError(t, err) + + // Nack message that is from a previous session + err = msgImpl.handler.Setup(NewMockConsumerGroupSession(int32(2))) + assert.NoError(t, err) + err = msgImpl.Nack() + assert.NoError(t, err) + + // normal case + err = msgImpl.handler.Setup(NewMockConsumerGroupSession(int32(1))) + assert.NoError(t, err) + mockProducer.ExpectSendMessageAndSucceed() + msgImpl.handler.manager.AddMessage(partition, offset) + err = msgImpl.Nack() + assert.NoError(t, err) + + close(msgChan) +} + +// MockConsumerGroupSession implements sarama.ConsumerGroupSession for testing purposes. +type MockConsumerGroupSession struct { + claims map[string][]int32 + memberID string + generationID int32 + offsets map[string]map[int32]int64 + commitCalled bool + context context.Context +} + +func NewMockConsumerGroupSession(generationID int32) *MockConsumerGroupSession { + return &MockConsumerGroupSession{ + claims: map[string][]int32{}, + memberID: "test-member", + generationID: generationID, + offsets: map[string]map[int32]int64{}, + commitCalled: false, + } +} + +func (m *MockConsumerGroupSession) Claims() map[string][]int32 { + return m.claims +} + +func (m *MockConsumerGroupSession) MemberID() string { + return m.memberID +} + +func (m *MockConsumerGroupSession) GenerationID() int32 { + return m.generationID +} + +func (m *MockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) { + // not needed for testing +} + +func (m *MockConsumerGroupSession) Commit() { + // not needed for testing +} + +func (m *MockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) { + // not needed for testing +} + +func (m *MockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string) { + // not needed for testing +} + +func (m *MockConsumerGroupSession) Context() context.Context { + // Return a context, you can use context.Background() or a custom context if needed + return m.context +} + +func initMockBroker(t *testing.T, group string) *sarama.MockBroker { + topics := []string{"test-topic"} + mockBroker := sarama.NewMockBroker(t, 0) + + mockBroker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(mockBroker.Addr(), mockBroker.BrokerID()). + SetLeader(topics[0], 0, mockBroker.BrokerID()). + SetController(mockBroker.BrokerID()), + }) + return mockBroker +}