From 761ac5d45ffaaacc4991f534fc22c9330235e065 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Tue, 10 Jun 2025 16:22:45 +0800 Subject: [PATCH 1/6] triedb/pathdb, eth: use double-buffer mechanism in pathdb --- core/state/snapshot/generate_test.go | 12 ++ core/state/statedb_test.go | 49 ++++--- triedb/pathdb/buffer.go | 107 +++++++++----- triedb/pathdb/database.go | 22 ++- triedb/pathdb/database_test.go | 1 + triedb/pathdb/disklayer.go | 202 +++++++++++++++++++-------- triedb/pathdb/generate.go | 2 +- triedb/pathdb/generate_test.go | 1 + triedb/pathdb/iterator_test.go | 12 ++ triedb/pathdb/journal.go | 10 +- triedb/pathdb/layertree.go | 4 + triedb/pathdb/layertree_test.go | 2 +- 12 files changed, 302 insertions(+), 122 deletions(-) diff --git a/core/state/snapshot/generate_test.go b/core/state/snapshot/generate_test.go index 3f83cb1a00b8..d5cf6dce3118 100644 --- a/core/state/snapshot/generate_test.go +++ b/core/state/snapshot/generate_test.go @@ -242,6 +242,18 @@ func (t *testHelper) Commit() common.Hash { } t.triedb.Update(root, types.EmptyRootHash, 0, t.nodes, t.states) t.triedb.Commit(root, false) + + // re-open the trie database to ensure the frozen buffer + // is not referenced + //config := &triedb.Config{} + //if t.triedb.Scheme() == rawdb.PathScheme { + // config.PathDB = &pathdb.Config{ + // SnapshotNoBuild: true, + // } // disable caching + //} else { + // config.HashDB = &hashdb.Config{} // disable caching + //} + //t.triedb = triedb.NewDatabase(t.triedb.Disk(), config) return root } diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index 2c87d81d4009..48c2fc49be3b 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -974,20 +974,23 @@ func TestMissingTrieNodes(t *testing.T) { func testMissingTrieNodes(t *testing.T, scheme string) { // Create an initial state with a few accounts var ( - tdb *triedb.Database - memDb = rawdb.NewMemoryDatabase() + tdb *triedb.Database + memDb = rawdb.NewMemoryDatabase() + openDb = func() *triedb.Database { + if scheme == rawdb.PathScheme { + return triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{ + TrieCleanSize: 0, + StateCleanSize: 0, + WriteBufferSize: 0, + }}) // disable caching + } else { + return triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{ + CleanCacheSize: 0, + }}) // disable caching + } + } ) - if scheme == rawdb.PathScheme { - tdb = triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{ - TrieCleanSize: 0, - StateCleanSize: 0, - WriteBufferSize: 0, - }}) // disable caching - } else { - tdb = triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{ - CleanCacheSize: 0, - }}) // disable caching - } + tdb = openDb() db := NewDatabase(tdb, nil) var root common.Hash @@ -1005,17 +1008,27 @@ func testMissingTrieNodes(t *testing.T, scheme string) { tdb.Commit(root, false) } // Create a new state on the old root - state, _ = New(root, db) // Now we clear out the memdb it := memDb.NewIterator(nil, nil) for it.Next() { k := it.Key() - // Leave the root intact - if !bytes.Equal(k, root[:]) { - t.Logf("key: %x", k) - memDb.Delete(k) + if scheme == rawdb.HashScheme { + if !bytes.Equal(k, root[:]) { + t.Logf("key: %x", k) + memDb.Delete(k) + } + } + if scheme == rawdb.PathScheme { + rk := k[len(rawdb.TrieNodeAccountPrefix):] + if len(rk) != 0 { + t.Logf("key: %x", k) + memDb.Delete(k) + } } } + tdb = openDb() + db = NewDatabase(tdb, nil) + state, _ = New(root, db) balance := state.GetBalance(addr) // The removed elem should lead to it returning zero balance if exp, got := uint64(0), balance.Uint64(); got != exp { diff --git a/triedb/pathdb/buffer.go b/triedb/pathdb/buffer.go index e639a43835de..8d2a6b694329 100644 --- a/triedb/pathdb/buffer.go +++ b/triedb/pathdb/buffer.go @@ -17,6 +17,7 @@ package pathdb import ( + "errors" "fmt" "time" @@ -37,6 +38,9 @@ type buffer struct { limit uint64 // The maximum memory allowance in bytes nodes *nodeSet // Aggregated trie node set states *stateSet // Aggregated state set + + done chan struct{} // notifier whether the content in buffer has been flushed or not + flushErr error // error if any exception occurs during flushing } // newBuffer initializes the buffer with the provided states and trie nodes. @@ -124,43 +128,74 @@ func (b *buffer) size() uint64 { // flush persists the in-memory dirty trie node into the disk if the configured // memory threshold is reached. Note, all data must be written atomically. -func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64) error { - // Ensure the target state id is aligned with the internal counter. - head := rawdb.ReadPersistentStateID(db) - if head+b.layers != id { - return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id) +func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64, postFlush func()) { + if b.done != nil { + panic("duplicated flush operation") } - // Terminate the state snapshot generation if it's active - var ( - start = time.Now() - batch = db.NewBatchWithSize((b.nodes.dbsize() + b.states.dbsize()) * 11 / 10) // extra 10% for potential pebble internal stuff - ) - // Explicitly sync the state freezer to ensure all written data is persisted to disk - // before updating the key-value store. - // - // This step is crucial to guarantee that the corresponding state history remains - // available for state rollback. - if freezer != nil { - if err := freezer.SyncAncient(); err != nil { - return err + b.done = make(chan struct{}) // allocate the channel for notification + + go func() { + defer func() { + if postFlush != nil { + postFlush() + } + close(b.done) + }() + + // Ensure the target state id is aligned with the internal counter. + head := rawdb.ReadPersistentStateID(db) + if head+b.layers != id { + b.flushErr = fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id) + return } + + // Terminate the state snapshot generation if it's active + var ( + start = time.Now() + batch = db.NewBatchWithSize((b.nodes.dbsize() + b.states.dbsize()) * 11 / 10) // extra 10% for potential pebble internal stuff + ) + // Explicitly sync the state freezer to ensure all written data is persisted to disk + // before updating the key-value store. + // + // This step is crucial to guarantee that the corresponding state history remains + // available for state rollback. + if freezer != nil { + if err := freezer.SyncAncient(); err != nil { + b.flushErr = err + return + } + } + nodes := b.nodes.write(batch, nodesCache) + accounts, slots := b.states.write(batch, progress, statesCache) + rawdb.WritePersistentStateID(batch, id) + rawdb.WriteSnapshotRoot(batch, root) + + // Flush all mutations in a single batch + size := batch.ValueSize() + if err := batch.Write(); err != nil { + b.flushErr = err + return + } + commitBytesMeter.Mark(int64(size)) + commitNodesMeter.Mark(int64(nodes)) + commitAccountsMeter.Mark(int64(accounts)) + commitStoragesMeter.Mark(int64(slots)) + commitTimeTimer.UpdateSince(start) + + // The content in the frozen buffer is kept for consequent state access, + // TODO (rjl493456442) measure the gc overhead for holding this struct. + // TODO (rjl493456442) can we somehow get rid of it after flushing?? + b.reset() + log.Debug("Persisted buffer content", "nodes", nodes, "accounts", accounts, "slots", slots, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start))) + }() +} + +// waitFlush blocks until the buffer has been fully flushed and returns any +// stored errors that occurred during the process. +func (b *buffer) waitFlush() error { + if b.done == nil { + return errors.New("the buffer is not frozen") } - nodes := b.nodes.write(batch, nodesCache) - accounts, slots := b.states.write(batch, progress, statesCache) - rawdb.WritePersistentStateID(batch, id) - rawdb.WriteSnapshotRoot(batch, root) - - // Flush all mutations in a single batch - size := batch.ValueSize() - if err := batch.Write(); err != nil { - return err - } - commitBytesMeter.Mark(int64(size)) - commitNodesMeter.Mark(int64(nodes)) - commitAccountsMeter.Mark(int64(accounts)) - commitStoragesMeter.Mark(int64(slots)) - commitTimeTimer.UpdateSince(start) - b.reset() - log.Debug("Persisted buffer content", "nodes", nodes, "accounts", accounts, "slots", slots, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start))) - return nil + <-b.done + return b.flushErr } diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 3174a7c964b3..83fb6ac4319d 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -119,7 +119,10 @@ type Config struct { StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer ReadOnly bool // Flag whether the database is opened in read only mode - SnapshotNoBuild bool // Flag Whether the background generation is allowed + + // Testing configurations + SnapshotNoBuild bool // Flag Whether the background generation is allowed + NoAsyncFlush bool // Flag whether the background generation is allowed } // sanitize checks the provided user configurations and changes anything that's @@ -434,6 +437,9 @@ func (db *Database) Disable() error { // Terminate the state generator if it's active and mark the disk layer // as stale to prevent access to persistent state. disk := db.tree.bottom() + if err := disk.waitFlush(); err != nil { + return err + } if disk.generator != nil { disk.generator.stop() } @@ -592,12 +598,18 @@ func (db *Database) Close() error { // following mutations. db.readOnly = true + // Block until the background flushing is finished. It must + // be done before terminating the potential background snapshot + // generator. + dl := db.tree.bottom() + if err := dl.waitFlush(); err != nil { + return err + } // Terminate the background generation if it's active - disk := db.tree.bottom() - if disk.generator != nil { - disk.generator.stop() + if dl.generator != nil { + dl.generator.stop() } - disk.resetCache() // release the memory held by clean cache + dl.resetCache() // release the memory held by clean cache // Close the attached state history freezer. if db.freezer == nil { diff --git a/triedb/pathdb/database_test.go b/triedb/pathdb/database_test.go index ca106cc27cf8..6fc099015c7c 100644 --- a/triedb/pathdb/database_test.go +++ b/triedb/pathdb/database_test.go @@ -129,6 +129,7 @@ func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int) *te TrieCleanSize: 256 * 1024, StateCleanSize: 256 * 1024, WriteBufferSize: 256 * 1024, + NoAsyncFlush: true, }, isVerkle) obj = &tester{ diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go index 1c9efb024bd6..19732317716b 100644 --- a/triedb/pathdb/disklayer.go +++ b/triedb/pathdb/disklayer.go @@ -41,9 +41,11 @@ type diskLayer struct { nodes *fastcache.Cache // GC friendly memory cache of clean nodes states *fastcache.Cache // GC friendly memory cache of clean states - buffer *buffer // Dirty buffer to aggregate writes of nodes and states - stale bool // Signals that the layer became stale (state progressed) - lock sync.RWMutex // Lock used to protect stale flag and genMarker + buffer *buffer // Live buffer to aggregate writes + frozen *buffer // Frozen node buffer waiting for flushing + + stale bool // Signals that the layer became stale (state progressed) + lock sync.RWMutex // Lock used to protect stale flag and genMarker // The generator is set if the state snapshot was not fully completed, // regardless of whether the background generation is running or not. @@ -51,7 +53,7 @@ type diskLayer struct { } // newDiskLayer creates a new disk layer based on the passing arguments. -func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, states *fastcache.Cache, buffer *buffer) *diskLayer { +func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, states *fastcache.Cache, buffer *buffer, frozen *buffer) *diskLayer { // Initialize the clean caches if the memory allowance is not zero // or reuse the provided caches if they are not nil (inherited from // the original disk layer). @@ -68,6 +70,7 @@ func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Ca nodes: nodes, states: states, buffer: buffer, + frozen: frozen, } } @@ -114,16 +117,19 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co if dl.stale { return nil, common.Hash{}, nil, errSnapshotStale } - // Try to retrieve the trie node from the not-yet-written - // node buffer first. Note the buffer is lock free since - // it's impossible to mutate the buffer before tagging the - // layer as stale. - n, found := dl.buffer.node(owner, path) - if found { - dirtyNodeHitMeter.Mark(1) - dirtyNodeReadMeter.Mark(int64(len(n.Blob))) - dirtyNodeHitDepthHist.Update(int64(depth)) - return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil + // Try to retrieve the trie node from the not-yet-written node buffer first + // (both the live one and the frozen one). Note the buffer is lock free since + // it's impossible to mutate the buffer before tagging the layer as stale. + for _, buffer := range []*buffer{dl.buffer, dl.frozen} { + if buffer != nil && !buffer.empty() { + n, found := buffer.node(owner, path) + if found { + dirtyNodeHitMeter.Mark(1) + dirtyNodeReadMeter.Mark(int64(len(n.Blob))) + dirtyNodeHitDepthHist.Update(int64(depth)) + return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil + } + } } dirtyNodeMissMeter.Mark(1) @@ -144,6 +150,11 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co } else { blob = rawdb.ReadStorageTrieNode(dl.db.diskdb, owner, path) } + // Store the resolved data in the clean cache. The background buffer flusher + // may also write to the clean cache concurrently, but two writers cannot + // write the same item with different content. If the item already exists, + // it will be found in the frozen buffer, eliminating the need to check the + // database. if dl.nodes != nil && len(blob) > 0 { dl.nodes.Set(key, blob) cleanNodeWriteMeter.Mark(int64(len(blob))) @@ -162,22 +173,25 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) { if dl.stale { return nil, errSnapshotStale } - // Try to retrieve the account from the not-yet-written - // node buffer first. Note the buffer is lock free since - // it's impossible to mutate the buffer before tagging the - // layer as stale. - blob, found := dl.buffer.account(hash) - if found { - dirtyStateHitMeter.Mark(1) - dirtyStateReadMeter.Mark(int64(len(blob))) - dirtyStateHitDepthHist.Update(int64(depth)) - - if len(blob) == 0 { - stateAccountInexMeter.Mark(1) - } else { - stateAccountExistMeter.Mark(1) + // Try to retrieve the trie node from the not-yet-written node buffer first + // (both the live one and the frozen one). Note the buffer is lock free since + // it's impossible to mutate the buffer before tagging the layer as stale. + for _, buffer := range []*buffer{dl.buffer, dl.frozen} { + if buffer != nil && !buffer.empty() { + blob, found := buffer.account(hash) + if found { + dirtyStateHitMeter.Mark(1) + dirtyStateReadMeter.Mark(int64(len(blob))) + dirtyStateHitDepthHist.Update(int64(depth)) + + if len(blob) == 0 { + stateAccountInexMeter.Mark(1) + } else { + stateAccountExistMeter.Mark(1) + } + return blob, nil + } } - return blob, nil } dirtyStateMissMeter.Mark(1) @@ -203,7 +217,13 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) { cleanStateMissMeter.Mark(1) } // Try to retrieve the account from the disk. - blob = rawdb.ReadAccountSnapshot(dl.db.diskdb, hash) + blob := rawdb.ReadAccountSnapshot(dl.db.diskdb, hash) + + // Store the resolved data in the clean cache. The background buffer flusher + // may also write to the clean cache concurrently, but two writers cannot + // write the same item with different content. If the item already exists, + // it will be found in the frozen buffer, eliminating the need to check the + // database. if dl.states != nil { dl.states.Set(hash[:], blob) cleanStateWriteMeter.Mark(int64(len(blob))) @@ -231,21 +251,24 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([ if dl.stale { return nil, errSnapshotStale } - // Try to retrieve the storage slot from the not-yet-written - // node buffer first. Note the buffer is lock free since - // it's impossible to mutate the buffer before tagging the - // layer as stale. - if blob, found := dl.buffer.storage(accountHash, storageHash); found { - dirtyStateHitMeter.Mark(1) - dirtyStateReadMeter.Mark(int64(len(blob))) - dirtyStateHitDepthHist.Update(int64(depth)) - - if len(blob) == 0 { - stateStorageInexMeter.Mark(1) - } else { - stateStorageExistMeter.Mark(1) + // Try to retrieve the trie node from the not-yet-written node buffer first + // (both the live one and the frozen one). Note the buffer is lock free since + // it's impossible to mutate the buffer before tagging the layer as stale. + for _, buffer := range []*buffer{dl.buffer, dl.frozen} { + if buffer != nil && !buffer.empty() { + if blob, found := buffer.storage(accountHash, storageHash); found { + dirtyStateHitMeter.Mark(1) + dirtyStateReadMeter.Mark(int64(len(blob))) + dirtyStateHitDepthHist.Update(int64(depth)) + + if len(blob) == 0 { + stateStorageInexMeter.Mark(1) + } else { + stateStorageExistMeter.Mark(1) + } + return blob, nil + } } - return blob, nil } dirtyStateMissMeter.Mark(1) @@ -273,6 +296,12 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([ } // Try to retrieve the account from the disk blob := rawdb.ReadStorageSnapshot(dl.db.diskdb, accountHash, storageHash) + + // Store the resolved data in the clean cache. The background buffer flusher + // may also write to the clean cache concurrently, but two writers cannot + // write the same item with different content. If the item already exists, + // it will be found in the frozen buffer, eliminating the need to check the + // database. if dl.states != nil { dl.states.Set(key, blob) cleanStateWriteMeter.Mark(int64(len(blob))) @@ -341,7 +370,8 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { // truncation) surpasses the persisted state ID, we take the necessary action // of forcibly committing the cached dirty states to ensure that the persisted // state ID remains higher. - if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest { + persistedID := rawdb.ReadPersistentStateID(dl.db.diskdb) + if !force && persistedID < oldest { force = true } // Merge the trie nodes and flat states of the bottom-most diff layer into the @@ -351,32 +381,67 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { // Terminate the background state snapshot generation before mutating the // persistent state. if combined.full() || force { + // Wait until the previous frozen buffer is fully flushed + if dl.frozen != nil { + if err := dl.frozen.waitFlush(); err != nil { + return nil, err + } + } + // Release the frozen buffer and the internally referenced maps will + // be reclaimed by GC. + dl.frozen = nil + // Terminate the background state snapshot generator before flushing // to prevent data race. - var progress []byte - if dl.generator != nil { - dl.generator.stop() - progress = dl.generator.progressMarker() + var ( + progress []byte + gen = dl.generator + ) + if gen != nil { + gen.stop() + progress = gen.progressMarker() // If the snapshot has been fully generated, unset the generator if progress == nil { + gen = nil dl.setGenerator(nil) } else { log.Info("Paused snapshot generation") } } - // Flush the content in combined buffer. Any state data after the progress - // marker will be ignored, as the generator will pick it up later. - if err := combined.flush(bottom.root, dl.db.diskdb, dl.db.freezer, progress, dl.nodes, dl.states, bottom.stateID()); err != nil { - return nil, err + + // Freeze the live buffer and schedule background flushing + dl.frozen = combined + dl.frozen.flush(bottom.root, dl.db.diskdb, dl.db.freezer, progress, dl.nodes, dl.states, bottom.stateID(), func() { + // Resume the background generation if it's not completed yet. + // The generator is assumed to be available if the progress is + // not nil. + // + // Notably, the generator will be shared and linked by all the + // disk layer instances, regardless of the generation is terminated + // or not. + if progress != nil { + gen.run(bottom.root) + } + }) + // Block until the frozen buffer is fully flushed out if the async flushing + // is not allowed. + if dl.db.config.NoAsyncFlush { + if err := dl.frozen.waitFlush(); err != nil { + return nil, err + } } - // Resume the background generation if it's not completed yet - if progress != nil { - dl.generator.run(bottom.root) + // Block until the frozen buffer is fully flushed out if the oldest history + // surpasses the persisted state ID. + if persistedID < oldest { + if err := dl.frozen.waitFlush(); err != nil { + return nil, err + } } + combined = newBuffer(dl.db.config.WriteBufferSize, nil, nil, 0) } // Link the generator if snapshot is not yet completed - ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, dl.states, combined) + ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, dl.states, combined, dl.frozen) if dl.generator != nil { ndl.setGenerator(dl.generator) } @@ -428,7 +493,7 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) { if err != nil { return nil, err } - ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer) + ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer, dl.frozen) // Link the generator if it exists if dl.generator != nil { @@ -437,6 +502,16 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) { log.Debug("Reverted data in write buffer", "oldroot", h.meta.root, "newroot", h.meta.parent, "elapsed", common.PrettyDuration(time.Since(start))) return ndl, nil } + // Block until the frozen buffer is fully flushed + if dl.frozen != nil { + if err := dl.frozen.waitFlush(); err != nil { + return nil, err + } + // Unset the frozen buffer if it exists, otherwise these "reverted" + // states will still be accessible after revert in frozen buffer. + dl.frozen = nil + } + // Terminate the generation before writing any data into database var progress []byte if dl.generator != nil { @@ -455,7 +530,7 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) { } // Link the generator and resume generation if the snapshot is not yet // fully completed. - ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer) + ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer, dl.frozen) if dl.generator != nil && !dl.generator.completed() { ndl.generator = dl.generator ndl.generator.run(h.meta.parent) @@ -500,3 +575,12 @@ func (dl *diskLayer) genMarker() []byte { } return dl.generator.progressMarker() } + +// waitFlush blocks until the buffer has been fully flushed and returns any +// stored errors that occurred during the process. +func (dl *diskLayer) waitFlush() error { + if dl.frozen == nil { + return nil + } + return dl.frozen.waitFlush() +} diff --git a/triedb/pathdb/generate.go b/triedb/pathdb/generate.go index f4f98c9d19f2..2efbbbb4e1ea 100644 --- a/triedb/pathdb/generate.go +++ b/triedb/pathdb/generate.go @@ -186,7 +186,7 @@ func generateSnapshot(triedb *Database, root common.Hash, noBuild bool) *diskLay stats = &generatorStats{start: time.Now()} genMarker = []byte{} // Initialized but empty! ) - dl := newDiskLayer(root, 0, triedb, nil, nil, newBuffer(triedb.config.WriteBufferSize, nil, nil, 0)) + dl := newDiskLayer(root, 0, triedb, nil, nil, newBuffer(triedb.config.WriteBufferSize, nil, nil, 0), nil) dl.setGenerator(newGenerator(triedb.diskdb, noBuild, genMarker, stats)) if !noBuild { diff --git a/triedb/pathdb/generate_test.go b/triedb/pathdb/generate_test.go index 23efb0e3c551..f38a1ed7c43a 100644 --- a/triedb/pathdb/generate_test.go +++ b/triedb/pathdb/generate_test.go @@ -49,6 +49,7 @@ func newGenTester() *genTester { disk := rawdb.NewMemoryDatabase() config := *Defaults config.SnapshotNoBuild = true // no background generation + config.NoAsyncFlush = true // no async flush db := New(disk, &config, false) tr, _ := trie.New(trie.StateTrieID(types.EmptyRootHash), db) return &genTester{ diff --git a/triedb/pathdb/iterator_test.go b/triedb/pathdb/iterator_test.go index 6517735d7e9b..29cb8dfd4712 100644 --- a/triedb/pathdb/iterator_test.go +++ b/triedb/pathdb/iterator_test.go @@ -255,6 +255,7 @@ func TestFastIteratorBasics(t *testing.T) { func TestAccountIteratorTraversal(t *testing.T) { config := &Config{ WriteBufferSize: 0, + NoAsyncFlush: true, } db := New(rawdb.NewMemoryDatabase(), config, false) db.waitGeneration() @@ -299,6 +300,7 @@ func TestAccountIteratorTraversal(t *testing.T) { func TestStorageIteratorTraversal(t *testing.T) { config := &Config{ WriteBufferSize: 0, + NoAsyncFlush: true, } db := New(rawdb.NewMemoryDatabase(), config, false) db.waitGeneration() @@ -343,6 +345,7 @@ func TestStorageIteratorTraversal(t *testing.T) { func TestAccountIteratorTraversalValues(t *testing.T) { config := &Config{ WriteBufferSize: 0, + NoAsyncFlush: true, } db := New(rawdb.NewMemoryDatabase(), config, false) db.waitGeneration() @@ -459,6 +462,7 @@ func TestAccountIteratorTraversalValues(t *testing.T) { func TestStorageIteratorTraversalValues(t *testing.T) { config := &Config{ WriteBufferSize: 0, + NoAsyncFlush: true, } db := New(rawdb.NewMemoryDatabase(), config, false) db.waitGeneration() @@ -592,6 +596,7 @@ func TestAccountIteratorLargeTraversal(t *testing.T) { // Build up a large stack of snapshots config := &Config{ WriteBufferSize: 0, + NoAsyncFlush: true, } db := New(rawdb.NewMemoryDatabase(), config, false) db.waitGeneration() @@ -664,6 +669,7 @@ func TestAccountIteratorFlattening(t *testing.T) { func TestAccountIteratorSeek(t *testing.T) { config := &Config{ WriteBufferSize: 0, + NoAsyncFlush: true, } db := New(rawdb.NewMemoryDatabase(), config, false) db.waitGeneration() @@ -736,6 +742,7 @@ func TestStorageIteratorSeek(t *testing.T) { func testStorageIteratorSeek(t *testing.T, newIterator func(db *Database, root, account, seek common.Hash) StorageIterator) { config := &Config{ WriteBufferSize: 0, + NoAsyncFlush: true, } db := New(rawdb.NewMemoryDatabase(), config, false) db.waitGeneration() @@ -808,6 +815,7 @@ func TestAccountIteratorDeletions(t *testing.T) { func testAccountIteratorDeletions(t *testing.T, newIterator func(db *Database, root, seek common.Hash) AccountIterator) { config := &Config{ WriteBufferSize: 0, + NoAsyncFlush: true, } db := New(rawdb.NewMemoryDatabase(), config, false) db.waitGeneration() @@ -848,6 +856,7 @@ func testAccountIteratorDeletions(t *testing.T, newIterator func(db *Database, r func TestStorageIteratorDeletions(t *testing.T) { config := &Config{ WriteBufferSize: 0, + NoAsyncFlush: true, } db := New(rawdb.NewMemoryDatabase(), config, false) db.waitGeneration() @@ -916,6 +925,7 @@ func TestStaleIterator(t *testing.T) { func testStaleIterator(t *testing.T, newIter func(db *Database, hash common.Hash) Iterator) { config := &Config{ WriteBufferSize: 16 * 1024 * 1024, + NoAsyncFlush: true, } db := New(rawdb.NewMemoryDatabase(), config, false) db.waitGeneration() @@ -971,6 +981,7 @@ func BenchmarkAccountIteratorTraversal(b *testing.B) { } config := &Config{ WriteBufferSize: 0, + NoAsyncFlush: true, } db := New(rawdb.NewMemoryDatabase(), config, false) db.waitGeneration() @@ -1066,6 +1077,7 @@ func BenchmarkAccountIteratorLargeBaselayer(b *testing.B) { } config := &Config{ WriteBufferSize: 0, + NoAsyncFlush: true, } db := New(rawdb.NewMemoryDatabase(), config, false) db.waitGeneration() diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go index 5dc6da92b85b..cf81e2199da8 100644 --- a/triedb/pathdb/journal.go +++ b/triedb/pathdb/journal.go @@ -160,7 +160,7 @@ func (db *Database) loadLayers() layer { log.Info("Failed to load journal, discard it", "err", err) } // Return single layer with persistent state. - return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0)) + return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0), nil) } // loadDiskLayer reads the binary blob from the layer journal, reconstructing @@ -192,7 +192,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) { if err := states.decode(r); err != nil { return nil, err } - return newDiskLayer(root, id, db, nil, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored)), nil + return newDiskLayer(root, id, db, nil, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored), nil), nil } // loadDiffLayer reads the next sections of a layer journal, reconstructing a new @@ -301,6 +301,12 @@ func (db *Database) Journal(root common.Hash) error { } else { // disk layer only on noop runs (likely) or deep reorgs (unlikely) log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.layers) } + // Block until the background flushing is finished. It must + // be done before terminating the potential background snapshot + // generator. + if err := disk.waitFlush(); err != nil { + return err + } // Terminate the background state generation if it's active if disk.generator != nil { disk.generator.stop() diff --git a/triedb/pathdb/layertree.go b/triedb/pathdb/layertree.go index b2f3f7f37ded..a0615bc8d4cf 100644 --- a/triedb/pathdb/layertree.go +++ b/triedb/pathdb/layertree.go @@ -195,6 +195,10 @@ func (tree *layerTree) cap(root common.Hash, layers int) error { } tree.base = base + // Block until the frozen buffer is fully flushed + if err := base.waitFlush(); err != nil { + return err + } // Reset the layer tree with the single new disk layer tree.layers = map[common.Hash]layer{ base.rootHash(): base, diff --git a/triedb/pathdb/layertree_test.go b/triedb/pathdb/layertree_test.go index a72ff5899eed..a76d60ba5b24 100644 --- a/triedb/pathdb/layertree_test.go +++ b/triedb/pathdb/layertree_test.go @@ -27,7 +27,7 @@ import ( func newTestLayerTree() *layerTree { db := New(rawdb.NewMemoryDatabase(), nil, false) - l := newDiskLayer(common.Hash{0x1}, 0, db, nil, nil, newBuffer(0, nil, nil, 0)) + l := newDiskLayer(common.Hash{0x1}, 0, db, nil, nil, newBuffer(0, nil, nil, 0), nil) t := newLayerTree(l) return t } From dbf6d0460b3d74cf8aac0b0376333a265b722235 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Wed, 18 Jun 2025 15:58:56 +0800 Subject: [PATCH 2/6] triedb/pathdb: polish --- core/state/snapshot/generate_test.go | 13 +------ triedb/pathdb/buffer.go | 16 ++++++--- triedb/pathdb/database.go | 18 ++++------ triedb/pathdb/disklayer.go | 54 +++++++++++++++++----------- triedb/pathdb/journal.go | 6 +--- triedb/pathdb/layertree.go | 4 --- 6 files changed, 55 insertions(+), 56 deletions(-) diff --git a/core/state/snapshot/generate_test.go b/core/state/snapshot/generate_test.go index d5cf6dce3118..44886300958e 100644 --- a/core/state/snapshot/generate_test.go +++ b/core/state/snapshot/generate_test.go @@ -168,6 +168,7 @@ func newHelper(scheme string) *testHelper { if scheme == rawdb.PathScheme { config.PathDB = &pathdb.Config{ SnapshotNoBuild: true, + NoAsyncFlush: true, } // disable caching } else { config.HashDB = &hashdb.Config{} // disable caching @@ -242,18 +243,6 @@ func (t *testHelper) Commit() common.Hash { } t.triedb.Update(root, types.EmptyRootHash, 0, t.nodes, t.states) t.triedb.Commit(root, false) - - // re-open the trie database to ensure the frozen buffer - // is not referenced - //config := &triedb.Config{} - //if t.triedb.Scheme() == rawdb.PathScheme { - // config.PathDB = &pathdb.Config{ - // SnapshotNoBuild: true, - // } // disable caching - //} else { - // config.HashDB = &hashdb.Config{} // disable caching - //} - //t.triedb = triedb.NewDatabase(t.triedb.Disk(), config) return root } diff --git a/triedb/pathdb/buffer.go b/triedb/pathdb/buffer.go index 8d2a6b694329..138962110f08 100644 --- a/triedb/pathdb/buffer.go +++ b/triedb/pathdb/buffer.go @@ -39,8 +39,12 @@ type buffer struct { nodes *nodeSet // Aggregated trie node set states *stateSet // Aggregated state set - done chan struct{} // notifier whether the content in buffer has been flushed or not - flushErr error // error if any exception occurs during flushing + // done is the notifier whether the content in buffer has been flushed or not. + // This channel is nil if the buffer is not frozen. + done chan struct{} + + // flushErr memorizes the error if any exception occurs during flushing + flushErr error } // newBuffer initializes the buffer with the provided states and trie nodes. @@ -65,7 +69,7 @@ func (b *buffer) account(hash common.Hash) ([]byte, bool) { return b.states.account(hash) } -// storage retrieves the storage slot with account address hash and slot key. +// storage retrieves the storage slot with account address hash and slot key hash. func (b *buffer) storage(addrHash common.Hash, storageHash common.Hash) ([]byte, bool) { return b.states.storage(addrHash, storageHash) } @@ -134,6 +138,8 @@ func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.A } b.done = make(chan struct{}) // allocate the channel for notification + // Schedule the background thread to construct the batch, which usually + // take a few seconds. go func() { defer func() { if postFlush != nil { @@ -185,7 +191,9 @@ func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.A // The content in the frozen buffer is kept for consequent state access, // TODO (rjl493456442) measure the gc overhead for holding this struct. // TODO (rjl493456442) can we somehow get rid of it after flushing?? - b.reset() + // TODO (rjl493456442) buffer itself is not thread-safe, add the lock + // protection if try to reset the buffer here. + // b.reset() log.Debug("Persisted buffer content", "nodes", nodes, "accounts", accounts, "slots", slots, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start))) }() } diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 83fb6ac4319d..159c91a7c17c 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -437,12 +437,9 @@ func (db *Database) Disable() error { // Terminate the state generator if it's active and mark the disk layer // as stale to prevent access to persistent state. disk := db.tree.bottom() - if err := disk.waitFlush(); err != nil { + if err := disk.terminate(); err != nil { return err } - if disk.generator != nil { - disk.generator.stop() - } disk.markStale() // Write the initial sync flag to persist it across restarts. @@ -602,13 +599,9 @@ func (db *Database) Close() error { // be done before terminating the potential background snapshot // generator. dl := db.tree.bottom() - if err := dl.waitFlush(); err != nil { + if err := dl.terminate(); err != nil { return err } - // Terminate the background generation if it's active - if dl.generator != nil { - dl.generator.stop() - } dl.resetCache() // release the memory held by clean cache // Close the attached state history freezer. @@ -677,6 +670,9 @@ func (db *Database) HistoryRange() (uint64, uint64, error) { // waitGeneration waits until the background generation is finished. It assumes // that the generation is permitted; otherwise, it will block indefinitely. func (db *Database) waitGeneration() { + db.lock.RLock() + defer db.lock.RUnlock() + gen := db.tree.bottom().generator if gen == nil || gen.completed() { return @@ -693,7 +689,7 @@ func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (Account if wait { return nil, errDatabaseWaitSync } - if gen := db.tree.bottom().generator; gen != nil && !gen.completed() { + if !db.tree.bottom().genComplete() { return nil, errNotConstructed } return newFastAccountIterator(db, root, seek) @@ -708,7 +704,7 @@ func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek if wait { return nil, errDatabaseWaitSync } - if gen := db.tree.bottom().generator; gen != nil && !gen.completed() { + if !db.tree.bottom().genComplete() { return nil, errNotConstructed } return newFastStorageIterator(db, root, account, seek) diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go index 19732317716b..b360a8f15810 100644 --- a/triedb/pathdb/disklayer.go +++ b/triedb/pathdb/disklayer.go @@ -49,6 +49,7 @@ type diskLayer struct { // The generator is set if the state snapshot was not fully completed, // regardless of whether the background generation is running or not. + // It should only be unset if the generation completes. generator *generator } @@ -121,7 +122,7 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co // (both the live one and the frozen one). Note the buffer is lock free since // it's impossible to mutate the buffer before tagging the layer as stale. for _, buffer := range []*buffer{dl.buffer, dl.frozen} { - if buffer != nil && !buffer.empty() { + if buffer != nil { n, found := buffer.node(owner, path) if found { dirtyNodeHitMeter.Mark(1) @@ -177,7 +178,7 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) { // (both the live one and the frozen one). Note the buffer is lock free since // it's impossible to mutate the buffer before tagging the layer as stale. for _, buffer := range []*buffer{dl.buffer, dl.frozen} { - if buffer != nil && !buffer.empty() { + if buffer != nil { blob, found := buffer.account(hash) if found { dirtyStateHitMeter.Mark(1) @@ -255,7 +256,7 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([ // (both the live one and the frozen one). Note the buffer is lock free since // it's impossible to mutate the buffer before tagging the layer as stale. for _, buffer := range []*buffer{dl.buffer, dl.frozen} { - if buffer != nil && !buffer.empty() { + if buffer != nil { if blob, found := buffer.storage(accountHash, storageHash); found { dirtyStateHitMeter.Mark(1) dirtyStateReadMeter.Mark(int64(len(blob))) @@ -403,7 +404,6 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { // If the snapshot has been fully generated, unset the generator if progress == nil { - gen = nil dl.setGenerator(nil) } else { log.Info("Paused snapshot generation") @@ -425,18 +425,12 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { } }) // Block until the frozen buffer is fully flushed out if the async flushing - // is not allowed. - if dl.db.config.NoAsyncFlush { - if err := dl.frozen.waitFlush(); err != nil { - return nil, err - } - } - // Block until the frozen buffer is fully flushed out if the oldest history - // surpasses the persisted state ID. - if persistedID < oldest { + // is not allowed, or if the oldest history surpasses the persisted state ID. + if dl.db.config.NoAsyncFlush || persistedID < oldest { if err := dl.frozen.waitFlush(); err != nil { return nil, err } + dl.frozen = nil } combined = newBuffer(dl.db.config.WriteBufferSize, nil, nil, 0) } @@ -512,7 +506,9 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) { dl.frozen = nil } - // Terminate the generation before writing any data into database + // Terminate the generator before writing any data to the database. + // This must be done after flushing the frozen buffer, as the generator + // may be restarted at the end of the flush process. var progress []byte if dl.generator != nil { dl.generator.stop() @@ -576,11 +572,29 @@ func (dl *diskLayer) genMarker() []byte { return dl.generator.progressMarker() } -// waitFlush blocks until the buffer has been fully flushed and returns any -// stored errors that occurred during the process. -func (dl *diskLayer) waitFlush() error { - if dl.frozen == nil { - return nil +// genComplete returns a flag indicating whether the state snapshot has been +// fully generated. +func (dl *diskLayer) genComplete() bool { + dl.lock.RLock() + defer dl.lock.RUnlock() + + return dl.genMarker() == nil +} + +// terminate releases the frozen buffer if it's not nil and terminates the +// background state generator. +func (dl *diskLayer) terminate() error { + dl.lock.Lock() + defer dl.lock.Unlock() + + if dl.frozen != nil { + if err := dl.frozen.waitFlush(); err != nil { + return err + } + dl.frozen = nil } - return dl.frozen.waitFlush() + if dl.generator != nil { + dl.generator.stop() + } + return nil } diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go index cf81e2199da8..d4a6b3e262dc 100644 --- a/triedb/pathdb/journal.go +++ b/triedb/pathdb/journal.go @@ -304,13 +304,9 @@ func (db *Database) Journal(root common.Hash) error { // Block until the background flushing is finished. It must // be done before terminating the potential background snapshot // generator. - if err := disk.waitFlush(); err != nil { + if err := disk.terminate(); err != nil { return err } - // Terminate the background state generation if it's active - if disk.generator != nil { - disk.generator.stop() - } start := time.Now() // Run the journaling diff --git a/triedb/pathdb/layertree.go b/triedb/pathdb/layertree.go index a0615bc8d4cf..b2f3f7f37ded 100644 --- a/triedb/pathdb/layertree.go +++ b/triedb/pathdb/layertree.go @@ -195,10 +195,6 @@ func (tree *layerTree) cap(root common.Hash, layers int) error { } tree.base = base - // Block until the frozen buffer is fully flushed - if err := base.waitFlush(); err != nil { - return err - } // Reset the layer tree with the single new disk layer tree.layers = map[common.Hash]layer{ base.rootHash(): base, From 5b80a7af3885019353b0aa6790cc187ab79870ec Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 19 Jun 2025 14:19:23 +0800 Subject: [PATCH 3/6] core/state: improve test --- core/state/statedb_test.go | 33 +++++------- triedb/pathdb/database.go | 24 ++++----- triedb/pathdb/disklayer.go | 11 ++++ triedb/pathdb/iterator_binary.go | 8 +++ triedb/pathdb/iterator_fast.go | 10 ++++ triedb/pathdb/iterator_test.go | 89 ++++++++++++++------------------ 6 files changed, 92 insertions(+), 83 deletions(-) diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index 48c2fc49be3b..061ac5975259 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -974,23 +974,21 @@ func TestMissingTrieNodes(t *testing.T) { func testMissingTrieNodes(t *testing.T, scheme string) { // Create an initial state with a few accounts var ( - tdb *triedb.Database - memDb = rawdb.NewMemoryDatabase() - openDb = func() *triedb.Database { - if scheme == rawdb.PathScheme { - return triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{ - TrieCleanSize: 0, - StateCleanSize: 0, - WriteBufferSize: 0, - }}) // disable caching - } else { - return triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{ - CleanCacheSize: 0, - }}) // disable caching - } - } + tdb *triedb.Database + memDb = rawdb.NewMemoryDatabase() ) - tdb = openDb() + if scheme == rawdb.PathScheme { + tdb = triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{ + TrieCleanSize: 0, + StateCleanSize: 0, + WriteBufferSize: 0, + NoAsyncFlush: true, + }}) // disable caching + } else { + tdb = triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{ + CleanCacheSize: 0, + }}) // disable caching + } db := NewDatabase(tdb, nil) var root common.Hash @@ -1007,7 +1005,6 @@ func testMissingTrieNodes(t *testing.T, scheme string) { // force-flush tdb.Commit(root, false) } - // Create a new state on the old root // Now we clear out the memdb it := memDb.NewIterator(nil, nil) for it.Next() { @@ -1026,8 +1023,6 @@ func testMissingTrieNodes(t *testing.T, scheme string) { } } } - tdb = openDb() - db = NewDatabase(tdb, nil) state, _ = New(root, db) balance := state.GetBalance(addr) // The removed elem should lead to it returning zero balance diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 159c91a7c17c..9795d81e6365 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -121,8 +121,9 @@ type Config struct { ReadOnly bool // Flag whether the database is opened in read only mode // Testing configurations - SnapshotNoBuild bool // Flag Whether the background generation is allowed - NoAsyncFlush bool // Flag whether the background generation is allowed + SnapshotNoBuild bool // Flag Whether the state generation is allowed + NoAsyncFlush bool // Flag whether the background buffer flushing is allowed + NoAsyncGeneration bool // Flag whether the background generation is allowed } // sanitize checks the provided user configurations and changes anything that's @@ -369,6 +370,12 @@ func (db *Database) setStateGenerator() error { } stats.log("Starting snapshot generation", root, generator.Marker) dl.generator.run(root) + + // Block until the generation completes. It's the feature used in + // unit tests. + if db.config.NoAsyncGeneration { + <-dl.generator.done + } return nil } @@ -667,19 +674,6 @@ func (db *Database) HistoryRange() (uint64, uint64, error) { return historyRange(db.freezer) } -// waitGeneration waits until the background generation is finished. It assumes -// that the generation is permitted; otherwise, it will block indefinitely. -func (db *Database) waitGeneration() { - db.lock.RLock() - defer db.lock.RUnlock() - - gen := db.tree.bottom().generator - if gen == nil || gen.completed() { - return - } - <-gen.done -} - // AccountIterator creates a new account iterator for the specified root hash and // seeks to a starting account hash. func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) { diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go index b360a8f15810..494164987f3b 100644 --- a/triedb/pathdb/disklayer.go +++ b/triedb/pathdb/disklayer.go @@ -581,6 +581,17 @@ func (dl *diskLayer) genComplete() bool { return dl.genMarker() == nil } +// waitFlush blocks until the background buffer flush is completed. +func (dl *diskLayer) waitFlush() error { + dl.lock.RLock() + defer dl.lock.RUnlock() + + if dl.frozen == nil { + return nil + } + return dl.frozen.waitFlush() +} + // terminate releases the frozen buffer if it's not nil and terminates the // background state generator. func (dl *diskLayer) terminate() error { diff --git a/triedb/pathdb/iterator_binary.go b/triedb/pathdb/iterator_binary.go index 0620081d0c12..97a291898960 100644 --- a/triedb/pathdb/iterator_binary.go +++ b/triedb/pathdb/iterator_binary.go @@ -45,6 +45,10 @@ type binaryIterator struct { // accounts in a slow, but easily verifiable way. Note this function is used // for initialization, use `newBinaryAccountIterator` as the API. func (dl *diskLayer) initBinaryAccountIterator(seek common.Hash) *binaryIterator { + // Block until the frozen buffer flushing is completed. + if err := dl.waitFlush(); err != nil { + panic(err) + } // The state set in the disk layer is mutable, hold the lock before obtaining // the account list to prevent concurrent map iteration and write. dl.lock.RLock() @@ -113,6 +117,10 @@ func (dl *diffLayer) initBinaryAccountIterator(seek common.Hash) *binaryIterator // storage slots in a slow, but easily verifiable way. Note this function is used // for initialization, use `newBinaryStorageIterator` as the API. func (dl *diskLayer) initBinaryStorageIterator(account common.Hash, seek common.Hash) *binaryIterator { + // Block until the frozen buffer flushing is completed. + if err := dl.waitFlush(); err != nil { + panic(err) + } // The state set in the disk layer is mutable, hold the lock before obtaining // the storage list to prevent concurrent map iteration and write. dl.lock.RLock() diff --git a/triedb/pathdb/iterator_fast.go b/triedb/pathdb/iterator_fast.go index 87b2dd567ac8..04bf89e874e3 100644 --- a/triedb/pathdb/iterator_fast.go +++ b/triedb/pathdb/iterator_fast.go @@ -76,6 +76,11 @@ func newFastIterator(db *Database, root common.Hash, account common.Hash, seek c if accountIterator { switch dl := current.(type) { case *diskLayer: + // Ensure no active background buffer flush is in progress, otherwise, + // part of the state data may become invisible. + if err := dl.waitFlush(); err != nil { + return nil, err + } // The state set in the disk layer is mutable, hold the lock before obtaining // the account list to prevent concurrent map iteration and write. dl.lock.RLock() @@ -113,6 +118,11 @@ func newFastIterator(db *Database, root common.Hash, account common.Hash, seek c } else { switch dl := current.(type) { case *diskLayer: + // Ensure no active background buffer flush is in progress, otherwise, + // part of the state data may become invisible. + if err := dl.waitFlush(); err != nil { + return nil, err + } // The state set in the disk layer is mutable, hold the lock before obtaining // the storage list to prevent concurrent map iteration and write. dl.lock.RLock() diff --git a/triedb/pathdb/iterator_test.go b/triedb/pathdb/iterator_test.go index 29cb8dfd4712..b24575cb470f 100644 --- a/triedb/pathdb/iterator_test.go +++ b/triedb/pathdb/iterator_test.go @@ -254,11 +254,9 @@ func TestFastIteratorBasics(t *testing.T) { // TestAccountIteratorTraversal tests some simple multi-layer iteration. func TestAccountIteratorTraversal(t *testing.T) { config := &Config{ - WriteBufferSize: 0, - NoAsyncFlush: true, + NoAsyncGeneration: true, } db := New(rawdb.NewMemoryDatabase(), config, false) - db.waitGeneration() // Stack three diff layers on top with various overlaps db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 0, trienode.NewMergedNodeSet(), @@ -299,11 +297,9 @@ func TestAccountIteratorTraversal(t *testing.T) { func TestStorageIteratorTraversal(t *testing.T) { config := &Config{ - WriteBufferSize: 0, - NoAsyncFlush: true, + NoAsyncGeneration: true, } db := New(rawdb.NewMemoryDatabase(), config, false) - db.waitGeneration() // Stack three diff layers on top with various overlaps db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 0, trienode.NewMergedNodeSet(), @@ -313,14 +309,14 @@ func TestStorageIteratorTraversal(t *testing.T) { NewStateSetWithOrigin(randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x04", "0x05", "0x06"}}, nil), nil, nil, false)) db.Update(common.HexToHash("0x04"), common.HexToHash("0x03"), 0, trienode.NewMergedNodeSet(), - NewStateSetWithOrigin(randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil), nil, nil, false)) + NewStateSetWithOrigin(randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02"}}, nil), nil, nil, false)) // Verify the single and multi-layer iterators head := db.tree.get(common.HexToHash("0x04")) // singleLayer: 0x1, 0x2, 0x3 diffIter := newDiffStorageIterator(common.HexToHash("0xaa"), common.Hash{}, head.(*diffLayer).states.stateSet.storageList(common.HexToHash("0xaa")), nil) - verifyIterator(t, 3, diffIter, verifyNothing) + verifyIterator(t, 2, diffIter, verifyNothing) // binaryIterator: 0x1, 0x2, 0x3, 0x4, 0x5, 0x6 verifyIterator(t, 6, head.(*diffLayer).newBinaryStorageIterator(common.HexToHash("0xaa"), common.Hash{}), verifyStorage) @@ -344,11 +340,9 @@ func TestStorageIteratorTraversal(t *testing.T) { // also expect the correct values to show up. func TestAccountIteratorTraversalValues(t *testing.T) { config := &Config{ - WriteBufferSize: 0, - NoAsyncFlush: true, + NoAsyncGeneration: true, } db := New(rawdb.NewMemoryDatabase(), config, false) - db.waitGeneration() // Create a batch of account sets to seed subsequent layers with var ( @@ -461,11 +455,9 @@ func TestAccountIteratorTraversalValues(t *testing.T) { func TestStorageIteratorTraversalValues(t *testing.T) { config := &Config{ - WriteBufferSize: 0, - NoAsyncFlush: true, + NoAsyncGeneration: true, } db := New(rawdb.NewMemoryDatabase(), config, false) - db.waitGeneration() wrapStorage := func(storage map[common.Hash][]byte) map[common.Hash]map[common.Hash][]byte { return map[common.Hash]map[common.Hash][]byte{ @@ -595,11 +587,9 @@ func TestAccountIteratorLargeTraversal(t *testing.T) { } // Build up a large stack of snapshots config := &Config{ - WriteBufferSize: 0, - NoAsyncFlush: true, + NoAsyncGeneration: true, } db := New(rawdb.NewMemoryDatabase(), config, false) - db.waitGeneration() for i := 1; i < 128; i++ { parent := types.EmptyRootHash @@ -635,10 +625,10 @@ func TestAccountIteratorLargeTraversal(t *testing.T) { // - continues iterating func TestAccountIteratorFlattening(t *testing.T) { config := &Config{ - WriteBufferSize: 10 * 1024, + WriteBufferSize: 10 * 1024, + NoAsyncGeneration: true, } db := New(rawdb.NewMemoryDatabase(), config, false) - db.waitGeneration() // Create a stack of diffs on top db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), @@ -667,12 +657,24 @@ func TestAccountIteratorFlattening(t *testing.T) { } func TestAccountIteratorSeek(t *testing.T) { + t.Run("fast", func(t *testing.T) { + testAccountIteratorSeek(t, func(db *Database, root, seek common.Hash) AccountIterator { + it, _ := db.AccountIterator(root, seek) + return it + }) + }) + t.Run("binary", func(t *testing.T) { + testAccountIteratorSeek(t, func(db *Database, root, seek common.Hash) AccountIterator { + return db.tree.get(root).(*diffLayer).newBinaryAccountIterator(seek) + }) + }) +} + +func testAccountIteratorSeek(t *testing.T, newIterator func(db *Database, root, seek common.Hash) AccountIterator) { config := &Config{ - WriteBufferSize: 0, - NoAsyncFlush: true, + NoAsyncGeneration: true, } db := New(rawdb.NewMemoryDatabase(), config, false) - db.waitGeneration() db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), NewStateSetWithOrigin(randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil, nil, false)) @@ -688,39 +690,39 @@ func TestAccountIteratorSeek(t *testing.T) { // 03: aa, bb, dd, ee, f0 (, f0), ff // 04: aa, bb, cc, dd, ee, f0 (, f0), ff (, ff) // Construct various iterators and ensure their traversal is correct - it, _ := db.AccountIterator(common.HexToHash("0x02"), common.HexToHash("0xdd")) + it := newIterator(db, common.HexToHash("0x02"), common.HexToHash("0xdd")) defer it.Release() verifyIterator(t, 3, it, verifyAccount) // expected: ee, f0, ff - it, _ = db.AccountIterator(common.HexToHash("0x02"), common.HexToHash("0xaa")) + it = newIterator(db, common.HexToHash("0x02"), common.HexToHash("0xaa")) defer it.Release() verifyIterator(t, 4, it, verifyAccount) // expected: aa, ee, f0, ff - it, _ = db.AccountIterator(common.HexToHash("0x02"), common.HexToHash("0xff")) + it = newIterator(db, common.HexToHash("0x02"), common.HexToHash("0xff")) defer it.Release() verifyIterator(t, 1, it, verifyAccount) // expected: ff - it, _ = db.AccountIterator(common.HexToHash("0x02"), common.HexToHash("0xff1")) + it = newIterator(db, common.HexToHash("0x02"), common.HexToHash("0xff1")) defer it.Release() verifyIterator(t, 0, it, verifyAccount) // expected: nothing - it, _ = db.AccountIterator(common.HexToHash("0x04"), common.HexToHash("0xbb")) + it = newIterator(db, common.HexToHash("0x04"), common.HexToHash("0xbb")) defer it.Release() verifyIterator(t, 6, it, verifyAccount) // expected: bb, cc, dd, ee, f0, ff - it, _ = db.AccountIterator(common.HexToHash("0x04"), common.HexToHash("0xef")) + it = newIterator(db, common.HexToHash("0x04"), common.HexToHash("0xef")) defer it.Release() verifyIterator(t, 2, it, verifyAccount) // expected: f0, ff - it, _ = db.AccountIterator(common.HexToHash("0x04"), common.HexToHash("0xf0")) + it = newIterator(db, common.HexToHash("0x04"), common.HexToHash("0xf0")) defer it.Release() verifyIterator(t, 2, it, verifyAccount) // expected: f0, ff - it, _ = db.AccountIterator(common.HexToHash("0x04"), common.HexToHash("0xff")) + it = newIterator(db, common.HexToHash("0x04"), common.HexToHash("0xff")) defer it.Release() verifyIterator(t, 1, it, verifyAccount) // expected: ff - it, _ = db.AccountIterator(common.HexToHash("0x04"), common.HexToHash("0xff1")) + it = newIterator(db, common.HexToHash("0x04"), common.HexToHash("0xff1")) defer it.Release() verifyIterator(t, 0, it, verifyAccount) // expected: nothing } @@ -741,11 +743,9 @@ func TestStorageIteratorSeek(t *testing.T) { func testStorageIteratorSeek(t *testing.T, newIterator func(db *Database, root, account, seek common.Hash) StorageIterator) { config := &Config{ - WriteBufferSize: 0, - NoAsyncFlush: true, + NoAsyncGeneration: true, } db := New(rawdb.NewMemoryDatabase(), config, false) - db.waitGeneration() // Stack three diff layers on top with various overlaps db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), @@ -814,11 +814,9 @@ func TestAccountIteratorDeletions(t *testing.T) { func testAccountIteratorDeletions(t *testing.T, newIterator func(db *Database, root, seek common.Hash) AccountIterator) { config := &Config{ - WriteBufferSize: 0, - NoAsyncFlush: true, + NoAsyncGeneration: true, } db := New(rawdb.NewMemoryDatabase(), config, false) - db.waitGeneration() // Stack three diff layers on top with various overlaps db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), @@ -855,11 +853,9 @@ func testAccountIteratorDeletions(t *testing.T, newIterator func(db *Database, r func TestStorageIteratorDeletions(t *testing.T) { config := &Config{ - WriteBufferSize: 0, - NoAsyncFlush: true, + NoAsyncGeneration: true, } db := New(rawdb.NewMemoryDatabase(), config, false) - db.waitGeneration() // Stack three diff layers on top with various overlaps db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), @@ -924,11 +920,10 @@ func TestStaleIterator(t *testing.T) { func testStaleIterator(t *testing.T, newIter func(db *Database, hash common.Hash) Iterator) { config := &Config{ - WriteBufferSize: 16 * 1024 * 1024, - NoAsyncFlush: true, + WriteBufferSize: 16 * 1024 * 1024, + NoAsyncGeneration: true, } db := New(rawdb.NewMemoryDatabase(), config, false) - db.waitGeneration() // [02 (disk), 03] db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), @@ -980,11 +975,9 @@ func BenchmarkAccountIteratorTraversal(b *testing.B) { return accounts } config := &Config{ - WriteBufferSize: 0, - NoAsyncFlush: true, + NoAsyncGeneration: true, } db := New(rawdb.NewMemoryDatabase(), config, false) - db.waitGeneration() for i := 1; i <= 100; i++ { parent := types.EmptyRootHash @@ -1076,11 +1069,9 @@ func BenchmarkAccountIteratorLargeBaselayer(b *testing.B) { return accounts } config := &Config{ - WriteBufferSize: 0, - NoAsyncFlush: true, + NoAsyncGeneration: true, } db := New(rawdb.NewMemoryDatabase(), config, false) - db.waitGeneration() db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), NewStateSetWithOrigin(makeAccounts(2000), nil, nil, nil, false)) for i := 2; i <= 100; i++ { From 41f85203a763d469e6517163bf243a200e17c9f1 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 19 Jun 2025 18:50:51 +0800 Subject: [PATCH 4/6] triedb/pathdb: fix comments --- triedb/pathdb/journal.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go index d4a6b3e262dc..e88b3e062fd5 100644 --- a/triedb/pathdb/journal.go +++ b/triedb/pathdb/journal.go @@ -301,9 +301,8 @@ func (db *Database) Journal(root common.Hash) error { } else { // disk layer only on noop runs (likely) or deep reorgs (unlikely) log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.layers) } - // Block until the background flushing is finished. It must - // be done before terminating the potential background snapshot - // generator. + // Block until the background flushing is finished and terminate + // the potential active state generator. if err := disk.terminate(); err != nil { return err } From 24f6b1f4e72fbd22a0685cd8aeb02cf3ee768359 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Fri, 20 Jun 2025 14:20:41 +0800 Subject: [PATCH 5/6] core: fix broken tests --- core/genesis_test.go | 13 +++++++++++-- core/state/sync_test.go | 4 +++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/core/genesis_test.go b/core/genesis_test.go index 9dc4bfb3df6d..a41dfce5783e 100644 --- a/core/genesis_test.go +++ b/core/genesis_test.go @@ -256,7 +256,9 @@ func newDbConfig(scheme string) *triedb.Config { if scheme == rawdb.HashScheme { return triedb.HashDefaults } - return &triedb.Config{PathDB: pathdb.Defaults} + config := *pathdb.Defaults + config.NoAsyncFlush = true + return &triedb.Config{PathDB: &config} } func TestVerkleGenesisCommit(t *testing.T) { @@ -313,7 +315,14 @@ func TestVerkleGenesisCommit(t *testing.T) { } db := rawdb.NewMemoryDatabase() - triedb := triedb.NewDatabase(db, triedb.VerkleDefaults) + + config := *pathdb.Defaults + config.NoAsyncFlush = true + + triedb := triedb.NewDatabase(db, &triedb.Config{ + IsVerkle: true, + PathDB: &config, + }) block := genesis.MustCommit(db, triedb) if !bytes.Equal(block.Root().Bytes(), expected) { t.Fatalf("invalid genesis state root, expected %x, got %x", expected, block.Root()) diff --git a/core/state/sync_test.go b/core/state/sync_test.go index 5c8b5a90f763..cae0e0a936bc 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -46,7 +46,9 @@ func makeTestState(scheme string) (ethdb.Database, Database, *triedb.Database, c // Create an empty state config := &triedb.Config{Preimages: true} if scheme == rawdb.PathScheme { - config.PathDB = pathdb.Defaults + pconfig := *pathdb.Defaults + pconfig.NoAsyncFlush = true + config.PathDB = &pconfig } else { config.HashDB = hashdb.Defaults } From e4f7a372d6340dfc751d4faab30e7dabf7b5c249 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Fri, 20 Jun 2025 14:31:49 +0800 Subject: [PATCH 6/6] core: fix broken test --- core/blockchain.go | 16 ++++++++++++---- core/blockchain_snapshot_test.go | 4 ++-- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 25b37b5cce36..2b48d9179492 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -152,9 +152,10 @@ const ( // BlockChainConfig contains the configuration of the BlockChain object. type BlockChainConfig struct { // Trie database related options - TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory - TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk - TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk + TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory + TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk + TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk + TrieNoAsyncFlush bool // Whether the asynchronous buffer flushing is disallowed Preimages bool // Whether to store preimage of trie key to the disk StateHistory uint64 // Number of blocks from head whose state histories are reserved. @@ -200,7 +201,7 @@ func DefaultConfig() *BlockChainConfig { } } -// WithArchive enabled/disables archive mode on the config. +// WithArchive enables/disables archive mode on the config. func (cfg BlockChainConfig) WithArchive(on bool) *BlockChainConfig { cfg.ArchiveMode = on return &cfg @@ -212,6 +213,12 @@ func (cfg BlockChainConfig) WithStateScheme(scheme string) *BlockChainConfig { return &cfg } +// WithNoAsyncFlush enables/disables asynchronous buffer flushing mode on the config. +func (cfg BlockChainConfig) WithNoAsyncFlush(on bool) *BlockChainConfig { + cfg.TrieNoAsyncFlush = on + return &cfg +} + // triedbConfig derives the configures for trie database. func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config { config := &triedb.Config{ @@ -233,6 +240,7 @@ func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config { // for flushing both trie data and state data to disk. The config name // should be updated to eliminate the confusion. WriteBufferSize: cfg.TrieDirtyLimit * 1024 * 1024, + NoAsyncFlush: cfg.TrieNoAsyncFlush, } } return config diff --git a/core/blockchain_snapshot_test.go b/core/blockchain_snapshot_test.go index 5550907b0dd3..ae9398b97d90 100644 --- a/core/blockchain_snapshot_test.go +++ b/core/blockchain_snapshot_test.go @@ -81,7 +81,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo } engine = ethash.NewFullFaker() ) - chain, err := NewBlockChain(db, gspec, engine, DefaultConfig().WithStateScheme(basic.scheme)) + chain, err := NewBlockChain(db, gspec, engine, DefaultConfig().WithStateScheme(basic.scheme).WithNoAsyncFlush(true)) if err != nil { t.Fatalf("Failed to create chain: %v", err) } @@ -572,7 +572,7 @@ func TestHighCommitCrashWithNewSnapshot(t *testing.T) { // // Expected head header : C8 // Expected head fast block: C8 - // Expected head block : G (Hash mode), C6 (Hash mode) + // Expected head block : G (Hash mode), C6 (Path mode) // Expected snapshot disk : C4 (Hash mode) for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} { expHead := uint64(0)