From 4e73156e74f68f204fd25873786f6586c9f87b2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 15 Apr 2021 01:33:36 +0300 Subject: [PATCH 01/17] eth/protocols/snap: generate storage trie from full dirty snap data --- eth/protocols/snap/sync.go | 114 +++++++++++++++++++++---------------- 1 file changed, 65 insertions(+), 49 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index cff1a77e6c12..7460d9528d9a 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -209,12 +209,8 @@ type storageResponse struct { hashes [][]common.Hash // Storage slot hashes in the returned range slots [][][]byte // Storage slot values in the returned range nodes []ethdb.KeyValueStore // Database containing the reconstructed trie nodes - tries []*trie.Trie // Reconstructed tries to reject overflown slots - // Fields relevant for the last account only - bounds map[common.Hash]struct{} // Boundary nodes to avoid persisting (incomplete) - overflow *light.NodeSet // Overflow nodes to avoid persisting across chunk boundaries - cont bool // Whether the last storage range has a continuation + cont bool // Whether the last storage range has a continuation } // trienodeHealRequest tracks a pending state trie request to ensure responses @@ -359,7 +355,7 @@ type SyncPeer interface { // trie, starting with the origin. RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes uint64) error - // RequestStorageRange fetches a batch of storage slots belonging to one or + // RequestStorageRanges fetches a batch of storage slots belonging to one or // more accounts. If slots from only one accout is requested, an origin marker // may also be used to retrieve from there. RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes uint64) error @@ -792,6 +788,16 @@ func (s *Syncer) cleanStorageTasks() { delete(task.SubTasks, account) task.pend-- + // Upon large account completion, generate the trie using whatever + // data we have. Most of the time it will the the complete perfect + // data. If the pivot moved during sync, we'll have some portions + // dirty that we'll fix during healing. + root := s.generateStorageTrie(account) + for j, hash := range task.res.hashes { + if hash == account && task.res.accounts[j].Root == root { + task.needHeal[j] = false + } + } // If this was the last pending task, forward the account task if task.pend == 0 { s.forwardAccountTask(task) @@ -800,6 +806,42 @@ func (s *Syncer) cleanStorageTasks() { } } +// generateStorageTrie iterates the dirty snapshot of an account storage and +// creates the state trie nodes for it with a stack trie. +func (s *Syncer) generateStorageTrie(account common.Hash) common.Hash { + // Iterate over all the dirty storage slots of the account + it := s.db.NewIterator(append(rawdb.SnapshotStoragePrefix, account.Bytes()...), nil) + defer it.Release() + + // Pass all the slots through a stack trie, periodically flushing to disk + // when too much data accumulates. 
+ batch := s.db.NewBatch() + + t := trie.NewStackTrie(batch) + for it.Next() { + t.TryUpdate(it.Key(), common.CopyBytes(it.Value())) + + if batch.ValueSize() > ethdb.IdealBatchSize { + s.storageBytes += common.StorageSize(batch.ValueSize()) + if err := batch.Write(); err != nil { + log.Error("Failed to write storage trie data", "err", err) + } + batch.Reset() + } + } + // Finalize the trie to retrieve its root hash and bubble it up to decide if + // account healing is still needed or not any more + root, err := t.Commit() + if err != nil { + log.Error("Failed to commit storage trie", "err", err) + } + s.storageBytes += common.StorageSize(batch.ValueSize()) + if err := batch.Write(); err != nil { + log.Error("Failed to finalize storage trie", "err", err) + } + return root +} + // assignAccountTasks attempts to match idle peers to pending account range // retrievals. func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *accountRequest, cancel chan struct{}) { @@ -1681,7 +1723,6 @@ func (s *Syncer) processBytecodeResponse(res *bytecodeResponse) { var ( codes uint64 - bytes common.StorageSize ) for i, hash := range res.hashes { code := res.codes[i] @@ -1699,17 +1740,16 @@ func (s *Syncer) processBytecodeResponse(res *bytecodeResponse) { } } // Push the bytecode into a database batch - s.bytecodeSynced++ - s.bytecodeBytes += common.StorageSize(len(code)) - codes++ - bytes += common.StorageSize(len(code)) - rawdb.WriteCode(batch, hash, code) } + bytes := common.StorageSize(batch.ValueSize()) if err := batch.Write(); err != nil { log.Crit("Failed to persist bytecodes", "err", err) } + s.bytecodeSynced += codes + s.bytecodeBytes += bytes + log.Debug("Persisted set of bytecodes", "count", codes, "bytes", bytes) // If this delivery completed the last pending task, forward the account task @@ -1735,7 +1775,6 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { slots int nodes int skipped int - bytes common.StorageSize ) // Iterate over all the accounts and reconstruct their storage tries from the // delivered slots @@ -1814,12 +1853,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { continue } if cmp > 0 { - // Chunk overflown, cut off excess, but also update the boundary - for l := k; l < len(res.hashes[i]); l++ { - if err := res.tries[i].Prove(res.hashes[i][l][:], 0, res.overflow); err != nil { - panic(err) // Account range was already proven, what happened - } - } + // Chunk overflown, cut off excess res.hashes[i] = res.hashes[i][:k] res.slots[i] = res.slots[i][:k] res.cont = false // Mark range completed @@ -1835,37 +1869,26 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { } } // Iterate over all the reconstructed trie nodes and push them to disk + // if the contract is fully delivered. If it's chunked, the trie nodes + // will be reconstructed later. 
slots += len(res.hashes[i]) - it := res.nodes[i].NewIterator(nil, nil) - for it.Next() { - // Boundary nodes are not written for the last result, since they are incomplete - if i == len(res.hashes)-1 && res.subTask != nil { - if _, ok := res.bounds[common.BytesToHash(it.Key())]; ok { - skipped++ - continue - } - if _, err := res.overflow.Get(it.Key()); err == nil { - skipped++ - continue - } + if i < len(res.hashes)-1 || res.subTask == nil { + it := res.nodes[i].NewIterator(nil, nil) + for it.Next() { + batch.Put(it.Key(), it.Value()) + nodes++ } - // Node is not a boundary, persist to disk - batch.Put(it.Key(), it.Value()) - - bytes += common.StorageSize(common.HashLength + len(it.Value())) - nodes++ + it.Release() } - it.Release() - // Persist the received storage segements. These flat state maybe // outdated during the sync, but it can be fixed later during the // snapshot generation. for j := 0; j < len(res.hashes[i]); j++ { rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) - bytes += common.StorageSize(1 + 2*common.HashLength + len(res.slots[i][j])) } } + bytes := common.StorageSize(batch.ValueSize()) if err := batch.Write(); err != nil { log.Crit("Failed to persist storage slots", "err", err) } @@ -1995,7 +2018,6 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { var ( nodes int skipped int - bytes common.StorageSize ) it := res.nodes.NewIterator(nil, nil) for it.Next() { @@ -2016,8 +2038,6 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { } // Node is neither a boundary, not an incomplete account, persist to disk batch.Put(it.Key(), it.Value()) - - bytes += common.StorageSize(common.HashLength + len(it.Value())) nodes++ } it.Release() @@ -2028,8 +2048,8 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { for i, hash := range res.hashes { blob := snapshot.SlimAccountRLP(res.accounts[i].Nonce, res.accounts[i].Balance, res.accounts[i].Root, res.accounts[i].CodeHash) rawdb.WriteAccountSnapshot(batch, hash, blob) - bytes += common.StorageSize(1 + common.HashLength + len(blob)) } + bytes := common.StorageSize(batch.ValueSize()) if err := batch.Write(); err != nil { log.Crit("Failed to persist accounts", "err", err) } @@ -2355,7 +2375,6 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo // Reconstruct the partial tries from the response and verify them var ( dbs = make([]ethdb.KeyValueStore, len(hashes)) - tries = make([]*trie.Trie, len(hashes)) notary *trie.KeyValueNotary cont bool ) @@ -2375,7 +2394,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo if len(nodes) == 0 { // No proof has been attached, the response must cover the entire key // space and hash to the origin root. 
- dbs[i], tries[i], _, _, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil) + dbs[i], _, _, _, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil) if err != nil { s.scheduleRevertStorageRequest(req) // reschedule request logger.Warn("Storage slots failed proof", "err", err) @@ -2390,7 +2409,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo if len(keys) > 0 { end = keys[len(keys)-1] } - dbs[i], tries[i], notary, cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb) + dbs[i], _, notary, cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb) if err != nil { s.scheduleRevertStorageRequest(req) // reschedule request logger.Warn("Storage range failed proof", "err", err) @@ -2416,9 +2435,6 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo hashes: hashes, slots: slots, nodes: dbs, - tries: tries, - bounds: bounds, - overflow: light.NewNodeSet(), cont: cont, } select { From 9ee2ad5ba85a9d96321a514772c1f0752503f919 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 15 Apr 2021 02:40:49 +0300 Subject: [PATCH 02/17] eth/protocols/snap: get rid of some more dead code --- eth/protocols/snap/sync.go | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 7460d9528d9a..0e3844e40e73 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -2374,9 +2374,8 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo // Reconstruct the partial tries from the response and verify them var ( - dbs = make([]ethdb.KeyValueStore, len(hashes)) - notary *trie.KeyValueNotary - cont bool + dbs = make([]ethdb.KeyValueStore, len(hashes)) + cont bool ) for i := 0; i < len(hashes); i++ { // Convert the keys and proofs into an internal format @@ -2409,7 +2408,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo if len(keys) > 0 { end = keys[len(keys)-1] } - dbs[i], _, notary, cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb) + dbs[i], _, _, cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb) if err != nil { s.scheduleRevertStorageRequest(req) // reschedule request logger.Warn("Storage range failed proof", "err", err) @@ -2418,15 +2417,6 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo } } // Partial tries reconstructed, send them to the scheduler for storage filling - bounds := make(map[common.Hash]struct{}) - - if notary != nil { // if all contract storages are delivered in full, no notary will be created - it := notary.Accessed().NewIterator(nil, nil) - for it.Next() { - bounds[common.BytesToHash(it.Key())] = struct{}{} - } - it.Release() - } response := &storageResponse{ mainTask: req.mainTask, subTask: req.subTask, From d5c95a3360de46630e3bed8568fd659fd8ee0999 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 15 Apr 2021 12:53:00 +0300 Subject: [PATCH 03/17] eth/protocols/snap: less frequent logs, also log during trie generation --- eth/protocols/snap/sync.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 0e3844e40e73..bf16f7ea500a 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ 
-827,6 +827,10 @@ func (s *Syncer) generateStorageTrie(account common.Hash) common.Hash { log.Error("Failed to write storage trie data", "err", err) } batch.Reset() + + // Occasionally show a log messge since this path can take many minutes + // TODO(karalabe): Do we want to support interrupting this method? + s.reportSyncProgress(false) } } // Finalize the trie to retrieve its root hash and bubble it up to decide if @@ -2664,7 +2668,7 @@ func (s *Syncer) report(force bool) { // reportSyncProgress calculates various status reports and provides it to the user. func (s *Syncer) reportSyncProgress(force bool) { // Don't report all the events, just occasionally - if !force && time.Since(s.logTime) < 3*time.Second { + if !force && time.Since(s.logTime) < 8*time.Second { return } // Don't report anything until we have a meaningful progress @@ -2703,7 +2707,7 @@ func (s *Syncer) reportSyncProgress(force bool) { // reportHealProgress calculates various status reports and provides it to the user. func (s *Syncer) reportHealProgress(force bool) { // Don't report all the events, just occasionally - if !force && time.Since(s.logTime) < 3*time.Second { + if !force && time.Since(s.logTime) < 8*time.Second { return } s.logTime = time.Now() From c228245b1650526c480d26541f9088fa510f552a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 15 Apr 2021 17:13:34 +0300 Subject: [PATCH 04/17] eth/protocols/snap: implement dirty account range stack-hashing --- eth/protocols/snap/sync.go | 198 +++++++++++++++++++++---------------- 1 file changed, 114 insertions(+), 84 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index bf16f7ea500a..a0400b258113 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -127,12 +127,6 @@ type accountResponse struct { hashes []common.Hash // Account hashes in the returned range accounts []*state.Account // Expanded accounts in the returned range - nodes ethdb.KeyValueStore // Database containing the reconstructed trie nodes - trie *trie.Trie // Reconstructed trie to reject incomplete account paths - - bounds map[common.Hash]struct{} // Boundary nodes to avoid persisting incomplete accounts - overflow *light.NodeSet // Overflow nodes to avoid persisting across chunk boundaries - cont bool // Whether the account range has a continuation } @@ -760,6 +754,8 @@ func (s *Syncer) cleanAccountTasks() { s.lock.Lock() s.snapped = true s.lock.Unlock() + + s.generateAccountTrie() // TODO(karalabe): Do we need to dedup this somehow, interrupt? } } @@ -806,6 +802,83 @@ func (s *Syncer) cleanStorageTasks() { } } +// generateAccountTrie iterates the dirty snapshot of the accounts and creates +// the state trie nodes for it with a stack trie. +func (s *Syncer) generateAccountTrie() { + // Iterate over all the dirty accounts + it := s.db.NewIterator(rawdb.SnapshotAccountPrefix, nil) + defer it.Release() + + // Pass all the accounts through a stack trie, periodically flushing to disk + // when too much data accumulates. + var ( + triedb = trie.NewDatabase(s.db) + batch = s.db.NewBatch() + t = trie.NewStackTrie(batch) + + start = time.Now() + logged = time.Now() + ) + for it.Next() { + // Skip anything that's not an account snapshot item + if len(it.Key()) != 1+common.HashLength { + continue + } + // Check the bytecode and storage of an account before dropping it into + // the trie. Gaps will hurt healing a lot, but we need to fix those gaps + // either way. 
+ account, err := snapshot.FullAccount(it.Value()) + if err != nil { + log.Error("Failed to decode account", "err", err) + continue + } + if codehash := common.BytesToHash(account.CodeHash); codehash != emptyCode { + if code := rawdb.ReadCode(s.db, codehash); len(code) == 0 { + continue // Leave gap for account with missing code + } + } + if roothash := common.BytesToHash(account.Root); roothash != emptyRoot { + if _, err := trie.New(roothash, triedb); err != nil { + continue // Leave gap for account with missing storage + } + } + // Account seems to be complete, insert it into the state trie + acchash := it.Key()[1:] + t.TryUpdate(acchash, common.CopyBytes(it.Value())) + + if batch.ValueSize() > ethdb.IdealBatchSize { + s.accountBytes += common.StorageSize(batch.ValueSize()) + if err := batch.Write(); err != nil { + log.Error("Failed to write account trie data", "err", err) + } + batch.Reset() + + // Occasionally show a log message since this path can take many minutes + if time.Since(logged) > 8*time.Second { + var ( + pos = new(big.Int).SetBytes(acchash) + perc = float64(new(big.Int).Div(new(big.Int).Mul(pos, big10000), hashSpace).Uint64()) / 100 + prog = fmt.Sprintf("%.2f%%", perc) + eta = time.Duration(float64(time.Since(start)) * (100 - perc) / perc) + ) + log.Info("Generating account state trie", "generated", prog, "eta", common.PrettyDuration(eta)) + logged = time.Now() + } + s.reportSyncProgress(false) + + // TODO(karalabe): Do we want to support interrupting this method? + } + } + // Finalize the trie + if _, err := t.Commit(); err != nil { + log.Error("Failed to commit account trie", "err", err) + } + s.accountBytes += common.StorageSize(batch.ValueSize()) + if err := batch.Write(); err != nil { + log.Error("Failed to finalize account trie", "err", err) + } +} + // generateStorageTrie iterates the dirty snapshot of an account storage and // creates the state trie nodes for it with a stack trie. func (s *Syncer) generateStorageTrie(account common.Hash) common.Hash { @@ -815,11 +888,20 @@ func (s *Syncer) generateStorageTrie(account common.Hash) common.Hash { // Pass all the slots through a stack trie, periodically flushing to disk // when too much data accumulates. - batch := s.db.NewBatch() + var ( + batch = s.db.NewBatch() + t = trie.NewStackTrie(batch) - t := trie.NewStackTrie(batch) + start = time.Now() + logged = time.Now() + ) for it.Next() { - t.TryUpdate(it.Key(), common.CopyBytes(it.Value())) + // Skip anything that's not an storage snapshot item + if len(it.Key()) != 1+2*common.HashLength { + continue + } + slothash := it.Key()[1+common.HashLength:] + t.TryUpdate(slothash, common.CopyBytes(it.Value())) if batch.ValueSize() > ethdb.IdealBatchSize { s.storageBytes += common.StorageSize(batch.ValueSize()) @@ -828,9 +910,20 @@ func (s *Syncer) generateStorageTrie(account common.Hash) common.Hash { } batch.Reset() - // Occasionally show a log messge since this path can take many minutes - // TODO(karalabe): Do we want to support interrupting this method? 
+ // Occasionally show a log message since this path can take many minutes + if time.Since(logged) > 8*time.Second { + var ( + pos = new(big.Int).SetBytes(slothash) + perc = float64(new(big.Int).Div(new(big.Int).Mul(pos, big10000), hashSpace).Uint64()) / 100 + prog = fmt.Sprintf("%.2f%%", perc) + eta = time.Duration(float64(time.Since(start)) * (100 - perc) / perc) + ) + log.Info("Generating contract state trie", "generated", prog, "eta", common.PrettyDuration(eta)) + logged = time.Now() + } s.reportSyncProgress(false) + + // TODO(karalabe): Do we want to support interrupting this method? } } // Finalize the trie to retrieve its root hash and bubble it up to decide if @@ -1646,12 +1739,7 @@ func (s *Syncer) processAccountResponse(res *accountResponse) { continue } if cmp > 0 { - // Chunk overflown, cut off excess, but also update the boundary nodes - for j := i; j < len(res.hashes); j++ { - if err := res.trie.Prove(res.hashes[j][:], 0, res.overflow); err != nil { - panic(err) // Account range was already proven, what happened - } - } + // Chunk overflown, cut off excess res.hashes = res.hashes[:i] res.accounts = res.accounts[:i] res.cont = false // Mark range completed @@ -1994,62 +2082,14 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { } task.res = nil - // Iterate over all the accounts and gather all the incomplete trie nodes. A - // node is incomplete if we haven't yet filled it (sync was interrupted), or - // if we filled it in multiple chunks (storage trie), in which case the few - // nodes on the chunk boundaries are missing. - incompletes := light.NewNodeSet() - for i := range res.accounts { - // If the filling was interrupted, mark everything after as incomplete - if task.needCode[i] || task.needState[i] { - for j := i; j < len(res.accounts); j++ { - if err := res.trie.Prove(res.hashes[j][:], 0, incompletes); err != nil { - panic(err) // Account range was already proven, what happened - } - } - break - } - // Filling not interrupted until this point, mark incomplete if needs healing - if task.needHeal[i] { - if err := res.trie.Prove(res.hashes[i][:], 0, incompletes); err != nil { - panic(err) // Account range was already proven, what happened - } - } - } - // Persist every finalized trie node that's not on the boundary - batch := s.db.NewBatch() - - var ( - nodes int - skipped int - ) - it := res.nodes.NewIterator(nil, nil) - for it.Next() { - // Boundary nodes are not written, since they are incomplete - if _, ok := res.bounds[common.BytesToHash(it.Key())]; ok { - skipped++ - continue - } - // Overflow nodes are not written, since they mess with another task - if _, err := res.overflow.Get(it.Key()); err == nil { - skipped++ - continue - } - // Accounts with split storage requests are incomplete - if _, err := incompletes.Get(it.Key()); err == nil { - skipped++ - continue - } - // Node is neither a boundary, not an incomplete account, persist to disk - batch.Put(it.Key(), it.Value()) - nodes++ - } - it.Release() - // Persist the received account segements. These flat state maybe // outdated during the sync, but it can be fixed later during the // snapshot generation. 
+ batch := s.db.NewBatch() for i, hash := range res.hashes { + if task.needCode[i] || task.needState[i] { + break + } blob := snapshot.SlimAccountRLP(res.accounts[i].Nonce, res.accounts[i].Balance, res.accounts[i].Root, res.accounts[i].CodeHash) rawdb.WriteAccountSnapshot(batch, hash, blob) } @@ -2060,7 +2100,7 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { s.accountBytes += bytes s.accountSynced += uint64(len(res.accounts)) - log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "skipped", skipped, "bytes", bytes) + log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", bytes) // Task filling persisted, push it the chunk marker forward to the first // account still missing data. @@ -2147,22 +2187,13 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco if len(keys) > 0 { end = keys[len(keys)-1] } - db, tr, notary, cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb) + _, _, _, cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb) if err != nil { logger.Warn("Account range failed proof", "err", err) // Signal this request as failed, and ready for rescheduling s.scheduleRevertAccountRequest(req) return err } - // Partial trie reconstructed, send it to the scheduler for storage filling - bounds := make(map[common.Hash]struct{}) - - it := notary.Accessed().NewIterator(nil, nil) - for it.Next() { - bounds[common.BytesToHash(it.Key())] = struct{}{} - } - it.Release() - accs := make([]*state.Account, len(accounts)) for i, account := range accounts { acc := new(state.Account) @@ -2175,10 +2206,6 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco task: req.task, hashes: hashes, accounts: accs, - nodes: db, - trie: tr, - bounds: bounds, - overflow: light.NewNodeSet(), cont: cont, } select { @@ -2656,6 +2683,9 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error { // hashSpace is the total size of the 256 bit hash space for accounts. var hashSpace = new(big.Int).Exp(common.Big2, common.Big256, nil) +// big10000 is used to generate 2 digit precision percentages. +var big10000 = big.NewInt(10000) + // report calculates various status reports and provides it to the user. func (s *Syncer) report(force bool) { if len(s.tasks) > 0 { From 83597764789febc2099dfc55c3fc82ccb6357860 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 15 Apr 2021 21:16:02 +0300 Subject: [PATCH 05/17] eth/protocols/snap: don't loop on account trie generation --- eth/protocols/snap/sync.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index a0400b258113..54c88cb63709 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -744,12 +744,18 @@ func (s *Syncer) saveSyncStatus() { // cleanAccountTasks removes account range retrieval tasks that have already been // completed. func (s *Syncer) cleanAccountTasks() { + // If the sync was already done before, don't even bother + if len(s.tasks) == 0 { + return + } + // Sync wasn't finished previously, check for any task that can be finalized for i := 0; i < len(s.tasks); i++ { if s.tasks[i].done { s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) 
i-- } } + // If everything was just finalized just, generate the account trie and start heal if len(s.tasks) == 0 { s.lock.Lock() s.snapped = true From cb46ff38759a609e8b29768e1002e7a9961ea7b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 15 Apr 2021 22:00:19 +0300 Subject: [PATCH 06/17] eth/protocols/snap: fix account format in trie --- eth/protocols/snap/sync.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 54c88cb63709..7bb7f5c95e0f 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -850,7 +850,8 @@ func (s *Syncer) generateAccountTrie() { } // Account seems to be complete, insert it into the state trie acchash := it.Key()[1:] - t.TryUpdate(acchash, common.CopyBytes(it.Value())) + accblob, _ := rlp.EncodeToBytes(account) + t.TryUpdate(acchash, accblob) if batch.ValueSize() > ethdb.IdealBatchSize { s.accountBytes += common.StorageSize(batch.ValueSize()) From ebd4e20ab8d5e60341877765be557f4fe7ce8aa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 16 Apr 2021 21:23:14 +0300 Subject: [PATCH 07/17] core, eth, ethdb: glue snap packets together, but not chunks --- core/rawdb/database_test.go | 17 ++ core/rawdb/table.go | 5 + eth/protocols/snap/sync.go | 278 ++++++++++--------------- ethdb/batch.go | 3 + ethdb/leveldb/leveldb.go | 7 + ethdb/memorydb/memorydb.go | 7 + tests/fuzzers/stacktrie/trie_fuzzer.go | 1 + 7 files changed, 154 insertions(+), 164 deletions(-) create mode 100644 core/rawdb/database_test.go diff --git a/core/rawdb/database_test.go b/core/rawdb/database_test.go new file mode 100644 index 000000000000..8bf06f97d8d7 --- /dev/null +++ b/core/rawdb/database_test.go @@ -0,0 +1,17 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 323ef6293cab..4daa6b53497f 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -176,6 +176,11 @@ func (b *tableBatch) Delete(key []byte) error { return b.batch.Delete(append([]byte(b.prefix), key...)) } +// KeyCount retrieves the number of keys queued up for writing. +func (b *tableBatch) KeyCount() int { + return b.batch.KeyCount() +} + // ValueSize retrieves the amount of data queued up for writing. 
func (b *tableBatch) ValueSize() int { return b.batch.ValueSize() diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 7bb7f5c95e0f..e87966080593 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -291,6 +291,9 @@ type accountTask struct { codeTasks map[common.Hash]struct{} // Code hashes that need retrieval stateTasks map[common.Hash]common.Hash // Account hashes->roots that need full state retrieval + genBatch ethdb.Batch // Batch used by the node generator + genTrie *trie.StackTrie // Node generator from storage slots + done bool // Flag whether the task can be removed } @@ -302,7 +305,11 @@ type storageTask struct { // These fields are internals used during runtime root common.Hash // Storage root hash for this instance req *storageRequest // Pending request to fill this task - done bool // Flag whether the task can be removed + + genBatch ethdb.Batch // Batch used by the node generator + genTrie *trie.StackTrie // Node generator from storage slots + + done bool // Flag whether the task can be removed } // healTask represents the sync task for healing the snap-synced chunk boundaries. @@ -670,6 +677,17 @@ func (s *Syncer) loadSyncStatus() { log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last) } s.tasks = progress.Tasks + for _, task := range s.tasks { + task.genBatch = s.db.NewBatch() + task.genTrie = trie.NewStackTrie(task.genBatch) + + for _, subtasks := range task.SubTasks { + for _, subtask := range subtasks { + subtask.genBatch = s.db.NewBatch() + subtask.genTrie = trie.NewStackTrie(task.genBatch) + } + } + } s.snapped = len(s.tasks) == 0 s.accountSynced = progress.AccountSynced @@ -709,10 +727,13 @@ func (s *Syncer) loadSyncStatus() { // Make sure we don't overflow if the step is not a proper divisor last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") } + batch := s.db.NewBatch() s.tasks = append(s.tasks, &accountTask{ Next: next, Last: last, SubTasks: make(map[common.Hash][]*storageTask), + genBatch: batch, + genTrie: trie.NewStackTrie(batch), }) log.Debug("Created account sync task", "from", next, "last", last) next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1)) @@ -721,6 +742,25 @@ func (s *Syncer) loadSyncStatus() { // saveSyncStatus marshals the remaining sync tasks into leveldb. func (s *Syncer) saveSyncStatus() { + // Serialize any partial progress to disk before spinning down + for _, task := range s.tasks { + keys, bytes := task.genBatch.KeyCount(), task.genBatch.ValueSize() + if err := task.genBatch.Write(); err != nil { + log.Error("Failed to persist account slots", "err", err) + } + s.accountBytes += common.StorageSize(keys*common.HashLength + bytes) + + for _, subtasks := range task.SubTasks { + for _, subtask := range subtasks { + keys, bytes := subtask.genBatch.KeyCount(), subtask.genBatch.ValueSize() + if err := subtask.genBatch.Write(); err != nil { + log.Error("Failed to persist storage slots", "err", err) + } + s.accountBytes += common.StorageSize(keys*common.HashLength + bytes) + } + } + } + // Store the actual progress markers progress := &syncProgress{ Tasks: s.tasks, AccountSynced: s.accountSynced, @@ -760,8 +800,6 @@ func (s *Syncer) cleanAccountTasks() { s.lock.Lock() s.snapped = true s.lock.Unlock() - - s.generateAccountTrie() // TODO(karalabe): Do we need to dedup this somehow, interrupt? 
} } @@ -790,16 +828,6 @@ func (s *Syncer) cleanStorageTasks() { delete(task.SubTasks, account) task.pend-- - // Upon large account completion, generate the trie using whatever - // data we have. Most of the time it will the the complete perfect - // data. If the pivot moved during sync, we'll have some portions - // dirty that we'll fix during healing. - root := s.generateStorageTrie(account) - for j, hash := range task.res.hashes { - if hash == account && task.res.accounts[j].Root == root { - task.needHeal[j] = false - } - } // If this was the last pending task, forward the account task if task.pend == 0 { s.forwardAccountTask(task) @@ -808,144 +836,6 @@ func (s *Syncer) cleanStorageTasks() { } } -// generateAccountTrie iterates the dirty snapshot of the accounts and creates -// the state trie nodes for it with a stack trie. -func (s *Syncer) generateAccountTrie() { - // Iterate over all the dirty accounts - it := s.db.NewIterator(rawdb.SnapshotAccountPrefix, nil) - defer it.Release() - - // Pass all the accounts through a stack trie, periodically flushing to disk - // when too much data accumulates. - var ( - triedb = trie.NewDatabase(s.db) - batch = s.db.NewBatch() - t = trie.NewStackTrie(batch) - - start = time.Now() - logged = time.Now() - ) - for it.Next() { - // Skip anything that's not an account snapshot item - if len(it.Key()) != 1+common.HashLength { - continue - } - // Check the bytecode and storage of an account before dropping it into - // the trie. Gaps will hurt healing a lot, but we need to fix those gaps - // either way. - account, err := snapshot.FullAccount(it.Value()) - if err != nil { - log.Error("Failed to decode account", "err", err) - continue - } - if codehash := common.BytesToHash(account.CodeHash); codehash != emptyCode { - if code := rawdb.ReadCode(s.db, codehash); len(code) == 0 { - continue // Leave gap for account with missing code - } - } - if roothash := common.BytesToHash(account.Root); roothash != emptyRoot { - if _, err := trie.New(roothash, triedb); err != nil { - continue // Leave gap for account with missing storage - } - } - // Account seems to be complete, insert it into the state trie - acchash := it.Key()[1:] - accblob, _ := rlp.EncodeToBytes(account) - t.TryUpdate(acchash, accblob) - - if batch.ValueSize() > ethdb.IdealBatchSize { - s.accountBytes += common.StorageSize(batch.ValueSize()) - if err := batch.Write(); err != nil { - log.Error("Failed to write account trie data", "err", err) - } - batch.Reset() - - // Occasionally show a log message since this path can take many minutes - if time.Since(logged) > 8*time.Second { - var ( - pos = new(big.Int).SetBytes(acchash) - perc = float64(new(big.Int).Div(new(big.Int).Mul(pos, big10000), hashSpace).Uint64()) / 100 - prog = fmt.Sprintf("%.2f%%", perc) - eta = time.Duration(float64(time.Since(start)) * (100 - perc) / perc) - ) - log.Info("Generating account state trie", "generated", prog, "eta", common.PrettyDuration(eta)) - logged = time.Now() - } - s.reportSyncProgress(false) - - // TODO(karalabe): Do we want to support interrupting this method? - } - } - // Finalize the trie - if _, err := t.Commit(); err != nil { - log.Error("Failed to commit account trie", "err", err) - } - s.accountBytes += common.StorageSize(batch.ValueSize()) - if err := batch.Write(); err != nil { - log.Error("Failed to finalize account trie", "err", err) - } -} - -// generateStorageTrie iterates the dirty snapshot of an account storage and -// creates the state trie nodes for it with a stack trie. 
-func (s *Syncer) generateStorageTrie(account common.Hash) common.Hash { - // Iterate over all the dirty storage slots of the account - it := s.db.NewIterator(append(rawdb.SnapshotStoragePrefix, account.Bytes()...), nil) - defer it.Release() - - // Pass all the slots through a stack trie, periodically flushing to disk - // when too much data accumulates. - var ( - batch = s.db.NewBatch() - t = trie.NewStackTrie(batch) - - start = time.Now() - logged = time.Now() - ) - for it.Next() { - // Skip anything that's not an storage snapshot item - if len(it.Key()) != 1+2*common.HashLength { - continue - } - slothash := it.Key()[1+common.HashLength:] - t.TryUpdate(slothash, common.CopyBytes(it.Value())) - - if batch.ValueSize() > ethdb.IdealBatchSize { - s.storageBytes += common.StorageSize(batch.ValueSize()) - if err := batch.Write(); err != nil { - log.Error("Failed to write storage trie data", "err", err) - } - batch.Reset() - - // Occasionally show a log message since this path can take many minutes - if time.Since(logged) > 8*time.Second { - var ( - pos = new(big.Int).SetBytes(slothash) - perc = float64(new(big.Int).Div(new(big.Int).Mul(pos, big10000), hashSpace).Uint64()) / 100 - prog = fmt.Sprintf("%.2f%%", perc) - eta = time.Duration(float64(time.Since(start)) * (100 - perc) / perc) - ) - log.Info("Generating contract state trie", "generated", prog, "eta", common.PrettyDuration(eta)) - logged = time.Now() - } - s.reportSyncProgress(false) - - // TODO(karalabe): Do we want to support interrupting this method? - } - } - // Finalize the trie to retrieve its root hash and bubble it up to decide if - // account healing is still needed or not any more - root, err := t.Commit() - if err != nil { - log.Error("Failed to commit storage trie", "err", err) - } - s.storageBytes += common.StorageSize(batch.ValueSize()) - if err := batch.Write(); err != nil { - log.Error("Failed to finalize storage trie", "err", err) - } - return root -} - // assignAccountTasks attempts to match idle peers to pending account range // retrievals. func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *accountRequest, cancel chan struct{}) { @@ -1871,9 +1761,9 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { batch := s.db.NewBatch() var ( - slots int - nodes int - skipped int + slots int + nodes int + bytes common.StorageSize ) // Iterate over all the accounts and reconstruct their storage tries from the // delivered slots @@ -1924,10 +1814,13 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // Make sure we don't overflow if the step is not a proper divisor last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") } + batch := s.db.NewBatch() tasks = append(tasks, &storageTask{ - Next: next, - Last: last, - root: acc.Root, + Next: next, + Last: last, + root: acc.Root, + genBatch: batch, + genTrie: trie.NewStackTrie(batch), }) log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", next, "last", last) next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1)) @@ -1976,6 +1869,8 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { it := res.nodes[i].NewIterator(nil, nil) for it.Next() { batch.Put(it.Key(), it.Value()) + + bytes += common.StorageSize(common.HashLength + len(it.Value())) nodes++ } it.Release() @@ -1985,16 +1880,41 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // snapshot generation. 
for j := 0; j < len(res.hashes[i]); j++ { rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) + bytes += common.StorageSize(1 + 2*common.HashLength + len(res.slots[i][j])) + + // If we're storing large contracts, generate the trie nodes + // on the fly to not trash the gluing points + if i == len(res.hashes)-1 && res.subTask != nil { + res.subTask.genTrie.Update(res.hashes[i][j][:], res.slots[i][j]) + } } } - bytes := common.StorageSize(batch.ValueSize()) + // Large contracts could have generated new trie nodes, flush them to disk + if res.subTask != nil { + if res.subTask.done { + if _, err := res.subTask.genTrie.Commit(); err != nil { + log.Error("Failed to commit stack slots", "err", err) + } + } + if data := res.subTask.genBatch.ValueSize(); data > ethdb.IdealBatchSize || res.subTask.done { + keys := res.subTask.genBatch.KeyCount() + if err := res.subTask.genBatch.Write(); err != nil { + log.Error("Failed to persist stack slots", "err", err) + } + res.subTask.genBatch.Reset() + + bytes += common.StorageSize(keys*common.HashLength + data) + nodes += keys + } + } + // Flush anything written just now abd update the stats if err := batch.Write(); err != nil { log.Crit("Failed to persist storage slots", "err", err) } s.storageSynced += uint64(slots) s.storageBytes += bytes - log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "nodes", nodes, "skipped", skipped, "bytes", bytes) + log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "nodes", nodes, "bytes", bytes) // If this delivery completed the last pending task, forward the account task // to the next chunk @@ -2092,22 +2012,53 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { // Persist the received account segements. These flat state maybe // outdated during the sync, but it can be fixed later during the // snapshot generation. 
+ var ( + nodes int + bytes common.StorageSize + ) batch := s.db.NewBatch() for i, hash := range res.hashes { if task.needCode[i] || task.needState[i] { break } - blob := snapshot.SlimAccountRLP(res.accounts[i].Nonce, res.accounts[i].Balance, res.accounts[i].Root, res.accounts[i].CodeHash) - rawdb.WriteAccountSnapshot(batch, hash, blob) + slim := snapshot.SlimAccountRLP(res.accounts[i].Nonce, res.accounts[i].Balance, res.accounts[i].Root, res.accounts[i].CodeHash) + rawdb.WriteAccountSnapshot(batch, hash, slim) + bytes += common.StorageSize(1 + common.HashLength + len(slim)) + + // If the task is complete, drop it into the stack trie to generate + // account trie nodes for it + if !task.needHeal[i] { + full, err := snapshot.FullAccountRLP(slim) // TODO(karalabe): Slim parsing can be omitted + if err != nil { + panic(err) // Really shouldn't ever happen + } + task.genTrie.Update(hash[:], full) + } } - bytes := common.StorageSize(batch.ValueSize()) + // Stack trie could have generated trie nodes, push them to disk + if task.done { + if _, err := task.genTrie.Commit(); err != nil { + log.Error("Failed to commit stack account", "err", err) + } + } + if data := task.genBatch.ValueSize(); data > ethdb.IdealBatchSize || task.done { + keys := task.genBatch.KeyCount() + if err := task.genBatch.Write(); err != nil { + log.Error("Failed to persist stack account", "err", err) + } + task.genBatch.Reset() + + nodes += keys + bytes += common.StorageSize(keys*common.HashLength + data) + } + // Flush anything written just now abd update the stats if err := batch.Write(); err != nil { log.Crit("Failed to persist accounts", "err", err) } s.accountBytes += bytes s.accountSynced += uint64(len(res.accounts)) - log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", bytes) + log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "bytes", bytes) // Task filling persisted, push it the chunk marker forward to the first // account still missing data. @@ -2162,7 +2113,6 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco s.lock.Unlock() return nil } - // Response is valid, but check if peer is signalling that it does not have // the requested data. For account range queries that means the state being // retrieved was either already pruned remotely, or the peer is not yet diff --git a/ethdb/batch.go b/ethdb/batch.go index e261415bff9d..5f8207fc4691 100644 --- a/ethdb/batch.go +++ b/ethdb/batch.go @@ -25,6 +25,9 @@ const IdealBatchSize = 100 * 1024 type Batch interface { KeyValueWriter + // KeyCount retrieves the number of keys queued up for writing. + KeyCount() int + // ValueSize retrieves the amount of data queued up for writing. ValueSize() int diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go index 5d19cc3577de..d0578742b61c 100644 --- a/ethdb/leveldb/leveldb.go +++ b/ethdb/leveldb/leveldb.go @@ -448,6 +448,7 @@ func (db *Database) meter(refresh time.Duration) { type batch struct { db *leveldb.DB b *leveldb.Batch + keys int size int } @@ -461,10 +462,16 @@ func (b *batch) Put(key, value []byte) error { // Delete inserts the a key removal into the batch for later committing. func (b *batch) Delete(key []byte) error { b.b.Delete(key) + b.keys++ b.size += len(key) return nil } +// KeyCount retrieves the number of keys queued up for writing. +func (b *batch) KeyCount() int { + return b.keys +} + // ValueSize retrieves the amount of data queued up for writing. 
func (b *batch) ValueSize() int { return b.size diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go index fedc9e326cf8..f7e372bba122 100644 --- a/ethdb/memorydb/memorydb.go +++ b/ethdb/memorydb/memorydb.go @@ -198,6 +198,7 @@ type keyvalue struct { type batch struct { db *Database writes []keyvalue + keys int size int } @@ -211,10 +212,16 @@ func (b *batch) Put(key, value []byte) error { // Delete inserts the a key removal into the batch for later committing. func (b *batch) Delete(key []byte) error { b.writes = append(b.writes, keyvalue{common.CopyBytes(key), nil, true}) + b.keys++ b.size += len(key) return nil } +// KeyCount retrieves the number of keys queued up for writing. +func (b *batch) KeyCount() int { + return b.keys +} + // ValueSize retrieves the amount of data queued up for writing. func (b *batch) ValueSize() int { return b.size diff --git a/tests/fuzzers/stacktrie/trie_fuzzer.go b/tests/fuzzers/stacktrie/trie_fuzzer.go index 5cea7769c284..0013c919c9f3 100644 --- a/tests/fuzzers/stacktrie/trie_fuzzer.go +++ b/tests/fuzzers/stacktrie/trie_fuzzer.go @@ -90,6 +90,7 @@ func (b *spongeBatch) Put(key, value []byte) error { return nil } func (b *spongeBatch) Delete(key []byte) error { panic("implement me") } +func (b *spongeBatch) KeyCount() int { panic("not implemented") } func (b *spongeBatch) ValueSize() int { return 100 } func (b *spongeBatch) Write() error { return nil } func (b *spongeBatch) Reset() {} From a74a3dd720c939e6fc227232bbb26fe0327d8c2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 16 Apr 2021 23:25:11 +0300 Subject: [PATCH 08/17] eth/protocols/snap: print completion log for snap phase --- eth/protocols/snap/sync.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index e87966080593..1c59b3fb37e4 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -800,6 +800,9 @@ func (s *Syncer) cleanAccountTasks() { s.lock.Lock() s.snapped = true s.lock.Unlock() + + // Push the final sync report + s.reportSyncProgress(true) } } From ace03d29965ee4fa487626c32a065540754c8488 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Sun, 18 Apr 2021 20:27:47 +0200 Subject: [PATCH 09/17] eth/protocols/snap: extended tests --- eth/protocols/snap/sync.go | 52 +++++++++++++++-------------- eth/protocols/snap/sync_test.go | 59 +++++++++++++++++++++++++++++++-- 2 files changed, 85 insertions(+), 26 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 1c59b3fb37e4..95896254cd52 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -73,7 +73,9 @@ const ( // and waste round trip times. If it's too high, we're capping responses and // waste bandwidth. maxTrieRequestCount = 512 +) +var ( // accountConcurrency is the number of chunks to split the account trie into // to allow concurrent retrievals. accountConcurrency = 16 @@ -81,9 +83,7 @@ const ( // storageConcurrency is the number of chunks to split the a large contract // storage trie into to allow concurrent retrievals. storageConcurrency = 16 -) -var ( // requestTimeout is the maximum time a peer is allowed to spend on serving // a single network request. requestTimeout = 15 * time.Second // TODO(karalabe): Make it dynamic ala fast-sync? 
@@ -718,7 +718,7 @@ func (s *Syncer) loadSyncStatus() { step := new(big.Int).Sub( new(big.Int).Div( new(big.Int).Exp(common.Big2, common.Big256, nil), - big.NewInt(accountConcurrency), + big.NewInt(int64(accountConcurrency)), ), common.Big1, ) for i := 0; i < accountConcurrency; i++ { @@ -1808,7 +1808,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { step := new(big.Int).Sub( new(big.Int).Div( new(big.Int).Exp(common.Big2, common.Big256, nil), - big.NewInt(storageConcurrency), + big.NewInt(int64(storageConcurrency)), ), common.Big1, ) for k := 0; k < storageConcurrency; k++ { @@ -2039,29 +2039,33 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { } } // Stack trie could have generated trie nodes, push them to disk - if task.done { - if _, err := task.genTrie.Commit(); err != nil { - log.Error("Failed to commit stack account", "err", err) - } - } - if data := task.genBatch.ValueSize(); data > ethdb.IdealBatchSize || task.done { - keys := task.genBatch.KeyCount() - if err := task.genBatch.Write(); err != nil { - log.Error("Failed to persist stack account", "err", err) + defer func(){ + if task.done { + if root, err := task.genTrie.Commit(); err != nil { + log.Error("Failed to commit stack account", "err", err) + } else { + log.Debug("Range root", "root", root) + } } - task.genBatch.Reset() + if data := task.genBatch.ValueSize(); data > ethdb.IdealBatchSize || task.done { + keys := task.genBatch.KeyCount() + if err := task.genBatch.Write(); err != nil { + log.Error("Failed to persist stack account", "err", err) + } + task.genBatch.Reset() - nodes += keys - bytes += common.StorageSize(keys*common.HashLength + data) - } - // Flush anything written just now abd update the stats - if err := batch.Write(); err != nil { - log.Crit("Failed to persist accounts", "err", err) - } - s.accountBytes += bytes - s.accountSynced += uint64(len(res.accounts)) + nodes += keys + bytes += common.StorageSize(keys*common.HashLength + data) + } + // Flush anything written just now abd update the stats + if err := batch.Write(); err != nil { + log.Crit("Failed to persist accounts", "err", err) + } + s.accountBytes += bytes + s.accountSynced += uint64(len(res.accounts)) - log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "bytes", bytes) + log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "bytes", bytes) + }() // Task filling persisted, push it the chunk marker forward to the first // account still missing data. 
diff --git a/eth/protocols/snap/sync_test.go b/eth/protocols/snap/sync_test.go index 3e9778dbc7c5..e5420e2f97c1 100644 --- a/eth/protocols/snap/sync_test.go +++ b/eth/protocols/snap/sync_test.go @@ -22,6 +22,7 @@ import ( "encoding/binary" "fmt" "math/big" + "os" "sort" "sync" "testing" @@ -135,6 +136,12 @@ type testPeer struct { trieRequestHandler trieHandlerFunc codeRequestHandler codeHandlerFunc term func() + + // counters + nAccountRequests int + nStorageRequests int + nBytecodeRequests int + nTrienodeRequests int } func newTestPeer(id string, t *testing.T, term func()) *testPeer { @@ -156,19 +163,30 @@ func newTestPeer(id string, t *testing.T, term func()) *testPeer { func (t *testPeer) ID() string { return t.id } func (t *testPeer) Log() log.Logger { return t.logger } +func (t *testPeer) Stats() string { + return fmt.Sprintf(`Account requests: %d +Storage requests: %d +Bytecode requests: %d +Trienode requests: %d +`, t.nAccountRequests, t.nStorageRequests, t.nBytecodeRequests, t.nTrienodeRequests) +} + func (t *testPeer) RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes uint64) error { t.logger.Trace("Fetching range of accounts", "reqid", id, "root", root, "origin", origin, "limit", limit, "bytes", common.StorageSize(bytes)) + t.nAccountRequests++ go t.accountRequestHandler(t, id, root, origin, limit, bytes) return nil } func (t *testPeer) RequestTrieNodes(id uint64, root common.Hash, paths []TrieNodePathSet, bytes uint64) error { t.logger.Trace("Fetching set of trie nodes", "reqid", id, "root", root, "pathsets", len(paths), "bytes", common.StorageSize(bytes)) + t.nTrienodeRequests++ go t.trieRequestHandler(t, id, root, paths, bytes) return nil } func (t *testPeer) RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes uint64) error { + t.nStorageRequests++ if len(accounts) == 1 && origin != nil { t.logger.Trace("Fetching range of large storage slots", "reqid", id, "root", root, "account", accounts[0], "origin", common.BytesToHash(origin), "limit", common.BytesToHash(limit), "bytes", common.StorageSize(bytes)) } else { @@ -179,6 +197,7 @@ func (t *testPeer) RequestStorageRanges(id uint64, root common.Hash, accounts [] } func (t *testPeer) RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error { + t.nBytecodeRequests++ t.logger.Trace("Fetching set of byte codes", "reqid", id, "hashes", len(hashes), "bytes", common.StorageSize(bytes)) go t.codeRequestHandler(t, id, hashes, bytes) return nil @@ -1365,7 +1384,7 @@ func makeBoundaryAccountTrie(n int) (*trie.Trie, entrySlice) { step := new(big.Int).Sub( new(big.Int).Div( new(big.Int).Exp(common.Big2, common.Big256, nil), - big.NewInt(accountConcurrency), + big.NewInt(int64(accountConcurrency)), ), common.Big1, ) for i := 0; i < accountConcurrency; i++ { @@ -1529,7 +1548,7 @@ func makeBoundaryStorageTrie(n int, db *trie.Database) (*trie.Trie, entrySlice) step := new(big.Int).Sub( new(big.Int).Div( new(big.Int).Exp(common.Big2, common.Big256, nil), - big.NewInt(accountConcurrency), + big.NewInt(int64(accountConcurrency)), ), common.Big1, ) for i := 0; i < accountConcurrency; i++ { @@ -1605,3 +1624,39 @@ func verifyTrie(db ethdb.KeyValueStore, root common.Hash, t *testing.T) { } t.Logf("accounts: %d, slots: %d", accounts, slots) } + +// TestSync tests a basic sync with one peer +func TestSyncAgain(t *testing.T) { + // Set the account concurrency to 1. 
This _should_ result in the + // range root to become correct, and there should be no healing needed + accountConcurrency = 1 + log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + var ( + once sync.Once + cancel = make(chan struct{}) + term = func() { + once.Do(func() { + close(cancel) + }) + } + ) + sourceAccountTrie, elems := makeAccountTrieNoStorage(100) + + mkSource := func(name string) *testPeer { + source := newTestPeer(name, t, term) + source.accountTrie = sourceAccountTrie + source.accountValues = elems + return source + } + src := mkSource("source") + syncer := setupSyncer(src) + if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil { + t.Fatalf("sync failed: %v", err) + } + verifyTrie(syncer.db, sourceAccountTrie.Hash(), t) + fmt.Printf(src.Stats()) + if have, want := src.nTrienodeRequests, 0; have != want{ + t.Errorf("trie node heal requests wrong, want %d, have %d", want, have) + } + +} From ea7661f6f4566ff0d90fd93725dbd70bf08cd576 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Sun, 18 Apr 2021 20:55:47 +0200 Subject: [PATCH 10/17] eth/protocols/snap: make testcase pass --- eth/protocols/snap/sync_test.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/eth/protocols/snap/sync_test.go b/eth/protocols/snap/sync_test.go index e5420e2f97c1..2ca56927d878 100644 --- a/eth/protocols/snap/sync_test.go +++ b/eth/protocols/snap/sync_test.go @@ -1625,8 +1625,9 @@ func verifyTrie(db ethdb.KeyValueStore, root common.Hash, t *testing.T) { t.Logf("accounts: %d, slots: %d", accounts, slots) } -// TestSync tests a basic sync with one peer -func TestSyncAgain(t *testing.T) { +// TestSyncAccountPerformance tests how efficient the snap algo is at minimizing +// state healing +func TestSyncAccountPerformance(t *testing.T) { // Set the account concurrency to 1. This _should_ result in the // range root to become correct, and there should be no healing needed accountConcurrency = 1 @@ -1654,9 +1655,12 @@ func TestSyncAgain(t *testing.T) { t.Fatalf("sync failed: %v", err) } verifyTrie(syncer.db, sourceAccountTrie.Hash(), t) - fmt.Printf(src.Stats()) - if have, want := src.nTrienodeRequests, 0; have != want{ + // The trie root will always be requested, since it is added when the snap + // sync cycle starts. When popping the queue, we do not look it up again. + // Doing so would bring this number down to zero in this artificial testcase, + // but only add extra IO for no reason in practice. 
+ if have, want := src.nTrienodeRequests, 1; have != want { + fmt.Printf(src.Stats()) t.Errorf("trie node heal requests wrong, want %d, have %d", want, have) } - } From 4a7e01287145157629a493cd676a0ed68f1e108f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 19 Apr 2021 14:42:37 +0300 Subject: [PATCH 11/17] eth/protocols/snap: fix account stacktrie commit without defer --- eth/protocols/snap/sync.go | 53 ++++++++++++++++----------------- eth/protocols/snap/sync_test.go | 4 +-- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 95896254cd52..f613c9306bc9 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -2038,34 +2038,14 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { task.genTrie.Update(hash[:], full) } } - // Stack trie could have generated trie nodes, push them to disk - defer func(){ - if task.done { - if root, err := task.genTrie.Commit(); err != nil { - log.Error("Failed to commit stack account", "err", err) - } else { - log.Debug("Range root", "root", root) - } - } - if data := task.genBatch.ValueSize(); data > ethdb.IdealBatchSize || task.done { - keys := task.genBatch.KeyCount() - if err := task.genBatch.Write(); err != nil { - log.Error("Failed to persist stack account", "err", err) - } - task.genBatch.Reset() - - nodes += keys - bytes += common.StorageSize(keys*common.HashLength + data) - } - // Flush anything written just now abd update the stats - if err := batch.Write(); err != nil { - log.Crit("Failed to persist accounts", "err", err) - } - s.accountBytes += bytes - s.accountSynced += uint64(len(res.accounts)) + // Flush anything written just now abd update the stats + if err := batch.Write(); err != nil { + log.Crit("Failed to persist accounts", "err", err) + } + s.accountBytes += bytes + s.accountSynced += uint64(len(res.accounts)) - log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "bytes", bytes) - }() + log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "bytes", bytes) // Task filling persisted, push it the chunk marker forward to the first // account still missing data. @@ -2077,6 +2057,25 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { } // All accounts marked as complete, track if the entire task is done task.done = !res.cont + + // Stack trie could have generated trie nodes, push them to disk (we need to + // flush after finalizing task.done. It's fine even if we crash and lose this + // write as it will only cause more data to be downloaded during heal. 
+ if task.done { + if _, err := task.genTrie.Commit(); err != nil { + log.Error("Failed to commit stack account", "err", err) + } + } + if data := task.genBatch.ValueSize(); data > ethdb.IdealBatchSize || task.done { + keys := task.genBatch.KeyCount() + if err := task.genBatch.Write(); err != nil { + log.Error("Failed to persist stack account", "err", err) + } + task.genBatch.Reset() + + nodes += keys + bytes += common.StorageSize(keys*common.HashLength + data) + } } // OnAccounts is a callback method to invoke when a range of accounts are diff --git a/eth/protocols/snap/sync_test.go b/eth/protocols/snap/sync_test.go index 2ca56927d878..c7314bdbce76 100644 --- a/eth/protocols/snap/sync_test.go +++ b/eth/protocols/snap/sync_test.go @@ -22,7 +22,6 @@ import ( "encoding/binary" "fmt" "math/big" - "os" "sort" "sync" "testing" @@ -1630,8 +1629,9 @@ func verifyTrie(db ethdb.KeyValueStore, root common.Hash, t *testing.T) { func TestSyncAccountPerformance(t *testing.T) { // Set the account concurrency to 1. This _should_ result in the // range root to become correct, and there should be no healing needed + defer func(old int) { accountConcurrency = old }(accountConcurrency) accountConcurrency = 1 - log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + var ( once sync.Once cancel = make(chan struct{}) From 4bf235dc0708683913252d6d129810c1fcf59155 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 19 Apr 2021 16:01:13 +0300 Subject: [PATCH 12/17] ethdb: fix key counts on reset --- ethdb/leveldb/leveldb.go | 2 +- ethdb/memorydb/memorydb.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ethdb/leveldb/leveldb.go b/ethdb/leveldb/leveldb.go index d0578742b61c..da00226e95c7 100644 --- a/ethdb/leveldb/leveldb.go +++ b/ethdb/leveldb/leveldb.go @@ -485,7 +485,7 @@ func (b *batch) Write() error { // Reset resets the batch for reuse. func (b *batch) Reset() { b.b.Reset() - b.size = 0 + b.keys, b.size = 0, 0 } // Replay replays the batch contents. diff --git a/ethdb/memorydb/memorydb.go b/ethdb/memorydb/memorydb.go index f7e372bba122..ded9f5e66c75 100644 --- a/ethdb/memorydb/memorydb.go +++ b/ethdb/memorydb/memorydb.go @@ -245,7 +245,7 @@ func (b *batch) Write() error { // Reset resets the batch for reuse. func (b *batch) Reset() { b.writes = b.writes[:0] - b.size = 0 + b.keys, b.size = 0, 0 } // Replay replays the batch contents. 
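The two Reset fixes above matter because sync.go now derives its persisted-node statistics from batch.KeyCount() and batch.ValueSize(): if only the size is cleared, the key counter keeps growing across batch reuses and the stats drift. A toy batch, assuming nothing beyond what the diffs show, that demonstrates the bookkeeping:

    package main

    import "fmt"

    // toyBatch mimics the bookkeeping the patch above fixes: both the key
    // count and the byte size must be zeroed on Reset, otherwise stats read
    // from KeyCount()/ValueSize() after a flush accumulate across reuses.
    type toyBatch struct {
        writes []struct{ k, v []byte }
        keys   int
        size   int
    }

    func (b *toyBatch) Put(key, value []byte) {
        b.writes = append(b.writes, struct{ k, v []byte }{key, value})
        b.keys++
        b.size += len(key) + len(value)
    }

    func (b *toyBatch) KeyCount() int  { return b.keys }
    func (b *toyBatch) ValueSize() int { return b.size }

    func (b *toyBatch) Reset() {
        b.writes = b.writes[:0]
        b.keys, b.size = 0, 0 // the fix: clear both counters, not just the size
    }

    func main() {
        b := new(toyBatch)
        b.Put([]byte("key"), []byte("value"))
        fmt.Println(b.KeyCount(), b.ValueSize()) // 1 8
        b.Reset()
        fmt.Println(b.KeyCount(), b.ValueSize()) // 0 0
    }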
From 7b2b34a7b7f1045bfb3137d0672b32e371aba0bb Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 19 Apr 2021 15:07:14 +0200 Subject: [PATCH 13/17] eth/protocols: fix typos --- eth/protocols/snap/handler.go | 2 +- eth/protocols/snap/sync.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/eth/protocols/snap/handler.go b/eth/protocols/snap/handler.go index 4c12adfa81d6..9bfac6f03ff7 100644 --- a/eth/protocols/snap/handler.go +++ b/eth/protocols/snap/handler.go @@ -354,7 +354,7 @@ func handleMessage(backend Backend, peer *Peer) error { if err := msg.Decode(res); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } - // Ensure the ranges ae monotonically increasing + // Ensure the ranges are monotonically increasing for i, slots := range res.Slots { for j := 1; j < len(slots); j++ { if bytes.Compare(slots[j-1].Hash[:], slots[j].Hash[:]) >= 0 { diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index f613c9306bc9..192c16ac53ba 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -1910,7 +1910,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { nodes += keys } } - // Flush anything written just now abd update the stats + // Flush anything written just now and update the stats if err := batch.Write(); err != nil { log.Crit("Failed to persist storage slots", "err", err) } @@ -2038,7 +2038,7 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { task.genTrie.Update(hash[:], full) } } - // Flush anything written just now abd update the stats + // Flush anything written just now and update the stats if err := batch.Write(); err != nil { log.Crit("Failed to persist accounts", "err", err) } From ba26c665005b5fc5efbbba4caedd2ec2f3524888 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Tue, 27 Apr 2021 10:14:17 +0200 Subject: [PATCH 14/17] eth/protocols/snap: make better use of delivered data (#44) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * eth/protocols/snap: make better use of delivered data * squashme * eth/protocols/snap: reduce chunking * squashme * eth/protocols/snap: reduce chunking further * eth/protocols/snap: break out hash range calculations * eth/protocols/snap: use sort.Search instead of looping * eth/protocols/snap: prevent crash on storage response with no keys * eth/protocols/snap: nitpicks all around * eth/protocols/snap: clear heal need on 1-chunk storage completion * eth/protocols/snap: fix range chunker, add tests Co-authored-by: Péter Szilágyi --- eth/protocols/snap/range.go | 85 ++++++++++++++++++ eth/protocols/snap/range_test.go | 143 +++++++++++++++++++++++++++++++ eth/protocols/snap/sync.go | 106 ++++++++++++++++------- eth/protocols/snap/sync_test.go | 50 +++++++++++ 4 files changed, 352 insertions(+), 32 deletions(-) create mode 100644 eth/protocols/snap/range.go create mode 100644 eth/protocols/snap/range_test.go diff --git a/eth/protocols/snap/range.go b/eth/protocols/snap/range.go new file mode 100644 index 000000000000..1819c1c88449 --- /dev/null +++ b/eth/protocols/snap/range.go @@ -0,0 +1,85 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package snap + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/math" + "github.com/holiman/uint256" +) + +// hashRange is a utility to handle ranges of hashes, Split up the +// hash-space into sections, and 'walk' over the sections +type hashRange struct { + current *uint256.Int + step *uint256.Int +} + +// newHashRange creates a new hashRange, initiated at the start position, +// and with the step set to fill the desired 'num' chunks +func newHashRange(start common.Hash, num uint64) *hashRange { + left := new(big.Int).Sub( + new(big.Int).Add(math.MaxBig256, common.Big1), + start.Big(), + ) + step := new(big.Int).Div( + new(big.Int).Add(left, new(big.Int).SetUint64(num-1)), + new(big.Int).SetUint64(num), + ) + + step256 := new(uint256.Int) + step256.SetFromBig(step) + + return &hashRange{ + current: uint256.NewInt().SetBytes32(start[:]), + step: step256, + } +} + +// Next pushes the hash range to the next interval. +func (r *hashRange) Next() bool { + next := new(uint256.Int) + if overflow := next.AddOverflow(r.current, r.step); overflow { + return false + } + r.current = next + return true +} + +// Start returns the first hash in the current interval. +func (r *hashRange) Start() common.Hash { + return r.current.Bytes32() +} + +// End returns the last hash in the current interval. +func (r *hashRange) End() common.Hash { + // If the end overflows (non divisible range), return a shorter interval + next := new(uint256.Int) + if overflow := next.AddOverflow(r.current, r.step); overflow { + return common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") + } + return new(uint256.Int).Sub(next, uint256.NewInt().SetOne()).Bytes32() +} + +// incHash returns the next hash, in lexicographical order (a.k.a plus one) +func incHash(h common.Hash) common.Hash { + a := uint256.NewInt().SetBytes32(h[:]) + a.Add(a, uint256.NewInt().SetOne()) + return common.Hash(a.Bytes32()) +} diff --git a/eth/protocols/snap/range_test.go b/eth/protocols/snap/range_test.go new file mode 100644 index 000000000000..23273e50bf10 --- /dev/null +++ b/eth/protocols/snap/range_test.go @@ -0,0 +1,143 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . 
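Before the test file continues below, a short sketch of how the new hashRange walker is meant to be consumed. It mirrors the pattern used later in sync.go; the helper name and the printing are only illustrative, and the snippet assumes it lives in the snap package next to range.go:

    package snap

    import (
        "fmt"

        "github.com/ethereum/go-ethereum/common"
    )

    // demoHashRange mirrors how sync.go consumes the walker: the first chunk
    // runs from the walker's origin to r.End(), and each successful Next()
    // yields the following [Start, End] interval until the space is exhausted.
    func demoHashRange() {
        r := newHashRange(common.Hash{}, 4)

        fmt.Printf("chunk: %x .. %x\n", common.Hash{}, r.End())
        for r.Next() {
            fmt.Printf("chunk: %x .. %x\n", r.Start(), r.End())
        }
        // With 4 chunks over the full space this prints the quarter boundaries
        // 0x00..00-0x3f..ff, 0x40..00-0x7f..ff, 0x80..00-0xbf..ff, 0xc0..00-0xff..ff.
    }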
+ +package snap + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common" +) + +// Tests that given a starting hash and a density, the hash ranger can correctly +// split up the remaining hash space into a fixed number of chunks. +func TestHashRanges(t *testing.T) { + tests := []struct { + head common.Hash + chunks uint64 + starts []common.Hash + ends []common.Hash + }{ + // Simple test case to split the entire hash range into 4 chunks + { + head: common.Hash{}, + chunks: 4, + starts: []common.Hash{ + {}, + common.HexToHash("0x4000000000000000000000000000000000000000000000000000000000000000"), + common.HexToHash("0x8000000000000000000000000000000000000000000000000000000000000000"), + common.HexToHash("0xc000000000000000000000000000000000000000000000000000000000000000"), + }, + ends: []common.Hash{ + common.HexToHash("0x3fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), + common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), + common.HexToHash("0xbfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), + common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), + }, + }, + // Split a divisible part of the hash range up into 2 chunks + { + head: common.HexToHash("0x2000000000000000000000000000000000000000000000000000000000000000"), + chunks: 2, + starts: []common.Hash{ + common.Hash{}, + common.HexToHash("0x9000000000000000000000000000000000000000000000000000000000000000"), + }, + ends: []common.Hash{ + common.HexToHash("0x8fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), + common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), + }, + }, + // Split the entire hash range into a non divisible 3 chunks + { + head: common.Hash{}, + chunks: 3, + starts: []common.Hash{ + {}, + common.HexToHash("0x5555555555555555555555555555555555555555555555555555555555555556"), + common.HexToHash("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaac"), + }, + ends: []common.Hash{ + common.HexToHash("0x5555555555555555555555555555555555555555555555555555555555555555"), + common.HexToHash("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab"), + common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), + }, + }, + // Split a part of hash range into a non divisible 3 chunks + { + head: common.HexToHash("0x2000000000000000000000000000000000000000000000000000000000000000"), + chunks: 3, + starts: []common.Hash{ + {}, + common.HexToHash("0x6aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab"), + common.HexToHash("0xb555555555555555555555555555555555555555555555555555555555555556"), + }, + ends: []common.Hash{ + common.HexToHash("0x6aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"), + common.HexToHash("0xb555555555555555555555555555555555555555555555555555555555555555"), + common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), + }, + }, + // Split a part of hash range into a non divisible 3 chunks, but with a + // meaningful space size for manual verification. 
+ // - The head being 0xff...f0, we have 14 hashes left in the space + // - Chunking up 14 into 3 pieces is 4.(6), but we need the ceil of 5 to avoid a micro-last-chunk + // - Since the range is not divisible, the last interval will be shrter, capped at 0xff...f + // - The chunk ranges thus needs to be [..0, ..5], [..6, ..b], [..c, ..f] + { + head: common.HexToHash("0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff0"), + chunks: 3, + starts: []common.Hash{ + {}, + common.HexToHash("0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff6"), + common.HexToHash("0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffc"), + }, + ends: []common.Hash{ + common.HexToHash("0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff5"), + common.HexToHash("0xfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffb"), + common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), + }, + }, + } + for i, tt := range tests { + r := newHashRange(tt.head, tt.chunks) + + var ( + starts = []common.Hash{{}} + ends = []common.Hash{r.End()} + ) + for r.Next() { + starts = append(starts, r.Start()) + ends = append(ends, r.End()) + } + if len(starts) != len(tt.starts) { + t.Errorf("test %d: starts count mismatch: have %d, want %d", i, len(starts), len(tt.starts)) + } + for j := 0; j < len(starts) && j < len(tt.starts); j++ { + if starts[j] != tt.starts[j] { + t.Errorf("test %d, start %d: hash mismatch: have %x, want %x", i, j, starts[j], tt.starts[j]) + } + } + if len(ends) != len(tt.ends) { + t.Errorf("test %d: ends count mismatch: have %d, want %d", i, len(ends), len(tt.ends)) + } + for j := 0; j < len(ends) && j < len(tt.ends); j++ { + if ends[j] != tt.ends[j] { + t.Errorf("test %d, end %d: hash mismatch: have %x, want %x", i, j, ends[j], tt.ends[j]) + } + } + } +} diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 192c16ac53ba..018e0e14e227 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -23,10 +23,12 @@ import ( "fmt" "math/big" "math/rand" + "sort" "sync" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state/snapshot" @@ -1803,30 +1805,49 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // the subtasks for it within the main account task if tasks, ok := res.mainTask.SubTasks[account]; !ok { var ( - next common.Hash + keys = res.hashes[i] + chunks = uint64(storageConcurrency) + lastKey common.Hash ) - step := new(big.Int).Sub( - new(big.Int).Div( - new(big.Int).Exp(common.Big2, common.Big256, nil), - big.NewInt(int64(storageConcurrency)), - ), common.Big1, - ) - for k := 0; k < storageConcurrency; k++ { - last := common.BigToHash(new(big.Int).Add(next.Big(), step)) - if k == storageConcurrency-1 { - // Make sure we don't overflow if the step is not a proper divisor - last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") + if len(keys) > 0 { + lastKey = keys[len(keys)-1] + } + // If the number of slots remaining is low, decrease the + // number of chunks. Somewhere on the order of 10-15K slots + // fit into a packet of 500KB. A key/slot pair is maximum 64 + // bytes, so pessimistically maxRequestSize/64 = 8K. + // + // Chunk so that at least 2 packets are needed to fill a task. 
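To make the heuristic above concrete before the code that implements it continues right below, here is the arithmetic on assumed numbers: maxRequestSize is taken as 512KiB (matching the ~500KB figure in the comment) and storageConcurrency as 16; neither constant is shown in this hunk, so both are assumptions.

    package main

    import "fmt"

    func main() {
        // Assumed values: maxRequestSize and storageConcurrency are defined
        // elsewhere in sync.go; 512KiB and 16 are used here for illustration.
        const maxRequestSize = 512 * 1024
        const storageConcurrency = 16

        estimate := uint64(100_000) // hypothetical remaining-slot estimate

        chunks := uint64(storageConcurrency)
        // At most ~8K key/slot pairs fit one packet (512KiB / 64 bytes), so a
        // chunk sized for two packets holds 16384 slots. 100000 / 16384 = 6,
        // hence 7 chunks, well under the 16-chunk ceiling.
        if n := estimate / (2 * (maxRequestSize / 64)); n+1 < chunks {
            chunks = n + 1
        }
        fmt.Println("chunks:", chunks) // 7
    }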
+ if estimate, err := estimateRemainingSlots(len(keys), lastKey); err == nil { + if n := estimate / (2 * (maxRequestSize / 64)); n+1 < chunks { + chunks = n + 1 } + log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "remaining", estimate, "chunks", chunks) + } else { + log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks) + } + r := newHashRange(lastKey, chunks) + + // Our first task is the one that was just filled by this response. + tasks = append(tasks, &storageTask{ + Next: common.Hash{}, + Last: r.End(), + root: acc.Root, + genBatch: batch, + genTrie: trie.NewStackTrie(batch), + }) + for r.Next() { batch := s.db.NewBatch() tasks = append(tasks, &storageTask{ - Next: next, - Last: last, + Next: r.Start(), + Last: r.End(), root: acc.Root, genBatch: batch, genTrie: trie.NewStackTrie(batch), }) - log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", next, "last", last) - next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1)) + } + for _, task := range tasks { + log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", task.Next, "last", task.Last) } res.mainTask.SubTasks[account] = tasks @@ -1839,25 +1860,23 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { if res.subTask != nil { // Ensure the response doesn't overflow into the subsequent task last := res.subTask.Last.Big() - for k, hash := range res.hashes[i] { - // Mark the range complete if the last is already included. - // Keep iteration to delete the extra states if exists. - cmp := hash.Big().Cmp(last) - if cmp == 0 { + // Find the first overflowing key. While at it, mark res as complete + // if we find the range to include or pass the 'last' + index := sort.Search(len(res.hashes[i]), func(k int) bool { + cmp := res.hashes[i][k].Big().Cmp(last) + if cmp >= 0 { res.cont = false - continue - } - if cmp > 0 { - // Chunk overflown, cut off excess - res.hashes[i] = res.hashes[i][:k] - res.slots[i] = res.slots[i][:k] - res.cont = false // Mark range completed - break } + return cmp > 0 + }) + if index >= 0 { + // cut off excess + res.hashes[i] = res.hashes[i][:index] + res.slots[i] = res.slots[i][:index] } // Forward the relevant storage chunk (even if created just now) if res.cont { - res.subTask.Next = common.BigToHash(new(big.Int).Add(res.hashes[i][len(res.hashes[i])-1].Big(), big.NewInt(1))) + res.subTask.Next = incHash(res.hashes[i][len(res.hashes[i])-1]) } else { res.subTask.done = true } @@ -1895,8 +1914,15 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // Large contracts could have generated new trie nodes, flush them to disk if res.subTask != nil { if res.subTask.done { - if _, err := res.subTask.genTrie.Commit(); err != nil { + if root, err := res.subTask.genTrie.Commit(); err != nil { log.Error("Failed to commit stack slots", "err", err) + } else if root == res.subTask.root { + // If the chunk's root is an overflown but full delivery, clear the heal request + for i, account := range res.mainTask.res.hashes { + if account == res.accounts[len(res.accounts)-1] { + res.mainTask.needHeal[i] = false + } + } } } if data := res.subTask.genBatch.ValueSize(); data > ethdb.IdealBatchSize || res.subTask.done { @@ -2053,7 +2079,7 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { if task.needCode[i] || task.needState[i] { return } - task.Next = common.BigToHash(new(big.Int).Add(hash.Big(), big.NewInt(1))) + task.Next = incHash(hash) } // All accounts marked 
as complete, track if the entire task is done task.done = !res.cont @@ -2715,3 +2741,19 @@ func (s *Syncer) reportHealProgress(force bool) { log.Info("State heal in progress", "accounts", accounts, "slots", storage, "codes", bytecode, "nodes", trienode, "pending", s.healer.scheduler.Pending()) } + +// estimateRemainingSlots tries to determine roughly how many slots are left in +// a contract storage, based on the number of keys and the last hash. This method +// assumes that the hashes are lexicographically ordered and evenly distributed. +func estimateRemainingSlots(hashes int, last common.Hash) (uint64, error) { + if last == (common.Hash{}) { + return 0, errors.New("last hash empty") + } + space := new(big.Int).Mul(math.MaxBig256, big.NewInt(int64(hashes))) + space.Div(space, last.Big()) + if !space.IsUint64() { + // Gigantic address space probably due to too few or malicious slots + return 0, errors.New("too few slots for estimation") + } + return space.Uint64() - uint64(hashes), nil +} diff --git a/eth/protocols/snap/sync_test.go b/eth/protocols/snap/sync_test.go index c7314bdbce76..a1cc3581a85c 100644 --- a/eth/protocols/snap/sync_test.go +++ b/eth/protocols/snap/sync_test.go @@ -1664,3 +1664,53 @@ func TestSyncAccountPerformance(t *testing.T) { t.Errorf("trie node heal requests wrong, want %d, have %d", want, have) } } + +func TestSlotEstimation(t *testing.T) { + for i, tc := range []struct { + last common.Hash + count int + want uint64 + }{ + { + // Half the space + common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), + 100, + 100, + }, + { + // 1 / 16th + common.HexToHash("0x0fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), + 100, + 1500, + }, + { + // Bit more than 1 / 16th + common.HexToHash("0x1000000000000000000000000000000000000000000000000000000000000000"), + 100, + 1499, + }, + { + // Almost everything + common.HexToHash("0xF000000000000000000000000000000000000000000000000000000000000000"), + 100, + 6, + }, + { + // Almost nothing -- should lead to error + common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001"), + 1, + 0, + }, + { + // Nothing -- should lead to error + common.Hash{}, + 100, + 0, + }, + } { + have, _ := estimateRemainingSlots(tc.count, tc.last) + if want := tc.want; have != want { + t.Errorf("test %d: have %d want %d", i, have, want) + } + } +} From 4bfd9c35afbb432185e6cb9a3c89560ab6eb02b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 27 Apr 2021 11:30:06 +0300 Subject: [PATCH 15/17] trie: fix test API error --- trie/trie_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/trie/trie_test.go b/trie/trie_test.go index 492b423c2ff0..44fddf87e4c0 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -706,6 +706,7 @@ func (b *spongeBatch) Put(key, value []byte) error { return nil } func (b *spongeBatch) Delete(key []byte) error { panic("implement me") } +func (b *spongeBatch) KeyCount() int { return 100 } func (b *spongeBatch) ValueSize() int { return 100 } func (b *spongeBatch) Write() error { return nil } func (b *spongeBatch) Reset() {} From 7edc4f65a2969be00601a8ea9e98d6d6a8bcecd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 27 Apr 2021 11:38:09 +0300 Subject: [PATCH 16/17] eth/protocols/snap: fix some further liter issues --- eth/protocols/snap/range.go | 7 +------ eth/protocols/snap/sync.go | 6 +----- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/eth/protocols/snap/range.go 
b/eth/protocols/snap/range.go index 1819c1c88449..dd380ff47148 100644 --- a/eth/protocols/snap/range.go +++ b/eth/protocols/snap/range.go @@ -20,7 +20,6 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/math" "github.com/holiman/uint256" ) @@ -34,15 +33,11 @@ type hashRange struct { // newHashRange creates a new hashRange, initiated at the start position, // and with the step set to fill the desired 'num' chunks func newHashRange(start common.Hash, num uint64) *hashRange { - left := new(big.Int).Sub( - new(big.Int).Add(math.MaxBig256, common.Big1), - start.Big(), - ) + left := new(big.Int).Sub(hashSpace, start.Big()) step := new(big.Int).Div( new(big.Int).Add(left, new(big.Int).SetUint64(num-1)), new(big.Int).SetUint64(num), ) - step256 := new(uint256.Int) step256.SetFromBig(step) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 018e0e14e227..0ad694c8bde7 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -2071,8 +2071,6 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { s.accountBytes += bytes s.accountSynced += uint64(len(res.accounts)) - log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "bytes", bytes) - // Task filling persisted, push it the chunk marker forward to the first // account still missing data. for i, hash := range res.hashes { @@ -2102,6 +2100,7 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { nodes += keys bytes += common.StorageSize(keys*common.HashLength + data) } + log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "bytes", bytes) } // OnAccounts is a callback method to invoke when a range of accounts are @@ -2672,9 +2671,6 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error { // hashSpace is the total size of the 256 bit hash space for accounts. var hashSpace = new(big.Int).Exp(common.Big2, common.Big256, nil) -// big10000 is used to generate 2 digit precision percentages. -var big10000 = big.NewInt(10000) - // report calculates various status reports and provides it to the user. func (s *Syncer) report(force bool) { if len(s.tasks) > 0 { From 6dde2c5dd3fb41d8b2454599286e74f790e28f41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 27 Apr 2021 14:41:47 +0300 Subject: [PATCH 17/17] eth/protocols/snap: fix accidental batch reuse --- eth/protocols/snap/sync.go | 1 + 1 file changed, 1 insertion(+) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 0ad694c8bde7..3ce4c8735f8d 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -1829,6 +1829,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { r := newHashRange(lastKey, chunks) // Our first task is the one that was just filled by this response. + batch := s.db.NewBatch() tasks = append(tasks, &storageTask{ Next: common.Hash{}, Last: r.End(),
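The final one-line fix above gives the first storage chunk its own write batch. The earlier hunk suggests that task previously aliased the response-level batch created in processStorageResponse, so flushing or resetting one silently affected the other, which is what the subject line calls accidental batch reuse. A tiny sketch of that aliasing hazard, using hypothetical stand-ins for the task and batch types:

    package main

    import "fmt"

    // miniBatch and miniTask are hypothetical stand-ins for ethdb.Batch and
    // storageTask, reduced to the bits needed to show the aliasing problem.
    type miniBatch struct{ writes []string }

    func (b *miniBatch) Put(v string) { b.writes = append(b.writes, v) }
    func (b *miniBatch) Reset()       { b.writes = b.writes[:0] }

    type miniTask struct{ genBatch *miniBatch }

    func main() {
        shared := &miniBatch{}

        // Accidental reuse: both tasks write into the same batch, so resetting
        // one task's batch also discards the other task's pending writes.
        a := miniTask{genBatch: shared}
        b := miniTask{genBatch: shared}
        a.genBatch.Put("node-from-a")
        b.genBatch.Reset()
        fmt.Println("shared batch after reset:", len(a.genBatch.writes)) // 0

        // The fix: give each task its own batch, as the hunk above does for
        // the first storage chunk.
        a = miniTask{genBatch: &miniBatch{}}
        b = miniTask{genBatch: &miniBatch{}}
        a.genBatch.Put("node-from-a")
        b.genBatch.Reset()
        fmt.Println("dedicated batch after the other's reset:", len(a.genBatch.writes)) // 1
    }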