Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1 from pierDipi/kafka-ch-event-loss
Browse files Browse the repository at this point in the history
Change poll loop
  • Loading branch information
aliok authored Oct 7, 2021
2 parents b0c5c5f + bd188fc commit c807031
Show file tree
Hide file tree
Showing 17 changed files with 93 additions and 65 deletions.
2 changes: 1 addition & 1 deletion cmd/channel/distributed/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func main() {
MetricsRegistry: ekConfig.Sarama.Config.MetricRegistry,
SaramaConfig: ekConfig.Sarama.Config,
}
dispatcher, managerEvents := dispatch.NewDispatcher(dispatcherConfig, controlProtocolServer)
dispatcher, managerEvents := dispatch.NewDispatcher(dispatcherConfig, controlProtocolServer, func(ref types.NamespacedName) {})

// Create KafkaChannel Informer
kafkaClient := kafkaclientset.NewForConfigOrDie(k8sConfig)
Expand Down
12 changes: 6 additions & 6 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package dispatcher
import (
"context"
"fmt"
nethttp "net/http"
"strings"
"sync"

Expand All @@ -29,14 +30,13 @@ import (
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/pkg/logging"

"knative.dev/eventing-kafka/pkg/common/config"

eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/pkg/kmeta"

nethttp "net/http"

"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/env"
"knative.dev/eventing-kafka/pkg/common/consumer"
Expand Down Expand Up @@ -73,7 +73,7 @@ type KafkaDispatcher struct {
logger *zap.SugaredLogger
}

func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispatcher, error) {
func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs, enqueue func(ref types.NamespacedName)) (*KafkaDispatcher, error) {

producer, err := sarama.NewSyncProducer(args.Brokers, args.Config.Sarama.Config)
if err != nil {
Expand All @@ -82,7 +82,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat

dispatcher := &KafkaDispatcher{
dispatcher: eventingchannels.NewMessageDispatcher(logging.FromContext(ctx).Desugar()),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, args.Config.Sarama.Config, &consumer.KafkaConsumerGroupOffsetsChecker{}),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, args.Config.Sarama.Config, &consumer.KafkaConsumerGroupOffsetsChecker{}, enqueue),
channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription),
subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup),
subscriptions: make(map[types.UID]Subscription),
Expand Down Expand Up @@ -297,7 +297,7 @@ func (d *KafkaDispatcher) subscribe(ctx context.Context, channelRef types.Namesp
}
d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef),
zap.Any("subscription", sub.UID), zap.String("topic", topicName), zap.String("consumer group", groupID))
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(ctx, groupID, []string{topicName}, handler)
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(ctx, groupID, []string{topicName}, handler, channelRef)

