diff --git a/consensus/tendermint/adapter/store.go b/consensus/tendermint/adapter/store.go index 2bd6484dae8e..1c6610b09420 100644 --- a/consensus/tendermint/adapter/store.go +++ b/consensus/tendermint/adapter/store.go @@ -17,14 +17,14 @@ type Store struct { chain *core.BlockChain governance *gov.Governance verifyHeaderFunc func(chain consensus.ChainHeaderReader, header *types.Header, seal bool) error - makeBlock func() (block *types.FullBlock) + makeBlock func(parentHash common.Hash, timestamp uint64) (block *types.FullBlock) } func NewStore( chain *core.BlockChain, governance *gov.Governance, verifyHeaderFunc func(chain consensus.ChainHeaderReader, header *types.Header, seal bool) error, - makeBlock func() (block *types.FullBlock)) *Store { + makeBlock func(parentHash common.Hash, timestamp uint64) (block *types.FullBlock)) *Store { return &Store{chain: chain, governance: governance, verifyHeaderFunc: verifyHeaderFunc, makeBlock: makeBlock} } @@ -161,6 +161,6 @@ func updateState( }, nil } -func (s *Store) MakeBlock() *types.FullBlock { - return s.makeBlock() +func (s *Store) MakeBlock(parentHash common.Hash, timestamp uint64) *types.FullBlock { + return s.makeBlock(parentHash, timestamp) } diff --git a/consensus/tendermint/tendermint.go b/consensus/tendermint/tendermint.go index c555a1a69389..8a914364fda4 100644 --- a/consensus/tendermint/tendermint.go +++ b/consensus/tendermint/tendermint.go @@ -111,7 +111,7 @@ func (c *Tendermint) SetBlockChain(chain *core.BlockChain) { c.chain = chain } -func (c *Tendermint) Init(makeBlock func(chan *types.Block)) (err error) { +func (c *Tendermint) Init(makeBlock func(parent common.Hash, timestamp uint64) (*types.Block, error)) (err error) { chain := c.chain // Outbound gossip message queue sendC := make(chan pbftconsensus.Message, 1000) @@ -124,22 +124,22 @@ func (c *Tendermint) Init(makeBlock func(chan *types.Block)) (err error) { c.rootCtxCancel = rootCtxCancel c.rootCtx = rootCtx - makeFullBlock := func() *types.FullBlock { - resultCh := make(chan *types.Block, 1) - makeBlock(resultCh) - select { - case block := <-resultCh: - if block == nil { - return nil - } - parent := chain.GetHeaderByHash(block.ParentHash()) - if parent == nil { - return nil - } - return &types.FullBlock{Block: block, LastCommit: parent.Commit} - case <-rootCtx.Done(): + makeFullBlock := func(parentHash common.Hash, timestamp uint64) *types.FullBlock { + + block, err := makeBlock(parentHash, timestamp) + if err != nil { + log.Warn("makeBlock", "err", err) + return nil + } + if block == nil { + log.Warn("makeBlock returns nil block") return nil } + parentHeader := chain.GetHeaderByHash(block.ParentHash()) + if parentHeader == nil { + return nil + } + return &types.FullBlock{Block: block, LastCommit: parentHeader.Commit} } // datastore store := adapter.NewStore(chain, c.governance, c.VerifyHeader, makeFullBlock) @@ -403,12 +403,8 @@ func (c *Tendermint) FinalizeAndAssemble(chain consensus.ChainHeaderReader, head // Seal implements consensus.Engine, attempting to create a sealed block using // the local signing credentials. func (c *Tendermint) Seal(chain consensus.ChainHeaderReader, block *types.Block, resultCh chan<- *types.Block, stop <-chan struct{}) error { - // resultCh is generated by makeFullBlock - select { - case resultCh <- block: - case <-c.rootCtx.Done(): - } - return nil + + panic("should never be called") } // CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty diff --git a/miner/worker.go b/miner/worker.go index fc5028d1ea3e..7cb89f4ed51e 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -148,7 +148,6 @@ type task struct { state *state.StateDB block *types.Block createdAt time.Time - resultCh chan *types.Block } const ( @@ -162,7 +161,6 @@ type newWorkReq struct { interrupt *int32 noempty bool timestamp int64 - resultCh chan *types.Block } // getWorkReq represents a request for getting a new sealing work with provided parameters. @@ -195,7 +193,6 @@ type worker struct { txsCh chan core.NewTxsEvent txsSub event.Subscription chainHeadCh chan core.ChainHeadEvent - makeBlockCh chan chan *types.Block chainHeadSub event.Subscription chainSideCh chan core.ChainSideEvent chainSideSub event.Subscription @@ -266,7 +263,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), - makeBlockCh: make(chan chan *types.Block), newWorkCh: make(chan *newWorkReq), getWorkCh: make(chan *getWorkReq), taskCh: make(chan *task), @@ -413,15 +409,9 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t return time.Duration(int64(next)) } -func (w *worker) makeBlock(resultCh chan *types.Block) { +func (w *worker) makeBlock(parent common.Hash, timestamp uint64) (*types.Block, error) { - select { - case w.makeBlockCh <- resultCh: - case <-w.exitCh: - return - } - - return + return w.getSealingBlock(parent, timestamp, w.coinbase, common.Hash{}) } // newWorkLoop is a standalone goroutine to submit new sealing work upon received events. @@ -439,13 +429,13 @@ func (w *worker) newWorkLoop(recommit time.Duration) { tm, isTm := w.engine.(*tendermint.Tendermint) // commit aborts in-flight transaction execution with given signal and resubmits a new one. - commit := func(noempty bool, s int32, resultCh chan *types.Block) { + commit := func(noempty bool, s int32) { if interrupt != nil { atomic.StoreInt32(interrupt, s) } interrupt = new(int32) select { - case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp, resultCh: resultCh}: + case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}: case <-w.exitCh: return } @@ -465,12 +455,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { for { select { - case resultCh := <-w.makeBlockCh: - if resultCh == nil { - continue - } - - commit(true, commitInterruptNewHead, resultCh) case <-w.startCh: clearPending(w.chain.CurrentBlock().NumberU64()) if isTm { @@ -482,7 +466,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { } timestamp = time.Now().Unix() - commit(false, commitInterruptNewHead, nil) + commit(false, commitInterruptNewHead) case head := <-w.chainHeadCh: @@ -491,7 +475,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { continue } timestamp = time.Now().Unix() - commit(false, commitInterruptNewHead, nil) + commit(false, commitInterruptNewHead) case <-timer.C: if isTm { @@ -505,7 +489,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { timer.Reset(recommit) continue } - commit(true, commitInterruptResubmit, nil) + commit(true, commitInterruptResubmit) } case interval := <-w.resubmitIntervalCh: @@ -570,7 +554,7 @@ func (w *worker) mainLoop() { for { select { case req := <-w.newWorkCh: - w.commitWork(req.interrupt, req.noempty, req.timestamp, req.resultCh) + w.commitWork(req.interrupt, req.noempty, req.timestamp) case req := <-w.getWorkCh: block, err := w.generateWork(req.params) @@ -648,7 +632,7 @@ func (w *worker) mainLoop() { // submit sealing work here since all empty submission will be rejected // by clique. Of course the advance sealing(empty submission) is disabled. if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 { - w.commitWork(nil, true, time.Now().Unix(), nil) + w.commitWork(nil, true, time.Now().Unix()) } } atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) @@ -704,11 +688,7 @@ func (w *worker) taskLoop() { w.pendingTasks[sealHash] = task w.pendingMu.Unlock() - resultCh := w.resultCh - if task.resultCh != nil { - resultCh = task.resultCh - } - if err := w.engine.Seal(w.chain, task.block, resultCh, stopCh); err != nil { + if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil { log.Warn("Block sealing failed", "err", err) w.pendingMu.Lock() delete(w.pendingTasks, sealHash) @@ -1137,7 +1117,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { // commitWork generates several new sealing tasks based on the parent block // and submit them to the sealer. -func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64, resultCh chan *types.Block) { +func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { start := time.Now() // Set the coinbase if the worker is running or it's required