Skip to content

Commit

Permalink
xin-106 add generated message into its pool (ethereum#32)
Browse files Browse the repository at this point in the history
* add debug log and change to contain or add for cache

* add generated message into its pool
  • Loading branch information
liam-lai authored and wjrjerome committed Dec 30, 2021
1 parent e0d66d4 commit 35eebab
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 19 deletions.
29 changes: 25 additions & 4 deletions consensus/XDPoS/engines/engine_v2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (x *XDPoS_v2) Prepare(chain consensus.ChainReader, header *types.Header) er
currentRound := x.currentRound
highestQC := x.highestQuorumCert
x.lock.Unlock()
//parentRound := highestQC.ProposedBlockInfo.Round

if (highestQC == nil) || (header.ParentHash != highestQC.ProposedBlockInfo.Hash) {
return consensus.ErrNotReadyToPropose
}
Expand Down Expand Up @@ -447,6 +447,16 @@ func (x *XDPoS_v2) VerifyHeader(chain consensus.ChainReader, header *types.Heade
return nil
}

// Utils for test to get current Pool size
func (x *XDPoS_v2) GetVotePoolSize(vote *utils.Vote) int {
return x.votePool.Size(vote)
}

// Utils for test to get Timeout Pool Size
func (x *XDPoS_v2) GetTimeoutPoolSize(timeout *utils.Timeout) int {
return x.timeoutPool.Size(timeout)
}

/*
SyncInfo workflow
*/
Expand Down Expand Up @@ -504,6 +514,10 @@ func (x *XDPoS_v2) VerifyVoteMessage(vote *utils.Vote) (bool, error) {
func (x *XDPoS_v2) VoteHandler(chain consensus.ChainReader, voteMsg *utils.Vote) error {
x.lock.Lock()
defer x.lock.Unlock()
return x.voteHandler(chain, voteMsg)
}

func (x *XDPoS_v2) voteHandler(chain consensus.ChainReader, voteMsg *utils.Vote) error {

// 1. checkRoundNumber
if voteMsg.ProposedBlockInfo.Round != x.currentRound {
Expand All @@ -516,7 +530,7 @@ func (x *XDPoS_v2) VoteHandler(chain consensus.ChainReader, voteMsg *utils.Vote)
log.Debug("Vote pool threashold reached: %v, number of items in the pool: %v", thresholdReached, numberOfVotesInPool)
err := x.onVotePoolThresholdReached(chain, pooledVotes, voteMsg)
if err != nil {
return nil
return err
}
}

Expand Down Expand Up @@ -570,7 +584,10 @@ func (x *XDPoS_v2) VerifyTimeoutMessage(timeoutMsg *utils.Timeout) (bool, error)
func (x *XDPoS_v2) TimeoutHandler(timeout *utils.Timeout) error {
x.lock.Lock()
defer x.lock.Unlock()
return x.timeoutHandler(timeout)
}

func (x *XDPoS_v2) timeoutHandler(timeout *utils.Timeout) error {
// 1. checkRoundNumber
if timeout.Round != x.currentRound {
return &utils.ErrIncomingMessageRoundNotEqualCurrentRound{
Expand Down Expand Up @@ -666,7 +683,7 @@ func (x *XDPoS_v2) ProposedBlockHandler(blockChainReader consensus.ChainReader,
return err
}
if verified {
return x.sendVote(blockInfo)
return x.sendVote(blockChainReader, blockInfo)
} else {
log.Info("Failed to pass the voting rule verification", "ProposeBlockHash", blockInfo.Hash)
}
Expand Down Expand Up @@ -806,7 +823,7 @@ func (x *XDPoS_v2) verifyVotingRule(blockChainReader consensus.ChainReader, bloc
}

// Once Hot stuff voting rule has verified, this node can then send vote
func (x *XDPoS_v2) sendVote(blockInfo *utils.BlockInfo) error {
func (x *XDPoS_v2) sendVote(chainReader consensus.ChainReader, blockInfo *utils.BlockInfo) error {
// First step: Update the highest Voted round
// Second step: Generate the signature by using node's private key(The signature is the blockInfo signature)
// Third step: Construct the vote struct with the above signature & blockinfo struct
Expand All @@ -822,6 +839,8 @@ func (x *XDPoS_v2) sendVote(blockInfo *utils.BlockInfo) error {
ProposedBlockInfo: blockInfo,
Signature: signedHash,
}

x.voteHandler(chainReader, voteMsg)
x.broadcastToBftChannel(voteMsg)
return nil
}
Expand All @@ -841,6 +860,8 @@ func (x *XDPoS_v2) sendTimeout() error {
Round: x.currentRound,
Signature: signedHash,
}

x.timeoutHandler(timeoutMsg)
x.broadcastToBftChannel(timeoutMsg)
return nil
}
Expand Down
8 changes: 8 additions & 0 deletions consensus/XDPoS/utils/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ func (p *Pool) Add(obj PoolObj) (bool, int, map[common.Hash]PoolObj) {
}
return false, numOfItems, objListKeyed
}
func (p *Pool) Size(obj PoolObj) int {
poolKey := obj.PoolKey()
objListKeyed, ok := p.objList[poolKey]
if !ok {
return 0
}
return len(objListKeyed)
}

func (p *Pool) Clear() {
p.objList = make(map[string]map[common.Hash]PoolObj)
Expand Down
2 changes: 2 additions & 0 deletions consensus/tests/countdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func TestCountdownTimeoutToSendTimeoutMessage(t *testing.T) {
engineV2.SetNewRoundFaker(utils.Round(1), true)

timeoutMsg := <-engineV2.BroadcastCh
poolSize := engineV2.GetTimeoutPoolSize(timeoutMsg.(*utils.Timeout))
assert.Equal(t, poolSize, 1)
assert.NotNil(t, timeoutMsg)

valid, err := engineV2.VerifyTimeoutMessage(timeoutMsg.(*utils.Timeout))
Expand Down
3 changes: 3 additions & 0 deletions consensus/tests/proposed_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func TestProcessFirstV2BlockAndSendVoteMsg(t *testing.T) {
}

voteMsg := <-engineV2.BroadcastCh
poolSize := engineV2.GetVotePoolSize(voteMsg.(*utils.Vote))

assert.Equal(t, poolSize, 1)
assert.NotNil(t, voteMsg)
assert.Equal(t, currentBlock.Hash(), voteMsg.(*utils.Vote).ProposedBlockInfo.Hash)

Expand Down
25 changes: 11 additions & 14 deletions eth/bft/bft_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ type Bfter struct {
broadcast BroadcastFns

// Message Cache
knownVotes *lru.ARCCache
knownSyncInfos *lru.ARCCache
knownTimeouts *lru.ARCCache
knownVotes *lru.Cache
knownSyncInfos *lru.Cache
knownTimeouts *lru.Cache
}

type ConsensusFns struct {
Expand All @@ -49,9 +49,9 @@ type BroadcastFns struct {
}

func New(broadcasts BroadcastFns, blockCahinReader *core.BlockChain) *Bfter {
knownVotes, _ := lru.NewARC(messageLimit)
knownSyncInfos, _ := lru.NewARC(messageLimit)
knownTimeouts, _ := lru.NewARC(messageLimit)
knownVotes, _ := lru.New(messageLimit)
knownSyncInfos, _ := lru.New(messageLimit)
knownTimeouts, _ := lru.New(messageLimit)
return &Bfter{
quit: make(chan struct{}),
broadcastCh: make(chan interface{}),
Expand Down Expand Up @@ -79,9 +79,9 @@ func (b *Bfter) SetConsensusFuns(engine consensus.Engine) {

// TODO: rename
func (b *Bfter) Vote(vote *utils.Vote) error {
log.Info("Receive Vote", "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round)
if b.knownVotes.Contains(vote.Hash()) {
log.Info("Discarded vote, known vote", "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round)
log.Trace("Receive Vote", "vote hash", vote.Hash(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round, "signature", vote.Signature)
if exist, _ := b.knownVotes.ContainsOrAdd(vote.Hash(), true); exist {
log.Info("Discarded vote, known vote", "vote hash", vote.Hash(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round)
return nil
}

Expand All @@ -90,7 +90,6 @@ func (b *Bfter) Vote(vote *utils.Vote) error {
log.Error("Verify BFT Vote", "error", err)
return err
}
b.knownVotes.Add(vote.Hash(), true)
b.broadcastCh <- vote

err = b.consensus.voteHandler(b.blockCahinReader, vote)
Expand All @@ -102,7 +101,7 @@ func (b *Bfter) Vote(vote *utils.Vote) error {
}
func (b *Bfter) Timeout(timeout *utils.Timeout) error {
log.Trace("Receive Timeout", "timeout", timeout)
if b.knownVotes.Contains(timeout.Hash()) {
if exist, _ := b.knownTimeouts.ContainsOrAdd(timeout.Hash(), true); exist {
log.Trace("Discarded Timeout, known Timeout", "Signature", timeout.Signature, "hash", timeout.Hash(), "round", timeout.Round)
return nil
}
Expand All @@ -111,7 +110,6 @@ func (b *Bfter) Timeout(timeout *utils.Timeout) error {
log.Error("Verify BFT Timeout", "error", err)
return err
}
b.knownTimeouts.Add(timeout.Hash(), true)
b.broadcastCh <- timeout

err = b.consensus.timeoutHandler(timeout)
Expand All @@ -127,7 +125,7 @@ func (b *Bfter) Timeout(timeout *utils.Timeout) error {
}
func (b *Bfter) SyncInfo(syncInfo *utils.SyncInfo) error {
log.Trace("Receive SyncInfo", "syncInfo", syncInfo)
if b.knownVotes.Contains(syncInfo.Hash()) {
if exist, _ := b.knownSyncInfos.ContainsOrAdd(syncInfo.Hash(), true); exist {
log.Trace("Discarded SyncInfo, known SyncInfo", "hash", syncInfo.Hash())
return nil
}
Expand All @@ -137,7 +135,6 @@ func (b *Bfter) SyncInfo(syncInfo *utils.SyncInfo) error {
return err
}

b.knownSyncInfos.Add(syncInfo.Hash(), true)
b.broadcastCh <- syncInfo

err = b.consensus.syncInfoHandler(b.blockCahinReader, syncInfo)
Expand Down
2 changes: 1 addition & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ func (pm *ProtocolManager) BroadcastVote(vote *utils.Vote) {
for _, peer := range peers {
peer.SendVote(vote)
}
log.Info("Propagated Vote", "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round, "recipients", len(peers))
log.Info("Propagated Vote", "vote hash", vote.Hash(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round, "recipients", len(peers))
}

// BroadcastTimeout will propagate a Timeout to all peers which are not known to
Expand Down

0 comments on commit 35eebab

Please sign in to comment.