Skip to content

Commit

Permalink
Merge pull request ethereum#30 from blockchaindevsh/tm_w3q
Browse files Browse the repository at this point in the history
refactor: reuse getSealingBlock
  • Loading branch information
qizhou authored Mar 6, 2022
2 parents 5c6dc6d + ad56212 commit 1f35a08
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 56 deletions.
8 changes: 4 additions & 4 deletions consensus/tendermint/adapter/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ type Store struct {
chain *core.BlockChain
governance *gov.Governance
verifyHeaderFunc func(chain consensus.ChainHeaderReader, header *types.Header, seal bool) error
makeBlock func() (block *types.FullBlock)
makeBlock func(parentHash common.Hash, timestamp uint64) (block *types.FullBlock)
}

func NewStore(
chain *core.BlockChain,
governance *gov.Governance,
verifyHeaderFunc func(chain consensus.ChainHeaderReader, header *types.Header, seal bool) error,
makeBlock func() (block *types.FullBlock)) *Store {
makeBlock func(parentHash common.Hash, timestamp uint64) (block *types.FullBlock)) *Store {
return &Store{chain: chain, governance: governance, verifyHeaderFunc: verifyHeaderFunc, makeBlock: makeBlock}
}

Expand Down Expand Up @@ -161,6 +161,6 @@ func updateState(
}, nil
}

