Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trie: pbss fix release v1.13.5 continue #621

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const (
var (
hashT = reflect.TypeOf(Hash{})
addressT = reflect.TypeOf(Address{})

// MaxHash represents the maximum possible hash value.
MaxHash = HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
)

// Hash represents the 32 byte Keccak256 hash of arbitrary data.
Expand Down
16 changes: 5 additions & 11 deletions core/state/snapshot/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,22 +363,16 @@ func generateTrieRoot(db ethdb.KeyValueWriter, scheme string, it Iterator, accou

func stackTrieGenerate(db ethdb.KeyValueWriter, scheme string, owner common.Hash, in chan trieKV, out chan common.Hash) {

var nodeWriter trie.NodeWriteFunc
options := trie.NewStackTrieOptions()
// Implement nodeWriter in case db is existed otherwise let it be nil.
if db != nil {
nodeWriter = func(owner common.Hash, path []byte, hash common.Hash, blob []byte) {
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(db, owner, path, hash, blob, scheme)
}
})
}
t := trie.NewStackTrieWithOwner(nodeWriter, owner)
t := trie.NewStackTrie(options)
for leaf := range in {
t.TryUpdate(leaf.key[:], leaf.value)
}
var root common.Hash
if db == nil {
root = t.Hash()
} else {
root, _ = t.Commit()
}
out <- root
out <- t.Commit()
}
28 changes: 28 additions & 0 deletions eth/protocols/snap/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,32 @@ var (

IngressRegistrationErrorMeter = metrics.NewRegisteredMeter(ingressRegistrationErrorName, nil)
EgressRegistrationErrorMeter = metrics.NewRegisteredMeter(egressRegistrationErrorName, nil)

// deletionGauge is the metric to track how many trie node deletions
// are performed in total during the sync process.
deletionGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/delete", nil)

// lookupGauge is the metric to track how many trie node lookups are
// performed to determine if node needs to be deleted.
lookupGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/lookup", nil)

// boundaryAccountNodesGauge is the metric to track how many boundary trie
// nodes in account trie are met.
boundaryAccountNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/account", nil)

// boundaryAccountNodesGauge is the metric to track how many boundary trie
// nodes in storage tries are met.
boundaryStorageNodesGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/boundary/storage", nil)

// smallStorageGauge is the metric to track how many storages are small enough
// to retrieved in one or two request.
smallStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/small", nil)

// largeStorageGauge is the metric to track how many storages are large enough
// to retrieved concurrently.
largeStorageGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/large", nil)

// skipStorageHealingGauge is the metric to track how many storages are retrieved
// in multiple requests but healing is not necessary.
skipStorageHealingGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/noheal", nil)
)
156 changes: 129 additions & 27 deletions eth/protocols/snap/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,19 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
}
}

// cleanPath is used to remove the dangling nodes in the stackTrie.
func (s *Syncer) cleanPath(batch ethdb.Batch, owner common.Hash, path []byte) {
if owner == (common.Hash{}) && rawdb.ExistsAccountTrieNode(s.db, path) {
rawdb.DeleteAccountTrieNode(batch, path)
deletionGauge.Inc(1)
}
if owner != (common.Hash{}) && rawdb.ExistsStorageTrieNode(s.db, owner, path) {
rawdb.DeleteStorageTrieNode(batch, owner, path)
deletionGauge.Inc(1)
}
lookupGauge.Inc(1)
}

// loadSyncStatus retrieves a previously aborted sync status from the database,
// or generates a fresh one if none is available.
func (s *Syncer) loadSyncStatus() {
Expand All @@ -721,9 +734,22 @@ func (s *Syncer) loadSyncStatus() {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
task.genTrie = trie.NewStackTrie(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(task.genBatch, owner, path, hash, val, s.scheme)
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(task.genBatch, common.Hash{}, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(task.genBatch, common.Hash{}, path)
})
// Skip the left boundary if it's not the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(task.Next != (common.Hash{}), task.Last != common.MaxHash, boundaryAccountNodesGauge)
}
task.genTrie = trie.NewStackTrie(options)

for accountHash, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
Expand All @@ -735,9 +761,24 @@ func (s *Syncer) loadSyncStatus() {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
subtask.genTrie = trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(subtask.genBatch, owner, path, hash, val, s.scheme)
}, accountHash)
owner := accountHash // local assignment for stacktrie writer closure
options := trie.NewStackTrieOptions()

