Skip to content

Commit

Permalink
cmd, core: prefetch reads too from tries if requested (#29807)
Browse files Browse the repository at this point in the history
* cmd/utils, consensus/beacon, core/state: when configured via stub  flag: prefetch all reads from account/storage tries, terminate prefetcher synchronously.

* cmd, core/state: fix nil panic, fix error handling, prefetch nosnap too

* core/state: expand prefetcher metrics for reads and writes separately

* cmd/utils, eth: fix noop collect witness flag

---------

Co-authored-by: Péter Szilágyi <peterke@gmail.com>
  • Loading branch information
jwasinger and karalabe authored Jun 11, 2024
1 parent 2eb185c commit 85587d5
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 56 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ var (
utils.BeaconGenesisRootFlag,
utils.BeaconGenesisTimeFlag,
utils.BeaconCheckpointFlag,
utils.CollectWitnessFlag,
}, utils.NetworkFlags, utils.DatabaseFlags)

rpcFlags = []cli.Flag{
Expand Down
13 changes: 12 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,11 @@ var (
Usage: "Disables db compaction after import",
Category: flags.LoggingCategory,
}
CollectWitnessFlag = &cli.BoolFlag{
Name: "collectwitness",
Usage: "Enable state witness generation during block execution. Work in progress flag, don't use.",
Category: flags.MiscCategory,
}

// MISC settings
SyncTargetFlag = &cli.StringFlag{
Expand Down Expand Up @@ -1760,6 +1765,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
// TODO(fjl): force-enable this in --dev mode
cfg.EnablePreimageRecording = ctx.Bool(VMEnableDebugFlag.Name)
}
if ctx.IsSet(CollectWitnessFlag.Name) {
cfg.EnableWitnessCollection = ctx.Bool(CollectWitnessFlag.Name)
}

if ctx.IsSet(RPCGlobalGasCapFlag.Name) {
cfg.RPCGasCap = ctx.Uint64(RPCGlobalGasCapFlag.Name)
Expand Down Expand Up @@ -2190,7 +2198,10 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (*core.BlockCh
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheGCFlag.Name) {
cache.TrieDirtyLimit = ctx.Int(CacheFlag.Name) * ctx.Int(CacheGCFlag.Name) / 100
}
vmcfg := vm.Config{EnablePreimageRecording: ctx.Bool(VMEnableDebugFlag.Name)}
vmcfg := vm.Config{
EnablePreimageRecording: ctx.Bool(VMEnableDebugFlag.Name),
EnableWitnessCollection: ctx.Bool(CollectWitnessFlag.Name),
}
if ctx.IsSet(VMTraceFlag.Name) {
if name := ctx.String(VMTraceFlag.Name); name != "" {
var config json.RawMessage
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1809,7 +1809,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
// while processing transactions. Before Byzantium the prefetcher is mostly
// useless due to the intermediate root hashing after each transaction.
if bc.chainConfig.IsByzantium(block.Number()) {
statedb.StartPrefetcher("chain")
statedb.StartPrefetcher("chain", !bc.vmConfig.EnableWitnessCollection)
}
activeState = statedb

Expand Down
10 changes: 9 additions & 1 deletion core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,14 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
}
value.SetBytes(val)
}
// Independent of where we loaded the data from, add it to the prefetcher.
// Whilst this would be a bit weird if snapshots are disabled, but we still
// want the trie nodes to end up in the prefetcher too, so just push through.
if s.db.prefetcher != nil && s.data.Root != types.EmptyRootHash {
if err = s.db.prefetcher.prefetch(s.addrHash, s.origin.Root, s.address, [][]byte{key[:]}, true); err != nil {
log.Error("Failed to prefetch storage slot", "addr", s.address, "key", key, "err", err)
}
}
s.originStorage[key] = value
return value
}
Expand Down Expand Up @@ -293,7 +301,7 @@ func (s *stateObject) finalise() {
s.pendingStorage[key] = value
}
if s.db.prefetcher != nil && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch); err != nil {
if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch, false); err != nil {
log.Error("Failed to prefetch slots", "addr", s.address, "slots", len(slotsToPrefetch), "err", err)
}
}
Expand Down
16 changes: 12 additions & 4 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,14 @@ func (s *StateDB) SetLogger(l *tracing.Hooks) {
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string) {
func (s *StateDB) StartPrefetcher(namespace string, noreads bool) {
if s.prefetcher != nil {
s.prefetcher.terminate(false)
s.prefetcher.report()
s.prefetcher = nil
}
if s.snap != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, noreads)

// With the switch to the Proof-of-Stake consensus algorithm, block production
// rewards are now handled at the consensus layer. Consequently, a block may
Expand All @@ -218,7 +218,7 @@ func (s *StateDB) StartPrefetcher(namespace string) {
// To prevent this, the account trie is always scheduled for prefetching once
// the prefetcher is constructed. For more details, see:
// https://github.com/ethereum/go-ethereum/issues/29880
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil); err != nil {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil, false); err != nil {
log.Error("Failed to prefetch account trie", "root", s.originalRoot, "err", err)
}
}
Expand Down Expand Up @@ -616,6 +616,14 @@ func (s *StateDB) getStateObject(addr common.Address) *stateObject {
return nil
}
}
// Independent of where we loaded the data from, add it to the prefetcher.
// Whilst this would be a bit weird if snapshots are disabled, but we still
// want the trie nodes to end up in the prefetcher too, so just push through.
if s.prefetcher != nil {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, [][]byte{addr[:]}, true); err != nil {
log.Error("Failed to prefetch account", "addr", addr, "err", err)
}
}
// Insert into the live set
obj := newObject(s, addr, data)
s.setStateObject(obj)
Expand Down Expand Up @@ -792,7 +800,7 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
}
if s.prefetcher != nil && len(addressesToPrefetch) > 0 {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch); err != nil {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch, false); err != nil {
log.Error("Failed to prefetch addresses", "addresses", len(addressesToPrefetch), "err", err)
}
}
Expand Down
162 changes: 116 additions & 46 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,49 @@ type triePrefetcher struct {
root common.Hash // Root hash of the account trie for metrics
fetchers map[string]*subfetcher // Subfetchers for each trie
term chan struct{} // Channel to signal interruption
noreads bool // Whether to ignore state-read-only prefetch requests

deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
accountDupMeter metrics.Meter
accountWasteMeter metrics.Meter
storageLoadMeter metrics.Meter
storageDupMeter metrics.Meter
storageWasteMeter metrics.Meter

accountLoadReadMeter metrics.Meter
accountLoadWriteMeter metrics.Meter
accountDupReadMeter metrics.Meter
accountDupWriteMeter metrics.Meter
accountDupCrossMeter metrics.Meter
accountWasteMeter metrics.Meter

storageLoadReadMeter metrics.Meter
storageLoadWriteMeter metrics.Meter
storageDupReadMeter metrics.Meter
storageDupWriteMeter metrics.Meter
storageDupCrossMeter metrics.Meter
storageWasteMeter metrics.Meter
}

