Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
update to latest eventing deps (#692)
Browse files Browse the repository at this point in the history
  • Loading branch information
skonto authored Jun 8, 2021
1 parent 93e0220 commit 2f12463
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
k8s.io/client-go v0.19.7
k8s.io/utils v0.0.0-20200729134348-d5654de09c73
knative.dev/control-protocol v0.0.0-20210604132745-1e48321b7e9b
knative.dev/eventing v0.23.1-0.20210608085741-b05d993d260e
knative.dev/eventing v0.23.1-0.20210608130141-92d85bf916e9
knative.dev/hack v0.0.0-20210601210329-de04b70e00d0
knative.dev/networking v0.0.0-20210603073844-5521a8b92648
knative.dev/pkg v0.0.0-20210602095030-0e61d6763dd6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1311,8 +1311,8 @@ k8s.io/utils v0.0.0-20200729134348-d5654de09c73 h1:uJmqzgNWG7XyClnU/mLPBWwfKKF1K
k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
knative.dev/control-protocol v0.0.0-20210604132745-1e48321b7e9b h1:LKpJLida2PnUhSHZ1Eu+kPHUDJEiy1WX5Uec075k43Y=
knative.dev/control-protocol v0.0.0-20210604132745-1e48321b7e9b/go.mod h1:iRE6O4mY88gH6xWl6ZFhG6sJobzrZjFrz5xhX4h93Ns=
knative.dev/eventing v0.23.1-0.20210608085741-b05d993d260e h1:ti5tC3CoSxRcjfEvKu11zM9/tZpfjiSGvqUAqj/XojU=
knative.dev/eventing v0.23.1-0.20210608085741-b05d993d260e/go.mod h1:mYAQz8Jk3S5Rd24Drmp7hWX+U3JzTXUEvMmfI62VlOM=
knative.dev/eventing v0.23.1-0.20210608130141-92d85bf916e9 h1:yo2PJ58hGS2PELz7fsSHNJsmcH2fzCwij0kRgk73vAA=
knative.dev/eventing v0.23.1-0.20210608130141-92d85bf916e9/go.mod h1:mYAQz8Jk3S5Rd24Drmp7hWX+U3JzTXUEvMmfI62VlOM=
knative.dev/hack v0.0.0-20210601210329-de04b70e00d0 h1:GXGtVDptFpq2aFTh/m+Xii0kvGIcGZQQtx/sHinZqZs=
knative.dev/hack v0.0.0-20210601210329-de04b70e00d0/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack/schema v0.0.0-20210601210329-de04b70e00d0/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (f *FanoutMessageHandler) ServeHTTP(response nethttp.ResponseWriter, reques
}

// ParseDispatchResultAndReportMetric processes the dispatch result and records the related channel metrics with the appropriate context
func ParseDispatchResultAndReportMetrics(result dispatchResult, reporter channel.StatsReporter, reportArgs channel.ReportArgs) error {
func ParseDispatchResultAndReportMetrics(result DispatchResult, reporter channel.StatsReporter, reportArgs channel.ReportArgs) error {
if result.info != nil && result.info.Time > channel.NoDuration {
if result.info.ResponseCode > channel.NoResponse {
_ = reporter.ReportEventDispatchTime(&reportArgs, result.info.ResponseCode, result.info.Time)
Expand All @@ -245,20 +245,20 @@ func ParseDispatchResultAndReportMetrics(result dispatchResult, reporter channel

// dispatch takes the event, fans it out to each subscription in subs. If all the fanned out
// events return successfully, then return nil. Else, return an error.
func (f *FanoutMessageHandler) dispatch(ctx context.Context, subs []Subscription, bufferedMessage binding.Message, additionalHeaders nethttp.Header) dispatchResult {
func (f *FanoutMessageHandler) dispatch(ctx context.Context, subs []Subscription, bufferedMessage binding.Message, additionalHeaders nethttp.Header) DispatchResult {
// Bind the lifecycle of the buffered message to the number of subs
bufferedMessage = buffering.WithAcksBeforeFinish(bufferedMessage, len(subs))

errorCh := make(chan dispatchResult, len(subs))
errorCh := make(chan DispatchResult, len(subs))
for _, sub := range subs {
go func(s Subscription) {
dispatchedResultPerSub, err := f.makeFanoutRequest(ctx, bufferedMessage, additionalHeaders, s)
errorCh <- dispatchResult{err: err, info: dispatchedResultPerSub}
errorCh <- DispatchResult{err: err, info: dispatchedResultPerSub}
}(sub)
}

var totalDispatchTimeForFanout time.Duration = channel.NoDuration
dispatchResultForFanout := dispatchResult{
dispatchResultForFanout := DispatchResult{
info: &channel.DispatchExecutionInfo{
Time: channel.NoDuration,
ResponseCode: channel.NoResponse,
Expand Down Expand Up @@ -307,7 +307,22 @@ func (f *FanoutMessageHandler) makeFanoutRequest(ctx context.Context, message bi
)
}

type dispatchResult struct {
type DispatchResult struct {
err error
info *channel.DispatchExecutionInfo
}

func (d DispatchResult) Error() error {
return d.err
}

func (d DispatchResult) Info() *channel.DispatchExecutionInfo {
return d.info
}

func NewDispatchResult(err error, info *channel.DispatchExecutionInfo) DispatchResult {
return DispatchResult{
err: err,
info: info,
}
}
2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,7 @@ knative.dev/control-protocol/pkg/message
knative.dev/control-protocol/pkg/network
knative.dev/control-protocol/pkg/reconciler
knative.dev/control-protocol/pkg/service
# knative.dev/eventing v0.23.1-0.20210608085741-b05d993d260e
# knative.dev/eventing v0.23.1-0.20210608130141-92d85bf916e9
## explicit
knative.dev/eventing/pkg/adapter/v2
knative.dev/eventing/pkg/adapter/v2/test
Expand Down

0 comments on commit 2f12463

Please sign in to comment.