Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/XDC/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ var (
utils.CacheDatabaseFlag,
utils.CacheTrieFlag,
utils.CacheGCFlag,
utils.CacheNoPrefetchFlag,
//utils.TrieCacheGenFlag,
utils.CacheLogSizeFlag,
utils.FDLimitFlag,
Expand Down
15 changes: 11 additions & 4 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,11 @@ var (
Value: 25,
Category: flags.PerfCategory,
}
CacheNoPrefetchFlag = &cli.BoolFlag{
Name: "cache.noprefetch",
Usage: "Disable heuristic state prefetch during block import (less CPU and disk IO, more time waiting for data)",
Category: flags.PerfCategory,
}
CacheLogSizeFlag = &cli.IntFlag{
Name: "cache-blocklogs",
Aliases: []string{"cache.blocklogs"},
Expand Down Expand Up @@ -1474,6 +1479,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
}
cfg.NoPruning = ctx.String(GCModeFlag.Name) == "archive"
cfg.NoPrefetch = ctx.Bool(CacheNoPrefetchFlag.Name)

if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheTrieFlag.Name) {
cfg.TrieCleanCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheTrieFlag.Name) / 100
Expand Down Expand Up @@ -1736,10 +1742,11 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (chain *core.B
Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
}
cache := &core.CacheConfig{
Disabled: ctx.String(GCModeFlag.Name) == "archive",
TrieCleanLimit: ethconfig.Defaults.TrieCleanCache,
TrieDirtyLimit: ethconfig.Defaults.TrieDirtyCache,
TrieTimeLimit: ethconfig.Defaults.TrieTimeout,
TrieCleanLimit: ethconfig.Defaults.TrieCleanCache,
TrieCleanNoPrefetch: ctx.Bool(CacheNoPrefetchFlag.Name),
TrieDirtyLimit: ethconfig.Defaults.TrieDirtyCache,
TrieDirtyDisabled: ctx.String(GCModeFlag.Name) == "archive",
TrieTimeLimit: ethconfig.Defaults.TrieTimeout,
}
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheTrieFlag.Name) {
cache.TrieCleanLimit = ctx.Int(CacheFlag.Name) * ctx.Int(CacheTrieFlag.Name) / 100
Expand Down
101 changes: 54 additions & 47 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ var (
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)

blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)

errInsertionInterrupted = errors.New("insertion is interrupted")
errChainStopped = errors.New("blockchain is stopped")
errInvalidOldChain = errors.New("invalid old chain")
Expand Down Expand Up @@ -127,10 +130,11 @@ const (
// CacheConfig contains the configuration values for the trie caching/pruning
// that's resident in a blockchain.
type CacheConfig struct {
Disabled bool // Whether to disable trie write caching (archive node)
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieCleanNoPrefetch bool // Whether to disable heuristic state prefetching for followup blocks
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
}

type ResultProcessBlock struct {
Expand Down Expand Up @@ -179,8 +183,6 @@ type BlockChain struct {
// Readers don't need to take it, they can just read the database.
chainmu *syncx.ClosableMutex

procmu sync.RWMutex // block processor lock

currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)

Expand All @@ -200,13 +202,14 @@ type BlockChain struct {

wg sync.WaitGroup
quit chan struct{} // shutdown signal, closed in Stop.
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing

engine consensus.Engine
processor Processor // block processor interface
validator Validator // block and state validator interface
vmConfig vm.Config
engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher // Block state prefetcher interface
processor Processor // Block transaction processor interface
vmConfig vm.Config

IPCEndpoint string
Client bind.ContractBackend // Global ipc client instance.
Expand Down Expand Up @@ -260,8 +263,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
rejectedLendingItem: lru.NewCache[common.Hash, interface{}](tradingstate.OrderCacheLimit),
finalizedTrade: lru.NewCache[common.Hash, interface{}](tradingstate.OrderCacheLimit),
}
bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)

var err error
bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped)
Expand Down Expand Up @@ -554,31 +558,13 @@ func (bc *BlockChain) CurrentFastBlock() *types.Block {
return bc.currentFastBlock.Load().(*types.Block)
}

// SetProcessor sets the processor required for making state modifications.
func (bc *BlockChain) SetProcessor(processor Processor) {
bc.procmu.Lock()
defer bc.procmu.Unlock()
bc.processor = processor
}

// SetValidator sets the validator which is used to validate incoming blocks.
func (bc *BlockChain) SetValidator(validator Validator) {
bc.procmu.Lock()
defer bc.procmu.Unlock()
bc.validator = validator
}

// Validator returns the current validator.
func (bc *BlockChain) Validator() Validator {
bc.procmu.RLock()
defer bc.procmu.RUnlock()
return bc.validator
}

// Processor returns the current processor.
func (bc *BlockChain) Processor() Processor {
bc.procmu.RLock()
defer bc.procmu.RUnlock()
return bc.processor
}

