diff --git a/core/block_validator.go b/core/block_validator.go
index 061e69b8d2..551fe2d58b 100644
--- a/core/block_validator.go
+++ b/core/block_validator.go
@@ -157,6 +157,12 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
 		return ancestorErr
 	}
+	// TODO(galaio): add more TxDAG hash when TxDAG in consensus, txDAG check here
+	if len(block.TxDAG()) > 0 {
+		if _, err := types.DecodeTxDAG(block.TxDAG()); err != nil {
+			return errors.New("wrong TxDAG in block body")
+		}
+	}
 	return nil
 }
diff --git a/core/blockchain.go b/core/blockchain.go
index 204278af2b..314bea901e 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -1924,6 +1924,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
 			return it.index, err
 		}
+		// TODO(galaio): use txDAG in some accelerate scenarios.
+		if len(block.TxDAG()) > 0 {
+			txDAG, err := types.DecodeTxDAG(block.TxDAG())
+			if err != nil {
+				return it.index, err
+			}
+			log.Info("Insert chain", "block", block.NumberU64(), "txDAG", txDAG)
+		}
+
 		// Enable prefetching to pull in trie node paths while processing transactions
 		statedb.StartPrefetcher("chain")
 		activeState = statedb
diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go
index 9dcf19a5b4..986b032bbe 100644
--- a/core/chain_makers_test.go
+++ b/core/chain_makers_test.go
@@ -198,7 +198,7 @@ func ExampleGenerateChain() {
 		db    = rawdb.NewMemoryDatabase()
 		genDb = rawdb.NewMemoryDatabase()
 	)
