Skip to content

Commit

Permalink
core, accounts, eth, trie: pbss fix release v1.13.2 (#615)
Browse files Browse the repository at this point in the history
* core, accounts, eth, trie: handle genesis state missing (#28171)

* core, accounts, eth, trie: handle genesis state missing

* core, eth, trie: polish

* core: manage txpool subscription in mainpool

* eth/backend: fix test

* cmd, eth: fix test

* core/rawdb, trie/triedb/pathdb: address comments

* eth, trie: address comments

* eth: inline the function

* eth: use synced flag

* core/txpool: revert changes in txpool

* core, eth, trie: rename functions

* trie: remove internal nodes between shortNode and child in path mode (#28163)

* trie: remove internal nodes between shortNode and child in path mode

* trie: address comments

* core/rawdb, trie: address comments

* core/rawdb: delete unused func

* trie: change comments

* trie: add missing tests

* trie: fix lint

---------

Co-authored-by: rjl493456442 <garyrong0905@gmail.com>
  • Loading branch information
Francesco4203 and rjl493456442 authored Oct 29, 2024
1 parent 1615821 commit 661778c
Show file tree
Hide file tree
Showing 22 changed files with 480 additions and 170 deletions.
15 changes: 8 additions & 7 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func (b *SimulatedBackend) CodeAt(ctx context.Context, contract common.Address,
if err != nil {
return nil, err
}

return stateDB.GetCode(contract), nil
}

Expand All @@ -194,7 +193,6 @@ func (b *SimulatedBackend) BalanceAt(ctx context.Context, contract common.Addres
if err != nil {
return nil, err
}

return stateDB.GetBalance(contract), nil
}

Expand All @@ -207,7 +205,6 @@ func (b *SimulatedBackend) NonceAt(ctx context.Context, contract common.Address,
if err != nil {
return 0, err
}

return stateDB.GetNonce(contract), nil
}

Expand All @@ -220,7 +217,6 @@ func (b *SimulatedBackend) StorageAt(ctx context.Context, contract common.Addres
if err != nil {
return nil, err
}

val := stateDB.GetState(contract, key)
return val[:], nil
}
Expand Down Expand Up @@ -667,7 +663,10 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
}
block.AddTxWithChain(b.blockchain, tx)
}, true)
stateDB, _ := b.blockchain.State()
stateDB, err := b.blockchain.State()
if err != nil {
return err
}

b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)
Expand Down Expand Up @@ -782,11 +781,13 @@ func (b *SimulatedBackend) AdjustTime(adjustment time.Duration) error {
blocks, _ := core.GenerateChain(b.config, b.blockchain.CurrentBlock(), ethash.NewFaker(), b.database, 1, func(number int, block *core.BlockGen) {
block.OffsetTime(int64(adjustment.Seconds()))
}, true)
stateDB, _ := b.blockchain.State()
stateDB, err := b.blockchain.State()
if err != nil {
return err
}

b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)

return nil
}

