Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: use atomic type #27011

Merged
merged 1 commit into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ type BlockChain struct {
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
lastWrite uint64 // Last block when the state was flushed
flushInterval int64 // Time interval (processing time) after which to flush a state
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *trie.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)

Expand Down Expand Up @@ -215,8 +215,8 @@ type BlockChain struct {

wg sync.WaitGroup //
quit chan struct{} // shutdown signal, closed in Stop.
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing
stopping atomic.Bool // false if chain is running, true when stopped
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps stopped?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think stopping is more correct -- it becomes true when stopWithoutSaving is invoked -- flipping this bool is basically the very first thing that happens on a call to Stop(). After this, all the waits happen.

So this boolean does not mean it is stopped, but rather that the process of stopping the blockchain has (at least) begun.

So IMO it's good the way it is

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the comment need to change to true when stopping?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, "need to change" I dunno, but it would be slightly more accurate to say "set to true when stop is initiated"

procInterrupt atomic.Bool // interrupt signaler for block processing

engine consensus.Engine
validator Validator // Block and state validator interface
Expand Down Expand Up @@ -260,7 +260,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
cacheConfig: cacheConfig,
db: db,
triedb: triedb,
flushInterval: int64(cacheConfig.TrieTimeLimit),
triegc: prque.New[int64, common.Hash](nil),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
Expand All @@ -273,6 +272,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
engine: engine,
vmConfig: vmConfig,
}
bc.flushInterval.Store(int64(cacheConfig.TrieTimeLimit))
bc.forker = NewForkChoice(bc, shouldPreserve)
bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb)
bc.validator = NewBlockValidator(chainConfig, bc, engine)
Expand Down Expand Up @@ -916,7 +916,7 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) {
// This method has been exposed to allow tests to stop the blockchain while simulating
// a crash.
func (bc *BlockChain) stopWithoutSaving() {
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
if !bc.stopping.CompareAndSwap(false, true) {
return
}

Expand Down Expand Up @@ -998,12 +998,12 @@ func (bc *BlockChain) Stop() {
// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
// calling this method.
func (bc *BlockChain) StopInsert() {
atomic.StoreInt32(&bc.procInterrupt, 1)
bc.procInterrupt.Store(true)
}

// insertStopped returns true after StopInsert has been called.
func (bc *BlockChain) insertStopped() bool {
return atomic.LoadInt32(&bc.procInterrupt) == 1
return bc.procInterrupt.Load()
}

func (bc *BlockChain) procFutureBlocks() {
Expand Down Expand Up @@ -1382,7 +1382,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// Find the next state trie we need to commit
chosen := current - TriesInMemory
flushInterval := time.Duration(atomic.LoadInt64(&bc.flushInterval))
flushInterval := time.Duration(bc.flushInterval.Load())
// If we exceeded time allowance, flush an entire trie to disk
if bc.gcproc > flushInterval {
// If the header is missing (canonical chain behind), we're reorging a low
Expand Down Expand Up @@ -1735,7 +1735,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)

// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
var followupInterrupt uint32
var followupInterrupt atomic.Bool
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)
Expand All @@ -1744,7 +1744,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)

blockPrefetchExecuteTimer.Update(time.Since(start))
if atomic.LoadUint32(&followupInterrupt) == 1 {
if followupInterrupt.Load() {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now(), followup, throwaway)
Expand All @@ -1756,15 +1756,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
if err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
followupInterrupt.Store(true)
return it.index, err
}
ptime := time.Since(pstart)

vstart := time.Now()
if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
followupInterrupt.Store(true)
return it.index, err
}
vtime := time.Since(vstart)
Expand Down Expand Up @@ -1797,7 +1797,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
} else {
status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false)
}
atomic.StoreUint32(&followupInterrupt, 1)
followupInterrupt.Store(true)
if err != nil {
return it.index, err
}
Expand Down Expand Up @@ -2497,5 +2497,5 @@ func (bc *BlockChain) SetBlockValidatorAndProcessorForTesting(v Validator, p Pro
// The interval is in terms of block processing time, not wall clock.
// It is thread-safe and can be called repeatedly without side effects.
func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
atomic.StoreInt64(&bc.flushInterval, int64(interval))
bc.flushInterval.Store(int64(interval))
}
6 changes: 3 additions & 3 deletions core/chain_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type ChainIndexer struct {
backend ChainIndexerBackend // Background processor generating the index data content
children []*ChainIndexer // Child indexers to cascade chain updates to

active uint32 // Flag whether the event loop was started
active atomic.Bool // Flag whether the event loop was started
update chan struct{} // Notification channel that headers should be processed
quit chan chan error // Quit channel to tear down running goroutines
ctx context.Context
Expand Down Expand Up @@ -166,7 +166,7 @@ func (c *ChainIndexer) Close() error {
errs = append(errs, err)
}
// If needed, tear down the secondary event loop
if atomic.LoadUint32(&c.active) != 0 {
if c.active.Load() {
c.quit <- errc
if err := <-errc; err != nil {
errs = append(errs, err)
Expand Down Expand Up @@ -196,7 +196,7 @@ func (c *ChainIndexer) Close() error {
// queue.
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)
c.active.Store(true)

defer sub.Unsubscribe()

Expand Down
4 changes: 2 additions & 2 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine conse
// Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) {
func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool) {
var (
header = block.Header()
gaspool = new(GasPool).AddGas(block.GasLimit())
Expand All @@ -59,7 +59,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
byzantium := p.config.IsByzantium(block.Number())
for i, tx := range block.Transactions() {
// If block precaching was interrupted, abort
if interrupt != nil && atomic.LoadUint32(interrupt) == 1 {
if interrupt != nil && interrupt.Load() {
return
}
// Convert the transaction into an executable message and pre-cache its sender
Expand Down
4 changes: 3 additions & 1 deletion core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package core

import (
"sync/atomic"

"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
Expand All @@ -39,7 +41,7 @@ type Prefetcher interface {
// Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to pre-cache transaction signatures and state trie nodes.
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32)
Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool)
}

// Processor is an interface for processing blocks using a given initial state.
Expand Down