Skip to content
This repository has been archived by the owner on Feb 18, 2025. It is now read-only.

core, accounts, eth, trie: pbss fix release v1.13.2 #615

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func (b *SimulatedBackend) CodeAt(ctx context.Context, contract common.Address,
if err != nil {
return nil, err
}

return stateDB.GetCode(contract), nil
}

Expand All @@ -194,7 +193,6 @@ func (b *SimulatedBackend) BalanceAt(ctx context.Context, contract common.Addres
if err != nil {
return nil, err
}

return stateDB.GetBalance(contract), nil
}

Expand All @@ -207,7 +205,6 @@ func (b *SimulatedBackend) NonceAt(ctx context.Context, contract common.Address,
if err != nil {
return 0, err
}

return stateDB.GetNonce(contract), nil
}

Expand All @@ -220,7 +217,6 @@ func (b *SimulatedBackend) StorageAt(ctx context.Context, contract common.Addres
if err != nil {
return nil, err
}

val := stateDB.GetState(contract, key)
return val[:], nil
}
Expand Down Expand Up @@ -667,7 +663,10 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
}
block.AddTxWithChain(b.blockchain, tx)
}, true)
stateDB, _ := b.blockchain.State()
stateDB, err := b.blockchain.State()
if err != nil {
return err
}

b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)
Expand Down Expand Up @@ -782,11 +781,13 @@ func (b *SimulatedBackend) AdjustTime(adjustment time.Duration) error {
blocks, _ := core.GenerateChain(b.config, b.blockchain.CurrentBlock(), ethash.NewFaker(), b.database, 1, func(number int, block *core.BlockGen) {
block.OffsetTime(int64(adjustment.Seconds()))
}, true)
stateDB, _ := b.blockchain.State()
stateDB, err := b.blockchain.State()
if err != nil {
return err
}

b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)

return nil
}

Expand Down
1 change: 1 addition & 0 deletions cmd/devp2p/internal/ethtest/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func setupGeth(stack *node.Node) error {
if err != nil {
return err
}
backend.SetSynced()

_, err = backend.BlockChain().InsertChain(chain.blocks[1:], nil)
return err
Expand Down
51 changes: 17 additions & 34 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
if !bc.HasState(head.Root()) {
if head.NumberU64() == 0 {
// The genesis state is missing, which is only possible in the path-based
// scheme. This situation occurs when the state syncer overwrites it.
//
// The solution is to reset the state to the genesis state. Although it may not
// match the sync target, the state healer will later address and correct any
// inconsistencies.
bc.resetState()
// scheme. This situation occurs when the initial state sync is not finished
// yet, or the chain head is rewound below the pivot point. In both scenario,
// there is no possible recovery approach except for rerunning a snap sync.
// Do nothing here until the state syncer picks it up.
log.Info("Genesis state is missing, wait state sync")
} else {
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
Expand Down Expand Up @@ -675,28 +674,6 @@ func (bc *BlockChain) SetHead(head uint64) error {
return err
}

// resetState resets the persistent state to genesis state if it's not present.
func (bc *BlockChain) resetState() {
// Short circuit if the genesis state is already present.
root := bc.genesisBlock.Root()
if bc.HasState(root) {
return
}
// Reset the state database to empty for committing genesis state.
// Note, it should only happen in path-based scheme and Reset function
// is also only call-able in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Reset(types.EmptyRootHash); err != nil {
log.Crit("Failed to clean state", "err", err) // Shouldn't happen
}
}
// Write genesis state into database.
if err := CommitGenesisState(bc.db, bc.triedb, bc.genesisBlock.Hash()); err != nil {
log.Crit("Failed to commit genesis state", "err", err)
}
log.Info("Reset state to genesis", "root", root)
}

// setHeadBeyondRoot rewinds the local chain to a new head with the extra condition
// that the rewind must pass the specified state root. This method is meant to be
// used when rewinding with snapshots enabled to ensure that we go back further than
Expand Down Expand Up @@ -728,7 +705,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
if newHeadBlock == nil {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock
bc.resetState()
} else {
// Block exists, keep rewinding until we find one with state,
// keeping rewinding until we exceed the optional threshold
Expand Down Expand Up @@ -758,16 +734,14 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
}
}
if beyondRoot || newHeadBlock.NumberU64() == 0 {
if newHeadBlock.NumberU64() == 0 {
bc.resetState()
} else if !bc.HasState(newHeadBlock.Root()) {
if !bc.HasState(newHeadBlock.Root()) && bc.stateRecoverable(newHeadBlock.Root()) {
// Rewind to a block with recoverable state. If the state is
// missing, run the state recovery here.
if err := bc.triedb.Recover(newHeadBlock.Root()); err != nil {
log.Crit("Failed to rollback state", "err", err) // Shouldn't happen
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
}
log.Debug("Skipping block with threshold state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "root", newHeadBlock.Root())
Expand All @@ -782,6 +756,15 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
// to low, so it's safe the update in-memory markers directly.
bc.currentBlock.Store(newHeadBlock)
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))

// The head state is missing, which is only possible in the path-based
// scheme. This situation occurs when the chain head is rewound below
// the pivot point. In this scenario, there is no possible recovery
// approach except for rerunning a snap sync. Do nothing here until the
// state syncer picks it up.
if !bc.HasState(newHeadBlock.Root()) {
log.Info("Chain is stateless, wait state sync", "number", newHeadBlock.Number(), "hash", newHeadBlock.Hash())
}
}
// Rewind the fast block in a simpleton way to the target head
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() {
Expand Down Expand Up @@ -865,7 +848,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
// Reset the trie database with the fresh fast synced state.
root := block.Root()
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Reset(root); err != nil {
if err := bc.triedb.Enable(root); err != nil {
return err
}
}
Expand Down
44 changes: 44 additions & 0 deletions core/rawdb/accessors_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
)

const (
StateSyncUnknown = uint8(0) // flags the state snap sync is unknown
StateSyncRunning = uint8(1) // flags the state snap sync is not completed yet
StateSyncFinished = uint8(2) // flags the state snap sync is completed
)

// ReadSnapSyncStatusFlag retrieves the state snap sync status flag.
func ReadSnapSyncStatusFlag(db ethdb.KeyValueReader) uint8 {
blob, err := db.Get(snapSyncStatusFlagKey)
if err != nil || len(blob) != 1 {
return StateSyncUnknown
}
return blob[0]
}

// WriteSnapSyncStatusFlag stores the state snap sync status flag into database.
func WriteSnapSyncStatusFlag(db ethdb.KeyValueWriter, flag uint8) {
if err := db.Put(snapSyncStatusFlagKey, []byte{flag}); err != nil {
log.Crit("Failed to store sync status flag", "err", err)
}
}
20 changes: 20 additions & 0 deletions core/rawdb/accessors_trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ func HasAccountTrieNode(db ethdb.KeyValueReader, path []byte, hash common.Hash)
return h.hash(data) == hash
}

