Skip to content

Commit

Permalink
Update Broker and Controller to use topic name templates (#3162)
Browse files Browse the repository at this point in the history
* Updated broker to use broker topic name template

* Updated broker reconciliation to onlyuse topic name template if no existing topic name for broker

* Fixed import styling

* Updated channel to use topic template

* Updated channels to use topic annotations

* Fixed import formatting

* Added tests for reconciling with custom topic templates

* Fixed go imports

* use annotated topic names when deleting topics

* switched from failing tests to using panics if topic names not setup properly

* updated kafka feature yaml files to match channel topic template

* updated kafka feature default vars to be private

* updated tests

* Fixed goimports

* Fixed broker finalizer tests

* Updated unit tests

* Fixed unit tests

* Switched templates to not be pointers

* Updated broker and channel controllers to watch kafka feature config maps

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 authored Jun 29, 2023
1 parent fe4652b commit e9792d0
Show file tree
Hide file tree
Showing 27 changed files with 660 additions and 93 deletions.
2 changes: 1 addition & 1 deletion control-plane/cmd/kafka-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func main() {
injection.NamedControllerConstructor{
Name: "channel-controller",
ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
return channel.NewController(ctx, channelEnv)
return channel.NewController(ctx, watcher, channelEnv)
},
},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ data:
controller.autoscaler: "disabled"
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}"
channels.topic.template: "knative-channel-{{ .Namespace }}-{{ .Name }}"
channels.topic.template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}"
39 changes: 20 additions & 19 deletions control-plane/pkg/apis/config/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ type features struct {
DispatcherRateLimiter feature.Flag
DispatcherOrderedExecutorMetrics feature.Flag
ControllerAutoscaler feature.Flag
TriggersConsumerGroupTemplate *template.Template
BrokersTopicTemplate *template.Template
ChannelsTopicTemplate *template.Template
TriggersConsumerGroupTemplate template.Template
BrokersTopicTemplate template.Template
ChannelsTopicTemplate template.Template
}

type KafkaFeatureFlags struct {
Expand All @@ -50,15 +50,16 @@ type KafkaFeatureFlags struct {
}

var (
DefaultTriggersConsumerGroupTemplate *template.Template
DefaultBrokersTopicTemplate *template.Template
DefaultChannelsTopicTemplate *template.Template
defaultTriggersConsumerGroupTemplate *template.Template
defaultBrokersTopicTemplate *template.Template
defaultChannelsTopicTemplate *template.Template
)

func init() {
DefaultTriggersConsumerGroupTemplate, _ = template.New("triggers.consumergroup.template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}")
DefaultBrokersTopicTemplate, _ = template.New("brokers.topic.template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}")
DefaultChannelsTopicTemplate, _ = template.New("channels.topic.template").Parse("knative-channel-{{ .Namespace }}-{{ .Name }}")
defaultTriggersConsumerGroupTemplate, _ = template.New("triggers.consumergroup.template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}")
defaultBrokersTopicTemplate, _ = template.New("brokers.topic.template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}")
// This will resolve to the old naming convention, to prevent errors switching over to the new topic templates approach
defaultChannelsTopicTemplate, _ = template.New("channels.topic.template").Parse("knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}")
}

func DefaultFeaturesConfig() *KafkaFeatureFlags {
Expand All @@ -67,23 +68,23 @@ func DefaultFeaturesConfig() *KafkaFeatureFlags {
DispatcherRateLimiter: feature.Disabled,
DispatcherOrderedExecutorMetrics: feature.Disabled,
ControllerAutoscaler: feature.Disabled,
TriggersConsumerGroupTemplate: DefaultTriggersConsumerGroupTemplate,
BrokersTopicTemplate: DefaultBrokersTopicTemplate,
ChannelsTopicTemplate: DefaultChannelsTopicTemplate,
TriggersConsumerGroupTemplate: *defaultTriggersConsumerGroupTemplate,
BrokersTopicTemplate: *defaultBrokersTopicTemplate,
ChannelsTopicTemplate: *defaultChannelsTopicTemplate,
},
}
}

