Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change default trigger consumergroup name and provide override #3033

Merged
merged 9 commits into from
Apr 20, 2023
Merged

Change default trigger consumergroup name and provide override #3033

merged 9 commits into from
Apr 20, 2023

Conversation

EndPositive
Copy link
Contributor

@EndPositive EndPositive commented Apr 6, 2023

Fixes #3020

Proposed Changes

  • Add a configuration to config-kafka-features that is a Go template of trigger's metadata with a default: triggers.consumergroup.template: knative-trigger-{{ .Namespace }}-{{ .Name }}
  • Add a status .Annotations[group.id] to all triggers, defaulting to trigger.UID if the consumer group already exists. If it does not yet exist, use the template in config-kafka-features to determine the new group id.
  • Use that status annotation when talking to kafka in the future

Release Note

The Trigger name and namespace will be used as a consumer group name for any new triggers made.

Docs

@knative-prow knative-prow bot added area/control-plane area/test needs-ok-to-test Indicates a PR that requires an org member to verify it is safe to test. size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Apr 6, 2023
@knative-prow
Copy link

knative-prow bot commented Apr 6, 2023

Hi @EndPositive. Thanks for your PR.

I'm waiting for a knative-sandbox member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@EndPositive
Copy link
Contributor Author

@pierDipi is the triggerv2 still maintained?

@pierDipi
Copy link
Member

Yes

@pierDipi
Copy link
Member

/ok-to-test

@knative-prow knative-prow bot added ok-to-test Indicates a non-member PR verified by an org member that is safe to test. and removed needs-ok-to-test Indicates a PR that requires an org member to verify it is safe to test. labels Apr 11, 2023
@codecov
Copy link

codecov bot commented Apr 11, 2023

Codecov Report

Merging #3033 (e0eed9e) into main (ae1231e) will decrease coverage by 0.04%.
The diff coverage is 59.13%.

@@             Coverage Diff              @@
##               main    #3033      +/-   ##
============================================
- Coverage     64.22%   64.19%   -0.04%     
  Complexity      755      755              
============================================
  Files           156      157       +1     
  Lines         11034    11122      +88     
  Branches        232      232              
============================================
+ Hits           7087     7140      +53     
- Misses         3436     3460      +24     
- Partials        511      522      +11     
Flag Coverage Δ
java-unittests 81.18% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
control-plane/pkg/reconciler/trigger/trigger.go 66.46% <43.90%> (-1.69%) ⬇️
control-plane/pkg/kafka/consumer_group.go 58.62% <58.62%> (ø)
control-plane/pkg/apis/config/features.go 84.33% <70.00%> (-4.56%) ⬇️
...ntrol-plane/pkg/reconciler/trigger/v2/triggerv2.go 84.26% <72.72%> (-1.70%) ⬇️
control-plane/pkg/reconciler/trigger/controller.go 84.92% <100.00%> (+0.12%) ⬆️
...plane/pkg/reconciler/trigger/namespaced_trigger.go 90.24% <100.00%> (+0.24%) ⬆️
...ol-plane/pkg/reconciler/trigger/v2/controllerv2.go 60.24% <100.00%> (+0.48%) ⬆️

... and 1 file with indirect coverage changes

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

@knative-prow knative-prow bot added size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. and removed size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Apr 11, 2023
@EndPositive
Copy link
Contributor Author

/retest

@EndPositive
Copy link
Contributor Author

/retest

1 similar comment
@EndPositive
Copy link
Contributor Author

/retest

@pierDipi
Copy link
Member

I believe there are a few places where we assumed a certain structure for the consumer group id, like: https://github.com/knative-sandbox/eventing-kafka-broker/blob/7f3ec839e802d33985c67766bab1a1f0623c5adf/test/e2e/sacura_test.go#L159-L166

@pierDipi
Copy link
Member

