Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

List applying EventPolicies in KafkaSink #4084

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
)

const (
ConditionAddressable apis.ConditionType = "Addressable"
ConditionAddressable apis.ConditionType = "Addressable"
ConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"
)

var conditionSet apis.ConditionSet
Expand Down Expand Up @@ -54,3 +55,19 @@
func (kss *KafkaSinkStatus) InitializeConditions() {
kss.GetConditionSet().Manage(kss).InitializeConditions()
}

func (kss *KafkaSinkStatus) MarkEventPoliciesTrue() {
kss.GetConditionSet().Manage(kss).MarkTrue(ConditionEventPoliciesReady)

Check warning on line 60 in control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go#L59-L60

Added lines #L59 - L60 were not covered by tests
}

func (kss *KafkaSinkStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
kss.GetConditionSet().Manage(kss).MarkTrueWithReason(ConditionEventPoliciesReady, reason, messageFormat, messageA...)

Check warning on line 64 in control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go#L63-L64

Added lines #L63 - L64 were not covered by tests
}

func (kss *KafkaSinkStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
kss.GetConditionSet().Manage(kss).MarkFalse(ConditionEventPoliciesReady, reason, messageFormat, messageA...)

Check warning on line 68 in control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go#L67-L68

Added lines #L67 - L68 were not covered by tests
}

func (kss *KafkaSinkStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
kss.GetConditionSet().Manage(kss).MarkUnknown(ConditionEventPoliciesReady, reason, messageFormat, messageA...)

Check warning on line 72 in control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go#L71-L72

Added lines #L71 - L72 were not covered by tests
}
22 changes: 11 additions & 11 deletions control-plane/pkg/core/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import (
"sort"
"strings"

"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"

"github.com/rickb777/date/period"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
duck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/pkg/resolver"

Expand All @@ -55,14 +55,14 @@ func ContentModeFromString(mode string) contract.ContentMode {
}
}

