Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,10 @@ const (
// BlockChainConfig contains the configuration of the BlockChain object.
type BlockChainConfig struct {
// Trie database related options
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
TrieNoAsyncFlush bool // Whether the asynchronous buffer flushing is disallowed

Preimages bool // Whether to store preimage of trie key to the disk
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
Expand Down Expand Up @@ -200,7 +201,7 @@ func DefaultConfig() *BlockChainConfig {
}
}

// WithArchive enabled/disables archive mode on the config.
// WithArchive enables/disables archive mode on the config.
func (cfg BlockChainConfig) WithArchive(on bool) *BlockChainConfig {
cfg.ArchiveMode = on
return &cfg
Expand All @@ -212,6 +213,12 @@ func (cfg BlockChainConfig) WithStateScheme(scheme string) *BlockChainConfig {
return &cfg
}

// WithNoAsyncFlush enables/disables asynchronous buffer flushing mode on the config.
func (cfg BlockChainConfig) WithNoAsyncFlush(on bool) *BlockChainConfig {
cfg.TrieNoAsyncFlush = on
return &cfg
}

// triedbConfig derives the configures for trie database.
func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config {
config := &triedb.Config{
Expand All @@ -233,6 +240,7 @@ func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config {
// for flushing both trie data and state data to disk. The config name
// should be updated to eliminate the confusion.
WriteBufferSize: cfg.TrieDirtyLimit * 1024 * 1024,
NoAsyncFlush: cfg.TrieNoAsyncFlush,
}
}
return config
Expand Down
4 changes: 2 additions & 2 deletions core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
}
engine = ethash.NewFullFaker()
)
chain, err := NewBlockChain(db, gspec, engine, DefaultConfig().WithStateScheme(basic.scheme))
chain, err := NewBlockChain(db, gspec, engine, DefaultConfig().WithStateScheme(basic.scheme).WithNoAsyncFlush(true))
if err != nil {
t.Fatalf("Failed to create chain: %v", err)
}
Expand Down Expand Up @@ -572,7 +572,7 @@ func TestHighCommitCrashWithNewSnapshot(t *testing.T) {
//
// Expected head header : C8
// Expected head fast block: C8
// Expected head block : G (Hash mode), C6 (Hash mode)
// Expected head block : G (Hash mode), C6 (Path mode)
// Expected snapshot disk : C4 (Hash mode)
for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} {
expHead := uint64(0)
Expand Down
13 changes: 11 additions & 2 deletions core/genesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ func newDbConfig(scheme string) *triedb.Config {
if scheme == rawdb.HashScheme {
return triedb.HashDefaults
}
return &triedb.Config{PathDB: pathdb.Defaults}
config := *pathdb.Defaults
config.NoAsyncFlush = true
return &triedb.Config{PathDB: &config}
}

func TestVerkleGenesisCommit(t *testing.T) {
Expand Down Expand Up @@ -313,7 +315,14 @@ func TestVerkleGenesisCommit(t *testing.T) {
}

db := rawdb.NewMemoryDatabase()
triedb := triedb.NewDatabase(db, triedb.VerkleDefaults)

config := *pathdb.Defaults
config.NoAsyncFlush = true

triedb := triedb.NewDatabase(db, &triedb.Config{
IsVerkle: true,
PathDB: &config,
})
block := genesis.MustCommit(db, triedb)
if !bytes.Equal(block.Root().Bytes(), expected) {
t.Fatalf("invalid genesis state root, expected %x, got %x", expected, block.Root())
Expand Down
1 change: 1 addition & 0 deletions core/state/snapshot/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func newHelper(scheme string) *testHelper {
if scheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
SnapshotNoBuild: true,
NoAsyncFlush: true,
} // disable caching
} else {
config.HashDB = &hashdb.Config{} // disable caching
Expand Down
20 changes: 14 additions & 6 deletions core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,7 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
TrieCleanSize: 0,
StateCleanSize: 0,
WriteBufferSize: 0,
NoAsyncFlush: true,
}}) // disable caching
} else {
tdb = triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{
Expand All @@ -1004,18 +1005,25 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
// force-flush
tdb.Commit(root, false)
}
// Create a new state on the old root
state, _ = New(root, db)
// Now we clear out the memdb
it := memDb.NewIterator(nil, nil)
for it.Next() {
k := it.Key()
// Leave the root intact
if !bytes.Equal(k, root[:]) {
t.Logf("key: %x", k)
memDb.Delete(k)
if scheme == rawdb.HashScheme {
if !bytes.Equal(k, root[:]) {
t.Logf("key: %x", k)
memDb.Delete(k)
}
}
if scheme == rawdb.PathScheme {
rk := k[len(rawdb.TrieNodeAccountPrefix):]
if len(rk) != 0 {
t.Logf("key: %x", k)
memDb.Delete(k)
}
}
}
state, _ = New(root, db)
balance := state.GetBalance(addr)
// The removed elem should lead to it returning zero balance
if exp, got := uint64(0), balance.Uint64(); got != exp {
Expand Down
4 changes: 3 additions & 1 deletion core/state/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ func makeTestState(scheme string) (ethdb.Database, Database, *triedb.Database, c
// Create an empty state
config := &triedb.Config{Preimages: true}
if scheme == rawdb.PathScheme {
config.PathDB = pathdb.Defaults
pconfig := *pathdb.Defaults
pconfig.NoAsyncFlush = true
config.PathDB = &pconfig
} else {
config.HashDB = hashdb.Defaults
}
Expand Down
117 changes: 80 additions & 37 deletions triedb/pathdb/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package pathdb

import (
"errors"
"fmt"
"time"

Expand All @@ -37,6 +38,13 @@ type buffer struct {
limit uint64 // The maximum memory allowance in bytes
nodes *nodeSet // Aggregated trie node set
states *stateSet // Aggregated state set

// done is the notifier whether the content in buffer has been flushed or not.
// This channel is nil if the buffer is not frozen.
done chan struct{}

// flushErr memorizes the error if any exception occurs during flushing
flushErr error
}

// newBuffer initializes the buffer with the provided states and trie nodes.
Expand All @@ -61,7 +69,7 @@ func (b *buffer) account(hash common.Hash) ([]byte, bool) {
return b.states.account(hash)
}

// storage retrieves the storage slot with account address hash and slot key.
// storage retrieves the storage slot with account address hash and slot key hash.
func (b *buffer) storage(addrHash common.Hash, storageHash common.Hash) ([]byte, bool) {
return b.states.storage(addrHash, storageHash)
}
Expand Down Expand Up @@ -124,43 +132,78 @@ func (b *buffer) size() uint64 {

// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64) error {
// Ensure the target state id is aligned with the internal counter.
head := rawdb.ReadPersistentStateID(db)
if head+b.layers != id {
return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64, postFlush func()) {
if b.done != nil {
panic("duplicated flush operation")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can never happen, even if the flushing takes a long time to do, right? Because we are always rotating out the buffers

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly. The buffer is supposed to be flushed for one time.

}
// Terminate the state snapshot generation if it's active
var (
start = time.Now()
batch = db.NewBatchWithSize((b.nodes.dbsize() + b.states.dbsize()) * 11 / 10) // extra 10% for potential pebble internal stuff
)
// Explicitly sync the state freezer to ensure all written data is persisted to disk
// before updating the key-value store.
//
// This step is crucial to guarantee that the corresponding state history remains
// available for state rollback.
if freezer != nil {
if err := freezer.SyncAncient(); err != nil {
return err
b.done = make(chan struct{}) // allocate the channel for notification

// Schedule the background thread to construct the batch, which usually
// take a few seconds.
go func() {
defer func() {
if postFlush != nil {
postFlush()
}
close(b.done)
}()

// Ensure the target state id is aligned with the internal counter.
head := rawdb.ReadPersistentStateID(db)
if head+b.layers != id {
b.flushErr = fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
return
}

// Terminate the state snapshot generation if it's active
var (
start = time.Now()
batch = db.NewBatchWithSize((b.nodes.dbsize() + b.states.dbsize()) * 11 / 10) // extra 10% for potential pebble internal stuff
)
// Explicitly sync the state freezer to ensure all written data is persisted to disk
// before updating the key-value store.
//
// This step is crucial to guarantee that the corresponding state history remains
// available for state rollback.
if freezer != nil {
if err := freezer.SyncAncient(); err != nil {
b.flushErr = err
return
}
}
nodes := b.nodes.write(batch, nodesCache)
accounts, slots := b.states.write(batch, progress, statesCache)
rawdb.WritePersistentStateID(batch, id)
rawdb.WriteSnapshotRoot(batch, root)

// Flush all mutations in a single batch
size := batch.ValueSize()
if err := batch.Write(); err != nil {
b.flushErr = err
return
}
commitBytesMeter.Mark(int64(size))
commitNodesMeter.Mark(int64(nodes))
commitAccountsMeter.Mark(int64(accounts))
commitStoragesMeter.Mark(int64(slots))
commitTimeTimer.UpdateSince(start)

// The content in the frozen buffer is kept for consequent state access,
// TODO (rjl493456442) measure the gc overhead for holding this struct.
// TODO (rjl493456442) can we somehow get rid of it after flushing??
// TODO (rjl493456442) buffer itself is not thread-safe, add the lock
// protection if try to reset the buffer here.
// b.reset()
log.Debug("Persisted buffer content", "nodes", nodes, "accounts", accounts, "slots", slots, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
}()
}

// waitFlush blocks until the buffer has been fully flushed and returns any
// stored errors that occurred during the process.
func (b *buffer) waitFlush() error {
if b.done == nil {
return errors.New("the buffer is not frozen")
}
nodes := b.nodes.write(batch, nodesCache)
accounts, slots := b.states.write(batch, progress, statesCache)
rawdb.WritePersistentStateID(batch, id)
rawdb.WriteSnapshotRoot(batch, root)

// Flush all mutations in a single batch
size := batch.ValueSize()
if err := batch.Write(); err != nil {
return err
}
commitBytesMeter.Mark(int64(size))
commitNodesMeter.Mark(int64(nodes))
commitAccountsMeter.Mark(int64(accounts))
commitStoragesMeter.Mark(int64(slots))
commitTimeTimer.UpdateSince(start)
b.reset()
log.Debug("Persisted buffer content", "nodes", nodes, "accounts", accounts, "slots", slots, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
return nil
<-b.done
return b.flushErr
}
42 changes: 22 additions & 20 deletions triedb/pathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ type Config struct {
StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data
WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer
ReadOnly bool // Flag whether the database is opened in read only mode
SnapshotNoBuild bool // Flag Whether the background generation is allowed

// Testing configurations
SnapshotNoBuild bool // Flag Whether the state generation is allowed
NoAsyncFlush bool // Flag whether the background buffer flushing is allowed
NoAsyncGeneration bool // Flag whether the background generation is allowed
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -366,6 +370,12 @@ func (db *Database) setStateGenerator() error {
}
stats.log("Starting snapshot generation", root, generator.Marker)
dl.generator.run(root)

// Block until the generation completes. It's the feature used in
// unit tests.
if db.config.NoAsyncGeneration {
<-dl.generator.done
}
return nil
}

Expand Down Expand Up @@ -434,8 +444,8 @@ func (db *Database) Disable() error {
// Terminate the state generator if it's active and mark the disk layer
// as stale to prevent access to persistent state.
disk := db.tree.bottom()
if disk.generator != nil {
disk.generator.stop()
if err := disk.terminate(); err != nil {
return err
}
disk.markStale()

Expand Down Expand Up @@ -592,12 +602,14 @@ func (db *Database) Close() error {
// following mutations.
db.readOnly = true

// Terminate the background generation if it's active
disk := db.tree.bottom()
if disk.generator != nil {
disk.generator.stop()
// Block until the background flushing is finished. It must
// be done before terminating the potential background snapshot
// generator.
dl := db.tree.bottom()
if err := dl.terminate(); err != nil {
return err
}
disk.resetCache() // release the memory held by clean cache
dl.resetCache() // release the memory held by clean cache

// Close the attached state history freezer.
if db.freezer == nil {
Expand Down Expand Up @@ -662,16 +674,6 @@ func (db *Database) HistoryRange() (uint64, uint64, error) {
return historyRange(db.freezer)
}

// waitGeneration waits until the background generation is finished. It assumes
// that the generation is permitted; otherwise, it will block indefinitely.
func (db *Database) waitGeneration() {
gen := db.tree.bottom().generator
if gen == nil || gen.completed() {
return
}
<-gen.done
}

// AccountIterator creates a new account iterator for the specified root hash and
// seeks to a starting account hash.
func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) {
Expand All @@ -681,7 +683,7 @@ func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (Account
if wait {
return nil, errDatabaseWaitSync
}
if gen := db.tree.bottom().generator; gen != nil && !gen.completed() {
if !db.tree.bottom().genComplete() {
return nil, errNotConstructed
}
return newFastAccountIterator(db, root, seek)
Expand All @@ -696,7 +698,7 @@ func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek
if wait {
return nil, errDatabaseWaitSync
}
if gen := db.tree.bottom().generator; gen != nil && !gen.completed() {
if !db.tree.bottom().genComplete() {
return nil, errNotConstructed
}
return newFastStorageIterator(db, root, account, seek)
Expand Down
1 change: 1 addition & 0 deletions triedb/pathdb/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int) *te
TrieCleanSize: 256 * 1024,
StateCleanSize: 256 * 1024,
WriteBufferSize: 256 * 1024,
NoAsyncFlush: true,
}, isVerkle)

obj = &tester{
Expand Down
Loading