Skip to content

Commit

Permalink
adjust lock (ethereum#39)
Browse files Browse the repository at this point in the history
* adjust lock

* change error log

* change error log
  • Loading branch information
liam-lai authored Jan 16, 2022
1 parent 38c3582 commit aab040f
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 20 deletions.
29 changes: 19 additions & 10 deletions consensus/XDPoS/engines/engine_v2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func New(config *params.XDPoSConfig, db ethdb.Database) *XDPoS_v2 {
highestCommitBlock: nil,
}
// Add callback to the timer
timer.OnTimeoutFn = engine.onCountdownTimeout
timer.OnTimeoutFn = engine.OnCountdownTimeout

return engine
}
Expand Down Expand Up @@ -192,6 +192,7 @@ func (x *XDPoS_v2) Prepare(chain consensus.ChainReader, header *types.Header) er

// Ensure the timestamp has the correct delay

// TODO: Proper deal with time
// TODO: if timestamp > current time, how to deal with future timestamp
header.Time = new(big.Int).Add(parent.Time, new(big.Int).SetUint64(x.config.Period))
if header.Time.Int64() < time.Now().Unix() {
Expand Down Expand Up @@ -525,8 +526,8 @@ func (x *XDPoS_v2) VerifySyncInfoMessage(syncInfo *utils.SyncInfo) error {
}

func (x *XDPoS_v2) SyncInfoHandler(chain consensus.ChainReader, syncInfo *utils.SyncInfo) error {
x.signLock.Lock()
defer x.signLock.Unlock()
x.lock.Lock()
defer x.lock.Unlock()
/*
1. processQC
2. processTC
Expand Down Expand Up @@ -564,13 +565,17 @@ func (x *XDPoS_v2) voteHandler(chain consensus.ChainReader, voteMsg *utils.Vote)

// 1. checkRoundNumber
if voteMsg.ProposedBlockInfo.Round != x.currentRound {
return fmt.Errorf("Vote message round number: %v does not match currentRound: %v", voteMsg.ProposedBlockInfo.Round, x.currentRound)
return &utils.ErrIncomingMessageRoundNotEqualCurrentRound{
Type: "vote",
IncomingRound: voteMsg.ProposedBlockInfo.Round,
CurrentRound: x.currentRound,
}
}

// Collect vote
thresholdReached, numberOfVotesInPool, pooledVotes := x.votePool.Add(voteMsg)
if thresholdReached {
log.Debug("Vote pool threashold reached: %v, number of items in the pool: %v", thresholdReached, numberOfVotesInPool)
log.Info(fmt.Sprintf("Vote pool threashold reached: %v, number of items in the pool: %v", thresholdReached, numberOfVotesInPool))
err := x.onVotePoolThresholdReached(chain, pooledVotes, voteMsg)
if err != nil {
return err
Expand Down Expand Up @@ -634,14 +639,16 @@ func (x *XDPoS_v2) timeoutHandler(timeout *utils.Timeout) error {
// 1. checkRoundNumber
if timeout.Round != x.currentRound {
return &utils.ErrIncomingMessageRoundNotEqualCurrentRound{
Type: "timeout",
IncomingRound: timeout.Round,
CurrentRound: x.currentRound}
CurrentRound: x.currentRound,
}
}
// Collect timeout, generate TC
isThresholdReached, numberOfTimeoutsInPool, pooledTimeouts := x.timeoutPool.Add(timeout)
// Threshold reached
if isThresholdReached {
log.Debug("Timeout pool threashold reached: %v, number of items in the pool: %v", isThresholdReached, numberOfTimeoutsInPool)
log.Info(fmt.Sprintf("Timeout pool threashold reached: %v, number of items in the pool: %v", isThresholdReached, numberOfTimeoutsInPool))
err := x.onTimeoutPoolThresholdReached(pooledTimeouts, timeout)
if err != nil {
return err
Expand Down Expand Up @@ -954,7 +961,7 @@ func (x *XDPoS_v2) verifyMsgSignature(signedHashToBeVerified common.Hash, signat
Function that will be called by timer when countdown reaches its threshold.
In the engine v2, we would need to broadcast timeout messages to other peers
*/
func (x *XDPoS_v2) onCountdownTimeout(time time.Time) error {
func (x *XDPoS_v2) OnCountdownTimeout(time time.Time) error {
x.lock.Lock()
defer x.lock.Unlock()

Expand Down Expand Up @@ -1064,13 +1071,15 @@ func (x *XDPoS_v2) SetNewRoundFaker(newRound utils.Round, resetTimer bool) {

// Utils for test to check currentRound value
func (x *XDPoS_v2) GetCurrentRound() utils.Round {
x.lock.RLock()
defer x.lock.RUnlock()
return x.currentRound
}

// Utils for test to check currentRound value
func (x *XDPoS_v2) GetProperties() (utils.Round, *utils.QuorumCert, *utils.QuorumCert, utils.Round, *utils.BlockInfo) {
x.lock.Lock()
defer x.lock.Unlock()
x.lock.RLock()
defer x.lock.RUnlock()
return x.currentRound, x.lockQuorumCert, x.highestQuorumCert, x.highestVotedRound, x.highestCommitBlock
}

Expand Down
3 changes: 2 additions & 1 deletion consensus/XDPoS/utils/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ var (
)

type ErrIncomingMessageRoundNotEqualCurrentRound struct {
Type string
IncomingRound Round
CurrentRound Round
}

func (e *ErrIncomingMessageRoundNotEqualCurrentRound) Error() string {
return fmt.Sprintf("Timeout message round number: %v does not match currentRound: %v", e.IncomingRound, e.CurrentRound)
return fmt.Sprintf("%s message round number: %v does not match currentRound: %v", e.Type, e.IncomingRound, e.CurrentRound)
}
4 changes: 2 additions & 2 deletions consensus/tests/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ func TestThrowErrorIfTimeoutMsgRoundNotEqualToCurrentRound(t *testing.T) {
err := engineV2.TimeoutHandler(timeoutMsg)
assert.NotNil(t, err)
// Timeout msg round > currentRound
assert.Equal(t, "Timeout message round number: 2 does not match currentRound: 3", err.Error())
assert.Equal(t, "timeout message round number: 2 does not match currentRound: 3", err.Error())

// Set round to 1
engineV2.SetNewRoundFaker(utils.Round(1), false)
err = engineV2.TimeoutHandler(timeoutMsg)
assert.NotNil(t, err)
// Timeout msg round < currentRound
assert.Equal(t, "Timeout message round number: 2 does not match currentRound: 1", err.Error())
assert.Equal(t, "timeout message round number: 2 does not match currentRound: 1", err.Error())
}
6 changes: 3 additions & 3 deletions consensus/tests/vote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ func TestThrowErrorIfVoteMsgRoundNotEqualToCurrentRound(t *testing.T) {
// voteRound > currentRound
err := engineV2.VoteHandler(blockchain, voteMsg)
assert.NotNil(t, err)
assert.Equal(t, "Vote message round number: 6 does not match currentRound: 7", err.Error())
assert.Equal(t, "vote message round number: 6 does not match currentRound: 7", err.Error())

// Set round to 5
engineV2.SetNewRoundFaker(utils.Round(5), false)
err = engineV2.VoteHandler(blockchain, voteMsg)
assert.NotNil(t, err)
// voteRound < currentRound
assert.Equal(t, "Vote message round number: 6 does not match currentRound: 5", err.Error())
assert.Equal(t, "vote message round number: 6 does not match currentRound: 5", err.Error())
}

func TestProcessVoteMsgThenTimeoutMsg(t *testing.T) {
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestProcessVoteMsgThenTimeoutMsg(t *testing.T) {

err = engineV2.TimeoutHandler(timeoutMsg)
assert.NotNil(t, err)
assert.Equal(t, "Timeout message round number: 5 does not match currentRound: 6", err.Error())
assert.Equal(t, "timeout message round number: 5 does not match currentRound: 6", err.Error())

// Ok, let's do the timeout msg which is on the same round as the current round by creating two timeout message which will not reach timeout pool threshold
timeoutMsg = &utils.Timeout{
Expand Down
8 changes: 6 additions & 2 deletions eth/bft/bft_hander_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,11 @@ func TestTimeoutHandlerRoundNotEqual(t *testing.T) {
}

tester.bfter.consensus.timeoutHandler = func(timeout *utils.Timeout) error {
return &utils.ErrIncomingMessageRoundNotEqualCurrentRound{utils.Round(1), utils.Round(2)}
return &utils.ErrIncomingMessageRoundNotEqualCurrentRound{
Type: "timeout",
IncomingRound: utils.Round(1),
CurrentRound: utils.Round(2),
}
}

tester.bfter.broadcast.Timeout = func(*utils.Timeout) {
Expand All @@ -201,5 +205,5 @@ func TestTimeoutHandlerRoundNotEqual(t *testing.T) {
timeoutMsg := &utils.Timeout{}

err := tester.bfter.Timeout(timeoutMsg)
assert.Equal(t, "Timeout message round number: 1 does not match currentRound: 2", err.Error())
assert.Equal(t, "timeout message round number: 1 does not match currentRound: 2", err.Error())
}
8 changes: 6 additions & 2 deletions eth/bft/bft_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (b *Bfter) SetConsensusFuns(engine consensus.Engine) {

// TODO: rename
func (b *Bfter) Vote(vote *utils.Vote) error {
log.Trace("Receive Vote", "vote hash", vote.Hash(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round, "signature", vote.Signature)
log.Trace("Receive Vote", "hash", vote.Hash(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round, "signature", vote.Signature)
if exist, _ := b.knownVotes.ContainsOrAdd(vote.Hash(), true); exist {
log.Info("Discarded vote, known vote", "vote hash", vote.Hash(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round)
return nil
Expand All @@ -94,6 +94,10 @@ func (b *Bfter) Vote(vote *utils.Vote) error {

err = b.consensus.voteHandler(b.blockChainReader, vote)
if err != nil {
if _, ok := err.(*utils.ErrIncomingMessageRoundNotEqualCurrentRound); ok {
log.Warn("vote round not equal", "error", err, "vote", vote.Hash())
return err
}
log.Error("handle BFT Vote", "error", err)
return err
}
Expand All @@ -115,7 +119,7 @@ func (b *Bfter) Timeout(timeout *utils.Timeout) error {
err = b.consensus.timeoutHandler(timeout)
if err != nil {
if _, ok := err.(*utils.ErrIncomingMessageRoundNotEqualCurrentRound); ok {
log.Debug("timeout message round not equal", "error", err)
log.Warn("timeout round not equal", "error", err)
return err
}
log.Error("handle BFT Timeout", "error", err)
Expand Down

0 comments on commit aab040f

Please sign in to comment.