// ExistsAccountTrieNode checks the presence of the account trie node with the
// specified node path, regardless of the node hash.
func ExistsAccountTrieNode(db ethdb.KeyValueReader, path []byte) bool {
has, err := db.Has(accountTrieNodeKey(path))
if err != nil {
return false
}
return has
}

// WriteAccountTrieNode writes the provided account trie node into database.
func WriteAccountTrieNode(db ethdb.KeyValueWriter, path []byte, node []byte) {
if err := db.Put(accountTrieNodeKey(path), node); err != nil {
Expand Down Expand Up @@ -126,6 +136,16 @@ func HasStorageTrieNode(db ethdb.KeyValueReader, accountHash common.Hash, path [
return h.hash(data) == hash
}

// ExistsStorageTrieNode checks the presence of the storage trie node with the
// specified account hash and node path, regardless of the node hash.
func ExistsStorageTrieNode(db ethdb.KeyValueReader, accountHash common.Hash, path []byte) bool {
has, err := db.Has(storageTrieNodeKey(accountHash, path))
if err != nil {
return false
}
return has
}

// WriteStorageTrieNode writes the provided storage trie node into database.
func WriteStorageTrieNode(db ethdb.KeyValueWriter, accountHash common.Hash, path []byte, node []byte) {
if err := db.Put(storageTrieNodeKey(accountHash, path), node); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
fastTrieProgressKey, snapshotDisabledKey, SnapshotRootKey, snapshotJournalKey,
snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey,
uncleanShutdownKey, badBlockKey, highestFinalityVoteKey, storeInternalTxsEnabledKey,
snapshotSyncStatusKey, persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey,
snapshotSyncStatusKey, persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, snapSyncStatusFlagKey,
} {
if bytes.Equal(key, meta) {
metadata.Add(size)
Expand Down
3 changes: 3 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ var (

snapshotConsortiumPrefix = []byte("consortium-") // key = ConsortiumSnapshotPrefix + block hash

// snapSyncStatusFlagKey flags that status of snap sync.
snapSyncStatusFlagKey = []byte("SnapSyncStatus")

// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td
Expand Down
10 changes: 8 additions & 2 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.B
return nil, nil, errors.New("header not found")
}
stateDb, err := b.eth.BlockChain().StateAt(header.Root)
return stateDb, header, err
if err != nil {
return nil, nil, err
}
return stateDb, header, nil
}

func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) {
Expand All @@ -187,7 +190,10 @@ func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockN
return nil, nil, errors.New("hash is not currently canonical")
}
stateDb, err := b.eth.BlockChain().StateAt(header.Root)
return stateDb, header, err
if err != nil {
return nil, nil, err
}
return stateDb, header, nil
}
return nil, nil, errors.New("invalid arguments; neither block nor hash specified")
}
Expand Down
3 changes: 2 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,8 @@ func (s *Ethereum) Engine() consensus.Engine { return s.engine }
func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
func (s *Ethereum) IsListening() bool { return true } // Always listening
func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downloader }
func (s *Ethereum) Synced() bool { return atomic.LoadUint32(&s.handler.acceptTxs) == 1 }
func (s *Ethereum) Synced() bool { return atomic.LoadUint32(&s.handler.synced) == 1 }
func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() }
func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }
func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }

