diff --git a/core/block_validator.go b/core/block_validator.go index a1d66d72be..d713464836 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -31,29 +31,26 @@ import ( // // BlockValidator implements Validator. type BlockValidator struct { - config *params.ChainConfig // Chain configuration options - bc *BlockChain // Canonical block chain - engine consensus.Engine // Consensus engine used for validating - remoteValidator *VerifyManager + config *params.ChainConfig // Chain configuration options + bc *BlockChain // Canonical block chain + engine consensus.Engine // Consensus engine used for validating + remoteValidator *verifyManager } // NewBlockValidator returns a new block validator which is safe for re-use -func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engine consensus.Engine, mode *VerifyMode) *BlockValidator { +func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engine consensus.Engine, mode *VerifyMode, peers verifyPeers) *BlockValidator { validator := &BlockValidator{ config: config, engine: engine, bc: blockchain, } if mode.NeedRemoteVerify() { - validator.remoteValidator = NewVerifyManager(blockchain, *mode == InsecureVerify) + remoteValidator := NewVerifyManager(blockchain, peers, *mode == InsecureVerify) + go remoteValidator.mainLoop() } return validator } -func(v *BlockValidator) RemoteVerifyManager() *VerifyManager{ - return v.remoteValidator -} - // ValidateBody validates the given block's uncles and verifies the block // header's transaction and uncle roots. The headers are assumed to be already // validated at this point. @@ -101,6 +98,12 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { return r } } + // for fast node which verify trie from remote verify peers, a block's H-11 ancestor should have been verify. + if v.remoteValidator != nil { + if !v.remoteValidator.AncestorVerified(v.bc.GetHeaderByNumber(header.Number.Uint64())) { + return fmt.Errorf("block's ancessor %x has not been verified", block.Hash()) + } + } return nil } @@ -157,35 +160,14 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD return err } -func (v *BlockValidator) StartRemoteVerify(peers VerifyPeers) { - v.remoteValidator.peers = peers - if v.remoteValidator != nil { - go v.remoteValidator.verifyManagerLoop() - } -} - -func (v * BlockValidator) StopRemoteVerify() { - if v.remoteValidator != nil { - v.remoteValidator.Stop() - } -} - -func (v * BlockValidator) VerifyBlock(header *types.Header) { +func (v *BlockValidator) VerifyBlockTrie(header *types.Header) { if v.remoteValidator != nil { v.remoteValidator.newTaskCh <- header } } -// ValidateBlockVerify validate that weather the H-11 ancestor of the block has been verified by peers. -// If not, the blockchain should halt. -func (v * BlockValidator) ValidateBlockVerify(block *types.Block) error { - if v.remoteValidator != nil { - header := block.Header() - if !v.remoteValidator.AncestorVerified(v.bc.GetHeaderByNumber(header.Number.Uint64())) { - return fmt.Errorf("block's ancessor %x has not been verified", block.Hash()) - } - } - return nil +func (v *BlockValidator) RemoteVerifyManager() *verifyManager { + return v.remoteValidator } // CalcGasLimit computes the gas limit of the next block after parent. It aims diff --git a/core/blockchain.go b/core/blockchain.go index 414be33852..0095c6add6 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1190,7 +1190,6 @@ func (bc *BlockChain) Stop() { close(bc.quit) bc.StopInsert() bc.wg.Wait() - bc.validator.StopRemoteVerify() // Ensure that the entirety of the state snapshot is journalled to disk. var snapBase common.Hash @@ -2123,7 +2122,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er blockInsertTimer.UpdateSince(start) //Start a routine to verify this block. - bc.validator.VerifyBlock(block.Header()) + bc.validator.VerifyBlockTrie(block.Header()) switch status { case CanonStatTy: @@ -3023,9 +3022,9 @@ func EnablePersistDiff(limit uint64) BlockChainOption { } } -func EnableBlockValidator(chainConfig *params.ChainConfig, engine consensus.Engine, mode *VerifyMode) BlockChainOption { +func EnableBlockValidator(chainConfig *params.ChainConfig, engine consensus.Engine, mode *VerifyMode, peers verifyPeers) BlockChainOption { return func(bc *BlockChain) *BlockChain { - bc.validator = NewBlockValidator(chainConfig, bc, engine, mode) + bc.validator = NewBlockValidator(chainConfig, bc, engine, mode, peers) return bc } } diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go index 3bd51825d2..cb8473c084 100644 --- a/core/blockchain_insert.go +++ b/core/blockchain_insert.go @@ -118,11 +118,7 @@ func (it *insertIterator) next() (*types.Block, error) { return it.chain[it.index], it.errors[it.index] } // Block header valid, run body validation and return - if err := it.validator.ValidateBody(it.chain[it.index]); err != nil { - return it.chain[it.index], err - } - // Block body valid, run remote verify and return - return it.chain[it.index], it.validator.ValidateBlockVerify(it.chain[it.index]) + return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index]) } // peek returns the next block in the iterator, along with any potential validation diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 66f6abbae5..6489a600fb 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -64,26 +64,6 @@ func DeleteCanonicalHash(db ethdb.KeyValueWriter, number uint64) { } } -func IsTrustBlock(db ethdb.Reader, hash common.Hash) bool { - data, _ := db.Get(trustBlockHashKey(hash)) - if len(data) == 0 { - return false - } - return bytes.Equal(data,[]byte{byteTrue}) -} - -func MarkTrustBlock(db ethdb.KeyValueWriter, hashkey common.Hash) { - if err := db.Put(trustBlockHashKey(hashkey),[]byte{byteTrue}); err != nil { - log.Crit("Failed to store trust block hash") - } -} - -func DeleteTrustBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { - if err := db.Delete(trustBlockHashKey(hash)); err != nil { - log.Crit("Failed to delete trust block hash") - } -} - // ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights, // both canonical and reorged forks included. func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 5c9950866a..42ae6434ce 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -121,8 +121,6 @@ const ( // freezerDifficultyTable indicates the name of the freezer total difficulty table. freezerDifficultyTable = "diffs" - // - byteTrue = 0x01 ) // FreezerNoSnappy configures whether compression is disabled for the ancient-tables. @@ -169,6 +167,7 @@ func headerTDKey(number uint64, hash common.Hash) []byte { func headerHashKey(number uint64) []byte { return append(append(headerPrefix, encodeBlockNumber(number)...), headerHashSuffix...) } + // trustBlockHashKey = trustBlockPrefix + hash func trustBlockHashKey(hash common.Hash) []byte { return append(append(trustBlockPrefix, hash.Bytes()...)) diff --git a/core/trie_verify.go b/core/trie_verify.go index 1c3f96678d..ff7fc84eca 100644 --- a/core/trie_verify.go +++ b/core/trie_verify.go @@ -8,71 +8,75 @@ import ( lru "github.com/hashicorp/golang-lru" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" ) const ( verifiedCacheSize = 256 maxForkHeight = 11 - resendInterval = 2 * time.Second + // defaultPeerNumber is default number of verify peers defaultPeerNumber = 3 + // pruneHeightDiff indicates that if the height difference between current block and task's + // corresponding block is larger than it, the task should be pruned. + pruneHeightDiff = 15 + pruneInterval = 5 * time.Second + resendInterval = 2 * time.Second // tryAllPeersTime is the time that a block has not been verified and then try all the valid verify peers. tryAllPeersTime = 15 * time.Second ) -type VerifyManager struct { - bc *BlockChain - tasks map[common.Hash]*VerifyTask - peers VerifyPeers - verifiedCache *lru.Cache - allowUntrustedVerify bool - newTaskCh chan *types.Header - verifyCh chan common.Hash - messageCh chan VerifyMessage - exitCh chan struct{} +type verifyManager struct { + bc *BlockChain + tasks map[common.Hash]*verifyTask + peers verifyPeers + verifiedCache *lru.Cache + allowUntrusted bool + + newTaskCh chan *types.Header + verifyCh chan common.Hash + messageCh chan verifyMessage } -func NewVerifyManager(blockchain *BlockChain, allowUntrustedVerify bool) *VerifyManager { +func NewVerifyManager(blockchain *BlockChain, peers verifyPeers, allowUntrusted bool) *verifyManager { verifiedCache, _ := lru.New(verifiedCacheSize) - vm := &VerifyManager{ - bc: blockchain, - tasks: make(map[common.Hash]*VerifyTask), - verifiedCache: verifiedCache, - newTaskCh: make(chan *types.Header), - verifyCh: make(chan common.Hash), - messageCh: make(chan VerifyMessage), - exitCh: make(chan struct{}), - allowUntrustedVerify: allowUntrustedVerify, + vm := &verifyManager{ + bc: blockchain, + tasks: make(map[common.Hash]*verifyTask), + peers: peers, + verifiedCache: verifiedCache, + allowUntrusted: allowUntrusted, + + newTaskCh: make(chan *types.Header), + verifyCh: make(chan common.Hash), + messageCh: make(chan verifyMessage), } return vm } -func (vm *VerifyManager) verifyManagerLoop() { +func (vm *verifyManager) mainLoop() { // read disk store to initial verified cache // load unverified blocks in a normalized chain and start a batch of verify task header := vm.bc.CurrentHeader() // Start verify task from H to H-11 if need. vm.NewBlockVerifyTask(header) - prune := time.NewTicker(time.Second) - defer prune.Stop() + pruneTicker := time.NewTicker(pruneInterval) + defer pruneTicker.Stop() for { select { case h := <-vm.newTaskCh: vm.NewBlockVerifyTask(h) case hash := <-vm.verifyCh: vm.cacheBlockVerified(hash) - rawdb.MarkTrustBlock(vm.bc.db, hash) if task, ok := vm.tasks[hash]; ok { delete(vm.tasks, hash) close(task.terminalCh) } - case <-prune.C: + case <-pruneTicker.C: for hash, task := range vm.tasks { - if vm.bc.CurrentHeader().Number.Uint64()-task.blockHeader.Number.Uint64() > 15 { + if vm.bc.CurrentHeader().Number.Cmp(task.blockHeader.Number) == 1 && + vm.bc.CurrentHeader().Number.Uint64()-task.blockHeader.Number.Uint64() > pruneHeightDiff { delete(vm.tasks, hash) close(task.terminalCh) } @@ -81,23 +85,18 @@ func (vm *VerifyManager) verifyManagerLoop() { if vt, ok := vm.tasks[message.verifyResult.BlockHash]; ok { vt.messageCh <- message } - case <-vm.exitCh: + case <-vm.bc.quit: + for _, task := range vm.tasks { + close(task.terminalCh) + } return } } } -func (vm *VerifyManager) Stop() { - // stop all the tasks - for _, task := range vm.tasks { - close(task.terminalCh) - } - close(vm.exitCh) -} - -func (vm *VerifyManager) NewBlockVerifyTask(header *types.Header) { +func (vm *verifyManager) NewBlockVerifyTask(header *types.Header) { for i := 0; header != nil && i <= maxForkHeight; i++ { - func(hash common.Hash){ + func(hash common.Hash) { // if verified cache record that this block has been verified, skip. if _, ok := vm.verifiedCache.Get(hash); ok { return @@ -106,11 +105,6 @@ func (vm *VerifyManager) NewBlockVerifyTask(header *types.Header) { if _, ok := vm.tasks[hash]; ok { return } - // if verified storage record that this block has been verified, skip. - if rawdb.IsTrustBlock(vm.bc.db, hash) { - vm.cacheBlockVerified(hash) - return - } diffLayer := vm.bc.GetTrustedDiffLayer(hash) // if this block has no diff, there is no need to verify it. var err error @@ -125,14 +119,14 @@ func (vm *VerifyManager) NewBlockVerifyTask(header *types.Header) { log.Error("failed to get diff hash", "block", hash, "number", header.Number, "error", err) return } - verifyTask := NewVerifyTask(diffHash, header, vm.peers, vm.bc.db, vm.verifyCh, vm.allowUntrustedVerify) + verifyTask := NewVerifyTask(diffHash, header, vm.peers, vm.verifyCh, vm.allowUntrusted) vm.tasks[hash] = verifyTask }(header.Hash()) header = vm.bc.GetHeaderByHash(header.ParentHash) } } -func (vm *VerifyManager) cacheBlockVerified(hash common.Hash) { +func (vm *verifyManager) cacheBlockVerified(hash common.Hash) { if vm.verifiedCache.Len() >= verifiedCacheSize { vm.verifiedCache.RemoveOldest() } @@ -140,7 +134,7 @@ func (vm *VerifyManager) cacheBlockVerified(hash common.Hash) { } // AncestorVerified function check block has been verified or it's a empty block. -func (vm *VerifyManager) AncestorVerified(header *types.Header) bool { +func (vm *verifyManager) AncestorVerified(header *types.Header) bool { // find header of H-11 block. header = vm.bc.GetHeaderByNumber(header.Number.Uint64() - maxForkHeight) // If start from genesis block, there has not a H-11 block. @@ -155,14 +149,12 @@ func (vm *VerifyManager) AncestorVerified(header *types.Header) bool { } } hash := header.Hash() - if _, ok := vm.verifiedCache.Get(hash); ok { - return true - } - return rawdb.IsTrustBlock(vm.bc.db, hash) + _, exist := vm.verifiedCache.Get(hash) + return exist } -func (vm *VerifyManager) HandleRootResponse(vr *VerifyResult, pid string) error { - vm.messageCh <- VerifyMessage{verifyResult: vr, peerId: pid} +func (vm *verifyManager) HandleRootResponse(vr *VerifyResult, pid string) error { + vm.messageCh <- verifyMessage{verifyResult: vr, peerId: pid} return nil } @@ -173,43 +165,41 @@ type VerifyResult struct { Root common.Hash } -type VerifyMessage struct { +type verifyMessage struct { verifyResult *VerifyResult peerId string } -type VerifyTask struct { - diffhash common.Hash - blockHeader *types.Header - candidatePeers VerifyPeers - BadPeers map[string]struct{} - startAt time.Time - db ethdb.Database - allowUntrustedVerify bool +type verifyTask struct { + diffhash common.Hash + blockHeader *types.Header + candidatePeers verifyPeers + BadPeers map[string]struct{} + startAt time.Time + allowUntrusted bool - messageCh chan VerifyMessage + messageCh chan verifyMessage terminalCh chan struct{} } -func NewVerifyTask(diffhash common.Hash, header *types.Header, peers VerifyPeers, db ethdb.Database, verifyCh chan common.Hash, allowUntrustedVerify bool) *VerifyTask { - vt := &VerifyTask{ - diffhash: diffhash, - blockHeader: header, - candidatePeers: peers, - BadPeers: make(map[string]struct{}), - db: db, - allowUntrustedVerify: allowUntrustedVerify, - messageCh: make(chan VerifyMessage), - terminalCh: make(chan struct{}), +func NewVerifyTask(diffhash common.Hash, header *types.Header, peers verifyPeers, verifyCh chan common.Hash, allowUntrusted bool) *verifyTask { + vt := &verifyTask{ + diffhash: diffhash, + blockHeader: header, + candidatePeers: peers, + BadPeers: make(map[string]struct{}), + allowUntrusted: allowUntrusted, + messageCh: make(chan verifyMessage), + terminalCh: make(chan struct{}), } go vt.Start(verifyCh) return vt } -func (vt *VerifyTask) Start(verifyCh chan common.Hash) { +func (vt *verifyTask) Start(verifyCh chan common.Hash) { vt.startAt = time.Now() - vt.selectPeersToVerify(defaultPeerNumber) + vt.sendVerifyRequest(defaultPeerNumber) resend := time.NewTicker(resendInterval) defer resend.Stop() for { @@ -220,7 +210,7 @@ func (vt *VerifyTask) Start(verifyCh chan common.Hash) { vt.compareRootHashAndWrite(msg, verifyCh) case types.StatusUntrustedVerified: log.Warn("block %s , num= %s is untrusted verified", msg.verifyResult.BlockHash, msg.verifyResult.BlockNumber) - if vt.allowUntrustedVerify { + if vt.allowUntrusted { vt.compareRootHashAndWrite(msg, verifyCh) } case types.StatusDiffHashMismatch, types.StatusImpossibleFork, types.StatusUnexpectedError: @@ -232,9 +222,9 @@ func (vt *VerifyTask) Start(verifyCh chan common.Hash) { case <-resend.C: // if a task has run over 15s, try all the vaild peers to verify. if time.Since(vt.startAt) < tryAllPeersTime { - vt.selectPeersToVerify(1) + vt.sendVerifyRequest(1) } else { - vt.selectPeersToVerify(-1) + vt.sendVerifyRequest(-1) } case <-vt.terminalCh: return @@ -242,9 +232,9 @@ func (vt *VerifyTask) Start(verifyCh chan common.Hash) { } } -// selectPeersAndVerify func select at most n peers from (candidatePeers-badPeers) randomly and send verify request. +// sendVerifyRequest func select at most n peers from (candidatePeers-badPeers) randomly and send verify request. // when n<0, send to all the peers exclude badPeers. -func (vt *VerifyTask) selectPeersToVerify(n int) { +func (vt *verifyTask) sendVerifyRequest(n int) { var validPeers []VerifyPeer candidatePeers := vt.candidatePeers.GetVerifyPeers() for _, p := range candidatePeers { @@ -252,7 +242,10 @@ func (vt *VerifyTask) selectPeersToVerify(n int) { validPeers = append(validPeers, p) } } - // if + // if has not valid peer, log warning. + if len(validPeers) == 0 { + log.Warn("there is no valid peer for block", vt.blockHeader.Number) + } if n < 0 || n >= len(validPeers) { for _, p := range validPeers { p.RequestRoot(vt.blockHeader.Number.Uint64(), vt.blockHeader.Hash(), vt.diffhash) @@ -269,10 +262,9 @@ func (vt *VerifyTask) selectPeersToVerify(n int) { } } -func (vt *VerifyTask) compareRootHashAndWrite(msg VerifyMessage, verifyCh chan common.Hash) { +func (vt *verifyTask) compareRootHashAndWrite(msg verifyMessage, verifyCh chan common.Hash) { if msg.verifyResult.Root == vt.blockHeader.Root { blockhash := msg.verifyResult.BlockHash - rawdb.MarkTrustBlock(vt.db, blockhash) // write back to manager so that manager can cache the result and delete this task. verifyCh <- blockhash } else { @@ -285,14 +277,14 @@ type VerifyPeer interface { ID() string } -type VerifyPeers interface { +type verifyPeers interface { GetVerifyPeers() []VerifyPeer } type VerifyMode uint32 const ( - LocalVerify VerifyMode = iota // + LocalVerify VerifyMode = iota FullVerify InsecureVerify NoneVerify diff --git a/core/types.go b/core/types.go index fb4b215926..deb4f580c7 100644 --- a/core/types.go +++ b/core/types.go @@ -32,17 +32,10 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error - - // StartRemoteVerify start the remote verify routine for given peers. - StartRemoteVerify(peers VerifyPeers) - // StopRemoteVerify stop the remote verify routine. - StopRemoteVerify() - // VerifyBlock verify the given blcok. - VerifyBlock(header *types.Header) - // ValidateBlockVerify validate the given block has been verified. - ValidateBlockVerify(block *types.Block) error + // VerifyBlockTrie verify the given block's trie from remote verify peers. + VerifyBlockTrie(header *types.Header) // RemoteVerifyManager return remoteVerifyManager of validator. - RemoteVerifyManager() *VerifyManager + RemoteVerifyManager() *verifyManager } // Prefetcher is an interface for pre-caching transaction signatures and state. diff --git a/eth/backend.go b/eth/backend.go index 7b00e43448..90b1ef2ec8 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -208,11 +208,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if config.DiffSync && config.TriesVerifyMode == core.LocalVerify { bcOps = append(bcOps, core.EnableLightProcessor) } + peers := newPeerSet() if config.PersistDiff { bcOps = append(bcOps, core.EnablePersistDiff(config.DiffBlock)) } - bcOps = append(bcOps, core.EnableBlockValidator(chainConfig, eth.engine, &config.TriesVerifyMode)) + bcOps = append(bcOps, core.EnableBlockValidator(chainConfig, eth.engine, &config.TriesVerifyMode, peers)) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit, bcOps...) if err != nil { return nil, err @@ -250,12 +251,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { DirectBroadcast: config.DirectBroadcast, DiffSync: config.DiffSync, DisablePeerTxBroadcast: config.DisablePeerTxBroadcast, + PeerSet: peers, }); err != nil { return nil, err } - eth.blockchain.Validator().StartRemoteVerify(eth.handler.peers) - eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) diff --git a/eth/handler.go b/eth/handler.go index e47d3eee8d..f9854766c4 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -96,6 +96,7 @@ type handlerConfig struct { Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged DirectBroadcast bool DisablePeerTxBroadcast bool + PeerSet *peerSet } type handler struct { @@ -155,7 +156,7 @@ func newHandler(config *handlerConfig) (*handler, error) { database: config.Database, txpool: config.TxPool, chain: config.Chain, - peers: newPeerSet(), + peers: config.PeerSet, whitelist: config.Whitelist, directBroadcast: config.DirectBroadcast, diffSync: config.DiffSync,