Skip to content

Commit

Permalink
fix: cancel parallel routers
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra authored and Jorropo committed Feb 9, 2023
1 parent 7f581c8 commit e5d83c5
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions compparallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ func (r *composableParallel) FindProvidersAsync(ctx context.Context, cid cid.Cid
return r.FindProvidersAsync(ctx, cid, count), nil
},
func() bool {
return atomic.AddInt64(&totalCount, 1) > int64(count) && count != 0
if count == 0 {
return false
}
return atomic.AddInt64(&totalCount, 1) >= int64(count)
},
)

Expand Down Expand Up @@ -369,23 +372,24 @@ func getChannelOrErrorParallel[T any](
return
}

if shouldStop() {
log.Debug("getChannelOrErrorParallel: stopping channel iteration for router ", r.Router,
select {
case <-ctx.Done():
log.Debug("getChannelOrErrorParallel: stopping execution on router on context done inside select for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
" closed channel: ", ok,
)
return
case outCh <- val:
}

select {
case <-ctx.Done():
log.Debug("getChannelOrErrorParallel: stopping execution on router on context done inside select for router ", r.Router,
if shouldStop() {
log.Debug("getChannelOrErrorParallel: stopping channel iteration for router ", r.Router,
" with timeout ", r.Timeout,
" and ignore errors ", r.IgnoreError,
" closed channel: ", ok,
)
cancelAll()
return
case outCh <- val:
}
}
}
Expand Down

0 comments on commit e5d83c5

Please sign in to comment.