Expand Down
1 change: 1 addition & 0 deletions cmd/devp2p/internal/ethtest/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func setupGeth(stack *node.Node) error {
if err != nil {
return err
}
backend.SetSynced()

_, err = backend.BlockChain().InsertChain(chain.blocks[1:], nil)
return err
Expand Down
51 changes: 17 additions & 34 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
if !bc.HasState(head.Root()) {
if head.NumberU64() == 0 {
// The genesis state is missing, which is only possible in the path-based
// scheme. This situation occurs when the state syncer overwrites it.
//
// The solution is to reset the state to the genesis state. Although it may not
// match the sync target, the state healer will later address and correct any
// inconsistencies.
bc.resetState()
// scheme. This situation occurs when the initial state sync is not finished
// yet, or the chain head is rewound below the pivot point. In both scenario,
// there is no possible recovery approach except for rerunning a snap sync.
// Do nothing here until the state syncer picks it up.
log.Info("Genesis state is missing, wait state sync")
} else {
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
Expand Down Expand Up @@ -675,28 +674,6 @@ func (bc *BlockChain) SetHead(head uint64) error {
return err
}

// resetState resets the persistent state to genesis state if it's not present.
func (bc *BlockChain) resetState() {
// Short circuit if the genesis state is already present.
root := bc.genesisBlock.Root()
if bc.HasState(root) {
return
}
// Reset the state database to empty for committing genesis state.
// Note, it should only happen in path-based scheme and Reset function
// is also only call-able in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Reset(types.EmptyRootHash); err != nil {
log.Crit("Failed to clean state", "err", err) // Shouldn't happen
}
}
// Write genesis state into database.
if err := CommitGenesisState(bc.db, bc.triedb, bc.genesisBlock.Hash()); err != nil {
log.Crit("Failed to commit genesis state", "err", err)
}
log.Info("Reset state to genesis", "root", root)
}

// setHeadBeyondRoot rewinds the local chain to a new head with the extra condition
// that the rewind must pass the specified state root. This method is meant to be
// used when rewinding with snapshots enabled to ensure that we go back further than
Expand Down Expand Up @@ -728,7 +705,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
if newHeadBlock == nil {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock
bc.resetState()
} else {
// Block exists, keep rewinding until we find one with state,
// keeping rewinding until we exceed the optional threshold
Expand Down Expand Up @@ -758,16 +734,14 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
}
}
if beyondRoot || newHeadBlock.NumberU64() == 0 {
if newHeadBlock.NumberU64() == 0 {
bc.resetState()
} else if !bc.HasState(newHeadBlock.Root()) {
if !bc.HasState(newHeadBlock.Root()) && bc.stateRecoverable(newHeadBlock.Root()) {
// Rewind to a block with recoverable state. If the state is
// missing, run the state recovery here.
if err := bc.triedb.Recover(newHeadBlock.Root()); err != nil {
log.Crit("Failed to rollback state", "err", err) // Shouldn't happen
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
}
log.Debug("Skipping block with threshold state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "root", newHeadBlock.Root())
Expand All @@ -782,6 +756,15 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
// to low, so it's safe the update in-memory markers directly.
bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))

// The head state is missing, which is only possible in the path-based
// scheme. This situation occurs when the chain head is rewound below
// the pivot point. In this scenario, there is no possible recovery
// approach except for rerunning a snap sync. Do nothing here until the
// state syncer picks it up.
if !bc.HasState(newHeadBlock.Root()) {
log.Info("Chain is stateless, wait state sync", "number", newHeadBlock.Number(), "hash", newHeadBlock.Hash())
}
}
// Rewind the fast block in a simpleton way to the target head
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() {
Expand Down Expand Up @@ -865,7 +848,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
// Reset the trie database with the fresh fast synced state.
root := block.Root()
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Reset(root); err != nil {
if err := bc.triedb.Enable(root); err != nil {
return err
}
}
Expand Down
44 changes: 44 additions & 0 deletions core/rawdb/accessors_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
)

const (
StateSyncUnknown = uint8(0) // flags the state snap sync is unknown
StateSyncRunning = uint8(1) // flags the state snap sync is not completed yet
StateSyncFinished = uint8(2) // flags the state snap sync is completed
)

// ReadSnapSyncStatusFlag retrieves the state snap sync status flag.
func ReadSnapSyncStatusFlag(db ethdb.KeyValueReader) uint8 {
blob, err := db.Get(snapSyncStatusFlagKey)
if err != nil || len(blob) != 1 {
return StateSyncUnknown
}
return blob[0]
}

// WriteSnapSyncStatusFlag stores the state snap sync status flag into database.
func WriteSnapSyncStatusFlag(db ethdb.KeyValueWriter, flag uint8) {
if err := db.Put(snapSyncStatusFlagKey, []byte{flag}); err != nil {
log.Crit("Failed to store sync status flag", "err", err)
}
}
20 changes: 20 additions & 0 deletions core/rawdb/accessors_trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ func HasAccountTrieNode(db ethdb.KeyValueReader, path []byte, hash common.Hash)
return h.hash(data) == hash
}

