Stateless witness prefetcher changes #29519

Merged
8 changes: 6 additions & 2 deletions core/blockchain.go
@@ -1806,8 +1806,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}
statedb.SetLogger(bc.logger)

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
// If we are past Byzantium, enable prefetching to pull in trie node paths
// while processing transactions. Before Byzantium the prefetcher is mostly
// useless due to the intermediate root hashing after each transaction.
if bc.chainConfig.IsByzantium(block.Number()) {
statedb.StartPrefetcher("chain")
}
activeState = statedb

// If we have a followup block, run that against the current state to pre-cache
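The gating above hinges on a single chain-config check. A minimal, self-contained sketch of that check follows; it is an illustration only and not part of the diff, assuming the go-ethereum params package and mainnet's Byzantium activation at block 4,370,000:

package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/params"
)

func main() {
	config := params.MainnetChainConfig
	for _, num := range []*big.Int{big.NewInt(4_000_000), big.NewInt(5_000_000)} {
		if config.IsByzantium(num) {
			// Post-Byzantium: receipts no longer embed an intermediate state
			// root, so background trie prefetching has time to pay off.
			fmt.Println("block", num, "-> start the prefetcher")
		} else {
			// Pre-Byzantium: every transaction forces an intermediate root
			// hash, leaving the prefetcher no useful window to fill.
			fmt.Println("block", num, "-> skip the prefetcher")
		}
	}
}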
2 changes: 1 addition & 1 deletion core/state/dump.go
@@ -165,7 +165,7 @@ func (s *StateDB) DumpToCollector(c DumpCollector, conf *DumpConfig) (nextKey []
}
if !conf.SkipStorage {
account.Storage = make(map[common.Hash]string)
tr, err := obj.getTrie()
tr, err := obj.getTrie(true)
if err != nil {
log.Error("Failed to load storage trie", "err", err)
continue
56 changes: 42 additions & 14 deletions core/state/state_object.go
@@ -21,6 +21,7 @@ import (
"fmt"
"io"
"maps"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
@@ -33,6 +34,14 @@ import (
"github.com/holiman/uint256"
)

// hasherPool holds a pool of hashers used by state objects during concurrent
// trie updates.
var hasherPool = sync.Pool{
New: func() interface{} {
return crypto.NewKeccakState()
},
}

type Storage map[common.Hash]common.Hash

func (s Storage) Copy() Storage {
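The hasherPool above exists because storage tries are now updated by several goroutines at once, and a single shared hasher is not safe for concurrent use. Below is a minimal, self-contained sketch of the same borrow-from-a-pool pattern; it uses crypto/sha256 from the standard library purely for illustration, whereas the PR pools Keccak states via crypto.NewKeccakState():

package main

import (
	"crypto/sha256"
	"fmt"
	"hash"
	"sync"
)

// Each concurrent worker borrows its own hasher instead of sharing one.
var pool = sync.Pool{
	New: func() interface{} { return sha256.New() },
}

func hashKey(key []byte) []byte {
	h := pool.Get().(hash.Hash)
	defer pool.Put(h)

	h.Reset() // the pooled hasher may hold state from a previous use
	h.Write(key)
	return h.Sum(nil)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Printf("slot %d -> %x\n", i, hashKey([]byte{byte(i)}))
		}(i)
	}
	wg.Wait()
}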
@@ -121,12 +130,22 @@ func (s *stateObject) touch() {
// getTrie returns the associated storage trie. The trie will be opened
// if it's not loaded previously. An error will be returned if trie can't
// be loaded.
func (s *stateObject) getTrie() (Trie, error) {
//
// The skipPrefetcher parameter is used to request a direct load from disk, even
// if a prefetcher is available. This path is used if snapshots are unavailable,
// since that requires reading the trie *during* execution, when the prefetchers
// cannot yet return data.
func (s *stateObject) getTrie(skipPrefetcher bool) (Trie, error) {
[Review comment from the PR author] FWIW, skipPrefetcher is kind of an ugly hack, I just wanted to avoid the lack-of-snapshot poking into the prefetcher. Open to cleaner suggestions.

if s.trie == nil {
// Try fetching from prefetcher first
if s.data.Root != types.EmptyRootHash && s.db.prefetcher != nil {
// When the miner is creating the pending state, there is no prefetcher
s.trie = s.db.prefetcher.trie(s.addrHash, s.data.Root)
// Try fetching from prefetcher first, unless skipping it was explicitly
// requested
if s.data.Root != types.EmptyRootHash && s.db.prefetcher != nil && !skipPrefetcher {
trie, err := s.db.prefetcher.trie(s.addrHash, s.data.Root)
if err != nil {
log.Error("Failed to retrieve storage pre-fetcher trie", "addr", s.address, "err", err)
} else {
s.trie = trie
}
}
if s.trie == nil {
tr, err := s.db.db.OpenStorageTrie(s.db.originalRoot, s.address, s.data.Root, s.db.trie)
@@ -197,7 +216,7 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
// If the snapshot is unavailable or reading from it fails, load from the database.
if s.db.snap == nil || err != nil {
start := time.Now()
tr, err := s.getTrie()
tr, err := s.getTrie(true)
if err != nil {
s.db.setError(err)
return common.Hash{}
Expand Down Expand Up @@ -253,7 +272,7 @@ func (s *stateObject) setState(key common.Hash, value *common.Hash) {

// finalise moves all dirty storage slots into the pending area to be hashed or
// committed later. It is invoked at the end of every transaction.
func (s *stateObject) finalise(prefetch bool) {
func (s *stateObject) finalise() {
slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
for key, value := range s.dirtyStorage {
// If the slot is different from its original value, move it into the
@@ -268,8 +287,10 @@ func (s *stateObject) finalise(prefetch bool) {
delete(s.pendingStorage, key)
}
}
if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch)
if s.db.prefetcher != nil && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch); err != nil {
log.Error("Failed to prefetch slots", "addr", s.address, "slots", len(slotsToPrefetch), "err", err)
}
}
if len(s.dirtyStorage) > 0 {
s.dirtyStorage = make(Storage)
@@ -288,7 +309,7 @@ func (s *stateObject) finalise(prefetch bool) {
// storage change at all.
func (s *stateObject) updateTrie() (Trie, error) {
// Make sure all dirty slots are finalized into the pending storage area
s.finalise(false)
s.finalise()

// Short circuit if nothing changed, don't bother with hashing anything
if len(s.pendingStorage) == 0 {
@@ -299,14 +320,17 @@ func (s *stateObject) updateTrie() (Trie, error) {
storage map[common.Hash][]byte
origin map[common.Hash][]byte
)
tr, err := s.getTrie()
tr, err := s.getTrie(false)
if err != nil {
s.db.setError(err)
return nil, err
}
// Insert all the pending storage updates into the trie
usedStorage := make([][]byte, 0, len(s.pendingStorage))

hasher := hasherPool.Get().(crypto.KeccakState)
defer hasherPool.Put(hasher)

// Perform trie updates before deletions. This prevents resolution of unnecessary trie nodes
// in circumstances similar to the following:
//
@@ -335,26 +359,30 @@ func (s *stateObject) updateTrie() (Trie, error) {
s.db.setError(err)
return nil, err
}
s.db.StorageUpdated += 1
s.db.StorageUpdated.Add(1)
} else {
deletions = append(deletions, key)
}
// Cache the mutated storage slots until commit
if storage == nil {
s.db.storagesLock.Lock()
if storage = s.db.storages[s.addrHash]; storage == nil {
storage = make(map[common.Hash][]byte)
s.db.storages[s.addrHash] = storage
}
s.db.storagesLock.Unlock()
}
khash := crypto.HashData(s.db.hasher, key[:])
khash := crypto.HashData(hasher, key[:])
storage[khash] = encoded // encoded will be nil if it's deleted

// Cache the original value of mutated storage slots
if origin == nil {
s.db.storagesLock.Lock()
if origin = s.db.storagesOrigin[s.address]; origin == nil {
origin = make(map[common.Hash][]byte)
s.db.storagesOrigin[s.address] = origin
}
s.db.storagesLock.Unlock()
}
// Track the original value of slot only if it's mutated first time
if _, ok := origin[khash]; !ok {
Expand All @@ -374,7 +402,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
s.db.setError(err)
return nil, err
}
s.db.StorageDeleted += 1
s.db.StorageDeleted.Add(1)
}
// If no slots were touched, issue a warning as we shouldn't have done all
// the above work in the first place
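The counter changes in this file (s.db.StorageUpdated.Add(1) and s.db.StorageDeleted.Add(1) replacing plain += 1) follow from the same concurrency shift: several workers may now bump the statistics at once. A minimal sketch of the pattern, independent of the PR code:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var (
		storageUpdated atomic.Int64 // replaces a plain int counter
		wg             sync.WaitGroup
	)
	for w := 0; w < 8; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				storageUpdated.Add(1) // safe from many goroutines, no extra locking
			}
		}()
	}
	wg.Wait()
	fmt.Println("slots updated:", storageUpdated.Load()) // always 8000
}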
106 changes: 56 additions & 50 deletions core/state/statedb.go
@@ -24,6 +24,7 @@ import (
"slices"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
@@ -92,10 +93,12 @@ type StateDB struct {

// These maps hold the state changes (including the corresponding
// original value) that occurred in this **block**.
accounts map[common.Hash][]byte // The mutated accounts in 'slim RLP' encoding
accounts map[common.Hash][]byte // The mutated accounts in 'slim RLP' encoding
accountsOrigin map[common.Address][]byte // The original value of mutated accounts in 'slim RLP' encoding

storages map[common.Hash]map[common.Hash][]byte // The mutated slots in prefix-zero trimmed rlp format
accountsOrigin map[common.Address][]byte // The original value of mutated accounts in 'slim RLP' encoding
storagesOrigin map[common.Address]map[common.Hash][]byte // The original value of mutated slots in prefix-zero trimmed rlp format
storagesLock sync.Mutex // Mutex protecting the maps during concurrent updates/commits

// This map holds 'live' objects, which will get modified while
// processing a state transition.
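The new storagesLock guards the block-level storages and storagesOrigin maps: concurrent per-account workers lazily create their own sub-map inside a shared map, and only that outer map access needs the mutex. A minimal, self-contained sketch of the pattern, with hypothetical names rather than the PR's:

package main

import (
	"fmt"
	"sync"
)

type collector struct {
	mu       sync.Mutex
	storages map[string]map[string][]byte // account -> slot -> value
}

// record lazily creates the per-account sub-map under the lock, then fills it.
// Each worker owns a distinct account, so only the outer map needs guarding.
func (c *collector) record(account, slot string, value []byte) {
	c.mu.Lock()
	storage := c.storages[account]
	if storage == nil {
		storage = make(map[string][]byte)
		c.storages[account] = storage
	}
	c.mu.Unlock()

	storage[slot] = value
}

func main() {
	c := &collector{storages: make(map[string]map[string][]byte)}

	var wg sync.WaitGroup
	for _, account := range []string{"0xaa", "0xbb", "0xcc"} {
		wg.Add(1)
		go func(account string) {
			defer wg.Done()
			c.record(account, "slot0", []byte{0x01})
		}(account)
	}
	wg.Wait()
	fmt.Println("accounts mutated:", len(c.storages))
}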
@@ -161,9 +164,9 @@ type StateDB struct {
TrieDBCommits time.Duration

AccountUpdated int
StorageUpdated int
StorageUpdated atomic.Int64
AccountDeleted int
StorageDeleted int
StorageDeleted atomic.Int64

// Testing hooks
onCommit func(states *triestate.Set) // Hook invoked when commit is performed
@@ -210,7 +213,8 @@ func (s *StateDB) SetLogger(l *tracing.Hooks) {
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string) {
if s.prefetcher != nil {
s.prefetcher.close()
s.prefetcher.terminate(false)
s.prefetcher.report()
s.prefetcher = nil
}
if s.snap != nil {
@@ -222,7 +226,8 @@
// from the gathered metrics.
func (s *StateDB) StopPrefetcher() {
if s.prefetcher != nil {
s.prefetcher.close()
s.prefetcher.terminate(false)
s.prefetcher.report()
s.prefetcher = nil
}
}
@@ -540,9 +545,6 @@ func (s *StateDB) GetTransientState(addr common.Address, key common.Hash) common.Hash {

// updateStateObject writes the given object to the trie.
func (s *StateDB) updateStateObject(obj *stateObject) {
// Track the amount of time wasted on updating the account from the trie
defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now())

// Encode the account and update the account trie
addr := obj.Address()
if err := s.trie.UpdateAccount(addr, &obj.data); err != nil {
@@ -571,10 +573,6 @@ func (s *StateDB) updateStateObject(obj *stateObject) {

// deleteStateObject removes the given object from the state trie.
func (s *StateDB) deleteStateObject(addr common.Address) {
// Track the amount of time wasted on deleting the account from the trie
defer func(start time.Time) { s.AccountUpdates += time.Since(start) }(time.Now())

// Delete the account from the trie
if err := s.trie.DeleteAccount(addr); err != nil {
s.setError(fmt.Errorf("deleteStateObject (%x) error: %v", addr[:], err))
}
@@ -739,13 +737,6 @@ func (s *StateDB) Copy() *StateDB {
// in the middle of a transaction.
state.accessList = s.accessList.Copy()
state.transientStorage = s.transientStorage.Copy()

// If there's a prefetcher running, make an inactive copy of it that can
// only access data but does not actively preload (since the user will not
// know that they need to explicitly terminate an active copy).
if s.prefetcher != nil {
state.prefetcher = s.prefetcher.copy()
}
return state
}

@@ -816,7 +807,7 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
delete(s.accountsOrigin, obj.address) // Clear out any previously updated account data (may be recreated via a resurrect)
delete(s.storagesOrigin, obj.address) // Clear out any previously updated storage data (may be recreated via a resurrect)
} else {
obj.finalise(true) // Prefetch slots in the background
obj.finalise()
s.markUpdate(addr)
}
// At this point, also ship the address off to the precacher. The precacher
@@ -825,7 +816,9 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
}
if s.prefetcher != nil && len(addressesToPrefetch) > 0 {
s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch)
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch); err != nil {
log.Error("Failed to prefetch addresses", "addresses", len(addressesToPrefetch), "err", err)
}
}
// Invalidate journal because reverting across transactions is not allowed.
s.clearJournalAndRefund()
@@ -838,42 +831,52 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// Finalise all the dirty storage states and write them into the tries
s.Finalise(deleteEmptyObjects)

// If there was a trie prefetcher operating, it gets aborted and irrevocably
// modified after we start retrieving tries. Remove it from the statedb after
// this round of use.
//
// This is weird pre-byzantium since the first tx runs with a prefetcher and
// the remainder without, but pre-byzantium even the initial prefetcher is
// useless, so no sleep lost.
prefetcher := s.prefetcher
// If there was a trie prefetcher operating, terminate it async so that the
// individual storage tries can be updated as soon as the disk load finishes.
if s.prefetcher != nil {
s.prefetcher.terminate(true)
defer func() {
s.prefetcher.close()
s.prefetcher = nil
s.prefetcher.report()
s.prefetcher = nil // Pre-byzantium, unset any used up prefetcher
}()
}
// Although naively it makes sense to retrieve the account trie and then do
// the contract storage and account updates sequentially, that short circuits
// the account prefetcher. Instead, let's process all the storage updates
// first, giving the account prefetches just a few more milliseconds of time
// to pull useful data from disk.
start := time.Now()
// Process all storage updates concurrently. The state object update root
// method will internally call a blocking trie fetch from the prefetcher,
// so there's no need to explicitly wait for the prefetchers to finish.
var (
start = time.Now()
workers errgroup.Group
)
if s.db.TrieDB().IsVerkle() {
// Whilst MPT storage tries are independent, Verkle has one single trie
// for all the accounts and all the storage slots merged together. The
// former can thus be simply parallelized, but updating the latter will
// need concurrency support within the trie itself. That's a TODO for a
// later time.
workers.SetLimit(1)
}
for addr, op := range s.mutations {
if op.applied {
continue
}
if op.isDelete() {
if op.applied || op.isDelete() {
continue
}
s.stateObjects[addr].updateRoot()
obj := s.stateObjects[addr] // closure for the task runner below
workers.Go(func() error {
obj.updateRoot()
return nil
})
}
workers.Wait()
s.StorageUpdates += time.Since(start)

// Now we're about to start to write changes to the trie. The trie is so far
// _untouched_. We can check with the prefetcher, if it can give us a trie
// which has the same root, but also has some content loaded into it.
if prefetcher != nil {
if trie := prefetcher.trie(common.Hash{}, s.originalRoot); trie != nil {
start = time.Now()

if s.prefetcher != nil {
if trie, err := s.prefetcher.trie(common.Hash{}, s.originalRoot); err != nil {
log.Error("Failed to retrieve account pre-fetcher trie", "err", err)
} else if trie != nil {
s.trie = trie
}
}
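The loop above fans the per-account updateRoot calls out through an errgroup, and drops the worker limit to 1 when the database is Verkle, since a single merged trie cannot yet be mutated concurrently. A minimal, self-contained sketch of that scheduling pattern; it assumes the golang.org/x/sync/errgroup module and uses illustrative names, not the PR's:

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	var workers errgroup.Group

	singleSharedTrie := false // stands in for s.db.TrieDB().IsVerkle()
	if singleSharedTrie {
		workers.SetLimit(1) // one merged trie: serialize the updates
	}
	for _, account := range []string{"0xaa", "0xbb", "0xcc"} {
		account := account // capture for the task closure
		workers.Go(func() error {
			// Stand-in for obj.updateRoot(): hash this account's storage trie.
			fmt.Println("updating storage root of", account)
			return nil
		})
	}
	// Wait blocks until every scheduled update has finished.
	if err := workers.Wait(); err != nil {
		fmt.Println("storage update failed:", err)
	}
}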
@@ -909,8 +912,10 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
s.deleteStateObject(deletedAddr)
s.AccountDeleted += 1
}
if prefetcher != nil {
prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs)
s.AccountUpdates += time.Since(start)

if s.prefetcher != nil {
s.prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs)
}
// Track the amount of time wasted on hashing the account trie
defer func(start time.Time) { s.AccountHashes += time.Since(start) }(time.Now())
@@ -1251,15 +1256,16 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, error) {
return common.Hash{}, err
}
accountUpdatedMeter.Mark(int64(s.AccountUpdated))
storageUpdatedMeter.Mark(int64(s.StorageUpdated))
storageUpdatedMeter.Mark(s.StorageUpdated.Load())
accountDeletedMeter.Mark(int64(s.AccountDeleted))
storageDeletedMeter.Mark(int64(s.StorageDeleted))
storageDeletedMeter.Mark(s.StorageDeleted.Load())
accountTrieUpdatedMeter.Mark(int64(accountTrieNodesUpdated))
accountTrieDeletedMeter.Mark(int64(accountTrieNodesDeleted))
storageTriesUpdatedMeter.Mark(int64(storageTrieNodesUpdated))
storageTriesDeletedMeter.Mark(int64(storageTrieNodesDeleted))
s.AccountUpdated, s.AccountDeleted = 0, 0
s.StorageUpdated, s.StorageDeleted = 0, 0
s.StorageUpdated.Store(0)
s.StorageDeleted.Store(0)

// If snapshotting is enabled, update the snapshot tree with this new version
if s.snap != nil {