From 40800e657152d07e1ed486ae30a0f56b8b3c334a Mon Sep 17 00:00:00 2001 From: Mason Liang Date: Wed, 24 Jun 2020 07:35:47 -0600 Subject: [PATCH 1/9] Use errgroup --- go.mod | 1 + zeroex/orderwatch/order_watcher.go | 64 +++++------------------------- 2 files changed, 11 insertions(+), 54 deletions(-) diff --git a/go.mod b/go.mod index d5f5715c8..98a9361d9 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.1.0 golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 + golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index 1c6517087..652fd2145 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -21,6 +21,7 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" logger "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) const ( @@ -144,62 +145,17 @@ func (w *Watcher) Watch(ctx context.Context) error { w.wasStartedOnce = true w.mu.Unlock() - // Create a child context so that we can preemptively cancel if there is an - // error. - innerCtx, cancel := context.WithCancel(ctx) - defer cancel() + g, innerCtx := errgroup.WithContext(ctx) - // A waitgroup lets us wait for all goroutines to exit. - wg := &sync.WaitGroup{} - - // Start some independent goroutines, each with a separate channel for communicating errors. - mainLoopErrChan := make(chan error, 1) - wg.Add(1) - go func() { - defer wg.Done() - mainLoopErrChan <- w.mainLoop(innerCtx) - }() - cleanupLoopErrChan := make(chan error, 1) - wg.Add(1) - go func() { - defer wg.Done() - cleanupLoopErrChan <- w.cleanupLoop(innerCtx) - }() - removedCheckerLoopErrChan := make(chan error, 1) - wg.Add(1) - go func() { - defer wg.Done() - removedCheckerLoopErrChan <- w.removedCheckerLoop(innerCtx) - }() - - // If any error channel returns a non-nil error, we cancel the inner context - // and return the error. Note that this means we only return the first error - // that occurs. - select { - case err := <-mainLoopErrChan: - if err != nil { - logger.WithError(err).Error("error in orderwatcher mainLoop") - cancel() - return err - } - case err := <-cleanupLoopErrChan: - if err != nil { - logger.WithError(err).Error("error in orderwatcher cleanupLoop") - cancel() - return err - } - case err := <-removedCheckerLoopErrChan: - if err != nil { - logger.WithError(err).Error("error in orderwatcher removedCheckerLoop") - cancel() - return err - } + loops := []func(context.Context) error{w.mainLoop, w.cleanupLoop, w.removedCheckerLoop} + for _, loop := range loops { + loop := loop + g.Go(func() error { + return loop(innerCtx) + }) } - - // Wait for all goroutines to exit. If we reached here it means we are done - // and there are no errors. - wg.Wait() - return nil + // Wait for all loops to return nil, or for any loop to return an error. + return g.Wait() } func (w *Watcher) mainLoop(ctx context.Context) error { From 75f4264d17e026dfd41ea728d3b8eae31e4594a9 Mon Sep 17 00:00:00 2001 From: Alex Towle Date: Wed, 1 Jul 2020 17:52:27 -0500 Subject: [PATCH 2/9] Fixed data race --- zeroex/orderwatch/order_watcher_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/zeroex/orderwatch/order_watcher_test.go b/zeroex/orderwatch/order_watcher_test.go index 3e11a7f2e..c6d765cb5 100644 --- a/zeroex/orderwatch/order_watcher_test.go +++ b/zeroex/orderwatch/order_watcher_test.go @@ -1673,7 +1673,9 @@ func setupOrderWatcher(ctx context.Context, t *testing.T, ethRPCClient ethrpccli // Start OrderWatcher go func() { err := orderWatcher.Watch(ctx) - require.NoError(t, err) + if err != nil { + panic(err) + } }() // Ensure at least one block has been processed and is stored in the DB From 7fad0bc412c3d6b23ce3135da56e44a301e3f0db Mon Sep 17 00:00:00 2001 From: Mason Liang Date: Tue, 7 Jul 2020 17:16:34 -0600 Subject: [PATCH 3/9] SyncToLatestBlock to prevent race at cleanup --- zeroex/orderwatch/order_watcher_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zeroex/orderwatch/order_watcher_test.go b/zeroex/orderwatch/order_watcher_test.go index c6d765cb5..5be6e2fad 100644 --- a/zeroex/orderwatch/order_watcher_test.go +++ b/zeroex/orderwatch/order_watcher_test.go @@ -698,6 +698,8 @@ func TestOrderWatcherNoChange(t *testing.T) { lastValidatedBlock: latestStoredBlock, } checkOrderState(t, expectedOrderState, newOrders[0]) + + blockWatcher.SyncToLatestBlock() } func TestOrderWatcherWETHWithdrawAndDeposit(t *testing.T) { From b7f4d3d8289fc1da2c9e793a720f8c95a397e69f Mon Sep 17 00:00:00 2001 From: Mason Liang Date: Tue, 7 Jul 2020 17:27:32 -0600 Subject: [PATCH 4/9] Re-add logging for errors --- zeroex/orderwatch/order_watcher.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index 652fd2145..a38a501a3 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -145,16 +145,20 @@ func (w *Watcher) Watch(ctx context.Context) error { w.wasStartedOnce = true w.mu.Unlock() - g, innerCtx := errgroup.WithContext(ctx) + g, ctx := errgroup.WithContext(ctx) loops := []func(context.Context) error{w.mainLoop, w.cleanupLoop, w.removedCheckerLoop} - for _, loop := range loops { - loop := loop + for i, loop := range loops { + i, loop := i, loop g.Go(func() error { - return loop(innerCtx) + err := loop(ctx) + if err != nil { + logger.WithError(err).Errorf("error in orderwatcher loop %v", i) + } + return err }) } - // Wait for all loops to return nil, or for any loop to return an error. + return g.Wait() } From a144863a60977651e87e50ef65c4d0f671641704 Mon Sep 17 00:00:00 2001 From: Mason Liang Date: Tue, 7 Jul 2020 17:29:05 -0600 Subject: [PATCH 5/9] gofmt --- zeroex/orderwatch/order_watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index a38a501a3..efe53d2a1 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -153,7 +153,7 @@ func (w *Watcher) Watch(ctx context.Context) error { g.Go(func() error { err := loop(ctx) if err != nil { - logger.WithError(err).Errorf("error in orderwatcher loop %v", i) + logger.WithError(err).Errorf("error in orderwatcher loop %v", i) } return err }) From bb7dd897169f62430da00b1b50d328a3c1a3ec52 Mon Sep 17 00:00:00 2001 From: Mason Liang Date: Tue, 7 Jul 2020 17:32:11 -0600 Subject: [PATCH 6/9] Remove now unneeded SyncToLatestBlock --- zeroex/orderwatch/order_watcher_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/zeroex/orderwatch/order_watcher_test.go b/zeroex/orderwatch/order_watcher_test.go index 5be6e2fad..c6d765cb5 100644 --- a/zeroex/orderwatch/order_watcher_test.go +++ b/zeroex/orderwatch/order_watcher_test.go @@ -698,8 +698,6 @@ func TestOrderWatcherNoChange(t *testing.T) { lastValidatedBlock: latestStoredBlock, } checkOrderState(t, expectedOrderState, newOrders[0]) - - blockWatcher.SyncToLatestBlock() } func TestOrderWatcherWETHWithdrawAndDeposit(t *testing.T) { From 94d93000078e56a4dde9b458ed222cf230a5bbd6 Mon Sep 17 00:00:00 2001 From: Mason Liang Date: Wed, 8 Jul 2020 17:12:10 -0600 Subject: [PATCH 7/9] Add loop name to logging output --- zeroex/orderwatch/order_watcher.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index efe53d2a1..3a3d8afdd 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -147,13 +147,16 @@ func (w *Watcher) Watch(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) - loops := []func(context.Context) error{w.mainLoop, w.cleanupLoop, w.removedCheckerLoop} - for i, loop := range loops { - i, loop := i, loop + namedLoops := []struct { + loop func(context.Context) error + name string + }{{w.mainLoop, "mainLoop"}, {w.cleanupLoop, "mainLoop"}, {w.removedCheckerLoop, "mainLoop"}} + for _, namedLoop := range namedLoops { + namedLoop := namedLoop g.Go(func() error { - err := loop(ctx) + err := namedLoop.loop(ctx) if err != nil { - logger.WithError(err).Errorf("error in orderwatcher loop %v", i) + logger.WithError(err).Errorf("error in orderwatcher %v", namedLoop.name) } return err }) From 50f02560c64468894f54018e6c71c7a64c98cc1f Mon Sep 17 00:00:00 2001 From: Mason Liang Date: Wed, 8 Jul 2020 17:20:45 -0600 Subject: [PATCH 8/9] Use correct names --- zeroex/orderwatch/order_watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index 3a3d8afdd..0a34ade06 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -150,7 +150,7 @@ func (w *Watcher) Watch(ctx context.Context) error { namedLoops := []struct { loop func(context.Context) error name string - }{{w.mainLoop, "mainLoop"}, {w.cleanupLoop, "mainLoop"}, {w.removedCheckerLoop, "mainLoop"}} + }{{w.mainLoop, "mainLoop"}, {w.cleanupLoop, "cleanupLoop"}, {w.removedCheckerLoop, "removedCheckerLoop"}} for _, namedLoop := range namedLoops { namedLoop := namedLoop g.Go(func() error { From 9fe3e6e8957e225f3294a1ec37d66da5c3772f4e Mon Sep 17 00:00:00 2001 From: Mason Liang Date: Thu, 9 Jul 2020 11:11:00 -0600 Subject: [PATCH 9/9] Add pointer to golang faq --- zeroex/orderwatch/order_watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index 0a34ade06..3cb09979b 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -152,7 +152,7 @@ func (w *Watcher) Watch(ctx context.Context) error { name string }{{w.mainLoop, "mainLoop"}, {w.cleanupLoop, "cleanupLoop"}, {w.removedCheckerLoop, "removedCheckerLoop"}} for _, namedLoop := range namedLoops { - namedLoop := namedLoop + namedLoop := namedLoop // https://golang.org/doc/faq#closures_and_goroutines g.Go(func() error { err := namedLoop.loop(ctx) if err != nil {