This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Add http endpoint to communicate dispatcher readySubscriber status #344

Merged
Changes from all commits (24 commits)
86d0ee2
Groundwork for dispatcher subscriber status w/ consumers
lberk Jan 22, 2021
dc06610
Add functionality to serve http of subscribers
lberk Jan 27, 2021
b60aa7c
Drop callback functions
lberk Jan 27, 2021
dd07041
Fix failing unit test, add unsub check for chanref
lberk Jan 27, 2021
fa3042a
Rework http handler to be dispatcher local (not kafkasubscription)
lberk Jan 27, 2021
a54fd54
Variable typo fix
lberk Jan 29, 2021
c8c642a
Fix copyright years
lberk Jan 29, 2021
6a2dade
Change header name to constant
lberk Jan 29, 2021
325e2c9
Move subscription handler to its own ServeHTTP func
lberk Feb 2, 2021
67f97a8
Remove channelRef in KafkaSubscription
lberk Feb 2, 2021
19919ba
Change bad channelref request to http.StatusNotFound
lberk Feb 2, 2021
5373342
Add namespace to subscriptions http output
lberk Feb 2, 2021
422afae
Add Unit tests for servehttp & setready
lberk Feb 2, 2021
6cc8607
Split uriSplit into channelRefName{,space} vars
lberk Feb 2, 2021
c2650da
Expose dispatcher http-sub-status port in dispatcher svc
lberk Feb 3, 2021
89aa745
Add servehttp diagnostic messages
lberk Feb 3, 2021
b38d70a
One more uriSplit -> channelRefName variable rename
lberk Feb 3, 2021
b183732
Change how we write the http response
lberk Feb 3, 2021
1f23567
Add empty SetReady() method to source RA
lberk Feb 3, 2021
e18b361
Fix consumer_handler_test
lberk Feb 3, 2021
d1ddc68
more linting
lberk Feb 3, 2021
48758b2
Add back ObserveKind until controller implements substatus scraper
lberk Feb 3, 2021
1e768b9
Add more ServeHTTP unit tests
lberk Feb 3, 2021
a6fb63b
slightly alter where we mark a handler as ready or not
lberk Feb 4, 2021
7 changes: 7 additions & 0 deletions config/channel/consolidated/deployments/dispatcher.yaml
@@ -60,6 +60,9 @@ spec:
- containerPort: 9090
name: metrics
protocol: TCP
- containerPort: 8081
name: sub-status
protocol: TCP
volumeMounts:
- name: config-kafka
mountPath: /etc/config-kafka
@@ -85,6 +88,10 @@ spec:
port: 80
protocol: TCP
targetPort: 8080
- name: http-sub-status
port: 8081
protocol: TCP
targetPort: 8081
selector:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
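
For a quick sanity check of the newly exposed port, an in-cluster client along these lines works. This is only a sketch: the service DNS name (`kafka-ch-dispatcher.knative-eventing`) and the channel coordinates are assumptions for illustration, not part of this diff.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Path layout is /<channel-namespace>/<channel-name>.
	resp, err := http.Get("http://kafka-ch-dispatcher.knative-eventing:8081/default/my-channel")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// Expected shape: {"default/my-channel":["<ready subscription UIDs>"]}
	fmt.Println(resp.Status, string(body))
}
```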
117 changes: 102 additions & 15 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
@@ -1,5 +1,5 @@
/*
Copyright 2018 The Knative Authors
Copyright 2021 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -17,13 +17,16 @@ package dispatcher

import (
"context"
"encoding/json"
"errors"
"fmt"
nethttp "net/http"
"strings"
"sync"
"sync/atomic"

"k8s.io/apimachinery/pkg/util/sets"

"knative.dev/eventing-kafka/pkg/common/client"
"knative.dev/eventing-kafka/pkg/common/tracing"

@@ -44,6 +47,18 @@
"knative.dev/eventing-kafka/pkg/common/consumer"
)

const (
dispatcherReadySubHeader = "K-Subscriber-Status"
)

type KafkaSubscription struct {
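// subs tracks the UIDs of every subscription attached to this channel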
subs []types.UID

// readySubscriptionsLock must be used to synchronize access to channelReadySubscriptions
readySubscriptionsLock sync.RWMutex
channelReadySubscriptions sets.String
}
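
`channelReadySubscriptions` is a `k8s.io/apimachinery` string set, which keeps the readiness bookkeeping below free of duplicate checks. A minimal sketch of its semantics:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	s := sets.NewString()
	s.Insert("sub-1")
	s.Insert("sub-1") // idempotent: the set still holds one element
	s.Delete("sub-2") // no-op when the key is absent
	fmt.Println(s.List())       // [sub-1] — List returns a sorted slice
	fmt.Println(s.Has("sub-1")) // true
}
```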

type KafkaDispatcher struct {
hostToChannelMap atomic.Value
// hostToChannelMapLock is used to update hostToChannelMap
@@ -53,7 +68,7 @@ type KafkaDispatcher struct {
dispatcher *eventingchannels.MessageDispatcherImpl

kafkaSyncProducer sarama.SyncProducer
channelSubscriptions map[eventingchannels.ChannelReference][]types.UID
channelSubscriptions map[eventingchannels.ChannelReference]*KafkaSubscription
subsConsumerGroups map[types.UID]sarama.ConsumerGroup
subscriptions map[types.UID]Subscription
// consumerUpdateLock must be used to update kafkaConsumers
@@ -88,6 +103,43 @@ func (sub Subscription) String() string {
return s.String()
}

func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request) {
if r.Method != nethttp.MethodGet {
w.WriteHeader(nethttp.StatusMethodNotAllowed)
d.logger.Errorf("Received request method that wasn't GET: %s", r.Method)
return
}
uriSplit := strings.Split(r.RequestURI, "/")
if len(uriSplit) != 3 {
w.WriteHeader(nethttp.StatusNotFound)
d.logger.Errorf("Unable to process request: %s", r.RequestURI)
return
}
channelRefNamespace, channelRefName := uriSplit[1], uriSplit[2]
channelRef := eventingchannels.ChannelReference{
Name: channelRefName,
Namespace: channelRefNamespace,
}
if _, ok := d.channelSubscriptions[channelRef]; !ok {
w.WriteHeader(nethttp.StatusNotFound)
return
}
d.channelSubscriptions[channelRef].readySubscriptionsLock.RLock()
defer d.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock()
subscriptions := make(map[string][]string)
w.Header().Set(dispatcherReadySubHeader, channelRefName)
subscriptions[channelRefNamespace+"/"+channelRefName] = d.channelSubscriptions[channelRef].channelReadySubscriptions.List()
jsonResult, err := json.Marshal(subscriptions)
if err != nil {
d.logger.Errorf("Error marshalling json for sub-status channelref: %s/%s, %w", channelRefNamespace, channelRefName, err)
return
}
_, err = w.Write(jsonResult)
if err != nil {
d.logger.Errorf("Error writing jsonResult to serveHTTP writer: %w", err)
}
}
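
Commit 422afae adds unit tests for this handler; a minimal same-package sketch in that style is below. The field values are illustrative only, and the real tests in the PR may differ. Assumed imports: `net/http/httptest`, `testing`, `go.uber.org/zap`, `k8s.io/apimachinery/pkg/util/sets`.

```go
func TestServeHTTPReadySubs(t *testing.T) {
	d := &KafkaDispatcher{
		channelSubscriptions: map[eventingchannels.ChannelReference]*KafkaSubscription{
			{Namespace: "default", Name: "chan"}: {
				channelReadySubscriptions: sets.NewString("sub-1"),
			},
		},
		logger: zap.NewNop().Sugar(),
	}

	w := httptest.NewRecorder()
	req := httptest.NewRequest(nethttp.MethodGet, "/default/chan", nil)
	d.ServeHTTP(w, req)

	if w.Code != nethttp.StatusOK {
		t.Fatalf("expected 200 OK, got %d", w.Code)
	}
	if got, want := w.Body.String(), `{"default/chan":["sub-1"]}`; got != want {
		t.Fatalf("body = %q, want %q", got, want)
	}
}
```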

func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispatcher, error) {
conf, err := client.NewConfigBuilder().
WithClientId(args.ClientID).
@@ -114,14 +166,18 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
dispatcher := &KafkaDispatcher{
dispatcher: eventingchannels.NewMessageDispatcher(args.Logger.Desugar()),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, conf),
channelSubscriptions: make(map[eventingchannels.ChannelReference][]types.UID),
channelSubscriptions: make(map[eventingchannels.ChannelReference]*KafkaSubscription),
subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup),
subscriptions: make(map[types.UID]Subscription),
kafkaSyncProducer: producer,
logger: args.Logger,
topicFunc: args.TopicFunc,
}

go func() {
dispatcher.logger.Fatal(nethttp.ListenAndServe(":8081", dispatcher))
}()
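
Because `*KafkaDispatcher` implements `http.Handler` via the `ServeHTTP` method above, it is passed straight to `ListenAndServe` with no mux. An equivalent explicit form, as a sketch only; note the hard-coded `:8081` must stay in sync with the `sub-status` containerPort in dispatcher.yaml:

```go
// The dispatcher itself is the http.Handler for the status server.
statusSrv := &nethttp.Server{Addr: ":8081", Handler: dispatcher}
go func() {
	dispatcher.logger.Fatal(statusSrv.ListenAndServe())
}()
```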

podName, err := env.GetRequiredConfigValue(args.Logger.Desugar(), env.PodNameEnvVarKey)
if err != nil {
return nil, err
@@ -180,9 +236,14 @@ type KafkaDispatcherArgs struct {
}

type consumerMessageHandler struct {
logger *zap.SugaredLogger
sub Subscription
dispatcher *eventingchannels.MessageDispatcherImpl
logger *zap.SugaredLogger
sub Subscription
dispatcher *eventingchannels.MessageDispatcherImpl
kafkaSubscription *KafkaSubscription
}

func (c consumerMessageHandler) SetReady(ready bool) {
c.kafkaSubscription.SetReady(c.sub.UID, ready)
}
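
This `SetReady` satisfies the handler contract consumed by `StartConsumerGroup`. An assumed sketch of that interface, inferred from this PR (see also "Add empty SetReady() method to source RA" and the consumer_handler_test fix) rather than copied from `pkg/common/consumer`:

```go
// Assumed shape of the consumer handler contract (not verbatim).
type KafkaConsumerHandler interface {
	// Handle returns true when the message should be marked consumed.
	Handle(ctx context.Context, message *sarama.ConsumerMessage) (bool, error)
	// SetReady reports whether the consumer group for this subscription
	// is currently able to process messages.
	SetReady(ready bool)
}
```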

func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sarama.ConsumerMessage) (bool, error) {
@@ -235,6 +296,22 @@ type ChannelConfig struct {
Subscriptions []Subscription
}

// SetReady marks the given subscription UID as ready (or not ready) in the KafkaSubscription
func (ks *KafkaSubscription) SetReady(subID types.UID, ready bool) {
ks.readySubscriptionsLock.Lock()
defer ks.readySubscriptionsLock.Unlock()
if ready {
ks.channelReadySubscriptions.Insert(string(subID))
} else {
ks.channelReadySubscriptions.Delete(string(subID))
}
}
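
A usage sketch with hypothetical UIDs, same package: many consumer goroutines may flip readiness concurrently, and the write lock serializes mutations while `ServeHTTP` readers share the `RLock`.

```go
ks := &KafkaSubscription{channelReadySubscriptions: sets.NewString()}

var wg sync.WaitGroup
for _, uid := range []types.UID{"sub-a", "sub-b"} {
	uid := uid
	wg.Add(1)
	go func() {
		defer wg.Done()
		ks.SetReady(uid, true)
	}()
}
wg.Wait()
// ks.channelReadySubscriptions.List() → [sub-a sub-b]
```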

// UpdateKafkaConsumers will be called by new CRD based kafka channel dispatcher controller.
func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]error, error) {
if config == nil {
@@ -256,9 +333,16 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er

// Check if sub already exists
exists := false
for _, s := range d.channelSubscriptions[channelRef] {
if s == subSpec.UID {
exists = true
if _, ok := d.channelSubscriptions[channelRef]; ok {
for _, s := range d.channelSubscriptions[channelRef].subs {
if s == subSpec.UID {
exists = true
}
}
} else { // ensure the KafkaSubscription pointer is initialized before use
d.channelSubscriptions[channelRef] = &KafkaSubscription{
[Review thread on this hunk]
Contributor: I've a feeling this needs to be protected with a lock, wdyt?
lberk (Member, Author): You mean like the outer lock d.consumerUpdateLock.Lock() on L236?
subs: []types.UID{},
channelReadySubscriptions: sets.String{},
}
}

@@ -278,7 +362,7 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er
// Unsubscribe and close consumer for any deleted subscriptions
subsToRemove := make(map[eventingchannels.ChannelReference][]types.UID)
for channelRef, actualSubs := range d.channelSubscriptions {
subsToRemove[channelRef] = uidSetDifference(actualSubs, newSubs)
subsToRemove[channelRef] = uidSetDifference(actualSubs.subs, newSubs)
}

for channelRef, subs := range subsToRemove {
Expand All @@ -287,7 +371,7 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er
return nil, err
}
}
d.channelSubscriptions[channelRef] = newSubs
d.channelSubscriptions[channelRef].subs = newSubs
}

return failedToSubscribe, nil
@@ -360,7 +444,7 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference
topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name)
groupID := fmt.Sprintf("kafka.%s.%s.%s", channelRef.Namespace, channelRef.Name, string(sub.UID))

handler := &consumerMessageHandler{d.logger, sub, d.dispatcher}
handler := &consumerMessageHandler{d.logger, sub, d.dispatcher, d.channelSubscriptions[channelRef]}

consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(groupID, []string{topicName}, d.logger, handler)

@@ -378,7 +462,7 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference
}
}()

d.channelSubscriptions[channelRef] = append(d.channelSubscriptions[channelRef], sub.UID)
d.channelSubscriptions[channelRef].subs = append(d.channelSubscriptions[channelRef].subs, sub.UID)
d.subscriptions[sub.UID] = sub
d.subsConsumerGroups[sub.UID] = consumerGroup

@@ -390,14 +474,17 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference
func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference, sub Subscription) error {
d.logger.Infow("Unsubscribing from channel", zap.Any("channel", channel), zap.String("subscription", sub.String()))
delete(d.subscriptions, sub.UID)
if subsSlice, ok := d.channelSubscriptions[channel]; ok {
if _, ok := d.channelSubscriptions[channel]; !ok {
return nil
}
if subsSlice := d.channelSubscriptions[channel].subs; subsSlice != nil {
var newSlice []types.UID
for _, oldSub := range subsSlice {
if oldSub != sub.UID {
newSlice = append(newSlice, oldSub)
}
}
d.channelSubscriptions[channel] = newSlice
d.channelSubscriptions[channel].subs = newSlice
}
if consumer, ok := d.subsConsumerGroups[sub.UID]; ok {
delete(d.subsConsumerGroups, sub.UID)