diff --git a/consensus/ibft/ibft.go b/consensus/ibft/ibft.go index 430b262c40..def8adc994 100644 --- a/consensus/ibft/ibft.go +++ b/consensus/ibft/ibft.go @@ -13,6 +13,7 @@ import ( "github.com/dogechain-lab/dogechain/consensus" "github.com/dogechain-lab/dogechain/consensus/ibft/currentstate" "github.com/dogechain-lab/dogechain/consensus/ibft/proto" + "github.com/dogechain-lab/dogechain/consensus/ibft/validator" "github.com/dogechain-lab/dogechain/contracts/systemcontracts" "github.com/dogechain-lab/dogechain/contracts/upgrader" "github.com/dogechain-lab/dogechain/contracts/validatorset" @@ -111,6 +112,10 @@ type Ibft struct { mechanisms []ConsensusMechanism // IBFT ConsensusMechanism used (PoA / PoS) blockTime time.Duration // Minimum block generation time in seconds + + // Dynamic References for signing and validating + currentTxSigner crypto.TxSigner // Tx Signer at current sequence + currentValidators validator.Validators // Validator set at current sequence } // runHook runs a specified hook if it is present in the hook map @@ -200,6 +205,11 @@ func (i *Ibft) Initialize() error { return err } + // set up current module cache + if err := i.updateCurrentModules(i.blockchain.Header().Number + 1); err != nil { + return err + } + return nil } @@ -329,6 +339,11 @@ func (i *Ibft) setupTransport() error { // Subscribe to the newly created topic err = topic.Subscribe(func(obj interface{}) { + if !i.isActiveValidator(i.validatorKeyAddr) { + // we're not active validator, don't ever care about any ibft messages + return + } + msg, ok := obj.(*proto.MessageReq) if !ok { i.logger.Error("invalid type assertion for message request") @@ -336,12 +351,6 @@ func (i *Ibft) setupTransport() error { return } - if !i.isSealing() { - // if we are not sealing we do not care about the messages - // but we need to subscribe to propagate the messages - return - } - // decode sender if err := validateMsg(msg); err != nil { i.logger.Error("failed to validate msg", "err", err) @@ -349,6 +358,12 @@ func (i *Ibft) setupTransport() error { return } + if !i.isActiveValidator(msg.FromAddr()) { + // TODO: punish bad node + // ignore message from non-validator + return + } + if msg.From == i.validatorKeyAddr.String() { // we are the sender, skip this message since we already // relay our own messages internally. @@ -484,10 +499,21 @@ func (i *Ibft) isValidSnapshot() bool { func (i *Ibft) runSyncState() { // updateSnapshotCallback keeps the snapshot store in sync with the updated // chain data, by calling the SyncStateHook - callInsertBlockHook := func(blockNumber uint64) { + callInsertBlockHook := func(block *types.Block) { + blockNumber := block.Number() + + // insert block if hookErr := i.runHook(InsertBlockHook, blockNumber, blockNumber); hookErr != nil { i.logger.Error(fmt.Sprintf("Unable to run hook %s, %v", InsertBlockHook, hookErr)) } + + // update module cache + if err := i.updateCurrentModules(blockNumber + 1); err != nil { + i.logger.Error("failed to update sub modules", "height", blockNumber+1, "err", err) + } + + // reset headers of txpool + i.txpool.ResetWithHeaders(block.Header) } // save current height to check whether new blocks are added or not during syncing @@ -519,8 +545,7 @@ func (i *Ibft) runSyncState() { } if err := i.syncer.BulkSyncWithPeer(p, func(newBlock *types.Block) { - callInsertBlockHook(newBlock.Number()) - i.txpool.ResetWithHeaders(newBlock.Header) + callInsertBlockHook(newBlock) }); err != nil { i.logger.Error("failed to bulk sync", "err", err) @@ -542,10 +567,9 @@ func (i *Ibft) runSyncState() { i.syncer.WatchSyncWithPeer(p, func(newBlock *types.Block) bool { // After each written block, update the snapshot store for PoS. // The snapshot store is currently updated for PoA inside the ProcessHeadersHook - callInsertBlockHook(newBlock.Number()) + callInsertBlockHook(newBlock) i.syncer.Broadcast(newBlock) - i.txpool.ResetWithHeaders(newBlock.Header) isValidator = i.isValidSnapshot() return isValidator @@ -805,6 +829,29 @@ func (i *Ibft) makeTransitionSlashTx( return tx, err } +func (i *Ibft) isActiveValidator(addr types.Address) bool { + return i.currentValidators.Includes(addr) +} + +// updateCurrentModules updates Txsigner and Validators +// that are used at specified height +func (i *Ibft) updateCurrentModules(height uint64) error { + snap, err := i.getSnapshot(height) + if err != nil { + return err + } + + i.currentValidators = snap.Set + i.currentTxSigner = i.getSigner(height) + + i.logger.Info("update current module", + "height", height, + "validators", i.currentValidators, + ) + + return nil +} + func (i *Ibft) getSigner(height uint64) crypto.TxSigner { return crypto.NewSigner( i.config.Params.Forks.At(height), @@ -934,6 +981,15 @@ func (i *Ibft) runAcceptState() { // start new round return } + // update current module cache + if err := i.updateCurrentModules(number); err != nil { + i.logger.Error( + "failed to update submodules", + "height", number, + "err", err, + ) + } + snap, err := i.getSnapshot(parent.Number) if err != nil { @@ -1268,6 +1324,7 @@ func (i *Ibft) updateMetrics(block *types.Block) { //Update the Number of transactions in the block metric i.metrics.NumTxs.Set(float64(len(block.Body().Transactions))) } + func (i *Ibft) insertBlock(block *types.Block) error { // Gather the committed seals for the block committedSeals := make([][]byte, 0)