Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

[KafkaSource] Use the control protocol to expose the consumer group status #328

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a92730e
Use the control protocol to expose the consumer group status in the K…
slinkydeveloper Jan 22, 2021
9b8930a
Fixes to make this thing work
slinkydeveloper Jan 25, 2021
70be9e1
Updated after https://github.com/slinkydeveloper/control-data-plane-c…
slinkydeveloper Jan 26, 2021
243b5bc
Sync with latest changes from the prototype
slinkydeveloper Feb 3, 2021
9fe88e9
Fixed conflicts
slinkydeveloper Feb 3, 2021
644b257
Deps
slinkydeveloper Feb 3, 2021
c9ae344
Removed some code
slinkydeveloper Feb 3, 2021
68ba97a
Now depending on knative.dev/control-protocol
slinkydeveloper Mar 4, 2021
bac78b1
Fixed goimports
slinkydeveloper Mar 4, 2021
dc95847
Just add this as status, not as condition
slinkydeveloper Mar 8, 2021
f4c79d5
Fix after the update
slinkydeveloper Mar 8, 2021
3085f57
Another massage of go mods
slinkydeveloper Mar 8, 2021
c8b91ea
Added status field for conversion
slinkydeveloper Mar 8, 2021
6593efe
My bad
slinkydeveloper Mar 8, 2021
c7ea2d0
More beautification
slinkydeveloper Mar 8, 2021
62fc1e1
Reverted license for i don't know what reason
slinkydeveloper Mar 8, 2021
3706388
Update
slinkydeveloper Mar 17, 2021
a6f6b6a
Added a test too
slinkydeveloper Mar 17, 2021
cd63da0
Fix Claims test
slinkydeveloper Mar 18, 2021
eef54e4
Ok we don't need this
slinkydeveloper Mar 18, 2021
2cb87b0
I guess there's some race here?
slinkydeveloper Mar 18, 2021
5ec97b9
Yeah I know i'm cheating
slinkydeveloper Mar 18, 2021
be432d7
Rebase changes
slinkydeveloper Mar 22, 2021
759cd92
Rebase changes
slinkydeveloper Mar 22, 2021
ee56189
Update
slinkydeveloper Apr 1, 2021
f133168
Update
slinkydeveloper Apr 7, 2021
3bcb691
Skipping the KafkaSourceUpdate test
slinkydeveloper Apr 7, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
k8s.io/apimachinery v0.19.7
k8s.io/client-go v0.19.7
k8s.io/utils v0.0.0-20200729134348-d5654de09c73
knative.dev/control-protocol v0.0.0-20210402065223-6e2383c7197d
knative.dev/eventing v0.22.1-0.20210406191848-5171353bd1ed
knative.dev/hack v0.0.0-20210325223819-b6ab329907d3
knative.dev/networking v0.0.0-20210406043338-e38eb2be3962
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1276,6 +1276,8 @@ k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6 h1:+WnxoVtG8TMiudHBSEtrVL
k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6/go.mod h1:UuqjUnNftUyPE5H64/qeyjQoUZhGpeFDVdxjTeEVN2o=
k8s.io/utils v0.0.0-20200729134348-d5654de09c73 h1:uJmqzgNWG7XyClnU/mLPBWwfKKF1K8Hf8whTseBgJcg=
k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
knative.dev/control-protocol v0.0.0-20210402065223-6e2383c7197d h1:1ZueD3ovMOj8d/Ez+uPTol9jCwgy/6vkcGj26i3I498=
knative.dev/control-protocol v0.0.0-20210402065223-6e2383c7197d/go.mod h1:82/EwanWb1EjpPVz7ts5IxLi0GlX0eFZ5oVa8/xLMuc=
knative.dev/eventing v0.22.1-0.20210406191848-5171353bd1ed h1:TDGiR+w4hHN+VaXWF4bg8mb7Avwen+pjPfwSJbVQbxA=
knative.dev/eventing v0.22.1-0.20210406191848-5171353bd1ed/go.mod h1:LOG7bh0eZQkbYANcnORwke6Yy6aUu62o8GeByaOFfRQ=
knative.dev/hack v0.0.0-20210325223819-b6ab329907d3 h1:km0Rrh0T9/wA2pivQm1hqSPVwgNgGCHC2WNn3GakZmE=
Expand All @@ -1286,6 +1288,7 @@ knative.dev/networking v0.0.0-20210406043338-e38eb2be3962/go.mod h1:0V6M1AaWPL/M
knative.dev/pkg v0.0.0-20210329065222-9d92ea16c0d3/go.mod h1:PD5g8hUCXq6iR3tILjmZeJBvQfXGnHMPKryq54qHJhg=
knative.dev/pkg v0.0.0-20210330162221-808d62257db6/go.mod h1:PD5g8hUCXq6iR3tILjmZeJBvQfXGnHMPKryq54qHJhg=
knative.dev/pkg v0.0.0-20210331062321-6317ec6066f4/go.mod h1:PD5g8hUCXq6iR3tILjmZeJBvQfXGnHMPKryq54qHJhg=
knative.dev/pkg v0.0.0-20210331065221-952fdd90dbb0/go.mod h1:PD5g8hUCXq6iR3tILjmZeJBvQfXGnHMPKryq54qHJhg=
knative.dev/pkg v0.0.0-20210406170139-b8e331a6abf3 h1:mm1fYtGW1NPYbX5+YtvXx8v4Hjqil7Vb1xc6nHpTDEY=
knative.dev/pkg v0.0.0-20210406170139-b8e331a6abf3/go.mod h1:PD5g8hUCXq6iR3tILjmZeJBvQfXGnHMPKryq54qHJhg=
knative.dev/reconciler-test v0.0.0-20210329214021-2a67496462a4/go.mod h1:qLfJMHc0i9ENTSet/SUp/FcQm4QVfNTX8ZC//aVQN0M=
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/sources/v1alpha1/kafka_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (source *KafkaSource) ConvertTo(ctx context.Context, obj apis.Convertible)
}
source.Status.Status.DeepCopyInto(&sink.Status.Status)
sink.Status.Consumers = source.Status.Consumers
sink.Status.Claims = source.Status.Claims
sink.Status.Selector = source.Status.Selector
source.Status.Placeable.DeepCopyInto(&sink.Status.Placeable)

