Improve Peer Score Algorithm #248

Merged: 18 commits, Oct 30, 2024
8 changes: 5 additions & 3 deletions internal/blocksync/pool.go
@@ -327,8 +327,11 @@
if peer != nil {
peer.decrPending(blockSize)
}
} else if setBlockResult < 0 {
err := errors.New("bpr requester peer is different from original peer")

// Increment the number of consecutive successful block syncs for the peer
pool.peerManager.IncrementBlockSyncs(peerID)
} else {
err := errors.New("requester is different or block already exists")

pool.sendError(err, peerID)
return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height)
}
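Net effect of this hunk: the success branch (the one that decrements the peer's pending count) now also calls pool.peerManager.IncrementBlockSyncs(peerID), so a peer that delivers its requested block grows its consecutive-successful-blocks counter, while the error branch drops the old "bpr requester peer is different from original peer" message in favor of the broader "requester is different or block already exists".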
@@ -358,7 +361,6 @@

blockSyncPeers := pool.peerManager.GetBlockSyncPeers()
if len(blockSyncPeers) > 0 && !blockSyncPeers[peerID] {
pool.logger.Info(fmt.Sprintf("Skip adding peer %s for blocksync, num of blocksync peers: %d, num of pool peers: %d", peerID, len(blockSyncPeers), len(pool.peers)))
return
}

9 changes: 9 additions & 0 deletions internal/blocksync/reactor.go
@@ -258,6 +258,10 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blo
return r.respondToPeer(ctx, msg, envelope.From, blockSyncCh)
case *bcproto.BlockResponse:
block, err := types.BlockFromProto(msg.Block)

r.logger.Info("received block response from peer",
"peer", envelope.From,
"height", block.Height)
if err != nil {
r.logger.Error("failed to convert block from proto",
"peer", envelope.From,
@@ -495,6 +499,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
var (
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
lastApplyBlockTime = time.Now()

blocksSynced = uint64(0)

@@ -695,7 +700,11 @@

// TODO: Same thing for app - but we would need a way to get the hash
// without persisting the state.
r.logger.Info(fmt.Sprintf("Requesting block %d from peer took %s", first.Height, time.Since(lastApplyBlockTime)))
startTime := time.Now()
state, err = r.blockExec.ApplyBlock(ctx, state, firstID, first, nil)
r.logger.Info(fmt.Sprintf("ApplyBlock %d took %s", first.Height, time.Since(startTime)))
lastApplyBlockTime = time.Now()
if err != nil {
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
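Read together, the two new log lines bracket each height: time.Since(lastApplyBlockTime) measures the gap since the previous ApplyBlock finished (roughly the time spent waiting for the block to arrive from peers), time.Since(startTime) measures ApplyBlock itself, and lastApplyBlockTime is reset afterwards for the next iteration.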
62 changes: 45 additions & 17 deletions internal/p2p/peermanager.go
@@ -25,8 +25,6 @@
const (
// retryNever is returned by retryDelay() when retries are disabled.
retryNever time.Duration = math.MaxInt64
// DefaultMutableScore is the default score for a peer during initialization
DefaultMutableScore int64 = 10
)

// PeerStatus is a peer status.
@@ -47,9 +45,10 @@
type PeerScore uint8

const (
PeerScoreUnconditional PeerScore = math.MaxUint8 // unconditional peers
PeerScorePersistent PeerScore = PeerScoreUnconditional - 1 // persistent peers
MaxPeerScoreNotPersistent PeerScore = PeerScorePersistent - 1
PeerScoreUnconditional PeerScore = math.MaxUint8 // unconditional peers, 255
PeerScorePersistent PeerScore = PeerScoreUnconditional - 1 // persistent peers, 254
MaxPeerScoreNotPersistent PeerScore = PeerScorePersistent - 1 // not persistent peers, 253
DefaultMutableScore PeerScore = MaxPeerScoreNotPersistent - 10 // mutable score, 243
)

// PeerUpdate is a peer update event sent via PeerUpdates.
@@ -598,6 +597,7 @@

addressInfo.LastDialFailure = time.Now().UTC()
addressInfo.DialFailures++
peer.ConsecSuccessfulBlocks = 0
// We need to invalidate the cache after score changed
m.store.ranked = nil
if err := m.store.Set(peer); err != nil {
@@ -845,7 +845,15 @@

// Update score and invalidate cache if a peer got disconnected
if _, ok := m.store.peers[peerID]; ok {
m.store.peers[peerID].NumOfDisconnections++
// check for potential overflow
if m.store.peers[peerID].NumOfDisconnections < math.MaxInt64 {
m.store.peers[peerID].NumOfDisconnections++
} else {
fmt.Printf("Warning: NumOfDisconnections for peer %s has reached its maximum value\n", peerID)
m.store.peers[peerID].NumOfDisconnections = 0
}

m.store.peers[peerID].ConsecSuccessfulBlocks = 0
m.store.ranked = nil
}

@@ -992,16 +1000,16 @@
return
}

if _, ok := m.store.peers[pu.NodeID]; !ok {
m.store.peers[pu.NodeID] = &peerInfo{}
}

switch pu.Status {
case PeerStatusBad:
m.store.peers[pu.NodeID].MutableScore--
case PeerStatusGood:
m.store.peers[pu.NodeID].MutableScore++
}

if _, ok := m.store.peers[pu.NodeID]; !ok {
m.store.peers[pu.NodeID] = &peerInfo{}
}

// Invalidate the cache after score changed
m.store.ranked = nil
}
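Note the reordering: m.store.peers maps node IDs to *peerInfo pointers, so creating the entry before the switch mutates MutableScore avoids touching a nil entry for a peer that is not yet in the store; presumably that is why the existence check was moved above the score update.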
@@ -1350,7 +1358,9 @@
Height int64
FixedScore PeerScore // mainly for tests

MutableScore int64 // updated by router
MutableScore PeerScore // updated by router

ConsecSuccessfulBlocks int64
Review comment (Contributor): Maybe add another one called NumOfTimeouts?

Reply (Author): A timeout already ends up lowering the score through the Disconnected() method, which counts the disconnection. The flow is:

timeout detection -> peer.didTimeout = true -> pool.removePeer(id, true) -> peerManager.Disconnected()

}

// peerInfoFromProto converts a Protobuf PeerInfo message to a peerInfo,
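As a rough illustration of the reply above (the identifiers are the ones named in the comment thread, not a verbatim excerpt of the blocksync code):

	// request timeout detected in the blocksync pool (illustrative sketch)
	if peer.didTimeout {
		pool.removePeer(peer.id, true) // reports an error for this peer
	}
	// The resulting peer-down event reaches PeerManager.Disconnected(), which
	// increments NumOfDisconnections and resets ConsecSuccessfulBlocks, so a
	// timeout already lowers the score without a separate NumOfTimeouts field.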
@@ -1408,27 +1418,35 @@
// Score calculates a score for the peer. Higher-scored peers will be
// preferred over lower scores.
func (p *peerInfo) Score() PeerScore {
// Use predetermined scores if set
if p.FixedScore > 0 {
return p.FixedScore
}
if p.Unconditional {
return PeerScoreUnconditional
}

score := p.MutableScore
score := int64(p.MutableScore)
if p.Persistent || p.BlockSync {
score = int64(PeerScorePersistent)
}

// Add points for block sync performance
score += p.ConsecSuccessfulBlocks / 5

// Penalize for dial failures with time decay
for _, addr := range p.AddressInfo {
// DialFailures is reset when dials succeed, so this
// is either the number of dial failures or 0.
score -= int64(addr.DialFailures)
failureScore := float64(addr.DialFailures) * math.Exp(-0.1*float64(time.Since(addr.LastDialFailure).Hours()))
score -= int64(failureScore)
}

// We consider lowering the score for every 3 disconnection events
score -= p.NumOfDisconnections / 3
// Penalize for disconnections with time decay
timeSinceLastDisconnect := time.Since(p.LastConnected)
decayFactor := math.Exp(-0.1 * timeSinceLastDisconnect.Hours())
effectiveDisconnections := int64(float64(p.NumOfDisconnections) * decayFactor)
score -= effectiveDisconnections / 3

// Cap score for non-persistent peers
if !p.Persistent && score > int64(MaxPeerScoreNotPersistent) {
score = int64(MaxPeerScoreNotPersistent)
}
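To give a feel for the decay curve used by both penalties above, here is a small standalone Go program (illustrative only, not part of the change) that prints the weight math.Exp(-0.1 * hours) for a few event ages:

package main

import (
	"fmt"
	"math"
)

func main() {
	// Same decay factor as Score(): weight := math.Exp(-0.1 * hoursSinceEvent)
	for _, hours := range []float64{1, 24, 168} {
		fmt.Printf("%4.0fh ago -> weight %.3g\n", hours, math.Exp(-0.1*hours))
	}
	// Approximate output:
	//    1h ago -> weight 0.905
	//   24h ago -> weight 0.0907
	//  168h ago -> weight 5.06e-08
}

So a dial failure from an hour ago still costs almost a full point, one from yesterday costs about a tenth of a point, and week-old failures and disconnections are effectively forgotten (the decayed disconnection count is additionally divided by 3, as before).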
@@ -1535,3 +1553,13 @@
m.ready[nodeId] = true
m.connected[nodeId] = true
}

func (m *PeerManager) IncrementBlockSyncs(peerID types.NodeID) {
m.mtx.Lock()
defer m.mtx.Unlock()

if peer, ok := m.store.peers[peerID]; ok {
peer.ConsecSuccessfulBlocks++
m.store.ranked = nil
}
}
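Combined with the ConsecSuccessfulBlocks/5 term in Score(), every five consecutive successfully synced blocks buy the peer one extra point, and the counter is wiped by a dial failure or disconnection. As a worked example (assuming a fresh non-persistent peer starting at DefaultMutableScore): 100 calls to IncrementBlockSyncs give 243 + 100/5 = 263, which the cap then clamps to MaxPeerScoreNotPersistent (253); that is exactly what the updated TestNonPersistantPeerUpperBound below asserts.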
38 changes: 21 additions & 17 deletions internal/p2p/peermanager_scoring_test.go
@@ -2,11 +2,12 @@ package p2p

import (
"context"
"github.com/tendermint/tendermint/libs/log"
"strings"
"testing"
"time"

"github.com/tendermint/tendermint/libs/log"

"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"

@@ -44,26 +45,29 @@ func TestPeerScoring(t *testing.T) {
NodeID: id,
Status: PeerStatusGood,
})
require.EqualValues(t, defaultScore+int64(i), peerManager.Scores()[id])
require.EqualValues(t, defaultScore+PeerScore(i), peerManager.Scores()[id])
}
// watch the corresponding decreases respond to update
for i := 1; i < 10; i++ {
peerManager.processPeerEvent(ctx, PeerUpdate{
NodeID: id,
Status: PeerStatusBad,
})
require.EqualValues(t, DefaultMutableScore+int64(9)-int64(i), peerManager.Scores()[id])
require.EqualValues(t, DefaultMutableScore+PeerScore(9)-PeerScore(i), peerManager.Scores()[id])
}

// Dial failure should decrease score
_ = peerManager.DialFailed(ctx, NodeAddress{NodeID: id, Protocol: "memory"})
require.EqualValues(t, DefaultMutableScore-1, peerManager.Scores()[id])
addr := NodeAddress{NodeID: id, Protocol: "memory"}
_ = peerManager.DialFailed(ctx, addr)
_ = peerManager.DialFailed(ctx, addr)
_ = peerManager.DialFailed(ctx, addr)
require.EqualValues(t, DefaultMutableScore-2, peerManager.Scores()[id])

// Disconnect every 3 times should also decrease score
for i := 1; i < 7; i++ {
peerManager.Disconnected(ctx, id)
}
require.EqualValues(t, DefaultMutableScore-3, peerManager.Scores()[id])
require.EqualValues(t, DefaultMutableScore-2, peerManager.Scores()[id])
})
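For the two assertions just above (assuming the subtest runs well within an hour of the failures and that LastConnected was never set, since the peer never actually connects here): three immediate dial failures give failureScore = 3 * e^(-0.1 * ~0) ≈ 2.999, which truncates to 2 on conversion to int64, hence DefaultMutableScore-2; the six disconnections are then weighted by e^(-0.1 * hours since LastConnected), which is effectively zero for a zero-valued LastConnected, so they leave the score untouched and the second assertion stays at DefaultMutableScore-2.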
t.Run("AsynchronousIncrement", func(t *testing.T) {
start := peerManager.Scores()[id]
Expand Down Expand Up @@ -92,18 +96,18 @@ func TestPeerScoring(t *testing.T) {
"startAt=%d score=%d", start, peerManager.Scores()[id])
})
t.Run("TestNonPersistantPeerUpperBound", func(t *testing.T) {
start := int64(peerManager.Scores()[id] + 1)
for i := start; i <= int64(PeerScorePersistent)+start; i++ {
peerManager.processPeerEvent(ctx, PeerUpdate{
NodeID: id,
Status: PeerStatusGood,
})
// Reset peer state to remove any previous penalties
peerManager.store.peers[id] = &peerInfo{
ID: id,
MutableScore: DefaultMutableScore,
}

if i >= int64(PeerScorePersistent) {
require.EqualValues(t, MaxPeerScoreNotPersistent, peerManager.Scores()[id])
} else {
require.EqualValues(t, i, peerManager.Scores()[id])
}
// Add successful blocks to increase score
for i := 0; i < 100; i++ {
peerManager.IncrementBlockSyncs(id)
}

// Score should be capped at MaxPeerScoreNotPersistent
require.EqualValues(t, MaxPeerScoreNotPersistent, peerManager.Scores()[id])
})
}