Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Use errgroup #852

Merged
merged 9 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ require (
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.1.0
golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect
gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect
Expand Down
64 changes: 12 additions & 52 deletions zeroex/orderwatch/order_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
logger "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -144,62 +145,21 @@ func (w *Watcher) Watch(ctx context.Context) error {
w.wasStartedOnce = true
w.mu.Unlock()

// Create a child context so that we can preemptively cancel if there is an
// error.
innerCtx, cancel := context.WithCancel(ctx)
defer cancel()
g, ctx := errgroup.WithContext(ctx)

// A waitgroup lets us wait for all goroutines to exit.
wg := &sync.WaitGroup{}

// Start some independent goroutines, each with a separate channel for communicating errors.
mainLoopErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
mainLoopErrChan <- w.mainLoop(innerCtx)
}()
cleanupLoopErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
cleanupLoopErrChan <- w.cleanupLoop(innerCtx)
}()
removedCheckerLoopErrChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
removedCheckerLoopErrChan <- w.removedCheckerLoop(innerCtx)
}()

// If any error channel returns a non-nil error, we cancel the inner context
// and return the error. Note that this means we only return the first error
// that occurs.
select {
case err := <-mainLoopErrChan:
if err != nil {
logger.WithError(err).Error("error in orderwatcher mainLoop")
cancel()
return err
}
case err := <-cleanupLoopErrChan:
if err != nil {
logger.WithError(err).Error("error in orderwatcher cleanupLoop")
cancel()
return err
}
case err := <-removedCheckerLoopErrChan:
if err != nil {
logger.WithError(err).Error("error in orderwatcher removedCheckerLoop")
cancel()
loops := []func(context.Context) error{w.mainLoop, w.cleanupLoop, w.removedCheckerLoop}
z2trillion marked this conversation as resolved.
Show resolved Hide resolved
for i, loop := range loops {
i, loop := i, loop
g.Go(func() error {
err := loop(ctx)
if err != nil {
logger.WithError(err).Errorf("error in orderwatcher loop %v", i)
}
return err
}
})
}

// Wait for all goroutines to exit. If we reached here it means we are done
// and there are no errors.
wg.Wait()
return nil
return g.Wait()
}

func (w *Watcher) mainLoop(ctx context.Context) error {
Expand Down
4 changes: 3 additions & 1 deletion zeroex/orderwatch/order_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1673,7 +1673,9 @@ func setupOrderWatcher(ctx context.Context, t *testing.T, ethRPCClient ethrpccli
// Start OrderWatcher
go func() {
err := orderWatcher.Watch(ctx)
require.NoError(t, err)
if err != nil {
panic(err)
}
}()

// Ensure at least one block has been processed and is stored in the DB
Expand Down