Skip to content

Commit 37ec06a

Browse files
committed
core, eth, trie: write nodebuffer asynchronously to disk
1 parent e91cdb4 commit 37ec06a

File tree

13 files changed

+367
-46
lines changed

13 files changed

+367
-46
lines changed

core/blockchain.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,7 +1023,7 @@ func (bc *BlockChain) Stop() {
10231023
for !bc.triegc.Empty() {
10241024
triedb.Dereference(bc.triegc.PopItem())
10251025
}
1026-
if _, nodes, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb
1026+
if _, nodes, _, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb
10271027
log.Error("Dangling trie nodes after full cleanup")
10281028
}
10291029
}
@@ -1431,8 +1431,9 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
14311431
}
14321432
// If we exceeded our memory allowance, flush matured singleton nodes to disk
14331433
var (
1434-
_, nodes, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb
1435-
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
1434+
_, nodesMutable, nodesImmutable, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb
1435+
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
1436+
nodes = nodesMutable + nodesImmutable
14361437
)
14371438
if nodes > limit || imgs > 4*1024*1024 {
14381439
bc.triedb.Cap(limit - ethdb.IdealBatchSize)
@@ -1872,8 +1873,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
18721873
if bc.snaps != nil {
18731874
snapDiffItems, snapBufItems = bc.snaps.Size()
18741875
}
1875-
trieDiffNodes, trieBufNodes, _ := bc.triedb.Size()
1876-
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, setHead)
1876+
trieDiffNodes, trieBufNodes, trieBufNodesImmutable, _ := bc.triedb.Size()
1877+
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieBufNodesImmutable, setHead)
18771878

18781879
if !setHead {
18791880
// After merge we expect few side chains. Simply count

core/blockchain_insert.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ const statsReportLimit = 8 * time.Second
3939

4040
// report prints statistics if some number of blocks have been processed
4141
// or more than a few seconds have passed since the last message.
42-
func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes common.StorageSize, setHead bool) {
42+
func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes, trieBufNodesImmutable common.StorageSize, setHead bool) {
4343
// Fetch the timings for the batch
4444
var (
4545
now = mclock.Now()
@@ -71,6 +71,7 @@ func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, sn
7171
}
7272
if trieDiffNodes != 0 { // pathdb
7373
context = append(context, []interface{}{"triediffs", trieDiffNodes}...)
74+
context = append(context, []interface{}{"triedirtyimmutable", trieBufNodesImmutable}...)
7475
}
7576
context = append(context, []interface{}{"triedirty", triebufNodes}...)
7677

core/blockchain_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1844,7 +1844,7 @@ func TestTrieForkGC(t *testing.T) {
18441844
chain.TrieDB().Dereference(blocks[len(blocks)-1-i].Root())
18451845
chain.TrieDB().Dereference(forks[len(blocks)-1-i].Root())
18461846
}
1847-
if _, nodes, _ := chain.TrieDB().Size(); nodes > 0 { // all memory is returned in the nodes return for hashdb
1847+
if _, nodes, _, _ := chain.TrieDB().Size(); nodes > 0 { // all memory is returned in the nodes return for hashdb
18481848
t.Fatalf("stale tries still alive after garbase collection")
18491849
}
18501850
}

eth/state_accessor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u
168168
parent = root
169169
}
170170
if report {
171-
_, nodes, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb
172-
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs)
171+
_, nodes, nodeImmutable, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb
172+
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "nodesimmutable", nodeImmutable, "preimages", imgs)
173173
}
174174
return statedb, func() { triedb.Dereference(block.Root()) }, nil
175175
}

