Skip to content

Commit

Permalink
Resolved Trigger deadLetterSink is available in the status
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi committed Jul 10, 2021
1 parent ec19976 commit 5421f7b
Show file tree
Hide file tree
Showing 40 changed files with 1,506 additions and 103 deletions.
38 changes: 38 additions & 0 deletions control-plane/pkg/core/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,41 @@ func DurationMillisFromISO8601String(durationStr *string, defaultDurationMillis
func IncrementContractGeneration(ct *contract.Contract) {
ct.Generation = (ct.Generation + 1) % (math.MaxUint64 - 1)
}

// MergeEgressConfig merges the 2 given egress configs into one egress config prioritizing e0 values.
func MergeEgressConfig(e0, e1 *contract.EgressConfig) *contract.EgressConfig {
if e0 == nil {
return e1
}
if e1 == nil {
return e0
}
return &contract.EgressConfig{
DeadLetter: mergeString(e0.GetDeadLetter(), e1.GetDeadLetter()),
Retry: mergeUint32(e0.GetRetry(), e1.GetRetry()),
BackoffPolicy: e0.GetBackoffPolicy(),
BackoffDelay: mergeUint64(e0.GetBackoffDelay(), e1.GetBackoffDelay()),
Timeout: mergeUint64(e0.GetTimeout(), e1.GetTimeout()),
}
}

func mergeUint64(a, b uint64) uint64 {
if a == 0 {
return b
}
return a
}

func mergeUint32(a, b uint32) uint32 {
if a == 0 {
return b
}
return a
}

func mergeString(a, b string) string {
if a == "" {
return b
}
return a
}
133 changes: 133 additions & 0 deletions control-plane/pkg/core/config/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/protobuf/runtime/protoimpl"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -346,3 +349,133 @@ func TestEgressConfigFromDelivery(t *testing.T) {
})
}
}

func TestMergeEgressConfig(t *testing.T) {

tt := []struct {
name string
e0 *contract.EgressConfig
e1 *contract.EgressConfig
expected *contract.EgressConfig
}{
{
name: "e0 nil",
e1: &contract.EgressConfig{
Retry: 42,
},
expected: &contract.EgressConfig{
Retry: 42,
},
},
{
name: "e1 nil",
e0: &contract.EgressConfig{
Retry: 42,
},
expected: &contract.EgressConfig{
Retry: 42,
},
},
{
name: "e0 retry priority",
e0: &contract.EgressConfig{
Retry: 42,
},
e1: &contract.EgressConfig{
Retry: 43,
},
expected: &contract.EgressConfig{
Retry: 42,
},
},
{
name: "e0 dead letter priority",
e0: &contract.EgressConfig{
DeadLetter: "e0",
},
e1: &contract.EgressConfig{
Retry: 43,
DeadLetter: "e1",
},
expected: &contract.EgressConfig{
DeadLetter: "e0",
Retry: 43,
},
},
{
name: "e0 timeout priority",
e0: &contract.EgressConfig{
DeadLetter: "e0",
Timeout: 100,
},
e1: &contract.EgressConfig{
Retry: 43,
DeadLetter: "e1",
Timeout: 101,
},
expected: &contract.EgressConfig{
DeadLetter: "e0",
Retry: 43,
Timeout: 100,
},
},
{
name: "e0 backoff delay priority",
e0: &contract.EgressConfig{
DeadLetter: "e0",
Timeout: 100,
BackoffDelay: 4001,
},
e1: &contract.EgressConfig{
Retry: 43,
DeadLetter: "e1",
Timeout: 101,
BackoffDelay: 4000,
},
expected: &contract.EgressConfig{
DeadLetter: "e0",
Retry: 43,
Timeout: 100,
BackoffDelay: 4001,
},
},
{
name: "e0 backoff policy priority",
e0: &contract.EgressConfig{
DeadLetter: "e0",
Timeout: 100,
BackoffDelay: 4001,
BackoffPolicy: 0,
},
e1: &contract.EgressConfig{
Retry: 43,
DeadLetter: "e1",
Timeout: 101,
BackoffDelay: 4000,
BackoffPolicy: 1,
},
expected: &contract.EgressConfig{
DeadLetter: "e0",
Retry: 43,
Timeout: 100,
BackoffDelay: 4001,
BackoffPolicy: 0,
},
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
opts := []cmp.Option{
cmpopts.IgnoreTypes(
protoimpl.MessageState{},
protoimpl.SizeCache(0),
protoimpl.UnknownFields{},
),
}
if diff := cmp.Diff(tc.expected, MergeEgressConfig(tc.e0, tc.e1), opts...); diff != "" {
t.Errorf("(-want, +got) %s", diff)
}
})
}
}
19 changes: 11 additions & 8 deletions control-plane/pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,11 @@ func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigge
}
triggerIndex := coreconfig.FindEgress(ct.Resources[brokerIndex].Egresses, trigger.UID)

