Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

[main] KafkaChannel to init offsets before dispatcher #911

Merged
merged 7 commits into from
Oct 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/channel/distributed/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func main() {
MetricsRegistry: ekConfig.Sarama.Config.MetricRegistry,
SaramaConfig: ekConfig.Sarama.Config,
}
dispatcher, managerEvents := dispatch.NewDispatcher(dispatcherConfig, controlProtocolServer)
dispatcher, managerEvents := dispatch.NewDispatcher(dispatcherConfig, controlProtocolServer, func(ref types.NamespacedName) {})

// Create KafkaChannel Informer
kafkaClient := kafkaclientset.NewForConfigOrDie(k8sConfig)
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ require (
github.com/stretchr/testify v1.7.0
github.com/xdg-go/scram v1.0.2
go.opencensus.io v0.23.0
go.uber.org/atomic v1.9.0
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.19.1
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand All @@ -37,7 +36,6 @@ require (
knative.dev/control-protocol v0.0.0-20210929151023-4b73baddb22a
knative.dev/eventing v0.26.1-0.20210930102845-5b9ac948cefc
knative.dev/hack v0.0.0-20210806075220-815cd312d65c
knative.dev/networking v0.0.0-20210929162523-749575ef53f8
knative.dev/pkg v0.0.0-20210929111822-2267a4cbebb8
knative.dev/reconciler-test v0.0.0-20210930064245-45904ca4383d
)
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,6 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/dnscache v0.0.0-20210201191234-295bba877686/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
Expand Down Expand Up @@ -1306,8 +1305,6 @@ knative.dev/eventing v0.26.1-0.20210930102845-5b9ac948cefc/go.mod h1:HwxfiCneWF5
knative.dev/hack v0.0.0-20210806075220-815cd312d65c h1:nOXoDWAAItwr4o0dp3nHr6skgpVD4IvME/UX84YNl5k=
knative.dev/hack v0.0.0-20210806075220-815cd312d65c/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack/schema v0.0.0-20210806075220-815cd312d65c/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0=
knative.dev/networking v0.0.0-20210929162523-749575ef53f8 h1:1VblI2O16leI1bFP4HitfuIRE8SIuPwqDAput/SBh/g=
knative.dev/networking v0.0.0-20210929162523-749575ef53f8/go.mod h1:IQMjYdQwtOsqJ+fQDs8MZGB/cPxXdMc4bpP7fdtXp2Q=
knative.dev/pkg v0.0.0-20210914164111-4857ab6939e3/go.mod h1:jMSqkNMsrzuy+XR4Yr/BMy7SDVbUOl3KKB6+5MR+ZU8=
knative.dev/pkg v0.0.0-20210929111822-2267a4cbebb8 h1:Ut2Z22rH5WfybI5hjEfi9fDU7rK6kIduWpSAgoEXh7M=
knative.dev/pkg v0.0.0-20210929111822-2267a4cbebb8/go.mod h1:r27D20afKNeK+9aNOg+0qMv8JgQcyeP+CAYQIR1jEQY=
Expand Down
33 changes: 11 additions & 22 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package dispatcher
import (
"context"
"fmt"
nethttp "net/http"
"strings"
"sync"

Expand All @@ -29,24 +30,19 @@ import (
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/eventing-kafka/pkg/common/config"
"knative.dev/pkg/logging"

"knative.dev/eventing-kafka/pkg/common/config"

eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/pkg/kmeta"

nethttp "net/http"

"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/env"
"knative.dev/eventing-kafka/pkg/common/consumer"
"knative.dev/eventing-kafka/pkg/common/tracing"
)

const (
dispatcherReadySubHeader = "K-Subscriber-Status"
)

type TopicFunc func(separator, namespace, name string) string

type KafkaDispatcherArgs struct {
Expand Down Expand Up @@ -77,7 +73,9 @@ type KafkaDispatcher struct {
logger *zap.SugaredLogger
}

func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispatcher, error) {
// NewDispatcher creates a new dispatcher struct. enqueue argument is a function that is used to
// requeue a KafkaChannel instance via the reconciler which is used when creating the consumer.
func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs, enqueue func(ref types.NamespacedName)) (*KafkaDispatcher, error) {

producer, err := sarama.NewSyncProducer(args.Brokers, args.Config.Sarama.Config)
if err != nil {
Expand All @@ -86,7 +84,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat

dispatcher := &KafkaDispatcher{
dispatcher: eventingchannels.NewMessageDispatcher(logging.FromContext(ctx).Desugar()),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, args.Config.Sarama.Config),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, args.Config.Sarama.Config, &consumer.KafkaConsumerGroupOffsetsChecker{}, enqueue),
channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription),
subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup),
subscriptions: make(map[types.UID]Subscription),
Expand All @@ -95,15 +93,6 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
topicFunc: args.TopicFunc,
}