eth/tracers/api.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,8 +368,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
368368
// if the relevant state is available in disk.
369369
var preferDisk bool
370370
if statedb != nil {
371-
s1, s2, s3 := statedb.Database().TrieDB().Size()
372-
preferDisk = s1+s2+s3 > defaultTracechainMemLimit
371+
s1, s2, s3, s4 := statedb.Database().TrieDB().Size()
372+
preferDisk = s1+s2+s3+s4 > defaultTracechainMemLimit
373373
}
374374
statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk)
375375
if err != nil {

trie/database.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type backend interface {
5757
//
5858
// For hash scheme, there is no differentiation between diff layer nodes
5959
// and dirty disk layer nodes, so both are merged into the second return.
60-
Size() (common.StorageSize, common.StorageSize)
60+
Size() (common.StorageSize, common.StorageSize, common.StorageSize)
6161

6262
// Update performs a state transition by committing dirty nodes contained
6363
// in the given set in order to update state from the specified parent to
@@ -151,16 +151,16 @@ func (db *Database) Commit(root common.Hash, report bool) error {
151151
// Size returns the storage size of diff layer nodes above the persistent disk
152152
// layer, the dirty nodes buffered within the disk layer, the nodes held in the
153153
// disk layer's immutable buffer, and the size of cached preimages.
154-
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) {
154+
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize, common.StorageSize) {
155155
var (
156-
diffs, nodes common.StorageSize
157-
preimages common.StorageSize
156+
diffs, nodes, nodesImmutable common.StorageSize
157+
preimages common.StorageSize
158158
)
159-
diffs, nodes = db.backend.Size()
159+
diffs, nodes, nodesImmutable = db.backend.Size()
160160
if db.preimages != nil {
161161
preimages = db.preimages.size()
162162
}
163-
return diffs, nodes, preimages
163+
return diffs, nodes, nodesImmutable, preimages
164164
}
165165

166166
// Initialized returns an indicator if the state data is already initialized

trie/triedb/hashdb/database.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -627,15 +627,15 @@ func (db *Database) Update(root common.Hash, parent common.Hash, block uint64, n
627627
//
628628
// The first return will always be 0, representing the memory stored in unbounded
629629
// diff layers above the dirty cache. This is only available in pathdb.
630-
func (db *Database) Size() (common.StorageSize, common.StorageSize) {
630+
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) {
631631
db.lock.RLock()
632632
defer db.lock.RUnlock()
633633

634634
// db.dirtiesSize only contains the useful data in the cache, but when reporting
635635
// the total memory consumption, the maintenance metadata is also needed to be
636636
// counted.
637637
var metadataSize = common.StorageSize(len(db.dirties) * cachedNodeSize)
638-
return 0, db.dirtiesSize + db.childrenSize + metadataSize
638+
return 0, db.dirtiesSize + db.childrenSize + metadataSize, 0
639639
}
640640

641641
// Close closes the trie database and releases all held resources.

trie/triedb/pathdb/database.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ const (
5252
// Do not increase the buffer size arbitrarily, otherwise the system
5353
// pause time will increase when the database writes happen.
5454
DefaultBufferSize = 64 * 1024 * 1024
55+
56+
// DefaultBackgroundFlushInterval defines the default wait interval
57+
// between background flushes of the node cache to disk.
58+
DefaultBackgroundFlushInterval = 3
5559
)
5660

5761
// layer is the interface implemented by all state layers which includes some
@@ -303,7 +307,7 @@ func (db *Database) Enable(root common.Hash) error {
303307
}
304308
// Re-construct a new disk layer backed by persistent state
305309
// with **empty clean cache and node buffer**.
306-
db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0)))
310+
db.tree.reset(newDiskLayer(root, 0, db, nil, newAsyncNodeBuffer(db.bufferSize, nil, 0)))
307311

