Add http endpoint to communicate dispatcher readySubscriber status (knative-extensions#344)

* Groundwork for dispatcher subscriber status w/ consumers

* Add functionality to serve subscriber status over HTTP

* Drop callback functions

* Fix failing unit test, add unsub check for chanref

* Rework http handler to be dispatcher local (not kafkasubscription)

* Variable typo fix

* Fix copyright years

* Change header name to constant

* Move subscription handler to its own ServeHTTP func

* Remove channelRef in KafkaSubscription

* Change bad channelref request to http.StatusNotFound

* Add namespace to subscriptions http output

* Add Unit tests for servehttp & setready

* Split uriSplit into channelRefName{,space} vars

* Expose dispatcher http-sub-status port in dispatcher svc

* Add servehttp diagnostic messages

* One more uriSplit -> channelRefName variable rename

* Change how we write the http response

* Add empty SetReady() method to source RA

* Fix consumer_handler_test

* More linting

* Add back ObserveKind until controller implements substatus scraper

* Add more ServeHTTP unit tests

* Slightly alter where we mark a handler as ready or not
lberk authored and devguyio committed Mar 25, 2021
1 parent bf08689 commit e956c9e
Showing 7 changed files with 315 additions and 19 deletions.
7 changes: 7 additions & 0 deletions config/channel/consolidated/deployments/dispatcher.yaml
@@ -60,6 +60,9 @@ spec:
- containerPort: 9090
name: metrics
protocol: TCP
- containerPort: 8081
name: sub-status
protocol: TCP
volumeMounts:
- name: config-kafka
mountPath: /etc/config-kafka
@@ -85,6 +88,10 @@ spec:
port: 80
protocol: TCP
targetPort: 8080
- name: http-sub-status
port: 8081
protocol: TCP
targetPort: 8081
selector:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
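
With this service port exposed, the ready-subscriber list for a channel can be fetched with a plain HTTP GET against /<channel-namespace>/<channel-name> on port 8081. A minimal sketch, assuming the consolidated dispatcher service resolves in-cluster as kafka-ch-dispatcher.knative-eventing (the host and channel name here are illustrative, not taken from this commit):

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Hypothetical in-cluster URL; the path is /<channel-namespace>/<channel-name>.
	resp, err := http.Get("http://kafka-ch-dispatcher.knative-eventing:8081/default/my-kafka-channel")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	// Expected shape: {"default/my-kafka-channel":["<ready-subscription-uid>"]}
	fmt.Println(resp.Header.Get("K-Subscriber-Status"), string(body))
}
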
117 changes: 102 additions & 15 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
@@ -1,5 +1,5 @@
/*
Copyright 2018 The Knative Authors
Copyright 2021 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -17,13 +17,16 @@ package dispatcher

import (
"context"
"encoding/json"
"errors"
"fmt"
nethttp "net/http"
"strings"
"sync"
"sync/atomic"

"k8s.io/apimachinery/pkg/util/sets"

"knative.dev/eventing-kafka/pkg/common/client"
"knative.dev/eventing-kafka/pkg/common/tracing"

@@ -44,6 +47,18 @@ import (
"knative.dev/eventing-kafka/pkg/common/consumer"
)

const (
dispatcherReadySubHeader = "K-Subscriber-Status"
)

type KafkaSubscription struct {
subs []types.UID

// readySubscriptionsLock must be used to synchronize access to channelReadySubscriptions
readySubscriptionsLock sync.RWMutex
channelReadySubscriptions sets.String
}

type KafkaDispatcher struct {
hostToChannelMap atomic.Value
// hostToChannelMapLock is used to update hostToChannelMap
@@ -53,7 +68,7 @@ type KafkaDispatcher struct {
dispatcher *eventingchannels.MessageDispatcherImpl

kafkaSyncProducer sarama.SyncProducer
channelSubscriptions map[eventingchannels.ChannelReference][]types.UID
channelSubscriptions map[eventingchannels.ChannelReference]*KafkaSubscription
subsConsumerGroups map[types.UID]sarama.ConsumerGroup
subscriptions map[types.UID]Subscription
// consumerUpdateLock must be used to update kafkaConsumers
@@ -88,6 +103,43 @@ func (sub Subscription) String() string {
return s.String()
}

func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request) {
if r.Method != nethttp.MethodGet {
w.WriteHeader(nethttp.StatusMethodNotAllowed)
d.logger.Errorf("Received request method that wasn't GET: %s", r.Method)
return
}
uriSplit := strings.Split(r.RequestURI, "/")
if len(uriSplit) != 3 {
w.WriteHeader(nethttp.StatusNotFound)
d.logger.Errorf("Unable to process request: %s", r.RequestURI)
return
}
channelRefNamespace, channelRefName := uriSplit[1], uriSplit[2]
channelRef := eventingchannels.ChannelReference{
Name: channelRefName,
Namespace: channelRefNamespace,
}
if _, ok := d.channelSubscriptions[channelRef]; !ok {
w.WriteHeader(nethttp.StatusNotFound)
return
}
d.channelSubscriptions[channelRef].readySubscriptionsLock.RLock()
defer d.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock()
var subscriptions = make(map[string][]string)
w.Header().Set(dispatcherReadySubHeader, channelRefName)
subscriptions[channelRefNamespace+"/"+channelRefName] = d.channelSubscriptions[channelRef].channelReadySubscriptions.List()
jsonResult, err := json.Marshal(subscriptions)
if err != nil {
d.logger.Errorf("Error marshalling json for sub-status channelref: %s/%s, %w", channelRefNamespace, channelRefName, err)
return
}
_, err = w.Write(jsonResult)
if err != nil {
d.logger.Errorf("Error writing jsonResult to serveHTTP writer: %w", err)
}
}
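
A minimal sketch, not part of this diff, of exercising the handler above with net/http/httptest: it assumes it is compiled inside the dispatcher package (the struct fields are unexported), the channel reference and subscription UID are invented, and the zap/sets/eventingchannels identifiers mirror the file's own imports:

func TestServeHTTPSketch(t *testing.T) {
	d := &KafkaDispatcher{
		logger: zap.NewNop().Sugar(),
		channelSubscriptions: map[eventingchannels.ChannelReference]*KafkaSubscription{
			{Namespace: "default", Name: "my-kafka-channel"}: {
				channelReadySubscriptions: sets.NewString("ready-sub-uid"),
			},
		},
	}
	req := httptest.NewRequest(nethttp.MethodGet, "/default/my-kafka-channel", nil)
	w := httptest.NewRecorder()
	d.ServeHTTP(w, req)
	// Expect 200, header K-Subscriber-Status: my-kafka-channel, and
	// body {"default/my-kafka-channel":["ready-sub-uid"]}.
	if w.Code != nethttp.StatusOK {
		t.Fatalf("unexpected status %d, body: %s", w.Code, w.Body.String())
	}
}
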

func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispatcher, error) {
conf, err := client.NewConfigBuilder().
WithClientId(args.ClientID).
@@ -114,14 +166,18 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
dispatcher := &KafkaDispatcher{
dispatcher: eventingchannels.NewMessageDispatcher(args.Logger.Desugar()),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, conf),
channelSubscriptions: make(map[eventingchannels.ChannelReference][]types.UID),
channelSubscriptions: make(map[eventingchannels.ChannelReference]*KafkaSubscription),
subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup),
subscriptions: make(map[types.UID]Subscription),
kafkaSyncProducer: producer,
logger: args.Logger,
topicFunc: args.TopicFunc,
}

go func() {
dispatcher.logger.Fatal(nethttp.ListenAndServe(":8081", dispatcher))
}()

podName, err := env.GetRequiredConfigValue(args.Logger.Desugar(), env.PodNameEnvVarKey)
if err != nil {
return nil, err
@@ -179,9 +235,14 @@ type KafkaDispatcherArgs struct {
}

type consumerMessageHandler struct {
logger *zap.SugaredLogger
sub Subscription
dispatcher *eventingchannels.MessageDispatcherImpl
logger *zap.SugaredLogger
sub Subscription
dispatcher *eventingchannels.MessageDispatcherImpl
kafkaSubscription *KafkaSubscription
}

func (c consumerMessageHandler) SetReady(ready bool) {
c.kafkaSubscription.SetReady(c.sub.UID, ready)
}
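
consumerMessageHandler now satisfies the readiness half of the consumer factory's handler contract as well as message handling. A rough sketch of that interface as assumed here; the authoritative definition lives in pkg/common/consumer and may differ:

type KafkaConsumerHandler interface {
	// Handle returns true when the message should be marked consumed.
	Handle(ctx context.Context, message *sarama.ConsumerMessage) (bool, error)
	// SetReady reports whether the consumer is ready to receive messages.
	SetReady(ready bool)
}
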

func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sarama.ConsumerMessage) (bool, error) {
@@ -234,6 +295,22 @@ type ChannelConfig struct {
Subscriptions []Subscription
}

// SetReady marks the given subscription UID as ready or not ready in the KafkaSubscription
func (ks *KafkaSubscription) SetReady(subID types.UID, ready bool) {
ks.readySubscriptionsLock.Lock()
defer ks.readySubscriptionsLock.Unlock()
if ready {
if !ks.channelReadySubscriptions.Has(string(subID)) {
ks.channelReadySubscriptions.Insert(string(subID))
}
} else {
if ks.channelReadySubscriptions.Has(string(subID)) {
ks.channelReadySubscriptions.Delete(string(subID))
}
}
}
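
Note that sets.String's Insert and Delete are already no-ops when the element is present or absent, so the Has guards above are defensive rather than required. A hypothetical call sequence:

ks := &KafkaSubscription{channelReadySubscriptions: sets.String{}}
ks.SetReady("ready-sub-uid", true)  // ready set: {"ready-sub-uid"}
ks.SetReady("ready-sub-uid", true)  // no change
ks.SetReady("ready-sub-uid", false) // ready set: {}
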

// UpdateKafkaConsumers will be called by new CRD based kafka channel dispatcher controller.
func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]error, error) {
if config == nil {
@@ -255,9 +332,16 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er

// Check if sub already exists
exists := false
for _, s := range d.channelSubscriptions[channelRef] {
if s == subSpec.UID {
exists = true
if _, ok := d.channelSubscriptions[channelRef]; ok {
for _, s := range d.channelSubscriptions[channelRef].subs {
if s == subSpec.UID {
exists = true
}
}
} else { //ensure the pointer is populated or things go boom
d.channelSubscriptions[channelRef] = &KafkaSubscription{
subs: []types.UID{},
channelReadySubscriptions: sets.String{},
}
}

@@ -277,7 +361,7 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er
// Unsubscribe and close consumer for any deleted subscriptions
subsToRemove := make(map[eventingchannels.ChannelReference][]types.UID)
for channelRef, actualSubs := range d.channelSubscriptions {
subsToRemove[channelRef] = uidSetDifference(actualSubs, newSubs)
subsToRemove[channelRef] = uidSetDifference(actualSubs.subs, newSubs)
}

for channelRef, subs := range subsToRemove {
@@ -286,7 +370,7 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er
return nil, err
}
}
d.channelSubscriptions[channelRef] = newSubs
d.channelSubscriptions[channelRef].subs = newSubs
}

return failedToSubscribe, nil
@@ -359,7 +443,7 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference
topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name)
groupID := fmt.Sprintf("kafka.%s.%s.%s", channelRef.Namespace, channelRef.Name, string(sub.UID))

handler := &consumerMessageHandler{d.logger, sub, d.dispatcher}
handler := &consumerMessageHandler{d.logger, sub, d.dispatcher, d.channelSubscriptions[channelRef]}

consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(groupID, []string{topicName}, d.logger, handler)

@@ -377,7 +461,7 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference
}
}()

d.channelSubscriptions[channelRef] = append(d.channelSubscriptions[channelRef], sub.UID)
d.channelSubscriptions[channelRef].subs = append(d.channelSubscriptions[channelRef].subs, sub.UID)
d.subscriptions[sub.UID] = sub
d.subsConsumerGroups[sub.UID] = consumerGroup

@@ -389,14 +473,17 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference
func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference, sub Subscription) error {
d.logger.Infow("Unsubscribing from channel", zap.Any("channel", channel), zap.String("subscription", sub.String()))
delete(d.subscriptions, sub.UID)
if subsSlice, ok := d.channelSubscriptions[channel]; ok {
if _, ok := d.channelSubscriptions[channel]; !ok {
return nil
}
if subsSlice := d.channelSubscriptions[channel].subs; subsSlice != nil {
var newSlice []types.UID
for _, oldSub := range subsSlice {
if oldSub != sub.UID {
newSlice = append(newSlice, oldSub)
}
}
d.channelSubscriptions[channel] = newSlice
d.channelSubscriptions[channel].subs = newSlice
}
if consumer, ok := d.subsConsumerGroups[sub.UID]; ok {
delete(d.subsConsumerGroups, sub.UID)