Skip to content

Commit

Permalink
core/state: convert prefetch queue to slice, add various warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed Jan 8, 2021
1 parent ba7999b commit fd78e87
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 85 deletions.
2 changes: 1 addition & 1 deletion core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (s *stateObject) finalise() {
s.pendingStorage[key] = value
trieChanges = append(trieChanges, key)
}
if len(trieChanges) > 0 && s.db.prefetcher != nil && s.data.Root != emptyRoot {
if s.db.prefetcher != nil && len(trieChanges) > 0 && s.data.Root != emptyRoot && !s.deleted {
s.db.prefetcher.PrefetchStorage(s.data.Root, trieChanges)
}
if len(s.dirtyStorage) > 0 {
Expand Down
6 changes: 3 additions & 3 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,17 +802,17 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
}
s.stateObjectsPending[addr] = struct{}{}
s.stateObjectsDirty[addr] = struct{}{}

// At this point, also ship the address off to the precacher. The precacher
// will start loading tries, and when the change is eventually committed,
// the commit-phase will be a lot faster
if s.prefetcher != nil {
addressesToPrefetch = append(addressesToPrefetch, addr)
}
}
if s.prefetcher != nil {
if s.prefetcher != nil && addressesToPrefetch != nil {
s.prefetcher.PrefetchAddresses(addressesToPrefetch)
}

// Invalidate journal because reverting across transactions is not allowed.
s.clearJournalAndRefund()
}
Expand All @@ -829,10 +829,10 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// a trie which has the same root, but also has some content loaded into it.
// If so, use that one instead.
if s.prefetcher != nil {
s.prefetcher.Pause()
// We only want to do this _once_, if someone calls IntermediateRoot again,
// we shouldn't fetch the trie again
if s.originalRoot != (common.Hash{}) {
s.prefetcher.Pause()
if trie := s.prefetcher.GetTrie(s.originalRoot); trie != nil {
s.trie = trie
}
Expand Down
225 changes: 144 additions & 81 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package state

import (
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
Expand All @@ -30,20 +33,25 @@ var (

triePrefetchFetchMeter = metrics.NewRegisteredMeter("trie/prefetch/fetch", nil)
triePrefetchSkipMeter = metrics.NewRegisteredMeter("trie/prefetch/skip", nil)
triePrefetchDropMeter = metrics.NewRegisteredMeter("trie/prefetch/drop", nil)
)

// TriePrefetcher is an active prefetcher, which receives accounts or storage
// items on two channels, and does trie-loading of the items.
// The goal is to get as much useful content into the caches as possible
// items and does trie-loading of them. The goal is to get as much useful content
// into the caches as possible.
//
// Note, the prefetcher's API is not thread safe.
type TriePrefetcher struct {
requestCh chan (fetchRequest) // Chan to receive requests for data to fetch
cmdCh chan (*cmd) // Chan to control activity, pause/new root
quitCh chan (struct{})
deliveryCh chan (struct{})
db Database
db Database

cmdCh chan *command // Control channel to pause or reset the root
reqCh chan struct{} // Notification channel for new preload requests
quitCh chan struct{}

accountTasks []common.Address // Set of accounts to prefetch
storageTasks map[common.Hash][]common.Hash // Set of storage slots to prefetch
taskLock sync.Mutex // Lock protecting the task sets

paused bool
paused uint32 // Whether the prefetcher is actively loading data or serving tries

storageTries map[common.Hash]Trie
accountTrie Trie
Expand All @@ -52,22 +60,18 @@ type TriePrefetcher struct {

// NewTriePrefetcher constructs a prefetcher working on top of the given
// database. The prefetcher starts out paused: the user needs to call Resume
// with a state root before any data will be preloaded.
func NewTriePrefetcher(db Database) *TriePrefetcher {
	return &TriePrefetcher{
		db:           db,
		cmdCh:        make(chan *command),
		reqCh:        make(chan struct{}, 1), // 1 to notify, no need to track multiple notifications
		quitCh:       make(chan struct{}),
		storageTasks: make(map[common.Hash][]common.Hash),
		paused:       1, // User needs to call Resume() before allowing to preload data
	}
}

type cmd struct {
type command struct {
root common.Hash
}

type fetchRequest struct {
slots []common.Hash
storageRoot *common.Hash
addresses []common.Address
done chan struct{}
}

func (p *TriePrefetcher) Loop() {
Expand All @@ -92,26 +96,19 @@ func (p *TriePrefetcher) Loop() {
case <-p.quitCh:
return
case cmd := <-p.cmdCh:
// Clear out any old requests
drain:
for {
select {
case req := <-p.requestCh:
if req.slots != nil {
skipped += int64(len(req.slots))
} else {
skipped += int64(len(req.addresses))
}
default:
break drain
}
// Clear out any pending update notification
select {
case <-p.reqCh: // Skip stale notification
default: // No notification queued
}

if paused {
// Clear old data
// Prefetcher is being resumed, clear old data
p.storageTries = nil
p.accountTrie = nil
p.accountTrieRoot = common.Hash{}
// Resume again

// Start with a new set of tries
storageTries = make(map[common.Hash]Trie)
accountTrieRoot = cmd.root
accountTrie, err = p.db.OpenTrie(accountTrieRoot)
Expand All @@ -123,47 +120,85 @@ func (p *TriePrefetcher) Loop() {
}
paused = false
} else {
// Prefetcher is being paused, abort all unfulfilled requests
p.taskLock.Lock()
skipped += int64(len(p.accountTasks))
for storageTasks := range p.storageTasks {
skipped += int64(len(storageTasks))
}
p.accountTasks = nil
p.storageTasks = make(map[common.Hash][]common.Hash)
p.taskLock.Unlock()

// Update metrics at new block events
triePrefetchFetchMeter.Mark(fetched)
triePrefetchSkipMeter.Mark(skipped)
fetched, skipped = 0, 0

// Make the tries accessible
p.accountTrie = accountTrie
p.storageTries = storageTries
p.accountTrieRoot = accountTrieRoot

if cmd.root != (common.Hash{}) {
log.Error("Trie prefetcher paused with non-empty root")
}
paused = true
}
p.deliveryCh <- struct{}{}
case req := <-p.requestCh:
close(cmd.done)

case <-p.reqCh:
if paused {
log.Error("Prefetch request arrived whilst paused")
continue
}
if sRoot := req.storageRoot; sRoot != nil {
// Storage slots to fetch
var (
storageTrie Trie
err error
)
if storageTrie = storageTries[*sRoot]; storageTrie == nil {
if storageTrie, err = p.db.OpenTrie(*sRoot); err != nil {
log.Warn("trie prefetcher failed opening storage trie", "root", *sRoot, "err", err)
skipped += int64(len(req.slots))
// Retrieve all the tasks queued up and reset the sets for new insertions
p.taskLock.Lock()
accountTasks, storageTasks := p.accountTasks, p.storageTasks
if len(accountTasks) > 0 {
p.accountTasks = nil
}
if len(storageTasks) > 0 {
p.storageTasks = make(map[common.Hash][]common.Hash)
}
p.taskLock.Unlock()

// Keep prefetching the data until an interruption is triggered
for _, addr := range accountTasks {
if atomic.LoadUint32(&p.paused) == 1 {
break
}
accountTrie.TryGet(addr[:])
fetched++
}
for root, slots := range storageTasks {
if atomic.LoadUint32(&p.paused) == 1 {
break
}
if _, ok := storageTries[root]; !ok {
storageTrie, err := p.db.OpenTrie(root)
if err != nil {
log.Warn("Trie prefetcher failed opening storage trie", "root", root, "err", err)
skipped += int64(len(slots))
continue
}
storageTries[*sRoot] = storageTrie
storageTries[root] = storageTrie
}
for _, key := range req.slots {
storageTrie.TryGet(key[:])
storageTrie := storageTries[root]
for _, hash := range slots {
if atomic.LoadUint32(&p.paused) == 1 {
break
}
storageTrie.TryGet(hash[:])
}
fetched += int64(len(req.slots))
} else { // an account
for _, addr := range req.addresses {
accountTrie.TryGet(addr[:])
fetched += int64(len(slots))
}
// If pre-fetching was interrupted, mark all remaining asks as skipped
if atomic.LoadUint32(&p.paused) == 1 {
skipped += int64(len(accountTasks))
for tasks := range storageTasks {
skipped += int64(len(tasks))
}
fetched += int64(len(req.addresses))
}
}
}
Expand All @@ -178,62 +213,90 @@ func (p *TriePrefetcher) Close() {
}

// Resume causes the prefetcher to clear out old data, and get ready to
// fetch data concerning the new root
// fetch data concerning the new root.
func (p *TriePrefetcher) Resume(root common.Hash) {
p.paused = false
p.cmdCh <- &cmd{
// Abort if the prefetcher is not paused
if atomic.LoadUint32(&p.paused) == 0 {
log.Error("Trie prefetcher already resumed")
return
}
atomic.StoreUint32(&p.paused, 0)

cmd := &command{
root: root,
done: make(chan struct{}),
}
// Wait for it
<-p.deliveryCh
p.cmdCh <- cmd
<-cmd.done
}

// Pause causes the prefetcher to pause prefetching, and make tries
// accessible to callers via GetTrie
// accessible to callers via GetTrie.
func (p *TriePrefetcher) Pause() {
if p.paused {
// Abort if the prefetcher is already paused
if atomic.LoadUint32(&p.paused) == 1 {
log.Error("Trie prefetcher already paused")
return
}
p.paused = true
p.cmdCh <- &cmd{
root: common.Hash{},
atomic.StoreUint32(&p.paused, 1)

// Request a pause and wait until it's done
cmd := &command{
done: make(chan struct{}),
}
// Wait for it
<-p.deliveryCh
p.cmdCh <- cmd
<-cmd.done
}

// PrefetchAddresses adds an address for prefetching
func (p *TriePrefetcher) PrefetchAddresses(addresses []common.Address) {
cmd := fetchRequest{
addresses: addresses,
// Abort if the prefetcher is already paused
if atomic.LoadUint32(&p.paused) == 1 {
log.Error("Attempted account trie-prefetch whilst paused")
return
}
// We do an async send here, to not cause the caller to block
//p.requestCh <- cmd
// Inject the addresses into the task queue and notify the prefetcher
p.taskLock.Lock()
defer p.taskLock.Unlock()

p.accountTasks = append(p.accountTasks, addresses...)
select {
case p.requestCh <- cmd:
case p.reqCh <- struct{}{}:
default:
triePrefetchDropMeter.Mark(int64(len(addresses)))
// Already notified
}
}

// PrefetchStorage adds a storage root and a set of keys for prefetching
func (p *TriePrefetcher) PrefetchStorage(root common.Hash, slots []common.Hash) {
cmd := fetchRequest{
storageRoot: &root,
slots: slots,
// Abort if the prefetcher is already paused
if atomic.LoadUint32(&p.paused) == 1 {
log.Error("Attempted storage trie-prefetch whilst paused")
return
}
// We do an async send here, to not cause the caller to block
//p.requestCh <- cmd
// Inject the storage hashes into the task queue and notify the prefetcher
p.taskLock.Lock()
defer p.taskLock.Unlock()

p.storageTasks[root] = append(p.storageTasks[root], slots...)
select {
case p.requestCh <- cmd:
case p.reqCh <- struct{}{}:
default:
triePrefetchDropMeter.Mark(int64(len(slots)))
// Already notified
}
}

// GetTrie returns the trie matching the root hash, or nil if the prefetcher
// doesn't have it.
//
// Note, if snapshots are not enabled, we'll hit this code path during normal
// transaction processing, so don't check for that invariant.
func (p *TriePrefetcher) GetTrie(root common.Hash) Trie {
// Abort if the prefetcher is not paused
//if atomic.LoadUint32(&p.paused) == 0 {
// log.Error("Attempted trie-prefetcher retrieval whilst not paused")
// return nil
//}
if root == p.accountTrieRoot {
return p.accountTrie
}
Expand Down

0 comments on commit fd78e87

Please sign in to comment.