Use peermanager scores for blocksync peers and don't error out on block mismatch #162

Merged
merged 21 commits into from Oct 31, 2023
40 changes: 38 additions & 2 deletions internal/blocksync/pool.go
@@ -4,7 +4,10 @@ import (
"context"
"errors"
"fmt"
"github.com/tendermint/tendermint/internal/p2p"
Contributor:
Nit: fix lint issue

"math"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
@@ -80,6 +83,7 @@ type BlockPool struct {
height int64 // the lowest key in requesters.
// peers
peers map[types.NodeID]*bpPeer
peerManager *p2p.PeerManager
maxPeerHeight int64 // the biggest reported height

// atomic
@@ -101,8 +105,8 @@ func NewBlockPool(
start int64,
requestsCh chan<- BlockRequest,
errorsCh chan<- peerError,
peerManager *p2p.PeerManager,
) *BlockPool {

bp := &BlockPool{
logger: logger,
peers: make(map[types.NodeID]*bpPeer),
@@ -113,6 +117,7 @@
requestsCh: requestsCh,
errorsCh: errorsCh,
lastSyncRate: 0,
peerManager: peerManager,
}
bp.BaseService = *service.NewBaseService(logger, "BlockPool", bp)
return bp
@@ -408,13 +413,44 @@ func (pool *BlockPool) updateMaxPeerHeight() {
pool.maxPeerHeight = max
}

func (pool *BlockPool) getSortedPeers(peers map[types.NodeID]*bpPeer) []types.NodeID {
// Generate a sorted list
sortedPeers := make([]types.NodeID, 0, len(peers))

for peer := range peers {
sortedPeers = append(sortedPeers, peer)
}
// Sort from high to low score
sort.Slice(sortedPeers, func(i, j int) bool {
return pool.peerManager.Score(sortedPeers[i]) > pool.peerManager.Score(sortedPeers[j])
})
return sortedPeers
}

// Pick an available peer with the given height available.
// If no peers are available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
pool.mtx.Lock()
defer pool.mtx.Unlock()

for _, peer := range pool.peers {
// Generate a sorted list
sortedPeers := pool.getSortedPeers(pool.peers)
var goodPeers []types.NodeID
// Remove peers with 0 score and shuffle list
for _, peer := range sortedPeers {
// We only want to work with peers that are ready & connected (not dialing)
if pool.peerManager.State(peer) == "ready,connected" {
goodPeers = append(goodPeers, peer)
}
if pool.peerManager.Score(peer) == 0 {
break
}
}
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(goodPeers), func(i, j int) { goodPeers[i], goodPeers[j] = goodPeers[j], goodPeers[i] })

for _, nodeId := range goodPeers {
Contributor:
A few further optimizations we can do:

  1. We should probably avoid using low-score peers, for example the ones with score 0
  2. We should do some random shuffling so that we aren't always targeting the same few top peers

peer := pool.peers[nodeId]
if peer.didTimeout {
pool.removePeer(peer.id)
continue
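
The loop above is what those two suggestions turned into: walk the score-sorted list, keep only ready-and-connected peers, stop at the first zero score, then shuffle the survivors before dispatching. A minimal, self-contained sketch of the score-filter-and-shuffle part — PeerID and the scores map are stand-ins for types.NodeID and p2p.PeerManager.Score, and the connection-state check is omitted:

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// PeerID is a stand-in for types.NodeID.
type PeerID string

// pickCandidates sorts peers by score (high to low), drops zero-score peers,
// and shuffles the survivors so requests spread across all good peers
// instead of always hitting the single top-scored one.
func pickCandidates(scores map[PeerID]int) []PeerID {
	peers := make([]PeerID, 0, len(scores))
	for id := range scores {
		peers = append(peers, id)
	}
	sort.Slice(peers, func(i, j int) bool {
		return scores[peers[i]] > scores[peers[j]]
	})

	good := make([]PeerID, 0, len(peers))
	for _, id := range peers {
		if scores[id] == 0 {
			break // sorted high to low, so everything after this is 0 too
		}
		good = append(good, id)
	}

	rand.Shuffle(len(good), func(i, j int) {
		good[i], good[j] = good[j], good[i]
	})
	return good
}

func main() {
	// "b" is filtered out; "a" and "c" come back in random order.
	fmt.Println(pickCandidates(map[PeerID]int{"a": 11, "b": 0, "c": 13}))
}
```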
78 changes: 67 additions & 11 deletions internal/blocksync/pool_test.go
@@ -2,16 +2,20 @@ package blocksync

import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/internal/p2p"
dbm "github.com/tendermint/tm-db"
mrand "math/rand"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/types"
)

@@ -24,6 +28,7 @@ type testPeer struct {
base int64
height int64
inputChan chan inputData // make sure each peer's data is sequential
score p2p.PeerScore
}

type inputData struct {
@@ -70,17 +75,42 @@
func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
peers := make(testPeers, numPeers)
for i := 0; i < numPeers; i++ {
peerID := types.NodeID(tmrand.Str(12))
bytes := make([]byte, 20)
if _, err := rand.Read(bytes); err != nil {
panic(err)
}
peerID := types.NodeID(hex.EncodeToString(bytes))
height := minHeight + mrand.Int63n(maxHeight-minHeight)
base := minHeight + int64(i)
if base > height {
base = height
}
peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10)}
peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10), 1}
}
return peers
}

func makePeerManager(peers map[types.NodeID]testPeer) *p2p.PeerManager {
selfKey := ed25519.GenPrivKeyFromSecret([]byte{0xf9, 0x1b, 0x08, 0xaa, 0x38, 0xee, 0x34, 0xdd})
selfID := types.NodeIDFromPubKey(selfKey.PubKey())
peerScores := make(map[types.NodeID]p2p.PeerScore)
for nodeId, peer := range peers {
peerScores[nodeId] = peer.score
}
peerManager, _ := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
PeerScores: peerScores,
MaxConnected: 1,
MaxConnectedUpgrade: 2,
}, p2p.NopMetrics())
for nodeId := range peers {
_, err := peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: nodeId})
if err != nil {
panic(err)
}
}
return peerManager
}

func TestBlockPoolBasic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -89,7 +119,7 @@ func TestBlockPoolBasic(t *testing.T) {
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh)
pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh, makePeerManager(peers))

if err := pool.Start(ctx); err != nil {
t.Error(err)
@@ -147,7 +177,7 @@ func TestBlockPoolTimeout(t *testing.T) {
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(logger, start, requestsCh, errorsCh)
pool := NewBlockPool(logger, start, requestsCh, errorsCh, makePeerManager(peers))
err := pool.Start(ctx)
if err != nil {
t.Error(err)
@@ -203,14 +233,19 @@ func TestBlockPoolRemovePeer(t *testing.T) {

peers := make(testPeers, 10)
for i := 0; i < 10; i++ {
peerID := types.NodeID(fmt.Sprintf("%d", i+1))
var peerID types.NodeID
if i+1 == 10 {
peerID = types.NodeID(strings.Repeat(fmt.Sprintf("%d", i+1), 20))
} else {
peerID = types.NodeID(strings.Repeat(fmt.Sprintf("%d", i+1), 40))
}
height := int64(i + 1)
peers[peerID] = testPeer{peerID, 0, height, make(chan inputData)}
peers[peerID] = testPeer{peerID, 0, height, make(chan inputData), 1}
}
requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)

pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh)
pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, makePeerManager(peers))
err := pool.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { cancel(); pool.Wait() })
@@ -225,7 +260,7 @@
assert.NotPanics(t, func() { pool.RemovePeer(types.NodeID("Superman")) })

// remove peer with biggest height
pool.RemovePeer(types.NodeID("10"))
pool.RemovePeer(types.NodeID(strings.Repeat("10", 20)))
assert.EqualValues(t, 9, pool.MaxPeerHeight())

// remove all peers
@@ -235,3 +270,24 @@

assert.EqualValues(t, 0, pool.MaxPeerHeight())
}

func TestSortedPeers(t *testing.T) {
peers := make(testPeers, 10)
peerIdA := types.NodeID(strings.Repeat("a", 40))
peerIdB := types.NodeID(strings.Repeat("b", 40))
peerIdC := types.NodeID(strings.Repeat("c", 40))

peers[peerIdA] = testPeer{peerIdA, 0, 1, make(chan inputData), 11}
peers[peerIdB] = testPeer{peerIdB, 0, 1, make(chan inputData), 10}
peers[peerIdC] = testPeer{peerIdC, 0, 1, make(chan inputData), 13}

requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)
pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, makePeerManager(peers))
// add peers
for peerID, peer := range peers {
pool.SetPeerRange(peerID, peer.base, peer.height)
}
// Peers should be sorted by score via peerManager
assert.Equal(t, []types.NodeID{peerIdC, peerIdA, peerIdB}, pool.getSortedPeers(pool.peers))
}
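
One non-obvious consequence of backing the pool with a real PeerManager: peerManager.Add validates addresses, and types.NodeID is the hex encoding of a 20-byte ID, so test peer IDs must be exactly 40 hex characters. That is why makePeers now hex-encodes 20 random bytes, and why TestBlockPoolRemovePeer repeats "10" twenty times but single digits forty times. A stand-in sketch of the length arithmetic (NodeID here is a local placeholder for types.NodeID):

```go
package main

import (
	"fmt"
	"strings"
)

// NodeID is a placeholder for types.NodeID: the hex encoding of a 20-byte
// peer ID, hence always 40 characters.
type NodeID string

// repeatedID fills the 40-character budget by repeating a digit string:
// "1" repeats 40 times, "10" only 20 — the special case in the test above.
func repeatedID(digits string) NodeID {
	return NodeID(strings.Repeat(digits, 40/len(digits)))
}

func main() {
	fmt.Println(len(repeatedID("1")), len(repeatedID("10"))) // 40 40
}
```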
9 changes: 6 additions & 3 deletions internal/blocksync/reactor.go
@@ -81,8 +81,9 @@ type Reactor struct {
consReactor consensusReactor
blockSync *atomicBool

peerEvents p2p.PeerEventSubscriber
channel *p2p.Channel
peerEvents p2p.PeerEventSubscriber
peerManager *p2p.PeerManager
channel *p2p.Channel

requestsCh <-chan BlockRequest
errorsCh <-chan peerError
@@ -105,6 +106,7 @@ func NewReactor(
store *store.BlockStore,
consReactor consensusReactor,
peerEvents p2p.PeerEventSubscriber,
peerManager *p2p.PeerManager,
blockSync bool,
metrics *consensus.Metrics,
eventBus *eventbus.EventBus,
@@ -119,6 +121,7 @@
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
peerEvents: peerEvents,
peerManager: peerManager,
metrics: metrics,
eventBus: eventBus,
restartCh: restartCh,
@@ -159,7 +162,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {

requestsCh := make(chan BlockRequest, maxTotalRequesters)
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh)
r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh, r.peerManager)
r.requestsCh = requestsCh
r.errorsCh = errorsCh

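
The reactor change is pure plumbing: NewReactor accepts the PeerManager, stores it on the struct, and OnStart forwards it to NewBlockPool, since the pool is only constructed at start time. A minimal sketch of that wiring with stand-in types (not the real tendermint constructors):

```go
package main

import "fmt"

// PeerManager, BlockPool, and Reactor are stand-ins sketching how the
// dependency is threaded; the real types live in internal/p2p and
// internal/blocksync.
type PeerManager struct{ scores map[string]int }

func (pm *PeerManager) Score(id string) int { return pm.scores[id] }

type BlockPool struct{ peerManager *PeerManager }

func NewBlockPool(pm *PeerManager) *BlockPool { return &BlockPool{peerManager: pm} }

type Reactor struct {
	peerManager *PeerManager
	pool        *BlockPool
}

func NewReactor(pm *PeerManager) *Reactor { return &Reactor{peerManager: pm} }

// OnStart mirrors the diff: the pool is created at start, so the reactor
// carries the PeerManager from construction until then.
func (r *Reactor) OnStart() { r.pool = NewBlockPool(r.peerManager) }

func main() {
	r := NewReactor(&PeerManager{scores: map[string]int{"aa": 7}})
	r.OnStart()
	fmt.Println(r.pool.peerManager.Score("aa")) // 7
}
```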