Expand Down
4 changes: 3 additions & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,9 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// subsequent state reads, explicitly disable the trie database and state
// syncer is responsible to address and correct any state missing.
if d.blockchain.TrieDB().Scheme() == rawdb.PathScheme {
d.blockchain.TrieDB().Reset(types.EmptyRootHash)
if err := d.blockchain.TrieDB().Disable(); err != nil {
return err
}
}
// Snap sync uses the snapshot namespace to store potentially flakey data until
// sync completely heals and finishes. Pause snapshot maintenance in the mean
Expand Down
39 changes: 26 additions & 13 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ type handler struct {
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node

fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
snapSync uint32 // Flag whether fast sync should operate on top of the snap protocol
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
snapSync uint32 // Flag whether fast sync should operate on top of the snap protocol
synced uint32 // Flag whether we're considered synchronised (enables transaction processing)

checkpointNumber uint64 // Block number for the sync progress validator to cross reference
checkpointHash common.Hash // Block hash for the sync progress validator to cross reference
Expand Down Expand Up @@ -183,17 +183,23 @@ func newHandler(config *handlerConfig) (*handler, error) {
fullBlock, fastBlock := h.chain.CurrentBlock(), h.chain.CurrentFastBlock()
if fullBlock.NumberU64() == 0 && fastBlock.NumberU64() > 0 {
h.fastSync = uint32(1)
log.Warn("Switch sync mode from full sync to fast sync")
log.Warn("Switch sync mode from full sync to fast sync", "reason", "snap sync incomplete")
} else if !h.chain.HasState(fullBlock.Root()) {
h.fastSync = uint32(1)
log.Warn("Switch sync mode from full sync to snap sync", "reason", "head state missing")
}
} else {
if h.chain.CurrentBlock().NumberU64() > 0 {
head := h.chain.CurrentBlock()
if head.NumberU64() > 0 {
// Print warning log if database is not empty to run fast sync.
log.Warn("Switch sync mode from fast sync to full sync")
} else {
// If fast sync was requested and our database is empty, grant it
h.fastSync = uint32(1)
log.Info("Enabled fast sync", "head", head.Number, "hash", head.Hash())
if config.Sync == downloader.SnapSync {
h.snapSync = uint32(1)
log.Info("Enabled snap sync", "head", head.Number, "hash", head.Hash())
}
}
}
Expand Down Expand Up @@ -237,15 +243,11 @@ func newHandler(config *handlerConfig) (*handler, error) {
// accept each others' blocks until a restart. Unfortunately we haven't figured
// out a way yet where nodes can decide unilaterally whether the network is new
// or not. This should be fixed if we figure out a solution.
if atomic.LoadUint32(&h.fastSync) == 1 {
log.Warn("Fast syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
if atomic.LoadUint32(&h.synced) == 0 {
log.Warn("Syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
return 0, nil
}
n, err := h.chain.InsertChain(blocks, sidecars)
if err == nil {
h.enableSyncedFeatures() // Mark initial sync done on any fetcher import
}
return n, err
return h.chain.InsertChain(blocks, sidecars)
}
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.chain.Engine().VerifyBlobHeader,
h.BroadcastBlock, heighter, nil, inserter, h.removePeer)
Expand Down Expand Up @@ -706,7 +708,18 @@ func (h *handler) voteBroadcastLoop() {
// enableSyncedFeatures enables the post-sync functionalities when the initial
// sync is finished.
func (h *handler) enableSyncedFeatures() {
atomic.StoreUint32(&h.acceptTxs, 1)
atomic.StoreUint32(&h.synced, 1)

// If we were running fast/snap sync and it finished, disable doing another
// round on next sync cycle
if atomic.LoadUint32(&h.fastSync) == 1 {
log.Info("Fast sync complete, auto disabling")
atomic.StoreUint32(&h.fastSync, 0)
}
if atomic.LoadUint32(&h.snapSync) == 1 {
log.Info("Snap sync complete, auto disabling")
atomic.StoreUint32(&h.snapSync, 0)
}
if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
h.chain.TrieDB().SetBufferSize(pathdb.DefaultBufferSize)
}
Expand Down
2 changes: 1 addition & 1 deletion eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (h *ethHandler) PeerInfo(id enode.ID) interface{} {
// AcceptTxs retrieves whether transaction processing is enabled on the node
// or if inbound transactions should simply be dropped.
func (h *ethHandler) AcceptTxs() bool {
return atomic.LoadUint32(&h.acceptTxs) == 1
return atomic.LoadUint32(&h.synced) == 1
}

// Handle is invoked from a peer's message handler when it receives a new remote
Expand Down
Loading
Loading