Skip to content

Commit

Permalink
[0.19.1] Backports (#159)
Browse files Browse the repository at this point in the history
* Backport (#45)

* Fix #347 (#354)

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

Co-authored-by: slinkydeveloper <francescoguard@gmail.com>

* Fix #342 (#355)

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

Co-authored-by: slinkydeveloper <francescoguard@gmail.com>

Co-authored-by: Knative Prow Robot <knative-prow-robot@google.com>

* Use CE connection args available, instead of hardcoded values (#440)

Co-authored-by: Francesco Guardiani <francescoguard@gmail.com>
Co-authored-by: Knative Prow Robot <knative-prow-robot@google.com>
Co-authored-by: Ali Ok <aliok@redhat.com>
  • Loading branch information
4 people authored Apr 19, 2021
1 parent 2eccc24 commit 8b65276
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 20 deletions.
46 changes: 31 additions & 15 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ import (
"go.opencensus.io/trace"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/env"
"knative.dev/eventing-kafka/pkg/common/consumer"
eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/kmeta"

"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/env"
"knative.dev/eventing-kafka/pkg/common/consumer"
)

type KafkaDispatcher struct {
Expand Down Expand Up @@ -106,6 +107,9 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
return nil, fmt.Errorf("unable to create kafka producer against Kafka bootstrap servers %v : %v", args.Brokers, err)
}

// Configure connection arguments
kncloudevents.ConfigureConnectionArgs(args.KnCEConnectionArgs)

dispatcher := &KafkaDispatcher{
dispatcher: eventingchannels.NewMessageDispatcher(args.Logger.Desugar()),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, conf),
Expand Down Expand Up @@ -270,26 +274,38 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er
d.logger.Debug("Number of subs failed to subscribe", zap.Any("subs", len(failedToSubscribe)))

// Unsubscribe and close consumer for any deleted subscriptions
for channelRef, subs := range d.channelSubscriptions {
for _, oldSub := range subs {
removedSub := true
for _, s := range newSubs {
if s == oldSub {
removedSub = false
}
}
subsToRemove := make(map[eventingchannels.ChannelReference][]types.UID)
for channelRef, actualSubs := range d.channelSubscriptions {
subsToRemove[channelRef] = uidSetDifference(actualSubs, newSubs)
}

if removedSub {
if err := d.unsubscribe(channelRef, d.subscriptions[oldSub]); err != nil {
return nil, err
}
for channelRef, subs := range subsToRemove {
for _, s := range subs {
if err := d.unsubscribe(channelRef, d.subscriptions[s]); err != nil {
return nil, err
}
}
d.channelSubscriptions[channelRef] = newSubs
}

return failedToSubscribe, nil
}

func uidSetDifference(a, b []types.UID) (diff []types.UID) {
m := make(map[types.UID]bool)

for _, item := range b {
m[item] = true
}

for _, item := range a {
if _, ok := m[item]; !ok {
diff = append(diff, item)
}
}
return
}

// UpdateHostToChannelMap will be called by new CRD based kafka channel dispatcher controller.
func (d *KafkaDispatcher) UpdateHostToChannelMap(config *Config) error {
if config == nil {
Expand Down
13 changes: 8 additions & 5 deletions pkg/channel/consolidated/dispatcher/dispatcher_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,14 @@ func TestDispatcher(t *testing.T) {
})

dispatcherArgs := KafkaDispatcherArgs{
KnCEConnectionArgs: nil,
ClientID: "testing",
Brokers: []string{"localhost:9092"},
TopicFunc: utils.TopicName,
Logger: logger.Sugar(),
KnCEConnectionArgs: &kncloudevents.ConnectionArgs{
MaxIdleConns: 1000,
MaxIdleConnsPerHost: 100,
},
ClientID: "testing",
Brokers: []string{"localhost:9092"},
TopicFunc: utils.TopicName,
Logger: logger.Sugar(),
}

// Create the dispatcher. At this point, if Kafka is not up, this thing fails
Expand Down

0 comments on commit 8b65276

Please sign in to comment.