Expand Down Expand Up @@ -97,6 +98,7 @@ func (sink *KafkaSource) ConvertFrom(ctx context.Context, obj apis.Convertible)
source.Status.Status.DeepCopyInto(&sink.Status.Status)
sink.Status.Consumers = source.Status.Consumers
sink.Status.Selector = source.Status.Selector
sink.Status.Claims = source.Status.Claims
source.Status.Placeable.DeepCopyInto(&sink.Status.Placeable)

// Optionals
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/sources/v1alpha1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ type KafkaSourceStatus struct {
// For round-tripping only.
Consumers int32 `json:"consumers,omitempty"`

// Claims consumed by this KafkaSource instance
// +optional
// For round-tripping only.
Claims string `json:"claims,omitempty"`

// Use for labelSelectorPath when scaling Kafka source
// +optional
Selector string `json:"selector,omitempty"`
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/sources/v1beta1/kafka_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,7 @@ func (s *KafkaSourceStatus) MarkKeyTypeCorrect() {
func (s *KafkaSourceStatus) MarkKeyTypeIncorrect(reason, messageFormat string, messageA ...interface{}) {
KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionKeyType, reason, messageFormat, messageA...)
}

// UpdateConsumerGroupStatus records the consumer group claims reported over
// the control protocol in the status' Claims field. The value is an opaque,
// human-readable string produced by the receive adapter.
func (s *KafkaSourceStatus) UpdateConsumerGroupStatus(status string) {
	s.Claims = status
}
4 changes: 4 additions & 0 deletions pkg/apis/sources/v1beta1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ type KafkaSourceStatus struct {
// +optional
Selector string `json:"selector,omitempty"`

// Claims consumed by this KafkaSource instance
// +optional
Claims string `json:"claims,omitempty"`

// Implement Placeable.
// +optional
v1alpha1.Placeable `json:",inline"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/channel/consolidated/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type mockKafkaConsumerFactory struct {
createErr bool
}

func (c mockKafkaConsumerFactory) StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler consumer.KafkaConsumerHandler) (sarama.ConsumerGroup, error) {
func (c mockKafkaConsumerFactory) StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler consumer.KafkaConsumerHandler, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
if c.createErr {
return nil, errors.New("error creating consumer")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/common/consumer/consumer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var newConsumerGroup = sarama.NewConsumerGroup

// KafkaConsumerGroupFactory creates the ConsumerGroup and starts consuming the specified topics.
type KafkaConsumerGroupFactory interface {
StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler KafkaConsumerHandler) (sarama.ConsumerGroup, error)
StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error)
}

type kafkaConsumerGroupFactoryImpl struct {
Expand All @@ -53,7 +53,7 @@ func (c *customConsumerGroup) Close() error {

var _ sarama.ConsumerGroup = (*customConsumerGroup)(nil)

func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler KafkaConsumerHandler) (sarama.ConsumerGroup, error) {
func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
consumerGroup, err := newConsumerGroup(c.addrs, groupID, c.config)
if err != nil {
return nil, err
Expand All @@ -65,7 +65,7 @@ func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics
go func() {
defer close(errorCh)
for {
consumerHandler := NewConsumerHandler(logger, handler, errorCh)
consumerHandler := NewConsumerHandler(logger, handler, errorCh, options...)

err := consumerGroup.Consume(ctx, topics, &consumerHandler)
if err == sarama.ErrClosedConsumerGroup {
Expand Down
45 changes: 39 additions & 6 deletions pkg/common/consumer/consumer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,61 @@ type KafkaConsumerHandler interface {
GetConsumerGroup() string
}

// SaramaConsumerLifecycleListener is notified when the wrapped Sarama consumer
// joins or leaves a consumer group session. It lets callers observe partition
// claim changes without implementing sarama.ConsumerGroupHandler themselves.
type SaramaConsumerLifecycleListener interface {
	// Setup is invoked when the consumer is joining the session
	Setup(sess sarama.ConsumerGroupSession)

	// Cleanup is invoked when the consumer is leaving the session
	Cleanup(sess sarama.ConsumerGroupSession)
}

// noopSaramaConsumerLifecycleListener is the default lifecycle listener: it
// ignores both Setup and Cleanup notifications.
type noopSaramaConsumerLifecycleListener struct{}

func (n noopSaramaConsumerLifecycleListener) Setup(sess sarama.ConsumerGroupSession) {}

func (n noopSaramaConsumerLifecycleListener) Cleanup(sess sarama.ConsumerGroupSession) {}

// WithSaramaConsumerLifecycleListener returns an option that registers the
// given listener on the handler, replacing the default no-op listener, so the
// caller is notified when the consumer joins or leaves a session.
func WithSaramaConsumerLifecycleListener(listener SaramaConsumerLifecycleListener) SaramaConsumerHandlerOption {
	return func(sch *SaramaConsumerHandler) {
		sch.lifecycleListener = listener
	}
}

// ConsumerHandler implements sarama.ConsumerGroupHandler and provides some glue code to simplify message handling
// You must implement KafkaConsumerHandler and create a new SaramaConsumerHandler with it
type SaramaConsumerHandler struct {
	// The user message handler
	handler KafkaConsumerHandler

	// Listener notified on session setup/cleanup; defaults to a no-op
	// (see NewConsumerHandler and WithSaramaConsumerLifecycleListener).
	lifecycleListener SaramaConsumerLifecycleListener

	logger *zap.SugaredLogger

	// Errors channel
	errors chan error
}

func NewConsumerHandler(logger *zap.SugaredLogger, handler KafkaConsumerHandler, errorsCh chan error) SaramaConsumerHandler {
return SaramaConsumerHandler{
logger: logger,
handler: handler,
errors: errorsCh,
// SaramaConsumerHandlerOption configures a SaramaConsumerHandler at construction time.
type SaramaConsumerHandlerOption func(*SaramaConsumerHandler)

// NewConsumerHandler wraps the user-provided KafkaConsumerHandler in a
// SaramaConsumerHandler, applying any construction options. Errors raised
// while consuming are reported on errorsCh. Unless overridden via an option,
// the lifecycle listener is a no-op.
func NewConsumerHandler(logger *zap.SugaredLogger, handler KafkaConsumerHandler, errorsCh chan error, options ...SaramaConsumerHandlerOption) SaramaConsumerHandler {
	consumerHandler := SaramaConsumerHandler{
		handler:           handler,
		lifecycleListener: noopSaramaConsumerLifecycleListener{},
		logger:            logger,
		errors:            errorsCh,
	}

	for _, option := range options {
		option(&consumerHandler)
	}

	return consumerHandler
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *SaramaConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
func (consumer *SaramaConsumerHandler) Setup(session sarama.ConsumerGroupSession) error {
	consumer.logger.Info("setting up handler")
	// Notify the optional lifecycle listener (no-op by default) that the
	// consumer joined this session.
	consumer.lifecycleListener.Setup(session)
	return nil
}

Expand All @@ -68,6 +100,7 @@ func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSessi
consumer.handler.SetReady(p, false)
}
}
consumer.lifecycleListener.Cleanup(session)
return nil
}

Expand Down
46 changes: 43 additions & 3 deletions pkg/source/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (

"golang.org/x/time/rate"

ctrl "knative.dev/control-protocol/pkg"
ctrlnetwork "knative.dev/control-protocol/pkg/network"

"github.com/Shopify/sarama"
"go.opencensus.io/trace"
"go.uber.org/zap"
Expand All @@ -38,6 +41,7 @@ import (

"knative.dev/eventing-kafka/pkg/common/consumer"
"knative.dev/eventing-kafka/pkg/source/client"
kafkasourcecontrol "knative.dev/eventing-kafka/pkg/source/control"
)

const (
Expand All @@ -59,7 +63,9 @@ func NewEnvConfig() adapter.EnvConfigAccessor {
}

type Adapter struct {
config *AdapterConfig
config *AdapterConfig
controlServer *ctrlnetwork.ControlServer

httpMessageSender *kncloudevents.HTTPMessageSender
reporter pkgsource.StatsReporter
logger *zap.SugaredLogger
Expand All @@ -68,6 +74,8 @@ type Adapter struct {
}

var _ adapter.MessageAdapter = (*Adapter)(nil)
var _ consumer.KafkaConsumerHandler = (*Adapter)(nil)
var _ consumer.SaramaConsumerLifecycleListener = (*Adapter)(nil)
var _ adapter.MessageAdapterConstructor = NewAdapter

func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, httpMessageSender *kncloudevents.HTTPMessageSender, reporter pkgsource.StatsReporter) adapter.MessageAdapter {
Expand All @@ -86,7 +94,7 @@ func (a *Adapter) GetConsumerGroup() string {
return a.config.ConsumerGroup
}

func (a *Adapter) Start(ctx context.Context) error {
func (a *Adapter) Start(ctx context.Context) (err error) {
a.logger.Infow("Starting with config: ",
zap.String("Topics", strings.Join(a.config.Topics, ",")),
zap.String("ConsumerGroup", a.config.ConsumerGroup),
Expand All @@ -95,14 +103,27 @@ func (a *Adapter) Start(ctx context.Context) error {
zap.String("Namespace", a.config.Namespace),
)

// Init control service
a.controlServer, err = ctrlnetwork.StartInsecureControlServer(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

insecure :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah for this first pass, we don't encrypt the connection with mTLS, I think the certificates controller still need some work.

if err != nil {
return err
}
a.controlServer.MessageHandler(a)

// init consumer group
addrs, config, err := client.NewConfigWithEnv(context.Background(), &a.config.KafkaEnvConfig)
if err != nil {
return fmt.Errorf("failed to create the config: %w", err)
}

consumerGroupFactory := consumer.NewConsumerGroupFactory(addrs, config)
group, err := consumerGroupFactory.StartConsumerGroup(a.config.ConsumerGroup, a.config.Topics, a.logger, a)
group, err := consumerGroupFactory.StartConsumerGroup(
a.config.ConsumerGroup,
a.config.Topics,
a.logger,
a,
consumer.WithSaramaConsumerLifecycleListener(a),
)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -177,3 +198,22 @@ func (a *Adapter) Handle(ctx context.Context, msg *sarama.ConsumerMessage) (bool
func (a *Adapter) SetRateLimits(r rate.Limit, b int) {
a.rateLimiter = rate.NewLimiter(r, b)
}

// HandleServiceMessage implements the control-protocol message handler.
// In this first PR, there is only the RA sending messages to the control
// plane; there is no message the control plane should send to the RA, so any
// inbound message is unexpected. It is still acked so the sender can proceed.
func (a *Adapter) HandleServiceMessage(ctx context.Context, message ctrl.ServiceMessage) {
	a.logger.Info("Received unexpected control message")
	message.Ack()
}

// Setup implements consumer.SaramaConsumerLifecycleListener: when the consumer
// joins a session, it sends the session's claims to the control plane and
// waits for the ack. A send failure is only logged — claims reporting is
// best-effort and must not stop the adapter.
func (a *Adapter) Setup(sess sarama.ConsumerGroupSession) {
	if err := a.controlServer.SendAndWaitForAck(kafkasourcecontrol.NotifySetupClaimsOpCode, kafkasourcecontrol.Claims(sess.Claims())); err != nil {
		a.logger.Warnf("Cannot send the claims update: %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these codecov warnings make me wonder, do we have a way to mock/unittest the control protocol as a consumer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recently developed some mocks and other testing utilities https://github.com/knative-sandbox/control-protocol/tree/main/pkg/test, but I still don't use these here. You want me to tackle it now or in a followup?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't you planning to start using more of that? e.g. for changing/updating topics?

I am fine in handling it via a follow-up.

}
}

// Cleanup implements consumer.SaramaConsumerLifecycleListener: when the
// consumer leaves a session, it reports the session's claims to the control
// plane and waits for the ack. A send failure is only logged — claims
// reporting is best-effort and must not stop the adapter.
func (a *Adapter) Cleanup(sess sarama.ConsumerGroupSession) {
	if err := a.controlServer.SendAndWaitForAck(kafkasourcecontrol.NotifyCleanupClaimsOpCode, kafkasourcecontrol.Claims(sess.Claims())); err != nil {
		a.logger.Warnf("Cannot send the claims update: %v", err)
	}
}
91 changes: 91 additions & 0 deletions pkg/source/control/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package control

import (
	"encoding/json"
	"fmt"
	"sort"
	"strings"

	"k8s.io/apimachinery/pkg/util/sets"

	ctrl "knative.dev/control-protocol/pkg"
)

// This just contains the different opcodes
const (
NotifySetupClaimsOpCode ctrl.OpCode = 1
NotifyCleanupClaimsOpCode ctrl.OpCode = 2
Comment on lines +15 to +16
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we good with just single digits ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why shouldn't we be?

)

// Claims maps each topic name to the list of partitions claimed for it.
type Claims map[string][]int32

// ClaimsParser deserializes a control-protocol JSON payload into a Claims
// value, returning an error when the payload is not valid JSON for the
// Claims shape.
func ClaimsParser(payload []byte) (interface{}, error) {
	var claims Claims
	if err := json.Unmarshal(payload, &claims); err != nil {
		return nil, err
	}
	return claims, nil
}

// ClaimsMerger merges the currently known claims with a newly received claims
// set and returns their union. For a topic present on both sides the partition
// lists are combined into a sorted, de-duplicated list; topics present on only
// one side are carried over unchanged.
//
// Unlike the previous implementation, neither input map is modified (the old
// version deleted entries from the new claims map while merging, mutating the
// caller's data). Both arguments must be of type Claims; the type assertions
// panic otherwise.
func ClaimsMerger(oldValue interface{}, newValue interface{}) interface{} {
	oldClaims := oldValue.(Claims)
	newClaims := newValue.(Claims)
	result := oldClaims.copy()

	for topic, partitions := range newClaims {
		if existing, ok := result[topic]; ok {
			// Topic claimed on both sides: merge the partition lists.
			result[topic] = sets.NewInt32(existing...).Insert(partitions...).List()
		} else {
			result[topic] = partitions
		}
	}

	return result
}

// ClaimsDifference removes the cleaned-up claims from the currently known
// claims and returns the remainder. Partitions listed in the second argument
// are dropped from the matching topic; a topic whose partition list becomes
// empty is removed entirely. Both arguments must be of type Claims; the type
// assertions panic otherwise.
func ClaimsDifference(oldValue interface{}, removedValue interface{}) interface{} {
	removed := removedValue.(Claims)
	result := oldValue.(Claims).copy()

	for topic, removedPartitions := range removed {
		partitions, ok := result[topic]
		if !ok {
			continue
		}
		remaining := sets.NewInt32(partitions...).Delete(removedPartitions...).List()
		if len(remaining) > 0 {
			result[topic] = remaining
		} else {
			// No partitions left for this topic: drop it from the result.
			delete(result, topic)
		}
	}

	return result
}

// String renders the claims as "'topic': [partitions], ...". Topics are
// emitted in sorted order so that equal claim sets always produce the same
// string — map iteration order is random in Go, and a nondeterministic string
// would cause spurious updates when this value is stored in a status field.
func (c Claims) String() string {
	topics := make([]string, 0, len(c))
	for topic := range c {
		topics = append(topics, topic)
	}
	sort.Strings(topics)

	strs := make([]string, 0, len(c))
	for _, topic := range topics {
		strs = append(strs, fmt.Sprintf("'%s': %v", topic, c[topic]))
	}
	return strings.Join(strs, ", ")
}

// MarshalBinary implements encoding.BinaryMarshaler using the JSON encoding.
func (c Claims) MarshalBinary() (data []byte, err error) {
	return json.Marshal(c)
}

// UnmarshalBinary implements encoding.BinaryUnmarshaler using the JSON encoding.
func (c *Claims) UnmarshalBinary(data []byte) error {
	return json.Unmarshal(data, c)
}

// copy returns a shallow copy of the claims: the map itself is duplicated but
// the partition slices are shared with the receiver, so callers must replace
// (not mutate) slice values in the copy.
func (c Claims) copy() Claims {
	res := make(Claims, len(c))

	for k, v := range c {
		res[k] = v
	}

	return res
}
Loading