func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads bool) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
return &triePrefetcher{
db: db,
root: root,
fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map
term: make(chan struct{}),
noreads: noreads,

deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),

accountLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/read", nil),
accountLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/write", nil),
accountDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/read", nil),
accountDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/write", nil),
accountDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/cross", nil),
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),

storageLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/read", nil),
storageLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/write", nil),
storageDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/read", nil),
storageDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/write", nil),
storageDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/cross", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
}
}

Expand Down Expand Up @@ -98,19 +116,31 @@ func (p *triePrefetcher) report() {
fetcher.wait() // ensure the fetcher's idle before poking in its internals

if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
p.accountLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))

p.accountDupReadMeter.Mark(int64(fetcher.dupsRead))
p.accountDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.accountDupCrossMeter.Mark(int64(fetcher.dupsCross))

for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
delete(fetcher.seenRead, string(key))
delete(fetcher.seenWrite, string(key))
}
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
p.accountWasteMeter.Mark(int64(len(fetcher.seenRead) + len(fetcher.seenWrite)))
} else {
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
p.storageLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))

p.storageDupReadMeter.Mark(int64(fetcher.dupsRead))
p.storageDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.storageDupCrossMeter.Mark(int64(fetcher.dupsCross))

for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
delete(fetcher.seenRead, string(key))
delete(fetcher.seenWrite, string(key))
}
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
p.storageWasteMeter.Mark(int64(len(fetcher.seenRead) + len(fetcher.seenWrite)))
}
}
}
Expand All @@ -126,7 +156,11 @@ func (p *triePrefetcher) report() {
// upon the same contract, the parameters invoking this method may be
// repeated.
// 2. Finalize of the main account trie. This happens only once per block.
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) error {
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte, read bool) error {
// If the state item is only being read, but reads are disabled, return
if read && p.noreads {
return nil
}
// Ensure the subfetcher is still alive
select {
case <-p.term:
Expand All @@ -139,7 +173,7 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
p.fetchers[id] = fetcher
}
return fetcher.schedule(keys)
return fetcher.schedule(keys, read)
}