options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(subtask.genBatch, owner, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(subtask.genBatch, owner, path)
})
// Skip the left boundary if it's not the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(subtask.Next != common.Hash{}, subtask.Last != common.MaxHash, boundaryStorageNodesGauge)
}
subtask.genTrie = trie.NewStackTrie(options)
}
}
}
Expand Down Expand Up @@ -786,14 +827,27 @@ func (s *Syncer) loadSyncStatus() {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, common.Hash{}, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, common.Hash{}, path)
})
// Skip the left boundary if it's not the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(next != common.Hash{}, last != common.MaxHash, boundaryAccountNodesGauge)
}
s.tasks = append(s.tasks, &accountTask{
Next: next,
Last: last,
SubTasks: make(map[common.Hash][]*storageTask),
genBatch: batch,
genTrie: trie.NewStackTrie(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme)
}),
genTrie: trie.NewStackTrie(options),
})
log.Debug("Created account sync task", "from", next, "last", last)
next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
Expand Down Expand Up @@ -1930,6 +1984,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
res.mainTask.needState[j] = false
res.mainTask.pend--
smallStorageGauge.Inc(1)
}
// If the last contract was chunked, mark it as needing healing
// to avoid writing it out to disk prematurely.
Expand Down Expand Up @@ -1965,22 +2020,37 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks)
}
r := newHashRange(lastKey, chunks)

