Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ibft] Ignore ibft message when node is not a validator #231

Merged
merged 3 commits into from
Oct 31, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 68 additions & 11 deletions consensus/ibft/ibft.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/dogechain-lab/dogechain/consensus"
"github.com/dogechain-lab/dogechain/consensus/ibft/currentstate"
"github.com/dogechain-lab/dogechain/consensus/ibft/proto"
"github.com/dogechain-lab/dogechain/consensus/ibft/validator"
"github.com/dogechain-lab/dogechain/contracts/systemcontracts"
"github.com/dogechain-lab/dogechain/contracts/upgrader"
"github.com/dogechain-lab/dogechain/contracts/validatorset"
Expand Down Expand Up @@ -111,6 +112,10 @@ type Ibft struct {
mechanisms []ConsensusMechanism // IBFT ConsensusMechanism used (PoA / PoS)

blockTime time.Duration // Minimum block generation time in seconds

// Dynamic References for signing and validating
currentTxSigner crypto.TxSigner // Tx Signer at current sequence
currentValidators validator.Validators // Validator set at current sequence
}

// runHook runs a specified hook if it is present in the hook map
Expand Down Expand Up @@ -200,6 +205,11 @@ func (i *Ibft) Initialize() error {
return err
}

// set up current module cache
if err := i.updateCurrentModules(i.blockchain.Header().Number + 1); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -329,26 +339,31 @@ func (i *Ibft) setupTransport() error {

// Subscribe to the newly created topic
err = topic.Subscribe(func(obj interface{}) {
if !i.isActiveValidator(i.validatorKeyAddr) {
// we're not active validator, don't ever care about any ibft messages
return
}

msg, ok := obj.(*proto.MessageReq)
if !ok {
i.logger.Error("invalid type assertion for message request")

return
}

if !i.isSealing() {
// if we are not sealing we do not care about the messages
// but we need to subscribe to propagate the messages
return
}

// decode sender
if err := validateMsg(msg); err != nil {
i.logger.Error("failed to validate msg", "err", err)

return
}

if !i.isActiveValidator(msg.FromAddr()) {
// TODO: punish bad node
// ignore message from non-validator
return
}

if msg.From == i.validatorKeyAddr.String() {
// we are the sender, skip this message since we already
// relay our own messages internally.
Expand Down Expand Up @@ -484,10 +499,21 @@ func (i *Ibft) isValidSnapshot() bool {
func (i *Ibft) runSyncState() {
// updateSnapshotCallback keeps the snapshot store in sync with the updated
// chain data, by calling the SyncStateHook
callInsertBlockHook := func(blockNumber uint64) {
callInsertBlockHook := func(block *types.Block) {
blockNumber := block.Number()

// insert block
if hookErr := i.runHook(InsertBlockHook, blockNumber, blockNumber); hookErr != nil {
i.logger.Error(fmt.Sprintf("Unable to run hook %s, %v", InsertBlockHook, hookErr))
}

// update module cache
if err := i.updateCurrentModules(blockNumber + 1); err != nil {
i.logger.Error("failed to update sub modules", "height", blockNumber+1, "err", err)
}

// reset headers of txpool
i.txpool.ResetWithHeaders(block.Header)
}

// save current height to check whether new blocks are added or not during syncing
Expand Down Expand Up @@ -519,8 +545,7 @@ func (i *Ibft) runSyncState() {
}

if err := i.syncer.BulkSyncWithPeer(p, func(newBlock *types.Block) {
callInsertBlockHook(newBlock.Number())
i.txpool.ResetWithHeaders(newBlock.Header)
callInsertBlockHook(newBlock)
}); err != nil {
i.logger.Error("failed to bulk sync", "err", err)

Expand All @@ -542,10 +567,9 @@ func (i *Ibft) runSyncState() {
i.syncer.WatchSyncWithPeer(p, func(newBlock *types.Block) bool {
// After each written block, update the snapshot store for PoS.
// The snapshot store is currently updated for PoA inside the ProcessHeadersHook
callInsertBlockHook(newBlock.Number())
callInsertBlockHook(newBlock)

i.syncer.Broadcast(newBlock)
i.txpool.ResetWithHeaders(newBlock.Header)
isValidator = i.isValidSnapshot()

return isValidator
Expand Down Expand Up @@ -805,6 +829,29 @@ func (i *Ibft) makeTransitionSlashTx(
return tx, err
}

func (i *Ibft) isActiveValidator(addr types.Address) bool {
return i.currentValidators.Includes(addr)
}

// updateCurrentModules updates Txsigner and Validators
// that are used at specified height
func (i *Ibft) updateCurrentModules(height uint64) error {
snap, err := i.getSnapshot(height)
if err != nil {
return err
}

i.currentValidators = snap.Set
i.currentTxSigner = i.getSigner(height)

i.logger.Info("update current module",
"height", height,
"validators", i.currentValidators,
)

return nil
}

func (i *Ibft) getSigner(height uint64) crypto.TxSigner {
return crypto.NewSigner(
i.config.Params.Forks.At(height),
Expand Down Expand Up @@ -934,6 +981,15 @@ func (i *Ibft) runAcceptState() { // start new round
return
}

// update current module cache
if err := i.updateCurrentModules(number); err != nil {
i.logger.Error(
"failed to update submodules",
"height", number,
"err", err,
)
}

snap, err := i.getSnapshot(parent.Number)

if err != nil {
Expand Down Expand Up @@ -1268,6 +1324,7 @@ func (i *Ibft) updateMetrics(block *types.Block) {
//Update the Number of transactions in the block metric
i.metrics.NumTxs.Set(float64(len(block.Body().Transactions)))
}

func (i *Ibft) insertBlock(block *types.Block) error {
// Gather the committed seals for the block
committedSeals := make([][]byte, 0)
Expand Down