From 31a6be9012121e98663938bfc0f7bafa67c6d752 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Tue, 1 Sep 2020 12:40:36 +0200 Subject: [PATCH] Add Reconciler Signed-off-by: Pierangelo Di Pilato --- .../cmd/kafka-broker-controller/main.go | 3 +- control-plane/pkg/apis/eventing/register.go | 3 +- .../pkg/apis/eventing/v1alpha1/doc.go | 1 - .../eventing/v1alpha1/kafka_sink_defaults.go | 2 + .../eventing/v1alpha1/kafka_sink_lifecycle.go | 17 ++ .../eventing/v1alpha1/kafka_sink_types.go | 7 +- .../pkg/reconciler/base/broker/broker_base.go | 84 ++++++++ .../base/broker/broker_base_test.go | 47 +++++ .../broker/condition_set.go} | 91 ++++----- .../broker/config.go} | 18 +- .../pkg/reconciler/base/reconciler.go | 13 +- control-plane/pkg/reconciler/broker/broker.go | 89 ++++----- .../pkg/reconciler/broker/broker_config.go | 18 +- .../pkg/reconciler/broker/broker_test.go | 79 +++----- .../pkg/reconciler/broker/controller.go | 5 +- .../pkg/reconciler/broker/controller_test.go | 4 +- control-plane/pkg/reconciler/broker/topic.go | 34 +--- .../pkg/reconciler/broker/topic_test.go | 5 +- .../pkg/reconciler/sink/controller.go | 49 +++++ .../pkg/reconciler/sink/kafka_sink.go | 182 ++++++++++++++++++ .../pkg/reconciler/testing/factory.go | 3 +- .../pkg/reconciler/testing/objects.go | 21 +- .../pkg/reconciler/trigger/controller.go | 6 +- .../pkg/reconciler/trigger/controller_test.go | 4 +- .../pkg/reconciler/trigger/trigger.go | 7 +- .../reconciler/trigger/trigger_lifecycle.go | 4 +- .../pkg/reconciler/trigger/trigger_test.go | 77 ++++---- test/cmd/watch-cm/main.go | 4 +- test/e2e/broker_trigger_test.go | 5 +- 29 files changed, 619 insertions(+), 263 deletions(-) create mode 100644 control-plane/pkg/reconciler/base/broker/broker_base.go create mode 100644 control-plane/pkg/reconciler/base/broker/broker_base_test.go rename control-plane/pkg/reconciler/{broker/broker_lifecycle.go => base/broker/condition_set.go} (53%) rename control-plane/pkg/reconciler/{broker/reconciler_config.go => base/broker/config.go} (84%) create mode 100644 control-plane/pkg/reconciler/sink/controller.go create mode 100644 control-plane/pkg/reconciler/sink/kafka_sink.go diff --git a/control-plane/cmd/kafka-broker-controller/main.go b/control-plane/cmd/kafka-broker-controller/main.go index 705394b35e..8f8af3ade1 100644 --- a/control-plane/cmd/kafka-broker-controller/main.go +++ b/control-plane/cmd/kafka-broker-controller/main.go @@ -25,6 +25,7 @@ import ( "knative.dev/pkg/controller" "knative.dev/pkg/injection/sharedmain" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger" ) @@ -34,7 +35,7 @@ const ( ) func main() { - brokerEnvConfigs := broker.EnvConfigs{} + brokerEnvConfigs := brokerbase.EnvConfigs{} if err := envconfig.Process("", &brokerEnvConfigs); err != nil { log.Fatal("cannot process environment variables", err) diff --git a/control-plane/pkg/apis/eventing/register.go b/control-plane/pkg/apis/eventing/register.go index 4a3b6e3a71..a8e8dc2d5c 100644 --- a/control-plane/pkg/apis/eventing/register.go +++ b/control-plane/pkg/apis/eventing/register.go @@ -19,11 +19,12 @@ package eventing import "k8s.io/apimachinery/pkg/runtime/schema" const ( + // GroupName defines the group of KafkaSink objects. GroupName = "eventing.knative.dev" ) var ( - // KafkaSinkResource represents a Knative Kafka Sink. + // KafkaSinksResource represents a Knative Kafka Sink. KafkaSinksResource = schema.GroupResource{ Group: GroupName, Resource: "kafkasinks", diff --git a/control-plane/pkg/apis/eventing/v1alpha1/doc.go b/control-plane/pkg/apis/eventing/v1alpha1/doc.go index 3ae84dcee2..c10a66fb30 100644 --- a/control-plane/pkg/apis/eventing/v1alpha1/doc.go +++ b/control-plane/pkg/apis/eventing/v1alpha1/doc.go @@ -14,7 +14,6 @@ * limitations under the License. */ - // +k8s:deepcopy-gen=package // +groupName=eventing.knative.dev package v1alpha1 diff --git a/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_defaults.go b/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_defaults.go index cbedee288b..0887735b59 100644 --- a/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_defaults.go +++ b/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_defaults.go @@ -18,10 +18,12 @@ package v1alpha1 import "context" +// SetDefaults sets KafkaSink defaults. func (ks *KafkaSink) SetDefaults(ctx context.Context) { ks.Spec.SetDefaults(ctx) } +// SetDefaults sets KafkaSinkSpec defaults. func (kss *KafkaSinkSpec) SetDefaults(ctx context.Context) { defaultMode := ModeStructured diff --git a/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go b/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go index 8e888bd618..06aef8fe42 100644 --- a/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go +++ b/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go @@ -22,14 +22,31 @@ const ( ConditionAddressable apis.ConditionType = "Addressable" ConditionTopicReady apis.ConditionType = "TopicReady" ConditionConfigMapUpdated apis.ConditionType = "ConfigMapUpdated" + ConditionConfigParsed apis.ConditionType = "ConfigParsed" ) var conditionSet = apis.NewLivingConditionSet( ConditionAddressable, ConditionTopicReady, ConditionConfigMapUpdated, + ConditionConfigParsed, ) func (ks *KafkaSink) GetConditionSet() apis.ConditionSet { return conditionSet } + +func (ks *KafkaSinkStatus) GetConditionSet() apis.ConditionSet { + return conditionSet +} + +// SetAddress makes this Kafka Sink addressable by setting the URI. It also +// sets the ConditionAddressable to true. +func (ks *KafkaSinkStatus) SetAddress(url *apis.URL) { + ks.Address.URL = url + if url != nil { + ks.GetConditionSet().Manage(ks).MarkTrue(ConditionAddressable) + } else { + ks.GetConditionSet().Manage(ks).MarkFalse(ConditionAddressable, "nil URL", "URL is nil") + } +} diff --git a/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_types.go b/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_types.go index 404184067e..94b872929b 100644 --- a/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_types.go +++ b/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_types.go @@ -26,7 +26,9 @@ import ( ) const ( - ModeBinary = "binary" + // CloudEvents binary content mode. + ModeBinary = "binary" + // CloudEvents structured content mode. ModeStructured = "structured" ) @@ -58,6 +60,7 @@ var _ apis.Defaultable = (*KafkaSink)(nil) var _ runtime.Object = (*KafkaSink)(nil) var _ duckv1.KRShaped = (*KafkaSink)(nil) +// KafkaSinkSpec defines the desired state of the Kafka Sink. type KafkaSinkSpec struct { Topic string `json:"topic"` @@ -71,6 +74,7 @@ type KafkaSinkSpec struct { ContentMode *string `json:"contentMode,omitempty"` } +// KafkaSinkStatus represents the current state of the KafkaSink. type KafkaSinkStatus struct { // inherits duck/v1 Status, which currently provides: // * ObservedGeneration - the 'Generation' of the Kafka Sink that was last processed by the controller. @@ -82,6 +86,7 @@ type KafkaSinkStatus struct { // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// KafkaSinkList defines a list of Kafka Sink. type KafkaSinkList struct { metav1.TypeMeta `json:",inline"` // +optional diff --git a/control-plane/pkg/reconciler/base/broker/broker_base.go b/control-plane/pkg/reconciler/base/broker/broker_base.go new file mode 100644 index 0000000000..6ffdf500f4 --- /dev/null +++ b/control-plane/pkg/reconciler/base/broker/broker_base.go @@ -0,0 +1,84 @@ +/* + * Copyright 2020 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package broker + +import ( + "fmt" + + "github.com/Shopify/sarama" + "go.uber.org/zap" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// PathFromObject returns an HTTP request path given a generic object. +func PathFromObject(obj metav1.Object) string { + return Path(obj.GetNamespace(), obj.GetName()) +} + +// Path returns an HTTP request path given namespace and name of an object. +func Path(namespace, name string) string { + return fmt.Sprintf("/%s/%s", namespace, name) +} + +// Topic returns a topic name given a topic prefix and a generic object. +func Topic(prefix string, obj metav1.Object) string { + return fmt.Sprintf("%s%s-%s", prefix, obj.GetNamespace(), obj.GetName()) +} + +// CreateTopic creates a topic with name 'topic' following the TopicConfig configuration passed as parameter. +// +// It returns the topic name or an error. +// +// If the topic already exists, it will return no errors. +func CreateTopic(admin sarama.ClusterAdmin, logger *zap.Logger, topic string, config *TopicConfig) (string, error) { + + topicDetail := &sarama.TopicDetail{ + NumPartitions: config.TopicDetail.NumPartitions, + ReplicationFactor: config.TopicDetail.ReplicationFactor, + } + + logger.Debug("create topic", + zap.String("topic", topic), + zap.Int16("replicationFactor", topicDetail.ReplicationFactor), + zap.Int32("numPartitions", topicDetail.NumPartitions), + ) + + createTopicError := admin.CreateTopic(topic, topicDetail, false) + if err, ok := createTopicError.(*sarama.TopicError); ok && err.Err == sarama.ErrTopicAlreadyExists { + return topic, nil + } + + return topic, createTopicError +} + +// NewClusterAdminFunc creates new sarama.ClusterAdmin. +type NewClusterAdminFunc func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) + +// GetClusterAdmin create a new sarama.ClusterAdmin. +// +// The caller is responsible for closing the sarama.ClusterAdmin. +func GetClusterAdmin(adminFunc NewClusterAdminFunc, bootstrapServers []string) (sarama.ClusterAdmin, error) { + config := sarama.NewConfig() + config.Version = sarama.MaxVersion + + kafkaClusterAdmin, err := adminFunc(bootstrapServers, config) + if err != nil { + return nil, fmt.Errorf("failed to create cluster admin: %w", err) + } + + return kafkaClusterAdmin, nil +} diff --git a/control-plane/pkg/reconciler/base/broker/broker_base_test.go b/control-plane/pkg/reconciler/base/broker/broker_base_test.go new file mode 100644 index 0000000000..3754d4f5c6 --- /dev/null +++ b/control-plane/pkg/reconciler/base/broker/broker_base_test.go @@ -0,0 +1,47 @@ +/* + * Copyright 2020 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package broker + +import "testing" + +func TestPath(t *testing.T) { + type args struct { + namespace string + name string + } + tests := []struct { + name string + args args + want string + }{ + { + name: "namespace/name", + args: args{ + namespace: "broker-namespace", + name: "broker-name", + }, + want: "/broker-namespace/broker-name", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := Path(tt.args.namespace, tt.args.name); got != tt.want { + t.Errorf("Path() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/control-plane/pkg/reconciler/broker/broker_lifecycle.go b/control-plane/pkg/reconciler/base/broker/condition_set.go similarity index 53% rename from control-plane/pkg/reconciler/broker/broker_lifecycle.go rename to control-plane/pkg/reconciler/base/broker/condition_set.go index 82c3664515..fb64b599c4 100644 --- a/control-plane/pkg/reconciler/broker/broker_lifecycle.go +++ b/control-plane/pkg/reconciler/base/broker/condition_set.go @@ -20,10 +20,11 @@ import ( "fmt" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" - eventing "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/reconciler/names" "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/reconciler" ) @@ -41,73 +42,75 @@ var ConditionSet = apis.NewLivingConditionSet( ConditionConfigParsed, ) -const ( - Broker = "Broker" - Reconciled = Broker + "Reconciled" -) +type Object interface { + duckv1.KRShaped + runtime.Object +} + +type StatusConditionManager struct { + Object Object -type statusConditionManager struct { - Broker *eventing.Broker + SetAddress func(u *apis.URL) - configs *Configs + Configs *EnvConfigs - recorder record.EventRecorder + Recorder record.EventRecorder } -func (manager *statusConditionManager) failedToGetBrokersTriggersConfigMap(err error) reconciler.Event { +func (manager *StatusConditionManager) FailedToGetBrokersTriggersConfigMap(err error) reconciler.Event { - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkFalse( + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse( ConditionConfigMapUpdated, fmt.Sprintf( "Failed to get ConfigMap: %s", - manager.configs.DataPlaneConfigMapAsString(), + manager.Configs.DataPlaneConfigMapAsString(), ), "%v", err, ) - return fmt.Errorf("failed to get brokers and triggers config map %s: %w", manager.configs.DataPlaneConfigMapAsString(), err) + return fmt.Errorf("failed to get brokers and triggers config map %s: %w", manager.Configs.DataPlaneConfigMapAsString(), err) } -func (manager *statusConditionManager) failedToGetBrokersTriggersDataFromConfigMap(err error) reconciler.Event { +func (manager *StatusConditionManager) FailedToGetBrokersTriggersDataFromConfigMap(err error) reconciler.Event { - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkFalse( + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse( ConditionConfigMapUpdated, fmt.Sprintf( "Failed to get brokers and trigger data from ConfigMap: %s", - manager.configs.DataPlaneConfigMapAsString(), + manager.Configs.DataPlaneConfigMapAsString(), ), "%v", err, ) - return fmt.Errorf("failed to get broker and triggers data from config map %s: %w", manager.configs.DataPlaneConfigMapAsString(), err) + return fmt.Errorf("failed to get broker and triggers data from config map %s: %w", manager.Configs.DataPlaneConfigMapAsString(), err) } -func (manager *statusConditionManager) failedToUpdateBrokersTriggersConfigMap(err error) reconciler.Event { +func (manager *StatusConditionManager) FailedToUpdateBrokersTriggersConfigMap(err error) reconciler.Event { - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkFalse( + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse( ConditionConfigMapUpdated, - fmt.Sprintf("Failed to update ConfigMap: %s", manager.configs.DataPlaneConfigMapAsString()), + fmt.Sprintf("Failed to update ConfigMap: %s", manager.Configs.DataPlaneConfigMapAsString()), "%s", err, ) - return fmt.Errorf("failed to update brokers and triggers config map %s: %w", manager.configs.DataPlaneConfigMapAsString(), err) + return fmt.Errorf("failed to update brokers and triggers config map %s: %w", manager.Configs.DataPlaneConfigMapAsString(), err) } -func (manager *statusConditionManager) brokersTriggersConfigMapUpdated() { +func (manager *StatusConditionManager) BrokersTriggersConfigMapUpdated() { - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkTrueWithReason( + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkTrueWithReason( ConditionConfigMapUpdated, - fmt.Sprintf("Config map %s updated", manager.configs.DataPlaneConfigMapAsString()), + fmt.Sprintf("Config map %s updated", manager.Configs.DataPlaneConfigMapAsString()), "", ) } -func (manager *statusConditionManager) failedToCreateTopic(topic string, err error) reconciler.Event { +func (manager *StatusConditionManager) FailedToCreateTopic(topic string, err error) reconciler.Event { - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkFalse( + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse( ConditionTopicReady, fmt.Sprintf("Failed to create topic: %s", topic), "%v", @@ -117,36 +120,36 @@ func (manager *statusConditionManager) failedToCreateTopic(topic string, err err return fmt.Errorf("failed to create topic: %s: %w", topic, err) } -func (manager *statusConditionManager) topicCreated(topic string) { +func (manager *StatusConditionManager) TopicCreated(topic string) { - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkTrueWithReason( + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkTrueWithReason( ConditionTopicReady, fmt.Sprintf("Topic %s created", topic), "", ) } -func (manager *statusConditionManager) reconciled() reconciler.Event { +func (manager *StatusConditionManager) Reconciled() reconciler.Event { - broker := manager.Broker + object := manager.Object - broker.Status.Address.URL = &apis.URL{ + manager.SetAddress(&apis.URL{ Scheme: "http", - Host: names.ServiceHostName(manager.configs.BrokerIngressName, manager.configs.SystemNamespace), - Path: fmt.Sprintf("/%s/%s", broker.Namespace, broker.Name), - } - broker.GetConditionSet().Manage(&broker.Status).MarkTrue(ConditionAddressable) + Host: names.ServiceHostName(manager.Configs.BrokerIngressName, manager.Configs.SystemNamespace), + Path: fmt.Sprintf("/%s/%s", object.GetNamespace(), object.GetName()), + }) + object.GetConditionSet().Manage(object.GetStatus()).MarkTrue(ConditionAddressable) return nil } -func (manager *statusConditionManager) failedToUpdateDispatcherPodsAnnotation(err error) { +func (manager *StatusConditionManager) FailedToUpdateDispatcherPodsAnnotation(err error) { // We don't set status conditions for dispatcher pods updates. // Record the event. - manager.recorder.Eventf( - manager.Broker, + manager.Recorder.Eventf( + manager.Object, corev1.EventTypeWarning, "failed to update dispatcher pods annotation", "%v", @@ -154,19 +157,19 @@ func (manager *statusConditionManager) failedToUpdateDispatcherPodsAnnotation(er ) } -func (manager *statusConditionManager) failedToUpdateReceiverPodsAnnotation(err error) reconciler.Event { +func (manager *StatusConditionManager) FailedToUpdateReceiverPodsAnnotation(err error) reconciler.Event { return fmt.Errorf("failed to update receiver pods annotation: %w", err) } -func (manager *statusConditionManager) failedToGetBrokerConfig(err error) reconciler.Event { +func (manager *StatusConditionManager) FailedToGetBrokerConfig(err error) reconciler.Event { return fmt.Errorf("failed to get broker configuration: %w", err) } -func (manager *statusConditionManager) failedToResolveBrokerConfig(err error) reconciler.Event { +func (manager *StatusConditionManager) FailedToResolveBrokerConfig(err error) reconciler.Event { - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkFalse( + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse( ConditionConfigParsed, fmt.Sprintf("%v", err), "", @@ -175,6 +178,6 @@ func (manager *statusConditionManager) failedToResolveBrokerConfig(err error) re return fmt.Errorf("failed to get broker configuration: %w", err) } -func (manager *statusConditionManager) brokerConfigResolved() { - manager.Broker.GetConditionSet().Manage(&manager.Broker.Status).MarkTrue(ConditionConfigParsed) +func (manager *StatusConditionManager) BrokerConfigResolved() { + manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkTrue(ConditionConfigParsed) } diff --git a/control-plane/pkg/reconciler/broker/reconciler_config.go b/control-plane/pkg/reconciler/base/broker/config.go similarity index 84% rename from control-plane/pkg/reconciler/broker/reconciler_config.go rename to control-plane/pkg/reconciler/base/broker/config.go index a1d784507e..5ffc61ff0b 100644 --- a/control-plane/pkg/reconciler/broker/reconciler_config.go +++ b/control-plane/pkg/reconciler/base/broker/config.go @@ -18,13 +18,10 @@ package broker import ( "fmt" -) - -type Configs struct { - EnvConfigs + "strings" - BootstrapServers string -} + "github.com/Shopify/sarama" +) type EnvConfigs struct { DataPlaneConfigMapNamespace string `required:"true" split_words:"true"` @@ -38,3 +35,12 @@ type EnvConfigs struct { func (c *EnvConfigs) DataPlaneConfigMapAsString() string { return fmt.Sprintf("%s/%s", c.DataPlaneConfigMapNamespace, c.DataPlaneConfigMapName) } + +type TopicConfig struct { + TopicDetail sarama.TopicDetail + BootstrapServers []string +} + +func (c TopicConfig) GetBootstrapServers() string { + return strings.Join(c.BootstrapServers, ",") +} diff --git a/control-plane/pkg/reconciler/base/reconciler.go b/control-plane/pkg/reconciler/base/reconciler.go index b27d14176e..a2736481a2 100644 --- a/control-plane/pkg/reconciler/base/reconciler.go +++ b/control-plane/pkg/reconciler/base/reconciler.go @@ -25,9 +25,11 @@ const ( ConfigMapDataKey = "data" // label for selecting dispatcher pods. - DispatcherLabel = "kafka-broker-dispatcher" + BrokerDispatcherLabel = "kafka-broker-dispatcher" // label for selecting receiver pods. - ReceiverLabel = "kafka-broker-receiver" + BrokerReceiverLabel = "kafka-broker-receiver" + // label for selecting receiver pods. + SinkReceiverLabel = "kafka-sink-receiver" // volume generation annotation data plane pods. VolumeGenerationAnnotationKey = "volumeGeneration" @@ -46,6 +48,9 @@ type Reconciler struct { DataPlaneConfigMapName string DataPlaneConfigFormat string SystemNamespace string + + DispatcherLabel string + ReceiverLabel string } func (r *Reconciler) GetOrCreateDataPlaneConfigMap() (*corev1.ConfigMap, error) { @@ -151,7 +156,7 @@ func (r *Reconciler) UpdateDispatcherPodsAnnotation(logger *zap.Logger, volumeGe return retry.RetryOnConflict(retry.DefaultRetry, func() error { - labelSelector := labels.SelectorFromSet(map[string]string{"app": DispatcherLabel}) + labelSelector := labels.SelectorFromSet(map[string]string{"app": r.DispatcherLabel}) pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector) if errors != nil { return fmt.Errorf("failed to list dispatcher pods in namespace %s: %w", r.SystemNamespace, errors) @@ -165,7 +170,7 @@ func (r *Reconciler) UpdateReceiverPodsAnnotation(logger *zap.Logger, volumeGene return retry.RetryOnConflict(retry.DefaultRetry, func() error { - labelSelector := labels.SelectorFromSet(map[string]string{"app": ReceiverLabel}) + labelSelector := labels.SelectorFromSet(map[string]string{"app": r.ReceiverLabel}) pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector) if errors != nil { return fmt.Errorf("failed to list receiver pods in namespace %s: %w", r.SystemNamespace, errors) diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 0f03180d31..9d4ea8d280 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -26,6 +26,7 @@ import ( "github.com/Shopify/sarama" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/util/retry" eventing "knative.dev/eventing/pkg/apis/eventing/v1" @@ -37,16 +38,23 @@ import ( coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/log" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" ) const ( - // topic prefix - (topic name: knative-broker--) + // TopicPrefix is the Kafka Broker topic prefix - (topic name: knative-broker--) TopicPrefix = "knative-broker-" // signal that the broker hasn't been added to the config map yet. NoBroker = -1 ) +type Configs struct { + brokerbase.EnvConfigs + + BootstrapServers string +} + type Reconciler struct { *base.Reconciler @@ -60,7 +68,7 @@ type Reconciler struct { // NewClusterAdmin creates new sarama ClusterAdmin. It's convenient to add this as Reconciler field so that we can // mock the function used during the reconciliation loop. - NewClusterAdmin func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) + NewClusterAdmin brokerbase.NewClusterAdminFunc Configs *Configs } @@ -75,32 +83,33 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) logger := log.Logger(ctx, "reconcile", broker) - statusConditionManager := statusConditionManager{ - Broker: broker, - configs: r.Configs, - recorder: controller.GetEventRecorder(ctx), + statusConditionManager := brokerbase.StatusConditionManager{ + Object: broker, + SetAddress: broker.Status.SetAddress, + Configs: &r.Configs.EnvConfigs, + Recorder: controller.GetEventRecorder(ctx), } config, err := r.resolveBrokerConfig(logger, broker) if err != nil { - return statusConditionManager.failedToResolveBrokerConfig(err) + return statusConditionManager.FailedToResolveBrokerConfig(err) } - statusConditionManager.brokerConfigResolved() + statusConditionManager.BrokerConfigResolved() logger.Debug("config resolved", zap.Any("config", config)) - topic, err := r.CreateTopic(logger, Topic(broker), config) + topic, err := r.CreateTopic(logger, brokerbase.Topic(TopicPrefix, broker), config) if err != nil { - return statusConditionManager.failedToCreateTopic(topic, err) + return statusConditionManager.FailedToCreateTopic(topic, err) } - statusConditionManager.topicCreated(topic) + statusConditionManager.TopicCreated(topic) logger.Debug("Topic created", zap.Any("topic", topic)) // Get brokers and triggers config map. brokersTriggersConfigMap, err := r.GetOrCreateDataPlaneConfigMap() if err != nil { - return statusConditionManager.failedToGetBrokersTriggersConfigMap(err) + return statusConditionManager.FailedToGetBrokersTriggersConfigMap(err) } logger.Debug("Got brokers and triggers config map") @@ -108,7 +117,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) // Get brokersTriggers data. brokersTriggers, err := r.GetDataPlaneConfigMapData(logger, brokersTriggersConfigMap) if err != nil && brokersTriggers == nil { - return statusConditionManager.failedToGetBrokersTriggersDataFromConfigMap(err) + return statusConditionManager.FailedToGetBrokersTriggersDataFromConfigMap(err) } logger.Debug( @@ -116,12 +125,12 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) zap.Any(base.BrokersTriggersDataLogKey, log.BrokersMarshaller{Brokers: brokersTriggers}), ) - brokerIndex := FindBroker(brokersTriggers, broker) + brokerIndex := FindBroker(brokersTriggers, broker.UID) // Get broker configuration. brokerConfig, err := r.getBrokerConfig(topic, broker, config) if err != nil { - return statusConditionManager.failedToGetBrokerConfig(err) + return statusConditionManager.FailedToGetBrokerConfig(err) } // Update brokersTriggers data with the new broker configuration if brokerIndex != NoBroker { @@ -143,7 +152,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) if err := r.UpdateDataPlaneConfigMap(brokersTriggers, brokersTriggersConfigMap); err != nil { return err } - statusConditionManager.brokersTriggersConfigMapUpdated() + statusConditionManager.BrokersTriggersConfigMapUpdated() logger.Debug("Brokers and triggers config map updated") @@ -170,12 +179,12 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) zap.Error(err), ) - statusConditionManager.failedToUpdateDispatcherPodsAnnotation(err) + statusConditionManager.FailedToUpdateDispatcherPodsAnnotation(err) } else { logger.Debug("Updated dispatcher pod annotation") } - return statusConditionManager.reconciled() + return statusConditionManager.Reconciled() } func (r *Reconciler) FinalizeKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { @@ -207,7 +216,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker) zap.Any(base.BrokersTriggersDataLogKey, log.BrokersMarshaller{Brokers: brokersTriggers}), ) - brokerIndex := FindBroker(brokersTriggers, broker) + brokerIndex := FindBroker(brokersTriggers, broker.UID) if brokerIndex != NoBroker { deleteBroker(brokersTriggers, brokerIndex) @@ -230,7 +239,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker) } bootstrapServers := config.BootstrapServers - topic, err := r.deleteTopic(Topic(broker), bootstrapServers) + topic, err := r.deleteTopic(brokerbase.Topic(TopicPrefix, broker), bootstrapServers) if err != nil { return fmt.Errorf("failed to delete topic %s: %w", topic, err) } @@ -244,7 +253,7 @@ func incrementVolumeGeneration(generation uint64) uint64 { return (generation + 1) % (math.MaxUint64 - 1) } -func (r *Reconciler) resolveBrokerConfig(logger *zap.Logger, broker *eventing.Broker) (*Config, error) { +func (r *Reconciler) resolveBrokerConfig(logger *zap.Logger, broker *eventing.Broker) (*brokerbase.TopicConfig, error) { logger.Debug("broker config", zap.Any("broker.spec.config", broker.Spec.Config)) @@ -283,25 +292,25 @@ func (r *Reconciler) defaultTopicDetail() sarama.TopicDetail { return topicDetail } -func (r *Reconciler) defaultConfig() (*Config, error) { +func (r *Reconciler) defaultConfig() (*brokerbase.TopicConfig, error) { bootstrapServers, err := r.getDefaultBootstrapServersOrFail() if err != nil { return nil, err } - return &Config{ + return &brokerbase.TopicConfig{ TopicDetail: r.defaultTopicDetail(), BootstrapServers: bootstrapServers, }, nil } -func (r *Reconciler) getBrokerConfig(topic string, broker *eventing.Broker, config *Config) (*coreconfig.Broker, error) { +func (r *Reconciler) getBrokerConfig(topic string, broker *eventing.Broker, config *brokerbase.TopicConfig) (*coreconfig.Broker, error) { brokerConfig := &coreconfig.Broker{ Id: string(broker.UID), Topic: topic, - Path: Path(broker.Namespace, broker.Name), - BootstrapServers: config.getBootstrapServers(), + Path: brokerbase.PathFromObject(broker), + BootstrapServers: config.GetBootstrapServers(), } if broker.Spec.Delivery == nil || broker.Spec.Delivery.DeadLetterSink == nil { @@ -331,11 +340,11 @@ func (r *Reconciler) ConfigMapUpdated(ctx context.Context) func(configMap *corev logger.Debug("new defaults", zap.Any("topicDetail", config.TopicDetail), - zap.String("BootstrapServers", config.getBootstrapServers()), + zap.String("BootstrapServers", config.GetBootstrapServers()), ) r.SetDefaultTopicDetails(config.TopicDetail) - r.SetBootstrapServers(config.getBootstrapServers()) + r.SetBootstrapServers(config.GetBootstrapServers()) } } @@ -346,25 +355,13 @@ func (r *Reconciler) SetBootstrapServers(servers string) { return } - addrs := bootstrapServersArray(servers) + addrs := BootstrapServersArray(servers) r.bootstrapServersLock.Lock() r.bootstrapServers = addrs r.bootstrapServersLock.Unlock() } -func (r *Reconciler) getKafkaClusterAdmin(bootstrapServers []string) (sarama.ClusterAdmin, error) { - config := sarama.NewConfig() - config.Version = sarama.MaxVersion - - kafkaClusterAdmin, err := r.NewClusterAdmin(bootstrapServers, config) - if err != nil { - return nil, fmt.Errorf("failed to create cluster admin: %w", err) - } - - return kafkaClusterAdmin, nil -} - func (r *Reconciler) SetDefaultTopicDetails(topicDetail sarama.TopicDetail) { r.KafkaDefaultTopicDetailsLock.Lock() defer r.KafkaDefaultTopicDetailsLock.Unlock() @@ -372,11 +369,11 @@ func (r *Reconciler) SetDefaultTopicDetails(topicDetail sarama.TopicDetail) { r.KafkaDefaultTopicDetails = topicDetail } -func FindBroker(brokersTriggers *coreconfig.Brokers, broker *eventing.Broker) int { +func FindBroker(brokersTriggers *coreconfig.Brokers, broker types.UID) int { // Find broker in brokersTriggers. brokerIndex := NoBroker for i, b := range brokersTriggers.Brokers { - if b.Id == string(broker.UID) { + if b.Id == string(broker) { brokerIndex = i break } @@ -409,10 +406,6 @@ func (r *Reconciler) getDefaultBootstrapServersOrFail() ([]string, error) { return r.bootstrapServers, nil } -func bootstrapServersArray(bootstrapServers string) []string { +func BootstrapServersArray(bootstrapServers string) []string { return strings.Split(bootstrapServers, ",") } - -func Path(namespace, name string) string { - return fmt.Sprintf("/%s/%s", namespace, name) -} diff --git a/control-plane/pkg/reconciler/broker/broker_config.go b/control-plane/pkg/reconciler/broker/broker_config.go index ff744ae991..b2cbf40a10 100644 --- a/control-plane/pkg/reconciler/broker/broker_config.go +++ b/control-plane/pkg/reconciler/broker/broker_config.go @@ -18,20 +18,16 @@ package broker import ( "fmt" - "strings" "github.com/Shopify/sarama" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "knative.dev/pkg/configmap" -) -type Config struct { - TopicDetail sarama.TopicDetail - BootstrapServers []string -} + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" +) -func configFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) (*Config, error) { +func configFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) (*brokerbase.TopicConfig, error) { topicDetail := sarama.TopicDetail{} @@ -57,16 +53,12 @@ func configFromConfigMap(logger *zap.Logger, cm *corev1.ConfigMap) (*Config, err topicDetail.ReplicationFactor = int16(replicationFactor) - config := &Config{ + config := &brokerbase.TopicConfig{ TopicDetail: topicDetail, - BootstrapServers: bootstrapServersArray(bootstrapServers), + BootstrapServers: BootstrapServersArray(bootstrapServers), } logger.Debug("got broker config from config map", zap.Any("config", config)) return config, nil } - -func (c Config) getBootstrapServers() string { - return strings.Join(c.BootstrapServers, ",") -} diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index 08d7949f37..cffe007846 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -47,6 +47,7 @@ import ( coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/testing" @@ -76,7 +77,7 @@ var ( ) func TestBrokerReconciler(t *testing.T) { - eventing.RegisterAlternateBrokerConditionSet(ConditionSet) + eventing.RegisterAlternateBrokerConditionSet(brokerbase.ConditionSet) t.Parallel() @@ -117,7 +118,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -174,7 +175,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: BrokerUUID, Topic: GetTopic(), DeadLetterSink: "http://test-service.test-service-namespace.svc.cluster.local/", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -270,7 +271,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: BrokerUUID, Topic: GetTopic(), DeadLetterSink: "http://test-service.test-service-namespace.svc.cluster.local/", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -323,7 +324,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -364,7 +365,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44a", @@ -392,7 +393,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44a", @@ -402,7 +403,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -487,7 +488,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: BrokerUUID, Topic: GetTopic(), DeadLetterSink: "http://www.my-sink.com/api", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -542,13 +543,13 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: BrokerUUID, Topic: GetTopic(), DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, }, }, &configs), @@ -571,12 +572,12 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -620,12 +621,12 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, }, VolumeGeneration: 1, @@ -649,12 +650,12 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -701,12 +702,12 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: "5384faa4-6bdf-428d-b6c2-d6f89ce1d44b", Topic: "my-existing-topic-a", DeadLetterSink: "http://www.my-sink.com", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -812,7 +813,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), BootstrapServers: bootstrapServers, }, }, @@ -975,7 +976,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Id: TriggerUUID + "b", }, }, - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, }, VolumeGeneration: 1, @@ -1018,7 +1019,7 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { }, }, BootstrapServers: bootstrapServers, - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, }, VolumeGeneration: 2, @@ -1081,7 +1082,7 @@ func brokerFinalization(t *testing.T, format string, configs Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, }, VolumeGeneration: 1, @@ -1110,7 +1111,7 @@ func brokerFinalization(t *testing.T, format string, configs Configs) { Id: BrokerUUID, Topic: GetTopic(), DeadLetterSink: "http://test-service.test-service-namespace.svc.cluster.local/", - Path: Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, }, VolumeGeneration: 1, @@ -1343,6 +1344,8 @@ func useTable(t *testing.T, table TableTest, configs *Configs) { DataPlaneConfigMapName: configs.DataPlaneConfigMapName, DataPlaneConfigFormat: configs.DataPlaneConfigFormat, SystemNamespace: configs.SystemNamespace, + DispatcherLabel: base.BrokerDispatcherLabel, + ReceiverLabel: base.BrokerReceiverLabel, }, KafkaDefaultTopicDetails: defaultTopicDetail, KafkaDefaultTopicDetailsLock: sync.RWMutex{}, @@ -1434,31 +1437,3 @@ func getUnmarshallableError(format string) interface{} { } return "invalid character '-' after object key" } - -func TestPath(t *testing.T) { - type args struct { - namespace string - name string - } - tests := []struct { - name string - args args - want string - }{ - { - name: "namespace/name", - args: args{ - namespace: "broker-namespace", - name: "broker-name", - }, - want: "/broker-namespace/broker-name", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := Path(tt.args.namespace, tt.args.name); got != tt.want { - t.Errorf("Path() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/control-plane/pkg/reconciler/broker/controller.go b/control-plane/pkg/reconciler/broker/controller.go index ca8e94253c..c8d8a3b050 100644 --- a/control-plane/pkg/reconciler/broker/controller.go +++ b/control-plane/pkg/reconciler/broker/controller.go @@ -37,6 +37,7 @@ import ( podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" ) @@ -51,7 +52,7 @@ const ( func NewController(ctx context.Context, watcher configmap.Watcher, configs *Configs) *controller.Impl { - eventing.RegisterAlternateBrokerConditionSet(ConditionSet) + eventing.RegisterAlternateBrokerConditionSet(brokerbase.ConditionSet) configmapInformer := configmapinformer.Get(ctx) @@ -63,6 +64,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *Conf DataPlaneConfigMapName: configs.DataPlaneConfigMapName, DataPlaneConfigFormat: configs.DataPlaneConfigFormat, SystemNamespace: configs.SystemNamespace, + DispatcherLabel: base.BrokerDispatcherLabel, + ReceiverLabel: base.BrokerReceiverLabel, }, NewClusterAdmin: sarama.NewClusterAdmin, KafkaDefaultTopicDetails: sarama.TopicDetail{ diff --git a/control-plane/pkg/reconciler/broker/controller_test.go b/control-plane/pkg/reconciler/broker/controller_test.go index cd827ffbf3..b8eae45d63 100644 --- a/control-plane/pkg/reconciler/broker/controller_test.go +++ b/control-plane/pkg/reconciler/broker/controller_test.go @@ -32,13 +32,15 @@ import ( "knative.dev/pkg/configmap" dynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" reconcilertesting "knative.dev/pkg/reconciler/testing" + + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" ) func TestNewController(t *testing.T) { ctx, _ := reconcilertesting.SetupFakeContext(t) configs := &Configs{ - EnvConfigs: EnvConfigs{ + EnvConfigs: brokerbase.EnvConfigs{ SystemNamespace: "cm", GeneralConfigMapName: "cm", }, diff --git a/control-plane/pkg/reconciler/broker/topic.go b/control-plane/pkg/reconciler/broker/topic.go index 2a3438c18b..777e163439 100644 --- a/control-plane/pkg/reconciler/broker/topic.go +++ b/control-plane/pkg/reconciler/broker/topic.go @@ -17,42 +17,26 @@ package broker import ( - "fmt" - "github.com/Shopify/sarama" "go.uber.org/zap" - eventing "knative.dev/eventing/pkg/apis/eventing/v1" + + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" ) -func (r *Reconciler) CreateTopic(logger *zap.Logger, topic string, config *Config) (string, error) { +func (r *Reconciler) CreateTopic(logger *zap.Logger, topic string, config *brokerbase.TopicConfig) (string, error) { - kafkaClusterAdmin, err := r.getKafkaClusterAdmin(config.BootstrapServers) + kafkaClusterAdmin, err := brokerbase.GetClusterAdmin(r.NewClusterAdmin, config.BootstrapServers) if err != nil { return "", err } defer kafkaClusterAdmin.Close() - topicDetail := &sarama.TopicDetail{ - NumPartitions: config.TopicDetail.NumPartitions, - ReplicationFactor: config.TopicDetail.ReplicationFactor, - } - - logger.Debug("create topic", - zap.String("topic", topic), - zap.Int16("replicationFactor", topicDetail.ReplicationFactor), - zap.Int32("numPartitions", topicDetail.NumPartitions), - ) - - createTopicError := kafkaClusterAdmin.CreateTopic(topic, topicDetail, false) - if err, ok := createTopicError.(*sarama.TopicError); ok && err.Err == sarama.ErrTopicAlreadyExists { - return topic, nil - } - - return topic, createTopicError + return brokerbase.CreateTopic(kafkaClusterAdmin, logger, topic, config) } func (r *Reconciler) deleteTopic(topic string, bootstrapServers []string) (string, error) { - kafkaClusterAdmin, err := r.getKafkaClusterAdmin(bootstrapServers) + + kafkaClusterAdmin, err := brokerbase.GetClusterAdmin(r.NewClusterAdmin, bootstrapServers) if err != nil { return "", err } @@ -68,7 +52,3 @@ func (r *Reconciler) deleteTopic(topic string, bootstrapServers []string) (strin return topic, nil } - -func Topic(broker *eventing.Broker) string { - return fmt.Sprintf("%s%s-%s", TopicPrefix, broker.Namespace, broker.Name) -} diff --git a/control-plane/pkg/reconciler/broker/topic_test.go b/control-plane/pkg/reconciler/broker/topic_test.go index 51eea319e8..4580c7f3ee 100644 --- a/control-plane/pkg/reconciler/broker/topic_test.go +++ b/control-plane/pkg/reconciler/broker/topic_test.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventing "knative.dev/eventing/pkg/apis/eventing/v1" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" reconcilertesting "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/testing" ) @@ -37,7 +38,7 @@ func TestCreateTopicTopicAlreadyExists(t *testing.T) { Namespace: "bnamespace", }, } - topic := broker.Topic(b) + topic := brokerbase.Topic(broker.TopicPrefix, b) errMsg := "topic already exists" r := broker.Reconciler{ @@ -54,7 +55,7 @@ func TestCreateTopicTopicAlreadyExists(t *testing.T) { }, } - topicRet, err := r.CreateTopic(zap.NewNop(), topic, &broker.Config{}) + topicRet, err := r.CreateTopic(zap.NewNop(), topic, &brokerbase.TopicConfig{}) assert.Equal(t, topicRet, topic, "expected topic %s go %s", topic, topicRet) assert.Nil(t, err, "expected nil error on topic already exists") diff --git a/control-plane/pkg/reconciler/sink/controller.go b/control-plane/pkg/reconciler/sink/controller.go new file mode 100644 index 0000000000..bbdf39c589 --- /dev/null +++ b/control-plane/pkg/reconciler/sink/controller.go @@ -0,0 +1,49 @@ +/* + * Copyright 2020 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sink + +import ( + "context" + + kubeclient "knative.dev/pkg/client/injection/kube/client" + podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + + sinkreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/eventing/v1alpha1/kafkasink" + "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" +) + +func NewController(ctx context.Context, _ configmap.Watcher, configs *brokerbase.EnvConfigs) *controller.Impl { + + reconciler := &Reconciler{ + Reconciler: &base.Reconciler{ + KubeClient: kubeclient.Get(ctx), + PodLister: podinformer.Get(ctx).Lister(), + DataPlaneConfigMapNamespace: configs.DataPlaneConfigMapNamespace, + DataPlaneConfigMapName: configs.DataPlaneConfigMapName, + DataPlaneConfigFormat: configs.DataPlaneConfigFormat, + SystemNamespace: configs.SystemNamespace, + ReceiverLabel: base.SinkReceiverLabel, + }, + } + + impl := sinkreconciler.NewImpl(ctx, reconciler) + + return impl +} diff --git a/control-plane/pkg/reconciler/sink/kafka_sink.go b/control-plane/pkg/reconciler/sink/kafka_sink.go new file mode 100644 index 0000000000..8bad770c4f --- /dev/null +++ b/control-plane/pkg/reconciler/sink/kafka_sink.go @@ -0,0 +1,182 @@ +/* + * Copyright 2020 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sink + +import ( + "context" + "math" + + "github.com/Shopify/sarama" + "go.uber.org/zap" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/util/retry" + "knative.dev/pkg/controller" + "knative.dev/pkg/reconciler" + + eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" + coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" + "knative.dev/eventing-kafka-broker/control-plane/pkg/log" + "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" + "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" +) + +const ( + // TopicPrefix is the Kafka Sink topic prefix - (topic name: knative-sink--) + TopicPrefix = "knative-sink-" +) + +type Reconciler struct { + *base.Reconciler + + ConfigMapLister corelisters.ConfigMapLister + + // NewClusterAdmin creates new sarama ClusterAdmin. It's convenient to add this as Reconciler field so that we can + // mock the function used during the reconciliation loop. + NewClusterAdmin func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) + + Configs *brokerbase.EnvConfigs +} + +func (r *Reconciler) ReconcileKind(ctx context.Context, ks *eventing.KafkaSink) reconciler.Event { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + return r.reconcileKind(ctx, ks) + }) +} + +func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) error { + + logger := log.Logger(ctx, "reconcile", ks) + + config := &brokerbase.TopicConfig{ + TopicDetail: sarama.TopicDetail{ + NumPartitions: ks.Spec.NumPartitions, + ReplicationFactor: ks.Spec.ReplicationFactor, + }, + BootstrapServers: broker.BootstrapServersArray(ks.Spec.BootstrapServers), + } + + statusConditionManager := brokerbase.StatusConditionManager{ + Object: ks, + SetAddress: ks.Status.SetAddress, + Configs: r.Configs, + Recorder: controller.GetEventRecorder(ctx), + } + + kafkaClusterAdmin, err := brokerbase.GetClusterAdmin(r.NewClusterAdmin, config.BootstrapServers) + if err != nil { + return statusConditionManager.FailedToCreateTopic("", err) + } + defer kafkaClusterAdmin.Close() + + topic, err := brokerbase.CreateTopic(kafkaClusterAdmin, logger, brokerbase.Topic(TopicPrefix, ks), config) + if err != nil { + return statusConditionManager.FailedToCreateTopic(topic, err) + } + statusConditionManager.TopicCreated(topic) + + logger.Debug("Topic created", zap.Any("topic", topic)) + + // Get brokers and triggers config map. + brokersTriggersConfigMap, err := r.GetOrCreateDataPlaneConfigMap() + if err != nil { + return statusConditionManager.FailedToGetBrokersTriggersConfigMap(err) + } + + logger.Debug("Got brokers and triggers config map") + + // Get brokersTriggers data. + brokersTriggers, err := r.GetDataPlaneConfigMapData(logger, brokersTriggersConfigMap) + if err != nil && brokersTriggers == nil { + return statusConditionManager.FailedToGetBrokersTriggersDataFromConfigMap(err) + } + if brokersTriggers == nil { + brokersTriggers = &coreconfig.Brokers{} + } + + logger.Debug( + "Got brokers and triggers data from config map", + zap.Any(base.BrokersTriggersDataLogKey, log.BrokersMarshaller{Brokers: brokersTriggers}), + ) + + brokerIndex := broker.FindBroker(brokersTriggers, ks.UID) + + // Get broker configuration. + brokerConfig := &coreconfig.Broker{ + Id: string(ks.UID), + Topic: topic, + Path: brokerbase.PathFromObject(ks), + BootstrapServers: ks.Spec.BootstrapServers, + } + + // Update brokersTriggers data with the new broker configuration + if brokerIndex != broker.NoBroker { + brokerConfig.Triggers = brokersTriggers.Brokers[brokerIndex].Triggers + brokersTriggers.Brokers[brokerIndex] = brokerConfig + + logger.Debug("Broker exists", zap.Int("index", brokerIndex)) + + } else { + brokersTriggers.Brokers = append(brokersTriggers.Brokers, brokerConfig) + + logger.Debug("Broker doesn't exist") + } + + // Increment volumeGeneration + brokersTriggers.VolumeGeneration = incrementVolumeGeneration(brokersTriggers.VolumeGeneration) + + // Update the configuration map with the new brokersTriggers data. + if err := r.UpdateDataPlaneConfigMap(brokersTriggers, brokersTriggersConfigMap); err != nil { + return err + } + statusConditionManager.BrokersTriggersConfigMapUpdated() + + logger.Debug("Brokers and triggers config map updated") + + // After #37 we reject events to a non-existing Broker, which means that we cannot consider a Broker Ready if all + // receivers haven't got the Broker, so update failures to receiver pods is a hard failure. + // On the other side, dispatcher pods care about Triggers, and the Broker object is used as a configuration + // prototype for all associated Triggers, so we consider that it's fine on the dispatcher side to receive eventually + // the update even if here eventually means seconds or minutes after the actual update. + + // Update volume generation annotation of receiver pods + if err := r.UpdateReceiverPodsAnnotation(logger, brokersTriggers.VolumeGeneration); err != nil { + return err + } + + logger.Debug("Updated receiver pod annotation") + + return statusConditionManager.Reconciled() +} + +func (r *Reconciler) FinalizeKind(ctx context.Context, ks *eventing.KafkaSink) reconciler.Event { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + return r.finalizeKind(ctx, ks) + }) +} + +func (r *Reconciler) finalizeKind(ctx context.Context, ks *eventing.KafkaSink) error { + + // logger := log.Logger(ctx, "finalize", ks) + + // TODO implement finalizer + panic("implement me") +} + +func incrementVolumeGeneration(generation uint64) uint64 { + return (generation + 1) % (math.MaxUint64 - 1) +} diff --git a/control-plane/pkg/reconciler/testing/factory.go b/control-plane/pkg/reconciler/testing/factory.go index 90dac4c60d..a90df40c9f 100644 --- a/control-plane/pkg/reconciler/testing/factory.go +++ b/control-plane/pkg/reconciler/testing/factory.go @@ -34,6 +34,7 @@ import ( . "knative.dev/pkg/reconciler/testing" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" ) @@ -45,7 +46,7 @@ const ( var DefaultConfigs = &broker.Configs{ - EnvConfigs: broker.EnvConfigs{ + EnvConfigs: brokerbase.EnvConfigs{ DataPlaneConfigMapName: "kafka-broker-brokers-triggers", DataPlaneConfigMapNamespace: "knative-eventing", BrokerIngressName: "kafka-broker-receiver", diff --git a/control-plane/pkg/reconciler/testing/objects.go b/control-plane/pkg/reconciler/testing/objects.go index 8eb064bc15..2ac4e81804 100644 --- a/control-plane/pkg/reconciler/testing/objects.go +++ b/control-plane/pkg/reconciler/testing/objects.go @@ -36,6 +36,7 @@ import ( coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" ) @@ -73,7 +74,7 @@ func NewDispatcherPod(namespace string, annotations map[string]string) runtime.O Namespace: namespace, Annotations: annotations, Labels: map[string]string{ - "app": base.DispatcherLabel, + "app": base.BrokerDispatcherLabel, }, }, } @@ -90,7 +91,7 @@ func NewReceiverPod(namespace string, annotations map[string]string) runtime.Obj Namespace: namespace, Annotations: annotations, Labels: map[string]string{ - "app": base.ReceiverLabel, + "app": base.BrokerReceiverLabel, }, }, } @@ -260,7 +261,7 @@ func BrokerReady(broker *eventing.Broker) { func ConfigMapUpdatedReady(configs *Configs) func(broker *eventing.Broker) { return func(broker *eventing.Broker) { broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrueWithReason( - ConditionConfigMapUpdated, + brokerbase.ConditionConfigMapUpdated, fmt.Sprintf("Config map %s updated", configs.DataPlaneConfigMapAsString()), "", ) @@ -269,19 +270,19 @@ func ConfigMapUpdatedReady(configs *Configs) func(broker *eventing.Broker) { func TopicReady(broker *eventing.Broker) { broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrueWithReason( - ConditionTopicReady, - fmt.Sprintf("Topic %s created", Topic(broker)), + brokerbase.ConditionTopicReady, + fmt.Sprintf("Topic %s created", brokerbase.Topic(TopicPrefix, broker)), "", ) } func ConfigParsed(broker *eventing.Broker) { - broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrue(ConditionConfigParsed) + broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrue(brokerbase.ConditionConfigParsed) } func ConfigNotParsed(reason string) func(broker *eventing.Broker) { return func(broker *eventing.Broker) { - broker.GetConditionSet().Manage(broker.GetStatus()).MarkFalse(ConditionConfigParsed, reason, "") + broker.GetConditionSet().Manage(broker.GetStatus()).MarkFalse(brokerbase.ConditionConfigParsed, reason, "") } } @@ -295,14 +296,14 @@ func Addressable(configs *Configs) func(broker *eventing.Broker) { Path: fmt.Sprintf("/%s/%s", broker.Namespace, broker.Name), } - broker.GetConditionSet().Manage(&broker.Status).MarkTrue(ConditionAddressable) + broker.GetConditionSet().Manage(&broker.Status).MarkTrue(brokerbase.ConditionAddressable) } } func FailedToCreateTopic(broker *eventing.Broker) { broker.GetConditionSet().Manage(broker.GetStatus()).MarkFalse( - ConditionTopicReady, + brokerbase.ConditionTopicReady, fmt.Sprintf("Failed to create topic: %s", GetTopic()), "%v", fmt.Errorf("failed to create topic"), @@ -315,7 +316,7 @@ func FailedToGetConfigMap(configs *Configs) func(broker *eventing.Broker) { return func(broker *eventing.Broker) { broker.GetConditionSet().Manage(broker.GetStatus()).MarkFalse( - ConditionConfigMapUpdated, + brokerbase.ConditionConfigMapUpdated, fmt.Sprintf( "Failed to get ConfigMap: %s", configs.DataPlaneConfigMapAsString(), diff --git a/control-plane/pkg/reconciler/trigger/controller.go b/control-plane/pkg/reconciler/trigger/controller.go index bbea60a807..654769b7f0 100644 --- a/control-plane/pkg/reconciler/trigger/controller.go +++ b/control-plane/pkg/reconciler/trigger/controller.go @@ -38,7 +38,7 @@ import ( eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" - "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" ) @@ -48,7 +48,7 @@ const ( FinalizerName = "kafka.triggers.eventing.knative.dev" ) -func NewController(ctx context.Context, _ configmap.Watcher, configs *broker.EnvConfigs) *controller.Impl { +func NewController(ctx context.Context, _ configmap.Watcher, configs *brokerbase.EnvConfigs) *controller.Impl { logger := logging.FromContext(ctx) @@ -64,6 +64,8 @@ func NewController(ctx context.Context, _ configmap.Watcher, configs *broker.Env DataPlaneConfigMapName: configs.DataPlaneConfigMapName, DataPlaneConfigFormat: configs.DataPlaneConfigFormat, SystemNamespace: configs.SystemNamespace, + DispatcherLabel: base.BrokerDispatcherLabel, + ReceiverLabel: base.BrokerReceiverLabel, }, BrokerLister: brokerInformer.Lister(), EventingClient: eventingclient.Get(ctx), diff --git a/control-plane/pkg/reconciler/trigger/controller_test.go b/control-plane/pkg/reconciler/trigger/controller_test.go index 3c3c1f901c..2548963209 100644 --- a/control-plane/pkg/reconciler/trigger/controller_test.go +++ b/control-plane/pkg/reconciler/trigger/controller_test.go @@ -31,13 +31,13 @@ import ( _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake" - brokerreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" ) func TestNewController(t *testing.T) { ctx, _ := reconcilertesting.SetupFakeContext(t) - controller := NewController(ctx, configmap.NewStaticWatcher(), &brokerreconciler.EnvConfigs{}) + controller := NewController(ctx, configmap.NewStaticWatcher(), &brokerbase.EnvConfigs{}) if controller == nil { t.Error("failed to create controller: ") } diff --git a/control-plane/pkg/reconciler/trigger/trigger.go b/control-plane/pkg/reconciler/trigger/trigger.go index e02bcd93f6..ef9284a253 100644 --- a/control-plane/pkg/reconciler/trigger/trigger.go +++ b/control-plane/pkg/reconciler/trigger/trigger.go @@ -35,6 +35,7 @@ import ( coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/log" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" brokerreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" ) @@ -49,7 +50,7 @@ type Reconciler struct { EventingClient eventingclientset.Interface Resolver *resolver.URIResolver - Configs *brokerreconciler.EnvConfigs + Configs *brokerbase.EnvConfigs } func (r *Reconciler) ReconcileKind(ctx context.Context, trigger *eventing.Trigger) reconciler.Event { @@ -97,7 +98,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, trigger *eventing.Trigger zap.Any(base.BrokersTriggersDataLogKey, log.BrokersMarshaller{Brokers: brokersTriggers}), ) - brokerIndex := brokerreconciler.FindBroker(brokersTriggers, broker) + brokerIndex := brokerreconciler.FindBroker(brokersTriggers, broker.UID) if brokerIndex == brokerreconciler.NoBroker { // If the broker is not there, resources associated with the Trigger are deleted accordingly. return nil @@ -246,7 +247,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigge zap.Any(base.BrokersTriggersDataLogKey, log.BrokersMarshaller{Brokers: dataPlaneConfig}), ) - brokerIndex := brokerreconciler.FindBroker(dataPlaneConfig, broker) + brokerIndex := brokerreconciler.FindBroker(dataPlaneConfig, broker.UID) if brokerIndex == brokerreconciler.NoBroker { return statusConditionManager.brokerNotFoundInDataPlaneConfigMap() } diff --git a/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go b/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go index 9d9f968890..2b767b1e31 100644 --- a/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go +++ b/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go @@ -25,13 +25,13 @@ import ( "knative.dev/pkg/apis" "knative.dev/pkg/reconciler" - "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" ) type statusConditionManager struct { Trigger *eventing.Trigger - Configs *broker.EnvConfigs + Configs *brokerbase.EnvConfigs Recorder record.EventRecorder } diff --git a/control-plane/pkg/reconciler/trigger/trigger_test.go b/control-plane/pkg/reconciler/trigger/trigger_test.go index 7953b4b8a2..bc02c86d0b 100644 --- a/control-plane/pkg/reconciler/trigger/trigger_test.go +++ b/control-plane/pkg/reconciler/trigger/trigger_test.go @@ -40,6 +40,7 @@ import ( coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/testing" ) @@ -60,7 +61,7 @@ var ( ) func TestTriggerReconciler(t *testing.T) { - eventing.RegisterAlternateBrokerConditionSet(broker.ConditionSet) + eventing.RegisterAlternateBrokerConditionSet(brokerbase.ConditionSet) t.Parallel() @@ -89,7 +90,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), }, }, }, &configs), @@ -108,7 +109,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -149,7 +150,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID + "z", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -160,7 +161,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Attributes: map[string]string{ @@ -196,7 +197,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID + "z", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -207,7 +208,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Attributes: map[string]string{ @@ -471,7 +472,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -499,7 +500,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -550,7 +551,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -579,7 +580,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -622,7 +623,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -654,7 +655,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -720,7 +721,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -749,7 +750,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: "http://example.com/1", @@ -788,7 +789,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -817,7 +818,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: "http://example.com/1", @@ -882,7 +883,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -911,7 +912,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: "http://example.com/1", @@ -940,7 +941,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -983,7 +984,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -1012,7 +1013,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: "http://example.com/1", @@ -1041,7 +1042,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -1241,7 +1242,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -1270,7 +1271,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -1313,7 +1314,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: "http://example.com/3", @@ -1338,7 +1339,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -1392,7 +1393,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -1421,7 +1422,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: "http://example.com/1", @@ -1464,7 +1465,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -1493,7 +1494,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: "http://example.com/1", @@ -1543,7 +1544,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -1572,7 +1573,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: "http://example.com/1", @@ -1601,7 +1602,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -1644,7 +1645,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -1673,7 +1674,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID, Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: "http://example.com/1", @@ -1698,7 +1699,7 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { { Id: BrokerUUID + "a", Topic: GetTopic(), - Path: broker.Path(BrokerNamespace, BrokerName), + Path: brokerbase.Path(BrokerNamespace, BrokerName), Triggers: []*coreconfig.Trigger{ { Destination: ServiceURL, @@ -1758,6 +1759,8 @@ func useTable(t *testing.T, table TableTest, configs *broker.Configs) { DataPlaneConfigMapName: configs.DataPlaneConfigMapName, DataPlaneConfigFormat: configs.DataPlaneConfigFormat, SystemNamespace: configs.SystemNamespace, + DispatcherLabel: base.BrokerDispatcherLabel, + ReceiverLabel: base.BrokerReceiverLabel, }, BrokerLister: listers.GetBrokerLister(), EventingClient: eventingclient.Get(ctx), diff --git a/test/cmd/watch-cm/main.go b/test/cmd/watch-cm/main.go index 3124531c84..48e0d04c97 100644 --- a/test/cmd/watch-cm/main.go +++ b/test/cmd/watch-cm/main.go @@ -22,13 +22,13 @@ import ( "github.com/kelseyhightower/envconfig" "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" testobservability "knative.dev/eventing-kafka-broker/test/pkg/observability" ) func main() { - envConfig := &broker.EnvConfigs{} + envConfig := &brokerbase.EnvConfigs{} if err := envconfig.Process("", envConfig); err != nil { log.Fatal("failed to process env config", err) diff --git a/test/e2e/broker_trigger_test.go b/test/e2e/broker_trigger_test.go index 85a769e621..88dbf479ae 100644 --- a/test/e2e/broker_trigger_test.go +++ b/test/e2e/broker_trigger_test.go @@ -31,6 +31,7 @@ import ( "knative.dev/eventing/test/lib/resources" duckv1 "knative.dev/pkg/apis/duck/v1" + brokerbase "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" kafkatest "knative.dev/eventing-kafka-broker/test/pkg/kafka" @@ -141,7 +142,7 @@ func TestBrokerTrigger(t *testing.T) { BootstrapServers: bootstrapServers, ReplicationFactor: defaultReplicationFactor, NumPartitions: defaultNumPartitions, - Topic: broker.Topic(br), + Topic: brokerbase.Topic(broker.TopicPrefix, br), } err := kafkatest.VerifyNumPartitionAndReplicationFactor( @@ -251,7 +252,7 @@ func TestBrokerWithConfig(t *testing.T) { BootstrapServers: bootstrapServers, ReplicationFactor: replicationFactor, NumPartitions: numPartitions, - Topic: broker.Topic(br), + Topic: brokerbase.Topic(broker.TopicPrefix, br), } err := kafkatest.VerifyNumPartitionAndReplicationFactor(