Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Revert using errgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
Mason Liang committed Jun 24, 2020
1 parent 5605416 commit 0f32319
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 16 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ require (
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.1.0
golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect
gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect
Expand Down
67 changes: 52 additions & 15 deletions zeroex/orderwatch/order_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
logger "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -145,22 +144,60 @@ func (w *Watcher) Watch(ctx context.Context) error {
w.wasStartedOnce = true
w.mu.Unlock()

g, innerCtx := errgroup.WithContext(ctx)
namedLoops := []struct {
loop func(context.Context) error
name string
}{{w.mainLoop, "mainLoop"}, {w.cleanupLoop, "cleanupLoop"}, {w.removedCheckerLoop, "removedCheckerLoop"}}
for _, namedLoop := range namedLoops {
g.Go(func() error {
err := namedLoop.loop(innerCtx)
if err != nil {
logger.WithError(err).Error(fmt.Sprintf("error in orderwatcher %s", namedLoop.name))
}
// Create a child context so that we can preemptively cancel if there is an
// error.
innerCtx, cancel := context.WithCancel(ctx)
defer cancel()

// A waitgroup lets us wait for all goroutines to exit.
wg := &sync.WaitGroup{}

// Start four independent goroutines. The main loop, cleanup loop, removed orders
// checker and max expirationTime checker. Use four separate channels to communicate errors.
mainLoopErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
mainLoopErrChan <- w.mainLoop(innerCtx)
}()
cleanupLoopErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
cleanupLoopErrChan <- w.cleanupLoop(innerCtx)
}()
removedCheckerLoopErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
removedCheckerLoopErrChan <- w.removedCheckerLoop(innerCtx)
}()

// If any error channel returns a non-nil error, we cancel the inner context
// and return the error. Note that this means we only return the first error
// that occurs.
select {
case err := <-mainLoopErrChan:
if err != nil {
cancel()
return err
})
}
case err := <-cleanupLoopErrChan:
if err != nil {
cancel()
return err
}
case err := <-removedCheckerLoopErrChan:
if err != nil {
cancel()
return err
}
}
// Wait for all loops to return nil, or for any loop to return an error.
return g.Wait()

// Wait for all goroutines to exit. If we reached here it means we are done
// and there are no errors.
wg.Wait()
return nil
}

func (w *Watcher) mainLoop(ctx context.Context) error {
Expand Down

0 comments on commit 0f32319

Please sign in to comment.