make updateRoot and snapupdate concurrently
unclezoro committed May 31, 2021
1 parent 4cc729a commit 5bdb7b4
Showing 3 changed files with 126 additions and 64 deletions.
2 changes: 1 addition & 1 deletion core/blockchain.go
@@ -1895,7 +1895,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
start := i * len(accountsSlice) / runtime.NumCPU()
end := (i + 1) * len(accountsSlice) / runtime.NumCPU()
if i+1 == runtime.NumCPU() {
end = len(accounts)
end = len(accountsSlice)
}
preloadWg.Add(1)
gopool.Submit(func() {
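Note on the core/blockchain.go change: the preload loop splits accountsSlice into one chunk per runtime.NumCPU() worker, and the final chunk's upper bound must be clamped to the length of the slice actually being partitioned (len(accountsSlice)), not the original accounts container (the old len(accounts)). Below is a minimal, self-contained sketch of that partitioning pattern; the helper name and the plain goroutines are illustrative stand-ins for the gopool.Submit call in the diff.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// preloadConcurrently splits accountsSlice into one chunk per CPU and hands each
// chunk to its own goroutine, mirroring the partitioning in insertChain.
func preloadConcurrently(accountsSlice []string, preload func(batch []string)) {
	var preloadWg sync.WaitGroup
	workers := runtime.NumCPU()
	for i := 0; i < workers; i++ {
		start := i * len(accountsSlice) / workers
		end := (i + 1) * len(accountsSlice) / workers
		if i+1 == workers {
			// Clamp the last chunk to the slice being partitioned; referencing a
			// different container's length is the off-by bug fixed in this commit.
			end = len(accountsSlice)
		}
		batch := accountsSlice[start:end] // capture per-iteration bounds for the closure
		preloadWg.Add(1)
		go func() {
			defer preloadWg.Done()
			preload(batch)
		}()
	}
	preloadWg.Wait()
}

func main() {
	preloadConcurrently([]string{"a", "b", "c", "d", "e"}, func(batch []string) {
		fmt.Println(batch)
	})
}
```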
14 changes: 12 additions & 2 deletions core/state/state_object.go
@@ -335,7 +335,11 @@ func (s *stateObject) updateTrie(db Database) Trie {
}
// Track the amount of time wasted on updating the storage trie
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageUpdates += time.Since(start) }(time.Now())
defer func(start time.Time) {
s.db.MetricsMux.Lock()
s.db.StorageUpdates += time.Since(start)
s.db.MetricsMux.Unlock()
}(time.Now())
}
// The snapshot storage map for the object
var storage map[common.Hash][]byte
@@ -361,6 +365,7 @@ func (s *stateObject) updateTrie(db Database) Trie {
}
// If state snapshotting is active, cache the data til commit
if s.db.snap != nil {
s.db.snapMux.Lock()
if storage == nil {
// Retrieve the old storage map, if available, create a new one otherwise
if storage = s.db.snapStorage[s.addrHash]; storage == nil {
Expand All @@ -369,6 +374,7 @@ func (s *stateObject) updateTrie(db Database) Trie {
}
}
storage[crypto.HashData(hasher, key[:])] = v // v will be nil if value is 0x00
s.db.snapMux.Unlock()
}
usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure
}
@@ -389,7 +395,11 @@ func (s *stateObject) updateRoot(db Database) {
}
// Track the amount of time wasted on hashing the storage trie
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageHashes += time.Since(start) }(time.Now())
defer func(start time.Time) {
s.db.MetricsMux.Lock()
s.db.StorageHashes += time.Since(start)
s.db.MetricsMux.Unlock()
}(time.Now())
}
s.data.Root = s.trie.Hash()
}
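Note on the core/state/state_object.go changes: updateTrie and updateRoot can now be called for many state objects at once (see the IntermediateRoot worker pool below), so the shared StateDB fields they touch — the expensive-metrics duration counters and the snapStorage map — are guarded by MetricsMux and snapMux. A rough sketch of the same locking pattern follows; the types are simplified stand-ins and only the field names mirror the diff.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// stateDB is a toy stand-in for StateDB, keeping just the fields that the
// concurrent updateTrie calls contend on.
type stateDB struct {
	MetricsMux     sync.Mutex
	StorageUpdates time.Duration

	snapMux     sync.Mutex
	snapStorage map[string]map[string][]byte
}

// updateOne mimics one object's updateTrie: time the work under MetricsMux and
// write the snapshot cache under snapMux.
func (s *stateDB) updateOne(addrHash, key string, val []byte) {
	start := time.Now()
	defer func() {
		s.MetricsMux.Lock()
		s.StorageUpdates += time.Since(start)
		s.MetricsMux.Unlock()
	}()

	// ... the storage trie update itself would happen here ...

	s.snapMux.Lock()
	storage := s.snapStorage[addrHash]
	if storage == nil {
		storage = make(map[string][]byte)
		s.snapStorage[addrHash] = storage
	}
	storage[key] = val
	s.snapMux.Unlock()
}

func main() {
	s := &stateDB{snapStorage: make(map[string]map[string][]byte)}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.updateOne(fmt.Sprintf("acct-%d", i), "slot-0", []byte{byte(i)})
		}(i)
	}
	wg.Wait()
	fmt.Println("objects snapshotted:", len(s.snapStorage))
}
```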
174 changes: 113 additions & 61 deletions core/state/statedb.go
@@ -21,10 +21,13 @@ import (
"errors"
"fmt"
"math/big"
"runtime"
"sort"
"sync"
"time"

"github.com/ethereum/go-ethereum/common/gopool"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot"
@@ -69,6 +72,7 @@ type StateDB struct {
trie Trie
hasher crypto.KeccakState

snapMux sync.Mutex
snaps *snapshot.Tree
snap snapshot.Snapshot
snapDestructs map[common.Hash]struct{}
@@ -108,6 +112,7 @@ type StateDB struct {
nextRevisionId int

// Measurements gathered during execution for debugging purposes
MetricsMux sync.Mutex
AccountReads time.Duration
AccountHashes time.Duration
AccountUpdates time.Duration
@@ -895,16 +900,38 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
s.prefetcher = nil
}()
}

tasks := make(chan func())
finishCh := make(chan struct{})
wg := sync.WaitGroup{}
for i := 0; i < runtime.NumCPU(); i++ {
gopool.Submit(func() {
for {
select {
case task := <-tasks:
task()
case <-finishCh:
return
}
}
})
}
// Although naively it makes sense to retrieve the account trie and then do
// the contract storage and account updates sequentially, that short circuits
// the account prefetcher. Instead, let's process all the storage updates
first, giving the account prefetches just a few more milliseconds of time
// to pull useful data from disk.
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
obj.updateRoot(s.db)
wg.Add(1)
tasks <- func() {
obj.updateRoot(s.db)
wg.Done()
}
}
}
wg.Wait()
close(finishCh)
// Now we're about to start to write changes to the trie. The trie is so far
// _untouched_. We can check with the prefetcher, if it can give us a trie
// which has the same root, but also has some content loaded into it.
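The hunk above turns IntermediateRoot's per-object updateRoot calls into a small worker pool: runtime.NumCPU() workers drain a tasks channel until finishCh is closed, and a WaitGroup lets the caller wait for every queued closure before shutting the workers down. A self-contained sketch of that pattern, with plain goroutines standing in for gopool.Submit:

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// runTasks executes the given closures on a pool of NumCPU workers, mirroring
// the tasks/finishCh/WaitGroup arrangement in IntermediateRoot.
func runTasks(work []func()) {
	tasks := make(chan func())
	finishCh := make(chan struct{})
	wg := sync.WaitGroup{}

	for i := 0; i < runtime.NumCPU(); i++ {
		go func() {
			for {
				select {
				case task := <-tasks:
					task()
				case <-finishCh:
					return
				}
			}
		}()
	}

	for _, w := range work {
		w := w // capture the loop variable for the queued closure
		wg.Add(1)
		tasks <- func() {
			w()
			wg.Done()
		}
	}
	wg.Wait()
	close(finishCh) // every task has finished; release the workers
}

func main() {
	var mu sync.Mutex
	total := 0
	jobs := make([]func(), 100)
	for i := range jobs {
		i := i
		jobs[i] = func() { mu.Lock(); total += i; mu.Unlock() }
	}
	runTasks(jobs)
	fmt.Println("sum:", total)
}
```

Closing finishCh only after wg.Wait() returns ensures no worker exits while queued work is still outstanding.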
@@ -958,72 +985,97 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
return common.Hash{}, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr)
}
// Finalize any pending changes and merge everything into the tries
s.IntermediateRoot(deleteEmptyObjects)

// Commit objects to the trie, measuring the elapsed time
codeWriter := s.db.TrieDB().DiskDB().NewBatch()
for addr := range s.stateObjectsDirty {
if obj := s.stateObjects[addr]; !obj.deleted {
// Write any contract code associated with the state object
if obj.code != nil && obj.dirtyCode {
rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code)
obj.dirtyCode = false
root := s.IntermediateRoot(deleteEmptyObjects)

commitFuncs := []func() error{
func() error {
// Commit objects to the trie, measuring the elapsed time
codeWriter := s.db.TrieDB().DiskDB().NewBatch()
for addr := range s.stateObjectsDirty {
if obj := s.stateObjects[addr]; !obj.deleted {
// Write any contract code associated with the state object
if obj.code != nil && obj.dirtyCode {
rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code)
obj.dirtyCode = false
}
// Write any storage changes in the state object to its storage trie
if err := obj.CommitTrie(s.db); err != nil {
return err
}
}
}
// Write any storage changes in the state object to its storage trie
if err := obj.CommitTrie(s.db); err != nil {
return common.Hash{}, err
if len(s.stateObjectsDirty) > 0 {
s.stateObjectsDirty = make(map[common.Address]struct{}, len(s.stateObjectsDirty)/2)
}
}
}
if len(s.stateObjectsDirty) > 0 {
s.stateObjectsDirty = make(map[common.Address]struct{})
}
if codeWriter.ValueSize() > 0 {
if err := codeWriter.Write(); err != nil {
log.Crit("Failed to commit dirty codes", "error", err)
}
}
// Write the account trie changes, measuring the amount of wasted time
var start time.Time
if metrics.EnabledExpensive {
start = time.Now()
}
// The onleaf func is called _serially_, so we can reuse the same account
// for unmarshalling every time.
var account Account
root, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error {
if err := rlp.DecodeBytes(leaf, &account); err != nil {
return nil
}
if account.Root != emptyRoot {
s.db.TrieDB().Reference(account.Root, parent)
}
return nil
})
if metrics.EnabledExpensive {
s.AccountCommits += time.Since(start)
}
// If snapshotting is enabled, update the snapshot tree with this new version
if s.snap != nil {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.SnapshotCommits += time.Since(start) }(time.Now())
}
// Only update if there's a state transition (skip empty Clique blocks)
if parent := s.snap.Root(); parent != root {
if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil {
log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err)
if codeWriter.ValueSize() > 0 {
if err := codeWriter.Write(); err != nil {
log.Crit("Failed to commit dirty codes", "error", err)
}
}
// Write the account trie changes, measuring the amount of wasted time
var start time.Time
if metrics.EnabledExpensive {
start = time.Now()
}
// Keep 128 diff layers in the memory, persistent layer is 129th.
// - head layer is paired with HEAD state
// - head-1 layer is paired with HEAD-1 state
// - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state
if err := s.snaps.Cap(root, 128); err != nil {
log.Warn("Failed to cap snapshot tree", "root", root, "layers", 128, "err", err)
// The onleaf func is called _serially_, so we can reuse the same account
// for unmarshalling every time.
var account Account
_, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error {
if err := rlp.DecodeBytes(leaf, &account); err != nil {
return nil
}
if account.Root != emptyRoot {
s.db.TrieDB().Reference(account.Root, parent)
}
return nil
})
if err != nil {
return err
}
if metrics.EnabledExpensive {
s.AccountCommits += time.Since(start)
}
return nil
},
func() error {
// If snapshotting is enabled, update the snapshot tree with this new version
if s.snap != nil {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.SnapshotCommits += time.Since(start) }(time.Now())
}
// Only update if there's a state transition (skip empty Clique blocks)
if parent := s.snap.Root(); parent != root {
if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil {
log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err)
}
// Keep 128 diff layers in the memory, persistent layer is 129th.
// - head layer is paired with HEAD state
// - head-1 layer is paired with HEAD-1 state
// - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state
if err := s.snaps.Cap(root, 128); err != nil {
log.Warn("Failed to cap snapshot tree", "root", root, "layers", 128, "err", err)
}
}
s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil
}
return nil
},
}
commitRes := make(chan error, len(commitFuncs))
for _, f := range commitFuncs {
tmpFunc := f
gopool.Submit(func() {
commitRes <- tmpFunc()
})
}
for i := 0; i < len(commitFuncs); i++ {
r := <-commitRes
if r != nil {
return common.Hash{}, r
}
s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil
}
return root, err

return root, nil
}

// PrepareAccessList handles the preparatory steps for executing a state transition with
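The Commit rewrite above splits the work into two commitFuncs — the account-trie/code commit and the snapshot-tree update — runs them concurrently, and fans their results into a buffered error channel, failing on the first non-nil result. A minimal sketch of that fan-out/fan-in, again with plain goroutines in place of gopool.Submit:

```go
package main

import (
	"errors"
	"fmt"
)

// commitConcurrently runs each commit phase in its own goroutine and returns
// the first error reported, mirroring the commitFuncs loop in Commit.
func commitConcurrently(commitFuncs []func() error) error {
	commitRes := make(chan error, len(commitFuncs))
	for _, f := range commitFuncs {
		tmpFunc := f // capture the per-iteration value for the goroutine
		go func() {
			commitRes <- tmpFunc()
		}()
	}
	for i := 0; i < len(commitFuncs); i++ {
		if r := <-commitRes; r != nil {
			return r
		}
	}
	return nil
}

func main() {
	err := commitConcurrently([]func() error{
		func() error { return nil },                            // e.g. account trie + code commit
		func() error { return errors.New("snapshot failed") },  // e.g. snapshot tree update
	})
	fmt.Println("commit result:", err)
}
```

Because the channel is buffered to len(commitFuncs), a phase that finishes after the caller has already returned on an earlier error can still send its result and exit without blocking.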
