From 133900e56849cb71374f89282454465d54f9ba54 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Wed, 23 Oct 2024 14:49:22 +0800 Subject: [PATCH] core, triedb/pathdb: integrate state snapshot inth pathdb --- core/blockchain.go | 57 +- core/blockchain_repair_test.go | 32 +- core/blockchain_sethead_test.go | 2 +- core/blockchain_snapshot_test.go | 16 +- core/state/database.go | 5 +- core/state/snapshot/generate_test.go | 4 +- core/state/statedb_test.go | 3 +- eth/handler.go | 3 +- eth/protocols/snap/handler.go | 40 +- eth/protocols/snap/sync_test.go | 2 +- tests/block_test_util.go | 6 +- trie/database_test.go | 6 +- triedb/database.go | 31 + triedb/pathdb/buffer.go | 10 +- triedb/pathdb/context.go | 245 ++++++++ triedb/pathdb/database.go | 124 +++- triedb/pathdb/database_test.go | 3 +- triedb/pathdb/disklayer.go | 181 +++++- triedb/pathdb/errors.go | 9 + triedb/pathdb/flush.go | 82 +++ triedb/pathdb/generate.go | 845 +++++++++++++++++++++++++++ triedb/pathdb/generate_test.go | 740 +++++++++++++++++++++++ triedb/pathdb/iterator_test.go | 80 ++- triedb/pathdb/journal.go | 53 +- triedb/pathdb/metrics.go | 50 +- triedb/pathdb/states.go | 9 +- 26 files changed, 2478 insertions(+), 160 deletions(-) create mode 100644 triedb/pathdb/context.go create mode 100644 triedb/pathdb/generate.go create mode 100644 triedb/pathdb/generate_test.go diff --git a/core/blockchain.go b/core/blockchain.go index c3da61b28108..bd74a5851c5f 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -161,7 +161,8 @@ func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config { if c.StateScheme == rawdb.PathScheme { config.PathDB = &pathdb.Config{ StateHistory: c.StateHistory, - CleanCacheSize: c.TrieCleanLimit * 1024 * 1024, + TrieCleanSize: c.TrieCleanLimit * 1024 * 1024, + StateCleanSize: c.SnapshotLimit * 1024 * 1024, WriteBufferSize: c.TrieDirtyLimit * 1024 * 1024, } } @@ -349,11 +350,14 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis // Do nothing here until the state syncer picks it up. log.Info("Genesis state is missing, wait state sync") } else { - // Head state is missing, before the state recovery, find out the - // disk layer point of snapshot(if it's enabled). Make sure the - // rewound point is lower than disk layer. + // Head state is missing, before the state recovery, find out the disk + // layer point of snapshot(if it's enabled). Make sure the rewound point + // is lower than disk layer. + // + // Note it's unnecessary in path mode which always keep trie data and + // state data in consistent. var diskRoot common.Hash - if bc.cacheConfig.SnapshotLimit > 0 { + if bc.cacheConfig.SnapshotLimit > 0 && bc.cacheConfig.StateScheme == rawdb.HashScheme { diskRoot = rawdb.ReadSnapshotRoot(bc.db) } if diskRoot != (common.Hash{}) { @@ -426,7 +430,32 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis bc.logger.OnGenesisBlock(bc.genesisBlock, alloc) } } + bc.setupSnapshot() + // Rewind the chain in case of an incompatible config upgrade. + if compat, ok := genesisErr.(*params.ConfigCompatError); ok { + log.Warn("Rewinding chain to upgrade configuration", "err", compat) + if compat.RewindToTime > 0 { + bc.SetHeadWithTimestamp(compat.RewindToTime) + } else { + bc.SetHead(compat.RewindToBlock) + } + rawdb.WriteChainConfig(db, genesisHash, chainConfig) + } + + // Start tx indexer if it's enabled. + if txLookupLimit != nil { + bc.txIndexer = newTxIndexer(*txLookupLimit, bc) + } + return bc, nil +} + +func (bc *BlockChain) setupSnapshot() { + // Short circuit if the chain is established with path scheme, as the + // state snapshot has been integrated into path database natively. + if bc.cacheConfig.StateScheme == rawdb.PathScheme { + return + } // Load any existing snapshot, regenerating it if loading failed if bc.cacheConfig.SnapshotLimit > 0 { // If the chain was rewound past the snapshot persistent layer (causing @@ -434,7 +463,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis // in recovery mode and in that case, don't invalidate the snapshot on a // head mismatch. var recover bool - head := bc.CurrentBlock() if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer >= head.Number.Uint64() { log.Warn("Enabling snapshot recovery", "chainhead", head.Number, "diskbase", *layer) @@ -451,23 +479,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis // Re-initialize the state database with snapshot bc.statedb = state.NewDatabase(bc.triedb, bc.snaps) } - - // Rewind the chain in case of an incompatible config upgrade. - if compat, ok := genesisErr.(*params.ConfigCompatError); ok { - log.Warn("Rewinding chain to upgrade configuration", "err", compat) - if compat.RewindToTime > 0 { - bc.SetHeadWithTimestamp(compat.RewindToTime) - } else { - bc.SetHead(compat.RewindToBlock) - } - rawdb.WriteChainConfig(db, genesisHash, chainConfig) - } - - // Start tx indexer if it's enabled. - if txLookupLimit != nil { - bc.txIndexer = newTxIndexer(*txLookupLimit, bc) - } - return bc, nil } // empty returns an indicator whether the blockchain is empty. diff --git a/core/blockchain_repair_test.go b/core/blockchain_repair_test.go index 8a2dfe9f11f0..70ffceebfbd4 100644 --- a/core/blockchain_repair_test.go +++ b/core/blockchain_repair_test.go @@ -1789,7 +1789,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s } ) defer engine.Close() - if snapshots { + if snapshots && scheme == rawdb.HashScheme { config.SnapshotLimit = 256 config.SnapshotWait = true } @@ -1818,7 +1818,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s if err := chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false); err != nil { t.Fatalf("Failed to flush trie state: %v", err) } - if snapshots { + if snapshots && scheme == rawdb.HashScheme { if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil { t.Fatalf("Failed to flatten snapshots: %v", err) } @@ -1948,8 +1948,10 @@ func testIssue23496(t *testing.T, scheme string) { if _, err := chain.InsertChain(blocks[1:2]); err != nil { t.Fatalf("Failed to import canonical chain start: %v", err) } - if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil { - t.Fatalf("Failed to flatten snapshots: %v", err) + if scheme == rawdb.HashScheme { + if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil { + t.Fatalf("Failed to flatten snapshots: %v", err) + } } // Insert block B3 and commit the state into disk @@ -1992,15 +1994,21 @@ func testIssue23496(t *testing.T, scheme string) { } expHead := uint64(1) if scheme == rawdb.PathScheme { - expHead = uint64(2) + expHead = uint64(3) } if head := chain.CurrentBlock(); head.Number.Uint64() != expHead { t.Errorf("Head block mismatch: have %d, want %d", head.Number, expHead) } - - // Reinsert B2-B4 - if _, err := chain.InsertChain(blocks[1:]); err != nil { - t.Fatalf("Failed to import canonical chain tail: %v", err) + if scheme == rawdb.PathScheme { + // Reinsert B3-B4 + if _, err := chain.InsertChain(blocks[2:]); err != nil { + t.Fatalf("Failed to import canonical chain tail: %v", err) + } + } else { + // Reinsert B2-B4 + if _, err := chain.InsertChain(blocks[1:]); err != nil { + t.Fatalf("Failed to import canonical chain tail: %v", err) + } } if head := chain.CurrentHeader(); head.Number.Uint64() != uint64(4) { t.Errorf("Head header mismatch: have %d, want %d", head.Number, 4) @@ -2011,7 +2019,9 @@ func testIssue23496(t *testing.T, scheme string) { if head := chain.CurrentBlock(); head.Number.Uint64() != uint64(4) { t.Errorf("Head block mismatch: have %d, want %d", head.Number, uint64(4)) } - if layer := chain.Snapshots().Snapshot(blocks[2].Root()); layer == nil { - t.Error("Failed to regenerate the snapshot of known state") + if scheme == rawdb.HashScheme { + if layer := chain.Snapshots().Snapshot(blocks[2].Root()); layer == nil { + t.Error("Failed to regenerate the snapshot of known state") + } } } diff --git a/core/blockchain_sethead_test.go b/core/blockchain_sethead_test.go index b72de3389636..4bd09af6b0d5 100644 --- a/core/blockchain_sethead_test.go +++ b/core/blockchain_sethead_test.go @@ -2021,7 +2021,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme } if tt.commitBlock > 0 { chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false) - if snapshots { + if snapshots && scheme == rawdb.HashScheme { if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil { t.Fatalf("Failed to flatten snapshots: %v", err) } diff --git a/core/blockchain_snapshot_test.go b/core/blockchain_snapshot_test.go index 120977f222fc..0ae9d0879d2f 100644 --- a/core/blockchain_snapshot_test.go +++ b/core/blockchain_snapshot_test.go @@ -103,7 +103,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo if basic.commitBlock > 0 && basic.commitBlock == point { chain.TrieDB().Commit(blocks[point-1].Root(), false) } - if basic.snapshotBlock > 0 && basic.snapshotBlock == point { + if basic.snapshotBlock > 0 && basic.snapshotBlock == point && basic.scheme == rawdb.HashScheme { // Flushing the entire snap tree into the disk, the // relevant (a) snapshot root and (b) snapshot generator // will be persisted atomically. @@ -147,13 +147,17 @@ func (basic *snapshotTestBasic) verify(t *testing.T, chain *BlockChain, blocks [ block := chain.GetBlockByNumber(basic.expSnapshotBottom) if block == nil { t.Errorf("The corresponding block[%d] of snapshot disk layer is missing", basic.expSnapshotBottom) - } else if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) { - t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot()) + } else if basic.scheme == rawdb.HashScheme { + if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) { + t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot()) + } } // Check the snapshot, ensure it's integrated - if err := chain.snaps.Verify(block.Root()); err != nil { - t.Errorf("The disk layer is not integrated %v", err) + if basic.scheme == rawdb.HashScheme { + if err := chain.snaps.Verify(block.Root()); err != nil { + t.Errorf("The disk layer is not integrated %v", err) + } } } @@ -567,7 +571,7 @@ func TestHighCommitCrashWithNewSnapshot(t *testing.T) { for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} { expHead := uint64(0) if scheme == rawdb.PathScheme { - expHead = uint64(4) + expHead = uint64(6) } test := &crashSnapshotTest{ snapshotTestBasic{ diff --git a/core/state/database.go b/core/state/database.go index 3b64f82089bb..d8b87f2a944e 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -193,8 +193,9 @@ func (db *CachingDB) Reader(stateRoot common.Hash) (Reader, error) { readers = append(readers, newStateReader(snap)) // snap reader is optional } } else { - // If standalone state snapshot is not available, try to construct - // the state reader with database. + // If standalone state snapshot is not available (path scheme + // or the state snapshot is explicitly disabled in hash mode), + // try to construct the state reader with database. reader, err := db.triedb.StateReader(stateRoot) if err == nil { readers = append(readers, newStateReader(reader)) // state reader is optional diff --git a/core/state/snapshot/generate_test.go b/core/state/snapshot/generate_test.go index 56abff348d18..4946dd1fd704 100644 --- a/core/state/snapshot/generate_test.go +++ b/core/state/snapshot/generate_test.go @@ -166,7 +166,9 @@ func newHelper(scheme string) *testHelper { diskdb := rawdb.NewMemoryDatabase() config := &triedb.Config{} if scheme == rawdb.PathScheme { - config.PathDB = &pathdb.Config{} // disable caching + config.PathDB = &pathdb.Config{ + SnapshotNoBuild: true, + } // disable caching } else { config.HashDB = &hashdb.Config{} // disable caching } diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index 3647397df6f8..0b336a54c4fd 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -983,7 +983,8 @@ func testMissingTrieNodes(t *testing.T, scheme string) { ) if scheme == rawdb.PathScheme { tdb = triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{ - CleanCacheSize: 0, + TrieCleanSize: 0, + StateCleanSize: 0, WriteBufferSize: 0, }}) // disable caching } else { diff --git a/eth/handler.go b/eth/handler.go index b28081eef0ec..ca0e8837049a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/forkid" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -174,7 +175,7 @@ func newHandler(config *handlerConfig) (*handler, error) { } } // If snap sync is requested but snapshots are disabled, fail loudly - if h.snapSync.Load() && config.Chain.Snapshots() == nil { + if h.snapSync.Load() && (config.Chain.Snapshots() == nil && config.Chain.TrieDB().Scheme() == rawdb.HashScheme) { return nil, errors.New("snap sync not supported with snapshots disabled") } // Construct the downloader (long sync) diff --git a/eth/protocols/snap/handler.go b/eth/protocols/snap/handler.go index d36f9621b13b..4ca1d1dc7fc5 100644 --- a/eth/protocols/snap/handler.go +++ b/eth/protocols/snap/handler.go @@ -23,6 +23,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" @@ -31,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie/trienode" + "github.com/ethereum/go-ethereum/triedb/database" ) const ( @@ -279,7 +282,14 @@ func ServiceGetAccountRangeQuery(chain *core.BlockChain, req *GetAccountRangePac if err != nil { return nil, nil } - it, err := chain.Snapshots().AccountIterator(req.Root, req.Origin) + // Temporary solution: using the snapshot interface for both cases. + // This can be removed once the hash scheme is deprecated. + var it snapshot.AccountIterator + if chain.TrieDB().Scheme() == rawdb.HashScheme { + it, err = chain.Snapshots().AccountIterator(req.Root, req.Origin) + } else { + it, err = chain.TrieDB().AccountIterator(req.Root, req.Origin) + } if err != nil { return nil, nil } @@ -359,7 +369,17 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP limit, req.Limit = common.BytesToHash(req.Limit), nil } // Retrieve the requested state and bail out if non existent - it, err := chain.Snapshots().StorageIterator(req.Root, account, origin) + var ( + err error + it snapshot.StorageIterator + ) + // Temporary solution: using the snapshot interface for both cases. + // This can be removed once the hash scheme is deprecated. + if chain.TrieDB().Scheme() == rawdb.HashScheme { + it, err = chain.Snapshots().StorageIterator(req.Root, account, origin) + } else { + it, err = chain.TrieDB().StorageIterator(req.Root, account, origin) + } if err != nil { return nil, nil } @@ -479,8 +499,15 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s // We don't have the requested state available, bail out return nil, nil } - // The 'snap' might be nil, in which case we cannot serve storage slots. - snap := chain.Snapshots().Snapshot(req.Root) + // The 'reader' might be nil, in which case we cannot serve storage slots + // via snapshot. + var reader database.StateReader + if chain.Snapshots() != nil { + reader = chain.Snapshots().Snapshot(req.Root) + } + if reader == nil { + reader, _ = triedb.StateReader(req.Root) + } // Retrieve trie nodes until the packet size limit is reached var ( nodes [][]byte @@ -505,8 +532,9 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s default: var stRoot common.Hash + // Storage slots requested, open the storage trie and retrieve from there - if snap == nil { + if reader == nil { // We don't have the requested state snapshotted yet (or it is stale), // but can look up the account via the trie instead. account, err := accTrie.GetAccountByHash(common.BytesToHash(pathset[0])) @@ -516,7 +544,7 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s } stRoot = account.Root } else { - account, err := snap.Account(common.BytesToHash(pathset[0])) + account, err := reader.Account(common.BytesToHash(pathset[0])) loads++ // always account database reads, even for failures if err != nil || account == nil { break diff --git a/eth/protocols/snap/sync_test.go b/eth/protocols/snap/sync_test.go index d318077d99a8..d599e7ecc325 100644 --- a/eth/protocols/snap/sync_test.go +++ b/eth/protocols/snap/sync_test.go @@ -1962,5 +1962,5 @@ func newDbConfig(scheme string) *triedb.Config { if scheme == rawdb.HashScheme { return &triedb.Config{} } - return &triedb.Config{PathDB: pathdb.Defaults} + return &triedb.Config{PathDB: &pathdb.Config{SnapshotNoBuild: true}} } diff --git a/tests/block_test_util.go b/tests/block_test_util.go index b0a31a69720b..a007d70bd0a9 100644 --- a/tests/block_test_util.go +++ b/tests/block_test_util.go @@ -181,8 +181,10 @@ func (t *BlockTest) Run(snapshotter bool, scheme string, witness bool, tracer *t } // Cross-check the snapshot-to-hash against the trie hash if snapshotter { - if err := chain.Snapshots().Verify(chain.CurrentBlock().Root); err != nil { - return err + if chain.Snapshots() != nil { + if err := chain.Snapshots().Verify(chain.CurrentBlock().Root); err != nil { + return err + } } } return t.validateImportedHeaders(chain, validBlocks) diff --git a/trie/database_test.go b/trie/database_test.go index 729d9f699be1..535f0d61b208 100644 --- a/trie/database_test.go +++ b/trie/database_test.go @@ -25,7 +25,7 @@ import ( "github.com/ethereum/go-ethereum/triedb/database" ) -// testReader implements database.Reader interface, providing function to +// testReader implements database.NodeReader interface, providing function to // access trie nodes. type testReader struct { db ethdb.Database @@ -33,7 +33,7 @@ type testReader struct { nodes []*trienode.MergedNodeSet // sorted from new to old } -// Node implements database.Reader interface, retrieving trie node with +// Node implements database.NodeReader interface, retrieving trie node with // all available cached layers. func (r *testReader) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) { // Check the node presence with the cached layer, from latest to oldest. @@ -54,7 +54,7 @@ func (r *testReader) Node(owner common.Hash, path []byte, hash common.Hash) ([]b return rawdb.ReadTrieNode(r.db, owner, path, hash, r.scheme), nil } -// testDb implements database.Database interface, using for testing purpose. +// testDb implements database.NodeDatabase interface, using for testing purpose. type testDb struct { disk ethdb.Database root common.Hash diff --git a/triedb/database.go b/triedb/database.go index b448d7cd07b0..dedcd9c2e813 100644 --- a/triedb/database.go +++ b/triedb/database.go @@ -322,6 +322,37 @@ func (db *Database) Journal(root common.Hash) error { return pdb.Journal(root) } +// WaitGeneration waits until the background generation is finished. It assumes +// that the generation is permitted; otherwise, it will block indefinitely. +func (db *Database) WaitGeneration() error { + pdb, ok := db.backend.(*pathdb.Database) + if !ok { + return errors.New("not supported") + } + pdb.WaitGeneration() + return nil +} + +// AccountIterator creates a new account iterator for the specified root hash and +// seeks to a starting account hash. +func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (pathdb.AccountIterator, error) { + pdb, ok := db.backend.(*pathdb.Database) + if !ok { + return nil, errors.New("not supported") + } + return pdb.AccountIterator(root, seek) +} + +// StorageIterator creates a new storage iterator for the specified root hash and +// account. The iterator will be move to the specific start position. +func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek common.Hash) (pathdb.StorageIterator, error) { + pdb, ok := db.backend.(*pathdb.Database) + if !ok { + return nil, errors.New("not supported") + } + return pdb.StorageIterator(root, account, seek) +} + // IsVerkle returns the indicator if the database is holding a verkle tree. func (db *Database) IsVerkle() bool { return db.config.IsVerkle diff --git a/triedb/pathdb/buffer.go b/triedb/pathdb/buffer.go index 488d0a4d4bd2..1059beb225dc 100644 --- a/triedb/pathdb/buffer.go +++ b/triedb/pathdb/buffer.go @@ -124,7 +124,7 @@ func (b *buffer) size() uint64 { // flush persists the in-memory dirty trie node into the disk if the configured // memory threshold is reached. Note, all data must be written atomically. -func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, nodesCache *fastcache.Cache, id uint64) error { +func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64) error { // Ensure the target state id is aligned with the internal counter. head := rawdb.ReadPersistentStateID(db) if head+b.layers != id { @@ -133,7 +133,7 @@ func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, node // Terminate the state snapshot generation if it's active var ( start = time.Now() - batch = db.NewBatchWithSize(b.nodes.dbsize() * 11 / 10) // extra 10% for potential pebble internal stuff + batch = db.NewBatchWithSize((b.nodes.dbsize() + b.states.dbsize()) * 11 / 10) // extra 10% for potential pebble internal stuff ) // Explicitly sync the state freezer, ensuring that all written // data is transferred to disk before updating the key-value store. @@ -143,7 +143,9 @@ func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, node } } nodes := b.nodes.write(batch, nodesCache) + accounts, slots := b.states.write(db, batch, progress, statesCache) rawdb.WritePersistentStateID(batch, id) + rawdb.WriteSnapshotRoot(batch, root) // Flush all mutations in a single batch size := batch.ValueSize() @@ -152,8 +154,10 @@ func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, node } commitBytesMeter.Mark(int64(size)) commitNodesMeter.Mark(int64(nodes)) + commitAccountsMeter.Mark(int64(accounts)) + commitStoragesMeter.Mark(int64(slots)) commitTimeTimer.UpdateSince(start) b.reset() - log.Debug("Persisted buffer content", "nodes", nodes, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start))) + log.Debug("Persisted buffer content", "nodes", nodes, "accounts", accounts, "slots", slots, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start))) return nil } diff --git a/triedb/pathdb/context.go b/triedb/pathdb/context.go new file mode 100644 index 000000000000..81465ae8e0cc --- /dev/null +++ b/triedb/pathdb/context.go @@ -0,0 +1,245 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package pathdb + +import ( + "bytes" + "encoding/binary" + "errors" + "math" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/ethdb/memorydb" + "github.com/ethereum/go-ethereum/log" +) + +const ( + snapAccount = "account" // Identifier of account snapshot generation + snapStorage = "storage" // Identifier of storage snapshot generation +) + +// generatorStats is a collection of statistics gathered by the snapshot generator +// for logging purposes. This data structure is used throughout the entire +// lifecycle of the snapshot generation process and is shared across multiple +// generation cycles. +type generatorStats struct { + origin uint64 // Origin prefix where generation started + start time.Time // Timestamp when generation started + accounts uint64 // Number of accounts indexed(generated or recovered) + slots uint64 // Number of storage slots indexed(generated or recovered) + dangling uint64 // Number of dangling storage slots + storage common.StorageSize // Total account and storage slot size(generation or recovery) +} + +// log creates a contextual log with the given message and the context pulled +// from the internally maintained statistics. +func (gs *generatorStats) log(msg string, root common.Hash, marker []byte) { + var ctx []interface{} + if root != (common.Hash{}) { + ctx = append(ctx, []interface{}{"root", root}...) + } + // Figure out whether we're after or within an account + switch len(marker) { + case common.HashLength: + ctx = append(ctx, []interface{}{"at", common.BytesToHash(marker)}...) + case 2 * common.HashLength: + ctx = append(ctx, []interface{}{ + "in", common.BytesToHash(marker[:common.HashLength]), + "at", common.BytesToHash(marker[common.HashLength:]), + }...) + } + // Add the usual measurements + ctx = append(ctx, []interface{}{ + "accounts", gs.accounts, + "slots", gs.slots, + "storage", gs.storage, + "dangling", gs.dangling, + "elapsed", common.PrettyDuration(time.Since(gs.start)), + }...) + // Calculate the estimated indexing time based on current stats + if len(marker) > 0 { + if done := binary.BigEndian.Uint64(marker[:8]) - gs.origin; done > 0 { + left := math.MaxUint64 - binary.BigEndian.Uint64(marker[:8]) + + speed := done/uint64(time.Since(gs.start)/time.Millisecond+1) + 1 // +1s to avoid division by zero + ctx = append(ctx, []interface{}{ + "eta", common.PrettyDuration(time.Duration(left/speed) * time.Millisecond), + }...) + } + } + log.Info(msg, ctx...) +} + +// generatorContext holds several global fields that are used throughout the +// current generation cycle. +type generatorContext struct { + root common.Hash // State root of the generation target + account *holdableIterator // Iterator of account snapshot data + storage *holdableIterator // Iterator of storage snapshot data + db ethdb.KeyValueStore // Key-value store containing the snapshot data + batch ethdb.Batch // Database batch for writing data atomically + logged time.Time // The timestamp when last generation progress was displayed +} + +// newGeneratorContext initializes the context for generation. +func newGeneratorContext(root common.Hash, marker []byte, db ethdb.KeyValueStore) *generatorContext { + ctx := &generatorContext{ + root: root, + db: db, + batch: db.NewBatch(), + logged: time.Now(), + } + accMarker, storageMarker := splitMarker(marker) + ctx.openIterator(snapAccount, accMarker) + ctx.openIterator(snapStorage, storageMarker) + return ctx +} + +// openIterator constructs global account and storage snapshot iterators +// at the interrupted position. These iterators should be reopened from time +// to time to avoid blocking leveldb compaction for a long time. +func (ctx *generatorContext) openIterator(kind string, start []byte) { + if kind == snapAccount { + iter := ctx.db.NewIterator(rawdb.SnapshotAccountPrefix, start) + ctx.account = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, 1+common.HashLength)) + return + } + iter := ctx.db.NewIterator(rawdb.SnapshotStoragePrefix, start) + ctx.storage = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, 1+2*common.HashLength)) +} + +// reopenIterator releases the specified snapshot iterator and re-open it +// in the next position. It's aimed for not blocking leveldb compaction. +func (ctx *generatorContext) reopenIterator(kind string) { + // Shift iterator one more step, so that we can reopen + // the iterator at the right position. + var iter = ctx.account + if kind == snapStorage { + iter = ctx.storage + } + hasNext := iter.Next() + if !hasNext { + // Iterator exhausted, release forever and create an already exhausted virtual iterator + iter.Release() + if kind == snapAccount { + ctx.account = newHoldableIterator(memorydb.New().NewIterator(nil, nil)) + return + } + ctx.storage = newHoldableIterator(memorydb.New().NewIterator(nil, nil)) + return + } + next := iter.Key() + iter.Release() + ctx.openIterator(kind, next[1:]) +} + +// close releases all the held resources. +func (ctx *generatorContext) close() { + ctx.account.Release() + ctx.storage.Release() +} + +// iterator returns the corresponding iterator specified by the kind. +func (ctx *generatorContext) iterator(kind string) *holdableIterator { + if kind == snapAccount { + return ctx.account + } + return ctx.storage +} + +// removeStorageBefore deletes all storage entries which are located before +// the specified account. When the iterator touches the storage entry which +// is located in or outside the given account, it stops and holds the current +// iterated element locally. +func (ctx *generatorContext) removeStorageBefore(account common.Hash) uint64 { + var ( + count uint64 + start = time.Now() + iter = ctx.storage + ) + for iter.Next() { + key := iter.Key() + if bytes.Compare(key[1:1+common.HashLength], account.Bytes()) >= 0 { + iter.Hold() + break + } + count++ + ctx.batch.Delete(key) + if ctx.batch.ValueSize() > ethdb.IdealBatchSize { + ctx.batch.Write() + ctx.batch.Reset() + } + } + storageCleanCounter.Inc(time.Since(start).Nanoseconds()) + return count +} + +// removeStorageAt deletes all storage entries which are located in the specified +// account. When the iterator touches the storage entry which is outside the given +// account, it stops and holds the current iterated element locally. An error will +// be returned if the initial position of iterator is not in the given account. +func (ctx *generatorContext) removeStorageAt(account common.Hash) error { + var ( + count int64 + start = time.Now() + iter = ctx.storage + ) + for iter.Next() { + key := iter.Key() + cmp := bytes.Compare(key[len(rawdb.SnapshotStoragePrefix):len(rawdb.SnapshotStoragePrefix)+common.HashLength], account.Bytes()) + if cmp < 0 { + return errors.New("invalid iterator position") + } + if cmp > 0 { + iter.Hold() + break + } + count++ + ctx.batch.Delete(key) + if ctx.batch.ValueSize() > ethdb.IdealBatchSize { + ctx.batch.Write() + ctx.batch.Reset() + } + } + wipedStorageMeter.Mark(count) + storageCleanCounter.Inc(time.Since(start).Nanoseconds()) + return nil +} + +// removeStorageLeft deletes all storage entries which are located after +// the current iterator position. +func (ctx *generatorContext) removeStorageLeft() uint64 { + var ( + count uint64 + start = time.Now() + iter = ctx.storage + ) + for iter.Next() { + count++ + ctx.batch.Delete(iter.Key()) + if ctx.batch.ValueSize() > ethdb.IdealBatchSize { + ctx.batch.Write() + ctx.batch.Reset() + } + } + danglingStorageMeter.Mark(int64(count)) + storageCleanCounter.Inc(time.Since(start).Nanoseconds()) + return count +} diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 69076bca16f9..061a6e5e030c 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -17,6 +17,7 @@ package pathdb import ( + "encoding/binary" "errors" "fmt" "io" @@ -34,8 +35,11 @@ import ( ) const ( - // defaultCleanSize is the default memory allowance of clean cache. - defaultCleanSize = 16 * 1024 * 1024 + // defaultTrieCleanSize is the default memory allowance of clean trie cache. + defaultTrieCleanSize = 16 * 1024 * 1024 + + // defaultStateCleanSize is the default memory allowance of clean state cache. + defaultStateCleanSize = 16 * 1024 * 1024 // maxBufferSize is the maximum memory allowance of node buffer. // Too large buffer will cause the system to pause for a long @@ -110,9 +114,11 @@ type layer interface { // Config contains the settings for database. type Config struct { StateHistory uint64 // Number of recent blocks to maintain state history for - CleanCacheSize int // Maximum memory allowance (in bytes) for caching clean nodes + TrieCleanSize int // Maximum memory allowance (in bytes) for caching clean **trie nodes** + StateCleanSize int // Maximum memory allowance (in bytes) for caching clean **states** WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer - ReadOnly bool // Flag whether the database is opened in read only mode. + ReadOnly bool // Flag whether the database is opened in read only mode + SnapshotNoBuild bool // Flag Whether the background generation is allowed } // sanitize checks the provided user configurations and changes anything that's @@ -132,7 +138,11 @@ func (c *Config) fields() []interface{} { if c.ReadOnly { list = append(list, "readonly", true) } - list = append(list, "cache", common.StorageSize(c.CleanCacheSize)) + if c.SnapshotNoBuild { + list = append(list, "snapshot", false) + } + list = append(list, "triecache", common.StorageSize(c.TrieCleanSize)) + list = append(list, "statecache", common.StorageSize(c.StateCleanSize)) list = append(list, "buffer", common.StorageSize(c.WriteBufferSize)) list = append(list, "history", c.StateHistory) return list @@ -141,7 +151,8 @@ func (c *Config) fields() []interface{} { // Defaults contains default settings for Ethereum mainnet. var Defaults = &Config{ StateHistory: params.FullImmutabilityThreshold, - CleanCacheSize: defaultCleanSize, + TrieCleanSize: defaultTrieCleanSize, + StateCleanSize: defaultStateCleanSize, WriteBufferSize: defaultBufferSize, } @@ -213,6 +224,12 @@ func New(diskdb ethdb.Database, config *Config, isVerkle bool) *Database { log.Crit("Failed to disable database", "err", err) // impossible to happen } } + // Resolving the state snapshot generation progress from the database is + // mandatory. This ensures that uncovered flat states are not accessed, + // even if background generation is not allowed. If permitted, the generation + // might be scheduled. + db.setStateGenerator() + fields := config.fields() if db.isVerkle { fields = append(fields, "verkle", true) @@ -270,6 +287,53 @@ func (db *Database) repairHistory() error { return nil } +// setStateGenerator loads the state generation progress marker and potentially +// resume the state generation if it's permitted. +func (db *Database) setStateGenerator() { + // Load the state snapshot generation progress marker to prevent access + // to uncovered states. + generator, root := loadGenerator(db.diskdb) + if generator == nil { + // Initialize an empty generator to rebuild the state snapshot + // from scratch + generator = &journalGenerator{ + Marker: []byte{}, + } + } + // Short circuit if the whole state snapshot has already been fully generated. + // The generator will be left as nil in disk layer for representing the whole + // state snapshot is available for accessing. + if generator.Done { + return + } + var origin uint64 + if len(generator.Marker) >= 8 { + origin = binary.BigEndian.Uint64(generator.Marker) + } + stats := &generatorStats{ + origin: origin, + start: time.Now(), + accounts: generator.Accounts, + slots: generator.Slots, + storage: common.StorageSize(generator.Storage), + } + dl := db.tree.bottom() + + // Construct the generator and link it to the disk layer, ensuring that the + // generation progress is resolved to prevent accessing uncovered states + // regardless of whether background state snapshot generation is allowed. + noBuild := db.readOnly || db.config.SnapshotNoBuild + dl.generator = newGenerator(db.diskdb, noBuild, generator.Marker, stats) + + // Short circuit if the background generation is not permitted. Notably, + // snapshot generation is not functional in the verkle design. + if noBuild || db.isVerkle || db.waitSync { + return + } + stats.log("Starting snapshot generation", root, generator.Marker) + dl.generator.run(root) +} + // Update adds a new layer into the tree, if that can be linked to an existing // old parent. It is disallowed to insert a disk layer (the origin of all). Apart // from that this function will flatten the extra diff layers at bottom into disk @@ -330,8 +394,13 @@ func (db *Database) Disable() error { } db.waitSync = true - // Mark the disk layer as stale to prevent access to persistent state. - db.tree.bottom().markStale() + // Terminate the state generator if it's active and mark the disk layer + // as stale to prevent access to persistent state. + disk := db.tree.bottom() + if disk.generator != nil { + disk.generator.stop() + } + disk.markStale() // Write the initial sync flag to persist it across restarts. rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncRunning) @@ -362,6 +431,7 @@ func (db *Database) Enable(root common.Hash) error { // reset the persistent state id back to zero. batch := db.diskdb.NewBatch() rawdb.DeleteTrieJournal(batch) + rawdb.DeleteSnapshotRoot(batch) rawdb.WritePersistentStateID(batch, 0) if err := batch.Write(); err != nil { return err @@ -375,13 +445,13 @@ func (db *Database) Enable(root common.Hash) error { return err } } - // Re-construct a new disk layer backed by persistent state - // with **empty clean cache and node buffer**. - db.tree.reset(newDiskLayer(root, 0, db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0))) - // Re-enable the database as the final step. db.waitSync = false rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncFinished) + + // Re-construct a new disk layer backed by persistent state + // and schedule the state snapshot generation if it's permitted. + db.tree.reset(generateSnapshot(db, root)) log.Info("Rebuilt trie database", "root", root) return nil } @@ -475,8 +545,12 @@ func (db *Database) Close() error { // following mutations. db.readOnly = true - // Release the memory held by clean cache. - db.tree.bottom().resetCache() + // Terminate the background generation if it's active + disk := db.tree.bottom() + if disk.generator != nil { + disk.generator.stop() + } + disk.resetCache() // release the memory held by clean cache // Close the attached state history freezer. if db.freezer == nil { @@ -556,20 +630,30 @@ func (db *Database) HistoryRange() (uint64, uint64, error) { return historyRange(db.freezer) } +// WaitGeneration waits until the background generation is finished. It assumes +// that the generation is permitted; otherwise, it will block indefinitely. +func (db *Database) WaitGeneration() { + gen := db.tree.bottom().generator + if gen == nil || gen.completed() { + return + } + <-gen.done +} + // AccountIterator creates a new account iterator for the specified root hash and // seeks to a starting account hash. func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) { - //if gen := db.tree.bottom().generator; gen != nil && !gen.completed() { - // return nil, errNotConstructed - //} + if gen := db.tree.bottom().generator; gen != nil && !gen.completed() { + return nil, errNotConstructed + } return newFastAccountIterator(db, root, seek) } // StorageIterator creates a new storage iterator for the specified root hash and // account. The iterator will be move to the specific start position. func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek common.Hash) (StorageIterator, error) { - //if gen := db.tree.bottom().generator; gen != nil && !gen.completed() { - // return nil, errNotConstructed - //} + if gen := db.tree.bottom().generator; gen != nil && !gen.completed() { + return nil, errNotConstructed + } return newFastStorageIterator(db, root, account, seek) } diff --git a/triedb/pathdb/database_test.go b/triedb/pathdb/database_test.go index 1c796218d10c..642b47bd6c01 100644 --- a/triedb/pathdb/database_test.go +++ b/triedb/pathdb/database_test.go @@ -108,7 +108,8 @@ func newTester(t *testing.T, historyLimit uint64) *tester { disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) db = New(disk, &Config{ StateHistory: historyLimit, - CleanCacheSize: 16 * 1024, + TrieCleanSize: 16 * 1024, + StateCleanSize: 16 * 1024, WriteBufferSize: 16 * 1024, }, false) obj = &tester{ diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go index c0d278d6bc5b..a54702dc3b8f 100644 --- a/triedb/pathdb/disklayer.go +++ b/triedb/pathdb/disklayer.go @@ -17,7 +17,7 @@ package pathdb import ( - "errors" + "bytes" "fmt" "sync" @@ -34,24 +34,32 @@ type diskLayer struct { id uint64 // Immutable, corresponding state id db *Database // Path-based trie database nodes *fastcache.Cache // GC friendly memory cache of clean nodes + states *fastcache.Cache // GC friendly memory cache of clean states buffer *buffer // Dirty buffer to aggregate writes of nodes and states stale bool // Signals that the layer became stale (state progressed) - lock sync.RWMutex // Lock used to protect stale flag + lock sync.RWMutex // Lock used to protect stale flag and genMarker + + // The generator is set if the state snapshot was not fully completed + generator *generator } // newDiskLayer creates a new disk layer based on the passing arguments. -func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, buffer *buffer) *diskLayer { - // Initialize a clean cache if the memory allowance is not zero - // or reuse the provided cache if it is not nil (inherited from +func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, states *fastcache.Cache, buffer *buffer) *diskLayer { + // Initialize the clean caches if the memory allowance is not zero + // or reuse the provided caches if they are not nil (inherited from // the original disk layer). - if nodes == nil && db.config.CleanCacheSize != 0 { - nodes = fastcache.New(db.config.CleanCacheSize) + if nodes == nil && db.config.TrieCleanSize != 0 { + nodes = fastcache.New(db.config.TrieCleanSize) + } + if states == nil && db.config.StateCleanSize != 0 { + states = fastcache.New(db.config.StateCleanSize) } return &diskLayer{ root: root, id: id, db: db, nodes: nodes, + states: states, buffer: buffer, } } @@ -72,6 +80,13 @@ func (dl *diskLayer) parentLayer() layer { return nil } +// setGenerator links the given generator to disk layer, representing the +// associated state snapshot is not fully completed yet and the generation +// is potentially running in the background. +func (dl *diskLayer) setGenerator(generator *generator) { + dl.generator = generator +} + // isStale return whether this layer has become stale (was flattened across) or if // it's still live. func (dl *diskLayer) isStale() bool { @@ -171,8 +186,41 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) { } dirtyStateMissMeter.Mark(1) - // TODO(rjl493456442) support persistent state retrieval - return nil, errors.New("not supported") + // If the layer is being generated, ensure the requested account has + // already been covered by the generator. + marker := dl.genMarker() + if marker != nil && bytes.Compare(hash.Bytes(), marker) > 0 { + return nil, errNotCoveredYet + } + // Try to retrieve the account from the memory cache + if dl.states != nil { + if blob, found := dl.states.HasGet(nil, hash[:]); found { + cleanStateHitMeter.Mark(1) + cleanStateReadMeter.Mark(int64(len(blob))) + + if len(blob) == 0 { + stateAccountMissMeter.Mark(1) + } else { + stateAccountHitMeter.Mark(1) + } + return blob, nil + } + cleanStateMissMeter.Mark(1) + } + // Try to retrieve the account from the disk. + blob = rawdb.ReadAccountSnapshot(dl.db.diskdb, hash) + if dl.states != nil { + dl.states.Set(hash[:], blob) + cleanStateWriteMeter.Mark(int64(len(blob))) + } + if len(blob) == 0 { + stateAccountMissMeter.Mark(1) + stateAccountDiskMissMeter.Mark(1) + } else { + stateAccountHitMeter.Mark(1) + stateAccountDiskHitMeter.Mark(1) + } + return blob, nil } // storage directly retrieves the storage data associated with a particular hash, @@ -206,8 +254,42 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([ } dirtyStateMissMeter.Mark(1) - // TODO(rjl493456442) support persistent state retrieval - return nil, errors.New("not supported") + // If the layer is being generated, ensure the requested storage slot + // has already been covered by the generator. + key := append(accountHash[:], storageHash[:]...) + marker := dl.genMarker() + if marker != nil && bytes.Compare(key, marker) > 0 { + return nil, errNotCoveredYet + } + // Try to retrieve the storage slot from the memory cache + if dl.states != nil { + if blob, found := dl.states.HasGet(nil, key); found { + cleanStateHitMeter.Mark(1) + cleanStateReadMeter.Mark(int64(len(blob))) + + if len(blob) == 0 { + stateStorageMissMeter.Mark(1) + } else { + stateStorageHitMeter.Mark(1) + } + return blob, nil + } + cleanStateMissMeter.Mark(1) + } + // Try to retrieve the account from the disk + blob := rawdb.ReadStorageSnapshot(dl.db.diskdb, accountHash, storageHash) + if dl.states != nil { + dl.states.Set(key, blob) + cleanStateWriteMeter.Mark(int64(len(blob))) + } + if len(blob) == 0 { + stateStorageMissMeter.Mark(1) + stateStorageDiskMissMeter.Mark(1) + } else { + stateStorageHitMeter.Mark(1) + stateStorageDiskHitMeter.Mark(1) + } + return blob, nil } // update implements the layer interface, returning a new diff layer on top @@ -268,13 +350,32 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { // Merge the trie nodes and flat states of the bottom-most diff layer into the // buffer as the combined layer. combined := dl.buffer.commit(bottom.nodes, bottom.states.stateSet) + + // Terminate the background state snapshot generation before mutating the + // persistent state. if combined.full() || force { - if err := combined.flush(dl.db.diskdb, dl.db.freezer, dl.nodes, bottom.stateID()); err != nil { + // Terminate the background state snapshot generator to prevent data race + var progress []byte + if dl.generator != nil { + dl.generator.stop() + progress = dl.generator.progressMarker() + log.Info("Terminated state snapshot generation") + } + // Flush the content in combined buffer. Any state data after the progress + // marker will be ignored, as the generator will pick it up later. + if err := combined.flush(bottom.root, dl.db.diskdb, dl.db.freezer, progress, dl.nodes, dl.states, bottom.stateID()); err != nil { return nil, err } } - ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, combined) - + ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, dl.states, combined) + + // Link the generator if snapshot is not yet completed + if dl.generator != nil && !dl.generator.completed() { + // Relaunch the state snapshot generation if it's not done yet + ndl.setGenerator(dl.generator) + ndl.generator.run(bottom.root) + log.Info("Resumed state snapshot generation", "root", bottom.root) + } // To remove outdated history objects from the end, we set the 'tail' parameter // to 'oldest-1' due to the offset between the freezer index and the history ID. if overflow { @@ -336,15 +437,41 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) { if err != nil { return nil, err } - } else { - batch := dl.db.diskdb.NewBatch() - writeNodes(batch, nodes, dl.nodes) - rawdb.WritePersistentStateID(batch, dl.id-1) - if err := batch.Write(); err != nil { - log.Crit("Failed to write states", "err", err) + ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer) + + // Link the generator if it exists + if dl.generator != nil { + ndl.setGenerator(dl.generator) } + return ndl, nil + } + // Terminate the generation before writing any data into database + var progress []byte + if dl.generator != nil { + dl.generator.stop() + progress = dl.generator.progressMarker() } - return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.buffer), nil + batch := dl.db.diskdb.NewBatch() + writeNodes(batch, nodes, dl.nodes) + + // Provide the original values of modified accounts and storages for revert. + // Note the account deletions are included in accounts map (with value as nil), + // rather than the destruction list (nil list). + writeStates(dl.db.diskdb, batch, progress, nil, accounts, storages, dl.states) + rawdb.WritePersistentStateID(batch, dl.id-1) + rawdb.WriteSnapshotRoot(batch, h.meta.parent) + if err := batch.Write(); err != nil { + log.Crit("Failed to write states", "err", err) + } + // Link the generator and resume generation if the snapshot is not yet + // fully completed. + ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer) + if dl.generator != nil && !dl.generator.completed() { + ndl.generator = dl.generator + ndl.generator.run(h.meta.parent) + log.Info("Resumed state snapshot generation", "root", h.meta.parent) + } + return ndl, nil } // size returns the approximate size of cached nodes in the disk layer. @@ -370,6 +497,18 @@ func (dl *diskLayer) resetCache() { if dl.nodes != nil { dl.nodes.Reset() } + if dl.states != nil { + dl.states.Reset() + } +} + +// genMarker returns the current state snapshot generation progress marker. If +// the state snapshot has already been fully generated, nil is returned. +func (dl *diskLayer) genMarker() []byte { + if dl.generator == nil { + return nil + } + return dl.generator.progressMarker() } // hasher is used to compute the sha256 hash of the provided data. diff --git a/triedb/pathdb/errors.go b/triedb/pathdb/errors.go index 498bc9ec8107..b656a6f3dcab 100644 --- a/triedb/pathdb/errors.go +++ b/triedb/pathdb/errors.go @@ -39,4 +39,13 @@ var ( // errStateUnrecoverable is returned if state is required to be reverted to // a destination without associated state history available. errStateUnrecoverable = errors.New("state is unrecoverable") + + // errNotCoveredYet is returned from data accessors if the underlying snapshot + // is being generated currently and the requested data item is not yet in the + // range of accounts covered. + errNotCoveredYet = errors.New("not covered yet") + + // errNotConstructed is returned if the callers want to iterate the snapshot + // while the generation is not finished yet. + errNotConstructed = errors.New("state snapshot is not constructed") ) diff --git a/triedb/pathdb/flush.go b/triedb/pathdb/flush.go index baa0bfb292bc..f6c02900a334 100644 --- a/triedb/pathdb/flush.go +++ b/triedb/pathdb/flush.go @@ -17,6 +17,8 @@ package pathdb import ( + "bytes" + "github.com/VictoriaMetrics/fastcache" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" @@ -63,3 +65,83 @@ func writeNodes(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.No } return total } + +// writeStates flushes state mutations into the provided database batch as a whole. +func writeStates(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, destructSet map[common.Hash]struct{}, accountData map[common.Hash][]byte, storageData map[common.Hash]map[common.Hash][]byte, clean *fastcache.Cache) (int, int) { + var ( + accounts int + slots int + ) + for addrHash := range destructSet { + // Skip any account not covered yet by the snapshot. The account + // at generation position (addrHash == genMarker[:common.HashLength]) + // should be updated. + if genMarker != nil && bytes.Compare(addrHash[:], genMarker) > 0 { + continue + } + rawdb.DeleteAccountSnapshot(batch, addrHash) + accounts += 1 + if clean != nil { + clean.Set(addrHash[:], nil) + } + // Safe to traverse the account storage for Ethereum mainnet (no OOM issue) + it := rawdb.IterateStorageSnapshots(db, addrHash) + for it.Next() { + batch.Delete(it.Key()) + slots += 1 + if clean != nil { + clean.Del(it.Key()[len(rawdb.SnapshotStoragePrefix):]) + } + } + it.Release() + } + for addrHash, blob := range accountData { + // Skip any account not covered yet by the snapshot. The account + // at generation position (addrHash == genMarker[:common.HashLength]) + // should be updated. + if genMarker != nil && bytes.Compare(addrHash[:], genMarker) > 0 { + continue + } + accounts += 1 + if len(blob) == 0 { + rawdb.DeleteAccountSnapshot(batch, addrHash) + if clean != nil { + clean.Set(addrHash[:], nil) + } + } else { + rawdb.WriteAccountSnapshot(batch, addrHash, blob) + if clean != nil { + clean.Set(addrHash[:], blob) + } + } + } + for addrHash, storages := range storageData { + // Skip any account not covered yet by the snapshot + if genMarker != nil && bytes.Compare(addrHash[:], genMarker) > 0 { + continue + } + midAccount := genMarker != nil && bytes.Equal(addrHash[:], genMarker[:common.HashLength]) + + for storageHash, blob := range storages { + // Skip any slot not covered yet by the snapshot. The storage slot + // at generation position (addrHash == genMarker[:common.HashLength] + // and storageHash == genMarker[common.HashLength:]) should be updated. + if midAccount && bytes.Compare(storageHash[:], genMarker[common.HashLength:]) > 0 { + continue + } + slots += 1 + if len(blob) == 0 { + rawdb.DeleteStorageSnapshot(batch, addrHash, storageHash) + if clean != nil { + clean.Set(append(addrHash[:], storageHash[:]...), nil) + } + } else { + rawdb.WriteStorageSnapshot(batch, addrHash, storageHash, blob) + if clean != nil { + clean.Set(append(addrHash[:], storageHash[:]...), blob) + } + } + } + } + return accounts, slots +} diff --git a/triedb/pathdb/generate.go b/triedb/pathdb/generate.go new file mode 100644 index 000000000000..53f6211fa1a3 --- /dev/null +++ b/triedb/pathdb/generate.go @@ -0,0 +1,845 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package pathdb + +import ( + "bytes" + "errors" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" + "github.com/ethereum/go-ethereum/triedb/database" +) + +var ( + // accountCheckRange is the upper limit of the number of accounts involved in + // each range check. This is a value estimated based on experience. If this + // range is too large, the failure rate of range proof will increase. Otherwise, + // if the range is too small, the efficiency of the state recovery will decrease. + accountCheckRange = 128 + + // storageCheckRange is the upper limit of the number of storage slots involved + // in each range check. This is a value estimated based on experience. If this + // range is too large, the failure rate of range proof will increase. Otherwise, + // if the range is too small, the efficiency of the state recovery will decrease. + storageCheckRange = 1024 + + // errMissingTrie is returned if the target trie is missing while the generation + // is running. In this case the generation is aborted and wait the new signal. + errMissingTrie = errors.New("missing trie") +) + +// diskReader is a wrapper of key-value store and implements database.NodeReader, +// providing a function for accessing persistent trie nodes in the disk +type diskReader struct{ db ethdb.KeyValueStore } + +// Node retrieves the trie node blob with the provided trie identifier, +// node path and the corresponding node hash. No error will be returned +// if the node is not found. +func (r *diskReader) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) { + if owner == (common.Hash{}) { + return rawdb.ReadAccountTrieNode(r.db, path), nil + } + return rawdb.ReadStorageTrieNode(r.db, owner, path), nil +} + +// diskStore is a wrapper of key-value store and implements database.NodeDatabase. +// It's meant to be used for generating state snapshot from the trie data. +type diskStore struct { + db ethdb.KeyValueStore +} + +// NodeReader returns a node reader associated with the specific state. +// An error will be returned if the specified state is not available. +func (s *diskStore) NodeReader(stateRoot common.Hash) (database.NodeReader, error) { + root := types.EmptyRootHash + if blob := rawdb.ReadAccountTrieNode(s.db, nil); len(blob) > 0 { + root = crypto.Keccak256Hash(blob) + } + if root != stateRoot { + return nil, fmt.Errorf("state %x is not available", stateRoot) + } + return &diskReader{s.db}, nil +} + +// Generator is the struct for initial state snapshot generation. It is not thread-safe; +// the caller must manage concurrency issues themselves. +type generator struct { + noBuild bool // Flag indicating whether snapshot generation is permitted + running bool // Flag indicating whether the background generation is running + + db ethdb.KeyValueStore // Key-value store containing the snapshot data + stats *generatorStats // Generation statistics used throughout the entire life cycle + abort chan chan struct{} // Notification channel to abort generating the snapshot in this layer + done chan struct{} // Notification channel when generation is done + + progress []byte // Progress marker of the state generation, nil means it's completed + lock sync.RWMutex // Lock which protects the progress +} + +// newGenerator constructs the state snapshot generator. +func newGenerator(db ethdb.KeyValueStore, noBuild bool, progress []byte, stats *generatorStats) *generator { + if stats == nil { + stats = &generatorStats{start: time.Now()} + } + return &generator{ + noBuild: noBuild, + progress: progress, + db: db, + stats: stats, + abort: make(chan chan struct{}), + done: make(chan struct{}), + } +} + +// run starts the state snapshot generation in the background. +func (g *generator) run(root common.Hash) { + if g.noBuild { + log.Warn("Snapshot generation is not permitted") + return + } + if g.running { + g.stop() + log.Warn("Terminated the leftover generation cycle") + } + g.running = true + go g.generate(newGeneratorContext(root, g.progress, g.db)) +} + +// stop terminates the background generation if it's actively running. +// The Recent generation progress being made will be saved before returning. +func (g *generator) stop() { + if !g.running { + log.Debug("Snapshot generation is not running") + return + } + ch := make(chan struct{}) + g.abort <- ch + <-ch + g.running = false +} + +// completed returns the flag indicating if the whole generation is done. +func (g *generator) completed() bool { + progress := g.progressMarker() + return progress == nil +} + +// progressMarker returns the current generation progress marker. It may lag +// slightly behind the actual generation position (the progress field will only +// be updated once checkAndFlush is called), but it's acceptable. +func (g *generator) progressMarker() []byte { + g.lock.RLock() + defer g.lock.RUnlock() + + return g.progress +} + +// splitMarker is an internal helper which splits the generation progress marker +// into two parts. +func splitMarker(marker []byte) ([]byte, []byte) { + var accMarker []byte + if len(marker) > 0 { // []byte{} is the start, use nil for that + accMarker = marker[:common.HashLength] + } + return accMarker, marker +} + +// generateSnapshot regenerates a brand-new snapshot based on an existing state +// database and head block asynchronously. The snapshot is returned immediately +// and generation is continued in the background until done. +func generateSnapshot(triedb *Database, root common.Hash) *diskLayer { + // Create a new disk layer with an initialized state marker at zero + var ( + stats = &generatorStats{start: time.Now()} + genMarker = []byte{} // Initialized but empty! + ) + dl := newDiskLayer(root, 0, triedb, nil, nil, newBuffer(triedb.config.WriteBufferSize, nil, nil, 0)) + dl.generator = newGenerator(triedb.diskdb, false, genMarker, stats) + dl.generator.run(root) + log.Info("Started snapshot generation", "root", root) + return dl +} + +// journalProgress persists the generator stats into the database to resume later. +func journalProgress(db ethdb.KeyValueWriter, marker []byte, stats *generatorStats) { + // Write out the generator marker. Note it's a standalone disk layer generator + // which is not mixed with journal. It's ok if the generator is persisted while + // journal is not. + entry := journalGenerator{ + Done: marker == nil, + Marker: marker, + } + if stats != nil { + entry.Accounts = stats.accounts + entry.Slots = stats.slots + entry.Storage = uint64(stats.storage) + } + blob, err := rlp.EncodeToBytes(entry) + if err != nil { + panic(err) // Cannot happen, here to catch dev errors + } + var logstr string + switch { + case marker == nil: + logstr = "done" + case bytes.Equal(marker, []byte{}): + logstr = "empty" + case len(marker) == common.HashLength: + logstr = fmt.Sprintf("%#x", marker) + default: + logstr = fmt.Sprintf("%#x:%#x", marker[:common.HashLength], marker[common.HashLength:]) + } + log.Debug("Journalled generator progress", "progress", logstr) + rawdb.WriteSnapshotGenerator(db, blob) +} + +// proofResult contains the output of range proving which can be used +// for further processing regardless if it is successful or not. +type proofResult struct { + keys [][]byte // The key set of all elements being iterated, even proving is failed + vals [][]byte // The val set of all elements being iterated, even proving is failed + diskMore bool // Set when the database has extra snapshot states since last iteration + trieMore bool // Set when the trie has extra snapshot states(only meaningful for successful proving) + proofErr error // Indicator whether the given state range is valid or not + tr *trie.Trie // The trie, in case the trie was resolved by the prover (may be nil) +} + +// valid returns the indicator that range proof is successful or not. +func (result *proofResult) valid() bool { + return result.proofErr == nil +} + +// last returns the last verified element key regardless of whether the range proof is +// successful or not. Nil is returned if nothing involved in the proving. +func (result *proofResult) last() []byte { + var last []byte + if len(result.keys) > 0 { + last = result.keys[len(result.keys)-1] + } + return last +} + +// forEach iterates all the visited elements and applies the given callback on them. +// The iteration is aborted if the callback returns non-nil error. +func (result *proofResult) forEach(callback func(key []byte, val []byte) error) error { + for i := 0; i < len(result.keys); i++ { + key, val := result.keys[i], result.vals[i] + if err := callback(key, val); err != nil { + return err + } + } + return nil +} + +// proveRange proves the snapshot segment with particular prefix is "valid". +// The iteration start point will be assigned if the iterator is restored from +// the last interruption. Max will be assigned in order to limit the maximum +// amount of data involved in each iteration. +// +// The proof result will be returned if the range proving is finished, otherwise +// the error will be returned to abort the entire procedure. +func (g *generator) proveRange(ctx *generatorContext, trieId *trie.ID, prefix []byte, kind string, origin []byte, max int, valueConvertFn func([]byte) ([]byte, error)) (*proofResult, error) { + var ( + keys [][]byte + vals [][]byte + proof = rawdb.NewMemoryDatabase() + diskMore = false + iter = ctx.iterator(kind) + start = time.Now() + min = append(prefix, origin...) + ) + for iter.Next() { + // Ensure the iterated item is always equal or larger than the given origin. + key := iter.Key() + if bytes.Compare(key, min) < 0 { + return nil, errors.New("invalid iteration position") + } + // Ensure the iterated item still fall in the specified prefix. If + // not which means the items in the specified area are all visited. + // Move the iterator a step back since we iterate one extra element + // out. + if !bytes.Equal(key[:len(prefix)], prefix) { + iter.Hold() + break + } + // Break if we've reached the max size, and signal that we're not + // done yet. Move the iterator a step back since we iterate one + // extra element out. + if len(keys) == max { + iter.Hold() + diskMore = true + break + } + keys = append(keys, common.CopyBytes(key[len(prefix):])) + + if valueConvertFn == nil { + vals = append(vals, common.CopyBytes(iter.Value())) + } else { + val, err := valueConvertFn(iter.Value()) + if err != nil { + // Special case, the state data is corrupted (invalid slim-format account), + // don't abort the entire procedure directly. Instead, let the fallback + // generation to heal the invalid data. + // + // Here append the original value to ensure that the number of key and + // value are aligned. + vals = append(vals, common.CopyBytes(iter.Value())) + log.Error("Failed to convert account state data", "err", err) + } else { + vals = append(vals, val) + } + } + } + // Update metrics for database iteration and merkle proving + if kind == snapStorage { + storageSnapReadCounter.Inc(time.Since(start).Nanoseconds()) + } else { + accountSnapReadCounter.Inc(time.Since(start).Nanoseconds()) + } + defer func(start time.Time) { + if kind == snapStorage { + storageProveCounter.Inc(time.Since(start).Nanoseconds()) + } else { + accountProveCounter.Inc(time.Since(start).Nanoseconds()) + } + }(time.Now()) + + // The snap state is exhausted, pass the entire key/val set for verification + root := trieId.Root + if origin == nil && !diskMore { + stackTr := trie.NewStackTrie(nil) + for i, key := range keys { + if err := stackTr.Update(key, vals[i]); err != nil { + return nil, err + } + } + if gotRoot := stackTr.Hash(); gotRoot != root { + return &proofResult{ + keys: keys, + vals: vals, + proofErr: fmt.Errorf("wrong root: have %#x want %#x", gotRoot, root), + }, nil + } + return &proofResult{keys: keys, vals: vals}, nil + } + // Snap state is chunked, generate edge proofs for verification. + tr, err := trie.New(trieId, &diskStore{db: g.db}) + if err != nil { + log.Info("Trie missing, state snapshotting paused", "state", ctx.root, "kind", kind, "root", trieId.Root) + return nil, errMissingTrie + } + // Generate the Merkle proofs for the first and last element + if origin == nil { + origin = common.Hash{}.Bytes() + } + if err := tr.Prove(origin, proof); err != nil { + log.Debug("Failed to prove range", "kind", kind, "origin", origin, "err", err) + return &proofResult{ + keys: keys, + vals: vals, + diskMore: diskMore, + proofErr: err, + tr: tr, + }, nil + } + if len(keys) > 0 { + if err := tr.Prove(keys[len(keys)-1], proof); err != nil { + log.Debug("Failed to prove range", "kind", kind, "last", keys[len(keys)-1], "err", err) + return &proofResult{ + keys: keys, + vals: vals, + diskMore: diskMore, + proofErr: err, + tr: tr, + }, nil + } + } + // Verify the snapshot segment with range prover, ensure that all flat states + // in this range correspond to merkle trie. + cont, err := trie.VerifyRangeProof(root, origin, keys, vals, proof) + return &proofResult{ + keys: keys, + vals: vals, + diskMore: diskMore, + trieMore: cont, + proofErr: err, + tr: tr}, + nil +} + +// onStateCallback is a function that is called by generateRange, when processing a range of +// accounts or storage slots. For each element, the callback is invoked. +// +// - If 'delete' is true, then this element (and potential slots) needs to be deleted from the snapshot. +// - If 'write' is true, then this element needs to be updated with the 'val'. +// - If 'write' is false, then this element is already correct, and needs no update. +// The 'val' is the canonical encoding of the value (not the slim format for accounts) +// +// However, for accounts, the storage trie of the account needs to be checked. Also, +// dangling storages(storage exists but the corresponding account is missing) need to +// be cleaned up. +type onStateCallback func(key []byte, val []byte, write bool, delete bool) error + +// generateRange generates the state segment with particular prefix. Generation can +// either verify the correctness of existing state through range-proof and skip +// generation, or iterate trie to regenerate state on demand. +func (g *generator) generateRange(ctx *generatorContext, trieId *trie.ID, prefix []byte, kind string, origin []byte, max int, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) { + // Use range prover to check the validity of the flat state in the range + result, err := g.proveRange(ctx, trieId, prefix, kind, origin, max, valueConvertFn) + if err != nil { + return false, nil, err + } + last := result.last() + + // Construct contextual logger + logCtx := []interface{}{"kind", kind, "prefix", hexutil.Encode(prefix)} + if len(origin) > 0 { + logCtx = append(logCtx, "origin", hexutil.Encode(origin)) + } + logger := log.New(logCtx...) + + // The range prover says the range is correct, skip trie iteration + if result.valid() { + successfulRangeProofMeter.Mark(1) + logger.Trace("Proved state range", "last", hexutil.Encode(last)) + + // The verification is passed, process each state with the given + // callback function. If this state represents a contract, the + // corresponding storage check will be performed in the callback + if err := result.forEach(func(key []byte, val []byte) error { return onState(key, val, false, false) }); err != nil { + return false, nil, err + } + // Only abort the iteration when both database and trie are exhausted + return !result.diskMore && !result.trieMore, last, nil + } + logger.Trace("Detected outdated state range", "last", hexutil.Encode(last), "err", result.proofErr) + failedRangeProofMeter.Mark(1) + + // Special case, the entire trie is missing. In the original trie scheme, + // all the duplicated subtries will be filtered out (only one copy of data + // will be stored). While in the snapshot model, all the storage tries + // belong to different contracts will be kept even they are duplicated. + // Track it to a certain extent remove the noise data used for statistics. + if origin == nil && last == nil { + meter := missallAccountMeter + if kind == snapStorage { + meter = missallStorageMeter + } + meter.Mark(1) + } + // We use the snap data to build up a cache which can be used by the + // main account trie as a primary lookup when resolving hashes + var resolver trie.NodeResolver + if len(result.keys) > 0 { + tr := trie.NewEmpty(nil) + for i, key := range result.keys { + tr.Update(key, result.vals[i]) + } + _, nodes := tr.Commit(false) + hashSet := nodes.HashSet() + resolver = func(owner common.Hash, path []byte, hash common.Hash) []byte { + return hashSet[hash] + } + } + // Construct the trie for state iteration, reuse the trie + // if it's already opened with some nodes resolved. + tr := result.tr + if tr == nil { + tr, err = trie.New(trieId, &diskStore{db: g.db}) + if err != nil { + log.Info("Trie missing, state snapshotting paused", "state", ctx.root, "kind", kind, "root", trieId.Root) + return false, nil, errMissingTrie + } + } + var ( + trieMore bool + kvkeys, kvvals = result.keys, result.vals + + // counters + count = 0 // number of states delivered by iterator + created = 0 // states created from the trie + updated = 0 // states updated from the trie + deleted = 0 // states not in trie, but were in snapshot + untouched = 0 // states already correct + + // timers + start = time.Now() + internal time.Duration + ) + nodeIt, err := tr.NodeIterator(origin) + if err != nil { + return false, nil, err + } + nodeIt.AddResolver(resolver) + iter := trie.NewIterator(nodeIt) + + for iter.Next() { + if last != nil && bytes.Compare(iter.Key, last) > 0 { + trieMore = true + break + } + count++ + write := true + created++ + for len(kvkeys) > 0 { + if cmp := bytes.Compare(kvkeys[0], iter.Key); cmp < 0 { + // delete the key + istart := time.Now() + if err := onState(kvkeys[0], nil, false, true); err != nil { + return false, nil, err + } + kvkeys = kvkeys[1:] + kvvals = kvvals[1:] + deleted++ + internal += time.Since(istart) + continue + } else if cmp == 0 { + // the snapshot key can be overwritten + created-- + if write = !bytes.Equal(kvvals[0], iter.Value); write { + updated++ + } else { + untouched++ + } + kvkeys = kvkeys[1:] + kvvals = kvvals[1:] + } + break + } + istart := time.Now() + if err := onState(iter.Key, iter.Value, write, false); err != nil { + return false, nil, err + } + internal += time.Since(istart) + } + if iter.Err != nil { + // Trie errors should never happen. Still, in case of a bug, expose the + // error here, as the outer code will presume errors are interrupts, not + // some deeper issues. + log.Error("State snapshotter failed to iterate trie", "err", iter.Err) + return false, nil, iter.Err + } + // Delete all stale snapshot states remaining + istart := time.Now() + for _, key := range kvkeys { + if err := onState(key, nil, false, true); err != nil { + return false, nil, err + } + deleted += 1 + } + internal += time.Since(istart) + + // Update metrics for counting trie iteration + if kind == snapStorage { + storageTrieReadCounter.Inc((time.Since(start) - internal).Nanoseconds()) + } else { + accountTrieReadCounter.Inc((time.Since(start) - internal).Nanoseconds()) + } + logger.Debug("Regenerated state range", "root", trieId.Root, "last", hexutil.Encode(last), + "count", count, "created", created, "updated", updated, "untouched", untouched, "deleted", deleted) + + // If there are either more trie items, or there are more snap items + // (in the next segment), then we need to keep working + return !trieMore && !result.diskMore, last, nil +} + +// checkAndFlush checks if an interruption signal is received or the +// batch size has exceeded the allowance. +func (g *generator) checkAndFlush(ctx *generatorContext, current []byte) error { + var abort chan struct{} + select { + case abort = <-g.abort: + default: + } + if ctx.batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { + if bytes.Compare(current, g.progress) < 0 { + log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", g.progress)) + } + // Persist the progress marker regardless of whether the batch is empty or not. + // It may happen that all the flat states in the database are correct, so the + // generator indeed makes progress even if there is nothing to commit. + journalProgress(ctx.batch, current, g.stats) + + // Flush out the database writes atomically + if err := ctx.batch.Write(); err != nil { + return err + } + ctx.batch.Reset() + + // Update the generation progress marker + g.lock.Lock() + g.progress = current + g.lock.Unlock() + + // Abort the generation if it's required + if abort != nil { + g.stats.log("Aborting state snapshot generation", ctx.root, g.progress) + return newAbortErr(abort) // bubble up an error for interruption + } + // Don't hold the iterators too long, release them to let compactor works + ctx.reopenIterator(snapAccount) + ctx.reopenIterator(snapStorage) + } + if time.Since(ctx.logged) > 8*time.Second { + g.stats.log("Generating state snapshot", ctx.root, g.progress) + ctx.logged = time.Now() + } + return nil +} + +// generateStorages generates the missing storage slots of the specific contract. +// It's supposed to restart the generation from the given origin position. +func (g *generator) generateStorages(ctx *generatorContext, account common.Hash, storageRoot common.Hash, storeMarker []byte) error { + onStorage := func(key []byte, val []byte, write bool, delete bool) error { + defer func(start time.Time) { + storageWriteCounter.Inc(time.Since(start).Nanoseconds()) + }(time.Now()) + + if delete { + rawdb.DeleteStorageSnapshot(ctx.batch, account, common.BytesToHash(key)) + wipedStorageMeter.Mark(1) + return nil + } + if write { + rawdb.WriteStorageSnapshot(ctx.batch, account, common.BytesToHash(key), val) + generatedStorageMeter.Mark(1) + } else { + recoveredStorageMeter.Mark(1) + } + g.stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val)) + g.stats.slots++ + + // If we've exceeded our batch allowance or termination was requested, flush to disk + if err := g.checkAndFlush(ctx, append(account[:], key...)); err != nil { + return err + } + return nil + } + // Loop for re-generating the missing storage slots. + var origin = common.CopyBytes(storeMarker) + for { + id := trie.StorageTrieID(ctx.root, account, storageRoot) + exhausted, last, err := g.generateRange(ctx, id, append(rawdb.SnapshotStoragePrefix, account.Bytes()...), snapStorage, origin, storageCheckRange, onStorage, nil) + if err != nil { + return err // The procedure it aborted, either by external signal or internal error. + } + // Abort the procedure if the entire contract storage is generated + if exhausted { + break + } + if origin = increaseKey(last); origin == nil { + break // special case, the last is 0xffffffff...fff + } + } + return nil +} + +// generateAccounts generates the missing snapshot accounts as well as their +// storage slots in the main trie. It's supposed to restart the generation +// from the given origin position. +func (g *generator) generateAccounts(ctx *generatorContext, accMarker []byte) error { + onAccount := func(key []byte, val []byte, write bool, delete bool) error { + // Make sure to clear all dangling storages before this account + account := common.BytesToHash(key) + g.stats.dangling += ctx.removeStorageBefore(account) + + start := time.Now() + if delete { + rawdb.DeleteAccountSnapshot(ctx.batch, account) + wipedAccountMeter.Mark(1) + accountWriteCounter.Inc(time.Since(start).Nanoseconds()) + + ctx.removeStorageAt(account) + return nil + } + // Retrieve the current account and flatten it into the internal format + var acc types.StateAccount + if err := rlp.DecodeBytes(val, &acc); err != nil { + log.Crit("Invalid account encountered during snapshot creation", "err", err) + } + // If the account is not yet in-progress, write it out + if accMarker == nil || !bytes.Equal(account[:], accMarker) { + dataLen := len(val) // Approximate size, saves us a round of RLP-encoding + if !write { + if bytes.Equal(acc.CodeHash, types.EmptyCodeHash[:]) { + dataLen -= 32 + } + if acc.Root == types.EmptyRootHash { + dataLen -= 32 + } + recoveredAccountMeter.Mark(1) + } else { + data := types.SlimAccountRLP(acc) + dataLen = len(data) + rawdb.WriteAccountSnapshot(ctx.batch, account, data) + generatedAccountMeter.Mark(1) + } + g.stats.storage += common.StorageSize(1 + common.HashLength + dataLen) + g.stats.accounts++ + } + // If the snap generation goes here after interrupted, genMarker may go backward + // when last genMarker is consisted of accountHash and storageHash + marker := account[:] + if accMarker != nil && bytes.Equal(marker, accMarker) && len(g.progress) > common.HashLength { + marker = g.progress + } + // If we've exceeded our batch allowance or termination was requested, flush to disk + if err := g.checkAndFlush(ctx, marker); err != nil { + return err + } + accountWriteCounter.Inc(time.Since(start).Nanoseconds()) // let's count flush time as well + + // If the iterated account is the contract, create a further loop to + // verify or regenerate the contract storage. + if acc.Root == types.EmptyRootHash { + ctx.removeStorageAt(account) + } else { + var storeMarker []byte + if accMarker != nil && bytes.Equal(account[:], accMarker) && len(g.progress) > common.HashLength { + storeMarker = g.progress[common.HashLength:] + } + if err := g.generateStorages(ctx, account, acc.Root, storeMarker); err != nil { + return err + } + } + // Some account processed, unmark the marker + accMarker = nil + return nil + } + origin := common.CopyBytes(accMarker) + for { + id := trie.StateTrieID(ctx.root) + exhausted, last, err := g.generateRange(ctx, id, rawdb.SnapshotAccountPrefix, snapAccount, origin, accountCheckRange, onAccount, types.FullAccountRLP) + if err != nil { + return err // The procedure it aborted, either by external signal or internal error. + } + origin = increaseKey(last) + + // Last step, cleanup the storages after the last account. + // All the left storages should be treated as dangling. + if origin == nil || exhausted { + g.stats.dangling += ctx.removeStorageLeft() + break + } + } + return nil +} + +// generate is a background thread that iterates over the state and storage tries, +// constructing the state snapshot. All the arguments are purely for statistics +// gathering and logging, since the method surfs the blocks as they arrive, often +// being restarted. +func (g *generator) generate(ctx *generatorContext) { + g.stats.log("Resuming state snapshot generation", ctx.root, g.progress) + defer ctx.close() + + // Persist the initial marker and state snapshot root if progress is none + if len(g.progress) == 0 { + batch := g.db.NewBatch() + rawdb.WriteSnapshotRoot(batch, ctx.root) + journalProgress(batch, g.progress, g.stats) + if err := batch.Write(); err != nil { + log.Crit("Failed to write initialized state marker", "err", err) + } + } + // Initialize the global generator context. The snapshot iterators are + // opened at the interrupted position because the assumption is held + // that all the snapshot data are generated correctly before the marker. + // Even if the snapshot data is updated during the interruption (before + // or at the marker), the assumption is still held. + // For the account or storage slot at the interruption, they will be + // processed twice by the generator(they are already processed in the + // last run) but it's fine. + var ( + accMarker, _ = splitMarker(g.progress) + abort chan struct{} + ) + if err := g.generateAccounts(ctx, accMarker); err != nil { + // Extract the received interruption signal if exists + var aerr *abortErr + if errors.As(err, &aerr) { + abort = aerr.abort + } + // Aborted by internal error, wait the signal + if abort == nil { + abort = <-g.abort + } + close(abort) + return + } + // Snapshot fully generated, set the marker to nil. + // Note even there is nothing to commit, persist the + // generator anyway to mark the snapshot is complete. + journalProgress(ctx.batch, nil, g.stats) + if err := ctx.batch.Write(); err != nil { + log.Error("Failed to flush batch", "err", err) + abort = <-g.abort + close(abort) + return + } + ctx.batch.Reset() + + log.Info("Generated state snapshot", "accounts", g.stats.accounts, "slots", g.stats.slots, + "storage", g.stats.storage, "dangling", g.stats.dangling, "elapsed", common.PrettyDuration(time.Since(g.stats.start))) + + // Update the generation progress marker + g.lock.Lock() + g.progress = nil + g.lock.Unlock() + close(g.done) + + // Someone will be looking for us, wait it out + abort = <-g.abort + close(abort) +} + +// increaseKey increase the input key by one bit. Return nil if the entire +// addition operation overflows. +func increaseKey(key []byte) []byte { + for i := len(key) - 1; i >= 0; i-- { + key[i]++ + if key[i] != 0x0 { + return key + } + } + return nil +} + +// abortErr wraps an interruption signal received to represent the +// generation is aborted by external processes. +type abortErr struct { + abort chan struct{} +} + +func newAbortErr(abort chan struct{}) error { + return &abortErr{abort: abort} +} + +func (err *abortErr) Error() string { + return "aborted" +} diff --git a/triedb/pathdb/generate_test.go b/triedb/pathdb/generate_test.go new file mode 100644 index 000000000000..bbc7da3c9b13 --- /dev/null +++ b/triedb/pathdb/generate_test.go @@ -0,0 +1,740 @@ +// Copyright 2024 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package pathdb + +import ( + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/internal/testrand" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" + "github.com/ethereum/go-ethereum/trie/trienode" + "github.com/holiman/uint256" +) + +func hashData(input []byte) common.Hash { + return crypto.Keccak256Hash(input) +} + +type genTester struct { + diskdb ethdb.Database + db *Database + acctTrie *trie.Trie + nodes *trienode.MergedNodeSet + states *StateSetWithOrigin +} + +func newGenTester() *genTester { + disk := rawdb.NewMemoryDatabase() + config := *Defaults + config.SnapshotNoBuild = true // no background generation + db := New(disk, &config, false) + tr, _ := trie.New(trie.StateTrieID(types.EmptyRootHash), db) + return &genTester{ + diskdb: disk, + db: db, + acctTrie: tr, + nodes: trienode.NewMergedNodeSet(), + states: NewStateSetWithOrigin(nil, nil, nil, nil, nil), + } +} + +func (t *genTester) addTrieAccount(acckey string, acc *types.StateAccount) { + var ( + addr = common.BytesToAddress([]byte(acckey)) + key = hashData([]byte(acckey)) + val, _ = rlp.EncodeToBytes(acc) + ) + t.acctTrie.MustUpdate(key.Bytes(), val) + + t.states.accountData[key] = val + t.states.accountOrigin[addr] = nil +} + +func (t *genTester) addSnapAccount(acckey string, acc *types.StateAccount) { + key := hashData([]byte(acckey)) + rawdb.WriteAccountSnapshot(t.diskdb, key, types.SlimAccountRLP(*acc)) +} + +func (t *genTester) addAccount(acckey string, acc *types.StateAccount) { + t.addTrieAccount(acckey, acc) + t.addSnapAccount(acckey, acc) +} + +func (t *genTester) addSnapStorage(accKey string, keys []string, vals []string) { + accHash := hashData([]byte(accKey)) + for i, key := range keys { + rawdb.WriteStorageSnapshot(t.diskdb, accHash, hashData([]byte(key)), []byte(vals[i])) + } +} + +func (t *genTester) makeStorageTrie(accKey string, keys []string, vals []string, commit bool) common.Hash { + var ( + owner = hashData([]byte(accKey)) + addr = common.BytesToAddress([]byte(accKey)) + id = trie.StorageTrieID(types.EmptyRootHash, owner, types.EmptyRootHash) + tr, _ = trie.New(id, t.db) + + storages = make(map[common.Hash][]byte) + storageOrigins = make(map[common.Hash][]byte) + ) + for i, k := range keys { + key := hashData([]byte(k)) + tr.MustUpdate(key.Bytes(), []byte(vals[i])) + storages[key] = []byte(vals[i]) + storageOrigins[key] = nil + } + if !commit { + return tr.Hash() + } + root, nodes := tr.Commit(false) + if nodes != nil { + t.nodes.Merge(nodes) + } + t.states.storageData[owner] = storages + t.states.storageOrigin[addr] = storageOrigins + return root +} + +func (t *genTester) Commit() common.Hash { + root, nodes := t.acctTrie.Commit(true) + if nodes != nil { + t.nodes.Merge(nodes) + } + t.db.Update(root, types.EmptyRootHash, 0, t.nodes, t.states) + t.db.Commit(root, false) + return root +} + +func (t *genTester) CommitAndGenerate() (common.Hash, *diskLayer) { + root := t.Commit() + dl := generateSnapshot(t.db, root) + return root, dl +} + +// Tests that snapshot generation from an empty database. +func TestGeneration(t *testing.T) { + helper := newGenTester() + stRoot := helper.makeStorageTrie("", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, false) + + helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + + helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + + root, dl := helper.CommitAndGenerate() + if have, want := root, common.HexToHash("0xe3712f1a226f3782caca78ca770ccc19ee000552813a9f59d479f8611db9b1fd"); have != want { + t.Fatalf("have %#x want %#x", have, want) + } + select { + case <-dl.generator.done: + // Snapshot generation succeeded + case <-time.After(3 * time.Second): + t.Errorf("Snapshot generation failed") + } + //checkSnapRoot(t, snap, root) + + // Signal abortion to the generator and wait for it to tear down + dl.generator.stop() +} + +// Tests that snapshot generation with existent flat state, where the flat state +// contains some errors: +// - the contract with empty storage root but has storage entries in the disk +// - the contract with non empty storage root but empty storage slots +// - the contract(non-empty storage) misses some storage slots +// - miss in the beginning +// - miss in the middle +// - miss in the end +// +// - the contract(non-empty storage) has wrong storage slots +// - wrong slots in the beginning +// - wrong slots in the middle +// - wrong slots in the end +// +// - the contract(non-empty storage) has extra storage slots +// - extra slots in the beginning +// - extra slots in the middle +// - extra slots in the end +func TestGenerateExistentStateWithWrongStorage(t *testing.T) { + helper := newGenTester() + + // Account one, empty storage trie root but non-empty flat states + helper.addAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addSnapStorage("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + // Account two, non-empty storage trie root but empty flat states + stRoot := helper.makeStorageTrie("acc-2", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + + // Miss slots + { + // Account three, non-empty root but misses slots in the beginning + helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addSnapStorage("acc-3", []string{"key-2", "key-3"}, []string{"val-2", "val-3"}) + + // Account four, non-empty root but misses slots in the middle + helper.makeStorageTrie("acc-4", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addAccount("acc-4", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addSnapStorage("acc-4", []string{"key-1", "key-3"}, []string{"val-1", "val-3"}) + + // Account five, non-empty root but misses slots in the end + helper.makeStorageTrie("acc-5", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addAccount("acc-5", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addSnapStorage("acc-5", []string{"key-1", "key-2"}, []string{"val-1", "val-2"}) + } + + // Wrong storage slots + { + // Account six, non-empty root but wrong slots in the beginning + helper.makeStorageTrie("acc-6", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addAccount("acc-6", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addSnapStorage("acc-6", []string{"key-1", "key-2", "key-3"}, []string{"badval-1", "val-2", "val-3"}) + + // Account seven, non-empty root but wrong slots in the middle + helper.makeStorageTrie("acc-7", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addAccount("acc-7", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addSnapStorage("acc-7", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "badval-2", "val-3"}) + + // Account eight, non-empty root but wrong slots in the end + helper.makeStorageTrie("acc-8", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addAccount("acc-8", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addSnapStorage("acc-8", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "badval-3"}) + + // Account 9, non-empty root but rotated slots + helper.makeStorageTrie("acc-9", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addAccount("acc-9", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addSnapStorage("acc-9", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-3", "val-2"}) + } + + // Extra storage slots + { + // Account 10, non-empty root but extra slots in the beginning + helper.makeStorageTrie("acc-10", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addAccount("acc-10", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addSnapStorage("acc-10", []string{"key-0", "key-1", "key-2", "key-3"}, []string{"val-0", "val-1", "val-2", "val-3"}) + + // Account 11, non-empty root but extra slots in the middle + helper.makeStorageTrie("acc-11", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addAccount("acc-11", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addSnapStorage("acc-11", []string{"key-1", "key-2", "key-2-1", "key-3"}, []string{"val-1", "val-2", "val-2-1", "val-3"}) + + // Account 12, non-empty root but extra slots in the end + helper.makeStorageTrie("acc-12", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addAccount("acc-12", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addSnapStorage("acc-12", []string{"key-1", "key-2", "key-3", "key-4"}, []string{"val-1", "val-2", "val-3", "val-4"}) + } + + root, dl := helper.CommitAndGenerate() + t.Logf("Root: %#x\n", root) // Root = 0x8746cce9fd9c658b2cfd639878ed6584b7a2b3e73bb40f607fcfa156002429a0 + + select { + case <-dl.generator.done: + // Snapshot generation succeeded + + case <-time.After(3 * time.Second): + t.Errorf("Snapshot generation failed") + } + //checkSnapRoot(t, snap, root) + + // Signal abortion to the generator and wait for it to tear down + dl.generator.stop() +} + +// Tests that snapshot generation with existent flat state, where the flat state +// contains some errors: +// - miss accounts +// - wrong accounts +// - extra accounts +func TestGenerateExistentStateWithWrongAccounts(t *testing.T) { + helper := newGenTester() + + // Trie accounts [acc-1, acc-2, acc-3, acc-4, acc-6] + helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.makeStorageTrie("acc-2", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.makeStorageTrie("acc-4", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + stRoot := helper.makeStorageTrie("acc-6", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + + // Missing accounts, only in the trie + { + helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // Beginning + helper.addTrieAccount("acc-4", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // Middle + helper.addTrieAccount("acc-6", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // End + } + + // Wrong accounts + { + helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addSnapAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: common.Hex2Bytes("0x1234")}) + + helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addSnapAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) + } + + // Extra accounts, only in the snap + { + helper.addSnapAccount("acc-0", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // before the beginning + helper.addSnapAccount("acc-5", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: common.Hex2Bytes("0x1234")}) // Middle + helper.addSnapAccount("acc-7", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // after the end + } + + root, dl := helper.CommitAndGenerate() + t.Logf("Root: %#x\n", root) // Root = 0x825891472281463511e7ebcc7f109e4f9200c20fa384754e11fd605cd98464e8 + + select { + case <-dl.generator.done: + // Snapshot generation succeeded + + case <-time.After(3 * time.Second): + t.Errorf("Snapshot generation failed") + } + //checkSnapRoot(t, snap, root) + + // Signal abortion to the generator and wait for it to tear down + dl.generator.stop() +} + +func TestGenerateCorruptAccountTrie(t *testing.T) { + helper := newGenTester() + helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0xc7a30f39aff471c95d8a837497ad0e49b65be475cc0953540f80cfcdbdcd9074 + helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7 + helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x19ead688e907b0fab07176120dceec244a72aff2f0aa51e8b827584e378772f4 + + root := helper.Commit() // Root: 0xa04693ea110a31037fb5ee814308a6f1d76bdab0b11676bdf4541d2de55ba978 + + // Delete an account trie node and ensure the generator chokes + path := []byte{0xc} + rawdb.HasAccountTrieNode(helper.diskdb, path) + rawdb.DeleteAccountTrieNode(helper.diskdb, path) + helper.db.tree.bottom().resetCache() + + dl := generateSnapshot(helper.db, root) + select { + case <-dl.generator.done: + // Snapshot generation succeeded + t.Errorf("Snapshot generated against corrupt account trie") + + case <-time.After(time.Second): + // Not generated fast enough, hopefully blocked inside on missing trie node fail + } + // Signal abortion to the generator and wait for it to tear down + dl.generator.stop() +} + +func TestGenerateMissingStorageTrie(t *testing.T) { + var ( + acc1 = hashData([]byte("acc-1")) + acc3 = hashData([]byte("acc-3")) + helper = newGenTester() + ) + stRoot := helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) // 0xddefcd9376dd029653ef384bd2f0a126bb755fe84fdcc9e7cf421ba454f2bc67 + helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e + helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7 + stRoot = helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x50815097425d000edfc8b3a4a13e175fc2bdcfee8bdfbf2d1ff61041d3c235b2 + + root := helper.Commit() + + // Delete storage trie root of account one and three. + rawdb.DeleteStorageTrieNode(helper.diskdb, acc1, nil) + rawdb.DeleteStorageTrieNode(helper.diskdb, acc3, nil) + helper.db.tree.bottom().resetCache() + + dl := generateSnapshot(helper.db, root) + select { + case <-dl.generator.done: + // Snapshot generation succeeded + t.Errorf("Snapshot generated against corrupt storage trie") + + case <-time.After(time.Second): + // Not generated fast enough, hopefully blocked inside on missing trie node fail + } + // Signal abortion to the generator and wait for it to tear down + dl.generator.stop() +} + +func TestGenerateCorruptStorageTrie(t *testing.T) { + helper := newGenTester() + + stRoot := helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) // 0xddefcd9376dd029653ef384bd2f0a126bb755fe84fdcc9e7cf421ba454f2bc67 + helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e + helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7 + stRoot = helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x50815097425d000edfc8b3a4a13e175fc2bdcfee8bdfbf2d1ff61041d3c235b2 + + root := helper.Commit() + + // Delete a node in the storage trie. + rawdb.DeleteStorageTrieNode(helper.diskdb, hashData([]byte("acc-1")), []byte{0x4}) + rawdb.DeleteStorageTrieNode(helper.diskdb, hashData([]byte("acc-3")), []byte{0x4}) + helper.db.tree.bottom().resetCache() + + dl := generateSnapshot(helper.db, root) + select { + case <-dl.generator.done: + // Snapshot generation succeeded + t.Errorf("Snapshot generated against corrupt storage trie") + + case <-time.After(time.Second): + // Not generated fast enough, hopefully blocked inside on missing trie node fail + } + // Signal abortion to the generator and wait for it to tear down + dl.generator.stop() +} + +func TestGenerateWithExtraAccounts(t *testing.T) { + helper := newGenTester() + + // Account one in the trie + stRoot := helper.makeStorageTrie("acc-1", + []string{"key-1", "key-2", "key-3", "key-4", "key-5"}, + []string{"val-1", "val-2", "val-3", "val-4", "val-5"}, + true, + ) + acc := &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()} + val, _ := rlp.EncodeToBytes(acc) + helper.acctTrie.MustUpdate(hashData([]byte("acc-1")).Bytes(), val) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e + + // Identical in the snap + key := hashData([]byte("acc-1")) + rawdb.WriteAccountSnapshot(helper.diskdb, key, val) + rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-1")), []byte("val-1")) + rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-2")), []byte("val-2")) + rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-3")), []byte("val-3")) + rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-4")), []byte("val-4")) + rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-5")), []byte("val-5")) + + // Account two exists only in the snapshot + stRoot = helper.makeStorageTrie("acc-2", + []string{"key-1", "key-2", "key-3", "key-4", "key-5"}, + []string{"val-1", "val-2", "val-3", "val-4", "val-5"}, + true, + ) + acc = &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()} + val, _ = rlp.EncodeToBytes(acc) + key = hashData([]byte("acc-2")) + rawdb.WriteAccountSnapshot(helper.diskdb, key, val) + rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("b-key-1")), []byte("b-val-1")) + rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("b-key-2")), []byte("b-val-2")) + rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("b-key-3")), []byte("b-val-3")) + + root := helper.Commit() + + // To verify the test: If we now inspect the snap db, there should exist extraneous storage items + if data := rawdb.ReadStorageSnapshot(helper.diskdb, hashData([]byte("acc-2")), hashData([]byte("b-key-1"))); data == nil { + t.Fatalf("expected snap storage to exist") + } + dl := generateSnapshot(helper.db, root) + select { + case <-dl.generator.done: + // Snapshot generation succeeded + + case <-time.After(3 * time.Second): + t.Errorf("Snapshot generation failed") + } + //checkSnapRoot(t, snap, root) + + // Signal abortion to the generator and wait for it to tear down + dl.generator.stop() + + // If we now inspect the snap db, there should exist no extraneous storage items + if data := rawdb.ReadStorageSnapshot(helper.diskdb, hashData([]byte("acc-2")), hashData([]byte("b-key-1"))); data != nil { + t.Fatalf("expected slot to be removed, got %v", string(data)) + } +} + +func TestGenerateWithManyExtraAccounts(t *testing.T) { + helper := newGenTester() + + // Account one in the trie + stRoot := helper.makeStorageTrie("acc-1", + []string{"key-1", "key-2", "key-3"}, + []string{"val-1", "val-2", "val-3"}, + true, + ) + acc := &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()} + val, _ := rlp.EncodeToBytes(acc) + helper.acctTrie.MustUpdate(hashData([]byte("acc-1")).Bytes(), val) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e + + // Identical in the snap + key := hashData([]byte("acc-1")) + rawdb.WriteAccountSnapshot(helper.diskdb, key, val) + rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-1")), []byte("val-1")) + rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-2")), []byte("val-2")) + rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-3")), []byte("val-3")) + + // 100 accounts exist only in snapshot + for i := 0; i < 1000; i++ { + acc := &types.StateAccount{Balance: uint256.NewInt(uint64(i)), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()} + val, _ := rlp.EncodeToBytes(acc) + key := hashData([]byte(fmt.Sprintf("acc-%d", i))) + rawdb.WriteAccountSnapshot(helper.diskdb, key, val) + } + + _, dl := helper.CommitAndGenerate() + select { + case <-dl.generator.done: + // Snapshot generation succeeded + + case <-time.After(3 * time.Second): + t.Errorf("Snapshot generation failed") + } + //checkSnapRoot(t, snap, root) + // Signal abortion to the generator and wait for it to tear down + dl.generator.stop() +} + +func TestGenerateWithExtraBeforeAndAfter(t *testing.T) { + helper := newGenTester() + + acc := &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()} + val, _ := rlp.EncodeToBytes(acc) + + acctHashA := hashData([]byte("acc-1")) + acctHashB := hashData([]byte("acc-2")) + + helper.acctTrie.MustUpdate(acctHashA.Bytes(), val) + helper.acctTrie.MustUpdate(acctHashB.Bytes(), val) + + rawdb.WriteAccountSnapshot(helper.diskdb, acctHashA, val) + rawdb.WriteAccountSnapshot(helper.diskdb, acctHashB, val) + + for i := 0; i < 16; i++ { + rawdb.WriteAccountSnapshot(helper.diskdb, common.Hash{byte(i)}, val) + } + _, dl := helper.CommitAndGenerate() + select { + case <-dl.generator.done: + // Snapshot generation succeeded + + case <-time.After(3 * time.Second): + t.Errorf("Snapshot generation failed") + } + //checkSnapRoot(t, snap, root) + // Signal abortion to the generator and wait for it to tear down + dl.generator.stop() +} + +func TestGenerateWithMalformedStateData(t *testing.T) { + helper := newGenTester() + + acctHash := hashData([]byte("acc")) + acc := &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()} + val, _ := rlp.EncodeToBytes(acc) + helper.acctTrie.MustUpdate(acctHash.Bytes(), val) + + junk := make([]byte, 100) + copy(junk, []byte{0xde, 0xad}) + rawdb.WriteAccountSnapshot(helper.diskdb, acctHash, junk) + for i := 0; i < 16; i++ { + rawdb.WriteAccountSnapshot(helper.diskdb, common.Hash{byte(i)}, junk) + } + + _, dl := helper.CommitAndGenerate() + select { + case <-dl.generator.done: + // Snapshot generation succeeded + + case <-time.After(3 * time.Second): + t.Errorf("Snapshot generation failed") + } + //checkSnapRoot(t, snap, root) + + // Signal abortion to the generator and wait for it to tear down + dl.generator.stop() +} + +func TestGenerateFromEmptySnap(t *testing.T) { + helper := newGenTester() + + for i := 0; i < 400; i++ { + stRoot := helper.makeStorageTrie(fmt.Sprintf("acc-%d", i), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addTrieAccount(fmt.Sprintf("acc-%d", i), &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + } + root, snap := helper.CommitAndGenerate() + t.Logf("Root: %#x\n", root) // Root: 0x6f7af6d2e1a1bf2b84a3beb3f8b64388465fbc1e274ca5d5d3fc787ca78f59e4 + + select { + case <-snap.generator.done: + // Snapshot generation succeeded + + case <-time.After(3 * time.Second): + t.Errorf("Snapshot generation failed") + } + //checkSnapRoot(t, snap, root) + // Signal abortion to the generator and wait for it to tear down + snap.generator.stop() +} + +func TestGenerateWithIncompleteStorage(t *testing.T) { + helper := newGenTester() + stKeys := []string{"1", "2", "3", "4", "5", "6", "7", "8"} + stVals := []string{"v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8"} + + // We add 8 accounts, each one is missing exactly one of the storage slots. This means + // we don't have to order the keys and figure out exactly which hash-key winds up + // on the sensitive spots at the boundaries + for i := 0; i < 8; i++ { + accKey := fmt.Sprintf("acc-%d", i) + stRoot := helper.makeStorageTrie(accKey, stKeys, stVals, true) + helper.addAccount(accKey, &types.StateAccount{Balance: uint256.NewInt(uint64(i)), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + var moddedKeys []string + var moddedVals []string + for ii := 0; ii < 8; ii++ { + if ii != i { + moddedKeys = append(moddedKeys, stKeys[ii]) + moddedVals = append(moddedVals, stVals[ii]) + } + } + helper.addSnapStorage(accKey, moddedKeys, moddedVals) + } + root, dl := helper.CommitAndGenerate() + t.Logf("Root: %#x\n", root) // Root: 0xca73f6f05ba4ca3024ef340ef3dfca8fdabc1b677ff13f5a9571fd49c16e67ff + + select { + case <-dl.generator.done: + // Snapshot generation succeeded + + case <-time.After(3 * time.Second): + t.Errorf("Snapshot generation failed") + } + //checkSnapRoot(t, snap, root) + // Signal abortion to the generator and wait for it to tear down + dl.generator.stop() +} + +func incKey(key []byte) []byte { + for i := len(key) - 1; i >= 0; i-- { + key[i]++ + if key[i] != 0x0 { + break + } + } + return key +} + +func decKey(key []byte) []byte { + for i := len(key) - 1; i >= 0; i-- { + key[i]-- + if key[i] != 0xff { + break + } + } + return key +} + +func populateDangling(disk ethdb.KeyValueStore) { + populate := func(accountHash common.Hash, keys []string, vals []string) { + for i, key := range keys { + rawdb.WriteStorageSnapshot(disk, accountHash, hashData([]byte(key)), []byte(vals[i])) + } + } + // Dangling storages of the "first" account + populate(common.Hash{}, []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + // Dangling storages of the "last" account + populate(common.HexToHash("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + // Dangling storages around the account 1 + hash := decKey(hashData([]byte("acc-1")).Bytes()) + populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + hash = incKey(hashData([]byte("acc-1")).Bytes()) + populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + // Dangling storages around the account 2 + hash = decKey(hashData([]byte("acc-2")).Bytes()) + populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + hash = incKey(hashData([]byte("acc-2")).Bytes()) + populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + // Dangling storages around the account 3 + hash = decKey(hashData([]byte("acc-3")).Bytes()) + populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + hash = incKey(hashData([]byte("acc-3")).Bytes()) + populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + // Dangling storages of the random account + populate(testrand.Hash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + populate(testrand.Hash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + populate(testrand.Hash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) +} + +func TestGenerateCompleteSnapshotWithDanglingStorage(t *testing.T) { + var helper = newGenTester() + + stRoot := helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) + + helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + + helper.addSnapStorage("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + helper.addSnapStorage("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}) + + populateDangling(helper.diskdb) + + _, dl := helper.CommitAndGenerate() + select { + case <-dl.generator.done: + // Snapshot generation succeeded + + case <-time.After(3 * time.Second): + t.Errorf("Snapshot generation failed") + } + //checkSnapRoot(t, snap, root) + + // Signal abortion to the generator and wait for it to tear down + dl.generator.stop() +} + +func TestGenerateBrokenSnapshotWithDanglingStorage(t *testing.T) { + var helper = newGenTester() + + stRoot := helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) + + helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + + populateDangling(helper.diskdb) + + _, dl := helper.CommitAndGenerate() + select { + case <-dl.generator.done: + // Snapshot generation succeeded + + case <-time.After(3 * time.Second): + t.Errorf("Snapshot generation failed") + } + //checkSnapRoot(t, snap, root) + + // Signal abortion to the generator and wait for it to tear down + dl.generator.stop() +} diff --git a/triedb/pathdb/iterator_test.go b/triedb/pathdb/iterator_test.go index eccee65623e1..68e312152a8b 100644 --- a/triedb/pathdb/iterator_test.go +++ b/triedb/pathdb/iterator_test.go @@ -139,15 +139,12 @@ func TestAccountIteratorBasics(t *testing.T) { it := newDiffAccountIterator(common.Hash{}, states, nil) verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator - // TODO reenable these tests once the persistent state iteration - // is implemented. - - //db := rawdb.NewMemoryDatabase() - //batch := db.NewBatch() - //states.write(db, batch, nil, nil) - //batch.Write() - //it = newDiskAccountIterator(db, common.Hash{}) - //verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator + db := rawdb.NewMemoryDatabase() + batch := db.NewBatch() + states.write(db, batch, nil, nil) + batch.Write() + it = newDiskAccountIterator(db, common.Hash{}) + verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator } // TestStorageIteratorBasics tests some simple single-layer(diff and disk) iteration for storage @@ -182,17 +179,14 @@ func TestStorageIteratorBasics(t *testing.T) { verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator } - // TODO reenable these tests once the persistent state iteration - // is implemented. - - //db := rawdb.NewMemoryDatabase() - //batch := db.NewBatch() - //states.write(db, batch, nil, nil) - //batch.Write() - //for account := range accounts { - // it := newDiskStorageIterator(db, account, common.Hash{}) - // verifyIterator(t, 100-nilStorage[account], it, verifyNothing) // Nil is allowed for single layer iterator - //} + db := rawdb.NewMemoryDatabase() + batch := db.NewBatch() + states.write(db, batch, nil, nil) + batch.Write() + for account := range accounts { + it := newDiskStorageIterator(db, account, common.Hash{}) + verifyIterator(t, 100-nilStorage[account], it, verifyNothing) // Nil is allowed for single layer iterator + } } type testIterator struct { @@ -268,7 +262,7 @@ func TestAccountIteratorTraversal(t *testing.T) { WriteBufferSize: 0, } db := New(rawdb.NewMemoryDatabase(), config, false) - // db.WaitGeneration() + db.WaitGeneration() // Stack three diff layers on top with various overlaps db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 0, trienode.NewMergedNodeSet(), @@ -291,19 +285,16 @@ func TestAccountIteratorTraversal(t *testing.T) { verifyIterator(t, 7, it, verifyAccount) it.Release() - // TODO reenable these tests once the persistent state iteration - // is implemented. - // Test after persist some bottom-most layers into the disk, // the functionalities still work. - //db.tree.cap(common.HexToHash("0x04"), 2) + db.tree.cap(common.HexToHash("0x04"), 2) - //head = db.tree.get(common.HexToHash("0x04")) - //verifyIterator(t, 7, head.(*diffLayer).newBinaryAccountIterator(), verifyAccount) - // - //it, _ = db.AccountIterator(common.HexToHash("0x04"), common.Hash{}) - //verifyIterator(t, 7, it, verifyAccount) - //it.Release() + head = db.tree.get(common.HexToHash("0x04")) + verifyIterator(t, 7, head.(*diffLayer).newBinaryAccountIterator(), verifyAccount) + + it, _ = db.AccountIterator(common.HexToHash("0x04"), common.Hash{}) + verifyIterator(t, 7, it, verifyAccount) + it.Release() } func TestStorageIteratorTraversal(t *testing.T) { @@ -311,7 +302,7 @@ func TestStorageIteratorTraversal(t *testing.T) { WriteBufferSize: 0, } db := New(rawdb.NewMemoryDatabase(), config, false) - // db.WaitGeneration() + db.WaitGeneration() // Stack three diff layers on top with various overlaps db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 0, trienode.NewMergedNodeSet(), @@ -334,17 +325,14 @@ func TestStorageIteratorTraversal(t *testing.T) { verifyIterator(t, 6, it, verifyStorage) it.Release() - // TODO reenable these tests once the persistent state iteration - // is implemented. - // Test after persist some bottom-most layers into the disk, // the functionalities still work. - //db.tree.cap(common.HexToHash("0x04"), 2) - //verifyIterator(t, 6, head.(*diffLayer).newBinaryStorageIterator(common.HexToHash("0xaa")), verifyStorage) - // - //it, _ = db.StorageIterator(common.HexToHash("0x04"), common.HexToHash("0xaa"), common.Hash{}) - //verifyIterator(t, 6, it, verifyStorage) - //it.Release() + db.tree.cap(common.HexToHash("0x04"), 2) + verifyIterator(t, 6, head.(*diffLayer).newBinaryStorageIterator(common.HexToHash("0xaa")), verifyStorage) + + it, _ = db.StorageIterator(common.HexToHash("0x04"), common.HexToHash("0xaa"), common.Hash{}) + verifyIterator(t, 6, it, verifyStorage) + it.Release() } // TestAccountIteratorTraversalValues tests some multi-layer iteration, where we @@ -587,7 +575,7 @@ func TestAccountIteratorFlattening(t *testing.T) { WriteBufferSize: 0, } db := New(rawdb.NewMemoryDatabase(), config, false) - // db.WaitGeneration() + db.WaitGeneration() // Create a stack of diffs on top db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), @@ -614,7 +602,7 @@ func TestAccountIteratorSeek(t *testing.T) { WriteBufferSize: 0, } db := New(rawdb.NewMemoryDatabase(), config, false) - // db.WaitGeneration() + db.WaitGeneration() db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), NewStateSetWithOrigin(nil, randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil, nil)) @@ -672,7 +660,7 @@ func TestStorageIteratorSeek(t *testing.T) { WriteBufferSize: 0, } db := New(rawdb.NewMemoryDatabase(), config, false) - // db.WaitGeneration() + db.WaitGeneration() // Stack three diff layers on top with various overlaps db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), @@ -730,7 +718,7 @@ func TestAccountIteratorDeletions(t *testing.T) { WriteBufferSize: 0, } db := New(rawdb.NewMemoryDatabase(), config, false) - // db.WaitGeneration() + db.WaitGeneration() // Stack three diff layers on top with various overlaps db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), @@ -771,7 +759,7 @@ func TestStorageIteratorDeletions(t *testing.T) { WriteBufferSize: 0, } db := New(rawdb.NewMemoryDatabase(), config, false) - // db.WaitGeneration() + db.WaitGeneration() // Stack three diff layers on top with various overlaps db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go index 779a262fdd05..a0e8208aa1d2 100644 --- a/triedb/pathdb/journal.go +++ b/triedb/pathdb/journal.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" ) @@ -90,6 +91,50 @@ func (db *Database) loadJournal(diskRoot common.Hash) (layer, error) { return head, nil } +// journalGenerator is a disk layer entry containing the generator progress marker. +type journalGenerator struct { + // Indicator that whether the database was in progress of being wiped. + // It's deprecated but keep it here for background compatibility. + Wiping bool + + Done bool // Whether the generator finished creating the snapshot + Marker []byte + Accounts uint64 + Slots uint64 + Storage uint64 +} + +// loadGenerator loads the state generation progress marker from the database. +func loadGenerator(db ethdb.KeyValueReader) (*journalGenerator, common.Hash) { + trieRoot := types.EmptyRootHash + if blob := rawdb.ReadAccountTrieNode(db, nil); len(blob) > 0 { + trieRoot = crypto.Keccak256Hash(blob) + } + // State generation progress marker is lost, rebuild it + blob := rawdb.ReadSnapshotGenerator(db) + if len(blob) == 0 { + log.Info("State snapshot generator is not found") + return nil, trieRoot + } + // State generation progress marker is not compatible, rebuild it + var generator journalGenerator + if err := rlp.DecodeBytes(blob, &generator); err != nil { + log.Info("State snapshot generator is not compatible") + return nil, trieRoot + } + // State snapshot is not consistent with the trie data, rebuild it + stateRoot := rawdb.ReadSnapshotRoot(db) + if trieRoot != stateRoot { + log.Info("State snapshot is not consistent with trie data", "trie", trieRoot, "state", stateRoot) + return nil, trieRoot + } + // Slice null-ness is lost after rlp decoding, reset it back to empty + if !generator.Done && generator.Marker == nil { + generator.Marker = []byte{} + } + return &generator, trieRoot +} + // loadLayers loads a pre-existing state layer backed by a key-value store. func (db *Database) loadLayers() layer { // Retrieve the root node of persistent state. @@ -109,7 +154,7 @@ func (db *Database) loadLayers() layer { log.Info("Failed to load journal, discard it", "err", err) } // Return single layer with persistent state. - return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0)) + return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0)) } // loadDiskLayer reads the binary blob from the layer journal, reconstructing @@ -141,7 +186,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) { if err := states.decode(r); err != nil { return nil, err } - return newDiskLayer(root, id, db, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored)), nil + return newDiskLayer(root, id, db, nil, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored)), nil } // loadDiffLayer reads the next sections of a layer journal, reconstructing a new @@ -248,6 +293,10 @@ func (db *Database) Journal(root common.Hash) error { } else { // disk layer only on noop runs (likely) or deep reorgs (unlikely) log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.layers) } + // Terminate the background state generation if it's active + if disk.generator != nil { + disk.generator.stop() + } start := time.Now() // Run the journaling diff --git a/triedb/pathdb/metrics.go b/triedb/pathdb/metrics.go index 6af45e079d3b..c597c390064e 100644 --- a/triedb/pathdb/metrics.go +++ b/triedb/pathdb/metrics.go @@ -24,10 +24,19 @@ var ( cleanNodeReadMeter = metrics.NewRegisteredMeter("pathdb/clean/node/read", nil) cleanNodeWriteMeter = metrics.NewRegisteredMeter("pathdb/clean/node/write", nil) - stateAccountMissMeter = metrics.NewRegisteredMeter("pathdb/state/account/miss/total", nil) - stateAccountHitMeter = metrics.NewRegisteredMeter("pathdb/state/account/hit/total", nil) - stateStorageMissMeter = metrics.NewRegisteredMeter("pathdb/state/storage/miss/total", nil) - stateStorageHitMeter = metrics.NewRegisteredMeter("pathdb/state/storage/hit/total", nil) + cleanStateHitMeter = metrics.NewRegisteredMeter("pathdb/clean/state/hit", nil) + cleanStateMissMeter = metrics.NewRegisteredMeter("pathdb/clean/state/miss", nil) + cleanStateReadMeter = metrics.NewRegisteredMeter("pathdb/clean/state/read", nil) + cleanStateWriteMeter = metrics.NewRegisteredMeter("pathdb/clean/state/write", nil) + + stateAccountDiskMissMeter = metrics.NewRegisteredMeter("pathdb/state/account/miss/disk", nil) + stateAccountMissMeter = metrics.NewRegisteredMeter("pathdb/state/account/miss/total", nil) + stateAccountDiskHitMeter = metrics.NewRegisteredMeter("pathdb/state/account/hit/disk", nil) + stateAccountHitMeter = metrics.NewRegisteredMeter("pathdb/state/account/hit/total", nil) + stateStorageDiskMissMeter = metrics.NewRegisteredMeter("pathdb/state/storage/miss/disk", nil) + stateStorageMissMeter = metrics.NewRegisteredMeter("pathdb/state/storage/miss/total", nil) + stateStorageDiskHitMeter = metrics.NewRegisteredMeter("pathdb/state/storage/hit/disk", nil) + stateStorageHitMeter = metrics.NewRegisteredMeter("pathdb/state/storage/hit/total", nil) dirtyNodeHitMeter = metrics.NewRegisteredMeter("pathdb/dirty/node/hit", nil) dirtyNodeMissMeter = metrics.NewRegisteredMeter("pathdb/dirty/node/miss", nil) @@ -46,9 +55,11 @@ var ( diskFalseMeter = metrics.NewRegisteredMeter("pathdb/disk/false", nil) diffFalseMeter = metrics.NewRegisteredMeter("pathdb/diff/false", nil) - commitTimeTimer = metrics.NewRegisteredTimer("pathdb/commit/time", nil) - commitNodesMeter = metrics.NewRegisteredMeter("pathdb/commit/nodes", nil) - commitBytesMeter = metrics.NewRegisteredMeter("pathdb/commit/bytes", nil) + commitTimeTimer = metrics.NewRegisteredTimer("pathdb/commit/time", nil) + commitNodesMeter = metrics.NewRegisteredMeter("pathdb/commit/nodes", nil) + commitAccountsMeter = metrics.NewRegisteredMeter("pathdb/commit/accounts", nil) + commitStoragesMeter = metrics.NewRegisteredMeter("pathdb/commit/slots", nil) + commitBytesMeter = metrics.NewRegisteredMeter("pathdb/commit/bytes", nil) gcTrieNodeMeter = metrics.NewRegisteredMeter("pathdb/gc/node/count", nil) gcTrieNodeBytesMeter = metrics.NewRegisteredMeter("pathdb/gc/node/bytes", nil) @@ -61,3 +72,28 @@ var ( historyDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil) historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil) ) + +// Metrics in generation +var ( + generatedAccountMeter = metrics.NewRegisteredMeter("pathdb/generation/account/generated", nil) + recoveredAccountMeter = metrics.NewRegisteredMeter("pathdb/generation/account/recovered", nil) + wipedAccountMeter = metrics.NewRegisteredMeter("pathdb/generation/account/wiped", nil) + missallAccountMeter = metrics.NewRegisteredMeter("pathdb/generation/account/missall", nil) + generatedStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/generated", nil) + recoveredStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/recovered", nil) + wipedStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/wiped", nil) + missallStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/missall", nil) + danglingStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/dangling", nil) + successfulRangeProofMeter = metrics.NewRegisteredMeter("pathdb/generation/proof/success", nil) + failedRangeProofMeter = metrics.NewRegisteredMeter("pathdb/generation/proof/failure", nil) + + accountProveCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/account/prove", nil) + accountTrieReadCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/account/trieread", nil) + accountSnapReadCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/account/snapread", nil) + accountWriteCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/account/write", nil) + storageProveCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/prove", nil) + storageTrieReadCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/trieread", nil) + storageSnapReadCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/snapread", nil) + storageWriteCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/write", nil) + storageCleanCounter = metrics.NewRegisteredCounter("state/snapshot/generation/duration/storage/clean", nil) +) diff --git a/triedb/pathdb/states.go b/triedb/pathdb/states.go index ca58a05c283a..2261c1b287cc 100644 --- a/triedb/pathdb/states.go +++ b/triedb/pathdb/states.go @@ -23,8 +23,10 @@ import ( "slices" "sync" + "github.com/VictoriaMetrics/fastcache" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" @@ -539,6 +541,11 @@ func (s *stateSet) decode(r *rlp.Stream) error { return nil } +// write flushes state mutations into the provided database batch as a whole. +func (s *stateSet) write(db ethdb.KeyValueStore, batch ethdb.Batch, genMarker []byte, clean *fastcache.Cache) (int, int) { + return writeStates(db, batch, genMarker, s.destructSet, s.accountData, s.storageData, clean) +} + // reset clears all cached state data, including any optional sorted lists that // may have been generated. func (s *stateSet) reset() { @@ -552,8 +559,6 @@ func (s *stateSet) reset() { } // dbsize returns the approximate size for db write. -// -//nolint:unused func (s *stateSet) dbsize() int { m := (len(s.destructSet) + len(s.accountData)) * len(rawdb.SnapshotAccountPrefix) for _, slots := range s.storageData {