Skip to content

Commit

Permalink
Cherry pick custom kafka topics changes (#729)
Browse files Browse the repository at this point in the history
* Allow defining topic prefixes (knative-extensions#3157)

* Added kafka feature for broker and channel topic templates

* Added tests for broker and channel features

* Updated codegen

* Update Broker and Controller to use topic name templates (knative-extensions#3162)

* Updated broker to use broker topic name template

* Updated broker reconciliation to onlyuse topic name template if no existing topic name for broker

* Fixed import styling

* Updated channel to use topic template

* Updated channels to use topic annotations

* Fixed import formatting

* Added tests for reconciling with custom topic templates

* Fixed go imports

* use annotated topic names when deleting topics

* switched from failing tests to using panics if topic names not setup properly

* updated kafka feature yaml files to match channel topic template

* updated kafka feature default vars to be private

* updated tests

* Fixed goimports

* Fixed broker finalizer tests

* Updated unit tests

* Fixed unit tests

* Switched templates to not be pointers

* Updated broker and channel controllers to watch kafka feature config maps

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 authored Jul 5, 2023
1 parent fb326c8 commit 472dc51
Show file tree
Hide file tree
Showing 27 changed files with 740 additions and 79 deletions.
2 changes: 1 addition & 1 deletion control-plane/cmd/kafka-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func main() {
injection.NamedControllerConstructor{
Name: "channel-controller",
ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
return channel.NewController(ctx, channelEnv)
return channel.NewController(ctx, watcher, channelEnv)
},
},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: config-kafka-features
namespace: knative-eventing
annotations:
knative.dev/example-checksum: "330d9f60"
knative.dev/example-checksum: "1192895d"
data:
_example: |-
################################
Expand Down Expand Up @@ -36,7 +36,15 @@ data:
# The Go text/template used to generate consumergroup ID for triggers.
# The template can reference the trigger Kubernetes metadata only.
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
# The Go text/template used to generate topics for Brokers.
# The template can reference the broker Kubernetes metadata only.
brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}"
# The Go text/template used to generate topics for Channels.
# The template can reference the channel Kubernetes metadata only.
channels.topic.template: "knative-channel-{{ .Namespace }}-{{ .Name }}"
dispatcher.rate-limiter: "disabled"
dispatcher.ordered-executor-metrics: "disabled"
controller.autoscaler: "disabled"
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}"
channels.topic.template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}"
53 changes: 39 additions & 14 deletions control-plane/pkg/apis/config/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,27 @@ type features struct {
DispatcherRateLimiter feature.Flag
DispatcherOrderedExecutorMetrics feature.Flag
ControllerAutoscaler feature.Flag
TriggersConsumerGroupTemplate *template.Template
TriggersConsumerGroupTemplate template.Template
BrokersTopicTemplate template.Template
ChannelsTopicTemplate template.Template
}

type KafkaFeatureFlags struct {
features features
m sync.RWMutex
}

var DefaultTriggersConsumerGroupTemplate *template.Template
var (
defaultTriggersConsumerGroupTemplate *template.Template
defaultBrokersTopicTemplate *template.Template
defaultChannelsTopicTemplate *template.Template
)

func init() {
DefaultTriggersConsumerGroupTemplate, _ = template.New("triggers.consumergroup.template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}")
defaultTriggersConsumerGroupTemplate, _ = template.New("triggers.consumergroup.template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}")
defaultBrokersTopicTemplate, _ = template.New("brokers.topic.template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}")
// This will resolve to the old naming convention, to prevent errors switching over to the new topic templates approach
defaultChannelsTopicTemplate, _ = template.New("channels.topic.template").Parse("knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}")
}

func DefaultFeaturesConfig() *KafkaFeatureFlags {
Expand All @@ -59,19 +68,23 @@ func DefaultFeaturesConfig() *KafkaFeatureFlags {
DispatcherRateLimiter: feature.Disabled,
DispatcherOrderedExecutorMetrics: feature.Disabled,
ControllerAutoscaler: feature.Disabled,
TriggersConsumerGroupTemplate: DefaultTriggersConsumerGroupTemplate,
TriggersConsumerGroupTemplate: *defaultTriggersConsumerGroupTemplate,
BrokersTopicTemplate: *defaultBrokersTopicTemplate,
ChannelsTopicTemplate: *defaultChannelsTopicTemplate,
},
}
}

