diff --git a/control-plane/pkg/reconciler/base/receiver_condition_set.go b/control-plane/pkg/reconciler/base/receiver_condition_set.go index 273f4dfbbe..4af83bd1d1 100644 --- a/control-plane/pkg/reconciler/base/receiver_condition_set.go +++ b/control-plane/pkg/reconciler/base/receiver_condition_set.go @@ -42,6 +42,7 @@ const ( ConditionConfigParsed apis.ConditionType = "ConfigParsed" ConditionInitialOffsetsCommitted apis.ConditionType = "InitialOffsetsCommitted" ConditionProbeSucceeded apis.ConditionType = "ProbeSucceeded" + ConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady" ) var IngressConditionSet = apis.NewLivingConditionSet( @@ -51,6 +52,7 @@ var IngressConditionSet = apis.NewLivingConditionSet( ConditionConfigMapUpdated, ConditionConfigParsed, ConditionProbeSucceeded, + ConditionEventPoliciesReady, ) var EgressConditionSet = apis.NewLivingConditionSet( diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 99d80ec67c..bfa209b46a 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -53,6 +53,7 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/receiver" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" "knative.dev/eventing-kafka-broker/control-plane/pkg/security" + eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" ) const ( @@ -86,6 +87,8 @@ type Reconciler struct { Prober prober.NewProber Counter *counter.Counter KafkaFeatureFlags *apisconfig.KafkaFeatureFlags + + EventPolicyLister eventingv1alpha1listers.EventPolicyLister } func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { @@ -297,6 +300,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrue(base.ConditionAddressable) + err = auth.UpdateStatusWithEventPolicies(feature.FromContext(ctx), &broker.Status.AppliedEventPoliciesStatus, &broker.Status, r.EventPolicyLister, eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta) + if err != nil { + return fmt.Errorf("could not update broker status with EventPolicies: %v", err) + } + return nil } diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index eec98331e9..19152f5e06 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -79,6 +79,9 @@ const ( externalTopic = "externalTopic" kafkaFeatureFlags = "kafka-feature-flags" + + readyEventPolicyName = "test-event-policy-ready" + unreadyEventPolicyName = "test-event-policy-unready" ) const ( @@ -102,6 +105,12 @@ var ( Path: fmt.Sprintf("/%s/%s", BrokerNamespace, BrokerName), } + brokerV1GVK = metav1.GroupVersionKind{ + Group: "eventing.knative.dev", + Version: "v1", + Kind: "Broker", + } + createTopicError = fmt.Errorf("failed to create topic") deleteTopicError = fmt.Errorf("failed to delete topic") @@ -209,6 +218,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -283,6 +293,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -528,6 +539,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -599,6 +611,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -718,6 +731,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -784,6 +798,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -877,6 +892,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -987,6 +1003,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1083,6 +1100,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1172,6 +1190,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1354,6 +1373,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1448,6 +1468,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1677,6 +1698,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1787,6 +1809,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1865,6 +1888,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -1940,6 +1964,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -2018,6 +2043,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -2079,6 +2105,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -2143,6 +2170,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -2214,6 +2242,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { URL: brokerAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -2312,6 +2341,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { Audience: &brokerAudience, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(), ), }, }, @@ -2321,6 +2351,171 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { ReplicationFactor: 5, }, }, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, { + Name: "Should list applying EventPolicies", + Objects: []runtime.Object{ + NewBroker(), + BrokerConfig(bootstrapServers, 20, 5), + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + NewService(), + BrokerReceiverPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + BrokerDispatcherPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + reconcilertesting.NewEventPolicy(readyEventPolicyName, BrokerNamespace, + reconcilertesting.WithReadyEventPolicyCondition, + reconcilertesting.WithEventPolicyToRef(brokerV1GVK, BrokerName), + ), + }, + Key: testKey, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Resources: []*contract.Resource{ + { + Uid: BrokerUUID, + Topics: []string{BrokerTopic()}, + Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)}, + BootstrapServers: bootstrapServers, + Reference: BrokerReference(), + }, + }, + Generation: 1, + }), + BrokerReceiverPodUpdate(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "1", + "annotation_to_preserve": "value_to_preserve", + }), + BrokerDispatcherPodUpdate(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "1", + "annotation_to_preserve": "value_to_preserve", + }), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewBroker( + reconcilertesting.WithInitBrokerConditions, + StatusBrokerConfigMapUpdatedReady(&env), + StatusBrokerDataPlaneAvailable, + StatusBrokerConfigParsed, + StatusBrokerTopicReady, + BrokerAddressable(&env), + StatusBrokerProbeSucceeded, + BrokerConfigMapAnnotations(), + WithTopicStatusAnnotation(BrokerTopic()), + WithBrokerAddresses([]duckv1.Addressable{ + { + Name: pointer.String("http"), + URL: brokerAddress, + Audience: &brokerAudience, + }, + }), + WithBrokerAddress(duckv1.Addressable{ + Name: pointer.String("http"), + URL: brokerAddress, + Audience: &brokerAudience, + }), + WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReady(), + reconcilertesting.WithBrokerEventPoliciesListed(readyEventPolicyName), + ), + }, + }, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, { + Name: "Should mark as NotReady on unready EventPolicies", + Objects: []runtime.Object{ + NewBroker(), + BrokerConfig(bootstrapServers, 20, 5), + NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), + NewService(), + BrokerReceiverPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + BrokerDispatcherPod(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "0", + "annotation_to_preserve": "value_to_preserve", + }), + reconcilertesting.NewEventPolicy(unreadyEventPolicyName, BrokerNamespace, + reconcilertesting.WithUnreadyEventPolicyCondition("", ""), + reconcilertesting.WithEventPolicyToRef(brokerV1GVK, BrokerName), + ), + }, + Key: testKey, + WantEvents: []string{ + finalizerUpdatedEvent, + }, + WantUpdates: []clientgotesting.UpdateActionImpl{ + ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ + Resources: []*contract.Resource{ + { + Uid: BrokerUUID, + Topics: []string{BrokerTopic()}, + Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)}, + BootstrapServers: bootstrapServers, + Reference: BrokerReference(), + }, + }, + Generation: 1, + }), + BrokerReceiverPodUpdate(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "1", + "annotation_to_preserve": "value_to_preserve", + }), + BrokerDispatcherPodUpdate(env.SystemNamespace, map[string]string{ + base.VolumeGenerationAnnotationKey: "1", + "annotation_to_preserve": "value_to_preserve", + }), + }, + WantPatches: []clientgotesting.PatchActionImpl{ + patchFinalizers(), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{ + { + Object: NewBroker( + reconcilertesting.WithInitBrokerConditions, + StatusBrokerConfigMapUpdatedReady(&env), + StatusBrokerDataPlaneAvailable, + StatusBrokerConfigParsed, + StatusBrokerTopicReady, + BrokerAddressable(&env), + StatusBrokerProbeSucceeded, + BrokerConfigMapAnnotations(), + WithTopicStatusAnnotation(BrokerTopic()), + WithBrokerAddresses([]duckv1.Addressable{ + { + Name: pointer.String("http"), + URL: brokerAddress, + Audience: &brokerAudience, + }, + }), + WithBrokerAddress(duckv1.Addressable{ + Name: pointer.String("http"), + URL: brokerAddress, + Audience: &brokerAudience, + }), + WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", unreadyEventPolicyName)), + ), + }, + }, Ctx: feature.ToContext(context.Background(), feature.Flags{ feature.OIDCAuthentication: feature.Enabled, }), @@ -2877,6 +3072,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { Name: pointer.String("http"), URL: brokerAddress, }), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -2944,6 +3140,7 @@ func brokerFinalization(t *testing.T, format string, env config.Env) { URL: httpsURL(BrokerName, BrokerNamespace), CACerts: pointer.String(string(eventingtlstesting.CA)), }), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -3034,6 +3231,7 @@ func useTable(t *testing.T, table TableTest, env *config.Env) { Prober: proberMock, Counter: counter.NewExpiringCounter(ctx), KafkaFeatureFlags: featureFlags, + EventPolicyLister: listers.GetEventPolicyLister(), } reconciler.Tracker = &FakeTracker{} diff --git a/control-plane/pkg/reconciler/broker/controller.go b/control-plane/pkg/reconciler/broker/controller.go index 9501d1fb1e..950a944ebd 100644 --- a/control-plane/pkg/reconciler/broker/controller.go +++ b/control-plane/pkg/reconciler/broker/controller.go @@ -24,6 +24,8 @@ import ( "net/http" "strings" + "knative.dev/eventing/pkg/auth" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -39,6 +41,7 @@ import ( "knative.dev/pkg/resolver" brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker" + eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap" podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" @@ -63,6 +66,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E eventing.RegisterAlternateBrokerConditionSet(base.IngressConditionSet) configmapInformer := configmapinformer.Get(ctx) + eventPolicyInformer := eventpolicyinformer.Get(ctx) featureFlags := apisconfig.DefaultFeaturesConfig() clientPool := clientpool.Get(ctx) @@ -81,6 +85,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E }, GetKafkaClusterAdmin: clientPool.GetClusterAdmin, ConfigMapLister: configmapInformer.Lister(), + EventPolicyLister: eventPolicyInformer.Lister(), Env: env, Counter: counter.NewExpiringCounter(ctx), KafkaFeatureFlags: featureFlags, @@ -187,6 +192,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E }, }) + brokerGK := eventing.SchemeGroupVersion.WithKind("Broker").GroupKind() + + // Enqueue the Broker, if we have an EventPolicy which was referencing + // or got updated and now is referencing the Broker + eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(brokerInformer.Informer().GetIndexer(), brokerGK, impl.EnqueueKey)) + return impl } diff --git a/control-plane/pkg/reconciler/broker/controller_test.go b/control-plane/pkg/reconciler/broker/controller_test.go index 0bef47c500..90c25d6c73 100644 --- a/control-plane/pkg/reconciler/broker/controller_test.go +++ b/control-plane/pkg/reconciler/broker/controller_test.go @@ -28,6 +28,7 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake" + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" diff --git a/control-plane/pkg/reconciler/broker/namespaced_broker.go b/control-plane/pkg/reconciler/broker/namespaced_broker.go index 81711084e5..d8b4a0a230 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_broker.go +++ b/control-plane/pkg/reconciler/broker/namespaced_broker.go @@ -45,6 +45,7 @@ import ( eventing "knative.dev/eventing/pkg/apis/eventing/v1" brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" + eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" @@ -75,6 +76,7 @@ type NamespacedReconciler struct { DeploymentLister appslisters.DeploymentLister StatefulSetLister appslisters.StatefulSetLister BrokerLister eventinglisters.BrokerLister + EventPolicyLister eventingv1alpha1listers.EventPolicyLister // GetKafkaClusterAdmin creates new sarama ClusterAdmin. It's convenient to add this as Reconciler field so that we can // mock the function used during the reconciliation loop. @@ -244,6 +246,7 @@ func (r *NamespacedReconciler) createReconcilerForBrokerInstance(broker *eventin Prober: r.Prober, Counter: r.Counter, KafkaFeatureFlags: r.KafkaFeatureFlags, + EventPolicyLister: r.EventPolicyLister, } } diff --git a/control-plane/pkg/reconciler/broker/namespaced_broker_test.go b/control-plane/pkg/reconciler/broker/namespaced_broker_test.go index 07645c50ba..05d1f107d6 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_broker_test.go +++ b/control-plane/pkg/reconciler/broker/namespaced_broker_test.go @@ -316,6 +316,7 @@ func namespacedBrokerReconciliation(t *testing.T, format string, env config.Env) URL: brokerNamespacedAddress, }), WithBrokerAddessable(), + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), ), }, }, @@ -509,6 +510,7 @@ func useTableNamespaced(t *testing.T, table TableTest, env *config.Env) { ServiceAccountLister: listers.GetServiceAccountLister(), ServiceLister: listers.GetServiceLister(), ClusterRoleBindingLister: listers.GetClusterRoleBindingLister(), + EventPolicyLister: listers.GetEventPolicyLister(), GetKafkaClusterAdmin: func(_ context.Context, _ []string, _ *corev1.Secret) (sarama.ClusterAdmin, error) { return &kafkatesting.MockKafkaClusterAdmin{ ExpectedTopicName: expectedTopicName, diff --git a/control-plane/pkg/reconciler/broker/namespaced_controller.go b/control-plane/pkg/reconciler/broker/namespaced_controller.go index 4441ee475a..eed5bdc508 100644 --- a/control-plane/pkg/reconciler/broker/namespaced_controller.go +++ b/control-plane/pkg/reconciler/broker/namespaced_controller.go @@ -23,6 +23,8 @@ import ( "net/http" "time" + "knative.dev/eventing/pkg/auth" + "knative.dev/eventing/pkg/eventingtls" "knative.dev/pkg/network" @@ -52,6 +54,7 @@ import ( kubeclient "knative.dev/pkg/client/injection/kube/client" brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker" + eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" brokerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/broker" deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" statefulsetinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset" @@ -81,6 +84,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env eventing.RegisterAlternateBrokerConditionSet(base.IngressConditionSet) configmapInformer := configmapinformer.Get(ctx) + eventPolicyInformer := eventpolicyinformer.Get(ctx) cfg := injection.GetConfig(ctx) mfc, err := mfclient.NewClient(cfg) @@ -112,6 +116,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env StatefulSetLister: statefulsetinformer.Get(ctx).Lister(), DeploymentLister: deploymentinformer.Get(ctx).Lister(), BrokerLister: brokerinformer.Get(ctx).Lister(), + EventPolicyLister: eventPolicyInformer.Lister(), Env: env, Counter: counter.NewExpiringCounter(ctx), ManifestivalClient: mfc, @@ -244,5 +249,11 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env }, }) + brokerGK := eventing.SchemeGroupVersion.WithKind("Broker").GroupKind() + + // Enqueue the Broker, if we have an EventPolicy which was referencing + // or got updated and now is referencing the Broker + eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(brokerInformer.Informer().GetIndexer(), brokerGK, impl.EnqueueKey)) + return impl } diff --git a/control-plane/pkg/reconciler/sink/kafka_sink.go b/control-plane/pkg/reconciler/sink/kafka_sink.go index 5133b93b08..14782578ed 100644 --- a/control-plane/pkg/reconciler/sink/kafka_sink.go +++ b/control-plane/pkg/reconciler/sink/kafka_sink.go @@ -89,6 +89,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) Recorder: controller.GetEventRecorder(ctx), } + r.markEventPolicyConditionNotYetSupported(ks) + if !r.IsReceiverRunning() { return statusConditionManager.DataPlaneNotAvailable() } @@ -301,6 +303,14 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) return nil } +func (r *Reconciler) markEventPolicyConditionNotYetSupported(ks *eventing.KafkaSink) { + ks.Status.GetConditionSet().Manage(ks.GetStatus()).MarkTrueWithReason( + base.ConditionEventPoliciesReady, + "AuthzNotSupported", + "Authorization not yet supported", + ) +} + func (r *Reconciler) FinalizeKind(ctx context.Context, ks *eventing.KafkaSink) reconciler.Event { return retry.RetryOnConflict(retry.DefaultBackoff, func() error { return r.finalizeKind(ctx, ks) diff --git a/control-plane/pkg/reconciler/sink/kafka_sink_test.go b/control-plane/pkg/reconciler/sink/kafka_sink_test.go index 298641b446..1656a8a872 100644 --- a/control-plane/pkg/reconciler/sink/kafka_sink_test.go +++ b/control-plane/pkg/reconciler/sink/kafka_sink_test.go @@ -186,6 +186,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { }, }), WithSinkAddessable(), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -251,6 +252,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { }, }), WithSinkAddessable(), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -326,6 +328,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { }, }), WithSinkAddessable(), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -403,6 +406,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { }, }), WithSinkAddessable(), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -450,6 +454,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { InitSinkConditions, StatusDataPlaneAvailable, StatusTopicNotPresentErr(SinkTopic(), io.EOF), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -527,6 +532,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { }, }), WithSinkAddessable(), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -565,6 +571,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { StatusDataPlaneAvailable, BootstrapServers(bootstrapServersArr), StatusFailedToCreateTopic(SinkTopic()), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -639,6 +646,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { }, }), WithSinkAddessable(), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -700,6 +708,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { }, }), WithSinkAddessable(), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -784,6 +793,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { }, }), WithSinkAddessable(), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -867,6 +877,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { }, }), WithSinkAddessable(), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -894,6 +905,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { Object: NewSink( InitSinkConditions, StatusDataPlaneNotAvailable, + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -960,6 +972,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { }, }), WithSinkAddessable(), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -1031,6 +1044,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { }, }), WithSinkAddessable(), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -1082,6 +1096,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { StatusConfigMapUpdatedReady(&env), StatusTopicReadyWithOwner(SinkTopic(), sink.ControllerTopicOwner), StatusProbeFailed(prober.StatusNotReady), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -1136,6 +1151,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { StatusConfigMapUpdatedReady(&env), StatusTopicReadyWithOwner(SinkTopic(), sink.ControllerTopicOwner), StatusProbeFailed(prober.StatusUnknown), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -1211,6 +1227,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { }, }), WithSinkAddessable(), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, @@ -1280,6 +1297,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { }, }), WithSinkAddessable(), + WithSinkEventPolicyConditionAuthZNotSupported(), ), }, }, diff --git a/control-plane/pkg/reconciler/testing/listers.go b/control-plane/pkg/reconciler/testing/listers.go index b97ca8519c..3685a3455c 100644 --- a/control-plane/pkg/reconciler/testing/listers.go +++ b/control-plane/pkg/reconciler/testing/listers.go @@ -28,9 +28,11 @@ import ( rbaclisters "k8s.io/client-go/listers/rbac/v1" "k8s.io/client-go/tools/cache" eventing "knative.dev/eventing/pkg/apis/eventing/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" messaging "knative.dev/eventing/pkg/apis/messaging/v1" fakeeventingclientset "knative.dev/eventing/pkg/client/clientset/versioned/fake" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" + eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" "knative.dev/pkg/reconciler/testing" @@ -172,6 +174,10 @@ func (l *Listers) GetNamespaceLister() corelisters.NamespaceLister { return corelisters.NewNamespaceLister(l.indexerFor(&corev1.Namespace{})) } +func (l *Listers) GetEventPolicyLister() eventingv1alpha1listers.EventPolicyLister { + return eventingv1alpha1listers.NewEventPolicyLister(l.indexerFor(&eventingv1alpha1.EventPolicy{})) +} + func (l *Listers) indexerFor(obj runtime.Object) cache.Indexer { return l.sorter.IndexerForObjectType(obj) } diff --git a/control-plane/pkg/reconciler/testing/objects_sink.go b/control-plane/pkg/reconciler/testing/objects_sink.go index cf7f094c7b..fde17016e2 100644 --- a/control-plane/pkg/reconciler/testing/objects_sink.go +++ b/control-plane/pkg/reconciler/testing/objects_sink.go @@ -198,3 +198,13 @@ func WithSinkAddessable() KRShapedOption { ch.GetConditionSet().Manage(ch.GetStatus()).MarkTrue(base.ConditionAddressable) } } +func WithSinkEventPolicyConditionAuthZNotSupported() KRShapedOption { + return func(obj duckv1.KRShaped) { + ch := obj.(*eventing.KafkaSink) + ch.GetConditionSet().Manage(ch.GetStatus()).MarkTrueWithReason( + base.ConditionEventPoliciesReady, + "AuthzNotSupported", + "Authorization not yet supported", + ) + } +} diff --git a/vendor/knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake/fake.go b/vendor/knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake/fake.go new file mode 100644 index 0000000000..349893d97b --- /dev/null +++ b/vendor/knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake/fake.go @@ -0,0 +1,40 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package fake + +import ( + context "context" + + eventpolicy "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" + fake "knative.dev/eventing/pkg/client/injection/informers/factory/fake" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" +) + +var Get = eventpolicy.Get + +func init() { + injection.Fake.RegisterInformer(withInformer) +} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := fake.Get(ctx) + inf := f.Eventing().V1alpha1().EventPolicies() + return context.WithValue(ctx, eventpolicy.Key{}, inf), inf.Informer() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 390d93b83f..66938f2bdb 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1220,6 +1220,7 @@ knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy +knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake knative.dev/eventing/pkg/client/injection/informers/factory knative.dev/eventing/pkg/client/injection/informers/factory/fake knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription