Skip to content

Commit

Permalink
Add namespace to referenced addressable before resolving destination (#…
Browse files Browse the repository at this point in the history
…1254)

The resolver uses `ref.namespace` to lookup the destination
addressable.
`ref.namespace` for the `deadLetterSink` is not defaulted by the
webhook like for `trigger.subscriber.ref`, so we might receive a
destination with an empty string for
`deadLetterSink.ref.namespace` which leads to not finding the
referenced addressable service.

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi authored Sep 25, 2021
1 parent 963ae77 commit cbd8c28
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 8 deletions.
4 changes: 4 additions & 0 deletions control-plane/pkg/core/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func EgressConfigFromDelivery(
egressConfig := &contract.EgressConfig{}

if delivery.DeadLetterSink != nil {
destination := *delivery.DeadLetterSink // Do not update object Spec, so copy destination.
if destination.Ref != nil && destination.Ref.Namespace == "" {
destination.Ref.Namespace = parent.GetNamespace()
}
deadLetterSinkURL, err := resolver.URIFromDestinationV1(ctx, *delivery.DeadLetterSink, parent)
if err != nil {
return nil, fmt.Errorf("failed to resolve Spec.Delivery.DeadLetterSink: %w", err)
Expand Down
57 changes: 57 additions & 0 deletions control-plane/pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,63 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
BootstrapServersConfigMapKey: bootstrapServers,
},
},
{
Name: "Reconciled normal - with DLS - no DLS ref namespace",
Objects: []runtime.Object{
NewBroker(
WithDelivery(WithNoDeadLetterSinkNamespace),
),
NewConfigMapFromContract(&contract.Contract{
Generation: 1,
}, &configs),
NewService(WithServiceNamespace(BrokerNamespace)),
BrokerReceiverPod(configs.SystemNamespace, map[string]string{base.VolumeGenerationAnnotationKey: "3"}),
BrokerDispatcherPod(configs.SystemNamespace, map[string]string{base.VolumeGenerationAnnotationKey: "1"}),
},
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &contract.Contract{
Resources: []*contract.Resource{
{
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{ContentMode: contract.ContentMode_BINARY, IngressType: &contract.Ingress_Path{Path: receiver.Path(BrokerNamespace, BrokerName)}},
BootstrapServers: bootstrapServers,
EgressConfig: &contract.EgressConfig{DeadLetter: ServiceURLFrom(BrokerNamespace, ServiceName)},
},
},
Generation: 2,
}),
BrokerReceiverPodUpdate(configs.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "2",
}),
BrokerDispatcherPodUpdate(configs.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "2",
}),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: NewBroker(
WithDelivery(WithNoDeadLetterSinkNamespace),
reconcilertesting.WithInitBrokerConditions,
BrokerConfigMapUpdatedReady(&configs),
BrokerDataPlaneAvailable,
BrokerTopicReady,
BrokerConfigParsed,
BrokerAddressable(&configs),
),
},
},
OtherTestData: map[string]interface{}{
BootstrapServersConfigMapKey: bootstrapServers,
},
},
{
Name: "Failed to create topic",
Objects: []runtime.Object{
Expand Down
11 changes: 10 additions & 1 deletion control-plane/pkg/reconciler/testing/objects_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewDeletedBroker(options ...reconcilertesting.BrokerOption) runtime.Object
)
}

func WithDelivery() func(*eventing.Broker) {
func WithDelivery(mutations ...func(spec *eventingduck.DeliverySpec)) func(*eventing.Broker) {
service := NewService()

return func(broker *eventing.Broker) {
Expand All @@ -92,6 +92,15 @@ func WithDelivery() func(*eventing.Broker) {
APIVersion: service.APIVersion,
},
}
for _, mut := range mutations {
mut(broker.Spec.Delivery)
}
}
}

func WithNoDeadLetterSinkNamespace(spec *eventingduck.DeliverySpec) {
if spec.DeadLetterSink != nil && spec.DeadLetterSink.Ref != nil {
spec.DeadLetterSink.Ref.Namespace = ""
}
}

Expand Down
30 changes: 23 additions & 7 deletions control-plane/pkg/reconciler/testing/objects_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package testing

import (
"fmt"
"io/ioutil"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
Expand All @@ -39,9 +40,8 @@ const (
ConfigMapNamespace = "test-namespace-config-map"
ConfigMapName = "test-config-cm"

serviceNamespace = "test-service-namespace"
serviceName = "test-service"
ServiceURL = "http://test-service.test-service-namespace.svc.cluster.local"
ServiceNamespace = "test-service-namespace"
ServiceName = "test-service"

TriggerUUID = "e7185016-5d98-4b54-84e8-3b1cd4acc6b5"

Expand All @@ -51,19 +51,35 @@ const (

var (
Formats = []string{base.Protobuf, base.Json}

ServiceURL = ServiceURLFrom(ServiceNamespace, ServiceName)
)

func NewService() *corev1.Service {
return &corev1.Service{
func NewService(mutations ...func(*corev1.Service)) *corev1.Service {
s := &corev1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: serviceNamespace,
Name: ServiceName,
Namespace: ServiceNamespace,
},
}
for _, mut := range mutations {
mut(s)
}
return s
}

func WithServiceNamespace(ns string) func(s *corev1.Service) {
return func(s *corev1.Service) {
s.Namespace = ns
}
}

func ServiceURLFrom(ns, name string) string {
return fmt.Sprintf("http://%s.%s.svc.cluster.local", name, ns)
}

func NewConfigMap(configs *Configs, data []byte) runtime.Object {
Expand Down

0 comments on commit cbd8c28

Please sign in to comment.