Improve Peer Score Algorithm (#248)
* feat: improve peer scoring algo

* debug

* debug

* more debug

* debug TryDialNext

* remove log

* fix score type

* revert block sync logic

* revert block sync logic

* revert block sync logic

* Add block request log

* Add apply block latency

* add processPeerEvent log back

* update unit test

* update unit test

---------

Co-authored-by: yzang2019 <zymfrank@gmail.com>
2 people authored and Kbhat1 committed Nov 13, 2024
1 parent d877ef1 commit f6d399d
Showing 6 changed files with 128 additions and 70 deletions.
8 changes: 5 additions & 3 deletions internal/blocksync/pool.go
@@ -327,8 +327,11 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, extComm
if peer != nil {
peer.decrPending(blockSize)
}
} else if setBlockResult < 0 {
err := errors.New("bpr requester peer is different from original peer")

// Increment the number of consecutive successful block syncs for the peer
pool.peerManager.IncrementBlockSyncs(peerID)
} else {
err := errors.New("requester is different or block already exists")
pool.sendError(err, peerID)
return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height)
}
@@ -358,7 +361,6 @@ func (pool *BlockPool) SetPeerRange(peerID types.NodeID, base int64, height int6

blockSyncPeers := pool.peerManager.GetBlockSyncPeers()
if len(blockSyncPeers) > 0 && !blockSyncPeers[peerID] {
pool.logger.Info(fmt.Sprintf("Skip adding peer %s for blocksync, num of blocksync peers: %d, num of pool peers: %d", peerID, len(blockSyncPeers), len(pool.peers)))
return
}

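The pool.go hunk above wires block-sync success into peer scoring: every block accepted from a peer now bumps a per-peer streak counter via IncrementBlockSyncs, which the peer manager later converts into a score bonus (see the peermanager.go changes below). The sketch that follows is not part of the commit; peerManager is a toy stand-in for the real p2p.PeerManager, and the accepted flag is a simplification of the setBlockResult check.

```go
package main

import (
	"errors"
	"fmt"
)

// peerManager is a toy stand-in for p2p.PeerManager; only the new
// block-sync streak counter is modeled here.
type peerManager struct {
	consecSuccessfulBlocks map[string]int64
}

func (m *peerManager) IncrementBlockSyncs(peerID string) {
	m.consecSuccessfulBlocks[peerID]++
}

// addBlock mirrors the shape of the AddBlock branch above: on success the
// peer's streak counter grows, otherwise an error is reported for the peer.
func addBlock(pm *peerManager, peerID string, accepted bool) error {
	if accepted {
		// Increment the number of consecutive successful block syncs for the peer.
		pm.IncrementBlockSyncs(peerID)
		return nil
	}
	return fmt.Errorf("%w (peer: %s)", errors.New("requester is different or block already exists"), peerID)
}

func main() {
	pm := &peerManager{consecSuccessfulBlocks: map[string]int64{}}
	_ = addBlock(pm, "peer-a", true)
	_ = addBlock(pm, "peer-a", true)
	fmt.Println(pm.consecSuccessfulBlocks["peer-a"]) // 2
	fmt.Println(addBlock(pm, "peer-a", false))       // error path: counter untouched, error surfaced
}
```
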
9 changes: 9 additions & 0 deletions internal/blocksync/reactor.go
@@ -258,6 +258,10 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blo
return r.respondToPeer(ctx, msg, envelope.From, blockSyncCh)
case *bcproto.BlockResponse:
block, err := types.BlockFromProto(msg.Block)

r.logger.Info("received block response from peer",
"peer", envelope.From,
"height", block.Height)
if err != nil {
r.logger.Error("failed to convert block from proto",
"peer", envelope.From,
@@ -495,6 +499,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
var (
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
lastApplyBlockTime = time.Now()

blocksSynced = uint64(0)

@@ -695,7 +700,11 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh

// TODO: Same thing for app - but we would need a way to get the hash
// without persisting the state.
r.logger.Info(fmt.Sprintf("Requesting block %d from peer took %s", first.Height, time.Since(lastApplyBlockTime)))
startTime := time.Now()
state, err = r.blockExec.ApplyBlock(ctx, state, firstID, first, nil)
r.logger.Info(fmt.Sprintf("ApplyBlock %d took %s", first.Height, time.Since(startTime)))
lastApplyBlockTime = time.Now()
if err != nil {
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
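
The reactor.go changes above add two timing signals around block processing: how long the node waited since the previous apply (a rough proxy for how quickly the peer served the block request) and how long ApplyBlock itself took. A standalone sketch of the same pattern, not part of the commit; applyBlock here is a placeholder for r.blockExec.ApplyBlock.

```go
package main

import (
	"fmt"
	"time"
)

// applyBlock is a placeholder for blockExec.ApplyBlock.
func applyBlock(height int64) {
	time.Sleep(20 * time.Millisecond) // simulate block execution
}

func main() {
	lastApplyBlockTime := time.Now()

	for height := int64(1); height <= 3; height++ {
		// Time spent waiting since the last apply, i.e. request latency plus queueing.
		fmt.Printf("Requesting block %d from peer took %s\n", height, time.Since(lastApplyBlockTime))

		// Time spent executing the block itself.
		startTime := time.Now()
		applyBlock(height)
		fmt.Printf("ApplyBlock %d took %s\n", height, time.Since(startTime))

		lastApplyBlockTime = time.Now()
	}
}
```
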
62 changes: 45 additions & 17 deletions internal/p2p/peermanager.go
@@ -25,8 +25,6 @@ import (
const (
// retryNever is returned by retryDelay() when retries are disabled.
retryNever time.Duration = math.MaxInt64
// DefaultMutableScore is the default score for a peer during initialization
DefaultMutableScore int64 = 10
)

// PeerStatus is a peer status.
@@ -47,9 +45,10 @@ const (
type PeerScore uint8

const (
PeerScoreUnconditional PeerScore = math.MaxUint8 // unconditional peers
PeerScorePersistent PeerScore = PeerScoreUnconditional - 1 // persistent peers
MaxPeerScoreNotPersistent PeerScore = PeerScorePersistent - 1
PeerScoreUnconditional PeerScore = math.MaxUint8 // unconditional peers, 255
PeerScorePersistent PeerScore = PeerScoreUnconditional - 1 // persistent peers, 254
MaxPeerScoreNotPersistent PeerScore = PeerScorePersistent - 1 // not persistent peers, 253
DefaultMutableScore PeerScore = MaxPeerScoreNotPersistent - 10 // mutable score, 243
)

// PeerUpdate is a peer update event sent via PeerUpdates.
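
For reference, the new score ladder resolves to the concrete values noted in the comments above, with DefaultMutableScore leaving ten points of headroom below the non-persistent cap. A quick sketch, not part of the commit, that mirrors the constants from the hunk and confirms the arithmetic:

```go
package main

import (
	"fmt"
	"math"
)

type PeerScore uint8

const (
	PeerScoreUnconditional    PeerScore = math.MaxUint8                  // 255
	PeerScorePersistent       PeerScore = PeerScoreUnconditional - 1     // 254
	MaxPeerScoreNotPersistent PeerScore = PeerScorePersistent - 1        // 253
	DefaultMutableScore       PeerScore = MaxPeerScoreNotPersistent - 10 // 243
)

func main() {
	fmt.Println(PeerScoreUnconditional, PeerScorePersistent, MaxPeerScoreNotPersistent, DefaultMutableScore) // 255 254 253 243

	// A fresh, non-persistent peer can absorb 10 "good" events before hitting the cap.
	fmt.Println(MaxPeerScoreNotPersistent - DefaultMutableScore) // 10
}
```
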
@@ -598,6 +597,7 @@ func (m *PeerManager) DialFailed(ctx context.Context, address NodeAddress) error

addressInfo.LastDialFailure = time.Now().UTC()
addressInfo.DialFailures++
peer.ConsecSuccessfulBlocks = 0
// We need to invalidate the cache after score changed
m.store.ranked = nil
if err := m.store.Set(peer); err != nil {
@@ -845,7 +845,15 @@ func (m *PeerManager) Disconnected(ctx context.Context, peerID types.NodeID) {

// Update score and invalidate cache if a peer got disconnected
if _, ok := m.store.peers[peerID]; ok {
m.store.peers[peerID].NumOfDisconnections++
// check for potential overflow
if m.store.peers[peerID].NumOfDisconnections < math.MaxInt64 {
m.store.peers[peerID].NumOfDisconnections++
} else {
fmt.Printf("Warning: NumOfDisconnections for peer %s has reached its maximum value\n", peerID)
m.store.peers[peerID].NumOfDisconnections = 0
}

m.store.peers[peerID].ConsecSuccessfulBlocks = 0
m.store.ranked = nil
}

@@ -992,16 +1000,16 @@ func (m *PeerManager) processPeerEvent(ctx context.Context, pu PeerUpdate) {
return
}

if _, ok := m.store.peers[pu.NodeID]; !ok {
m.store.peers[pu.NodeID] = &peerInfo{}
}

switch pu.Status {
case PeerStatusBad:
m.store.peers[pu.NodeID].MutableScore--
case PeerStatusGood:
m.store.peers[pu.NodeID].MutableScore++
}

if _, ok := m.store.peers[pu.NodeID]; !ok {
m.store.peers[pu.NodeID] = &peerInfo{}
}
// Invalidate the cache after score changed
m.store.ranked = nil
}
@@ -1350,7 +1358,9 @@ type peerInfo struct {
Height int64
FixedScore PeerScore // mainly for tests

MutableScore int64 // updated by router
MutableScore PeerScore // updated by router

ConsecSuccessfulBlocks int64
}

// peerInfoFromProto converts a Protobuf PeerInfo message to a peerInfo,
@@ -1408,27 +1418,35 @@ func (p *peerInfo) Copy() peerInfo {
// Score calculates a score for the peer. Higher-scored peers will be
// preferred over lower scores.
func (p *peerInfo) Score() PeerScore {
// Use predetermined scores if set
if p.FixedScore > 0 {
return p.FixedScore
}
if p.Unconditional {
return PeerScoreUnconditional
}

score := p.MutableScore
score := int64(p.MutableScore)
if p.Persistent || p.BlockSync {
score = int64(PeerScorePersistent)
}

// Add points for block sync performance
score += p.ConsecSuccessfulBlocks / 5

// Penalize for dial failures with time decay
for _, addr := range p.AddressInfo {
// DialFailures is reset when dials succeed, so this
// is either the number of dial failures or 0.
score -= int64(addr.DialFailures)
failureScore := float64(addr.DialFailures) * math.Exp(-0.1*float64(time.Since(addr.LastDialFailure).Hours()))
score -= int64(failureScore)
}

// We consider lowering the score for every 3 disconnection events
score -= p.NumOfDisconnections / 3
// Penalize for disconnections with time decay
timeSinceLastDisconnect := time.Since(p.LastConnected)
decayFactor := math.Exp(-0.1 * timeSinceLastDisconnect.Hours())
effectiveDisconnections := int64(float64(p.NumOfDisconnections) * decayFactor)
score -= effectiveDisconnections / 3

// Cap score for non-persistent peers
if !p.Persistent && score > int64(MaxPeerScoreNotPersistent) {
score = int64(MaxPeerScoreNotPersistent)
}
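
The reworked Score() above replaces flat penalties with exponentially decayed ones: dial failures and disconnections fade by a factor of e^-0.1 per hour, while every five consecutive successfully synced blocks earn one point. Below is a standalone worked example using the same formulas; it is not part of the commit, and the inputs (six dial failures ten hours ago, nine disconnections with a five-hour-old connection, a 25-block streak) are made-up illustrative values.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	score := int64(243) // DefaultMutableScore for a fresh, non-persistent peer

	// Bonus: one point per 5 consecutive successfully synced blocks.
	consecSuccessfulBlocks := int64(25)
	score += consecSuccessfulBlocks / 5 // +5

	// Penalty: dial failures decay with time since the last failure.
	dialFailures := 6
	hoursSinceLastFailure := 10.0
	failureScore := float64(dialFailures) * math.Exp(-0.1*hoursSinceLastFailure)
	score -= int64(failureScore) // 6 * e^-1 ≈ 2.2, so -2

	// Penalty: disconnections decay with time since the last successful connection.
	numOfDisconnections := int64(9)
	timeSinceLastConnect := 5 * time.Hour
	decayFactor := math.Exp(-0.1 * timeSinceLastConnect.Hours())
	effectiveDisconnections := int64(float64(numOfDisconnections) * decayFactor)
	score -= effectiveDisconnections / 3 // 9 * e^-0.5 ≈ 5.4, truncated to 5, 5/3 = 1, so -1

	fmt.Println(score) // 243 + 5 - 2 - 1 = 245
}
```
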
@@ -1535,3 +1553,13 @@ func (m *PeerManager) MarkReadyConnected(nodeId types.NodeID) {
m.ready[nodeId] = true
m.connected[nodeId] = true
}

func (m *PeerManager) IncrementBlockSyncs(peerID types.NodeID) {
m.mtx.Lock()
defer m.mtx.Unlock()

if peer, ok := m.store.peers[peerID]; ok {
peer.ConsecSuccessfulBlocks++
m.store.ranked = nil
}
}
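
IncrementBlockSyncs above is only half of the bookkeeping: DialFailed and Disconnected (earlier in this file) zero ConsecSuccessfulBlocks, so the ConsecSuccessfulBlocks/5 bonus rewards uninterrupted streaks only. A toy model of that lifecycle, not part of the commit:

```go
package main

import "fmt"

// streak is a toy model of the ConsecSuccessfulBlocks bookkeeping in this
// commit: incremented per successful block, zeroed on dial failure or disconnect.
type streak struct{ consecSuccessfulBlocks int64 }

func (s *streak) onBlockSynced()  { s.consecSuccessfulBlocks++ }   // IncrementBlockSyncs
func (s *streak) onDialFailed()   { s.consecSuccessfulBlocks = 0 } // DialFailed resets the streak
func (s *streak) onDisconnected() { s.consecSuccessfulBlocks = 0 } // Disconnected resets it too

// bonus mirrors the "score += ConsecSuccessfulBlocks / 5" rule in Score().
func (s *streak) bonus() int64 { return s.consecSuccessfulBlocks / 5 }

func main() {
	var s streak
	for i := 0; i < 20; i++ {
		s.onBlockSynced()
	}
	fmt.Println(s.bonus()) // 4: a 20-block streak is worth four points

	s.onDisconnected()
	fmt.Println(s.bonus()) // 0: the streak and its bonus reset
}
```
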
38 changes: 21 additions & 17 deletions internal/p2p/peermanager_scoring_test.go
@@ -2,11 +2,12 @@ package p2p

import (
"context"
"github.com/tendermint/tendermint/libs/log"
"strings"
"testing"
"time"

"github.com/tendermint/tendermint/libs/log"

"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"

@@ -44,26 +45,29 @@ func TestPeerScoring(t *testing.T) {
NodeID: id,
Status: PeerStatusGood,
})
require.EqualValues(t, defaultScore+int64(i), peerManager.Scores()[id])
require.EqualValues(t, defaultScore+PeerScore(i), peerManager.Scores()[id])
}
// watch the corresponding decreases respond to update
for i := 1; i < 10; i++ {
peerManager.processPeerEvent(ctx, PeerUpdate{
NodeID: id,
Status: PeerStatusBad,
})
require.EqualValues(t, DefaultMutableScore+int64(9)-int64(i), peerManager.Scores()[id])
require.EqualValues(t, DefaultMutableScore+PeerScore(9)-PeerScore(i), peerManager.Scores()[id])
}

// Dial failure should decrease score
_ = peerManager.DialFailed(ctx, NodeAddress{NodeID: id, Protocol: "memory"})
require.EqualValues(t, DefaultMutableScore-1, peerManager.Scores()[id])
addr := NodeAddress{NodeID: id, Protocol: "memory"}
_ = peerManager.DialFailed(ctx, addr)
_ = peerManager.DialFailed(ctx, addr)
_ = peerManager.DialFailed(ctx, addr)
require.EqualValues(t, DefaultMutableScore-2, peerManager.Scores()[id])

// Disconnect every 3 times should also decrease score
for i := 1; i < 7; i++ {
peerManager.Disconnected(ctx, id)
}
require.EqualValues(t, DefaultMutableScore-3, peerManager.Scores()[id])
require.EqualValues(t, DefaultMutableScore-2, peerManager.Scores()[id])
})
t.Run("AsynchronousIncrement", func(t *testing.T) {
start := peerManager.Scores()[id]
@@ -92,18 +96,18 @@ func TestPeerScoring(t *testing.T) {
"startAt=%d score=%d", start, peerManager.Scores()[id])
})
t.Run("TestNonPersistantPeerUpperBound", func(t *testing.T) {
start := int64(peerManager.Scores()[id] + 1)
for i := start; i <= int64(PeerScorePersistent)+start; i++ {
peerManager.processPeerEvent(ctx, PeerUpdate{
NodeID: id,
Status: PeerStatusGood,
})
// Reset peer state to remove any previous penalties
peerManager.store.peers[id] = &peerInfo{
ID: id,
MutableScore: DefaultMutableScore,
}

if i >= int64(PeerScorePersistent) {
require.EqualValues(t, MaxPeerScoreNotPersistent, peerManager.Scores()[id])
} else {
require.EqualValues(t, i, peerManager.Scores()[id])
}
// Add successful blocks to increase score
for i := 0; i < 100; i++ {
peerManager.IncrementBlockSyncs(id)
}

// Score should be capped at MaxPeerScoreNotPersistent
require.EqualValues(t, MaxPeerScoreNotPersistent, peerManager.Scores()[id])
})
}
