Skip to content

Commit

Permalink
Xin 163 (ethereum#76)
Browse files Browse the repository at this point in the history
* clean up the pool old round

* add unit test to cover the vote key format

* add gapNumber to the vote pool key

* fix race condition in pool

* remove verify gap number in vote handler
  • Loading branch information
wjrjerome authored Apr 1, 2022
1 parent b98005a commit cb67e8e
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 26 deletions.
22 changes: 17 additions & 5 deletions consensus/XDPoS/engines/engine_v2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ type XDPoS_v2 struct {
}

func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) *XDPoS_v2 {
// Setup Timer
// Setup timeoutTimer
duration := time.Duration(config.V2.TimeoutPeriod) * time.Second
timer := countdown.NewCountDown(duration)
timeoutPool := utils.NewPool(config.V2.CertThreshold)
timeoutTimer := countdown.NewCountDown(duration)

snapshots, _ := lru.NewARC(utils.InmemorySnapshots)
signatures, _ := lru.NewARC(utils.InmemorySnapshots)
epochSwitches, _ := lru.NewARC(int(utils.InmemoryEpochs))
verifiedHeaders, _ := lru.NewARC(utils.InmemorySnapshots)

timeoutPool := utils.NewPool(config.V2.CertThreshold)
votePool := utils.NewPool(config.V2.CertThreshold)
engine := &XDPoS_v2{
config: config,
Expand All @@ -80,7 +80,7 @@ func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) *
verifiedHeaders: verifiedHeaders,
snapshots: snapshots,
epochSwitches: epochSwitches,
timeoutWorker: timer,
timeoutWorker: timeoutTimer,
BroadcastCh: make(chan interface{}),
waitPeriodCh: waitPeriodCh,

Expand All @@ -104,8 +104,9 @@ func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) *
highestCommitBlock: nil,
}
// Add callback to the timer
timer.OnTimeoutFn = engine.OnCountdownTimeout
timeoutTimer.OnTimeoutFn = engine.OnCountdownTimeout

engine.periodicJob()
return engine
}

Expand Down Expand Up @@ -1042,3 +1043,14 @@ func (x *XDPoS_v2) allowedToSend(chain consensus.ChainReader, blockHeader *types
}
return nil
}

// Periodlly execution(Attached to engine initialisation during "new"). Used for pool cleaning etc
func (x *XDPoS_v2) periodicJob() {
go func() {
for {
<-time.After(utils.PeriodicJobPeriod * time.Second)
x.hygieneVotePool()
x.hygieneTimeoutPool()
}
}()
}
16 changes: 16 additions & 0 deletions consensus/XDPoS/engines/engine_v2/testing_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,19 @@ func (x *XDPoS_v2) SetPropertiesFaker(highestQC *utils.QuorumCert, highestTC *ut
x.highestQuorumCert = highestQC
x.highestTimeoutCert = highestTC
}

func (x *XDPoS_v2) HygieneVotePoolFaker() {
x.hygieneVotePool()
}

func (x *XDPoS_v2) GetVotePoolKeyListFaker() []string {
return x.votePool.PoolObjKeysList()
}

func (x *XDPoS_v2) HygieneTimeoutPoolFaker() {
x.hygieneTimeoutPool()
}

func (x *XDPoS_v2) GetTimeoutPoolKeyListFaker() []string {
return x.timeoutPool.PoolObjKeysList()
}
23 changes: 23 additions & 0 deletions consensus/XDPoS/engines/engine_v2/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package engine_v2