// initialize and start the subscription endpoint server
subscriptionEndpoint := &subscriptionEndpoint{
dispatcher: dispatcher,
logger: logging.FromContext(ctx),
}
go func() {
subscriptionEndpoint.start()
}()

podName, err := env.GetRequiredConfigValue(logging.FromContext(ctx).Desugar(), env.PodNameEnvVarKey)
if err != nil {
return nil, err
Expand Down Expand Up @@ -171,7 +160,7 @@ func (k UpdateError) Error() string {
}

// ReconcileConsumers will be called by new CRD based kafka channel dispatcher controller.
func (d *KafkaDispatcher) ReconcileConsumers(config *ChannelConfig) error {
func (d *KafkaDispatcher) ReconcileConsumers(ctx context.Context, config *ChannelConfig) error {
channelNamespacedName := types.NamespacedName{
Namespace: config.Namespace,
Name: config.Name,
Expand Down Expand Up @@ -215,7 +204,7 @@ func (d *KafkaDispatcher) ReconcileConsumers(config *ChannelConfig) error {

failedToSubscribe := make(UpdateError)
for subUid, subSpec := range toAddSubs {
if err := d.subscribe(channelNamespacedName, subSpec); err != nil {
if err := d.subscribe(ctx, channelNamespacedName, subSpec); err != nil {
failedToSubscribe[subUid] = err
}
}
Expand Down Expand Up @@ -286,7 +275,7 @@ func (d *KafkaDispatcher) CleanupChannel(name, namespace, hostname string) error

// subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine.
// subscribe must be called under updateLock.
func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscription) error {
func (d *KafkaDispatcher) subscribe(ctx context.Context, channelRef types.NamespacedName, sub Subscription) error {
d.logger.Infow("Subscribing to Kafka Channel", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID))

topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name)
Expand All @@ -310,7 +299,7 @@ func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscri
}
d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef),
zap.Any("subscription", sub.UID), zap.String("topic", topicName), zap.String("consumer group", groupID))
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(groupID, []string{topicName}, d.logger, handler)
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(ctx, groupID, []string{topicName}, handler, channelRef)

if err != nil {
// we can not create a consumer - logging that, with reason
Expand Down
9 changes: 6 additions & 3 deletions pkg/channel/consolidated/dispatcher/dispatcher_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cloudevents/sdk-go/v2/test"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/apis"
Expand All @@ -55,6 +56,8 @@ func TestDispatcher(t *testing.T) {
t.Skipf("This test can't run in CI")
}

ctx := context.TODO()

logger, err := zap.NewDevelopment(zap.AddStacktrace(zap.WarnLevel))
if err != nil {
t.Fatal(err)
Expand All @@ -80,7 +83,7 @@ func TestDispatcher(t *testing.T) {
}

// Create the dispatcher. At this point, if Kafka is not up, this thing fails
dispatcher, err := NewDispatcher(context.Background(), &dispatcherArgs)
dispatcher, err := NewDispatcher(context.Background(), &dispatcherArgs, func(ref types.NamespacedName) {})
if err != nil {
t.Skipf("no dispatcher: %v", err)
}
Expand Down Expand Up @@ -179,7 +182,7 @@ func TestDispatcher(t *testing.T) {
},
}
require.NoError(t, dispatcher.RegisterChannelHost(channelAConfig))
require.NoError(t, dispatcher.ReconcileConsumers(channelAConfig))
require.NoError(t, dispatcher.ReconcileConsumers(ctx, channelAConfig))

channelBConfig := &ChannelConfig{
Namespace: "default",
Expand All @@ -195,7 +198,7 @@ func TestDispatcher(t *testing.T) {
},
}
require.NoError(t, dispatcher.RegisterChannelHost(channelBConfig))
require.NoError(t, dispatcher.ReconcileConsumers(channelBConfig))
require.NoError(t, dispatcher.ReconcileConsumers(ctx, channelBConfig))

time.Sleep(5 * time.Second)

Expand Down
27 changes: 18 additions & 9 deletions pkg/channel/consolidated/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"

"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/common/consumer"
eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/channel/fanout"
klogtesting "knative.dev/pkg/logging/testing"
_ "knative.dev/pkg/system/testing"

"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/common/consumer"
)

// ----- Mocks
Expand All @@ -50,7 +51,7 @@ type mockKafkaConsumerFactory struct {
createErr bool
}

func (c mockKafkaConsumerFactory) StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler consumer.KafkaConsumerHandler, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
func (c mockKafkaConsumerFactory) StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler consumer.KafkaConsumerHandler, ref types.NamespacedName, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
if c.createErr {
return nil, errors.New("error creating consumer")
}
Expand Down Expand Up @@ -267,9 +268,11 @@ func TestDispatcher_UpdateConsumers(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

// Initialize using oldConfig
require.NoError(t, d.RegisterChannelHost(tc.oldConfig))
require.NoError(t, d.ReconcileConsumers(tc.oldConfig))
require.NoError(t, d.ReconcileConsumers(ctx, tc.oldConfig))

oldSubscribers := sets.NewString()
for _, sub := range d.subscriptions {
Expand All @@ -283,7 +286,7 @@ func TestDispatcher_UpdateConsumers(t *testing.T) {
}

// Update with new config
err := d.ReconcileConsumers(tc.newConfig)
err := d.ReconcileConsumers(ctx, tc.newConfig)
if tc.createErr != "" {
if err == nil {
t.Errorf("Expected UpdateConfig error: '%v'. Actual nil", tc.createErr)
Expand Down Expand Up @@ -363,6 +366,8 @@ func TestDispatcher_MultipleChannelsInParallel(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

// Let's register channel configs first
for _, c := range configs {
require.NoError(t, d.RegisterChannelHost(c))
Expand All @@ -374,7 +379,7 @@ func TestDispatcher_MultipleChannelsInParallel(t *testing.T) {
wg.Add(1)
go func(c *ChannelConfig) {
defer wg.Done()
assert.NoError(t, d.ReconcileConsumers(c))
assert.NoError(t, d.ReconcileConsumers(ctx, c))
}(c)
}
}
Expand Down Expand Up @@ -418,6 +423,8 @@ func TestKafkaDispatcher_CleanupChannel(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

channelConfig := &ChannelConfig{
Namespace: "default",
Name: "test-channel",
Expand All @@ -438,7 +445,7 @@ func TestKafkaDispatcher_CleanupChannel(t *testing.T) {
},
}
require.NoError(t, d.RegisterChannelHost(channelConfig))
require.NoError(t, d.ReconcileConsumers(channelConfig))
require.NoError(t, d.ReconcileConsumers(ctx, channelConfig))

require.NoError(t, d.CleanupChannel(channelConfig.Name, channelConfig.Namespace, channelConfig.HostName))
require.NotContains(t, d.subscriptions, "subscription-1")
Expand All @@ -461,6 +468,8 @@ func TestSubscribeError(t *testing.T) {
channelSubscriptions: map[types.NamespacedName]*KafkaSubscription{},
}

ctx := context.TODO()

channelRef := types.NamespacedName{
Name: "test-channel",
Namespace: "test-ns",
Expand All @@ -470,7 +479,7 @@ func TestSubscribeError(t *testing.T) {
UID: "test-sub",
Subscription: fanout.Subscription{},
}
err := d.subscribe(channelRef, subRef)
err := d.subscribe(ctx, channelRef, subRef)
if err == nil {
t.Errorf("Expected error want %s, got %s", "error creating consumer", err)
}
Expand Down Expand Up @@ -511,7 +520,7 @@ func TestNewDispatcher(t *testing.T) {
Brokers: []string{"localhost:10000"},
TopicFunc: utils.TopicName,
}
_, err := NewDispatcher(context.TODO(), args)
_, err := NewDispatcher(context.TODO(), args, func(ref types.NamespacedName) {})
if err == nil {
t.Errorf("Expected error want %s, got %s", "message receiver is not set", err)
}
Expand Down
60 changes: 0 additions & 60 deletions pkg/channel/consolidated/dispatcher/subscription_endpoint.go

This file was deleted.

Loading