if err != nil {
// we can not create a consumer - logging that, with reason
Expand Down
3 changes: 2 additions & 1 deletion pkg/channel/consolidated/dispatcher/dispatcher_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cloudevents/sdk-go/v2/test"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/apis"
Expand Down Expand Up @@ -82,7 +83,7 @@ func TestDispatcher(t *testing.T) {
}

// Create the dispatcher. At this point, if Kafka is not up, this thing fails
dispatcher, err := NewDispatcher(context.Background(), &dispatcherArgs)
dispatcher, err := NewDispatcher(context.Background(), &dispatcherArgs, func(ref types.NamespacedName) {})
if err != nil {
t.Skipf("no dispatcher: %v", err)
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/channel/consolidated/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"

"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/common/consumer"
eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/channel/fanout"
klogtesting "knative.dev/pkg/logging/testing"
_ "knative.dev/pkg/system/testing"

"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/common/consumer"
)

// ----- Mocks
Expand All @@ -50,7 +51,7 @@ type mockKafkaConsumerFactory struct {
createErr bool
}

func (c mockKafkaConsumerFactory) StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler consumer.KafkaConsumerHandler, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
func (c mockKafkaConsumerFactory) StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler consumer.KafkaConsumerHandler, ref types.NamespacedName, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
if c.createErr {
return nil, errors.New("error creating consumer")
}
Expand Down Expand Up @@ -519,7 +520,7 @@ func TestNewDispatcher(t *testing.T) {
Brokers: []string{"localhost:10000"},
TopicFunc: utils.TopicName,
}
_, err := NewDispatcher(context.TODO(), args)
_, err := NewDispatcher(context.TODO(), args, func(ref types.NamespacedName) {})
if err == nil {
t.Errorf("Expected error want %s, got %s", "message receiver is not set", err)
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,8 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
Config: kafkaConfig.EventingKafka,
TopicFunc: utils.TopicName,
}
kafkaDispatcher, err := dispatcher.NewDispatcher(ctx, args)
if err != nil {
logger.Fatalw("Unable to create kafka dispatcher", zap.Error(err))
}
logger.Info("Starting the Kafka dispatcher")
logger.Infow("Kafka broker configuration", zap.Strings("Brokers", kafkaConfig.Brokers))

r := &Reconciler{
kafkaDispatcher: kafkaDispatcher,
kafkaClientSet: kafkaclientsetinjection.Get(ctx),
kafkachannelLister: kafkaChannelInformer.Lister(),
kafkachannelInformer: kafkaChannelInformer.Informer(),
Expand All @@ -123,6 +116,15 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
return controller.Options{SkipStatusUpdates: true}
})

kafkaDispatcher, err := dispatcher.NewDispatcher(ctx, args, r.impl.EnqueueKey)
if err != nil {
logger.Fatalw("Unable to create kafka dispatcher", zap.Error(err))
}
logger.Info("Starting the Kafka dispatcher")
logger.Infow("Kafka broker configuration", zap.Strings("Brokers", kafkaConfig.Brokers))

r.kafkaDispatcher = kafkaDispatcher

logger.Info("Setting up event handlers")

// Watch for kafka channels.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,11 @@ func (r Reconciler) reconcile(ctx context.Context, channel *kafkav1beta1.KafkaCh
}

// Update The ConsumerGroups To Align With Current KafkaChannel Subscribers
subscriptions := r.dispatcher.UpdateSubscriptions(ctx, subscribers)
channelRef := types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
}
subscriptions := r.dispatcher.UpdateSubscriptions(ctx, channelRef, subscribers)

