diff --git a/.github/release.env b/.github/release.env index 37739d157e..2034c1d3fa 100644 --- a/.github/release.env +++ b/.github/release.env @@ -1,2 +1,2 @@ -MAINNET_FILE_URL="https://github.com/binance-chain/bsc/releases/download/v1.1.6/mainnet.zip" -TESTNET_FILE_URL="https://github.com/binance-chain/bsc/releases/download/v1.1.6/testnet.zip" +MAINNET_FILE_URL="https://github.com/binance-chain/bsc/releases/download/v1.1.7/mainnet.zip" +TESTNET_FILE_URL="https://github.com/binance-chain/bsc/releases/download/v1.1.7/testnet.zip" diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d5bf0877f..4dfa55dfb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,20 @@ # Changelog +## v1.1.8 +FEATURES +* [\#668](https://github.com/binance-chain/bsc/pull/668) implement State Verification && Snapshot Commit pipeline +* [\#581](https://github.com/binance-chain/bsc/pull/581) implement geth native trace +* [\#543](https://github.com/binance-chain/bsc/pull/543) implement offline block prune tools + +IMPROVEMENT +* [\#704](https://github.com/binance-chain/bsc/pull/704) prefetch state by applying the transactions within one block +* [\#713](https://github.com/binance-chain/bsc/pull/713) add ARM binaries for release pipeline + +BUGFIX +* [\#667](https://github.com/binance-chain/bsc/pull/667) trie: reject deletions when verifying range proofs #667 +* [\#643](https://github.com/binance-chain/bsc/pull/643) add timeout for stopping p2p server to fix can not gracefully shutdown issue +* [\#740](https://github.com/binance-chain/bsc/pull/740) update discord link which won't expire + ## v1.1.7 BUGFIX diff --git a/PULL_REQUEST_TEMPLATE b/PULL_REQUEST_TEMPLATE index 69f29fecf8..acf0007d89 100644 --- a/PULL_REQUEST_TEMPLATE +++ b/PULL_REQUEST_TEMPLATE @@ -15,17 +15,3 @@ add an example CLI or API response... Notable changes: * add each change in a bullet point here * ... - -### Preflight checks - -- [ ] build passed (`make build`) -- [ ] tests passed (`make test`) -- [ ] manual transaction test passed - -### Already reviewed by - -... - -### Related issues - -... reference related issue #'s here ... diff --git a/README.md b/README.md index b74ec4d0a5..bc7f7a85ef 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Binance Smart Chain starts its development based on go-ethereum fork. So you may [![API Reference]( https://camo.githubusercontent.com/915b7be44ada53c290eb157634330494ebe3e30a/68747470733a2f2f676f646f632e6f72672f6769746875622e636f6d2f676f6c616e672f6764646f3f7374617475732e737667 )](https://pkg.go.dev/github.com/ethereum/go-ethereum?tab=doc) -[![Discord](https://img.shields.io/badge/discord-join%20chat-blue.svg)](https://discord.gg/5Z3C3SdxDw) +[![Discord](https://img.shields.io/badge/discord-join%20chat-blue.svg)](https://discord.gg/z2VpC455eU) But from that baseline of EVM compatible, Binance Smart Chain introduces a system of 21 validators with Proof of Staked Authority (PoSA) consensus that can support short block time and lower fees. The most bonded validator candidates of staking will become validators and produce blocks. The double-sign detection and other slashing logic guarantee security, stability, and chain finality. @@ -203,7 +203,7 @@ from anyone on the internet, and are grateful for even the smallest of fixes! If you'd like to contribute to bsc, please fork, fix, commit and send a pull request for the maintainers to review and merge into the main code base. If you wish to submit -more complex changes though, please check up with the core devs first on [our discord channel](https://discord.gg/5Z3C3SdxDw) +more complex changes though, please check up with the core devs first on [our discord channel](https://discord.gg/z2VpC455eU) to ensure those changes are in line with the general philosophy of the project and/or get some early feedback which can make both your efforts much lighter as well as our review and merge procedures quick and simple. diff --git a/cmd/evm/internal/t8ntool/execution.go b/cmd/evm/internal/t8ntool/execution.go index 2a9426540a..aadebdd439 100644 --- a/cmd/evm/internal/t8ntool/execution.go +++ b/cmd/evm/internal/t8ntool/execution.go @@ -223,7 +223,9 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig, statedb.AddBalance(pre.Env.Coinbase, minerReward) } // Commit block - root, _, err := statedb.Commit(chainConfig.IsEIP158(vmContext.BlockNumber)) + statedb.Finalise(chainConfig.IsEIP158(vmContext.BlockNumber)) + statedb.AccountsIntermediateRoot() + root, _, err := statedb.Commit(nil) if err != nil { fmt.Fprintf(os.Stderr, "Could not commit state: %v", err) return nil, nil, NewError(ErrorEVM, fmt.Errorf("could not commit state: %v", err)) @@ -252,7 +254,9 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc) *state.StateDB } } // Commit and re-open to start with a clean state. - root, _, _ := statedb.Commit(false) + statedb.Finalise(false) + statedb.AccountsIntermediateRoot() + root, _, _ := statedb.Commit(nil) statedb, _ = state.New(root, sdb, nil) return statedb } diff --git a/cmd/evm/runner.go b/cmd/evm/runner.go index f522233e71..fd18270182 100644 --- a/cmd/evm/runner.go +++ b/cmd/evm/runner.go @@ -268,7 +268,9 @@ func runCmd(ctx *cli.Context) error { output, leftOverGas, stats, err := timedExec(bench, execFunc) if ctx.GlobalBool(DumpFlag.Name) { - statedb.Commit(true) + statedb.Finalise(true) + statedb.AccountsIntermediateRoot() + statedb.Commit(nil) statedb.IntermediateRoot(true) fmt.Println(string(statedb.Dump(false, false, true))) } diff --git a/cmd/evm/staterunner.go b/cmd/evm/staterunner.go index bfc243b471..c373772fbe 100644 --- a/cmd/evm/staterunner.go +++ b/cmd/evm/staterunner.go @@ -101,7 +101,8 @@ func stateTestCmd(ctx *cli.Context) error { _, state, err := test.Run(st, cfg, false) // print state root for evmlab tracing if ctx.GlobalBool(MachineFlag.Name) && state != nil { - fmt.Fprintf(os.Stderr, "{\"stateRoot\": \"%x\"}\n", state.IntermediateRoot(false)) + root := state.IntermediateRoot(false) + fmt.Fprintf(os.Stderr, "{\"stateRoot\": \"%x\"}\n", root) } if err != nil { // Test failed, mark as so and dump any state to aid debugging diff --git a/cmd/geth/main.go b/cmd/geth/main.go index ae1dd6fbe9..db00cf19e1 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -72,6 +72,7 @@ var ( utils.DirectBroadcastFlag, utils.DisableSnapProtocolFlag, utils.DiffSyncFlag, + utils.PipeCommitFlag, utils.RangeLimitFlag, utils.USBFlag, utils.SmartCardDaemonPathFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index ae9b6eea32..a407098e61 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -127,6 +127,10 @@ var ( Usage: "Enable diffy sync, Please note that enable diffsync will improve the syncing speed, " + "but will degrade the security to light client level", } + PipeCommitFlag = cli.BoolFlag{ + Name: "pipecommit", + Usage: "Enable MPT pipeline commit, it will improve syncing performance. It is an experimental feature(default is false)", + } RangeLimitFlag = cli.BoolFlag{ Name: "rangelimit", Usage: "Enable 5000 blocks limit for range query", @@ -1636,6 +1640,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.GlobalIsSet(DiffSyncFlag.Name) { cfg.DiffSync = ctx.GlobalBool(DiffSyncFlag.Name) } + if ctx.GlobalIsSet(PipeCommitFlag.Name) { + cfg.PipeCommit = ctx.GlobalBool(PipeCommitFlag.Name) + } if ctx.GlobalIsSet(RangeLimitFlag.Name) { cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name) } diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index dfec81f6ad..2c8094ca3d 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -560,7 +560,11 @@ func (c *Clique) Finalize(chain consensus.ChainHeaderReader, header *types.Heade func (c *Clique) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, []*types.Receipt, error) { // No block rewards in PoA, so the state remains as is and uncles are dropped + var err error header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) + if err != nil { + return nil, nil, err + } header.UncleHash = types.CalcUncleHash(nil) // Assemble and return the final block for sealing diff --git a/core/block_validator.go b/core/block_validator.go index bf2fb40260..3ea6615b61 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -18,6 +18,7 @@ package core import ( "fmt" + "time" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/state" @@ -26,6 +27,8 @@ import ( "github.com/ethereum/go-ethereum/trie" ) +const badBlockCacheExpire = 30 * time.Second + // BlockValidator is responsible for validating block headers, uncles and // processed state. // @@ -54,6 +57,9 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { if v.bc.HasBlockAndState(block.Hash(), block.NumberU64()) { return ErrKnownBlock } + if v.bc.isCachedBadBlock(block) { + return ErrKnownBadBlock + } // Header validity is known at this point, check the uncles and transactions header := block.Header() if err := v.engine.VerifyUncles(v.bc, block); err != nil { @@ -100,7 +106,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { // transition, such as amount of used gas, the receipt roots and the state root // itself. ValidateState returns a database batch if the validation was a success // otherwise nil and an error is returned. -func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error { +func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error { header := block.Header() if block.GasUsed() != usedGas { return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas) @@ -119,17 +125,26 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD receiptSha := types.DeriveSha(receipts, trie.NewStackTrie(nil)) if receiptSha != header.ReceiptHash { return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha) - } else { - return nil } + return nil }, - func() error { + } + if skipHeavyVerify { + validateFuns = append(validateFuns, func() error { + if err := statedb.WaitPipeVerification(); err != nil { + return err + } + statedb.Finalise(v.config.IsEIP158(header.Number)) + statedb.AccountsIntermediateRoot() + return nil + }) + } else { + validateFuns = append(validateFuns, func() error { if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root { return fmt.Errorf("invalid merkle root (remote: %x local: %x)", header.Root, root) - } else { - return nil } - }, + return nil + }) } validateRes := make(chan error, len(validateFuns)) for _, f := range validateFuns { diff --git a/core/blockchain.go b/core/blockchain.go index 4aae215508..6e1a174dbb 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -79,7 +79,8 @@ var ( blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil) blockReorgInvalidatedTx = metrics.NewRegisteredMeter("chain/reorg/invalidTx", nil) - errInsertionInterrupted = errors.New("insertion is interrupted") + errInsertionInterrupted = errors.New("insertion is interrupted") + errStateRootVerificationFailed = errors.New("state root verification failed") ) const ( @@ -89,6 +90,7 @@ const ( diffLayerRLPCacheLimit = 256 receiptsCacheLimit = 10000 txLookupCacheLimit = 1024 + maxBadBlockLimit = 16 maxFutureBlocks = 256 maxTimeFutureBlocks = 30 maxBeyondBlocks = 2048 @@ -101,6 +103,8 @@ const ( maxDiffForkDist = 11 // Maximum allowed backward distance from the chain head maxDiffLimitForBroadcast = 128 // Maximum number of unique diff layers a peer may have broadcasted + rewindBadBlockInterval = 1 * time.Second + // BlockChainVersion ensures that an incompatible database forces a resync from scratch. // // Changelog: @@ -180,10 +184,11 @@ type BlockChain struct { chainConfig *params.ChainConfig // Chain & network configuration cacheConfig *CacheConfig // Cache configuration for pruning - db ethdb.Database // Low level persistent database to store final content in - snaps *snapshot.Tree // Snapshot tree for fast trie leaf access - triegc *prque.Prque // Priority queue mapping block numbers to tries to gc - gcproc time.Duration // Accumulates canonical block processing for trie dumping + db ethdb.Database // Low level persistent database to store final content in + snaps *snapshot.Tree // Snapshot tree for fast trie leaf access + triegc *prque.Prque // Priority queue mapping block numbers to tries to gc + gcproc time.Duration // Accumulates canonical block processing for trie dumping + commitLock sync.Mutex // CommitLock is used to protect above field from being modified concurrently // txLookupLimit is the maximum number of blocks from head whose tx indices // are reserved: @@ -216,6 +221,7 @@ type BlockChain struct { blockCache *lru.Cache // Cache for the most recent entire blocks txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. futureBlocks *lru.Cache // future blocks are blocks added for later processing + badBlockCache *lru.Cache // Cache for the blocks that failed to pass MPT root verification // trusted diff layers diffLayerCache *lru.Cache // Cache for the diffLayers @@ -242,6 +248,7 @@ type BlockChain struct { validator Validator // Block and state validator interface processor Processor // Block transaction processor interface vmConfig vm.Config + pipeCommit bool shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. @@ -265,6 +272,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par receiptsCache, _ := lru.New(receiptsCacheLimit) blockCache, _ := lru.New(blockCacheLimit) txLookupCache, _ := lru.New(txLookupCacheLimit) + badBlockCache, _ := lru.New(maxBadBlockLimit) + futureBlocks, _ := lru.New(maxFutureBlocks) diffLayerCache, _ := lru.New(diffLayerCacheLimit) diffLayerRLPCache, _ := lru.New(diffLayerRLPCacheLimit) @@ -287,6 +296,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bodyRLPCache: bodyRLPCache, receiptsCache: receiptsCache, blockCache: blockCache, + badBlockCache: badBlockCache, diffLayerCache: diffLayerCache, diffLayerRLPCache: diffLayerRLPCache, txLookupCache: txLookupCache, @@ -465,7 +475,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par go bc.trustedDiffLayerLoop() } go bc.untrustedDiffLayerPruneLoop() - + if bc.pipeCommit { + // check current block and rewind invalid one + go bc.rewindInvalidHeaderBlockLoop() + } return bc, nil } @@ -596,6 +609,25 @@ func (bc *BlockChain) SetHead(head uint64) error { return err } +func (bc *BlockChain) tryRewindBadBlocks() { + bc.chainmu.Lock() + defer bc.chainmu.Unlock() + block := bc.CurrentBlock() + snaps := bc.snaps + // Verified and Result is false + if snaps != nil && snaps.Snapshot(block.Root()) != nil && + snaps.Snapshot(block.Root()).Verified() && !snaps.Snapshot(block.Root()).WaitAndGetVerifyRes() { + // Rewind by one block + log.Warn("current block verified failed, rewind to its parent", "height", block.NumberU64(), "hash", block.Hash()) + bc.futureBlocks.Remove(block.Hash()) + bc.badBlockCache.Add(block.Hash(), time.Now()) + bc.diffLayerCache.Remove(block.Hash()) + bc.diffLayerRLPCache.Remove(block.Hash()) + bc.reportBlock(block, nil, errStateRootVerificationFailed) + bc.setHeadBeyondRoot(block.NumberU64()-1, common.Hash{}) + } +} + // SetHeadBeyondRoot rewinds the local chain to a new head with the extra condition // that the rewind must pass the specified state root. This method is meant to be // used when rewinding with snapshots enabled to ensure that we go back further than @@ -607,7 +639,10 @@ func (bc *BlockChain) SetHead(head uint64) error { func (bc *BlockChain) SetHeadBeyondRoot(head uint64, root common.Hash) (uint64, error) { bc.chainmu.Lock() defer bc.chainmu.Unlock() + return bc.setHeadBeyondRoot(head, root) +} +func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash) (uint64, error) { // Track the block number of the requested root hash var rootNumber uint64 // (no root == always 0) @@ -1075,6 +1110,12 @@ func (bc *BlockChain) HasFastBlock(hash common.Hash, number uint64) bool { // HasState checks if state trie is fully present in the database or not. func (bc *BlockChain) HasState(hash common.Hash) bool { + if bc.pipeCommit && bc.snaps != nil { + // If parent snap is pending on verification, treat it as state exist + if s := bc.snaps.Snapshot(hash); s != nil && !s.Verified() { + return true + } + } if bc.stateCache.NoTries() { return bc.snaps != nil && bc.snaps.Snapshot(hash) != nil } @@ -1692,8 +1733,78 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } wg.Done() }() + + tryCommitTrieDB := func() error { + bc.commitLock.Lock() + defer bc.commitLock.Unlock() + + triedb := bc.stateCache.TrieDB() + // If we're running an archive node, always flush + if bc.cacheConfig.TrieDirtyDisabled { + err := triedb.Commit(block.Root(), false, nil) + if err != nil { + return err + } + } else { + // Full but not archive node, do proper garbage collection + triedb.Reference(block.Root(), common.Hash{}) // metadata reference to keep trie alive + bc.triegc.Push(block.Root(), -int64(block.NumberU64())) + + if current := block.NumberU64(); current > bc.triesInMemory { + // If we exceeded our memory allowance, flush matured singleton nodes to disk + var ( + nodes, imgs = triedb.Size() + limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 + ) + if nodes > limit || imgs > 4*1024*1024 { + triedb.Cap(limit - ethdb.IdealBatchSize) + } + // Find the next state trie we need to commit + chosen := current - bc.triesInMemory + + // If we exceeded out time allowance, flush an entire trie to disk + if bc.gcproc > bc.cacheConfig.TrieTimeLimit { + canWrite := true + if posa, ok := bc.engine.(consensus.PoSA); ok { + if !posa.EnoughDistance(bc, block.Header()) { + canWrite = false + } + } + if canWrite { + // If the header is missing (canonical chain behind), we're reorging a low + // diff sidechain. Suspend committing until this operation is completed. + header := bc.GetHeaderByNumber(chosen) + if header == nil { + log.Warn("Reorg in progress, trie commit postponed", "number", chosen) + } else { + // If we're exceeding limits but haven't reached a large enough memory gap, + // warn the user that the system is becoming unstable. + if chosen < lastWrite+bc.triesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit { + log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/float64(bc.triesInMemory)) + } + // Flush an entire trie and restart the counters + triedb.Commit(header.Root, true, nil) + lastWrite = chosen + bc.gcproc = 0 + } + } + } + // Garbage collect anything below our required write retention + for !bc.triegc.Empty() { + root, number := bc.triegc.Pop() + if uint64(-number) > chosen { + bc.triegc.Push(root, number) + break + } + go triedb.Dereference(root.(common.Hash)) + } + } + } + return nil + } + // Commit all cached state changes into underlying memory database. - root, diffLayer, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) + _, diffLayer, err := state.Commit(bc.tryRewindBadBlocks, tryCommitTrieDB) if err != nil { return NonStatTy, err } @@ -1707,69 +1818,9 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. go bc.cacheDiffLayer(diffLayer, false) } - triedb := bc.stateCache.TrieDB() - // If we're running an archive node, always flush - if bc.cacheConfig.TrieDirtyDisabled { - if err := triedb.Commit(root, false, nil); err != nil { - return NonStatTy, err - } - } else { - // Full but not archive node, do proper garbage collection - triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive - bc.triegc.Push(root, -int64(block.NumberU64())) - - if current := block.NumberU64(); current > bc.triesInMemory { - // If we exceeded our memory allowance, flush matured singleton nodes to disk - var ( - nodes, imgs = triedb.Size() - limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 - ) - if nodes > limit || imgs > 4*1024*1024 { - triedb.Cap(limit - ethdb.IdealBatchSize) - } - // Find the next state trie we need to commit - chosen := current - bc.triesInMemory - - // If we exceeded out time allowance, flush an entire trie to disk - if bc.gcproc > bc.cacheConfig.TrieTimeLimit { - canWrite := true - if posa, ok := bc.engine.(consensus.PoSA); ok { - if !posa.EnoughDistance(bc, block.Header()) { - canWrite = false - } - } - if canWrite { - // If the header is missing (canonical chain behind), we're reorging a low - // diff sidechain. Suspend committing until this operation is completed. - header := bc.GetHeaderByNumber(chosen) - if header == nil { - log.Warn("Reorg in progress, trie commit postponed", "number", chosen) - } else { - // If we're exceeding limits but haven't reached a large enough memory gap, - // warn the user that the system is becoming unstable. - if chosen < lastWrite+bc.triesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit { - log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/float64(bc.triesInMemory)) - } - // Flush an entire trie and restart the counters - triedb.Commit(header.Root, true, nil) - lastWrite = chosen - bc.gcproc = 0 - } - } - } - // Garbage collect anything below our required write retention - for !bc.triegc.Empty() { - root, number := bc.triegc.Pop() - if uint64(-number) > chosen { - bc.triegc.Push(root, number) - break - } - go triedb.Dereference(root.(common.Hash)) - } - } - } wg.Wait() + // If the total difficulty is higher than our known, add it to the canonical chain // Second clause in the if statement reduces the vulnerability to selfish mining. // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf @@ -2097,6 +2148,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er } //Process block using the parent state as reference point substart := time.Now() + if bc.pipeCommit { + statedb.EnablePipeCommit() + } + statedb.SetExpectedStateRoot(block.Root()) statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) atomic.StoreUint32(&followupInterrupt, 1) activeState = statedb @@ -2117,7 +2172,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er // Validate the state using the default validator substart = time.Now() if !statedb.IsLightProcessed() { - if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, bc.pipeCommit); err != nil { log.Error("validate state failed", "error", err) bc.reportBlock(block, receipts, err) return it.index, err @@ -2532,6 +2587,19 @@ func (bc *BlockChain) update() { } } +func (bc *BlockChain) rewindInvalidHeaderBlockLoop() { + recheck := time.NewTicker(rewindBadBlockInterval) + defer recheck.Stop() + for { + select { + case <-recheck.C: + bc.tryRewindBadBlocks() + case <-bc.quit: + return + } + } +} + func (bc *BlockChain) trustedDiffLayerLoop() { recheck := time.NewTicker(diffLayerFreezerRecheckInterval) bc.wg.Add(1) @@ -2873,6 +2941,18 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) { } } +func (bc *BlockChain) isCachedBadBlock(block *types.Block) bool { + if timeAt, exist := bc.badBlockCache.Get(block.Hash()); exist { + putAt := timeAt.(time.Time) + if time.Since(putAt) >= badBlockCacheExpire { + bc.badBlockCache.Remove(block.Hash()) + return false + } + return true + } + return false +} + // reportBlock logs a bad block error. func (bc *BlockChain) reportBlock(block *types.Block, receipts types.Receipts, err error) { rawdb.WriteBadBlock(bc.db, block) @@ -3043,6 +3123,11 @@ func EnableLightProcessor(bc *BlockChain) *BlockChain { return bc } +func EnablePipelineCommit(bc *BlockChain) *BlockChain { + bc.pipeCommit = true + return bc +} + func EnablePersistDiff(limit uint64) BlockChainOption { return func(chain *BlockChain) *BlockChain { chain.diffLayerFreezerBlockLimit = limit diff --git a/core/blockchain_diff_test.go b/core/blockchain_diff_test.go index 3a190ff2f2..ab5af2815c 100644 --- a/core/blockchain_diff_test.go +++ b/core/blockchain_diff_test.go @@ -319,6 +319,9 @@ func TestProcessDiffLayer(t *testing.T) { lightBackend.Chain().HandleDiffLayer(diff, "testpid", true) } _, err := lightBackend.chain.insertChain([]*types.Block{block}, true) + if err != nil { + t.Errorf("failed to insert block %v", err) + } if checks, exist := checkBlocks[i]; exist { for _, check := range checks.txs { s, _ := lightBackend.Chain().Snapshots().Snapshot(block.Root()).Storage(crypto.Keccak256Hash((*check.to)[:]), check.slot) @@ -327,9 +330,6 @@ func TestProcessDiffLayer(t *testing.T) { } } } - if err != nil { - t.Errorf("failed to insert block %v", err) - } } currentBlock := lightBackend.chain.CurrentBlock() nextBlock := fullBackend.chain.GetBlockByNumber(currentBlock.NumberU64() + 1) @@ -370,7 +370,7 @@ func TestFreezeDiffLayer(t *testing.T) { // Wait for the buffer to be zero. } // Minus one empty block. - if fullBackend.chain.diffQueue.Size() != blockNum-1 { + if fullBackend.chain.diffQueue.Size() > blockNum-1 && fullBackend.chain.diffQueue.Size() < blockNum-2 { t.Errorf("size of diff queue is wrong, expected: %d, get: %d", blockNum-1, fullBackend.chain.diffQueue.Size()) } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 8078db774f..50d02e0acc 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -43,7 +43,8 @@ import ( // So we can deterministically seed different blockchains var ( canonicalSeed = 1 - forkSeed = 2 + forkSeed1 = 2 + forkSeed2 = 3 TestTriesInMemory = 128 ) @@ -51,14 +52,18 @@ var ( // newCanonical creates a chain database, and injects a deterministic canonical // chain. Depending on the full flag, if creates either a full block chain or a // header only chain. -func newCanonical(engine consensus.Engine, n int, full bool) (ethdb.Database, *BlockChain, error) { +func newCanonical(engine consensus.Engine, n int, full, pipeline bool) (ethdb.Database, *BlockChain, error) { var ( db = rawdb.NewMemoryDatabase() genesis = new(Genesis).MustCommit(db) ) // Initialize a fresh chain with only a genesis block - blockchain, _ := NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) + var ops []BlockChainOption + if pipeline { + ops = append(ops, EnablePipelineCommit) + } + blockchain, _ := NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil, ops...) // Create and inject the requested chain if n == 0 { return db, blockchain, nil @@ -76,9 +81,53 @@ func newCanonical(engine consensus.Engine, n int, full bool) (ethdb.Database, *B } // Test fork of length N starting from block i -func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, comparator func(td1, td2 *big.Int)) { +func testInvalidStateRootBlockImport(t *testing.T, blockchain *BlockChain, i, n int, pipeline bool) { // Copy old chain up to #i into a new db - db, blockchain2, err := newCanonical(ethash.NewFaker(), i, full) + db, blockchain2, err := newCanonical(ethash.NewFaker(), i, true, pipeline) + if err != nil { + t.Fatal("could not make new canonical in testFork", err) + } + defer blockchain2.Stop() + + // Assert the chains have the same header/block at #i + hash1 := blockchain.GetBlockByNumber(uint64(i)).Hash() + hash2 := blockchain2.GetBlockByNumber(uint64(i)).Hash() + if hash1 != hash2 { + t.Errorf("chain content mismatch at %d: have hash %v, want hash %v", i, hash2, hash1) + } + // Extend the newly created chain + blockChainB := makeBlockChain(blockchain2.CurrentBlock(), n, ethash.NewFaker(), db, forkSeed1) + for idx, block := range blockChainB { + block.SetRoot(common.Hash{0: byte(forkSeed1), 19: byte(idx)}) + } + previousBlock := blockchain.CurrentBlock() + // Sanity check that the forked chain can be imported into the original + if _, err := blockchain.InsertChain(blockChainB); err == nil { + t.Fatalf("failed to report insert error") + } + + time.Sleep(2 * rewindBadBlockInterval) + latestBlock := blockchain.CurrentBlock() + if latestBlock.Hash() != previousBlock.Hash() || latestBlock.NumberU64() != previousBlock.NumberU64() { + t.Fatalf("rewind do not take effect") + } + db, blockchain3, err := newCanonical(ethash.NewFaker(), i, true, pipeline) + if err != nil { + t.Fatal("could not make new canonical in testFork", err) + } + defer blockchain3.Stop() + + blockChainC := makeBlockChain(blockchain3.CurrentBlock(), n, ethash.NewFaker(), db, forkSeed2) + + if _, err := blockchain.InsertChain(blockChainC); err != nil { + t.Fatalf("failed to insert forking chain: %v", err) + } +} + +// Test fork of length N starting from block i +func testFork(t *testing.T, blockchain *BlockChain, i, n int, full, pipeline bool, comparator func(td1, td2 *big.Int)) { + // Copy old chain up to #i into a new db + db, blockchain2, err := newCanonical(ethash.NewFaker(), i, full, pipeline) if err != nil { t.Fatal("could not make new canonical in testFork", err) } @@ -102,12 +151,12 @@ func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, compara headerChainB []*types.Header ) if full { - blockChainB = makeBlockChain(blockchain2.CurrentBlock(), n, ethash.NewFaker(), db, forkSeed) + blockChainB = makeBlockChain(blockchain2.CurrentBlock(), n, ethash.NewFaker(), db, forkSeed1) if _, err := blockchain2.InsertChain(blockChainB); err != nil { t.Fatalf("failed to insert forking chain: %v", err) } } else { - headerChainB = makeHeaderChain(blockchain2.CurrentHeader(), n, ethash.NewFaker(), db, forkSeed) + headerChainB = makeHeaderChain(blockchain2.CurrentHeader(), n, ethash.NewFaker(), db, forkSeed1) if _, err := blockchain2.InsertHeaderChain(headerChainB, 1); err != nil { t.Fatalf("failed to insert forking chain: %v", err) } @@ -117,7 +166,7 @@ func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, compara if full { tdPre = blockchain.GetTdByHash(blockchain.CurrentBlock().Hash()) - if err := testBlockChainImport(blockChainB, blockchain); err != nil { + if err := testBlockChainImport(blockChainB, pipeline, blockchain); err != nil { t.Fatalf("failed to import forked block chain: %v", err) } tdPost = blockchain.GetTdByHash(blockChainB[len(blockChainB)-1].Hash()) @@ -134,7 +183,7 @@ func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, compara // testBlockChainImport tries to process a chain of blocks, writing them into // the database if successful. -func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { +func testBlockChainImport(chain types.Blocks, pipelineCommit bool, blockchain *BlockChain) error { for _, block := range chain { // Try and process the block err := blockchain.engine.VerifyHeader(blockchain, block.Header(), true) @@ -151,12 +200,16 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { if err != nil { return err } + statedb.SetExpectedStateRoot(block.Root()) + if pipelineCommit { + statedb.EnablePipeCommit() + } statedb, receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{}) if err != nil { blockchain.reportBlock(block, receipts, err) return err } - err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas) + err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, pipelineCommit) if err != nil { blockchain.reportBlock(block, receipts, err) return err @@ -164,7 +217,9 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { blockchain.chainmu.Lock() rawdb.WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash()))) rawdb.WriteBlock(blockchain.db, block) - statedb.Commit(false) + statedb.Finalise(false) + statedb.AccountsIntermediateRoot() + statedb.Commit(nil) blockchain.chainmu.Unlock() } return nil @@ -187,8 +242,22 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error return nil } +func TestBlockImportVerification(t *testing.T) { + length := 5 + + // Make first chain starting from genesis + _, processor, err := newCanonical(ethash.NewFaker(), length, true, true) + if err != nil { + t.Fatalf("failed to make new canonical chain: %v", err) + } + defer processor.Stop() + // Start fork from current height + processor = EnablePipelineCommit(processor) + testInvalidStateRootBlockImport(t, processor, length, 10, true) +} + func TestLastBlock(t *testing.T) { - _, blockchain, err := newCanonical(ethash.NewFaker(), 0, true) + _, blockchain, err := newCanonical(ethash.NewFaker(), 0, true, false) if err != nil { t.Fatalf("failed to create pristine chain: %v", err) } @@ -205,14 +274,20 @@ func TestLastBlock(t *testing.T) { // Tests that given a starting canonical chain of a given size, it can be extended // with various length chains. -func TestExtendCanonicalHeaders(t *testing.T) { testExtendCanonical(t, false) } -func TestExtendCanonicalBlocks(t *testing.T) { testExtendCanonical(t, true) } +func TestExtendCanonicalHeaders(t *testing.T) { + testExtendCanonical(t, false, false) -func testExtendCanonical(t *testing.T, full bool) { +} +func TestExtendCanonicalBlocks(t *testing.T) { + testExtendCanonical(t, true, false) + testExtendCanonical(t, true, true) +} + +func testExtendCanonical(t *testing.T, full, pipeline bool) { length := 5 // Make first chain starting from genesis - _, processor, err := newCanonical(ethash.NewFaker(), length, full) + _, processor, err := newCanonical(ethash.NewFaker(), length, full, pipeline) if err != nil { t.Fatalf("failed to make new canonical chain: %v", err) } @@ -225,22 +300,25 @@ func testExtendCanonical(t *testing.T, full bool) { } } // Start fork from current height - testFork(t, processor, length, 1, full, better) - testFork(t, processor, length, 2, full, better) - testFork(t, processor, length, 5, full, better) - testFork(t, processor, length, 10, full, better) + testFork(t, processor, length, 1, full, pipeline, better) + testFork(t, processor, length, 2, full, pipeline, better) + testFork(t, processor, length, 5, full, pipeline, better) + testFork(t, processor, length, 10, full, pipeline, better) } // Tests that given a starting canonical chain of a given size, creating shorter // forks do not take canonical ownership. -func TestShorterForkHeaders(t *testing.T) { testShorterFork(t, false) } -func TestShorterForkBlocks(t *testing.T) { testShorterFork(t, true) } +func TestShorterForkHeaders(t *testing.T) { testShorterFork(t, false, false) } +func TestShorterForkBlocks(t *testing.T) { + testShorterFork(t, true, false) + testShorterFork(t, true, true) +} -func testShorterFork(t *testing.T, full bool) { +func testShorterFork(t *testing.T, full, pipeline bool) { length := 10 // Make first chain starting from genesis - _, processor, err := newCanonical(ethash.NewFaker(), length, full) + _, processor, err := newCanonical(ethash.NewFaker(), length, full, pipeline) if err != nil { t.Fatalf("failed to make new canonical chain: %v", err) } @@ -253,24 +331,30 @@ func testShorterFork(t *testing.T, full bool) { } } // Sum of numbers must be less than `length` for this to be a shorter fork - testFork(t, processor, 0, 3, full, worse) - testFork(t, processor, 0, 7, full, worse) - testFork(t, processor, 1, 1, full, worse) - testFork(t, processor, 1, 7, full, worse) - testFork(t, processor, 5, 3, full, worse) - testFork(t, processor, 5, 4, full, worse) + testFork(t, processor, 0, 3, full, pipeline, worse) + testFork(t, processor, 0, 7, full, pipeline, worse) + testFork(t, processor, 1, 1, full, pipeline, worse) + testFork(t, processor, 1, 7, full, pipeline, worse) + testFork(t, processor, 5, 3, full, pipeline, worse) + testFork(t, processor, 5, 4, full, pipeline, worse) } // Tests that given a starting canonical chain of a given size, creating longer // forks do take canonical ownership. -func TestLongerForkHeaders(t *testing.T) { testLongerFork(t, false) } -func TestLongerForkBlocks(t *testing.T) { testLongerFork(t, true) } +func TestLongerForkHeaders(t *testing.T) { + testLongerFork(t, false, false) +} +func TestLongerForkBlocks(t *testing.T) { + testLongerFork(t, true, false) + testLongerFork(t, true, true) + +} -func testLongerFork(t *testing.T, full bool) { +func testLongerFork(t *testing.T, full, pipeline bool) { length := 10 // Make first chain starting from genesis - _, processor, err := newCanonical(ethash.NewFaker(), length, full) + _, processor, err := newCanonical(ethash.NewFaker(), length, full, pipeline) if err != nil { t.Fatalf("failed to make new canonical chain: %v", err) } @@ -283,24 +367,28 @@ func testLongerFork(t *testing.T, full bool) { } } // Sum of numbers must be greater than `length` for this to be a longer fork - testFork(t, processor, 0, 11, full, better) - testFork(t, processor, 0, 15, full, better) - testFork(t, processor, 1, 10, full, better) - testFork(t, processor, 1, 12, full, better) - testFork(t, processor, 5, 6, full, better) - testFork(t, processor, 5, 8, full, better) + testFork(t, processor, 0, 11, full, pipeline, better) + testFork(t, processor, 0, 15, full, pipeline, better) + testFork(t, processor, 1, 10, full, pipeline, better) + testFork(t, processor, 1, 12, full, pipeline, better) + testFork(t, processor, 5, 6, full, pipeline, better) + testFork(t, processor, 5, 8, full, pipeline, better) } // Tests that given a starting canonical chain of a given size, creating equal // forks do take canonical ownership. -func TestEqualForkHeaders(t *testing.T) { testEqualFork(t, false) } -func TestEqualForkBlocks(t *testing.T) { testEqualFork(t, true) } +func TestEqualForkHeaders(t *testing.T) { testEqualFork(t, false, false) } +func TestEqualForkBlocks(t *testing.T) { + testEqualFork(t, true, true) + testEqualFork(t, true, false) -func testEqualFork(t *testing.T, full bool) { +} + +func testEqualFork(t *testing.T, full, pipeline bool) { length := 10 // Make first chain starting from genesis - _, processor, err := newCanonical(ethash.NewFaker(), length, full) + _, processor, err := newCanonical(ethash.NewFaker(), length, full, pipeline) if err != nil { t.Fatalf("failed to make new canonical chain: %v", err) } @@ -313,21 +401,24 @@ func testEqualFork(t *testing.T, full bool) { } } // Sum of numbers must be equal to `length` for this to be an equal fork - testFork(t, processor, 0, 10, full, equal) - testFork(t, processor, 1, 9, full, equal) - testFork(t, processor, 2, 8, full, equal) - testFork(t, processor, 5, 5, full, equal) - testFork(t, processor, 6, 4, full, equal) - testFork(t, processor, 9, 1, full, equal) + testFork(t, processor, 0, 10, full, pipeline, equal) + testFork(t, processor, 1, 9, full, pipeline, equal) + testFork(t, processor, 2, 8, full, pipeline, equal) + testFork(t, processor, 5, 5, full, pipeline, equal) + testFork(t, processor, 6, 4, full, pipeline, equal) + testFork(t, processor, 9, 1, full, pipeline, equal) } // Tests that chains missing links do not get accepted by the processor. -func TestBrokenHeaderChain(t *testing.T) { testBrokenChain(t, false) } -func TestBrokenBlockChain(t *testing.T) { testBrokenChain(t, true) } +func TestBrokenHeaderChain(t *testing.T) { testBrokenChain(t, false, false) } +func TestBrokenBlockChain(t *testing.T) { + testBrokenChain(t, true, false) + testBrokenChain(t, true, true) +} -func testBrokenChain(t *testing.T, full bool) { +func testBrokenChain(t *testing.T, full, pipeline bool) { // Make chain starting from genesis - db, blockchain, err := newCanonical(ethash.NewFaker(), 10, full) + db, blockchain, err := newCanonical(ethash.NewFaker(), 10, full, pipeline) if err != nil { t.Fatalf("failed to make new canonical chain: %v", err) } @@ -335,12 +426,12 @@ func testBrokenChain(t *testing.T, full bool) { // Create a forked chain, and try to insert with a missing link if full { - chain := makeBlockChain(blockchain.CurrentBlock(), 5, ethash.NewFaker(), db, forkSeed)[1:] - if err := testBlockChainImport(chain, blockchain); err == nil { + chain := makeBlockChain(blockchain.CurrentBlock(), 5, ethash.NewFaker(), db, forkSeed1)[1:] + if err := testBlockChainImport(chain, pipeline, blockchain); err == nil { t.Errorf("broken block chain not reported") } } else { - chain := makeHeaderChain(blockchain.CurrentHeader(), 5, ethash.NewFaker(), db, forkSeed)[1:] + chain := makeHeaderChain(blockchain.CurrentHeader(), 5, ethash.NewFaker(), db, forkSeed1)[1:] if err := testHeaderChainImport(chain, blockchain); err == nil { t.Errorf("broken header chain not reported") } @@ -349,19 +440,25 @@ func testBrokenChain(t *testing.T, full bool) { // Tests that reorganising a long difficult chain after a short easy one // overwrites the canonical numbers and links in the database. -func TestReorgLongHeaders(t *testing.T) { testReorgLong(t, false) } -func TestReorgLongBlocks(t *testing.T) { testReorgLong(t, true) } +func TestReorgLongHeaders(t *testing.T) { testReorgLong(t, false, false) } +func TestReorgLongBlocks(t *testing.T) { + testReorgLong(t, true, false) + testReorgLong(t, true, true) +} -func testReorgLong(t *testing.T, full bool) { - testReorg(t, []int64{0, 0, -9}, []int64{0, 0, 0, -9}, 393280, full) +func testReorgLong(t *testing.T, full, pipeline bool) { + testReorg(t, []int64{0, 0, -9}, []int64{0, 0, 0, -9}, 393280, full, pipeline) } // Tests that reorganising a short difficult chain after a long easy one // overwrites the canonical numbers and links in the database. -func TestReorgShortHeaders(t *testing.T) { testReorgShort(t, false) } -func TestReorgShortBlocks(t *testing.T) { testReorgShort(t, true) } +func TestReorgShortHeaders(t *testing.T) { testReorgShort(t, false, false) } +func TestReorgShortBlocks(t *testing.T) { + testReorgShort(t, true, false) + testReorgShort(t, true, true) +} -func testReorgShort(t *testing.T, full bool) { +func testReorgShort(t *testing.T, full, pipeline bool) { // Create a long easy chain vs. a short heavy one. Due to difficulty adjustment // we need a fairly long chain of blocks with different difficulties for a short // one to become heavyer than a long one. The 96 is an empirical value. @@ -373,12 +470,12 @@ func testReorgShort(t *testing.T, full bool) { for i := 0; i < len(diff); i++ { diff[i] = -9 } - testReorg(t, easy, diff, 12615120, full) + testReorg(t, easy, diff, 12615120, full, pipeline) } -func testReorg(t *testing.T, first, second []int64, td int64, full bool) { +func testReorg(t *testing.T, first, second []int64, td int64, full, pipeline bool) { // Create a pristine chain and database - db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full) + db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full, pipeline) if err != nil { t.Fatalf("failed to create pristine chain: %v", err) } @@ -444,12 +541,16 @@ func testReorg(t *testing.T, first, second []int64, td int64, full bool) { } // Tests that the insertion functions detect banned hashes. -func TestBadHeaderHashes(t *testing.T) { testBadHashes(t, false) } -func TestBadBlockHashes(t *testing.T) { testBadHashes(t, true) } +func TestBadHeaderHashes(t *testing.T) { testBadHashes(t, false, false) } +func TestBadBlockHashes(t *testing.T) { + testBadHashes(t, true, true) + testBadHashes(t, true, false) + +} -func testBadHashes(t *testing.T, full bool) { +func testBadHashes(t *testing.T, full, pipeline bool) { // Create a pristine chain and database - db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full) + db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full, pipeline) if err != nil { t.Fatalf("failed to create pristine chain: %v", err) } @@ -478,12 +579,16 @@ func testBadHashes(t *testing.T, full bool) { // Tests that bad hashes are detected on boot, and the chain rolled back to a // good state prior to the bad hash. -func TestReorgBadHeaderHashes(t *testing.T) { testReorgBadHashes(t, false) } -func TestReorgBadBlockHashes(t *testing.T) { testReorgBadHashes(t, true) } +func TestReorgBadHeaderHashes(t *testing.T) { testReorgBadHashes(t, false, false) } +func TestReorgBadBlockHashes(t *testing.T) { + testReorgBadHashes(t, true, false) + testReorgBadHashes(t, true, true) -func testReorgBadHashes(t *testing.T, full bool) { +} + +func testReorgBadHashes(t *testing.T, full, pipeline bool) { // Create a pristine chain and database - db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full) + db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full, pipeline) if err != nil { t.Fatalf("failed to create pristine chain: %v", err) } @@ -533,13 +638,16 @@ func testReorgBadHashes(t *testing.T, full bool) { } // Tests chain insertions in the face of one entity containing an invalid nonce. -func TestHeadersInsertNonceError(t *testing.T) { testInsertNonceError(t, false) } -func TestBlocksInsertNonceError(t *testing.T) { testInsertNonceError(t, true) } +func TestHeadersInsertNonceError(t *testing.T) { testInsertNonceError(t, false, false) } +func TestBlocksInsertNonceError(t *testing.T) { + testInsertNonceError(t, true, false) + testInsertNonceError(t, true, true) +} -func testInsertNonceError(t *testing.T, full bool) { +func testInsertNonceError(t *testing.T, full, pipeline bool) { for i := 1; i < 25 && !t.Failed(); i++ { // Create a pristine chain and database - db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full) + db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full, pipeline) if err != nil { t.Fatalf("failed to create pristine chain: %v", err) } @@ -1212,7 +1320,7 @@ done: // Tests if the canonical block can be fetched from the database during chain insertion. func TestCanonicalBlockRetrieval(t *testing.T) { - _, blockchain, err := newCanonical(ethash.NewFaker(), 0, true) + _, blockchain, err := newCanonical(ethash.NewFaker(), 0, true, false) if err != nil { t.Fatalf("failed to create pristine chain: %v", err) } diff --git a/core/chain_makers.go b/core/chain_makers.go index d8e3ee012f..0e3f9256e2 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -223,7 +223,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse block, _, _ := b.engine.FinalizeAndAssemble(chainreader, b.header, statedb, b.txs, b.uncles, b.receipts) // Write state changes to db - root, _, err := statedb.Commit(config.IsEIP158(b.header.Number)) + root, _, err := statedb.Commit(nil) if err != nil { panic(fmt.Sprintf("state write error: %v", err)) } @@ -254,9 +254,9 @@ func makeHeader(chain consensus.ChainReader, parent *types.Block, state *state.S } else { time = parent.Time() + 10 // block time is fixed at 10 seconds } - + root := state.IntermediateRoot(chain.Config().IsEIP158(parent.Number())) return &types.Header{ - Root: state.IntermediateRoot(chain.Config().IsEIP158(parent.Number())), + Root: root, ParentHash: parent.Hash(), Coinbase: parent.Coinbase(), Difficulty: engine.CalcDifficulty(chain, time, &types.Header{ diff --git a/core/error.go b/core/error.go index 1fbd0d599b..0830a699fe 100644 --- a/core/error.go +++ b/core/error.go @@ -34,6 +34,9 @@ var ( // ErrDiffLayerNotFound is returned when diff layer not found. ErrDiffLayerNotFound = errors.New("diff layer not found") + + // ErrKnownBadBlock is return when the block is a known bad block + ErrKnownBadBlock = errors.New("already known bad block") ) // List of evm-call-message pre-checking errors. All state transition messages will diff --git a/core/genesis.go b/core/genesis.go index 9303522947..94bb06dd77 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -298,7 +298,7 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block { if g.Difficulty == nil { head.Difficulty = params.GenesisDifficulty } - statedb.Commit(false) + statedb.Commit(nil) statedb.Database().TrieDB().Commit(root, true, nil) return types.NewBlock(head, nil, nil, nil, trie.NewStackTrie(nil)) diff --git a/core/state/database.go b/core/state/database.go index 5f07470d25..dd114dc6ad 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -277,6 +277,9 @@ func (db *cachingDB) Purge() { // CopyTrie returns an independent copy of the given trie. func (db *cachingDB) CopyTrie(t Trie) Trie { + if t == nil { + return nil + } switch t := t.(type) { case *trie.SecureTrie: return t.Copy() diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go index a599415aa0..6d89d6b78a 100644 --- a/core/state/pruner/pruner.go +++ b/core/state/pruner/pruner.go @@ -232,7 +232,7 @@ func pruneAll(maindb ethdb.Database, g *core.Genesis) error { } } root := statedb.IntermediateRoot(false) - statedb.Commit(false) + statedb.Commit(nil) statedb.Database().TrieDB().Commit(root, true, nil) log.Info("State pruning successful", "pruned", size, "elapsed", common.PrettyDuration(time.Since(start))) return nil diff --git a/core/state/snapshot/difflayer.go b/core/state/snapshot/difflayer.go index c0f0dab568..65b2729d9c 100644 --- a/core/state/snapshot/difflayer.go +++ b/core/state/snapshot/difflayer.go @@ -118,6 +118,9 @@ type diffLayer struct { storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted) + verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed + valid bool // mark the difflayer is valid or not. + diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer lock sync.RWMutex @@ -168,7 +171,7 @@ func (h storageBloomHasher) Sum64() uint64 { // newDiffLayer creates a new diff on top of an existing snapshot, whether that's a low // level persistent database or a hierarchical diff already. -func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer { +func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer { // Create the new layer with some pre-allocated data segments dl := &diffLayer{ parent: parent, @@ -177,6 +180,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s accountData: accounts, storageData: storage, storageList: make(map[common.Hash][]common.Hash), + verifiedCh: verified, } switch parent := parent.(type) { case *diskLayer: @@ -256,6 +260,32 @@ func (dl *diffLayer) Root() common.Hash { return dl.root } +// WaitAndGetVerifyRes will wait until the diff layer been verified and return the verification result +func (dl *diffLayer) WaitAndGetVerifyRes() bool { + if dl.verifiedCh == nil { + return true + } + <-dl.verifiedCh + return dl.valid +} + +func (dl *diffLayer) MarkValid() { + dl.valid = true +} + +// Represent whether the difflayer is been verified, does not means it is a valid or invalid difflayer +func (dl *diffLayer) Verified() bool { + if dl.verifiedCh == nil { + return true + } + select { + case <-dl.verifiedCh: + return true + default: + return false + } +} + // Parent returns the subsequent layer of a diff layer. func (dl *diffLayer) Parent() snapshot { return dl.parent @@ -423,8 +453,8 @@ func (dl *diffLayer) storage(accountHash, storageHash common.Hash, depth int) ([ // Update creates a new layer on top of the existing snapshot diff tree with // the specified data items. -func (dl *diffLayer) Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer { - return newDiffLayer(dl, blockRoot, destructs, accounts, storage) +func (dl *diffLayer) Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer { + return newDiffLayer(dl, blockRoot, destructs, accounts, storage, verified) } // flatten pushes all data from this point downwards, flattening everything into diff --git a/core/state/snapshot/difflayer_test.go b/core/state/snapshot/difflayer_test.go index 919af5fa86..5311a0d689 100644 --- a/core/state/snapshot/difflayer_test.go +++ b/core/state/snapshot/difflayer_test.go @@ -79,11 +79,11 @@ func TestMergeBasics(t *testing.T) { } } // Add some (identical) layers on top - parent := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage)) - child := newDiffLayer(parent, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage)) - child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage)) - child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage)) - child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage)) + parent := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil) + child := newDiffLayer(parent, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil) + child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil) + child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil) + child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil) // And flatten merged := (child.flatten()).(*diffLayer) @@ -151,13 +151,13 @@ func TestMergeDelete(t *testing.T) { } } // Add some flipAccs-flopping layers on top - parent := newDiffLayer(emptyLayer(), common.Hash{}, flipDrops(), flipAccs(), storage) - child := parent.Update(common.Hash{}, flopDrops(), flopAccs(), storage) - child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage) - child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage) - child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage) - child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage) - child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage) + parent := newDiffLayer(emptyLayer(), common.Hash{}, flipDrops(), flipAccs(), storage, nil) + child := parent.Update(common.Hash{}, flopDrops(), flopAccs(), storage, nil) + child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage, nil) + child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage, nil) + child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage, nil) + child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage, nil) + child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage, nil) if data, _ := child.Account(h1); data == nil { t.Errorf("last diff layer: expected %x account to be non-nil", h1) @@ -209,7 +209,7 @@ func TestInsertAndMerge(t *testing.T) { accounts = make(map[common.Hash][]byte) storage = make(map[common.Hash]map[common.Hash][]byte) ) - parent = newDiffLayer(emptyLayer(), common.Hash{}, destructs, accounts, storage) + parent = newDiffLayer(emptyLayer(), common.Hash{}, destructs, accounts, storage, nil) } { var ( @@ -220,7 +220,7 @@ func TestInsertAndMerge(t *testing.T) { accounts[acc] = randomAccount() storage[acc] = make(map[common.Hash][]byte) storage[acc][slot] = []byte{0x01} - child = newDiffLayer(parent, common.Hash{}, destructs, accounts, storage) + child = newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil) } // And flatten merged := (child.flatten()).(*diffLayer) @@ -256,7 +256,7 @@ func BenchmarkSearch(b *testing.B) { for i := 0; i < 10000; i++ { accounts[randomHash()] = randomAccount() } - return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage) + return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil) } var layer snapshot layer = emptyLayer() @@ -298,7 +298,7 @@ func BenchmarkSearchSlot(b *testing.B) { accStorage[randomHash()] = value storage[accountKey] = accStorage } - return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage) + return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil) } var layer snapshot layer = emptyLayer() @@ -336,7 +336,7 @@ func BenchmarkFlatten(b *testing.B) { } storage[accountKey] = accStorage } - return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage) + return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil) } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -386,7 +386,7 @@ func BenchmarkJournal(b *testing.B) { } storage[accountKey] = accStorage } - return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage) + return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil) } layer := snapshot(new(diskLayer)) for i := 1; i < 128; i++ { diff --git a/core/state/snapshot/disklayer.go b/core/state/snapshot/disklayer.go index 7cbf6e293d..c1de41782c 100644 --- a/core/state/snapshot/disklayer.go +++ b/core/state/snapshot/disklayer.go @@ -49,6 +49,16 @@ func (dl *diskLayer) Root() common.Hash { return dl.root } +func (dl *diskLayer) WaitAndGetVerifyRes() bool { + return true +} + +func (dl *diskLayer) MarkValid() {} + +func (dl *diskLayer) Verified() bool { + return true +} + // Parent always returns nil as there's no layer below the disk. func (dl *diskLayer) Parent() snapshot { return nil @@ -161,6 +171,6 @@ func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) ([]byte, erro // Update creates a new layer on top of the existing snapshot diff tree with // the specified data items. Note, the maps are retained by the method to avoid // copying everything. -func (dl *diskLayer) Update(blockHash common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer { - return newDiffLayer(dl, blockHash, destructs, accounts, storage) +func (dl *diskLayer) Update(blockHash common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer { + return newDiffLayer(dl, blockHash, destructs, accounts, storage, verified) } diff --git a/core/state/snapshot/disklayer_test.go b/core/state/snapshot/disklayer_test.go index 362edba90d..689ed38773 100644 --- a/core/state/snapshot/disklayer_test.go +++ b/core/state/snapshot/disklayer_test.go @@ -134,7 +134,7 @@ func TestDiskMerge(t *testing.T) { conModCache: {conModCacheSlot: reverse(conModCacheSlot[:])}, conDelNoCache: {conDelNoCacheSlot: nil}, conDelCache: {conDelCacheSlot: nil}, - }); err != nil { + }, nil); err != nil { t.Fatalf("failed to update snapshot tree: %v", err) } if err := snaps.Cap(diffRoot, 0); err != nil { @@ -357,7 +357,7 @@ func TestDiskPartialMerge(t *testing.T) { conModCache: {conModCacheSlot: reverse(conModCacheSlot[:])}, conDelNoCache: {conDelNoCacheSlot: nil}, conDelCache: {conDelCacheSlot: nil}, - }); err != nil { + }, nil); err != nil { t.Fatalf("test %d: failed to update snapshot tree: %v", i, err) } if err := snaps.Cap(diffRoot, 0); err != nil { @@ -468,7 +468,7 @@ func TestDiskGeneratorPersistence(t *testing.T) { // Modify or delete some accounts, flatten everything onto disk if err := snaps.update(diffRoot, baseRoot, nil, map[common.Hash][]byte{ accTwo: accTwo[:], - }, nil); err != nil { + }, nil, nil); err != nil { t.Fatalf("failed to update snapshot tree: %v", err) } if err := snaps.Cap(diffRoot, 0); err != nil { @@ -488,7 +488,7 @@ func TestDiskGeneratorPersistence(t *testing.T) { accThree: accThree.Bytes(), }, map[common.Hash]map[common.Hash][]byte{ accThree: {accThreeSlot: accThreeSlot.Bytes()}, - }); err != nil { + }, nil); err != nil { t.Fatalf("failed to update snapshot tree: %v", err) } diskLayer := snaps.layers[snaps.diskRoot()].(*diskLayer) diff --git a/core/state/snapshot/iterator_test.go b/core/state/snapshot/iterator_test.go index 2a27b01577..3ffaff32ed 100644 --- a/core/state/snapshot/iterator_test.go +++ b/core/state/snapshot/iterator_test.go @@ -53,7 +53,7 @@ func TestAccountIteratorBasics(t *testing.T) { } } // Add some (identical) layers on top - diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage)) + diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil) it := diffLayer.AccountIterator(common.Hash{}) verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator @@ -91,7 +91,7 @@ func TestStorageIteratorBasics(t *testing.T) { nilStorage[h] = nilstorage } // Add some (identical) layers on top - diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, nil, copyAccounts(accounts), copyStorage(storage)) + diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, nil, copyAccounts(accounts), copyStorage(storage), nil) for account := range accounts { it, _ := diffLayer.StorageIterator(account, common.Hash{}) verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator @@ -222,13 +222,13 @@ func TestAccountIteratorTraversal(t *testing.T) { } // Stack three diff layers on top with various overlaps snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, - randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil) + randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil) snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, - randomAccountSet("0xbb", "0xdd", "0xf0"), nil) + randomAccountSet("0xbb", "0xdd", "0xf0"), nil, nil) snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, - randomAccountSet("0xcc", "0xf0", "0xff"), nil) + randomAccountSet("0xcc", "0xf0", "0xff"), nil, nil) // Verify the single and multi-layer iterators head := snaps.Snapshot(common.HexToHash("0x04")) @@ -269,13 +269,13 @@ func TestStorageIteratorTraversal(t *testing.T) { } // Stack three diff layers on top with various overlaps snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil), nil) snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x04", "0x05", "0x06"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x04", "0x05", "0x06"}}, nil), nil) snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil), nil) // Verify the single and multi-layer iterators head := snaps.Snapshot(common.HexToHash("0x04")) @@ -353,14 +353,14 @@ func TestAccountIteratorTraversalValues(t *testing.T) { } } // Assemble a stack of snapshots from the account layers - snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, a, nil) - snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, b, nil) - snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, c, nil) - snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, d, nil) - snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), nil, e, nil) - snaps.update(common.HexToHash("0x07"), common.HexToHash("0x06"), nil, f, nil) - snaps.update(common.HexToHash("0x08"), common.HexToHash("0x07"), nil, g, nil) - snaps.update(common.HexToHash("0x09"), common.HexToHash("0x08"), nil, h, nil) + snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, a, nil, nil) + snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, b, nil, nil) + snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, c, nil, nil) + snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, d, nil, nil) + snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), nil, e, nil, nil) + snaps.update(common.HexToHash("0x07"), common.HexToHash("0x06"), nil, f, nil, nil) + snaps.update(common.HexToHash("0x08"), common.HexToHash("0x07"), nil, g, nil, nil) + snaps.update(common.HexToHash("0x09"), common.HexToHash("0x08"), nil, h, nil, nil) it, _ := snaps.AccountIterator(common.HexToHash("0x09"), common.Hash{}) head := snaps.Snapshot(common.HexToHash("0x09")) @@ -452,14 +452,14 @@ func TestStorageIteratorTraversalValues(t *testing.T) { } } // Assemble a stack of snapshots from the account layers - snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, randomAccountSet("0xaa"), wrapStorage(a)) - snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, randomAccountSet("0xaa"), wrapStorage(b)) - snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, randomAccountSet("0xaa"), wrapStorage(c)) - snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, randomAccountSet("0xaa"), wrapStorage(d)) - snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), nil, randomAccountSet("0xaa"), wrapStorage(e)) - snaps.update(common.HexToHash("0x07"), common.HexToHash("0x06"), nil, randomAccountSet("0xaa"), wrapStorage(e)) - snaps.update(common.HexToHash("0x08"), common.HexToHash("0x07"), nil, randomAccountSet("0xaa"), wrapStorage(g)) - snaps.update(common.HexToHash("0x09"), common.HexToHash("0x08"), nil, randomAccountSet("0xaa"), wrapStorage(h)) + snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, randomAccountSet("0xaa"), wrapStorage(a), nil) + snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, randomAccountSet("0xaa"), wrapStorage(b), nil) + snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, randomAccountSet("0xaa"), wrapStorage(c), nil) + snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, randomAccountSet("0xaa"), wrapStorage(d), nil) + snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), nil, randomAccountSet("0xaa"), wrapStorage(e), nil) + snaps.update(common.HexToHash("0x07"), common.HexToHash("0x06"), nil, randomAccountSet("0xaa"), wrapStorage(e), nil) + snaps.update(common.HexToHash("0x08"), common.HexToHash("0x07"), nil, randomAccountSet("0xaa"), wrapStorage(g), nil) + snaps.update(common.HexToHash("0x09"), common.HexToHash("0x08"), nil, randomAccountSet("0xaa"), wrapStorage(h), nil) it, _ := snaps.StorageIterator(common.HexToHash("0x09"), common.HexToHash("0xaa"), common.Hash{}) head := snaps.Snapshot(common.HexToHash("0x09")) @@ -522,7 +522,7 @@ func TestAccountIteratorLargeTraversal(t *testing.T) { }, } for i := 1; i < 128; i++ { - snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(200), nil) + snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(200), nil, nil) } // Iterate the entire stack and ensure everything is hit only once head := snaps.Snapshot(common.HexToHash("0x80")) @@ -567,13 +567,13 @@ func TestAccountIteratorFlattening(t *testing.T) { } // Create a stack of diffs on top snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, - randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil) + randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil) snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, - randomAccountSet("0xbb", "0xdd", "0xf0"), nil) + randomAccountSet("0xbb", "0xdd", "0xf0"), nil, nil) snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, - randomAccountSet("0xcc", "0xf0", "0xff"), nil) + randomAccountSet("0xcc", "0xf0", "0xff"), nil, nil) // Create an iterator and flatten the data from underneath it it, _ := snaps.AccountIterator(common.HexToHash("0x04"), common.Hash{}) @@ -598,13 +598,13 @@ func TestAccountIteratorSeek(t *testing.T) { }, } snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, - randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil) + randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil) snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, - randomAccountSet("0xbb", "0xdd", "0xf0"), nil) + randomAccountSet("0xbb", "0xdd", "0xf0"), nil, nil) snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, - randomAccountSet("0xcc", "0xf0", "0xff"), nil) + randomAccountSet("0xcc", "0xf0", "0xff"), nil, nil) // Account set is now // 02: aa, ee, f0, ff @@ -662,13 +662,13 @@ func TestStorageIteratorSeek(t *testing.T) { } // Stack three diff layers on top with various overlaps snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x03", "0x05"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x03", "0x05"}}, nil), nil) snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x02", "0x05", "0x06"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x02", "0x05", "0x06"}}, nil), nil) snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x05", "0x08"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x05", "0x08"}}, nil), nil) // Account set is now // 02: 01, 03, 05 @@ -725,17 +725,17 @@ func TestAccountIteratorDeletions(t *testing.T) { } // Stack three diff layers on top with various overlaps snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), - nil, randomAccountSet("0x11", "0x22", "0x33"), nil) + nil, randomAccountSet("0x11", "0x22", "0x33"), nil, nil) deleted := common.HexToHash("0x22") destructed := map[common.Hash]struct{}{ deleted: {}, } snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), - destructed, randomAccountSet("0x11", "0x33"), nil) + destructed, randomAccountSet("0x11", "0x33"), nil, nil) snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), - nil, randomAccountSet("0x33", "0x44", "0x55"), nil) + nil, randomAccountSet("0x33", "0x44", "0x55"), nil, nil) // The output should be 11,33,44,55 it, _ := snaps.AccountIterator(common.HexToHash("0x04"), common.Hash{}) @@ -771,10 +771,10 @@ func TestStorageIteratorDeletions(t *testing.T) { } // Stack three diff layers on top with various overlaps snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x03", "0x05"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x03", "0x05"}}, nil), nil) snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x02", "0x04", "0x06"}}, [][]string{{"0x01", "0x03"}})) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x02", "0x04", "0x06"}}, [][]string{{"0x01", "0x03"}}), nil) // The output should be 02,04,05,06 it, _ := snaps.StorageIterator(common.HexToHash("0x03"), common.HexToHash("0xaa"), common.Hash{}) @@ -790,7 +790,7 @@ func TestStorageIteratorDeletions(t *testing.T) { destructed := map[common.Hash]struct{}{ common.HexToHash("0xaa"): {}, } - snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), destructed, nil, nil) + snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), destructed, nil, nil, nil) it, _ = snaps.StorageIterator(common.HexToHash("0x04"), common.HexToHash("0xaa"), common.Hash{}) verifyIterator(t, 0, it, verifyStorage) @@ -798,7 +798,7 @@ func TestStorageIteratorDeletions(t *testing.T) { // Re-insert the slots of the same account snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x07", "0x08", "0x09"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x07", "0x08", "0x09"}}, nil), nil) // The output should be 07,08,09 it, _ = snaps.StorageIterator(common.HexToHash("0x05"), common.HexToHash("0xaa"), common.Hash{}) @@ -806,7 +806,7 @@ func TestStorageIteratorDeletions(t *testing.T) { it.Release() // Destruct the whole storage but re-create the account in the same layer - snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), destructed, randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x11", "0x12"}}, nil)) + snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), destructed, randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x11", "0x12"}}, nil), nil) it, _ = snaps.StorageIterator(common.HexToHash("0x06"), common.HexToHash("0xaa"), common.Hash{}) verifyIterator(t, 2, it, verifyStorage) // The output should be 11,12 it.Release() @@ -848,7 +848,7 @@ func BenchmarkAccountIteratorTraversal(b *testing.B) { }, } for i := 1; i <= 100; i++ { - snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(200), nil) + snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(200), nil, nil) } // We call this once before the benchmark, so the creation of // sorted accountlists are not included in the results. @@ -943,9 +943,9 @@ func BenchmarkAccountIteratorLargeBaselayer(b *testing.B) { base.root: base, }, } - snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, makeAccounts(2000), nil) + snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, makeAccounts(2000), nil, nil) for i := 2; i <= 100; i++ { - snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(20), nil) + snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(20), nil, nil) } // We call this once before the benchmark, so the creation of // sorted accountlists are not included in the results. diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go index 6747aed01c..3c18294cc1 100644 --- a/core/state/snapshot/journal.go +++ b/core/state/snapshot/journal.go @@ -248,7 +248,7 @@ func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) { } storageData[entry.Hash] = slots } - return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData), r) + return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData, nil), r) } // Journal terminates any in-progress snapshot generation, also implicitly pushing diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index 41f38bd255..38f52acced 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -101,6 +101,15 @@ type Snapshot interface { // Root returns the root hash for which this snapshot was made. Root() common.Hash + // WaitAndGetVerifyRes will wait until the snapshot been verified and return verification result + WaitAndGetVerifyRes() bool + + // Verified returns whether the snapshot is verified + Verified() bool + + // Store the verification result + MarkValid() + // Account directly retrieves the account associated with a particular hash in // the snapshot slim data format. Account(hash common.Hash) (*Account, error) @@ -130,7 +139,7 @@ type snapshot interface { // the specified data items. // // Note, the maps are retained by the method to avoid copying everything. - Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer + Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer // Journal commits an entire diff hierarchy to disk into a single journal entry. // This is meant to be used during shutdown to persist the snapshot without @@ -322,14 +331,14 @@ func (t *Tree) Snapshots(root common.Hash, limits int, nodisk bool) []Snapshot { return ret } -func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Address]struct{}, accounts map[common.Address][]byte, storage map[common.Address]map[string][]byte) error { +func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Address]struct{}, accounts map[common.Address][]byte, storage map[common.Address]map[string][]byte, verified chan struct{}) error { hashDestructs, hashAccounts, hashStorage := transformSnapData(destructs, accounts, storage) - return t.update(blockRoot, parentRoot, hashDestructs, hashAccounts, hashStorage) + return t.update(blockRoot, parentRoot, hashDestructs, hashAccounts, hashStorage, verified) } // Update adds a new snapshot into the tree, if that can be linked to an existing // old parent. It is disallowed to insert a disk layer (the origin of all). -func (t *Tree) update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) error { +func (t *Tree) update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) error { // Reject noop updates to avoid self-loops in the snapshot tree. This is a // special case that can only happen for Clique networks where empty blocks // don't modify the state (0 block subsidy). @@ -344,7 +353,7 @@ func (t *Tree) update(blockRoot common.Hash, parentRoot common.Hash, destructs m if parent == nil { return fmt.Errorf("parent [%#x] snapshot missing", parentRoot) } - snap := parent.(snapshot).Update(blockRoot, destructs, accounts, storage) + snap := parent.(snapshot).Update(blockRoot, destructs, accounts, storage, verified) // Save the new snapshot for later t.lock.Lock() diff --git a/core/state/snapshot/snapshot_test.go b/core/state/snapshot/snapshot_test.go index f8ced63665..187186a3be 100644 --- a/core/state/snapshot/snapshot_test.go +++ b/core/state/snapshot/snapshot_test.go @@ -105,7 +105,7 @@ func TestDiskLayerExternalInvalidationFullFlatten(t *testing.T) { accounts := map[common.Hash][]byte{ common.HexToHash("0xa1"): randomAccount(), } - if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil); err != nil { + if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil, nil); err != nil { t.Fatalf("failed to create a diff layer: %v", err) } if n := len(snaps.layers); n != 2 { @@ -149,10 +149,10 @@ func TestDiskLayerExternalInvalidationPartialFlatten(t *testing.T) { accounts := map[common.Hash][]byte{ common.HexToHash("0xa1"): randomAccount(), } - if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil); err != nil { + if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil, nil); err != nil { t.Fatalf("failed to create a diff layer: %v", err) } - if err := snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, accounts, nil); err != nil { + if err := snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, accounts, nil, nil); err != nil { t.Fatalf("failed to create a diff layer: %v", err) } if n := len(snaps.layers); n != 3 { @@ -197,13 +197,13 @@ func TestDiffLayerExternalInvalidationPartialFlatten(t *testing.T) { accounts := map[common.Hash][]byte{ common.HexToHash("0xa1"): randomAccount(), } - if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil); err != nil { + if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil, nil); err != nil { t.Fatalf("failed to create a diff layer: %v", err) } - if err := snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, accounts, nil); err != nil { + if err := snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, accounts, nil, nil); err != nil { t.Fatalf("failed to create a diff layer: %v", err) } - if err := snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, accounts, nil); err != nil { + if err := snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, accounts, nil, nil); err != nil { t.Fatalf("failed to create a diff layer: %v", err) } if n := len(snaps.layers); n != 4 { @@ -257,12 +257,12 @@ func TestPostCapBasicDataAccess(t *testing.T) { }, } // The lowest difflayer - snaps.update(common.HexToHash("0xa1"), common.HexToHash("0x01"), nil, setAccount("0xa1"), nil) - snaps.update(common.HexToHash("0xa2"), common.HexToHash("0xa1"), nil, setAccount("0xa2"), nil) - snaps.update(common.HexToHash("0xb2"), common.HexToHash("0xa1"), nil, setAccount("0xb2"), nil) + snaps.update(common.HexToHash("0xa1"), common.HexToHash("0x01"), nil, setAccount("0xa1"), nil, nil) + snaps.update(common.HexToHash("0xa2"), common.HexToHash("0xa1"), nil, setAccount("0xa2"), nil, nil) + snaps.update(common.HexToHash("0xb2"), common.HexToHash("0xa1"), nil, setAccount("0xb2"), nil, nil) - snaps.update(common.HexToHash("0xa3"), common.HexToHash("0xa2"), nil, setAccount("0xa3"), nil) - snaps.update(common.HexToHash("0xb3"), common.HexToHash("0xb2"), nil, setAccount("0xb3"), nil) + snaps.update(common.HexToHash("0xa3"), common.HexToHash("0xa2"), nil, setAccount("0xa3"), nil, nil) + snaps.update(common.HexToHash("0xb3"), common.HexToHash("0xb2"), nil, setAccount("0xb3"), nil, nil) // checkExist verifies if an account exiss in a snapshot checkExist := func(layer *diffLayer, key string) error { @@ -357,7 +357,7 @@ func TestSnaphots(t *testing.T) { ) for i := 0; i < 129; i++ { head = makeRoot(uint64(i + 2)) - snaps.update(head, last, nil, setAccount(fmt.Sprintf("%d", i+2)), nil) + snaps.update(head, last, nil, setAccount(fmt.Sprintf("%d", i+2)), nil, nil) last = head snaps.Cap(head, 128) // 130 layers (128 diffs + 1 accumulator + 1 disk) } diff --git a/core/state/state_test.go b/core/state/state_test.go index 77847772c6..4be9ae8ce3 100644 --- a/core/state/state_test.go +++ b/core/state/state_test.go @@ -54,7 +54,9 @@ func TestDump(t *testing.T) { // write some of them to the trie s.state.updateStateObject(obj1) s.state.updateStateObject(obj2) - s.state.Commit(false) + s.state.Finalise(false) + s.state.AccountsIntermediateRoot() + s.state.Commit(nil) // check that DumpToCollector contains the state objects that are in trie got := string(s.state.Dump(false, false, true)) @@ -95,7 +97,9 @@ func TestNull(t *testing.T) { var value common.Hash s.state.SetState(address, common.Hash{}, value) - s.state.Commit(false) + s.state.Finalise(false) + s.state.AccountsIntermediateRoot() + s.state.Commit(nil) if value := s.state.GetState(address, common.Hash{}); value != (common.Hash{}) { t.Errorf("expected empty current value, got %x", value) @@ -167,7 +171,9 @@ func TestSnapshot2(t *testing.T) { so0.deleted = false state.SetStateObject(so0) - root, _, _ := state.Commit(false) + state.Finalise(false) + state.AccountsIntermediateRoot() + root, _, _ := state.Commit(nil) state, _ = New(root, state.db, state.snaps) // and one with deleted == true diff --git a/core/state/statedb.go b/core/state/statedb.go index f05de2ceb1..384373c98e 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -73,10 +73,13 @@ func (n *proofList) Delete(key []byte) error { // * Contracts // * Accounts type StateDB struct { - db Database - prefetcher *triePrefetcher - originalRoot common.Hash // The pre-state root, before any changes were made - currentRoot common.Hash // only used when noTrie is true + db Database + prefetcherLock sync.Mutex + prefetcher *triePrefetcher + originalRoot common.Hash // The pre-state root, before any changes were made + currentRoot common.Hash // only used when noTrie is true + expectedRoot common.Hash // The state root in the block header + stateRoot common.Hash // The calculation result of IntermediateRoot trie Trie noTrie bool @@ -85,6 +88,8 @@ type StateDB struct { diffTries map[common.Address]Trie diffCode map[common.Hash][]byte lightProcessed bool + fullProcessed bool + pipeCommit bool snapMux sync.Mutex snaps *snapshot.Tree @@ -157,12 +162,7 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, journal: newJournal(), hasher: crypto.NewKeccakState(), } - tr, err := db.OpenTrie(root) - if err != nil { - return nil, err - } - _, sdb.noTrie = tr.(*trie.EmptyTrie) - sdb.trie = tr + if sdb.snaps != nil { if sdb.snap = sdb.snaps.Snapshot(root); sdb.snap != nil { sdb.snapDestructs = make(map[common.Address]struct{}) @@ -170,6 +170,15 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, sdb.snapStorage = make(map[common.Address]map[string][]byte) } } + + snapVerified := sdb.snap != nil && sdb.snap.Verified() + tr, err := db.OpenTrie(root) + // return error when 1. failed to open trie and 2. the snap is nil or the snap is not nil and done verification + if err != nil && (sdb.snap == nil || snapVerified) { + return nil, err + } + _, sdb.noTrie = tr.(*trie.EmptyTrie) + sdb.trie = tr return sdb, nil } @@ -177,6 +186,8 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. func (s *StateDB) StartPrefetcher(namespace string) { + s.prefetcherLock.Lock() + defer s.prefetcherLock.Unlock() if s.noTrie { return } @@ -192,6 +203,8 @@ func (s *StateDB) StartPrefetcher(namespace string) { // StopPrefetcher terminates a running prefetcher and reports any leftover stats // from the gathered metrics. func (s *StateDB) StopPrefetcher() { + s.prefetcherLock.Lock() + defer s.prefetcherLock.Unlock() if s.noTrie { return } @@ -201,11 +214,28 @@ func (s *StateDB) StopPrefetcher() { } } +// Mark that the block is processed by diff layer +func (s *StateDB) SetExpectedStateRoot(root common.Hash) { + s.expectedRoot = root +} + // Mark that the block is processed by diff layer func (s *StateDB) MarkLightProcessed() { s.lightProcessed = true } +// Enable the pipeline commit function of statedb +func (s *StateDB) EnablePipeCommit() { + if s.snap != nil { + s.pipeCommit = true + } +} + +// Mark that the block is full processed +func (s *StateDB) MarkFullProcessed() { + s.fullProcessed = true +} + func (s *StateDB) IsLightProcessed() bool { return s.lightProcessed } @@ -229,8 +259,20 @@ func (s *StateDB) Error() error { return s.dbErr } -func (s *StateDB) Trie() Trie { - return s.trie +// Not thread safe +func (s *StateDB) Trie() (Trie, error) { + if s.trie == nil { + err := s.WaitPipeVerification() + if err != nil { + return nil, err + } + tr, err := s.db.OpenTrie(s.originalRoot) + if err != nil { + return nil, err + } + s.trie = tr + } + return s.trie, nil } func (s *StateDB) SetDiff(diffLayer *types.DiffLayer, diffTries map[common.Address]Trie, diffCode map[common.Hash][]byte) { @@ -378,6 +420,9 @@ func (s *StateDB) GetProof(addr common.Address) ([][]byte, error) { // GetProofByHash returns the Merkle proof for a given account. func (s *StateDB) GetProofByHash(addrHash common.Hash) ([][]byte, error) { var proof proofList + if _, err := s.Trie(); err != nil { + return nil, err + } err := s.trie.Prove(addrHash[:], 0, &proof) return proof, err } @@ -928,6 +973,17 @@ func (s *StateDB) GetRefund() uint64 { return s.refund } +// GetRefund returns the current value of the refund counter. +func (s *StateDB) WaitPipeVerification() error { + // We need wait for the parent trie to commit + if s.snap != nil { + if valid := s.snap.WaitAndGetVerifyRes(); !valid { + return fmt.Errorf("verification on parent snap failed") + } + } + return nil +} + // Finalise finalises the state by removing the s destructed objects and clears // the journal as well as the refunds. Finalise, however, will not push any updates // into the tries just yet. Only IntermediateRoot or Commit will do that. @@ -988,22 +1044,11 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { } // Finalise all the dirty storage states and write them into the tries s.Finalise(deleteEmptyObjects) + s.AccountsIntermediateRoot() + return s.StateIntermediateRoot() +} - // If there was a trie prefetcher operating, it gets aborted and irrevocably - // modified after we start retrieving tries. Remove it from the statedb after - // this round of use. - // - // This is weird pre-byzantium since the first tx runs with a prefetcher and - // the remainder without, but pre-byzantium even the initial prefetcher is - // useless, so no sleep lost. - prefetcher := s.prefetcher - if s.prefetcher != nil { - defer func() { - s.prefetcher.close() - s.prefetcher = nil - }() - } - +func (s *StateDB) AccountsIntermediateRoot() { tasks := make(chan func()) finishCh := make(chan struct{}) defer close(finishCh) @@ -1020,6 +1065,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { } }() } + // Although naively it makes sense to retrieve the account trie and then do // the contract storage and account updates sequentially, that short circuits // the account prefetcher. Instead, let's process all the storage updates @@ -1051,6 +1097,27 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { } } wg.Wait() +} + +func (s *StateDB) StateIntermediateRoot() common.Hash { + // If there was a trie prefetcher operating, it gets aborted and irrevocably + // modified after we start retrieving tries. Remove it from the statedb after + // this round of use. + // + // This is weird pre-byzantium since the first tx runs with a prefetcher and + // the remainder without, but pre-byzantium even the initial prefetcher is + // useless, so no sleep lost. + prefetcher := s.prefetcher + defer func() { + s.prefetcherLock.Lock() + if s.prefetcher != nil { + s.prefetcher.close() + s.prefetcher = nil + } + // try not use defer inside defer + s.prefetcherLock.Unlock() + }() + // Now we're about to start to write changes to the trie. The trie is so far // _untouched_. We can check with the prefetcher, if it can give us a trie // which has the same root, but also has some content loaded into it. @@ -1062,7 +1129,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { if s.trie == nil { tr, err := s.db.OpenTrie(s.originalRoot) if err != nil { - panic("Failed to open trie tree") + panic(fmt.Sprintf("Failed to open trie tree %s", s.originalRoot)) } s.trie = tr } @@ -1111,9 +1178,12 @@ func (s *StateDB) clearJournalAndRefund() { s.validRevisions = s.validRevisions[:0] // Snapshots can be created without journal entires } -func (s *StateDB) LightCommit(root common.Hash) (common.Hash, *types.DiffLayer, error) { +func (s *StateDB) LightCommit() (common.Hash, *types.DiffLayer, error) { codeWriter := s.db.TrieDB().DiskDB().NewBatch() + // light process already verified it, expectedRoot is trustworthy. + root := s.expectedRoot + commitFuncs := []func() error{ func() error { for codeHash, code := range s.diffCode { @@ -1201,7 +1271,8 @@ func (s *StateDB) LightCommit(root common.Hash) (common.Hash, *types.DiffLayer, } // Only update if there's a state transition (skip empty Clique blocks) if parent := s.snap.Root(); parent != root { - if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil { + // for light commit, always do sync commit + if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, nil); err != nil { log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err) } // Keep n diff layers in the memory @@ -1235,23 +1306,42 @@ func (s *StateDB) LightCommit(root common.Hash) (common.Hash, *types.DiffLayer, } // Commit writes the state to the underlying in-memory trie database. -func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer, error) { +func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() error) (common.Hash, *types.DiffLayer, error) { if s.dbErr != nil { return common.Hash{}, nil, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr) } // Finalize any pending changes and merge everything into the tries - root := s.IntermediateRoot(deleteEmptyObjects) if s.lightProcessed { - return s.LightCommit(root) + root, diff, err := s.LightCommit() + if err != nil { + return root, diff, err + } + for _, postFunc := range postCommitFuncs { + err = postFunc() + if err != nil { + return root, diff, err + } + } + return root, diff, nil } var diffLayer *types.DiffLayer + var verified chan struct{} + var snapUpdated chan struct{} if s.snap != nil { diffLayer = &types.DiffLayer{} } - commitFuncs := []func() error{ - func() error { - // Commit objects to the trie, measuring the elapsed time - tasks := make(chan func(batch ethdb.KeyValueWriter)) + if s.pipeCommit { + // async commit the MPT + verified = make(chan struct{}) + snapUpdated = make(chan struct{}) + } + + commmitTrie := func() error { + commitErr := func() error { + if s.stateRoot = s.StateIntermediateRoot(); s.fullProcessed && s.expectedRoot != s.stateRoot { + return fmt.Errorf("invalid merkle root (remote: %x local: %x)", s.expectedRoot, s.stateRoot) + } + tasks := make(chan func()) taskResults := make(chan error, len(s.stateObjectsDirty)) tasksNum := 0 finishCh := make(chan struct{}) @@ -1262,17 +1352,11 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer wg.Add(1) go func() { defer wg.Done() - codeWriter := s.db.TrieDB().DiskDB().NewBatch() for { select { case task := <-tasks: - task(codeWriter) + task() case <-finishCh: - if codeWriter.ValueSize() > 0 { - if err := codeWriter.Write(); err != nil { - log.Crit("Failed to commit dirty codes", "error", err) - } - } return } } @@ -1295,11 +1379,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer for addr := range s.stateObjectsDirty { if obj := s.stateObjects[addr]; !obj.deleted { // Write any contract code associated with the state object - tasks <- func(codeWriter ethdb.KeyValueWriter) { - if obj.code != nil && obj.dirtyCode { - rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) - obj.dirtyCode = false - } + tasks <- func() { // Write any storage changes in the state object to its storage trie if !s.noTrie { if err := obj.CommitTrie(s.db); err != nil { @@ -1321,14 +1401,6 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer } close(finishCh) - if len(s.stateObjectsDirty) > 0 { - s.stateObjectsDirty = make(map[common.Address]struct{}, len(s.stateObjectsDirty)/2) - } - // Write the account trie changes, measuing the amount of wasted time - var start time.Time - if metrics.EnabledExpensive { - start = time.Now() - } // The onleaf func is called _serially_, so we can reuse the same account // for unmarshalling every time. if !s.noTrie { @@ -1345,15 +1417,62 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer if err != nil { return err } - if metrics.EnabledExpensive { - s.AccountCommits += time.Since(start) - } if root != emptyRoot { s.db.CacheAccount(root, s.trie) } } + + for _, postFunc := range postCommitFuncs { + err := postFunc() + if err != nil { + return err + } + } wg.Wait() return nil + }() + + if s.pipeCommit { + if commitErr == nil { + <-snapUpdated + s.snaps.Snapshot(s.stateRoot).MarkValid() + } else { + // The blockchain will do the further rewind if write block not finish yet + if failPostCommitFunc != nil { + <-snapUpdated + failPostCommitFunc() + } + log.Error("state verification failed", "err", commitErr) + } + close(verified) + } + return commitErr + } + + commitFuncs := []func() error{ + func() error { + codeWriter := s.db.TrieDB().DiskDB().NewBatch() + for addr := range s.stateObjectsDirty { + if obj := s.stateObjects[addr]; !obj.deleted { + if obj.code != nil && obj.dirtyCode { + rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) + obj.dirtyCode = false + if codeWriter.ValueSize() > ethdb.IdealBatchSize { + if err := codeWriter.Write(); err != nil { + return err + } + codeWriter.Reset() + } + } + } + } + if codeWriter.ValueSize() > 0 { + if err := codeWriter.Write(); err != nil { + log.Crit("Failed to commit dirty codes", "error", err) + return err + } + } + return nil }, func() error { // If snapshotting is enabled, update the snapshot tree with this new version @@ -1361,18 +1480,23 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer if metrics.EnabledExpensive { defer func(start time.Time) { s.SnapshotCommits += time.Since(start) }(time.Now()) } + if s.pipeCommit { + defer close(snapUpdated) + } // Only update if there's a state transition (skip empty Clique blocks) - if parent := s.snap.Root(); parent != root { - if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil { - log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err) + if parent := s.snap.Root(); parent != s.expectedRoot { + if err := s.snaps.Update(s.expectedRoot, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, verified); err != nil { + log.Warn("Failed to update snapshot tree", "from", parent, "to", s.expectedRoot, "err", err) } // Keep n diff layers in the memory // - head layer is paired with HEAD state // - head-1 layer is paired with HEAD-1 state // - head-(n-1) layer(bottom-most diff layer) is paired with HEAD-(n-1)state - if err := s.snaps.Cap(root, s.snaps.CapLimit()); err != nil { - log.Warn("Failed to cap snapshot tree", "root", root, "layers", s.snaps.CapLimit(), "err", err) - } + go func() { + if err := s.snaps.Cap(s.expectedRoot, s.snaps.CapLimit()); err != nil { + log.Warn("Failed to cap snapshot tree", "root", s.expectedRoot, "layers", s.snaps.CapLimit(), "err", err) + } + }() } } return nil @@ -1384,6 +1508,11 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer return nil }, } + if s.pipeCommit { + go commmitTrie() + } else { + commitFuncs = append(commitFuncs, commmitTrie) + } commitRes := make(chan error, len(commitFuncs)) for _, f := range commitFuncs { tmpFunc := f @@ -1397,7 +1526,11 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer return common.Hash{}, nil, r } } - s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil + root := s.stateRoot + if s.pipeCommit { + root = s.expectedRoot + } + return root, diffLayer, nil } diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index 2c0b9296ff..acbbf1cd2f 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -102,7 +102,9 @@ func TestIntermediateLeaks(t *testing.T) { } // Commit and cross check the databases. - transRoot, _, err := transState.Commit(false) + transState.Finalise(false) + transState.AccountsIntermediateRoot() + transRoot, _, err := transState.Commit(nil) if err != nil { t.Fatalf("failed to commit transition state: %v", err) } @@ -110,7 +112,9 @@ func TestIntermediateLeaks(t *testing.T) { t.Errorf("can not commit trie %v to persistent database", transRoot.Hex()) } - finalRoot, _, err := finalState.Commit(false) + finalState.Finalise(false) + finalState.AccountsIntermediateRoot() + finalRoot, _, err := finalState.Commit(nil) if err != nil { t.Fatalf("failed to commit final state: %v", err) } @@ -473,7 +477,7 @@ func (test *snapshotTest) checkEqual(state, checkstate *StateDB) error { func TestTouchDelete(t *testing.T) { s := newStateTest() s.state.GetOrNewStateObject(common.Address{}) - root, _, _ := s.state.Commit(false) + root, _, _ := s.state.Commit(nil) s.state, _ = New(root, s.state.db, s.state.snaps) snapshot := s.state.Snapshot() @@ -546,7 +550,9 @@ func TestCopyCommitCopy(t *testing.T) { t.Fatalf("first copy pre-commit committed storage slot mismatch: have %x, want %x", val, common.Hash{}) } - copyOne.Commit(false) + copyOne.Finalise(false) + copyOne.AccountsIntermediateRoot() + copyOne.Commit(nil) if balance := copyOne.GetBalance(addr); balance.Cmp(big.NewInt(42)) != 0 { t.Fatalf("first copy post-commit balance mismatch: have %v, want %v", balance, 42) } @@ -631,7 +637,10 @@ func TestCopyCopyCommitCopy(t *testing.T) { if val := copyTwo.GetCommittedState(addr, skey); val != (common.Hash{}) { t.Fatalf("second copy pre-commit committed storage slot mismatch: have %x, want %x", val, common.Hash{}) } - copyTwo.Commit(false) + + copyTwo.Finalise(false) + copyTwo.AccountsIntermediateRoot() + copyTwo.Commit(nil) if balance := copyTwo.GetBalance(addr); balance.Cmp(big.NewInt(42)) != 0 { t.Fatalf("second copy post-commit balance mismatch: have %v, want %v", balance, 42) } @@ -675,7 +684,9 @@ func TestDeleteCreateRevert(t *testing.T) { addr := common.BytesToAddress([]byte("so")) state.SetBalance(addr, big.NewInt(1)) - root, _, _ := state.Commit(false) + state.Finalise(false) + state.AccountsIntermediateRoot() + root, _, _ := state.Commit(nil) state, _ = New(root, state.db, state.snaps) // Simulate self-destructing in one transaction, then create-reverting in another @@ -686,8 +697,10 @@ func TestDeleteCreateRevert(t *testing.T) { state.SetBalance(addr, big.NewInt(2)) state.RevertToSnapshot(id) + state.Finalise(true) + state.AccountsIntermediateRoot() // Commit the entire state and make sure we don't crash and have the correct state - root, _, _ = state.Commit(true) + root, _, _ = state.Commit(nil) state, _ = New(root, state.db, state.snaps) if state.getStateObject(addr) != nil { @@ -712,7 +725,9 @@ func TestMissingTrieNodes(t *testing.T) { a2 := common.BytesToAddress([]byte("another")) state.SetBalance(a2, big.NewInt(100)) state.SetCode(a2, []byte{1, 2, 4}) - root, _, _ = state.Commit(false) + state.Finalise(false) + state.AccountsIntermediateRoot() + root, _, _ = state.Commit(nil) t.Logf("root: %x", root) // force-flush state.Database().TrieDB().Cap(0) @@ -736,7 +751,9 @@ func TestMissingTrieNodes(t *testing.T) { } // Modify the state state.SetBalance(addr, big.NewInt(2)) - root, _, err := state.Commit(false) + state.Finalise(false) + state.AccountsIntermediateRoot() + root, _, err := state.Commit(nil) if err == nil { t.Fatalf("expected error, got root :%x", root) } diff --git a/core/state/sync_test.go b/core/state/sync_test.go index 24cae59004..fe896791d3 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -69,7 +69,9 @@ func makeTestState() (Database, common.Hash, []*testAccount) { state.updateStateObject(obj) accounts = append(accounts, acc) } - root, _, _ := state.Commit(false) + state.Finalise(false) + state.AccountsIntermediateRoot() + root, _, _ := state.Commit(nil) // Return the generated state return db, root, accounts diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index ddecd7a202..ed60c811d2 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -20,7 +20,6 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" ) @@ -106,7 +105,7 @@ func (p *triePrefetcher) close() { for _, fetcher := range p.fetchers { p.abortChan <- fetcher // safe to do multiple times <-fetcher.term - if metrics.Enabled { + if metrics.EnabledExpensive { if fetcher.root == p.root { p.accountLoadMeter.Mark(int64(len(fetcher.seen))) p.accountDupMeter.Mark(int64(fetcher.dups)) @@ -257,9 +256,7 @@ func newSubfetcher(db Database, root common.Hash, accountHash common.Hash) *subf seen: make(map[string]struct{}), accountHash: accountHash, } - gopool.Submit(func() { - sf.loop() - }) + go sf.loop() return sf } @@ -322,8 +319,7 @@ func (sf *subfetcher) loop() { trie, err = sf.db.OpenStorageTrie(sf.accountHash, sf.root) } if err != nil { - log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) - return + log.Debug("Trie prefetcher failed opening trie", "root", sf.root, "err", err) } sf.trie = trie @@ -332,6 +328,18 @@ func (sf *subfetcher) loop() { select { case <-sf.wake: // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock + if sf.trie == nil { + if sf.accountHash == emptyAddr { + sf.trie, err = sf.db.OpenTrie(sf.root) + } else { + // address is useless + sf.trie, err = sf.db.OpenStorageTrie(sf.accountHash, sf.root) + } + if err != nil { + continue + } + } + sf.lock.Lock() tasks := sf.tasks sf.tasks = nil diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index ec4e7bf972..d559a03a0f 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -17,7 +17,6 @@ package core import ( - "runtime" "sync/atomic" "github.com/ethereum/go-ethereum/consensus" @@ -27,6 +26,8 @@ import ( "github.com/ethereum/go-ethereum/params" ) +const prefetchThread = 2 + // statePrefetcher is a basic Prefetcher, which blindly executes a block on top // of an arbitrary state with the goal of prefetching potentially useful state // data from disk before the main block processor start executing. @@ -54,25 +55,23 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c signer = types.MakeSigner(p.config, header.Number) ) transactions := block.Transactions() - threads := runtime.NumCPU() - batch := len(transactions) / (threads + 1) - if batch == 0 { - return + sortTransactions := make([][]*types.Transaction, prefetchThread) + for i := 0; i < prefetchThread; i++ { + sortTransactions[i] = make([]*types.Transaction, 0, len(transactions)/prefetchThread) + } + for idx := range transactions { + threadIdx := idx % prefetchThread + sortTransactions[threadIdx] = append(sortTransactions[threadIdx], transactions[idx]) } // No need to execute the first batch, since the main processor will do it. - for i := 1; i <= threads; i++ { - start := i * batch - end := (i + 1) * batch - if i == threads { - end = len(transactions) - } - go func(start, end int) { + for i := 0; i < prefetchThread; i++ { + go func(idx int) { newStatedb := statedb.Copy() gaspool := new(GasPool).AddGas(block.GasLimit()) blockContext := NewEVMBlockContext(header, p.bc, nil) evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) // Iterate over and process the individual transactions - for i, tx := range transactions[start:end] { + for i, tx := range sortTransactions[idx] { // If block precaching was interrupted, abort if interrupt != nil && atomic.LoadUint32(interrupt) == 1 { return @@ -82,23 +81,19 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c if err != nil { return // Also invalid block, bail out } - newStatedb.Prepare(tx.Hash(), block.Hash(), i) - if err := precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm); err != nil { - return // Ugh, something went horribly wrong, bail out - } + newStatedb.Prepare(tx.Hash(), header.Hash(), i) + precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm) } - }(start, end) + }(i) } - } // precacheTransaction attempts to apply a transaction to the given state database // and uses the input parameters for its environment. The goal is not to execute // the transaction successfully, rather to warm up touched data slots. -func precacheTransaction(msg types.Message, config *params.ChainConfig, gaspool *GasPool, statedb *state.StateDB, header *types.Header, evm *vm.EVM) error { +func precacheTransaction(msg types.Message, config *params.ChainConfig, gaspool *GasPool, statedb *state.StateDB, header *types.Header, evm *vm.EVM) { // Update the evm with the new transaction context. evm.Reset(NewEVMTxContext(msg), statedb) // Add addresses to access list if applicable - _, err := ApplyMessage(evm, msg, gaspool) - return err + ApplyMessage(evm, msg, gaspool) } diff --git a/core/state_processor.go b/core/state_processor.go index 0569ea5229..16f6eab63b 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -123,6 +123,10 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB statedb.StopPrefetcher() parent := p.bc.GetHeader(block.ParentHash(), block.NumberU64()-1) statedb, err = state.New(parent.Root, p.bc.stateCache, p.bc.snaps) + statedb.SetExpectedStateRoot(block.Root()) + if p.bc.pipeCommit { + statedb.EnablePipeCommit() + } if err != nil { return statedb, nil, nil, 0, err } @@ -148,9 +152,12 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty for _, c := range diffLayer.Codes { fullDiffCode[c.Hash] = c.Code } - + stateTrie, err := statedb.Trie() + if err != nil { + return nil, nil, 0, err + } for des := range snapDestructs { - statedb.Trie().TryDelete(des[:]) + stateTrie.TryDelete(des[:]) } threads := gopool.Threads(len(snapAccounts)) @@ -191,7 +198,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty // fetch previous state var previousAccount state.Account stateMux.Lock() - enc, err := statedb.Trie().TryGet(diffAccount[:]) + enc, err := stateTrie.TryGet(diffAccount[:]) stateMux.Unlock() if err != nil { errChan <- err @@ -303,7 +310,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty return } stateMux.Lock() - err = statedb.Trie().TryUpdate(diffAccount[:], bz) + err = stateTrie.TryUpdate(diffAccount[:], bz) stateMux.Unlock() if err != nil { errChan <- err @@ -330,7 +337,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty } // Do validate in advance so that we can fall back to full process - if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed); err != nil { + if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed, false); err != nil { log.Error("validate state failed during diff sync", "error", err) return nil, nil, 0, err } @@ -378,6 +385,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg gp = new(GasPool).AddGas(block.GasLimit()) ) signer := types.MakeSigner(p.bc.chainConfig, block.Number()) + statedb.TryPreload(block, signer) var receipts = make([]*types.Receipt, 0) // Mutate the block and state according to any hard-fork specs if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 { @@ -396,6 +404,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg // initilise bloom processors bloomProcessors := NewAsyncReceiptBloomGenerator(txNum) + statedb.MarkFullProcessed() // usually do have two tx, one for validator set contract, another for system reward contract. systemTxs := make([]*types.Transaction, 0, 2) diff --git a/core/types.go b/core/types.go index 49bd58e086..5ed4817e68 100644 --- a/core/types.go +++ b/core/types.go @@ -31,7 +31,7 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. - ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error + ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error } // Prefetcher is an interface for pre-caching transaction signatures and state. diff --git a/eth/api_test.go b/eth/api_test.go index b44eed40bc..359671579b 100644 --- a/eth/api_test.go +++ b/eth/api_test.go @@ -77,7 +77,9 @@ func TestAccountRange(t *testing.T) { m[addr] = true } } - state.Commit(true) + state.Finalise(true) + state.AccountsIntermediateRoot() + state.Commit(nil) root := state.IntermediateRoot(true) trie, err := statedb.OpenTrie(root) @@ -134,7 +136,7 @@ func TestEmptyAccountRange(t *testing.T) { statedb = state.NewDatabase(rawdb.NewMemoryDatabase()) state, _ = state.New(common.Hash{}, statedb, nil) ) - state.Commit(true) + state.Commit(nil) state.IntermediateRoot(true) results := state.IteratorDump(true, true, true, (common.Hash{}).Bytes(), AccountRangeMaxResults) if bytes.Equal(results.Next, (common.Hash{}).Bytes()) { diff --git a/eth/backend.go b/eth/backend.go index 96d559911b..0e937c49a0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -207,6 +207,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if config.DiffSync && !config.NoTries { bcOps = append(bcOps, core.EnableLightProcessor) } + if config.PipeCommit { + bcOps = append(bcOps, core.EnablePipelineCommit) + } if config.PersistDiff { bcOps = append(bcOps, core.EnablePersistDiff(config.DiffBlock)) } diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index f7c9d9f5f9..ee35d123a3 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -139,6 +139,7 @@ type Config struct { DirectBroadcast bool DisableSnapProtocol bool //Whether disable snap protocol DiffSync bool // Whether support diff sync + PipeCommit bool RangeLimit bool TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved. diff --git a/eth/state_accessor.go b/eth/state_accessor.go index 6ea3161d61..24a0e776f6 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -138,7 +138,9 @@ func (eth *Ethereum) stateAtBlock(block *types.Block, reexec uint64, base *state return nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err) } // Finalize the state so any modifications are written to the trie - root, _, err := statedb.Commit(eth.blockchain.Config().IsEIP158(current.Number())) + statedb.Finalise(eth.blockchain.Config().IsEIP158(current.Number())) + statedb.AccountsIntermediateRoot() + root, _, err := statedb.Commit(nil) if err != nil { return nil, fmt.Errorf("stateAtBlock commit failed, number %d root %v: %w", current.NumberU64(), current.Root().Hex(), err) diff --git a/eth/tracers/api.go b/eth/tracers/api.go index a44982b864..8ee2c22ffd 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -556,7 +556,9 @@ func (api *API) IntermediateRoots(ctx context.Context, hash common.Hash, config } // calling IntermediateRoot will internally call Finalize on the state // so any modifications are written to the trie - roots = append(roots, statedb.IntermediateRoot(deleteEmptyObjects)) + root := statedb.IntermediateRoot(deleteEmptyObjects) + + roots = append(roots, root) } return roots, nil } diff --git a/ethclient/ethclient_test.go b/ethclient/ethclient_test.go index 380481a22d..f38625bd23 100644 --- a/ethclient/ethclient_test.go +++ b/ethclient/ethclient_test.go @@ -271,6 +271,7 @@ func newTestBackend(t *testing.T) (*node.Node, []*types.Block) { config := ðconfig.Config{Genesis: genesis} config.Ethash.PowMode = ethash.ModeFake config.SnapshotCache = 256 + config.TriesInMemory = 128 ethservice, err := eth.New(n, config) if err != nil { t.Fatalf("can't create new ethereum service: %v", err) diff --git a/miner/worker.go b/miner/worker.go index 2dcb75ac10..28ef170e40 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -634,6 +634,7 @@ func (w *worker) resultLoop() { logs = append(logs, receipt.Logs...) } // Commit block and state to database. + task.state.SetExpectedStateRoot(block.Root()) _, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true) if err != nil { log.Error("Failed writing block to chain", "err", err) @@ -994,6 +995,10 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) // and commits new work if consensus engine is running. func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error { s := w.current.state + err := s.WaitPipeVerification() + if err != nil { + return err + } block, receipts, err := w.engine.FinalizeAndAssemble(w.chain, types.CopyHeader(w.current.header), s, w.current.txs, uncles, w.current.receipts) if err != nil { return err diff --git a/params/protocol_params.go b/params/protocol_params.go index 857bb9a582..84515869b6 100644 --- a/params/protocol_params.go +++ b/params/protocol_params.go @@ -114,7 +114,6 @@ const ( // Precompiled contract gas prices - //TODO need further discussion TendermintHeaderValidateGas uint64 = 3000 // Gas for validate tendermiint consensus state IAVLMerkleProofValidateGas uint64 = 3000 // Gas for validate merkle proof diff --git a/params/version.go b/params/version.go index 0cab1d8822..8faa4bb644 100644 --- a/params/version.go +++ b/params/version.go @@ -23,7 +23,7 @@ import ( const ( VersionMajor = 1 // Major version component of the current release VersionMinor = 1 // Minor version component of the current release - VersionPatch = 7 // Patch version component of the current release + VersionPatch = 8 // Patch version component of the current release VersionMeta = "" // Version metadata to append to the version string ) diff --git a/tests/state_test_util.go b/tests/state_test_util.go index 2b22ca0289..1de4a787dd 100644 --- a/tests/state_test_util.go +++ b/tests/state_test_util.go @@ -198,7 +198,9 @@ func (t *StateTest) RunNoVerify(subtest StateSubtest, vmconfig vm.Config, snapsh } // Commit block - statedb.Commit(config.IsEIP158(block.Number())) + statedb.Finalise(config.IsEIP158(block.Number())) + statedb.AccountsIntermediateRoot() + statedb.Commit(nil) // Add 0-value mining reward. This only makes a difference in the cases // where // - the coinbase suicided, or @@ -226,7 +228,9 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc, snapshotter boo } } // Commit and re-open to start with a clean state. - root, _, _ := statedb.Commit(false) + statedb.Finalise(false) + statedb.AccountsIntermediateRoot() + root, _, _ := statedb.Commit(nil) var snaps *snapshot.Tree if snapshotter { diff --git a/trie/database.go b/trie/database.go index f15ad3bd64..fa8b746c58 100644 --- a/trie/database.go +++ b/trie/database.go @@ -606,14 +606,16 @@ func (db *Database) Cap(limit common.StorageSize) error { // outside code doesn't see an inconsistent state (referenced data removed from // memory cache during commit but not yet in persistent storage). This is ensured // by only uncaching existing data when the database write finalizes. + db.lock.RLock() nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now() - batch := db.diskdb.NewBatch() - // db.dirtiesSize only contains the useful data in the cache, but when reporting // the total memory consumption, the maintenance metadata is also needed to be // counted. size := db.dirtiesSize + common.StorageSize((len(db.dirties)-1)*cachedNodeSize) size += db.childrenSize - common.StorageSize(len(db.dirties[common.Hash{}].children)*(common.HashLength+2)) + db.lock.RUnlock() + + batch := db.diskdb.NewBatch() // If the preimage cache got large enough, push to disk. If it's still small // leave for later to deduplicate writes. @@ -633,27 +635,35 @@ func (db *Database) Cap(limit common.StorageSize) error { } // Keep committing nodes from the flush-list until we're below allowance oldest := db.oldest - for size > limit && oldest != (common.Hash{}) { - // Fetch the oldest referenced node and push into the batch - node := db.dirties[oldest] - rawdb.WriteTrieNode(batch, oldest, node.rlp()) - - // If we exceeded the ideal batch size, commit and reset - if batch.ValueSize() >= ethdb.IdealBatchSize { - if err := batch.Write(); err != nil { - log.Error("Failed to write flush list to disk", "err", err) - return err + err := func() error { + db.lock.RLock() + defer db.lock.RUnlock() + for size > limit && oldest != (common.Hash{}) { + // Fetch the oldest referenced node and push into the batch + node := db.dirties[oldest] + rawdb.WriteTrieNode(batch, oldest, node.rlp()) + + // If we exceeded the ideal batch size, commit and reset + if batch.ValueSize() >= ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + log.Error("Failed to write flush list to disk", "err", err) + return err + } + batch.Reset() } - batch.Reset() - } - // Iterate to the next flush item, or abort if the size cap was achieved. Size - // is the total size, including the useful cached data (hash -> blob), the - // cache item metadata, as well as external children mappings. - size -= common.StorageSize(common.HashLength + int(node.size) + cachedNodeSize) - if node.children != nil { - size -= common.StorageSize(cachedNodeChildrenSize + len(node.children)*(common.HashLength+2)) + // Iterate to the next flush item, or abort if the size cap was achieved. Size + // is the total size, including the useful cached data (hash -> blob), the + // cache item metadata, as well as external children mappings. + size -= common.StorageSize(common.HashLength + int(node.size) + cachedNodeSize) + if node.children != nil { + size -= common.StorageSize(cachedNodeChildrenSize + len(node.children)*(common.HashLength+2)) + } + oldest = node.flushNext } - oldest = node.flushNext + return nil + }() + if err != nil { + return err } // Flush out any remainder data from the last batch if err := batch.Write(); err != nil { @@ -723,7 +733,9 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H batch.Reset() } // Move the trie itself into the batch, flushing if enough data is accumulated + db.lock.RLock() nodes, storage := len(db.dirties), db.dirtiesSize + db.lock.RUnlock() uncacher := &cleaner{db} if err := db.commit(node, batch, uncacher, callback); err != nil { @@ -767,10 +779,14 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H // commit is the private locked version of Commit. func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner, callback func(common.Hash)) error { // If the node does not exist, it's a previously committed node + db.lock.RLock() node, ok := db.dirties[hash] if !ok { + db.lock.RUnlock() return nil } + db.lock.RUnlock() + var err error node.forChilds(func(child common.Hash) { if err == nil {