Skip to content

Commit

Permalink
Update broker and channel to new utils function
Browse files Browse the repository at this point in the history
  • Loading branch information
creydr committed Aug 28, 2024
1 parent 3b1f616 commit a7f6706
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 23 deletions.
24 changes: 12 additions & 12 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"strings"
"time"

eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"

"k8s.io/utils/ptr"

Expand Down Expand Up @@ -191,13 +191,13 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
audience = nil
}

err = auth.UpdateStatusWithEventPolicies(features, &broker.Status.AppliedEventPoliciesStatus, &broker.Status, r.EventPolicyLister, eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta)
applyingEventPolicies, err := auth.GetEventPoliciesForResource(r.EventPolicyLister, eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update broker status with EventPolicies: %v", err)
return fmt.Errorf("could not get applying eventpolicies for broker: %v", err)

Check warning on line 196 in control-plane/pkg/reconciler/broker/broker.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/broker.go#L196

Added line #L196 was not covered by tests
}

// Get resource configuration.
brokerResource, err := r.reconcilerBrokerResource(ctx, topic, broker, secret, topicConfig, audience, broker.Status.AppliedEventPoliciesStatus)
brokerResource, err := r.reconcilerBrokerResource(ctx, topic, broker, secret, topicConfig, audience, applyingEventPolicies)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)
}
Expand Down Expand Up @@ -253,6 +253,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
logger.Debug("Updated dispatcher pod annotation")
}

err = auth.UpdateStatusWithProvidedEventPolicies(features, &broker.Status.AppliedEventPoliciesStatus, &broker.Status, applyingEventPolicies)
if err != nil {
return fmt.Errorf("could not update Broker status with EventPolicies: %v", err)

Check warning on line 258 in control-plane/pkg/reconciler/broker/broker.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/broker.go#L258

Added line #L258 was not covered by tests
}

ingressHost := network.GetServiceHostname(r.Env.IngressName, r.DataPlaneNamespace)

var addressableStatus duckv1.AddressStatus
Expand Down Expand Up @@ -623,14 +628,15 @@ func rebuildCMFromStatusAnnotations(br *eventing.Broker) *corev1.ConfigMap {
return cm
}

func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, broker *eventing.Broker, secret *corev1.Secret, config *kafka.TopicConfig, audience *string, appliedEventPoliciesStatus eventingduck.AppliedEventPoliciesStatus) (*contract.Resource, error) {
func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, broker *eventing.Broker, secret *corev1.Secret, config *kafka.TopicConfig, audience *string, applyingEventPolicies []*eventingv1alpha1.EventPolicy) (*contract.Resource, error) {
features := feature.FromContext(ctx)

resource := &contract.Resource{
Uid: string(broker.UID),
Topics: []string{topic},
Ingress: &contract.Ingress{
Path: receiver.PathFromObject(broker),
Path: receiver.PathFromObject(broker),
EventPolicies: coreconfig.ContractEventPoliciesEventPolicies(applyingEventPolicies, broker.Namespace, features),
},
FeatureFlags: &contract.FeatureFlags{
EnableEventTypeAutocreate: features.IsEnabled(feature.EvenTypeAutoCreate),
Expand Down Expand Up @@ -666,12 +672,6 @@ func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string,
}
resource.EgressConfig = egressConfig

eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(appliedEventPoliciesStatus, r.EventPolicyLister, broker.Namespace, features)
if err != nil {
return nil, fmt.Errorf("could not get eventpolicies from broker status: %w", err)
}
resource.Ingress.EventPolicies = eventPolicies

return resource, nil
}

Expand Down
1 change: 0 additions & 1 deletion control-plane/pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,6 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
StatusBrokerDataPlaneAvailable,
StatusBrokerConfigNotParsed("failed to resolve Spec.Delivery.DeadLetterSink: destination missing Ref and URI, expected at least one"),
StatusBrokerTopicReady,
reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(),
BrokerConfigMapAnnotations(),
WithTopicStatusAnnotation(BrokerTopic()),
),
Expand Down
17 changes: 7 additions & 10 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"strings"
"time"

"knative.dev/eventing/pkg/apis/eventing/v1alpha1"

eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"

"k8s.io/utils/pointer"
Expand Down Expand Up @@ -233,7 +235,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
}

// Get resource configuration
channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig, audience, channel.Status.AppliedEventPoliciesStatus)
channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig, audience, nil)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)
}
Expand Down Expand Up @@ -688,15 +690,16 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messag
return cg, nil
}

func (r *Reconciler) getChannelContractResource(ctx context.Context, topic string, channel *messagingv1beta1.KafkaChannel, auth *security.NetSpecAuthContext, config *kafka.TopicConfig, audience *string, appliedEventPoliciesStatus v1.AppliedEventPoliciesStatus) (*contract.Resource, error) {
func (r *Reconciler) getChannelContractResource(ctx context.Context, topic string, channel *messagingv1beta1.KafkaChannel, auth *security.NetSpecAuthContext, config *kafka.TopicConfig, audience *string, applyingEventPolicies []*v1alpha1.EventPolicy) (*contract.Resource, error) {
features := feature.FromContext(ctx)

resource := &contract.Resource{
Uid: string(channel.UID),
Topics: []string{topic},
Ingress: &contract.Ingress{
Host: receiver.Host(channel.GetNamespace(), channel.GetName()),
Path: receiver.Path(channel.GetNamespace(), channel.GetName()),
Host: receiver.Host(channel.GetNamespace(), channel.GetName()),
Path: receiver.Path(channel.GetNamespace(), channel.GetName()),
EventPolicies: coreconfig.ContractEventPoliciesEventPolicies(applyingEventPolicies, channel.Namespace, features),
},
FeatureFlags: &contract.FeatureFlags{
EnableEventTypeAutocreate: features.IsEnabled(feature.EvenTypeAutoCreate) && !ownedByBroker(channel),
Expand All @@ -721,12 +724,6 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin
resource.Ingress.Audience = *audience
}

eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(appliedEventPoliciesStatus, r.EventPolicyLister, channel.Namespace, features)
if err != nil {
return nil, fmt.Errorf("could not get eventpolicies from channel status: %w", err)
}
resource.Ingress.EventPolicies = eventPolicies

egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, channel, channel.Spec.Delivery, r.DefaultBackoffDelayMs)
if err != nil {
return nil, err
Expand Down

0 comments on commit a7f6706

Please sign in to comment.