Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick custom kafka topics changes #729

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion control-plane/cmd/kafka-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func main() {
injection.NamedControllerConstructor{
Name: "channel-controller",
ControllerConstructor: func(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
return channel.NewController(ctx, channelEnv)
return channel.NewController(ctx, watcher, channelEnv)
},
},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: config-kafka-features
namespace: knative-eventing
annotations:
knative.dev/example-checksum: "330d9f60"
knative.dev/example-checksum: "1192895d"
data:
_example: |-
################################
Expand Down Expand Up @@ -36,7 +36,15 @@ data:
# The Go text/template used to generate consumergroup ID for triggers.
# The template can reference the trigger Kubernetes metadata only.
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
# The Go text/template used to generate topics for Brokers.
# The template can reference the broker Kubernetes metadata only.
brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}"
# The Go text/template used to generate topics for Channels.
# The template can reference the channel Kubernetes metadata only.
channels.topic.template: "knative-channel-{{ .Namespace }}-{{ .Name }}"
dispatcher.rate-limiter: "disabled"
dispatcher.ordered-executor-metrics: "disabled"
controller.autoscaler: "disabled"
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}"
channels.topic.template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi. Is there a reason why the format is different for brokers and channels? Just noticed that the .Namespace and .Name are separated by - for brokers and by . for channels.

The example at line 44 looks better to me (separation by - and using a shorter name knative-channel instead of knative-messaging-kafka.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this was to match the existing formats for channel and broker names so that the default behaviour would not change

53 changes: 39 additions & 14 deletions control-plane/pkg/apis/config/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,27 @@ type features struct {
DispatcherRateLimiter feature.Flag
DispatcherOrderedExecutorMetrics feature.Flag
ControllerAutoscaler feature.Flag
TriggersConsumerGroupTemplate *template.Template
TriggersConsumerGroupTemplate template.Template
BrokersTopicTemplate template.Template
ChannelsTopicTemplate template.Template
}

type KafkaFeatureFlags struct {
features features
m sync.RWMutex
}

var DefaultTriggersConsumerGroupTemplate *template.Template
var (
defaultTriggersConsumerGroupTemplate *template.Template
defaultBrokersTopicTemplate *template.Template
defaultChannelsTopicTemplate *template.Template
)

func init() {
DefaultTriggersConsumerGroupTemplate, _ = template.New("triggers.consumergroup.template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}")
defaultTriggersConsumerGroupTemplate, _ = template.New("triggers.consumergroup.template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}")
defaultBrokersTopicTemplate, _ = template.New("brokers.topic.template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}")
// This will resolve to the old naming convention, to prevent errors switching over to the new topic templates approach
defaultChannelsTopicTemplate, _ = template.New("channels.topic.template").Parse("knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}")
}

func DefaultFeaturesConfig() *KafkaFeatureFlags {
Expand All @@ -59,19 +68,23 @@ func DefaultFeaturesConfig() *KafkaFeatureFlags {
DispatcherRateLimiter: feature.Disabled,
DispatcherOrderedExecutorMetrics: feature.Disabled,
ControllerAutoscaler: feature.Disabled,
TriggersConsumerGroupTemplate: DefaultTriggersConsumerGroupTemplate,
TriggersConsumerGroupTemplate: *defaultTriggersConsumerGroupTemplate,
BrokersTopicTemplate: *defaultBrokersTopicTemplate,
ChannelsTopicTemplate: *defaultChannelsTopicTemplate,
},
}
}

