diff --git a/config/channel/consolidated/deployments/dispatcher.yaml b/config/channel/consolidated/deployments/dispatcher.yaml
index 45977778ca..896f9ea9e0 100644
--- a/config/channel/consolidated/deployments/dispatcher.yaml
+++ b/config/channel/consolidated/deployments/dispatcher.yaml
@@ -60,6 +60,9 @@ spec:
             - containerPort: 9090
               name: metrics
               protocol: TCP
+            - containerPort: 8081
+              name: sub-status
+              protocol: TCP
           volumeMounts:
             - name: config-kafka
               mountPath: /etc/config-kafka
@@ -85,6 +88,10 @@ spec:
       port: 80
       protocol: TCP
       targetPort: 8080
+    - name: http-sub-status
+      port: 8081
+      protocol: TCP
+      targetPort: 8081
   selector:
     messaging.knative.dev/channel: kafka-channel
     messaging.knative.dev/role: dispatcher
diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go
index 5bf3843154..c79d675d0c 100644
--- a/pkg/channel/consolidated/dispatcher/dispatcher.go
+++ b/pkg/channel/consolidated/dispatcher/dispatcher.go
@@ -1,5 +1,5 @@
 /*
-Copyright 2018 The Knative Authors
+Copyright 2021 The Knative Authors
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@ package dispatcher
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	nethttp "net/http"
@@ -24,6 +25,8 @@ import (
 	"sync"
 	"sync/atomic"
 
+	"k8s.io/apimachinery/pkg/util/sets"
+
 	"knative.dev/eventing-kafka/pkg/common/client"
 	"knative.dev/eventing-kafka/pkg/common/tracing"
 
@@ -44,6 +47,18 @@ import (
 	"knative.dev/eventing-kafka/pkg/common/consumer"
 )
 
+const (
+	dispatcherReadySubHeader = "K-Subscriber-Status"
+)
+
+type KafkaSubscription struct {
+	subs []types.UID
+
+	// readySubscriptionsLock must be used to synchronize access to channelReadySubscriptions
+	readySubscriptionsLock    sync.RWMutex
+	channelReadySubscriptions sets.String
+}
+
 type KafkaDispatcher struct {
 	hostToChannelMap atomic.Value
 	// hostToChannelMapLock is used to update hostToChannelMap
@@ -53,7 +68,7 @@ type KafkaDispatcher struct {
 	dispatcher *eventingchannels.MessageDispatcherImpl
 
 	kafkaSyncProducer    sarama.SyncProducer
-	channelSubscriptions map[eventingchannels.ChannelReference][]types.UID
+	channelSubscriptions map[eventingchannels.ChannelReference]*KafkaSubscription
 	subsConsumerGroups   map[types.UID]sarama.ConsumerGroup
 	subscriptions        map[types.UID]Subscription
 	// consumerUpdateLock must be used to update kafkaConsumers
@@ -88,6 +103,43 @@ func (sub Subscription) String() string {
 	return s.String()
 }
 
+func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request) {
+	if r.Method != nethttp.MethodGet {
+		w.WriteHeader(nethttp.StatusMethodNotAllowed)
+		d.logger.Errorf("Received request method that wasn't GET: %s", r.Method)
+		return
+	}
+	uriSplit := strings.Split(r.RequestURI, "/")
+	if len(uriSplit) != 3 {
+		w.WriteHeader(nethttp.StatusNotFound)
+		d.logger.Errorf("Unable to process request: %s", r.RequestURI)
+		return
+	}
+	channelRefNamespace, channelRefName := uriSplit[1], uriSplit[2]
+	channelRef := eventingchannels.ChannelReference{
+		Name:      channelRefName,
+		Namespace: channelRefNamespace,
+	}
+	if _, ok := d.channelSubscriptions[channelRef]; !ok {
+		w.WriteHeader(nethttp.StatusNotFound)
+		return
+	}
+	d.channelSubscriptions[channelRef].readySubscriptionsLock.RLock()
+	defer d.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock()
+	var subscriptions = make(map[string][]string)
+	w.Header().Set(dispatcherReadySubHeader, channelRefName)
+	subscriptions[channelRefNamespace+"/"+channelRefName] = d.channelSubscriptions[channelRef].channelReadySubscriptions.List()
+	jsonResult, err := json.Marshal(subscriptions)
+	if err != nil {
+		d.logger.Errorf("Error marshalling json for sub-status channelref: %s/%s, %w", channelRefNamespace, channelRefName, err)
+		return
+	}
+	_, err = w.Write(jsonResult)
+	if err != nil {
+		d.logger.Errorf("Error writing jsonResult to serveHTTP writer: %w", err)
+	}
+}
+
 func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispatcher, error) {
 	conf, err := client.NewConfigBuilder().
 		WithClientId(args.ClientID).
@@ -114,7 +166,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
 	dispatcher := &KafkaDispatcher{
 		dispatcher:           eventingchannels.NewMessageDispatcher(args.Logger.Desugar()),
 		kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, conf),
-		channelSubscriptions: make(map[eventingchannels.ChannelReference][]types.UID),
+		channelSubscriptions: make(map[eventingchannels.ChannelReference]*KafkaSubscription),
 		subsConsumerGroups:   make(map[types.UID]sarama.ConsumerGroup),
 		subscriptions:        make(map[types.UID]Subscription),
 		kafkaSyncProducer:    producer,
@@ -122,6 +174,10 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
 		topicFunc:            args.TopicFunc,
 	}
 
+	go func() {
+		dispatcher.logger.Fatal(nethttp.ListenAndServe(":8081", dispatcher))
+	}()
+
 	podName, err := env.GetRequiredConfigValue(args.Logger.Desugar(), env.PodNameEnvVarKey)
 	if err != nil {
 		return nil, err
@@ -180,9 +236,14 @@ type KafkaDispatcherArgs struct {
 }
 
 type consumerMessageHandler struct {
-	logger     *zap.SugaredLogger
-	sub        Subscription
-	dispatcher *eventingchannels.MessageDispatcherImpl
+	logger            *zap.SugaredLogger
+	sub               Subscription
+	dispatcher        *eventingchannels.MessageDispatcherImpl
+	kafkaSubscription *KafkaSubscription
+}
+
+func (c consumerMessageHandler) SetReady(ready bool) {
+	c.kafkaSubscription.SetReady(c.sub.UID, ready)
 }
 
 func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sarama.ConsumerMessage) (bool, error) {
@@ -235,6 +296,22 @@ type ChannelConfig struct {
 	Subscriptions []Subscription
 }
 
+// SetReady will mark the subid in the KafkaSubscription and call any registered callbacks
+func (ks *KafkaSubscription) SetReady(subID types.UID, ready bool) {
+	ks.readySubscriptionsLock.Lock()
+	defer ks.readySubscriptionsLock.Unlock()
+	if ready {
+		if !ks.channelReadySubscriptions.Has(string(subID)) {
+			ks.channelReadySubscriptions.Insert(string(subID))
+		}
+	} else {
+		if ks.channelReadySubscriptions.Has(string(subID)) {
+			ks.channelReadySubscriptions.Delete(string(subID))
+		}
+	}
+
+}
+
 // UpdateKafkaConsumers will be called by new CRD based kafka channel dispatcher controller.
 func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]error, error) {
 	if config == nil {
@@ -256,9 +333,16 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er
 
 			// Check if sub already exists
 			exists := false
-			for _, s := range d.channelSubscriptions[channelRef] {
-				if s == subSpec.UID {
-					exists = true
+			if _, ok := d.channelSubscriptions[channelRef]; ok {
+				for _, s := range d.channelSubscriptions[channelRef].subs {
+					if s == subSpec.UID {
+						exists = true
+					}
+				}
+			} else { //ensure the pointer is populated or things go boom
+				d.channelSubscriptions[channelRef] = &KafkaSubscription{
+					subs:                      []types.UID{},
+					channelReadySubscriptions: sets.String{},
 				}
 			}
 
@@ -278,7 +362,7 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er
 	// Unsubscribe and close consumer for any deleted subscriptions
 	subsToRemove := make(map[eventingchannels.ChannelReference][]types.UID)
 	for channelRef, actualSubs := range d.channelSubscriptions {
-		subsToRemove[channelRef] = uidSetDifference(actualSubs, newSubs)
+		subsToRemove[channelRef] = uidSetDifference(actualSubs.subs, newSubs)
 	}
 
 	for channelRef, subs := range subsToRemove {
@@ -287,7 +371,7 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er
 				return nil, err
 			}
 		}
-		d.channelSubscriptions[channelRef] = newSubs
+		d.channelSubscriptions[channelRef].subs = newSubs
 	}
 
 	return failedToSubscribe, nil
@@ -360,7 +444,7 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference
 	topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name)
 	groupID := fmt.Sprintf("kafka.%s.%s.%s", channelRef.Namespace, channelRef.Name, string(sub.UID))
 
-	handler := &consumerMessageHandler{d.logger, sub, d.dispatcher}
+	handler := &consumerMessageHandler{d.logger, sub, d.dispatcher, d.channelSubscriptions[channelRef]}
 
 	consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(groupID, []string{topicName}, d.logger, handler)
 
@@ -378,7 +462,7 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference
 		}
 	}()
 
-	d.channelSubscriptions[channelRef] = append(d.channelSubscriptions[channelRef], sub.UID)
+	d.channelSubscriptions[channelRef].subs = append(d.channelSubscriptions[channelRef].subs, sub.UID)
 	d.subscriptions[sub.UID] = sub
 	d.subsConsumerGroups[sub.UID] = consumerGroup
 
@@ -390,14 +474,17 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference
 func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference, sub Subscription) error {
 	d.logger.Infow("Unsubscribing from channel", zap.Any("channel", channel), zap.String("subscription", sub.String()))
 	delete(d.subscriptions, sub.UID)
-	if subsSlice, ok := d.channelSubscriptions[channel]; ok {
+	if _, ok := d.channelSubscriptions[channel]; !ok {
+		return nil
+	}
+	if subsSlice := d.channelSubscriptions[channel].subs; subsSlice != nil {
 		var newSlice []types.UID
 		for _, oldSub := range subsSlice {
 			if oldSub != sub.UID {
 				newSlice = append(newSlice, oldSub)
 			}
 		}
-		d.channelSubscriptions[channel] = newSlice
+		d.channelSubscriptions[channel].subs = newSlice
 	}
 	if consumer, ok := d.subsConsumerGroups[sub.UID]; ok {
 		delete(d.subsConsumerGroups, sub.UID)
diff --git a/pkg/channel/consolidated/dispatcher/dispatcher_test.go b/pkg/channel/consolidated/dispatcher/dispatcher_test.go
index 543ca7e2c2..50375af28f 100644
--- a/pkg/channel/consolidated/dispatcher/dispatcher_test.go
+++ b/pkg/channel/consolidated/dispatcher/dispatcher_test.go
@@ -1,5 +1,5 @@
 /*
-Copyright 2019 The Knative Authors
+Copyright 2021 The Knative Authors
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -19,6 +19,10 @@ package dispatcher
 import (
 	"context"
 	"errors"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
 	"net/url"
 	"testing"
 
@@ -320,7 +324,7 @@ func TestDispatcher_UpdateConfig(t *testing.T) {
 			t.Logf("Running %s", t.Name())
 			d := &KafkaDispatcher{
 				kafkaConsumerFactory: &mockKafkaConsumerFactory{},
-				channelSubscriptions: make(map[eventingchannels.ChannelReference][]types.UID),
+				channelSubscriptions: make(map[eventingchannels.ChannelReference]*KafkaSubscription),
 				subsConsumerGroups:   make(map[types.UID]sarama.ConsumerGroup),
 				subscriptions:        make(map[types.UID]Subscription),
 				topicFunc:            utils.TopicName,
@@ -439,6 +443,195 @@ func TestNewDispatcher(t *testing.T) {
 	}
 }
 
+func TestSetReady(t *testing.T) {
+	testCases := []struct {
+		name             string
+		ready            bool
+		subID            types.UID
+		originalKafkaSub *KafkaSubscription
+		desiredKafkaSub  *KafkaSubscription
+	}{
+		{
+			name:  "doesn't have the sub, add it (on ready)",
+			ready: true,
+			subID: "foo",
+			originalKafkaSub: &KafkaSubscription{
+				channelReadySubscriptions: sets.String{"bar": sets.Empty{}},
+			},
+			desiredKafkaSub: &KafkaSubscription{
+				subs:                      []types.UID{},
+				channelReadySubscriptions: sets.String{"bar": sets.Empty{}, "foo": sets.Empty{}},
+			},
+		},
+		{
+			name:  "has the sub already (on ready)",
+			ready: true,
+			subID: "foo",
+			originalKafkaSub: &KafkaSubscription{
+				channelReadySubscriptions: sets.String{"foo": sets.Empty{}, "bar": sets.Empty{}},
+			},
+			desiredKafkaSub: &KafkaSubscription{
+				channelReadySubscriptions: sets.String{"foo": sets.Empty{}, "bar": sets.Empty{}},
+			},
+		},
+		{
+			name:  "has the sub, delete it (on !ready)",
+			ready: false,
+			subID: "foo",
+			originalKafkaSub: &KafkaSubscription{
+				channelReadySubscriptions: sets.String{"foo": sets.Empty{}, "bar": sets.Empty{}},
+			},
+			desiredKafkaSub: &KafkaSubscription{
+				channelReadySubscriptions: sets.String{"bar": sets.Empty{}},
+			},
+		},
+		{
+			name:  "doesn't have the sub to delete (on !ready)",
+			ready: false,
+			subID: "foo",
+			originalKafkaSub: &KafkaSubscription{
+				channelReadySubscriptions: sets.String{"bar": sets.Empty{}},
+			},
+			desiredKafkaSub: &KafkaSubscription{
+				channelReadySubscriptions: sets.String{"bar": sets.Empty{}},
+			},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Logf("Running %s", t.Name())
+			tc.originalKafkaSub.SetReady(tc.subID, tc.ready)
+			if diff := cmp.Diff(tc.desiredKafkaSub.channelReadySubscriptions, tc.originalKafkaSub.channelReadySubscriptions); diff != "" {
+				t.Errorf("unexpected ChannelReadySubscription (-want, +got) = %v", diff)
+			}
+		})
+	}
+}
+
+func TestServeHTTP(t *testing.T) {
+
+	httpGet := "GET"
+	httpPost := "POST"
+	testCases := []struct {
+		name               string
+		responseReturnCode int
+		desiredJson        []byte
+		channelSubs        map[eventingchannels.ChannelReference]*KafkaSubscription
+		requestURI         string
+		httpMethod         string
+	}{
+		{
+			name:               "channelref not found",
+			httpMethod:         httpGet,
+			responseReturnCode: http.StatusNotFound,
+			desiredJson:        []byte{},
+			requestURI:         "/exist/thisDoesNot",
+		}, {
+			name:               "nop",
+			httpMethod:         httpGet,
+			responseReturnCode: http.StatusNotFound,
+			desiredJson:        []byte{},
+			requestURI:         "///",
+		}, {
+			name:               "no ready subscribers",
+			httpMethod:         httpGet,
+			responseReturnCode: http.StatusOK,
+			desiredJson:        []byte(`{"bar/foo":[]}`),
[]byte(`{"bar/foo":[]}`), + channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ + {Name: "foo", Namespace: "bar"}: { + subs: []types.UID{}, + channelReadySubscriptions: sets.String{}, + }, + }, + requestURI: "/bar/foo", + }, { + name: "different channelref called from populated channref (different ns)", + httpMethod: httpGet, + desiredJson: []byte{}, + responseReturnCode: http.StatusNotFound, + channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ + {Name: "foo", Namespace: "baz"}: { + subs: []types.UID{"a", "b"}, + channelReadySubscriptions: sets.String{"a": sets.Empty{}, "b": sets.Empty{}}, + }, + }, + requestURI: "/bar/foo", + }, { + name: "return correct subscription", + httpMethod: httpGet, + desiredJson: []byte(`{"bar/foo":["a","b"]}`), + responseReturnCode: http.StatusOK, + channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ + {Name: "foo", Namespace: "bar"}: { + subs: []types.UID{"a", "b"}, + channelReadySubscriptions: sets.String{"a": sets.Empty{}, "b": sets.Empty{}}, + }, + }, + requestURI: "/bar/foo", + }, { + name: "return correct subscription from multiple chanrefs", + httpMethod: httpGet, + desiredJson: []byte(`{"bar/foo":["a","b"]}`), + responseReturnCode: http.StatusOK, + channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ + {Name: "table", Namespace: "flip"}: { + subs: []types.UID{"c", "d"}, + channelReadySubscriptions: sets.String{"c": sets.Empty{}}, + }, + {Name: "foo", Namespace: "bar"}: { + subs: []types.UID{"a", "b"}, + channelReadySubscriptions: sets.String{"a": sets.Empty{}, "b": sets.Empty{}}, + }, + }, + requestURI: "/bar/foo", + }, { + name: "bad request uri", + httpMethod: httpGet, + desiredJson: []byte{}, + responseReturnCode: http.StatusNotFound, + requestURI: "/here/be/dragons/there/are/too/many/slashes", + }, { + name: "bad request method (POST)", + httpMethod: httpPost, + desiredJson: []byte{}, + responseReturnCode: http.StatusMethodNotAllowed, + }, + } + d := &KafkaDispatcher{ + channelSubscriptions: make(map[eventingchannels.ChannelReference]*KafkaSubscription), + logger: zaptest.NewLogger(t).Sugar(), + } + ts := httptest.NewServer(d) + defer ts.Close() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Logf("Running %s", t.Name()) + d.channelSubscriptions = tc.channelSubs + + request, _ := http.NewRequest(tc.httpMethod, fmt.Sprintf("%s%s", ts.URL, tc.requestURI), nil) + // resp, err := http.Get(fmt.Sprintf("%s%s", ts.URL, tc.requestURI)) + resp, err := http.DefaultClient.Do(request) + if err != nil { + t.Errorf("Could not send request to subscriber endpoint: %v", err) + } + if resp.StatusCode != tc.responseReturnCode { + t.Errorf("unepxected status returned: want: %d, got: %d", tc.responseReturnCode, resp.StatusCode) + } + respBody, err := ioutil.ReadAll(resp.Body) + defer resp.Body.Close() + if err != nil { + t.Errorf("Could not read response from subscriber endpoint: %v", err) + } + if testing.Verbose() && len(respBody) > 0 { + t.Logf("http response: %s\n", string(respBody)) + } + if diff := cmp.Diff(tc.desiredJson, respBody); diff != "" { + t.Errorf("unexpected readysubscriber status response: (-want, +got) = %v", diff) + } + }) + } +} + var sortStrings = cmpopts.SortSlices(func(x, y string) bool { return x < y }) diff --git a/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go b/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go index c34b6d06f9..e1e01a38d7 100644 --- a/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go 
+++ b/pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go
@@ -1,5 +1,5 @@
 /*
-Copyright 2019 The Knative Authors
+Copyright 2021 The Knative Authors
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
diff --git a/pkg/common/consumer/consumer_handler.go b/pkg/common/consumer/consumer_handler.go
index b27ccecdb1..092b3083fc 100644
--- a/pkg/common/consumer/consumer_handler.go
+++ b/pkg/common/consumer/consumer_handler.go
@@ -1,5 +1,5 @@
 /*
-Copyright 2019 The Knative Authors
+Copyright 2021 The Knative Authors
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@ type KafkaConsumerHandler interface {
 	// When this function returns true, the consumer group offset is marked as consumed.
 	// The returned error is enqueued in errors channel.
 	Handle(context context.Context, message *sarama.ConsumerMessage) (bool, error)
+	SetReady(ready bool)
 }
 
 // ConsumerHandler implements sarama.ConsumerGroupHandler and provides some glue code to simplify message handling
@@ -59,6 +60,7 @@ func (consumer *SaramaConsumerHandler) Setup(sarama.ConsumerGroupSession) error
 
 // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
 func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
 	consumer.logger.Info("cleanup handler")
+	consumer.handler.SetReady(false)
 	return nil
 }
 
@@ -83,6 +85,7 @@ func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroup
 			if err != nil {
 				consumer.logger.Infow("Failure while handling a message", zap.String("topic", message.Topic), zap.Int32("partition", message.Partition), zap.Int64("offset", message.Offset), zap.Error(err))
 				consumer.errors <- err
+				consumer.handler.SetReady(false)
 			}
 
 			if mustMark {
@@ -90,6 +93,7 @@ func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroup
 				if ce := consumer.logger.Desugar().Check(zap.DebugLevel, "debugging"); ce != nil {
 					consumer.logger.Debugw("Message marked", zap.String("topic", message.Topic), zap.Binary("value", message.Value))
 				}
+				consumer.handler.SetReady(true)
 			}
 		}
 
diff --git a/pkg/common/consumer/consumer_handler_test.go b/pkg/common/consumer/consumer_handler_test.go
index 3548696113..14953e6c1a 100644
--- a/pkg/common/consumer/consumer_handler_test.go
+++ b/pkg/common/consumer/consumer_handler_test.go
@@ -114,6 +114,9 @@ func (m mockMessageHandler) Handle(ctx context.Context, message *sarama.Consumer
 	}
 }
 
+func (m mockMessageHandler) SetReady(ready bool) {
+}
+
 //------ Tests
 
 func Test(t *testing.T) {
diff --git a/pkg/source/adapter/adapter.go b/pkg/source/adapter/adapter.go
index 1450c0ef5a..2dba25b4f7 100644
--- a/pkg/source/adapter/adapter.go
+++ b/pkg/source/adapter/adapter.go
@@ -121,6 +121,8 @@ func (a *Adapter) start(stopCh <-chan struct{}) error {
 	return nil
 }
 
+func (a *Adapter) SetReady(_ bool) {}
+
 func (a *Adapter) Handle(ctx context.Context, msg *sarama.ConsumerMessage) (bool, error) {
 	if a.rateLimiter != nil {
 		a.rateLimiter.Wait(ctx)
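
For reviewers who want to exercise the change by hand, here is a minimal Go sketch (not part of this diff) of how the new sub-status endpoint could be queried. The port (8081), the GET /<namespace>/<channel-name> path, and the JSON payload shape follow the ServeHTTP handler added above; the dispatcher Service hostname in main() is an assumption and depends on the installation.

// readysubs.go — hypothetical client, not included in this change.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// readySubscriptions asks the dispatcher which subscriptions of the given
// KafkaChannel currently have a running consumer. The response is a JSON map
// of "<namespace>/<name>" to a list of ready subscription UIDs.
func readySubscriptions(host, namespace, name string) (map[string][]string, error) {
	url := fmt.Sprintf("http://%s:8081/%s/%s", host, namespace, name)
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %d from %s", resp.StatusCode, url)
	}
	subs := make(map[string][]string)
	if err := json.NewDecoder(resp.Body).Decode(&subs); err != nil {
		return nil, err
	}
	return subs, nil
}

func main() {
	// The Service hostname below is an assumption; substitute the dispatcher
	// Service exposing the "http-sub-status" port in your cluster.
	subs, err := readySubscriptions("kafka-ch-dispatcher.knative-eventing.svc.cluster.local", "bar", "foo")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(subs) // e.g. map[bar/foo:[a b]]
}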