// ExistsAccountTrieNode checks the presence of the account trie node with the
// specified node path, regardless of the node hash.
func ExistsAccountTrieNode(db ethdb.KeyValueReader, path []byte) bool {
has, err := db.Has(accountTrieNodeKey(path))
if err != nil {
return false
}
return has
}

// WriteAccountTrieNode writes the provided account trie node into database.
func WriteAccountTrieNode(db ethdb.KeyValueWriter, path []byte, node []byte) {
if err := db.Put(accountTrieNodeKey(path), node); err != nil {
Expand Down Expand Up @@ -126,6 +136,16 @@ func HasStorageTrieNode(db ethdb.KeyValueReader, accountHash common.Hash, path [
return h.hash(data) == hash
}

// ExistsStorageTrieNode checks the presence of the storage trie node with the
// specified account hash and node path, regardless of the node hash.
func ExistsStorageTrieNode(db ethdb.KeyValueReader, accountHash common.Hash, path []byte) bool {
has, err := db.Has(storageTrieNodeKey(accountHash, path))
if err != nil {
return false
}
return has
}

// WriteStorageTrieNode writes the provided storage trie node into database.
func WriteStorageTrieNode(db ethdb.KeyValueWriter, accountHash common.Hash, path []byte, node []byte) {
if err := db.Put(storageTrieNodeKey(accountHash, path), node); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
fastTrieProgressKey, snapshotDisabledKey, SnapshotRootKey, snapshotJournalKey,
snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey,
uncleanShutdownKey, badBlockKey, highestFinalityVoteKey, storeInternalTxsEnabledKey,
snapshotSyncStatusKey, persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey,
snapshotSyncStatusKey, persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, snapSyncStatusFlagKey,
} {
if bytes.Equal(key, meta) {
metadata.Add(size)
Expand Down
3 changes: 3 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ var (

snapshotConsortiumPrefix = []byte("consortium-") // key = ConsortiumSnapshotPrefix + block hash

// snapSyncStatusFlagKey flags that status of snap sync.
snapSyncStatusFlagKey = []byte("SnapSyncStatus")

// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td
Expand Down
10 changes: 8 additions & 2 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.B
return nil, nil, errors.New("header not found")
}
stateDb, err := b.eth.BlockChain().StateAt(header.Root)
return stateDb, header, err
if err != nil {
return nil, nil, err
}
return stateDb, header, nil
}

func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) {
Expand All @@ -187,7 +190,10 @@ func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockN
return nil, nil, errors.New("hash is not currently canonical")
}
stateDb, err := b.eth.BlockChain().StateAt(header.Root)
return stateDb, header, err
if err != nil {
return nil, nil, err
}
return stateDb, header, nil
}
return nil, nil, errors.New("invalid arguments; neither block nor hash specified")
}
Expand Down
3 changes: 2 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,8 @@ func (s *Ethereum) Engine() consensus.Engine { return s.engine }
func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
func (s *Ethereum) IsListening() bool { return true } // Always listening
func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downloader }
func (s *Ethereum) Synced() bool { return atomic.LoadUint32(&s.handler.acceptTxs) == 1 }
func (s *Ethereum) Synced() bool { return atomic.LoadUint32(&s.handler.synced) == 1 }
func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() }
func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }
func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }

