Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrent map access error when tracing blocks #1822

Merged
merged 6 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion e2e_test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"

"github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/celo-org/celo-blockchain/log"
"github.com/celo-org/celo-blockchain/node"
"github.com/celo-org/celo-blockchain/rpc"
"github.com/celo-org/celo-blockchain/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -18,7 +22,10 @@ func init() {
// This statement is commented out but left here since its very useful for
// debugging problems and its non trivial to construct.
//
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlDebug, log.StreamHandler(os.Stdout, log.TerminalFormat(true))))
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stdout, log.TerminalFormat(true))))

// This disables all logging which in general we want, because there is a lot
log.Root().SetHandler(log.DiscardHandler())
}

// This test starts a network submits a transaction and waits for the whole
Expand Down Expand Up @@ -189,3 +196,56 @@ func TestStartStopValidators(t *testing.T) {
require.NoError(t, err)

}

// This test was created to reproduce the concurrent map access error in
// https://github.com/celo-org/celo-blockchain/issues/1799
//
// It does this by calling debug_traceBlockByNumber a number of times since the
// trace block code was the source of the concurrent map access.
func TestBlockTracingConcurrentMapAccess(t *testing.T) {
ac := test.AccountConfig(1, 2)
gc, ec, err := test.BuildConfig(ac)
require.NoError(t, err)
network, shutdown, err := test.NewNetwork(ac, gc, ec)
require.NoError(t, err)
defer shutdown()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()

n := network[0]

accounts := test.Accounts(ac.DeveloperAccounts(), gc.ChainConfig())

var txs []*types.Transaction
// Send one celo from external account 0 to 1 via node 0.
for i := 0; i < 10; i++ {
tx, err := accounts[0].SendCelo(ctx, accounts[1].Address, 1, n)
require.NoError(t, err)
txs = append(txs, tx)
}

// Wait for the whole network to process the transactions.
err = network.AwaitTransactions(ctx, txs...)
require.NoError(t, err)

lastTx := txs[len(txs)-1]

b := n.Tracker.GetProcessedBlockForTx(lastTx.Hash())

var wg sync.WaitGroup
for i := 1; i < +int(b.NumberU64()); i++ {
wg.Add(1)
num := i
go func() {
defer wg.Done()
c, err := rpc.DialContext(ctx, n.WSEndpoint())
require.NoError(t, err)

var result []interface{}
err = c.CallContext(ctx, &result, "debug_traceBlockByNumber", hexutil.EncodeUint64(uint64(num)))
require.NoError(t, err)
}()

}
wg.Wait()
}
17 changes: 10 additions & 7 deletions eth/tracers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,19 +529,18 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac
}
blockCtx := core.NewEVMBlockContext(block.Header(), api.chainContext(ctx), nil)
blockHash := block.Hash()
var sysCtx *core.SysContractCallCtx
if api.backend.ChainConfig().IsEspresso(block.Number()) {
sysVmRunner := api.backend.VmRunnerAtHeader(block.Header(), statedb)
sysCtx = core.NewSysContractCallCtx(sysVmRunner)
}
for th := 0; th < threads; th++ {
pend.Add(1)
go func() {
defer pend.Done()
// Fetch and execute the next transaction trace tasks
for task := range jobs {
var sysCtx *core.SysContractCallCtx
vmRunner := api.backend.VmRunnerAtHeader(block.Header(), task.statedb)
if api.backend.ChainConfig().IsEspresso(block.Number()) {
sysCtx = core.NewSysContractCallCtx(vmRunner)
}
msg, _ := txs[task.index].AsMessage(signer, nil)
vmRunner := api.backend.VmRunnerAtHeader(block.Header(), statedb)
txctx := &Context{
BlockHash: blockHash,
TxIndex: task.index,
Expand All @@ -556,6 +555,11 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac
}
}()
}
var sysCtx *core.SysContractCallCtx
vmRunner := api.backend.VmRunnerAtHeader(block.Header(), statedb)
if api.backend.ChainConfig().IsEspresso(block.Number()) {
sysCtx = core.NewSysContractCallCtx(vmRunner)
}
// Feed the transactions into the tracers and return
var failed error
for i, tx := range txs {
Expand All @@ -566,7 +570,6 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac
msg, _ := tx.AsMessage(signer, nil)
statedb.Prepare(tx.Hash(), i)
vmenv := vm.NewEVM(blockCtx, core.NewEVMTxContext(msg), statedb, api.backend.ChainConfig(), vm.Config{})
vmRunner := api.backend.VmRunnerAtHeader(block.Header(), statedb)
if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas()), vmRunner, sysCtx); err != nil {
failed = err
break
Expand Down
7 changes: 7 additions & 0 deletions test/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/celo-org/celo-blockchain/crypto"
"github.com/celo-org/celo-blockchain/eth"
"github.com/celo-org/celo-blockchain/eth/downloader"
"github.com/celo-org/celo-blockchain/eth/tracers"
"github.com/celo-org/celo-blockchain/ethclient"
"github.com/celo-org/celo-blockchain/mycelo/env"
"github.com/celo-org/celo-blockchain/mycelo/genesis"
Expand All @@ -35,6 +36,7 @@ import (
)

var (
allModules = []string{"admin", "debug", "web3", "eth", "txpool", "personal", "istanbul", "miner", "net"}
baseNodeConfig *node.Config = &node.Config{
Name: "celo",
Version: params.Version,
Expand All @@ -52,6 +54,8 @@ var (
HTTPHost: "0.0.0.0",
WSHost: "0.0.0.0",
UsePlaintextKeystore: true,
WSModules: allModules,
HTTPModules: allModules,
}

baseEthConfig = &eth.Config{
Expand Down Expand Up @@ -179,6 +183,9 @@ func (n *Node) Start() error {
if err != nil {
return err
}
// This manual step is required to enable tracing, it's messy but this is the
// approach taken by geth in cmd/utils.RegisterEthService.
n.Node.RegisterAPIs(tracers.APIs(n.Eth.APIBackend))

err = n.Node.Start()
if err != nil {
Expand Down