// newFeaturesConfigFromMap creates a Features from the supplied Map
func newFeaturesConfigFromMap(cm *corev1.ConfigMap) (*KafkaFeatureFlags, error) {
// NewFeaturesConfigFromMap creates a Features from the supplied Map
func NewFeaturesConfigFromMap(cm *corev1.ConfigMap) (*KafkaFeatureFlags, error) {
nc := DefaultFeaturesConfig()
err := configmap.Parse(cm.Data,
asFlag("dispatcher.rate-limiter", &nc.features.DispatcherRateLimiter),
asFlag("dispatcher.ordered-executor-metrics", &nc.features.DispatcherOrderedExecutorMetrics),
asFlag("controller.autoscaler", &nc.features.ControllerAutoscaler),
asTemplate("triggers.consumergroup.template", nc.features.TriggersConsumerGroupTemplate),
asTemplate("triggers.consumergroup.template", &nc.features.TriggersConsumerGroupTemplate),
asTemplate("brokers.topic.template", &nc.features.BrokersTopicTemplate),
asTemplate("channels.topic.template", &nc.features.ChannelsTopicTemplate),
)
return nc, err
}
Expand All @@ -95,13 +108,15 @@ func (f *KafkaFeatureFlags) IsControllerAutoscalerEnabled() bool {
}

func (f *KafkaFeatureFlags) ExecuteTriggersConsumerGroupTemplate(triggerMetadata v1.ObjectMeta) (string, error) {
var result bytes.Buffer
err := f.features.TriggersConsumerGroupTemplate.Execute(&result, triggerMetadata)
if err != nil {
return "", fmt.Errorf("unable to execute triggers consumergroup template: %w", err)
}
return executeTemplateToString(f.features.TriggersConsumerGroupTemplate, triggerMetadata, "unable to execute triggers consumergroup template: %w")
}

return result.String(), nil
func (f *KafkaFeatureFlags) ExecuteBrokersTopicTemplate(brokerMetadata v1.ObjectMeta) (string, error) {
return executeTemplateToString(f.features.BrokersTopicTemplate, brokerMetadata, "unable to execute brokers topic template: %w")
}

func (f *KafkaFeatureFlags) ExecuteChannelsTopicTemplate(channelMetadata v1.ObjectMeta) (string, error) {
return executeTemplateToString(f.features.ChannelsTopicTemplate, channelMetadata, "unable to execute channels topic template: %w")
}

// Store is a typed wrapper around configmap.Untyped store to handle our configmaps.
Expand All @@ -117,7 +132,7 @@ func NewStore(ctx context.Context, onAfterStore ...func(name string, value *Kafk
"config-kafka-features",
logging.FromContext(ctx).Named("config-kafka-features"),
configmap.Constructors{
FlagsConfigName: newFeaturesConfigFromMap,
FlagsConfigName: NewFeaturesConfigFromMap,
},
func(name string, value interface{}) {
for _, f := range onAfterStore {
Expand Down Expand Up @@ -186,3 +201,13 @@ func asTemplate(key string, target *template.Template) configmap.ParseFunc {
return nil
}
}

func executeTemplateToString(template template.Template, metadata v1.ObjectMeta, errorMessage string) (string, error) {
var result bytes.Buffer
err := template.Execute(&result, metadata)
if err != nil {
return "", fmt.Errorf(errorMessage, err)
}

return result.String(), nil
}
78 changes: 75 additions & 3 deletions control-plane/pkg/apis/config/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestFlags_IsEnabled_ContainingFlag(t *testing.T) {

func TestGetFlags(t *testing.T) {
_, example := cm.ConfigMapsFromTestFile(t, FlagsConfigName)
flags, err := newFeaturesConfigFromMap(example)
flags, err := NewFeaturesConfigFromMap(example)
require.NoError(t, err)

require.True(t, flags.IsDispatcherRateLimiterEnabled())
Expand All @@ -60,6 +60,11 @@ func TestGetFlags(t *testing.T) {
require.True(t, flags.IsControllerAutoscalerEnabled())
require.Len(t, flags.features.TriggersConsumerGroupTemplate.Tree.Root.Nodes, 4)
require.Equal(t, flags.features.TriggersConsumerGroupTemplate.Name(), "triggers.consumergroup.template")
require.Len(t, flags.features.BrokersTopicTemplate.Tree.Root.Nodes, 4)
require.Equal(t, flags.features.BrokersTopicTemplate.Name(), "brokers.topic.template")
require.Len(t, flags.features.ChannelsTopicTemplate.Tree.Root.Nodes, 4)
require.Equal(t, flags.features.ChannelsTopicTemplate.Name(), "channels.topic.template")

}

func TestStoreLoadWithConfigMap(t *testing.T) {
Expand All @@ -69,12 +74,14 @@ func TestStoreLoadWithConfigMap(t *testing.T) {
store.OnConfigChanged(exampleConfig)

have := FromContext(store.ToContext(context.Background()))
expected, _ := newFeaturesConfigFromMap(exampleConfig)
expected, _ := NewFeaturesConfigFromMap(exampleConfig)

require.Equal(t, expected.IsDispatcherRateLimiterEnabled(), have.IsDispatcherRateLimiterEnabled())
require.Equal(t, expected.IsDispatcherOrderedExecutorMetricsEnabled(), have.IsDispatcherOrderedExecutorMetricsEnabled())
require.Equal(t, expected.IsControllerAutoscalerEnabled(), have.IsControllerAutoscalerEnabled())
require.Equal(t, expected.features.TriggersConsumerGroupTemplate.Name(), have.features.TriggersConsumerGroupTemplate.Name())
require.Equal(t, expected.features.BrokersTopicTemplate.Name(), have.features.BrokersTopicTemplate.Name())
require.Equal(t, expected.features.ChannelsTopicTemplate.Name(), have.features.ChannelsTopicTemplate.Name())
}

func TestStoreLoadWithContext(t *testing.T) {
Expand All @@ -84,6 +91,8 @@ func TestStoreLoadWithContext(t *testing.T) {
require.False(t, have.IsDispatcherOrderedExecutorMetricsEnabled())
require.False(t, have.IsControllerAutoscalerEnabled())
require.Equal(t, have.features.TriggersConsumerGroupTemplate.Name(), "triggers.consumergroup.template")
require.Equal(t, have.features.BrokersTopicTemplate.Name(), "brokers.topic.template")
require.Equal(t, have.features.ChannelsTopicTemplate.Name(), "channels.topic.template")
}

func TestExecuteTriggersConsumerGroupTemplateDefault(t *testing.T) {
Expand All @@ -102,7 +111,8 @@ func TestExecuteTriggersConsumerGroupTemplateDefault(t *testing.T) {

func TestExecuteTriggersConsumerGroupTemplateOverride(t *testing.T) {
nc := DefaultFeaturesConfig()
nc.features.TriggersConsumerGroupTemplate, _ = template.New("custom-template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
customTemplate, _ := template.New("custom-template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
nc.features.TriggersConsumerGroupTemplate = *customTemplate

result, err := nc.ExecuteTriggersConsumerGroupTemplate(v1.ObjectMeta{
Name: "trigger",
Expand All @@ -115,3 +125,65 @@ func TestExecuteTriggersConsumerGroupTemplateOverride(t *testing.T) {

require.Equal(t, result, "knative-trigger-namespace-trigger-138ac0ec-2694-4747-900d-45be3da5c9a9")
}

func TestExecuteBrokersTopicTemplateDefault(t *testing.T) {
nc := DefaultFeaturesConfig()
result, err := nc.ExecuteBrokersTopicTemplate(v1.ObjectMeta{
Name: "topic",
Namespace: "namespace",
UID: "138ac0ec-2694-4747-900d-45be3da5c9a9",
})
if err != nil {
require.NoError(t, err)
}

require.Equal(t, result, "knative-broker-namespace-topic")
}

func TestExecuteBrokersTopicTemplateOverride(t *testing.T) {
nc := DefaultFeaturesConfig()
customTemplate, _ := template.New("custom-template").Parse("knative-broker-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
nc.features.BrokersTopicTemplate = *customTemplate

result, err := nc.ExecuteBrokersTopicTemplate(v1.ObjectMeta{
Name: "topic",
Namespace: "namespace",
UID: "138ac0ec-2694-4747-900d-45be3da5c9a9",
})
if err != nil {
require.NoError(t, err)
}

require.Equal(t, result, "knative-broker-namespace-topic-138ac0ec-2694-4747-900d-45be3da5c9a9")
}

func TestExecuteChannelsTopicTemplateDefault(t *testing.T) {
nc := DefaultFeaturesConfig()
result, err := nc.ExecuteChannelsTopicTemplate(v1.ObjectMeta{
Name: "topic",
Namespace: "namespace",
UID: "138ac0ec-2694-4747-900d-45be3da5c9a9",
})
if err != nil {
require.NoError(t, err)
}

require.Equal(t, result, "knative-messaging-kafka.namespace.topic")
}

func TestExecuteChannelsTopicTemplateOverride(t *testing.T) {
nc := DefaultFeaturesConfig()
customTemplate, _ := template.New("custom-template").Parse("knative-channel-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")
nc.features.ChannelsTopicTemplate = *customTemplate

result, err := nc.ExecuteChannelsTopicTemplate(v1.ObjectMeta{
Name: "topic",
Namespace: "namespace",
UID: "138ac0ec-2694-4747-900d-45be3da5c9a9",
})
if err != nil {
require.NoError(t, err)
}

require.Equal(t, result, "knative-channel-namespace-topic-138ac0ec-2694-4747-900d-45be3da5c9a9")
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ data:
dispatcher.ordered-executor-metrics: "enabled"
controller.autoscaler: "enabled"
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
brokers.topic.template: "knative-broker-{{ .Namespace }}-{{ .Name }}"
channels.topic.template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}"
25 changes: 18 additions & 7 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"

apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config"
Expand All @@ -48,9 +49,6 @@ import (
)

const (
// TopicPrefix is the Kafka Broker topic prefix - (topic name: knative-broker-<broker-namespace>-<broker-name>).
TopicPrefix = "knative-broker-"

// ExternalTopicAnnotation for using external kafka topic for the broker
ExternalTopicAnnotation = "kafka.eventing.knative.dev/external.topic"

Expand All @@ -72,8 +70,9 @@ type Reconciler struct {

BootstrapServers string

Prober prober.Prober
Counter *counter.Counter
Prober prober.Prober
Counter *counter.Counter
KafkaFeatureFlags *apisconfig.KafkaFeatureFlags
}

func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event {
Expand Down Expand Up @@ -271,7 +270,15 @@ func (r *Reconciler) reconcileBrokerTopic(broker *eventing.Broker, securityOptio
}
} else {
// no external topic, we create it
topicName = kafka.BrokerTopic(TopicPrefix, broker)
var existingTopic bool
// check if the broker already has reconciled with a topic name and use that if it exists
// otherwise, we create a new topic name from the broker topic template
if topicName, existingTopic = broker.Status.Annotations[kafka.TopicAnnotation]; !existingTopic {
topicName, err = r.KafkaFeatureFlags.ExecuteBrokersTopicTemplate(broker.ObjectMeta)
if err != nil {
return "", statusConditionManager.TopicsNotPresentOrInvalidErr([]string{topicName}, err)
}
}

topic, err := kafka.CreateTopicIfDoesntExist(kafkaClusterAdminClient, logger, topicName, topicConfig)
if err != nil {
Expand Down Expand Up @@ -463,7 +470,11 @@ func (r *Reconciler) finalizeNonExternalBrokerTopic(broker *eventing.Broker, sec
}
defer kafkaClusterAdminClient.Close()

topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, kafka.BrokerTopic(TopicPrefix, broker))
topicName, ok := broker.Status.Annotations[kafka.TopicAnnotation]
if !ok {
return fmt.Errorf("no topic annotated on broker")
}
topic, err := kafka.DeleteTopic(kafkaClusterAdminClient, topicName)
if err != nil {
return err
}
Expand Down
Loading