From becbe8fdb8d5c3b40e107214c2c0017382135b2d Mon Sep 17 00:00:00 2001
From: joeycli
Date: Fri, 3 Nov 2023 20:46:28 +0800
Subject: [PATCH] fix: code review comments

---
 cmd/geth/snapshot.go                  | 12 +++++++-
 core/blockchain.go                    |  8 +++---
 eth/handler.go                        |  9 +++---
 trie/triedb/pathdb/asyncnodebuffer.go | 41 ++++++++++++++++++++++-----
 trie/triedb/pathdb/database.go        | 19 ++++++-------
 trie/triedb/pathdb/disklayer.go       | 24 +++++++++-------
 trie/triedb/pathdb/layertree.go       |  6 +++-
 trie/triedb/pathdb/nodebuffer.go      | 12 +++++---
 8 files changed, 89 insertions(+), 42 deletions(-)

diff --git a/cmd/geth/snapshot.go b/cmd/geth/snapshot.go
index a9695d05ca..c6ce0c7840 100644
--- a/cmd/geth/snapshot.go
+++ b/cmd/geth/snapshot.go
@@ -361,7 +361,17 @@ func pruneBlock(ctx *cli.Context) error {
 	if path == "" {
 		return errors.New("prune failed, did not specify the AncientPath")
 	}
-	if !strings.HasSuffix(oldAncientPath, "geth/chaindata/ancient/chain") {
+	newVersionPath := false
+	files, err := os.ReadDir(oldAncientPath)
+	if err != nil {
+		return err
+	}
+	for _, file := range files {
+		if file.IsDir() && file.Name() == "chain" {
+			newVersionPath = true
+		}
+	}
+	if newVersionPath && !strings.HasSuffix(oldAncientPath, "geth/chaindata/ancient/chain") {
 		log.Error("datadir.ancient subdirectory incorrect", "got path", oldAncientPath, "want subdirectory", "geth/chaindata/ancient/chain/")
 		return errors.New("datadir.ancient subdirectory incorrect")
 	}
diff --git a/core/blockchain.go b/core/blockchain.go
index fd46af2f14..454c9768a6 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -695,18 +695,18 @@ func (bc *BlockChain) loadLastState() error {
 		blockTd  = bc.GetTd(headBlock.Hash(), headBlock.NumberU64())
 	)
 	if headHeader.Hash() != headBlock.Hash() {
-		log.Info("Loaded most recent local header", "number", headHeader.Number, "hash", headHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(headHeader.Time), 0)))
+		log.Info("Loaded most recent local header", "number", headHeader.Number, "hash", headHeader.Hash(), "root", headHeader.Root, "td", headerTd, "age", common.PrettyAge(time.Unix(int64(headHeader.Time), 0)))
 	}
-	log.Info("Loaded most recent local block", "number", headBlock.Number(), "hash", headBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(headBlock.Time()), 0)))
+	log.Info("Loaded most recent local block", "number", headBlock.Number(), "hash", headBlock.Hash(), "root", headBlock.Root(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(headBlock.Time()), 0)))
 	if headBlock.Hash() != currentSnapBlock.Hash() {
 		snapTd := bc.GetTd(currentSnapBlock.Hash(), currentSnapBlock.Number.Uint64())
-		log.Info("Loaded most recent local snap block", "number", currentSnapBlock.Number, "hash", currentSnapBlock.Hash(), "td", snapTd, "age", common.PrettyAge(time.Unix(int64(currentSnapBlock.Time), 0)))
+		log.Info("Loaded most recent local snap block", "number", currentSnapBlock.Number, "hash", currentSnapBlock.Hash(), "root", currentSnapBlock.Root, "td", snapTd, "age", common.PrettyAge(time.Unix(int64(currentSnapBlock.Time), 0)))
 	}
 	if posa, ok := bc.engine.(consensus.PoSA); ok {
 		if currentFinalizedHeader := posa.GetFinalizedHeader(bc, headHeader); currentFinalizedHeader != nil {
 			if currentFinalizedBlock := bc.GetBlockByHash(currentFinalizedHeader.Hash()); currentFinalizedBlock != nil {
 				finalTd := bc.GetTd(currentFinalizedBlock.Hash(), currentFinalizedBlock.NumberU64())
-				log.Info("Loaded most recent local finalized block", "number", currentFinalizedBlock.Number(), "hash", currentFinalizedBlock.Hash(), "td", finalTd, "age", common.PrettyAge(time.Unix(int64(currentFinalizedBlock.Time()), 0)))
+				log.Info("Loaded most recent local finalized block", "number", currentFinalizedBlock.Number(), "hash", currentFinalizedBlock.Hash(), "root", currentFinalizedBlock.Root(), "td", finalTd, "age", common.PrettyAge(time.Unix(int64(currentFinalizedBlock.Time()), 0)))
 			}
 		}
 	}
