diff --git a/pkg/channel/message_dispatcher.go b/pkg/channel/message_dispatcher.go index e80ddafa1ad..67b4ef6099d 100644 --- a/pkg/channel/message_dispatcher.go +++ b/pkg/channel/message_dispatcher.go @@ -51,7 +51,7 @@ type MessageDispatcher interface { // DispatchMessageWithRetries dispatches an event to a destination over HTTP. // // The destination and reply are URLs. - DispatchMessageWithRetries(ctx context.Context, message cloudevents.Message, additionalHeaders nethttp.Header, destination *url.URL, reply *url.URL, deadLetter *url.URL, config *kncloudevents.RetryConfig) (*DispatchExecutionInfo, error) + DispatchMessageWithRetries(ctx context.Context, message cloudevents.Message, additionalHeaders nethttp.Header, destination *url.URL, reply *url.URL, deadLetter *url.URL, config *kncloudevents.RetryConfig, transformers ...binding.Transformer) (*DispatchExecutionInfo, error) } // MessageDispatcherImpl is the 'real' MessageDispatcher used everywhere except unit tests. @@ -93,7 +93,7 @@ func (d *MessageDispatcherImpl) DispatchMessage(ctx context.Context, message clo return d.DispatchMessageWithRetries(ctx, message, additionalHeaders, destination, reply, deadLetter, nil) } -func (d *MessageDispatcherImpl) DispatchMessageWithRetries(ctx context.Context, message cloudevents.Message, additionalHeaders nethttp.Header, destination *url.URL, reply *url.URL, deadLetter *url.URL, retriesConfig *kncloudevents.RetryConfig) (*DispatchExecutionInfo, error) { +func (d *MessageDispatcherImpl) DispatchMessageWithRetries(ctx context.Context, message cloudevents.Message, additionalHeaders nethttp.Header, destination *url.URL, reply *url.URL, deadLetter *url.URL, retriesConfig *kncloudevents.RetryConfig, transformers ...binding.Transformer) (*DispatchExecutionInfo, error) { // All messages that should be finished at the end of this function // are placed in this slice var messagesToFinish []binding.Message @@ -154,7 +154,7 @@ func (d *MessageDispatcherImpl) DispatchMessageWithRetries(ctx context.Context, return dispatchExecutionInfo, nil } - ctx, responseResponseMessage, _, dispatchExecutionInfo, err := d.executeRequest(ctx, reply, responseMessage, responseAdditionalHeaders, retriesConfig) + ctx, responseResponseMessage, _, dispatchExecutionInfo, err := d.executeRequest(ctx, reply, responseMessage, responseAdditionalHeaders, retriesConfig, transformers...) if err != nil { // If DeadLetter is configured, then send original message with knative error extensions if deadLetter != nil {