308312
// Re-enable the database as the final step.
309313
db.waitSync = false
@@ -410,16 +414,16 @@ func (db *Database) Close() error {
410414

411415
// Size returns the current storage size of the memory cache in front of the
412416
// persistent database layer.
413-
func (db *Database) Size() (diffs common.StorageSize, nodes common.StorageSize) {
417+
func (db *Database) Size() (diffs common.StorageSize, nodes common.StorageSize, nodesImmutable common.StorageSize) {
414418
db.tree.forEach(func(layer layer) {
415419
if diff, ok := layer.(*diffLayer); ok {
416420
diffs += common.StorageSize(diff.memory)
417421
}
418422
if disk, ok := layer.(*diskLayer); ok {
419-
nodes += disk.size()
423+
nodes, nodesImmutable = disk.size()
420424
}
421425
})
422-
return diffs, nodes
426+
return diffs, nodes, nodesImmutable
423427
}
424428

425429
// Initialized returns an indicator if the state data is already

trie/triedb/pathdb/difflayer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
func emptyLayer() *diskLayer {
3030
return &diskLayer{
3131
db: New(rawdb.NewMemoryDatabase(), nil),
32-
buffer: newNodeBuffer(DefaultBufferSize, nil, 0),
32+
buffer: newAsyncNodeBuffer(DefaultBufferSize, nil, 0),
3333
}
3434
}
3535

trie/triedb/pathdb/disklayer.go

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,68 @@ import (
2525
"github.com/ethereum/go-ethereum/common"
2626
"github.com/ethereum/go-ethereum/core/rawdb"
2727
"github.com/ethereum/go-ethereum/crypto"
28+
"github.com/ethereum/go-ethereum/ethdb"
2829
"github.com/ethereum/go-ethereum/log"
2930
"github.com/ethereum/go-ethereum/trie/trienode"
3031
"github.com/ethereum/go-ethereum/trie/triestate"
3132
"golang.org/x/crypto/sha3"
3233
)
3334

35+
// trienodebuffer is a collection of modified trie nodes to aggregate the disk
36+
// write. The content of the trienodebuffer must be checked before diving into
37+
// disk (since it basically is not-yet-written data).
38+
type trienodebuffer interface {
39+
// node retrieves the trie node with given node info.
40+
node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error)
41+
42+
// commit merges the dirty nodes into the trienodebuffer. This operation won't take
43+
// the ownership of the nodes map which belongs to the bottom-most diff layer.
44+
// It will just hold the node references from the given map which are safe to
45+
// copy.
46+
commit(nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer
47+
48+
// revert is the reverse operation of commit. It also merges the provided nodes
49+
// into the trienodebuffer, the difference is that the provided node set should
50+
// revert the changes made by the last state transition.
51+
revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error
52+
53+
// flush persists the in-memory dirty trie node into the disk if the configured
54+
// memory threshold is reached. Note, all data must be written atomically.
55+
flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error
56+
57+
// setSize sets the buffer size to the provided number, and invokes a flush
58+
// operation if the current memory usage exceeds the new limit.
59+
setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error
60+
61+
// reset cleans up the disk cache.
62+
reset()
63+
64+
// empty returns an indicator if trienodebuffer contains any state transition inside.
65+
empty() bool
66+
67+
// getSize returns the sizes used by the trienodebuffer: the node buffer first, then the immutable node buffer.
68+
getSize() (uint64, uint64)
69+
70+
// getAllNodes returns all the trie nodes cached in the trienodebuffer.
71+
getAllNodes() map[common.Hash]map[string]*trienode.Node
72+
73+
// getLayers returns the size of the cached difflayers.
74+
getLayers() uint64
75+
}
76+
3477
// diskLayer is a low level persistent layer built on top of a key-value store.
3578
type diskLayer struct {
3679
root common.Hash // Immutable, root hash to which this layer was made for
3780
id uint64 // Immutable, corresponding state id
3881
db *Database // Path-based trie database
3982
cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs
40-
buffer *nodebuffer // Node buffer to aggregate writes
83+
buffer trienodebuffer // Node buffer to aggregate writes
4184
stale bool // Signals that the layer became stale (state progressed)
4285
lock sync.RWMutex // Lock used to protect stale flag
4386
}
4487

4588
// newDiskLayer creates a new disk layer based on the passing arguments.
46-
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer) *diskLayer {
89+
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer trienodebuffer) *diskLayer {
4790
// Initialize a clean cache if the memory allowance is not zero
4891
// or reuse the provided cache if it is not nil (inherited from
4992
// the original disk layer).
@@ -294,14 +337,15 @@ func (dl *diskLayer) setBufferSize(size int) error {
294337
}
295338

296339
// size returns the approximate size of cached nodes in the disk layer.
297-
func (dl *diskLayer) size() common.StorageSize {
340+
func (dl *diskLayer) size() (common.StorageSize, common.StorageSize) {
298341
dl.lock.RLock()
299342
defer dl.lock.RUnlock()
300343

301344
if dl.stale {
302-
return 0
345+
return 0, 0
303346
}
304-
return common.StorageSize(dl.buffer.size)
347+
nodeBuf, nodeImmutableBuf := dl.buffer.getSize()
348+
return common.StorageSize(nodeBuf), common.StorageSize(nodeImmutableBuf)
305349
}
306350

307351
// resetCache releases the memory held by clean cache to prevent memory leak.

0 commit comments

Comments
 (0)