import (
"fmt"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -227,3 +229,24 @@ func (x *XDPoS_v2) OnCountdownTimeout(time time.Time, chain interface{}) error {

return nil
}

func (x *XDPoS_v2) hygieneTimeoutPool() {
x.lock.RLock()
currentRound := x.currentRound
x.lock.RUnlock()
timeoutPoolKeys := x.timeoutPool.PoolObjKeysList()

// Extract round number
for _, k := range timeoutPoolKeys {
keyedRound, err := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64)
if err != nil {
log.Error("[hygieneTimeoutPool] Error while trying to get keyedRound inside pool", "Error", err)
continue
}
// Clean up any timeouts round that is 10 rounds older
if keyedRound < int64(currentRound)-utils.PoolHygieneRound {
log.Debug("[hygieneTimeoutPool] Cleaned timeout pool at round", "Round", keyedRound, "CurrentRound", currentRound, "Key", k)
x.timeoutPool.ClearByPoolKey(k)
}
}
}
39 changes: 23 additions & 16 deletions consensus/XDPoS/engines/engine_v2/vote.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package engine_v2
import (
"fmt"
"math/big"
"strconv"
"strings"
"sync"

"github.com/XinFinOrg/XDPoSChain/common"
Expand Down Expand Up @@ -77,22 +79,8 @@ func (x *XDPoS_v2) voteHandler(chain consensus.ChainReader, voteMsg *utils.Vote)

err := x.VerifyBlockInfo(chain, voteMsg.ProposedBlockInfo)
if err != nil {
x.votePool.ClearPoolKeyByObj(voteMsg)
return err
}
// verify vote.GapNumber
epochSwitchInfo, err := x.getEpochSwitchInfo(chain, nil, voteMsg.ProposedBlockInfo.Hash)
if err != nil {
log.Error("getEpochSwitchInfo when handle Vote", "BlockInfoHash", voteMsg.ProposedBlockInfo.Hash, "Error", err)
return err
}
epochSwitchNumber := epochSwitchInfo.EpochSwitchBlockInfo.Number.Uint64()
gapNumber := epochSwitchNumber - epochSwitchNumber%x.config.Epoch - x.config.Gap
if gapNumber != voteMsg.GapNumber {
log.Error("[voteHandler] gap number mismatch", "BlockInfoHash", voteMsg.ProposedBlockInfo.Hash, "Gap", voteMsg.GapNumber, "GapShouldBe", gapNumber)
return fmt.Errorf("gap number mismatch %v", voteMsg)
}

err = x.onVotePoolThresholdReached(chain, pooledVotes, voteMsg, proposedBlockHeader)
if err != nil {
return err
Expand Down Expand Up @@ -157,8 +145,6 @@ func (x *XDPoS_v2) onVotePoolThresholdReached(chain consensus.ChainReader, poole
return err
}
log.Info("Successfully processed the vote and produced QC!", "QcRound", quorumCert.ProposedBlockInfo.Round, "QcNumOfSig", len(quorumCert.Signatures), "QcHash", quorumCert.ProposedBlockInfo.Hash, "QcNumber", quorumCert.ProposedBlockInfo.Number.Uint64())
// clean up vote at the same poolKey. and pookKey is proposed block hash
x.votePool.ClearPoolKeyByObj(currentVoteMsg)
return nil
}

Expand Down Expand Up @@ -216,3 +202,24 @@ func (x *XDPoS_v2) isExtendingFromAncestor(blockChainReader consensus.ChainReade
}
return false, nil
}

func (x *XDPoS_v2) hygieneVotePool() {
x.lock.RLock()
round := x.currentRound
x.lock.RUnlock()
votePoolKeys := x.votePool.PoolObjKeysList()

// Extract round number
for _, k := range votePoolKeys {
keyedRound, err := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64)
if err != nil {
log.Error("[hygieneVotePool] Error while trying to get keyedRound inside pool", "Error", err)
continue
}
// Clean up any votes round that is 10 rounds older
if keyedRound < int64(round)-utils.PoolHygieneRound {
log.Debug("[hygieneVotePool] Cleaned vote poll at round", "Round", keyedRound, "currentRound", round, "Key", k)
x.votePool.ClearByPoolKey(k)
}
}
}
5 changes: 5 additions & 0 deletions consensus/XDPoS/utils/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,8 @@ const (
BlockSignersCacheLimit = 9000
M2ByteLength = 4
)

const (
PeriodicJobPeriod = 60
PoolHygieneRound = 10
)
33 changes: 33 additions & 0 deletions consensus/XDPoS/utils/pool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package utils

