Skip to content

Commit

Permalink
refactor: ensure goroutine-safety of manyIter
Browse files Browse the repository at this point in the history
- use WaitGroup and select for ctx.Done()
- call .Val right after .Next inside Goroutine
  • Loading branch information
hacdias committed Dec 4, 2023
1 parent aff7675 commit 503c996
Showing 1 changed file with 43 additions and 24 deletions.
67 changes: 43 additions & 24 deletions server_routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,50 +124,69 @@ func find[T any](ctx context.Context, routers []router, call func(router) (iter.
}

type manyIter[T any] struct {
ctx context.Context
its []iter.ResultIter[T]
nextCh chan int
next int
ctx context.Context
wg sync.WaitGroup
its []iter.ResultIter[T]
done bool
ch chan iter.Result[T]
val iter.Result[T]
}

func newManyIter[T any](ctx context.Context, its []iter.ResultIter[T]) *manyIter[T] {
nextCh := make(chan int)
mi := &manyIter[T]{
ctx: ctx,
its: its,
ch: make(chan iter.Result[T]),
}

for i, it := range its {
go func(ch chan int, it iter.ResultIter[T], index int) {
for _, it := range its {
mi.wg.Add(1)
go func(it iter.ResultIter[T]) {
defer mi.wg.Done()
for it.Next() {
ch <- index
select {
case mi.ch <- it.Val():
case <-ctx.Done():
return
}
}
}(nextCh, it, i)
}(it)
}

return &manyIter[T]{
ctx: ctx,
its: its,
nextCh: nextCh,
next: -1,
}
go func() {
mi.wg.Wait()
close(mi.ch)
}()

return mi
}

func (mi *manyIter[T]) Next() bool {
if mi.done {
return false
}

select {
case i := <-mi.nextCh:
mi.next = i
return true
case val, ok := <-mi.ch:
if ok {
mi.val = val
} else {
mi.done = true
}
case <-mi.ctx.Done():
mi.next = -1
return false
mi.done = true
}

return !mi.done
}

func (mi *manyIter[T]) Val() iter.Result[T] {
if mi.next == -1 {
return iter.Result[T]{Err: errors.New("no next value")}
}
return mi.its[mi.next].Val()
return mi.val
}

func (mi *manyIter[T]) Close() error {
mi.done = true
mi.wg.Wait()
var err error
for _, it := range mi.its {
err = errors.Join(err, it.Close())
Expand Down

0 comments on commit 503c996

Please sign in to comment.