1717package pathdb
1818
1919import (
20+ "errors"
2021 "fmt"
2122 "time"
2223
@@ -37,6 +38,9 @@ type buffer struct {
3738 limit uint64 // The maximum memory allowance in bytes
3839 nodes * nodeSet // Aggregated trie node set
3940 states * stateSet // Aggregated state set
41+
42+ done chan struct {} // notifier whether the content in buffer has been flushed or not
43+ flushErr error // error if any exception occurs during flushing
4044}
4145
4246// newBuffer initializes the buffer with the provided states and trie nodes.
@@ -124,43 +128,74 @@ func (b *buffer) size() uint64 {
124128
125129// flush persists the in-memory dirty trie node into the disk if the configured
126130// memory threshold is reached. Note, all data must be written atomically.
127- func (b * buffer ) flush (root common.Hash , db ethdb.KeyValueStore , freezer ethdb.AncientWriter , progress []byte , nodesCache , statesCache * fastcache.Cache , id uint64 ) error {
128- // Ensure the target state id is aligned with the internal counter.
129- head := rawdb .ReadPersistentStateID (db )
130- if head + b .layers != id {
131- return fmt .Errorf ("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)" , b .layers , head , id )
131+ func (b * buffer ) flush (root common.Hash , db ethdb.KeyValueStore , freezer ethdb.AncientWriter , progress []byte , nodesCache , statesCache * fastcache.Cache , id uint64 , postFlush func ()) {
132+ if b .done != nil {
133+ panic ("duplicated flush operation" )
132134 }
133- // Terminate the state snapshot generation if it's active
134- var (
135- start = time .Now ()
136- batch = db .NewBatchWithSize ((b .nodes .dbsize () + b .states .dbsize ()) * 11 / 10 ) // extra 10% for potential pebble internal stuff
137- )
138- // Explicitly sync the state freezer to ensure all written data is persisted to disk
139- // before updating the key-value store.
140- //
141- // This step is crucial to guarantee that the corresponding state history remains
142- // available for state rollback.
143- if freezer != nil {
144- if err := freezer .SyncAncient (); err != nil {
145- return err
135+ b .done = make (chan struct {}) // allocate the channel for notification
136+
137+ go func () {
138+ defer func () {
139+ if postFlush != nil {
140+ postFlush ()
141+ }
142+ close (b .done )
143+ }()
144+
145+ // Ensure the target state id is aligned with the internal counter.
146+ head := rawdb .ReadPersistentStateID (db )
147+ if head + b .layers != id {
148+ b .flushErr = fmt .Errorf ("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)" , b .layers , head , id )
149+ return
146150 }
151+
152+ // Terminate the state snapshot generation if it's active
153+ var (
154+ start = time .Now ()
155+ batch = db .NewBatchWithSize ((b .nodes .dbsize () + b .states .dbsize ()) * 11 / 10 ) // extra 10% for potential pebble internal stuff
156+ )
157+ // Explicitly sync the state freezer to ensure all written data is persisted to disk
158+ // before updating the key-value store.
159+ //
160+ // This step is crucial to guarantee that the corresponding state history remains
161+ // available for state rollback.
162+ if freezer != nil {
163+ if err := freezer .SyncAncient (); err != nil {
164+ b .flushErr = err
165+ return
166+ }
167+ }
168+ nodes := b .nodes .write (batch , nodesCache )
169+ accounts , slots := b .states .write (batch , progress , statesCache )
170+ rawdb .WritePersistentStateID (batch , id )
171+ rawdb .WriteSnapshotRoot (batch , root )
172+
173+ // Flush all mutations in a single batch
174+ size := batch .ValueSize ()
175+ if err := batch .Write (); err != nil {
176+ b .flushErr = err
177+ return
178+ }
179+ commitBytesMeter .Mark (int64 (size ))
180+ commitNodesMeter .Mark (int64 (nodes ))
181+ commitAccountsMeter .Mark (int64 (accounts ))
182+ commitStoragesMeter .Mark (int64 (slots ))
183+ commitTimeTimer .UpdateSince (start )
184+
185+ // The content in the frozen buffer is kept for consequent state access,
186+ // TODO (rjl493456442) measure the gc overhead for holding this struct.
187+ // TODO (rjl493456442) can we somehow get rid of it after flushing??
188+ b .reset ()
189+ log .Debug ("Persisted buffer content" , "nodes" , nodes , "accounts" , accounts , "slots" , slots , "bytes" , common .StorageSize (size ), "elapsed" , common .PrettyDuration (time .Since (start )))
190+ }()
191+ }
192+
193+ // waitFlush blocks until the buffer has been fully flushed and returns any
194+ // stored errors that occurred during the process.
195+ func (b * buffer ) waitFlush () error {
196+ if b .done == nil {
197+ return errors .New ("the buffer is not frozen" )
147198 }
148- nodes := b .nodes .write (batch , nodesCache )
149- accounts , slots := b .states .write (batch , progress , statesCache )
150- rawdb .WritePersistentStateID (batch , id )
151- rawdb .WriteSnapshotRoot (batch , root )
152-
153- // Flush all mutations in a single batch
154- size := batch .ValueSize ()
155- if err := batch .Write (); err != nil {
156- return err
157- }
158- commitBytesMeter .Mark (int64 (size ))
159- commitNodesMeter .Mark (int64 (nodes ))
160- commitAccountsMeter .Mark (int64 (accounts ))
161- commitStoragesMeter .Mark (int64 (slots ))
162- commitTimeTimer .UpdateSince (start )
163- b .reset ()
164- log .Debug ("Persisted buffer content" , "nodes" , nodes , "accounts" , accounts , "slots" , slots , "bytes" , common .StorageSize (size ), "elapsed" , common .PrettyDuration (time .Since (start )))
165- return nil
199+ <- b .done
200+ return b .flushErr
166201}
0 commit comments