// Update The KafkaChannel Subscribable Status Based On ConsumerGroup Creation Status
channel.Status.SubscribableStatus = r.createSubscribableStatus(channel.Spec.Subscribers, subscriptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (m *MockDispatcher) Shutdown() {
m.Called()
}

func (m *MockDispatcher) UpdateSubscriptions(_ context.Context, subscriberSpecs []eventingduck.SubscriberSpec) consumer.SubscriberStatusMap {
func (m *MockDispatcher) UpdateSubscriptions(_ context.Context, ref types.NamespacedName, subscriberSpecs []eventingduck.SubscriberSpec) consumer.SubscriberStatusMap {
args := m.Called(subscriberSpecs)
return args.Get(0).(consumer.SubscriberStatusMap)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/channel/distributed/dispatcher/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewSubscriberWrapper(subscriberSpec eventingduck.SubscriberSpec, groupId st
type Dispatcher interface {
SecretChanged(ctx context.Context, secret *corev1.Secret)
Shutdown()
UpdateSubscriptions(ctx context.Context, subscriberSpecs []eventingduck.SubscriberSpec) commonconsumer.SubscriberStatusMap
UpdateSubscriptions(ctx context.Context, channelRef types.NamespacedName, subscriberSpecs []eventingduck.SubscriberSpec) commonconsumer.SubscriberStatusMap
}

// DispatcherImpl Is A Struct With Configuration & ConsumerGroup State
Expand All @@ -83,9 +83,9 @@ type DispatcherImpl struct {
var _ Dispatcher = &DispatcherImpl{}

// NewDispatcher Is The Dispatcher Constructor
func NewDispatcher(dispatcherConfig DispatcherConfig, controlServer controlprotocol.ServerHandler) (Dispatcher, <-chan commonconsumer.ManagerEvent) {
func NewDispatcher(dispatcherConfig DispatcherConfig, controlServer controlprotocol.ServerHandler, enqueue func(ref types.NamespacedName)) (Dispatcher, <-chan commonconsumer.ManagerEvent) {

consumerGroupManager := commonconsumer.NewConsumerGroupManager(dispatcherConfig.Logger, controlServer, dispatcherConfig.Brokers, dispatcherConfig.SaramaConfig, &commonconsumer.NoopConsumerGroupOffsetsChecker{})
consumerGroupManager := commonconsumer.NewConsumerGroupManager(dispatcherConfig.Logger, controlServer, dispatcherConfig.Brokers, dispatcherConfig.SaramaConfig, &commonconsumer.NoopConsumerGroupOffsetsChecker{}, enqueue)

// Create The DispatcherImpl With Specified Configuration
dispatcher := &DispatcherImpl{
Expand Down Expand Up @@ -123,7 +123,7 @@ func (d *DispatcherImpl) Shutdown() {
}

// UpdateSubscriptions manages the Dispatcher's Subscriptions to align with new state
func (d *DispatcherImpl) UpdateSubscriptions(ctx context.Context, subscriberSpecs []eventingduck.SubscriberSpec) commonconsumer.SubscriberStatusMap {
func (d *DispatcherImpl) UpdateSubscriptions(ctx context.Context, channelRef types.NamespacedName, subscriberSpecs []eventingduck.SubscriberSpec) commonconsumer.SubscriberStatusMap {

if d.SaramaConfig == nil {
d.Logger.Error("Dispatcher has no config!")
Expand Down Expand Up @@ -151,7 +151,7 @@ func (d *DispatcherImpl) UpdateSubscriptions(ctx context.Context, subscriberSpec

// Create/Start A New ConsumerGroup With Custom Handler
handler := NewHandler(logger, groupId, &subscriberSpec)
err := d.consumerMgr.StartConsumerGroup(ctx, groupId, []string{d.Topic}, handler)
err := d.consumerMgr.StartConsumerGroup(ctx, groupId, []string{d.Topic}, handler, channelRef)
if err != nil {

// Log & Return Failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func TestUpdateSubscriptions(t *testing.T) {
}

// Perform The Test
result := dispatcher.UpdateSubscriptions(ctx, testCase.args.subscriberSpecs)
result := dispatcher.UpdateSubscriptions(ctx, types.NamespacedName{}, testCase.args.subscriberSpecs)

close(errorSource)

Expand Down Expand Up @@ -572,7 +572,7 @@ func createTestDispatcher(t *testing.T, brokers []string, config *sarama.Config)
serverHandler.Service.On("SendAndWaitForAck", mock.Anything, mock.Anything).Return(nil)

// Create The Dispatcher
dispatcher, events := NewDispatcher(dispatcherConfig, serverHandler)
dispatcher, events := NewDispatcher(dispatcherConfig, serverHandler, func(ref types.NamespacedName) {})
assert.NotNil(t, events)
serverHandler.AssertExpectations(t)

Expand Down
30 changes: 16 additions & 14 deletions pkg/common/consumer/consumer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/Shopify/sarama"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/logging"
)

Expand All @@ -33,13 +34,14 @@ type consumeFunc func(ctx context.Context, topics []string, handler sarama.Consu

// KafkaConsumerGroupFactory creates the ConsumerGroup and start consuming the specified topic
type KafkaConsumerGroupFactory interface {
StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error)
StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler KafkaConsumerHandler, channelRef types.NamespacedName, options ...SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error)
}

type kafkaConsumerGroupFactoryImpl struct {
config *sarama.Config
addrs []string
offsetsChecker ConsumerGroupOffsetsChecker
enqueue func(ref types.NamespacedName)
}

type customConsumerGroup struct {
Expand All @@ -66,7 +68,7 @@ func (c *customConsumerGroup) Close() error {
var _ sarama.ConsumerGroup = (*customConsumerGroup)(nil)

// StartConsumerGroup creates a new customConsumerGroup and starts a Consume goroutine on it
func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler KafkaConsumerHandler, channelRef types.NamespacedName, options ...SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
logger := logging.FromContext(ctx)

consumerGroup, err := c.createConsumerGroup(groupID)
Expand All @@ -76,7 +78,7 @@ func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(ctx context.Context, g
}

// Start the consumerGroup.Consume function in a separate goroutine
return c.startExistingConsumerGroup(groupID, consumerGroup, consumerGroup.Consume, topics, logger, handler, options...), nil
return c.startExistingConsumerGroup(groupID, consumerGroup, consumerGroup.Consume, topics, logger, handler, channelRef, options...), nil
}

// createConsumerGroup creates a Sarama ConsumerGroup using the newConsumerGroup wrapper, with the
Expand All @@ -87,14 +89,7 @@ func (c kafkaConsumerGroupFactoryImpl) createConsumerGroup(groupID string) (sara

// startExistingConsumerGroup creates a goroutine that begins a custom Consume loop on the provided ConsumerGroup
// This loop is cancelable via the function provided in the returned customConsumerGroup.
func (c kafkaConsumerGroupFactoryImpl) startExistingConsumerGroup(
groupID string,
saramaGroup sarama.ConsumerGroup,
consume consumeFunc,
topics []string,
logger *zap.SugaredLogger,
handler KafkaConsumerHandler,
options ...SaramaConsumerHandlerOption) *customConsumerGroup {
func (c kafkaConsumerGroupFactoryImpl) startExistingConsumerGroup(groupID string, saramaGroup sarama.ConsumerGroup, consume consumeFunc, topics []string, logger *zap.SugaredLogger, handler KafkaConsumerHandler, channelRef types.NamespacedName, options ...SaramaConsumerHandlerOption) *customConsumerGroup {

errorCh := make(chan error, 10)
releasedCh := make(chan bool)
Expand All @@ -105,8 +100,15 @@ func (c kafkaConsumerGroupFactoryImpl) startExistingConsumerGroup(
// do not proceed until the check is done
err := c.offsetsChecker.WaitForOffsetsInitialization(ctx, groupID, topics, logger, c.addrs, c.config)
if err != nil {
logger.Errorw("error while checking if offsets are initialized", zap.Any("topics", topics), zap.String("groupId", groupID), zap.Error(err))
logger.Errorw("error while checking if offsets are initialized",
zap.Any("topics", topics),
zap.String("groupId", groupID),
zap.String("channel", channelRef.String()),
zap.Error(err),
)
errorCh <- err
c.enqueue(channelRef)
return
}

logger.Debugw("all offsets are initialized", zap.Any("topics", topics), zap.Any("groupID", groupID))
Expand Down Expand Up @@ -136,8 +138,8 @@ func (c kafkaConsumerGroupFactoryImpl) startExistingConsumerGroup(
return &customConsumerGroup{cancel, errorCh, saramaGroup, releasedCh}
}

func NewConsumerGroupFactory(addrs []string, config *sarama.Config, offsetsChecker ConsumerGroupOffsetsChecker) KafkaConsumerGroupFactory {
return kafkaConsumerGroupFactoryImpl{addrs: addrs, config: config, offsetsChecker: offsetsChecker}
func NewConsumerGroupFactory(addrs []string, config *sarama.Config, offsetsChecker ConsumerGroupOffsetsChecker, enqueue func(ref types.NamespacedName)) KafkaConsumerGroupFactory {
return kafkaConsumerGroupFactoryImpl{addrs: addrs, config: config, offsetsChecker: offsetsChecker, enqueue: enqueue}
}

var _ KafkaConsumerGroupFactory = (*kafkaConsumerGroupFactoryImpl)(nil)
Expand Down
22 changes: 18 additions & 4 deletions pkg/common/consumer/consumer_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

"github.com/Shopify/sarama"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"

controllertesting "knative.dev/eventing-kafka/pkg/common/commands/resetoffset/controller/testing"
commontesting "knative.dev/eventing-kafka/pkg/common/testing"
)
Expand Down Expand Up @@ -111,6 +113,18 @@ func mockedNewSaramaClusterAdmin(clusterAdmin sarama.ClusterAdmin, mustFail bool
}
}

func mockedNewSaramaClusterAdminFromClient(clusterAdmin sarama.ClusterAdmin, mustFail bool) func(client sarama.Client) (sarama.ClusterAdmin, error) {
if !mustFail {
return func(client sarama.Client) (sarama.ClusterAdmin, error) {
return clusterAdmin, nil
}
} else {
return func(client sarama.Client) (sarama.ClusterAdmin, error) {
return nil, errors.New("failed")
}
}
}

//------ Tests

type mockConsumerGroupOffsetsChecker struct {
Expand All @@ -130,15 +144,15 @@ func TestErrorPropagationCustomConsumerGroup(t *testing.T) {
// override some functions
newConsumerGroup = mockedNewConsumerGroupFromClient(nil, true, true, false, false)
newSaramaClient = mockedNewSaramaClient(client, false)
newSaramaClusterAdmin = mockedNewSaramaClusterAdmin(clusterAdmin, false)
newClusterAdminFromClient = mockedNewSaramaClusterAdminFromClient(clusterAdmin, false)

factory := kafkaConsumerGroupFactoryImpl{
config: sarama.NewConfig(),
addrs: []string{"b1", "b2"},
offsetsChecker: &mockConsumerGroupOffsetsChecker{},
}

consumerGroup, err := factory.StartConsumerGroup(ctx, "bla", []string{}, nil)
consumerGroup, err := factory.StartConsumerGroup(ctx, "bla", []string{}, nil, types.NamespacedName{})
if err != nil {
t.Errorf("Should not throw error %v", err)
}
Expand Down Expand Up @@ -187,7 +201,7 @@ func TestErrorWhileCreatingNewConsumerGroup(t *testing.T) {
addrs: []string{"b1", "b2"},
offsetsChecker: &mockConsumerGroupOffsetsChecker{},
}
_, err := factory.StartConsumerGroup(ctx, "bla", []string{}, nil)
_, err := factory.StartConsumerGroup(ctx, "bla", []string{}, nil, types.NamespacedName{})

if err == nil || err.Error() != "failed" {
t.Errorf("Should contain an error with message failed. Got %v", err)
Expand All @@ -203,7 +217,7 @@ func TestErrorWhileNewConsumerGroup(t *testing.T) {
addrs: []string{"b1", "b2"},
offsetsChecker: &mockConsumerGroupOffsetsChecker{},
}
consumerGroup, _ := factory.StartConsumerGroup(ctx, "bla", []string{}, nil)
consumerGroup, _ := factory.StartConsumerGroup(ctx, "bla", []string{}, nil, types.NamespacedName{})

consumerGroup.(*customConsumerGroup).cancel() // Stop the consume loop from spinning after the error is generated
err := <-consumerGroup.Errors()
Expand Down
3 changes: 2 additions & 1 deletion pkg/common/consumer/consumer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/Shopify/sarama"

"go.uber.org/zap"

kafkasarama "knative.dev/eventing-kafka/pkg/common/kafka/sarama"
)

Expand Down Expand Up @@ -99,7 +100,7 @@ func NewConsumerHandler(logger *zap.SugaredLogger, handler KafkaConsumerHandler,

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *SaramaConsumerHandler) Setup(session sarama.ConsumerGroupSession) error {
consumer.logger.Info("setting up handler")
consumer.logger.Info("setting up handler", zap.Any("claims", session.Claims()))
consumer.lifecycleListener.Setup(session)
return nil
}
Expand Down
Loading

0 comments on commit c807031

Please sign in to comment.