Skip to content

Commit

Permalink
List applying EventPolicies in KafkaSink
Browse files Browse the repository at this point in the history
  • Loading branch information
creydr committed Aug 26, 2024
1 parent 3eaafe6 commit c526e8c
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 40 deletions.
19 changes: 18 additions & 1 deletion control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
)

const (
ConditionAddressable apis.ConditionType = "Addressable"
ConditionAddressable apis.ConditionType = "Addressable"
ConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"
)

var conditionSet apis.ConditionSet
Expand Down Expand Up @@ -54,3 +55,19 @@ func (ks *KafkaSinkStatus) SetAddress(addr *duckv1.Addressable) {
func (kss *KafkaSinkStatus) InitializeConditions() {
kss.GetConditionSet().Manage(kss).InitializeConditions()
}

func (kss *KafkaSinkStatus) MarkEventPoliciesTrue() {
kss.GetConditionSet().Manage(kss).MarkTrue(ConditionEventPoliciesReady)

Check warning on line 60 in control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go#L59-L60

Added lines #L59 - L60 were not covered by tests
}

func (kss *KafkaSinkStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
kss.GetConditionSet().Manage(kss).MarkTrueWithReason(ConditionEventPoliciesReady, reason, messageFormat, messageA...)

Check warning on line 64 in control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go#L63-L64

Added lines #L63 - L64 were not covered by tests
}

func (kss *KafkaSinkStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
kss.GetConditionSet().Manage(kss).MarkFalse(ConditionEventPoliciesReady, reason, messageFormat, messageA...)

Check warning on line 68 in control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go#L67-L68

Added lines #L67 - L68 were not covered by tests
}

func (kss *KafkaSinkStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
kss.GetConditionSet().Manage(kss).MarkUnknown(ConditionEventPoliciesReady, reason, messageFormat, messageA...)

Check warning on line 72 in control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go#L71-L72

Added lines #L71 - L72 were not covered by tests
}
8 changes: 8 additions & 0 deletions control-plane/pkg/reconciler/sink/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"net"
"net/http"

"knative.dev/eventing/pkg/auth"

eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy"

"go.uber.org/zap"
Expand Down Expand Up @@ -161,5 +163,11 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
DeleteFunc: reconciler.OnDeleteObserver,
})

sinkGK := eventing.SchemeGroupVersion.WithKind("KafkaSink").GroupKind()

// Enqueue the KafkaSink, if we have an EventPolicy which was referencing
// or got updated and now is referencing the KafkaSink
eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(sinkInformer.Informer().GetIndexer(), sinkGK, impl.EnqueueKey))

return impl
}
19 changes: 7 additions & 12 deletions control-plane/pkg/reconciler/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
Recorder: controller.GetEventRecorder(ctx),
}

r.markEventPolicyConditionNotYetSupported(ks)

if !r.IsReceiverRunning() {
return statusConditionManager.DataPlaneNotAvailable()
}
Expand Down Expand Up @@ -170,6 +168,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)

logger.Debug("Got contract config map")

err = auth.UpdateStatusWithEventPolicies(feature.FromContext(ctx), &ks.Status.AppliedEventPoliciesStatus, &ks.Status, r.EventPolicyLister, eventing.SchemeGroupVersion.WithKind("KafkaSink"), ks.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update KafkaSinks status with EventPolicies: %v", err)

Check warning on line 173 in control-plane/pkg/reconciler/sink/kafka_sink.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/sink/kafka_sink.go#L173

Added line #L173 was not covered by tests
}

// Get contract data.
ct, err := r.GetDataPlaneConfigMapData(logger, contractConfigMap)
if err != nil && ct == nil {
Expand Down Expand Up @@ -294,14 +297,6 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
return nil
}

func (r *Reconciler) markEventPolicyConditionNotYetSupported(ks *eventing.KafkaSink) {
ks.Status.GetConditionSet().Manage(ks.GetStatus()).MarkTrueWithReason(
base.ConditionEventPoliciesReady,
"AuthzNotSupported",
"Authorization not yet supported",
)
}

func (r *Reconciler) FinalizeKind(ctx context.Context, ks *eventing.KafkaSink) reconciler.Event {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return r.finalizeKind(ctx, ks)
Expand Down Expand Up @@ -437,8 +432,8 @@ func (r *Reconciler) getSinkContractResource(ctx context.Context, kafkaSink *eve
Uid: string(kafkaSink.UID),
Topics: []string{kafkaSink.Spec.Topic},
Ingress: &contract.Ingress{
Path: receiver.PathFromObject(kafkaSink),
ContentMode: coreconfig.ContentModeFromString(*kafkaSink.Spec.ContentMode),
Path: receiver.PathFromObject(kafkaSink),
ContentMode: coreconfig.ContentModeFromString(*kafkaSink.Spec.ContentMode),
},
FeatureFlags: &contract.FeatureFlags{
EnableEventTypeAutocreate: features.IsEnabled(feature.EvenTypeAutoCreate),
Expand Down
Loading

0 comments on commit c526e8c

Please sign in to comment.