Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Fix Kafka channel event loss during subscription becoming ready
Browse files Browse the repository at this point in the history
  • Loading branch information
aliok committed Oct 1, 2021
1 parent 77e7cdf commit 65c72b0
Show file tree
Hide file tree
Showing 30 changed files with 727 additions and 2,160 deletions.
23 changes: 5 additions & 18 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ import (
"knative.dev/eventing-kafka/pkg/common/tracing"
)

const (
dispatcherReadySubHeader = "K-Subscriber-Status"
)

type TopicFunc func(separator, namespace, name string) string

type KafkaDispatcherArgs struct {
Expand Down Expand Up @@ -86,7 +82,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat

dispatcher := &KafkaDispatcher{
dispatcher: eventingchannels.NewMessageDispatcher(logging.FromContext(ctx).Desugar()),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, args.Config.Sarama.Config),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, args.Config.Sarama.Config, &consumer.KafkaConsumerGroupOffsetsChecker{}),
channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription),
subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup),
subscriptions: make(map[types.UID]Subscription),
Expand All @@ -95,15 +91,6 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
topicFunc: args.TopicFunc,
}

// initialize and start the subscription endpoint server
subscriptionEndpoint := &subscriptionEndpoint{
dispatcher: dispatcher,
logger: logging.FromContext(ctx),
}
go func() {
subscriptionEndpoint.start()
}()

