Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trie: parallel commit (alternative version) #30545

Merged
merged 15 commits into from
Oct 14, 2024
38 changes: 30 additions & 8 deletions trie/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package trie

import (
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/trie/trienode"
Expand All @@ -42,12 +43,12 @@ func newCommitter(nodeset *trienode.NodeSet, tracer *tracer, collectLeaf bool) *
}

// Commit collapses a node down into a hash node.
func (c *committer) Commit(n node) hashNode {
return c.commit(nil, n).(hashNode)
func (c *committer) Commit(n node, parallel bool) hashNode {
return c.commit(nil, n, parallel).(hashNode)
}

// commit collapses a node down into a hash node and returns it.
func (c *committer) commit(path []byte, n node) node {
func (c *committer) commit(path []byte, n node, parallel bool) node {
// if this path is clean, use available cached data
hash, dirty := n.cache()
if hash != nil && !dirty {
Expand All @@ -62,7 +63,7 @@ func (c *committer) commit(path []byte, n node) node {
// If the child is fullNode, recursively commit,
// otherwise it can only be hashNode or valueNode.
if _, ok := cn.Val.(*fullNode); ok {
collapsed.Val = c.commit(append(path, cn.Key...), cn.Val)
collapsed.Val = c.commit(append(path, cn.Key...), cn.Val, false)
}
// The key needs to be copied, since we're adding it to the
// modified nodeset.
Expand All @@ -73,7 +74,7 @@ func (c *committer) commit(path []byte, n node) node {
}
return collapsed
case *fullNode:
hashedKids := c.commitChildren(path, cn)
hashedKids := c.commitChildren(path, cn, parallel)
collapsed := cn.copy()
collapsed.Children = hashedKids

Expand All @@ -91,8 +92,12 @@ func (c *committer) commit(path []byte, n node) node {
}

// commitChildren commits the children of the given fullnode
func (c *committer) commitChildren(path []byte, n *fullNode) [17]node {
var children [17]node
func (c *committer) commitChildren(path []byte, n *fullNode, parallel bool) [17]node {
var (
wg sync.WaitGroup
nodesMu sync.Mutex
children [17]node
)
for i := 0; i < 16; i++ {
child := n.Children[i]
if child == nil {
Expand All @@ -108,7 +113,24 @@ func (c *committer) commitChildren(path []byte, n *fullNode) [17]node {
// Commit the child recursively and store the "hashed" value.
// Note the returned node can be some embedded nodes, so it's
// possible the type is not hashNode.
children[i] = c.commit(append(path, byte(i)), child)
if !parallel {
children[i] = c.commit(append(path, byte(i)), child, false)
} else {
wg.Add(1)
go func(index int) {
p := append(path, byte(index))
childSet := trienode.NewNodeSet(c.nodes.Owner)
childCommitter := newCommitter(childSet, c.tracer, c.collectLeaf)
children[index] = childCommitter.commit(p, child, false)
nodesMu.Lock()
c.nodes.MergeSet(childSet)
nodesMu.Unlock()
wg.Done()
}(i)
}
}
if parallel {
wg.Wait()
}
// For the 17th child, it's possible the type is valuenode.
if n.Children[16] != nil {
Expand Down
23 changes: 16 additions & 7 deletions trie/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type Trie struct {
// actually unhashed nodes.
unhashed int

// uncommitted is the number of updates since last commit.
uncommitted int

// reader is the handler trie can retrieve nodes from.
reader *trieReader

Expand All @@ -64,12 +67,13 @@ func (t *Trie) newFlag() nodeFlag {
// Copy returns a copy of Trie.
func (t *Trie) Copy() *Trie {
return &Trie{
root: t.root,
owner: t.owner,
committed: t.committed,
unhashed: t.unhashed,
reader: t.reader,
tracer: t.tracer.copy(),
root: t.root,
owner: t.owner,
committed: t.committed,
reader: t.reader,
tracer: t.tracer.copy(),
uncommitted: t.uncommitted,
unhashed: t.unhashed,
}
}

Expand Down Expand Up @@ -309,6 +313,7 @@ func (t *Trie) Update(key, value []byte) error {

func (t *Trie) update(key, value []byte) error {
t.unhashed++
t.uncommitted++
k := keybytesToHex(key)
if len(value) != 0 {
_, n, err := t.insert(t.root, nil, k, valueNode(value))
Expand Down Expand Up @@ -422,6 +427,7 @@ func (t *Trie) Delete(key []byte) error {
if t.committed {
return ErrCommitted
}
t.uncommitted++
t.unhashed++
k := keybytesToHex(key)
_, n, err := t.delete(t.root, nil, k)
Expand Down Expand Up @@ -642,7 +648,9 @@ func (t *Trie) Commit(collectLeaf bool) (common.Hash, *trienode.NodeSet) {
for _, path := range t.tracer.deletedNodes() {
nodes.AddNode([]byte(path), trienode.NewDeleted())
}
t.root = newCommitter(nodes, t.tracer, collectLeaf).Commit(t.root)
// If the number of changes is below 100, we let one thread handle it
t.root = newCommitter(nodes, t.tracer, collectLeaf).Commit(t.root, t.uncommitted > 100)
t.uncommitted = 0
return rootHash, nodes
}

Expand Down Expand Up @@ -678,6 +686,7 @@ func (t *Trie) Reset() {
t.root = nil
t.owner = common.Hash{}
t.unhashed = 0
t.uncommitted = 0
t.tracer.reset()
t.committed = false
}
104 changes: 104 additions & 0 deletions trie/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"math/rand"
"reflect"
"sort"
"strings"
"testing"
"testing/quick"

Expand All @@ -35,6 +36,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/internal/testrand"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/holiman/uint256"
Expand Down Expand Up @@ -1206,3 +1208,105 @@ func FuzzTrie(f *testing.F) {
}
})
}

func BenchmarkCommit(b *testing.B) {
benchmarkCommit(b, 100)
benchmarkCommit(b, 500)
benchmarkCommit(b, 2000)
benchmarkCommit(b, 5000)
}

func benchmarkCommit(b *testing.B, n int) {
b.Run(fmt.Sprintf("commit-%vnodes-sequential", n), func(b *testing.B) {
testCommit(b, n, false)
})
b.Run(fmt.Sprintf("commit-%vnodes-parallel", n), func(b *testing.B) {
testCommit(b, n, true)
})
}

func testCommit(b *testing.B, n int, parallel bool) {
tries := make([]*Trie, b.N)
for i := 0; i < b.N; i++ {
tries[i] = NewEmpty(nil)
for j := 0; j < n; j++ {
key := testrand.Bytes(32)
val := testrand.Bytes(32)
tries[i].Update(key, val)
}
tries[i].Hash()
if !parallel {
tries[i].uncommitted = 0
}
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < len(tries); i++ {
tries[i].Commit(true)
}
}

func TestCommitCorrect(t *testing.T) {
var paraTrie = NewEmpty(nil)
var refTrie = NewEmpty(nil)

for j := 0; j < 5000; j++ {
key := testrand.Bytes(32)
val := testrand.Bytes(32)
paraTrie.Update(key, val)
refTrie.Update(common.CopyBytes(key), common.CopyBytes(val))
}
paraTrie.Hash()
refTrie.Hash()
refTrie.uncommitted = 0

haveRoot, haveNodes := paraTrie.Commit(true)
wantRoot, wantNodes := refTrie.Commit(true)

if haveRoot != wantRoot {
t.Fatalf("have %x want %x", haveRoot, wantRoot)
}
have := printSet(haveNodes)
want := printSet(wantNodes)
if have != want {
i := 0
for i = 0; i < len(have); i++ {
if have[i] != want[i] {
break
}
}
if i > 100 {
i -= 100
}
t.Fatalf("have != want\nhave %q\nwant %q", have[i:], want[i:])
}
}
func printSet(set *trienode.NodeSet) string {
var out = new(strings.Builder)
fmt.Fprintf(out, "nodeset owner: %v\n", set.Owner)
var paths []string
for k := range set.Nodes {
paths = append(paths, k)
}
sort.Strings(paths)

for _, path := range paths {
n := set.Nodes[path]
// Deletion
if n.IsDeleted() {
fmt.Fprintf(out, " [-]: %x\n", path)
continue
}
// Insertion or update
fmt.Fprintf(out, " [+/*]: %x -> %v \n", path, n.Hash)
}
sort.Slice(set.Leaves, func(i, j int) bool {
a := set.Leaves[i]
b := set.Leaves[j]
return bytes.Compare(a.Parent[:], b.Parent[:]) < 0
})
for _, n := range set.Leaves {
fmt.Fprintf(out, "[leaf]: %v\n", n)
}
return out.String()
}
18 changes: 18 additions & 0 deletions trie/trienode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package trienode

import (
"fmt"
"maps"
"sort"
"strings"

Expand Down Expand Up @@ -99,6 +100,23 @@ func (set *NodeSet) AddNode(path []byte, n *Node) {
set.Nodes[string(path)] = n
}

// MergeSet merges this 'set' with 'other'. It assumes that the sets are disjoint,
// and thus does not deduplicate data (count deletes, dedup leaves etc).
func (set *NodeSet) MergeSet(other *NodeSet) error {
if set.Owner != other.Owner {
return fmt.Errorf("nodesets belong to different owner are not mergeable %x-%x", set.Owner, other.Owner)
}
maps.Copy(set.Nodes, other.Nodes)

set.deletes += other.deletes
set.updates += other.updates

// Since we assume the sets are disjoint, we can safely append leaves
// like this without deduplication.
set.Leaves = append(set.Leaves, other.Leaves...)
return nil
}

// Merge adds a set of nodes into the set.
func (set *NodeSet) Merge(owner common.Hash, nodes map[string]*Node) error {
if set.Owner != owner {
Expand Down