func (s *Store) MakeBlock() *types.FullBlock {
return s.makeBlock()
func (s *Store) MakeBlock(parentHash common.Hash, timestamp uint64) *types.FullBlock {
return s.makeBlock(parentHash, timestamp)
}
38 changes: 17 additions & 21 deletions consensus/tendermint/tendermint.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (c *Tendermint) SetBlockChain(chain *core.BlockChain) {
c.chain = chain
}

func (c *Tendermint) Init(makeBlock func(chan *types.Block)) (err error) {
func (c *Tendermint) Init(makeBlock func(parent common.Hash, timestamp uint64) (*types.Block, error)) (err error) {
chain := c.chain
// Outbound gossip message queue
sendC := make(chan pbftconsensus.Message, 1000)
Expand All @@ -124,22 +124,22 @@ func (c *Tendermint) Init(makeBlock func(chan *types.Block)) (err error) {
c.rootCtxCancel = rootCtxCancel
c.rootCtx = rootCtx

makeFullBlock := func() *types.FullBlock {
resultCh := make(chan *types.Block, 1)
makeBlock(resultCh)
select {
case block := <-resultCh:
if block == nil {
return nil
}
parent := chain.GetHeaderByHash(block.ParentHash())
if parent == nil {
return nil
}
return &types.FullBlock{Block: block, LastCommit: parent.Commit}
case <-rootCtx.Done():
makeFullBlock := func(parentHash common.Hash, timestamp uint64) *types.FullBlock {

block, err := makeBlock(parentHash, timestamp)
if err != nil {
log.Warn("makeBlock", "err", err)
return nil
}
if block == nil {
log.Warn("makeBlock returns nil block")
return nil
}
parentHeader := chain.GetHeaderByHash(block.ParentHash())
if parentHeader == nil {
return nil
}
return &types.FullBlock{Block: block, LastCommit: parentHeader.Commit}
}
// datastore
store := adapter.NewStore(chain, c.governance, c.VerifyHeader, makeFullBlock)
Expand Down Expand Up @@ -403,12 +403,8 @@ func (c *Tendermint) FinalizeAndAssemble(chain consensus.ChainHeaderReader, head
// Seal implements consensus.Engine, attempting to create a sealed block using
// the local signing credentials.
func (c *Tendermint) Seal(chain consensus.ChainHeaderReader, block *types.Block, resultCh chan<- *types.Block, stop <-chan struct{}) error {
// resultCh is generated by makeFullBlock
select {
case resultCh <- block:
case <-c.rootCtx.Done():
}
return nil

panic("should never be called")
}

// CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty
Expand Down
42 changes: 11 additions & 31 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ type task struct {
state *state.StateDB
block *types.Block
createdAt time.Time
resultCh chan *types.Block
}

const (
Expand All @@ -162,7 +161,6 @@ type newWorkReq struct {
interrupt *int32
noempty bool
timestamp int64
resultCh chan *types.Block
}

// getWorkReq represents a request for getting a new sealing work with provided parameters.
Expand Down Expand Up @@ -195,7 +193,6 @@ type worker struct {
txsCh chan core.NewTxsEvent
txsSub event.Subscription
chainHeadCh chan core.ChainHeadEvent
makeBlockCh chan chan *types.Block
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent
chainSideSub event.Subscription
Expand Down Expand Up @@ -266,7 +263,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
makeBlockCh: make(chan chan *types.Block),
newWorkCh: make(chan *newWorkReq),
getWorkCh: make(chan *getWorkReq),
taskCh: make(chan *task),
Expand Down Expand Up @@ -413,15 +409,9 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t
return time.Duration(int64(next))
}

func (w *worker) makeBlock(resultCh chan *types.Block) {
func (w *worker) makeBlock(parent common.Hash, timestamp uint64) (*types.Block, error) {

select {
case w.makeBlockCh <- resultCh:
case <-w.exitCh:
return
}

return
return w.getSealingBlock(parent, timestamp, w.coinbase, common.Hash{})
}

// newWorkLoop is a standalone goroutine to submit new sealing work upon received events.
Expand All @@ -439,13 +429,13 @@ func (w *worker) newWorkLoop(recommit time.Duration) {

tm, isTm := w.engine.(*tendermint.Tendermint)
// commit aborts in-flight transaction execution with given signal and resubmits a new one.
commit := func(noempty bool, s int32, resultCh chan *types.Block) {
commit := func(noempty bool, s int32) {
if interrupt != nil {
atomic.StoreInt32(interrupt, s)
}
interrupt = new(int32)
select {
case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp, resultCh: resultCh}:
case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}:
case <-w.exitCh:
return
}
Expand All @@ -465,12 +455,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {

for {
select {
case resultCh := <-w.makeBlockCh:
if resultCh == nil {
continue
}

commit(true, commitInterruptNewHead, resultCh)
case <-w.startCh:
clearPending(w.chain.CurrentBlock().NumberU64())
if isTm {
Expand All @@ -482,7 +466,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
}

timestamp = time.Now().Unix()
commit(false, commitInterruptNewHead, nil)
commit(false, commitInterruptNewHead)

case head := <-w.chainHeadCh:

Expand All @@ -491,7 +475,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
continue
}
timestamp = time.Now().Unix()
commit(false, commitInterruptNewHead, nil)
commit(false, commitInterruptNewHead)

case <-timer.C:
if isTm {
Expand All @@ -505,7 +489,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
timer.Reset(recommit)
continue
}
commit(true, commitInterruptResubmit, nil)
commit(true, commitInterruptResubmit)
}

case interval := <-w.resubmitIntervalCh:
Expand Down Expand Up @@ -570,7 +554,7 @@ func (w *worker) mainLoop() {
for {
select {
case req := <-w.newWorkCh:
w.commitWork(req.interrupt, req.noempty, req.timestamp, req.resultCh)
w.commitWork(req.interrupt, req.noempty, req.timestamp)

case req := <-w.getWorkCh:
block, err := w.generateWork(req.params)
Expand Down Expand Up @@ -648,7 +632,7 @@ func (w *worker) mainLoop() {
// submit sealing work here since all empty submission will be rejected
// by clique. Of course the advance sealing(empty submission) is disabled.
if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 {
w.commitWork(nil, true, time.Now().Unix(), nil)
w.commitWork(nil, true, time.Now().Unix())
}
}
atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))
Expand Down Expand Up @@ -704,11 +688,7 @@ func (w *worker) taskLoop() {
w.pendingTasks[sealHash] = task
w.pendingMu.Unlock()

resultCh := w.resultCh
if task.resultCh != nil {
resultCh = task.resultCh
}
if err := w.engine.Seal(w.chain, task.block, resultCh, stopCh); err != nil {
if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
log.Warn("Block sealing failed", "err", err)
w.pendingMu.Lock()
delete(w.pendingTasks, sealHash)
Expand Down Expand Up @@ -1137,7 +1117,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {

// commitWork generates several new sealing tasks based on the parent block
// and submit them to the sealer.
func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64, resultCh chan *types.Block) {
func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
start := time.Now()

// Set the coinbase if the worker is running or it's required
Expand Down

0 comments on commit 1f35a08

Please sign in to comment.