The controller is panic-ing on https://storage.googleapis.com/knative-prow/pr-logs/pull/knative-sandbox_eventing-kafka-broker/3033/reconciler-tests-namespaced-broker_eventing-kafka-broker_main/1646087730972594176/artifacts/knative-eventing/kafka-controller

{"level":"info","ts":"2023-04-12T10:08:14.649Z","logger":"kafka-broker-controller.event-broadcaster","caller":"record/event.go:285","msg":"Event(v1.ObjectReference{Kind:\"Trigger\", Namespace:\"test-uakmssbv\", Name:\"trigger-osxhyhsp\", UID:\"b92ebe7a-2102-4bed-a106-fa79e5ca2e40\", APIVersion:\"eventing.knative.dev/v1\", ResourceVersion:\"17064\", FieldPath:\"\"}): type: 'Normal' reason: 'FinalizerUpdate' Updated \"trigger-osxhyhsp\" finalizers","commit":"ef86701","knative.dev/pod":"kafka-controller-7ff474fc88-x2qn4"}
E0412 10:08:14.671337       1 runtime.go:79] Observed a panic: "invalid memory address or nil pointer dereference" (runtime error: invalid memory address or nil pointer dereference)
goroutine 573 [running]:
k8s.io/apimachinery/pkg/util/runtime.logPanic({0x1e68880?, 0x35c34a0})
	k8s.io/apimachinery@v0.25.4/pkg/util/runtime/runtime.go:75 +0x99
k8s.io/apimachinery/pkg/util/runtime.HandleCrash({0x0, 0x0, 0xc00055f9d0?})
	k8s.io/apimachinery@v0.25.4/pkg/util/runtime/runtime.go:49 +0x75
panic({0x1e68880, 0x35c34a0})
	runtime/panic.go:884 +0x213
knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config.(*KafkaFeatureFlags).ExecuteTriggersConsumerGroupTemplate(0x0, {{0xc001c8ab10, 0x10}, {0x0, 0x0}, {0xc001c8ab20, 0xd}, {0x0, 0x0}, {0xc001be9260, ...}, ...})
	knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config/features.go:99 +0x39
knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger.(*Reconciler).reconcileConsumerGroup(0xc0018c9830, {0x25108f8, 0xc001795ce0}, 0xc001ac2dc0, 0xc0008491e0)
	knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger/trigger.go:444 +0x885
knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger.(*Reconciler).reconcileKind(0xc0018c9830, {0x25108f8, 0xc001795ce0}, 0xc0008491e0)
	knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger/trigger.go:147 +0x3a7
knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger.(*Reconciler).ReconcileKind.func1()
	knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger/trigger.go:91 +0x29
k8s.io/client-go/util/retry.OnError.func1()
	k8s.io/client-go@v0.25.4/util/retry/util.go:51 +0x33
k8s.io/apimachinery/pkg/util/wait.ConditionFunc.WithContext.func1({0x1f084e0, 0x1})
	k8s.io/apimachinery@v0.25.4/pkg/util/wait/wait.go:222 +0x1b
k8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtectionWithContext({0x2510888?, 0xc00005e028?}, 0x0?)
	k8s.io/apimachinery@v0.25.4/pkg/util/wait/wait.go:235 +0x57
k8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtection(0x1fc3240?)
	k8s.io/apimachinery@v0.25.4/pkg/util/wait/wait.go:228 +0x39
k8s.io/apimachinery/pkg/util/wait.ExponentialBackoff({0x989680, 0x4014000000000000, 0x3fb999999999999a, 0x4, 0x0}, 0x40dfe7?)
	k8s.io/apimachinery@v0.25.4/pkg/util/wait/wait.go:423 +0x5f
k8s.io/client-go/util/retry.OnError({0x989680, 0x4014000000000000, 0x3fb999999999999a, 0x4, 0x0}, 0x2279ab0, 0xc00186e960)
	k8s.io/client-go@v0.25.4/util/retry/util.go:50 +0xf1