// newFeaturesConfigFromMap creates a Features from the supplied Map
func newFeaturesConfigFromMap(cm *corev1.ConfigMap) (*KafkaFeatureFlags, error) {
// NewFeaturesConfigFromMap creates a Features from the supplied Map
func NewFeaturesConfigFromMap(cm *corev1.ConfigMap) (*KafkaFeatureFlags, error) {
nc := DefaultFeaturesConfig()
err := configmap.Parse(cm.Data,
asFlag("dispatcher.rate-limiter", &nc.features.DispatcherRateLimiter),
asFlag("dispatcher.ordered-executor-metrics", &nc.features.DispatcherOrderedExecutorMetrics),
asFlag("controller.autoscaler", &nc.features.ControllerAutoscaler),
asTemplate("triggers.consumergroup.template", nc.features.TriggersConsumerGroupTemplate),
asTemplate("brokers.topic.template", nc.features.BrokersTopicTemplate),
asTemplate("channels.topic.template", nc.features.ChannelsTopicTemplate),
asTemplate("triggers.consumergroup.template", &nc.features.TriggersConsumerGroupTemplate),
asTemplate("brokers.topic.template", &nc.features.BrokersTopicTemplate),
asTemplate("channels.topic.template", &nc.features.ChannelsTopicTemplate),
)
return nc, err
}
Expand Down Expand Up @@ -131,7 +132,7 @@ func NewStore(ctx context.Context, onAfterStore ...func(name string, value *Kafk
"config-kafka-features",
logging.FromContext(ctx).Named("config-kafka-features"),
configmap.Constructors{
FlagsConfigName: newFeaturesConfigFromMap,
FlagsConfigName: NewFeaturesConfigFromMap,
},
func(name string, value interface{}) {
for _, f := range onAfterStore {
Expand Down Expand Up @@ -201,7 +202,7 @@ func asTemplate(key string, target *template.Template) configmap.ParseFunc {
}
}

func executeTemplateToString(template *template.Template, metadata v1.ObjectMeta, errorMessage string) (string, error) {
func executeTemplateToString(template template.Template, metadata v1.ObjectMeta, errorMessage string) (string, error) {
var result bytes.Buffer
err := template.Execute(&result, metadata)
if err != nil {
Expand Down
15 changes: 9 additions & 6 deletions control-plane/pkg/apis/config/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestFlags_IsEnabled_ContainingFlag(t *testing.T) {

func TestGetFlags(t *testing.T) {
_, example := cm.ConfigMapsFromTestFile(t, FlagsConfigName)
flags, err := newFeaturesConfigFromMap(example)
flags, err := NewFeaturesConfigFromMap(example)
require.NoError(t, err)

require.True(t, flags.IsDispatcherRateLimiterEnabled())
Expand All @@ -74,7 +74,7 @@ func TestStoreLoadWithConfigMap(t *testing.T) {
store.OnConfigChanged(exampleConfig)

have := FromContext(store.ToContext(context.Background()))
expected, _ := newFeaturesConfigFromMap(exampleConfig)
expected, _ := NewFeaturesConfigFromMap(exampleConfig)

require.Equal(t, expected.IsDispatcherRateLimiterEnabled(), have.IsDispatcherRateLimiterEnabled())
require.Equal(t, expected.IsDispatcherOrderedExecutorMetricsEnabled(), have.IsDispatcherOrderedExecutorMetricsEnabled())
Expand Down Expand Up @@ -111,7 +111,8 @@ func TestExecuteTriggersConsumerGroupTemplateDefault(t *testing.T) {

func TestExecuteTriggersConsumerGroupTemplateOverride(t *testing.T) {
nc := DefaultFeaturesConfig()
nc.features.TriggersConsumerGroupTemplate, _ = template.New("custom-template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
customTemplate, _ := template.New("custom-template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
nc.features.TriggersConsumerGroupTemplate = *customTemplate

result, err := nc.ExecuteTriggersConsumerGroupTemplate(v1.ObjectMeta{
Name: "trigger",
Expand Down Expand Up @@ -141,7 +142,8 @@ func TestExecuteBrokersTopicTemplateDefault(t *testing.T) {

func TestExecuteBrokersTopicTemplateOverride(t *testing.T) {
nc := DefaultFeaturesConfig()
nc.features.BrokersTopicTemplate, _ = template.New("custom-template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
customTemplate, _ := template.New("custom-template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
nc.features.BrokersTopicTemplate = *customTemplate

result, err := nc.ExecuteBrokersTopicTemplate(v1.ObjectMeta{
Name: "topic",
Expand All @@ -166,12 +168,13 @@ func TestExecuteChannelsTopicTemplateDefault(t *testing.T) {
require.NoError(t, err)
}

require.Equal(t, result, "knative-channel-namespace-topic")
require.Equal(t, result, "knative-messaging-kafka.namespace.topic")
}

func TestExecuteChannelsTopicTemplateOverride(t *testing.T) {
nc := DefaultFeaturesConfig()
nc.features.ChannelsTopicTemplate, _ = template.New("custom-template").Parse("knative-channel-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
customTemplate, _ := template.New("custom-template").Parse("knative-channel-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
nc.features.ChannelsTopicTemplate = *customTemplate

result, err := nc.ExecuteChannelsTopicTemplate(v1.ObjectMeta{
Name: "topic",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ data:
controller.autoscaler: "enabled"
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}"
channels.topic.template: "knative-channel-{{ .Namespace }}-{{ .Name }}"
channels.topic.template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}"
25 changes: 18 additions & 7 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"

apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config"
Expand All @@ -50,9 +51,6 @@ import (
)

const (
// TopicPrefix is the Kafka Broker topic prefix - (topic name: knative-broker-<broker-namespace>-<broker-name>).
TopicPrefix = "knative-broker-"

// ExternalTopicAnnotation for using external kafka topic for the broker
ExternalTopicAnnotation = "kafka.eventing.knative.dev/external.topic"

Expand Down Expand Up @@ -80,8 +78,9 @@ type Reconciler struct {

BootstrapServers string

Prober prober.Prober
Counter *counter.Counter
Prober prober.Prober
Counter *counter.Counter
KafkaFeatureFlags *apisconfig.KafkaFeatureFlags
}

func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event {
Expand Down Expand Up @@ -311,7 +310,15 @@ func (r *Reconciler) reconcileBrokerTopic(broker *eventing.Broker, securityOptio
}
} else {
// no external topic, we create it
topicName = kafka.BrokerTopic(TopicPrefix, broker)
var existingTopic bool
// check if the broker already has reconciled with a topic name and use that if it exists
// otherwise, we create a new topic name from the broker topic template
if topicName, existingTopic = broker.Status.Annotations[kafka.TopicAnnotation]; !existingTopic {
topicName, err = r.KafkaFeatureFlags.ExecuteBrokersTopicTemplate(broker.ObjectMeta)
if err != nil {
return "", statusConditionManager.TopicsNotPresentOrInvalidErr([]string{topicName}, err)
}
}

topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig)
if err != nil {
Expand Down Expand Up @@ -503,7 +510,11 @@ func (r *Reconciler) finalizeNonExternalBrokerTopic(broker *eventing.Broker, sec
}
defer kafkaClusterAdminClient.Close()

topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, kafka.BrokerTopic(TopicPrefix, broker))
topicName, ok := broker.Status.Annotations[kafka.TopicAnnotation]
if !ok {
return fmt.Errorf("no topic annotated on broker")
}
topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topicName)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit e9792d0

Please sign in to comment.