diff --git a/go.mod b/go.mod index 5150768eb5..dfb125e047 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( k8s.io/client-go v0.19.7 k8s.io/utils v0.0.0-20200729134348-d5654de09c73 knative.dev/control-protocol v0.0.0-20210604132745-1e48321b7e9b - knative.dev/eventing v0.23.1-0.20210608085741-b05d993d260e + knative.dev/eventing v0.23.1-0.20210608130141-92d85bf916e9 knative.dev/hack v0.0.0-20210601210329-de04b70e00d0 knative.dev/networking v0.0.0-20210603073844-5521a8b92648 knative.dev/pkg v0.0.0-20210602095030-0e61d6763dd6 diff --git a/go.sum b/go.sum index 1caea91624..c5aa817c8a 100644 --- a/go.sum +++ b/go.sum @@ -1311,8 +1311,8 @@ k8s.io/utils v0.0.0-20200729134348-d5654de09c73 h1:uJmqzgNWG7XyClnU/mLPBWwfKKF1K k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= knative.dev/control-protocol v0.0.0-20210604132745-1e48321b7e9b h1:LKpJLida2PnUhSHZ1Eu+kPHUDJEiy1WX5Uec075k43Y= knative.dev/control-protocol v0.0.0-20210604132745-1e48321b7e9b/go.mod h1:iRE6O4mY88gH6xWl6ZFhG6sJobzrZjFrz5xhX4h93Ns= -knative.dev/eventing v0.23.1-0.20210608085741-b05d993d260e h1:ti5tC3CoSxRcjfEvKu11zM9/tZpfjiSGvqUAqj/XojU= -knative.dev/eventing v0.23.1-0.20210608085741-b05d993d260e/go.mod h1:mYAQz8Jk3S5Rd24Drmp7hWX+U3JzTXUEvMmfI62VlOM= +knative.dev/eventing v0.23.1-0.20210608130141-92d85bf916e9 h1:yo2PJ58hGS2PELz7fsSHNJsmcH2fzCwij0kRgk73vAA= +knative.dev/eventing v0.23.1-0.20210608130141-92d85bf916e9/go.mod h1:mYAQz8Jk3S5Rd24Drmp7hWX+U3JzTXUEvMmfI62VlOM= knative.dev/hack v0.0.0-20210601210329-de04b70e00d0 h1:GXGtVDptFpq2aFTh/m+Xii0kvGIcGZQQtx/sHinZqZs= knative.dev/hack v0.0.0-20210601210329-de04b70e00d0/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI= knative.dev/hack/schema v0.0.0-20210601210329-de04b70e00d0/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0= diff --git a/vendor/knative.dev/eventing/pkg/channel/fanout/fanout_message_handler.go b/vendor/knative.dev/eventing/pkg/channel/fanout/fanout_message_handler.go index ceb0cbef69..fcb1409837 100644 --- a/vendor/knative.dev/eventing/pkg/channel/fanout/fanout_message_handler.go +++ b/vendor/knative.dev/eventing/pkg/channel/fanout/fanout_message_handler.go @@ -226,7 +226,7 @@ func (f *FanoutMessageHandler) ServeHTTP(response nethttp.ResponseWriter, reques } // ParseDispatchResultAndReportMetric processes the dispatch result and records the related channel metrics with the appropriate context -func ParseDispatchResultAndReportMetrics(result dispatchResult, reporter channel.StatsReporter, reportArgs channel.ReportArgs) error { +func ParseDispatchResultAndReportMetrics(result DispatchResult, reporter channel.StatsReporter, reportArgs channel.ReportArgs) error { if result.info != nil && result.info.Time > channel.NoDuration { if result.info.ResponseCode > channel.NoResponse { _ = reporter.ReportEventDispatchTime(&reportArgs, result.info.ResponseCode, result.info.Time) @@ -245,20 +245,20 @@ func ParseDispatchResultAndReportMetrics(result dispatchResult, reporter channel // dispatch takes the event, fans it out to each subscription in subs. If all the fanned out // events return successfully, then return nil. Else, return an error. -func (f *FanoutMessageHandler) dispatch(ctx context.Context, subs []Subscription, bufferedMessage binding.Message, additionalHeaders nethttp.Header) dispatchResult { +func (f *FanoutMessageHandler) dispatch(ctx context.Context, subs []Subscription, bufferedMessage binding.Message, additionalHeaders nethttp.Header) DispatchResult { // Bind the lifecycle of the buffered message to the number of subs bufferedMessage = buffering.WithAcksBeforeFinish(bufferedMessage, len(subs)) - errorCh := make(chan dispatchResult, len(subs)) + errorCh := make(chan DispatchResult, len(subs)) for _, sub := range subs { go func(s Subscription) { dispatchedResultPerSub, err := f.makeFanoutRequest(ctx, bufferedMessage, additionalHeaders, s) - errorCh <- dispatchResult{err: err, info: dispatchedResultPerSub} + errorCh <- DispatchResult{err: err, info: dispatchedResultPerSub} }(sub) } var totalDispatchTimeForFanout time.Duration = channel.NoDuration - dispatchResultForFanout := dispatchResult{ + dispatchResultForFanout := DispatchResult{ info: &channel.DispatchExecutionInfo{ Time: channel.NoDuration, ResponseCode: channel.NoResponse, @@ -307,7 +307,22 @@ func (f *FanoutMessageHandler) makeFanoutRequest(ctx context.Context, message bi ) } -type dispatchResult struct { +type DispatchResult struct { err error info *channel.DispatchExecutionInfo } + +func (d DispatchResult) Error() error { + return d.err +} + +func (d DispatchResult) Info() *channel.DispatchExecutionInfo { + return d.info +} + +func NewDispatchResult(err error, info *channel.DispatchExecutionInfo) DispatchResult { + return DispatchResult{ + err: err, + info: info, + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 675b1b52d0..3bb3b047a5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1106,7 +1106,7 @@ knative.dev/control-protocol/pkg/message knative.dev/control-protocol/pkg/network knative.dev/control-protocol/pkg/reconciler knative.dev/control-protocol/pkg/service -# knative.dev/eventing v0.23.1-0.20210608085741-b05d993d260e +# knative.dev/eventing v0.23.1-0.20210608130141-92d85bf916e9 ## explicit knative.dev/eventing/pkg/adapter/v2 knative.dev/eventing/pkg/adapter/v2/test