Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge block-stm to develop #840

Merged
merged 45 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
b1ba97c
Create MVHashMap and use it StateDB
cffls Jul 6, 2022
bbcc6dd
Parallel state processor
cffls Jul 16, 2022
61accb0
Move fee burning and tipping out of state transition to reduce read/w…
cffls Jul 26, 2022
ab3ebeb
Re-execute parallel tasks when there is a read in coinbase or burn ad…
cffls Jul 27, 2022
f7bd7ca
Txn prioritizer implemented using mutex map (#487)
pratikspatil024 Aug 19, 2022
4507b2e
added getReadMap and getWriteMap (#473)
pratikspatil024 Aug 19, 2022
c36ad88
Block-stm optimization
cffls Aug 8, 2022
f7c041f
Do not write entire objects directly when applying write set in blockstm
cffls Sep 6, 2022
d107c18
fixed a small bug in the Report function (#530)
pratikspatil024 Sep 27, 2022
471afc8
Refactor blockstm executor
cffls Sep 22, 2022
e63e390
Recognize bad transactions and break loop in blockstm executor
cffls Sep 28, 2022
ad658b6
Minor bug fix
cffls Sep 30, 2022
6f0d16f
Add ability to calculate the longest execution path in a block
cffls Oct 5, 2022
c59bb6e
Added unit tests for MV HashMap (#492)
pratikspatil024 Oct 7, 2022
2060b4f
Add tx dependency to block header
cffls Oct 6, 2022
da74f8c
Use a new hasher for each account access
cffls Nov 14, 2022
3a87233
Change functions of Key from pointer to value reference
cffls Nov 14, 2022
96e66e5
Merge branch 'develop' of https://github.com/maticnetwork/bor into bl…
pratikspatil024 Nov 21, 2022
d53c2e7
Block stm miner dependency (#561)
pratikspatil024 Dec 23, 2022
6f16d00
Merge branch 'develop' of https://github.com/maticnetwork/bor into bl…
pratikspatil024 Dec 23, 2022
4968c08
added hardfork checks (#666)
pratikspatil024 Jan 19, 2023
d25aa76
Minor fix in statedb test
cffls Jan 24, 2023
9795a28
added 2 flags to enable parallel EVM and set the number of speculativ…
pratikspatil024 Feb 9, 2023
55962e1
[Bug fix] Use parallel processor in unit test
cffls Feb 14, 2023
480ccf2
Optimized the dependency metadata structure (#804)
pratikspatil024 Apr 5, 2023
463bea0
[Bug fix] Fix tx dependency
cffls Feb 21, 2023
362db7a
Merge remote-tracking branch 'upstream/develop' into block-stm
cffls Apr 11, 2023
650e141
Allow process in state processor to be interrupted by caller
cffls Apr 11, 2023
de0cec5
Merge pull request #818 from cffls/block-stm
cffls Apr 12, 2023
d76fc15
Merge pull request #819 from cffls/jc/block-stm/state_processor_inter…
cffls Apr 13, 2023
48c1f90
Allow parallel state processor to be interrupted by caller
cffls Apr 14, 2023
5860b8e
Create new block context for each parallel task
cffls Apr 18, 2023
01fc7e3
Run serial and parallel processor at the same time
cffls Apr 14, 2023
f9171c4
Add ParallelExecFailedError
cffls Apr 25, 2023
72c030b
Merge pull request #831 from cffls/block-stm
cffls Apr 27, 2023
c88be5b
Disable IO tracing
cffls Apr 28, 2023
a093c4e
Merge remote-tracking branch 'upstream/develop' into block-stm
cffls May 1, 2023
8239026
Remove recover from settle
cffls May 1, 2023
01dd019
Fit linters
cffls May 2, 2023
badb574
Skip looking up interruptedTxCache if it is not initialized
cffls May 3, 2023
193aee0
Make block context sharable in parallel state processor
cffls May 4, 2023
adce0e8
removed PSP comment
pratikspatil024 May 10, 2023
c39c66f
Added check for circular and out-of-range dependency problem (#841)
pratikspatil024 May 11, 2023
f2c48fe
Remove dependency when a transaction is reverted
cffls May 11, 2023
0a7594b
removed docs/config.md
pratikspatil024 May 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions builder/files/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ syncmode = "full"
# period = 0
# gaslimit = 11500000

# [parallelevm]
# enable = false
# procs = 8

# [pprof]
# pprof = false
# port = 6060
Expand Down
124 changes: 104 additions & 20 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/common/tracing"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/blockstm"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/snapshot"
Expand Down Expand Up @@ -76,11 +77,13 @@ var (
snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil)
snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil)

blockImportTimer = metrics.NewRegisteredMeter("chain/imports", nil)
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
blockImportTimer = metrics.NewRegisteredMeter("chain/imports", nil)
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
blockExecutionParallelCounter = metrics.NewRegisteredCounter("chain/execution/parallel", nil)
blockExecutionSerialCounter = metrics.NewRegisteredCounter("chain/execution/serial", nil)

blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil)
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
Expand Down Expand Up @@ -216,12 +219,13 @@ type BlockChain struct {
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing

engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher
processor Processor // Block transaction processor interface
forker *ForkChoice
vmConfig vm.Config
engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher
processor Processor // Block transaction processor interface
parallelProcessor Processor // Parallel block transaction processor interface
forker *ForkChoice
vmConfig vm.Config

// Bor related changes
borReceiptsCache *lru.Cache // Cache for the most recent bor receipt receipts per block
Expand Down Expand Up @@ -435,6 +439,93 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
return bc, nil
}

// Similar to NewBlockChain, this function creates a new blockchain object, but with a parallel state processor
func NewParallelBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(header *types.Header) bool, txLookupLimit *uint64, checker ethereum.ChainValidator) (*BlockChain, error) {
bc, err := NewBlockChain(db, cacheConfig, chainConfig, engine, vmConfig, shouldPreserve, txLookupLimit, checker)

if err != nil {
return nil, err
}

bc.parallelProcessor = NewParallelStateProcessor(chainConfig, bc, engine)

return bc, nil
}

func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (types.Receipts, []*types.Log, uint64, *state.StateDB, error) {
// Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

type Result struct {
receipts types.Receipts
logs []*types.Log
usedGas uint64
err error
statedb *state.StateDB
counter metrics.Counter
}

resultChan := make(chan Result, 2)

processorCount := 0

if bc.parallelProcessor != nil {
parallelStatedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return nil, nil, 0, nil, err
}

processorCount++

go func() {
parallelStatedb.StartPrefetcher("chain")
receipts, logs, usedGas, err := bc.parallelProcessor.Process(block, parallelStatedb, bc.vmConfig, ctx)
resultChan <- Result{receipts, logs, usedGas, err, parallelStatedb, blockExecutionParallelCounter}
}()
}

if bc.processor != nil {
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return nil, nil, 0, nil, err
}

processorCount++

go func() {
statedb.StartPrefetcher("chain")
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig, ctx)
resultChan <- Result{receipts, logs, usedGas, err, statedb, blockExecutionSerialCounter}
}()
}

result := <-resultChan

if _, ok := result.err.(blockstm.ParallelExecFailedError); ok {
log.Warn("Parallel state processor failed", "err", result.err)

// If the parallel processor failed, we will fallback to the serial processor if enabled
if processorCount == 2 {
result.statedb.StopPrefetcher()
result = <-resultChan
processorCount--
}
}

result.counter.Inc(1)

// Make sure we are not leaking any prefetchers
if processorCount == 2 {
go func() {
second_result := <-resultChan
second_result.statedb.StopPrefetcher()
}()
}

return result.receipts, result.logs, result.usedGas, result.statedb, result.err
}

// empty returns an indicator whether the blockchain is empty.
// Note, it's a special case that we connect a non-empty ancient
// database with an empty node, so that we can plugin the ancient
Expand Down Expand Up @@ -1761,14 +1852,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
if parent == nil {
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return it.index, err
}

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
activeState = statedb

// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
Expand All @@ -1790,7 +1873,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)

// Process block using the parent state as reference point
substart := time.Now()
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
receipts, logs, usedGas, statedb, err := bc.ProcessBlock(block, parent)
activeState = statedb
if err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
Expand Down
30 changes: 24 additions & 6 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/ethereum/go-ethereum/consensus/beacon"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -123,9 +122,11 @@ func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, compara
if full {
cur := blockchain.CurrentBlock()
tdPre = blockchain.GetTd(cur.Hash(), cur.NumberU64())

if err := testBlockChainImport(blockChainB, blockchain); err != nil {
t.Fatalf("failed to import forked block chain: %v", err)
}

last := blockChainB[len(blockChainB)-1]
tdPost = blockchain.GetTd(last.Hash(), last.NumberU64())
} else {
Expand Down Expand Up @@ -156,11 +157,9 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
}
return err
}
statedb, err := state.New(blockchain.GetBlockByHash(block.ParentHash()).Root(), blockchain.stateCache, nil)
if err != nil {
return err
}
receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{})

receipts, _, usedGas, statedb, err := blockchain.ProcessBlock(block, blockchain.GetBlockByHash(block.ParentHash()).Header())

if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
Expand All @@ -180,6 +179,25 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
return nil
}

func TestParallelBlockChainImport(t *testing.T) {
t.Parallel()

db, blockchain, err := newCanonical(ethash.NewFaker(), 10, true)
blockchain.parallelProcessor = NewParallelStateProcessor(blockchain.chainConfig, blockchain, blockchain.engine)

if err != nil {
t.Fatalf("failed to make new canonical chain: %v", err)
}

defer blockchain.Stop()

blockChainB := makeFakeNonEmptyBlockChain(blockchain.CurrentBlock(), 5, ethash.NewFaker(), db, forkSeed, 5)

if err := testBlockChainImport(blockChainB, blockchain); err == nil {
t.Fatalf("expected error for bad tx")
}
}

// testHeaderChainImport tries to process a chain of header, writing them into
// the database if successful.
func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error {
Expand Down
Loading