From 273ff7dd32346f17ff2eb43464b1695898cc185d Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 8 Dec 2020 12:18:22 -0600 Subject: [PATCH 1/6] Provide errgroup to BlockWorker interface --- storage/modules/block_storage.go | 40 ++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/storage/modules/block_storage.go b/storage/modules/block_storage.go index 42e519e0..bd3d377b 100644 --- a/storage/modules/block_storage.go +++ b/storage/modules/block_storage.go @@ -19,7 +19,6 @@ import ( "errors" "fmt" "log" - "runtime" "strconv" "strings" @@ -108,9 +107,16 @@ func getTransactionPrefix( // to be done while a block is added/removed from storage // in the same database transaction as the change. type BlockWorker interface { - AddingBlock(context.Context, *types.Block, database.Transaction) (database.CommitWorker, error) + AddingBlock( + context.Context, + *errgroup.Group, + *types.Block, + database.Transaction, + ) (database.CommitWorker, error) + RemovingBlock( context.Context, + *errgroup.Group, *types.Block, database.Transaction, ) (database.CommitWorker, error) @@ -119,19 +125,20 @@ type BlockWorker interface { // BlockStorage implements block specific storage methods // on top of a database.Database and database.Transaction interface. type BlockStorage struct { - db database.Database - numCPU int + db database.Database - workers []BlockWorker + workers []BlockWorker + workerConcurrency int } // NewBlockStorage returns a new BlockStorage. func NewBlockStorage( db database.Database, + workerConcurrency int, ) *BlockStorage { return &BlockStorage{ - db: db, - numCPU: runtime.NumCPU(), + db: db, + workerConcurrency: workerConcurrency, } } @@ -252,7 +259,7 @@ func (b *BlockStorage) pruneBlock( blockIdentifier := blockResponse.Block.BlockIdentifier // Remove all transaction hashes - g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU) + g, gctx := errgroup.WithContextN(ctx, b.workerConcurrency, b.workerConcurrency) for i := range blockResponse.OtherTransactions { // We need to set variable before calling goroutine // to avoid getting an updated pointer as loop iteration @@ -680,7 +687,7 @@ func (b *BlockStorage) SeeBlock( return nil } - g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU) + g, gctx := errgroup.WithContextN(ctx, b.workerConcurrency, b.workerConcurrency) for i := range block.Transactions { // We need to set variable before calling goroutine // to avoid getting an updated pointer as loop iteration @@ -780,7 +787,7 @@ func (b *BlockStorage) RemoveBlock( } // Remove all transaction hashes - g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU) + g, gctx := errgroup.WithContextN(ctx, b.workerConcurrency, b.workerConcurrency) for i := range block.Transactions { // We need to set variable before calling goroutine // to avoid getting an updated pointer as loop iteration @@ -814,13 +821,18 @@ func (b *BlockStorage) callWorkersAndCommit( adding bool, ) error { commitWorkers := make([]database.CommitWorker, len(b.workers)) + + // Provision global errgroup to use for all workers + // so that we don't need to wait at the end of each worker + // for all results. + g, gctx := errgroup.WithContextN(ctx, b.workerConcurrency, b.workerConcurrency) for i, w := range b.workers { var cw database.CommitWorker var err error if adding { - cw, err = w.AddingBlock(ctx, block, txn) + cw, err = w.AddingBlock(gctx, g, block, txn) } else { - cw, err = w.RemovingBlock(ctx, block, txn) + cw, err = w.RemovingBlock(gctx, g, block, txn) } if err != nil { return err @@ -829,6 +841,10 @@ func (b *BlockStorage) callWorkersAndCommit( commitWorkers[i] = cw } + if err := g.Wait(); err != nil { + return err + } + if err := txn.Commit(ctx); err != nil { return err } From bf92a7687118146a05add7f5146b247ba2aa2e20 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 8 Dec 2020 12:25:01 -0600 Subject: [PATCH 2/6] update balance storage --- storage/modules/balance_storage.go | 33 +++++------------------------- 1 file changed, 5 insertions(+), 28 deletions(-) diff --git a/storage/modules/balance_storage.go b/storage/modules/balance_storage.go index 3108147c..a4aa71ef 100644 --- a/storage/modules/balance_storage.go +++ b/storage/modules/balance_storage.go @@ -184,6 +184,7 @@ func (b *BalanceStorage) Initialize( // AddingBlock is called by BlockStorage when adding a block to storage. func (b *BalanceStorage) AddingBlock( ctx context.Context, + g *errgroup.Group, block *types.Block, transaction database.Transaction, ) (database.CommitWorker, error) { @@ -194,11 +195,6 @@ func (b *BalanceStorage) AddingBlock( // Keep track of how many new accounts have been seen so that the counter // can be updated in a single op. - var newAccounts int - var newAccountsLock sync.Mutex - - // Concurrent execution limited to runtime.NumCPU - g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU) for i := range changes { // We need to set variable before calling goroutine // to avoid getting an updated pointer as loop iteration @@ -206,7 +202,7 @@ func (b *BalanceStorage) AddingBlock( change := changes[i] g.Go(func() error { newAccount, err := b.UpdateBalance( - gctx, + ctx, transaction, change, block.ParentBlockIdentifier, @@ -219,25 +215,10 @@ func (b *BalanceStorage) AddingBlock( return nil } - newAccountsLock.Lock() - newAccounts++ - newAccountsLock.Unlock() - - return nil + return b.handler.AccountsSeen(ctx, transaction, 1) }) } - if err := g.Wait(); err != nil { - return nil, err - } - - // Update accounts seen - if newAccounts > 0 { - if err := b.handler.AccountsSeen(ctx, transaction, newAccounts); err != nil { - return nil, err - } - } - // Update accounts reconciled var pending int b.pendingReconciliationMutex.Lock(true) @@ -259,6 +240,7 @@ func (b *BalanceStorage) AddingBlock( // RemovingBlock is called by BlockStorage when removing a block from storage. func (b *BalanceStorage) RemovingBlock( ctx context.Context, + g *errgroup.Group, block *types.Block, transaction database.Transaction, ) (database.CommitWorker, error) { @@ -273,7 +255,6 @@ func (b *BalanceStorage) RemovingBlock( var staleAccountsMutex sync.Mutex // Concurrent execution limited to runtime.NumCPU - g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU) for i := range changes { // We need to set variable before calling goroutine // to avoid getting an updated pointer as loop iteration @@ -281,7 +262,7 @@ func (b *BalanceStorage) RemovingBlock( change := changes[i] g.Go(func() error { shouldRemove, err := b.OrphanBalance( - gctx, + ctx, transaction, change, ) @@ -304,10 +285,6 @@ func (b *BalanceStorage) RemovingBlock( }) } - if err := g.Wait(); err != nil { - return nil, err - } - return func(ctx context.Context) error { if err := b.handler.BlockRemoved(ctx, block, changes); err != nil { return err From 4280a802e77bf10c6350545d0eba038b6e4ac45b Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 8 Dec 2020 12:28:31 -0600 Subject: [PATCH 3/6] Change remaining block workers --- storage/modules/broadcast_storage.go | 4 ++++ storage/modules/coin_storage.go | 14 ++++++++------ storage/modules/counter_storage.go | 4 ++++ 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/storage/modules/broadcast_storage.go b/storage/modules/broadcast_storage.go index 2e944301..96a3afeb 100644 --- a/storage/modules/broadcast_storage.go +++ b/storage/modules/broadcast_storage.go @@ -20,6 +20,8 @@ import ( "log" "sync" + "github.com/neilotoole/errgroup" + "github.com/coinbase/rosetta-sdk-go/storage/database" "github.com/coinbase/rosetta-sdk-go/storage/errors" "github.com/coinbase/rosetta-sdk-go/types" @@ -213,6 +215,7 @@ func (b *BroadcastStorage) invokeAddBlockHandlers( // AddingBlock is called by BlockStorage when adding a block. func (b *BroadcastStorage) AddingBlock( ctx context.Context, + g *errgroup.Group, block *types.Block, transaction database.Transaction, ) (database.CommitWorker, error) { @@ -303,6 +306,7 @@ func (b *BroadcastStorage) AddingBlock( // TODO: error if transaction removed after confirmed (means confirmation depth not deep enough) func (b *BroadcastStorage) RemovingBlock( ctx context.Context, + g *errgroup.Group, block *types.Block, transaction database.Transaction, ) (database.CommitWorker, error) { diff --git a/storage/modules/coin_storage.go b/storage/modules/coin_storage.go index ac9e8c58..52a39f48 100644 --- a/storage/modules/coin_storage.go +++ b/storage/modules/coin_storage.go @@ -265,6 +265,7 @@ func (c *CoinStorage) skipOperation( // however, this would put a larger strain on the db. func (c *CoinStorage) updateCoins( // nolint:gocognit ctx context.Context, + g *errgroup.Group, block *types.Block, addCoinCreated bool, dbTx database.Transaction, @@ -298,7 +299,6 @@ func (c *CoinStorage) updateCoins( // nolint:gocognit } } - g, gctx := errgroup.WithContextN(ctx, c.numCPU, c.numCPU) for identifier, val := range addCoins { if _, ok := removeCoins[identifier]; ok { continue @@ -310,7 +310,7 @@ func (c *CoinStorage) updateCoins( // nolint:gocognit op := val g.Go(func() error { if err := c.addCoin( - gctx, + ctx, op.Account, &types.Coin{ CoinIdentifier: op.CoinChange.CoinIdentifier, @@ -336,7 +336,7 @@ func (c *CoinStorage) updateCoins( // nolint:gocognit op := val g.Go(func() error { if err := c.removeCoin( - gctx, + ctx, op.Account, op.CoinChange.CoinIdentifier, dbTx, @@ -348,25 +348,27 @@ func (c *CoinStorage) updateCoins( // nolint:gocognit }) } - return g.Wait() + return nil } // AddingBlock is called by BlockStorage when adding a block. func (c *CoinStorage) AddingBlock( ctx context.Context, + g *errgroup.Group, block *types.Block, transaction database.Transaction, ) (database.CommitWorker, error) { - return nil, c.updateCoins(ctx, block, true, transaction) + return nil, c.updateCoins(ctx, g, block, true, transaction) } // RemovingBlock is called by BlockStorage when removing a block. func (c *CoinStorage) RemovingBlock( ctx context.Context, + g *errgroup.Group, block *types.Block, transaction database.Transaction, ) (database.CommitWorker, error) { - return nil, c.updateCoins(ctx, block, false, transaction) + return nil, c.updateCoins(ctx, g, block, false, transaction) } // GetCoinsTransactional returns all unspent coins for a provided *types.AccountIdentifier. diff --git a/storage/modules/counter_storage.go b/storage/modules/counter_storage.go index 45901869..a5412a90 100644 --- a/storage/modules/counter_storage.go +++ b/storage/modules/counter_storage.go @@ -19,6 +19,8 @@ import ( "fmt" "math/big" + "github.com/neilotoole/errgroup" + "github.com/coinbase/rosetta-sdk-go/storage/database" "github.com/coinbase/rosetta-sdk-go/types" "github.com/coinbase/rosetta-sdk-go/utils" @@ -200,6 +202,7 @@ func (c *CounterStorage) GetTransactional( // AddingBlock is called by BlockStorage when adding a block. func (c *CounterStorage) AddingBlock( ctx context.Context, + g *errgroup.Group, block *types.Block, transaction database.Transaction, ) (database.CommitWorker, error) { @@ -243,6 +246,7 @@ func (c *CounterStorage) AddingBlock( // RemovingBlock is called by BlockStorage when removing a block. func (c *CounterStorage) RemovingBlock( ctx context.Context, + g *errgroup.Group, block *types.Block, transaction database.Transaction, ) (database.CommitWorker, error) { From 03cc858b487c1f750a4a0f7d163e2c4bbb00eba0 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 8 Dec 2020 18:09:16 -0600 Subject: [PATCH 4/6] update mocks --- mocks/storage/modules/block_worker.go | 29 ++++++++++++++------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/mocks/storage/modules/block_worker.go b/mocks/storage/modules/block_worker.go index 130c1994..8bea40a1 100644 --- a/mocks/storage/modules/block_worker.go +++ b/mocks/storage/modules/block_worker.go @@ -5,6 +5,7 @@ package modules import ( context "context" + errgroup "github.com/neilotoole/errgroup" mock "github.com/stretchr/testify/mock" database "github.com/coinbase/rosetta-sdk-go/storage/database" @@ -16,13 +17,13 @@ type BlockWorker struct { mock.Mock } -// AddingBlock provides a mock function with given fields: _a0, _a1, _a2 -func (_m *BlockWorker) AddingBlock(_a0 context.Context, _a1 *types.Block, _a2 database.Transaction) (database.CommitWorker, error) { - ret := _m.Called(_a0, _a1, _a2) +// AddingBlock provides a mock function with given fields: _a0, _a1, _a2, _a3 +func (_m *BlockWorker) AddingBlock(_a0 context.Context, _a1 *errgroup.Group, _a2 *types.Block, _a3 database.Transaction) (database.CommitWorker, error) { + ret := _m.Called(_a0, _a1, _a2, _a3) var r0 database.CommitWorker - if rf, ok := ret.Get(0).(func(context.Context, *types.Block, database.Transaction) database.CommitWorker); ok { - r0 = rf(_a0, _a1, _a2) + if rf, ok := ret.Get(0).(func(context.Context, *errgroup.Group, *types.Block, database.Transaction) database.CommitWorker); ok { + r0 = rf(_a0, _a1, _a2, _a3) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(database.CommitWorker) @@ -30,8 +31,8 @@ func (_m *BlockWorker) AddingBlock(_a0 context.Context, _a1 *types.Block, _a2 da } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, *types.Block, database.Transaction) error); ok { - r1 = rf(_a0, _a1, _a2) + if rf, ok := ret.Get(1).(func(context.Context, *errgroup.Group, *types.Block, database.Transaction) error); ok { + r1 = rf(_a0, _a1, _a2, _a3) } else { r1 = ret.Error(1) } @@ -39,13 +40,13 @@ func (_m *BlockWorker) AddingBlock(_a0 context.Context, _a1 *types.Block, _a2 da return r0, r1 } -// RemovingBlock provides a mock function with given fields: _a0, _a1, _a2 -func (_m *BlockWorker) RemovingBlock(_a0 context.Context, _a1 *types.Block, _a2 database.Transaction) (database.CommitWorker, error) { - ret := _m.Called(_a0, _a1, _a2) +// RemovingBlock provides a mock function with given fields: _a0, _a1, _a2, _a3 +func (_m *BlockWorker) RemovingBlock(_a0 context.Context, _a1 *errgroup.Group, _a2 *types.Block, _a3 database.Transaction) (database.CommitWorker, error) { + ret := _m.Called(_a0, _a1, _a2, _a3) var r0 database.CommitWorker - if rf, ok := ret.Get(0).(func(context.Context, *types.Block, database.Transaction) database.CommitWorker); ok { - r0 = rf(_a0, _a1, _a2) + if rf, ok := ret.Get(0).(func(context.Context, *errgroup.Group, *types.Block, database.Transaction) database.CommitWorker); ok { + r0 = rf(_a0, _a1, _a2, _a3) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(database.CommitWorker) @@ -53,8 +54,8 @@ func (_m *BlockWorker) RemovingBlock(_a0 context.Context, _a1 *types.Block, _a2 } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, *types.Block, database.Transaction) error); ok { - r1 = rf(_a0, _a1, _a2) + if rf, ok := ret.Get(1).(func(context.Context, *errgroup.Group, *types.Block, database.Transaction) error); ok { + r1 = rf(_a0, _a1, _a2, _a3) } else { r1 = ret.Error(1) } From 9d117dce2cbbfb742b305fe085373afeb49d06c0 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 8 Dec 2020 18:22:51 -0600 Subject: [PATCH 5/6] first pass at fixing tests --- storage/modules/balance_storage_test.go | 29 +++++++++++++----- storage/modules/block_storage_test.go | 13 ++++---- storage/modules/broadcast_storage_test.go | 37 +++++++++++++++++------ storage/modules/coin_storage_test.go | 37 +++++++++++++++++------ 4 files changed, 85 insertions(+), 31 deletions(-) diff --git a/storage/modules/balance_storage_test.go b/storage/modules/balance_storage_test.go index 1d35fb4d..f912ddbe 100644 --- a/storage/modules/balance_storage_test.go +++ b/storage/modules/balance_storage_test.go @@ -23,6 +23,7 @@ import ( "path" "testing" + "github.com/neilotoole/errgroup" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -1557,8 +1558,10 @@ func TestBlockSyncing(t *testing.T) { t.Run("add genesis block", func(t *testing.T) { dbTx := database.Transaction(ctx) - _, err = storage.AddingBlock(ctx, b0, dbTx) + g, gctx := errgroup.WithContext(ctx) + _, err = storage.AddingBlock(gctx, g, b0, dbTx) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, dbTx.Commit(ctx)) amount, err := storage.GetBalance(ctx, addr1, curr, b0.BlockIdentifier.Index) @@ -1582,8 +1585,10 @@ func TestBlockSyncing(t *testing.T) { nil, ).Once() mockHandler.On("AccountsSeen", ctx, dbTx, 1).Return(nil).Once() - _, err = storage.AddingBlock(ctx, b1, dbTx) + g, gctx := errgroup.WithContext(ctx) + _, err = storage.AddingBlock(gctx, g, b1, dbTx) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, dbTx.Commit(ctx)) amount, err := storage.GetBalance(ctx, addr1, curr, b0.BlockIdentifier.Index) @@ -1621,8 +1626,10 @@ func TestBlockSyncing(t *testing.T) { ).Once() mockHandler.On("AccountsSeen", ctx, dbTx, 1).Return(nil).Once() mockHandler.On("AccountsReconciled", ctx, dbTx, 1).Return(nil).Once() - _, err = storage.AddingBlock(ctx, b2, dbTx) + g, gctx := errgroup.WithContext(ctx) + _, err = storage.AddingBlock(gctx, g, b2, dbTx) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, dbTx.Commit(ctx)) amount, err := storage.GetBalance(ctx, addr1, curr, b0.BlockIdentifier.Index) @@ -1659,8 +1666,10 @@ func TestBlockSyncing(t *testing.T) { t.Run("orphan block 2", func(t *testing.T) { dbTx := database.Transaction(ctx) - commitWorker, err := storage.RemovingBlock(ctx, b2, dbTx) + g, gctx := errgroup.WithContext(ctx) + commitWorker, err := storage.RemovingBlock(gctx, g, b2, dbTx) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, dbTx.Commit(ctx)) mockHandler.On("BlockRemoved", ctx, b2, mock.Anything).Return(nil).Once() mockHandler.On("AccountsSeen", ctx, mock.Anything, -1).Return(nil).Once() @@ -1694,8 +1703,10 @@ func TestBlockSyncing(t *testing.T) { t.Run("orphan block 1", func(t *testing.T) { dbTx := database.Transaction(ctx) - commitWorker, err := storage.RemovingBlock(ctx, b1, dbTx) + g, gctx := errgroup.WithContext(ctx) + commitWorker, err := storage.RemovingBlock(gctx, g, b1, dbTx) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, dbTx.Commit(ctx)) mockHandler.On("BlockRemoved", ctx, b1, mock.Anything).Return(nil).Once() mockHandler.On("AccountsSeen", ctx, mock.Anything, -1).Return(nil).Once() @@ -1735,8 +1746,10 @@ func TestBlockSyncing(t *testing.T) { nil, ).Once() mockHandler.On("AccountsSeen", ctx, dbTx, 1).Return(nil).Once() - _, err = storage.AddingBlock(ctx, b1, dbTx) + g, gctx := errgroup.WithContext(ctx) + _, err = storage.AddingBlock(gctx, g, b1, dbTx) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, dbTx.Commit(ctx)) amount, err := storage.GetBalance(ctx, addr1, curr, b0.BlockIdentifier.Index) @@ -1778,8 +1791,10 @@ func TestBlockSyncing(t *testing.T) { nil, ).Once() mockHandler.On("AccountsSeen", ctx, dbTx, 1).Return(nil).Once() - _, err = storage.AddingBlock(ctx, b2a, dbTx) + g, gctx := errgroup.WithContext(ctx) + _, err = storage.AddingBlock(gctx, g, b2a, dbTx) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, dbTx.Commit(ctx)) amount, err := storage.GetBalance(ctx, addr1, curr, b0.BlockIdentifier.Index) diff --git a/storage/modules/block_storage_test.go b/storage/modules/block_storage_test.go index 4f866ae3..f99c2fd4 100644 --- a/storage/modules/block_storage_test.go +++ b/storage/modules/block_storage_test.go @@ -28,7 +28,8 @@ import ( ) const ( - minPruningDepth = 20 + minPruningDepth = 20 + blockWorkerConcurrency = 10 ) func TestHeadBlockIdentifier(t *testing.T) { @@ -53,7 +54,7 @@ func TestHeadBlockIdentifier(t *testing.T) { assert.NoError(t, err) defer database.Close(ctx) - storage := NewBlockStorage(database) + storage := NewBlockStorage(database, blockWorkerConcurrency) t.Run("No head block set", func(t *testing.T) { blockIdentifier, err := storage.GetHeadBlockIdentifier(ctx) @@ -310,7 +311,7 @@ func TestBlock(t *testing.T) { assert.NoError(t, err) defer database.Close(ctx) - storage := NewBlockStorage(database) + storage := NewBlockStorage(database, blockWorkerConcurrency) t.Run("Get non-existent tx", func(t *testing.T) { newestBlock, transaction, err := findTransactionWithDbTransaction( @@ -699,7 +700,7 @@ func TestManyBlocks(t *testing.T) { assert.NoError(t, err) defer database.Close(ctx) - storage := NewBlockStorage(database) + storage := NewBlockStorage(database, blockWorkerConcurrency) for i := int64(0); i < 10000; i++ { blockIdentifier := &types.BlockIdentifier{ @@ -751,7 +752,7 @@ func TestCreateBlockCache(t *testing.T) { assert.NoError(t, err) defer database.Close(ctx) - storage := NewBlockStorage(database) + storage := NewBlockStorage(database, blockWorkerConcurrency) t.Run("no blocks processed", func(t *testing.T) { assert.Equal(t, []*types.BlockIdentifier{}, storage.CreateBlockCache(ctx, minPruningDepth)) @@ -820,7 +821,7 @@ func TestAtTip(t *testing.T) { assert.NoError(t, err) defer database.Close(ctx) - storage := NewBlockStorage(database) + storage := NewBlockStorage(database, blockWorkerConcurrency) tipDelay := int64(100) t.Run("no blocks processed", func(t *testing.T) { diff --git a/storage/modules/broadcast_storage_test.go b/storage/modules/broadcast_storage_test.go index 0a5b0476..d616fc5e 100644 --- a/storage/modules/broadcast_storage_test.go +++ b/storage/modules/broadcast_storage_test.go @@ -20,6 +20,7 @@ import ( "fmt" "testing" + "github.com/neilotoole/errgroup" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -154,8 +155,10 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { block := blocks[0] txn := storage.db.Transaction(ctx) - commitWorker, err := storage.AddingBlock(ctx, block, txn) + g, gctx := errgroup.WithContext(ctx) + commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) + assert.NoError(t, g.Wait()) err = txn.Commit(ctx) assert.NoError(t, err) @@ -216,8 +219,10 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { nil, nil, ).Once() - commitWorker, err := storage.AddingBlock(ctx, block, txn) + g, gctx := errgroup.WithContext(ctx) + commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) + assert.NoError(t, g.Wait()) err = txn.Commit(ctx) assert.NoError(t, err) @@ -370,8 +375,10 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { &types.TransactionIdentifier{Hash: "tx 1"}, nil, ).Once() - commitWorker, err := storage.AddingBlock(ctx, block, txn) + g, gctx := errgroup.WithContext(ctx) + commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) + assert.NoError(t, g.Wait()) accounts, err := storage.LockedAccounts(ctx, txn) assert.NoError(t, err) @@ -473,8 +480,10 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { &types.TransactionIdentifier{Hash: "tx 2"}, nil, ).Once() - commitWorker, err := storage.AddingBlock(ctx, block, txn) + g, gctx := errgroup.WithContext(ctx) + commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) + assert.NoError(t, g.Wait()) accounts, err := storage.LockedAccounts(ctx, txn) assert.NoError(t, err) @@ -561,8 +570,10 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { ).Return( nil, ).Once() - commitWorker, err := storage.AddingBlock(ctx, block, txn) + g, gctx := errgroup.WithContext(ctx) + commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) + assert.NoError(t, g.Wait()) accounts, err := storage.LockedAccounts(ctx, txn) assert.NoError(t, err) @@ -627,8 +638,10 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { ).Return( nil, ).Once() - commitWorker, err := storage.AddingBlock(ctx, block, txn) + g, gctx := errgroup.WithContext(ctx) + commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) + assert.NoError(t, g.Wait()) accounts, err := storage.LockedAccounts(ctx, txn) assert.NoError(t, err) @@ -866,8 +879,10 @@ func TestBroadcastStorageBroadcastFailure(t *testing.T) { for _, block := range blocks { mockHelper.On("CurrentBlockIdentifier", ctx).Return(block.BlockIdentifier, nil).Once() txn := storage.db.Transaction(ctx) - commitWorker, err := storage.AddingBlock(ctx, block, txn) + g, gctx := errgroup.WithContext(ctx) + commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) + assert.NoError(t, g.Wait()) err = txn.Commit(ctx) assert.NoError(t, err) @@ -993,8 +1008,10 @@ func TestBroadcastStorageBehindTip(t *testing.T) { for _, block := range blocks[:60] { mockHelper.On("CurrentBlockIdentifier", ctx).Return(block.BlockIdentifier, nil).Once() txn := storage.db.Transaction(ctx) - commitWorker, err := storage.AddingBlock(ctx, block, txn) + g, gctx := errgroup.WithContext(ctx) + commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) + assert.NoError(t, g.Wait()) err = txn.Commit(ctx) assert.NoError(t, err) @@ -1086,8 +1103,10 @@ func TestBroadcastStorageBehindTip(t *testing.T) { for _, block := range blocks[60:71] { mockHelper.On("CurrentBlockIdentifier", ctx).Return(block.BlockIdentifier, nil).Once() txn := storage.db.Transaction(ctx) - commitWorker, err := storage.AddingBlock(ctx, block, txn) + g, gctx := errgroup.WithContext(ctx) + commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) + assert.NoError(t, g.Wait()) err = txn.Commit(ctx) assert.NoError(t, err) diff --git a/storage/modules/coin_storage_test.go b/storage/modules/coin_storage_test.go index 76087cd5..127ce443 100644 --- a/storage/modules/coin_storage_test.go +++ b/storage/modules/coin_storage_test.go @@ -20,6 +20,7 @@ import ( "math/big" "testing" + "github.com/neilotoole/errgroup" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -544,9 +545,11 @@ func TestCoinStorage(t *testing.T) { t.Run("add block", func(t *testing.T) { tx := c.db.Transaction(ctx) - commitFunc, err := c.AddingBlock(ctx, coinBlock, tx) + g, gctx := errgroup.WithContext(ctx) + commitFunc, err := c.AddingBlock(gctx, g, coinBlock, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, tx.Commit(ctx)) mockHelper.On( @@ -565,9 +568,11 @@ func TestCoinStorage(t *testing.T) { t.Run("add duplicate coin", func(t *testing.T) { tx := c.db.Transaction(ctx) - commitFunc, err := c.AddingBlock(ctx, coinBlock, tx) + g, gctx := errgroup.WithContext(ctx) + commitFunc, err := c.AddingBlock(gctx, g, coinBlock, tx) assert.Nil(t, commitFunc) assert.Error(t, err) + assert.NoError(t, g.Wait()) tx.Discard(ctx) mockHelper.On( @@ -586,9 +591,11 @@ func TestCoinStorage(t *testing.T) { t.Run("add duplicate coin in same block", func(t *testing.T) { tx := c.db.Transaction(ctx) - commitFunc, err := c.AddingBlock(ctx, coinBlockRepeat, tx) + g, gctx := errgroup.WithContext(ctx) + commitFunc, err := c.AddingBlock(gctx, g, coinBlockRepeat, tx) assert.Nil(t, commitFunc) assert.Error(t, err) + assert.NoError(t, g.Wait()) tx.Discard(ctx) mockHelper.On( @@ -607,9 +614,11 @@ func TestCoinStorage(t *testing.T) { t.Run("remove block", func(t *testing.T) { tx := c.db.Transaction(ctx) - commitFunc, err := c.RemovingBlock(ctx, coinBlock, tx) + g, gctx := errgroup.WithContext(ctx) + commitFunc, err := c.RemovingBlock(gctx, g, coinBlock, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, tx.Commit(ctx)) mockHelper.On( @@ -641,9 +650,11 @@ func TestCoinStorage(t *testing.T) { t.Run("spend coin", func(t *testing.T) { tx := c.db.Transaction(ctx) - commitFunc, err := c.AddingBlock(ctx, coinBlock, tx) + g, gctx := errgroup.WithContext(ctx) + commitFunc, err := c.AddingBlock(gctx, g, coinBlock, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, tx.Commit(ctx)) mockHelper.On( @@ -660,9 +671,11 @@ func TestCoinStorage(t *testing.T) { assert.Equal(t, blockIdentifier, block) tx = c.db.Transaction(ctx) - commitFunc, err = c.AddingBlock(ctx, coinBlock2, tx) + g, gctx = errgroup.WithContext(ctx) + commitFunc, err = c.AddingBlock(gctx, g, coinBlock2, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, tx.Commit(ctx)) mockHelper.On( @@ -694,9 +707,11 @@ func TestCoinStorage(t *testing.T) { t.Run("add block with multiple outputs for 1 account", func(t *testing.T) { tx := c.db.Transaction(ctx) - commitFunc, err := c.AddingBlock(ctx, coinBlock3, tx) + g, gctx := errgroup.WithContext(ctx) + commitFunc, err := c.AddingBlock(gctx, g, coinBlock3, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, tx.Commit(ctx)) mockHelper.On( @@ -756,15 +771,19 @@ func TestCoinStorage(t *testing.T) { t.Run("remove block that creates and spends single coin", func(t *testing.T) { tx := c.db.Transaction(ctx) - commitFunc, err := c.RemovingBlock(ctx, coinBlock3, tx) + g, gctx := errgroup.WithContext(ctx) + commitFunc, err := c.RemovingBlock(gctx, g, coinBlock3, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, tx.Commit(ctx)) tx = c.db.Transaction(ctx) - commitFunc, err = c.AddingBlock(ctx, coinBlock3, tx) + g, gctx = errgroup.WithContext(ctx) + commitFunc, err = c.AddingBlock(gctx, g, coinBlock3, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) + assert.NoError(t, g.Wait()) assert.NoError(t, tx.Commit(ctx)) mockHelper.On( From 65038e6eaacd7418a993239207c4e27f85dab4ba Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 8 Dec 2020 18:43:09 -0600 Subject: [PATCH 6/6] Fix tests --- storage/modules/balance_storage_test.go | 26 ++++++------ storage/modules/broadcast_storage_test.go | 48 +++++++++++------------ storage/modules/coin_storage_test.go | 4 +- 3 files changed, 39 insertions(+), 39 deletions(-) diff --git a/storage/modules/balance_storage_test.go b/storage/modules/balance_storage_test.go index f912ddbe..fc17ecd9 100644 --- a/storage/modules/balance_storage_test.go +++ b/storage/modules/balance_storage_test.go @@ -1574,9 +1574,10 @@ func TestBlockSyncing(t *testing.T) { t.Run("add block 1", func(t *testing.T) { dbTx := database.Transaction(ctx) + g, gctx := errgroup.WithContext(ctx) mockHelper.On( "AccountBalance", - mock.Anything, + gctx, addr1, curr, b0.BlockIdentifier, @@ -1584,8 +1585,7 @@ func TestBlockSyncing(t *testing.T) { &types.Amount{Value: "1", Currency: curr}, nil, ).Once() - mockHandler.On("AccountsSeen", ctx, dbTx, 1).Return(nil).Once() - g, gctx := errgroup.WithContext(ctx) + mockHandler.On("AccountsSeen", gctx, dbTx, 1).Return(nil).Once() _, err = storage.AddingBlock(gctx, g, b1, dbTx) assert.NoError(t, err) assert.NoError(t, g.Wait()) @@ -1614,9 +1614,10 @@ func TestBlockSyncing(t *testing.T) { assert.NoError(t, err) dbTx := database.Transaction(ctx) + g, gctx := errgroup.WithContext(ctx) mockHelper.On( "AccountBalance", - mock.Anything, + gctx, addr2, curr, b1.BlockIdentifier, @@ -1624,9 +1625,8 @@ func TestBlockSyncing(t *testing.T) { &types.Amount{Value: "0", Currency: curr}, nil, ).Once() - mockHandler.On("AccountsSeen", ctx, dbTx, 1).Return(nil).Once() - mockHandler.On("AccountsReconciled", ctx, dbTx, 1).Return(nil).Once() - g, gctx := errgroup.WithContext(ctx) + mockHandler.On("AccountsSeen", gctx, dbTx, 1).Return(nil).Once() + mockHandler.On("AccountsReconciled", gctx, dbTx, 1).Return(nil).Once() _, err = storage.AddingBlock(gctx, g, b2, dbTx) assert.NoError(t, err) assert.NoError(t, g.Wait()) @@ -1735,9 +1735,10 @@ func TestBlockSyncing(t *testing.T) { t.Run("add block 1", func(t *testing.T) { dbTx := database.Transaction(ctx) + g, gctx := errgroup.WithContext(ctx) mockHelper.On( "AccountBalance", - mock.Anything, + gctx, addr1, curr, b0.BlockIdentifier, @@ -1745,8 +1746,7 @@ func TestBlockSyncing(t *testing.T) { &types.Amount{Value: "1", Currency: curr}, nil, ).Once() - mockHandler.On("AccountsSeen", ctx, dbTx, 1).Return(nil).Once() - g, gctx := errgroup.WithContext(ctx) + mockHandler.On("AccountsSeen", gctx, dbTx, 1).Return(nil).Once() _, err = storage.AddingBlock(gctx, g, b1, dbTx) assert.NoError(t, err) assert.NoError(t, g.Wait()) @@ -1780,9 +1780,10 @@ func TestBlockSyncing(t *testing.T) { t.Run("add block 2a", func(t *testing.T) { dbTx := database.Transaction(ctx) + g, gctx := errgroup.WithContext(ctx) mockHelper.On( "AccountBalance", - mock.Anything, + gctx, addr2, curr, b1.BlockIdentifier, @@ -1790,8 +1791,7 @@ func TestBlockSyncing(t *testing.T) { &types.Amount{Value: "0", Currency: curr}, nil, ).Once() - mockHandler.On("AccountsSeen", ctx, dbTx, 1).Return(nil).Once() - g, gctx := errgroup.WithContext(ctx) + mockHandler.On("AccountsSeen", gctx, dbTx, 1).Return(nil).Once() _, err = storage.AddingBlock(gctx, g, b2a, dbTx) assert.NoError(t, err) assert.NoError(t, g.Wait()) diff --git a/storage/modules/broadcast_storage_test.go b/storage/modules/broadcast_storage_test.go index d616fc5e..b962b0cb 100644 --- a/storage/modules/broadcast_storage_test.go +++ b/storage/modules/broadcast_storage_test.go @@ -209,9 +209,10 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { block := blocks[1] txn := storage.db.Transaction(ctx) + g, gctx := errgroup.WithContext(ctx) mockHelper.On( "FindTransaction", - ctx, + gctx, &types.TransactionIdentifier{Hash: "tx 1"}, txn, ).Return( @@ -219,7 +220,6 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { nil, nil, ).Once() - g, gctx := errgroup.WithContext(ctx) commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) assert.NoError(t, g.Wait()) @@ -337,9 +337,10 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { mockHelper.On("AtTip", ctx, mock.Anything).Return(true, nil) txn := storage.db.Transaction(ctx) + g, gctx := errgroup.WithContext(ctx) mockHelper.On( "FindTransaction", - ctx, + gctx, &types.TransactionIdentifier{Hash: "tx 1"}, txn, ).Return( @@ -349,7 +350,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { ).Once() mockHelper.On( "FindTransaction", - ctx, + gctx, &types.TransactionIdentifier{Hash: "tx 2"}, txn, ).Return( @@ -359,7 +360,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { ).Once() mockHandler.On( "TransactionStale", - ctx, + gctx, txn, "broadcast 1", &types.TransactionIdentifier{Hash: "tx 1"}, @@ -375,7 +376,6 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { &types.TransactionIdentifier{Hash: "tx 1"}, nil, ).Once() - g, gctx := errgroup.WithContext(ctx) commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) assert.NoError(t, g.Wait()) @@ -442,9 +442,10 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { block.Transactions = []*types.Transaction{tx1} txn := storage.db.Transaction(ctx) + g, gctx := errgroup.WithContext(ctx) mockHelper.On( "FindTransaction", - ctx, + gctx, &types.TransactionIdentifier{Hash: "tx 1"}, txn, ).Return( @@ -454,7 +455,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { ).Once() mockHelper.On( "FindTransaction", - ctx, + gctx, &types.TransactionIdentifier{Hash: "tx 2"}, txn, ).Return( @@ -464,7 +465,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { ).Once() mockHandler.On( "TransactionStale", - ctx, + gctx, txn, "broadcast 2", &types.TransactionIdentifier{Hash: "tx 2"}, @@ -480,7 +481,6 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { &types.TransactionIdentifier{Hash: "tx 2"}, nil, ).Once() - g, gctx := errgroup.WithContext(ctx) commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) assert.NoError(t, g.Wait()) @@ -539,9 +539,10 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { block.Transactions = []*types.Transaction{tx2} txn := storage.db.Transaction(ctx) + g, gctx := errgroup.WithContext(ctx) mockHelper.On( "FindTransaction", - ctx, + gctx, &types.TransactionIdentifier{Hash: "tx 1"}, txn, ).Return( @@ -551,7 +552,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { ).Once() mockHelper.On( "FindTransaction", - ctx, + gctx, &types.TransactionIdentifier{Hash: "tx 2"}, txn, ).Return( @@ -561,7 +562,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { ).Once() mockHandler.On( "TransactionConfirmed", - ctx, + gctx, txn, "broadcast 1", blocks[3].BlockIdentifier, @@ -570,7 +571,6 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { ).Return( nil, ).Once() - g, gctx := errgroup.WithContext(ctx) commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) assert.NoError(t, g.Wait()) @@ -617,9 +617,10 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { block := blocks[5] txn := storage.db.Transaction(ctx) + g, gctx := errgroup.WithContext(ctx) mockHelper.On( "FindTransaction", - ctx, + gctx, &types.TransactionIdentifier{Hash: "tx 2"}, txn, ).Return( @@ -629,7 +630,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { ).Once() mockHandler.On( "TransactionConfirmed", - ctx, + gctx, txn, "broadcast 2", blocks[4].BlockIdentifier, @@ -638,7 +639,6 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { ).Return( nil, ).Once() - g, gctx := errgroup.WithContext(ctx) commitWorker, err := storage.AddingBlock(gctx, g, block, txn) assert.NoError(t, err) assert.NoError(t, g.Wait()) @@ -803,7 +803,7 @@ func TestBroadcastStorageBroadcastFailure(t *testing.T) { ) mockHandler.On( "TransactionStale", - ctx, + mock.Anything, mock.Anything, "broadcast 1", &types.TransactionIdentifier{Hash: "tx 1"}, @@ -835,7 +835,7 @@ func TestBroadcastStorageBroadcastFailure(t *testing.T) { ) mockHandler.On( "TransactionStale", - ctx, + mock.Anything, mock.Anything, "broadcast 2", &types.TransactionIdentifier{Hash: "tx 2"}, @@ -846,7 +846,7 @@ func TestBroadcastStorageBroadcastFailure(t *testing.T) { ) mockHandler.On( "BroadcastFailed", - ctx, + mock.Anything, mock.Anything, "broadcast 2", &types.TransactionIdentifier{Hash: "tx 2"}, @@ -858,7 +858,7 @@ func TestBroadcastStorageBroadcastFailure(t *testing.T) { // Never find in block mockHelper.On( "FindTransaction", - ctx, + mock.Anything, &types.TransactionIdentifier{Hash: "tx 1"}, mock.Anything, ).Return( @@ -868,7 +868,7 @@ func TestBroadcastStorageBroadcastFailure(t *testing.T) { ) mockHelper.On( "FindTransaction", - ctx, + mock.Anything, &types.TransactionIdentifier{Hash: "tx 2"}, mock.Anything, ).Return( @@ -1082,7 +1082,7 @@ func TestBroadcastStorageBehindTip(t *testing.T) { // Never find in block mockHelper.On( "FindTransaction", - ctx, + mock.Anything, &types.TransactionIdentifier{Hash: "tx 1"}, mock.Anything, ).Return( @@ -1092,7 +1092,7 @@ func TestBroadcastStorageBehindTip(t *testing.T) { ) mockHelper.On( "FindTransaction", - ctx, + mock.Anything, &types.TransactionIdentifier{Hash: "tx 2"}, mock.Anything, ).Return( diff --git a/storage/modules/coin_storage_test.go b/storage/modules/coin_storage_test.go index 127ce443..86b25f2c 100644 --- a/storage/modules/coin_storage_test.go +++ b/storage/modules/coin_storage_test.go @@ -571,8 +571,8 @@ func TestCoinStorage(t *testing.T) { g, gctx := errgroup.WithContext(ctx) commitFunc, err := c.AddingBlock(gctx, g, coinBlock, tx) assert.Nil(t, commitFunc) - assert.Error(t, err) - assert.NoError(t, g.Wait()) + assert.NoError(t, err) + assert.Error(t, g.Wait()) tx.Discard(ctx) mockHelper.On(