triggerConfig, err := r.getTriggerConfig(ctx, trigger)
triggerConfig, err := r.getTriggerConfig(ctx, broker, trigger)
if err != nil {
return statusConditionManager.failedToResolveTriggerConfig(err)
}
statusConditionManager.subscriberResolved(
fmt.Sprintf("Subscriber will receive events with the delivery order: %s", triggerConfig.DeliveryOrder.String()),
)
statusConditionManager.subscriberResolved(triggerConfig)

changed := coreconfig.AddOrUpdateEgressConfig(ct, brokerIndex, triggerConfig, triggerIndex)

Expand Down Expand Up @@ -266,7 +264,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, trigger *eventing.Trigger
return nil
}

func (r *Reconciler) getTriggerConfig(ctx context.Context, trigger *eventing.Trigger) (*contract.Egress, error) {
func (r *Reconciler) getTriggerConfig(ctx context.Context, broker *eventing.Broker, trigger *eventing.Trigger) (*contract.Egress, error) {
destination, err := r.Resolver.URIFromDestinationV1(ctx, trigger.Spec.Subscriber, trigger)
if err != nil {
return nil, fmt.Errorf("failed to resolve Trigger.Spec.Subscriber: %w", err)
Expand All @@ -285,11 +283,16 @@ func (r *Reconciler) getTriggerConfig(ctx context.Context, trigger *eventing.Tri
}
}

egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, trigger, trigger.Spec.Delivery, r.Configs.DefaultBackoffDelayMs)
triggerEgressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, trigger, trigger.Spec.Delivery, r.Configs.DefaultBackoffDelayMs)
if err != nil {
return nil, fmt.Errorf("[trigger] %w", err)
}
brokerEgressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, broker, broker.Spec.Delivery, r.Configs.DefaultBackoffDelayMs)
if err != nil {
return nil, err
return nil, fmt.Errorf("[broker] %w", err)
}
egress.EgressConfig = egressConfig
// Merge Broker and Trigger egress configuration prioritizing the Trigger configuration.
egress.EgressConfig = coreconfig.MergeEgressConfig(triggerEgressConfig, brokerEgressConfig)

deliveryOrderAnnotationValue, ok := trigger.Annotations[deliveryOrderAnnotation]
if ok {
Expand Down
17 changes: 15 additions & 2 deletions control-plane/pkg/reconciler/trigger/trigger_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"knative.dev/pkg/reconciler"

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
)

type statusConditionManager struct {
Expand Down Expand Up @@ -122,10 +123,22 @@ func (m *statusConditionManager) failedToUpdateDispatcherPodsAnnotation(err erro
)
}

