From 53559fc4d72e7569b0c9e109fae8521c203c463d Mon Sep 17 00:00:00 2001
From: joey
Date: Mon, 6 Nov 2023 14:11:17 +0800
Subject: [PATCH] eth, trie/triedb/pathdb: pbss patches (#1955)

* fix: use the top root hash for rewinding under path scheme

* feat: add async flush nodebuffer in path scheme

* chore: add prune-block param suffix check

* fix: code review comments
---
 cmd/geth/main.go                      |   1 +
 cmd/geth/snapshot.go                  |  17 +-
 cmd/utils/flags.go                    |   9 +
 core/blockchain.go                    |  53 ++-
 core/blockchain_insert.go             |  10 +-
 core/blockchain_test.go               |   2 +-
 eth/backend.go                        |   3 +-
 eth/ethconfig/config.go               |   1 +
 eth/handler.go                        |   9 +-
 eth/state_accessor.go                 |   4 +-
 eth/tracers/api.go                    |   4 +-
 trie/database.go                      |  23 +-
 trie/triedb/hashdb/database.go        |   4 +-
 trie/triedb/pathdb/asyncnodebuffer.go | 448 ++++++++++++++++++++++++++
 trie/triedb/pathdb/database.go        |  28 +-
 trie/triedb/pathdb/difflayer.go       |   2 +-
 trie/triedb/pathdb/disklayer.go       |  65 +++-
 trie/triedb/pathdb/errors.go          |  14 +
 trie/triedb/pathdb/journal.go         |  15 +-
 trie/triedb/pathdb/layertree.go       |  44 +++
 trie/triedb/pathdb/nodebuffer.go      |  29 +-
 21 files changed, 709 insertions(+), 76 deletions(-)
 create mode 100644 trie/triedb/pathdb/asyncnodebuffer.go

diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index b1b08588e2..2d1a45bf6d 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -97,6 +97,7 @@ var (
 		utils.TransactionHistoryFlag,
 		utils.StateSchemeFlag,
 		utils.StateHistoryFlag,
+		utils.PathDBSyncFlag,
 		utils.LightServeFlag,
 		utils.LightIngressFlag,
 		utils.LightEgressFlag,
diff --git a/cmd/geth/snapshot.go b/cmd/geth/snapshot.go
index 0d9e583e79..c6ce0c7840 100644
--- a/cmd/geth/snapshot.go
+++ b/cmd/geth/snapshot.go
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"strings"
 	"time"
 
 	"github.com/prometheus/tsdb/fileutil"
@@ -360,7 +361,21 @@ func pruneBlock(ctx *cli.Context) error {
 		if path == "" {
 			return errors.New("prune failed, did not specify the AncientPath")
 		}
-		newAncientPath = filepath.Join(path, "ancient_back")
+		newVersionPath := false
+		files, err := os.ReadDir(oldAncientPath)
+		if err != nil {
+			return err
+		}
+		for _, file := range files {
+			if file.IsDir() && file.Name() == "chain" {
+				newVersionPath = true
+			}
+		}
+		if newVersionPath && !strings.HasSuffix(oldAncientPath, "geth/chaindata/ancient/chain") {
+			log.Error("datadir.ancient subdirectory incorrect", "got path", oldAncientPath, "want subdirectory", "geth/chaindata/ancient/chain/")
+			return errors.New("datadir.ancient subdirectory incorrect")
+		}
+		newAncientPath = filepath.Join(path, "chain_back")
 
 		blockpruner = pruner.NewBlockPruner(chaindb, stack, oldAncientPath, newAncientPath, blockAmountReserved)
 
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 2f23414ee8..36455d86b2 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -314,6 +314,12 @@ var (
 		Value:    rawdb.HashScheme,
 		Category: flags.StateCategory,
 	}
+	PathDBSyncFlag = &cli.BoolFlag{
+		Name:     "pathdb.sync",
+		Usage:    "Sync flush the dirty trie node cache to disk under the path scheme",
+		Value:    false,
+		Category: flags.StateCategory,
+	}
 	StateHistoryFlag = &cli.Uint64Flag{
 		Name:  "history.state",
 		Usage: "Number of recent blocks to retain state history for (default = 90,000 blocks, 0 = entire chain)",
@@ -1951,6 +1957,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
 		log.Warn("The flag --txlookuplimit is deprecated and will be removed, please use --history.transactions")
 		cfg.TransactionHistory = ctx.Uint64(TransactionHistoryFlag.Name)
 	}
+	if ctx.IsSet(PathDBSyncFlag.Name) {
+		cfg.PathSyncFlush = true
+	}
 	if ctx.String(GCModeFlag.Name) == "archive" && cfg.TransactionHistory != 0 {
 		cfg.TransactionHistory = 0
 		log.Warn("Disabled transaction unindexing for archive node")
diff --git a/core/blockchain.go b/core/blockchain.go
index a9de1aabd3..454c9768a6 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -155,6 +155,7 @@ type CacheConfig struct {
 	NoTries       bool   // Insecure settings. Do not have any tries in databases if enabled.
 	StateHistory  uint64 // Number of blocks from head whose state histories are reserved.
 	StateScheme   string // Scheme used to store ethereum states and merkle tree nodes on top
+	PathSyncFlush bool   // Whether to flush the trienodebuffer of pathdb to disk synchronously
 	SnapshotNoBuild bool // Whether the background generation is allowed
 	SnapshotWait    bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
@@ -174,6 +175,7 @@ func (c *CacheConfig) triedbConfig() *trie.Config {
 	}
 	if c.StateScheme == rawdb.PathScheme {
 		config.PathDB = &pathdb.Config{
+			SyncFlush:      c.PathSyncFlush,
 			StateHistory:   c.StateHistory,
 			CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
 			DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
@@ -396,11 +398,15 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
 		var diskRoot common.Hash
 		if bc.cacheConfig.SnapshotLimit > 0 {
 			diskRoot = rawdb.ReadSnapshotRoot(bc.db)
-		} else if bc.triedb.Scheme() == rawdb.PathScheme {
-			_, diskRoot = rawdb.ReadAccountTrieNode(bc.db, nil)
+		}
+		if bc.triedb.Scheme() == rawdb.PathScheme {
+			recoverable, _ := bc.triedb.Recoverable(diskRoot)
+			if !bc.HasState(diskRoot) && !recoverable {
+				diskRoot = bc.triedb.Head()
+			}
 		}
 		if diskRoot != (common.Hash{}) {
-			log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "snaproot", diskRoot)
+			log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "diskRoot", diskRoot)
 
 			snapDisk, err := bc.setHeadBeyondRoot(head.Number.Uint64(), 0, diskRoot, true)
 			if err != nil {
@@ -689,18 +695,18 @@ func (bc *BlockChain) loadLastState() error {
 		blockTd  = bc.GetTd(headBlock.Hash(), headBlock.NumberU64())
 	)
 	if headHeader.Hash() != headBlock.Hash() {
-		log.Info("Loaded most recent local header", "number", headHeader.Number, "hash", headHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(headHeader.Time), 0)))
+		log.Info("Loaded most recent local header", "number", headHeader.Number, "hash", headHeader.Hash(), "root", headHeader.Root, "td", headerTd, "age", common.PrettyAge(time.Unix(int64(headHeader.Time), 0)))
 	}
-	log.Info("Loaded most recent local block", "number", headBlock.Number(), "hash", headBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(headBlock.Time()), 0)))
+	log.Info("Loaded most recent local block", "number", headBlock.Number(), "hash", headBlock.Hash(), "root", headBlock.Root(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(headBlock.Time()), 0)))
 	if headBlock.Hash() != currentSnapBlock.Hash() {
 		snapTd := bc.GetTd(currentSnapBlock.Hash(), currentSnapBlock.Number.Uint64())
-		log.Info("Loaded most recent local snap block", "number", currentSnapBlock.Number, "hash", currentSnapBlock.Hash(), "td", snapTd, "age", common.PrettyAge(time.Unix(int64(currentSnapBlock.Time), 0)))
+		log.Info("Loaded most recent local snap block", "number", currentSnapBlock.Number, "hash", currentSnapBlock.Hash(), "root", currentSnapBlock.Root, "td", snapTd, "age", common.PrettyAge(time.Unix(int64(currentSnapBlock.Time), 0)))
 	}
 	if posa, ok := bc.engine.(consensus.PoSA); ok {
 		if currentFinalizedHeader := posa.GetFinalizedHeader(bc, headHeader); currentFinalizedHeader != nil {
 			if currentFinalizedBlock := bc.GetBlockByHash(currentFinalizedHeader.Hash()); currentFinalizedBlock != nil {
 				finalTd := bc.GetTd(currentFinalizedBlock.Hash(), currentFinalizedBlock.NumberU64())
-				log.Info("Loaded most recent local finalized block", "number", currentFinalizedBlock.Number(), "hash", currentFinalizedBlock.Hash(), "td", finalTd, "age", common.PrettyAge(time.Unix(int64(currentFinalizedBlock.Time()), 0)))
+				log.Info("Loaded most recent local finalized block", "number", currentFinalizedBlock.Number(), "hash", currentFinalizedBlock.Hash(), "root", currentFinalizedBlock.Root(), "td", finalTd, "age", common.PrettyAge(time.Unix(int64(currentFinalizedBlock.Time()), 0)))
 			}
 		}
 	}
@@ -802,6 +808,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
 
 	// resetState resets the persistent state to genesis if it's not available.
 	resetState := func() {
+		log.Info("Reset to block with genesis state", "number", bc.genesisBlock.NumberU64(), "hash", bc.genesisBlock.Hash())
 		// Short circuit if the genesis state is already present.
 		if bc.HasState(bc.genesisBlock.Root()) {
 			return
@@ -825,7 +832,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
 	// chain reparation mechanism without deleting any data!
 	if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() <= currentBlock.Number.Uint64() {
 		newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64())
-		lastBlockNum := header.Number.Uint64()
 		if newHeadBlock == nil {
 			log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
 			newHeadBlock = bc.genesisBlock
@@ -835,10 +841,8 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
 			// keeping rewinding until we exceed the optional threshold
 			// root hash
 			beyondRoot := (root == common.Hash{}) // Flag whether we're beyond the requested root (no root, always true)
-			enoughBeyondCount := false
-			beyondCount := 0
+
 			for {
-				beyondCount++
 				// If a root threshold was requested but not yet crossed, check
 				if root != (common.Hash{}) && !beyondRoot && newHeadBlock.Root() == root {
 					beyondRoot, rootNumber = true, newHeadBlock.NumberU64()
@@ -854,24 +858,11 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
 						log.Error("Missing block in the middle, aiming genesis", "number", newHeadBlock.NumberU64()-1, "hash", newHeadBlock.ParentHash())
 						newHeadBlock = bc.genesisBlock
 					} else {
-						log.Trace("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "pivot", *pivot)
+						log.Info("Rewind passed pivot, aiming genesis", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "pivot", *pivot)
 						newHeadBlock = bc.genesisBlock
 					}
 				}
-				if beyondRoot || (enoughBeyondCount && root != common.Hash{}) || newHeadBlock.NumberU64() == 0 {
-					if enoughBeyondCount && (root != common.Hash{}) && rootNumber == 0 {
-						for {
-							lastBlockNum++
-							block := bc.GetBlockByNumber(lastBlockNum)
-							if block == nil {
-								break
-							}
-							if block.Root() == root {
-								rootNumber = block.NumberU64()
-								break
-							}
-						}
-					}
+				if beyondRoot || newHeadBlock.NumberU64() == 0 {
 					if newHeadBlock.NumberU64() == 0 {
 						resetState()
 					} else if !bc.HasState(newHeadBlock.Root()) {
@@ -1215,7 +1206,7 @@ func (bc *BlockChain) Stop() {
 			for !bc.triegc.Empty() {
 				triedb.Dereference(bc.triegc.PopItem())
 			}
-			if size, _ := triedb.Size(); size != 0 {
+			if _, size, _, _ := triedb.Size(); size != 0 {
 				log.Error("Dangling trie nodes after full cleanup")
 			}
 	}
@@ -1619,8 +1610,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
 	if current := block.NumberU64(); current > bc.triesInMemory {
 		// If we exceeded our memory allowance, flush matured singleton nodes to disk
 		var (
-			nodes, imgs = triedb.Size()
-			limit       = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
+			_, nodes, _, imgs = triedb.Size()
+			limit             = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
 		)
 		if nodes > limit || imgs > 4*1024*1024 {
 			triedb.Cap(limit - ethdb.IdealBatchSize)
@@ -2104,8 +2095,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
 		stats.processed++
 		stats.usedGas += usedGas
 
-		dirty, _ := bc.triedb.Size()
-		stats.report(chain, it.index, dirty, setHead)
+		trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size()
+		stats.report(chain, it.index, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead)
 
 		if !setHead {
 			// After merge we expect few side chains. Simply count
diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go
index ffe2d6501c..520feaf277 100644
--- a/core/blockchain_insert.go
+++ b/core/blockchain_insert.go
@@ -39,7 +39,7 @@ const statsReportLimit = 8 * time.Second
 
 // report prints statistics if some number of blocks have been processed
 // or more than a few seconds have passed since the last message.
-func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize, setHead bool) {
+func (st *insertStats) report(chain []*types.Block, index int, trieDiffNodes, trieBufNodes, trieImmutableBufNodes common.StorageSize, setHead bool) {
 	// Fetch the timings for the batch
 	var (
 		now = mclock.Now()
@@ -63,7 +63,13 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor
 	if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute {
 		context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
 	}
-	context = append(context, []interface{}{"dirty", dirty}...)
+	if trieDiffNodes != 0 { // pathdb
+		context = append(context, []interface{}{"triediffs", trieDiffNodes}...)
+		context = append(context, []interface{}{"triedirty", trieBufNodes}...)
+		context = append(context, []interface{}{"trieimmutabledirty", trieImmutableBufNodes}...)
+	} else {
+		context = append(context, []interface{}{"triedirty", trieBufNodes}...)
+	}
 
 	if st.queued > 0 {
 		context = append(context, []interface{}{"queued", st.queued}...)
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 56a5c7763e..6bf6d2d881 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -1807,7 +1807,7 @@ func TestTrieForkGC(t *testing.T) {
 		chain.stateCache.TrieDB().Dereference(blocks[len(blocks)-1-i].Root())
 		chain.stateCache.TrieDB().Dereference(forks[len(blocks)-1-i].Root())
 	}
-	if nodes, _ := chain.TrieDB().Size(); nodes > 0 {
+	if _, nodes, _, _ := chain.TrieDB().Size(); nodes > 0 {
 		t.Fatalf("stale tries still alive after garbase collection")
 	}
 }
diff --git a/eth/backend.go b/eth/backend.go
index 68d8634724..133b2f2a86 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -145,7 +145,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 	// dirty cache to the clean cache.
 	if config.StateScheme == rawdb.PathScheme && config.TrieDirtyCache > pathdb.MaxDirtyBufferSize/1024/1024 {
 		log.Info("Capped dirty cache size", "provided", common.StorageSize(config.TrieDirtyCache)*1024*1024, "adjusted", common.StorageSize(pathdb.MaxDirtyBufferSize))
-		config.TrieCleanCache += config.TrieDirtyCache - pathdb.MaxDirtyBufferSize/1024/1024
+		log.Info("Clean cache size", "provided", common.StorageSize(config.TrieCleanCache)*1024*1024)
 		config.TrieDirtyCache = pathdb.MaxDirtyBufferSize / 1024 / 1024
 	}
 	log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)
@@ -225,6 +225,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 			Preimages:     config.Preimages,
 			StateHistory:  config.StateHistory,
 			StateScheme:   config.StateScheme,
+			PathSyncFlush: config.PathSyncFlush,
 		}
 	)
 	bcOps := make([]core.BlockChainOption, 0)
diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go
index 384996e7fc..15ed20d702 100644
--- a/eth/ethconfig/config.go
+++ b/eth/ethconfig/config.go
@@ -123,6 +123,7 @@ type Config struct {
 	TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
 	StateHistory       uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.
 	StateScheme        string `toml:",omitempty"` // State scheme used to store ethereum state and merkle trie nodes on top
+	PathSyncFlush      bool   `toml:",omitempty"` // Whether to flush the trienodebuffer of pathdb to disk synchronously
 
 	// RequiredBlocks is a set of block number -> hash mappings which must be in the
 	// canonical chain of all remote peers. Setting the option makes geth verify the
diff --git a/eth/handler.go b/eth/handler.go
index c6a2c61308..e59bbb4884 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -45,7 +45,6 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/p2p"
-	"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
 )
 
 const (
@@ -979,7 +978,9 @@ func (h *handler) voteBroadcastLoop() {
 // sync is finished.
 func (h *handler) enableSyncedFeatures() {
 	h.acceptTxs.Store(true)
-	if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
-		h.chain.TrieDB().SetBufferSize(pathdb.DefaultDirtyBufferSize)
-	}
+	// In the bsc scenario, pathdb.MaxDirtyBufferSize (256MB) is kept after
+	// sync, since it performs better than DefaultDirtyBufferSize (64MB).
+	//if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
+	//	h.chain.TrieDB().SetBufferSize(pathdb.DefaultDirtyBufferSize)
+	//}
 }
diff --git a/eth/state_accessor.go b/eth/state_accessor.go
index 71a38253ef..0971ef246a 100644
--- a/eth/state_accessor.go
+++ b/eth/state_accessor.go
@@ -175,8 +175,8 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u
 		parent = root
 	}
 	if report {
-		nodes, imgs := triedb.Size()
-		log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs)
+		diff, nodes, immutablenodes, imgs := triedb.Size()
+		log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "layer", diff, "nodes", nodes, "immutablenodes", immutablenodes, "preimages", imgs)
 	}
 	return statedb, func() { triedb.Dereference(block.Root()) }, nil
 }
diff --git a/eth/tracers/api.go b/eth/tracers/api.go
index 0891033e42..b4f80a6b08 100644
--- a/eth/tracers/api.go
+++ b/eth/tracers/api.go
@@ -372,8 +372,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
 			// if the relevant state is available in disk.
 			var preferDisk bool
 			if statedb != nil {
-				s1, s2 := statedb.Database().TrieDB().Size()
-				preferDisk = s1+s2 > defaultTracechainMemLimit
+				s1, s2, s3, s4 := statedb.Database().TrieDB().Size()
+				preferDisk = s1+s2+s3+s4 > defaultTracechainMemLimit
 			}
 			statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk)
 			if err != nil {
diff --git a/trie/database.go b/trie/database.go
index 7bad532dde..8581b8bcaa 100644
--- a/trie/database.go
+++ b/trie/database.go
@@ -61,7 +61,7 @@ type backend interface {
 	// Size returns the current storage size of the memory cache in front of the
 	// persistent database layer.
-	Size() common.StorageSize
+	Size() (common.StorageSize, common.StorageSize, common.StorageSize)
 
 	// Update performs a state transition by committing dirty nodes contained
 	// in the given set in order to update state from the specified parent to
@@ -207,16 +207,16 @@ func (db *Database) Commit(root common.Hash, report bool) error {
 
 // Size returns the storage size of dirty trie nodes in front of the persistent
 // database and the size of cached preimages.
-func (db *Database) Size() (common.StorageSize, common.StorageSize) {
+func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize, common.StorageSize) {
 	var (
-		storages  common.StorageSize
-		preimages common.StorageSize
+		diffs, nodes, immutablenodes common.StorageSize
+		preimages                    common.StorageSize
 	)
-	storages = db.backend.Size()
+	diffs, nodes, immutablenodes = db.backend.Size()
 	if db.preimages != nil {
 		preimages = db.preimages.size()
 	}
-	return storages, preimages
+	return diffs, nodes, immutablenodes, preimages
 }
 
 // Initialized returns an indicator if the state data is already initialized
@@ -353,3 +353,14 @@ func (db *Database) SetBufferSize(size int) error {
 	}
 	return pdb.SetBufferSize(size)
 }
+
+// Head returns the top non-fork difflayer/disklayer root hash for rewinding.
+// It's only supported by the path-based database and returns an empty hash
+// for others.
+func (db *Database) Head() common.Hash {
+	pdb, ok := db.backend.(*pathdb.Database)
+	if !ok {
+		return common.Hash{}
+	}
+	return pdb.Head()
+}
diff --git a/trie/triedb/hashdb/database.go b/trie/triedb/hashdb/database.go
index e9fd4722b6..caf79ef936 100644
--- a/trie/triedb/hashdb/database.go
+++ b/trie/triedb/hashdb/database.go
@@ -641,7 +641,7 @@ func (db *Database) Update(root common.Hash, parent common.Hash, block uint64, n
 
 // Size returns the current storage size of the memory cache in front of the
 // persistent database layer.
-func (db *Database) Size() common.StorageSize {
+func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) {
 	db.lock.RLock()
 	defer db.lock.RUnlock()
 
@@ -649,7 +649,7 @@ func (db *Database) Size() common.StorageSize {
 	// the total memory consumption, the maintenance metadata is also needed to be
 	// counted.
 	var metadataSize = common.StorageSize(len(db.dirties) * cachedNodeSize)
-	return db.dirtiesSize + db.childrenSize + metadataSize
+	return 0, db.dirtiesSize + db.childrenSize + metadataSize, 0
 }
 
 // Close closes the trie database and releases all held resources.
diff --git a/trie/triedb/pathdb/asyncnodebuffer.go b/trie/triedb/pathdb/asyncnodebuffer.go
new file mode 100644
index 0000000000..c8c3921177
--- /dev/null
+++ b/trie/triedb/pathdb/asyncnodebuffer.go
@@ -0,0 +1,448 @@
+package pathdb
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/VictoriaMetrics/fastcache"
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/rawdb"
+	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/trie/trienode"
+)
+
+var _ trienodebuffer = &asyncnodebuffer{}
+
+// asyncnodebuffer implements the trienodebuffer interface, flushing the
+// nodecache to disk asynchronously.
+type asyncnodebuffer struct {
+	mux        sync.RWMutex
+	current    *nodecache
+	background *nodecache
+}
+
+// newAsyncNodeBuffer initializes the async node buffer with the provided nodes.
+func newAsyncNodeBuffer(limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) *asyncnodebuffer {
+	if nodes == nil {
+		nodes = make(map[common.Hash]map[string]*trienode.Node)
+	}
+	var size uint64
+	for _, subset := range nodes {
+		for path, n := range subset {
+			size += uint64(len(n.Blob) + len(path))
+		}
+	}
+
+	return &asyncnodebuffer{
+		current:    newNodeCache(uint64(limit), size, nodes, layers),
+		background: newNodeCache(uint64(limit), 0, make(map[common.Hash]map[string]*trienode.Node), 0),
+	}
+}
+
+// node retrieves the trie node with given node info.
+func (a *asyncnodebuffer) node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error) {
+	a.mux.RLock()
+	defer a.mux.RUnlock()
+
+	node, err := a.current.node(owner, path, hash)
+	if err != nil {
+		return nil, err
+	}
+	if node == nil {
+		return a.background.node(owner, path, hash)
+	}
+	return node, nil
+}
+
+// commit merges the dirty nodes into the nodebuffer. This operation won't take
+// the ownership of the nodes map which belongs to the bottom-most diff layer.
+// It will just hold the node references from the given map which are safe to
+// copy.
+func (a *asyncnodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer {
+	a.mux.Lock()
+	defer a.mux.Unlock()
+
+	err := a.current.commit(nodes)
+	if err != nil {
+		log.Crit("[BUG] failed to commit nodes to asyncnodebuffer", "error", err)
+	}
+	return a
+}
+
+// revert is the reverse operation of commit. It also merges the provided nodes
+// into the nodebuffer, the difference is that the provided node set should
+// revert the changes made by the last state transition.
+func (a *asyncnodebuffer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error {
+	a.mux.Lock()
+	defer a.mux.Unlock()
+
+	var err error
+	a.current, err = a.current.merge(a.background)
+	if err != nil {
+		log.Crit("[BUG] failed to merge node cache under revert async node buffer", "error", err)
+	}
+	a.background.reset()
+	return a.current.revert(db, nodes)
+}
+
+// setSize is unsupported in asyncnodebuffer: with the double buffer scheme,
+// a resize would have to block on the in-flight background flush.
+func (a *asyncnodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
+	return errors.New("not supported")
+}
+
+// reset cleans up the disk cache.
+func (a *asyncnodebuffer) reset() {
+	a.mux.Lock()
+	defer a.mux.Unlock()
+
+	a.current.reset()
+	a.background.reset()
+}
+
+// empty returns an indicator if nodebuffer contains any state transition inside.
+func (a *asyncnodebuffer) empty() bool {
+	a.mux.RLock()
+	defer a.mux.RUnlock()
+
+	return a.current.empty() && a.background.empty()
+}
+
+// flush persists the in-memory dirty trie node into the disk if the configured
+// memory threshold is reached. Note, all data must be written atomically.
+func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
+	a.mux.Lock()
+	defer a.mux.Unlock()
+
+	if force {
+		for {
+			if atomic.LoadUint64(&a.background.immutable) == 1 {
+				time.Sleep(time.Duration(DefaultBackgroundFlushInterval) * time.Second)
+				log.Info("Waiting for background memory table flush before force flushing node buffer")
+				continue
+			}
+			atomic.StoreUint64(&a.current.immutable, 1)
+			return a.current.flush(db, clean, id)
+		}
+	}
+
+	if a.current.size < a.current.limit {
+		return nil
+	}
+
+	// A background flush is already running, skip this round.
+	if atomic.LoadUint64(&a.background.immutable) == 1 {
+		return nil
+	}
+
+	atomic.StoreUint64(&a.current.immutable, 1)
+	a.current, a.background = a.background, a.current
+
+	go func(persistId uint64) {
+		for {
+			err := a.background.flush(db, clean, persistId)
+			if err == nil {
+				log.Debug("Succeeded in flushing background nodecache to disk", "state_id", persistId)
+				return
+			}
+			log.Error("Failed to flush background nodecache to disk", "state_id", persistId, "error", err)
+		}
+	}(id)
+	return nil
+}
+
+func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node {
+	a.mux.Lock()
+	defer a.mux.Unlock()
+
+	cached, err := a.current.merge(a.background)
+	if err != nil {
+		log.Crit("[BUG] failed to merge nodecache in getAllNodes", "error", err)
+	}
+	return cached.nodes
+}
+
+func (a *asyncnodebuffer) getLayers() uint64 {
+	a.mux.RLock()
+	defer a.mux.RUnlock()
+
+	return a.current.layers + a.background.layers
+}
+
+func (a *asyncnodebuffer) getSize() (uint64, uint64) {
+	a.mux.RLock()
+	defer a.mux.RUnlock()
+
+	return a.current.size, a.background.size
+}
+
+type nodecache struct {
+	layers    uint64                                    // The number of diff layers aggregated inside
+	size      uint64                                    // The size of aggregated writes
+	limit     uint64                                    // The maximum memory allowance in bytes
+	nodes     map[common.Hash]map[string]*trienode.Node // The dirty node set, mapped by owner and path
+	immutable uint64                                    // Set to 1 while the nodes are being flushed to disk in the background
+}
+
+func newNodeCache(limit, size uint64, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) *nodecache {
+	return &nodecache{
+		layers:    layers,
+		size:      size,
+		limit:     limit,
+		nodes:     nodes,
+		immutable: 0,
+	}
+}
+
+func (nc *nodecache) node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error) {
+	subset, ok := nc.nodes[owner]
+	if !ok {
+		return nil, nil
+	}
+	n, ok := subset[string(path)]
+	if !ok {
+		return nil, nil
+	}
+	if n.Hash != hash {
+		dirtyFalseMeter.Mark(1)
+		log.Error("Unexpected trie node in node buffer", "owner", owner, "path", path, "expect", hash, "got", n.Hash)
+		return nil, newUnexpectedNodeError("dirty", hash, n.Hash, owner, path, n.Blob)
+	}
+	return n, nil
+}
+
+func (nc *nodecache) commit(nodes map[common.Hash]map[string]*trienode.Node) error {
+	if atomic.LoadUint64(&nc.immutable) == 1 {
+		return errWriteImmutable
+	}
+	var (
+		delta         int64
+		overwrite     int64
+		overwriteSize int64
+	)
+	for owner, subset := range nodes {
+		current, exist := nc.nodes[owner]
+		if !exist {
+			// Allocate a new map for the subset instead of claiming it directly
+			// from the passed map to avoid potential concurrent map read/write.
+			// The nodes belonging to the original diff layer are still accessible
+			// even after merging, thus the ownership of the nodes map should still
+			// belong to the original layer and any mutation on it should be prevented.
+			current = make(map[string]*trienode.Node)
+			for path, n := range subset {
+				current[path] = n
+				delta += int64(len(n.Blob) + len(path))
+			}
+			nc.nodes[owner] = current
+			continue
+		}
+		for path, n := range subset {
+			if orig, exist := current[path]; !exist {
+				delta += int64(len(n.Blob) + len(path))
+			} else {
+				delta += int64(len(n.Blob) - len(orig.Blob))
+				overwrite++
+				overwriteSize += int64(len(orig.Blob) + len(path))
+			}
+			current[path] = n
+		}
+		nc.nodes[owner] = current
+	}
+	nc.updateSize(delta)
+	nc.layers++
+	gcNodesMeter.Mark(overwrite)
+	gcBytesMeter.Mark(overwriteSize)
+	return nil
+}
+
+func (nc *nodecache) updateSize(delta int64) {
+	size := int64(nc.size) + delta
+	if size >= 0 {
+		nc.size = uint64(size)
+		return
+	}
+	s := nc.size
+	nc.size = 0
+	log.Error("Invalid pathdb buffer size", "prev", common.StorageSize(s), "delta", common.StorageSize(delta))
+}
+
+func (nc *nodecache) reset() {
+	atomic.StoreUint64(&nc.immutable, 0)
+	nc.layers = 0
+	nc.size = 0
+	nc.nodes = make(map[common.Hash]map[string]*trienode.Node)
+}
+
+func (nc *nodecache) empty() bool {
+	return nc.layers == 0
+}
+
+func (nc *nodecache) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
+	if atomic.LoadUint64(&nc.immutable) != 1 {
+		return errFlushMutable
+	}
+
+	// Ensure the target state id is aligned with the internal counter.
+	head := rawdb.ReadPersistentStateID(db)
+	if head+nc.layers != id {
+		return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", nc.layers, head, id)
+	}
+	var (
+		start = time.Now()
+		batch = db.NewBatchWithSize(int(float64(nc.size) * DefaultBatchRedundancyRate))
+	)
+	nodes := writeNodes(batch, nc.nodes, clean)
+	rawdb.WritePersistentStateID(batch, id)
+
+	// Flush all mutations in a single batch
+	size := batch.ValueSize()
+	if err := batch.Write(); err != nil {
+		return err
+	}
+	commitBytesMeter.Mark(int64(size))
+	commitNodesMeter.Mark(int64(nodes))
+	commitTimeTimer.UpdateSince(start)
+	log.Debug("Persisted pathdb nodes", "nodes", len(nc.nodes), "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
+	nc.reset()
+	return nil
+}
+
+func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
+	if nc == nil && nc1 == nil {
+		return nil, nil
+	}
+	if nc == nil || nc.empty() {
+		res := copyNodeCache(nc1)
+		atomic.StoreUint64(&res.immutable, 0)
+		return res, nil
+	}
+	if nc1 == nil || nc1.empty() {
+		res := copyNodeCache(nc)
+		atomic.StoreUint64(&res.immutable, 0)
+		return res, nil
+	}
+	if atomic.LoadUint64(&nc.immutable) == atomic.LoadUint64(&nc1.immutable) {
+		return nil, errIncompatibleMerge
+	}
+
+	var (
+		immutable *nodecache
+		mutable   *nodecache
+		res       = &nodecache{}
+	)
+	if atomic.LoadUint64(&nc.immutable) == 1 {
+		immutable = nc
+		mutable = nc1
+	} else {
+		immutable = nc1
+		mutable = nc
+	}
+	res.size = immutable.size + mutable.size
+	res.layers = immutable.layers + mutable.layers
+	res.limit = immutable.size
+	res.nodes = make(map[common.Hash]map[string]*trienode.Node)
+	for acc, subTree := range immutable.nodes {
+		if _, ok := res.nodes[acc]; !ok {
+			res.nodes[acc] = make(map[string]*trienode.Node)
+		}
+		for path, node := range subTree {
+			res.nodes[acc][path] = node
+		}
+	}
+
+	for acc, subTree := range mutable.nodes {
+		if _, ok := res.nodes[acc]; !ok {
+			res.nodes[acc] = make(map[string]*trienode.Node)
+		}
+		for path, node := range subTree {
+			res.nodes[acc][path] = node
+		}
+	}
+	return res, nil
+}
+
+func (nc *nodecache) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error {
+	if atomic.LoadUint64(&nc.immutable) == 1 {
+		return errRevertImmutable
+	}
+
+	// Short circuit if no embedded state transition to revert.
+	if nc.layers == 0 {
+		return errStateUnrecoverable
+	}
+	nc.layers--
+
+	// Reset the entire buffer if only a single transition left.
+	if nc.layers == 0 {
+		nc.reset()
+		return nil
+	}
+	var delta int64
+	for owner, subset := range nodes {
+		current, ok := nc.nodes[owner]
+		if !ok {
+			panic(fmt.Sprintf("non-existent subset (%x)", owner))
+		}
+		for path, n := range subset {
+			orig, ok := current[path]
+			if !ok {
+				// There is a special case in MPT that one child is removed from
+				// a fullNode which only has two children, and then a new child
+				// with different position is immediately inserted into the fullNode.
+				// In this case, the clean child of the fullNode will also be
+				// marked as dirty because of node collapse and expansion.
+				//
+				// In case of database rollback, don't panic if this "clean"
+				// node occurs which is not present in buffer.
+				var nhash common.Hash
+				if owner == (common.Hash{}) {
+					_, nhash = rawdb.ReadAccountTrieNode(db, []byte(path))
+				} else {
+					_, nhash = rawdb.ReadStorageTrieNode(db, owner, []byte(path))
+				}
+				// Ignore the clean node in the case described above.
+				if nhash == n.Hash {
+					continue
+				}
+				panic(fmt.Sprintf("non-existent node (%x %v) blob: %v", owner, path, crypto.Keccak256Hash(n.Blob).Hex()))
+			}
+			current[path] = n
+			delta += int64(len(n.Blob)) - int64(len(orig.Blob))
+		}
+	}
+	nc.updateSize(delta)
+	return nil
+}
+
+func copyNodeCache(n *nodecache) *nodecache {
+	if n == nil {
+		return nil
+	}
+	nc := &nodecache{
+		layers:    n.layers,
+		size:      n.size,
+		limit:     n.limit,
+		immutable: atomic.LoadUint64(&n.immutable),
+		nodes:     make(map[common.Hash]map[string]*trienode.Node),
+	}
+	for acc, subTree := range n.nodes {
+		if _, ok := nc.nodes[acc]; !ok {
+			nc.nodes[acc] = make(map[string]*trienode.Node)
+		}
+		for path, node := range subTree {
+			nc.nodes[acc][path] = node
+		}
+	}
+	return nc
+}
diff --git a/trie/triedb/pathdb/database.go b/trie/triedb/pathdb/database.go
index ee57e50b6a..167db6643d 100644
--- a/trie/triedb/pathdb/database.go
+++ b/trie/triedb/pathdb/database.go
@@ -52,6 +52,14 @@ const (
 	// Do not increase the buffer size arbitrarily, otherwise the system
 	// pause time will increase when the database writes happen.
 	DefaultDirtyBufferSize = 64 * 1024 * 1024
+
+	// DefaultBackgroundFlushInterval defines the default wait interval, in
+	// seconds, between checks on the background node cache flush to disk.
+	DefaultBackgroundFlushInterval = 3
+
+	// DefaultBatchRedundancyRate adds headroom to the write batch size,
+	// since the buffer size calculation is known to be slightly inaccurate.
+	DefaultBatchRedundancyRate = 1.1
 )
 
 // layer is the interface implemented by all state layers which includes some
@@ -86,6 +94,7 @@ type layer interface {
 
 // Config contains the settings for database.
 type Config struct {
+	SyncFlush      bool   // Whether the trienodebuffer flushes its cache to disk synchronously
 	StateHistory   uint64 // Number of recent blocks to maintain state history for
 	CleanCacheSize int    // Maximum memory allowance (in bytes) for caching clean nodes
 	DirtyCacheSize int    // Maximum memory allowance (in bytes) for caching dirty nodes
@@ -180,7 +189,7 @@ func New(diskdb ethdb.Database, config *Config) *Database {
 			log.Warn("Truncated extra state histories", "number", pruned)
 		}
 	}
-	log.Warn("Path-based state scheme is an experimental feature")
+	log.Warn("Path-based state scheme is an experimental feature", "sync", db.config.SyncFlush)
 	return db
 }
 
@@ -283,7 +292,7 @@ func (db *Database) Reset(root common.Hash) error {
 	}
 	// Re-construct a new disk layer backed by persistent state
 	// with **empty clean cache and node buffer**.
-	dl := newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0))
+	dl := newDiskLayer(root, 0, db, nil, NewTrieNodeBuffer(db.config.SyncFlush, db.bufferSize, nil, 0))
 	db.tree.reset(dl)
 	log.Info("Rebuilt trie database", "root", root)
 	return nil
@@ -384,16 +393,16 @@ func (db *Database) Close() error {
 
 // Size returns the current storage size of the memory cache in front of the
 // persistent database layer.
-func (db *Database) Size() (size common.StorageSize) {
+func (db *Database) Size() (diffs common.StorageSize, nodes common.StorageSize, immutableNodes common.StorageSize) {
 	db.tree.forEach(func(layer layer) {
 		if diff, ok := layer.(*diffLayer); ok {
-			size += common.StorageSize(diff.memory)
+			diffs += common.StorageSize(diff.memory)
 		}
 		if disk, ok := layer.(*diskLayer); ok {
-			size += disk.size()
+			nodes, immutableNodes = disk.size()
 		}
 	})
-	return size
+	return diffs, nodes, immutableNodes
 }
 
 // Initialized returns an indicator if the state data is already
@@ -425,3 +434,10 @@ func (db *Database) SetBufferSize(size int) error {
 func (db *Database) Scheme() string {
 	return rawdb.PathScheme
 }
+
+// Head returns the top non-fork difflayer/disklayer root hash for rewinding.
+func (db *Database) Head() common.Hash {
+	db.lock.Lock()
+	defer db.lock.Unlock()
+	return db.tree.front()
+}
diff --git a/trie/triedb/pathdb/difflayer.go b/trie/triedb/pathdb/difflayer.go
index 10567715d2..ccd8d36764 100644
--- a/trie/triedb/pathdb/difflayer.go
+++ b/trie/triedb/pathdb/difflayer.go
@@ -71,7 +71,7 @@ func newDiffLayer(parent layer, root common.Hash, id uint64, block uint64, nodes
 	dirtyWriteMeter.Mark(size)
 	diffLayerNodesMeter.Mark(int64(count))
 	diffLayerBytesMeter.Mark(int64(dl.memory))
-	log.Debug("Created new diff layer", "id", id, "block", block, "nodes", count, "size", common.StorageSize(dl.memory))
+	log.Debug("Created new diff layer", "id", id, "block", block, "nodes", count, "size", common.StorageSize(dl.memory), "root", dl.root)
 	return dl
 }
 
diff --git a/trie/triedb/pathdb/disklayer.go b/trie/triedb/pathdb/disklayer.go
index ef697cbce8..59be04c744 100644
--- a/trie/triedb/pathdb/disklayer.go
+++ b/trie/triedb/pathdb/disklayer.go
@@ -25,25 +25,77 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/trie/trienode"
 	"github.com/ethereum/go-ethereum/trie/triestate"
 	"golang.org/x/crypto/sha3"
 )
 
+// trienodebuffer is a collection of modified trie nodes to aggregate the disk
+// write. The content of the trienodebuffer must be checked before diving into
+// disk (since it basically is not-yet-written data).
+type trienodebuffer interface {
+	// node retrieves the trie node with given node info.
+	node(owner common.Hash, path []byte, hash common.Hash) (*trienode.Node, error)
+
+	// commit merges the dirty nodes into the trienodebuffer. This operation won't take
+	// the ownership of the nodes map which belongs to the bottom-most diff layer.
+	// It will just hold the node references from the given map which are safe to
+	// copy.
+	commit(nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer
+
+	// revert is the reverse operation of commit. It also merges the provided nodes
+	// into the trienodebuffer, the difference is that the provided node set should
+	// revert the changes made by the last state transition.
+	revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error
+
+	// flush persists the in-memory dirty trie node into the disk if the configured
+	// memory threshold is reached. Note, all data must be written atomically.
+	flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error
+
+	// setSize sets the buffer size to the provided number, and invokes a flush
+	// operation if the current memory usage exceeds the new limit.
+	setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error
+
+	// reset cleans up the disk cache.
+	reset()
+
+	// empty returns an indicator if trienodebuffer contains any state transition inside.
+	empty() bool
+
+	// getSize returns the used size of the trienodebuffer.
+	getSize() (uint64, uint64)
+
+	// getAllNodes returns all the trie nodes cached in the trienodebuffer.
+	getAllNodes() map[common.Hash]map[string]*trienode.Node
+
+	// getLayers returns the number of cached difflayers.
+	getLayers() uint64
+}
+
+// NewTrieNodeBuffer creates a sync or async node buffer according to the
+// sync flag.
+func NewTrieNodeBuffer(sync bool, limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) trienodebuffer {
+	if sync {
+		log.Info("New sync node buffer", "limit", common.StorageSize(limit), "layers", layers)
+		return newNodeBuffer(limit, nodes, layers)
+	}
+	log.Info("New async node buffer", "limit", common.StorageSize(limit), "layers", layers)
+	return newAsyncNodeBuffer(limit, nodes, layers)
+}
+
 // diskLayer is a low level persistent layer built on top of a key-value store.
 type diskLayer struct {
 	root   common.Hash      // Immutable, root hash to which this layer was made for
 	id     uint64           // Immutable, corresponding state id
 	db     *Database        // Path-based trie database
 	cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs
-	buffer *nodebuffer      // Node buffer to aggregate writes
+	buffer trienodebuffer   // Node buffer to aggregate writes
 	stale  bool             // Signals that the layer became stale (state progressed)
 	lock   sync.RWMutex     // Lock used to protect stale flag
 }
 
 // newDiskLayer creates a new disk layer based on the passing arguments.
-func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer) *diskLayer {
+func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer trienodebuffer) *diskLayer {
 	// Initialize a clean cache if the memory allowance is not zero
 	// or reuse the provided cache if it is not nil (inherited from
 	// the original disk layer).
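The trienodebuffer interface above lets the disk layer aggregate writes through either the original synchronous nodebuffer or the new asyncnodebuffer. The standalone Go sketch below distills the double-buffer hand-off that asyncnodebuffer implements: freeze the current buffer, swap it into the background slot, and persist it off the hot path. The doubleBuffer/maybeFlush names and the map-of-blobs payload are illustrative only, not part of this patch.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// buffer is a stand-in for nodecache: a write set plus an immutable flag
// that is raised while a background flush owns the data.
type buffer struct {
	immutable uint64
	data      map[string][]byte
}

// doubleBuffer mirrors asyncnodebuffer: current accepts new writes while
// background is (possibly) being persisted.
type doubleBuffer struct {
	mux        sync.Mutex
	current    *buffer
	background *buffer
}

func newDoubleBuffer() *doubleBuffer {
	return &doubleBuffer{
		current:    &buffer{data: make(map[string][]byte)},
		background: &buffer{data: make(map[string][]byte)},
	}
}

// maybeFlush freezes the current buffer, swaps it into the background slot
// and persists it asynchronously, the same hand-off the non-force path of
// asyncnodebuffer.flush performs.
func (d *doubleBuffer) maybeFlush(persist func(map[string][]byte) error) {
	d.mux.Lock()
	defer d.mux.Unlock()

	if atomic.LoadUint64(&d.background.immutable) == 1 {
		return // previous background flush still running, retry later
	}
	atomic.StoreUint64(&d.current.immutable, 1)
	d.current, d.background = d.background, d.current

	go func(frozen *buffer) {
		if err := persist(frozen.data); err != nil {
			fmt.Println("flush failed:", err)
			return
		}
		frozen.data = make(map[string][]byte)
		atomic.StoreUint64(&frozen.immutable, 0) // hand the buffer back
	}(d.background)
}

func main() {
	d := newDoubleBuffer()
	d.current.data["path"] = []byte("node blob")

	done := make(chan struct{})
	d.maybeFlush(func(m map[string][]byte) error {
		defer close(done)
		fmt.Println("persisting", len(m), "entries")
		return nil
	})
	<-done
}

The design trade-off is visible in the sketch: writers never wait for disk I/O unless both buffers are busy, at the cost of keeping up to two buffers of dirty nodes in memory.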
@@ -282,7 +334,7 @@ func (dl *diskLayer) revert(h *history, loader triestate.TrieLoader) (*diskLayer
 	return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer), nil
 }
 
-// setBufferSize sets the node buffer size to the provided value.
+// setBufferSize sets the trie node buffer size to the provided value.
 func (dl *diskLayer) setBufferSize(size int) error {
 	dl.lock.RLock()
 	defer dl.lock.RUnlock()
@@ -294,14 +346,15 @@ func (dl *diskLayer) setBufferSize(size int) error {
 }
 
 // size returns the approximate size of cached nodes in the disk layer.
-func (dl *diskLayer) size() common.StorageSize {
+func (dl *diskLayer) size() (common.StorageSize, common.StorageSize) {
 	dl.lock.RLock()
 	defer dl.lock.RUnlock()
 
 	if dl.stale {
-		return 0
+		return 0, 0
 	}
-	return common.StorageSize(dl.buffer.size)
+	dirtyNodes, dirtyImmutableNodes := dl.buffer.getSize()
+	return common.StorageSize(dirtyNodes), common.StorageSize(dirtyImmutableNodes)
 }
 
 // resetCache releases the memory held by clean cache to prevent memory leak.
diff --git a/trie/triedb/pathdb/errors.go b/trie/triedb/pathdb/errors.go
index f6ac0ec4a0..03114f9d65 100644
--- a/trie/triedb/pathdb/errors.go
+++ b/trie/triedb/pathdb/errors.go
@@ -45,6 +45,20 @@ var (
 	// errUnexpectedNode is returned if the requested node with specified path is
 	// not hash matched with expectation.
 	errUnexpectedNode = errors.New("unexpected node")
+
+	// errWriteImmutable is returned when writing to the immutable nodecache
+	// of the asyncnodebuffer.
+	errWriteImmutable = errors.New("write immutable nodecache")
+
+	// errFlushMutable is returned when flushing a still-mutable nodecache of
+	// the asyncnodebuffer to disk.
+	errFlushMutable = errors.New("flush mutable nodecache")
+
+	// errIncompatibleMerge is returned when merging two nodecaches fails.
+	errIncompatibleMerge = errors.New("incompatible nodecache merge")
+
+	// errRevertImmutable is returned when reverting the immutable nodecache.
+	errRevertImmutable = errors.New("revert immutable nodecache")
 )
 
 func newUnexpectedNodeError(loc string, expHash common.Hash, gotHash common.Hash, owner common.Hash, path []byte, blob []byte) error {
diff --git a/trie/triedb/pathdb/journal.go b/trie/triedb/pathdb/journal.go
index ea90207f29..3c7884cdf4 100644
--- a/trie/triedb/pathdb/journal.go
+++ b/trie/triedb/pathdb/journal.go
@@ -130,7 +130,7 @@ func (db *Database) loadLayers() layer {
 		log.Info("Failed to load journal, discard it", "err", err)
 	}
 	// Return single layer with persistent state.
-	return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newNodeBuffer(db.bufferSize, nil, 0))
+	return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, NewTrieNodeBuffer(db.config.SyncFlush, db.bufferSize, nil, 0))
 }
 
 // loadDiskLayer reads the binary blob from the layer journal, reconstructing
@@ -170,7 +170,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
 		nodes[entry.Owner] = subset
 	}
 	// Calculate the internal state transitions by id difference.
-	base := newDiskLayer(root, id, db, nil, newNodeBuffer(db.bufferSize, nodes, id-stored))
+	base := newDiskLayer(root, id, db, nil, NewTrieNodeBuffer(db.config.SyncFlush, db.bufferSize, nodes, id-stored))
 	return base, nil
 }
 
@@ -260,8 +260,9 @@ func (dl *diskLayer) journal(w io.Writer) error {
 		return err
 	}
 	// Step three, write all unwritten nodes into the journal
-	nodes := make([]journalNodes, 0, len(dl.buffer.nodes))
-	for owner, subset := range dl.buffer.nodes {
+	bufferNodes := dl.buffer.getAllNodes()
+	nodes := make([]journalNodes, 0, len(bufferNodes))
+	for owner, subset := range bufferNodes {
 		entry := journalNodes{Owner: owner}
 		for path, node := range subset {
 			entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob})
@@ -271,7 +272,7 @@ func (dl *diskLayer) journal(w io.Writer) error {
 	if err := rlp.Encode(w, nodes); err != nil {
 		return err
 	}
-	log.Debug("Journaled pathdb disk layer", "root", dl.root, "nodes", len(dl.buffer.nodes))
+	log.Debug("Journaled pathdb disk layer", "root", dl.root, "nodes", len(bufferNodes))
 	return nil
 }
 
@@ -344,9 +345,9 @@ func (db *Database) Journal(root common.Hash) error {
 	}
 	disk := db.tree.bottom()
 	if l, ok := l.(*diffLayer); ok {
-		log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers)
+		log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.getLayers())
 	} else { // disk layer only on noop runs (likely) or deep reorgs (unlikely)
-		log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.layers)
+		log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.getLayers())
 	}
 	start := time.Now()
diff --git a/trie/triedb/pathdb/layertree.go b/trie/triedb/pathdb/layertree.go
index d314779910..ed94d2e19e 100644
--- a/trie/triedb/pathdb/layertree.go
+++ b/trie/triedb/pathdb/layertree.go
@@ -23,6 +23,7 @@ import (
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/trie/trienode"
 	"github.com/ethereum/go-ethereum/trie/triestate"
 )
@@ -212,3 +213,46 @@ func (tree *layerTree) bottom() *diskLayer {
 	}
 	return current.(*diskLayer)
 }
+
+// front returns the top non-fork difflayer/disklayer root hash for rewinding.
+func (tree *layerTree) front() common.Hash {
+	tree.lock.RLock()
+	defer tree.lock.RUnlock()
+
+	chain := make(map[common.Hash][]common.Hash)
+	var base common.Hash
+	for _, layer := range tree.layers {
+		switch dl := layer.(type) {
+		case *diskLayer:
+			if dl.stale {
+				log.Info("pathdb top disklayer is stale")
+				return base
+			}
+			base = dl.rootHash()
+		case *diffLayer:
+			if _, ok := chain[dl.parentLayer().rootHash()]; !ok {
+				chain[dl.parentLayer().rootHash()] = make([]common.Hash, 0)
+			}
+			chain[dl.parentLayer().rootHash()] = append(chain[dl.parentLayer().rootHash()], dl.rootHash())
+		default:
+			log.Crit("unsupported layer type")
+		}
+	}
+	if (base == common.Hash{}) {
+		log.Info("pathdb top difflayer is empty")
+		return base
+	}
+	parent := base
+	for {
+		children, ok := chain[parent]
+		if !ok {
+			log.Info("pathdb top difflayer", "root", parent)
+			return parent
+		}
+		if len(children) != 1 {
+			log.Info("pathdb top difflayer is forked", "common ancestor root", parent)
+			return parent
+		}
+		parent = children[0]
+	}
+}
diff --git a/trie/triedb/pathdb/nodebuffer.go b/trie/triedb/pathdb/nodebuffer.go
index 4a7d328b9a..9d79df37ff 100644
--- a/trie/triedb/pathdb/nodebuffer.go
+++ b/trie/triedb/pathdb/nodebuffer.go
@@ -29,6 +29,8 @@ import (
 	"github.com/ethereum/go-ethereum/trie/trienode"
 )
 
+var _ trienodebuffer = &nodebuffer{}
+
 // nodebuffer is a collection of modified trie nodes to aggregate the disk
 // write. The content of the nodebuffer must be checked before diving into
 // disk (since it basically is not-yet-written data).
@@ -80,7 +82,7 @@ func (b *nodebuffer) node(owner common.Hash, path []byte, hash common.Hash) (*tr
 // the ownership of the nodes map which belongs to the bottom-most diff layer.
 // It will just hold the node references from the given map which are safe to
 // copy.
-func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) *nodebuffer {
+func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) trienodebuffer {
 	var (
 		delta         int64
 		overwrite     int64
@@ -97,14 +99,14 @@ func (b *nodebuffer) commit(nodes map[common.Hash]map[string]*trienode.Node) *no
 			current = make(map[string]*trienode.Node)
 			for path, n := range subset {
 				current[path] = n
-				delta += int64(len(n.Blob) + len(path))
+				delta += int64(len(n.Blob) + len(path) + len(owner))
 			}
 			b.nodes[owner] = current
 			continue
 		}
 		for path, n := range subset {
 			if orig, exist := current[path]; !exist {
-				delta += int64(len(n.Blob) + len(path))
+				delta += int64(len(n.Blob) + len(path) + len(owner))
 			} else {
 				delta += int64(len(n.Blob) - len(orig.Blob))
 				overwrite++
@@ -217,7 +219,11 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
 	}
 	var (
 		start = time.Now()
-		batch = db.NewBatchWithSize(int(b.size))
+		// The calculation of b.size is kept as accurate as possible, but
+		// omissions were still found during testing and code review, so the
+		// figure may not be exact. Add some redundancy as protection.
+		batch = db.NewBatchWithSize(int(float64(b.size) * DefaultBatchRedundancyRate))
 	)
 	nodes := writeNodes(batch, b.nodes, clean)
 	rawdb.WritePersistentStateID(batch, id)
@@ -273,3 +279,18 @@ func cacheKey(owner common.Hash, path []byte) []byte {
 	}
 	return append(owner.Bytes(), path...)
 }
+
+// getSize returns the used size of the nodebuffer.
+func (b *nodebuffer) getSize() (uint64, uint64) {
+	return b.size, 0
+}
+
+// getAllNodes returns all the trie nodes cached in the nodebuffer.
+func (b *nodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node {
+	return b.nodes
+}
+
+// getLayers returns the number of cached difflayers.
+func (b *nodebuffer) getLayers() uint64 {
+	return b.layers
+}
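Taken together, the API change in this patch means callers of trie.Database.Size now receive four figures (diff-layer size, mutable dirty buffer, immutable dirty buffer being flushed, preimage cache) instead of two, and trie.Database.Head exposes the pathdb rewind target consumed by NewBlockChain above. A minimal usage sketch follows; it assumes the trie.NewDatabase(diskdb, config) constructor of the go-ethereum line this patch targets, so adjust if your tree differs.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/trie"
)

func main() {
	// Open a trie database over an in-memory backing store; a nil config
	// selects the hash scheme, whose Size reports only the dirty figure.
	db := trie.NewDatabase(rawdb.NewMemoryDatabase(), nil)

	// Size now reports diff-layer size, the mutable dirty buffer, the
	// immutable (being-flushed) dirty buffer and the preimage cache size.
	diffs, nodes, immutableNodes, preimages := db.Size()
	fmt.Println("diffs:", diffs, "dirty:", nodes,
		"immutabledirty:", immutableNodes, "preimages:", preimages)

	// Head reports the top non-fork root hash used as the rewind target;
	// it is only meaningful under the path scheme and yields an empty
	// hash otherwise.
	fmt.Println("head:", db.Head())
}

Note that the immutable figure is non-zero only when the async node buffer is active (pathdb without --pathdb.sync), which is why insertStats.report above logs trieimmutabledirty solely on the pathdb branch.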