From 0947a728f81280494de0e9ed2f43835cf9c4083d Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Tue, 2 Feb 2021 10:37:33 +0100 Subject: [PATCH] Fix #347 (#352) Signed-off-by: Francesco Guardiani --- .../consolidated/dispatcher/dispatcher.go | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 7298c35292..289d745ecf 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -34,13 +34,14 @@ import ( "go.opencensus.io/trace" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing-kafka/pkg/channel/consolidated/utils" - "knative.dev/eventing-kafka/pkg/channel/distributed/common/env" - "knative.dev/eventing-kafka/pkg/common/consumer" eventingchannels "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/channel/fanout" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/pkg/kmeta" + + "knative.dev/eventing-kafka/pkg/channel/consolidated/utils" + "knative.dev/eventing-kafka/pkg/channel/distributed/common/env" + "knative.dev/eventing-kafka/pkg/common/consumer" ) type KafkaDispatcher struct { @@ -269,26 +270,38 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er d.logger.Debug("Number of subs failed to subscribe", zap.Any("subs", len(failedToSubscribe))) // Unsubscribe and close consumer for any deleted subscriptions - for channelRef, subs := range d.channelSubscriptions { - for _, oldSub := range subs { - removedSub := true - for _, s := range newSubs { - if s == oldSub { - removedSub = false - } - } + subsToRemove := make(map[eventingchannels.ChannelReference][]types.UID) + for channelRef, actualSubs := range d.channelSubscriptions { + subsToRemove[channelRef] = uidSetDifference(actualSubs, newSubs) + } - if removedSub { - if err := d.unsubscribe(channelRef, d.subscriptions[oldSub]); err != nil { - return nil, err - } + for channelRef, subs := range subsToRemove { + for _, s := range subs { + if err := d.unsubscribe(channelRef, d.subscriptions[s]); err != nil { + return nil, err } } d.channelSubscriptions[channelRef] = newSubs } + return failedToSubscribe, nil } +func uidSetDifference(a, b []types.UID) (diff []types.UID) { + m := make(map[types.UID]bool) + + for _, item := range b { + m[item] = true + } + + for _, item := range a { + if _, ok := m[item]; !ok { + diff = append(diff, item) + } + } + return +} + // UpdateHostToChannelMap will be called by new CRD based kafka channel dispatcher controller. func (d *KafkaDispatcher) UpdateHostToChannelMap(config *Config) error { if config == nil {