11 changes: 6 additions & 5 deletions core/blockchain.go
@@ -1023,7 +1023,7 @@ func (bc *BlockChain) Stop() {
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem())
}
-if _, nodes, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb
+if _, nodes, _, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb
log.Error("Dangling trie nodes after full cleanup")
}
}
@@ -1431,8 +1431,9 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
-_, nodes, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb
-limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
+_, nodesMutable, nodesImmutable, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb
+limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
+nodes = nodesMutable + nodesImmutable
)
if nodes > limit || imgs > 4*1024*1024 {
bc.triedb.Cap(limit - ethdb.IdealBatchSize)
@@ -1872,8 +1873,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
if bc.snaps != nil {
snapDiffItems, snapBufItems = bc.snaps.Size()
}
-trieDiffNodes, trieBufNodes, _ := bc.triedb.Size()
-stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, setHead)
+trieDiffNodes, trieBufNodes, trieBufNodesImmutable, _ := bc.triedb.Size()
+stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieBufNodesImmutable, setHead)

if !setHead {
// After merge we expect few side chains. Simply count
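For orientation, here is a minimal sketch of how a caller consumes the widened four-value `Size()` signature; `shouldCap` and `dirtyLimitMB` are illustrative names, not part of this PR. For hashdb the first and third returns stay zero, so the sum degenerates to the old two-component check.

```go
package main

import (
	"log"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/trie"
)

// shouldCap mirrors the writeBlockWithState check above: mutable and
// immutable buffered nodes both count toward the dirty-memory allowance.
func shouldCap(triedb *trie.Database, dirtyLimitMB uint64) bool {
	diffs, mutable, immutable, preimages := triedb.Size()
	limit := common.StorageSize(dirtyLimitMB) * 1024 * 1024
	log.Printf("trie memory: diffs=%v mutable=%v immutable=%v preimages=%v",
		diffs, mutable, immutable, preimages)
	return mutable+immutable > limit || preimages > 4*1024*1024
}
```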
3 changes: 2 additions & 1 deletion core/blockchain_insert.go
@@ -39,7 +39,7 @@ const statsReportLimit = 8 * time.Second

// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
-func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes common.StorageSize, setHead bool) {
+func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes, trieBufNodesImmutable common.StorageSize, setHead bool) {
// Fetch the timings for the batch
var (
now = mclock.Now()
@@ -71,6 +71,7 @@ func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, sn
}
if trieDiffNodes != 0 { // pathdb
context = append(context, []interface{}{"triediffs", trieDiffNodes}...)
+context = append(context, []interface{}{"triedirtyimmutable", trieBufNodesImmutable}...)
}
context = append(context, []interface{}{"triedirty", triebufNodes}...)

2 changes: 1 addition & 1 deletion core/blockchain_test.go
@@ -1844,7 +1844,7 @@ func TestTrieForkGC(t *testing.T) {
chain.TrieDB().Dereference(blocks[len(blocks)-1-i].Root())
chain.TrieDB().Dereference(forks[len(blocks)-1-i].Root())
}
-if _, nodes, _ := chain.TrieDB().Size(); nodes > 0 { // all memory is returned in the nodes return for hashdb
+if _, nodes, _, _ := chain.TrieDB().Size(); nodes > 0 { // all memory is returned in the nodes return for hashdb
t.Fatalf("stale tries still alive after garbase collection")
}
}
4 changes: 2 additions & 2 deletions eth/state_accessor.go
@@ -168,8 +168,8 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u
parent = root
}
if report {
-_, nodes, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb
-log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs)
+_, nodes, nodeImmutable, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb
+log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "nodesimmutable", nodeImmutable, "preimages", imgs)
}
return statedb, func() { triedb.Dereference(block.Root()) }, nil
}
4 changes: 2 additions & 2 deletions eth/tracers/api.go
@@ -368,8 +368,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
// if the relevant state is available in disk.
var preferDisk bool
if statedb != nil {
-s1, s2, s3 := statedb.Database().TrieDB().Size()
-preferDisk = s1+s2+s3 > defaultTracechainMemLimit
+s1, s2, s3, s4 := statedb.Database().TrieDB().Size()
+preferDisk = s1+s2+s3+s4 > defaultTracechainMemLimit
}
statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk)
if err != nil {
20 changes: 11 additions & 9 deletions trie/database.go
@@ -57,7 +57,7 @@ type backend interface {
//
// For hash scheme, there is no differentiation between diff layer nodes
// and dirty disk layer nodes, so both are merged into the second return.
-Size() (common.StorageSize, common.StorageSize)
+Size() (common.StorageSize, common.StorageSize, common.StorageSize)

// Update performs a state transition by committing dirty nodes contained
// in the given set in order to update state from the specified parent to
@@ -148,19 +148,21 @@ func (db *Database) Commit(root common.Hash, report bool) error {
return db.backend.Commit(root, report)
}

-// Size returns the storage size of diff layer nodes above the persistent disk
-// layer, the dirty nodes buffered within the disk layer, and the size of cached
-// preimages.
-func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) {
+// Size returns the sizes of:
+// - the diff layer nodes above the persistent disk layer,
+// - the mutable dirty nodes buffered within the disk layer,
+// - the immutable nodes in the disk layer,
+// - the cached preimages.
+func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize, common.StorageSize) {
[Review thread]
Contributor: Please update the method docs. With four return values, might be time to split it into a bullet-list, e.g. something like

    // Size returns the sizes of:
    // - the diff layer nodes above the persistent disk layer,
    // - the dirty nodes buffered within the disk layer,
    // - the immutable nodes in the disk layer,
    // - the cached preimages

Author: added the docs.

var (
-diffs, nodes common.StorageSize
-preimages common.StorageSize
+diffs, nodes, nodesImmutable common.StorageSize
+preimages common.StorageSize
)
-diffs, nodes = db.backend.Size()
+diffs, nodes, nodesImmutable = db.backend.Size()
if db.preimages != nil {
preimages = db.preimages.size()
}
-return diffs, nodes, preimages
+return diffs, nodes, nodesImmutable, preimages
}

// Initialized returns an indicator if the state data is already initialized
4 changes: 2 additions & 2 deletions trie/triedb/hashdb/database.go
@@ -627,15 +627,15 @@ func (db *Database) Update(root common.Hash, parent common.Hash, block uint64, n
//
// The first return will always be 0, representing the memory stored in unbounded
// diff layers above the dirty cache. This is only available in pathdb.
-func (db *Database) Size() (common.StorageSize, common.StorageSize) {
+func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) {
db.lock.RLock()
defer db.lock.RUnlock()

// db.dirtiesSize only contains the useful data in the cache, but when reporting
// the total memory consumption, the maintenance metadata is also needed to be
// counted.
var metadataSize = common.StorageSize(len(db.dirties) * cachedNodeSize)
-return 0, db.dirtiesSize + db.childrenSize + metadataSize
+return 0, db.dirtiesSize + db.childrenSize + metadataSize, 0
}

// Close closes the trie database and releases all held resources.
12 changes: 8 additions & 4 deletions trie/triedb/pathdb/database.go
@@ -52,6 +52,10 @@ const (
// Do not increase the buffer size arbitrarily, otherwise the system
// pause time will increase when the database writes happen.
DefaultBufferSize = 64 * 1024 * 1024

+// DefaultBackgroundFlushInterval defines the default wait interval
+// between background flushes of the node cache to disk.
+DefaultBackgroundFlushInterval = 3
)

// layer is the interface implemented by all state layers which includes some
@@ -303,7 +307,7 @@ func (db *Database) Enable(root common.Hash) error {
}
// Re-construct a new disk layer backed by persistent state
// with **empty clean cache and node buffer**.
-db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0)))
+db.tree.reset(newDiskLayer(root, 0, db, nil, newAsyncNodeBuffer(db.bufferSize, nil, 0)))

// Re-enable the database as the final step.
db.waitSync = false
@@ -410,16 +414,16 @@ func (db *Database) Close() error {

// Size returns the current storage size of the memory cache in front of the
// persistent database layer.
-func (db *Database) Size() (diffs common.StorageSize, nodes common.StorageSize) {
+func (db *Database) Size() (diffs common.StorageSize, nodes common.StorageSize, nodesImmutable common.StorageSize) {
db.tree.forEach(func(layer layer) {
if diff, ok := layer.(*diffLayer); ok {
diffs += common.StorageSize(diff.memory)
}
if disk, ok := layer.(*diskLayer); ok {
-nodes += disk.size()
+nodes, nodesImmutable = disk.size()
}
})
-return diffs, nodes
+return diffs, nodes, nodesImmutable
}

// Initialized returns an indicator if the state data is already
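The new `DefaultBackgroundFlushInterval` constant implies some periodic background flushing, but the scheduler itself is not in the hunks shown here. A hedged sketch of what such a loop might look like, assuming the value counts seconds; `loop`, `readyToFlush` and `flushBackground` are invented names:

```go
// Hypothetical sketch only, not code from this PR. It wakes up every
// DefaultBackgroundFlushInterval seconds and kicks a background flush
// whenever the immutable half of the node buffer has work pending.
func (db *Database) loop(stop chan struct{}) {
	ticker := time.NewTicker(DefaultBackgroundFlushInterval * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if db.readyToFlush() { // illustrative predicate
				db.flushBackground() // illustrative flush entry point
			}
		case <-stop:
			return
		}
	}
}
```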
2 changes: 1 addition & 1 deletion trie/triedb/pathdb/difflayer_test.go
@@ -29,7 +29,7 @@ import (
func emptyLayer() *diskLayer {
return &diskLayer{
db: New(rawdb.NewMemoryDatabase(), nil),
-buffer: newNodeBuffer(DefaultBufferSize, nil, 0),
+buffer: newAsyncNodeBuffer(DefaultBufferSize, nil, 0),
}
}

60 changes: 54 additions & 6 deletions trie/triedb/pathdb/disklayer.go
@@ -25,25 +25,70 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/ethereum/go-ethereum/trie/triestate"
"golang.org/x/crypto/sha3"
)

+// trienodebuffer is a collection of modified trie nodes to aggregate the disk
+// write. The content of the trienodebuffer must be checked before diving into
+// disk (since it basically is not-yet-written data).
+type trienodebuffer interface {
+// node retrieves the trie node with given node info.
+node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error)
+
+// commit merges the dirty nodes into the trienodebuffer. This operation won't take
+// the ownership of the nodes map which belongs to the bottom-most diff layer.
+// It will just hold the node references from the given map which are safe to
+// copy.
+commit(nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer
+
+// revert is the reverse operation of commit. It also merges the provided nodes
+// into the trienodebuffer, the difference is that the provided node set should
+// revert the changes made by the last state transition.
+revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error
+
+// flush persists the in-memory dirty trie nodes into the disk if the configured
+// memory threshold is reached. Note, all data must be written atomically.
+flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error
+
+// setSize sets the buffer size to the provided number, and invokes a flush
+// operation if the current memory usage exceeds the new limit.
+setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error
+
+// reset cleans up the disk cache.
+reset()
+
+// empty returns an indicator if trienodebuffer contains any state transition inside.
+empty() bool
+
+// getSize returns the memory used by the trienodebuffer, split into:
+// - the mutable dirty nodes buffered within the disk layer,
+// - the immutable nodes in the disk layer.
+getSize() (uint64, uint64)
+
+// getAllNodes returns all the trie nodes cached in the trienodebuffer.
+getAllNodes() map[common.Hash]map[string]*trienode.Node
+
+// getLayers returns the number of difflayers aggregated in the trienodebuffer.
+getLayers() uint64
+}

// diskLayer is a low level persistent layer built on top of a key-value store.
type diskLayer struct {
root common.Hash // Immutable, root hash to which this layer was made for
id uint64 // Immutable, corresponding state id
db *Database // Path-based trie database
cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs
-buffer *nodebuffer // Node buffer to aggregate writes
+buffer trienodebuffer // Node buffer to aggregate writes
stale bool // Signals that the layer became stale (state progressed)
lock sync.RWMutex // Lock used to protect stale flag
}

// newDiskLayer creates a new disk layer based on the passing arguments.
-func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer) *diskLayer {
+func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer trienodebuffer) *diskLayer {
// Initialize a clean cache if the memory allowance is not zero
// or reuse the provided cache if it is not nil (inherited from
// the original disk layer).
@@ -293,15 +338,18 @@ func (dl *diskLayer) setBufferSize(size int) error {
return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
}

// size returns the approximate size of cached nodes in the disk layer.
func (dl *diskLayer) size() common.StorageSize {
// size returns the approximate size of:
// - the mutable dirty nodes buffered within the disk layer,
// - the immutable nodes in the disk layer.
func (dl *diskLayer) size() (common.StorageSize, common.StorageSize) {
[Review thread]
Contributor: update method doc

Author: added the docs.

dl.lock.RLock()
defer dl.lock.RUnlock()

if dl.stale {
-return 0
+return 0, 0
}
-return common.StorageSize(dl.buffer.size)
+nodeBuf, nodeImmutableBuf := dl.buffer.getSize()
+return common.StorageSize(nodeBuf), common.StorageSize(nodeImmutableBuf)
}

// resetCache releases the memory held by clean cache to prevent memory leak.
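The `asyncnodebuffer` that satisfies the new `trienodebuffer` interface is not included in the hunks above. As a rough mental model, a double buffer with a mutable current half and a sealed background half would behave like the sketch below; it assumes the upstream `nodebuffer` fields (`size`, `limit`) and helpers (`newNodeBuffer`, `flush`), and is not the PR's actual implementation.

```go
// asyncNodeBufferSketch illustrates the double-buffer idea: commits land
// in the mutable current half while the sealed background half is being
// flushed to disk, so writes never block on disk I/O.
type asyncNodeBufferSketch struct {
	mux        sync.RWMutex
	current    *nodebuffer // mutable half, receives new commits
	background *nodebuffer // immutable half, mid-flush to disk
}

// getSize matches trienodebuffer.getSize: mutable size first, immutable second.
func (a *asyncNodeBufferSketch) getSize() (uint64, uint64) {
	a.mux.RLock()
	defer a.mux.RUnlock()
	return a.current.size, a.background.size
}

// flush seals the current half, swaps in a fresh mutable buffer, and
// persists the sealed half asynchronously.
func (a *asyncNodeBufferSketch) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
	a.mux.Lock()
	sealed := a.current
	a.current = newNodeBuffer(int(sealed.limit), nil, 0)
	a.background = sealed
	a.mux.Unlock()

	go func() {
		if err := sealed.flush(db, clean, id, true); err != nil {
			log.Error("Background buffer flush failed", "err", err)
		}
	}()
	return nil
}
```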
12 changes: 12 additions & 0 deletions trie/triedb/pathdb/errors.go
@@ -49,6 +49,18 @@ var (
// errUnexpectedNode is returned if the requested node with specified path is
// not hash matched with expectation.
errUnexpectedNode = errors.New("unexpected node")

+// errWriteImmutable is returned on an attempt to write to the immutable
+// (background) nodebuffer.
+errWriteImmutable = errors.New("write immutable node buffer")
+
+// errFlushMutable is returned on an attempt to flush the mutable
+// (foreground) nodebuffer to disk.
+errFlushMutable = errors.New("flush mutable node buffer")
+
+// errRevertImmutable is returned on an attempt to revert the immutable
+// (background) nodebuffer.
+errRevertImmutable = errors.New("revert immutable node buffer")
+
+// errIncompatibleMerge is returned when two incompatible node buffers are merged.
+errIncompatibleMerge = errors.New("incompatible node buffer merge")
)

func newUnexpectedNodeError(loc string, expHash common.Hash, gotHash common.Hash, owner common.Hash, path []byte, blob []byte) error {
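To make the failure modes behind these sentinels concrete, here is a hypothetical guard; `sketchBuffer` and its `immutable` flag are invented for illustration and do not appear in the PR:

```go
// sketchBuffer stands in for one half of the async buffer; immutable is
// set once the half has been swapped to the background for flushing.
type sketchBuffer struct {
	immutable bool
}

func (b *sketchBuffer) commit() error {
	if b.immutable {
		return errWriteImmutable // the sealed half never accepts writes
	}
	return nil
}

func (b *sketchBuffer) revert() error {
	if b.immutable {
		return errRevertImmutable // reverts must target the mutable half
	}
	return nil
}

func (b *sketchBuffer) flush() error {
	if !b.immutable {
		return errFlushMutable // only the sealed half may be flushed to disk
	}
	return nil
}
```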
15 changes: 8 additions & 7 deletions trie/triedb/pathdb/journal.go
@@ -130,7 +130,7 @@ func (db *Database) loadLayers() layer {
log.Info("Failed to load journal, discard it", "err", err)
}
// Return single layer with persistent state.
-return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newNodeBuffer(db.bufferSize, nil, 0))
+return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newAsyncNodeBuffer(db.bufferSize, nil, 0))
}

// loadDiskLayer reads the binary blob from the layer journal, reconstructing
Expand Down Expand Up @@ -170,7 +170,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
nodes[entry.Owner] = subset
}
// Calculate the internal state transitions by id difference.
-base := newDiskLayer(root, id, db, nil, newNodeBuffer(db.bufferSize, nodes, id-stored))
+base := newDiskLayer(root, id, db, nil, newAsyncNodeBuffer(db.bufferSize, nodes, id-stored))
return base, nil
}

@@ -260,8 +260,9 @@ func (dl *diskLayer) journal(w io.Writer) error {
return err
}
// Step three, write all unwritten nodes into the journal
-nodes := make([]journalNodes, 0, len(dl.buffer.nodes))
-for owner, subset := range dl.buffer.nodes {
+cachedNodes := dl.buffer.getAllNodes()
+nodes := make([]journalNodes, 0, len(cachedNodes))
+for owner, subset := range cachedNodes {
entry := journalNodes{Owner: owner}
for path, node := range subset {
entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob})
@@ -271,7 +272,7 @@
if err := rlp.Encode(w, nodes); err != nil {
return err
}
log.Debug("Journaled pathdb disk layer", "root", dl.root, "nodes", len(dl.buffer.nodes))
log.Debug("Journaled pathdb disk layer", "root", dl.root, "nodes", len(cachedNodes))
return nil
}

@@ -344,9 +345,9 @@ func (db *Database) Journal(root common.Hash) error {
}
disk := db.tree.bottom()
if l, ok := l.(*diffLayer); ok {
log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers)
log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.getLayers())
} else { // disk layer only on noop runs (likely) or deep reorgs (unlikely)
log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.layers)
log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.getLayers())
}
start := time.Now()