k8s.io/client-go/util/retry.RetryOnConflict(...)
	k8s.io/client-go@v0.25.4/util/retry/util.go:104
knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger.(*Reconciler).ReconcileKind(0xc0018c9830?, {0x25108f8?, 0xc001795ce0?}, 0xc0008491e0?)
	knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger/trigger.go:90 +0x125
knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger.(*NamespacedReconciler).ReconcileKind(0x5?, {0x25108f8, 0xc001795ce0}, 0xc000849330?)
	knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger/namespaced_trigger.go:54 +0x3f
knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger.(*reconcilerImpl).Reconcile(0xc00052b5e0, {0x25108f8, 0xc001795c80}, {0xc001ad1d20, 0x1e})
	knative.dev/eventing@v0.36.1-0.20230309084530-05f6d84ad43c/pkg/client/injection/reconciler/eventing/v1/trigger/reconciler.go:240 +0x577
knative.dev/pkg/controller.(*Impl).processNextWorkItem(0xc0009aeba0)
	knative.dev/pkg@v0.0.0-20230309013522-c5dd1d1264ba/controller/controller.go:542 +0x4cd
knative.dev/pkg/controller.(*Impl).RunContext.func3()
	knative.dev/pkg@v0.0.0-20230309013522-c5dd1d1264ba/controller/controller.go:491 +0x68
created by knative.dev/pkg/controller.(*Impl).RunContext
	knative.dev/pkg@v0.0.0-20230309013522-c5dd1d1264ba/controller/controller.go:489 +0x354
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
	panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x30 pc=0x1b80c19]

goroutine 573 [running]:
k8s.io/apimachinery/pkg/util/runtime.HandleCrash({0x0, 0x0, 0xc00055f9d0?})
	k8s.io/apimachinery@v0.25.4/pkg/util/runtime/runtime.go:56 +0xd7
panic({0x1e68880, 0x35c34a0})
	runtime/panic.go:884 +0x213
knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config.(*KafkaFeatureFlags).ExecuteTriggersConsumerGroupTemplate(0x0, {{0xc001c8ab10, 0x10}, {0x0, 0x0}, {0xc001c8ab20, 0xd}, {0x0, 0x0}, {0xc001be9260, ...}, ...})
	knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config/features.go:99 +0x39
knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger.(*Reconciler).reconcileConsumerGroup(0xc0018c9830, {0x25108f8, 0xc001795ce0}, 0xc001ac2dc0, 0xc0008491e0)
	knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger/trigger.go:444 +0x885
knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger.(*Reconciler).reconcileKind(0xc0018c9830, {0x25108f8, 0xc001795ce0}, 0xc0008491e0)
	knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger/trigger.go:147 +0x3a7
knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger.(*Reconciler).ReconcileKind.func1()
	knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger/trigger.go:91 +0x29
k8s.io/client-go/util/retry.OnError.func1()
	k8s.io/client-go@v0.25.4/util/retry/util.go:51 +0x33
k8s.io/apimachinery/pkg/util/wait.ConditionFunc.WithContext.func1({0x1f084e0, 0x1})
	k8s.io/apimachinery@v0.25.4/pkg/util/wait/wait.go:222 +0x1b
k8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtectionWithContext({0x2510888?, 0xc00005e028?}, 0x0?)
	k8s.io/apimachinery@v0.25.4/pkg/util/wait/wait.go:235 +0x57
k8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtection(0x1fc3240?)
	k8s.io/apimachinery@v0.25.4/pkg/util/wait/wait.go:228 +0x39
k8s.io/apimachinery/pkg/util/wait.ExponentialBackoff({0x989680, 0x4014000000000000, 0x3fb999999999999a, 0x4, 0x0}, 0x40dfe7?)
	k8s.io/apimachinery@v0.25.4/pkg/util/wait/wait.go:423 +0x5f
