Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Use errgroup (#852)
Browse files Browse the repository at this point in the history
* Use errgroup

* Fixed data race

* SyncToLatestBlock to prevent race at cleanup

* Re-add logging for errors

* gofmt

* Remove now unneeded SyncToLatestBlock

* Add loop name to logging output

* Use correct names

* Add pointer to golang faq

Co-authored-by: Mason Liang <mason@0x.org>
Co-authored-by: Alex Towle <jalextowle@gmail.com>
  • Loading branch information
3 people committed Jul 13, 2020
1 parent 8b4f762 commit e2767b1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 53 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ require (
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.1.0
golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect
gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect
Expand Down
67 changes: 15 additions & 52 deletions zeroex/orderwatch/order_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
logger "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -144,62 +145,24 @@ func (w *Watcher) Watch(ctx context.Context) error {
w.wasStartedOnce = true
w.mu.Unlock()

// Create a child context so that we can preemptively cancel if there is an
// error.
innerCtx, cancel := context.WithCancel(ctx)
defer cancel()
g, ctx := errgroup.WithContext(ctx)

// A waitgroup lets us wait for all goroutines to exit.
wg := &sync.WaitGroup{}

// Start some independent goroutines, each with a separate channel for communicating errors.
mainLoopErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
mainLoopErrChan <- w.mainLoop(innerCtx)
}()
cleanupLoopErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
cleanupLoopErrChan <- w.cleanupLoop(innerCtx)
}()
removedCheckerLoopErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
removedCheckerLoopErrChan <- w.removedCheckerLoop(innerCtx)
}()

// If any error channel returns a non-nil error, we cancel the inner context
// and return the error. Note that this means we only return the first error
// that occurs.
select {
case err := <-mainLoopErrChan:
if err != nil {
logger.WithError(err).Error("error in orderwatcher mainLoop")
cancel()
return err
}
case err := <-cleanupLoopErrChan:
if err != nil {
logger.WithError(err).Error("error in orderwatcher cleanupLoop")
cancel()
return err
}
case err := <-removedCheckerLoopErrChan:
if err != nil {
logger.WithError(err).Error("error in orderwatcher removedCheckerLoop")
cancel()
namedLoops := []struct {
loop func(context.Context) error
name string
}{{w.mainLoop, "mainLoop"}, {w.cleanupLoop, "cleanupLoop"}, {w.removedCheckerLoop, "removedCheckerLoop"}}
for _, namedLoop := range namedLoops {
namedLoop := namedLoop // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
err := namedLoop.loop(ctx)
if err != nil {
logger.WithError(err).Errorf("error in orderwatcher %v", namedLoop.name)
}
return err
}
})
}

// Wait for all goroutines to exit. If we reached here it means we are done
// and there are no errors.
wg.Wait()
return nil
return g.Wait()
}

func (w *Watcher) mainLoop(ctx context.Context) error {
Expand Down
4 changes: 3 additions & 1 deletion zeroex/orderwatch/order_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1674,7 +1674,9 @@ func setupOrderWatcher(ctx context.Context, t *testing.T, ethRPCClient ethrpccli
// Start OrderWatcher
go func() {
err := orderWatcher.Watch(ctx)
require.NoError(t, err)
if err != nil {
panic(err)
}
}()

// Ensure at least one block has been processed and is stored in the DB
Expand Down

0 comments on commit e2767b1

Please sign in to comment.