Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
make updateRoot and snapupdate concurrently
Browse files Browse the repository at this point in the history
unclezoro committed May 31, 2021
1 parent 4cc729a commit 62904a0
Showing 2 changed files with 112 additions and 62 deletions.
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
@@ -1895,7 +1895,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
start := i * len(accountsSlice) / runtime.NumCPU()
end := (i + 1) * len(accountsSlice) / runtime.NumCPU()
if i+1 == runtime.NumCPU() {
end = len(accounts)
end = len(accountsSlice)
}
preloadWg.Add(1)
gopool.Submit(func() {
172 changes: 111 additions & 61 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
@@ -21,10 +21,13 @@ import (
"errors"
"fmt"
"math/big"
"runtime"
"sort"
"sync"
"time"

"github.com/ethereum/go-ethereum/common/gopool"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot"
@@ -895,16 +898,38 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
s.prefetcher = nil
}()
}

tasks := make(chan func())
finishCh := make(chan struct{})
wg := sync.WaitGroup{}
for i := 0; i < runtime.NumCPU(); i++ {
gopool.Submit(func() {
for {
select {
case task := <-tasks:
task()
case <-finishCh:
return
}
}
})
}
// Although naively it makes sense to retrieve the account trie and then do
// the contract storage and account updates sequentially, that short circuits
// the account prefetcher. Instead, let's process all the storage updates
// first, giving the account prefeches just a few more milliseconds of time
// to pull useful data from disk.
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
obj.updateRoot(s.db)
wg.Add(1)
tasks <- func() {
obj.updateRoot(s.db)
wg.Done()
}
}
}
wg.Wait()
close(finishCh)
// Now we're about to start to write changes to the trie. The trie is so far
// _untouched_. We can check with the prefetcher, if it can give us a trie
// which has the same root, but also has some content loaded into it.
@@ -958,72 +983,97 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
return common.Hash{}, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr)
}
// Finalize any pending changes and merge everything into the tries
s.IntermediateRoot(deleteEmptyObjects)

// Commit objects to the trie, measuring the elapsed time
codeWriter := s.db.TrieDB().DiskDB().NewBatch()
for addr := range s.stateObjectsDirty {
if obj := s.stateObjects[addr]; !obj.deleted {
// Write any contract code associated with the state object
if obj.code != nil && obj.dirtyCode {
rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code)
obj.dirtyCode = false
root := s.IntermediateRoot(deleteEmptyObjects)

commitFuncs := []func() error{
func() error {
// Commit objects to the trie, measuring the elapsed time
codeWriter := s.db.TrieDB().DiskDB().NewBatch()
for addr := range s.stateObjectsDirty {
if obj := s.stateObjects[addr]; !obj.deleted {
// Write any contract code associated with the state object
if obj.code != nil && obj.dirtyCode {
rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code)
obj.dirtyCode = false
}
// Write any storage changes in the state object to its storage trie
if err := obj.CommitTrie(s.db); err != nil {
return err
}
}
}
// Write any storage changes in the state object to its storage trie
if err := obj.CommitTrie(s.db); err != nil {
return common.Hash{}, err
if len(s.stateObjectsDirty) > 0 {
s.stateObjectsDirty = make(map[common.Address]struct{}, len(s.stateObjectsDirty)/2)
}
}
}
if len(s.stateObjectsDirty) > 0 {
s.stateObjectsDirty = make(map[common.Address]struct{})
}
if codeWriter.ValueSize() > 0 {
if err := codeWriter.Write(); err != nil {
log.Crit("Failed to commit dirty codes", "error", err)
}
}
// Write the account trie changes, measuing the amount of wasted time
var start time.Time
if metrics.EnabledExpensive {
start = time.Now()
}
// The onleaf func is called _serially_, so we can reuse the same account
// for unmarshalling every time.
var account Account
root, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error {
if err := rlp.DecodeBytes(leaf, &account); err != nil {
return nil
}
if account.Root != emptyRoot {
s.db.TrieDB().Reference(account.Root, parent)
}
return nil
})
if metrics.EnabledExpensive {
s.AccountCommits += time.Since(start)
}
// If snapshotting is enabled, update the snapshot tree with this new version
if s.snap != nil {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.SnapshotCommits += time.Since(start) }(time.Now())
}
// Only update if there's a state transition (skip empty Clique blocks)
if parent := s.snap.Root(); parent != root {
if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil {
log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err)
if codeWriter.ValueSize() > 0 {
if err := codeWriter.Write(); err != nil {
log.Crit("Failed to commit dirty codes", "error", err)
}
}
// Write the account trie changes, measuing the amount of wasted time
var start time.Time
if metrics.EnabledExpensive {
start = time.Now()
}
// Keep 128 diff layers in the memory, persistent layer is 129th.
// - head layer is paired with HEAD state
// - head-1 layer is paired with HEAD-1 state
// - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state
if err := s.snaps.Cap(root, 128); err != nil {
log.Warn("Failed to cap snapshot tree", "root", root, "layers", 128, "err", err)
// The onleaf func is called _serially_, so we can reuse the same account
// for unmarshalling every time.
var account Account
_, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash) error {
if err := rlp.DecodeBytes(leaf, &account); err != nil {
return nil
}
if account.Root != emptyRoot {
s.db.TrieDB().Reference(account.Root, parent)
}
return nil
})
if err != nil {
return err
}
if metrics.EnabledExpensive {
s.AccountCommits += time.Since(start)
}
return nil
},
func() error {
// If snapshotting is enabled, update the snapshot tree with this new version
if s.snap != nil {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.SnapshotCommits += time.Since(start) }(time.Now())
}
// Only update if there's a state transition (skip empty Clique blocks)
if parent := s.snap.Root(); parent != root {
if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil {
log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err)
}
// Keep 128 diff layers in the memory, persistent layer is 129th.
// - head layer is paired with HEAD state
// - head-1 layer is paired with HEAD-1 state
// - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state
if err := s.snaps.Cap(root, 128); err != nil {
log.Warn("Failed to cap snapshot tree", "root", root, "layers", 128, "err", err)
}
}
s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil
}
return nil
},
}
commitRes := make(chan error, len(commitFuncs))
for _, f := range commitFuncs {
tmpFunc := f
gopool.Submit(func() {
commitRes <- tmpFunc()
})
}
for i := 0; i < len(commitFuncs); i++ {
r := <-commitRes
if r != nil {
return common.Hash{}, r
}
s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil
}
return root, err

return root, nil
}

// PrepareAccessList handles the preparatory steps for executing a state transition with

0 comments on commit 62904a0

Please sign in to comment.