// trie returns the trie matching the root hash, blocking until the fetcher of
Expand Down Expand Up @@ -186,38 +220,51 @@ type subfetcher struct {
addr common.Address // Address of the account that the trie belongs to
trie Trie // Trie being populated with nodes

tasks [][]byte // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue
tasks []*subfetcherTask // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue

wake chan struct{} // Wake channel if a new task is scheduled
stop chan struct{} // Channel to interrupt processing
term chan struct{} // Channel to signal interruption

seen map[string]struct{} // Tracks the entries already loaded
dups int // Number of duplicate preload tasks
used [][]byte // Tracks the entries used in the end
seenRead map[string]struct{} // Tracks the entries already loaded via read operations
seenWrite map[string]struct{} // Tracks the entries already loaded via write operations

dupsRead int // Number of duplicate preload tasks via reads only
dupsWrite int // Number of duplicate preload tasks via writes only
dupsCross int // Number of duplicate preload tasks via read-write-crosses

used [][]byte // Tracks the entries used in the end
}

// subfetcherTask is a trie path to prefetch, tagged with whether it originates
// from a read or a write request.
type subfetcherTask struct {
read bool
key []byte
}

// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
sf := &subfetcher{
db: db,
state: state,
owner: owner,
root: root,
addr: addr,
wake: make(chan struct{}, 1),
stop: make(chan struct{}),
term: make(chan struct{}),
seen: make(map[string]struct{}),
db: db,
state: state,
owner: owner,
root: root,
addr: addr,
wake: make(chan struct{}, 1),
stop: make(chan struct{}),
term: make(chan struct{}),
seenRead: make(map[string]struct{}),
seenWrite: make(map[string]struct{}),
}
go sf.loop()
return sf
}

// schedule adds a batch of trie keys to the queue to prefetch.
func (sf *subfetcher) schedule(keys [][]byte) error {
func (sf *subfetcher) schedule(keys [][]byte, read bool) error {
// Ensure the subfetcher is still alive
select {
case <-sf.term:
Expand All @@ -226,7 +273,10 @@ func (sf *subfetcher) schedule(keys [][]byte) error {
}
// Append the tasks to the current queue
sf.lock.Lock()
sf.tasks = append(sf.tasks, keys...)
for _, key := range keys {
key := key // closure for the append below
sf.tasks = append(sf.tasks, &subfetcherTask{read: read, key: key})
}
sf.lock.Unlock()

// Notify the background thread to execute scheduled tasks
Expand Down Expand Up @@ -303,16 +353,36 @@ func (sf *subfetcher) loop() {
sf.lock.Unlock()

for _, task := range tasks {
if _, ok := sf.seen[string(task)]; ok {
sf.dups++
continue
key := string(task.key)
if task.read {
if _, ok := sf.seenRead[key]; ok {
sf.dupsRead++
continue
}
if _, ok := sf.seenWrite[key]; ok {
sf.dupsCross++
continue
}
} else {
if _, ok := sf.seenRead[key]; ok {
sf.dupsCross++
continue
}
if _, ok := sf.seenWrite[key]; ok {
sf.dupsWrite++
continue
}
}
if len(task.key) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task.key))
} else {
sf.trie.GetStorage(sf.addr, task.key)
}
if len(task) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task))
if task.read {
sf.seenRead[key] = struct{}{}
} else {
sf.trie.GetStorage(sf.addr, task)
sf.seenWrite[key] = struct{}{}
}
sf.seen[string(task)] = struct{}{}
}

case <-sf.stop:
Expand Down
6 changes: 3 additions & 3 deletions core/state/trie_prefetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func filledStateDB() *StateDB {

func TestUseAfterTerminate(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", true)
skey := common.HexToHash("aaa")

if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err != nil {
if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}, false); err != nil {
t.Errorf("Prefetch failed before terminate: %v", err)
}
prefetcher.terminate(false)

if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err == nil {
if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}, false); err == nil {
t.Errorf("Prefetch succeeded after terminate: %v", err)
}
if tr := prefetcher.trie(common.Hash{}, db.originalRoot); tr == nil {
Expand Down
1 change: 1 addition & 0 deletions core/vm/interpreter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Config struct {
NoBaseFee bool // Forces the EIP-1559 baseFee to 0 (needed for 0 price calls)
EnablePreimageRecording bool // Enables recording of SHA3/keccak preimages
ExtraEips []int // Additional EIPS that are to be enabled
EnableWitnessCollection bool // true if witness collection is enabled
}

// ScopeContext contains the things that are per-call, such as stack and memory,
Expand Down
Loading

0 comments on commit 85587d5

Please sign in to comment.