// EventPoliciesFromAppliedEventPoliciesStatus resolves a AppliedEventPoliciesStatus into a list of contract.EventPolicy
func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPoliciesStatus, lister v1alpha1.EventPolicyLister, namespace string, features feature.Flags) ([]*contract.EventPolicy, error) {
eventPolicies := make([]*contract.EventPolicy, 0, len(status.Policies))
// ContractEventPoliciesEventPolicies resolves a list of v1alpha1.EventPolicy into a list of contract.EventPolicy
func ContractEventPoliciesEventPolicies(applyingEventPolicies []*eventingv1alpha1.EventPolicy, namespace string, features feature.Flags) []*contract.EventPolicy {
eventPolicies := make([]*contract.EventPolicy, 0, len(applyingEventPolicies))

for _, appliedPolicy := range status.Policies {
policy, err := lister.EventPolicies(namespace).Get(appliedPolicy.Name)
if err != nil {
return nil, fmt.Errorf("failed to get eventPolicy %s: %w", appliedPolicy.Name, err)
for _, policy := range applyingEventPolicies {
if !policy.Status.IsReady() {
// only add ready eventpolicies to the contract
continue
}

contractPolicy := &contract.EventPolicy{}
Expand Down Expand Up @@ -97,7 +97,7 @@ func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPolicie
eventPolicies = append(eventPolicies, contractPolicy)
}

if len(eventPolicies) == 0 {
if len(eventPolicies) == 0 && features.IsOIDCAuthentication() {
creydr marked this conversation as resolved.
Show resolved Hide resolved
if features.IsAuthorizationDefaultModeAllowAll() {
// add event policy to match all subs
eventPolicies = append(eventPolicies, &contract.EventPolicy{
Expand Down Expand Up @@ -132,7 +132,7 @@ func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPolicie
// else: deny all -> add no additional policy
}

return eventPolicies, nil
return eventPolicies
}

func EgressConfigFromDelivery(
Expand Down
146 changes: 117 additions & 29 deletions control-plane/pkg/core/config/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,20 @@ import (
"testing"
"time"

corev1 "k8s.io/api/core/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

"google.golang.org/protobuf/encoding/protojson"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
reconcilertesting "knative.dev/pkg/reconciler/testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/runtime/protoimpl"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/client/injection/ducks/duck/v1/addressable"
Expand All @@ -48,7 +47,6 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"

eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
eventpolicyinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
)

func TestContentModeFromString(t *testing.T) {
Expand Down Expand Up @@ -522,7 +520,7 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
namespace string
defaultAuthorizationMode feature.Flag
expected []*contract.EventPolicy
wantErr bool
oidcDisabled bool
}{
{
name: "Exact match",
Expand All @@ -539,6 +537,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
From: []string{
"from-1",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
},
Expand Down Expand Up @@ -566,6 +572,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
From: []string{
"from-*",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
},
Expand Down Expand Up @@ -594,6 +608,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
From: []string{
"from-1",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionTrue,
},
},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -604,6 +626,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
From: []string{
"from-2-*",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
},
Expand Down Expand Up @@ -643,6 +673,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
From: []string{
"from-1",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionTrue,
},
},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -660,6 +698,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
From: []string{
"from-2-*",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
},
Expand Down Expand Up @@ -728,46 +774,88 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
defaultAuthorizationMode: feature.AuthorizationDenyAll,
expected: []*contract.EventPolicy{},
}, {
name: "Applying policy does not exist",
name: "Applying policy not ready",
applyingPolicies: []string{
"not-found",
"policy-1",
},
existingEventPolicies: []*eventingv1alpha1.EventPolicy{
{
ObjectMeta: metav1.ObjectMeta{
Name: "policy-1",
Namespace: "my-ns",
},
Status: eventingv1alpha1.EventPolicyStatus{
From: []string{
"from-*",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionFalse,
},
},
},
},
},
},
namespace: "my-ns",
defaultAuthorizationMode: feature.AuthorizationDenyAll,
expected: []*contract.EventPolicy{},
}, {
name: "No policy when OIDC is disabled",
oidcDisabled: true,
applyingPolicies: []string{
"policy-1",
},
existingEventPolicies: []*eventingv1alpha1.EventPolicy{
{
ObjectMeta: metav1.ObjectMeta{
Name: "policy-1",
Namespace: "my-ns",
},
Status: eventingv1alpha1.EventPolicyStatus{
From: []string{
"from-1",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionFalse, // is false, as OIDC is disabled
},
},
},
},
},
},
existingEventPolicies: []*eventingv1alpha1.EventPolicy{},
namespace: "my-ns",
defaultAuthorizationMode: feature.AuthorizationAllowSameNamespace,
expected: []*contract.EventPolicy{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

ctx, _ := reconcilertesting.SetupFakeContext(t)
features := feature.Flags{
feature.AuthorizationDefaultMode: tt.defaultAuthorizationMode,
feature.OIDCAuthentication: feature.Enabled,
}

for _, ep := range tt.existingEventPolicies {
err := eventpolicyinformerfake.Get(ctx).Informer().GetStore().Add(ep)
if err != nil {
t.Fatal(err)
}
if tt.oidcDisabled {
features[feature.OIDCAuthentication] = feature.Disabled
}

applyingPoliciesStatus := eventingduck.AppliedEventPoliciesStatus{}
for _, ep := range tt.applyingPolicies {
applyingPoliciesStatus.Policies = append(applyingPoliciesStatus.Policies, eventingduck.AppliedEventPolicyRef{
Name: ep,
APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
})
}

got, err := EventPoliciesFromAppliedEventPoliciesStatus(applyingPoliciesStatus, eventpolicyinformerfake.Get(ctx).Lister(), tt.namespace, features)
if (err != nil) != tt.wantErr {
t.Errorf("EventPoliciesFromAppliedEventPoliciesStatus() error = %v, wantErr %v", err, tt.wantErr)
return
applyingPolicies := []*eventingv1alpha1.EventPolicy{}
for _, applyingPolicyName := range tt.applyingPolicies {
for _, existingPolicy := range tt.existingEventPolicies {
if applyingPolicyName == existingPolicy.Name {
applyingPolicies = append(applyingPolicies, existingPolicy)
}
}
}

got := ContractEventPoliciesEventPolicies(applyingPolicies, tt.namespace, features)
expectedJSON, err := protojson.Marshal(&contract.Ingress{
EventPolicies: tt.expected,
})
Expand Down
24 changes: 12 additions & 12 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"strings"
"time"

eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"

"k8s.io/utils/ptr"

Expand Down Expand Up @@ -191,13 +191,13 @@
audience = nil
}

err = auth.UpdateStatusWithEventPolicies(features, &broker.Status.AppliedEventPoliciesStatus, &broker.Status, r.EventPolicyLister, eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta)
applyingEventPolicies, err := auth.GetEventPoliciesForResource(r.EventPolicyLister, eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update broker status with EventPolicies: %v", err)
return fmt.Errorf("could not get applying eventpolicies for broker: %v", err)

Check warning on line 196 in control-plane/pkg/reconciler/broker/broker.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/broker.go#L196

Added line #L196 was not covered by tests
}

// Get resource configuration.
brokerResource, err := r.reconcilerBrokerResource(ctx, topic, broker, secret, topicConfig, audience, broker.Status.AppliedEventPoliciesStatus)
brokerResource, err := r.reconcilerBrokerResource(ctx, topic, broker, secret, topicConfig, audience, applyingEventPolicies)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)
}
Expand Down Expand Up @@ -253,6 +253,11 @@
logger.Debug("Updated dispatcher pod annotation")
}

err = auth.UpdateStatusWithProvidedEventPolicies(features, &broker.Status.AppliedEventPoliciesStatus, &broker.Status, applyingEventPolicies)
if err != nil {
return fmt.Errorf("could not update Broker status with EventPolicies: %v", err)

Check warning on line 258 in control-plane/pkg/reconciler/broker/broker.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/broker.go#L258

Added line #L258 was not covered by tests
}

ingressHost := network.GetServiceHostname(r.Env.IngressName, r.DataPlaneNamespace)

var addressableStatus duckv1.AddressStatus
Expand Down Expand Up @@ -623,14 +628,15 @@
return cm
}

func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, broker *eventing.Broker, secret *corev1.Secret, config *kafka.TopicConfig, audience *string, appliedEventPoliciesStatus eventingduck.AppliedEventPoliciesStatus) (*contract.Resource, error) {
func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, broker *eventing.Broker, secret *corev1.Secret, config *kafka.TopicConfig, audience *string, applyingEventPolicies []*eventingv1alpha1.EventPolicy) (*contract.Resource, error) {
features := feature.FromContext(ctx)

resource := &contract.Resource{
Uid: string(broker.UID),
Topics: []string{topic},
Ingress: &contract.Ingress{
Path: receiver.PathFromObject(broker),
Path: receiver.PathFromObject(broker),
EventPolicies: coreconfig.ContractEventPoliciesEventPolicies(applyingEventPolicies, broker.Namespace, features),
},
FeatureFlags: &contract.FeatureFlags{
EnableEventTypeAutocreate: features.IsEnabled(feature.EvenTypeAutoCreate),
Expand Down Expand Up @@ -666,12 +672,6 @@
}
resource.EgressConfig = egressConfig

eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(appliedEventPoliciesStatus, r.EventPolicyLister, broker.Namespace, features)
if err != nil {
return nil, fmt.Errorf("could not get eventpolicies from broker status: %w", err)
}
resource.Ingress.EventPolicies = eventPolicies

return resource, nil
}

Expand Down
Loading
Loading