if chunks == 1 {
smallStorageGauge.Inc(1)
} else {
largeStorageGauge.Inc(1)
}
// Our first task is the one that was just filled by this response.
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
owner := account // local assignment for stacktrie writer closure
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, owner, path)
})
// Keep the left boundary as it's the first range.
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(false, r.End() != common.MaxHash, boundaryStorageNodesGauge)
}
tasks = append(tasks, &storageTask{
Next: common.Hash{},
Last: r.End(),
root: acc.Root,
genBatch: batch,
genTrie: trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme)
}, account),
genTrie: trie.NewStackTrie(options),
})
for r.Next() {
batch := ethdb.HookedBatch{
Expand All @@ -1989,14 +2059,27 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner and also filter out boundary nodes
// only in the context of the path scheme. Deletion is forbidden in the
// hash scheme, as it can disrupt state completeness.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, owner, path)
})
// Skip the left boundary as it's not the first range
// Skip the right boundary if it's not the last range.
options = options.WithSkipBoundary(true, r.End() != common.MaxHash, boundaryStorageNodesGauge)
}
tasks = append(tasks, &storageTask{
Next: r.Start(),
Last: r.End(),
root: acc.Root,
genBatch: batch,
genTrie: trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme)
}, account),
genTrie: trie.NewStackTrie(options),
})
}
for _, task := range tasks {
Expand Down Expand Up @@ -2041,9 +2124,23 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
slots += len(res.hashes[i])

if i < len(res.hashes)-1 || res.subTask == nil {
tr := trie.NewStackTrieWithOwner(func(owner common.Hash, path []byte, hash common.Hash, val []byte) {
rawdb.WriteTrieNode(batch, owner, path, hash, val, s.scheme)
}, account)
// no need to make local reassignment of account: this closure does not outlive the loop
options := trie.NewStackTrieOptions()
options = options.WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(batch, account, path, hash, blob, s.scheme)
})
if s.scheme == rawdb.PathScheme {
// Configure the dangling node cleaner only in the context of the
// path scheme. Deletion is forbidden in the hash scheme, as it can
// disrupt state completeness.
//
// Notably, boundary nodes can be also kept because the whole storage
// trie is complete.
options = options.WithCleaner(func(path []byte) {
s.cleanPath(batch, account, path)
})
}
tr := trie.NewStackTrie(options)
for j := 0; j < len(res.hashes[i]); j++ {
tr.Update(res.hashes[i][j][:], res.slots[i][j])
}
Expand All @@ -2065,18 +2162,25 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
// Large contracts could have generated new trie nodes, flush them to disk
if res.subTask != nil {
if res.subTask.done {
if root, err := res.subTask.genTrie.Commit(); err != nil {
log.Error("Failed to commit stack slots", "err", err)
} else if root == res.subTask.root {
// If the chunk's root is an overflown but full delivery, clear the heal request
root := res.subTask.genTrie.Commit()
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
res.subTask.genBatch.Reset()

// If the chunk's root is an overflown but full delivery,
// clear the heal request.
accountHash := res.accounts[len(res.accounts)-1]
if root == res.subTask.root && rawdb.HasStorageTrieNode(s.db, accountHash, nil, root) {
for i, account := range res.mainTask.res.hashes {
if account == res.accounts[len(res.accounts)-1] {
if account == accountHash {
res.mainTask.needHeal[i] = false
skipStorageHealingGauge.Inc(1)
}
}
}
}
if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize || res.subTask.done {
if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize {
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
Expand Down Expand Up @@ -2283,9 +2387,7 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
// flush after finalizing task.done. It's fine even if we crash and lose this
// write as it will only cause more data to be downloaded during heal.
if task.done {
if _, err := task.genTrie.Commit(); err != nil {
log.Error("Failed to commit stack account", "err", err)
}
task.genTrie.Commit()
}
if task.genBatch.ValueSize() > ethdb.IdealBatchSize || task.done {
if err := task.genBatch.Write(); err != nil {
Expand Down
19 changes: 8 additions & 11 deletions tests/fuzzers/stacktrie/trie_fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ func (f *fuzzer) fuzz() int {
trieA = trie.NewEmpty(dbA)
spongeB = &spongeDb{sponge: sha3.NewLegacyKeccak256()}
dbB = trie.NewDatabase(rawdb.NewDatabase(spongeB), nil)
trieB = trie.NewStackTrie(func(owner common.Hash, path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(spongeB, owner, path, hash, blob, dbB.Scheme())
options = trie.NewStackTrieOptions().WithWriter(func(path []byte, hash common.Hash, blob []byte) {
rawdb.WriteTrieNode(spongeB, common.Hash{}, path, hash, blob, dbB.Scheme())
})
trieB = trie.NewStackTrie(options)
vals kvs
useful bool
maxElements = 10000
Expand Down Expand Up @@ -203,9 +204,7 @@ func (f *fuzzer) fuzz() int {
trieB.Update(kv.k, kv.v)
}
rootB := trieB.Hash()
if _, err := trieB.Commit(); err != nil {
panic(err)
}
trieB.Commit()
if rootA != rootB {
panic(fmt.Sprintf("roots differ: (trie) %x != %x (stacktrie)", rootA, rootB))
}
Expand All @@ -217,22 +216,20 @@ func (f *fuzzer) fuzz() int {
// Ensure all the nodes are persisted correctly
// Need tracked deleted nodes.
var (
nodeset = make(map[string][]byte) // path -> blob
trieC = trie.NewStackTrie(func(owner common.Hash, path []byte, hash common.Hash, blob []byte) {
nodeset = make(map[string][]byte) // path -> blob
optionsC = trie.NewStackTrieOptions().WithWriter(func(path []byte, hash common.Hash, blob []byte) {
if crypto.Keccak256Hash(blob) != hash {
panic("invalid node blob")
}
if owner != (common.Hash{}) {
panic("invalid node owner")
}
nodeset[string(path)] = common.CopyBytes(blob)
})
trieC = trie.NewStackTrie(optionsC)
checked int
)
for _, kv := range vals {
trieC.Update(kv.k, kv.v)
}
rootC, _ := trieC.Commit()
rootC := trieC.Commit()
if rootA != rootC {
panic(fmt.Sprintf("roots differ: (trie) %x != %x (stacktrie)", rootA, rootC))
}
Expand Down
4 changes: 4 additions & 0 deletions trie/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ type kv struct {
t bool
}

func (k *kv) cmp(other *kv) int {
return bytes.Compare(k.k, other.k)
}

func TestIteratorLargeData(t *testing.T) {
trie := NewEmpty(NewDatabase(rawdb.NewMemoryDatabase(), nil))
vals := make(map[string]*kv)
Expand Down
2 changes: 1 addition & 1 deletion trie/proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func VerifyRangeProof(rootHash common.Hash, firstKey []byte, lastKey []byte, key
if proof == nil {
tr := NewStackTrie(nil)
for index, key := range keys {
tr.TryUpdate(key, values[index])
tr.Update(key, values[index])
}
if have, want := tr.Hash(), rootHash; have != want {
return false, fmt.Errorf("invalid proof, want hash %x, got %x", want, have)
Expand Down
Loading
Loading