diff --git a/eth/handler.go b/eth/handler.go
index c6a2c61308..e59bbb4884 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -45,7 +45,6 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/p2p"
-	"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
 )
 
 const (
@@ -979,7 +978,9 @@ func (h *handler) voteBroadcastLoop() {
 // sync is finished.
 func (h *handler) enableSyncedFeatures() {
 	h.acceptTxs.Store(true)
-	if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
-		h.chain.TrieDB().SetBufferSize(pathdb.DefaultDirtyBufferSize)
-	}
+	// In the bsc scenario, pathdb.MaxDirtyBufferSize (256MB) is used,
+	// which performs better than DefaultDirtyBufferSize (64MB).
+	//if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
+	//	h.chain.TrieDB().SetBufferSize(pathdb.DefaultDirtyBufferSize)
+	//}
 }
diff --git a/trie/triedb/pathdb/asyncnodebuffer.go b/trie/triedb/pathdb/asyncnodebuffer.go
index ce47b2845f..7f8a1c822f 100644
--- a/trie/triedb/pathdb/asyncnodebuffer.go
+++ b/trie/triedb/pathdb/asyncnodebuffer.go
@@ -1,6 +1,7 @@
 package pathdb
 
 import (
+	"errors"
 	"fmt"
 	"sync"
 	"sync/atomic"
@@ -82,14 +83,18 @@ func (a *asyncnodebuffer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error {
 	var err error
 	a.current, err = a.current.merge(a.background)
-	a.background.reset()
 	if err != nil {
-		log.Crit("[BUG] failed to merge memory table under revert nodebuffer", "error", err)
+		log.Crit("[BUG] failed to merge node cache under revert async node buffer", "error", err)
 	}
+	a.background.reset()
 	return a.current.revert(db, nodes)
 }
 
+// setSize is unsupported in asyncnodebuffer; due to the double buffer, blocking would occur.
+func (a *asyncnodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
+	return errors.New("not supported")
+}
+
 // reset cleans up the disk cache.
 func (a *asyncnodebuffer) reset() {
 	a.mux.Lock()
@@ -142,9 +147,7 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
 	}
 
 	atomic.StoreUint64(&a.current.immutable, 1)
-	tmp := a.background
-	a.background = a.current
-	a.current = tmp
+	a.current, a.background = a.background, a.current
 
 	persistId := id - a.current.layers
 	go func() {
@@ -321,11 +324,13 @@ func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
 		return nil, nil
 	}
 	if nc == nil || nc.empty() {
-		atomic.StoreUint64(&nc1.immutable, 0)
-		return nc1, nil
+		res := copyNodeCache(nc1)
+		atomic.StoreUint64(&res.immutable, 0)
+		return res, nil
 	}
 	if nc1 == nil || nc1.empty() {
-		atomic.StoreUint64(&nc.immutable, 0)
-		return nc, nil
+		res := copyNodeCache(nc)
+		atomic.StoreUint64(&res.immutable, 0)
+		return res, nil
 	}
 	if atomic.LoadUint64(&nc.immutable) == atomic.LoadUint64(&nc1.immutable) {
@@ -420,3 +425,25 @@ func (nc *nodecache) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[string]*trienode.Node) error {
 	nc.updateSize(delta)
 	return nil
 }
+
+func copyNodeCache(n *nodecache) *nodecache {
+	if n == nil {
+		return nil
+	}
+	nc := &nodecache{
+		layers:    n.layers,
+		size:      n.size,
+		limit:     n.limit,
+		immutable: atomic.LoadUint64(&n.immutable),
+		nodes:     make(map[common.Hash]map[string]*trienode.Node),
+	}
+	for acc, subTree := range n.nodes {
+		if _, ok := nc.nodes[acc]; !ok {
+			nc.nodes[acc] = make(map[string]*trienode.Node)
+		}
+		for path, node := range subTree {
+			nc.nodes[acc][path] = node
+		}
+	}
+	return nc
+}
diff --git a/trie/triedb/pathdb/database.go b/trie/triedb/pathdb/database.go
index 67f93975c8..167db6643d 100644
--- a/trie/triedb/pathdb/database.go
+++ b/trie/triedb/pathdb/database.go
@@ -419,18 +419,15 @@ func (db *Database) Initialized(genesisRoot common.Hash) bool {
 
 // SetBufferSize sets the node buffer size to the provided value(in bytes).
 func (db *Database) SetBufferSize(size int) error {
-	// disable SetBufferSize after init db
-	return nil
+	db.lock.Lock()
+	defer db.lock.Unlock()
 
-	//db.lock.Lock()
-	//defer db.lock.Unlock()
-	//
-	//if size > MaxDirtyBufferSize {
-	//	log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(MaxDirtyBufferSize))
-	//	size = MaxDirtyBufferSize
-	//}
-	//db.bufferSize = size
-	//return db.tree.bottom().setBufferSize(db.bufferSize)
+	if size > MaxDirtyBufferSize {
+		log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(MaxDirtyBufferSize))
+		size = MaxDirtyBufferSize
+	}
+	db.bufferSize = size
+	return db.tree.bottom().setBufferSize(db.bufferSize)
 }
 
 // Scheme returns the node scheme used in the database.
diff --git a/trie/triedb/pathdb/disklayer.go b/trie/triedb/pathdb/disklayer.go
index 540b977b50..59be04c744 100644
--- a/trie/triedb/pathdb/disklayer.go
+++ b/trie/triedb/pathdb/disklayer.go
@@ -54,6 +54,10 @@ type trienodebuffer interface {
 	// memory threshold is reached. Note, all data must be written atomically.
 	flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error
 
+	// setSize sets the buffer size to the provided number, and invokes a flush
+	// operation if the current memory usage exceeds the new limit.
+	setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error
+
 	// reset cleans up the disk cache.
 	reset()
 
@@ -330,16 +334,16 @@ func (dl *diskLayer) revert(h *history, loader triestate.TrieLoader) (*diskLayer, error) {
 	return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer), nil
 }
 
-// setBufferSize sets the node buffer size to the provided value.
-//func (dl *diskLayer) setBufferSize(size int) error {
-//	dl.lock.RLock()
-//	defer dl.lock.RUnlock()
-//
-//	if dl.stale {
-//		return errSnapshotStale
-//	}
-//	return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
-//}
+// setBufferSize sets the trie node buffer size to the provided value.
+func (dl *diskLayer) setBufferSize(size int) error {
+	dl.lock.RLock()
+	defer dl.lock.RUnlock()
+
+	if dl.stale {
+		return errSnapshotStale
+	}
+	return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
+}
 
 // size returns the approximate size of cached nodes in the disk layer.
 func (dl *diskLayer) size() (common.StorageSize, common.StorageSize) {
diff --git a/trie/triedb/pathdb/layertree.go b/trie/triedb/pathdb/layertree.go
index 9f7e24bfe7..ed94d2e19e 100644
--- a/trie/triedb/pathdb/layertree.go
+++ b/trie/triedb/pathdb/layertree.go
@@ -225,6 +225,7 @@ func (tree *layerTree) front() common.Hash {
 		switch dl := layer.(type) {
 		case *diskLayer:
 			if dl.stale {
+				log.Info("pathdb top disklayer is stale")
 				return base
 			}
 			base = dl.rootHash()
@@ -234,19 +235,22 @@ func (tree *layerTree) front() common.Hash {
 			}
 			chain[dl.parentLayer().rootHash()] = append(chain[dl.parentLayer().rootHash()], dl.rootHash())
 		default:
-			log.Warn("unsupported layer type")
+			log.Crit("unsupported layer type")
 		}
 	}
 	if (base == common.Hash{}) {
+		log.Info("pathdb top difflayer is empty")
 		return base
 	}
 	parent := base
 	for {
 		children, ok := chain[parent]
 		if !ok {
+			log.Info("pathdb top difflayer", "root", parent)
 			return parent
 		}
 		if len(children) != 1 {
+			log.Info("pathdb top difflayer is forked", "common ancestor root", parent)
 			return parent
 		}
 		parent = children[0]
diff --git a/trie/triedb/pathdb/nodebuffer.go b/trie/triedb/pathdb/nodebuffer.go
index 6800e621a1..9d79df37ff 100644
--- a/trie/triedb/pathdb/nodebuffer.go
+++ b/trie/triedb/pathdb/nodebuffer.go
@@ -201,10 +201,10 @@ func (b *nodebuffer) empty() bool {
 
 // setSize sets the buffer size to the provided number, and invokes a flush
 // operation if the current memory usage exceeds the new limit.
-//func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
-//	b.limit = uint64(size)
-//	return b.flush(db, clean, id, false)
-//}
+func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
+	b.limit = uint64(size)
+	return b.flush(db, clean, id, false)
+}
 
 // flush persists the in-memory dirty trie node into the disk if the configured
 // memory threshold is reached. Note, all data must be written atomically.
@@ -219,6 +219,10 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
 	}
 	var (
 		start = time.Now()
+		// Although the calculation of b.size has been made as accurate as possible,
+		// some omissions were still found during testing and code review, so it is
+		// still not guaranteed to be completely accurate. For better protection,
+		// some redundancy is added here.
 		batch = db.NewBatchWithSize(int(float64(b.size) * DefaultBatchRedundancyRate))
 	)
 	nodes := writeNodes(batch, b.nodes, clean)