diff --git a/pkg/dispatcher/jetstream_dispatcher.go b/pkg/dispatcher/jetstream_dispatcher.go index 49d5b736a..8d7ff2a7d 100644 --- a/pkg/dispatcher/jetstream_dispatcher.go +++ b/pkg/dispatcher/jetstream_dispatcher.go @@ -25,6 +25,8 @@ import ( "sync/atomic" "time" + "knative.dev/eventing/pkg/kncloudevents" + "go.opencensus.io/trace" "knative.dev/eventing-natss/pkg/tracing" @@ -317,9 +319,24 @@ func (s *jetSubscriptionsSupervisor) subscribe(ctx context.Context, channel even } var deadLetter *url.URL - if subscription.Delivery != nil && subscription.Delivery.DeadLetterSink != nil && !subscription.Delivery.DeadLetterSink.URI.IsEmpty() { - deadLetter = subscription.Delivery.DeadLetterSink.URI.URL() - s.logger.Debug("dispatch message", zap.String("deadLetter", deadLetter.String())) + retryConfig := kncloudevents.NoRetries() + + if subscription.Delivery != nil { + // Extract The DeadLetterSink From The Subscriber.Delivery + if subscription.Delivery != nil && subscription.Delivery.DeadLetterSink != nil && !subscription.Delivery.DeadLetterSink.URI.IsEmpty() { + deadLetter = subscription.Delivery.DeadLetterSink.URI.URL() + s.logger.Debug("dispatch message", zap.String("deadLetter", deadLetter.String())) + } + + // Extract The RetryConfig From The Subscriber.Delivery + var err error + retryConfig, err = kncloudevents.RetryConfigFromDeliverySpec(*subscription.Delivery) + if err != nil { + s.logger.Error("Failed To Parse RetryConfig From DeliverySpec - No Retries Will Occur", zap.Error(err)) + } else { + s.logger.Info("Successfully Parsed RetryConfig From DeliverySpec", zap.Int("RetryMax", retryConfig.RetryMax)) + retryConfig.CheckRetry = kncloudevents.SelectiveRetry // Specify Custom CheckRetry Function + } } event := tracing.ConvertNatsMsgToEvent(s.logger, stanMsg) @@ -335,7 +352,7 @@ func (s *jetSubscriptionsSupervisor) subscribe(ctx context.Context, channel even } defer span.End() - executionInfo, err := s.dispatcher.DispatchMessage(ctx, message, additionalHeaders, destination, reply, deadLetter) + executionInfo, err := s.dispatcher.DispatchMessageWithRetries(ctx, message, additionalHeaders, destination, reply, deadLetter, &retryConfig) if err != nil { s.logger.Error("Failed to dispatch message: ", zap.Error(err)) return diff --git a/pkg/dispatcher/natss_dispatcher.go b/pkg/dispatcher/natss_dispatcher.go index e5acd1e82..fcfe68bd7 100644 --- a/pkg/dispatcher/natss_dispatcher.go +++ b/pkg/dispatcher/natss_dispatcher.go @@ -321,9 +321,24 @@ func (s *subscriptionsSupervisor) subscribe(ctx context.Context, channel eventin } var deadLetter *url.URL - if subscription.Delivery != nil && subscription.Delivery.DeadLetterSink != nil && !subscription.Delivery.DeadLetterSink.URI.IsEmpty() { - deadLetter = subscription.Delivery.DeadLetterSink.URI.URL() - s.logger.Debug("dispatch message", zap.String("deadLetter", deadLetter.String())) + retryConfig := kncloudevents.NoRetries() + + if subscription.Delivery != nil { + // Extract The DeadLetterSink From The Subscriber.Delivery + if subscription.Delivery != nil && subscription.Delivery.DeadLetterSink != nil && !subscription.Delivery.DeadLetterSink.URI.IsEmpty() { + deadLetter = subscription.Delivery.DeadLetterSink.URI.URL() + s.logger.Debug("dispatch message", zap.String("deadLetter", deadLetter.String())) + } + + // Extract The RetryConfig From The Subscriber.Delivery + var err error + retryConfig, err = kncloudevents.RetryConfigFromDeliverySpec(*subscription.Delivery) + if err != nil { + s.logger.Error("Failed To Parse RetryConfig From DeliverySpec - No Retries Will Occur", zap.Error(err)) + } else { + s.logger.Info("Successfully Parsed RetryConfig From DeliverySpec", zap.Int("RetryMax", retryConfig.RetryMax)) + retryConfig.CheckRetry = kncloudevents.SelectiveRetry // Specify Custom CheckRetry Function + } } event := tracing.ConvertNatssMsgToEvent(s.logger, stanMsg) @@ -339,7 +354,7 @@ func (s *subscriptionsSupervisor) subscribe(ctx context.Context, channel eventin } defer span.End() - executionInfo, err := s.dispatcher.DispatchMessage(ctx, message, additionalHeaders, destination, reply, deadLetter) + executionInfo, err := s.dispatcher.DispatchMessageWithRetries(ctx, message, additionalHeaders, destination, reply, deadLetter, &retryConfig) if err != nil { s.logger.Error("Failed to dispatch message: ", zap.Error(err)) return