Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Glue the new DeliverySpec.Timeout #1034

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions control-plane/pkg/core/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func EgressConfigFromDelivery(
defaultBackoffDelayMs uint64,
) (*contract.EgressConfig, error) {

if delivery == nil || (delivery.DeadLetterSink == nil && delivery.Retry == nil) {
if delivery == nil || (delivery.DeadLetterSink == nil && delivery.Retry == nil && delivery.Timeout == nil) {
return nil, nil
}

Expand All @@ -72,14 +72,26 @@ func EgressConfigFromDelivery(
if delivery.Retry != nil {
egressConfig.Retry = uint32(*delivery.Retry)
var err error
delay, err := BackoffDelayFromISO8601String(delivery.BackoffDelay, defaultBackoffDelayMs)
delay, err := DurationMillisFromISO8601String(delivery.BackoffDelay, defaultBackoffDelayMs)
if err != nil {
return nil, fmt.Errorf("failed to parse Spec.Delivery.BackoffDelay: %w", err)
}
egressConfig.BackoffDelay = delay
egressConfig.BackoffPolicy = BackoffPolicyFromString(delivery.BackoffPolicy)
}

if delivery.Timeout != nil {
var err error
timeout, err := DurationMillisFromISO8601String(
delivery.Timeout,
0, /* 0 means absent, allowing the data plane to default it */
)
if err != nil {
return nil, fmt.Errorf("failed to parse Spec.Delivery.Timeout: %w", err)
}
egressConfig.Timeout = timeout
}

return egressConfig, nil
}

Expand All @@ -102,17 +114,17 @@ func BackoffPolicyFromString(backoffPolicy *duck.BackoffPolicyType) contract.Bac
}
}

// BackoffDelayFromISO8601String returns the BackoffDelay from the given string.
// DurationMillisFromISO8601String returns the duration in milliseconds from the given string.
//
// Default value is the specified defaultDelay.
func BackoffDelayFromISO8601String(backoffDelay *string, defaultDelay uint64) (uint64, error) {
if backoffDelay == nil {
return defaultDelay, nil
func DurationMillisFromISO8601String(durationStr *string, defaultDurationMillis uint64) (uint64, error) {
if durationStr == nil {
return defaultDurationMillis, nil
}

d, err := period.Parse(*backoffDelay, false)
d, err := period.Parse(*durationStr, false)
if err != nil {
return 0, fmt.Errorf("failed to parse backoffDelay: %w", err)
return 0, fmt.Errorf("failed to parse duration string: %w", err)
}

ms, _ := d.Duration()
Expand Down
22 changes: 19 additions & 3 deletions control-plane/pkg/core/config/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestBackoffPolicyFromString(t *testing.T) {
}
}

func TestBackoffDelayFromString(t *testing.T) {
func TestDurationMillisFromISO8601String(t *testing.T) {

tests := []struct {
name string
Expand Down Expand Up @@ -170,12 +170,12 @@ func TestBackoffDelayFromString(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := BackoffDelayFromISO8601String(tt.backoffDelay, tt.defaultDelay)
got, err := DurationMillisFromISO8601String(tt.backoffDelay, tt.defaultDelay)
if (err != nil) != tt.wantError {
t.Errorf("wantError = %v got %v", tt.wantError, err)
}
if got != tt.want {
t.Errorf("BackoffDelayFromISO8601String() = %v, want %v", got, tt.want)
t.Errorf("DurationMillisFromISO8601String() = %v, want %v", got, tt.want)
}
})
}
Expand Down Expand Up @@ -275,13 +275,15 @@ func TestEgressConfigFromDelivery(t *testing.T) {
Retry: pointer.Int32Ptr(3),
BackoffPolicy: &exponential,
BackoffDelay: pointer.StringPtr("PT1S"),
Timeout: pointer.StringPtr("PT2S"),
},
defaultBackoffDelayMs: 0,
want: &contract.EgressConfig{
DeadLetter: url.String(),
Retry: 3,
BackoffPolicy: contract.BackoffPolicy_Exponential,
BackoffDelay: uint64(time.Second.Milliseconds()),
Timeout: uint64(time.Second.Milliseconds() * 2),
},
wantErr: false,
},
Expand All @@ -299,6 +301,20 @@ func TestEgressConfigFromDelivery(t *testing.T) {
},
wantErr: false,
},
{
name: "only timeout",
ctx: ctx,
resolver: resolver.NewURIResolver(ctx, func(name types.NamespacedName) {}),
parent: &eventing.KafkaSink{},
delivery: &eventingduck.DeliverySpec{
Timeout: pointer.StringPtr("PT2S"),
},
defaultBackoffDelayMs: 0,
want: &contract.EgressConfig{
Timeout: uint64(time.Second.Milliseconds() * 2),
},
wantErr: false,
},
{
name: "only retry - use default backoff delay",
ctx: ctx,
Expand Down
2 changes: 2 additions & 0 deletions control-plane/pkg/reconciler/trigger/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs)
Retry: 3,
BackoffPolicy: contract.BackoffPolicy_Exponential,
BackoffDelay: uint64(time.Second.Milliseconds()),
Timeout: uint64((time.Second * 2).Milliseconds()),
},
},
},
Expand Down Expand Up @@ -1408,6 +1409,7 @@ func withDelivery(trigger *eventing.Trigger) {
Retry: pointer.Int32Ptr(3),
BackoffPolicy: &exponential,
BackoffDelay: pointer.StringPtr("PT1S"),
Timeout: pointer.StringPtr("PT2S"),
}
}

Expand Down