import (
"sync"

"github.com/XinFinOrg/XDPoSChain/common"
)

Expand All @@ -11,6 +13,7 @@ type PoolObj interface {
type Pool struct {
objList map[string]map[common.Hash]PoolObj
threshold int
lock sync.RWMutex // Protects the pool fields
}

func NewPool(threshold int) *Pool {
Expand All @@ -22,6 +25,8 @@ func NewPool(threshold int) *Pool {

// return true if it has reached threshold
func (p *Pool) Add(obj PoolObj) (bool, int, map[common.Hash]PoolObj) {
p.lock.Lock()
defer p.lock.Unlock()
poolKey := obj.PoolKey()
objListKeyed, ok := p.objList[poolKey]
if !ok {
Expand All @@ -45,16 +50,44 @@ func (p *Pool) Size(obj PoolObj) int {
return len(objListKeyed)
}

func (p *Pool) PoolObjKeysList() []string {
p.lock.RLock()
defer p.lock.RUnlock()

var keyList []string
for key := range p.objList {
keyList = append(keyList, key)
}
return keyList
}

// Given the pool object, clear all object under the same pool key
func (p *Pool) ClearPoolKeyByObj(obj PoolObj) {
p.lock.Lock()
defer p.lock.Unlock()

poolKey := obj.PoolKey()
delete(p.objList, poolKey)
}

// Given the pool key, clean its content
func (p *Pool) ClearByPoolKey(poolKey string) {
p.lock.Lock()
defer p.lock.Unlock()

delete(p.objList, poolKey)
}

func (p *Pool) Clear() {
p.lock.Lock()
defer p.lock.Unlock()

p.objList = make(map[string]map[common.Hash]PoolObj)
}

func (p *Pool) SetThreshold(t int) {
p.lock.Lock()
defer p.lock.Unlock()

p.threshold = t
}
8 changes: 6 additions & 2 deletions consensus/XDPoS/utils/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/crypto/sha3"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/rlp"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
Expand Down Expand Up @@ -131,7 +132,10 @@ func (e *ExtraFields_v2) EncodeToBytes() ([]byte, error) {

func rlpHash(x interface{}) (h common.Hash) {
hw := sha3.NewKeccak256()
rlp.Encode(hw, x)
err := rlp.Encode(hw, x)
if err != nil {
log.Error("[rlpHash] Fail to hash item", "Error", err)
}
hw.Sum(h[:0])
return h
}
Expand Down Expand Up @@ -168,7 +172,7 @@ func TimeoutSigHash(m *TimeoutForSign) common.Hash {

func (m *Vote) PoolKey() string {
// return the voted block hash
return m.ProposedBlockInfo.Hash.Hex()
return fmt.Sprint(m.ProposedBlockInfo.Round, ":", m.GapNumber, ":", m.ProposedBlockInfo.Number, ":", m.ProposedBlockInfo.Hash.Hex())
}

func (m *Timeout) PoolKey() string {
Expand Down
20 changes: 20 additions & 0 deletions consensus/XDPoS/utils/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package utils
import (
"math/big"
"reflect"
"strings"
"testing"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/stretchr/testify/assert"
)

func toyExtraFields() *ExtraFields_v2 {
Expand Down Expand Up @@ -75,3 +77,21 @@ func TestHashAndSigHash(t *testing.T) {
t.Fatalf("SigHash of two round shouldn't equal")
}
}

func TestPoolKeyFormat(t *testing.T) {
voteMsg := &Vote{
ProposedBlockInfo: &BlockInfo{
Hash: common.Hash{1},
Round: 5,
Number: big.NewInt(4),
},
Signature: []byte{},
GapNumber: 450,
}

voteKey := strings.Split(voteMsg.PoolKey(), ":")
assert.Equal(t, "5", voteKey[0])
assert.Equal(t, "450", voteKey[1])
assert.Equal(t, "4", voteKey[2])
assert.Equal(t, common.Hash{1}.String(), voteKey[3])
}
Loading

0 comments on commit cb67e8e

Please sign in to comment.