Skip to content

Commit

Permalink
fix: code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
joeylichang committed Nov 6, 2023
1 parent 3c87f78 commit 7aeb811
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 45 deletions.
12 changes: 11 additions & 1 deletion cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,17 @@ func pruneBlock(ctx *cli.Context) error {
if path == "" {
return errors.New("prune failed, did not specify the AncientPath")
}
if !strings.HasSuffix(oldAncientPath, "geth/chaindata/ancient/chain") {
newVersionPath := false
files, err := os.ReadDir(oldAncientPath)
if err != nil {
return err
}
for _, file := range files {
if file.IsDir() && file.Name() == "chain" {
newVersionPath = true
}
}
if newVersionPath && !strings.HasSuffix(oldAncientPath, "geth/chaindata/ancient/chain") {
log.Error("datadir.ancient subdirectory incorrect", "got path", oldAncientPath, "want subdirectory", "geth/chaindata/ancient/chain/")
return errors.New("datadir.ancient subdirectory incorrect")
}
Expand Down
8 changes: 4 additions & 4 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,18 +695,18 @@ func (bc *BlockChain) loadLastState() error {
blockTd = bc.GetTd(headBlock.Hash(), headBlock.NumberU64())
)
if headHeader.Hash() != headBlock.Hash() {
log.Info("Loaded most recent local header", "number", headHeader.Number, "hash", headHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(int64(headHeader.Time), 0)))
log.Info("Loaded most recent local header", "number", headHeader.Number, "hash", headHeader.Hash(), "hash", headHeader.Root, "td", headerTd, "age", common.PrettyAge(time.Unix(int64(headHeader.Time), 0)))
}
log.Info("Loaded most recent local block", "number", headBlock.Number(), "hash", headBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(headBlock.Time()), 0)))
log.Info("Loaded most recent local block", "number", headBlock.Number(), "hash", headBlock.Hash(), "root", headBlock.Root(), "td", blockTd, "age", common.PrettyAge(time.Unix(int64(headBlock.Time()), 0)))
if headBlock.Hash() != currentSnapBlock.Hash() {
snapTd := bc.GetTd(currentSnapBlock.Hash(), currentSnapBlock.Number.Uint64())
log.Info("Loaded most recent local snap block", "number", currentSnapBlock.Number, "hash", currentSnapBlock.Hash(), "td", snapTd, "age", common.PrettyAge(time.Unix(int64(currentSnapBlock.Time), 0)))
log.Info("Loaded most recent local snap block", "number", currentSnapBlock.Number, "hash", currentSnapBlock.Hash(), "root", currentSnapBlock.Root, "td", snapTd, "age", common.PrettyAge(time.Unix(int64(currentSnapBlock.Time), 0)))
}
if posa, ok := bc.engine.(consensus.PoSA); ok {
if currentFinalizedHeader := posa.GetFinalizedHeader(bc, headHeader); currentFinalizedHeader != nil {
if currentFinalizedBlock := bc.GetBlockByHash(currentFinalizedHeader.Hash()); currentFinalizedBlock != nil {
finalTd := bc.GetTd(currentFinalizedBlock.Hash(), currentFinalizedBlock.NumberU64())
log.Info("Loaded most recent local finalized block", "number", currentFinalizedBlock.Number(), "hash", currentFinalizedBlock.Hash(), "td", finalTd, "age", common.PrettyAge(time.Unix(int64(currentFinalizedBlock.Time()), 0)))
log.Info("Loaded most recent local finalized block", "number", currentFinalizedBlock.Number(), "hash", currentFinalizedBlock.Hash(), "root", currentFinalizedBlock.Root(), "td", finalTd, "age", common.PrettyAge(time.Unix(int64(currentFinalizedBlock.Time()), 0)))
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/trie/triedb/pathdb"
)

const (
Expand Down Expand Up @@ -979,7 +978,9 @@ func (h *handler) voteBroadcastLoop() {
// sync is finished.
func (h *handler) enableSyncedFeatures() {
	// Mark the node as synced: locally received transactions are accepted
	// into the pool / for broadcast from this point on.
	h.acceptTxs.Store(true)
	// The dynamic shrink of the pathdb dirty buffer is deliberately disabled.
	// In the bsc scenario, pathdb.MaxDirtyBufferSize (256MB) will be used.
	// The performance is better than DefaultDirtyBufferSize (64MB).
	//if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
	//	h.chain.TrieDB().SetBufferSize(pathdb.DefaultDirtyBufferSize)
	//}
}
46 changes: 36 additions & 10 deletions trie/triedb/pathdb/asyncnodebuffer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pathdb

import (
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -82,14 +83,18 @@ func (a *asyncnodebuffer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]

var err error
a.current, err = a.current.merge(a.background)
a.background.reset()
if err != nil {
log.Crit("[BUG] failed to merge memory table under revert nodebuffer", "error", err)
log.Crit("[BUG] failed to merge node cache under revert async node buffer", "error", err)
}
a.background.reset()
return a.current.revert(db, nodes)
}

// setSize is not supported by asyncnodebuffer: with the double-buffer design
// a dynamic resize would have to block on the background flush, so the
// request is rejected outright.
func (a *asyncnodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
	const reason = "not supported"
	return errors.New(reason)
}

// reset cleans up the disk cache.
func (a *asyncnodebuffer) reset() {
a.mux.Lock()
Expand Down Expand Up @@ -142,12 +147,9 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
}

atomic.StoreUint64(&a.current.immutable, 1)
tmp := a.background
a.background = a.current
a.current = tmp
a.current, a.background = a.background, a.current

persistId := id - a.current.layers
go func() {
go func(persistId uint64) {
for {
err := a.background.flush(db, clean, persistId)
if err == nil {
Expand All @@ -156,7 +158,7 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
}
log.Error("failed to flush background nodecahce to disk", "state_id", persistId, "error", err)
}
}()
}(id)
return nil
}

Expand Down Expand Up @@ -321,11 +323,13 @@ func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
return nil, nil
}
if nc == nil || nc.empty() {
atomic.StoreUint64(&nc1.immutable, 0)
res := copyNodeCache(nc1)
atomic.StoreUint64(&res.immutable, 0)
return nc1, nil
}
if nc1 == nil || nc1.empty() {
atomic.StoreUint64(&nc.immutable, 0)
res := copyNodeCache(nc)
atomic.StoreUint64(&res.immutable, 0)
return nc, nil
}
if atomic.LoadUint64(&nc.immutable) == atomic.LoadUint64(&nc1.immutable) {
Expand Down Expand Up @@ -420,3 +424,25 @@ func (nc *nodecache) revert(db ethdb.KeyValueReader, nodes map[common.Hash]map[s
nc.updateSize(delta)
return nil
}

// copyNodeCache returns a deep copy of the given nodecache: scalar fields are
// duplicated (immutable is read atomically since it is flipped concurrently
// elsewhere) and a fresh two-level node map is rebuilt so mutations of the
// copy never alias the original's maps. The *trienode.Node values themselves
// are shared, i.e. the leaves are copied shallowly. A nil input yields nil.
func copyNodeCache(n *nodecache) *nodecache {
	if n == nil {
		return nil
	}
	nc := &nodecache{
		layers:    n.layers,
		size:      n.size,
		limit:     n.limit,
		immutable: atomic.LoadUint64(&n.immutable),
		// Pre-size the outer map; the per-account existence check of the
		// old code is unnecessary on a freshly allocated map.
		nodes: make(map[common.Hash]map[string]*trienode.Node, len(n.nodes)),
	}
	for acc, subTree := range n.nodes {
		cloned := make(map[string]*trienode.Node, len(subTree))
		for path, node := range subTree {
			cloned[path] = node
		}
		nc.nodes[acc] = cloned
	}
	return nc
}
19 changes: 8 additions & 11 deletions trie/triedb/pathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,18 +419,15 @@ func (db *Database) Initialized(genesisRoot common.Hash) bool {

// SetBufferSize sets the node buffer size to the provided value (in bytes).
// Requests above MaxDirtyBufferSize are capped (and the adjustment logged),
// then the new limit is pushed down to the bottom-most disk layer's buffer,
// which may flush if its current usage already exceeds the new limit.
func (db *Database) SetBufferSize(size int) error {
	db.lock.Lock()
	defer db.lock.Unlock()

	if size > MaxDirtyBufferSize {
		log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(MaxDirtyBufferSize))
		size = MaxDirtyBufferSize
	}
	db.bufferSize = size
	return db.tree.bottom().setBufferSize(db.bufferSize)
}

// Scheme returns the node scheme used in the database.
Expand Down
24 changes: 14 additions & 10 deletions trie/triedb/pathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type trienodebuffer interface {
// memory threshold is reached. Note, all data must be written atomically.
flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error

// setSize sets the buffer size to the provided number, and invokes a flush
// operation if the current memory usage exceeds the new limit.
setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error

// reset cleans up the disk cache.
reset()

Expand Down Expand Up @@ -330,16 +334,16 @@ func (dl *diskLayer) revert(h *history, loader triestate.TrieLoader) (*diskLayer
return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer), nil
}

// setBufferSize sets the node buffer size to the provided value.
//func (dl *diskLayer) setBufferSize(size int) error {
// dl.lock.RLock()
// defer dl.lock.RUnlock()
//
// if dl.stale {
// return errSnapshotStale
// }
// return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
//}
// setBufferSize updates the capacity of the trie node buffer backing this
// disk layer, delegating to the buffer itself so it can flush if the new
// limit is already exceeded. A stale (superseded) layer refuses the request
// with errSnapshotStale.
func (dl *diskLayer) setBufferSize(size int) error {
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	if !dl.stale {
		return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
	}
	return errSnapshotStale
}

// size returns the approximate size of cached nodes in the disk layer.
func (dl *diskLayer) size() (common.StorageSize, common.StorageSize) {
Expand Down
6 changes: 5 additions & 1 deletion trie/triedb/pathdb/layertree.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func (tree *layerTree) front() common.Hash {
switch dl := layer.(type) {
case *diskLayer:
if dl.stale {
log.Info("pathdb top disklayer is stale")
return base
}
base = dl.rootHash()
Expand All @@ -234,19 +235,22 @@ func (tree *layerTree) front() common.Hash {
}
chain[dl.parentLayer().rootHash()] = append(chain[dl.parentLayer().rootHash()], dl.rootHash())
default:
log.Warn("unsupported layer type")
log.Crit("unsupported layer type")
}
}
if (base == common.Hash{}) {
log.Info("pathdb top difflayer is empty")
return base
}
parent := base
for {
children, ok := chain[parent]
if !ok {
log.Info("pathdb top difflayer", "root", parent)
return parent
}
if len(children) != 1 {
log.Info("pathdb top difflayer is forked", "common ancestor root", parent)
return parent
}
parent = children[0]
Expand Down
12 changes: 8 additions & 4 deletions trie/triedb/pathdb/nodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,10 @@ func (b *nodebuffer) empty() bool {

// setSize sets the buffer size to the provided number, and invokes a flush
// operation if the current memory usage exceeds the new limit.
func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
	// Install the new limit first; the non-forced flush then decides
	// whether the buffered contents already exceed it.
	newLimit := uint64(size)
	b.limit = newLimit
	return b.flush(db, clean, id, false)
}

// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
Expand All @@ -219,6 +219,10 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
}
var (
start = time.Now()
// Although the calculation of b.size has been as accurate as possible,
// some omissions were still found during testing and code review, but
// we are still not sure it is completely accurate. For better protection,
// some redundancy is added here.
batch = db.NewBatchWithSize(int(float64(b.size) * DefaultBatchRedundancyRate))
)
nodes := writeNodes(batch, b.nodes, clean)
Expand Down

0 comments on commit 7aeb811

Please sign in to comment.