triedb/pathdb: fix async node buffer diskroot mismatches when journaling #2083

Merged

merged 3 commits on Dec 26, 2023
6 changes: 6 additions & 0 deletions eth/ethconfig/gen_config.go

Some generated files are not rendered by default.

51 changes: 31 additions & 20 deletions trie/triedb/pathdb/asyncnodebuffer.go
@@ -8,6 +8,7 @@ import (
 	"time"

 	"github.com/VictoriaMetrics/fastcache"
+
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/crypto"
@@ -18,12 +19,14 @@ import (

 var _ trienodebuffer = &asyncnodebuffer{}

-// asyncnodebuffer implement trienodebuffer interface, and aysnc the nodecache
+// asyncnodebuffer implement trienodebuffer interface, and async the nodecache
 // to disk.
 type asyncnodebuffer struct {
-	mux        sync.RWMutex
-	current    *nodecache
-	background *nodecache
+	mux          sync.RWMutex
+	current      *nodecache
+	background   *nodecache
+	isFlushing   atomic.Bool
+	stopFlushing atomic.Bool
 }

 // newAsyncNodeBuffer initializes the async node buffer with the provided nodes.
@@ -112,24 +115,21 @@ func (a *asyncnodebuffer) empty() bool {
 	return a.current.empty() && a.background.empty()
 }

-// setSize sets the buffer size to the provided number, and invokes a flush
-// operation if the current memory usage exceeds the new limit.
-//func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
-//	b.limit = uint64(size)
-//	return b.flush(db, clean, id, false)
-//}
-
 // flush persists the in-memory dirty trie node into the disk if the configured
 // memory threshold is reached. Note, all data must be written atomically.
 func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
 	a.mux.Lock()
 	defer a.mux.Unlock()

+	if a.stopFlushing.Load() {
+		return nil
+	}
+
 	if force {
 		for {
 			if atomic.LoadUint64(&a.background.immutable) == 1 {
 				time.Sleep(time.Duration(DefaultBackgroundFlushInterval) * time.Second)
-				log.Info("waiting background memory table flush to disk for force flush node buffer")
+				log.Info("waiting background memory table flushed into disk for forcing flush node buffer")
 				continue
 			}
 			atomic.StoreUint64(&a.current.immutable, 1)
@@ -149,26 +149,36 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
 	atomic.StoreUint64(&a.current.immutable, 1)
 	a.current, a.background = a.background, a.current

-	go func(persistId uint64) {
+	a.isFlushing.Store(true)
+	go func(persistID uint64) {
+		defer a.isFlushing.Store(false)
 		for {
-			err := a.background.flush(db, clean, persistId)
+			err := a.background.flush(db, clean, persistID)
 			if err == nil {
-				log.Debug("succeed to flush background nodecahce to disk", "state_id", persistId)
+				log.Debug("succeed to flush background nodecache to disk", "state_id", persistID)
 				return
 			}
-			log.Error("failed to flush background nodecahce to disk", "state_id", persistId, "error", err)
+			log.Error("failed to flush background nodecache to disk", "state_id", persistID, "error", err)
 		}
 	}(id)
 	return nil
 }

+func (a *asyncnodebuffer) waitAndStopFlushing() {
+	a.stopFlushing.Store(true)
+	for a.isFlushing.Load() {
+		time.Sleep(time.Second)
+		log.Warn("waiting background memory table flushed into disk")
+	}
+}
+
 func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node {
 	a.mux.Lock()
 	defer a.mux.Unlock()

 	cached, err := a.current.merge(a.background)
 	if err != nil {
-		log.Crit("[BUG] failed to merge nodecache under revert asyncnodebuffer", "error", err)
+		log.Crit("[BUG] failed to merge node cache under revert async node buffer", "error", err)
 	}
 	return cached.nodes
 }
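
The two new flags form a small stop/wait handshake: `flush` refuses to start once `stopFlushing` is set, and `isFlushing` marks an in-flight background write for `waitAndStopFlushing` to spin on. Below is a minimal, self-contained sketch of that handshake; the `flusher` type and the sleep standing in for the node-cache write are illustrative, not the pathdb types (in pathdb the buffer mutex and the database lock additionally serialize these calls):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// flusher mirrors the two flags the diff adds to asyncnodebuffer:
// stopFlushing turns flush into a no-op, isFlushing marks an
// in-flight background write.
type flusher struct {
	isFlushing   atomic.Bool
	stopFlushing atomic.Bool
}

// flush starts a background write unless journaling has already begun.
func (f *flusher) flush() {
	if f.stopFlushing.Load() {
		return // journaling wants a fixed disk root; do nothing
	}
	f.isFlushing.Store(true)
	go func() {
		defer f.isFlushing.Store(false)
		time.Sleep(50 * time.Millisecond) // stand-in for persisting the node cache
	}()
}

// waitAndStopFlushing forbids new flushes, then blocks until any
// in-flight one has finished, just like the diff's implementation.
func (f *flusher) waitAndStopFlushing() {
	f.stopFlushing.Store(true)
	for f.isFlushing.Load() {
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	f := new(flusher)
	f.flush()
	f.waitAndStopFlushing()
	fmt.Println("background flush done; disk root is stable")
}
```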
@@ -226,6 +236,7 @@ func (nc *nodecache) commit(nodes map[common.Hash]map[string]*trienode.Node) err
 	if atomic.LoadUint64(&nc.immutable) == 1 {
 		return errWriteImmutable
 	}
+
 	var (
 		delta     int64
 		overwrite int64
@@ -325,12 +336,12 @@ func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
 	if nc == nil || nc.empty() {
 		res := copyNodeCache(nc1)
 		atomic.StoreUint64(&res.immutable, 0)
-		return nc1, nil
+		return res, nil
 	}
 	if nc1 == nil || nc1.empty() {
 		res := copyNodeCache(nc)
 		atomic.StoreUint64(&res.immutable, 0)
-		return nc, nil
+		return res, nil
 	}
 	if atomic.LoadUint64(&nc.immutable) == atomic.LoadUint64(&nc1.immutable) {
 		return nil, errIncompatibleMerge
@@ -350,7 +361,7 @@ func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
 	}
 	res.size = immutable.size + mutable.size
 	res.layers = immutable.layers + mutable.layers
-	res.limit = immutable.size
+	res.limit = immutable.limit
 	res.nodes = make(map[common.Hash]map[string]*trienode.Node)
 	for acc, subTree := range immutable.nodes {
 		if _, ok := res.nodes[acc]; !ok {
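
The two `merge` changes return the copy whose immutable flag was actually cleared, instead of the original. A minimal sketch of why that matters, using a stand-in `cache` type rather than the real nodecache:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// cache keeps only the field that matters here; the real nodecache
// also carries nodes, sizes and layer counts.
type cache struct{ immutable uint64 }

func copyCache(c *cache) *cache { return &cache{immutable: c.immutable} }

func main() {
	frozen := &cache{}
	atomic.StoreUint64(&frozen.immutable, 1) // a frozen background cache

	res := copyCache(frozen)
	atomic.StoreUint64(&res.immutable, 0) // clear the flag on the copy

	// Pre-fix, merge returned the original: still immutable, so later
	// writes against the merged cache would fail with errWriteImmutable.
	fmt.Println("returned original:", frozen.immutable) // 1

	// Post-fix, merge returns the copy, which is actually writable.
	fmt.Println("returned copy:", res.immutable) // 0
}
```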
3 changes: 3 additions & 0 deletions trie/triedb/pathdb/disklayer.go
@@ -72,6 +72,9 @@ type trienodebuffer interface {

 	// getLayers return the size of cached difflayers.
 	getLayers() uint64
+
+	// waitAndStopFlushing blocks until the trie nodes in the trienodebuffer are written to disk.
+	waitAndStopFlushing()
 }

 func NewTrieNodeBuffer(sync bool, limit int, nodes map[common.Hash]map[string]*trienode.Node, layers uint64) trienodebuffer {
12 changes: 7 additions & 5 deletions trie/triedb/pathdb/journal.go
@@ -242,7 +242,7 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
 }

 // journal implements the layer interface, marshaling the un-flushed trie nodes
-// along with layer meta data into provided byte buffer.
+// along with layer metadata into provided byte buffer.
 func (dl *diskLayer) journal(w io.Writer) error {
 	dl.lock.RLock()
 	defer dl.lock.RUnlock()
@@ -338,6 +338,10 @@ func (dl *diffLayer) journal(w io.Writer) error {
 // flattening everything down (bad for reorgs). And this function will mark the
 // database as read-only to prevent all following mutation to disk.
 func (db *Database) Journal(root common.Hash) error {
+	// Run the journaling
+	db.lock.Lock()
+	defer db.lock.Unlock()
+
 	// Retrieve the head layer to journal from.
 	l := db.tree.get(root)
 	if l == nil {
@@ -351,10 +355,8 @@ func (db *Database) Journal(root common.Hash) error {
 	}
 	start := time.Now()

-	// Run the journaling
-	db.lock.Lock()
-	defer db.lock.Unlock()
-
+	// Wait for and stop the trienodebuffer flush; the asyncnodebuffer needs a fixed diskroot.
+	disk.buffer.waitAndStopFlushing()
 	// Short circuit if the database is in read only mode.
 	if db.readOnly {
 		return errSnapshotReadOnly
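
Taken together, the journal.go changes move the database lock ahead of the layer lookup and freeze background flushing before anything is written, so the disk root recorded in the journal cannot drift from what is actually persisted. A condensed sketch of that ordering, with stand-in types rather than the pathdb API:

```go
package main

import (
	"fmt"
	"sync"
)

// database stands in for pathdb's Database: one writer lock and a buffer
// whose background flusher may otherwise advance the persisted disk root.
type database struct {
	lock     sync.Mutex
	buffer   interface{ waitAndStopFlushing() }
	diskRoot string
}

type noopBuffer struct{}

func (noopBuffer) waitAndStopFlushing() {} // placeholder for the real wait loop

// journal sketches the corrected ordering: lock first, so no new state
// lands mid-journal; then stop flushing, so the disk root read below is
// the one actually on disk when the journal is replayed.
func (db *database) journal() error {
	db.lock.Lock()
	defer db.lock.Unlock()

	db.buffer.waitAndStopFlushing()

	fmt.Println("journaling against stable disk root", db.diskRoot)
	return nil
}

func main() {
	db := &database{buffer: noopBuffer{}, diskRoot: "0x01"}
	_ = db.journal()
}
```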
4 changes: 3 additions & 1 deletion trie/triedb/pathdb/nodebuffer.go
@@ -221,7 +221,7 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
 		start = time.Now()
 		// Although the calculation of b.size has been as accurate as possible,
 		// some omissions were still found during testing and code review, but
-		// we are still not sure it is completely accurate. For better protection,
+		// we are still not sure if it is completely accurate. For better protection,
 		// some redundancy is added here.
 		batch = db.NewBatchWithSize(int(float64(b.size) * DefaultBatchRedundancyRate))
 	)
@@ -241,6 +241,8 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
 	return nil
 }

+func (b *nodebuffer) waitAndStopFlushing() {}
+
 // writeNodes writes the trie nodes into the provided database batch.
 // Note this function will also inject all the newly written nodes
 // into clean cache.
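
The synchronous nodebuffer flushes inline while the caller holds the lock, so there is never an in-flight background write to wait for, and a no-op satisfies the new interface method. A small stand-alone illustration, with the interface trimmed to the one method this PR adds:

```go
package main

// trienodebuffer is trimmed to the method this PR adds; the real
// interface in disklayer.go carries several more methods.
type trienodebuffer interface {
	waitAndStopFlushing()
}

// nodebuffer flushes synchronously under the caller's lock, so there is
// never a background write in flight: a no-op is sufficient.
type nodebuffer struct{}

func (b *nodebuffer) waitAndStopFlushing() {}

// Compile-time assertion in the same style as the
// `var _ trienodebuffer = &asyncnodebuffer{}` check asyncnodebuffer.go keeps.
var _ trienodebuffer = (*nodebuffer)(nil)

func main() {}
```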