Skip to content

Commit

Permalink
triedb: check whether the async flush is done
Browse files Browse the repository at this point in the history
  • Loading branch information
VM committed Dec 18, 2023
1 parent 5275063 commit 1ed7fef
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 24 deletions.
51 changes: 32 additions & 19 deletions trie/triedb/pathdb/asyncnodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ import (

var _ trienodebuffer = &asyncnodebuffer{}

// asyncnodebuffer implement trienodebuffer interface, and aysnc the nodecache
// asyncnodebuffer implement trienodebuffer interface, and async the nodecache
// to disk.
type asyncnodebuffer struct {
mux sync.RWMutex
current *nodecache
background *nodecache
stopFlushing uint64
flushing uint64
isFlushing atomic.Bool
stopFlushing atomic.Uint64

Check failure on line 28 in trie/triedb/pathdb/asyncnodebuffer.go

View workflow job for this annotation

GitHub Actions / golang-lint (1.20.x, ubuntu-latest)

field `stopFlushing` is unused (unused)
flushing atomic.Uint64

Check failure on line 29 in trie/triedb/pathdb/asyncnodebuffer.go

View workflow job for this annotation

GitHub Actions / golang-lint (1.20.x, ubuntu-latest)

field `flushing` is unused (unused)
}

// newAsyncNodeBuffer initializes the async node buffer with the provided nodes.
Expand Down Expand Up @@ -116,20 +117,26 @@ func (a *asyncnodebuffer) empty() bool {

// setSize sets the buffer size to the provided number, and invokes a flush
// operation if the current memory usage exceeds the new limit.
//func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
// func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
// b.limit = uint64(size)
// return b.flush(db, clean, id, false)
//}
// }

// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
a.mux.Lock()
defer a.mux.Unlock()

if atomic.LoadUint64(&a.stopFlushing) == 1 {
if a.isFlushing.Load() == false {

Check failure on line 131 in trie/triedb/pathdb/asyncnodebuffer.go

View workflow job for this annotation

GitHub Actions / golang-lint (1.20.x, ubuntu-latest)

S1002: should omit comparison to bool constant, can be simplified to `!a.isFlushing.Load()` (gosimple)
return nil
}
// if a.stopFlushing.Load() == 1 {
// return nil
// }
// if atomic.LoadUint64(&a.stopFlushing) == 1 {
// return nil
// }

if force {
for {
Expand All @@ -155,28 +162,34 @@ func (a *asyncnodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
atomic.StoreUint64(&a.current.immutable, 1)
a.current, a.background = a.background, a.current

atomic.StoreUint64(&a.flushing, 1)
go func(persistId uint64) {
defer atomic.StoreUint64(&a.flushing, 0)
a.isFlushing.Store(true)
// a.flushing.Store(1)
// atomic.StoreUint64(&a.flushing, 1)
go func(persistID uint64) {
defer a.isFlushing.Store(false)
// defer a.flushing.Store(0)
// defer atomic.StoreUint64(&a.flushing, 0)
for {
err := a.background.flush(db, clean, persistId)
err := a.background.flush(db, clean, persistID)
if err == nil {
log.Debug("succeed to flush background nodecahce to disk", "state_id", persistId)
log.Debug("succeed to flush background nodecache to disk", "state_id", persistID)
return
}
log.Error("failed to flush background nodecahce to disk", "state_id", persistId, "error", err)
log.Error("failed to flush background nodecache to disk", "state_id", persistID, "error", err)
}
}(id)
return nil
}

func (a *asyncnodebuffer) waitAndStopFlushing() {
atomic.StoreUint64(&a.stopFlushing, 1)
if atomic.LoadUint64(&a.flushing) == 1 {
// a.stopFlushing.Store(1)
// atomic.StoreUint64(&a.stopFlushing, 1)
// atomic.LoadUint64(&a.flushing) == 1
// a.flushing.Load() == 1
if a.isFlushing.Load() == true {

Check failure on line 189 in trie/triedb/pathdb/asyncnodebuffer.go

View workflow job for this annotation

GitHub Actions / golang-lint (1.20.x, ubuntu-latest)

S1002: should omit comparison to bool constant, can be simplified to `a.isFlushing.Load()` (gosimple)
time.Sleep(time.Duration(1) * time.Second)
log.Info("waiting background memory table flush to disk")
log.Info("waiting background memory table flushed into disk")
}
return
}

func (a *asyncnodebuffer) getAllNodes() map[common.Hash]map[string]*trienode.Node {
Expand Down Expand Up @@ -342,12 +355,12 @@ func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
if nc == nil || nc.empty() {
res := copyNodeCache(nc1)
atomic.StoreUint64(&res.immutable, 0)
return nc1, nil
return res, nil
}
if nc1 == nil || nc1.empty() {
res := copyNodeCache(nc)
atomic.StoreUint64(&res.immutable, 0)
return nc, nil
return res, nil
}
if atomic.LoadUint64(&nc.immutable) == atomic.LoadUint64(&nc1.immutable) {
return nil, errIncompatibleMerge
Expand All @@ -367,7 +380,7 @@ func (nc *nodecache) merge(nc1 *nodecache) (*nodecache, error) {
}
res.size = immutable.size + mutable.size
res.layers = immutable.layers + mutable.layers
res.limit = immutable.size
res.limit = immutable.limit
res.nodes = make(map[common.Hash]map[string]*trienode.Node)
for acc, subTree := range immutable.nodes {
if _, ok := res.nodes[acc]; !ok {
Expand Down
2 changes: 1 addition & 1 deletion trie/triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (db *Database) Journal(root common.Hash) error {
}
start := time.Now()

// wait and stop the flush trienodebuffer, for async node buffer need fixed diskroot
// wait and stop the flush trienodebuffer, for asyncnodebuffer need fixed diskroot
disk.buffer.waitAndStopFlushing()
// Short circuit if the database is in read only mode.
if db.readOnly {
Expand Down
6 changes: 2 additions & 4 deletions trie/triedb/pathdb/nodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
start = time.Now()
// Although the calculation of b.size has been as accurate as possible,
// some omissions were still found during testing and code review, but
// we are still not sure it is completely accurate. For better protection,
// we are still not sure if it is completely accurate. For better protection,
// some redundancy is added here.
batch = db.NewBatchWithSize(int(float64(b.size) * DefaultBatchRedundancyRate))
)
Expand All @@ -241,9 +241,7 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
return nil
}

func (b *nodebuffer) waitAndStopFlushing() {
return
}
func (b *nodebuffer) waitAndStopFlushing() {}

// writeNodes writes the trie nodes into the provided database batch.
// Note this function will also inject all the newly written nodes
Expand Down

0 comments on commit 1ed7fef

Please sign in to comment.