Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

Co-authored-by: slinkydeveloper <francescoguard@gmail.com>
  • Loading branch information
knative-prow-robot and slinkydeveloper committed Feb 2, 2021
1 parent e4f2a7e commit 60fe9e4
Showing 1 changed file with 28 additions and 15 deletions.
43 changes: 28 additions & 15 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ import (
"go.opencensus.io/trace"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/env"
"knative.dev/eventing-kafka/pkg/common/consumer"
eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/kmeta"

"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/env"
"knative.dev/eventing-kafka/pkg/common/consumer"
)

type KafkaDispatcher struct {
Expand Down Expand Up @@ -268,26 +269,38 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er
d.logger.Debug("Number of subs failed to subscribe", zap.Any("subs", len(failedToSubscribe)))

// Unsubscribe and close consumer for any deleted subscriptions
for channelRef, subs := range d.channelSubscriptions {
for _, oldSub := range subs {
removedSub := true
for _, s := range newSubs {
if s == oldSub {
removedSub = false
}
}
subsToRemove := make(map[eventingchannels.ChannelReference][]types.UID)
for channelRef, actualSubs := range d.channelSubscriptions {
subsToRemove[channelRef] = uidSetDifference(actualSubs, newSubs)
}

if removedSub {
if err := d.unsubscribe(channelRef, d.subscriptions[oldSub]); err != nil {
return nil, err
}
for channelRef, subs := range subsToRemove {
for _, s := range subs {
if err := d.unsubscribe(channelRef, d.subscriptions[s]); err != nil {
return nil, err
}
}
d.channelSubscriptions[channelRef] = newSubs
}

return failedToSubscribe, nil
}

func uidSetDifference(a, b []types.UID) (diff []types.UID) {
m := make(map[types.UID]bool)

for _, item := range b {
m[item] = true
}

for _, item := range a {
if _, ok := m[item]; !ok {
diff = append(diff, item)
}
}
return
}

// UpdateHostToChannelMap will be called by new CRD based kafka channel dispatcher controller.
func (d *KafkaDispatcher) UpdateHostToChannelMap(config *Config) error {
if config == nil {
Expand Down

0 comments on commit 60fe9e4

Please sign in to comment.