From 5088f85226fae6c0e7e8787fb54cb6922ad95001 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 20 Jun 2023 17:18:09 -0400 Subject: [PATCH 01/19] Updated broker to use broker topic name template --- control-plane/pkg/reconciler/broker/broker.go | 20 ++++++++++++------- .../pkg/reconciler/broker/broker_test.go | 17 ++++++++++------ .../pkg/reconciler/broker/controller.go | 3 +++ .../reconciler/broker/namespaced_broker.go | 4 ++++ .../broker/namespaced_broker_test.go | 6 +++++- .../broker/namespaced_controller.go | 2 ++ .../pkg/reconciler/testing/objects_broker.go | 20 ++++++++++++++++--- test/rekt/features/broker_config.go | 15 +++++++------- test/rekt/features/kafka_sink.go | 19 +++++++++--------- 9 files changed, 71 insertions(+), 35 deletions(-) diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 864eb7938f..b835b1e4c1 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -37,6 +37,7 @@ import ( "knative.dev/pkg/reconciler" "knative.dev/pkg/resolver" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" @@ -50,9 +51,6 @@ import ( ) const ( - // TopicPrefix is the Kafka Broker topic prefix - (topic name: knative-broker--). - TopicPrefix = "knative-broker-" - // ExternalTopicAnnotation for using external kafka topic for the broker ExternalTopicAnnotation = "kafka.eventing.knative.dev/external.topic" @@ -80,8 +78,9 @@ type Reconciler struct { BootstrapServers string - Prober prober.Prober - Counter *counter.Counter + Prober prober.Prober + Counter *counter.Counter + KafkaFeatureFlags *apisconfig.KafkaFeatureFlags } func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { @@ -311,7 +310,10 @@ func (r *Reconciler) reconcileBrokerTopic(broker *eventing.Broker, securityOptio } } else { // no external topic, we create it - topicName = kafka.BrokerTopic(TopicPrefix, broker) + topicName, err = r.KafkaFeatureFlags.ExecuteBrokersTopicTemplate(broker.ObjectMeta) + if err != nil { + return "", statusConditionManager.TopicsNotPresentOrInvalidErr([]string{topicName}, err) + } topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig) if err != nil { @@ -503,7 +505,11 @@ func (r *Reconciler) finalizeNonExternalBrokerTopic(broker *eventing.Broker, sec } defer kafkaClusterAdminClient.Close() - topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, kafka.BrokerTopic(TopicPrefix, broker)) + topicName, err := r.KafkaFeatureFlags.ExecuteBrokersTopicTemplate(broker.ObjectMeta) + if err != nil { + return err + } + topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topicName) if err != nil { return err } diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index a9cf8939d2..3ac3d2e045 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -37,11 +37,13 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" "github.com/Shopify/sarama" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" clientgotesting "k8s.io/client-go/testing" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" eventing "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" @@ -98,8 +100,9 @@ var ( createTopicError = fmt.Errorf("failed to create topic") deleteTopicError = fmt.Errorf("failed to delete topic") - linear = eventingduck.BackoffPolicyLinear - exponential = eventingduck.BackoffPolicyExponential + linear = eventingduck.BackoffPolicyLinear + exponential = eventingduck.BackoffPolicyExponential + kafkaFeatureFlags = apisconfig.DefaultFeaturesConfig() ) var DefaultEnv = &config.Env{ @@ -2789,7 +2792,8 @@ func useTable(t *testing.T, table TableTest, env *config.Env) { expectedTopicDetail = td.(sarama.TopicDetail) } - expectedTopicName := fmt.Sprintf("%s%s-%s", TopicPrefix, BrokerNamespace, BrokerName) + expectedTopicName, err := kafkaFeatureFlags.ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Namespace: BrokerNamespace, Name: BrokerName}) + require.NoError(t, err, "Failed to create broker topic name from feature flags") if t, ok := row.OtherTestData[externalTopic]; ok { expectedTopicName = t.(string) } @@ -2830,9 +2834,10 @@ func useTable(t *testing.T, table TableTest, env *config.Env) { T: t, }, nil }, - Env: env, - Prober: proberMock, - Counter: counter.NewExpiringCounter(ctx), + Env: env, + Prober: proberMock, + Counter: counter.NewExpiringCounter(ctx), + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } reconciler.Tracker = &FakeTracker{} diff --git a/control-plane/pkg/reconciler/broker/controller.go b/control-plane/pkg/reconciler/broker/controller.go index aabdfccd10..3503770d9c 100644 --- a/control-plane/pkg/reconciler/broker/controller.go +++ b/control-plane/pkg/reconciler/broker/controller.go @@ -39,6 +39,7 @@ import ( podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/counter" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" @@ -56,6 +57,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E eventing.RegisterAlternateBrokerConditionSet(base.IngressConditionSet) configmapInformer := configmapinformer.Get(ctx) + featureFlags := apisconfig.DefaultFeaturesConfig() reconciler := &Reconciler{ Reconciler: &base.Reconciler{ @@ -73,6 +75,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E ConfigMapLister: configmapInformer.Lister(), Env: env, Counter: counter.NewExpiringCounter(ctx), + KafkaFeatureFlags: featureFlags, } logger := logging.FromContext(ctx) diff --git a/control-plane/pkg/reconciler/broker/namespaced_broker.go b/control-plane/pkg/reconciler/broker/namespaced_broker.go index 2c3679eaaf..e03b03d689 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_broker.go +++ b/control-plane/pkg/reconciler/broker/namespaced_broker.go @@ -46,6 +46,7 @@ import ( brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/counter" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" @@ -86,6 +87,8 @@ type NamespacedReconciler struct { ManifestivalClient mf.Client DataplaneLifecycleLocksByNamespace util.LockMap[string] + + KafkaFeatureFlags *apisconfig.KafkaFeatureFlags } func (r *NamespacedReconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { @@ -238,6 +241,7 @@ func (r *NamespacedReconciler) createReconcilerForBrokerInstance(broker *eventin BootstrapServers: r.BootstrapServers, Prober: r.Prober, Counter: r.Counter, + KafkaFeatureFlags: r.KafkaFeatureFlags, } } diff --git a/control-plane/pkg/reconciler/broker/namespaced_broker_test.go b/control-plane/pkg/reconciler/broker/namespaced_broker_test.go index e7ae4a044c..e13b818b42 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_broker_test.go +++ b/control-plane/pkg/reconciler/broker/namespaced_broker_test.go @@ -40,6 +40,7 @@ import ( "github.com/Shopify/sarama" "github.com/manifestival/client-go-client" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -59,6 +60,7 @@ import ( brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/receiver" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" @@ -452,7 +454,8 @@ func useTableNamespaced(t *testing.T, table TableTest, env *config.Env) { expectedTopicDetail = td.(sarama.TopicDetail) } - expectedTopicName := fmt.Sprintf("%s%s-%s", TopicPrefix, BrokerNamespace, BrokerName) + expectedTopicName, err := kafkaFeatureFlags.ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Namespace: BrokerNamespace, Name: BrokerName}) + require.NoError(t, err, "Failed to create broker topic name from feature flags") if t, ok := row.OtherTestData[externalTopic]; ok { expectedTopicName = t.(string) } @@ -506,6 +509,7 @@ func useTableNamespaced(t *testing.T, table TableTest, env *config.Env) { Prober: proberMock, ManifestivalClient: mfcMockClient, DataplaneLifecycleLocksByNamespace: util.NewExpiringLockMap[string](ctx, time.Minute*30), + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } r := brokerreconciler.NewReconciler( diff --git a/control-plane/pkg/reconciler/broker/namespaced_controller.go b/control-plane/pkg/reconciler/broker/namespaced_controller.go index 22b2ec85b5..73d8b267a3 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_controller.go +++ b/control-plane/pkg/reconciler/broker/namespaced_controller.go @@ -56,6 +56,7 @@ import ( serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount" clusterrolebindinginformer "knative.dev/pkg/client/injection/kube/informers/rbac/v1/clusterrolebinding" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" @@ -105,6 +106,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env Counter: counter.NewExpiringCounter(ctx), ManifestivalClient: mfc, DataplaneLifecycleLocksByNamespace: util.NewExpiringLockMap[string](ctx, time.Minute*30), + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } impl := brokerreconciler.NewImpl(ctx, reconciler, kafka.NamespacedBrokerClass, func(impl *controller.Impl) controller.Options { diff --git a/control-plane/pkg/reconciler/testing/objects_broker.go b/control-plane/pkg/reconciler/testing/objects_broker.go index ac5ba145d5..97f72f2a81 100644 --- a/control-plane/pkg/reconciler/testing/objects_broker.go +++ b/control-plane/pkg/reconciler/testing/objects_broker.go @@ -38,6 +38,7 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/security" @@ -57,11 +58,12 @@ const ( ) var ( - BrokerTopics = []string{fmt.Sprintf("%s%s-%s", TopicPrefix, BrokerNamespace, BrokerName)} + kafkaFeatureFlags = apisconfig.DefaultFeaturesConfig() + BrokerTopics = []string{getKafkaTopic()} ) func BrokerTopic() string { - return fmt.Sprintf("%s%s-%s", TopicPrefix, BrokerNamespace, BrokerName) + return getKafkaTopic() } // NewBroker creates a new Broker with broker class equals to kafka.BrokerClass. @@ -310,7 +312,11 @@ func StatusBrokerConfigMapUpdatedReady(env *config.Env) func(broker *eventing.Br } func StatusBrokerTopicReady(broker *eventing.Broker) { - StatusTopicReadyWithName(kafka.BrokerTopic(TopicPrefix, broker))(broker) + topicName, err := kafkaFeatureFlags.ExecuteBrokersTopicTemplate(broker.ObjectMeta) + if err != nil { + panic("Failed to create broker topic name") + } + StatusTopicReadyWithName(topicName)(broker) } func StatusExternalBrokerTopicReady(topic string) func(broker *eventing.Broker) { @@ -489,3 +495,11 @@ func BrokerConfigMapSecretAnnotation(name string) reconcilertesting.BrokerOption broker.Status.Annotations[security.AuthSecretNameKey] = name } } + +func getKafkaTopic() string { + topicName, err := kafkaFeatureFlags.ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Namespace: BrokerNamespace, Name: BrokerName}) + if err != nil { + panic("Failed to create broker topic name") + } + return topicName +} diff --git a/test/rekt/features/broker_config.go b/test/rekt/features/broker_config.go index c799716c54..fbe0d65b28 100644 --- a/test/rekt/features/broker_config.go +++ b/test/rekt/features/broker_config.go @@ -17,15 +17,14 @@ package features import ( - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" + "context" testpkg "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/feature" - brokerreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic" - eventing "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/test/rekt/resources/broker" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -53,11 +52,11 @@ func BrokerWithCustomReplicationFactorAndNumPartitions(env environment.Environme )) f.Setup("Broker ready", broker.IsReady(brokerName)) - topic := kafka.BrokerTopic(brokerreconciler.TopicPrefix, &eventing.Broker{ - ObjectMeta: metav1.ObjectMeta{ - Name: brokerName, - Namespace: env.Namespace(), - }, + topic, err := apisconfig.DefaultFeaturesConfig().ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Name: brokerName, Namespace: env.Namespace()}) + f.Assert("No error creating broker topic", func(ctx context.Context, t feature.T) { + if err != nil { + t.Fatal(err) + } }) f.Setup("Topic is ready", kafkatopic.IsReady(topic)) diff --git a/test/rekt/features/kafka_sink.go b/test/rekt/features/kafka_sink.go index 50646ed6c5..45b457ca9f 100644 --- a/test/rekt/features/kafka_sink.go +++ b/test/rekt/features/kafka_sink.go @@ -17,6 +17,7 @@ package features import ( + "context" "fmt" "time" @@ -24,7 +25,7 @@ import ( "github.com/google/uuid" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/eventing/test/rekt/resources/trigger" "knative.dev/reconciler-test/pkg/environment" @@ -34,8 +35,6 @@ import ( cetest "github.com/cloudevents/sdk-go/v2/test" - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" - brokerreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" testpkg "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic" @@ -63,13 +62,6 @@ func BrokerWithTriggersAndKafkaSink(env environment.Environment) *feature.Featur sink := feature.MakeRandomK8sName("sink") verifyMessagesJobName := feature.MakeRandomK8sName("verify-messages-job") - topic := kafka.BrokerTopic(brokerreconciler.TopicPrefix, &eventingv1.Broker{ - ObjectMeta: metav1.ObjectMeta{ - Name: brokerName, - Namespace: env.Namespace(), - }, - }) - trigger1FilterType := "trigger-1" trigger2FilterType := "trigger-2" @@ -84,6 +76,13 @@ func BrokerWithTriggersAndKafkaSink(env environment.Environment) *feature.Featur eventTrigger2.SetType(trigger2FilterType) f := feature.NewFeatureNamed("Trigger with KafkaSink") + topic, err := apisconfig.DefaultFeaturesConfig().ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Name: brokerName, Namespace: env.Namespace()}) + + f.Assert("No error creating broker topic", func(ctx context.Context, t feature.T) { + if err != nil { + t.Fatal(err) + } + }) f.Setup("Install broker", broker.Install(brokerName, broker.WithEnvConfig()...)) f.Setup("Broker is ready", broker.IsReady(brokerName)) From d1efe2d7d31abb72e34077f66120b04c2ee86a9d Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 21 Jun 2023 11:14:00 -0400 Subject: [PATCH 02/19] Updated broker reconciliation to onlyuse topic name template if no existing topic name for broker --- control-plane/pkg/reconciler/broker/broker.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index b835b1e4c1..0ae7222467 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -310,9 +310,14 @@ func (r *Reconciler) reconcileBrokerTopic(broker *eventing.Broker, securityOptio } } else { // no external topic, we create it - topicName, err = r.KafkaFeatureFlags.ExecuteBrokersTopicTemplate(broker.ObjectMeta) - if err != nil { - return "", statusConditionManager.TopicsNotPresentOrInvalidErr([]string{topicName}, err) + var existingTopic bool + // check if the broker already has reconciled with a topic name and use that if it exists + // otherwise, we create a new topic name from the broker topic template + if topicName, existingTopic = broker.Status.Annotations[kafka.TopicAnnotation]; !existingTopic { + topicName, err = r.KafkaFeatureFlags.ExecuteBrokersTopicTemplate(broker.ObjectMeta) + if err != nil { + return "", statusConditionManager.TopicsNotPresentOrInvalidErr([]string{topicName}, err) + } } topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig) From 1a72d4c9d40c8657b7cdbddfb6155c9d3a7d67fb Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 21 Jun 2023 11:31:09 -0400 Subject: [PATCH 03/19] Fixed import styling --- test/rekt/features/broker_config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/test/rekt/features/broker_config.go b/test/rekt/features/broker_config.go index fbe0d65b28..c2bbc06cfc 100644 --- a/test/rekt/features/broker_config.go +++ b/test/rekt/features/broker_config.go @@ -18,6 +18,7 @@ package features import ( "context" + testpkg "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/feature" From 588b85b821b833c7e0ceb23d7c698eb81d2665b4 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 21 Jun 2023 15:56:07 -0400 Subject: [PATCH 04/19] Updated channel to use topic template --- .../pkg/reconciler/channel/channel.go | 22 ++++++++++++++----- .../pkg/reconciler/channel/channel_test.go | 6 +++-- .../pkg/reconciler/channel/controller.go | 2 ++ .../pkg/reconciler/channel/v2/channelv2.go | 15 +++++++++---- .../reconciler/channel/v2/channelv2_test.go | 2 ++ .../pkg/reconciler/testing/objects_channel.go | 9 +++++--- 6 files changed, 42 insertions(+), 14 deletions(-) diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index f06f5fd5e4..48f9430bcd 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -48,6 +48,7 @@ import ( messagingv1beta1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" @@ -61,7 +62,6 @@ import ( const ( // TopicPrefix is the Kafka Channel topic prefix - (topic name: knative-messaging-kafka..). - TopicPrefix = "knative-messaging-kafka" DefaultDeliveryOrder = contract.DeliveryOrder_ORDERED NewChannelIngressServiceName = "kafka-channel-ingress" ) @@ -92,6 +92,8 @@ type Reconciler struct { Prober prober.Prober IngressHost string + + KafkaFeatureFlags *apisconfig.KafkaFeatureFlags } func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta1.KafkaChannel) reconciler.Event { @@ -161,7 +163,10 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta return fmt.Errorf("failed to track secret: %w", err) } - topicName := kafka.ChannelTopic(TopicPrefix, channel) + topicName, err := r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) + if err != nil { + return err + } kafkaClusterAdminSaramaConfig, err := kafka.GetSaramaConfig(saramaSecurityOption) if err != nil { @@ -432,7 +437,11 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1 } defer kafkaClusterAdminClient.Close() - topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, kafka.ChannelTopic(TopicPrefix, channel)) + topicName, err := r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) + if err != nil { + return err + } + topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topicName) if err != nil { return err } @@ -503,9 +512,12 @@ func (r *Reconciler) reconcileInitialOffset(ctx context.Context, channel *messag return nil } - topicName := kafka.ChannelTopic(TopicPrefix, channel) + topicName, err := r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) + if err != nil { + return err + } groupID := consumerGroup(channel, sub) - _, err := r.InitOffsetsFunc(ctx, kafkaClient, kafkaClusterAdmin, []string{topicName}, groupID) + _, err = r.InitOffsetsFunc(ctx, kafkaClient, kafkaClusterAdmin, []string{topicName}, groupID) return err } diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index 079413cd17..16cd7d649e 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -19,6 +19,7 @@ package channel_test import ( "context" "fmt" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "strconv" "testing" @@ -1806,8 +1807,9 @@ func useTable(t *testing.T, table TableTest, env config.Env) { T: t, }, nil }, - Prober: proberMock, - IngressHost: network.GetServiceHostname(env.IngressName, env.SystemNamespace), + Prober: proberMock, + IngressHost: network.GetServiceHostname(env.IngressName, env.SystemNamespace), + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } reconciler.Tracker = &FakeTracker{} diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index 9da869dd40..71f1184a90 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -43,6 +43,7 @@ import ( "knative.dev/pkg/network" "knative.dev/pkg/resolver" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" @@ -74,6 +75,7 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl { Env: configs, ConfigMapLister: configmapInformer.Lister(), ServiceLister: serviceInformer.Lister(), + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } logger := logging.FromContext(ctx) diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2.go b/control-plane/pkg/reconciler/channel/v2/channelv2.go index 35b56451af..9b632edb7d 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2.go @@ -59,6 +59,7 @@ import ( channelreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel" "knative.dev/eventing-kafka-broker/control-plane/pkg/security" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" internalsclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/clientset/versioned" internalslst "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/listers/eventing/v1alpha1" @@ -69,8 +70,6 @@ import ( ) const ( - // TopicPrefix is the Kafka Channel topic prefix - (topic name: knative-messaging-kafka..). - TopicPrefix = "knative-messaging-kafka" DefaultDeliveryOrder = kafkasource.Ordered KafkaChannelConditionSubscribersReady apis.ConditionType = "Subscribers" // condition is registered by controller @@ -106,6 +105,7 @@ type Reconciler struct { ConsumerGroupLister internalslst.ConsumerGroupLister InternalsClient internalsclient.Interface + KafkaFeatureFlags *apisconfig.KafkaFeatureFlags } func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta1.KafkaChannel) reconciler.Event { @@ -159,7 +159,10 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta // get security option for Sarama with secret info in it saramaSecurityOption := security.NewSaramaSecurityOptionFromSecret(authContext.VirtualSecret) - topicName := kafka.ChannelTopic(TopicPrefix, channel) + topicName, err := r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) + if err != nil { + return err + } kafkaClusterAdminSaramaConfig, err := kafka.GetSaramaConfig(saramaSecurityOption) if err != nil { @@ -426,7 +429,11 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1 } defer kafkaClusterAdminClient.Close() - topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, kafka.ChannelTopic(TopicPrefix, channel)) + topicName, err := r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) + if err != nil { + return err + } + topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topicName) if err != nil { return err } diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go index eb5887b850..991c919599 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go @@ -37,6 +37,7 @@ import ( "knative.dev/pkg/network" . "knative.dev/pkg/reconciler/testing" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" kafkasource "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" @@ -1653,6 +1654,7 @@ func TestReconcileKind(t *testing.T) { InternalsClient: fakeconsumergroupinformer.Get(ctx), Prober: proberMock, IngressHost: network.GetServiceHostname(env.IngressName, env.SystemNamespace), + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } reconciler.Tracker = &FakeTracker{} reconciler.Tracker = &FakeTracker{} diff --git a/control-plane/pkg/reconciler/testing/objects_channel.go b/control-plane/pkg/reconciler/testing/objects_channel.go index 2a7b6127bc..c20d223248 100644 --- a/control-plane/pkg/reconciler/testing/objects_channel.go +++ b/control-plane/pkg/reconciler/testing/objects_channel.go @@ -32,14 +32,13 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/network" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" messagingv1beta1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" - . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel" subscriptionv1 "knative.dev/eventing/pkg/reconciler/testing/v1" ) @@ -62,7 +61,11 @@ const ( func ChannelTopic() string { c := NewChannel() - return kafka.ChannelTopic(TopicPrefix, c) + topicName, err := apisconfig.DefaultFeaturesConfig().ExecuteChannelsTopicTemplate(c.ObjectMeta) + if err != nil { + panic("Failed to create channel topic name") + } + return topicName } func NewChannel(options ...KRShapedOption) *messagingv1beta1.KafkaChannel { From eaabca289747159c07e3445d75a63b751476193d Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Thu, 22 Jun 2023 14:34:50 -0400 Subject: [PATCH 05/19] Updated channels to use topic annotations --- control-plane/pkg/apis/config/features.go | 9 +++++---- .../pkg/apis/config/features_test.go | 4 ++-- .../pkg/reconciler/channel/channel.go | 16 +++++++++++++--- .../pkg/reconciler/channel/channel_test.go | 17 +++++++++++++++++ .../pkg/reconciler/channel/v2/channelv2.go | 16 +++++++++++++--- .../reconciler/channel/v2/channelv2_test.go | 19 +++++++++++++++++++ .../pkg/reconciler/testing/objects_channel.go | 11 +++++++++++ 7 files changed, 80 insertions(+), 12 deletions(-) diff --git a/control-plane/pkg/apis/config/features.go b/control-plane/pkg/apis/config/features.go index 34032d6fc3..369644b3bb 100644 --- a/control-plane/pkg/apis/config/features.go +++ b/control-plane/pkg/apis/config/features.go @@ -58,7 +58,8 @@ var ( func init() { DefaultTriggersConsumerGroupTemplate, _ = template.New("triggers.consumergroup.template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}") DefaultBrokersTopicTemplate, _ = template.New("brokers.topic.template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}") - DefaultChannelsTopicTemplate, _ = template.New("channels.topic.template").Parse("knative-channel-{{ .Namespace }}-{{ .Name }}") + // This will resolve to the old naming convention, to prevent errors switching over to the new topic templates approach + DefaultChannelsTopicTemplate, _ = template.New("channels.topic.template").Parse("knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}") } func DefaultFeaturesConfig() *KafkaFeatureFlags { @@ -74,8 +75,8 @@ func DefaultFeaturesConfig() *KafkaFeatureFlags { } } -// newFeaturesConfigFromMap creates a Features from the supplied Map -func newFeaturesConfigFromMap(cm *corev1.ConfigMap) (*KafkaFeatureFlags, error) { +// NewFeaturesConfigFromMap creates a Features from the supplied Map +func NewFeaturesConfigFromMap(cm *corev1.ConfigMap) (*KafkaFeatureFlags, error) { nc := DefaultFeaturesConfig() err := configmap.Parse(cm.Data, asFlag("dispatcher.rate-limiter", &nc.features.DispatcherRateLimiter), @@ -131,7 +132,7 @@ func NewStore(ctx context.Context, onAfterStore ...func(name string, value *Kafk "config-kafka-features", logging.FromContext(ctx).Named("config-kafka-features"), configmap.Constructors{ - FlagsConfigName: newFeaturesConfigFromMap, + FlagsConfigName: NewFeaturesConfigFromMap, }, func(name string, value interface{}) { for _, f := range onAfterStore { diff --git a/control-plane/pkg/apis/config/features_test.go b/control-plane/pkg/apis/config/features_test.go index dc08600134..e6ec711d77 100644 --- a/control-plane/pkg/apis/config/features_test.go +++ b/control-plane/pkg/apis/config/features_test.go @@ -51,7 +51,7 @@ func TestFlags_IsEnabled_ContainingFlag(t *testing.T) { func TestGetFlags(t *testing.T) { _, example := cm.ConfigMapsFromTestFile(t, FlagsConfigName) - flags, err := newFeaturesConfigFromMap(example) + flags, err := NewFeaturesConfigFromMap(example) require.NoError(t, err) require.True(t, flags.IsDispatcherRateLimiterEnabled()) @@ -74,7 +74,7 @@ func TestStoreLoadWithConfigMap(t *testing.T) { store.OnConfigChanged(exampleConfig) have := FromContext(store.ToContext(context.Background())) - expected, _ := newFeaturesConfigFromMap(exampleConfig) + expected, _ := NewFeaturesConfigFromMap(exampleConfig) require.Equal(t, expected.IsDispatcherRateLimiterEnabled(), have.IsDispatcherRateLimiterEnabled()) require.Equal(t, expected.IsDispatcherOrderedExecutorMetricsEnabled(), have.IsDispatcherOrderedExecutorMetricsEnabled()) diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index 48f9430bcd..eb7ea270a0 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -163,10 +163,20 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta return fmt.Errorf("failed to track secret: %w", err) } - topicName, err := r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) - if err != nil { - return err + if channel.Status.Annotations == nil { + channel.Status.Annotations = make(map[string]string) + } + // Check if there is an existing topic name for this channel. If there is, reconcile the channel with the existing name. + // If not, create a new topic name from the channel topic name template. + var topicName string + var existingTopic bool + if topicName, existingTopic = channel.Status.Annotations[kafka.TopicAnnotation]; !existingTopic { + topicName, err = r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) + if err != nil { + return err + } } + channel.Status.Annotations[kafka.TopicAnnotation] = topicName kafkaClusterAdminSaramaConfig, err := kafka.GetSaramaConfig(saramaSecurityOption) if err != nil { diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index 16cd7d649e..68bec48948 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -194,6 +194,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -274,6 +275,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -342,6 +344,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, StatusProbeFailed(prober.StatusNotReady), @@ -411,6 +414,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, StatusProbeFailed(prober.StatusUnknown), @@ -482,6 +486,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, WithSubscribers(Subscriber1(WithFreshSubscriber, WithNoSubscriberURI)), @@ -558,6 +563,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -639,6 +645,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -720,6 +727,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -801,6 +809,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -899,6 +908,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1171,6 +1181,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1273,6 +1284,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1378,6 +1390,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1480,6 +1493,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1548,6 +1562,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1613,6 +1628,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1654,6 +1670,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusDataPlaneAvailable, StatusConfigParsed, + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusConfigMapNotUpdatedReady( "Failed to get contract data from ConfigMap: knative-eventing/kafka-channel-channels-subscriptions", diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2.go b/control-plane/pkg/reconciler/channel/v2/channelv2.go index 9b632edb7d..dea10004b0 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2.go @@ -159,10 +159,20 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta // get security option for Sarama with secret info in it saramaSecurityOption := security.NewSaramaSecurityOptionFromSecret(authContext.VirtualSecret) - topicName, err := r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) - if err != nil { - return err + if channel.Status.Annotations == nil { + channel.Status.Annotations = make(map[string]string) + } + // Check if there is an existing topic name for this channel. If there is, reconcile the channel with the existing name. + // If not, create a new topic name from the channel topic name template. + var topicName string + var existingTopic bool + if topicName, existingTopic = channel.Status.Annotations[kafka.TopicAnnotation]; !existingTopic { + topicName, err = r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) + if err != nil { + return err + } } + channel.Status.Annotations[kafka.TopicAnnotation] = topicName kafkaClusterAdminSaramaConfig, err := kafka.GetSaramaConfig(saramaSecurityOption) if err != nil { diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go index 991c919599..a6883638c5 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go @@ -218,6 +218,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -289,6 +290,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -340,6 +342,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusProbeFailed(prober.StatusNotReady), StatusChannelSubscribers(), @@ -392,6 +395,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusProbeFailed(prober.StatusUnknown), StatusChannelSubscribers(), @@ -460,6 +464,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -527,6 +532,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -594,6 +600,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -670,6 +677,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), WithSubscribers(Subscriber1(WithFreshSubscriber)), @@ -722,6 +730,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -824,6 +833,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -909,6 +919,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -1074,6 +1085,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -1171,6 +1183,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), WithSubscribers(Subscriber1(WithFreshSubscriber)), @@ -1271,6 +1284,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), WithSubscribers(Subscriber1(WithFreshSubscriber)), @@ -1367,6 +1381,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), WithSubscribers(Subscriber1(WithFreshSubscriber)), @@ -1441,6 +1456,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -1492,6 +1508,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -1541,6 +1558,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -1572,6 +1590,7 @@ func TestReconcileKind(t *testing.T) { Object: NewChannel( WithInitKafkaChannelConditions, StatusConfigParsed, + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusConfigMapNotUpdatedReady( "Failed to get contract data from ConfigMap: knative-eventing/kafka-channel-channels-subscriptions", diff --git a/control-plane/pkg/reconciler/testing/objects_channel.go b/control-plane/pkg/reconciler/testing/objects_channel.go index c20d223248..6967a86df9 100644 --- a/control-plane/pkg/reconciler/testing/objects_channel.go +++ b/control-plane/pkg/reconciler/testing/objects_channel.go @@ -34,6 +34,7 @@ import ( apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" messagingv1beta1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" @@ -223,6 +224,16 @@ func ChannelAddressable(env *config.Env) func(obj duckv1.KRShaped) { } } +func WithChannelTopicStatusAnnotation(topicName string) func(obj duckv1.KRShaped) { + return func(obj duckv1.KRShaped) { + channel := obj.(*messagingv1beta1.KafkaChannel) + if channel.Status.Annotations == nil { + channel.Status.Annotations = make(map[string]string, 1) + } + channel.Status.Annotations[kafka.TopicAnnotation] = topicName + } +} + func ChannelReference() *contract.Reference { return &contract.Reference{ Uuid: ChannelUUID, From 5efbe15b9302cb8425ea1e650ddb6838363088ab Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Thu, 22 Jun 2023 14:43:52 -0400 Subject: [PATCH 06/19] Fixed import formatting --- control-plane/pkg/reconciler/channel/channel_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index 68bec48948..2da3509325 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -19,10 +19,11 @@ package channel_test import ( "context" "fmt" - apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "strconv" "testing" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" + "github.com/Shopify/sarama" "k8s.io/utils/pointer" eventingduck "knative.dev/eventing/pkg/apis/duck/v1" From 2419d38705abdee067760729d31911d36beb9741 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Thu, 22 Jun 2023 15:56:05 -0400 Subject: [PATCH 07/19] Added tests for reconciling with custom topic templates --- .../pkg/reconciler/broker/broker_test.go | 102 +++++++++++++++++- .../testdata/config-kafka-features.yaml | 13 +++ .../pkg/reconciler/channel/channel_test.go | 97 ++++++++++++++++- .../testdata/config-kafka-features.yaml | 13 +++ .../pkg/reconciler/testing/objects_broker.go | 11 ++ .../pkg/reconciler/testing/objects_channel.go | 12 +++ 6 files changed, 245 insertions(+), 3 deletions(-) create mode 100644 control-plane/pkg/reconciler/broker/testdata/config-kafka-features.yaml create mode 100644 control-plane/pkg/reconciler/channel/testdata/config-kafka-features.yaml diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index 3ac3d2e045..f88442053c 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -19,8 +19,10 @@ package broker_test // different package name due to import cycles. (broker -> t import ( "context" "fmt" + cm "knative.dev/pkg/configmap/testing" "net/url" "testing" + "text/template" "knative.dev/eventing-kafka-broker/control-plane/pkg/counter" @@ -74,6 +76,10 @@ const ( ExpectedTopicDetail = "expectedTopicDetail" testProber = "testProber" externalTopic = "externalTopic" + + useCustomTopicTemplate = "use-custom-topic-template" + + FlagsConfigName = "config-kafka-features" ) const ( @@ -103,6 +109,8 @@ var ( linear = eventingduck.BackoffPolicyLinear exponential = eventingduck.BackoffPolicyExponential kafkaFeatureFlags = apisconfig.DefaultFeaturesConfig() + + customBrokerTopicTemplate = customTemplate() ) var DefaultEnv = &config.Env{ @@ -2138,6 +2146,81 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { ), }, }, + }, { + Name: "Reconciled normal - with custom topic template", + Objects: []runtime.Object{ + NewBroker(), + BrokerConfig(bootstrapServers, 20, 5), + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + NewService(), + BrokerReceiverPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + BrokerDispatcherPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + }, + Key: testKey, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Resources: []*contract.Resource{ + { + Uid: BrokerUUID, + Topics: []string{CustomBrokerTopic(customBrokerTopicTemplate)}, + Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)}, + BootstrapServers: bootstrapServers, + Reference: BrokerReference(), + }, + }, + Generation: 1, + }), + BrokerReceiverPodUpdate(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "1", + "annotation_to_preserve": "value_to_preserve", + }), + BrokerDispatcherPodUpdate(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "1", + "annotation_to_preserve": "value_to_preserve", + }), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewBroker( + reconcilertesting.WithInitBrokerConditions, + StatusBrokerConfigMapUpdatedReady(&env), + StatusBrokerDataPlaneAvailable, + StatusBrokerConfigParsed, + StatusExternalBrokerTopicReady(CustomBrokerTopic(customBrokerTopicTemplate)), + BrokerAddressable(&env), + StatusBrokerProbeSucceeded, + BrokerConfigMapAnnotations(), + WithTopicStatusAnnotation(CustomBrokerTopic(customBrokerTopicTemplate)), + WithBrokerAddresses([]duckv1.Addressable{ + { + Name: pointer.String("http"), + URL: brokerAddress, + }, + }), + WithBrokerAddress(duckv1.Addressable{ + Name: pointer.String("http"), + URL: brokerAddress, + }), + WithBrokerAddessable(), + ), + }, + }, + + OtherTestData: map[string]interface{}{ + useCustomTopicTemplate: true, + }, }, } @@ -2777,6 +2860,16 @@ func useTable(t *testing.T, table TableTest, env *config.Env) { ReplicationFactor: DefaultReplicationFactor, } + var featureFlags *apisconfig.KafkaFeatureFlags + var err error + if useCustomTemplate, ok := row.OtherTestData[useCustomTopicTemplate]; ok == true && useCustomTemplate == true { + _, example := cm.ConfigMapsFromTestFile(t, FlagsConfigName) + featureFlags, err = apisconfig.NewFeaturesConfigFromMap(example) + require.NoError(t, err) + } else { + featureFlags = apisconfig.DefaultFeaturesConfig() + } + var onCreateTopicError error if want, ok := row.OtherTestData[wantErrorOnCreateTopic]; ok { onCreateTopicError = want.(error) @@ -2792,7 +2885,7 @@ func useTable(t *testing.T, table TableTest, env *config.Env) { expectedTopicDetail = td.(sarama.TopicDetail) } - expectedTopicName, err := kafkaFeatureFlags.ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Namespace: BrokerNamespace, Name: BrokerName}) + expectedTopicName, err := featureFlags.ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Namespace: BrokerNamespace, Name: BrokerName, UID: BrokerUUID}) require.NoError(t, err, "Failed to create broker topic name from feature flags") if t, ok := row.OtherTestData[externalTopic]; ok { expectedTopicName = t.(string) @@ -2837,7 +2930,7 @@ func useTable(t *testing.T, table TableTest, env *config.Env) { Env: env, Prober: proberMock, Counter: counter.NewExpiringCounter(ctx), - KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), + KafkaFeatureFlags: featureFlags, } reconciler.Tracker = &FakeTracker{} @@ -2888,3 +2981,8 @@ func httpsURL(name string, namespace string) *apis.URL { Path: fmt.Sprintf("/%s/%s", namespace, name), } } + +func customTemplate() *template.Template { + brokersTemplate, _ := template.New("brokers.topic.template").Parse("custom-broker-template.{{ .Namespace }}-{{ .Name }}") + return brokersTemplate +} diff --git a/control-plane/pkg/reconciler/broker/testdata/config-kafka-features.yaml b/control-plane/pkg/reconciler/broker/testdata/config-kafka-features.yaml new file mode 100644 index 0000000000..ca4f7a4367 --- /dev/null +++ b/control-plane/pkg/reconciler/broker/testdata/config-kafka-features.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-kafka-features + namespace: knative-eventing +data: + _example: | + dispatcher.rate-limiter: "enabled" + dispatcher.ordered-executor-metrics: "enabled" + controller.autoscaler: "enabled" + triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}" + brokers.topic.template: "custom-broker-template.{{ .Namespace }}-{{ .Name }}" + channels.topic.template: "knative-channel-{{ .Namespace }}-{{ .Name }}" diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index 2da3509325..f65dbec592 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -19,8 +19,10 @@ package channel_test import ( "context" "fmt" + "github.com/stretchr/testify/require" "strconv" "testing" + "text/template" apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" @@ -39,6 +41,7 @@ import ( "k8s.io/apimachinery/pkg/types" clientgotesting "k8s.io/client-go/testing" kubeclient "knative.dev/pkg/client/injection/kube/client/fake" + cm "knative.dev/pkg/configmap/testing" "knative.dev/pkg/controller" "knative.dev/pkg/logging" . "knative.dev/pkg/reconciler/testing" @@ -68,6 +71,10 @@ const ( TestExpectedDataNumPartitions = "TestExpectedDataNumPartitions" TestExpectedReplicationFactor = "TestExpectedReplicationFactor" TestExpectedRetentionDuration = "TestExpectedRetentionDuration" + + useCustomTopicTemplate = "use-custom-topic-template" + + FlagsConfigName = "config-kafka-features" ) var finalizerUpdatedEvent = Eventf( @@ -85,6 +92,8 @@ var DefaultEnv = &config.Env{ ContractConfigMapFormat: base.Json, } +var customChannelTopicTemplate = customTemplate() + func TestReconcileKind(t *testing.T) { t.Setenv("SYSTEM_NAMESPACE", "knative-eventing") @@ -1692,6 +1701,77 @@ func TestReconcileKind(t *testing.T) { ), }, }, + { + Name: "Reconciled normal - custom topic template", + Objects: []runtime.Object{ + NewChannel(), + NewService(), + ChannelReceiverPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + ChannelDispatcherPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + NewConfigMapWithTextData(system.Namespace(), DefaultEnv.GeneralConfigMapName, map[string]string{ + kafka.BootstrapServersConfigMapKey: ChannelBootstrapServers, + }), + }, + Key: testKey, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Generation: 1, + Resources: []*contract.Resource{ + { + Uid: ChannelUUID, + Topics: []string{CustomTopic(customChannelTopicTemplate)}, + BootstrapServers: ChannelBootstrapServers, + Reference: ChannelReference(), + Ingress: &contract.Ingress{ + Host: receiver.Host(ChannelNamespace, ChannelName), + }, + }, + }, + }), + ChannelReceiverPodUpdate(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + ChannelDispatcherPodUpdate(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + }, + SkipNamespaceValidation: true, // WantCreates compare the channel namespace with configmap namespace, so skip it + WantCreates: []runtime.Object{ + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + NewPerChannelService(DefaultEnv), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewChannel( + WithInitKafkaChannelConditions, + StatusConfigParsed, + StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(CustomTopic(customChannelTopicTemplate)), + StatusTopicReadyWithName(CustomTopic(customChannelTopicTemplate)), + StatusDataPlaneAvailable, + ChannelAddressable(&env), + StatusProbeSucceeded, + ), + }, + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + OtherTestData: map[string]interface{}{ + useCustomTopicTemplate: true, + }, + }, } useTable(t, table, env) @@ -1769,6 +1849,16 @@ func useTable(t *testing.T, table TableTest, env config.Env) { proberMock = p.(prober.Prober) } + var kafkaFeatureFlags *apisconfig.KafkaFeatureFlags + var err error + if useCustomTemplate, ok := row.OtherTestData[useCustomTopicTemplate]; ok == true && useCustomTemplate == true { + _, example := cm.ConfigMapsFromTestFile(t, FlagsConfigName) + kafkaFeatureFlags, err = apisconfig.NewFeaturesConfigFromMap(example) + require.NoError(t, err) + } else { + kafkaFeatureFlags = apisconfig.DefaultFeaturesConfig() + } + numPartitions := int32(1) if v, ok := row.OtherTestData[TestExpectedDataNumPartitions]; ok { numPartitions = v.(int32) @@ -1827,7 +1917,7 @@ func useTable(t *testing.T, table TableTest, env config.Env) { }, Prober: proberMock, IngressHost: network.GetServiceHostname(env.IngressName, env.SystemNamespace), - KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), + KafkaFeatureFlags: kafkaFeatureFlags, } reconciler.Tracker = &FakeTracker{} @@ -1855,3 +1945,8 @@ func patchFinalizers() clientgotesting.PatchActionImpl { action.Patch = []byte(patch) return action } + +func customTemplate() *template.Template { + channelsTemplate, _ := template.New("channels.topic.template").Parse("custom-channel-template.{{ .Namespace }}.{{ .Name }}") + return channelsTemplate +} diff --git a/control-plane/pkg/reconciler/channel/testdata/config-kafka-features.yaml b/control-plane/pkg/reconciler/channel/testdata/config-kafka-features.yaml new file mode 100644 index 0000000000..fdc7298324 --- /dev/null +++ b/control-plane/pkg/reconciler/channel/testdata/config-kafka-features.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-kafka-features + namespace: knative-eventing +data: + _example: | + dispatcher.rate-limiter: "enabled" + dispatcher.ordered-executor-metrics: "enabled" + controller.autoscaler: "enabled" + triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}" + brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}" + channels.topic.template: "custom-channel-template.{{ .Namespace }}.{{ .Name }}" diff --git a/control-plane/pkg/reconciler/testing/objects_broker.go b/control-plane/pkg/reconciler/testing/objects_broker.go index 97f72f2a81..f3f37a992a 100644 --- a/control-plane/pkg/reconciler/testing/objects_broker.go +++ b/control-plane/pkg/reconciler/testing/objects_broker.go @@ -17,8 +17,10 @@ package testing import ( + "bytes" "fmt" "strings" + "text/template" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -66,6 +68,15 @@ func BrokerTopic() string { return getKafkaTopic() } +func CustomBrokerTopic(template *template.Template) string { + var result bytes.Buffer + err := template.Execute(&result, metav1.ObjectMeta{Namespace: BrokerNamespace, Name: BrokerName, UID: BrokerUUID}) + if err != nil { + panic("Failed to create custom topic name") + } + return result.String() +} + // NewBroker creates a new Broker with broker class equals to kafka.BrokerClass. func NewBroker(options ...reconcilertesting.BrokerOption) runtime.Object { return doNewBroker(kafka.BrokerClass, options...) diff --git a/control-plane/pkg/reconciler/testing/objects_channel.go b/control-plane/pkg/reconciler/testing/objects_channel.go index 6967a86df9..1fac909fdd 100644 --- a/control-plane/pkg/reconciler/testing/objects_channel.go +++ b/control-plane/pkg/reconciler/testing/objects_channel.go @@ -17,8 +17,10 @@ package testing import ( + "bytes" "context" "fmt" + "text/template" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -69,6 +71,16 @@ func ChannelTopic() string { return topicName } +func CustomTopic(template *template.Template) string { + c := NewChannel() + var result bytes.Buffer + err := template.Execute(&result, c.ObjectMeta) + if err != nil { + panic("Failed to create custom topic name") + } + return result.String() +} + func NewChannel(options ...KRShapedOption) *messagingv1beta1.KafkaChannel { c := &messagingv1beta1.KafkaChannel{ ObjectMeta: metav1.ObjectMeta{ From c2c859193cd830018ee61256d7eb33255de56009 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Thu, 22 Jun 2023 16:01:07 -0400 Subject: [PATCH 08/19] Fixed go imports --- control-plane/pkg/reconciler/broker/broker_test.go | 3 ++- control-plane/pkg/reconciler/channel/channel_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index f88442053c..e6f3fbe033 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -19,7 +19,6 @@ package broker_test // different package name due to import cycles. (broker -> t import ( "context" "fmt" - cm "knative.dev/pkg/configmap/testing" "net/url" "testing" "text/template" @@ -30,6 +29,8 @@ import ( "k8s.io/utils/pointer" + cm "knative.dev/pkg/configmap/testing" + "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" kafkatesting "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/testing" diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index f65dbec592..5e458567dc 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -19,13 +19,14 @@ package channel_test import ( "context" "fmt" - "github.com/stretchr/testify/require" "strconv" "testing" "text/template" apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" + "github.com/stretchr/testify/require" + "github.com/Shopify/sarama" "k8s.io/utils/pointer" eventingduck "knative.dev/eventing/pkg/apis/duck/v1" From 6bb551c79c0ef1afe9d855fb87c47258b0430d7e Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 23 Jun 2023 13:27:52 -0400 Subject: [PATCH 09/19] use annotated topic names when deleting topics --- control-plane/pkg/reconciler/broker/broker.go | 6 +++--- control-plane/pkg/reconciler/channel/channel.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 0ae7222467..aeed0185df 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -510,9 +510,9 @@ func (r *Reconciler) finalizeNonExternalBrokerTopic(broker *eventing.Broker, sec } defer kafkaClusterAdminClient.Close() - topicName, err := r.KafkaFeatureFlags.ExecuteBrokersTopicTemplate(broker.ObjectMeta) - if err != nil { - return err + topicName, ok := broker.Status.Annotations[kafka.TopicAnnotation] + if !ok { + return fmt.Errorf("no topic annotated on broker") } topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topicName) if err != nil { diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index eb7ea270a0..27808ad73e 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -447,9 +447,9 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1 } defer kafkaClusterAdminClient.Close() - topicName, err := r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) - if err != nil { - return err + topicName, ok := channel.Status.Annotations[kafka.TopicAnnotation] + if !ok { + return fmt.Errorf("no topic annotated on channel") } topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topicName) if err != nil { From 88c5cad20e70a0b8a4e29d8fdb09ef3f9f6ef239 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 23 Jun 2023 13:33:57 -0400 Subject: [PATCH 10/19] switched from failing tests to using panics if topic names not setup properly --- test/rekt/features/broker_config.go | 8 +++----- test/rekt/features/kafka_sink.go | 10 +++------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/test/rekt/features/broker_config.go b/test/rekt/features/broker_config.go index c2bbc06cfc..41f434f3b4 100644 --- a/test/rekt/features/broker_config.go +++ b/test/rekt/features/broker_config.go @@ -54,11 +54,9 @@ func BrokerWithCustomReplicationFactorAndNumPartitions(env environment.Environme f.Setup("Broker ready", broker.IsReady(brokerName)) topic, err := apisconfig.DefaultFeaturesConfig().ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Name: brokerName, Namespace: env.Namespace()}) - f.Assert("No error creating broker topic", func(ctx context.Context, t feature.T) { - if err != nil { - t.Fatal(err) - } - }) + if err != nil { + panic("failed to create broker topic name") + } f.Setup("Topic is ready", kafkatopic.IsReady(topic)) f.Assert("Replication factor", kafkatopic.HasReplicationFactor(topic, replicationFactor)) diff --git a/test/rekt/features/kafka_sink.go b/test/rekt/features/kafka_sink.go index 45b457ca9f..48bd4d4086 100644 --- a/test/rekt/features/kafka_sink.go +++ b/test/rekt/features/kafka_sink.go @@ -17,7 +17,6 @@ package features import ( - "context" "fmt" "time" @@ -77,12 +76,9 @@ func BrokerWithTriggersAndKafkaSink(env environment.Environment) *feature.Featur f := feature.NewFeatureNamed("Trigger with KafkaSink") topic, err := apisconfig.DefaultFeaturesConfig().ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Name: brokerName, Namespace: env.Namespace()}) - - f.Assert("No error creating broker topic", func(ctx context.Context, t feature.T) { - if err != nil { - t.Fatal(err) - } - }) + if err != nil { + panic("failed to create broker topic name") + } f.Setup("Install broker", broker.Install(brokerName, broker.WithEnvConfig()...)) f.Setup("Broker is ready", broker.IsReady(brokerName)) From 8df556e8bded0c01142885309d53c277fa2906af Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 23 Jun 2023 13:39:54 -0400 Subject: [PATCH 11/19] updated kafka feature yaml files to match channel topic template --- .../200-controller/100-config-kafka-features.yaml | 2 +- control-plane/pkg/apis/config/features_test.go | 2 +- .../pkg/apis/config/testdata/config-kafka-features.yaml | 2 +- test/config/100-config-kafka-features.yaml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-features.yaml b/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-features.yaml index df8780a26d..c1b6babd13 100644 --- a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-features.yaml +++ b/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-features.yaml @@ -47,4 +47,4 @@ data: controller.autoscaler: "disabled" triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}" brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}" - channels.topic.template: "knative-channel-{{ .Namespace }}-{{ .Name }}" + channels.topic.template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}" diff --git a/control-plane/pkg/apis/config/features_test.go b/control-plane/pkg/apis/config/features_test.go index e6ec711d77..a13aede852 100644 --- a/control-plane/pkg/apis/config/features_test.go +++ b/control-plane/pkg/apis/config/features_test.go @@ -166,7 +166,7 @@ func TestExecuteChannelsTopicTemplateDefault(t *testing.T) { require.NoError(t, err) } - require.Equal(t, result, "knative-channel-namespace-topic") + require.Equal(t, result, "knative-messaging-kafka.namespace.topic") } func TestExecuteChannelsTopicTemplateOverride(t *testing.T) { diff --git a/control-plane/pkg/apis/config/testdata/config-kafka-features.yaml b/control-plane/pkg/apis/config/testdata/config-kafka-features.yaml index 5f99736b6d..7546aee0c1 100644 --- a/control-plane/pkg/apis/config/testdata/config-kafka-features.yaml +++ b/control-plane/pkg/apis/config/testdata/config-kafka-features.yaml @@ -10,4 +10,4 @@ data: controller.autoscaler: "enabled" triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}" brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}" - channels.topic.template: "knative-channel-{{ .Namespace }}-{{ .Name }}" + channels.topic.template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}" diff --git a/test/config/100-config-kafka-features.yaml b/test/config/100-config-kafka-features.yaml index 57365e5f90..0f3a495aed 100644 --- a/test/config/100-config-kafka-features.yaml +++ b/test/config/100-config-kafka-features.yaml @@ -9,4 +9,4 @@ data: controller.autoscaler: "enabled" triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}" brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}" - channels.topic.template: "knative-channel-{{ .Namespace }}-{{ .Name }}" + channels.topic.template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}" From a575b5fa121a6a2b63438d7cbe13b7e02524f609 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 23 Jun 2023 14:46:19 -0400 Subject: [PATCH 12/19] updated kafka feature default vars to be private --- control-plane/pkg/apis/config/features.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/control-plane/pkg/apis/config/features.go b/control-plane/pkg/apis/config/features.go index 369644b3bb..ee954548e4 100644 --- a/control-plane/pkg/apis/config/features.go +++ b/control-plane/pkg/apis/config/features.go @@ -50,16 +50,16 @@ type KafkaFeatureFlags struct { } var ( - DefaultTriggersConsumerGroupTemplate *template.Template - DefaultBrokersTopicTemplate *template.Template - DefaultChannelsTopicTemplate *template.Template + defaultTriggersConsumerGroupTemplate *template.Template + defaultBrokersTopicTemplate *template.Template + defaultChannelsTopicTemplate *template.Template ) func init() { - DefaultTriggersConsumerGroupTemplate, _ = template.New("triggers.consumergroup.template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}") - DefaultBrokersTopicTemplate, _ = template.New("brokers.topic.template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}") + defaultTriggersConsumerGroupTemplate, _ = template.New("triggers.consumergroup.template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}") + defaultBrokersTopicTemplate, _ = template.New("brokers.topic.template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}") // This will resolve to the old naming convention, to prevent errors switching over to the new topic templates approach - DefaultChannelsTopicTemplate, _ = template.New("channels.topic.template").Parse("knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}") + defaultChannelsTopicTemplate, _ = template.New("channels.topic.template").Parse("knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}") } func DefaultFeaturesConfig() *KafkaFeatureFlags { @@ -68,9 +68,9 @@ func DefaultFeaturesConfig() *KafkaFeatureFlags { DispatcherRateLimiter: feature.Disabled, DispatcherOrderedExecutorMetrics: feature.Disabled, ControllerAutoscaler: feature.Disabled, - TriggersConsumerGroupTemplate: DefaultTriggersConsumerGroupTemplate, - BrokersTopicTemplate: DefaultBrokersTopicTemplate, - ChannelsTopicTemplate: DefaultChannelsTopicTemplate, + TriggersConsumerGroupTemplate: defaultTriggersConsumerGroupTemplate, + BrokersTopicTemplate: defaultBrokersTopicTemplate, + ChannelsTopicTemplate: defaultChannelsTopicTemplate, }, } } From 65dbd4a3ad651e7d8f604bdb842b2da44098a6bf Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 23 Jun 2023 15:58:20 -0400 Subject: [PATCH 13/19] updated tests --- control-plane/pkg/apis/config/features.go | 20 ++- .../pkg/reconciler/broker/broker_test.go | 33 ++--- .../broker/namespaced_broker_test.go | 2 +- .../pkg/reconciler/channel/channel_test.go | 59 ++++++--- .../reconciler/channel/v2/channelv2_test.go | 115 +++++++++++++++++- .../pkg/reconciler/testing/objects_channel.go | 27 +++- 6 files changed, 215 insertions(+), 41 deletions(-) diff --git a/control-plane/pkg/apis/config/features.go b/control-plane/pkg/apis/config/features.go index ee954548e4..d216822fbb 100644 --- a/control-plane/pkg/apis/config/features.go +++ b/control-plane/pkg/apis/config/features.go @@ -63,14 +63,28 @@ func init() { } func DefaultFeaturesConfig() *KafkaFeatureFlags { + // we need to clone the default values when creating a new features config + // otherwise, when calling NewFeaturesConfigFromMap, we will overwrite the default variables + triggersGroupTemplate, err := defaultTriggersConsumerGroupTemplate.Clone() + if err != nil { + panic("failed to clone default triggers group template") + } + brokersTopicTemplate, err := defaultBrokersTopicTemplate.Clone() + if err != nil { + panic("failed to clone default brokers topic template") + } + channelsTopicTemplate, err := defaultChannelsTopicTemplate.Clone() + if err != nil { + panic("failed to clone default channels topic template") + } return &KafkaFeatureFlags{ features: features{ DispatcherRateLimiter: feature.Disabled, DispatcherOrderedExecutorMetrics: feature.Disabled, ControllerAutoscaler: feature.Disabled, - TriggersConsumerGroupTemplate: defaultTriggersConsumerGroupTemplate, - BrokersTopicTemplate: defaultBrokersTopicTemplate, - ChannelsTopicTemplate: defaultChannelsTopicTemplate, + TriggersConsumerGroupTemplate: triggersGroupTemplate, + BrokersTopicTemplate: brokersTopicTemplate, + ChannelsTopicTemplate: channelsTopicTemplate, }, } } diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index e6f3fbe033..aed0a22887 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -29,8 +29,6 @@ import ( "k8s.io/utils/pointer" - cm "knative.dev/pkg/configmap/testing" - "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" kafkatesting "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/testing" @@ -78,9 +76,7 @@ const ( testProber = "testProber" externalTopic = "externalTopic" - useCustomTopicTemplate = "use-custom-topic-template" - - FlagsConfigName = "config-kafka-features" + kafkaFeatureFlags = "kafka-feature-flags" ) const ( @@ -107,10 +103,8 @@ var ( createTopicError = fmt.Errorf("failed to create topic") deleteTopicError = fmt.Errorf("failed to delete topic") - linear = eventingduck.BackoffPolicyLinear - exponential = eventingduck.BackoffPolicyExponential - kafkaFeatureFlags = apisconfig.DefaultFeaturesConfig() - + linear = eventingduck.BackoffPolicyLinear + exponential = eventingduck.BackoffPolicyExponential customBrokerTopicTemplate = customTemplate() ) @@ -2220,7 +2214,11 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, OtherTestData: map[string]interface{}{ - useCustomTopicTemplate: true, + kafkaFeatureFlags: newKafkaFeaturesConfigFromMap(&corev1.ConfigMap{ + Data: map[string]string{ + "brokers.topic.template": "custom-broker-template.{{ .Namespace }}-{{ .Name }}", + }, + }), }, }, } @@ -2862,11 +2860,8 @@ func useTable(t *testing.T, table TableTest, env *config.Env) { } var featureFlags *apisconfig.KafkaFeatureFlags - var err error - if useCustomTemplate, ok := row.OtherTestData[useCustomTopicTemplate]; ok == true && useCustomTemplate == true { - _, example := cm.ConfigMapsFromTestFile(t, FlagsConfigName) - featureFlags, err = apisconfig.NewFeaturesConfigFromMap(example) - require.NoError(t, err) + if v, ok := row.OtherTestData[kafkaFeatureFlags]; ok { + featureFlags = v.(*apisconfig.KafkaFeatureFlags) } else { featureFlags = apisconfig.DefaultFeaturesConfig() } @@ -2987,3 +2982,11 @@ func customTemplate() *template.Template { brokersTemplate, _ := template.New("brokers.topic.template").Parse("custom-broker-template.{{ .Namespace }}-{{ .Name }}") return brokersTemplate } + +func newKafkaFeaturesConfigFromMap(cm *corev1.ConfigMap) *apisconfig.KafkaFeatureFlags { + featureFlags, err := apisconfig.NewFeaturesConfigFromMap(cm) + if err != nil { + panic("failed to create kafka features from config map") + } + return featureFlags +} diff --git a/control-plane/pkg/reconciler/broker/namespaced_broker_test.go b/control-plane/pkg/reconciler/broker/namespaced_broker_test.go index e13b818b42..b25ac50333 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_broker_test.go +++ b/control-plane/pkg/reconciler/broker/namespaced_broker_test.go @@ -454,7 +454,7 @@ func useTableNamespaced(t *testing.T, table TableTest, env *config.Env) { expectedTopicDetail = td.(sarama.TopicDetail) } - expectedTopicName, err := kafkaFeatureFlags.ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Namespace: BrokerNamespace, Name: BrokerName}) + expectedTopicName, err := apisconfig.DefaultFeaturesConfig().ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Namespace: BrokerNamespace, Name: BrokerName}) require.NoError(t, err, "Failed to create broker topic name from feature flags") if t, ok := row.OtherTestData[externalTopic]; ok { expectedTopicName = t.(string) diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index 5e458567dc..22e595359c 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -19,14 +19,13 @@ package channel_test import ( "context" "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "strconv" "testing" "text/template" apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" - "github.com/stretchr/testify/require" - "github.com/Shopify/sarama" "k8s.io/utils/pointer" eventingduck "knative.dev/eventing/pkg/apis/duck/v1" @@ -42,7 +41,6 @@ import ( "k8s.io/apimachinery/pkg/types" clientgotesting "k8s.io/client-go/testing" kubeclient "knative.dev/pkg/client/injection/kube/client/fake" - cm "knative.dev/pkg/configmap/testing" "knative.dev/pkg/controller" "knative.dev/pkg/logging" . "knative.dev/pkg/reconciler/testing" @@ -73,9 +71,7 @@ const ( TestExpectedReplicationFactor = "TestExpectedReplicationFactor" TestExpectedRetentionDuration = "TestExpectedRetentionDuration" - useCustomTopicTemplate = "use-custom-topic-template" - - FlagsConfigName = "config-kafka-features" + kafkaFeatureFlags = "kafka-feature-flags" ) var finalizerUpdatedEvent = Eventf( @@ -123,7 +119,10 @@ func TestReconcileKind(t *testing.T) { Name: "Channel is being deleted, probe not ready", Key: testKey, Objects: []runtime.Object{ - NewChannel( + NewChannelWithAnnotations( + map[string]string{ + kafka.TopicAnnotation: defaultTopicName(), + }, WithInitKafkaChannelConditions, WithDeletedTimeStamp), NewConfigMapWithTextData(system.Namespace(), DefaultEnv.GeneralConfigMapName, map[string]string{ @@ -1770,7 +1769,11 @@ func TestReconcileKind(t *testing.T) { finalizerUpdatedEvent, }, OtherTestData: map[string]interface{}{ - useCustomTopicTemplate: true, + kafkaFeatureFlags: newKafkaFeaturesConfigFromMap(&corev1.ConfigMap{ + Data: map[string]string{ + "channels.topic.template": "custom-channel-template.{{ .Namespace }}.{{ .Name }}", + }, + }), }, }, } @@ -1791,7 +1794,9 @@ func TestFinalizeKind(t *testing.T) { { Name: "Finalize normal - no auth", Objects: []runtime.Object{ - NewDeletedChannel(), + NewDeletedChannel(map[string]string{ + kafka.TopicAnnotation: defaultTopicName(), + }), NewConfigMapFromContract(&contract.Contract{ Generation: 1, Resources: []*contract.Resource{ @@ -1850,14 +1855,11 @@ func useTable(t *testing.T, table TableTest, env config.Env) { proberMock = p.(prober.Prober) } - var kafkaFeatureFlags *apisconfig.KafkaFeatureFlags - var err error - if useCustomTemplate, ok := row.OtherTestData[useCustomTopicTemplate]; ok == true && useCustomTemplate == true { - _, example := cm.ConfigMapsFromTestFile(t, FlagsConfigName) - kafkaFeatureFlags, err = apisconfig.NewFeaturesConfigFromMap(example) - require.NoError(t, err) + var featureFlags *apisconfig.KafkaFeatureFlags + if v, ok := row.OtherTestData[kafkaFeatureFlags]; ok { + featureFlags = v.(*apisconfig.KafkaFeatureFlags) } else { - kafkaFeatureFlags = apisconfig.DefaultFeaturesConfig() + featureFlags = apisconfig.DefaultFeaturesConfig() } numPartitions := int32(1) @@ -1881,6 +1883,11 @@ func useTable(t *testing.T, table TableTest, env config.Env) { retentionMillisString := strconv.FormatInt(retentionDuration.Milliseconds(), 10) + expectedTopicName, err := featureFlags.ExecuteChannelsTopicTemplate(metav1.ObjectMeta{Name: ChannelName, Namespace: ChannelNamespace, UID: ChannelUUID}) + if err != nil { + panic("failed to create expected topic name") + } + reconciler := &Reconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -1905,7 +1912,7 @@ func useTable(t *testing.T, table TableTest, env config.Env) { }, NewKafkaClusterAdminClient: func(_ []string, _ *sarama.Config) (sarama.ClusterAdmin, error) { return &kafkatesting.MockKafkaClusterAdmin{ - ExpectedTopicName: ChannelTopic(), + ExpectedTopicName: expectedTopicName, ExpectedTopicDetail: sarama.TopicDetail{ NumPartitions: numPartitions, ReplicationFactor: replicationFactor, @@ -1918,7 +1925,7 @@ func useTable(t *testing.T, table TableTest, env config.Env) { }, Prober: proberMock, IngressHost: network.GetServiceHostname(env.IngressName, env.SystemNamespace), - KafkaFeatureFlags: kafkaFeatureFlags, + KafkaFeatureFlags: featureFlags, } reconciler.Tracker = &FakeTracker{} @@ -1947,7 +1954,23 @@ func patchFinalizers() clientgotesting.PatchActionImpl { return action } +func defaultTopicName() string { + topicName, err := apisconfig.DefaultFeaturesConfig().ExecuteChannelsTopicTemplate(metav1.ObjectMeta{Name: ChannelName, Namespace: ChannelNamespace}) + if err != nil { + panic("failed to create default channel topic name") + } + return topicName +} + func customTemplate() *template.Template { channelsTemplate, _ := template.New("channels.topic.template").Parse("custom-channel-template.{{ .Namespace }}.{{ .Name }}") return channelsTemplate } + +func newKafkaFeaturesConfigFromMap(cm *corev1.ConfigMap) *apisconfig.KafkaFeatureFlags { + featureFlags, err := apisconfig.NewFeaturesConfigFromMap(cm) + if err != nil { + panic("failed to create kafka features from config map") + } + return featureFlags +} diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go index a6883638c5..42c40e219f 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go @@ -19,8 +19,10 @@ package v2 import ( "context" "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "strconv" "testing" + "text/template" "github.com/Shopify/sarama" corev1 "k8s.io/api/core/v1" @@ -68,6 +70,8 @@ const ( TestExpectedDataNumPartitions = "TestExpectedDataNumPartitions" TestExpectedReplicationFactor = "TestExpectedReplicationFactor" TestExpectedRetentionDuration = "TestExpectedRetentionDuration" + + kafkaFeatureFlags = "kafka-feature-flags" ) var finalizerUpdatedEvent = Eventf( @@ -76,6 +80,8 @@ var finalizerUpdatedEvent = Eventf( fmt.Sprintf(`Updated %q finalizers`, ChannelName), ) +var customChannelTopicTemplate = customTemplate() + var DefaultEnv = &config.Env{ DataPlaneConfigMapNamespace: "knative-eventing", ContractConfigMapName: "kafka-channel-channels-subscriptions", @@ -1611,6 +1617,86 @@ func TestReconcileKind(t *testing.T) { ), }, }, + { + Name: "Reconciled normal - with custom template", + Objects: []runtime.Object{ + NewChannel( + WithChannelDelivery(&eventingduck.DeliverySpec{ + DeadLetterSink: ServiceDestination, + Retry: pointer.Int32(5), + }), + ), + NewConfigMapWithTextData(env.SystemNamespace, DefaultEnv.GeneralConfigMapName, map[string]string{ + kafka.BootstrapServersConfigMapKey: ChannelBootstrapServers, + }), + ChannelReceiverPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + }, + Key: testKey, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Generation: 1, + Resources: []*contract.Resource{ + { + Uid: ChannelUUID, + Topics: []string{CustomTopic(customChannelTopicTemplate)}, + BootstrapServers: ChannelBootstrapServers, + Reference: ChannelReference(), + Ingress: &contract.Ingress{ + Host: receiver.Host(ChannelNamespace, ChannelName), + }, + EgressConfig: &contract.EgressConfig{ + DeadLetter: ServiceURL, + Retry: 5, + }, + }, + }, + }), + ChannelReceiverPodUpdate(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + }, + SkipNamespaceValidation: true, // WantCreates compare the channel namespace with configmap namespace, so skip it + WantCreates: []runtime.Object{ + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + NewPerChannelService(&env), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewChannel( + WithChannelDelivery(&eventingduck.DeliverySpec{ + DeadLetterSink: ServiceDestination, + Retry: pointer.Int32(5), + }), + WithInitKafkaChannelConditions, + StatusConfigParsed, + StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(CustomTopic(customChannelTopicTemplate)), + StatusTopicReadyWithName(CustomTopic(customChannelTopicTemplate)), + ChannelAddressable(&env), + StatusProbeSucceeded, + StatusChannelSubscribers(), + WithChannelDeadLetterSinkURI(ServiceURL), + ), + }, + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + OtherTestData: map[string]interface{}{ + kafkaFeatureFlags: newKafkaFeaturesConfigFromMap(&corev1.ConfigMap{ + Data: map[string]string{ + "channels.topic.template": "custom-channel-template.{{ .Namespace}}.{{ .Name }}", + }, + }), + }, + }, } table.Test(t, NewFactory(&env, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler { @@ -1619,6 +1705,13 @@ func TestReconcileKind(t *testing.T) { proberMock = p.(prober.Prober) } + var featureFlags *apisconfig.KafkaFeatureFlags + if v, ok := row.OtherTestData[kafkaFeatureFlags]; ok { + featureFlags = v.(*apisconfig.KafkaFeatureFlags) + } else { + featureFlags = apisconfig.DefaultFeaturesConfig() + } + numPartitions := int32(1) if v, ok := row.OtherTestData[TestExpectedDataNumPartitions]; ok { numPartitions = v.(int32) @@ -1640,6 +1733,11 @@ func TestReconcileKind(t *testing.T) { retentionMillisString := strconv.FormatInt(retentionDuration.Milliseconds(), 10) + expectedTopicName, err := featureFlags.ExecuteChannelsTopicTemplate(metav1.ObjectMeta{Name: ChannelName, Namespace: ChannelNamespace, UID: ChannelUUID}) + if err != nil { + panic("failed to create expected topic name") + } + reconciler := &Reconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -1655,7 +1753,7 @@ func TestReconcileKind(t *testing.T) { Env: env, NewKafkaClusterAdminClient: func(_ []string, _ *sarama.Config) (sarama.ClusterAdmin, error) { return &kafkatesting.MockKafkaClusterAdmin{ - ExpectedTopicName: ChannelTopic(), + ExpectedTopicName: expectedTopicName, ExpectedTopicDetail: sarama.TopicDetail{ NumPartitions: numPartitions, ReplicationFactor: replicationFactor, @@ -1673,7 +1771,7 @@ func TestReconcileKind(t *testing.T) { InternalsClient: fakeconsumergroupinformer.Get(ctx), Prober: proberMock, IngressHost: network.GetServiceHostname(env.IngressName, env.SystemNamespace), - KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), + KafkaFeatureFlags: featureFlags, } reconciler.Tracker = &FakeTracker{} reconciler.Tracker = &FakeTracker{} @@ -1719,3 +1817,16 @@ func patchFinalizers() clientgotesting.PatchActionImpl { action.Patch = []byte(patch) return action } + +func customTemplate() *template.Template { + channelsTemplate, _ := template.New("channels.topic.template").Parse("custom-channel-template.{{ .Namespace }}.{{ .Name }}") + return channelsTemplate +} + +func newKafkaFeaturesConfigFromMap(cm *corev1.ConfigMap) *apisconfig.KafkaFeatureFlags { + featureFlags, err := apisconfig.NewFeaturesConfigFromMap(cm) + if err != nil { + panic("failed to create kafka features from config map") + } + return featureFlags +} diff --git a/control-plane/pkg/reconciler/testing/objects_channel.go b/control-plane/pkg/reconciler/testing/objects_channel.go index 1fac909fdd..8c972e52b5 100644 --- a/control-plane/pkg/reconciler/testing/objects_channel.go +++ b/control-plane/pkg/reconciler/testing/objects_channel.go @@ -81,6 +81,24 @@ func CustomTopic(template *template.Template) string { return result.String() } +func NewChannelWithAnnotations(annotations map[string]string, options ...KRShapedOption) *messagingv1beta1.KafkaChannel { + c := &messagingv1beta1.KafkaChannel{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ChannelNamespace, + Name: ChannelName, + UID: ChannelUUID, + }, + Status: messagingv1beta1.KafkaChannelStatus{ + ChannelableStatus: eventingduckv1.ChannelableStatus{ + Status: duckv1.Status{ + Annotations: annotations, + }, + }, + }, + } + return applyOptionsToChannel(c, options...) +} + func NewChannel(options ...KRShapedOption) *messagingv1beta1.KafkaChannel { c := &messagingv1beta1.KafkaChannel{ ObjectMeta: metav1.ObjectMeta{ @@ -89,6 +107,10 @@ func NewChannel(options ...KRShapedOption) *messagingv1beta1.KafkaChannel { UID: ChannelUUID, }, } + return applyOptionsToChannel(c, options...) +} + +func applyOptionsToChannel(c *messagingv1beta1.KafkaChannel, options ...KRShapedOption) *messagingv1beta1.KafkaChannel { for _, opt := range options { opt(c) } @@ -96,8 +118,9 @@ func NewChannel(options ...KRShapedOption) *messagingv1beta1.KafkaChannel { return c } -func NewDeletedChannel(options ...KRShapedOption) runtime.Object { - return NewChannel( +func NewDeletedChannel(annotations map[string]string, options ...KRShapedOption) runtime.Object { + return NewChannelWithAnnotations( + annotations, append( options, WithDeletedTimeStamp, From b9a0a93686858ae3d700a3bd720decb0a508863f Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 23 Jun 2023 16:01:39 -0400 Subject: [PATCH 14/19] Fixed goimports --- control-plane/pkg/reconciler/channel/channel_test.go | 3 ++- control-plane/pkg/reconciler/channel/v2/channelv2_test.go | 3 ++- test/rekt/features/broker_config.go | 2 -- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index 22e595359c..fdaa13e5f3 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -19,13 +19,14 @@ package channel_test import ( "context" "fmt" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "strconv" "testing" "text/template" apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/Shopify/sarama" "k8s.io/utils/pointer" eventingduck "knative.dev/eventing/pkg/apis/duck/v1" diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go index 42c40e219f..9d72bf0e75 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go @@ -19,11 +19,12 @@ package v2 import ( "context" "fmt" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "strconv" "testing" "text/template" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/Shopify/sarama" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" diff --git a/test/rekt/features/broker_config.go b/test/rekt/features/broker_config.go index 41f434f3b4..50c6fecb53 100644 --- a/test/rekt/features/broker_config.go +++ b/test/rekt/features/broker_config.go @@ -17,8 +17,6 @@ package features import ( - "context" - testpkg "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/feature" From fad1041f4420f47531d3bc43afd6a9c841366f26 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 23 Jun 2023 16:13:13 -0400 Subject: [PATCH 15/19] Fixed broker finalizer tests --- .../pkg/reconciler/broker/broker_test.go | 21 +++++++++++-------- .../pkg/reconciler/testing/objects_broker.go | 10 +++++++++ 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index aed0a22887..3a91102b25 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -2272,7 +2272,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - no DLS", Objects: []runtime.Object{ - NewDeletedBroker(), + NewDeletedBrokerWithAnnotatedTopicName(BrokerTopic()), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2299,7 +2299,8 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - no ConfigMap, rebuild from annotations", Objects: []runtime.Object{ - NewDeletedBroker( + NewDeletedBrokerWithAnnotatedTopicName( + BrokerTopic(), BrokerConfigMapAnnotations(), ), NewConfigMapFromContract(&contract.Contract{ @@ -2353,7 +2354,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled failed - probe not ready", Objects: []runtime.Object{ - NewDeletedBroker(), + NewDeletedBrokerWithAnnotatedTopicName(BrokerTopic()), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2381,7 +2382,8 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - with DLS", Objects: []runtime.Object{ - NewDeletedBroker( + NewDeletedBrokerWithAnnotatedTopicName( + BrokerTopic(), WithDelivery(), ), BrokerConfig(bootstrapServers, 20, 5), @@ -2553,7 +2555,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Failed to delete topic", Objects: []runtime.Object{ - NewDeletedBroker(), + NewDeletedBrokerWithAnnotatedTopicName(BrokerTopic()), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2589,7 +2591,8 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Config map not found - create config map", Objects: []runtime.Object{ - NewDeletedBroker( + NewDeletedBrokerWithAnnotatedTopicName( + BrokerTopic(), WithDelivery(), ), BrokerConfig(bootstrapServers, 20, 5), @@ -2608,7 +2611,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - preserve config map previous state", Objects: []runtime.Object{ - NewDeletedBroker(), + NewDeletedBrokerWithAnnotatedTopicName(BrokerTopic()), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2646,7 +2649,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - topic doesn't exist", Objects: []runtime.Object{ - NewDeletedBroker(), + NewDeletedBrokerWithAnnotatedTopicName(BrokerTopic()), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2685,7 +2688,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - no broker found in config map", Objects: []runtime.Object{ - NewDeletedBroker(), + NewDeletedBrokerWithAnnotatedTopicName(BrokerTopic()), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ diff --git a/control-plane/pkg/reconciler/testing/objects_broker.go b/control-plane/pkg/reconciler/testing/objects_broker.go index f3f37a992a..87df1e84b3 100644 --- a/control-plane/pkg/reconciler/testing/objects_broker.go +++ b/control-plane/pkg/reconciler/testing/objects_broker.go @@ -124,6 +124,16 @@ func NewDeletedBroker(options ...reconcilertesting.BrokerOption) runtime.Object ) } +func NewDeletedBrokerWithAnnotatedTopicName(topicName string, options ...reconcilertesting.BrokerOption) runtime.Object { + b := NewDeletedBroker(options...) + broker := b.(*eventing.Broker) + if broker.Status.Annotations == nil { + broker.Status.Annotations = make(map[string]string) + } + broker.Status.Annotations[kafka.TopicAnnotation] = topicName + return broker +} + func NewDeletedBrokerWithoutConfigMapAnnotations(options ...reconcilertesting.BrokerOption) runtime.Object { return NewBroker( append( From e41e196ce8cda30421912872ca551b478b2f23ad Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 26 Jun 2023 09:14:47 -0400 Subject: [PATCH 16/19] Updated unit tests --- .../pkg/reconciler/broker/broker_test.go | 24 +++++++++---------- .../broker/namespaced_broker_test.go | 5 +++- .../pkg/reconciler/channel/channel_test.go | 10 +++----- .../pkg/reconciler/testing/objects_broker.go | 10 -------- .../pkg/reconciler/testing/objects_channel.go | 23 ++---------------- 5 files changed, 21 insertions(+), 51 deletions(-) diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index 3a91102b25..3466517972 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -2272,7 +2272,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - no DLS", Objects: []runtime.Object{ - NewDeletedBrokerWithAnnotatedTopicName(BrokerTopic()), + NewDeletedBroker(WithTopicStatusAnnotation(BrokerTopic())), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2299,8 +2299,8 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - no ConfigMap, rebuild from annotations", Objects: []runtime.Object{ - NewDeletedBrokerWithAnnotatedTopicName( - BrokerTopic(), + NewDeletedBroker( + WithTopicStatusAnnotation(BrokerTopic()), BrokerConfigMapAnnotations(), ), NewConfigMapFromContract(&contract.Contract{ @@ -2354,7 +2354,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled failed - probe not ready", Objects: []runtime.Object{ - NewDeletedBrokerWithAnnotatedTopicName(BrokerTopic()), + NewDeletedBroker(WithTopicStatusAnnotation(BrokerTopic())), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2382,8 +2382,8 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - with DLS", Objects: []runtime.Object{ - NewDeletedBrokerWithAnnotatedTopicName( - BrokerTopic(), + NewDeletedBroker( + WithTopicStatusAnnotation(BrokerTopic()), WithDelivery(), ), BrokerConfig(bootstrapServers, 20, 5), @@ -2555,7 +2555,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Failed to delete topic", Objects: []runtime.Object{ - NewDeletedBrokerWithAnnotatedTopicName(BrokerTopic()), + NewDeletedBroker(WithTopicStatusAnnotation(BrokerTopic())), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2591,8 +2591,8 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Config map not found - create config map", Objects: []runtime.Object{ - NewDeletedBrokerWithAnnotatedTopicName( - BrokerTopic(), + NewDeletedBroker( + WithTopicStatusAnnotation(BrokerTopic()), WithDelivery(), ), BrokerConfig(bootstrapServers, 20, 5), @@ -2611,7 +2611,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - preserve config map previous state", Objects: []runtime.Object{ - NewDeletedBrokerWithAnnotatedTopicName(BrokerTopic()), + NewDeletedBroker(WithTopicStatusAnnotation(BrokerTopic())), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2649,7 +2649,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - topic doesn't exist", Objects: []runtime.Object{ - NewDeletedBrokerWithAnnotatedTopicName(BrokerTopic()), + NewDeletedBroker(WithTopicStatusAnnotation(BrokerTopic())), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2688,7 +2688,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - no broker found in config map", Objects: []runtime.Object{ - NewDeletedBrokerWithAnnotatedTopicName(BrokerTopic()), + NewDeletedBroker(WithTopicStatusAnnotation(BrokerTopic())), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ diff --git a/control-plane/pkg/reconciler/broker/namespaced_broker_test.go b/control-plane/pkg/reconciler/broker/namespaced_broker_test.go index b25ac50333..2b9c11439d 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_broker_test.go +++ b/control-plane/pkg/reconciler/broker/namespaced_broker_test.go @@ -343,7 +343,10 @@ func namespacedBrokerFinalization(t *testing.T, format string, env config.Env) { reconcilertesting.NewNamespace(BrokerNamespace, func(ns *corev1.Namespace) { ns.UID = BrokerNamespaceUUID }), - NewDeletedBroker(reconcilertesting.WithBrokerClass(kafka.NamespacedBrokerClass)), + NewDeletedBroker( + WithTopicStatusAnnotation(BrokerTopic()), + reconcilertesting.WithBrokerClass(kafka.NamespacedBrokerClass), + ), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index fdaa13e5f3..eb43fa72d7 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -120,10 +120,8 @@ func TestReconcileKind(t *testing.T) { Name: "Channel is being deleted, probe not ready", Key: testKey, Objects: []runtime.Object{ - NewChannelWithAnnotations( - map[string]string{ - kafka.TopicAnnotation: defaultTopicName(), - }, + NewChannel( + WithChannelTopicStatusAnnotation(defaultTopicName()), WithInitKafkaChannelConditions, WithDeletedTimeStamp), NewConfigMapWithTextData(system.Namespace(), DefaultEnv.GeneralConfigMapName, map[string]string{ @@ -1795,9 +1793,7 @@ func TestFinalizeKind(t *testing.T) { { Name: "Finalize normal - no auth", Objects: []runtime.Object{ - NewDeletedChannel(map[string]string{ - kafka.TopicAnnotation: defaultTopicName(), - }), + NewDeletedChannel(WithChannelTopicStatusAnnotation(defaultTopicName())), NewConfigMapFromContract(&contract.Contract{ Generation: 1, Resources: []*contract.Resource{ diff --git a/control-plane/pkg/reconciler/testing/objects_broker.go b/control-plane/pkg/reconciler/testing/objects_broker.go index 87df1e84b3..f3f37a992a 100644 --- a/control-plane/pkg/reconciler/testing/objects_broker.go +++ b/control-plane/pkg/reconciler/testing/objects_broker.go @@ -124,16 +124,6 @@ func NewDeletedBroker(options ...reconcilertesting.BrokerOption) runtime.Object ) } -func NewDeletedBrokerWithAnnotatedTopicName(topicName string, options ...reconcilertesting.BrokerOption) runtime.Object { - b := NewDeletedBroker(options...) - broker := b.(*eventing.Broker) - if broker.Status.Annotations == nil { - broker.Status.Annotations = make(map[string]string) - } - broker.Status.Annotations[kafka.TopicAnnotation] = topicName - return broker -} - func NewDeletedBrokerWithoutConfigMapAnnotations(options ...reconcilertesting.BrokerOption) runtime.Object { return NewBroker( append( diff --git a/control-plane/pkg/reconciler/testing/objects_channel.go b/control-plane/pkg/reconciler/testing/objects_channel.go index 8c972e52b5..1fe645a6ea 100644 --- a/control-plane/pkg/reconciler/testing/objects_channel.go +++ b/control-plane/pkg/reconciler/testing/objects_channel.go @@ -81,24 +81,6 @@ func CustomTopic(template *template.Template) string { return result.String() } -func NewChannelWithAnnotations(annotations map[string]string, options ...KRShapedOption) *messagingv1beta1.KafkaChannel { - c := &messagingv1beta1.KafkaChannel{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: ChannelNamespace, - Name: ChannelName, - UID: ChannelUUID, - }, - Status: messagingv1beta1.KafkaChannelStatus{ - ChannelableStatus: eventingduckv1.ChannelableStatus{ - Status: duckv1.Status{ - Annotations: annotations, - }, - }, - }, - } - return applyOptionsToChannel(c, options...) -} - func NewChannel(options ...KRShapedOption) *messagingv1beta1.KafkaChannel { c := &messagingv1beta1.KafkaChannel{ ObjectMeta: metav1.ObjectMeta{ @@ -118,9 +100,8 @@ func applyOptionsToChannel(c *messagingv1beta1.KafkaChannel, options ...KRShaped return c } -func NewDeletedChannel(annotations map[string]string, options ...KRShapedOption) runtime.Object { - return NewChannelWithAnnotations( - annotations, +func NewDeletedChannel(options ...KRShapedOption) runtime.Object { + return NewChannel( append( options, WithDeletedTimeStamp, From a9d4ee91aae4056e5c40a430252df66a9905d596 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 26 Jun 2023 14:50:10 -0400 Subject: [PATCH 17/19] Fixed unit tests --- control-plane/pkg/reconciler/channel/channel_test.go | 2 ++ control-plane/pkg/reconciler/channel/v2/channelv2_test.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index a7409c2d57..559403e4ba 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -1855,6 +1855,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1956,6 +1957,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go index 7b0bd4c5a9..7676b5871d 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go @@ -1782,6 +1782,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -1874,6 +1875,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, From d7206cdc8410560b21f7031b27219239737be296 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 28 Jun 2023 09:28:52 -0400 Subject: [PATCH 18/19] Switched templates to not be pointers --- control-plane/pkg/apis/config/features.go | 34 ++++++------------- .../pkg/apis/config/features_test.go | 9 +++-- 2 files changed, 16 insertions(+), 27 deletions(-) diff --git a/control-plane/pkg/apis/config/features.go b/control-plane/pkg/apis/config/features.go index d216822fbb..67af0cc4fe 100644 --- a/control-plane/pkg/apis/config/features.go +++ b/control-plane/pkg/apis/config/features.go @@ -39,9 +39,9 @@ type features struct { DispatcherRateLimiter feature.Flag DispatcherOrderedExecutorMetrics feature.Flag ControllerAutoscaler feature.Flag - TriggersConsumerGroupTemplate *template.Template - BrokersTopicTemplate *template.Template - ChannelsTopicTemplate *template.Template + TriggersConsumerGroupTemplate template.Template + BrokersTopicTemplate template.Template + ChannelsTopicTemplate template.Template } type KafkaFeatureFlags struct { @@ -63,28 +63,14 @@ func init() { } func DefaultFeaturesConfig() *KafkaFeatureFlags { - // we need to clone the default values when creating a new features config - // otherwise, when calling NewFeaturesConfigFromMap, we will overwrite the default variables - triggersGroupTemplate, err := defaultTriggersConsumerGroupTemplate.Clone() - if err != nil { - panic("failed to clone default triggers group template") - } - brokersTopicTemplate, err := defaultBrokersTopicTemplate.Clone() - if err != nil { - panic("failed to clone default brokers topic template") - } - channelsTopicTemplate, err := defaultChannelsTopicTemplate.Clone() - if err != nil { - panic("failed to clone default channels topic template") - } return &KafkaFeatureFlags{ features: features{ DispatcherRateLimiter: feature.Disabled, DispatcherOrderedExecutorMetrics: feature.Disabled, ControllerAutoscaler: feature.Disabled, - TriggersConsumerGroupTemplate: triggersGroupTemplate, - BrokersTopicTemplate: brokersTopicTemplate, - ChannelsTopicTemplate: channelsTopicTemplate, + TriggersConsumerGroupTemplate: *defaultTriggersConsumerGroupTemplate, + BrokersTopicTemplate: *defaultBrokersTopicTemplate, + ChannelsTopicTemplate: *defaultChannelsTopicTemplate, }, } } @@ -96,9 +82,9 @@ func NewFeaturesConfigFromMap(cm *corev1.ConfigMap) (*KafkaFeatureFlags, error) asFlag("dispatcher.rate-limiter", &nc.features.DispatcherRateLimiter), asFlag("dispatcher.ordered-executor-metrics", &nc.features.DispatcherOrderedExecutorMetrics), asFlag("controller.autoscaler", &nc.features.ControllerAutoscaler), - asTemplate("triggers.consumergroup.template", nc.features.TriggersConsumerGroupTemplate), - asTemplate("brokers.topic.template", nc.features.BrokersTopicTemplate), - asTemplate("channels.topic.template", nc.features.ChannelsTopicTemplate), + asTemplate("triggers.consumergroup.template", &nc.features.TriggersConsumerGroupTemplate), + asTemplate("brokers.topic.template", &nc.features.BrokersTopicTemplate), + asTemplate("channels.topic.template", &nc.features.ChannelsTopicTemplate), ) return nc, err } @@ -216,7 +202,7 @@ func asTemplate(key string, target *template.Template) configmap.ParseFunc { } } -func executeTemplateToString(template *template.Template, metadata v1.ObjectMeta, errorMessage string) (string, error) { +func executeTemplateToString(template template.Template, metadata v1.ObjectMeta, errorMessage string) (string, error) { var result bytes.Buffer err := template.Execute(&result, metadata) if err != nil { diff --git a/control-plane/pkg/apis/config/features_test.go b/control-plane/pkg/apis/config/features_test.go index a13aede852..5f1a11e8d3 100644 --- a/control-plane/pkg/apis/config/features_test.go +++ b/control-plane/pkg/apis/config/features_test.go @@ -111,7 +111,8 @@ func TestExecuteTriggersConsumerGroupTemplateDefault(t *testing.T) { func TestExecuteTriggersConsumerGroupTemplateOverride(t *testing.T) { nc := DefaultFeaturesConfig() - nc.features.TriggersConsumerGroupTemplate, _ = template.New("custom-template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}-{{ .UID }}") + customTemplate, _ := template.New("custom-template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}-{{ .UID }}") + nc.features.TriggersConsumerGroupTemplate = *customTemplate result, err := nc.ExecuteTriggersConsumerGroupTemplate(v1.ObjectMeta{ Name: "trigger", @@ -141,7 +142,8 @@ func TestExecuteBrokersTopicTemplateDefault(t *testing.T) { func TestExecuteBrokersTopicTemplateOverride(t *testing.T) { nc := DefaultFeaturesConfig() - nc.features.BrokersTopicTemplate, _ = template.New("custom-template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}-{{ .UID }}") + customTemplate, _ := template.New("custom-template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}-{{ .UID }}") + nc.features.BrokersTopicTemplate = *customTemplate result, err := nc.ExecuteBrokersTopicTemplate(v1.ObjectMeta{ Name: "topic", @@ -171,7 +173,8 @@ func TestExecuteChannelsTopicTemplateDefault(t *testing.T) { func TestExecuteChannelsTopicTemplateOverride(t *testing.T) { nc := DefaultFeaturesConfig() - nc.features.ChannelsTopicTemplate, _ = template.New("custom-template").Parse("knative-channel-{{ .Namespace }}-{{ .Name }}-{{ .UID }}") + customTemplate, _ := template.New("custom-template").Parse("knative-channel-{{ .Namespace }}-{{ .Name }}-{{ .UID }}") + nc.features.ChannelsTopicTemplate = *customTemplate result, err := nc.ExecuteChannelsTopicTemplate(v1.ObjectMeta{ Name: "topic", From 7a5c4a8c4b88177bd3c0e43944439a234eeaa91f Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 28 Jun 2023 14:22:16 -0400 Subject: [PATCH 19/19] Updated broker and channel controllers to watch kafka feature config maps Signed-off-by: Calum Murray --- control-plane/cmd/kafka-controller/main.go | 2 +- control-plane/pkg/reconciler/broker/controller.go | 6 ++++++ .../pkg/reconciler/broker/controller_test.go | 5 +++++ control-plane/pkg/reconciler/channel/controller.go | 9 ++++++++- .../pkg/reconciler/channel/controller_test.go | 7 +++++++ .../pkg/reconciler/channel/v2/controllerv2.go | 12 +++++++++++- .../pkg/reconciler/channel/v2/controllerv2_test.go | 7 +++++++ 7 files changed, 45 insertions(+), 3 deletions(-) diff --git a/control-plane/cmd/kafka-controller/main.go b/control-plane/cmd/kafka-controller/main.go index 40ab5f03be..f5e15c1764 100644 --- a/control-plane/cmd/kafka-controller/main.go +++ b/control-plane/cmd/kafka-controller/main.go @@ -96,7 +96,7 @@ func main() { injection.NamedControllerConstructor{ Name: "channel-controller", ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl { - return channel.NewController(ctx, channelEnv) + return channel.NewController(ctx, watcher, channelEnv) }, }, diff --git a/control-plane/pkg/reconciler/broker/controller.go b/control-plane/pkg/reconciler/broker/controller.go index 3503770d9c..46d5348894 100644 --- a/control-plane/pkg/reconciler/broker/controller.go +++ b/control-plane/pkg/reconciler/broker/controller.go @@ -98,6 +98,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E brokerInformer := brokerinformer.Get(ctx) + kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) { + reconciler.KafkaFeatureFlags.Reset(value) + impl.GlobalResync(brokerInformer.Informer()) + }) + kafkaConfigStore.WatchConfigs(watcher) + brokerInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: kafka.BrokerClassFilter(), Handler: controller.HandleAll(impl.Enqueue), diff --git a/control-plane/pkg/reconciler/broker/controller_test.go b/control-plane/pkg/reconciler/broker/controller_test.go index 29e1019b0a..555c32e56c 100644 --- a/control-plane/pkg/reconciler/broker/controller_test.go +++ b/control-plane/pkg/reconciler/broker/controller_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" fakekubeclientset "k8s.io/client-go/kubernetes/fake" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake" _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" @@ -84,6 +85,10 @@ func TestNewController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "cm", }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: apisconfig.FlagsConfigName, + }, }), env, ) diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index 049aab9b26..d03944e145 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -37,6 +37,7 @@ import ( podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret" serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service" + "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/logging" @@ -49,7 +50,7 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" ) -func NewController(ctx context.Context, configs *config.Env) *controller.Impl { +func NewController(ctx context.Context, watcher configmap.Watcher, configs *config.Env) *controller.Impl { messagingv1beta.RegisterAlternateKafkaChannelConditionSet(base.IngressConditionSet) @@ -95,6 +96,12 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl { channelInformer := kafkachannelinformer.Get(ctx) + kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) { + reconciler.KafkaFeatureFlags.Reset(value) + impl.GlobalResync(channelInformer.Informer()) + }) + kafkaConfigStore.WatchConfigs(watcher) + channelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker) diff --git a/control-plane/pkg/reconciler/channel/controller_test.go b/control-plane/pkg/reconciler/channel/controller_test.go index 3536a31f28..297c9b352e 100644 --- a/control-plane/pkg/reconciler/channel/controller_test.go +++ b/control-plane/pkg/reconciler/channel/controller_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" fakekubeclientset "k8s.io/client-go/kubernetes/fake" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" _ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" @@ -34,6 +35,7 @@ import ( secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake" + "knative.dev/pkg/configmap" dynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" reconcilertesting "knative.dev/pkg/reconciler/testing" @@ -81,6 +83,11 @@ func TestNewController(t *testing.T) { controller := NewController( ctx, + configmap.NewStaticWatcher(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: apisconfig.FlagsConfigName, + }, + }), configs, ) if controller == nil { diff --git a/control-plane/pkg/reconciler/channel/v2/controllerv2.go b/control-plane/pkg/reconciler/channel/v2/controllerv2.go index 9d3b9f86a3..ee8c2c9cc3 100644 --- a/control-plane/pkg/reconciler/channel/v2/controllerv2.go +++ b/control-plane/pkg/reconciler/channel/v2/controllerv2.go @@ -25,6 +25,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service" + "knative.dev/pkg/configmap" "knative.dev/pkg/logging" "knative.dev/pkg/resolver" @@ -45,12 +46,13 @@ import ( "knative.dev/pkg/controller" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumergroup" ) -func NewController(ctx context.Context, configs *config.Env) *controller.Impl { +func NewController(ctx context.Context, watcher configmap.Watcher, configs *config.Env) *controller.Impl { configmapInformer := configmapinformer.Get(ctx) channelInformer := kafkachannelinformer.Get(ctx) @@ -75,6 +77,7 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl { SubscriptionLister: subscriptioninformer.Get(ctx).Lister(), ConsumerGroupLister: consumerGroupInformer.Lister(), InternalsClient: consumergroupclient.Get(ctx), + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } logger := logging.FromContext(ctx) @@ -88,6 +91,13 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl { } impl := kafkachannelreconciler.NewImpl(ctx, reconciler) + + kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) { + reconciler.KafkaFeatureFlags.Reset(value) + impl.GlobalResync(channelInformer.Informer()) + }) + kafkaConfigStore.WatchConfigs(watcher) + IPsLister := prober.IdentityIPsLister() reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, "", IPsLister, impl.EnqueueKey) reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace) diff --git a/control-plane/pkg/reconciler/channel/v2/controllerv2_test.go b/control-plane/pkg/reconciler/channel/v2/controllerv2_test.go index c0e209c1d1..c5f00f8426 100644 --- a/control-plane/pkg/reconciler/channel/v2/controllerv2_test.go +++ b/control-plane/pkg/reconciler/channel/v2/controllerv2_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/types" fakekubeclientset "k8s.io/client-go/kubernetes/fake" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" _ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" @@ -35,6 +36,7 @@ import ( secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake" + "knative.dev/pkg/configmap" dynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" reconcilertesting "knative.dev/pkg/reconciler/testing" @@ -84,6 +86,11 @@ func TestNewController(t *testing.T) { controller := NewController( ctx, + configmap.NewStaticWatcher(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: apisconfig.FlagsConfigName, + }, + }), configs, ) if controller == nil {