podName, err := env.GetRequiredConfigValue(logging.FromContext(ctx).Desugar(), env.PodNameEnvVarKey)
if err != nil {
return nil, err
Expand Down Expand Up @@ -171,7 +158,7 @@ func (k UpdateError) Error() string {
}

// ReconcileConsumers will be called by new CRD based kafka channel dispatcher controller.
func (d *KafkaDispatcher) ReconcileConsumers(config *ChannelConfig) error {
func (d *KafkaDispatcher) ReconcileConsumers(ctx context.Context, config *ChannelConfig) error {
channelNamespacedName := types.NamespacedName{
Namespace: config.Namespace,
Name: config.Name,
Expand Down Expand Up @@ -215,7 +202,7 @@ func (d *KafkaDispatcher) ReconcileConsumers(config *ChannelConfig) error {

failedToSubscribe := make(UpdateError)
for subUid, subSpec := range toAddSubs {
if err := d.subscribe(channelNamespacedName, subSpec); err != nil {
if err := d.subscribe(ctx, channelNamespacedName, subSpec); err != nil {
failedToSubscribe[subUid] = err
}
}
Expand Down Expand Up @@ -286,7 +273,7 @@ func (d *KafkaDispatcher) CleanupChannel(name, namespace, hostname string) error

// subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine.
// subscribe must be called under updateLock.
func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscription) error {
func (d *KafkaDispatcher) subscribe(ctx context.Context, channelRef types.NamespacedName, sub Subscription) error {
d.logger.Infow("Subscribing to Kafka Channel", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID))

topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name)
Expand All @@ -310,7 +297,7 @@ func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscri
}
d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef),
zap.Any("subscription", sub.UID), zap.String("topic", topicName), zap.String("consumer group", groupID))
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(groupID, []string{topicName}, d.logger, handler)
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(ctx, groupID, []string{topicName}, handler)

if err != nil {
// we can not create a consumer - logging that, with reason
Expand Down
6 changes: 4 additions & 2 deletions pkg/channel/consolidated/dispatcher/dispatcher_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func TestDispatcher(t *testing.T) {
t.Skipf("This test can't run in CI")
}

ctx := context.TODO()

logger, err := zap.NewDevelopment(zap.AddStacktrace(zap.WarnLevel))
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -179,7 +181,7 @@ func TestDispatcher(t *testing.T) {
},
}
require.NoError(t, dispatcher.RegisterChannelHost(channelAConfig))
require.NoError(t, dispatcher.ReconcileConsumers(channelAConfig))
require.NoError(t, dispatcher.ReconcileConsumers(ctx, channelAConfig))

channelBConfig := &ChannelConfig{
Namespace: "default",
Expand All @@ -195,7 +197,7 @@ func TestDispatcher(t *testing.T) {
},
}
require.NoError(t, dispatcher.RegisterChannelHost(channelBConfig))
require.NoError(t, dispatcher.ReconcileConsumers(channelBConfig))
require.NoError(t, dispatcher.ReconcileConsumers(ctx, channelBConfig))

time.Sleep(5 * time.Second)

Expand Down
20 changes: 14 additions & 6 deletions pkg/channel/consolidated/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type mockKafkaConsumerFactory struct {
createErr bool
}

func (c mockKafkaConsumerFactory) StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler consumer.KafkaConsumerHandler, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
func (c mockKafkaConsumerFactory) StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler consumer.KafkaConsumerHandler, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
if c.createErr {
return nil, errors.New("error creating consumer")
}
Expand Down Expand Up @@ -267,9 +267,11 @@ func TestDispatcher_UpdateConsumers(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

// Initialize using oldConfig
require.NoError(t, d.RegisterChannelHost(tc.oldConfig))
require.NoError(t, d.ReconcileConsumers(tc.oldConfig))
require.NoError(t, d.ReconcileConsumers(ctx, tc.oldConfig))

oldSubscribers := sets.NewString()
for _, sub := range d.subscriptions {
Expand All @@ -283,7 +285,7 @@ func TestDispatcher_UpdateConsumers(t *testing.T) {
}

// Update with new config
err := d.ReconcileConsumers(tc.newConfig)
err := d.ReconcileConsumers(ctx, tc.newConfig)
if tc.createErr != "" {
if err == nil {
t.Errorf("Expected UpdateConfig error: '%v'. Actual nil", tc.createErr)
Expand Down Expand Up @@ -363,6 +365,8 @@ func TestDispatcher_MultipleChannelsInParallel(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

// Let's register channel configs first
for _, c := range configs {
require.NoError(t, d.RegisterChannelHost(c))
Expand All @@ -374,7 +378,7 @@ func TestDispatcher_MultipleChannelsInParallel(t *testing.T) {
wg.Add(1)
go func(c *ChannelConfig) {
defer wg.Done()
assert.NoError(t, d.ReconcileConsumers(c))
assert.NoError(t, d.ReconcileConsumers(ctx, c))
}(c)
}
}
Expand Down Expand Up @@ -418,6 +422,8 @@ func TestKafkaDispatcher_CleanupChannel(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

channelConfig := &ChannelConfig{
Namespace: "default",
Name: "test-channel",
Expand All @@ -438,7 +444,7 @@ func TestKafkaDispatcher_CleanupChannel(t *testing.T) {
},
}
require.NoError(t, d.RegisterChannelHost(channelConfig))
require.NoError(t, d.ReconcileConsumers(channelConfig))
require.NoError(t, d.ReconcileConsumers(ctx, channelConfig))

require.NoError(t, d.CleanupChannel(channelConfig.Name, channelConfig.Namespace, channelConfig.HostName))
require.NotContains(t, d.subscriptions, "subscription-1")
Expand All @@ -461,6 +467,8 @@ func TestSubscribeError(t *testing.T) {
channelSubscriptions: map[types.NamespacedName]*KafkaSubscription{},
}

ctx := context.TODO()

channelRef := types.NamespacedName{
Name: "test-channel",
Namespace: "test-ns",
Expand All @@ -470,7 +478,7 @@ func TestSubscribeError(t *testing.T) {
UID: "test-sub",
Subscription: fanout.Subscription{},
}
err := d.subscribe(channelRef, subRef)
err := d.subscribe(ctx, channelRef, subRef)
if err == nil {
t.Errorf("Expected error want %s, got %s", "error creating consumer", err)
}
Expand Down
60 changes: 0 additions & 60 deletions pkg/channel/consolidated/dispatcher/subscription_endpoint.go

This file was deleted.

Loading

0 comments on commit 65c72b0

Please sign in to comment.