
Change default trigger consumergroup name and provide override #3033

Merged · 9 commits · Apr 20, 2023
@@ -4,7 +4,7 @@ metadata:
name: config-kafka-features
namespace: knative-eventing
annotations:
knative.dev/example-checksum: "b72f1c08"
knative.dev/example-checksum: "330d9f60"
data:
_example: |-
################################
@@ -33,6 +33,10 @@ data:
# 1. Enabled: KEDA autoscaling of consumers will be setup.
# 2. Disabled: KEDA autoscaling of consumers will not be setup.
controller.autoscaler: "disabled"
# The Go text/template used to generate consumergroup ID for triggers.
# The template can reference the trigger Kubernetes metadata only.
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
dispatcher.rate-limiter: "disabled"
dispatcher.ordered-executor-metrics: "disabled"
controller.autoscaler: "disabled"
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
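The consumergroup ID is produced by executing a standard Go `text/template` against the trigger's metadata. A minimal standalone sketch of that rendering (here `TriggerMeta` and `RenderGroupID` are illustrative stand-ins, not names from this PR):

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// TriggerMeta mirrors the fields the template may reference; it stands in
// for the trigger's Kubernetes ObjectMeta (Namespace, Name, UID, ...).
type TriggerMeta struct {
	Namespace string
	Name      string
	UID       string
}

// RenderGroupID parses the configured template text and executes it
// against the trigger metadata, returning the consumergroup ID.
func RenderGroupID(tmplText string, meta TriggerMeta) (string, error) {
	tmpl, err := template.New("triggers.consumergroup.template").Parse(tmplText)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, meta); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	id, _ := RenderGroupID("knative-trigger-{{ .Namespace }}-{{ .Name }}",
		TriggerMeta{Namespace: "default", Name: "my-trigger"})
	fmt.Println(id) // knative-trigger-default-my-trigger
}
```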
Member:
Would it be possible to include the UID as suffix by default?

The reason is that it would allow users to replay events by deleting and recreating a trigger. Since the trigger is almost fully mutable, deleting/recreating wouldn't be necessary for anything other than replaying events, while the default would still provide a well-known group ID prefix for monitoring.

Member:
I see the comment on the issue:

In addition, using the {{ . .Id }}, in iii you would still get a new consumergroup whenever the trigger is recreated, losing the offset again

It's not clear to me why the trigger would need to be recreated?

Contributor Author @EndPositive (Apr 20, 2023):
I guess this is just a matter of preference. If our CD pipeline were to recreate the trigger for some reason, we would want to keep the original consumergroup. Such a name is very safe whenever k8s resources are destroyed and recreated.

allow users replaying events by deleting/recreating

I think for "complex" use cases like these, we should let the user manually reset offsets, or perhaps create a new trigger with a different name for replay purposes.

But either is fine by me, since we can override the configuration for our preference.
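For deployments that do want a fresh consumergroup on recreation (enabling replay semantics), the override discussed here would look like this in the `config-kafka-features` ConfigMap — a hypothetical snippet matching the template used in the override test below:

```yaml
# Appending the UID gives each (re)created trigger a fresh consumergroup,
# so offsets are not inherited across delete/recreate.
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}-{{ .UID }}"
```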

Member:
/lgtm
/approve

Thanks, I appreciate the additional thought.

38 changes: 38 additions & 0 deletions control-plane/pkg/apis/config/features.go
@@ -17,11 +17,15 @@
package config

import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"text/template"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/configmap"
"knative.dev/pkg/logging"
@@ -35,19 +39,27 @@ type features struct {
DispatcherRateLimiter feature.Flag
DispatcherOrderedExecutorMetrics feature.Flag
ControllerAutoscaler feature.Flag
TriggersConsumerGroupTemplate *template.Template
}

type KafkaFeatureFlags struct {
features features
m sync.RWMutex
}

var DefaultTriggersConsumerGroupTemplate *template.Template

func init() {
DefaultTriggersConsumerGroupTemplate, _ = template.New("triggers.consumergroup.template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}")
}

func DefaultFeaturesConfig() *KafkaFeatureFlags {
return &KafkaFeatureFlags{
features: features{
DispatcherRateLimiter: feature.Disabled,
DispatcherOrderedExecutorMetrics: feature.Disabled,
ControllerAutoscaler: feature.Disabled,
TriggersConsumerGroupTemplate: DefaultTriggersConsumerGroupTemplate,
},
}
}
@@ -59,6 +71,7 @@ func newFeaturesConfigFromMap(cm *corev1.ConfigMap) (*KafkaFeatureFlags, error)
asFlag("dispatcher.rate-limiter", &nc.features.DispatcherRateLimiter),
asFlag("dispatcher.ordered-executor-metrics", &nc.features.DispatcherOrderedExecutorMetrics),
asFlag("controller.autoscaler", &nc.features.ControllerAutoscaler),
asTemplate("triggers.consumergroup.template", nc.features.TriggersConsumerGroupTemplate),
)
return nc, err
}
@@ -81,6 +94,16 @@ func (f *KafkaFeatureFlags) IsControllerAutoscalerEnabled() bool {
return f.features.ControllerAutoscaler == feature.Enabled
}

func (f *KafkaFeatureFlags) ExecuteTriggersConsumerGroupTemplate(triggerMetadata v1.ObjectMeta) (string, error) {
var result bytes.Buffer
err := f.features.TriggersConsumerGroupTemplate.Execute(&result, triggerMetadata)
if err != nil {
return "", fmt.Errorf("unable to execute triggers consumergroup template: %w", err)
}

return result.String(), nil
}

// Store is a typed wrapper around configmap.Untyped store to handle our configmaps.
// +k8s:deepcopy-gen=false
type Store struct {
@@ -148,3 +171,18 @@ func asFlag(key string, target *feature.Flag) configmap.ParseFunc {
return nil
}
}

// asTemplate parses the value at key as a go text template into the target, if it exists.
func asTemplate(key string, target *template.Template) configmap.ParseFunc {
return func(data map[string]string) error {
if raw, ok := data[key]; ok {
tmlp, err := template.New(key).Parse(raw)
if err != nil {
return err
}

*target = *tmlp
}
return nil
}
}
37 changes: 37 additions & 0 deletions control-plane/pkg/apis/config/features_test.go
@@ -19,8 +19,10 @@ package config
import (
"context"
"testing"
"text/template"

"github.com/stretchr/testify/require"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing/pkg/apis/feature"
cm "knative.dev/pkg/configmap/testing"
_ "knative.dev/pkg/system/testing"
@@ -55,6 +57,9 @@ func TestGetFlags(t *testing.T) {
require.True(t, flags.IsDispatcherRateLimiterEnabled())
require.True(t, flags.IsDispatcherOrderedExecutorMetricsEnabled())
require.True(t, flags.IsControllerAutoscalerEnabled())
require.Len(t, flags.features.TriggersConsumerGroupTemplate.Tree.Root.Nodes, 4)
require.Equal(t, "triggers.consumergroup.template", flags.features.TriggersConsumerGroupTemplate.Name())
}

func TestStoreLoadWithConfigMap(t *testing.T) {
@@ -69,6 +74,7 @@ func TestStoreLoadWithConfigMap(t *testing.T) {
require.Equal(t, expected.IsDispatcherRateLimiterEnabled(), have.IsDispatcherRateLimiterEnabled())
require.Equal(t, expected.IsDispatcherOrderedExecutorMetricsEnabled(), have.IsDispatcherOrderedExecutorMetricsEnabled())
require.Equal(t, expected.IsControllerAutoscalerEnabled(), have.IsControllerAutoscalerEnabled())
require.Equal(t, expected.features.TriggersConsumerGroupTemplate.Name(), have.features.TriggersConsumerGroupTemplate.Name())
}

func TestStoreLoadWithContext(t *testing.T) {
@@ -77,4 +83,35 @@ func TestStoreLoadWithContext(t *testing.T) {
require.False(t, have.IsDispatcherRateLimiterEnabled())
require.False(t, have.IsDispatcherOrderedExecutorMetricsEnabled())
require.False(t, have.IsControllerAutoscalerEnabled())
require.Equal(t, "triggers.consumergroup.template", have.features.TriggersConsumerGroupTemplate.Name())
}

func TestExecuteTriggersConsumerGroupTemplateDefault(t *testing.T) {
nc := DefaultFeaturesConfig()
result, err := nc.ExecuteTriggersConsumerGroupTemplate(v1.ObjectMeta{
Name: "trigger",
Namespace: "namespace",
UID: "138ac0ec-2694-4747-900d-45be3da5c9a9",
})
require.NoError(t, err)

require.Equal(t, "knative-trigger-namespace-trigger", result)
}

func TestExecuteTriggersConsumerGroupTemplateOverride(t *testing.T) {
nc := DefaultFeaturesConfig()
nc.features.TriggersConsumerGroupTemplate, _ = template.New("custom-template").Parse("knative-trigger-{{ .Namespace }}-{{ .Name }}-{{ .UID }}")

result, err := nc.ExecuteTriggersConsumerGroupTemplate(v1.ObjectMeta{
Name: "trigger",
Namespace: "namespace",
UID: "138ac0ec-2694-4747-900d-45be3da5c9a9",
})
require.NoError(t, err)

require.Equal(t, "knative-trigger-namespace-trigger-138ac0ec-2694-4747-900d-45be3da5c9a9", result)
}
@@ -8,3 +8,4 @@ data:
dispatcher.rate-limiter: "enabled"
dispatcher.ordered-executor-metrics: "enabled"
controller.autoscaler: "enabled"
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
45 changes: 45 additions & 0 deletions control-plane/pkg/kafka/consumer_group.go
@@ -18,8 +18,10 @@ package kafka

import (
"context"
"fmt"

"github.com/Shopify/sarama"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"
)

@@ -29,3 +31,46 @@ type InitOffsetsFunc func(ctx context.Context, kafkaClient sarama.Client, kafkaA
var (
_ InitOffsetsFunc = offset.InitOffsets
)

const (
GroupIdAnnotation = "group.id"
)

func AreConsumerGroupsPresentAndValid(kafkaClusterAdmin sarama.ClusterAdmin, consumerGroups ...string) (bool, error) {
if len(consumerGroups) == 0 {
return false, fmt.Errorf("expected at least one consumergroup, got 0")
}

consumerGroupDescription, err := kafkaClusterAdmin.DescribeConsumerGroups(consumerGroups)
if err != nil {
return false, fmt.Errorf("failed to describe consumer groups %v: %w", consumerGroups, err)
}

descriptionByConsumerGroup := make(map[string]*sarama.GroupDescription, len(consumerGroupDescription))
for _, d := range consumerGroupDescription {
descriptionByConsumerGroup[d.GroupId] = d
}

for _, t := range consumerGroups {
d, ok := descriptionByConsumerGroup[t]
if !ok {
return false, fmt.Errorf("kafka did not respond with consumer group metadata")
}
if !isValidSingleConsumerGroup(d) {
return false, nil
}
}
return true, nil
}

func isValidSingleConsumerGroup(metadata *sarama.GroupDescription) bool {
if metadata == nil {
return false
}

if metadata.State == "Dead" {
return false
}

return true
}
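The presence-and-validity logic above can be sketched independently of sarama — here `groupDescription` is a minimal stand-in for `sarama.GroupDescription`, and the rule is the same: every requested group must be described by the broker and must not be in the "Dead" state:

```go
package main

import "fmt"

// groupDescription is a minimal stand-in for sarama.GroupDescription.
type groupDescription struct {
	GroupID string
	State   string // e.g. "Dead", "Empty", "Stable"
}

// allPresentAndValid mirrors AreConsumerGroupsPresentAndValid: it indexes
// the broker's descriptions by group ID, then requires that each wanted
// group is described and not "Dead".
func allPresentAndValid(described []groupDescription, wanted ...string) (bool, error) {
	if len(wanted) == 0 {
		return false, fmt.Errorf("expected at least one consumergroup, got 0")
	}
	byID := make(map[string]groupDescription, len(described))
	for _, d := range described {
		byID[d.GroupID] = d
	}
	for _, w := range wanted {
		d, ok := byID[w]
		if !ok {
			return false, fmt.Errorf("kafka did not respond with consumer group metadata")
		}
		if d.State == "Dead" {
			return false, nil
		}
	}
	return true, nil
}

func main() {
	ok, _ := allPresentAndValid([]groupDescription{{GroupID: "g1", State: "Stable"}}, "g1")
	fmt.Println(ok) // true
}
```

Note that "Empty" is treated as valid: a group with committed offsets but no live members still exists and should not be recreated.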
96 changes: 96 additions & 0 deletions control-plane/pkg/kafka/consumer_group_test.go
@@ -0,0 +1,96 @@
/*
* Copyright 2023 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka

import (
"testing"

"github.com/Shopify/sarama"

kafkatesting "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/testing"
)

func TestNewClusterAdminClientFuncIsConsumerGroupPresent(t *testing.T) {
tests := []struct {
name string
clusterAdmin sarama.ClusterAdmin
consumerGroups []string
want bool
wantErr bool
}{
{
name: "consumergroup does not exist",
clusterAdmin: &kafkatesting.MockKafkaClusterAdmin{
ExpectedConsumerGroups: []string{"consumer-group-name-1"},
ExpectedGroupDescriptionOnDescribeConsumerGroups: []*sarama.GroupDescription{
{
GroupId: "consumer-group-name-1",
State: "Dead",
},
},
T: t,
},
consumerGroups: []string{"consumer-group-name-1"},
want: false,
wantErr: false,
},
{
name: "consumergroup exists (empty)",
clusterAdmin: &kafkatesting.MockKafkaClusterAdmin{
ExpectedConsumerGroups: []string{"consumer-group-name-1"},
ExpectedGroupDescriptionOnDescribeConsumerGroups: []*sarama.GroupDescription{
{
GroupId: "consumer-group-name-1",
State: "Empty",
},
},
T: t,
},
consumerGroups: []string{"consumer-group-name-1"},
want: true,
wantErr: false,
},
{
name: "consumergroup exists (Stable)",
clusterAdmin: &kafkatesting.MockKafkaClusterAdmin{
ExpectedConsumerGroups: []string{"consumer-group-name-1"},
ExpectedGroupDescriptionOnDescribeConsumerGroups: []*sarama.GroupDescription{
{
GroupId: "consumer-group-name-1",
State: "Stable",
},
},
T: t,
},
consumerGroups: []string{"consumer-group-name-1"},
want: true,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := AreConsumerGroupsPresentAndValid(tt.clusterAdmin, tt.consumerGroups...)
if (err != nil) != tt.wantErr {
t.Errorf("AreConsumerGroupsPresentAndValid() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("AreConsumerGroupsPresentAndValid() got = %v, want %v", got, tt.want)
}
})
}
}
11 changes: 10 additions & 1 deletion control-plane/pkg/kafka/testing/admin_mock.go
@@ -49,6 +49,11 @@ type MockKafkaClusterAdmin struct {
ExpectedErrorOnDescribeTopics error
ExpectedTopicsMetadataOnDescribeTopics []*sarama.TopicMetadata

// DescribeConsumerGroups
ExpectedConsumerGroups []string
ExpectedErrorOnDescribeConsumerGroups error
ExpectedGroupDescriptionOnDescribeConsumerGroups []*sarama.GroupDescription

ErrorOnDeleteConsumerGroup error

T *testing.T
@@ -164,7 +169,11 @@ func (m *MockKafkaClusterAdmin) ListConsumerGroups() (map[string]string, error)
}

func (m *MockKafkaClusterAdmin) DescribeConsumerGroups(groups []string) ([]*sarama.GroupDescription, error) {
panic("implement me")
if !sets.NewString(m.ExpectedConsumerGroups...).HasAll(groups...) {
m.T.Errorf("unexpected consumer groups %v, expected %v", groups, m.ExpectedConsumerGroups)
}

return m.ExpectedGroupDescriptionOnDescribeConsumerGroups, m.ExpectedErrorOnDescribeConsumerGroups
}

func (m *MockKafkaClusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
5 changes: 4 additions & 1 deletion control-plane/pkg/reconciler/trigger/controller.go
@@ -23,7 +23,6 @@ import (
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"
v1 "knative.dev/eventing/pkg/client/informers/externalversions/eventing/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
@@ -34,6 +33,9 @@
"knative.dev/pkg/logging"
"knative.dev/pkg/resolver"

apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"

apiseventing "knative.dev/eventing/pkg/apis/eventing"
eventing "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
@@ -85,6 +87,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
Env: configs,
BrokerClass: kafka.BrokerClass,
DataPlaneConfigMapLabeler: base.NoopConfigmapOption,
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
NewKafkaClient: sarama.NewClient,
NewKafkaClusterAdminClient: sarama.NewClusterAdmin,
InitOffsetsFunc: offset.InitOffsets,
2 changes: 2 additions & 0 deletions control-plane/pkg/reconciler/trigger/namespaced_trigger.go
@@ -27,6 +27,7 @@ import (
eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1"

apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base"
@@ -88,6 +89,7 @@ func (r *NamespacedReconciler) createReconcilerForTriggerInstance(trigger *event
// override
BrokerClass: kafka.NamespacedBrokerClass,
DataPlaneConfigMapLabeler: kafka.NamespacedDataplaneLabelConfigmapOption,
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
NewKafkaClusterAdminClient: r.NewKafkaClusterAdminClient,
NewKafkaClient: r.NewKafkaClient,
InitOffsetsFunc: r.InitOffsetsFunc,