fix several UTs with racing issues (bnb-chain#5)
* fix several UTs with racing issues

* fix incorrect nonce/balance/codehash issues

cases: TestEIP1559 / TestDeleteThenCreate

* Fix ExecutionSpec tests

mainly root-caused by the balance not being updated into the dirty set correctly.
also fix similar issues with nonce and codehash.

* fix TestBlockchain testcase issue

    TestBlockchain/ValidBlocks/bcStateTests/refundReset.json

Co-authored-by: Sunny <sunny2022.za@gmail.com>
DavidZangNR and sunny2022da committed Aug 13, 2024
1 parent 3c6f2c2 commit e7ee5cc
Showing 11 changed files with 392 additions and 324 deletions.
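
A note on the root cause called out in the commit message: in the parallel StateDB, a write that does not also record its state object in the per-slot dirty set is silently dropped when the slot is merged back into the main DB. The sketch below illustrates that bug class with toy types (address, slotDB, and the helpers are hypothetical stand-ins, not the repository's actual StateDB):

package main

import (
    "fmt"
    "math/big"
)

type address [20]byte

// slotDB is a toy stand-in for a per-transaction slot database.
type slotDB struct {
    balances map[address]*big.Int
    dirty    map[address]struct{} // objects that must be merged back into the main DB
}

func (s *slotDB) setBalance(a address, amount *big.Int) {
    s.balances[a] = amount
    s.dirty[a] = struct{}{} // the fix: every balance write marks the object dirty
}

func (s *slotDB) mergeInto(mainDB map[address]*big.Int) {
    for a := range s.dirty { // only dirty objects survive the merge
        mainDB[a] = s.balances[a]
    }
}

func main() {
    slot := &slotDB{balances: map[address]*big.Int{}, dirty: map[address]struct{}{}}
    mainDB := map[address]*big.Int{}
    var coinbase address
    slot.setBalance(coinbase, big.NewInt(42))
    slot.mergeInto(mainDB)
    fmt.Println("merged balance:", mainDB[coinbase]) // 42; without the dirty mark it would be <nil>
}

The same rule applies to nonce and code hash, which is what the TestEIP1559 / TestDeleteThenCreate fixes above describe.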
7 changes: 3 additions & 4 deletions core/block_validator.go
@@ -195,10 +195,9 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
func() error {
// Validate the state root against the received state root and throw
// an error if they don't match.
// @TODO shall we disable it?
//if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
// return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
//}
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
}
return nil
},
}
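The hunk above re-enables the intermediate-root validation that this fork had commented out: after parallel execution, the locally computed state root must still match the root in the received header. Pulled out as a standalone helper it would look like the sketch below (checkRoot is a hypothetical name; IntermediateRoot, Error, and header.Root are the upstream go-ethereum APIs already visible in the diff):

import (
    "fmt"

    "github.com/ethereum/go-ethereum/core/state"
    "github.com/ethereum/go-ethereum/core/types"
)

// checkRoot recomputes the state root locally and compares it to the
// root claimed by the block header.
func checkRoot(header *types.Header, statedb *state.StateDB, eip158 bool) error {
    if root := statedb.IntermediateRoot(eip158); header.Root != root {
        return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
    }
    return nil
}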
20 changes: 20 additions & 0 deletions core/blockchain.go
@@ -2002,8 +2002,28 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
if !setHead {
// Don't set the head, only insert the block
err = bc.writeBlockWithState(block, receipts, statedb)
if false {
fmt.Printf("Dav -- After writeBlockWithState: %d check balance\n", block.NumberU64())
actual := statedb.GetBalance(block.Coinbase())
fmt.Printf("Dav -- AfterwriteBlockWithState: %d balance: %d\n", block.NumberU64(), actual.Uint64())
}
} else {
status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false)
if false {
fmt.Printf("Dav -- After writeBlockAndSetHead: %d check balance\n", block.NumberU64())
actual := statedb.GetBalance(block.Coinbase())
fmt.Printf("Dav -- writeBlockAndSetHead: %d balance: %d\n", block.NumberU64(), actual.Uint64())

s, _ := bc.State()
bk := bc.CurrentBlock()
fmt.Printf("Dav -- writeBlockAndSetHead - currentBlock: %d root: %s\n", bk.Number.Uint64(), bk.Root)
obj, _ := s.GetStateObjectFromSnapshotOrTrie(block.Coinbase())

//obj, _ := statedb.GetStateObjectFromSnapshotOrTrie(block.Coinbase())

fmt.Printf("Dav -- writeBlockAndSetHead: %d obj from snap or trie: %p\n", block.NumberU64(), obj)

}
}
followupInterrupt.Store(true)
if err != nil {
58 changes: 32 additions & 26 deletions core/parallel_state_processor.go
@@ -30,7 +30,7 @@ type ParallelStateProcessor struct {
slotState []*SlotState // idle, or pending messages
allTxReqs []*ParallelTxRequest
txResultChan chan *ParallelTxResult // to notify dispatcher that a tx is done
mergedTxIndex int // the latest finalized tx index, fixme: use Atomic
mergedTxIndex atomic.Int32 // the latest finalized tx index, fixme: use Atomic
pendingConfirmResults map[int][]*ParallelTxResult // tx could be executed several times, with several result to check
unconfirmedResults *sync.Map // this is for stage2 confirm, since pendingConfirmResults can not be accessed in stage2 loop
unconfirmedDBs *sync.Map
@@ -102,7 +102,7 @@ type ParallelTxRequest struct {
curTxChan chan int
systemAddrRedo bool
runnable int32 // 0: not runnable, 1: runnable
executedNum int32
executedNum atomic.Int32
retryNum int32
}

@@ -148,7 +148,7 @@ func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) {
if txNum == 0 {
return
}
p.mergedTxIndex = -1
p.mergedTxIndex.Store(-1)
p.debugConflictRedoNum = 0
p.inConfirmStage2 = false

@@ -251,8 +251,8 @@ func (p *ParallelStateProcessor) switchSlot(slotIndex int) {
}

func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxRequest) *ParallelTxResult {
atomic.AddInt32(&txReq.executedNum, 1)
slotDB := state.NewSlotDB(txReq.baseStateDB, txReq.txIndex, p.mergedTxIndex, p.unconfirmedDBs)
execNum := txReq.executedNum.Add(1)
slotDB := state.NewSlotDB(txReq.baseStateDB, txReq.txIndex, int(p.mergedTxIndex.Load()), p.unconfirmedDBs)

blockContext := NewEVMBlockContext(txReq.block.Header(), p.bc, nil, p.config, slotDB) // can share blockContext within a block for efficiency
txContext := NewEVMTxContext(txReq.msg)
@@ -274,7 +274,7 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR

evm, result, err := applyTransactionStageExecution(txReq.msg, gpSlot, slotDB, vmenv)
txResult := ParallelTxResult{
executedIndex: atomic.LoadInt32(&txReq.executedNum),
executedIndex: execNum,
slotIndex: slotIndex,
txReq: txReq,
receipt: nil, // receipt is generated in finalize stage
@@ -304,7 +304,7 @@ func (p *ParallelStateProcessor) executeInSlot(slotIndex int, txReq *ParallelTxR
// to confirm a serial TxResults with same txIndex
func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bool) *ParallelTxResult {
if isStage2 {
if targetTxIndex <= p.mergedTxIndex+1 {
if targetTxIndex <= int(p.mergedTxIndex.Load())+1 {
// `p.mergedTxIndex+1` is the one to be merged,
// in stage2, we do likely conflict check, for these not their turn.
return nil
@@ -327,7 +327,7 @@ func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bo
if atomic.LoadInt32(&targetResult.txReq.runnable) == 1 {
return nil
}
if targetResult.executedIndex < atomic.LoadInt32(&targetResult.txReq.executedNum) {
if targetResult.executedIndex < targetResult.txReq.executedNum.Load() {
// skip the intermediate result that is not the latest.
return nil
}
@@ -358,22 +358,24 @@ func (p *ParallelStateProcessor) toConfirmTxIndex(targetTxIndex int, isStage2 bo
blockTxCount := targetResult.txReq.block.Transactions().Len()
// This means that the tx has been executed more than blockTxCount times, so it exits with the error.
// TODO-dav: p.mergedTxIndex+2 may be more reasonable? - this is buggy for expected exit
if targetResult.txReq.txIndex == p.mergedTxIndex+1 {
if targetResult.txReq.txIndex == int(p.mergedTxIndex.Load())+1 {
// txReq is the next to merge
if atomic.LoadInt32(&targetResult.txReq.retryNum) <= int32(blockTxCount)+3000 {
atomic.AddInt32(&targetResult.txReq.retryNum, 1)
// conflict retry
} else {
// retry 100 times and still conflict, either the tx is expected to be wrong, or something wrong.
// retry many times and still conflict, either the tx is expected to be wrong, or something wrong.
if targetResult.err != nil {
fmt.Printf("!!!!!!!!!!! Parallel execution exited with error!!!!!, txIndex:%d, err: %v\n", targetResult.txReq.txIndex, targetResult.err)
if true { // TODO: delete the printf
fmt.Printf("!!!!!!!!!!! Parallel execution exited with error!!!!!, txIndex:%d, err: %v\n", targetResult.txReq.txIndex, targetResult.err)
}
return targetResult
} else {
// abnormal exit with conflict error, need check the parallel algorithm
targetResult.err = ErrParallelUnexpectedConflict

fmt.Printf("!!!!!!!!!!! Parallel execution exited unexpected conflict!!!!!, txIndex:%d\n", targetResult.txReq.txIndex)

if true {
fmt.Printf("!!!!!!!!!!! Parallel execution exited unexpected conflict!!!!!, txIndex:%d\n", targetResult.txReq.txIndex)
}
return targetResult
}
}
@@ -443,7 +445,7 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) {

interrupted := false
for _, txReq := range curSlot.pendingTxReqList {
if txReq.txIndex <= p.mergedTxIndex {
if txReq.txIndex <= int(p.mergedTxIndex.Load()) {
continue
}

@@ -471,7 +473,7 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) {
// as long as the TxReq is runnable, we steal it, mark it as stolen
for _, stealTxReq := range p.allTxReqs {
// fmt.Printf("Dav -- stealLoop, handle TxREQ: %d\n", stealTxReq.txIndex)
if stealTxReq.txIndex <= p.mergedTxIndex {
if stealTxReq.txIndex <= int(p.mergedTxIndex.Load()) {
// fmt.Printf("Dav -- stealLoop, - txReq.txIndex <= p.mergedTxIndex - TxREQ: %d\n", stealTxReq.txIndex)
continue
}
@@ -518,7 +520,7 @@ func (p *ParallelStateProcessor) runConfirmStage2Loop() {
// if lucky, it is the Tx's turn, we will do conflict check with WBNB makeup
// otherwise, do conflict check without WBNB makeup, but we will ignore WBNB's balance conflict.
// throw these likely conflicted tx back to re-execute
startTxIndex := p.mergedTxIndex + 2 // stage 2's will start from the next target merge index
startTxIndex := int(p.mergedTxIndex.Load()) + 2 // stage 2's will start from the next target merge index
endTxIndex := startTxIndex + stage2CheckNumber
txSize := len(p.allTxReqs)
if endTxIndex > (txSize - 1) {
@@ -538,16 +540,16 @@
}

func (p *ParallelStateProcessor) handleTxResults() *ParallelTxResult {
confirmedResult := p.toConfirmTxIndex(p.mergedTxIndex+1, false)
confirmedResult := p.toConfirmTxIndex(int(p.mergedTxIndex.Load())+1, false)
if confirmedResult == nil {
return nil
}
// schedule stage 2 when new Tx has been merged, schedule once and ASAP
// stage 2,if all tx have been executed at least once, and its result has been received.
// in Stage 2, we will run check when main DB is advanced, i.e., new Tx result has been merged.
if p.inConfirmStage2 && p.mergedTxIndex >= p.nextStage2TxIndex {
p.nextStage2TxIndex = p.mergedTxIndex + stage2CheckNumber
p.confirmStage2Chan <- p.mergedTxIndex
if p.inConfirmStage2 && int(p.mergedTxIndex.Load()) >= p.nextStage2TxIndex {
p.nextStage2TxIndex = int(p.mergedTxIndex.Load()) + stage2CheckNumber
p.confirmStage2Chan <- int(p.mergedTxIndex.Load())
}
return confirmedResult
}
@@ -581,11 +583,11 @@ func (p *ParallelStateProcessor) confirmTxResults(statedb *state.StateDB, gp *Ga
// merge slotDB into mainDB
statedb.MergeSlotDB(result.slotDB, result.receipt, resultTxIndex)

if resultTxIndex != p.mergedTxIndex+1 {
if resultTxIndex != int(p.mergedTxIndex.Load())+1 {
log.Error("ProcessParallel tx result out of order", "resultTxIndex", resultTxIndex,
"p.mergedTxIndex", p.mergedTxIndex)
"p.mergedTxIndex", p.mergedTxIndex.Load())
}
p.mergedTxIndex = resultTxIndex
p.mergedTxIndex.Store(int32(resultTxIndex))

return result
}
@@ -628,6 +630,8 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
misc.ApplyPreContractHardFork(statedb)
}

misc.EnsureCreate2Deployer(p.config, block.Time(), statedb)

txNum := len(block.Transactions())
p.resetState(txNum, statedb)

@@ -645,6 +649,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
ProcessBeaconBlockRoot(*beaconRoot, vmenv, statedb)
}

statedb.MarkFullProcessed()
// var txReqs []*ParallelTxRequest
for i, tx := range block.Transactions() {
// can be moved it into slot for efficiency, but signer is not concurrent safe
@@ -670,9 +675,9 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
curTxChan: make(chan int, 1),
systemAddrRedo: false, // set to true, when systemAddr access is detected.
runnable: 1, // 0: not runnable, 1: runnable
executedNum: 0,
retryNum: 0,
}
txReq.executedNum.Store(0)
p.allTxReqs = append(p.allTxReqs, txReq)
}
// set up stage2 enter criteria
@@ -683,6 +688,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
p.targetStage2Count = p.targetStage2Count - stage2AheadNum
}

// From now on, entering parallel execution.
p.doStaticDispatch(p.allTxReqs) // todo: put txReqs in unit?

// after static dispatch, we notify the slot to work.
@@ -698,7 +704,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
}
unconfirmedResult := <-p.txResultChan
unconfirmedTxIndex := unconfirmedResult.txReq.txIndex
if unconfirmedTxIndex <= p.mergedTxIndex {
if unconfirmedTxIndex <= int(p.mergedTxIndex.Load()) {
// log.Warn("drop merged txReq", "unconfirmedTxIndex", unconfirmedTxIndex, "p.mergedTxIndex", p.mergedTxIndex)
continue
}
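The thread running through this file: mergedTxIndex and executedNum were plain integers written by the dispatcher while slot goroutines read them, which is presumably the data race the UTs tripped over; the fix moves both to Go 1.19's typed atomics. A self-contained illustration of the pattern (toy program, not the processor itself):

package main

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
)

func main() {
    var mergedTxIndex atomic.Int32
    mergedTxIndex.Store(-1) // -1: nothing merged yet, as in resetState above

    var wg sync.WaitGroup
    wg.Add(1)
    go func() { // dispatcher: advances the merged index as results are confirmed
        defer wg.Done()
        for i := int32(0); i < 100; i++ {
            mergedTxIndex.Store(i)
        }
    }()
    for s := 0; s < 4; s++ {
        wg.Add(1)
        go func() { // slot workers: poll the index to decide whether a tx is stale
            defer wg.Done()
            for mergedTxIndex.Load() < 99 {
                runtime.Gosched() // yield; spinning purely for demonstration
            }
        }()
    }
    wg.Wait()
    fmt.Println("final merged index:", mergedTxIndex.Load())
}

With a plain int in place of atomic.Int32, go run -race reports a data race on this program; with the typed atomic, every Load observes a well-defined Store, which is the property the confirm/merge loop above relies on.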
6 changes: 0 additions & 6 deletions core/state/journal.go
@@ -17,7 +17,6 @@
package state

import (
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
@@ -176,11 +175,6 @@ func (ch createObjectChange) dirtied() *common.Address {
func (ch resetObjectChange) revert(dber StateDBer) {
s := dber.getBaseStateDB()
if s.parallel.isSlotDB {

if ch.prev.address.Hex() == "0x6295eE1B4F6dD65047762F924Ecd367c17eaBf8f" {
fmt.Printf("Dav - revert() - set dirtiedStateObjectsInSlot[%s] = obj, obj.codehash: %s\n",
ch.prev.address, common.Bytes2Hex(ch.prev.CodeHash()))
}
// ch.prev must be from dirtiedStateObjectsInSlot, put it back
s.parallel.dirtiedStateObjectsInSlot[ch.prev.address] = ch.prev
} else {
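What survives in this hunk is the slot-DB revert path: the previous object must go back into dirtiedStateObjectsInSlot so a later merge still sees it; only the hard-coded debug printf is deleted. A toy sketch of that journaling rule, reusing the hypothetical address and slotDB types from the sketch near the top of the page:

// journalEntry records the previous value of one mutated field so the
// change can be undone.
type journalEntry struct {
    a    address
    prev *big.Int
}

// revert restores the previous value and re-marks the object dirty --
// dropping the dirty mark here would make the merge lose the restored
// value, the same bug class the commit message describes.
func (e journalEntry) revert(s *slotDB) {
    s.balances[e.a] = e.prev
    s.dirty[e.a] = struct{}{}
}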
