diff --git a/consensus/XDPoS/engines/engine_v2/engine.go b/consensus/XDPoS/engines/engine_v2/engine.go index bd905e6b9e0a..1c2c6cba7d28 100644 --- a/consensus/XDPoS/engines/engine_v2/engine.go +++ b/consensus/XDPoS/engines/engine_v2/engine.go @@ -59,16 +59,16 @@ type XDPoS_v2 struct { } func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) *XDPoS_v2 { - // Setup Timer + // Setup timeoutTimer duration := time.Duration(config.V2.TimeoutPeriod) * time.Second - timer := countdown.NewCountDown(duration) - timeoutPool := utils.NewPool(config.V2.CertThreshold) + timeoutTimer := countdown.NewCountDown(duration) snapshots, _ := lru.NewARC(utils.InmemorySnapshots) signatures, _ := lru.NewARC(utils.InmemorySnapshots) epochSwitches, _ := lru.NewARC(int(utils.InmemoryEpochs)) verifiedHeaders, _ := lru.NewARC(utils.InmemorySnapshots) + timeoutPool := utils.NewPool(config.V2.CertThreshold) votePool := utils.NewPool(config.V2.CertThreshold) engine := &XDPoS_v2{ config: config, @@ -80,7 +80,7 @@ func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) * verifiedHeaders: verifiedHeaders, snapshots: snapshots, epochSwitches: epochSwitches, - timeoutWorker: timer, + timeoutWorker: timeoutTimer, BroadcastCh: make(chan interface{}), waitPeriodCh: waitPeriodCh, @@ -104,8 +104,9 @@ func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) * highestCommitBlock: nil, } // Add callback to the timer - timer.OnTimeoutFn = engine.OnCountdownTimeout + timeoutTimer.OnTimeoutFn = engine.OnCountdownTimeout + engine.periodicJob() return engine } @@ -1042,3 +1043,14 @@ func (x *XDPoS_v2) allowedToSend(chain consensus.ChainReader, blockHeader *types } return nil } + +// Periodlly execution(Attached to engine initialisation during "new"). Used for pool cleaning etc +func (x *XDPoS_v2) periodicJob() { + go func() { + for { + <-time.After(utils.PeriodicJobPeriod * time.Second) + x.hygieneVotePool() + x.hygieneTimeoutPool() + } + }() +} diff --git a/consensus/XDPoS/engines/engine_v2/testing_utils.go b/consensus/XDPoS/engines/engine_v2/testing_utils.go index f3edef19b178..148652a1eb7f 100644 --- a/consensus/XDPoS/engines/engine_v2/testing_utils.go +++ b/consensus/XDPoS/engines/engine_v2/testing_utils.go @@ -57,3 +57,19 @@ func (x *XDPoS_v2) SetPropertiesFaker(highestQC *utils.QuorumCert, highestTC *ut x.highestQuorumCert = highestQC x.highestTimeoutCert = highestTC } + +func (x *XDPoS_v2) HygieneVotePoolFaker() { + x.hygieneVotePool() +} + +func (x *XDPoS_v2) GetVotePoolKeyListFaker() []string { + return x.votePool.PoolObjKeysList() +} + +func (x *XDPoS_v2) HygieneTimeoutPoolFaker() { + x.hygieneTimeoutPool() +} + +func (x *XDPoS_v2) GetTimeoutPoolKeyListFaker() []string { + return x.timeoutPool.PoolObjKeysList() +} diff --git a/consensus/XDPoS/engines/engine_v2/timeout.go b/consensus/XDPoS/engines/engine_v2/timeout.go index bf5c7c2cd63b..1327400b2964 100644 --- a/consensus/XDPoS/engines/engine_v2/timeout.go +++ b/consensus/XDPoS/engines/engine_v2/timeout.go @@ -2,6 +2,8 @@ package engine_v2 import ( "fmt" + "strconv" + "strings" "sync" "time" @@ -227,3 +229,24 @@ func (x *XDPoS_v2) OnCountdownTimeout(time time.Time, chain interface{}) error { return nil } + +func (x *XDPoS_v2) hygieneTimeoutPool() { + x.lock.RLock() + currentRound := x.currentRound + x.lock.RUnlock() + timeoutPoolKeys := x.timeoutPool.PoolObjKeysList() + + // Extract round number + for _, k := range timeoutPoolKeys { + keyedRound, err := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64) + if err != nil { + log.Error("[hygieneTimeoutPool] Error while trying to get keyedRound inside pool", "Error", err) + continue + } + // Clean up any timeouts round that is 10 rounds older + if keyedRound < int64(currentRound)-utils.PoolHygieneRound { + log.Debug("[hygieneTimeoutPool] Cleaned timeout pool at round", "Round", keyedRound, "CurrentRound", currentRound, "Key", k) + x.timeoutPool.ClearByPoolKey(k) + } + } +} diff --git a/consensus/XDPoS/engines/engine_v2/vote.go b/consensus/XDPoS/engines/engine_v2/vote.go index 084e95f1a547..cfce42ad33d5 100644 --- a/consensus/XDPoS/engines/engine_v2/vote.go +++ b/consensus/XDPoS/engines/engine_v2/vote.go @@ -3,6 +3,8 @@ package engine_v2 import ( "fmt" "math/big" + "strconv" + "strings" "sync" "github.com/XinFinOrg/XDPoSChain/common" @@ -77,22 +79,8 @@ func (x *XDPoS_v2) voteHandler(chain consensus.ChainReader, voteMsg *utils.Vote) err := x.VerifyBlockInfo(chain, voteMsg.ProposedBlockInfo) if err != nil { - x.votePool.ClearPoolKeyByObj(voteMsg) return err } - // verify vote.GapNumber - epochSwitchInfo, err := x.getEpochSwitchInfo(chain, nil, voteMsg.ProposedBlockInfo.Hash) - if err != nil { - log.Error("getEpochSwitchInfo when handle Vote", "BlockInfoHash", voteMsg.ProposedBlockInfo.Hash, "Error", err) - return err - } - epochSwitchNumber := epochSwitchInfo.EpochSwitchBlockInfo.Number.Uint64() - gapNumber := epochSwitchNumber - epochSwitchNumber%x.config.Epoch - x.config.Gap - if gapNumber != voteMsg.GapNumber { - log.Error("[voteHandler] gap number mismatch", "BlockInfoHash", voteMsg.ProposedBlockInfo.Hash, "Gap", voteMsg.GapNumber, "GapShouldBe", gapNumber) - return fmt.Errorf("gap number mismatch %v", voteMsg) - } - err = x.onVotePoolThresholdReached(chain, pooledVotes, voteMsg, proposedBlockHeader) if err != nil { return err @@ -157,8 +145,6 @@ func (x *XDPoS_v2) onVotePoolThresholdReached(chain consensus.ChainReader, poole return err } log.Info("Successfully processed the vote and produced QC!", "QcRound", quorumCert.ProposedBlockInfo.Round, "QcNumOfSig", len(quorumCert.Signatures), "QcHash", quorumCert.ProposedBlockInfo.Hash, "QcNumber", quorumCert.ProposedBlockInfo.Number.Uint64()) - // clean up vote at the same poolKey. and pookKey is proposed block hash - x.votePool.ClearPoolKeyByObj(currentVoteMsg) return nil } @@ -216,3 +202,24 @@ func (x *XDPoS_v2) isExtendingFromAncestor(blockChainReader consensus.ChainReade } return false, nil } + +func (x *XDPoS_v2) hygieneVotePool() { + x.lock.RLock() + round := x.currentRound + x.lock.RUnlock() + votePoolKeys := x.votePool.PoolObjKeysList() + + // Extract round number + for _, k := range votePoolKeys { + keyedRound, err := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64) + if err != nil { + log.Error("[hygieneVotePool] Error while trying to get keyedRound inside pool", "Error", err) + continue + } + // Clean up any votes round that is 10 rounds older + if keyedRound < int64(round)-utils.PoolHygieneRound { + log.Debug("[hygieneVotePool] Cleaned vote poll at round", "Round", keyedRound, "currentRound", round, "Key", k) + x.votePool.ClearByPoolKey(k) + } + } +} diff --git a/consensus/XDPoS/utils/constants.go b/consensus/XDPoS/utils/constants.go index 6b378c41aba8..c8df06cf4664 100644 --- a/consensus/XDPoS/utils/constants.go +++ b/consensus/XDPoS/utils/constants.go @@ -24,3 +24,8 @@ const ( BlockSignersCacheLimit = 9000 M2ByteLength = 4 ) + +const ( + PeriodicJobPeriod = 60 + PoolHygieneRound = 10 +) diff --git a/consensus/XDPoS/utils/pool.go b/consensus/XDPoS/utils/pool.go index 949cff3cacee..b671e027fbc1 100644 --- a/consensus/XDPoS/utils/pool.go +++ b/consensus/XDPoS/utils/pool.go @@ -1,6 +1,8 @@ package utils import ( + "sync" + "github.com/XinFinOrg/XDPoSChain/common" ) @@ -11,6 +13,7 @@ type PoolObj interface { type Pool struct { objList map[string]map[common.Hash]PoolObj threshold int + lock sync.RWMutex // Protects the pool fields } func NewPool(threshold int) *Pool { @@ -22,6 +25,8 @@ func NewPool(threshold int) *Pool { // return true if it has reached threshold func (p *Pool) Add(obj PoolObj) (bool, int, map[common.Hash]PoolObj) { + p.lock.Lock() + defer p.lock.Unlock() poolKey := obj.PoolKey() objListKeyed, ok := p.objList[poolKey] if !ok { @@ -45,16 +50,44 @@ func (p *Pool) Size(obj PoolObj) int { return len(objListKeyed) } +func (p *Pool) PoolObjKeysList() []string { + p.lock.RLock() + defer p.lock.RUnlock() + + var keyList []string + for key := range p.objList { + keyList = append(keyList, key) + } + return keyList +} + // Given the pool object, clear all object under the same pool key func (p *Pool) ClearPoolKeyByObj(obj PoolObj) { + p.lock.Lock() + defer p.lock.Unlock() + poolKey := obj.PoolKey() delete(p.objList, poolKey) } +// Given the pool key, clean its content +func (p *Pool) ClearByPoolKey(poolKey string) { + p.lock.Lock() + defer p.lock.Unlock() + + delete(p.objList, poolKey) +} + func (p *Pool) Clear() { + p.lock.Lock() + defer p.lock.Unlock() + p.objList = make(map[string]map[common.Hash]PoolObj) } func (p *Pool) SetThreshold(t int) { + p.lock.Lock() + defer p.lock.Unlock() + p.threshold = t } diff --git a/consensus/XDPoS/utils/types.go b/consensus/XDPoS/utils/types.go index 36ad62556d63..13a21d12466e 100644 --- a/consensus/XDPoS/utils/types.go +++ b/consensus/XDPoS/utils/types.go @@ -13,6 +13,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/core/state" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/crypto/sha3" + "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/rlp" "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) @@ -131,7 +132,10 @@ func (e *ExtraFields_v2) EncodeToBytes() ([]byte, error) { func rlpHash(x interface{}) (h common.Hash) { hw := sha3.NewKeccak256() - rlp.Encode(hw, x) + err := rlp.Encode(hw, x) + if err != nil { + log.Error("[rlpHash] Fail to hash item", "Error", err) + } hw.Sum(h[:0]) return h } @@ -168,7 +172,7 @@ func TimeoutSigHash(m *TimeoutForSign) common.Hash { func (m *Vote) PoolKey() string { // return the voted block hash - return m.ProposedBlockInfo.Hash.Hex() + return fmt.Sprint(m.ProposedBlockInfo.Round, ":", m.GapNumber, ":", m.ProposedBlockInfo.Number, ":", m.ProposedBlockInfo.Hash.Hex()) } func (m *Timeout) PoolKey() string { diff --git a/consensus/XDPoS/utils/types_test.go b/consensus/XDPoS/utils/types_test.go index a8971db8843d..549fe7c179cd 100644 --- a/consensus/XDPoS/utils/types_test.go +++ b/consensus/XDPoS/utils/types_test.go @@ -3,9 +3,11 @@ package utils import ( "math/big" "reflect" + "strings" "testing" "github.com/XinFinOrg/XDPoSChain/common" + "github.com/stretchr/testify/assert" ) func toyExtraFields() *ExtraFields_v2 { @@ -75,3 +77,21 @@ func TestHashAndSigHash(t *testing.T) { t.Fatalf("SigHash of two round shouldn't equal") } } + +func TestPoolKeyFormat(t *testing.T) { + voteMsg := &Vote{ + ProposedBlockInfo: &BlockInfo{ + Hash: common.Hash{1}, + Round: 5, + Number: big.NewInt(4), + }, + Signature: []byte{}, + GapNumber: 450, + } + + voteKey := strings.Split(voteMsg.PoolKey(), ":") + assert.Equal(t, "5", voteKey[0]) + assert.Equal(t, "450", voteKey[1]) + assert.Equal(t, "4", voteKey[2]) + assert.Equal(t, common.Hash{1}.String(), voteKey[3]) +} diff --git a/consensus/tests/engine_v2_tests/timeout_test.go b/consensus/tests/engine_v2_tests/timeout_test.go index 1b8920c0411e..0b2345c09b75 100644 --- a/consensus/tests/engine_v2_tests/timeout_test.go +++ b/consensus/tests/engine_v2_tests/timeout_test.go @@ -1,7 +1,8 @@ package engine_v2_tests import ( - "fmt" + "strconv" + "strings" "testing" "time" @@ -23,7 +24,6 @@ func TestCountdownTimeoutToSendTimeoutMessage(t *testing.T) { assert.Equal(t, poolSize, 1) assert.NotNil(t, timeoutMsg) assert.Equal(t, uint64(1350), timeoutMsg.(*utils.Timeout).GapNumber) - fmt.Println(timeoutMsg.(*utils.Timeout).GapNumber) assert.Equal(t, utils.Round(1), timeoutMsg.(*utils.Timeout).Round) } @@ -229,3 +229,66 @@ func TestShouldVerifyTimeoutMessage(t *testing.T) { assert.Nil(t, err) assert.True(t, verified) } + +func TestTimeoutPoolKeeyGoodHygiene(t *testing.T) { + blockchain, _, _, signer, signFn, _ := PrepareXDCTestBlockChainForV2Engine(t, 905, params.TestXDPoSMockChainConfig, 0) + engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2 + + // Set round to 5 + engineV2.SetNewRoundFaker(blockchain, utils.Round(5), false) + // Inject the first timeout with round 5 + + signedHash, _ := signFn(accounts.Account{Address: signer}, utils.TimeoutSigHash(&utils.TimeoutForSign{ + Round: utils.Round(5), + GapNumber: 450, + }).Bytes()) + timeoutMsg := &utils.Timeout{ + Round: utils.Round(5), + GapNumber: 450, + Signature: signedHash, + } + engineV2.TimeoutHandler(blockchain, timeoutMsg) + + // Inject a second timeout with round 16 + signedHash, _ = signFn(accounts.Account{Address: signer}, utils.TimeoutSigHash(&utils.TimeoutForSign{ + Round: utils.Round(16), + GapNumber: 450, + }).Bytes()) + timeoutMsg = &utils.Timeout{ + Round: utils.Round(16), + GapNumber: 450, + Signature: signedHash, + } + // Set round to 16 + engineV2.SetNewRoundFaker(blockchain, utils.Round(16), false) + engineV2.TimeoutHandler(blockchain, timeoutMsg) + + // Inject a third timeout with round 17 + signedHash, _ = signFn(accounts.Account{Address: signer}, utils.TimeoutSigHash(&utils.TimeoutForSign{ + Round: utils.Round(17), + GapNumber: 450, + }).Bytes()) + timeoutMsg = &utils.Timeout{ + Round: utils.Round(17), + GapNumber: 450, + Signature: signedHash, + } + // Set round to 16 + engineV2.SetNewRoundFaker(blockchain, utils.Round(17), false) + engineV2.TimeoutHandler(blockchain, timeoutMsg) + + // Let's keep good Hygiene + engineV2.HygieneTimeoutPoolFaker() + // Let's wait for 5 second for the goroutine + <-time.After(5 * time.Second) + keyList := engineV2.GetTimeoutPoolKeyListFaker() + + assert.Equal(t, 2, len(keyList)) + for _, k := range keyList { + keyedRound, err := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64) + assert.Nil(t, err) + if keyedRound < 25-10 { + assert.Fail(t, "Did not clean up the timeout pool") + } + } +} diff --git a/consensus/tests/engine_v2_tests/vote_test.go b/consensus/tests/engine_v2_tests/vote_test.go index 7ee0398c4b02..e1da4ee96852 100644 --- a/consensus/tests/engine_v2_tests/vote_test.go +++ b/consensus/tests/engine_v2_tests/vote_test.go @@ -3,8 +3,10 @@ package engine_v2_tests import ( "fmt" "math/big" + "strconv" "strings" "testing" + "time" "github.com/XinFinOrg/XDPoSChain/accounts" "github.com/XinFinOrg/XDPoSChain/accounts/abi/bind/backends" @@ -555,5 +557,94 @@ func TestVoteMessageHandlerWrongGapNumber(t *testing.T) { } err := engineV2.VoteHandler(blockchain, voteMsg) - assert.True(t, strings.Contains(err.Error(), "gap number mismatch")) + // Shall not even trigger the vote threashold as vote pool key also contains the gapNumber + assert.Nil(t, err) +} + +func TestVotePoolKeepGoodHygiene(t *testing.T) { + blockchain, _, currentBlock, signer, signFn, _ := PrepareXDCTestBlockChainForV2Engine(t, 905, params.TestXDPoSMockChainConfig, 0) + engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2 + + blockInfo := &utils.BlockInfo{ + Hash: currentBlock.Hash(), + Round: utils.Round(5), + Number: big.NewInt(905), + } + voteForSign := &utils.VoteForSign{ + ProposedBlockInfo: blockInfo, + GapNumber: 450, + } + voteSigningHash := utils.VoteSigHash(voteForSign) + + // Set round to 5 + engineV2.SetNewRoundFaker(blockchain, utils.Round(5), false) + // Create two vote messages which will not reach vote pool threshold + signedHash, _ := signFn(accounts.Account{Address: signer}, voteSigningHash.Bytes()) + voteMsg := &utils.Vote{ + ProposedBlockInfo: blockInfo, + Signature: signedHash, + GapNumber: 450, + } + engineV2.VoteHandler(blockchain, voteMsg) + + // Inject a second vote with round 16 + blockInfo = &utils.BlockInfo{ + Hash: currentBlock.Hash(), + Round: utils.Round(16), + Number: big.NewInt(906), + } + voteForSign = &utils.VoteForSign{ + ProposedBlockInfo: blockInfo, + GapNumber: 450, + } + voteSigningHash = utils.VoteSigHash(voteForSign) + + // Set round to 16 + engineV2.SetNewRoundFaker(blockchain, utils.Round(16), false) + // Create two vote messages which will not reach vote pool threshold + signedHash, _ = signFn(accounts.Account{Address: signer}, voteSigningHash.Bytes()) + voteMsg = &utils.Vote{ + ProposedBlockInfo: blockInfo, + Signature: signedHash, + GapNumber: 450, + } + engineV2.VoteHandler(blockchain, voteMsg) + + // Inject a second vote with round 25, which is less than 10 rounds difference to the last vote round + blockInfo = &utils.BlockInfo{ + Hash: currentBlock.Hash(), + Round: utils.Round(25), + Number: big.NewInt(907), + } + voteForSign = &utils.VoteForSign{ + ProposedBlockInfo: blockInfo, + GapNumber: 450, + } + voteSigningHash = utils.VoteSigHash(voteForSign) + + // Set round to 25 + engineV2.SetNewRoundFaker(blockchain, utils.Round(25), false) + // Create two vote messages which will not reach vote pool threshold + signedHash, _ = signFn(accounts.Account{Address: signer}, voteSigningHash.Bytes()) + voteMsg = &utils.Vote{ + ProposedBlockInfo: blockInfo, + Signature: signedHash, + GapNumber: 450, + } + engineV2.VoteHandler(blockchain, voteMsg) + + // Let's keep good Hygiene + engineV2.HygieneVotePoolFaker() + // Let's wait for 5 second for the goroutine + <-time.After(5 * time.Second) + keyList := engineV2.GetVotePoolKeyListFaker() + + assert.Equal(t, 2, len(keyList)) + for _, k := range keyList { + keyedRound, err := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64) + assert.Nil(t, err) + if keyedRound < 25-10 { + assert.Fail(t, "Did not clean up the vote pool") + } + } }