@@ -379,7 +379,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix
379379type onStateCallback func (key []byte , val []byte , write bool , delete bool ) error
380380
381381// generateRange generates the state segment with particular prefix. Generation can
382- // either verify the correctness of existing state through rangeproof and skip
382+ // either verify the correctness of existing state through range-proof and skip
383383// generation, or iterate trie to regenerate state on demand.
384384func (dl * diskLayer ) generateRange (root common.Hash , prefix []byte , kind string , origin []byte , max int , stats * generatorStats , onState onStateCallback , valueConvertFn func ([]byte ) ([]byte , error )) (bool , []byte , error ) {
385385 // Use range prover to check the validity of the flat state in the range
@@ -532,66 +532,94 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string,
532532 return ! trieMore && ! result .diskMore , last , nil
533533}
534534
535- // generate is a background thread that iterates over the state and storage tries,
536- // constructing the state snapshot. All the arguments are purely for statistics
537- // gathering and logging, since the method surfs the blocks as they arrive, often
538- // being restarted.
539- func (dl * diskLayer ) generate (stats * generatorStats ) {
540- var (
541- accMarker []byte
542- accountRange = accountCheckRange
543- )
544- if len (dl .genMarker ) > 0 { // []byte{} is the start, use nil for that
545- // Always reset the initial account range as 1
546- // whenever recover from the interruption.
547- accMarker , accountRange = dl .genMarker [:common .HashLength ], 1
535+ // checkAndFlush checks if an interruption signal is received or the
536+ // batch size has exceeded the allowance.
537+ func (dl * diskLayer ) checkAndFlush (current []byte , batch ethdb.Batch , stats * generatorStats , logged * time.Time ) error {
538+ var abort chan * generatorStats
539+ select {
540+ case abort = <- dl .genAbort :
541+ default :
548542 }
549- var (
550- batch = dl .diskdb .NewBatch ()
551- logged = time .Now ()
552- accOrigin = common .CopyBytes (accMarker )
553- abort chan * generatorStats
554- )
555- stats .Log ("Resuming state snapshot generation" , dl .root , dl .genMarker )
543+ if batch .ValueSize () > ethdb .IdealBatchSize || abort != nil {
544+ if bytes .Compare (current , dl .genMarker ) < 0 {
545+ log .Error ("Snapshot generator went backwards" , "current" , fmt .Sprintf ("%x" , current ), "genMarker" , fmt .Sprintf ("%x" , dl .genMarker ))
546+ }
547+ // Flush out the batch anyway no matter it's empty or not.
548+ // It's possible that all the states are recovered and the
549+ // generation indeed makes progress.
550+ journalProgress (batch , current , stats )
556551
557- checkAndFlush := func (currentLocation []byte ) error {
558- select {
559- case abort = <- dl .genAbort :
560- default :
552+ if err := batch .Write (); err != nil {
553+ return err
561554 }
562- if batch .ValueSize () > ethdb .IdealBatchSize || abort != nil {
563- if bytes .Compare (currentLocation , dl .genMarker ) < 0 {
564- log .Error ("Snapshot generator went backwards" ,
565- "currentLocation" , fmt .Sprintf ("%x" , currentLocation ),
566- "genMarker" , fmt .Sprintf ("%x" , dl .genMarker ))
567- }
555+ batch .Reset ()
568556
569- // Flush out the batch anyway no matter it's empty or not.
570- // It's possible that all the states are recovered and the
571- // generation indeed makes progress.
572- journalProgress (batch , currentLocation , stats )
557+ dl .lock .Lock ()
558+ dl .genMarker = current
559+ dl .lock .Unlock ()
573560
574- if err := batch .Write (); err != nil {
575- return err
576- }
577- batch .Reset ()
561+ if abort != nil {
562+ stats .Log ("Aborting state snapshot generation" , dl .root , current )
563+ return newAbortErr (abort ) // bubble up an error for interruption
564+ }
565+ }
566+ if time .Since (* logged ) > 8 * time .Second {
567+ stats .Log ("Generating state snapshot" , dl .root , current )
568+ * logged = time .Now ()
569+ }
570+ return nil
571+ }
578572
579- dl .lock .Lock ()
580- dl .genMarker = currentLocation
581- dl .lock .Unlock ()
573+ // generateStorages generates the missing storage slots of the specific contract.
574+ // It's supposed to restart the generation from the given origin position.
575+ func generateStorages (dl * diskLayer , account common.Hash , storageRoot common.Hash , storeMarker []byte , batch ethdb.Batch , stats * generatorStats , logged * time.Time ) error {
576+ onStorage := func (key []byte , val []byte , write bool , delete bool ) error {
577+ defer func (start time.Time ) {
578+ snapStorageWriteCounter .Inc (time .Since (start ).Nanoseconds ())
579+ }(time .Now ())
582580
583- if abort != nil {
584- stats .Log ("Aborting state snapshot generation" , dl .root , currentLocation )
585- return errors .New ("aborted" )
586- }
581+ if delete {
582+ rawdb .DeleteStorageSnapshot (batch , account , common .BytesToHash (key ))
583+ snapWipedStorageMeter .Mark (1 )
584+ return nil
585+ }
586+ if write {
587+ rawdb .WriteStorageSnapshot (batch , account , common .BytesToHash (key ), val )
588+ snapGeneratedStorageMeter .Mark (1 )
589+ } else {
590+ snapRecoveredStorageMeter .Mark (1 )
587591 }
588- if time .Since (logged ) > 8 * time .Second {
589- stats .Log ("Generating state snapshot" , dl .root , currentLocation )
590- logged = time .Now ()
592+ stats .storage += common .StorageSize (1 + 2 * common .HashLength + len (val ))
593+ stats .slots ++
594+
595+ // If we've exceeded our batch allowance or termination was requested, flush to disk
596+ if err := dl .checkAndFlush (append (account [:], key ... ), batch , stats , logged ); err != nil {
597+ return err
591598 }
592599 return nil
593600 }
601+ // Loop for re-generating the missing storage slots.
602+ var origin = common .CopyBytes (storeMarker )
603+ for {
604+ exhausted , last , err := dl .generateRange (storageRoot , append (rawdb .SnapshotStoragePrefix , account .Bytes ()... ), "storage" , origin , storageCheckRange , stats , onStorage , nil )
605+ if err != nil {
606+ return err // The procedure it aborted, either by external signal or internal error.
607+ }
608+ // Abort the procedure if the entire contract storage is generated
609+ if exhausted {
610+ break
611+ }
612+ if origin = increaseKey (last ); origin == nil {
613+ break // special case, the last is 0xffffffff...fff
614+ }
615+ }
616+ return nil
617+ }
594618
619+ // generateAccounts generates the missing snapshot accounts as well as their
620+ // storage slots in the main trie. It's supposed to restart the generation
621+ // from the given origin position.
622+ func generateAccounts (dl * diskLayer , accMarker []byte , batch ethdb.Batch , stats * generatorStats , logged * time.Time ) error {
595623 onAccount := func (key []byte , val []byte , write bool , delete bool ) error {
596624 var (
597625 start = time .Now ()
@@ -647,7 +675,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
647675 marker = dl .genMarker [:]
648676 }
649677 // If we've exceeded our batch allowance or termination was requested, flush to disk
650- if err := checkAndFlush (marker ); err != nil {
678+ if err := dl . checkAndFlush (marker , batch , stats , logged ); err != nil {
651679 return err
652680 }
653681 // If the iterated account is the contract, create a further loop to
@@ -671,70 +699,67 @@ func (dl *diskLayer) generate(stats *generatorStats) {
671699 if accMarker != nil && bytes .Equal (accountHash [:], accMarker ) && len (dl .genMarker ) > common .HashLength {
672700 storeMarker = dl .genMarker [common .HashLength :]
673701 }
674- onStorage := func (key []byte , val []byte , write bool , delete bool ) error {
675- defer func (start time.Time ) {
676- snapStorageWriteCounter .Inc (time .Since (start ).Nanoseconds ())
677- }(time .Now ())
678-
679- if delete {
680- rawdb .DeleteStorageSnapshot (batch , accountHash , common .BytesToHash (key ))
681- snapWipedStorageMeter .Mark (1 )
682- return nil
683- }
684- if write {
685- rawdb .WriteStorageSnapshot (batch , accountHash , common .BytesToHash (key ), val )
686- snapGeneratedStorageMeter .Mark (1 )
687- } else {
688- snapRecoveredStorageMeter .Mark (1 )
689- }
690- stats .storage += common .StorageSize (1 + 2 * common .HashLength + len (val ))
691- stats .slots ++
692-
693- // If we've exceeded our batch allowance or termination was requested, flush to disk
694- if err := checkAndFlush (append (accountHash [:], key ... )); err != nil {
695- return err
696- }
697- return nil
698- }
699- var storeOrigin = common .CopyBytes (storeMarker )
700- for {
701- exhausted , last , err := dl .generateRange (acc .Root , append (rawdb .SnapshotStoragePrefix , accountHash .Bytes ()... ), "storage" , storeOrigin , storageCheckRange , stats , onStorage , nil )
702- if err != nil {
703- return err
704- }
705- if exhausted {
706- break
707- }
708- if storeOrigin = increaseKey (last ); storeOrigin == nil {
709- break // special case, the last is 0xffffffff...fff
710- }
702+ if err := generateStorages (dl , accountHash , acc .Root , storeMarker , batch , stats , logged ); err != nil {
703+ return err
711704 }
712705 }
713706 // Some account processed, unmark the marker
714707 accMarker = nil
715708 return nil
716709 }
717-
718- // Global loop for regerating the entire state trie + all layered storage tries.
710+ // Always reset the initial account range as 1 whenever recover from the interruption.
711+ var accountRange = accountCheckRange
712+ if len (accMarker ) > 0 {
713+ accountRange = 1
714+ }
715+ // Global loop for re-generating the account snapshots + all layered storage snapshots.
716+ origin := common .CopyBytes (accMarker )
719717 for {
720- exhausted , last , err := dl .generateRange (dl .root , rawdb .SnapshotAccountPrefix , "account" , accOrigin , accountRange , stats , onAccount , FullAccountRLP )
721- // The procedure it aborted, either by external signal or internal error
718+ exhausted , last , err := dl .generateRange (dl .root , rawdb .SnapshotAccountPrefix , "account" , origin , accountRange , stats , onAccount , FullAccountRLP )
722719 if err != nil {
723- if abort == nil { // aborted by internal error, wait the signal
724- abort = <- dl .genAbort
725- }
726- abort <- stats
727- return
720+ return err // The procedure it aborted, either by external signal or internal error.
728721 }
729722 // Abort the procedure if the entire snapshot is generated
730723 if exhausted {
731724 break
732725 }
733- if accOrigin = increaseKey (last ); accOrigin == nil {
726+ if origin = increaseKey (last ); origin == nil {
734727 break // special case, the last is 0xffffffff...fff
735728 }
736729 accountRange = accountCheckRange
737730 }
731+ return nil
732+ }
733+
734+ // generate is a background thread that iterates over the state and storage tries,
735+ // constructing the state snapshot. All the arguments are purely for statistics
736+ // gathering and logging, since the method surfs the blocks as they arrive, often
737+ // being restarted.
738+ func (dl * diskLayer ) generate (stats * generatorStats ) {
739+ var accMarker []byte
740+ if len (dl .genMarker ) > 0 { // []byte{} is the start, use nil for that
741+ accMarker = dl .genMarker [:common .HashLength ]
742+ }
743+ var (
744+ batch = dl .diskdb .NewBatch ()
745+ logged = time .Now ()
746+ abort chan * generatorStats
747+ )
748+ stats .Log ("Resuming state snapshot generation" , dl .root , dl .genMarker )
749+
750+ // Generate the snapshot accounts from the point where they left off.
751+ if err := generateAccounts (dl , accMarker , batch , stats , & logged ); err != nil {
752+ // Extract the received interruption signal if exists
753+ if aerr , ok := err .(* abortErr ); ok {
754+ abort = aerr .abort
755+ }
756+ // Aborted by internal error, wait the signal
757+ if abort == nil {
758+ abort = <- dl .genAbort
759+ }
760+ abort <- stats
761+ return
762+ }
738763 // Snapshot fully generated, set the marker to nil.
739764 // Note even there is nothing to commit, persist the
740765 // generator anyway to mark the snapshot is complete.
@@ -762,7 +787,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
762787}
763788
764789// increaseKey increase the input key by one bit. Return nil if the entire
765- // addition operation overflows,
790+ // addition operation overflows.
766791func increaseKey (key []byte ) []byte {
767792 for i := len (key ) - 1 ; i >= 0 ; i -- {
768793 key [i ]++
@@ -772,3 +797,17 @@ func increaseKey(key []byte) []byte {
772797 }
773798 return nil
774799}
800+
801+ // abortErr wraps an interruption signal received to represent the
802+ // generation is aborted by external processes.
803+ type abortErr struct {
804+ abort chan * generatorStats
805+ }
806+
807+ func newAbortErr (abort chan * generatorStats ) error {
808+ return & abortErr {abort : abort }
809+ }
810+
811+ func (err * abortErr ) Error () string {
812+ return "aborted"
813+ }
0 commit comments