Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Add Subscription prober #433

Merged
merged 17 commits into from
Mar 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/channel/consolidated/deployments/dispatcher.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ spec:
template:
metadata:
labels:
# Do not change. Used by the controller for probing.
messaging.knative.dev/channel: kafka-channel
# Do not change. Used by the controller for probing.
messaging.knative.dev/role: dispatcher
kafka.eventing.knative.dev/release: devel
spec:
Expand Down
16 changes: 2 additions & 14 deletions config/channel/consolidated/roles/controller-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ rules:
- apiGroups:
- "" # Core API group.
resources:
- services
- configmaps
- secrets
verbs:
Expand All @@ -51,6 +50,7 @@ rules:
- "" # Core API group.
resources:
- services
- serviceaccounts
verbs: &everything
- get
- list
Expand All @@ -74,18 +74,11 @@ rules:
- create
- patch
- update
- apiGroups:
- ""
resources:
- configmaps
verbs:
- get
- list
- watch
- apiGroups:
- "" # Core API group.
resources:
- endpoints
- pods
verbs:
- get
- list
Expand All @@ -96,11 +89,6 @@ rules:
- deployments
- deployments/status
verbs: *everything
- apiGroups:
- "" # Core API group.
resources:
- serviceaccounts
verbs: *everything
- apiGroups:
- rbac.authorization.k8s.io
resources:
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.opencensus.io v0.22.6
go.uber.org/atomic v1.7.0
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
Expand All @@ -35,5 +36,6 @@ require (
k8s.io/utils v0.0.0-20200729134348-d5654de09c73
knative.dev/eventing v0.21.1-0.20210309092525-37e702765dbc
knative.dev/hack v0.0.0-20210305150220-f99a25560134
knative.dev/networking v0.0.0-20210304153916-f813b5904943
knative.dev/pkg v0.0.0-20210309024624-0f8d8de5949d
)
10 changes: 9 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -510,6 +512,7 @@ github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dv
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.11.0 h1:wJbzvpYMVGG9iTI9VxpnNZfd4DzMPoCWze3GgSqz8yg=
Expand Down Expand Up @@ -687,6 +690,7 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/dnscache v0.0.0-20210201191234-295bba877686/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
Expand Down Expand Up @@ -1035,6 +1039,7 @@ golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200616133436-c1934b75d054/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
Expand All @@ -1043,6 +1048,8 @@ golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818 h1:u2ssHESKr0HP2d1wlnjMKH+V/22Vg1lGCVuXmOYU1qA=
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -1244,7 +1251,8 @@ knative.dev/hack v0.0.0-20210203173706-8368e1f6eacf h1:u4cY4jr2LYvhoz/1HBWEPsMiL
knative.dev/hack v0.0.0-20210203173706-8368e1f6eacf/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack v0.0.0-20210305150220-f99a25560134 h1:lUllAp28TkevQIgWrsjow8ZLnXJy3AraRzGFm/ffD2c=
knative.dev/hack v0.0.0-20210305150220-f99a25560134/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/pkg v0.0.0-20210303192215-8fbab7ebb77b h1:AynUh7MBUe44E60vh0vIyF2Bes4AOoTT2ejy9xrF0FU=
knative.dev/networking v0.0.0-20210304153916-f813b5904943 h1:EEAnCZzqVoTNNPMYyONXqOD3e/45OPVahA4jm8ET4/g=
knative.dev/networking v0.0.0-20210304153916-f813b5904943/go.mod h1:G+KCelFuLocMrnfayHoxqsFG+IYX4t8To1celZes77k=
knative.dev/pkg v0.0.0-20210303192215-8fbab7ebb77b/go.mod h1:TJSdebQOWX5N2bszohOYVi0H1QtXbtlYLuMghAFBMhY=
knative.dev/pkg v0.0.0-20210308052421-737401c38b22/go.mod h1:fP690UCcs5x+qQVhjJxNcm97OWIiUdFC1dqbD3Gsp64=
knative.dev/pkg v0.0.0-20210309024624-0f8d8de5949d h1:2Uc3qyLRLIYOqJrGGKFkJc69X+cxlhoH3jk7p4b4KFM=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,17 @@ type consumerMessageHandler struct {
sub Subscription
dispatcher *eventingchannels.MessageDispatcherImpl
kafkaSubscription *KafkaSubscription
consumerGroup string
}

var _ consumer.KafkaConsumerHandler = (*consumerMessageHandler)(nil)

