Skip to content

Commit

Permalink
trie/triedb/aggpathdb: reallocate the trie clean cache
Browse files Browse the repository at this point in the history
  • Loading branch information
fynnss committed Nov 6, 2023
1 parent 4a4f33c commit 37be668
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 15 deletions.
7 changes: 7 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie/triedb/aggpathdb"
"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
)

Expand Down Expand Up @@ -148,6 +149,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
log.Info("Clean cache size", "provided", common.StorageSize(config.TrieCleanCache)*1024*1024)
config.TrieDirtyCache = pathdb.MaxDirtyBufferSize / 1024 / 1024
}
if config.StateScheme == rawdb.AggPathScheme && config.TrieDirtyCache > aggpathdb.MaxDirtyBufferSize/1024/1024 {
log.Info("Capped dirty cache size", "provided", common.StorageSize(config.TrieDirtyCache)*1024*1024, "adjusted", common.StorageSize(aggpathdb.MaxDirtyBufferSize))
log.Info("Clean cache size", "provided", common.StorageSize(config.TrieCleanCache)*1024*1024)
config.TrieCleanCache += config.TrieDirtyCache - aggpathdb.MaxDirtyBufferSize/1024/1024
config.TrieDirtyCache = aggpathdb.MaxDirtyBufferSize / 1024 / 1024
}
log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)

// Assemble the Ethereum object
Expand Down
6 changes: 3 additions & 3 deletions trie/triedb/aggpathdb/aggnodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (b *aggNodeBuffer) flush(db ethdb.KeyValueStore, cleans *aggNodeCache, id u
batch = db.NewBatchWithSize(int(float64(b.size) * DefaultBatchRedundancyRate))
)

nodes := aggregateAndWriteNodes(cleans, batch, b.aggNodes)
nodes := writeAggNodes(cleans, batch, b.aggNodes)
rawdb.WritePersistentStateID(batch, id)

// Flush all mutations in a single batch
Expand All @@ -236,9 +236,9 @@ func (b *aggNodeBuffer) flush(db ethdb.KeyValueStore, cleans *aggNodeCache, id u
return nil
}

// aggregateAndWriteNodes will persist all agg node into the database
// writeAggNodes will persist all agg node into the database
// Note this function will inject all the clean node into the cleanCache
func aggregateAndWriteNodes(cache *aggNodeCache, batch ethdb.Batch, nodes map[common.Hash]map[string]*AggNode) (total int) {
func writeAggNodes(cache *aggNodeCache, batch ethdb.Batch, nodes map[common.Hash]map[string]*AggNode) (total int) {
// load the node from clean memory cache and update it, then persist it.
for owner, subset := range nodes {
for path, n := range subset {
Expand Down
11 changes: 8 additions & 3 deletions trie/triedb/aggpathdb/aggnodecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (c *aggNodeCache) node(owner common.Hash, path []byte, hash common.Hash) ([
log.Error("Unexpected trie node in disk", "owner", owner, "path", path, "expect", hash, "got", nHash)
return nil, newUnexpectedNodeError("disk", hash, nHash, owner, path, nBlob)
}
if c.cleans != nil && len(nBlob) > 0 {
if c.cleans != nil {
c.cleans.Set(key, nBlob)
cleanWriteMeter.Mark(int64(len(nBlob)))
}
Expand All @@ -91,9 +91,10 @@ func (c *aggNodeCache) node(owner common.Hash, path []byte, hash common.Hash) ([

func (c *aggNodeCache) aggNode(owner common.Hash, aggPath []byte) (*AggNode, error) {
var blob []byte
cKey := cacheKey(owner, aggPath)
if c.cleans != nil {
cacheHit := false
blob, cacheHit = c.cleans.HasGet(nil, cacheKey(owner, aggPath))
blob, cacheHit = c.cleans.HasGet(nil, cKey)
if cacheHit {
cleanHitMeter.Mark(1)
cleanReadMeter.Mark(int64(len(blob)))
Expand All @@ -108,10 +109,14 @@ func (c *aggNodeCache) aggNode(owner common.Hash, aggPath []byte) (*AggNode, err
} else {
blob = rawdb.ReadStorageTrieAggNode(c.db.diskdb, owner, aggPath)
}

if blob == nil {
return nil, nil
}

if c.cleans != nil {
c.cleans.Set(cKey, blob)
cleanWriteMeter.Mark(int64(len(blob)))
}
return DecodeAggNode(blob)
}

Expand Down
16 changes: 8 additions & 8 deletions trie/triedb/aggpathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ const (
// defaultCleanSize is the default memory allowance of clean cache.
defaultCleanSize = 16 * 1024 * 1024

// maxDirtyBufferSize is the maximum memory allowance of node buffer.
// MaxDirtyBufferSize is the maximum memory allowance of node buffer.
// Too large aggNodeBuffer will cause the system to pause for a long
// time when write happens. Also, the largest batch that pebble can
// support is 4GB, node will panic if batch size exceeds this limit.
maxDirtyBufferSize = 256 * 1024 * 1024
MaxDirtyBufferSize = 256 * 1024 * 1024

// DefaultDirtyBufferSize is the default memory allowance of node buffer
// that aggregates the writes from above until it's flushed into the
Expand Down Expand Up @@ -98,9 +98,9 @@ type Config struct {
// unreasonable or unworkable.
func (c *Config) sanitize() *Config {
conf := *c
if conf.DirtyCacheSize > maxDirtyBufferSize {
log.Warn("Sanitizing invalid node buffer size", "provided", common.StorageSize(conf.DirtyCacheSize), "updated", common.StorageSize(maxDirtyBufferSize))
conf.DirtyCacheSize = maxDirtyBufferSize
if conf.DirtyCacheSize > MaxDirtyBufferSize {
log.Warn("Sanitizing invalid node buffer size", "provided", common.StorageSize(conf.DirtyCacheSize), "updated", common.StorageSize(MaxDirtyBufferSize))
conf.DirtyCacheSize = MaxDirtyBufferSize
}
return &conf
}
Expand Down Expand Up @@ -414,9 +414,9 @@ func (db *Database) SetBufferSize(size int) error {
db.lock.Lock()
defer db.lock.Unlock()

if size > maxDirtyBufferSize {
log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(maxDirtyBufferSize))
size = maxDirtyBufferSize
if size > MaxDirtyBufferSize {
log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(MaxDirtyBufferSize))
size = MaxDirtyBufferSize
}
db.bufferSize = size
return db.tree.bottom().setBufferSize(db.bufferSize)
Expand Down
2 changes: 1 addition & 1 deletion trie/triedb/aggpathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (dl *diskLayer) revert(h *history, loader triestate.TrieLoader) (*diskLayer
} else {
batch := dl.db.diskdb.NewBatch()
dl.commitNodes(nodes)
aggregateAndWriteNodes(dl.cleans, batch, dl.buffer.aggNodes)
writeAggNodes(dl.cleans, batch, dl.buffer.aggNodes)
rawdb.WritePersistentStateID(batch, dl.id-1)
if err := batch.Write(); err != nil {
log.Crit("Failed to write states", "err", err)
Expand Down

0 comments on commit 37be668

Please sign in to comment.