func (m *statusConditionManager) subscriberResolved(details string) {
func (m *statusConditionManager) subscriberResolved(egress *contract.Egress) {
m.Trigger.GetConditionSet().Manage(&m.Trigger.Status).MarkTrueWithReason(
eventing.TriggerConditionSubscriberResolved,
string(eventing.TriggerConditionSubscriberResolved),
details,
fmt.Sprintf("Subscriber will receive events with the delivery order: %s", egress.DeliveryOrder.String()),
)

if isDeadLetterSinkConfigured(egress.EgressConfig) {
m.Trigger.Status.MarkDeadLetterSinkResolvedSucceeded()
uri, _ /* safe to ignore */ := apis.ParseURL(egress.EgressConfig.DeadLetter)
m.Trigger.Status.DeadLetterURI = uri
} else {
m.Trigger.Status.MarkDeadLetterSinkNotConfigured()
}
}

func isDeadLetterSinkConfigured(egressConfig *contract.EgressConfig) bool {
return egressConfig != nil && egressConfig.DeadLetter != ""
}
10 changes: 10 additions & 0 deletions control-plane/pkg/reconciler/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs)
reconcilertesting.WithTriggerDependencyReady(),
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED),
reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(),
),
},
},
Expand Down Expand Up @@ -212,6 +213,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs)
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_ORDERED),
reconcilertesting.WithAnnotation(deliveryOrderAnnotation, deliveryOrderOrdered),
reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(),
),
},
},
Expand Down Expand Up @@ -275,6 +277,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs)
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED),
reconcilertesting.WithAnnotation(deliveryOrderAnnotation, deliveryOrderUnordered),
reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(),
),
},
},
Expand Down Expand Up @@ -344,6 +347,8 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs)
reconcilertesting.WithTriggerDependencyReady(),
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED),
reconcilertesting.WithTriggerDeadLetterSinkResolvedSucceeded(),
reconcilertesting.WithTriggerStatusDeadLetterSinkURI("http://localhost/path"),
),
},
},
Expand Down Expand Up @@ -456,6 +461,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs)
reconcilertesting.WithTriggerDependencyReady(),
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED),
reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(),
),
},
},
Expand Down Expand Up @@ -753,6 +759,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs)
reconcilertesting.WithTriggerDependencyReady(),
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED),
reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(),
),
},
},
Expand Down Expand Up @@ -939,6 +946,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs)
reconcilertesting.WithTriggerDependencyReady(),
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED),
reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(),
),
},
},
Expand Down Expand Up @@ -1120,6 +1128,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs)
reconcilertesting.WithTriggerDependencyReady(),
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED),
reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(),
),
},
},
Expand Down Expand Up @@ -1365,6 +1374,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs)
reconcilertesting.WithTriggerDependencyReady(),
reconcilertesting.WithTriggerBrokerReady(),
withTriggerSubscriberResolvedSucceeded(contract.DeliveryOrder_UNORDERED),
reconcilertesting.WithTriggerDeadLetterSinkNotConfigured(),
),
},
},
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ require (
k8s.io/apiserver v0.20.7
k8s.io/client-go v0.20.7
k8s.io/utils v0.0.0-20210111153108-fddb29f9d009
knative.dev/eventing v0.24.0
knative.dev/eventing v0.24.1-0.20210708130023-221dfdfced62
knative.dev/hack v0.0.0-20210622141627-e28525d8d260
knative.dev/pkg v0.0.0-20210628225612-51cfaabbcdf6
knative.dev/reconciler-test v0.0.0-20210630182710-2a6d91dfee1e
knative.dev/pkg v0.0.0-20210706174620-fe90576475ca
knative.dev/reconciler-test v0.0.0-20210707164418-32f0df0c7399
)
13 changes: 6 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1222,17 +1222,16 @@ k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAG
k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210111153108-fddb29f9d009 h1:0T5IaWHO3sJTEmCP6mUlBvMukxPKUQWqiI/YuiBNMiQ=
k8s.io/utils v0.0.0-20210111153108-fddb29f9d009/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
knative.dev/eventing v0.24.0 h1:CoaQwZBizxZyOFJUvFcyb7vYSvpYBmfb4IYRNWUdTPE=
knative.dev/eventing v0.24.0/go.mod h1:9xo0SWkIfpXrx0lvGQO7MUlPF8cu+QCMd2gGxj6wxrU=
knative.dev/eventing v0.24.1-0.20210708130023-221dfdfced62 h1:UiWAQRr1xifUbK7znIJSLQmnx6hnUIhpJQ1coMO6S9U=
knative.dev/eventing v0.24.1-0.20210708130023-221dfdfced62/go.mod h1:vrmE0f+CSunMBBCvb1lyO5OTsBN72bipJOsC0WoJvj4=
knative.dev/hack v0.0.0-20210622141627-e28525d8d260 h1:f2eMtOubAOc/Q7JlvFPDKXiPlJVK+VpX2Cot8hRzCgQ=
knative.dev/hack v0.0.0-20210622141627-e28525d8d260/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack/schema v0.0.0-20210622141627-e28525d8d260/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0=
knative.dev/pkg v0.0.0-20210622173328-dd0db4b05c80/go.mod h1:kGegTnbZ+ljFjAE3E1+8wgaH2LMv8qYi+72o3F3cbdc=
knative.dev/pkg v0.0.0-20210628225612-51cfaabbcdf6 h1:DEZJpuiDMEzdJQZI0+ZAqsLe5uwsHHYOgrjcg0Sv8wA=
knative.dev/pkg v0.0.0-20210628225612-51cfaabbcdf6/go.mod h1:kGegTnbZ+ljFjAE3E1+8wgaH2LMv8qYi+72o3F3cbdc=
knative.dev/reconciler-test v0.0.0-20210623134345-88c84739abd9/go.mod h1:4wqv2WyWUC5yhTesRUVwgjv/fHTHny1RYBfdB6tVDok=
knative.dev/reconciler-test v0.0.0-20210630182710-2a6d91dfee1e h1:OlvD5NiM8rl3j56NIQfmwSazBffwofQE+XDAt88+BV4=
knative.dev/reconciler-test v0.0.0-20210630182710-2a6d91dfee1e/go.mod h1:4wqv2WyWUC5yhTesRUVwgjv/fHTHny1RYBfdB6tVDok=
knative.dev/pkg v0.0.0-20210706174620-fe90576475ca h1:WF0VUpn7S8RvhQ1Q419NAlI+iFJUBTDmVR7lbMzhGMk=
knative.dev/pkg v0.0.0-20210706174620-fe90576475ca/go.mod h1:kGegTnbZ+ljFjAE3E1+8wgaH2LMv8qYi+72o3F3cbdc=
knative.dev/reconciler-test v0.0.0-20210707164418-32f0df0c7399 h1:qPbQKtiWTL2Pw5r44CgxN9wxy7SN+drHlJMxvL7D6DA=
knative.dev/reconciler-test v0.0.0-20210707164418-32f0df0c7399/go.mod h1:4wqv2WyWUC5yhTesRUVwgjv/fHTHny1RYBfdB6tVDok=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k=
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
language: go

sudo: false

go:
- 1.4
- 1.5
- 1.6
- tip

script:
- go test -bench . -benchmem -v ./...
Empty file modified third_party/VENDOR-LICENSE/github.com/hashicorp/go-uuid/LICENSE
100755 → 100644
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# uuid [![Build Status](https://travis-ci.org/hashicorp/go-uuid.svg?branch=master)](https://travis-ci.org/hashicorp/go-uuid)

Generates UUID-format strings using high quality, _purely random_ bytes. It is **not** intended to be RFC compliant, merely to use a well-understood string representation of a 128-bit value. It can also parse UUID-format strings into their component bytes.

Documentation
=============

The full documentation is available on [Godoc](http://godoc.org/github.com/hashicorp/go-uuid).
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module github.com/hashicorp/go-uuid
Loading

0 comments on commit 5421f7b

Please sign in to comment.