// newFeaturesConfigFromMap creates a Features from the supplied Map
func newFeaturesConfigFromMap(cm *corev1.ConfigMap) (*KafkaFeatureFlags, error) {
// NewFeaturesConfigFromMap creates a Features from the supplied Map
func NewFeaturesConfigFromMap(cm *corev1.ConfigMap) (*KafkaFeatureFlags, error) {
nc := DefaultFeaturesConfig()
err := configmap.Parse(cm.Data,
asFlag("dispatcher.rate-limiter", &nc.features.DispatcherRateLimiter),
asFlag("dispatcher.ordered-executor-metrics", &nc.features.DispatcherOrderedExecutorMetrics),
asFlag("controller.autoscaler", &nc.features.ControllerAutoscaler),
asTemplate("triggers.consumergroup.template", nc.features.TriggersConsumerGroupTemplate),
asTemplate("triggers.consumergroup.template", &nc.features.TriggersConsumerGroupTemplate),
asTemplate("brokers.topic.template", &nc.features.BrokersTopicTemplate),
asTemplate("channels.topic.template", &nc.features.ChannelsTopicTemplate),
)
return nc, err
}
Expand All @@ -95,13 +108,15 @@ func (f *KafkaFeatureFlags) IsControllerAutoscalerEnabled() bool {
}

func (f *KafkaFeatureFlags) ExecuteTriggersConsumerGroupTemplate(triggerMetadata v1.ObjectMeta) (string, error) {
var result bytes.Buffer
err := f.features.TriggersConsumerGroupTemplate.Execute(&result, triggerMetadata)
if err != nil {
return "", fmt.Errorf("unable to execute triggers consumergroup template: %w", err)
}
return executeTemplateToString(f.features.TriggersConsumerGroupTemplate, triggerMetadata, "unable to execute triggers consumergroup template: %w")
}

return result.String(), nil
func (f *KafkaFeatureFlags) ExecuteBrokersTopicTemplate(brokerMetadata v1.ObjectMeta) (string, error) {
return executeTemplateToString(f.features.BrokersTopicTemplate, brokerMetadata, "unable to execute brokers topic template: %w")
}

func (f *KafkaFeatureFlags) ExecuteChannelsTopicTemplate(channelMetadata v1.ObjectMeta) (string, error) {
return executeTemplateToString(f.features.ChannelsTopicTemplate, channelMetadata, "unable to execute channels topic template: %w")
}

// Store is a typed wrapper around configmap.Untyped store to handle our configmaps.
Expand All @@ -117,7 +132,7 @@ func NewStore(ctx context.Context, onAfterStore ...func(name string, value *Kafk
"config-kafka-features",
logging.FromContext(ctx).Named("config-kafka-features"),
configmap.Constructors{
FlagsConfigName: newFeaturesConfigFromMap,
FlagsConfigName: NewFeaturesConfigFromMap,
},
func(name string, value interface{}) {
for _, f := range onAfterStore {
Expand Down Expand Up @@ -186,3 +201,13 @@ func asTemplate(key string, target *template.Template) configmap.ParseFunc {
return nil
}
}

func executeTemplateToString(template template.Template, metadata v1.ObjectMeta, errorMessage string) (string, error) {
var result bytes.Buffer
err := template.Execute(&result, metadata)
if err != nil {
return "", fmt.Errorf(errorMessage, err)
}

return result.String(), nil
}
78 changes: 75 additions & 3 deletions control-plane/pkg/apis/config/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestFlags_IsEnabled_ContainingFlag(t *testing.T) {

func TestGetFlags(t *testing.T) {
_, example := cm.ConfigMapsFromTestFile(t, FlagsConfigName)
flags, err := newFeaturesConfigFromMap(example)
flags, err := NewFeaturesConfigFromMap(example)
require.NoError(t, err)

require.True(t, flags.IsDispatcherRateLimiterEnabled())
Expand All @@ -60,6 +60,11 @@ func TestGetFlags(t *testing.T) {
require.True(t, flags.IsControllerAutoscalerEnabled())
require.Len(t, flags.features.TriggersConsumerGroupTemplate.Tree.Root.Nodes, 4)
require.Equal(t, flags.features.TriggersConsumerGroupTemplate.Name(), "triggers.consumergroup.template")
require.Len(t, flags.features.BrokersTopicTemplate.Tree.Root.Nodes, 4)
require.Equal(t, flags.features.BrokersTopicTemplate.Name(), "brokers.topic.template")
require.Len(t, flags.features.ChannelsTopicTemplate.Tree.Root.Nodes, 4)
require.Equal(t, flags.features.ChannelsTopicTemplate.Name(), "channels.topic.template")

}

func TestStoreLoadWithConfigMap(t *testing.T) {
Expand All @@ -69,12 +74,14 @@ func TestStoreLoadWithConfigMap(t *testing.T) {
store.OnConfigChanged(exampleConfig)

have := FromContext(store.ToContext(context.Background()))
expected, _ := newFeaturesConfigFromMap(exampleConfig)
expected, _ := NewFeaturesConfigFromMap(exampleConfig)

require.Equal(t, expected.IsDispatcherRateLimiterEnabled(), have.IsDispatcherRateLimiterEnabled())
require.Equal(t, expected.IsDispatcherOrderedExecutorMetricsEnabled(), have.IsDispatcherOrderedExecutorMetricsEnabled())
require.Equal(t, expected.IsControllerAutoscalerEnabled(), have.IsControllerAutoscalerEnabled())
require.Equal(t, expected.features.TriggersConsumerGroupTemplate.Name(), have.features.TriggersConsumerGroupTemplate.Name())
require.Equal(t, expected.features.BrokersTopicTemplate.Name(), have.features.BrokersTopicTemplate.Name())
require.Equal(t, expected.features.ChannelsTopicTemplate.Name(), have.features.ChannelsTopicTemplate.Name())
}

func TestStoreLoadWithContext(t *testing.T) {
Expand All @@ -84,6 +91,8 @@ func TestStoreLoadWithContext(t *testing.T) {
require.False(t, have.IsDispatcherOrderedExecutorMetricsEnabled())
require.False(t, have.IsControllerAutoscalerEnabled())
require.Equal(t, have.features.TriggersConsumerGroupTemplate.Name(), "triggers.consumergroup.template")
require.Equal(t, have.features.BrokersTopicTemplate.Name(), "brokers.topic.template")
require.Equal(t, have.features.ChannelsTopicTemplate.Name(), "channels.topic.template")
}

func TestExecuteTriggersConsumerGroupTemplateDefault(t *testing.T) {
Expand All @@ -102,7 +111,8 @@ func TestExecuteTriggersConsumerGroupTemplateDefault(t *testing.T) {

func TestExecuteTriggersConsumerGroupTemplateOverride(t *testing.T) {
nc := DefaultFeaturesConfig()
nc.features.TriggersConsumerGroupTemplate, _ = template.New("custom-template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
customTemplate, _ := template.New("custom-template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
nc.features.TriggersConsumerGroupTemplate = *customTemplate

result, err := nc.ExecuteTriggersConsumerGroupTemplate(v1.ObjectMeta{
Name: "trigger",
Expand All @@ -115,3 +125,65 @@ func TestExecuteTriggersConsumerGroupTemplateOverride(t *testing.T) {

require.Equal(t, result, "knative-trigger-namespace-trigger-138ac0ec-2694-4747-900d-45be3da5c9a9")
}

func TestExecuteBrokersTopicTemplateDefault(t *testing.T) {
nc := DefaultFeaturesConfig()
result, err := nc.ExecuteBrokersTopicTemplate(v1.ObjectMeta{
Name: "topic",
Namespace: "namespace",
UID: "138ac0ec-2694-4747-900d-45be3da5c9a9",
})
if err != nil {
require.NoError(t, err)
}

require.Equal(t, result, "knative-broker-namespace-topic")
}

func TestExecuteBrokersTopicTemplateOverride(t *testing.T) {
nc := DefaultFeaturesConfig()
customTemplate, _ := template.New("custom-template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
nc.features.BrokersTopicTemplate = *customTemplate

result, err := nc.ExecuteBrokersTopicTemplate(v1.ObjectMeta{
Name: "topic",
Namespace: "namespace",
UID: "138ac0ec-2694-4747-900d-45be3da5c9a9",
})
if err != nil {
require.NoError(t, err)
}

require.Equal(t, result, "knative-broker-namespace-topic-138ac0ec-2694-4747-900d-45be3da5c9a9")
}

func TestExecuteChannelsTopicTemplateDefault(t *testing.T) {
nc := DefaultFeaturesConfig()
result, err := nc.ExecuteChannelsTopicTemplate(v1.ObjectMeta{
Name: "topic",
Namespace: "namespace",
UID: "138ac0ec-2694-4747-900d-45be3da5c9a9",
})
if err != nil {
require.NoError(t, err)
}

require.Equal(t, result, "knative-messaging-kafka.namespace.topic")
}

func TestExecuteChannelsTopicTemplateOverride(t *testing.T) {
nc := DefaultFeaturesConfig()
customTemplate, _ := template.New("custom-template").Parse("knative-channel-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
nc.features.ChannelsTopicTemplate = *customTemplate

result, err := nc.ExecuteChannelsTopicTemplate(v1.ObjectMeta{
Name: "topic",
Namespace: "namespace",
UID: "138ac0ec-2694-4747-900d-45be3da5c9a9",
})
if err != nil {
require.NoError(t, err)
}

require.Equal(t, result, "knative-channel-namespace-topic-138ac0ec-2694-4747-900d-45be3da5c9a9")
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ data:
dispatcher.ordered-executor-metrics: "enabled"
controller.autoscaler: "enabled"
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}"
channels.topic.template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}"
25 changes: 18 additions & 7 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"

apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config"
Expand All @@ -48,9 +49,6 @@ import (
)

const (
// TopicPrefix is the Kafka Broker topic prefix - (topic name: knative-broker-<broker-namespace>-<broker-name>).
TopicPrefix = "knative-broker-"

// ExternalTopicAnnotation for using external kafka topic for the broker
ExternalTopicAnnotation = "kafka.eventing.knative.dev/external.topic"

Expand All @@ -72,8 +70,9 @@ type Reconciler struct {

BootstrapServers string

Prober prober.Prober
Counter *counter.Counter
Prober prober.Prober
Counter *counter.Counter
KafkaFeatureFlags *apisconfig.KafkaFeatureFlags
}

func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event {
Expand Down Expand Up @@ -271,7 +270,15 @@ func (r *Reconciler) reconcileBrokerTopic(broker *eventing.Broker, securityOptio
}
} else {
// no external topic, we create it
topicName = kafka.BrokerTopic(TopicPrefix, broker)
var existingTopic bool
// check if the broker already has reconciled with a topic name and use that if it exists
// otherwise, we create a new topic name from the broker topic template
if topicName, existingTopic = broker.Status.Annotations[kafka.TopicAnnotation]; !existingTopic {
topicName, err = r.KafkaFeatureFlags.ExecuteBrokersTopicTemplate(broker.ObjectMeta)
if err != nil {
return "", statusConditionManager.TopicsNotPresentOrInvalidErr([]string{topicName}, err)
}
}

topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig)
if err != nil {
Expand Down Expand Up @@ -463,7 +470,11 @@ func (r *Reconciler) finalizeNonExternalBrokerTopic(broker *eventing.Broker, sec
}
defer kafkaClusterAdminClient.Close()

topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, kafka.BrokerTopic(TopicPrefix, broker))
topicName, ok := broker.Status.Annotations[kafka.TopicAnnotation]
if !ok {
return fmt.Errorf("no topic annotated on broker")
}
topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topicName)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 472dc51

Please sign in to comment.