Expand Down Expand Up @@ -1045,7 +1031,7 @@ func (bc *BlockChain) saveData() {
// - HEAD: So we don't need to reprocess any blocks in the general case
// - HEAD-1: So we don't do large reorgs if our HEAD becomes an uncle
// - HEAD-127: So we have a hard limit on the number of blocks reexecuted
if !bc.cacheConfig.Disabled {
if !bc.cacheConfig.TrieDirtyDisabled {
var tradingTriedb *trie.Database
var lendingTriedb *trie.Database
engine, _ := bc.Engine().(*XDPoS.XDPoS)
Expand Down Expand Up @@ -1115,7 +1101,7 @@ func (bc *BlockChain) saveData() {
// Stop stops the blockchain service. If any imports are currently in progress
// it will abort them using the procInterrupt.
func (bc *BlockChain) Stop() {
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
if !bc.stopping.CompareAndSwap(false, true) {
return
}

Expand All @@ -1142,12 +1128,12 @@ func (bc *BlockChain) Stop() {
// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
// calling this method.
func (bc *BlockChain) StopInsert() {
atomic.StoreInt32(&bc.procInterrupt, 1)
bc.procInterrupt.Store(true)
}

// insertStopped returns true after StopInsert has been called.
func (bc *BlockChain) insertStopped() bool {
return atomic.LoadInt32(&bc.procInterrupt) == 1
return bc.procInterrupt.Load()
}

func (bc *BlockChain) procFutureBlocks() {
Expand Down Expand Up @@ -1258,7 +1244,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
for i, block := range blockChain {
receipts := receiptChain[i]
// Short circuit insertion if shutting down or processing failed
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
if bc.insertStopped() {
return 0, nil
}
blockHash, blockNumber := block.Hash(), block.NumberU64()
Expand Down Expand Up @@ -1426,7 +1412,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}

// If we're running an archive node, always flush
if bc.cacheConfig.Disabled {
if bc.cacheConfig.TrieDirtyDisabled {
if err := triedb.Commit(root, false); err != nil {
return NonStatTy, err
}
Expand Down Expand Up @@ -1666,7 +1652,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
defer close(abort)

// Peek the error for the first block to decide the directing import logic
it := newInsertIterator(chain, results, bc.Validator())
it := newInsertIterator(chain, results, bc.validator)

block, err := it.next()
switch {
Expand Down Expand Up @@ -1712,7 +1698,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// No validation errors for the first block (or chain prefix skipped)
for ; block != nil && err == nil; block, err = it.next() {
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
if bc.insertStopped() {
log.Debug("Premature abort during blocks processing")
break
}
Expand All @@ -1733,23 +1719,43 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
if err != nil {
return it.index, events, coalescedLogs, err
}

// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
var followupInterrupt atomic.Bool
if !bc.cacheConfig.TrieCleanNoPrefetch {
if followup, err := it.peek(); followup != nil && err == nil {
go func(start time.Time) {
throwaway, _ := state.New(parent.Root, bc.stateCache)
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)

blockPrefetchExecuteTimer.Update(time.Since(start))
if followupInterrupt.Load() {
blockPrefetchInterruptMeter.Mark(1)
}
}(time.Now())
}
}

// Process block using the parent state as reference point.
t0 := time.Now()
isTIPXDCXReceiver := bc.Config().IsTIPXDCXReceiver(block.Number())
tradingState, lendingState, err := bc.processTradingAndLendingStates(isTIPXDCXReceiver, block, parent, statedb)
if err != nil {
bc.reportBlock(block, nil, err)
followupInterrupt.Store(true)
return it.index, events, coalescedLogs, err
}
feeCapacity := state.GetTRC21FeeCapacityFromStateWithCache(parent.Root, statedb)
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, tradingState, bc.vmConfig, feeCapacity)
t1 := time.Now()
if err != nil {
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
return it.index, events, coalescedLogs, err
}
// Validate the state using the default validator
err = bc.Validator().ValidateState(block, statedb, receipts, usedGas)
err = bc.validator.ValidateState(block, statedb, receipts, usedGas)
if err != nil {
bc.reportBlock(block, receipts, err)
return it.index, events, coalescedLogs, err
Expand All @@ -1760,6 +1766,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
// Write the block to the chain and get the status.
status, err := bc.writeBlockWithState(block, receipts, statedb, tradingState, lendingState)
t3 := time.Now()
followupInterrupt.Store(true)
if err != nil {
return it.index, events, coalescedLogs, err
}
Expand Down Expand Up @@ -1948,8 +1955,8 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i
blocks, memory = blocks[:0], 0

// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing")
if bc.insertStopped() {
log.Debug("Abort during blocks processing")
return 0, nil, nil, nil
}
}
Expand Down Expand Up @@ -2010,7 +2017,7 @@ func (bc *BlockChain) getResultBlock(block *types.Block, verifiedM2 bool) (*Resu
bc.calculatingBlock.Add(block.HashNoValidator(), calculatedBlock)
// Start the parallel header verifier
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
if bc.insertStopped() {
log.Debug("Premature abort during blocks processing")
return nil, ErrBlacklistedHash
}
Expand All @@ -2021,7 +2028,7 @@ func (bc *BlockChain) getResultBlock(block *types.Block, verifiedM2 bool) (*Resu
}
// Wait for the block's verification to complete
bstart := time.Now()
err := bc.Validator().ValidateBody(block)
err := bc.validator.ValidateBody(block)
switch {
case err == ErrKnownBlock:
// Block and state both already known. However if the current block is below
Expand Down Expand Up @@ -2953,7 +2960,7 @@ func (bc *BlockChain) processTradingAndLendingStates(isValidBlockNumber bool, bl
}
for _, txMatchBatch := range txMatchBatchData {
log.Debug("Verify matching transaction", "txHash", txMatchBatch.TxHash.Hex())
err := bc.Validator().ValidateTradingOrder(statedb, tradingState, txMatchBatch, author, block.Header())
err := bc.validator.ValidateTradingOrder(statedb, tradingState, txMatchBatch, author, block.Header())
if err != nil {
return tradingState, lendingState, err
}
Expand All @@ -2964,7 +2971,7 @@ func (bc *BlockChain) processTradingAndLendingStates(isValidBlockNumber bool, bl
}
for _, batch := range batches {
log.Debug("Verify matching transaction", "txHash", batch.TxHash.Hex())
err := bc.Validator().ValidateLendingOrder(statedb, lendingState, tradingState, batch, author, block.Header())
err := bc.validator.ValidateLendingOrder(statedb, lendingState, tradingState, batch, author, block.Header())
if err != nil {
return tradingState, lendingState, err
}
Expand Down
45 changes: 38 additions & 7 deletions core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (st *insertStats) report(chain []*types.Block, index int, cache common.Stor
if timestamp := time.Unix(end.Time().Int64(), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
context = append(context, []interface{}{"cache", cache}...)
context = append(context, []interface{}{"dirty", cache}...)

if st.queued > 0 {
context = append(context, []interface{}{"queued", st.queued}...)
Expand All @@ -80,10 +80,13 @@ func (st *insertStats) report(chain []*types.Block, index int, cache common.Stor

// insertIterator is a helper to assist during chain import.
type insertIterator struct {
chain types.Blocks
results <-chan error
index int
validator Validator
chain types.Blocks // Chain of blocks being iterated over

results <-chan error // Verification result sink from the consensus engine
errors []error // Header verification errors for the blocks

index int // Current offset of the iterator
validator Validator // Validator to run if verification succeeds
}

// newInsertIterator creates a new iterator based on the given blocks, which are
Expand All @@ -92,6 +95,7 @@ func newInsertIterator(chain types.Blocks, results <-chan error, validator Valid
return &insertIterator{
chain: chain,
results: results,
errors: make([]error, 0, len(chain)),
index: -1,
validator: validator,
}
Expand All @@ -100,17 +104,44 @@ func newInsertIterator(chain types.Blocks, results <-chan error, validator Valid
// next returns the next block in the iterator, along with any potential validation
// error for that block. When the end is reached, it will return (nil, nil).
func (it *insertIterator) next() (*types.Block, error) {
// If we reached the end of the chain, abort
if it.index+1 >= len(it.chain) {
it.index = len(it.chain)
return nil, nil
}
// Advance the iterator and wait for verification result if not yet done
it.index++
if err := <-it.results; err != nil {
return it.chain[it.index], err
if len(it.errors) <= it.index {
it.errors = append(it.errors, <-it.results)
}
if it.errors[it.index] != nil {
return it.chain[it.index], it.errors[it.index]
}
// Block header valid, run body validation and return
return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index])
}

// peek returns the next block in the iterator, along with any potential validation
// error for that block, but does **not** advance the iterator.
//
// Both header and body validation errors (nil too) is cached into the iterator
// to avoid duplicating work on the following next() call.
func (it *insertIterator) peek() (*types.Block, error) {
// If we reached the end of the chain, abort
if it.index+1 >= len(it.chain) {
return nil, nil
}
// Wait for verification result if not yet done
if len(it.errors) <= it.index+1 {
it.errors = append(it.errors, <-it.results)
}
if it.errors[it.index+1] != nil {
return it.chain[it.index+1], it.errors[it.index+1]
}
// Block header valid, ignore body validation since we don't have a parent anyway
return it.chain[it.index+1], nil
}

// previous returns the previous header that was being processed, or nil.
func (it *insertIterator) previous() *types.Header {
if it.index < 1 {
Expand Down
Loading