diff --git a/core/block_validator.go b/core/block_validator.go index 1ce8cc70a7..2afe81ce0b 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -94,6 +94,13 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { } return nil }, + func() error { + if v.remoteValidator != nil && !v.remoteValidator.AncestorVerified(block.Header()) { + return fmt.Errorf("%w, number: %s, hash: %s", ErrAncestorHasNotBeenVerified, block.Number(), block.Hash()) + } + + return nil + }, } validateRes := make(chan error, len(validateFuns)) for _, f := range validateFuns { diff --git a/core/blockchain.go b/core/blockchain.go index c1bfbff616..b63730a319 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2092,15 +2092,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er }() for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() { - if bc.validator.RemoteVerifyManager() != nil { - for !bc.Validator().RemoteVerifyManager().AncestorVerified(block.Header()) { - if bc.insertStopped() { - break - } - log.Info("block ancestor has not been verified", "number", block.Number(), "hash", block.Hash()) - time.Sleep(100 * time.Millisecond) - } - } // If the chain is terminating, stop processing blocks if bc.insertStopped() { log.Debug("Abort during block processing") diff --git a/core/error.go b/core/error.go index 75fa6a205e..8934ef719c 100644 --- a/core/error.go +++ b/core/error.go @@ -35,6 +35,9 @@ var ( // ErrDiffLayerNotFound is returned when diff layer not found. ErrDiffLayerNotFound = errors.New("diff layer not found") + // ErrDiffLayerNotFound is returned when block - 11 has not been verified by the remote verifier. + ErrAncestorHasNotBeenVerified = errors.New("block ancestor has not been verified") + // ErrCurrentBlockNotFound is returned when current block not found. ErrCurrentBlockNotFound = errors.New("current block not found") diff --git a/core/remote_state_verifier.go b/core/remote_state_verifier.go index ed768e0a94..c9e137f300 100644 --- a/core/remote_state_verifier.go +++ b/core/remote_state_verifier.go @@ -4,6 +4,7 @@ import ( "fmt" "math/big" "math/rand" + "sync" "time" lru "github.com/hashicorp/golang-lru" @@ -40,6 +41,7 @@ var ( type remoteVerifyManager struct { bc *BlockChain + taskLock sync.RWMutex tasks map[common.Hash]*verifyTask peers verifyPeers verifiedCache *lru.Cache @@ -109,6 +111,7 @@ func (vm *remoteVerifyManager) mainLoop() { vm.NewBlockVerifyTask(h.Block.Header()) case hash := <-vm.verifyCh: vm.cacheBlockVerified(hash) + vm.taskLock.Lock() if task, ok := vm.tasks[hash]; ok { delete(vm.tasks, hash) verifyTaskCounter.Dec(1) @@ -116,7 +119,9 @@ func (vm *remoteVerifyManager) mainLoop() { verifyTaskExecutionTimer.Update(time.Since(task.startAt)) close(task.terminalCh) } + vm.taskLock.Unlock() case <-pruneTicker.C: + vm.taskLock.Lock() for hash, task := range vm.tasks { if vm.bc.CurrentHeader().Number.Cmp(task.blockHeader.Number) == 1 && vm.bc.CurrentHeader().Number.Uint64()-task.blockHeader.Number.Uint64() > pruneHeightDiff { @@ -126,16 +131,21 @@ func (vm *remoteVerifyManager) mainLoop() { close(task.terminalCh) } } + vm.taskLock.Unlock() case message := <-vm.messageCh: + vm.taskLock.RLock() if vt, ok := vm.tasks[message.verifyResult.BlockHash]; ok { vt.messageCh <- message } + vm.taskLock.RUnlock() // System stopped case <-vm.bc.quit: + vm.taskLock.RLock() for _, task := range vm.tasks { close(task.terminalCh) } + vm.taskLock.RUnlock() return case <-vm.chainHeadSub.Err(): return @@ -156,7 +166,10 @@ func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) { return } // if there already has a verify task for this block, skip. - if _, ok := vm.tasks[hash]; ok { + vm.taskLock.RLock() + _, ok := vm.tasks[hash] + vm.taskLock.RUnlock() + if ok { return } @@ -184,7 +197,9 @@ func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) { return } verifyTask := NewVerifyTask(diffHash, header, vm.peers, vm.verifyCh, vm.allowInsecure) + vm.taskLock.Lock() vm.tasks[hash] = verifyTask + vm.taskLock.Unlock() verifyTaskCounter.Inc(1) }(header.Hash()) header = vm.bc.GetHeaderByHash(header.ParentHash) @@ -208,7 +223,16 @@ func (vm *remoteVerifyManager) AncestorVerified(header *types.Header) bool { } hash := header.Hash() - _, exist := vm.verifiedCache.Get(hash) + + // Check if the task is complete + vm.taskLock.RLock() + task, exist := vm.tasks[hash] + vm.taskLock.RUnlock() + if exist { + <-task.terminalCh + } + + _, exist = vm.verifiedCache.Get(hash) return exist } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index e1225e7a1c..dcbeb1eea7 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" @@ -1812,6 +1813,9 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { // of the blocks delivered from the downloader, and the indexing will be off. log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err) } + if errors.Is(err, core.ErrAncestorHasNotBeenVerified) { + return err + } return fmt.Errorf("%w: %v", errInvalidChain, err) } return nil