diff --git a/control-plane/cmd/kafka-controller/main.go b/control-plane/cmd/kafka-controller/main.go index 40ab5f03be..f5e15c1764 100644 --- a/control-plane/cmd/kafka-controller/main.go +++ b/control-plane/cmd/kafka-controller/main.go @@ -96,7 +96,7 @@ func main() { injection.NamedControllerConstructor{ Name: "channel-controller", ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl { - return channel.NewController(ctx, channelEnv) + return channel.NewController(ctx, watcher, channelEnv) }, }, diff --git a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-features.yaml b/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-features.yaml index df8780a26d..c1b6babd13 100644 --- a/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-features.yaml +++ b/control-plane/config/eventing-kafka-broker/200-controller/100-config-kafka-features.yaml @@ -47,4 +47,4 @@ data: controller.autoscaler: "disabled" triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}" brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}" - channels.topic.template: "knative-channel-{{ .Namespace }}-{{ .Name }}" + channels.topic.template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}" diff --git a/control-plane/pkg/apis/config/features.go b/control-plane/pkg/apis/config/features.go index 34032d6fc3..67af0cc4fe 100644 --- a/control-plane/pkg/apis/config/features.go +++ b/control-plane/pkg/apis/config/features.go @@ -39,9 +39,9 @@ type features struct { DispatcherRateLimiter feature.Flag DispatcherOrderedExecutorMetrics feature.Flag ControllerAutoscaler feature.Flag - TriggersConsumerGroupTemplate *template.Template - BrokersTopicTemplate *template.Template - ChannelsTopicTemplate *template.Template + TriggersConsumerGroupTemplate template.Template + BrokersTopicTemplate template.Template + ChannelsTopicTemplate template.Template } type KafkaFeatureFlags struct { @@ -50,15 +50,16 @@ type KafkaFeatureFlags struct { } var ( - DefaultTriggersConsumerGroupTemplate *template.Template - DefaultBrokersTopicTemplate *template.Template - DefaultChannelsTopicTemplate *template.Template + defaultTriggersConsumerGroupTemplate *template.Template + defaultBrokersTopicTemplate *template.Template + defaultChannelsTopicTemplate *template.Template ) func init() { - DefaultTriggersConsumerGroupTemplate, _ = template.New("triggers.consumergroup.template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}") - DefaultBrokersTopicTemplate, _ = template.New("brokers.topic.template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}") - DefaultChannelsTopicTemplate, _ = template.New("channels.topic.template").Parse("knative-channel-{{ .Namespace }}-{{ .Name }}") + defaultTriggersConsumerGroupTemplate, _ = template.New("triggers.consumergroup.template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}") + defaultBrokersTopicTemplate, _ = template.New("brokers.topic.template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}") + // This will resolve to the old naming convention, to prevent errors switching over to the new topic templates approach + defaultChannelsTopicTemplate, _ = template.New("channels.topic.template").Parse("knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}") } func DefaultFeaturesConfig() *KafkaFeatureFlags { @@ -67,23 +68,23 @@ func DefaultFeaturesConfig() *KafkaFeatureFlags { DispatcherRateLimiter: feature.Disabled, DispatcherOrderedExecutorMetrics: feature.Disabled, ControllerAutoscaler: feature.Disabled, - TriggersConsumerGroupTemplate: DefaultTriggersConsumerGroupTemplate, - BrokersTopicTemplate: DefaultBrokersTopicTemplate, - ChannelsTopicTemplate: DefaultChannelsTopicTemplate, + TriggersConsumerGroupTemplate: *defaultTriggersConsumerGroupTemplate, + BrokersTopicTemplate: *defaultBrokersTopicTemplate, + ChannelsTopicTemplate: *defaultChannelsTopicTemplate, }, } } -// newFeaturesConfigFromMap creates a Features from the supplied Map -func newFeaturesConfigFromMap(cm *corev1.ConfigMap) (*KafkaFeatureFlags, error) { +// NewFeaturesConfigFromMap creates a Features from the supplied Map +func NewFeaturesConfigFromMap(cm *corev1.ConfigMap) (*KafkaFeatureFlags, error) { nc := DefaultFeaturesConfig() err := configmap.Parse(cm.Data, asFlag("dispatcher.rate-limiter", &nc.features.DispatcherRateLimiter), asFlag("dispatcher.ordered-executor-metrics", &nc.features.DispatcherOrderedExecutorMetrics), asFlag("controller.autoscaler", &nc.features.ControllerAutoscaler), - asTemplate("triggers.consumergroup.template", nc.features.TriggersConsumerGroupTemplate), - asTemplate("brokers.topic.template", nc.features.BrokersTopicTemplate), - asTemplate("channels.topic.template", nc.features.ChannelsTopicTemplate), + asTemplate("triggers.consumergroup.template", &nc.features.TriggersConsumerGroupTemplate), + asTemplate("brokers.topic.template", &nc.features.BrokersTopicTemplate), + asTemplate("channels.topic.template", &nc.features.ChannelsTopicTemplate), ) return nc, err } @@ -131,7 +132,7 @@ func NewStore(ctx context.Context, onAfterStore ...func(name string, value *Kafk "config-kafka-features", logging.FromContext(ctx).Named("config-kafka-features"), configmap.Constructors{ - FlagsConfigName: newFeaturesConfigFromMap, + FlagsConfigName: NewFeaturesConfigFromMap, }, func(name string, value interface{}) { for _, f := range onAfterStore { @@ -201,7 +202,7 @@ func asTemplate(key string, target *template.Template) configmap.ParseFunc { } } -func executeTemplateToString(template *template.Template, metadata v1.ObjectMeta, errorMessage string) (string, error) { +func executeTemplateToString(template template.Template, metadata v1.ObjectMeta, errorMessage string) (string, error) { var result bytes.Buffer err := template.Execute(&result, metadata) if err != nil { diff --git a/control-plane/pkg/apis/config/features_test.go b/control-plane/pkg/apis/config/features_test.go index dc08600134..5f1a11e8d3 100644 --- a/control-plane/pkg/apis/config/features_test.go +++ b/control-plane/pkg/apis/config/features_test.go @@ -51,7 +51,7 @@ func TestFlags_IsEnabled_ContainingFlag(t *testing.T) { func TestGetFlags(t *testing.T) { _, example := cm.ConfigMapsFromTestFile(t, FlagsConfigName) - flags, err := newFeaturesConfigFromMap(example) + flags, err := NewFeaturesConfigFromMap(example) require.NoError(t, err) require.True(t, flags.IsDispatcherRateLimiterEnabled()) @@ -74,7 +74,7 @@ func TestStoreLoadWithConfigMap(t *testing.T) { store.OnConfigChanged(exampleConfig) have := FromContext(store.ToContext(context.Background())) - expected, _ := newFeaturesConfigFromMap(exampleConfig) + expected, _ := NewFeaturesConfigFromMap(exampleConfig) require.Equal(t, expected.IsDispatcherRateLimiterEnabled(), have.IsDispatcherRateLimiterEnabled()) require.Equal(t, expected.IsDispatcherOrderedExecutorMetricsEnabled(), have.IsDispatcherOrderedExecutorMetricsEnabled()) @@ -111,7 +111,8 @@ func TestExecuteTriggersConsumerGroupTemplateDefault(t *testing.T) { func TestExecuteTriggersConsumerGroupTemplateOverride(t *testing.T) { nc := DefaultFeaturesConfig() - nc.features.TriggersConsumerGroupTemplate, _ = template.New("custom-template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}-{{ .UID }}") + customTemplate, _ := template.New("custom-template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}-{{ .UID }}") + nc.features.TriggersConsumerGroupTemplate = *customTemplate result, err := nc.ExecuteTriggersConsumerGroupTemplate(v1.ObjectMeta{ Name: "trigger", @@ -141,7 +142,8 @@ func TestExecuteBrokersTopicTemplateDefault(t *testing.T) { func TestExecuteBrokersTopicTemplateOverride(t *testing.T) { nc := DefaultFeaturesConfig() - nc.features.BrokersTopicTemplate, _ = template.New("custom-template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}-{{ .UID }}") + customTemplate, _ := template.New("custom-template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}-{{ .UID }}") + nc.features.BrokersTopicTemplate = *customTemplate result, err := nc.ExecuteBrokersTopicTemplate(v1.ObjectMeta{ Name: "topic", @@ -166,12 +168,13 @@ func TestExecuteChannelsTopicTemplateDefault(t *testing.T) { require.NoError(t, err) } - require.Equal(t, result, "knative-channel-namespace-topic") + require.Equal(t, result, "knative-messaging-kafka.namespace.topic") } func TestExecuteChannelsTopicTemplateOverride(t *testing.T) { nc := DefaultFeaturesConfig() - nc.features.ChannelsTopicTemplate, _ = template.New("custom-template").Parse("knative-channel-{{ .Namespace }}-{{ .Name }}-{{ .UID }}") + customTemplate, _ := template.New("custom-template").Parse("knative-channel-{{ .Namespace }}-{{ .Name }}-{{ .UID }}") + nc.features.ChannelsTopicTemplate = *customTemplate result, err := nc.ExecuteChannelsTopicTemplate(v1.ObjectMeta{ Name: "topic", diff --git a/control-plane/pkg/apis/config/testdata/config-kafka-features.yaml b/control-plane/pkg/apis/config/testdata/config-kafka-features.yaml index 5f99736b6d..7546aee0c1 100644 --- a/control-plane/pkg/apis/config/testdata/config-kafka-features.yaml +++ b/control-plane/pkg/apis/config/testdata/config-kafka-features.yaml @@ -10,4 +10,4 @@ data: controller.autoscaler: "enabled" triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}" brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}" - channels.topic.template: "knative-channel-{{ .Namespace }}-{{ .Name }}" + channels.topic.template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}" diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 864eb7938f..aeed0185df 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -37,6 +37,7 @@ import ( "knative.dev/pkg/reconciler" "knative.dev/pkg/resolver" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" @@ -50,9 +51,6 @@ import ( ) const ( - // TopicPrefix is the Kafka Broker topic prefix - (topic name: knative-broker--). - TopicPrefix = "knative-broker-" - // ExternalTopicAnnotation for using external kafka topic for the broker ExternalTopicAnnotation = "kafka.eventing.knative.dev/external.topic" @@ -80,8 +78,9 @@ type Reconciler struct { BootstrapServers string - Prober prober.Prober - Counter *counter.Counter + Prober prober.Prober + Counter *counter.Counter + KafkaFeatureFlags *apisconfig.KafkaFeatureFlags } func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { @@ -311,7 +310,15 @@ func (r *Reconciler) reconcileBrokerTopic(broker *eventing.Broker, securityOptio } } else { // no external topic, we create it - topicName = kafka.BrokerTopic(TopicPrefix, broker) + var existingTopic bool + // check if the broker already has reconciled with a topic name and use that if it exists + // otherwise, we create a new topic name from the broker topic template + if topicName, existingTopic = broker.Status.Annotations[kafka.TopicAnnotation]; !existingTopic { + topicName, err = r.KafkaFeatureFlags.ExecuteBrokersTopicTemplate(broker.ObjectMeta) + if err != nil { + return "", statusConditionManager.TopicsNotPresentOrInvalidErr([]string{topicName}, err) + } + } topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig) if err != nil { @@ -503,7 +510,11 @@ func (r *Reconciler) finalizeNonExternalBrokerTopic(broker *eventing.Broker, sec } defer kafkaClusterAdminClient.Close() - topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, kafka.BrokerTopic(TopicPrefix, broker)) + topicName, ok := broker.Status.Annotations[kafka.TopicAnnotation] + if !ok { + return fmt.Errorf("no topic annotated on broker") + } + topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topicName) if err != nil { return err } diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index a9cf8939d2..3466517972 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net/url" "testing" + "text/template" "knative.dev/eventing-kafka-broker/control-plane/pkg/counter" @@ -37,11 +38,13 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" "github.com/Shopify/sarama" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" clientgotesting "k8s.io/client-go/testing" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" eventing "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" @@ -72,6 +75,8 @@ const ( ExpectedTopicDetail = "expectedTopicDetail" testProber = "testProber" externalTopic = "externalTopic" + + kafkaFeatureFlags = "kafka-feature-flags" ) const ( @@ -98,8 +103,9 @@ var ( createTopicError = fmt.Errorf("failed to create topic") deleteTopicError = fmt.Errorf("failed to delete topic") - linear = eventingduck.BackoffPolicyLinear - exponential = eventingduck.BackoffPolicyExponential + linear = eventingduck.BackoffPolicyLinear + exponential = eventingduck.BackoffPolicyExponential + customBrokerTopicTemplate = customTemplate() ) var DefaultEnv = &config.Env{ @@ -2135,6 +2141,85 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { ), }, }, + }, { + Name: "Reconciled normal - with custom topic template", + Objects: []runtime.Object{ + NewBroker(), + BrokerConfig(bootstrapServers, 20, 5), + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + NewService(), + BrokerReceiverPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + BrokerDispatcherPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + }, + Key: testKey, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Resources: []*contract.Resource{ + { + Uid: BrokerUUID, + Topics: []string{CustomBrokerTopic(customBrokerTopicTemplate)}, + Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)}, + BootstrapServers: bootstrapServers, + Reference: BrokerReference(), + }, + }, + Generation: 1, + }), + BrokerReceiverPodUpdate(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "1", + "annotation_to_preserve": "value_to_preserve", + }), + BrokerDispatcherPodUpdate(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "1", + "annotation_to_preserve": "value_to_preserve", + }), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewBroker( + reconcilertesting.WithInitBrokerConditions, + StatusBrokerConfigMapUpdatedReady(&env), + StatusBrokerDataPlaneAvailable, + StatusBrokerConfigParsed, + StatusExternalBrokerTopicReady(CustomBrokerTopic(customBrokerTopicTemplate)), + BrokerAddressable(&env), + StatusBrokerProbeSucceeded, + BrokerConfigMapAnnotations(), + WithTopicStatusAnnotation(CustomBrokerTopic(customBrokerTopicTemplate)), + WithBrokerAddresses([]duckv1.Addressable{ + { + Name: pointer.String("http"), + URL: brokerAddress, + }, + }), + WithBrokerAddress(duckv1.Addressable{ + Name: pointer.String("http"), + URL: brokerAddress, + }), + WithBrokerAddessable(), + ), + }, + }, + + OtherTestData: map[string]interface{}{ + kafkaFeatureFlags: newKafkaFeaturesConfigFromMap(&corev1.ConfigMap{ + Data: map[string]string{ + "brokers.topic.template": "custom-broker-template.{{ .Namespace }}-{{ .Name }}", + }, + }), + }, }, } @@ -2187,7 +2272,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - no DLS", Objects: []runtime.Object{ - NewDeletedBroker(), + NewDeletedBroker(WithTopicStatusAnnotation(BrokerTopic())), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2215,6 +2300,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { Name: "Reconciled normal - no ConfigMap, rebuild from annotations", Objects: []runtime.Object{ NewDeletedBroker( + WithTopicStatusAnnotation(BrokerTopic()), BrokerConfigMapAnnotations(), ), NewConfigMapFromContract(&contract.Contract{ @@ -2268,7 +2354,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled failed - probe not ready", Objects: []runtime.Object{ - NewDeletedBroker(), + NewDeletedBroker(WithTopicStatusAnnotation(BrokerTopic())), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2297,6 +2383,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { Name: "Reconciled normal - with DLS", Objects: []runtime.Object{ NewDeletedBroker( + WithTopicStatusAnnotation(BrokerTopic()), WithDelivery(), ), BrokerConfig(bootstrapServers, 20, 5), @@ -2468,7 +2555,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Failed to delete topic", Objects: []runtime.Object{ - NewDeletedBroker(), + NewDeletedBroker(WithTopicStatusAnnotation(BrokerTopic())), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2505,6 +2592,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { Name: "Config map not found - create config map", Objects: []runtime.Object{ NewDeletedBroker( + WithTopicStatusAnnotation(BrokerTopic()), WithDelivery(), ), BrokerConfig(bootstrapServers, 20, 5), @@ -2523,7 +2611,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - preserve config map previous state", Objects: []runtime.Object{ - NewDeletedBroker(), + NewDeletedBroker(WithTopicStatusAnnotation(BrokerTopic())), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2561,7 +2649,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - topic doesn't exist", Objects: []runtime.Object{ - NewDeletedBroker(), + NewDeletedBroker(WithTopicStatusAnnotation(BrokerTopic())), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2600,7 +2688,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { { Name: "Reconciled normal - no broker found in config map", Objects: []runtime.Object{ - NewDeletedBroker(), + NewDeletedBroker(WithTopicStatusAnnotation(BrokerTopic())), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -2774,6 +2862,13 @@ func useTable(t *testing.T, table TableTest, env *config.Env) { ReplicationFactor: DefaultReplicationFactor, } + var featureFlags *apisconfig.KafkaFeatureFlags + if v, ok := row.OtherTestData[kafkaFeatureFlags]; ok { + featureFlags = v.(*apisconfig.KafkaFeatureFlags) + } else { + featureFlags = apisconfig.DefaultFeaturesConfig() + } + var onCreateTopicError error if want, ok := row.OtherTestData[wantErrorOnCreateTopic]; ok { onCreateTopicError = want.(error) @@ -2789,7 +2884,8 @@ func useTable(t *testing.T, table TableTest, env *config.Env) { expectedTopicDetail = td.(sarama.TopicDetail) } - expectedTopicName := fmt.Sprintf("%s%s-%s", TopicPrefix, BrokerNamespace, BrokerName) + expectedTopicName, err := featureFlags.ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Namespace: BrokerNamespace, Name: BrokerName, UID: BrokerUUID}) + require.NoError(t, err, "Failed to create broker topic name from feature flags") if t, ok := row.OtherTestData[externalTopic]; ok { expectedTopicName = t.(string) } @@ -2830,9 +2926,10 @@ func useTable(t *testing.T, table TableTest, env *config.Env) { T: t, }, nil }, - Env: env, - Prober: proberMock, - Counter: counter.NewExpiringCounter(ctx), + Env: env, + Prober: proberMock, + Counter: counter.NewExpiringCounter(ctx), + KafkaFeatureFlags: featureFlags, } reconciler.Tracker = &FakeTracker{} @@ -2883,3 +2980,16 @@ func httpsURL(name string, namespace string) *apis.URL { Path: fmt.Sprintf("/%s/%s", namespace, name), } } + +func customTemplate() *template.Template { + brokersTemplate, _ := template.New("brokers.topic.template").Parse("custom-broker-template.{{ .Namespace }}-{{ .Name }}") + return brokersTemplate +} + +func newKafkaFeaturesConfigFromMap(cm *corev1.ConfigMap) *apisconfig.KafkaFeatureFlags { + featureFlags, err := apisconfig.NewFeaturesConfigFromMap(cm) + if err != nil { + panic("failed to create kafka features from config map") + } + return featureFlags +} diff --git a/control-plane/pkg/reconciler/broker/controller.go b/control-plane/pkg/reconciler/broker/controller.go index aabdfccd10..46d5348894 100644 --- a/control-plane/pkg/reconciler/broker/controller.go +++ b/control-plane/pkg/reconciler/broker/controller.go @@ -39,6 +39,7 @@ import ( podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/counter" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" @@ -56,6 +57,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E eventing.RegisterAlternateBrokerConditionSet(base.IngressConditionSet) configmapInformer := configmapinformer.Get(ctx) + featureFlags := apisconfig.DefaultFeaturesConfig() reconciler := &Reconciler{ Reconciler: &base.Reconciler{ @@ -73,6 +75,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E ConfigMapLister: configmapInformer.Lister(), Env: env, Counter: counter.NewExpiringCounter(ctx), + KafkaFeatureFlags: featureFlags, } logger := logging.FromContext(ctx) @@ -95,6 +98,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E brokerInformer := brokerinformer.Get(ctx) + kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) { + reconciler.KafkaFeatureFlags.Reset(value) + impl.GlobalResync(brokerInformer.Informer()) + }) + kafkaConfigStore.WatchConfigs(watcher) + brokerInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: kafka.BrokerClassFilter(), Handler: controller.HandleAll(impl.Enqueue), diff --git a/control-plane/pkg/reconciler/broker/controller_test.go b/control-plane/pkg/reconciler/broker/controller_test.go index 29e1019b0a..555c32e56c 100644 --- a/control-plane/pkg/reconciler/broker/controller_test.go +++ b/control-plane/pkg/reconciler/broker/controller_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" fakekubeclientset "k8s.io/client-go/kubernetes/fake" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake" _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" @@ -84,6 +85,10 @@ func TestNewController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "cm", }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: apisconfig.FlagsConfigName, + }, }), env, ) diff --git a/control-plane/pkg/reconciler/broker/namespaced_broker.go b/control-plane/pkg/reconciler/broker/namespaced_broker.go index 2c3679eaaf..e03b03d689 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_broker.go +++ b/control-plane/pkg/reconciler/broker/namespaced_broker.go @@ -46,6 +46,7 @@ import ( brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/counter" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" @@ -86,6 +87,8 @@ type NamespacedReconciler struct { ManifestivalClient mf.Client DataplaneLifecycleLocksByNamespace util.LockMap[string] + + KafkaFeatureFlags *apisconfig.KafkaFeatureFlags } func (r *NamespacedReconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { @@ -238,6 +241,7 @@ func (r *NamespacedReconciler) createReconcilerForBrokerInstance(broker *eventin BootstrapServers: r.BootstrapServers, Prober: r.Prober, Counter: r.Counter, + KafkaFeatureFlags: r.KafkaFeatureFlags, } } diff --git a/control-plane/pkg/reconciler/broker/namespaced_broker_test.go b/control-plane/pkg/reconciler/broker/namespaced_broker_test.go index e7ae4a044c..2b9c11439d 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_broker_test.go +++ b/control-plane/pkg/reconciler/broker/namespaced_broker_test.go @@ -40,6 +40,7 @@ import ( "github.com/Shopify/sarama" "github.com/manifestival/client-go-client" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -59,6 +60,7 @@ import ( brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/receiver" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" @@ -341,7 +343,10 @@ func namespacedBrokerFinalization(t *testing.T, format string, env config.Env) { reconcilertesting.NewNamespace(BrokerNamespace, func(ns *corev1.Namespace) { ns.UID = BrokerNamespaceUUID }), - NewDeletedBroker(reconcilertesting.WithBrokerClass(kafka.NamespacedBrokerClass)), + NewDeletedBroker( + WithTopicStatusAnnotation(BrokerTopic()), + reconcilertesting.WithBrokerClass(kafka.NamespacedBrokerClass), + ), BrokerConfig(bootstrapServers, 20, 5), NewConfigMapFromContract(&contract.Contract{ Resources: []*contract.Resource{ @@ -452,7 +457,8 @@ func useTableNamespaced(t *testing.T, table TableTest, env *config.Env) { expectedTopicDetail = td.(sarama.TopicDetail) } - expectedTopicName := fmt.Sprintf("%s%s-%s", TopicPrefix, BrokerNamespace, BrokerName) + expectedTopicName, err := apisconfig.DefaultFeaturesConfig().ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Namespace: BrokerNamespace, Name: BrokerName}) + require.NoError(t, err, "Failed to create broker topic name from feature flags") if t, ok := row.OtherTestData[externalTopic]; ok { expectedTopicName = t.(string) } @@ -506,6 +512,7 @@ func useTableNamespaced(t *testing.T, table TableTest, env *config.Env) { Prober: proberMock, ManifestivalClient: mfcMockClient, DataplaneLifecycleLocksByNamespace: util.NewExpiringLockMap[string](ctx, time.Minute*30), + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } r := brokerreconciler.NewReconciler( diff --git a/control-plane/pkg/reconciler/broker/namespaced_controller.go b/control-plane/pkg/reconciler/broker/namespaced_controller.go index 22b2ec85b5..73d8b267a3 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_controller.go +++ b/control-plane/pkg/reconciler/broker/namespaced_controller.go @@ -56,6 +56,7 @@ import ( serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount" clusterrolebindinginformer "knative.dev/pkg/client/injection/kube/informers/rbac/v1/clusterrolebinding" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" @@ -105,6 +106,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env Counter: counter.NewExpiringCounter(ctx), ManifestivalClient: mfc, DataplaneLifecycleLocksByNamespace: util.NewExpiringLockMap[string](ctx, time.Minute*30), + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } impl := brokerreconciler.NewImpl(ctx, reconciler, kafka.NamespacedBrokerClass, func(impl *controller.Impl) controller.Options { diff --git a/control-plane/pkg/reconciler/broker/testdata/config-kafka-features.yaml b/control-plane/pkg/reconciler/broker/testdata/config-kafka-features.yaml new file mode 100644 index 0000000000..ca4f7a4367 --- /dev/null +++ b/control-plane/pkg/reconciler/broker/testdata/config-kafka-features.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-kafka-features + namespace: knative-eventing +data: + _example: | + dispatcher.rate-limiter: "enabled" + dispatcher.ordered-executor-metrics: "enabled" + controller.autoscaler: "enabled" + triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}" + brokers.topic.template: "custom-broker-template.{{ .Namespace }}-{{ .Name }}" + channels.topic.template: "knative-channel-{{ .Namespace }}-{{ .Name }}" diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index da1c2bc23e..c7f0a930f6 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -50,6 +50,7 @@ import ( messagingv1beta1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" @@ -63,7 +64,6 @@ import ( const ( // TopicPrefix is the Kafka Channel topic prefix - (topic name: knative-messaging-kafka..). - TopicPrefix = "knative-messaging-kafka" DefaultDeliveryOrder = contract.DeliveryOrder_ORDERED NewChannelIngressServiceName = "kafka-channel-ingress" kafkaChannelTLSSecretName = "kafka-channel-ingress-server-tls" //nolint:gosec // This is not a hardcoded credential @@ -96,6 +96,8 @@ type Reconciler struct { Prober prober.Prober IngressHost string + + KafkaFeatureFlags *apisconfig.KafkaFeatureFlags } func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta1.KafkaChannel) reconciler.Event { @@ -165,7 +167,20 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta return fmt.Errorf("failed to track secret: %w", err) } - topicName := kafka.ChannelTopic(TopicPrefix, channel) + if channel.Status.Annotations == nil { + channel.Status.Annotations = make(map[string]string) + } + // Check if there is an existing topic name for this channel. If there is, reconcile the channel with the existing name. + // If not, create a new topic name from the channel topic name template. + var topicName string + var existingTopic bool + if topicName, existingTopic = channel.Status.Annotations[kafka.TopicAnnotation]; !existingTopic { + topicName, err = r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) + if err != nil { + return err + } + } + channel.Status.Annotations[kafka.TopicAnnotation] = topicName kafkaClusterAdminSaramaConfig, err := kafka.GetSaramaConfig(saramaSecurityOption) if err != nil { @@ -477,7 +492,11 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1 } defer kafkaClusterAdminClient.Close() - topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, kafka.ChannelTopic(TopicPrefix, channel)) + topicName, ok := channel.Status.Annotations[kafka.TopicAnnotation] + if !ok { + return fmt.Errorf("no topic annotated on channel") + } + topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topicName) if err != nil { return err } @@ -548,9 +567,12 @@ func (r *Reconciler) reconcileInitialOffset(ctx context.Context, channel *messag return nil } - topicName := kafka.ChannelTopic(TopicPrefix, channel) + topicName, err := r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) + if err != nil { + return err + } groupID := consumerGroup(channel, sub) - _, err := r.InitOffsetsFunc(ctx, kafkaClient, kafkaClusterAdmin, []string{topicName}, groupID) + _, err = r.InitOffsetsFunc(ctx, kafkaClient, kafkaClusterAdmin, []string{topicName}, groupID) return err } diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index 47e1b53c2a..559403e4ba 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -21,6 +21,11 @@ import ( "fmt" "strconv" "testing" + "text/template" + + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/Shopify/sarama" "k8s.io/utils/pointer" @@ -36,7 +41,6 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/security" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" clientgotesting "k8s.io/client-go/testing" @@ -71,6 +75,8 @@ const ( TestExpectedDataNumPartitions = "TestExpectedDataNumPartitions" TestExpectedReplicationFactor = "TestExpectedReplicationFactor" TestExpectedRetentionDuration = "TestExpectedRetentionDuration" + + kafkaFeatureFlags = "kafka-feature-flags" ) var ( @@ -94,6 +100,8 @@ var DefaultEnv = &config.Env{ ContractConfigMapFormat: base.Json, } +var customChannelTopicTemplate = customTemplate() + func TestReconcileKind(t *testing.T) { t.Setenv("SYSTEM_NAMESPACE", "knative-eventing") @@ -123,6 +131,7 @@ func TestReconcileKind(t *testing.T) { Key: testKey, Objects: []runtime.Object{ NewChannel( + WithChannelTopicStatusAnnotation(defaultTopicName()), WithInitKafkaChannelConditions, WithDeletedTimeStamp), NewConfigMapWithTextData(system.Namespace(), DefaultEnv.GeneralConfigMapName, map[string]string{ @@ -204,6 +213,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -284,6 +294,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -352,6 +363,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, StatusProbeFailed(prober.StatusNotReady), @@ -421,6 +433,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, StatusProbeFailed(prober.StatusUnknown), @@ -492,6 +505,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, WithSubscribers(Subscriber1(WithFreshSubscriber, WithNoSubscriberURI)), @@ -568,6 +582,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -649,6 +664,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -730,6 +746,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -811,6 +828,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -909,6 +927,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1181,6 +1200,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1283,6 +1303,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1388,6 +1409,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1490,6 +1512,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1558,6 +1581,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1623,6 +1647,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1664,6 +1689,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusDataPlaneAvailable, StatusConfigParsed, + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusConfigMapNotUpdatedReady( "Failed to get contract data from ConfigMap: knative-eventing/kafka-channel-channels-subscriptions", @@ -1684,6 +1710,81 @@ func TestReconcileKind(t *testing.T) { ), }, }, + { + Name: "Reconciled normal - custom topic template", + Objects: []runtime.Object{ + NewChannel(), + NewService(), + ChannelReceiverPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + ChannelDispatcherPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + NewConfigMapWithTextData(system.Namespace(), DefaultEnv.GeneralConfigMapName, map[string]string{ + kafka.BootstrapServersConfigMapKey: ChannelBootstrapServers, + }), + }, + Key: testKey, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Generation: 1, + Resources: []*contract.Resource{ + { + Uid: ChannelUUID, + Topics: []string{CustomTopic(customChannelTopicTemplate)}, + BootstrapServers: ChannelBootstrapServers, + Reference: ChannelReference(), + Ingress: &contract.Ingress{ + Host: receiver.Host(ChannelNamespace, ChannelName), + }, + }, + }, + }), + ChannelReceiverPodUpdate(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + ChannelDispatcherPodUpdate(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + }, + SkipNamespaceValidation: true, // WantCreates compare the channel namespace with configmap namespace, so skip it + WantCreates: []runtime.Object{ + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + NewPerChannelService(DefaultEnv), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewChannel( + WithInitKafkaChannelConditions, + StatusConfigParsed, + StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(CustomTopic(customChannelTopicTemplate)), + StatusTopicReadyWithName(CustomTopic(customChannelTopicTemplate)), + StatusDataPlaneAvailable, + ChannelAddressable(&env), + StatusProbeSucceeded, + ), + }, + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + OtherTestData: map[string]interface{}{ + kafkaFeatureFlags: newKafkaFeaturesConfigFromMap(&corev1.ConfigMap{ + Data: map[string]string{ + "channels.topic.template": "custom-channel-template.{{ .Namespace }}.{{ .Name }}", + }, + }), + }, + }, { Name: "Reconciled normal - TLS Permissive", Objects: []runtime.Object{ @@ -1754,6 +1855,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1855,6 +1957,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusDataPlaneAvailable, ChannelAddressable(&env), @@ -1901,7 +2004,7 @@ func TestFinalizeKind(t *testing.T) { { Name: "Finalize normal - no auth", Objects: []runtime.Object{ - NewDeletedChannel(), + NewDeletedChannel(WithChannelTopicStatusAnnotation(defaultTopicName())), NewConfigMapFromContract(&contract.Contract{ Generation: 1, Resources: []*contract.Resource{ @@ -1960,6 +2063,13 @@ func useTable(t *testing.T, table TableTest, env config.Env) { proberMock = p.(prober.Prober) } + var featureFlags *apisconfig.KafkaFeatureFlags + if v, ok := row.OtherTestData[kafkaFeatureFlags]; ok { + featureFlags = v.(*apisconfig.KafkaFeatureFlags) + } else { + featureFlags = apisconfig.DefaultFeaturesConfig() + } + numPartitions := int32(1) if v, ok := row.OtherTestData[TestExpectedDataNumPartitions]; ok { numPartitions = v.(int32) @@ -1981,6 +2091,11 @@ func useTable(t *testing.T, table TableTest, env config.Env) { retentionMillisString := strconv.FormatInt(retentionDuration.Milliseconds(), 10) + expectedTopicName, err := featureFlags.ExecuteChannelsTopicTemplate(metav1.ObjectMeta{Name: ChannelName, Namespace: ChannelNamespace, UID: ChannelUUID}) + if err != nil { + panic("failed to create expected topic name") + } + reconciler := &Reconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -2005,7 +2120,7 @@ func useTable(t *testing.T, table TableTest, env config.Env) { }, NewKafkaClusterAdminClient: func(_ []string, _ *sarama.Config) (sarama.ClusterAdmin, error) { return &kafkatesting.MockKafkaClusterAdmin{ - ExpectedTopicName: ChannelTopic(), + ExpectedTopicName: expectedTopicName, ExpectedTopicDetail: sarama.TopicDetail{ NumPartitions: numPartitions, ReplicationFactor: replicationFactor, @@ -2016,8 +2131,9 @@ func useTable(t *testing.T, table TableTest, env config.Env) { T: t, }, nil }, - Prober: proberMock, - IngressHost: network.GetServiceHostname(env.IngressName, env.SystemNamespace), + Prober: proberMock, + IngressHost: network.GetServiceHostname(env.IngressName, env.SystemNamespace), + KafkaFeatureFlags: featureFlags, } reconciler.Tracker = &FakeTracker{} @@ -2046,6 +2162,27 @@ func patchFinalizers() clientgotesting.PatchActionImpl { return action } +func defaultTopicName() string { + topicName, err := apisconfig.DefaultFeaturesConfig().ExecuteChannelsTopicTemplate(metav1.ObjectMeta{Name: ChannelName, Namespace: ChannelNamespace}) + if err != nil { + panic("failed to create default channel topic name") + } + return topicName +} + +func customTemplate() *template.Template { + channelsTemplate, _ := template.New("channels.topic.template").Parse("custom-channel-template.{{ .Namespace }}.{{ .Name }}") + return channelsTemplate +} + +func newKafkaFeaturesConfigFromMap(cm *corev1.ConfigMap) *apisconfig.KafkaFeatureFlags { + featureFlags, err := apisconfig.NewFeaturesConfigFromMap(cm) + if err != nil { + panic("failed to create kafka features from config map") + } + return featureFlags +} + func makeTLSSecret() *corev1.Secret { return &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index cf90d72aff..d03944e145 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -37,18 +37,20 @@ import ( podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret" serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service" + "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/logging" "knative.dev/pkg/network" "knative.dev/pkg/resolver" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" ) -func NewController(ctx context.Context, configs *config.Env) *controller.Impl { +func NewController(ctx context.Context, watcher configmap.Watcher, configs *config.Env) *controller.Impl { messagingv1beta.RegisterAlternateKafkaChannelConditionSet(base.IngressConditionSet) @@ -74,6 +76,7 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl { Env: configs, ConfigMapLister: configmapInformer.Lister(), ServiceLister: serviceInformer.Lister(), + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } logger := logging.FromContext(ctx) @@ -93,6 +96,12 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl { channelInformer := kafkachannelinformer.Get(ctx) + kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) { + reconciler.KafkaFeatureFlags.Reset(value) + impl.GlobalResync(channelInformer.Informer()) + }) + kafkaConfigStore.WatchConfigs(watcher) + channelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker) diff --git a/control-plane/pkg/reconciler/channel/controller_test.go b/control-plane/pkg/reconciler/channel/controller_test.go index 3536a31f28..297c9b352e 100644 --- a/control-plane/pkg/reconciler/channel/controller_test.go +++ b/control-plane/pkg/reconciler/channel/controller_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" fakekubeclientset "k8s.io/client-go/kubernetes/fake" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" _ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" @@ -34,6 +35,7 @@ import ( secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake" + "knative.dev/pkg/configmap" dynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" reconcilertesting "knative.dev/pkg/reconciler/testing" @@ -81,6 +83,11 @@ func TestNewController(t *testing.T) { controller := NewController( ctx, + configmap.NewStaticWatcher(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: apisconfig.FlagsConfigName, + }, + }), configs, ) if controller == nil { diff --git a/control-plane/pkg/reconciler/channel/testdata/config-kafka-features.yaml b/control-plane/pkg/reconciler/channel/testdata/config-kafka-features.yaml new file mode 100644 index 0000000000..fdc7298324 --- /dev/null +++ b/control-plane/pkg/reconciler/channel/testdata/config-kafka-features.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-kafka-features + namespace: knative-eventing +data: + _example: | + dispatcher.rate-limiter: "enabled" + dispatcher.ordered-executor-metrics: "enabled" + controller.autoscaler: "enabled" + triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}" + brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}" + channels.topic.template: "custom-channel-template.{{ .Namespace }}.{{ .Name }}" diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2.go b/control-plane/pkg/reconciler/channel/v2/channelv2.go index a4cdb3902e..309f348ed9 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2.go @@ -60,6 +60,7 @@ import ( channelreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel" "knative.dev/eventing-kafka-broker/control-plane/pkg/security" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" internalsclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/clientset/versioned" internalslst "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/listers/eventing/v1alpha1" @@ -70,8 +71,6 @@ import ( ) const ( - // TopicPrefix is the Kafka Channel topic prefix - (topic name: knative-messaging-kafka..). - TopicPrefix = "knative-messaging-kafka" DefaultDeliveryOrder = kafkasource.Ordered KafkaChannelConditionSubscribersReady apis.ConditionType = "Subscribers" // condition is registered by controller @@ -110,6 +109,7 @@ type Reconciler struct { ConsumerGroupLister internalslst.ConsumerGroupLister InternalsClient internalsclient.Interface + KafkaFeatureFlags *apisconfig.KafkaFeatureFlags } func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta1.KafkaChannel) reconciler.Event { @@ -163,7 +163,20 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta // get security option for Sarama with secret info in it saramaSecurityOption := security.NewSaramaSecurityOptionFromSecret(authContext.VirtualSecret) - topicName := kafka.ChannelTopic(TopicPrefix, channel) + if channel.Status.Annotations == nil { + channel.Status.Annotations = make(map[string]string) + } + // Check if there is an existing topic name for this channel. If there is, reconcile the channel with the existing name. + // If not, create a new topic name from the channel topic name template. + var topicName string + var existingTopic bool + if topicName, existingTopic = channel.Status.Annotations[kafka.TopicAnnotation]; !existingTopic { + topicName, err = r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) + if err != nil { + return err + } + } + channel.Status.Annotations[kafka.TopicAnnotation] = topicName kafkaClusterAdminSaramaConfig, err := kafka.GetSaramaConfig(saramaSecurityOption) if err != nil { @@ -473,7 +486,11 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1 } defer kafkaClusterAdminClient.Close() - topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, kafka.ChannelTopic(TopicPrefix, channel)) + topicName, err := r.KafkaFeatureFlags.ExecuteChannelsTopicTemplate(channel.ObjectMeta) + if err != nil { + return err + } + topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topicName) if err != nil { return err } diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go index 86a81a0514..7676b5871d 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go @@ -21,10 +21,12 @@ import ( "fmt" "strconv" "testing" + "text/template" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/Shopify/sarama" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" clientgotesting "k8s.io/client-go/testing" @@ -42,6 +44,7 @@ import ( . "knative.dev/pkg/reconciler/testing" "knative.dev/pkg/system" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" kafkasource "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" @@ -72,6 +75,8 @@ const ( TestExpectedDataNumPartitions = "TestExpectedDataNumPartitions" TestExpectedReplicationFactor = "TestExpectedReplicationFactor" TestExpectedRetentionDuration = "TestExpectedRetentionDuration" + + kafkaFeatureFlags = "kafka-feature-flags" ) var finalizerUpdatedEvent = Eventf( @@ -80,6 +85,8 @@ var finalizerUpdatedEvent = Eventf( fmt.Sprintf(`Updated %q finalizers`, ChannelName), ) +var customChannelTopicTemplate = customTemplate() + var DefaultEnv = &config.Env{ DataPlaneConfigMapNamespace: "knative-eventing", ContractConfigMapName: "kafka-channel-channels-subscriptions", @@ -226,6 +233,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -297,6 +305,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -348,6 +357,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusProbeFailed(prober.StatusNotReady), StatusChannelSubscribers(), @@ -400,6 +410,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusProbeFailed(prober.StatusUnknown), StatusChannelSubscribers(), @@ -469,6 +480,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -538,6 +550,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -606,6 +619,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -684,6 +698,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), WithSubscribers(Subscriber1(WithFreshSubscriber)), @@ -737,6 +752,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -841,6 +857,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -927,6 +944,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -1093,6 +1111,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -1191,6 +1210,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), WithSubscribers(Subscriber1(WithFreshSubscriber)), @@ -1292,6 +1312,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), WithSubscribers(Subscriber1(WithFreshSubscriber)), @@ -1389,6 +1410,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), WithSubscribers(Subscriber1(WithFreshSubscriber)), @@ -1464,6 +1486,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -1515,6 +1538,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -1564,6 +1588,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -1595,6 +1620,7 @@ func TestReconcileKind(t *testing.T) { Object: NewChannel( WithInitKafkaChannelConditions, StatusConfigParsed, + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), StatusConfigMapNotUpdatedReady( "Failed to get contract data from ConfigMap: knative-eventing/kafka-channel-channels-subscriptions", @@ -1615,6 +1641,86 @@ func TestReconcileKind(t *testing.T) { ), }, }, + { + Name: "Reconciled normal - with custom template", + Objects: []runtime.Object{ + NewChannel( + WithChannelDelivery(&eventingduck.DeliverySpec{ + DeadLetterSink: ServiceDestination, + Retry: pointer.Int32(5), + }), + ), + NewConfigMapWithTextData(env.SystemNamespace, DefaultEnv.GeneralConfigMapName, map[string]string{ + kafka.BootstrapServersConfigMapKey: ChannelBootstrapServers, + }), + ChannelReceiverPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + }, + Key: testKey, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Generation: 1, + Resources: []*contract.Resource{ + { + Uid: ChannelUUID, + Topics: []string{CustomTopic(customChannelTopicTemplate)}, + BootstrapServers: ChannelBootstrapServers, + Reference: ChannelReference(), + Ingress: &contract.Ingress{ + Host: receiver.Host(ChannelNamespace, ChannelName), + }, + EgressConfig: &contract.EgressConfig{ + DeadLetter: ServiceURL, + Retry: 5, + }, + }, + }, + }), + ChannelReceiverPodUpdate(env.SystemNamespace, map[string]string{ + "annotation_to_preserve": "value_to_preserve", + base.VolumeGenerationAnnotationKey: "1", + }), + }, + SkipNamespaceValidation: true, // WantCreates compare the channel namespace with configmap namespace, so skip it + WantCreates: []runtime.Object{ + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + NewPerChannelService(&env), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewChannel( + WithChannelDelivery(&eventingduck.DeliverySpec{ + DeadLetterSink: ServiceDestination, + Retry: pointer.Int32(5), + }), + WithInitKafkaChannelConditions, + StatusConfigParsed, + StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(CustomTopic(customChannelTopicTemplate)), + StatusTopicReadyWithName(CustomTopic(customChannelTopicTemplate)), + ChannelAddressable(&env), + StatusProbeSucceeded, + StatusChannelSubscribers(), + WithChannelDeadLetterSinkURI(ServiceURL), + ), + }, + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + OtherTestData: map[string]interface{}{ + kafkaFeatureFlags: newKafkaFeaturesConfigFromMap(&corev1.ConfigMap{ + Data: map[string]string{ + "channels.topic.template": "custom-channel-template.{{ .Namespace}}.{{ .Name }}", + }, + }), + }, + }, { Name: "Reconciled normal - TLS Permissive", Objects: []runtime.Object{ @@ -1676,6 +1782,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -1768,6 +1875,7 @@ func TestReconcileKind(t *testing.T) { WithInitKafkaChannelConditions, StatusConfigParsed, StatusConfigMapUpdatedReady(&env), + WithChannelTopicStatusAnnotation(ChannelTopic()), StatusTopicReadyWithName(ChannelTopic()), ChannelAddressable(&env), StatusProbeSucceeded, @@ -1804,6 +1912,13 @@ func TestReconcileKind(t *testing.T) { proberMock = p.(prober.Prober) } + var featureFlags *apisconfig.KafkaFeatureFlags + if v, ok := row.OtherTestData[kafkaFeatureFlags]; ok { + featureFlags = v.(*apisconfig.KafkaFeatureFlags) + } else { + featureFlags = apisconfig.DefaultFeaturesConfig() + } + numPartitions := int32(1) if v, ok := row.OtherTestData[TestExpectedDataNumPartitions]; ok { numPartitions = v.(int32) @@ -1825,6 +1940,11 @@ func TestReconcileKind(t *testing.T) { retentionMillisString := strconv.FormatInt(retentionDuration.Milliseconds(), 10) + expectedTopicName, err := featureFlags.ExecuteChannelsTopicTemplate(metav1.ObjectMeta{Name: ChannelName, Namespace: ChannelNamespace, UID: ChannelUUID}) + if err != nil { + panic("failed to create expected topic name") + } + reconciler := &Reconciler{ Reconciler: &base.Reconciler{ KubeClient: kubeclient.Get(ctx), @@ -1840,7 +1960,7 @@ func TestReconcileKind(t *testing.T) { Env: env, NewKafkaClusterAdminClient: func(_ []string, _ *sarama.Config) (sarama.ClusterAdmin, error) { return &kafkatesting.MockKafkaClusterAdmin{ - ExpectedTopicName: ChannelTopic(), + ExpectedTopicName: expectedTopicName, ExpectedTopicDetail: sarama.TopicDetail{ NumPartitions: numPartitions, ReplicationFactor: replicationFactor, @@ -1858,6 +1978,7 @@ func TestReconcileKind(t *testing.T) { InternalsClient: fakeconsumergroupinformer.Get(ctx), Prober: proberMock, IngressHost: network.GetServiceHostname(env.IngressName, env.SystemNamespace), + KafkaFeatureFlags: featureFlags, } reconciler.Tracker = &FakeTracker{} reconciler.Tracker = &FakeTracker{} @@ -1904,6 +2025,19 @@ func patchFinalizers() clientgotesting.PatchActionImpl { return action } +func customTemplate() *template.Template { + channelsTemplate, _ := template.New("channels.topic.template").Parse("custom-channel-template.{{ .Namespace }}.{{ .Name }}") + return channelsTemplate +} + +func newKafkaFeaturesConfigFromMap(cm *corev1.ConfigMap) *apisconfig.KafkaFeatureFlags { + featureFlags, err := apisconfig.NewFeaturesConfigFromMap(cm) + if err != nil { + panic("failed to create kafka features from config map") + } + return featureFlags +} + func makeTLSSecret() *corev1.Secret { return &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ diff --git a/control-plane/pkg/reconciler/channel/v2/controllerv2.go b/control-plane/pkg/reconciler/channel/v2/controllerv2.go index 9d3b9f86a3..ee8c2c9cc3 100644 --- a/control-plane/pkg/reconciler/channel/v2/controllerv2.go +++ b/control-plane/pkg/reconciler/channel/v2/controllerv2.go @@ -25,6 +25,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service" + "knative.dev/pkg/configmap" "knative.dev/pkg/logging" "knative.dev/pkg/resolver" @@ -45,12 +46,13 @@ import ( "knative.dev/pkg/controller" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumergroup" ) -func NewController(ctx context.Context, configs *config.Env) *controller.Impl { +func NewController(ctx context.Context, watcher configmap.Watcher, configs *config.Env) *controller.Impl { configmapInformer := configmapinformer.Get(ctx) channelInformer := kafkachannelinformer.Get(ctx) @@ -75,6 +77,7 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl { SubscriptionLister: subscriptioninformer.Get(ctx).Lister(), ConsumerGroupLister: consumerGroupInformer.Lister(), InternalsClient: consumergroupclient.Get(ctx), + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } logger := logging.FromContext(ctx) @@ -88,6 +91,13 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl { } impl := kafkachannelreconciler.NewImpl(ctx, reconciler) + + kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) { + reconciler.KafkaFeatureFlags.Reset(value) + impl.GlobalResync(channelInformer.Informer()) + }) + kafkaConfigStore.WatchConfigs(watcher) + IPsLister := prober.IdentityIPsLister() reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, "", IPsLister, impl.EnqueueKey) reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace) diff --git a/control-plane/pkg/reconciler/channel/v2/controllerv2_test.go b/control-plane/pkg/reconciler/channel/v2/controllerv2_test.go index c0e209c1d1..c5f00f8426 100644 --- a/control-plane/pkg/reconciler/channel/v2/controllerv2_test.go +++ b/control-plane/pkg/reconciler/channel/v2/controllerv2_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/types" fakekubeclientset "k8s.io/client-go/kubernetes/fake" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" _ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" @@ -35,6 +36,7 @@ import ( secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake" + "knative.dev/pkg/configmap" dynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" reconcilertesting "knative.dev/pkg/reconciler/testing" @@ -84,6 +86,11 @@ func TestNewController(t *testing.T) { controller := NewController( ctx, + configmap.NewStaticWatcher(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: apisconfig.FlagsConfigName, + }, + }), configs, ) if controller == nil { diff --git a/control-plane/pkg/reconciler/testing/objects_broker.go b/control-plane/pkg/reconciler/testing/objects_broker.go index ac5ba145d5..f3f37a992a 100644 --- a/control-plane/pkg/reconciler/testing/objects_broker.go +++ b/control-plane/pkg/reconciler/testing/objects_broker.go @@ -17,8 +17,10 @@ package testing import ( + "bytes" "fmt" "strings" + "text/template" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,6 +40,7 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" "knative.dev/eventing-kafka-broker/control-plane/pkg/security" @@ -57,11 +60,21 @@ const ( ) var ( - BrokerTopics = []string{fmt.Sprintf("%s%s-%s", TopicPrefix, BrokerNamespace, BrokerName)} + kafkaFeatureFlags = apisconfig.DefaultFeaturesConfig() + BrokerTopics = []string{getKafkaTopic()} ) func BrokerTopic() string { - return fmt.Sprintf("%s%s-%s", TopicPrefix, BrokerNamespace, BrokerName) + return getKafkaTopic() +} + +func CustomBrokerTopic(template *template.Template) string { + var result bytes.Buffer + err := template.Execute(&result, metav1.ObjectMeta{Namespace: BrokerNamespace, Name: BrokerName, UID: BrokerUUID}) + if err != nil { + panic("Failed to create custom topic name") + } + return result.String() } // NewBroker creates a new Broker with broker class equals to kafka.BrokerClass. @@ -310,7 +323,11 @@ func StatusBrokerConfigMapUpdatedReady(env *config.Env) func(broker *eventing.Br } func StatusBrokerTopicReady(broker *eventing.Broker) { - StatusTopicReadyWithName(kafka.BrokerTopic(TopicPrefix, broker))(broker) + topicName, err := kafkaFeatureFlags.ExecuteBrokersTopicTemplate(broker.ObjectMeta) + if err != nil { + panic("Failed to create broker topic name") + } + StatusTopicReadyWithName(topicName)(broker) } func StatusExternalBrokerTopicReady(topic string) func(broker *eventing.Broker) { @@ -489,3 +506,11 @@ func BrokerConfigMapSecretAnnotation(name string) reconcilertesting.BrokerOption broker.Status.Annotations[security.AuthSecretNameKey] = name } } + +func getKafkaTopic() string { + topicName, err := kafkaFeatureFlags.ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Namespace: BrokerNamespace, Name: BrokerName}) + if err != nil { + panic("Failed to create broker topic name") + } + return topicName +} diff --git a/control-plane/pkg/reconciler/testing/objects_channel.go b/control-plane/pkg/reconciler/testing/objects_channel.go index f6195d7f29..026f6220d9 100644 --- a/control-plane/pkg/reconciler/testing/objects_channel.go +++ b/control-plane/pkg/reconciler/testing/objects_channel.go @@ -17,8 +17,10 @@ package testing import ( + "bytes" "context" "fmt" + "text/template" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,14 +34,14 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/network" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" messagingv1beta1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" - . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel" subscriptionv1 "knative.dev/eventing/pkg/reconciler/testing/v1" ) @@ -62,7 +64,21 @@ const ( func ChannelTopic() string { c := NewChannel() - return kafka.ChannelTopic(TopicPrefix, c) + topicName, err := apisconfig.DefaultFeaturesConfig().ExecuteChannelsTopicTemplate(c.ObjectMeta) + if err != nil { + panic("Failed to create channel topic name") + } + return topicName +} + +func CustomTopic(template *template.Template) string { + c := NewChannel() + var result bytes.Buffer + err := template.Execute(&result, c.ObjectMeta) + if err != nil { + panic("Failed to create custom topic name") + } + return result.String() } func NewChannel(options ...KRShapedOption) *messagingv1beta1.KafkaChannel { @@ -73,6 +89,10 @@ func NewChannel(options ...KRShapedOption) *messagingv1beta1.KafkaChannel { UID: ChannelUUID, }, } + return applyOptionsToChannel(c, options...) +} + +func applyOptionsToChannel(c *messagingv1beta1.KafkaChannel, options ...KRShapedOption) *messagingv1beta1.KafkaChannel { for _, opt := range options { opt(c) } @@ -224,6 +244,16 @@ func ChannelAddressable(env *config.Env) func(obj duckv1.KRShaped) { } } +func WithChannelTopicStatusAnnotation(topicName string) func(obj duckv1.KRShaped) { + return func(obj duckv1.KRShaped) { + channel := obj.(*messagingv1beta1.KafkaChannel) + if channel.Status.Annotations == nil { + channel.Status.Annotations = make(map[string]string, 1) + } + channel.Status.Annotations[kafka.TopicAnnotation] = topicName + } +} + func ChannelReference() *contract.Reference { return &contract.Reference{ Uuid: ChannelUUID, diff --git a/test/config/100-config-kafka-features.yaml b/test/config/100-config-kafka-features.yaml index 57365e5f90..0f3a495aed 100644 --- a/test/config/100-config-kafka-features.yaml +++ b/test/config/100-config-kafka-features.yaml @@ -9,4 +9,4 @@ data: controller.autoscaler: "enabled" triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}" brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}" - channels.topic.template: "knative-channel-{{ .Namespace }}-{{ .Name }}" + channels.topic.template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}" diff --git a/test/rekt/features/broker_config.go b/test/rekt/features/broker_config.go index c799716c54..50c6fecb53 100644 --- a/test/rekt/features/broker_config.go +++ b/test/rekt/features/broker_config.go @@ -17,15 +17,13 @@ package features import ( - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" testpkg "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/feature" - brokerreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic" - eventing "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/test/rekt/resources/broker" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -53,12 +51,10 @@ func BrokerWithCustomReplicationFactorAndNumPartitions(env environment.Environme )) f.Setup("Broker ready", broker.IsReady(brokerName)) - topic := kafka.BrokerTopic(brokerreconciler.TopicPrefix, &eventing.Broker{ - ObjectMeta: metav1.ObjectMeta{ - Name: brokerName, - Namespace: env.Namespace(), - }, - }) + topic, err := apisconfig.DefaultFeaturesConfig().ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Name: brokerName, Namespace: env.Namespace()}) + if err != nil { + panic("failed to create broker topic name") + } f.Setup("Topic is ready", kafkatopic.IsReady(topic)) f.Assert("Replication factor", kafkatopic.HasReplicationFactor(topic, replicationFactor)) diff --git a/test/rekt/features/kafka_sink.go b/test/rekt/features/kafka_sink.go index 50646ed6c5..48bd4d4086 100644 --- a/test/rekt/features/kafka_sink.go +++ b/test/rekt/features/kafka_sink.go @@ -24,7 +24,7 @@ import ( "github.com/google/uuid" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/eventing/test/rekt/resources/trigger" "knative.dev/reconciler-test/pkg/environment" @@ -34,8 +34,6 @@ import ( cetest "github.com/cloudevents/sdk-go/v2/test" - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" - brokerreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" testpkg "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic" @@ -63,13 +61,6 @@ func BrokerWithTriggersAndKafkaSink(env environment.Environment) *feature.Featur sink := feature.MakeRandomK8sName("sink") verifyMessagesJobName := feature.MakeRandomK8sName("verify-messages-job") - topic := kafka.BrokerTopic(brokerreconciler.TopicPrefix, &eventingv1.Broker{ - ObjectMeta: metav1.ObjectMeta{ - Name: brokerName, - Namespace: env.Namespace(), - }, - }) - trigger1FilterType := "trigger-1" trigger2FilterType := "trigger-2" @@ -84,6 +75,10 @@ func BrokerWithTriggersAndKafkaSink(env environment.Environment) *feature.Featur eventTrigger2.SetType(trigger2FilterType) f := feature.NewFeatureNamed("Trigger with KafkaSink") + topic, err := apisconfig.DefaultFeaturesConfig().ExecuteBrokersTopicTemplate(metav1.ObjectMeta{Name: brokerName, Namespace: env.Namespace()}) + if err != nil { + panic("failed to create broker topic name") + } f.Setup("Install broker", broker.Install(brokerName, broker.WithEnvConfig()...)) f.Setup("Broker is ready", broker.IsReady(brokerName))