1717package pathdb
1818
1919import (
20+ "errors"
2021 "fmt"
2122 "time"
2223
@@ -37,6 +38,13 @@ type buffer struct {
3738 limit uint64 // The maximum memory allowance in bytes
3839 nodes * nodeSet // Aggregated trie node set
3940 states * stateSet // Aggregated state set
41+
42+ // done is the notifier whether the content in buffer has been flushed or not.
43+ // This channel is nil if the buffer is not frozen.
44+ done chan struct {}
45+
46+ // flushErr memorizes the error if any exception occurs during flushing
47+ flushErr error
4048}
4149
4250// newBuffer initializes the buffer with the provided states and trie nodes.
@@ -61,7 +69,7 @@ func (b *buffer) account(hash common.Hash) ([]byte, bool) {
6169 return b .states .account (hash )
6270}
6371
64- // storage retrieves the storage slot with account address hash and slot key.
72+ // storage retrieves the storage slot with account address hash and slot key hash .
6573func (b * buffer ) storage (addrHash common.Hash , storageHash common.Hash ) ([]byte , bool ) {
6674 return b .states .storage (addrHash , storageHash )
6775}
@@ -124,43 +132,78 @@ func (b *buffer) size() uint64 {
124132
125133// flush persists the in-memory dirty trie node into the disk if the configured
126134// memory threshold is reached. Note, all data must be written atomically.
127- func (b * buffer ) flush (root common.Hash , db ethdb.KeyValueStore , freezer ethdb.AncientWriter , progress []byte , nodesCache , statesCache * fastcache.Cache , id uint64 ) error {
128- // Ensure the target state id is aligned with the internal counter.
129- head := rawdb .ReadPersistentStateID (db )
130- if head + b .layers != id {
131- return fmt .Errorf ("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)" , b .layers , head , id )
135+ func (b * buffer ) flush (root common.Hash , db ethdb.KeyValueStore , freezer ethdb.AncientWriter , progress []byte , nodesCache , statesCache * fastcache.Cache , id uint64 , postFlush func ()) {
136+ if b .done != nil {
137+ panic ("duplicated flush operation" )
132138 }
133- // Terminate the state snapshot generation if it's active
134- var (
135- start = time .Now ()
136- batch = db .NewBatchWithSize ((b .nodes .dbsize () + b .states .dbsize ()) * 11 / 10 ) // extra 10% for potential pebble internal stuff
137- )
138- // Explicitly sync the state freezer to ensure all written data is persisted to disk
139- // before updating the key-value store.
140- //
141- // This step is crucial to guarantee that the corresponding state history remains
142- // available for state rollback.
143- if freezer != nil {
144- if err := freezer .SyncAncient (); err != nil {
145- return err
139+ b .done = make (chan struct {}) // allocate the channel for notification
140+
141+ // Schedule the background thread to construct the batch, which usually
142+ // take a few seconds.
143+ go func () {
144+ defer func () {
145+ if postFlush != nil {
146+ postFlush ()
147+ }
148+ close (b .done )
149+ }()
150+
151+ // Ensure the target state id is aligned with the internal counter.
152+ head := rawdb .ReadPersistentStateID (db )
153+ if head + b .layers != id {
154+ b .flushErr = fmt .Errorf ("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)" , b .layers , head , id )
155+ return
146156 }
157+
158+ // Terminate the state snapshot generation if it's active
159+ var (
160+ start = time .Now ()
161+ batch = db .NewBatchWithSize ((b .nodes .dbsize () + b .states .dbsize ()) * 11 / 10 ) // extra 10% for potential pebble internal stuff
162+ )
163+ // Explicitly sync the state freezer to ensure all written data is persisted to disk
164+ // before updating the key-value store.
165+ //
166+ // This step is crucial to guarantee that the corresponding state history remains
167+ // available for state rollback.
168+ if freezer != nil {
169+ if err := freezer .SyncAncient (); err != nil {
170+ b .flushErr = err
171+ return
172+ }
173+ }
174+ nodes := b .nodes .write (batch , nodesCache )
175+ accounts , slots := b .states .write (batch , progress , statesCache )
176+ rawdb .WritePersistentStateID (batch , id )
177+ rawdb .WriteSnapshotRoot (batch , root )
178+
179+ // Flush all mutations in a single batch
180+ size := batch .ValueSize ()
181+ if err := batch .Write (); err != nil {
182+ b .flushErr = err
183+ return
184+ }
185+ commitBytesMeter .Mark (int64 (size ))
186+ commitNodesMeter .Mark (int64 (nodes ))
187+ commitAccountsMeter .Mark (int64 (accounts ))
188+ commitStoragesMeter .Mark (int64 (slots ))
189+ commitTimeTimer .UpdateSince (start )
190+
191+ // The content in the frozen buffer is kept for consequent state access,
192+ // TODO (rjl493456442) measure the gc overhead for holding this struct.
193+ // TODO (rjl493456442) can we somehow get rid of it after flushing??
194+ // TODO (rjl493456442) buffer itself is not thread-safe, add the lock
195+ // protection if try to reset the buffer here.
196+ // b.reset()
197+ log .Debug ("Persisted buffer content" , "nodes" , nodes , "accounts" , accounts , "slots" , slots , "bytes" , common .StorageSize (size ), "elapsed" , common .PrettyDuration (time .Since (start )))
198+ }()
199+ }
200+
201+ // waitFlush blocks until the buffer has been fully flushed and returns any
202+ // stored errors that occurred during the process.
203+ func (b * buffer ) waitFlush () error {
204+ if b .done == nil {
205+ return errors .New ("the buffer is not frozen" )
147206 }
148- nodes := b .nodes .write (batch , nodesCache )
149- accounts , slots := b .states .write (batch , progress , statesCache )
150- rawdb .WritePersistentStateID (batch , id )
151- rawdb .WriteSnapshotRoot (batch , root )
152-
153- // Flush all mutations in a single batch
154- size := batch .ValueSize ()
155- if err := batch .Write (); err != nil {
156- return err
157- }
158- commitBytesMeter .Mark (int64 (size ))
159- commitNodesMeter .Mark (int64 (nodes ))
160- commitAccountsMeter .Mark (int64 (accounts ))
161- commitStoragesMeter .Mark (int64 (slots ))
162- commitTimeTimer .UpdateSince (start )
163- b .reset ()
164- log .Debug ("Persisted buffer content" , "nodes" , nodes , "accounts" , accounts , "slots" , slots , "bytes" , common .StorageSize (size ), "elapsed" , common .PrettyDuration (time .Since (start )))
165- return nil
207+ <- b .done
208+ return b .flushErr
166209}
0 commit comments