Expand Down
4 changes: 3 additions & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,9 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// subsequent state reads, explicitly disable the trie database and state
// syncer is responsible to address and correct any state missing.
if d.blockchain.TrieDB().Scheme() == rawdb.PathScheme {
d.blockchain.TrieDB().Reset(types.EmptyRootHash)
if err := d.blockchain.TrieDB().Disable(); err != nil {
return err
}
}
// Snap sync uses the snapshot namespace to store potentially flakey data until
// sync completely heals and finishes. Pause snapshot maintenance in the mean
Expand Down
39 changes: 26 additions & 13 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ type handler struct {
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node

fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
snapSync uint32 // Flag whether fast sync should operate on top of the snap protocol
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
snapSync uint32 // Flag whether fast sync should operate on top of the snap protocol
synced uint32 // Flag whether we're considered synchronised (enables transaction processing)

checkpointNumber uint64 // Block number for the sync progress validator to cross reference
checkpointHash common.Hash // Block hash for the sync progress validator to cross reference
Expand Down Expand Up @@ -183,17 +183,23 @@ func newHandler(config *handlerConfig) (*handler, error) {
fullBlock, fastBlock := h.chain.CurrentBlock(), h.chain.CurrentFastBlock()
if fullBlock.NumberU64() == 0 && fastBlock.NumberU64() > 0 {
h.fastSync = uint32(1)
log.Warn("Switch sync mode from full sync to fast sync")
log.Warn("Switch sync mode from full sync to fast sync", "reason", "snap sync incomplete")
} else if !h.chain.HasState(fullBlock.Root()) {
h.fastSync = uint32(1)
log.Warn("Switch sync mode from full sync to snap sync", "reason", "head state missing")
}
} else {
if h.chain.CurrentBlock().NumberU64() > 0 {
head := h.chain.CurrentBlock()
if head.NumberU64() > 0 {
// Print warning log if database is not empty to run fast sync.
log.Warn("Switch sync mode from fast sync to full sync")
} else {
// If fast sync was requested and our database is empty, grant it
h.fastSync = uint32(1)
log.Info("Enabled fast sync", "head", head.Number, "hash", head.Hash())
if config.Sync == downloader.SnapSync {
h.snapSync = uint32(1)
log.Info("Enabled snap sync", "head", head.Number, "hash", head.Hash())
}
}
}
Expand Down Expand Up @@ -237,15 +243,11 @@ func newHandler(config *handlerConfig) (*handler, error) {
// accept each others' blocks until a restart. Unfortunately we haven't figured
// out a way yet where nodes can decide unilaterally whether the network is new
// or not. This should be fixed if we figure out a solution.
if atomic.LoadUint32(&h.fastSync) == 1 {
log.Warn("Fast syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
if atomic.LoadUint32(&h.synced) == 0 {
log.Warn("Syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
return 0, nil
}
n, err := h.chain.InsertChain(blocks, sidecars)
if err == nil {
h.enableSyncedFeatures() // Mark initial sync done on any fetcher import
}
return n, err
return h.chain.InsertChain(blocks, sidecars)
}
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.chain.Engine().VerifyBlobHeader,
h.BroadcastBlock, heighter, nil, inserter, h.removePeer)
Expand Down Expand Up @@ -706,7 +708,18 @@ func (h *handler) voteBroadcastLoop() {
// enableSyncedFeatures enables the post-sync functionalities when the initial
// sync is finished.
func (h *handler) enableSyncedFeatures() {
atomic.StoreUint32(&h.acceptTxs, 1)
atomic.StoreUint32(&h.synced, 1)

// If we were running fast/snap sync and it finished, disable doing another
// round on next sync cycle
if atomic.LoadUint32(&h.fastSync) == 1 {
log.Info("Fast sync complete, auto disabling")
atomic.StoreUint32(&h.fastSync, 0)
}
if atomic.LoadUint32(&h.snapSync) == 1 {
log.Info("Snap sync complete, auto disabling")
atomic.StoreUint32(&h.snapSync, 0)
}
if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
h.chain.TrieDB().SetBufferSize(pathdb.DefaultBufferSize)
}
Expand Down
2 changes: 1 addition & 1 deletion eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (h *ethHandler) PeerInfo(id enode.ID) interface{} {
// AcceptTxs retrieves whether transaction processing is enabled on the node
// or if inbound transactions should simply be dropped.
func (h *ethHandler) AcceptTxs() bool {
return atomic.LoadUint32(&h.acceptTxs) == 1
return atomic.LoadUint32(&h.synced) == 1
}

// Handle is invoked from a peer's message handler when it receives a new remote
Expand Down
Loading

0 comments on commit 661778c

Please sign in to comment.