From 7d38e17a3e47f229a6e8f315993e96e122e816ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Fri, 2 Feb 2024 09:30:29 +0100 Subject: [PATCH] Create KafkaSources OIDC service account and expose in its status (#3660) * Create KafkaSources OIDC service account and expose in its status * Run goimport --- .../apis/sources/v1beta1/kafka_lifecycle.go | 20 ++++ .../pkg/reconciler/source/controller.go | 44 +++++++- .../pkg/reconciler/source/controller_test.go | 7 ++ control-plane/pkg/reconciler/source/source.go | 22 +++- .../pkg/reconciler/source/source_test.go | 103 +++++++++++++++++- .../pkg/reconciler/testing/objects_source.go | 26 +++++ 6 files changed, 208 insertions(+), 14 deletions(-) diff --git a/control-plane/pkg/apis/sources/v1beta1/kafka_lifecycle.go b/control-plane/pkg/apis/sources/v1beta1/kafka_lifecycle.go index b7457341ee..64028ec369 100644 --- a/control-plane/pkg/apis/sources/v1beta1/kafka_lifecycle.go +++ b/control-plane/pkg/apis/sources/v1beta1/kafka_lifecycle.go @@ -46,6 +46,9 @@ const ( // KafkaConditionInitialOffsetsCommitted is True when the KafkaSource has committed the // initial offset of all claims KafkaConditionInitialOffsetsCommitted apis.ConditionType = "InitialOffsetsCommitted" + + // KafkaConditionOIDCIdentityCreated has status True when the KafkaSource has created an OIDC identity. + KafkaConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated" ) var ( @@ -54,6 +57,7 @@ var ( KafkaConditionDeployed, KafkaConditionConnectionEstablished, KafkaConditionInitialOffsetsCommitted, + KafkaConditionOIDCIdentityCreated, ) kafkaCondSetLock = sync.RWMutex{} @@ -160,6 +164,22 @@ func (s *KafkaSourceStatus) MarkInitialOffsetNotCommitted(reason, messageFormat KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionInitialOffsetsCommitted, reason, messageFormat, messageA...) } +func (s *KafkaSourceStatus) MarkOIDCIdentityCreatedSucceeded() { + KafkaSourceCondSet.Manage(s).MarkTrue(KafkaConditionOIDCIdentityCreated) +} + +func (s *KafkaSourceStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, messageFormat string, messageA ...interface{}) { + KafkaSourceCondSet.Manage(s).MarkTrueWithReason(KafkaConditionOIDCIdentityCreated, reason, messageFormat, messageA...) +} + +func (s *KafkaSourceStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) { + KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionOIDCIdentityCreated, reason, messageFormat, messageA...) +} + +func (s *KafkaSourceStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) { + KafkaSourceCondSet.Manage(s).MarkUnknown(KafkaConditionOIDCIdentityCreated, reason, messageFormat, messageA...) +} + func (s *KafkaSourceStatus) UpdateConsumerGroupStatus(status string) { s.Claims = status } diff --git a/control-plane/pkg/reconciler/source/controller.go b/control-plane/pkg/reconciler/source/controller.go index fb8fb503eb..a74ac95f50 100644 --- a/control-plane/pkg/reconciler/source/controller.go +++ b/control-plane/pkg/reconciler/source/controller.go @@ -19,6 +19,9 @@ package source import ( "context" + "knative.dev/eventing/pkg/apis/feature" + "knative.dev/pkg/logging" + "k8s.io/client-go/tools/cache" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -34,29 +37,51 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumergroup" kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client" + + kubeclient "knative.dev/pkg/client/injection/kube/client" + serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount" ) func NewController(ctx context.Context, watcher configmap.Watcher) *controller.Impl { kafkaInformer := kafkainformer.Get(ctx) consumerGroupInformer := consumergroupinformer.Get(ctx) + serviceaccountInformer := serviceaccountinformer.Get(ctx) sources.RegisterAlternateKafkaConditionSet(conditionSet) + var globalResync func() + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if globalResync != nil { + globalResync() + } + }) + featureStore.WatchConfigs(watcher) + r := &Reconciler{ - ConsumerGroupLister: consumerGroupInformer.Lister(), - InternalsClient: consumergroupclient.Get(ctx), - KedaClient: kedaclient.Get(ctx), - KafkaFeatureFlags: config.DefaultFeaturesConfig(), + KubeClient: kubeclient.Get(ctx), + ConsumerGroupLister: consumerGroupInformer.Lister(), + InternalsClient: consumergroupclient.Get(ctx), + KedaClient: kedaclient.Get(ctx), + KafkaFeatureFlags: config.DefaultFeaturesConfig(), + ServiceAccountLister: serviceaccountInformer.Lister(), } - impl := kafkasource.NewImpl(ctx, r) + impl := kafkasource.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { + return controller.Options{ + ConfigStore: featureStore, + } + }) + + globalResync = func() { + impl.GlobalResync(kafkaInformer.Informer()) + } kafkaInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) configStore := config.NewStore(ctx, func(name string, value *config.KafkaFeatureFlags) { r.KafkaFeatureFlags.Reset(value) - impl.GlobalResync(kafkaInformer.Informer()) + globalResync() }) configStore.WatchConfigs(watcher) @@ -65,5 +90,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I FilterFunc: consumergroup.Filter("kafkasource"), Handler: controller.HandleAll(consumergroup.Enqueue("kafkasource", impl.EnqueueKey)), }) + + // Reconcile KafkaSource when the OIDC service account changes + serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.FilterController(&sources.KafkaSource{}), + Handler: controller.HandleAll(impl.EnqueueControllerOf), + }) + return impl } diff --git a/control-plane/pkg/reconciler/source/controller_test.go b/control-plane/pkg/reconciler/source/controller_test.go index 3ccfefb94e..eaff75ab92 100644 --- a/control-plane/pkg/reconciler/source/controller_test.go +++ b/control-plane/pkg/reconciler/source/controller_test.go @@ -19,6 +19,8 @@ package source import ( "testing" + "knative.dev/eventing/pkg/apis/feature" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -40,6 +42,7 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/config" kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client/fake" + _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake" ) func TestNewController(t *testing.T) { @@ -69,6 +72,10 @@ func TestNewController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "config-kafka-features", }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: feature.FlagsConfigName, + }, })) if controller == nil { t.Error("failed to create controller: ") diff --git a/control-plane/pkg/reconciler/source/source.go b/control-plane/pkg/reconciler/source/source.go index 34306f2558..07d28fb71f 100644 --- a/control-plane/pkg/reconciler/source/source.go +++ b/control-plane/pkg/reconciler/source/source.go @@ -21,6 +21,11 @@ import ( "fmt" "strings" + "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "knative.dev/eventing/pkg/apis/feature" + "knative.dev/eventing/pkg/auth" + "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -57,10 +62,12 @@ var ( ) type Reconciler struct { - ConsumerGroupLister internalslst.ConsumerGroupLister - InternalsClient internalsclient.Interface - KedaClient kedaclientset.Interface - KafkaFeatureFlags *config.KafkaFeatureFlags + KubeClient kubernetes.Interface + ConsumerGroupLister internalslst.ConsumerGroupLister + InternalsClient internalsclient.Interface + KedaClient kedaclientset.Interface + KafkaFeatureFlags *config.KafkaFeatureFlags + ServiceAccountLister corelisters.ServiceAccountLister } func (r *Reconciler) ReconcileKind(ctx context.Context, ks *sources.KafkaSource) reconciler.Event { @@ -71,6 +78,13 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, ks *sources.KafkaSource) } ks.Status.Selector = selector.String() + err = auth.SetupOIDCServiceAccount(ctx, feature.FromContext(ctx), r.ServiceAccountLister, r.KubeClient, sources.SchemeGroupVersion.WithKind("KafkaSource"), ks.ObjectMeta, &ks.Status, func(as *duckv1.AuthStatus) { + ks.Status.Auth = as + }) + if err != nil { + return fmt.Errorf("could not setup OIDC service account for KafkaSource %s/%s: %w", ks.Name, ks.Namespace, err) + } + cg, err := r.reconcileConsumerGroup(ctx, ks) if err != nil { ks.GetConditionSet().Manage(&ks.Status).MarkFalse(KafkaConditionConsumerGroup, "failed to reconcile consumer group", err.Error()) diff --git a/control-plane/pkg/reconciler/source/source_test.go b/control-plane/pkg/reconciler/source/source_test.go index dfec70ad63..badac46617 100644 --- a/control-plane/pkg/reconciler/source/source_test.go +++ b/control-plane/pkg/reconciler/source/source_test.go @@ -21,6 +21,9 @@ import ( "fmt" "testing" + "knative.dev/eventing/pkg/apis/feature" + "knative.dev/eventing/pkg/auth" + "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -52,6 +55,8 @@ import ( . "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/testing" kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client/fake" + + fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake" ) const ( @@ -148,6 +153,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceConsumerGroupUnknown(), StatusSourceSinkResolved(""), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -202,6 +208,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceConsumerGroupUnknown(), StatusSourceSinkResolved(""), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -256,6 +263,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceConsumerGroupUnknown(), StatusSourceSinkResolved(""), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -315,6 +323,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceConsumerGroupUnknown(), StatusSourceSinkResolved(""), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -370,6 +379,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceConsumerGroupUnknown(), StatusSourceSinkResolved(""), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -506,6 +516,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceSinkResolved(""), SourceNetSaslTls(true), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -626,6 +637,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceSinkResolved(""), SourceNetSaslTls(false), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -688,6 +700,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceConsumerGroupUnknown(), StatusSourceSinkResolved(""), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -743,6 +756,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceSinkResolved(""), StatusSourceSelector(), WithAutoscalingAnnotationsSource(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -832,6 +846,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceConsumerGroup(), StatusSourceSinkResolved(""), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -897,6 +912,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceConsumerGroup(), StatusSourceSinkResolved(""), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -963,6 +979,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceSinkResolved(""), StatusSourceSelector(), WithAutoscalingAnnotationsSource(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -1027,6 +1044,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceConsumerGroupUnknown(), StatusSourceSinkResolved(""), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -1080,6 +1098,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceSinkResolved(""), StatusSourceSelector(), WithAutoscalingAnnotationsSource(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -1131,6 +1150,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceConsumerGroupUnknown(), StatusSourceSinkResolved(""), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -1183,6 +1203,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceConsumerGroupFailed("failed", "failed"), StatusSourceSinkResolved(""), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -1237,6 +1258,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceSinkResolved(""), StatusSourceConsumerGroupReplicas(1), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -1325,6 +1347,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceSinkResolved(""), StatusSourceConsumerGroupReplicas(1), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -1385,6 +1408,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceSinkResolved(""), StatusSourceConsumerGroupReplicas(1), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -1439,6 +1463,7 @@ func TestReconcileKind(t *testing.T) { StatusSourceConsumerGroupUnknown(), StatusSourceSinkResolved(""), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), ), }, }, @@ -1493,6 +1518,66 @@ func TestReconcileKind(t *testing.T) { StatusSourceConsumerGroupUnknown(), StatusSourceSinkResolved(""), StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), + ), + }, + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + }, + { + Name: "Reconciled normal - with OIDC enabled", + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + }), + Objects: []runtime.Object{ + NewSource(), + }, + Key: testKey, + WantCreates: []runtime.Object{ + makeKafkaSourceOIDCServiceAccount(), + NewConsumerGroup( + WithConsumerGroupFinalizer(), + WithConsumerGroupName(SourceUUID), + WithConsumerGroupNamespace(SourceNamespace), + WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewSource())), + WithConsumerGroupMetaLabels(OwnerAsSourceLabel), + WithConsumerGroupLabels(ConsumerSourceLabel), + ConsumerGroupConsumerSpec(NewConsumerSpec( + ConsumerTopics(SourceTopics[0], SourceTopics[1]), + ConsumerConfigs( + ConsumerGroupIdConfig(SourceConsumerGroup), + ConsumerBootstrapServersConfig(SourceBootstrapServers), + ), + ConsumerAuth(NewConsumerSpecAuth()), + ConsumerDelivery( + NewConsumerSpecDelivery( + sources.Ordered, + NewConsumerTimeout("PT600S"), + NewConsumerRetry(10), + NewConsumerBackoffDelay("PT0.3S"), + NewConsumerBackoffPolicy(eventingduck.BackoffPolicyExponential), + ConsumerInitialOffset(sources.OffsetLatest), + ), + ), + ConsumerSubscriber(NewSourceSinkReference()), + ConsumerReply(ConsumerNoReply()), + )), + ConsumerGroupReplicas(1), + ), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewSource( + StatusSourceConsumerGroupUnknown(), + StatusSourceSinkResolved(""), + StatusSourceSelector(), + StatusSourceOIDCIdentityCreatedSucceeded(), + StatusSourceOIDCIdentity(makeKafkaSourceOIDCServiceAccount().Name), ), }, }, @@ -1523,10 +1608,12 @@ func TestReconcileKind(t *testing.T) { } reconciler := &Reconciler{ - ConsumerGroupLister: listers.GetConsumerGroupLister(), - InternalsClient: fakeconsumergroupinformer.Get(ctx), - KedaClient: kedaclient.Get(ctx), - KafkaFeatureFlags: configapis.DefaultFeaturesConfig(), + ConsumerGroupLister: listers.GetConsumerGroupLister(), + InternalsClient: fakeconsumergroupinformer.Get(ctx), + KedaClient: kedaclient.Get(ctx), + KafkaFeatureFlags: configapis.DefaultFeaturesConfig(), + ServiceAccountLister: listers.GetServiceAccountLister(), + KubeClient: fakekubeclient.Get(ctx), } reconciler.KafkaFeatureFlags = configapis.FromContext(store.ToContext(ctx)) @@ -1634,3 +1721,11 @@ func patchFinalizers() clientgotesting.PatchActionImpl { action.Patch = []byte(patch) return action } + +func makeKafkaSourceOIDCServiceAccount() *corev1.ServiceAccount { + return auth.GetOIDCServiceAccountForResource(sources.SchemeGroupVersion.WithKind("KafkaSource"), metav1.ObjectMeta{ + Name: SourceName, + Namespace: SourceNamespace, + UID: SourceUUID, + }) +} diff --git a/control-plane/pkg/reconciler/testing/objects_source.go b/control-plane/pkg/reconciler/testing/objects_source.go index 4762ece316..af11432cf7 100644 --- a/control-plane/pkg/reconciler/testing/objects_source.go +++ b/control-plane/pkg/reconciler/testing/objects_source.go @@ -17,6 +17,8 @@ package testing import ( + "fmt" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -24,6 +26,7 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1" sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1" eventingduck "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -252,6 +255,29 @@ func StatusSourceSinkNotResolved(err string) KRShapedOption { } } +func StatusSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled() KRShapedOption { + return func(obj duckv1.KRShaped) { + ks := obj.(*sources.KafkaSource) + ks.Status.MarkOIDCIdentityCreatedSucceededWithReason(fmt.Sprintf("%s feature disabled", feature.OIDCAuthentication), "") + } +} + +func StatusSourceOIDCIdentityCreatedSucceeded() KRShapedOption { + return func(obj duckv1.KRShaped) { + ks := obj.(*sources.KafkaSource) + ks.Status.MarkOIDCIdentityCreatedSucceeded() + } +} + +func StatusSourceOIDCIdentity(saName string) KRShapedOption { + return func(obj duckv1.KRShaped) { + ks := obj.(*sources.KafkaSource) + ks.Status.Auth = &duckv1.AuthStatus{ + ServiceAccountName: &saName, + } + } +} + func SourceReference() *contract.Reference { return &contract.Reference{ Namespace: SourceNamespace,