Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

KafkaChannel reconciler to use KafkaChannel v1beta1 API #1407

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kafka/channel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ topics.
1. Create the `KafkaChannel` custom objects:

```yaml
apiVersion: messaging.knative.dev/v1alpha1
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
metadata:
name: my-kafka-channel
Expand Down Expand Up @@ -119,7 +119,7 @@ data:
Then create a KafkaChannel:

```yaml
apiVersion: messaging.knative.dev/v1alpha1
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
metadata:
name: my-kafka-channel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,24 @@ package v1alpha1

import (
"context"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format Go code:

Suggested change
"context"
"context"

"knative.dev/eventing/pkg/apis/messaging"

"knative.dev/eventing-contrib/kafka/channel/pkg/utils"
)

func (c *KafkaChannel) SetDefaults(ctx context.Context) {
// Set the duck subscription to the stored version of the duck
// we support. Reason for this is that the stored version will
// not get a chance to get modified, but for newer versions
// conversion webhook will be able to take a crack at it and
// can modify it to match the duck shape.
if c.Annotations == nil {
c.Annotations = make(map[string]string)
}
if _, ok := c.Annotations[messaging.SubscribableDuckVersionAnnotation]; !ok {
c.Annotations[messaging.SubscribableDuckVersionAnnotation] = "v1alpha1"
}

c.Spec.SetDefaults(ctx)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"context"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"knative.dev/eventing-contrib/kafka/channel/pkg/utils"

"github.com/google/go-cmp/cmp"
Expand All @@ -35,6 +37,9 @@ func TestKafkaChannelDefaults(t *testing.T) {
"nil spec": {
initial: KafkaChannel{},
expected: KafkaChannel{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"messaging.knative.dev/subscribable": "v1alpha1"},
},
Spec: KafkaChannelSpec{
NumPartitions: utils.DefaultNumPartitions,
ReplicationFactor: utils.DefaultReplicationFactor,
Expand All @@ -48,6 +53,9 @@ func TestKafkaChannelDefaults(t *testing.T) {
},
},
expected: KafkaChannel{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"messaging.knative.dev/subscribable": "v1alpha1"},
},
Spec: KafkaChannelSpec{
NumPartitions: utils.DefaultNumPartitions,
ReplicationFactor: testReplicationFactor,
Expand All @@ -61,6 +69,9 @@ func TestKafkaChannelDefaults(t *testing.T) {
},
},
expected: KafkaChannel{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"messaging.knative.dev/subscribable": "v1alpha1"},
},
Spec: KafkaChannelSpec{
NumPartitions: testNumPartitions,
ReplicationFactor: utils.DefaultReplicationFactor,
Expand Down
91 changes: 44 additions & 47 deletions kafka/channel/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"errors"
"fmt"
nethttp "net/http"
"net/url"
"strings"
"sync"
"sync/atomic"

Expand All @@ -30,8 +30,8 @@ import (
"go.opencensus.io/trace"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1"
eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"

"knative.dev/eventing-contrib/kafka/channel/pkg/utils"
Expand All @@ -49,7 +49,7 @@ type KafkaDispatcher struct {
kafkaAsyncProducer sarama.AsyncProducer
channelSubscriptions map[eventingchannels.ChannelReference][]types.UID
subsConsumerGroups map[types.UID]sarama.ConsumerGroup
subscriptions map[types.UID]subscription
subscriptions map[types.UID]Subscription
// consumerUpdateLock must be used to update kafkaConsumers
consumerUpdateLock sync.Mutex
kafkaConsumerFactory kafka.KafkaConsumerGroupFactory
Expand All @@ -58,6 +58,30 @@ type KafkaDispatcher struct {
logger *zap.Logger
}

type Subscription struct {
UID types.UID
fanout.Subscription
}

func (sub Subscription) String() string {
var s strings.Builder
s.WriteString("UID: " + string(sub.UID))
s.WriteRune('\n')
if sub.Subscriber != nil {
s.WriteString("Subscriber: " + sub.Subscriber.String())
s.WriteRune('\n')
}
if sub.Reply != nil {
s.WriteString("Reply: " + sub.Reply.String())
s.WriteRune('\n')
}
if sub.DeadLetter != nil {
s.WriteString("DeadLetter: " + sub.DeadLetter.String())
s.WriteRune('\n')
}
return s.String()
}

func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispatcher, error) {
conf := sarama.NewConfig()
conf.Version = sarama.V2_0_0_0
Expand All @@ -74,7 +98,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
kafkaConsumerFactory: kafka.NewConsumerGroupFactory(args.Brokers, conf),
channelSubscriptions: make(map[eventingchannels.ChannelReference][]types.UID),
subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup),
subscriptions: make(map[types.UID]subscription),
subscriptions: make(map[types.UID]Subscription),
kafkaAsyncProducer: producer,
logger: args.Logger,
topicFunc: args.TopicFunc,
Expand Down Expand Up @@ -119,7 +143,7 @@ type KafkaDispatcherArgs struct {

type consumerMessageHandler struct {
logger *zap.Logger
sub subscription
sub Subscription
dispatcher *eventingchannels.MessageDispatcherImpl
}

Expand All @@ -139,26 +163,19 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar

c.logger.Debug("Going to dispatch the message",
zap.String("topic", consumerMessage.Topic),
zap.Any("destination", c.sub.SubscriberURI),
zap.Any("reply", c.sub.ReplyURI),
zap.Any("delivery", c.sub.Delivery),
zap.String("subscription", c.sub.String()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, was that the reason of the issues ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, but it was causing other issues in logging. The reason of issues was the previous subscription type used by the dispatcher

)

ctx, span := startTraceFromMessage(c.logger, ctx, message, consumerMessage.Topic)
defer span.End()

var DLS *url.URL
if c.sub.Delivery != nil && c.sub.Delivery.DeadLetterSink != nil && c.sub.Delivery.DeadLetterSink.URI != nil {
DLS = (*url.URL)(c.sub.Delivery.DeadLetterSink.URI)
}

err := c.dispatcher.DispatchMessageWithRetries(
ctx,
message,
nil,
(*url.URL)(c.sub.SubscriberURI),
(*url.URL)(c.sub.ReplyURI),
DLS,
c.sub.Subscriber,
c.sub.Reply,
c.sub.DeadLetter,
c.sub.RetryConfig,
)

Expand All @@ -180,19 +197,8 @@ type ChannelConfig struct {
Subscriptions []Subscription
}

type Subscription struct {
eventingduck.SubscriberSpec
RetryConfig *kncloudevents.RetryConfig
}

type subscription struct {
Subscription
Namespace string
Name string
}

// UpdateKafkaConsumers will be called by new CRD based kafka channel dispatcher controller.
func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[eventingduck.SubscriberSpec]error, error) {
func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]error, error) {
if config == nil {
return nil, fmt.Errorf("nil config")
}
Expand All @@ -201,29 +207,28 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[eventingduck
defer d.consumerUpdateLock.Unlock()

var newSubs []types.UID
failedToSubscribe := make(map[eventingduck.SubscriberSpec]error)
failedToSubscribe := make(map[types.UID]error)
for _, cc := range config.ChannelConfigs {
channelRef := eventingchannels.ChannelReference{
Name: cc.Name,
Namespace: cc.Namespace,
}
for _, subSpec := range cc.Subscriptions {
sub := newSubscription(subSpec, string(subSpec.UID), cc.Namespace)
newSubs = append(newSubs, sub.UID)
newSubs = append(newSubs, subSpec.UID)

// Check if sub already exists
exists := false
for _, s := range d.channelSubscriptions[channelRef] {
if s == sub.UID {
if s == subSpec.UID {
exists = true
}
}

if !exists {
// only subscribe when not exists in channel-subscriptions map
// do not need to resubscribe every time channel fanout config is updated
if err := d.subscribe(channelRef, sub); err != nil {
failedToSubscribe[subSpec.SubscriberSpec] = err
if err := d.subscribe(channelRef, subSpec); err != nil {
failedToSubscribe[subSpec.UID] = err
}
}
}
Expand Down Expand Up @@ -316,11 +321,11 @@ func (d *KafkaDispatcher) Start(ctx context.Context) error {

// subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine.
// subscribe must be called under updateLock.
func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference, sub subscription) error {
d.logger.Info("Subscribing", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference, sub Subscription) error {
d.logger.Info("Subscribing", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID))

topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name)
groupID := fmt.Sprintf("kafka.%s.%s.%s", sub.Namespace, channelRef.Name, sub.Name)
groupID := fmt.Sprintf("kafka.%s.%s.%s", channelRef.Namespace, channelRef.Name, string(sub.UID))

handler := &consumerMessageHandler{d.logger, sub, d.dispatcher}

Expand Down Expand Up @@ -349,8 +354,8 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference

// unsubscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine.
// unsubscribe must be called under updateLock.
func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference, sub subscription) error {
d.logger.Info("Unsubscribing from channel", zap.Any("channel", channel), zap.Any("subscription", sub))
func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference, sub Subscription) error {
d.logger.Info("Unsubscribing from channel", zap.Any("channel", channel), zap.String("subscription", sub.String()))
delete(d.subscriptions, sub.UID)
if subsSlice, ok := d.channelSubscriptions[channel]; ok {
var newSlice []types.UID
Expand Down Expand Up @@ -385,14 +390,6 @@ func (d *KafkaDispatcher) getChannelReferenceFromHost(host string) (eventingchan
return cr, nil
}

func newSubscription(sub Subscription, name string, namespace string) subscription {
return subscription{
Subscription: sub,
Name: name,
Namespace: namespace,
}
}

func startTraceFromMessage(logger *zap.Logger, inCtx context.Context, message *protocolkafka.Message, topic string) (context.Context, *trace.Span) {
sc, ok := parseSpanContext(message.Headers)
if !ok {
Expand Down
38 changes: 16 additions & 22 deletions kafka/channel/pkg/dispatcher/dispatcher_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"os"
"sync"
"testing"
Expand All @@ -31,12 +32,10 @@ import (
protocolhttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/cloudevents/sdk-go/v2/test"
"go.uber.org/zap"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

tracingconfig "knative.dev/pkg/tracing/config"

"knative.dev/eventing-contrib/kafka/channel/pkg/utils"
Expand Down Expand Up @@ -158,22 +157,18 @@ func TestDispatcher(t *testing.T) {
HostName: "channela.svc",
Subscriptions: []Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
UID: "aaaa",
Generation: 1,
SubscriberURI: mustParseUrl(t, transformationsServer.URL),
ReplyURI: mustParseUrl(t, channelBProxy.URL),
UID: "aaaa",
Subscription: fanout.Subscription{
Subscriber: mustParseUrl(t, transformationsServer.URL),
Reply: mustParseUrl(t, channelBProxy.URL),
},
},
{
SubscriberSpec: eventingduck.SubscriberSpec{
UID: "cccc",
Generation: 1,
SubscriberURI: mustParseUrl(t, transformationsFailureServer.URL),
ReplyURI: mustParseUrl(t, channelBProxy.URL),
Delivery: &eventingduck.DeliverySpec{
DeadLetterSink: &duckv1.Destination{URI: mustParseUrl(t, deadLetterServer.URL)},
},
UID: "cccc",
Subscription: fanout.Subscription{
Subscriber: mustParseUrl(t, transformationsFailureServer.URL),
Reply: mustParseUrl(t, channelBProxy.URL),
DeadLetter: mustParseUrl(t, deadLetterServer.URL),
},
},
},
Expand All @@ -184,10 +179,9 @@ func TestDispatcher(t *testing.T) {
HostName: "channelb.svc",
Subscriptions: []Subscription{
{
SubscriberSpec: eventingduck.SubscriberSpec{
UID: "bbbb",
Generation: 1,
SubscriberURI: mustParseUrl(t, receiverServer.URL),
UID: "bbbb",
Subscription: fanout.Subscription{
Subscriber: mustParseUrl(t, receiverServer.URL),
},
},
},
Expand Down Expand Up @@ -264,10 +258,10 @@ func createReverseProxy(t *testing.T, host string) *httputil.ReverseProxy {
return &httputil.ReverseProxy{Director: director}
}

func mustParseUrl(t *testing.T, str string) *apis.URL {
func mustParseUrl(t *testing.T, str string) *url.URL {
url, err := apis.ParseURL(str)
if err != nil {
t.Fatal(err)
}
return url
return url.URL()
}
2 changes: 1 addition & 1 deletion kafka/channel/pkg/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"go.uber.org/zap"

eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
eventingchannels "knative.dev/eventing/pkg/channel"
_ "knative.dev/pkg/system/testing"

Expand Down
4 changes: 2 additions & 2 deletions kafka/channel/pkg/reconciler/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import (
"knative.dev/pkg/system"

kafkaChannelClient "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/client"
"knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/informers/messaging/v1alpha1/kafkachannel"
kafkaChannelReconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1alpha1/kafkachannel"
"knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
kafkaChannelReconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
eventingClient "knative.dev/eventing/pkg/client/injection/client"
)

Expand Down
Loading