From 86d0ee2a38f26bd7a19e46e50e3a1e6ea0cae4b4 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Fri, 22 Jan 2021 15:02:42 -0500 Subject: [PATCH 01/24] Groundwork for dispatcher subscriber status w/ consumers --- .../consolidated/dispatcher/dispatcher.go | 38 ++++++++++++++----- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 5bf3843154..711afcd46b 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -24,6 +24,8 @@ import ( "sync" "sync/atomic" + "k8s.io/apimachinery/pkg/util/sets" + "knative.dev/eventing-kafka/pkg/common/client" "knative.dev/eventing-kafka/pkg/common/tracing" @@ -44,6 +46,14 @@ import ( "knative.dev/eventing-kafka/pkg/common/consumer" ) +type ConsumerCallback func() + +type KafkaSubscription struct { + subs []types.UID + channelReadySubscriptions sets.String + consumerWatchers []ConsumerCallback +} + type KafkaDispatcher struct { hostToChannelMap atomic.Value // hostToChannelMapLock is used to update hostToChannelMap @@ -53,7 +63,7 @@ type KafkaDispatcher struct { dispatcher *eventingchannels.MessageDispatcherImpl kafkaSyncProducer sarama.SyncProducer - channelSubscriptions map[eventingchannels.ChannelReference][]types.UID + channelSubscriptions map[eventingchannels.ChannelReference]*KafkaSubscription subsConsumerGroups map[types.UID]sarama.ConsumerGroup subscriptions map[types.UID]Subscription // consumerUpdateLock must be used to update kafkaConsumers @@ -114,7 +124,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat dispatcher := &KafkaDispatcher{ dispatcher: eventingchannels.NewMessageDispatcher(args.Logger.Desugar()), kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, conf), - channelSubscriptions: make(map[eventingchannels.ChannelReference][]types.UID), + channelSubscriptions: 
make(map[eventingchannels.ChannelReference]*KafkaSubscription), subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup), subscriptions: make(map[types.UID]Subscription), kafkaSyncProducer: producer, @@ -256,9 +266,17 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er // Check if sub already exists exists := false - for _, s := range d.channelSubscriptions[channelRef] { - if s == subSpec.UID { - exists = true + if _, ok := d.channelSubscriptions[channelRef]; ok { + for _, s := range d.channelSubscriptions[channelRef].subs { + if s == subSpec.UID { + exists = true + } + } + } else { //ensure the pointer is populated + d.channelSubscriptions[channelRef] = &KafkaSubscription{ + subs: []types.UID{}, + channelReadySubscriptions: sets.String{}, + consumerWatchers: []ConsumerCallback{}, } } @@ -278,7 +296,7 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er // Unsubscribe and close consumer for any deleted subscriptions subsToRemove := make(map[eventingchannels.ChannelReference][]types.UID) for channelRef, actualSubs := range d.channelSubscriptions { - subsToRemove[channelRef] = uidSetDifference(actualSubs, newSubs) + subsToRemove[channelRef] = uidSetDifference(actualSubs.subs, newSubs) } for channelRef, subs := range subsToRemove { @@ -287,7 +305,7 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er return nil, err } } - d.channelSubscriptions[channelRef] = newSubs + d.channelSubscriptions[channelRef].subs = newSubs } return failedToSubscribe, nil @@ -378,7 +396,7 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference } }() - d.channelSubscriptions[channelRef] = append(d.channelSubscriptions[channelRef], sub.UID) + d.channelSubscriptions[channelRef].subs = append(d.channelSubscriptions[channelRef].subs, sub.UID) d.subscriptions[sub.UID] = sub d.subsConsumerGroups[sub.UID] = consumerGroup @@ -390,14 +408,14 @@ func (d *KafkaDispatcher) 
subscribe(channelRef eventingchannels.ChannelReference func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference, sub Subscription) error { d.logger.Infow("Unsubscribing from channel", zap.Any("channel", channel), zap.String("subscription", sub.String())) delete(d.subscriptions, sub.UID) - if subsSlice, ok := d.channelSubscriptions[channel]; ok { + if subsSlice := d.channelSubscriptions[channel].subs; subsSlice != nil { var newSlice []types.UID for _, oldSub := range subsSlice { if oldSub != sub.UID { newSlice = append(newSlice, oldSub) } } - d.channelSubscriptions[channel] = newSlice + d.channelSubscriptions[channel].subs = newSlice } if consumer, ok := d.subsConsumerGroups[sub.UID]; ok { delete(d.subsConsumerGroups, sub.UID) From dc066104151ac2c9fa2c64764d01fda01e8d249d Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 26 Jan 2021 21:23:32 -0500 Subject: [PATCH 02/24] Add functionality to serve http of subscribers --- .../consolidated/dispatcher/dispatcher.go | 69 +++++++++++++++++-- .../reconciler/dispatcher/kafkachannel.go | 5 -- pkg/common/consumer/consumer_handler.go | 3 + 3 files changed, 65 insertions(+), 12 deletions(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 711afcd46b..38a4c38346 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -17,6 +17,7 @@ package dispatcher import ( "context" + "encoding/json" "errors" "fmt" nethttp "net/http" @@ -49,9 +50,16 @@ import ( type ConsumerCallback func() type KafkaSubscription struct { - subs []types.UID + channelRef eventingchannels.ChannelReference + subs []types.UID + + // readySubscriptionsLock must be used to synchronize access to channelReadySubscriptions + readySubscriptionsLock sync.RWMutex channelReadySubscriptions sets.String - consumerWatchers []ConsumerCallback + + // watchersLock must be used to synchronize access to consumerWatchers 
+ watchersLock sync.RWMutex + consumerWatchers []ConsumerCallback } type KafkaDispatcher struct { @@ -190,9 +198,14 @@ type KafkaDispatcherArgs struct { } type consumerMessageHandler struct { - logger *zap.SugaredLogger - sub Subscription - dispatcher *eventingchannels.MessageDispatcherImpl + logger *zap.SugaredLogger + sub Subscription + dispatcher *eventingchannels.MessageDispatcherImpl + kafkaSubscription *KafkaSubscription +} + +func (c consumerMessageHandler) SetReady(ready bool) { + c.kafkaSubscription.SetReady(c.sub.UID, ready) } func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sarama.ConsumerMessage) (bool, error) { @@ -245,6 +258,43 @@ type ChannelConfig struct { Subscriptions []Subscription } +// SetReady will mark the subid in the KafkaSubscription and call any registered callbacks +func (ks *KafkaSubscription) SetReady(subID types.UID, ready bool) { + ks.readySubscriptionsLock.Lock() + defer ks.readySubscriptionsLock.Unlock() + if ready { + if !ks.channelReadySubscriptions.Has(string(subID)) { + ks.channelReadySubscriptions.Insert(string(subID)) + } + } else { + if ks.channelReadySubscriptions.Has(string(subID)) { + ks.channelReadySubscriptions.Delete(string(subID)) + } + } + +} + +func (ks *KafkaSubscription) HandleReadySubsStatusRequest(w nethttp.ResponseWriter, r *nethttp.Request) { + + if r.Method != nethttp.MethodGet { + w.WriteHeader(nethttp.StatusMethodNotAllowed) + return + } + ks.readySubscriptionsLock.RLock() + defer ks.readySubscriptionsLock.RUnlock() + var subscriptions = make(map[string][]string) + chanRef := fmt.Sprintf("%s/%s", ks.channelRef.Namespace, ks.channelRef.Name) + + w.Header().Set("K-Subscriber-Status", chanRef) + subscriptions[chanRef] = ks.channelReadySubscriptions.List() + jsonResult, err := json.Marshal(subscriptions) + if err != nil { + return // we should probably log instead, pass logger via context? 
+ } + fmt.Fprintf(w, string(jsonResult)) + w.WriteHeader(nethttp.StatusOK) +} + // UpdateKafkaConsumers will be called by new CRD based kafka channel dispatcher controller. func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]error, error) { if config == nil { @@ -272,12 +322,17 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er exists = true } } - } else { //ensure the pointer is populated + } else { //ensure the pointer is populated or things go boom d.channelSubscriptions[channelRef] = &KafkaSubscription{ + channelRef: channelRef, subs: []types.UID{}, channelReadySubscriptions: sets.String{}, consumerWatchers: []ConsumerCallback{}, } + go func() { + nethttp.HandleFunc(fmt.Sprintf("/%s/%s", cc.Namespace, cc.Name), d.channelSubscriptions[channelRef].HandleReadySubsStatusRequest) + d.logger.Fatal(nethttp.ListenAndServe(":", nil)) + }() } if !exists { @@ -378,7 +433,7 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name) groupID := fmt.Sprintf("kafka.%s.%s.%s", channelRef.Namespace, channelRef.Name, string(sub.UID)) - handler := &consumerMessageHandler{d.logger, sub, d.dispatcher} + handler := &consumerMessageHandler{d.logger, sub, d.dispatcher, d.channelSubscriptions[channelRef]} consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(groupID, []string{topicName}, d.logger, handler) diff --git a/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go b/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go index c34b6d06f9..af2ec6fe32 100644 --- a/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go +++ b/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go @@ -151,11 +151,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel return r.syncDispatcher(ctx) } -func (r *Reconciler) ObserveKind(ctx context.Context, kc 
*v1beta1.KafkaChannel) pkgreconciler.Event { - logging.FromContext(ctx).Debugw("ObserveKind for channel", zap.String("channel", kc.Name)) - return r.syncDispatcher(ctx) -} - func (r *Reconciler) syncDispatcher(ctx context.Context) pkgreconciler.Event { channels, err := r.kafkachannelLister.List(labels.Everything()) if err != nil { diff --git a/pkg/common/consumer/consumer_handler.go b/pkg/common/consumer/consumer_handler.go index b27ccecdb1..293cb240eb 100644 --- a/pkg/common/consumer/consumer_handler.go +++ b/pkg/common/consumer/consumer_handler.go @@ -28,6 +28,7 @@ type KafkaConsumerHandler interface { // When this function returns true, the consumer group offset is marked as consumed. // The returned error is enqueued in errors channel. Handle(context context.Context, message *sarama.ConsumerMessage) (bool, error) + SetReady(ready bool) } // ConsumerHandler implements sarama.ConsumerGroupHandler and provides some glue code to simplify message handling @@ -59,12 +60,14 @@ func (consumer *SaramaConsumerHandler) Setup(sarama.ConsumerGroupSession) error // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error { consumer.logger.Info("cleanup handler") + consumer.handler.SetReady(false) return nil } // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { consumer.logger.Info(fmt.Sprintf("Starting partition consumer, topic: %s, partition: %d, initialOffset: %d", claim.Topic(), claim.Partition(), claim.InitialOffset())) + consumer.handler.SetReady(true) // NOTE: // Do not move the code below to a goroutine. 
From b60aa7cc7f368661459d0a0b5ef87ab938731b53 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 26 Jan 2021 21:24:16 -0500 Subject: [PATCH 03/24] Drop callback functions --- pkg/channel/consolidated/dispatcher/dispatcher.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 38a4c38346..9ad49b0663 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -47,8 +47,6 @@ import ( "knative.dev/eventing-kafka/pkg/common/consumer" ) -type ConsumerCallback func() - type KafkaSubscription struct { channelRef eventingchannels.ChannelReference subs []types.UID @@ -56,10 +54,6 @@ type KafkaSubscription struct { // readySubscriptionsLock must be used to synchronize access to channelReadySubscriptions readySubscriptionsLock sync.RWMutex channelReadySubscriptions sets.String - - // watchersLock must be used to synchronize access to consumerWatchers - watchersLock sync.RWMutex - consumerWatchers []ConsumerCallback } type KafkaDispatcher struct { @@ -327,7 +321,6 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er channelRef: channelRef, subs: []types.UID{}, channelReadySubscriptions: sets.String{}, - consumerWatchers: []ConsumerCallback{}, } go func() { nethttp.HandleFunc(fmt.Sprintf("/%s/%s", cc.Namespace, cc.Name), d.channelSubscriptions[channelRef].HandleReadySubsStatusRequest) From dd07041f9702dcb02260d8d19c78be19880df951 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Wed, 27 Jan 2021 16:52:50 -0500 Subject: [PATCH 04/24] Fix failing unit test, add unsub check for chanref --- pkg/channel/consolidated/dispatcher/dispatcher.go | 3 +++ pkg/channel/consolidated/dispatcher/dispatcher_test.go | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 
9ad49b0663..2a07f80dc9 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -456,6 +456,9 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference, sub Subscription) error { d.logger.Infow("Unsubscribing from channel", zap.Any("channel", channel), zap.String("subscription", sub.String())) delete(d.subscriptions, sub.UID) + if _, ok := d.channelSubscriptions[channel]; !ok { + return nil + } if subsSlice := d.channelSubscriptions[channel].subs; subsSlice != nil { var newSlice []types.UID for _, oldSub := range subsSlice { diff --git a/pkg/channel/consolidated/dispatcher/dispatcher_test.go b/pkg/channel/consolidated/dispatcher/dispatcher_test.go index 543ca7e2c2..081b13c469 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher_test.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher_test.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Knative Authors +Copyright 2020 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -320,7 +320,7 @@ func TestDispatcher_UpdateConfig(t *testing.T) { t.Logf("Running %s", t.Name()) d := &KafkaDispatcher{ kafkaConsumerFactory: &mockKafkaConsumerFactory{}, - channelSubscriptions: make(map[eventingchannels.ChannelReference][]types.UID), + channelSubscriptions: make(map[eventingchannels.ChannelReference]*KafkaSubscription), subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup), subscriptions: make(map[types.UID]Subscription), topicFunc: utils.TopicName, From fa3042afc752ade0a4f4017944185b897103b4ae Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Wed, 27 Jan 2021 16:53:14 -0500 Subject: [PATCH 05/24] Rework http handler to be dispatcher local (not kafkasubscription) --- .../consolidated/dispatcher/dispatcher.go | 56 ++++++++++++------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 2a07f80dc9..db702796c5 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -134,6 +134,41 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat topicFunc: args.TopicFunc, } + subsStatusHandler := nethttp.HandlerFunc(func(w nethttp.ResponseWriter, r *nethttp.Request) { + if r.Method != nethttp.MethodGet { + w.WriteHeader(nethttp.StatusMethodNotAllowed) + return + } + uriSplit := strings.Split(r.RequestURI, "/") + if len(uriSplit) != 3 { + w.WriteHeader(nethttp.StatusBadRequest) + return + } + channelRef := eventingchannels.ChannelReference{ + Name: uriSplit[2], + Namespace: uriSplit[1], + } + if _, ok := dispatcher.channelSubscriptions[channelRef]; !ok { + w.WriteHeader(nethttp.StatusBadRequest) + return + } + dispatcher.channelSubscriptions[channelRef].readySubscriptionsLock.RLock() + defer dispatcher.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock() + var subscriptions = make(map[string][]string) + 
w.Header().Set("K-Subscriber-Status", uriSPlit[2]) + subscriptions[uriSplit[2]] = dispatcher.channelSubscriptions[channelRef].channelReadySubscriptions.List() + jsonResult, err := json.Marshal(subscriptions) + if err != nil { + return // we should probably log instead, pass logger via context? + } + fmt.Fprintf(w, string(jsonResult)) + w.WriteHeader(nethttp.StatusOK) + }) + + go func() { + dispatcher.logger.Fatal(nethttp.ListenAndServe(":", subsStatusHandler)) + }() + podName, err := env.GetRequiredConfigValue(args.Logger.Desugar(), env.PodNameEnvVarKey) if err != nil { return nil, err @@ -270,23 +305,6 @@ func (ks *KafkaSubscription) SetReady(subID types.UID, ready bool) { func (ks *KafkaSubscription) HandleReadySubsStatusRequest(w nethttp.ResponseWriter, r *nethttp.Request) { - if r.Method != nethttp.MethodGet { - w.WriteHeader(nethttp.StatusMethodNotAllowed) - return - } - ks.readySubscriptionsLock.RLock() - defer ks.readySubscriptionsLock.RUnlock() - var subscriptions = make(map[string][]string) - chanRef := fmt.Sprintf("%s/%s", ks.channelRef.Namespace, ks.channelRef.Name) - - w.Header().Set("K-Subscriber-Status", chanRef) - subscriptions[chanRef] = ks.channelReadySubscriptions.List() - jsonResult, err := json.Marshal(subscriptions) - if err != nil { - return // we should probably log instead, pass logger via context? - } - fmt.Fprintf(w, string(jsonResult)) - w.WriteHeader(nethttp.StatusOK) } // UpdateKafkaConsumers will be called by new CRD based kafka channel dispatcher controller. 
@@ -322,10 +340,6 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er subs: []types.UID{}, channelReadySubscriptions: sets.String{}, } - go func() { - nethttp.HandleFunc(fmt.Sprintf("/%s/%s", cc.Namespace, cc.Name), d.channelSubscriptions[channelRef].HandleReadySubsStatusRequest) - d.logger.Fatal(nethttp.ListenAndServe(":", nil)) - }() } if !exists { From a54fd545d5f89893e89fe9a6bb6403592a65420e Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Fri, 29 Jan 2021 11:29:00 -0500 Subject: [PATCH 06/24] Variable typo fix --- pkg/channel/consolidated/dispatcher/dispatcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index db702796c5..f286bafdd2 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -155,7 +155,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat dispatcher.channelSubscriptions[channelRef].readySubscriptionsLock.RLock() defer dispatcher.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock() var subscriptions = make(map[string][]string) - w.Header().Set("K-Subscriber-Status", uriSPlit[2]) + w.Header().Set("K-Subscriber-Status", uriSplit[2]) subscriptions[uriSplit[2]] = dispatcher.channelSubscriptions[channelRef].channelReadySubscriptions.List() jsonResult, err := json.Marshal(subscriptions) if err != nil { From c8c642a8f84613c6cc7d6070988d25603b64b57b Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Fri, 29 Jan 2021 11:37:06 -0500 Subject: [PATCH 07/24] Fix copyright years --- pkg/channel/consolidated/dispatcher/dispatcher.go | 2 +- pkg/channel/consolidated/dispatcher/dispatcher_test.go | 2 +- pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go | 2 +- pkg/common/consumer/consumer_handler.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git 
a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index f286bafdd2..077bcc786b 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -1,5 +1,5 @@ /* -Copyright 2018 The Knative Authors +Copyright 2021 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/channel/consolidated/dispatcher/dispatcher_test.go b/pkg/channel/consolidated/dispatcher/dispatcher_test.go index 081b13c469..1610d5fc9f 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher_test.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher_test.go @@ -1,5 +1,5 @@ /* -Copyright 2020 The Knative Authors +Copyright 2021 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go b/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go index af2ec6fe32..3a7969244f 100644 --- a/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go +++ b/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Knative Authors +Copyright 2021 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/common/consumer/consumer_handler.go b/pkg/common/consumer/consumer_handler.go index 293cb240eb..75a85d30cd 100644 --- a/pkg/common/consumer/consumer_handler.go +++ b/pkg/common/consumer/consumer_handler.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Knative Authors +Copyright 2021 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
From 6a2dadeacab8038b0f88be7493a3bf079af459eb Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Fri, 29 Jan 2021 11:38:54 -0500 Subject: [PATCH 08/24] Change header name to constant --- pkg/channel/consolidated/dispatcher/dispatcher.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 077bcc786b..9ab5919517 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -47,6 +47,10 @@ import ( "knative.dev/eventing-kafka/pkg/common/consumer" ) +const ( + dispatcherReadySubHeader = "K-Subscriber-Status" +) + type KafkaSubscription struct { channelRef eventingchannels.ChannelReference subs []types.UID @@ -155,7 +159,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat dispatcher.channelSubscriptions[channelRef].readySubscriptionsLock.RLock() defer dispatcher.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock() var subscriptions = make(map[string][]string) - w.Header().Set("K-Subscriber-Status", uriSplit[2]) + w.Header().Set(dispatcherReadySubHeader, uriSplit[2]) subscriptions[uriSplit[2]] = dispatcher.channelSubscriptions[channelRef].channelReadySubscriptions.List() jsonResult, err := json.Marshal(subscriptions) if err != nil { From 325e2c9b7650d1857b04d7d8d31112af821fc082 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 2 Feb 2021 08:32:05 -0500 Subject: [PATCH 09/24] Move subscription handler to its own ServeHTTP func --- .../consolidated/dispatcher/dispatcher.go | 68 +++++++++---------- 1 file changed, 32 insertions(+), 36 deletions(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 9ab5919517..2e39f2ec1d 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -104,6 +104,37 @@ func (sub 
Subscription) String() string { return s.String() } +func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request) { + if r.Method != nethttp.MethodGet { + w.WriteHeader(nethttp.StatusMethodNotAllowed) + return + } + uriSplit := strings.Split(r.RequestURI, "/") + if len(uriSplit) != 3 { + w.WriteHeader(nethttp.StatusNotFound) + return + } + channelRef := eventingchannels.ChannelReference{ + Name: uriSplit[2], + Namespace: uriSplit[1], + } + if _, ok := d.channelSubscriptions[channelRef]; !ok { + w.WriteHeader(nethttp.StatusBadRequest) + return + } + d.channelSubscriptions[channelRef].readySubscriptionsLock.RLock() + defer d.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock() + var subscriptions = make(map[string][]string) + w.Header().Set(dispatcherReadySubHeader, uriSplit[2]) + subscriptions[uriSplit[2]] = d.channelSubscriptions[channelRef].channelReadySubscriptions.List() + jsonResult, err := json.Marshal(subscriptions) + if err != nil { + return // we should probably log instead, pass logger via context? + } + fmt.Fprintf(w, string(jsonResult)) + w.WriteHeader(nethttp.StatusOK) +} + func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispatcher, error) { conf, err := client.NewConfigBuilder(). WithClientId(args.ClientID). 
@@ -138,39 +169,8 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat topicFunc: args.TopicFunc, } - subsStatusHandler := nethttp.HandlerFunc(func(w nethttp.ResponseWriter, r *nethttp.Request) { - if r.Method != nethttp.MethodGet { - w.WriteHeader(nethttp.StatusMethodNotAllowed) - return - } - uriSplit := strings.Split(r.RequestURI, "/") - if len(uriSplit) != 3 { - w.WriteHeader(nethttp.StatusBadRequest) - return - } - channelRef := eventingchannels.ChannelReference{ - Name: uriSplit[2], - Namespace: uriSplit[1], - } - if _, ok := dispatcher.channelSubscriptions[channelRef]; !ok { - w.WriteHeader(nethttp.StatusBadRequest) - return - } - dispatcher.channelSubscriptions[channelRef].readySubscriptionsLock.RLock() - defer dispatcher.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock() - var subscriptions = make(map[string][]string) - w.Header().Set(dispatcherReadySubHeader, uriSplit[2]) - subscriptions[uriSplit[2]] = dispatcher.channelSubscriptions[channelRef].channelReadySubscriptions.List() - jsonResult, err := json.Marshal(subscriptions) - if err != nil { - return // we should probably log instead, pass logger via context? - } - fmt.Fprintf(w, string(jsonResult)) - w.WriteHeader(nethttp.StatusOK) - }) - go func() { - dispatcher.logger.Fatal(nethttp.ListenAndServe(":", subsStatusHandler)) + dispatcher.logger.Fatal(nethttp.ListenAndServe(":", dispatcher)) }() podName, err := env.GetRequiredConfigValue(args.Logger.Desugar(), env.PodNameEnvVarKey) @@ -307,10 +307,6 @@ func (ks *KafkaSubscription) SetReady(subID types.UID, ready bool) { } -func (ks *KafkaSubscription) HandleReadySubsStatusRequest(w nethttp.ResponseWriter, r *nethttp.Request) { - -} - // UpdateKafkaConsumers will be called by new CRD based kafka channel dispatcher controller. 
func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]error, error) { if config == nil { From 67f97a838eb0d625f7c576eccc9c466b14de69d7 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 2 Feb 2021 18:01:51 -0500 Subject: [PATCH 10/24] Remove channelRef in KafkaSubscription --- pkg/channel/consolidated/dispatcher/dispatcher.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 2e39f2ec1d..43796c0f52 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -52,8 +52,7 @@ const ( ) type KafkaSubscription struct { - channelRef eventingchannels.ChannelReference - subs []types.UID + subs []types.UID // readySubscriptionsLock must be used to synchronize access to channelReadySubscriptions readySubscriptionsLock sync.RWMutex @@ -336,7 +335,6 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er } } else { //ensure the pointer is populated or things go boom d.channelSubscriptions[channelRef] = &KafkaSubscription{ - channelRef: channelRef, subs: []types.UID{}, channelReadySubscriptions: sets.String{}, } From 19919bad6c2ca91ac0075ca293ee547e7a7498e5 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 2 Feb 2021 18:02:07 -0500 Subject: [PATCH 11/24] Change bad channelref request to http.StatusNotFound --- pkg/channel/consolidated/dispatcher/dispatcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 43796c0f52..acc765fd04 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -118,7 +118,7 @@ func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request Namespace: uriSplit[1], } if _, ok := 
d.channelSubscriptions[channelRef]; !ok { - w.WriteHeader(nethttp.StatusBadRequest) + w.WriteHeader(nethttp.StatusNotFound) return } d.channelSubscriptions[channelRef].readySubscriptionsLock.RLock() From 5373342062f6afd073a440b9317ed20cbf3be5b1 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 2 Feb 2021 18:02:28 -0500 Subject: [PATCH 12/24] Add namespace to subscriptions http output --- pkg/channel/consolidated/dispatcher/dispatcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index acc765fd04..30a0297bd1 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -125,7 +125,7 @@ func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request defer d.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock() var subscriptions = make(map[string][]string) w.Header().Set(dispatcherReadySubHeader, uriSplit[2]) - subscriptions[uriSplit[2]] = d.channelSubscriptions[channelRef].channelReadySubscriptions.List() + subscriptions[uriSplit[1]+"/"+uriSplit[2]] = d.channelSubscriptions[channelRef].channelReadySubscriptions.List() jsonResult, err := json.Marshal(subscriptions) if err != nil { return // we should probably log instead, pass logger via context? 
From 422afae08b6fa1065a573ac6c0bb049fba09fb41 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 2 Feb 2021 18:03:07 -0500 Subject: [PATCH 13/24] Add Unit tests for servehttp & setready --- .../dispatcher/dispatcher_test.go | 186 ++++++++++++++++++ 1 file changed, 186 insertions(+) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher_test.go b/pkg/channel/consolidated/dispatcher/dispatcher_test.go index 1610d5fc9f..08b5588baf 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher_test.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher_test.go @@ -19,6 +19,10 @@ package dispatcher import ( "context" "errors" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" "net/url" "testing" @@ -439,6 +443,188 @@ func TestNewDispatcher(t *testing.T) { } } +func TestSetReady(t *testing.T) { + testCases := []struct { + name string + ready bool + subID types.UID + originalKafkaSub *KafkaSubscription + desiredKafkaSub *KafkaSubscription + }{ + { + name: "doesn't have the sub, add it (on ready)", + ready: true, + subID: "foo", + originalKafkaSub: &KafkaSubscription{ + channelReadySubscriptions: sets.String{"bar": sets.Empty{}}, + }, + desiredKafkaSub: &KafkaSubscription{ + subs: []types.UID{}, + channelReadySubscriptions: sets.String{"bar": sets.Empty{}, "foo": sets.Empty{}}, + }, + }, + { + name: "has the sub already (on ready)", + ready: true, + subID: "foo", + originalKafkaSub: &KafkaSubscription{ + channelReadySubscriptions: sets.String{"foo": sets.Empty{}, "bar": sets.Empty{}}, + }, + desiredKafkaSub: &KafkaSubscription{ + channelReadySubscriptions: sets.String{"foo": sets.Empty{}, "bar": sets.Empty{}}, + }, + }, + { + name: "has the sub, delete it (on !ready)", + ready: false, + subID: "foo", + originalKafkaSub: &KafkaSubscription{ + channelReadySubscriptions: sets.String{"foo": sets.Empty{}, "bar": sets.Empty{}}, + }, + desiredKafkaSub: &KafkaSubscription{ + channelReadySubscriptions: sets.String{"bar": sets.Empty{}}, + }, + }, + { + name: "doesn't have 
the sub to delete (on !ready)", + ready: false, + subID: "foo", + originalKafkaSub: &KafkaSubscription{ + channelReadySubscriptions: sets.String{"bar": sets.Empty{}}, + }, + desiredKafkaSub: &KafkaSubscription{ + channelReadySubscriptions: sets.String{"bar": sets.Empty{}}, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Logf("Running %s", t.Name()) + tc.originalKafkaSub.SetReady(tc.subID, tc.ready) + if diff := cmp.Diff(tc.desiredKafkaSub.channelReadySubscriptions, tc.originalKafkaSub.channelReadySubscriptions); diff != "" { + t.Errorf("unexpected ChannelReadySubscription (-want, +got) = %v", diff) + } + }) + } +} + +func TestServeHTTP(t *testing.T) { + + testCases := []struct { + name string + responseReturnCode int + desiredJson []byte + channelRef eventingchannels.ChannelReference + channelSubs map[eventingchannels.ChannelReference]*KafkaSubscription + }{ + { + name: "channelref not found", + responseReturnCode: http.StatusNotFound, + desiredJson: []byte{}, + channelRef: eventingchannels.ChannelReference{ + Name: "doesnt", + Namespace: "exist", + }, + }, { + name: "nop", + responseReturnCode: http.StatusNotFound, + desiredJson: []byte{}, + channelRef: eventingchannels.ChannelReference{}, + }, { + name: "no ready subscribers", + responseReturnCode: http.StatusOK, + desiredJson: []byte(`{"bar/foo":[]}`), + channelRef: eventingchannels.ChannelReference{ + Name: "foo", + Namespace: "bar", + }, + channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ + {Name: "foo", Namespace: "bar"}: { + subs: []types.UID{}, + channelReadySubscriptions: sets.String{}, + }, + }, + }, { + name: "different channelref called from populated channref (different ns)", + desiredJson: []byte{}, + responseReturnCode: http.StatusNotFound, + channelRef: eventingchannels.ChannelReference{ + Name: "foo", + Namespace: "bar", + }, + channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ + {Name: "foo", Namespace: "baz"}: { + 
subs: []types.UID{"a", "b"}, + channelReadySubscriptions: sets.String{"a": sets.Empty{}, "b": sets.Empty{}}, + }, + }, + }, { + name: "return correct subscription", + desiredJson: []byte(`{"bar/foo":["a","b"]}`), + responseReturnCode: http.StatusOK, + channelRef: eventingchannels.ChannelReference{ + Name: "foo", + Namespace: "bar", + }, + channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ + {Name: "foo", Namespace: "bar"}: { + subs: []types.UID{"a", "b"}, + channelReadySubscriptions: sets.String{"a": sets.Empty{}, "b": sets.Empty{}}, + }, + }, + }, { + name: "return correct subscription from multiple chanrefs", + desiredJson: []byte(`{"bar/foo":["a","b"]}`), + responseReturnCode: http.StatusOK, + channelRef: eventingchannels.ChannelReference{ + Name: "foo", + Namespace: "bar", + }, + channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ + {Name: "table", Namespace: "flip"}: { + subs: []types.UID{"c", "d"}, + channelReadySubscriptions: sets.String{"c": sets.Empty{}}, + }, + {Name: "foo", Namespace: "bar"}: { + subs: []types.UID{"a", "b"}, + channelReadySubscriptions: sets.String{"a": sets.Empty{}, "b": sets.Empty{}}, + }, + }, + }, + } + d := &KafkaDispatcher{ + channelSubscriptions: make(map[eventingchannels.ChannelReference]*KafkaSubscription), + logger: zaptest.NewLogger(t).Sugar(), + } + ts := httptest.NewServer(d) + defer ts.Close() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Logf("Running %s", t.Name()) + d.channelSubscriptions = tc.channelSubs + + resp, err := http.Get(fmt.Sprintf("%s/%s/%s", ts.URL, tc.channelRef.Namespace, tc.channelRef.Name)) + if err != nil { + t.Errorf("Could not send request to subscriber endpoint: %v", err) + } + if resp.StatusCode != tc.responseReturnCode { + t.Errorf("unepxected status returned: want: %d, got: %d", tc.responseReturnCode, resp.StatusCode) + } + respBody, err := ioutil.ReadAll(resp.Body) + defer resp.Body.Close() + if err != nil { + t.Errorf("Could not 
read response from subscriber endpoint: %v", err) + } + if testing.Verbose() && len(respBody) > 0 { + t.Logf("http response: %s\n", string(respBody)) + } + if diff := cmp.Diff(tc.desiredJson, respBody); diff != "" { + t.Errorf("unexpected readysubscriber status response: (-want, +got) = %v", diff) + } + }) + } +} + var sortStrings = cmpopts.SortSlices(func(x, y string) bool { return x < y }) From 6cc860786bc9e5cc4686e3ecd8a2a6da15440cbe Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 2 Feb 2021 18:11:44 -0500 Subject: [PATCH 14/24] Split uriSplit into channelRefName{,space} vars --- pkg/channel/consolidated/dispatcher/dispatcher.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 30a0297bd1..d1e04c60c6 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -113,9 +113,10 @@ func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request w.WriteHeader(nethttp.StatusNotFound) return } + channelRefNamespace, channelRefName := uriSplit[1], uriSplit[2] channelRef := eventingchannels.ChannelReference{ - Name: uriSplit[2], - Namespace: uriSplit[1], + Name: channelRefName, + Namespace: channelRefNamespace, } if _, ok := d.channelSubscriptions[channelRef]; !ok { w.WriteHeader(nethttp.StatusNotFound) @@ -125,7 +126,7 @@ func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request defer d.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock() var subscriptions = make(map[string][]string) w.Header().Set(dispatcherReadySubHeader, uriSplit[2]) - subscriptions[uriSplit[1]+"/"+uriSplit[2]] = d.channelSubscriptions[channelRef].channelReadySubscriptions.List() + subscriptions[channelRefNamespace+"/"+channelRefName] = d.channelSubscriptions[channelRef].channelReadySubscriptions.List() jsonResult, err := json.Marshal(subscriptions) if 
err != nil { return // we should probably log instead, pass logger via context? From c2650da4f99b787007b6df0889f36ac377e12d44 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 2 Feb 2021 20:19:52 -0500 Subject: [PATCH 15/24] Expose dispatcher http-sub-status port in dispatcher svc --- config/channel/consolidated/deployments/dispatcher.yaml | 7 +++++++ pkg/channel/consolidated/dispatcher/dispatcher.go | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/config/channel/consolidated/deployments/dispatcher.yaml b/config/channel/consolidated/deployments/dispatcher.yaml index 45977778ca..896f9ea9e0 100644 --- a/config/channel/consolidated/deployments/dispatcher.yaml +++ b/config/channel/consolidated/deployments/dispatcher.yaml @@ -60,6 +60,9 @@ spec: - containerPort: 9090 name: metrics protocol: TCP + - containerPort: 8081 + name: sub-status + protocol: TCP volumeMounts: - name: config-kafka mountPath: /etc/config-kafka @@ -85,6 +88,10 @@ spec: port: 80 protocol: TCP targetPort: 8080 + - name: http-sub-status + port: 8081 + protocol: TCP + targetPort: 8081 selector: messaging.knative.dev/channel: kafka-channel messaging.knative.dev/role: dispatcher diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index d1e04c60c6..a05a2f1f1b 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -170,7 +170,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat } go func() { - dispatcher.logger.Fatal(nethttp.ListenAndServe(":", dispatcher)) + dispatcher.logger.Fatal(nethttp.ListenAndServe(":8081", dispatcher)) }() podName, err := env.GetRequiredConfigValue(args.Logger.Desugar(), env.PodNameEnvVarKey) From 89aa745fd6ec6269c40e5b0d60c48e5ff62ef95f Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Tue, 2 Feb 2021 20:34:45 -0500 Subject: [PATCH 16/24] Add servehttp diagnostic messages --- 
pkg/channel/consolidated/dispatcher/dispatcher.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index a05a2f1f1b..78140135a3 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -106,11 +106,13 @@ func (sub Subscription) String() string { func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request) { if r.Method != nethttp.MethodGet { w.WriteHeader(nethttp.StatusMethodNotAllowed) + d.logger.Errorf("Received request method that wasn't GET: %s", r.Method) return } uriSplit := strings.Split(r.RequestURI, "/") if len(uriSplit) != 3 { w.WriteHeader(nethttp.StatusNotFound) + d.logger.Errorf("Unable to process request: %s", r.RequestURI) return } channelRefNamespace, channelRefName := uriSplit[1], uriSplit[2] @@ -129,7 +131,8 @@ func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request subscriptions[channelRefNamespace+"/"+channelRefName] = d.channelSubscriptions[channelRef].channelReadySubscriptions.List() jsonResult, err := json.Marshal(subscriptions) if err != nil { - return // we should probably log instead, pass logger via context? 
+ d.logger.Errorf("Error marshalling json for sub-status channelref: %s/%s, %w", channelRefNamespace, channelRefName, err) + return } fmt.Fprintf(w, string(jsonResult)) w.WriteHeader(nethttp.StatusOK) From b38d70aca680da9fe90fd9a5e507decb9ecd1841 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Wed, 3 Feb 2021 09:30:36 -0500 Subject: [PATCH 17/24] One more uriSplit -> channelRefName variable rename --- pkg/channel/consolidated/dispatcher/dispatcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 78140135a3..03b0158cfc 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -127,7 +127,7 @@ func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request d.channelSubscriptions[channelRef].readySubscriptionsLock.RLock() defer d.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock() var subscriptions = make(map[string][]string) - w.Header().Set(dispatcherReadySubHeader, uriSplit[2]) + w.Header().Set(dispatcherReadySubHeader, channelRefName) subscriptions[channelRefNamespace+"/"+channelRefName] = d.channelSubscriptions[channelRef].channelReadySubscriptions.List() jsonResult, err := json.Marshal(subscriptions) if err != nil { From b1837325bc124a0fa8995109c4dfc055241f464a Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Wed, 3 Feb 2021 09:30:55 -0500 Subject: [PATCH 18/24] Change how we write the http response --- pkg/channel/consolidated/dispatcher/dispatcher.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 03b0158cfc..c79d675d0c 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -134,8 +134,10 @@ func (d *KafkaDispatcher) ServeHTTP(w 
nethttp.ResponseWriter, r *nethttp.Request d.logger.Errorf("Error marshalling json for sub-status channelref: %s/%s, %w", channelRefNamespace, channelRefName, err) return } - fmt.Fprintf(w, string(jsonResult)) - w.WriteHeader(nethttp.StatusOK) + _, err = w.Write(jsonResult) + if err != nil { + d.logger.Errorf("Error writing jsonResult to serveHTTP writer: %w", err) + } } func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispatcher, error) { From 1f235674cf3ca1b5c942aff08b5bf7eb633d8432 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Wed, 3 Feb 2021 14:29:00 -0500 Subject: [PATCH 19/24] Add empty SetReady() method to source RA --- pkg/source/adapter/adapter.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/source/adapter/adapter.go b/pkg/source/adapter/adapter.go index 1450c0ef5a..2dba25b4f7 100644 --- a/pkg/source/adapter/adapter.go +++ b/pkg/source/adapter/adapter.go @@ -121,6 +121,8 @@ func (a *Adapter) start(stopCh <-chan struct{}) error { return nil } +func (a *Adapter) SetReady(_ bool) {} + func (a *Adapter) Handle(ctx context.Context, msg *sarama.ConsumerMessage) (bool, error) { if a.rateLimiter != nil { a.rateLimiter.Wait(ctx) From e18b36129b21afa9686b6883ce06e99723195f23 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Wed, 3 Feb 2021 14:34:51 -0500 Subject: [PATCH 20/24] Fix consumer_handler_test --- pkg/common/consumer/consumer_handler_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/common/consumer/consumer_handler_test.go b/pkg/common/consumer/consumer_handler_test.go index 3548696113..edcf5ea5d0 100644 --- a/pkg/common/consumer/consumer_handler_test.go +++ b/pkg/common/consumer/consumer_handler_test.go @@ -114,6 +114,10 @@ func (m mockMessageHandler) Handle(ctx context.Context, message *sarama.Consumer } } +func (m mockMessageHandler) SetReady(ready bool) { + return +} + //------ Tests func Test(t *testing.T) { From d1ddc68ba1684c7492e0212955a5c9e195b90b21 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: 
Wed, 3 Feb 2021 14:38:18 -0500 Subject: [PATCH 21/24] more linting --- pkg/common/consumer/consumer_handler_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/common/consumer/consumer_handler_test.go b/pkg/common/consumer/consumer_handler_test.go index edcf5ea5d0..14953e6c1a 100644 --- a/pkg/common/consumer/consumer_handler_test.go +++ b/pkg/common/consumer/consumer_handler_test.go @@ -115,7 +115,6 @@ func (m mockMessageHandler) Handle(ctx context.Context, message *sarama.Consumer } func (m mockMessageHandler) SetReady(ready bool) { - return } //------ Tests From 48758b27ff488690121c0ea2caf35bd81d0b569e Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Wed, 3 Feb 2021 15:47:37 -0500 Subject: [PATCH 22/24] Add back ObserveKind until controller implements substatus scraper --- .../consolidated/reconciler/dispatcher/kafkachannel.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go b/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go index 3a7969244f..e1e01a38d7 100644 --- a/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go +++ b/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go @@ -151,6 +151,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel return r.syncDispatcher(ctx) } +func (r *Reconciler) ObserveKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event { + logging.FromContext(ctx).Debugw("ObserveKind for channel", zap.String("channel", kc.Name)) + return r.syncDispatcher(ctx) +} + func (r *Reconciler) syncDispatcher(ctx context.Context) pkgreconciler.Event { channels, err := r.kafkachannelLister.List(labels.Everything()) if err != nil { From 1e768b9da55b602f64666709bdf31714e09b25eb Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Wed, 3 Feb 2021 16:20:54 -0500 Subject: [PATCH 23/24] Add more ServeHTTP unit tests --- .../dispatcher/dispatcher_test.go | 53 +++++++++++-------- 1 file changed, 30 
insertions(+), 23 deletions(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher_test.go b/pkg/channel/consolidated/dispatcher/dispatcher_test.go index 08b5588baf..50375af28f 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher_test.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher_test.go @@ -510,76 +510,69 @@ func TestSetReady(t *testing.T) { func TestServeHTTP(t *testing.T) { + httpGet := "GET" + httpPost := "POST" testCases := []struct { name string responseReturnCode int desiredJson []byte - channelRef eventingchannels.ChannelReference channelSubs map[eventingchannels.ChannelReference]*KafkaSubscription + requestURI string + httpMethod string }{ { name: "channelref not found", + httpMethod: httpGet, responseReturnCode: http.StatusNotFound, desiredJson: []byte{}, - channelRef: eventingchannels.ChannelReference{ - Name: "doesnt", - Namespace: "exist", - }, + requestURI: "/exist/thisDoesNot", }, { name: "nop", + httpMethod: httpGet, responseReturnCode: http.StatusNotFound, desiredJson: []byte{}, - channelRef: eventingchannels.ChannelReference{}, + requestURI: "///", }, { name: "no ready subscribers", + httpMethod: httpGet, responseReturnCode: http.StatusOK, desiredJson: []byte(`{"bar/foo":[]}`), - channelRef: eventingchannels.ChannelReference{ - Name: "foo", - Namespace: "bar", - }, channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ {Name: "foo", Namespace: "bar"}: { subs: []types.UID{}, channelReadySubscriptions: sets.String{}, }, }, + requestURI: "/bar/foo", }, { name: "different channelref called from populated channref (different ns)", + httpMethod: httpGet, desiredJson: []byte{}, responseReturnCode: http.StatusNotFound, - channelRef: eventingchannels.ChannelReference{ - Name: "foo", - Namespace: "bar", - }, channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ {Name: "foo", Namespace: "baz"}: { subs: []types.UID{"a", "b"}, channelReadySubscriptions: sets.String{"a": sets.Empty{}, "b": sets.Empty{}}, 
}, }, + requestURI: "/bar/foo", }, { name: "return correct subscription", + httpMethod: httpGet, desiredJson: []byte(`{"bar/foo":["a","b"]}`), responseReturnCode: http.StatusOK, - channelRef: eventingchannels.ChannelReference{ - Name: "foo", - Namespace: "bar", - }, channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ {Name: "foo", Namespace: "bar"}: { subs: []types.UID{"a", "b"}, channelReadySubscriptions: sets.String{"a": sets.Empty{}, "b": sets.Empty{}}, }, }, + requestURI: "/bar/foo", }, { name: "return correct subscription from multiple chanrefs", + httpMethod: httpGet, desiredJson: []byte(`{"bar/foo":["a","b"]}`), responseReturnCode: http.StatusOK, - channelRef: eventingchannels.ChannelReference{ - Name: "foo", - Namespace: "bar", - }, channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ {Name: "table", Namespace: "flip"}: { subs: []types.UID{"c", "d"}, @@ -590,6 +583,18 @@ func TestServeHTTP(t *testing.T) { channelReadySubscriptions: sets.String{"a": sets.Empty{}, "b": sets.Empty{}}, }, }, + requestURI: "/bar/foo", + }, { + name: "bad request uri", + httpMethod: httpGet, + desiredJson: []byte{}, + responseReturnCode: http.StatusNotFound, + requestURI: "/here/be/dragons/there/are/too/many/slashes", + }, { + name: "bad request method (POST)", + httpMethod: httpPost, + desiredJson: []byte{}, + responseReturnCode: http.StatusMethodNotAllowed, }, } d := &KafkaDispatcher{ @@ -603,7 +608,9 @@ func TestServeHTTP(t *testing.T) { t.Logf("Running %s", t.Name()) d.channelSubscriptions = tc.channelSubs - resp, err := http.Get(fmt.Sprintf("%s/%s/%s", ts.URL, tc.channelRef.Namespace, tc.channelRef.Name)) + request, _ := http.NewRequest(tc.httpMethod, fmt.Sprintf("%s%s", ts.URL, tc.requestURI), nil) + // resp, err := http.Get(fmt.Sprintf("%s%s", ts.URL, tc.requestURI)) + resp, err := http.DefaultClient.Do(request) if err != nil { t.Errorf("Could not send request to subscriber endpoint: %v", err) } From 
a6fb63bdf747919bcd3b8b189ed7b4bc30487e32 Mon Sep 17 00:00:00 2001 From: Lukas Berk Date: Thu, 4 Feb 2021 14:10:53 -0500 Subject: [PATCH 24/24] slightly alter where we mark a handler as ready or not --- pkg/common/consumer/consumer_handler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/common/consumer/consumer_handler.go b/pkg/common/consumer/consumer_handler.go index 75a85d30cd..092b3083fc 100644 --- a/pkg/common/consumer/consumer_handler.go +++ b/pkg/common/consumer/consumer_handler.go @@ -67,7 +67,6 @@ func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSessi // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { consumer.logger.Info(fmt.Sprintf("Starting partition consumer, topic: %s, partition: %d, initialOffset: %d", claim.Topic(), claim.Partition(), claim.InitialOffset())) - consumer.handler.SetReady(true) // NOTE: // Do not move the code below to a goroutine. @@ -86,6 +85,7 @@ func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroup if err != nil { consumer.logger.Infow("Failure while handling a message", zap.String("topic", message.Topic), zap.Int32("partition", message.Partition), zap.Int64("offset", message.Offset), zap.Error(err)) consumer.errors <- err + consumer.handler.SetReady(false) } if mustMark { @@ -93,6 +93,7 @@ func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroup if ce := consumer.logger.Desugar().Check(zap.DebugLevel, "debugging"); ce != nil { consumer.logger.Debugw("Message marked", zap.String("topic", message.Topic), zap.Binary("value", message.Value)) } + consumer.handler.SetReady(true) } }