Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maximize Peer Capacity When Syncing #13820

Merged
merged 11 commits into from
Mar 30, 2024
65 changes: 57 additions & 8 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot

response.bwb, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
if response.err == nil {
bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid)
bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid, peers)
if err != nil {
response.err = err
}
Expand All @@ -336,6 +336,11 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
Count: count,
Step: 1,
}
bestPeers := f.hasSufficientBandwidth(peers, req.Count)
// We append the best peers to the front so that higher capacity
// peers are dialed first.
peers = append(bestPeers, peers...)
peers = dedupPeers(peers)
for i := 0; i < len(peers); i++ {
p := peers[i]
blocks, err := f.requestBlocks(ctx, req, p)
Expand Down Expand Up @@ -472,7 +477,7 @@ func missingCommitError(root [32]byte, slot primitives.Slot, missing [][]byte) e
}

// fetchBlobsFromPeer fetches blocks from a single randomly selected peer.
func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithROBlobs, pid peer.ID) ([]blocks2.BlockWithROBlobs, error) {
func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithROBlobs, pid peer.ID, peers []peer.ID) ([]blocks2.BlockWithROBlobs, error) {
ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlobsFromPeer")
defer span.End()
if slots.ToEpoch(f.clock.CurrentSlot()) < params.BeaconConfig().DenebForkEpoch {
Expand All @@ -487,13 +492,30 @@ func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.Bl
if req == nil {
return bwb, nil
}
// Request blobs from the same peer that gave us the blob batch.
blobs, err := f.requestBlobs(ctx, req, pid)
if err != nil {
return nil, errors.Wrap(err, "could not request blobs by range")
peers = f.filterPeers(ctx, peers, peersPercentagePerRequest)
// We dial the initial peer first to ensure that we get the desired set of blobs.
wantedPeers := append([]peer.ID{pid}, peers...)
bestPeers := f.hasSufficientBandwidth(wantedPeers, req.Count)
// We append the best peers to the front so that higher capacity
// peers are dialed first. If all of them fail, we fallback to the
// initial peer we wanted to request blobs from.
peers = append(bestPeers, pid)
for i := 0; i < len(peers); i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the addition of hasSufficientBandwith (sic), to fall back to other peers if bandwidth isn't available. But I worry that this loop can burn through too much peer capacity in unhappy cases (eg bad block batch, all peers fail to give corresponding blobs). Could we just try the first best peer and then fail the batch as usual?

Copy link
Member Author

@nisdas nisdas Mar 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous behaviour was to try all peers rather than failing at one peer. If we do fail after 1 peer, it might make this more fragile. The main reason for our blob issues has been that we wait for 1 peer. If you notice for blocks we have always tried to dial many peers before exiting the method

p := peers[i]
blobs, err := f.requestBlobs(ctx, req, p)
if err != nil {
log.WithField("peer", p).WithError(err).Debug("Could not request blobs by range from peer")
continue
}
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p)
robs, err := verifyAndPopulateBlobs(bwb, blobs, blobWindowStart)
if err != nil {
log.WithField("peer", p).WithError(err).Debug("Invalid BeaconBlobsByRange response")
continue
}
return robs, err
}
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(pid)
return verifyAndPopulateBlobs(bwb, blobs, blobWindowStart)
return nil, errNoPeersAvailable
}

// requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams.
Expand Down Expand Up @@ -606,6 +628,18 @@ func (f *blocksFetcher) waitForBandwidth(pid peer.ID, count uint64) error {
return nil
}

func (f *blocksFetcher) hasSufficientBandwidth(peers []peer.ID, count uint64) []peer.ID {
filteredPeers := []peer.ID{}
for _, p := range peers {
if uint64(f.rateLimiter.Remaining(p.String())) < count {
continue
}
copiedP := p
filteredPeers = append(filteredPeers, copiedP)
}
return filteredPeers
}

// Determine how long it will take for us to have the required number of blocks allowed by our rate limiter.
// We do this by calculating the duration till the rate limiter can request these blocks without exceeding
// the provided bandwidth limits per peer.
Expand All @@ -626,3 +660,18 @@ func timeToWait(wanted, rem, capacity int64, timeTillEmpty time.Duration) time.D
expectedTime := int64(timeTillEmpty) * blocksNeeded / currentNumBlks
return time.Duration(expectedTime)
}

// deduplicates the provided peer list.
func dedupPeers(peers []peer.ID) []peer.ID {
newPeerList := make([]peer.ID, 0, len(peers))
peerExists := make(map[peer.ID]bool)

for i := range peers {
if peerExists[peers[i]] {
continue
}
newPeerList = append(newPeerList, peers[i])
peerExists[peers[i]] = true
}
return newPeerList
}
20 changes: 20 additions & 0 deletions beacon-chain/sync/initial-sync/blocks_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
p2pm "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
Expand Down Expand Up @@ -1166,3 +1167,22 @@ func TestBatchLimit(t *testing.T) {

assert.Equal(t, params.BeaconConfig().MaxRequestBlocksDeneb, uint64(maxBatchLimit()))
}

func TestBlockFetcher_HasSufficientBandwidth(t *testing.T) {
bf := newBlocksFetcher(context.Background(), &blocksFetcherConfig{})
currCap := bf.rateLimiter.Capacity()
wantedAmt := currCap - 100
bf.rateLimiter.Add(peer.ID("a").String(), wantedAmt)
bf.rateLimiter.Add(peer.ID("c").String(), wantedAmt)
bf.rateLimiter.Add(peer.ID("f").String(), wantedAmt)
bf.rateLimiter.Add(peer.ID("d").String(), wantedAmt)

receivedPeers := bf.hasSufficientBandwidth([]peer.ID{"a", "b", "c", "d", "e", "f"}, 110)
for _, p := range receivedPeers {
switch p {
case "a", "c", "f", "d":
t.Errorf("peer has exceeded capacity: %s", p)
}
}
assert.Equal(t, 2, len(receivedPeers))
}
4 changes: 2 additions & 2 deletions beacon-chain/sync/initial-sync/blocks_fetcher_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
}
// We need to fetch the blobs for the given alt-chain if any exist, so that we can try to verify and import
// the blocks.
bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid)
bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer")
}
Expand All @@ -302,7 +302,7 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa
if err != nil {
return nil, errors.Wrap(err, "received invalid blocks in findAncestor")
}
bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid)
bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findAncestor")
}
Expand Down
Loading