diff --git a/go.mod b/go.mod index d5f5715c8..98a9361d9 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.1.0 golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 + golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index 1c6517087..3cb09979b 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -21,6 +21,7 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" logger "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) const ( @@ -144,62 +145,24 @@ func (w *Watcher) Watch(ctx context.Context) error { w.wasStartedOnce = true w.mu.Unlock() - // Create a child context so that we can preemptively cancel if there is an - // error. - innerCtx, cancel := context.WithCancel(ctx) - defer cancel() + g, ctx := errgroup.WithContext(ctx) - // A waitgroup lets us wait for all goroutines to exit. - wg := &sync.WaitGroup{} - - // Start some independent goroutines, each with a separate channel for communicating errors. - mainLoopErrChan := make(chan error, 1) - wg.Add(1) - go func() { - defer wg.Done() - mainLoopErrChan <- w.mainLoop(innerCtx) - }() - cleanupLoopErrChan := make(chan error, 1) - wg.Add(1) - go func() { - defer wg.Done() - cleanupLoopErrChan <- w.cleanupLoop(innerCtx) - }() - removedCheckerLoopErrChan := make(chan error, 1) - wg.Add(1) - go func() { - defer wg.Done() - removedCheckerLoopErrChan <- w.removedCheckerLoop(innerCtx) - }() - - // If any error channel returns a non-nil error, we cancel the inner context - // and return the error. Note that this means we only return the first error - // that occurs. - select { - case err := <-mainLoopErrChan: - if err != nil { - logger.WithError(err).Error("error in orderwatcher mainLoop") - cancel() - return err - } - case err := <-cleanupLoopErrChan: - if err != nil { - logger.WithError(err).Error("error in orderwatcher cleanupLoop") - cancel() - return err - } - case err := <-removedCheckerLoopErrChan: - if err != nil { - logger.WithError(err).Error("error in orderwatcher removedCheckerLoop") - cancel() + namedLoops := []struct { + loop func(context.Context) error + name string + }{{w.mainLoop, "mainLoop"}, {w.cleanupLoop, "cleanupLoop"}, {w.removedCheckerLoop, "removedCheckerLoop"}} + for _, namedLoop := range namedLoops { + namedLoop := namedLoop // https://golang.org/doc/faq#closures_and_goroutines + g.Go(func() error { + err := namedLoop.loop(ctx) + if err != nil { + logger.WithError(err).Errorf("error in orderwatcher %v", namedLoop.name) + } return err - } + }) } - // Wait for all goroutines to exit. If we reached here it means we are done - // and there are no errors. - wg.Wait() - return nil + return g.Wait() } func (w *Watcher) mainLoop(ctx context.Context) error { diff --git a/zeroex/orderwatch/order_watcher_test.go b/zeroex/orderwatch/order_watcher_test.go index b612318e6..27291084c 100644 --- a/zeroex/orderwatch/order_watcher_test.go +++ b/zeroex/orderwatch/order_watcher_test.go @@ -1674,7 +1674,9 @@ func setupOrderWatcher(ctx context.Context, t *testing.T, ethRPCClient ethrpccli // Start OrderWatcher go func() { err := orderWatcher.Watch(ctx) - require.NoError(t, err) + if err != nil { + panic(err) + } }() // Ensure at least one block has been processed and is stored in the DB