func (c consumerMessageHandler) SetReady(ready bool) {
c.kafkaSubscription.SetReady(c.sub.UID, ready)
func (c consumerMessageHandler) GetConsumerGroup() string {
return c.consumerGroup
}

func (c consumerMessageHandler) SetReady(partition int32, ready bool) {
c.kafkaSubscription.SetReady(c.sub.UID, partition, ready)
}

func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sarama.ConsumerMessage) (bool, error) {
Expand Down
41 changes: 24 additions & 17 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,23 @@ import (
"sync"
"sync/atomic"

"k8s.io/apimachinery/pkg/util/sets"

"knative.dev/eventing-kafka/pkg/common/client"
"knative.dev/eventing-kafka/pkg/common/tracing"

"github.com/Shopify/sarama"
protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/google/uuid"
"go.opencensus.io/trace"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/kmeta"
"k8s.io/apimachinery/pkg/util/sets"

"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
"knative.dev/eventing-kafka/pkg/channel/distributed/common/env"
"knative.dev/eventing-kafka/pkg/common/client"
"knative.dev/eventing-kafka/pkg/common/consumer"
"knative.dev/eventing-kafka/pkg/common/tracing"
eventingchannels "knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/kmeta"
)

const (
Expand Down Expand Up @@ -191,9 +189,11 @@ func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request
}
d.channelSubscriptions[channelRef].readySubscriptionsLock.RLock()
defer d.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock()
var subscriptions = make(map[string][]string)
var subscriptions = make(map[string][]int32)
w.Header().Set(dispatcherReadySubHeader, channelRefName)
subscriptions[channelRefNamespace+"/"+channelRefName] = d.channelSubscriptions[channelRef].channelReadySubscriptions.List()
for s, ps := range d.channelSubscriptions[channelRef].channelReadySubscriptions {
subscriptions[s] = ps.List()
}
jsonResult, err := json.Marshal(subscriptions)
if err != nil {
d.logger.Errorf("Error marshalling json for sub-status channelref: %s/%s, %w", channelRefNamespace, channelRefName, err)
Expand Down Expand Up @@ -234,8 +234,9 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er
}
} else { //ensure the pointer is populated or things go boom
d.channelSubscriptions[channelRef] = &KafkaSubscription{
logger: d.logger,
subs: []types.UID{},
channelReadySubscriptions: sets.String{},
channelReadySubscriptions: map[string]sets.Int32{},
}
}

Expand Down Expand Up @@ -291,13 +292,18 @@ func (d *KafkaDispatcher) UpdateHostToChannelMap(config *Config) error {
// subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine.
// subscribe must be called under updateLock.
func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference, sub Subscription) error {
d.logger.Info("Subscribing", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID))

d.logger.Infow("Subscribing to Kafka Channel", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID))
topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name)
groupID := fmt.Sprintf("kafka.%s.%s.%s", channelRef.Namespace, channelRef.Name, string(sub.UID))

handler := &consumerMessageHandler{d.logger, sub, d.dispatcher, d.channelSubscriptions[channelRef]}

handler := &consumerMessageHandler{
d.logger,
sub,
d.dispatcher,
d.channelSubscriptions[channelRef],
groupID,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to pass the groupID? it looks like it can be constructed by all the parameters passed to the consumerMessageHandler{}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's because consumerMessageHandler shouldn't presume how the ConsumerGroup gets calculated. here it's like this, in KafkaSource (which reuses the consumeMessageHandler) does something different.

}
d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef),
zap.Any("subscription", sub.UID), zap.String("topic", topicName), zap.String("consumer group", groupID))
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(groupID, []string{topicName}, d.logger, handler)

if err != nil {
Expand All @@ -324,7 +330,7 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference
// unsubscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine.
// unsubscribe must be called under updateLock.
func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference, sub Subscription) error {
d.logger.Infow("Unsubscribing from channel", zap.Any("channel", channel), zap.String("subscription", sub.String()))
d.logger.Infow("Unsubscribing from channel", zap.Any("channel", channel), zap.Any("subscription", sub.UID))
delete(d.subscriptions, sub.UID)
if _, ok := d.channelSubscriptions[channel]; !ok {
return nil
Expand All @@ -340,6 +346,7 @@ func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference,
}
if consumer, ok := d.subsConsumerGroups[sub.UID]; ok {
delete(d.subsConsumerGroups, sub.UID)
d.logger.Debugw("Closing cached consumer group", zap.Any("consumer group", consumer))
return consumer.Close()
}
return nil
Expand Down
Loading