- 
+
 	// Ensure that key1 has some funds in the genesis block.
 	gspec := &Genesis{
 		Config: &params.ChainConfig{HomesteadBlock: new(big.Int)},
diff --git a/core/state/journal.go b/core/state/journal.go
index cf30b0995b..9895de7058 100644
--- a/core/state/journal.go
+++ b/core/state/journal.go
@@ -18,8 +18,9 @@ package state
 
 import (
 	"fmt"
-	"github.com/ethereum/go-ethereum/common"
 	"math/big"
+
+	"github.com/ethereum/go-ethereum/common"
 )
 
 // journalEntry is a modification entry in the state change journal that can be
@@ -189,7 +190,7 @@ func (ch resetObjectChange) revert(dber StateDBer) {
 	if !ch.prevdestruct {
 		s.snapParallelLock.Lock()
-		delete(s.stateObjectsDestruct, ch.prev.address)
+		s.deleteStateObjectsDestruct(ch.prev.address)
 		s.snapParallelLock.Unlock()
 	}
 	if ch.prevAccount != nil {
diff --git a/core/state/state_object.go b/core/state/state_object.go
index bcac425dd4..8977d273f3 100644
--- a/core/state/state_object.go
+++ b/core/state/state_object.go
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"github.com/ethereum/go-ethereum/core/opcodeCompiler/compiler"
+	"golang.org/x/exp/slices"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
@@ -156,6 +157,11 @@ type stateObject struct {
 	origin *types.StateAccount // Account original data without any change applied, nil means it was not existent
 	data   types.StateAccount  // Account data with all mutations applied in the scope of block
 
+	// dirty account state
+	dirtyBalance  *big.Int
+	dirtyNonce    *uint64
+	dirtyCodeHash []byte
+
 	// Write caches.
 	trie Trie // storage trie, which becomes non-nil on first access
 	code Code // contract bytecode, which gets set when code is loaded
@@ -242,7 +248,7 @@ func newObject(dbItf StateDBer, isParallel bool, address common.Address, acct *t
 	if acct == nil {
 		acct = types.NewEmptyStateAccount()
 	}
-	return &stateObject{
+	s := &stateObject{
 		db:      db,
 		dbItf:   dbItf,
 		address: address,
@@ -255,6 +261,15 @@ func newObject(dbItf StateDBer, isParallel bool, address common.Address, acct *t
 		dirtyStorage: newStorage(isParallel),
 		created:      created,
 	}
+
+	// dirty data when creating a new account
+	if acct == nil {
+		s.dirtyBalance = new(big.Int).Set(acct.Balance)
+		s.dirtyNonce = new(uint64)
+		*s.dirtyNonce = acct.Nonce
+		s.dirtyCodeHash = acct.CodeHash
+	}
+	return s
 }
 
 // EncodeRLP implements rlp.Encoder.
@@ -345,7 +360,7 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
 	// 2) we don't have new values, and can deliver empty response back
 	//if _, destructed := s.db.stateObjectsDestruct[s.address]; destructed {
 	s.db.snapParallelLock.RLock()
-	if _, destructed := s.db.stateObjectsDestruct[s.address]; destructed { // fixme: use sync.Map, instead of RWMutex?
+	if _, destructed := s.db.queryStateObjectsDestruct(s.address); destructed { // fixme: use sync.Map, instead of RWMutex?
 		s.db.snapParallelLock.RUnlock()
 		return common.Hash{}
 	}
@@ -438,6 +453,18 @@ func (s *stateObject) finalise(prefetch bool) {
 		}
 		return true
 	})
+	if s.dirtyNonce != nil {
+		s.data.Nonce = *s.dirtyNonce
+		s.dirtyNonce = nil
+	}
+	if s.dirtyBalance != nil {
+		s.data.Balance = s.dirtyBalance
+		s.dirtyBalance = nil
+	}
+	if s.dirtyCodeHash != nil {
+		s.data.CodeHash = s.dirtyCodeHash
+		s.dirtyCodeHash = nil
+	}
 	if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
 		s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch)
 	}
@@ -446,6 +473,28 @@ func (s *stateObject) finalise(prefetch bool) {
 	}
 }
 
+func (s *stateObject) finaliseRWSet() {
+	s.dirtyStorage.Range(func(key, value interface{}) bool {
+		// there may be some unclean dirtyStorage left by previously reverted txs that skipped finalise,
+		// so add a new rule: if the value has not changed, skip it
+		if value == s.GetCommittedState(key.(common.Hash)) {
+			return true
+		}
+		s.db.RecordWrite(types.StorageStateKey(s.address, key.(common.Hash)), value.(common.Hash))
+		return true
+	})
+
+	if s.dirtyNonce != nil && *s.dirtyNonce != s.data.Nonce {
+		s.db.RecordWrite(types.AccountStateKey(s.address, types.AccountNonce), *s.dirtyNonce)
+	}
+	if s.dirtyBalance != nil && s.dirtyBalance.Cmp(s.data.Balance) != 0 {
+		s.db.RecordWrite(types.AccountStateKey(s.address, types.AccountBalance), new(big.Int).Set(s.dirtyBalance))
+	}
+	if s.dirtyCodeHash != nil && !slices.Equal(s.dirtyCodeHash, s.data.CodeHash) {
+		s.db.RecordWrite(types.AccountStateKey(s.address, types.AccountCodeHash), s.dirtyCodeHash)
+	}
+}
+
 // updateTrie is responsible for persisting cached storage changes into the
 // object's storage trie. In case the storage trie is not yet loaded, this
 // function will load the trie automatically.
If any issues arise during the @@ -644,13 +693,13 @@ func (s *stateObject) SubBalance(amount *big.Int) { func (s *stateObject) SetBalance(amount *big.Int) { s.db.journal.append(balanceChange{ account: &s.address, - prev: new(big.Int).Set(s.data.Balance), + prev: new(big.Int).Set(s.Balance()), }) s.setBalance(amount) } func (s *stateObject) setBalance(amount *big.Int) { - s.data.Balance = amount + s.dirtyBalance = amount } // ReturnGas Return the gas back to the origin. Used by the Virtual machine or Closures @@ -715,6 +764,17 @@ func (s *stateObject) deepCopy(db *StateDB) *stateObject { obj.selfDestructed = s.selfDestructed obj.dirtyCode = s.dirtyCode obj.deleted = s.deleted + + // dirty states + if s.dirtyNonce != nil { + obj.dirtyNonce = new(uint64) + *obj.dirtyNonce = *s.dirtyNonce + } + if s.dirtyBalance != nil { + obj.dirtyBalance = new(big.Int).Set(s.dirtyBalance) + } + obj.dirtyCodeHash = s.dirtyCodeHash + return obj } @@ -784,7 +844,7 @@ func (s *stateObject) SetCode(codeHash common.Hash, code []byte) { func (s *stateObject) setCode(codeHash common.Hash, code []byte) { s.code = code - s.data.CodeHash = codeHash[:] + s.dirtyCodeHash = codeHash[:] s.dirtyCode = true compiler.GenOrLoadOptimizedCode(codeHash, s.code) } @@ -799,18 +859,27 @@ func (s *stateObject) SetNonce(nonce uint64) { } func (s *stateObject) setNonce(nonce uint64) { - s.data.Nonce = nonce + s.dirtyNonce = &nonce } func (s *stateObject) CodeHash() []byte { + if len(s.dirtyCodeHash) > 0 { + return s.dirtyCodeHash + } return s.data.CodeHash } func (s *stateObject) Balance() *big.Int { + if s.dirtyBalance != nil { + return s.dirtyBalance + } return s.data.Balance } func (s *stateObject) Nonce() uint64 { + if s.dirtyNonce != nil { + return *s.dirtyNonce + } return s.data.Nonce } diff --git a/core/state/statedb.go b/core/state/statedb.go index 024b56e35f..c397ac5134 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -19,6 +19,7 @@ package state import ( "bytes" + "errors" "fmt" "math/big" "runtime" @@ -208,10 +209,11 @@ type StateDB struct { // This map holds 'live' objects, which will get modified while processing // a state transition. - stateObjects map[common.Address]*stateObject - stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie - stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution - stateObjectsDestruct map[common.Address]*types.StateAccount // State objects destructed in the block along with its previous value + stateObjects map[common.Address]*stateObject + stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie + stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution + stateObjectsDestruct map[common.Address]*types.StateAccount // State objects destructed in the block along with its previous value + stateObjectsDestructDirty map[common.Address]*types.StateAccount // DB error. // State objects are used by the consensus core and VM which are @@ -231,6 +233,11 @@ type StateDB struct { logs map[common.Hash][]*types.Log logSize uint + // parallel EVM related + rwSet *types.RWSet + mvStates *types.MVStates + es *types.ExeStat + // Preimages occurred seen by VM in the scope of block. 
preimages map[common.Hash][]byte @@ -285,24 +292,25 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) return nil, err } sdb := &StateDB{ - db: db, - trie: tr, - originalRoot: root, - snaps: snaps, - accounts: make(map[common.Hash][]byte), - storages: make(map[common.Hash]map[common.Hash][]byte), - accountsOrigin: make(map[common.Address][]byte), - storagesOrigin: make(map[common.Address]map[common.Hash][]byte), - stateObjects: make(map[common.Address]*stateObject), - stateObjectsPending: make(map[common.Address]struct{}), - stateObjectsDirty: make(map[common.Address]struct{}), - stateObjectsDestruct: make(map[common.Address]*types.StateAccount), - logs: make(map[common.Hash][]*types.Log), - preimages: make(map[common.Hash][]byte), - journal: newJournal(), - accessList: newAccessList(), - transientStorage: newTransientStorage(), - hasher: crypto.NewKeccakState(), + db: db, + trie: tr, + originalRoot: root, + snaps: snaps, + accounts: make(map[common.Hash][]byte), + storages: make(map[common.Hash]map[common.Hash][]byte), + accountsOrigin: make(map[common.Address][]byte), + storagesOrigin: make(map[common.Address]map[common.Hash][]byte), + stateObjects: make(map[common.Address]*stateObject), + stateObjectsPending: make(map[common.Address]struct{}), + stateObjectsDirty: make(map[common.Address]struct{}), + stateObjectsDestruct: make(map[common.Address]*types.StateAccount), + stateObjectsDestructDirty: make(map[common.Address]*types.StateAccount, defaultNumOfSlots), + logs: make(map[common.Hash][]*types.Log), + preimages: make(map[common.Hash][]byte), + journal: newJournal(), + accessList: newAccessList(), + transientStorage: newTransientStorage(), + hasher: crypto.NewKeccakState(), parallel: ParallelState{ SlotIndex: -1, @@ -477,7 +485,10 @@ func (s *StateDB) Empty(addr common.Address) bool { } // GetBalance retrieves the balance from the given address or 0 if object not found -func (s *StateDB) GetBalance(addr common.Address) *big.Int { +func (s *StateDB) GetBalance(addr common.Address) (ret *big.Int) { + defer func() { + s.RecordRead(types.AccountStateKey(addr, types.AccountBalance), ret) + }() object := s.getStateObject(addr) if object != nil { return object.Balance() @@ -490,7 +501,10 @@ func (s *StateDB) GetBalanceOpCode(addr common.Address) *big.Int { } // GetNonce retrieves the nonce from the given address or 0 if object not found -func (s *StateDB) GetNonce(addr common.Address) uint64 { +func (s *StateDB) GetNonce(addr common.Address) (ret uint64) { + defer func() { + s.RecordRead(types.AccountStateKey(addr, types.AccountNonce), ret) + }() object := s.getStateObject(addr) if object != nil { return object.Nonce() @@ -519,6 +533,9 @@ func (s *StateDB) BaseTxIndex() int { } func (s *StateDB) GetCode(addr common.Address) []byte { + defer func() { + s.RecordRead(types.AccountStateKey(addr, types.AccountCodeHash), s.GetCodeHash(addr)) + }() object := s.getStateObject(addr) if object != nil { return object.Code() @@ -527,6 +544,9 @@ func (s *StateDB) GetCode(addr common.Address) []byte { } func (s *StateDB) GetCodeSize(addr common.Address) int { + defer func() { + s.RecordRead(types.AccountStateKey(addr, types.AccountCodeHash), s.GetCodeHash(addr)) + }() object := s.getStateObject(addr) if object != nil { return object.CodeSize() @@ -538,7 +558,10 @@ func (s *StateDB) GetCodeSize(addr common.Address) int { // - common.Hash{}: the address does not exist // - emptyCodeHash: the address exist, but code is empty // - others: the address exist, and code is not 
empty -func (s *StateDB) GetCodeHash(addr common.Address) common.Hash { +func (s *StateDB) GetCodeHash(addr common.Address) (ret common.Hash) { + defer func() { + s.RecordRead(types.AccountStateKey(addr, types.AccountCodeHash), ret.Bytes()) + }() object := s.getStateObject(addr) if object == nil { return common.Hash{} @@ -547,7 +570,10 @@ func (s *StateDB) GetCodeHash(addr common.Address) common.Hash { } // GetState retrieves a value from the given account's storage trie. -func (s *StateDB) GetState(addr common.Address, hash common.Hash) common.Hash { +func (s *StateDB) GetState(addr common.Address, hash common.Hash) (ret common.Hash) { + defer func() { + s.RecordRead(types.StorageStateKey(addr, hash), ret) + }() object := s.getStateObject(addr) if object != nil { return object.GetState(hash) @@ -556,7 +582,10 @@ func (s *StateDB) GetState(addr common.Address, hash common.Hash) common.Hash { } // GetCommittedState retrieves a value from the given account's committed storage trie. -func (s *StateDB) GetCommittedState(addr common.Address, hash common.Hash) common.Hash { +func (s *StateDB) GetCommittedState(addr common.Address, hash common.Hash) (ret common.Hash) { + defer func() { + s.RecordRead(types.StorageStateKey(addr, hash), ret) + }() object := s.getStateObject(addr) if object != nil { return object.GetCommittedState(hash) @@ -585,16 +614,22 @@ func (s *StateDB) HasSelfDestructed(addr common.Address) bool { func (s *StateDB) AddBalance(addr common.Address, amount *big.Int) { object := s.GetOrNewStateObject(addr) if object != nil { + s.RecordRead(types.AccountStateKey(addr, types.AccountBalance), object.Balance()) object.AddBalance(amount) + return } + s.RecordRead(types.AccountStateKey(addr, types.AccountBalance), common.Big0) } // SubBalance subtracts amount from the account associated with addr. func (s *StateDB) SubBalance(addr common.Address, amount *big.Int) { object := s.GetOrNewStateObject(addr) if object != nil { + s.RecordRead(types.AccountStateKey(addr, types.AccountBalance), object.Balance()) object.SubBalance(amount) + return } + s.RecordRead(types.AccountStateKey(addr, types.AccountBalance), common.Big0) } func (s *StateDB) SetBalance(addr common.Address, amount *big.Int) { @@ -637,9 +672,9 @@ func (s *StateDB) SetStorage(addr common.Address, storage map[common.Hash]common // // TODO(rjl493456442) this function should only be supported by 'unwritable' // state and all mutations made should all be discarded afterwards. - if _, ok := s.stateObjectsDestruct[addr]; !ok { + if _, ok := s.queryStateObjectsDestruct(addr); !ok { fmt.Printf("Dav -- setStorage - stateObjectsDestruct[%s] = nil\n", addr) - s.stateObjectsDestruct[addr] = nil + s.tagStateObjectsDestruct(addr, nil) } object := s.GetOrNewStateObject(addr) for k, v := range storage { @@ -663,7 +698,7 @@ func (s *StateDB) SelfDestruct(addr common.Address) { prevbalance: new(big.Int).Set(s.GetBalance(addr)), }) object.markSelfdestructed() - object.data.Balance = new(big.Int) + object.setBalance(new(big.Int)) } func (s *StateDB) Selfdestruct6780(addr common.Address) { @@ -881,6 +916,7 @@ func (s *StateDB) getStateObjectFromSnapshotOrTrie(addr common.Address) (data *t // flag set. This is needed by the state journal to revert to the correct s- // destructed object instead of wiping all knowledge about the state object. 
func (s *StateDB) getDeletedStateObject(addr common.Address) *stateObject { + s.RecordRead(types.AccountStateKey(addr, types.AccountSelf), struct{}{}) // Prefer live objects if any is available if obj, _ := s.getStateObjectFromStateObjects(addr); obj != nil { return obj @@ -945,9 +981,9 @@ func (s *StateDB) createObject(addr common.Address) (newobj *stateObject) { // will be lost. s.snapParallelLock.Lock() // fixme: with new dispatch policy, the ending Tx could running, while the block have processed. - _, prevdestruct := s.stateObjectsDestruct[prev.address] + _, prevdestruct := s.queryStateObjectsDestruct(prev.address) if !prevdestruct { - s.stateObjectsDestruct[prev.address] = prev.origin + s.tagStateObjectsDestruct(prev.address, prev.origin) } // There may be some cached account/storage data already since IntermediateRoot // will be called for each transaction before byzantium fork which will always @@ -1015,23 +1051,24 @@ func (s *StateDB) CopyDoPrefetch() *StateDB { func (s *StateDB) copyInternal(doPrefetch bool) *StateDB { // Copy all the basic fields, initialize the memory ones state := &StateDB{ - db: s.db, - trie: s.db.CopyTrie(s.trie), - originalRoot: s.originalRoot, - accounts: make(map[common.Hash][]byte), - storages: make(map[common.Hash]map[common.Hash][]byte), - accountsOrigin: make(map[common.Address][]byte), - storagesOrigin: make(map[common.Address]map[common.Hash][]byte), - stateObjects: make(map[common.Address]*stateObject, len(s.journal.dirties)), - stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)), - stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)), - stateObjectsDestruct: make(map[common.Address]*types.StateAccount, len(s.stateObjectsDestruct)), - refund: s.refund, - logs: make(map[common.Hash][]*types.Log, len(s.logs)), - logSize: s.logSize, - preimages: make(map[common.Hash][]byte, len(s.preimages)), - journal: newJournal(), - hasher: crypto.NewKeccakState(), + db: s.db, + trie: s.db.CopyTrie(s.trie), + originalRoot: s.originalRoot, + accounts: make(map[common.Hash][]byte), + storages: make(map[common.Hash]map[common.Hash][]byte), + accountsOrigin: make(map[common.Address][]byte), + storagesOrigin: make(map[common.Address]map[common.Hash][]byte), + stateObjects: make(map[common.Address]*stateObject, len(s.journal.dirties)), + stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)), + stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)), + stateObjectsDestruct: make(map[common.Address]*types.StateAccount, len(s.stateObjectsDestruct)), + stateObjectsDestructDirty: make(map[common.Address]*types.StateAccount, len(s.stateObjectsDestructDirty)), + refund: s.refund, + logs: make(map[common.Hash][]*types.Log, len(s.logs)), + logSize: s.logSize, + preimages: make(map[common.Hash][]byte, len(s.preimages)), + journal: newJournal(), + hasher: crypto.NewKeccakState(), // In order for the block producer to be able to use and make additions // to the snapshot tree, we need to copy that as well. Otherwise, any @@ -1081,6 +1118,9 @@ func (s *StateDB) copyInternal(doPrefetch bool) *StateDB { // fmt.Printf("Dav -- copyInternal - stateObjectsDestruct[%s] = (%p) : %v \n", addr, value, value) state.stateObjectsDestruct[addr] = value } + for addr, value := range s.stateObjectsDestructDirty { + state.stateObjectsDestructDirty[addr] = value + } // Deep copy the state changes made in the scope of block // along with their original values. 
state.accounts = copySet(s.accounts) @@ -1116,6 +1156,12 @@ func (s *StateDB) copyInternal(doPrefetch bool) *StateDB { if s.prefetcher != nil { state.prefetcher = s.prefetcher.copy() } + + // parallel EVM related + if s.mvStates != nil { + state.mvStates = s.mvStates + } + return state } @@ -1428,6 +1474,11 @@ func (s *StateDB) GetRefund() uint64 { func (s *StateDB) Finalise(deleteEmptyObjects bool) { addressesToPrefetch := make([][]byte, 0, len(s.journal.dirties)) + // finalise stateObjectsDestruct + for addr, acc := range s.stateObjectsDestructDirty { + s.stateObjectsDestruct[addr] = acc + } + s.stateObjectsDestructDirty = make(map[common.Address]*types.StateAccount) for addr := range s.journal.dirties { var obj *stateObject var exist bool @@ -2245,6 +2296,129 @@ func (s *StateDB) GetSnap() snapshot.Snapshot { return s.snap } +func (s *StateDB) BeforeTxTransition() { + log.Debug("BeforeTxTransition", "mvStates", s.mvStates == nil, "rwSet", s.rwSet == nil) + if s.mvStates == nil { + return + } + s.rwSet = types.NewRWSet(types.StateVersion{ + TxIndex: s.txIndex, + }) +} + +func (s *StateDB) BeginTxStat(index int) { + if s.mvStates == nil { + return + } + s.es = types.NewExeStat(index).Begin() +} + +func (s *StateDB) StopTxStat(usedGas uint64) { + if s.mvStates == nil { + return + } + // record stat first + if s.es != nil { + s.es.Done().WithGas(usedGas).WithRead(len(s.rwSet.ReadSet())) + } +} + +func (s *StateDB) RecordRead(key types.RWKey, val interface{}) { + if s.mvStates == nil || s.rwSet == nil { + return + } + // TODO: read from MVStates, record with ver + s.rwSet.RecordRead(key, types.StateVersion{ + TxIndex: -1, + }, val) +} + +func (s *StateDB) RecordWrite(key types.RWKey, val interface{}) { + if s.mvStates == nil || s.rwSet == nil { + return + } + s.rwSet.RecordWrite(key, val) +} + +func (s *StateDB) ResetMVStates(txCount int) { + s.mvStates = types.NewMVStates(txCount) + s.rwSet = nil +} + +func (s *StateDB) FinaliseRWSet() error { + if s.mvStates == nil || s.rwSet == nil { + return nil + } + // finalise stateObjectsDestruct + for addr, acc := range s.stateObjectsDestructDirty { + s.stateObjectsDestruct[addr] = acc + s.RecordWrite(types.AccountStateKey(addr, types.AccountSuicide), struct{}{}) + } + for addr := range s.journal.dirties { + obj, exist := s.stateObjects[addr] + if !exist { + continue + } + if obj.selfDestructed || obj.empty() { + // We need to maintain account deletions explicitly (will remain + // set indefinitely). Note only the first occurred self-destruct + // event is tracked. 
+ if _, ok := s.stateObjectsDestruct[obj.address]; !ok { + log.Debug("FinaliseRWSet find Destruct", "tx", s.txIndex, "addr", addr, "selfDestructed", obj.selfDestructed) + s.RecordWrite(types.AccountStateKey(addr, types.AccountSuicide), struct{}{}) + } + } else { + // finalise account & storages + obj.finaliseRWSet() + } + } + ver := types.StateVersion{ + TxIndex: s.txIndex, + } + if ver != s.rwSet.Version() { + return errors.New("you finalize a wrong ver of RWSet") + } + + return s.mvStates.FulfillRWSet(s.rwSet, s.es) +} + +func (s *StateDB) queryStateObjectsDestruct(addr common.Address) (*types.StateAccount, bool) { + if acc, ok := s.stateObjectsDestructDirty[addr]; ok { + return acc, ok + } + acc, ok := s.stateObjectsDestruct[addr] + return acc, ok +} + +func (s *StateDB) tagStateObjectsDestruct(addr common.Address, acc *types.StateAccount) { + s.stateObjectsDestructDirty[addr] = acc +} + +func (s *StateDB) deleteStateObjectsDestruct(addr common.Address) { + delete(s.stateObjectsDestructDirty, addr) +} + +func (s *StateDB) MVStates2TxDAG() (types.TxDAG, map[int]*types.ExeStat) { + if s.mvStates == nil { + return types.NewEmptyTxDAG(), nil + } + + return s.mvStates.ResolveTxDAG(), s.mvStates.Stats() +} + +func (s *StateDB) MVStates() *types.MVStates { + return s.mvStates +} + +func (s *StateDB) RecordSystemTxRWSet(index int) { + if s.mvStates == nil { + return + } + s.mvStates.FulfillRWSet(types.NewRWSet(types.StateVersion{ + TxIndex: index, + }).WithSerialFlag(), types.NewExeStat(index).WithSerialFlag()) +} + // copySet returns a deep-copied set. func copySet[k comparable](set map[k][]byte) map[k][]byte { copied := make(map[k][]byte, len(set)) diff --git a/core/state_processor.go b/core/state_processor.go index 92b9013f14..f887b80955 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" ) @@ -90,8 +91,10 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg ProcessBeaconBlockRoot(*beaconRoot, vmenv, statedb) } statedb.MarkFullProcessed() + statedb.ResetMVStates(len(block.Transactions())) // Iterate over and process the individual transactions for i, tx := range block.Transactions() { + statedb.BeginTxStat(i) start := time.Now() msg, err := TransactionToMessage(tx, signer, header.BaseFee) if err != nil { @@ -108,6 +111,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg if metrics.EnabledExpensive { processTxTimer.UpdateSince(start) } + statedb.StopTxStat(receipt.GasUsed) } // Fail if Shanghai not enabled and len(withdrawals) is non-zero. withdrawals := block.Withdrawals() @@ -117,6 +121,13 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg // Finalize the block, applying any consensus engine specific extras (e.g. 
block rewards) p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles(), withdrawals) + // TODO(galaio): append dag into block body, TxDAGPerformance will print metrics when profile is enabled + // compare input TxDAG when it enable in consensus + dag, exrStats := statedb.MVStates2TxDAG() + types.EvaluateTxDAGPerformance(dag, exrStats) + //fmt.Print(types.EvaluateTxDAGPerformance(dag, exrStats)) + log.Info("Process result", "block", block.NumberU64(), "txDAG", dag) + return receipts, allLogs, *usedGas, nil } diff --git a/core/state_transition.go b/core/state_transition.go index 037be4e4a0..4a95d0c5f1 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -428,6 +428,8 @@ func (st *StateTransition) TransitionDb() (*ExecutionResult, error) { } func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) { + // start record rw set in here + st.state.BeforeTxTransition() // First check this message satisfies all consensus rules before // applying the message. The rules include these clauses // @@ -511,10 +513,16 @@ func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) { ReturnData: ret, }, nil } + // stop record rw set in here, skip gas fee distribution + if err := st.state.FinaliseRWSet(); err != nil { + return nil, err + } + // Note for deposit tx there is no ETH refunded for unused gas, but that's taken care of by the fact that gasPrice // is always 0 for deposit tx. So calling refundGas will ensure the gasUsed accounting is correct without actually // changing the sender's balance var gasRefund uint64 + if !rules.IsLondon { // Before EIP-3529: refunds were capped to gasUsed / 2 gasRefund = st.refundGas(params.RefundQuotient) diff --git a/core/types/block.go b/core/types/block.go index 0e0b621974..f4da10994a 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -171,6 +171,8 @@ type Body struct { Transactions []*Transaction Uncles []*Header Withdrawals []*Withdrawal `rlp:"optional"` + // TODO: add TxDAG in block body + //TxDAG []byte `rlp:"optional"` } // Block represents an Ethereum block. @@ -195,6 +197,8 @@ type Block struct { uncles []*Header transactions Transactions withdrawals Withdrawals + // TODO(galaio): package txDAG in consensus later + txDAG []byte // caches hash atomic.Value @@ -423,6 +427,10 @@ func (b *Block) SanityCheck() error { return b.header.SanityCheck() } +func (b *Block) TxDAG() []byte { + return b.txDAG +} + type writeCounter uint64 func (c *writeCounter) Write(b []byte) (int, error) { @@ -452,6 +460,7 @@ func (b *Block) WithSeal(header *Header) *Block { transactions: b.transactions, uncles: b.uncles, withdrawals: b.withdrawals, + txDAG: b.txDAG, } } @@ -462,6 +471,7 @@ func (b *Block) WithBody(transactions []*Transaction, uncles []*Header) *Block { transactions: make([]*Transaction, len(transactions)), uncles: make([]*Header, len(uncles)), withdrawals: b.withdrawals, + txDAG: b.txDAG, } copy(block.transactions, transactions) for i := range uncles { @@ -476,6 +486,7 @@ func (b *Block) WithWithdrawals(withdrawals []*Withdrawal) *Block { header: b.header, transactions: b.transactions, uncles: b.uncles, + txDAG: b.txDAG, } if withdrawals != nil { block.withdrawals = make([]*Withdrawal, len(withdrawals)) @@ -484,6 +495,18 @@ func (b *Block) WithWithdrawals(withdrawals []*Withdrawal) *Block { return block } +// WithTxDAG returns a block containing the given txDAG. 
+func (b *Block) WithTxDAG(txDAG []byte) *Block { + block := &Block{ + header: b.header, + transactions: b.transactions, + uncles: b.uncles, + withdrawals: b.withdrawals, + txDAG: txDAG, + } + return block +} + // Hash returns the keccak256 hash of b's header. // The hash is computed on the first call and cached thereafter. func (b *Block) Hash() common.Hash { diff --git a/core/types/dag.go b/core/types/dag.go new file mode 100644 index 0000000000..a4d111458b --- /dev/null +++ b/core/types/dag.go @@ -0,0 +1,392 @@ +package types + +import ( + "bytes" + "errors" + "fmt" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/rlp" + "golang.org/x/exp/slices" + "strings" + "time" +) + +// TxDAGType Used to extend TxDAG and customize a new DAG structure +const ( + EmptyTxDAGType byte = iota + PlainTxDAGType +) + +type TxDAG interface { + // Type return TxDAG type + Type() byte + + // Inner return inner instance + Inner() interface{} + + // DelayGasDistribution check if delay the distribution of GasFee + DelayGasDistribution() bool + + // TxDep query TxDeps from TxDAG + TxDep(int) TxDep + + // TxCount return tx count + TxCount() int +} + +func EncodeTxDAG(dag TxDAG) ([]byte, error) { + if dag == nil { + return nil, errors.New("input nil TxDAG") + } + var buf bytes.Buffer + buf.WriteByte(dag.Type()) + if err := rlp.Encode(&buf, dag.Inner()); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func DecodeTxDAG(enc []byte) (TxDAG, error) { + if len(enc) <= 1 { + return nil, errors.New("too short TxDAG bytes") + } + + switch enc[0] { + case EmptyTxDAGType: + return NewEmptyTxDAG(), nil + case PlainTxDAGType: + dag := new(PlainTxDAG) + if err := rlp.DecodeBytes(enc[1:], dag); err != nil { + return nil, err + } + return dag, nil + default: + return nil, errors.New("unsupported TxDAG bytes") + } +} + +// EmptyTxDAG indicate that execute txs in sequence +// It means no transactions or need timely distribute transaction fees +// it only keep partial serial execution when tx cannot delay the distribution or just execute txs in sequence +type EmptyTxDAG struct { +} + +func NewEmptyTxDAG() TxDAG { + return &EmptyTxDAG{} +} + +func (d *EmptyTxDAG) Type() byte { + return EmptyTxDAGType +} + +func (d *EmptyTxDAG) Inner() interface{} { + return d +} + +func (d *EmptyTxDAG) DelayGasDistribution() bool { + return false +} + +func (d *EmptyTxDAG) TxDep(int) TxDep { + return TxDep{ + Relation: 1, + TxIndexes: nil, + } +} + +func (d *EmptyTxDAG) TxCount() int { + return 0 +} + +func (d *EmptyTxDAG) String() string { + return "None" +} + +// PlainTxDAG indicate how to use the dependency of txs, and delay the distribution of GasFee +type PlainTxDAG struct { + // Tx Dependency List, the list index is equal to TxIndex + TxDeps []TxDep +} + +func (d *PlainTxDAG) Type() byte { + return PlainTxDAGType +} + +func (d *PlainTxDAG) Inner() interface{} { + return d +} + +func (d *PlainTxDAG) DelayGasDistribution() bool { + return true +} + +func (d *PlainTxDAG) TxDep(i int) TxDep { + return d.TxDeps[i] +} + +func (d *PlainTxDAG) TxCount() int { + return len(d.TxDeps) +} + +func NewPlainTxDAG(txLen int) *PlainTxDAG { + return &PlainTxDAG{ + TxDeps: make([]TxDep, txLen), + } +} + +func (d *PlainTxDAG) String() string { + builder := strings.Builder{} + exePaths := travelExecutionPaths(d) + for _, path := range exePaths { + builder.WriteString(fmt.Sprintf("%v\n", path)) + } + return builder.String() +} + +func (d *PlainTxDAG) Size() int { + enc, err := EncodeTxDAG(d) + if err != nil { + return 
0 + } + return len(enc) +} + +func travelExecutionPaths(d TxDAG) [][]uint64 { + txCount := d.TxCount() + deps := make([]TxDep, txCount) + for i := 0; i < txCount; i++ { + dep := d.TxDep(i) + if dep.Relation == 0 { + deps[i] = dep + } + + // recover to relation 0 + for j := 0; j < i; j++ { + if !dep.Exist(j) { + deps[i].AppendDep(j) + } + } + } + + exePaths := make([][]uint64, 0) + // travel tx deps with BFS + for i := uint64(0); i < uint64(txCount); i++ { + exePaths = append(exePaths, travelTargetPath(deps, i)) + } + return exePaths +} + +// TxDep store the current tx dependency relation with other txs +type TxDep struct { + // It describes the Relation with below txs + // 0: this tx depends on below txs + // 1: this transaction does not depend on below txs, all other previous txs depend on + Relation uint8 + TxIndexes []uint64 +} + +func (d *TxDep) AppendDep(i int) { + d.TxIndexes = append(d.TxIndexes, uint64(i)) +} + +func (d *TxDep) Exist(i int) bool { + for _, index := range d.TxIndexes { + if index == uint64(i) { + return true + } + } + + return false +} + +var ( + longestTimeTimer = metrics.NewRegisteredTimer("dag/longesttime", nil) + longestGasTimer = metrics.NewRegisteredTimer("dag/longestgas", nil) + serialTimeTimer = metrics.NewRegisteredTimer("dag/serialtime", nil) + totalTxMeter = metrics.NewRegisteredMeter("dag/txcnt", nil) + totalNoDepMeter = metrics.NewRegisteredMeter("dag/nodepcntcnt", nil) + total2DepMeter = metrics.NewRegisteredMeter("dag/2depcntcnt", nil) + total4DepMeter = metrics.NewRegisteredMeter("dag/4depcntcnt", nil) + total8DepMeter = metrics.NewRegisteredMeter("dag/8depcntcnt", nil) + total16DepMeter = metrics.NewRegisteredMeter("dag/16depcntcnt", nil) + total32DepMeter = metrics.NewRegisteredMeter("dag/32depcntcnt", nil) +) + +func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) string { + if len(stats) != dag.TxCount() || dag.TxCount() == 0 { + return "" + } + sb := strings.Builder{} + //sb.WriteString("TxDAG:\n") + //for i, dep := range dag.TxDeps { + // if stats[i].mustSerialFlag { + // continue + // } + // sb.WriteString(fmt.Sprintf("%v: %v\n", i, dep.TxIndexes)) + //} + //sb.WriteString("Parallel Execution Path:\n") + paths := travelExecutionPaths(dag) + // Attention: this is based on best schedule, it will reduce a lot by executing previous txs in parallel + // It assumes that there is no parallel thread limit + txCount := dag.TxCount() + var ( + maxGasIndex int + maxGas uint64 + maxTimeIndex int + maxTime time.Duration + txTimes = make([]time.Duration, txCount) + txGases = make([]uint64, txCount) + txReads = make([]int, txCount) + noDepdencyCount int + ) + + totalTxMeter.Mark(int64(txCount)) + for i, path := range paths { + if stats[i].mustSerialFlag { + continue + } + if len(path) <= 1 { + noDepdencyCount++ + totalNoDepMeter.Mark(1) + } + if len(path) <= 3 { + total2DepMeter.Mark(1) + } + if len(path) <= 5 { + total4DepMeter.Mark(1) + } + if len(path) <= 9 { + total8DepMeter.Mark(1) + } + if len(path) <= 17 { + total16DepMeter.Mark(1) + } + if len(path) <= 33 { + total32DepMeter.Mark(1) + } + + // find the biggest cost time from dependency txs + for j := 0; j < len(path)-1; j++ { + prev := path[j] + if txTimes[prev] > txTimes[i] { + txTimes[i] = txTimes[prev] + } + if txGases[prev] > txGases[i] { + txGases[i] = txGases[prev] + } + if txReads[prev] > txReads[i] { + txReads[i] = txReads[prev] + } + } + txTimes[i] += stats[i].costTime + txGases[i] += stats[i].usedGas + txReads[i] += stats[i].readCount + + //sb.WriteString(fmt.Sprintf("Tx%v, 
%.2fms|%vgas|%vreads\npath: %v\n", i, float64(txTimes[i].Microseconds())/1000, txGases[i], txReads[i], path)) + //sb.WriteString(fmt.Sprintf("%v: %v\n", i, path)) + // try to find max gas + if txGases[i] > maxGas { + maxGas = txGases[i] + maxGasIndex = i + } + if txTimes[i] > maxTime { + maxTime = txTimes[i] + maxTimeIndex = i + } + } + + sb.WriteString(fmt.Sprintf("LargestGasPath: %.2fms|%vgas|%vreads\npath: %v\n", float64(txTimes[maxGasIndex].Microseconds())/1000, txGases[maxGasIndex], txReads[maxGasIndex], paths[maxGasIndex])) + sb.WriteString(fmt.Sprintf("LongestTimePath: %.2fms|%vgas|%vreads\npath: %v\n", float64(txTimes[maxTimeIndex].Microseconds())/1000, txGases[maxTimeIndex], txReads[maxTimeIndex], paths[maxTimeIndex])) + longestTimeTimer.Update(txTimes[maxTimeIndex]) + longestGasTimer.Update(txTimes[maxGasIndex]) + // serial path + var ( + sTime time.Duration + sGas uint64 + sRead int + sPath []int + ) + for i, stat := range stats { + if stat.mustSerialFlag { + continue + } + sPath = append(sPath, i) + sTime += stat.costTime + sGas += stat.usedGas + sRead += stat.readCount + } + if sTime == 0 { + return "" + } + sb.WriteString(fmt.Sprintf("SerialPath: %.2fms|%vgas|%vreads\npath: %v\n", float64(sTime.Microseconds())/1000, sGas, sRead, sPath)) + maxParaTime := txTimes[maxTimeIndex] + sb.WriteString(fmt.Sprintf("Estimated saving: %.2fms, %.2f%%, %.2fX, noDepCnt: %v|%.2f%%\n", + float64((sTime-maxParaTime).Microseconds())/1000, float64(sTime-maxParaTime)/float64(sTime)*100, + float64(sTime)/float64(maxParaTime), noDepdencyCount, float64(noDepdencyCount)/float64(txCount)*100)) + serialTimeTimer.Update(sTime) + return sb.String() +} + +func travelTargetPath(deps []TxDep, from uint64) []uint64 { + q := make([]uint64, 0, len(deps)) + path := make([]uint64, 0, len(deps)) + + q = append(q, from) + path = append(path, from) + for len(q) > 0 { + t := make([]uint64, 0, len(deps)) + for _, i := range q { + for _, dep := range deps[i].TxIndexes { + if !slices.Contains(path, dep) { + path = append(path, dep) + t = append(t, dep) + } + } + } + q = t + } + slices.Sort(path) + return path +} + +// ExeStat records tx execution info +type ExeStat struct { + txIndex int + usedGas uint64 + readCount int + startTime time.Time + costTime time.Duration + // TODO: consider system tx, gas fee issues, may need to use different flag + mustSerialFlag bool +} + +func NewExeStat(txIndex int) *ExeStat { + return &ExeStat{ + txIndex: txIndex, + } +} + +func (s *ExeStat) Begin() *ExeStat { + s.startTime = time.Now() + return s +} + +func (s *ExeStat) Done() *ExeStat { + s.costTime = time.Since(s.startTime) + return s +} + +func (s *ExeStat) WithSerialFlag() *ExeStat { + s.mustSerialFlag = true + return s +} + +func (s *ExeStat) WithGas(gas uint64) *ExeStat { + s.usedGas = gas + return s +} + +func (s *ExeStat) WithRead(rc int) *ExeStat { + s.readCount = rc + return s +} diff --git a/core/types/dag_test.go b/core/types/dag_test.go new file mode 100644 index 0000000000..e86fff110c --- /dev/null +++ b/core/types/dag_test.go @@ -0,0 +1,260 @@ +package types + +import ( + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/holiman/uint256" + "github.com/stretchr/testify/require" +) + +var ( + mockAddr = common.HexToAddress("0x482bA86399ab6Dcbe54071f8d22258688B4509b1") + mockHash = common.HexToHash("0xdc13f8d7bdb8ec4de02cd4a50a1aa2ab73ec8814e0cdb550341623be3dd8ab7a") +) + +func TestTxDAG(t *testing.T) { + dag := mockSimpleDAG() + t.Log(dag) + dag = mockSystemTxDAG() + t.Log(dag) +} + +func 
TestEvaluateTxDAG(t *testing.T) { + dag := mockSystemTxDAG() + stats := make(map[int]*ExeStat, dag.TxCount()) + for i := 0; i < dag.TxCount(); i++ { + stats[i] = NewExeStat(i).WithGas(uint64(i)).WithRead(i) + stats[i].costTime = time.Duration(i) + if dag.TxDep(i).Relation == 1 { + stats[i].WithSerialFlag() + } + } + t.Log(EvaluateTxDAGPerformance(dag, stats)) +} + +func TestSimpleMVStates2TxDAG(t *testing.T) { + ms := NewMVStates(10) + + ms.rwSets[0] = mockRWSet(0, []string{"0x00"}, []string{"0x00"}) + ms.rwSets[1] = mockRWSet(1, []string{"0x01"}, []string{"0x01"}) + ms.rwSets[2] = mockRWSet(2, []string{"0x02"}, []string{"0x02"}) + ms.rwSets[3] = mockRWSet(3, []string{"0x00", "0x03"}, []string{"0x03"}) + ms.rwSets[4] = mockRWSet(4, []string{"0x00", "0x04"}, []string{"0x04"}) + ms.rwSets[5] = mockRWSet(5, []string{"0x01", "0x02", "0x05"}, []string{"0x05"}) + ms.rwSets[6] = mockRWSet(6, []string{"0x02", "0x05", "0x06"}, []string{"0x06"}) + ms.rwSets[7] = mockRWSet(7, []string{"0x06", "0x07"}, []string{"0x07"}) + ms.rwSets[8] = mockRWSet(8, []string{"0x08"}, []string{"0x08"}) + ms.rwSets[9] = mockRWSet(9, []string{"0x08", "0x09"}, []string{"0x09"}) + + dag := ms.ResolveTxDAG() + require.Equal(t, mockSimpleDAG(), dag) + t.Log(dag) +} + +func TestSystemTxMVStates2TxDAG(t *testing.T) { + ms := NewMVStates(12) + + ms.rwSets[0] = mockRWSet(0, []string{"0x00"}, []string{"0x00"}) + ms.rwSets[1] = mockRWSet(1, []string{"0x01"}, []string{"0x01"}) + ms.rwSets[2] = mockRWSet(2, []string{"0x02"}, []string{"0x02"}) + ms.rwSets[3] = mockRWSet(3, []string{"0x00", "0x03"}, []string{"0x03"}) + ms.rwSets[4] = mockRWSet(4, []string{"0x00", "0x04"}, []string{"0x04"}) + ms.rwSets[5] = mockRWSet(5, []string{"0x01", "0x02", "0x05"}, []string{"0x05"}) + ms.rwSets[6] = mockRWSet(6, []string{"0x02", "0x05", "0x06"}, []string{"0x06"}) + ms.rwSets[7] = mockRWSet(7, []string{"0x06", "0x07"}, []string{"0x07"}) + ms.rwSets[8] = mockRWSet(8, []string{"0x08"}, []string{"0x08"}) + ms.rwSets[9] = mockRWSet(9, []string{"0x08", "0x09"}, []string{"0x09"}) + ms.rwSets[10] = mockRWSet(10, []string{"0x10"}, []string{"0x10"}).WithSerialFlag() + ms.rwSets[11] = mockRWSet(11, []string{"0x11"}, []string{"0x11"}).WithSerialFlag() + + dag := ms.ResolveTxDAG() + require.Equal(t, mockSystemTxDAG(), dag) + t.Log(dag) +} + +func TestIsEqualRWVal(t *testing.T) { + tests := []struct { + key RWKey + src interface{} + compared interface{} + isEqual bool + }{ + { + key: AccountStateKey(mockAddr, AccountNonce), + src: uint64(0), + compared: uint64(0), + isEqual: true, + }, + { + key: AccountStateKey(mockAddr, AccountNonce), + src: uint64(0), + compared: uint64(1), + isEqual: false, + }, + { + key: AccountStateKey(mockAddr, AccountBalance), + src: new(uint256.Int).SetUint64(1), + compared: new(uint256.Int).SetUint64(1), + isEqual: true, + }, + { + key: AccountStateKey(mockAddr, AccountBalance), + src: nil, + compared: new(uint256.Int).SetUint64(1), + isEqual: false, + }, + { + key: AccountStateKey(mockAddr, AccountBalance), + src: (*uint256.Int)(nil), + compared: new(uint256.Int).SetUint64(1), + isEqual: false, + }, + { + key: AccountStateKey(mockAddr, AccountBalance), + src: (*uint256.Int)(nil), + compared: (*uint256.Int)(nil), + isEqual: true, + }, + { + key: AccountStateKey(mockAddr, AccountCodeHash), + src: []byte{1}, + compared: []byte{1}, + isEqual: true, + }, + { + key: AccountStateKey(mockAddr, AccountCodeHash), + src: nil, + compared: []byte{1}, + isEqual: false, + }, + { + key: AccountStateKey(mockAddr, AccountCodeHash), + src: 
([]byte)(nil), + compared: []byte{1}, + isEqual: false, + }, + { + key: AccountStateKey(mockAddr, AccountCodeHash), + src: ([]byte)(nil), + compared: ([]byte)(nil), + isEqual: true, + }, + { + key: AccountStateKey(mockAddr, AccountSuicide), + src: struct{}{}, + compared: struct{}{}, + isEqual: false, + }, + { + key: AccountStateKey(mockAddr, AccountSuicide), + src: nil, + compared: struct{}{}, + isEqual: false, + }, + { + key: StorageStateKey(mockAddr, mockHash), + src: mockHash, + compared: mockHash, + isEqual: true, + }, + { + key: StorageStateKey(mockAddr, mockHash), + src: nil, + compared: mockHash, + isEqual: false, + }, + } + + for i, item := range tests { + require.Equal(t, item.isEqual, isEqualRWVal(item.key, item.src, item.compared), i) + } +} + +func mockSimpleDAG() TxDAG { + dag := NewPlainTxDAG(10) + dag.TxDeps[0].TxIndexes = []uint64{} + dag.TxDeps[1].TxIndexes = []uint64{} + dag.TxDeps[2].TxIndexes = []uint64{} + dag.TxDeps[3].TxIndexes = []uint64{0} + dag.TxDeps[4].TxIndexes = []uint64{0} + dag.TxDeps[5].TxIndexes = []uint64{1, 2} + dag.TxDeps[6].TxIndexes = []uint64{2, 5} + dag.TxDeps[7].TxIndexes = []uint64{6} + dag.TxDeps[8].TxIndexes = []uint64{} + dag.TxDeps[9].TxIndexes = []uint64{8} + return dag +} + +func mockSystemTxDAG() TxDAG { + dag := NewPlainTxDAG(12) + dag.TxDeps[0].TxIndexes = []uint64{} + dag.TxDeps[1].TxIndexes = []uint64{} + dag.TxDeps[2].TxIndexes = []uint64{} + dag.TxDeps[3].TxIndexes = []uint64{0} + dag.TxDeps[4].TxIndexes = []uint64{0} + dag.TxDeps[5].TxIndexes = []uint64{1, 2} + dag.TxDeps[6].TxIndexes = []uint64{2, 5} + dag.TxDeps[7].TxIndexes = []uint64{6} + dag.TxDeps[8].TxIndexes = []uint64{} + dag.TxDeps[9].TxIndexes = []uint64{8} + dag.TxDeps[10] = TxDep{ + Relation: 1, + TxIndexes: []uint64{}, + } + dag.TxDeps[11] = TxDep{ + Relation: 1, + TxIndexes: []uint64{}, + } + return dag +} + +func mockRWSet(index int, read []string, write []string) *RWSet { + ver := StateVersion{ + TxIndex: index, + } + set := NewRWSet(ver) + for _, k := range read { + key := RWKey{} + if len(k) > len(key) { + k = k[:len(key)] + } + copy(key[:], k) + set.readSet[key] = &ReadRecord{ + StateVersion: ver, + Val: struct{}{}, + } + } + for _, k := range write { + key := RWKey{} + if len(k) > len(key) { + k = k[:len(key)] + } + copy(key[:], k) + set.writeSet[key] = &WriteRecord{ + Val: struct{}{}, + } + } + + return set +} + +func TestTxDAG_Encode_Decode(t *testing.T) { + expected := TxDAG(&EmptyTxDAG{}) + enc, err := EncodeTxDAG(expected) + require.NoError(t, err) + actual, err := DecodeTxDAG(enc) + require.NoError(t, err) + require.Equal(t, expected, actual) + + expected = mockSimpleDAG() + enc, err = EncodeTxDAG(expected) + require.NoError(t, err) + actual, err = DecodeTxDAG(enc) + require.NoError(t, err) + require.Equal(t, expected, actual) + enc[0] = 2 + _, err = DecodeTxDAG(enc) + require.Error(t, err) +} diff --git a/core/types/mvstates.go b/core/types/mvstates.go new file mode 100644 index 0000000000..64c1a4fc7d --- /dev/null +++ b/core/types/mvstates.go @@ -0,0 +1,487 @@ +package types + +import ( + "encoding/hex" + "errors" + "fmt" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/holiman/uint256" + "slices" + "strings" + "sync" +) + +const ( + AccountStatePrefix = 'a' + StorageStatePrefix = 's' +) + +type RWKey [1 + common.AddressLength + common.HashLength]byte + +type AccountState byte + +const ( + AccountSelf AccountState = iota + AccountNonce + AccountBalance + AccountCodeHash + AccountSuicide +) + +func 
AccountStateKey(account common.Address, state AccountState) RWKey { + var key RWKey + key[0] = AccountStatePrefix + copy(key[1:], account.Bytes()) + key[1+common.AddressLength] = byte(state) + return key +} + +func StorageStateKey(account common.Address, state common.Hash) RWKey { + var key RWKey + key[0] = StorageStatePrefix + copy(key[1:], account.Bytes()) + copy(key[1+common.AddressLength:], state.Bytes()) + return key +} + +func (key *RWKey) IsAccountState() (bool, AccountState) { + return AccountStatePrefix == key[0], AccountState(key[1+common.AddressLength]) +} + +func (key *RWKey) IsAccountSelf() bool { + ok, s := key.IsAccountState() + if !ok { + return false + } + return s == AccountSelf +} + +func (key *RWKey) IsAccountSuicide() bool { + ok, s := key.IsAccountState() + if !ok { + return false + } + return s == AccountSuicide +} + +func (key *RWKey) ToAccountSelf() RWKey { + return AccountStateKey(key.Addr(), AccountSelf) +} + +func (key *RWKey) IsStorageState() bool { + return StorageStatePrefix == key[0] +} + +func (key *RWKey) String() string { + return hex.EncodeToString(key[:]) +} + +func (key *RWKey) Addr() common.Address { + return common.BytesToAddress(key[1 : 1+common.AddressLength]) +} + +// StateVersion record specific TxIndex & TxIncarnation +// if TxIndex equals to -1, it means the state read from DB. +type StateVersion struct { + TxIndex int + // TODO(galaio): used for multi ver state + TxIncarnation int +} + +// ReadRecord keep read value & its version +type ReadRecord struct { + StateVersion + Val interface{} +} + +// WriteRecord keep latest state value & change count +type WriteRecord struct { + Val interface{} +} + +// RWSet record all read & write set in txs +// Attention: this is not a concurrent safety structure +type RWSet struct { + ver StateVersion + readSet map[RWKey]*ReadRecord + writeSet map[RWKey]*WriteRecord + + // some flags + mustSerial bool +} + +func NewRWSet(ver StateVersion) *RWSet { + return &RWSet{ + ver: ver, + readSet: make(map[RWKey]*ReadRecord), + writeSet: make(map[RWKey]*WriteRecord), + } +} + +func (s *RWSet) RecordRead(key RWKey, ver StateVersion, val interface{}) { + // only record the first read version + if _, exist := s.readSet[key]; exist { + return + } + s.readSet[key] = &ReadRecord{ + StateVersion: ver, + Val: val, + } +} + +func (s *RWSet) RecordWrite(key RWKey, val interface{}) { + wr, exist := s.writeSet[key] + if !exist { + s.writeSet[key] = &WriteRecord{ + Val: val, + } + return + } + wr.Val = val +} + +func (s *RWSet) Version() StateVersion { + return s.ver +} + +func (s *RWSet) ReadSet() map[RWKey]*ReadRecord { + return s.readSet +} + +func (s *RWSet) WriteSet() map[RWKey]*WriteRecord { + return s.writeSet +} + +func (s *RWSet) WithSerialFlag() *RWSet { + s.mustSerial = true + return s +} + +func (s *RWSet) String() string { + builder := strings.Builder{} + builder.WriteString(fmt.Sprintf("tx: %v, inc: %v\nreadSet: [", s.ver.TxIndex, s.ver.TxIncarnation)) + i := 0 + for key, _ := range s.readSet { + if i > 0 { + builder.WriteString(fmt.Sprintf(", %v", key.String())) + continue + } + builder.WriteString(fmt.Sprintf("%v", key.String())) + i++ + } + builder.WriteString("]\nwriteSet: [") + i = 0 + for key, _ := range s.writeSet { + if i > 0 { + builder.WriteString(fmt.Sprintf(", %v", key.String())) + continue + } + builder.WriteString(fmt.Sprintf("%v", key.String())) + i++ + } + builder.WriteString("]\n") + return builder.String() +} + +// isEqualRWVal compare state +func isEqualRWVal(key RWKey, src interface{}, compared 
interface{}) bool { + if ok, state := key.IsAccountState(); ok { + switch state { + case AccountBalance: + if src != nil && compared != nil { + return equalUint256(src.(*uint256.Int), compared.(*uint256.Int)) + } + return src == compared + case AccountNonce: + return src.(uint64) == compared.(uint64) + case AccountCodeHash: + if src != nil && compared != nil { + return slices.Equal(src.([]byte), compared.([]byte)) + } + return src == compared + } + return false + } + + if src != nil && compared != nil { + return src.(common.Hash) == compared.(common.Hash) + } + return src == compared +} + +func equalUint256(s, c *uint256.Int) bool { + if s != nil && c != nil { + return s.Eq(c) + } + + return s == c +} + +type PendingWrite struct { + Ver StateVersion + Val interface{} +} + +func NewPendingWrite(ver StateVersion, wr *WriteRecord) *PendingWrite { + return &PendingWrite{ + Ver: ver, + Val: wr.Val, + } +} + +func (w *PendingWrite) TxIndex() int { + return w.Ver.TxIndex +} + +func (w *PendingWrite) TxIncarnation() int { + return w.Ver.TxIncarnation +} + +type PendingWrites struct { + list []*PendingWrite +} + +func NewPendingWrites() *PendingWrites { + return &PendingWrites{ + list: make([]*PendingWrite, 0), + } +} + +func (w *PendingWrites) Append(pw *PendingWrite) { + if i, found := w.SearchTxIndex(pw.TxIndex()); found { + w.list[i] = pw + return + } + + w.list = append(w.list, pw) + for i := len(w.list) - 1; i > 0; i-- { + if w.list[i].TxIndex() > w.list[i-1].TxIndex() { + break + } + w.list[i-1], w.list[i] = w.list[i], w.list[i-1] + } +} + +func (w *PendingWrites) SearchTxIndex(txIndex int) (int, bool) { + n := len(w.list) + i, j := 0, n + for i < j { + h := int(uint(i+j) >> 1) + // i ≤ h < j + if w.list[h].TxIndex() < txIndex { + i = h + 1 + } else { + j = h + } + } + return i, i < n && w.list[i].TxIndex() == txIndex +} + +func (w *PendingWrites) FindLastWrite(txIndex int) *PendingWrite { + var i, _ = w.SearchTxIndex(txIndex) + for j := i - 1; j >= 0; j-- { + if w.list[j].TxIndex() < txIndex { + return w.list[j] + } + } + + return nil +} + +type MVStates struct { + rwSets map[int]*RWSet + pendingWriteSet map[RWKey]*PendingWrites + + // dependency map cache for generating TxDAG + // depsCache[i].exist(j) means j->i, and i > j + depsCache map[int]TxDepMap + + // execution stat infos + stats map[int]*ExeStat + lock sync.RWMutex +} + +func NewMVStates(txCount int) *MVStates { + return &MVStates{ + rwSets: make(map[int]*RWSet, txCount), + pendingWriteSet: make(map[RWKey]*PendingWrites, txCount*8), + depsCache: make(map[int]TxDepMap, txCount), + stats: make(map[int]*ExeStat, txCount), + } +} + +func (s *MVStates) RWSets() map[int]*RWSet { + s.lock.RLock() + defer s.lock.RUnlock() + return s.rwSets +} + +func (s *MVStates) Stats() map[int]*ExeStat { + s.lock.RLock() + defer s.lock.RUnlock() + return s.stats +} + +func (s *MVStates) RWSet(index int) *RWSet { + s.lock.RLock() + defer s.lock.RUnlock() + if index >= len(s.rwSets) { + return nil + } + return s.rwSets[index] +} + +// ReadState TODO(galaio): read state from MVStates +func (s *MVStates) ReadState(key RWKey) (interface{}, bool) { + return nil, false +} + +// FulfillRWSet it can execute as async, and rwSet & stat must guarantee read-only +// TODO(galaio): try to generate TxDAG, when fulfill RWSet +// TODO(galaio): support flag to stat execution as optional +func (s *MVStates) FulfillRWSet(rwSet *RWSet, stat *ExeStat) error { + log.Debug("FulfillRWSet", "s.len", len(s.rwSets), "cur", rwSet.ver.TxIndex, "reads", len(rwSet.readSet), "writes", 
len(rwSet.writeSet)) + s.lock.Lock() + defer s.lock.Unlock() + index := rwSet.ver.TxIndex + if s := s.rwSets[index]; s != nil { + return errors.New("refill a exist RWSet") + } + if stat != nil { + if stat.txIndex != index { + return errors.New("wrong execution stat") + } + s.stats[index] = stat + } + + // analysis dep, if the previous transaction is not executed/validated, re-analysis is required + if _, ok := s.depsCache[index]; !ok { + s.depsCache[index] = NewTxDeps(0) + } + for prev := 0; prev < index; prev++ { + // if there are some parallel execution or system txs, it will fulfill in advance + // it's ok, and try re-generate later + if _, ok := s.rwSets[prev]; !ok { + continue + } + if checkDependency(s.rwSets[prev].writeSet, rwSet.readSet) { + s.depsCache[index].add(prev) + // clear redundancy deps compared with prev + for dep := range s.depsCache[index] { + if s.depsCache[prev].exist(dep) { + s.depsCache[index].remove(dep) + } + } + } + } + + // append to pending write set + for k, v := range rwSet.writeSet { + // TODO(galaio): this action is only for testing, it can be removed in production mode. + // ignore no changed write record + checkRWSetInconsistent(index, k, rwSet.readSet, rwSet.writeSet) + if _, exist := s.pendingWriteSet[k]; !exist { + s.pendingWriteSet[k] = NewPendingWrites() + } + s.pendingWriteSet[k].Append(NewPendingWrite(rwSet.ver, v)) + } + s.rwSets[index] = rwSet + return nil +} + +func checkRWSetInconsistent(index int, k RWKey, readSet map[RWKey]*ReadRecord, writeSet map[RWKey]*WriteRecord) bool { + var ( + readOk bool + writeOk bool + r *WriteRecord + ) + + if k.IsAccountSuicide() { + _, readOk = readSet[k.ToAccountSelf()] + } else { + _, readOk = readSet[k] + } + + r, writeOk = writeSet[k] + if readOk != writeOk { + // check if it's correct? read nil, write non-nil + log.Info("checkRWSetInconsistent find inconsistent", "tx", index, "k", k.String(), "read", readOk, "write", writeOk, "val", r.Val) + return true + } + + return false +} + +// ResolveTxDAG generate TxDAG from RWSets +func (s *MVStates) ResolveTxDAG() TxDAG { + rwSets := s.RWSets() + txDAG := NewPlainTxDAG(len(rwSets)) + for i := len(rwSets) - 1; i >= 0; i-- { + txDAG.TxDeps[i].TxIndexes = []uint64{} + if rwSets[i].mustSerial { + txDAG.TxDeps[i].Relation = 1 + continue + } + if s.depsCache[i] != nil { + txDAG.TxDeps[i].TxIndexes = s.depsCache[i].toArray() + continue + } + readSet := rwSets[i].ReadSet() + // TODO: check if there are RW with system address + // check if there has written op before i + for j := 0; j < i; j++ { + if checkDependency(rwSets[j].writeSet, readSet) { + txDAG.TxDeps[i].AppendDep(j) + } + } + } + + return txDAG +} + +func checkDependency(writeSet map[RWKey]*WriteRecord, readSet map[RWKey]*ReadRecord) bool { + // check tx dependency, only check key, skip version + for k, _ := range writeSet { + // check suicide, add read address flag, it only for check suicide quickly, and cannot for other scenarios. 
+ if k.IsAccountSuicide() { + if _, ok := readSet[k.ToAccountSelf()]; ok { + return true + } + continue + } + if _, ok := readSet[k]; ok { + return true + } + } + + return false +} + +type TxDepMap map[int]struct{} + +func NewTxDeps(cap int) TxDepMap { + return make(map[int]struct{}, cap) +} + +func (m TxDepMap) add(index int) { + m[index] = struct{}{} +} + +func (m TxDepMap) exist(index int) bool { + _, ok := m[index] + return ok +} + +func (m TxDepMap) toArray() []uint64 { + ret := make([]uint64, 0, len(m)) + for index := range m { + ret = append(ret, uint64(index)) + } + slices.Sort(ret) + return ret +} + +func (m TxDepMap) remove(index int) { + delete(m, index) +} diff --git a/core/vm/interface.go b/core/vm/interface.go index a441da30de..4633f2dbb5 100644 --- a/core/vm/interface.go +++ b/core/vm/interface.go @@ -86,6 +86,10 @@ type StateDB interface { PrintParallelStateObjects() GetNonceFromBaseDB(addr common.Address) uint64 TxIndex() int + + // parallel DAG related + BeforeTxTransition() + FinaliseRWSet() error } // CallContext provides a basic interface for the EVM calling conventions. The EVM diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 6dbe03e214..4d16ead37a 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -81,6 +81,9 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { return h.handleBlockAnnounces(peer, hashes, numbers) case *eth.NewBlockPacket: + if len(packet.TxDAG) != 0 { + packet.Block = packet.Block.WithTxDAG(packet.TxDAG) + } return h.handleBlockBroadcast(peer, packet.Block, packet.TD) case *eth.NewPooledTransactionHashesPacket67: @@ -140,6 +143,7 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td if h.merger.PoSFinalized() { return errors.New("disallowed block broadcast") } + // Schedule the block for import h.blockFetcher.Enqueue(peer.ID(), block) diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 98ad22a8cf..6278924b60 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -294,6 +294,7 @@ func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error { return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{ Block: block, TD: td, + TxDAG: block.TxDAG(), }) } diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 0f44f83de1..6c7ea3b404 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -188,6 +188,7 @@ type BlockHeadersRLPPacket struct { type NewBlockPacket struct { Block *types.Block TD *big.Int + TxDAG []byte `rlp:"optional"` } // sanityCheck verifies that the values are reasonable, as a DoS protection @@ -238,6 +239,8 @@ type BlockBody struct { Transactions []*types.Transaction // Transactions contained within a block Uncles []*types.Header // Uncles contained within a block Withdrawals []*types.Withdrawal `rlp:"optional"` // Withdrawals contained within a block + // TODO(galio): add block body later + //TxDAGs [][]byte `rlp:"optional"` // TxDAGs contained within a block } // Unpack retrieves the transactions and uncles from the range packet and returns diff --git a/miner/worker.go b/miner/worker.go index 292a2e9ad0..2c22aefda6 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1138,6 +1138,7 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err } start := time.Now() + env.state.ResetMVStates(0) pending := w.eth.TxPool().Pending(true) packFromTxpoolTimer.UpdateSince(start) log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), 
"hash", env.header.Hash()) @@ -1371,6 +1372,20 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti if err != nil { return err } + + // Because the TxDAG appends after sidecar, so we only enable after cancun + if w.chainConfig.IsCancun(env.header.Number, env.header.Time) { + for i := len(env.txs); i < len(block.Transactions()); i++ { + env.state.RecordSystemTxRWSet(i) + } + txDAG, _ := env.state.MVStates2TxDAG() + rawTxDAG, err := types.EncodeTxDAG(txDAG) + if err != nil { + return err + } + block = block.WithTxDAG(rawTxDAG) + } + // If we're post merge, just ignore if !w.isTTDReached(block.Header()) { select {