diff --git a/go.mod b/go.mod index 98a9361d9..d5f5715c8 100644 --- a/go.mod +++ b/go.mod @@ -74,7 +74,6 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.1.0 golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 - golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index f9e3320ed..0c2340448 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -21,7 +21,6 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" logger "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" ) const ( @@ -145,22 +144,62 @@ func (w *Watcher) Watch(ctx context.Context) error { w.wasStartedOnce = true w.mu.Unlock() - g, innerCtx := errgroup.WithContext(ctx) - namedLoops := []struct { - loop func(context.Context) error - name string - }{{w.mainLoop, "mainLoop"}, {w.cleanupLoop, "cleanupLoop"}, {w.removedCheckerLoop, "removedCheckerLoop"}} - for _, namedLoop := range namedLoops { - g.Go(func() error { - err := namedLoop.loop(innerCtx) - if err != nil { - logger.WithError(err).Error(fmt.Sprintf("error in orderwatcher %s", namedLoop.name)) - } + // Create a child context so that we can preemptively cancel if there is an + // error. + innerCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // A waitgroup lets us wait for all goroutines to exit. + wg := &sync.WaitGroup{} + + // Start some independent goroutines, each with a separate channel for communicating errors. + mainLoopErrChan := make(chan error, 1) + wg.Add(1) + go func() { + defer wg.Done() + mainLoopErrChan <- w.mainLoop(innerCtx) + }() + cleanupLoopErrChan := make(chan error, 1) + wg.Add(1) + go func() { + defer wg.Done() + cleanupLoopErrChan <- w.cleanupLoop(innerCtx) + }() + removedCheckerLoopErrChan := make(chan error, 1) + wg.Add(1) + go func() { + defer wg.Done() + removedCheckerLoopErrChan <- w.removedCheckerLoop(innerCtx) + }() + + // If any error channel returns a non-nil error, we cancel the inner context + // and return the error. Note that this means we only return the first error + // that occurs. + select { + case err := <-mainLoopErrChan: + if err != nil { + cancel() + logger.WithError(err).Error("error in orderwatcher mainLoop") return err - }) + } + case err := <-cleanupLoopErrChan: + if err != nil { + cancel() + logger.WithError(err).Error("error in orderwatcher cleanupLoop") + return err + } + case err := <-removedCheckerLoopErrChan: + if err != nil { + cancel() + logger.WithError(err).Error("error in orderwatcher removedCheckerLoop") + return err + } } - // Wait for all loops to return nil, or for any loop to return an error. - return g.Wait() + + // Wait for all goroutines to exit. If we reached here it means we are done + // and there are no errors. + wg.Wait() + return nil } func (w *Watcher) mainLoop(ctx context.Context) error {