Skip to content

Commit

Permalink
Fix concurrent map access error when tracing blocks (celo-org#1822)
Browse files Browse the repository at this point in the history
Fixes  celo-org#1799

This commit ensures that state.StateDB instances are not
accessed concurrently when tracing blocks and also adds
an e2e test to verify the fix.

It also disables logging from the e2e tests, because the logs
slowed the tests down and cluttered the output of test runs.

It also updates test node instances to ensure that all rpc api
modules are enabled.
  • Loading branch information
piersy authored Feb 28, 2022
1 parent c78a51f commit 503f156
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 8 deletions.
61 changes: 60 additions & 1 deletion e2e_test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"errors"
"fmt"
"math/big"
"sync"
"testing"
"time"

"github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/celo-org/celo-blockchain/log"
"github.com/celo-org/celo-blockchain/node"
"github.com/celo-org/celo-blockchain/rpc"
"github.com/celo-org/celo-blockchain/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -20,7 +23,10 @@ func init() {
// This statement is commented out but left here since its very useful for
// debugging problems and its non trivial to construct.
//
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlDebug, log.StreamHandler(os.Stdout, log.TerminalFormat(true))))
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stdout, log.TerminalFormat(true))))

// This disables all logging which in general we want, because there is a lot
log.Root().SetHandler(log.DiscardHandler())
}

// This test starts a network submits a transaction and waits for the whole
Expand Down Expand Up @@ -192,6 +198,59 @@ func TestStartStopValidators(t *testing.T) {

}

// This test was created to reproduce the concurrent map access error in
// https://github.com/celo-org/celo-blockchain/issues/1799
//
// It does this by calling debug_traceBlockByNumber a number of times since the
// trace block code was the source of the concurrent map access.
func TestBlockTracingConcurrentMapAccess(t *testing.T) {
ac := test.AccountConfig(1, 2)
gc, ec, err := test.BuildConfig(ac)
require.NoError(t, err)
network, shutdown, err := test.NewNetwork(ac, gc, ec)
require.NoError(t, err)
defer shutdown()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()

n := network[0]

accounts := test.Accounts(ac.DeveloperAccounts(), gc.ChainConfig())

var txs []*types.Transaction
// Send one celo from external account 0 to 1 via node 0.
for i := 0; i < 10; i++ {
tx, err := accounts[0].SendCelo(ctx, accounts[1].Address, 1, n)
require.NoError(t, err)
txs = append(txs, tx)
}

// Wait for the whole network to process the transactions.
err = network.AwaitTransactions(ctx, txs...)
require.NoError(t, err)

lastTx := txs[len(txs)-1]

b := n.Tracker.GetProcessedBlockForTx(lastTx.Hash())

var wg sync.WaitGroup
for i := 1; i < +int(b.NumberU64()); i++ {
wg.Add(1)
num := i
go func() {
defer wg.Done()
c, err := rpc.DialContext(ctx, n.WSEndpoint())
require.NoError(t, err)

var result []interface{}
err = c.CallContext(ctx, &result, "debug_traceBlockByNumber", hexutil.EncodeUint64(uint64(num)))
require.NoError(t, err)
}()

}
wg.Wait()
}

type rpcCustomTransaction struct {
BlockNumber *hexutil.Big `json:"blockNumber"`
GasPrice *hexutil.Big `json:"gasPrice"`
Expand Down
17 changes: 10 additions & 7 deletions eth/tracers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,19 +529,18 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac
}
blockCtx := core.NewEVMBlockContext(block.Header(), api.chainContext(ctx), nil)
blockHash := block.Hash()
var sysCtx *core.SysContractCallCtx
if api.backend.ChainConfig().IsEspresso(block.Number()) {
sysVmRunner := api.backend.VmRunnerAtHeader(block.Header(), statedb)
sysCtx = core.NewSysContractCallCtx(sysVmRunner)
}
for th := 0; th < threads; th++ {
pend.Add(1)
go func() {
defer pend.Done()
// Fetch and execute the next transaction trace tasks
for task := range jobs {
var sysCtx *core.SysContractCallCtx
vmRunner := api.backend.VmRunnerAtHeader(block.Header(), task.statedb)
if api.backend.ChainConfig().IsEspresso(block.Number()) {
sysCtx = core.NewSysContractCallCtx(vmRunner)
}
msg, _ := txs[task.index].AsMessage(signer, nil)
vmRunner := api.backend.VmRunnerAtHeader(block.Header(), statedb)
txctx := &Context{
BlockHash: blockHash,
TxIndex: task.index,
Expand All @@ -556,6 +555,11 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac
}
}()
}
var sysCtx *core.SysContractCallCtx
vmRunner := api.backend.VmRunnerAtHeader(block.Header(), statedb)
if api.backend.ChainConfig().IsEspresso(block.Number()) {
sysCtx = core.NewSysContractCallCtx(vmRunner)
}
// Feed the transactions into the tracers and return
var failed error
for i, tx := range txs {
Expand All @@ -566,7 +570,6 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac
msg, _ := tx.AsMessage(signer, nil)
statedb.Prepare(tx.Hash(), i)
vmenv := vm.NewEVM(blockCtx, core.NewEVMTxContext(msg), statedb, api.backend.ChainConfig(), vm.Config{})
vmRunner := api.backend.VmRunnerAtHeader(block.Header(), statedb)
if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas()), vmRunner, sysCtx); err != nil {
failed = err
break
Expand Down
7 changes: 7 additions & 0 deletions test/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/celo-org/celo-blockchain/crypto"
"github.com/celo-org/celo-blockchain/eth"
"github.com/celo-org/celo-blockchain/eth/downloader"
"github.com/celo-org/celo-blockchain/eth/tracers"
"github.com/celo-org/celo-blockchain/ethclient"
"github.com/celo-org/celo-blockchain/mycelo/env"
"github.com/celo-org/celo-blockchain/mycelo/genesis"
Expand All @@ -35,6 +36,7 @@ import (
)

var (
allModules = []string{"admin", "debug", "web3", "eth", "txpool", "personal", "istanbul", "miner", "net"}
baseNodeConfig *node.Config = &node.Config{
Name: "celo",
Version: params.Version,
Expand All @@ -52,6 +54,8 @@ var (
HTTPHost: "0.0.0.0",
WSHost: "0.0.0.0",
UsePlaintextKeystore: true,
WSModules: allModules,
HTTPModules: allModules,
}

baseEthConfig = &eth.Config{
Expand Down Expand Up @@ -179,6 +183,9 @@ func (n *Node) Start() error {
if err != nil {
return err
}
// This manual step is required to enable tracing, it's messy but this is the
// approach taken by geth in cmd/utils.RegisterEthService.
n.Node.RegisterAPIs(tracers.APIs(n.Eth.APIBackend))

err = n.Node.Start()
if err != nil {
Expand Down

0 comments on commit 503f156

Please sign in to comment.