k8s.io/client-go/util/retry.OnError({0x989680, 0x4014000000000000, 0x3fb999999999999a, 0x4, 0x0}, 0x2279ab0, 0xc00186e960)
	k8s.io/client-go@v0.25.4/util/retry/util.go:50 +0xf1
k8s.io/client-go/util/retry.RetryOnConflict(...)
	k8s.io/client-go@v0.25.4/util/retry/util.go:104
knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger.(*Reconciler).ReconcileKind(0xc0018c9830?, {0x25108f8?, 0xc001795ce0?}, 0xc0008491e0?)
	knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger/trigger.go:90 +0x125
knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger.(*NamespacedReconciler).ReconcileKind(0x5?, {0x25108f8, 0xc001795ce0}, 0xc000849330?)
	knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/trigger/namespaced_trigger.go:54 +0x3f
knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger.(*reconcilerImpl).Reconcile(0xc00052b5e0, {0x25108f8, 0xc001795c80}, {0xc001ad1d20, 0x1e})
	knative.dev/eventing@v0.36.1-0.20230309084530-05f6d84ad43c/pkg/client/injection/reconciler/eventing/v1/trigger/reconciler.go:240 +0x577
knative.dev/pkg/controller.(*Impl).processNextWorkItem(0xc0009aeba0)
	knative.dev/pkg@v0.0.0-20230309013522-c5dd1d1264ba/controller/controller.go:542 +0x4cd
knative.dev/pkg/controller.(*Impl).RunContext.func3()
	knative.dev/pkg@v0.0.0-20230309013522-c5dd1d1264ba/controller/controller.go:491 +0x68
created by knative.dev/pkg/controller.(*Impl).RunContext
	knative.dev/pkg@v0.0.0-20230309013522-c5dd1d1264ba/controller/controller.go:489 +0x354

@EndPositive
Copy link
Contributor Author

/retest

dispatcher.rate-limiter: "disabled"
dispatcher.ordered-executor-metrics: "disabled"
controller.autoscaler: "disabled"
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to include the UID as suffix by default?

The reason being that it will allow users replaying events by deleting/recreating, since the trigger is almost fully mutable deleting/recreating wouldn't be necessary for anything other than replaying events while also providing a well known group id prefix for monitoring

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the comment on the issue:

In addition, using the {{ . .Id }}, in iii you would still get a new consumergroup whenever the trigger is recreated, losing the offset again

it's not clear on why the need for recreating the trigger?

Copy link
Contributor Author

@EndPositive EndPositive Apr 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is just a matter of preference. If somehow our CD would recreate the trigger, we want to keep the original consumergroup. Such a name would be very safe for whenever k8s resources are destroyed.

allow users replaying events by deleting/recreating

I think for "complex" use cases like these, we should let the user manually reset offsets, or perhaps create a new trigger with a different name for replay purposes.

But either is fine by me, since we can override the configuration for our preference.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/lgtm
/approve

Thanks appreciate the additional thought

@knative-prow knative-prow bot added the lgtm Indicates that a PR is ready to be merged. label Apr 20, 2023
@knative-prow
Copy link

knative-prow bot commented Apr 20, 2023

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: EndPositive, pierDipi

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow knative-prow bot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Apr 20, 2023
@pierDipi
Copy link
Member

/test upgrade-tests

@knative-prow knative-prow bot merged commit 293b798 into knative-extensions:main Apr 20, 2023
@xqianwang
Copy link

Great work. Thanks for contribution. We need this feature.

@EndPositive EndPositive deleted the feat/consumergroup-name branch April 26, 2023 08:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved Indicates a PR has been approved by an approver from all required OWNERS files. area/control-plane area/test lgtm Indicates that a PR is ready to be merged. ok-to-test Indicates a non-member PR verified by an org member that is safe to test. size/XL Denotes a PR that changes 500-999 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Use reproducable consumergroup name for Triggers
3 participants