-
Notifications
You must be signed in to change notification settings - Fork 310
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(p2p): batch request response (#11331)
- Loading branch information
Showing
9 changed files
with
782 additions
and
27 deletions.
There are no files selected for viewing
178 changes: 178 additions & 0 deletions
178
yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
import { describe, expect, it, jest } from '@jest/globals'; | ||
import { createSecp256k1PeerId } from '@libp2p/peer-id-factory'; | ||
import { type Libp2p } from 'libp2p'; | ||
|
||
import { BatchConnectionSampler } from './batch_connection_sampler.js'; | ||
import { ConnectionSampler, type RandomSampler } from './connection_sampler.js'; | ||
|
||
describe('BatchConnectionSampler', () => { | ||
const mockRandomSampler = { | ||
random: jest.fn(), | ||
} as jest.Mocked<RandomSampler>; | ||
|
||
let peers: Awaited<ReturnType<typeof createSecp256k1PeerId>>[]; | ||
let libp2p: jest.Mocked<Libp2p>; | ||
let connectionSampler: ConnectionSampler; | ||
|
||
beforeEach(async () => { | ||
jest.clearAllMocks(); | ||
|
||
// Create a set of test peers | ||
peers = await Promise.all(new Array(5).fill(0).map(() => createSecp256k1PeerId())); | ||
|
||
// Mock libp2p to return our test peers | ||
libp2p = { | ||
getPeers: jest.fn().mockReturnValue(peers), | ||
} as unknown as jest.Mocked<Libp2p>; | ||
|
||
// Create a real connection sampler with mocked random sampling | ||
connectionSampler = new ConnectionSampler(libp2p, 1000, mockRandomSampler); | ||
}); | ||
|
||
afterEach(async () => { | ||
await connectionSampler.stop(); | ||
}); | ||
|
||
it('initializes with correct number of peers and request distribution', () => { | ||
// Mock random to return sequential indices | ||
mockRandomSampler.random.mockImplementation(_ => 0); | ||
|
||
const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 10, /* maxPeers */ 3); | ||
|
||
expect(sampler.activePeerCount).toBe(3); | ||
expect(sampler.requestsPerBucket).toBe(3); // floor(10/3) = 3 | ||
}); | ||
|
||
it('assigns requests to peers deterministically with wraparound', () => { | ||
// Mock to return first two peers | ||
let callCount = 0; | ||
mockRandomSampler.random.mockImplementation(() => callCount++ % 2); | ||
|
||
// With 5 requests and 2 peers: | ||
// floor(5/2) = 2 requests per peer | ||
// Peer 0: 0,1,4 (gets extra from wraparound) | ||
// Peer 1: 2,3 | ||
const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 5, /* maxPeers */ 2); | ||
const assignments = new Array(5).fill(0).map((_, i) => sampler.getPeerForRequest(i)); | ||
|
||
// First peer gets first bucket and wraparound | ||
expect(assignments[0]).toBe(peers[0]); // First bucket | ||
expect(assignments[1]).toBe(peers[0]); // First bucket | ||
expect(assignments[4]).toBe(peers[0]); // Wraparound | ||
|
||
// Second peer gets middle bucket | ||
expect(assignments[2]).toBe(peers[1]); | ||
expect(assignments[3]).toBe(peers[1]); | ||
}); | ||
|
||
it('handles peer removal and replacement', () => { | ||
mockRandomSampler.random.mockImplementation(_ => { | ||
return 2; // Return index 2 for replacement peer | ||
}); | ||
|
||
// With 4 requests and 2 peers: | ||
// floor(4/2) = 2 requests per peer | ||
// Initial distribution: | ||
// Peer 0: 0,1 | ||
// Peer 1: 2,3 | ||
const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 4, /* maxPeers */ 2); | ||
|
||
const initialPeer = sampler.getPeerForRequest(0); | ||
expect(initialPeer).toBe(peers[0]); | ||
|
||
sampler.removePeerAndReplace(peers[0]); | ||
|
||
// After replacement: | ||
// Replacement peer should handle the same bucket | ||
const newPeer = sampler.getPeerForRequest(0); | ||
expect(newPeer).toBe(peers[2]); | ||
expect(sampler.getPeerForRequest(1)).toBe(peers[2]); | ||
|
||
// Other peer's bucket remains unchanged | ||
expect(sampler.getPeerForRequest(2)).toBe(peers[1]); | ||
expect(sampler.getPeerForRequest(3)).toBe(peers[1]); | ||
}); | ||
|
||
it('handles peer removal and replacement - no replacement available', () => { | ||
mockRandomSampler.random.mockImplementation(() => 2); | ||
const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 4, /* maxPeers */ 2); | ||
|
||
expect(sampler.activePeerCount).toBe(2); | ||
expect(sampler.getPeerForRequest(0)).toBe(peers[0]); | ||
|
||
// Will sample no peers | ||
libp2p.getPeers.mockReturnValue([]); | ||
|
||
// Remove peer 0, its requests will be distributed to peer 1 | ||
sampler.removePeerAndReplace(peers[0]); | ||
// Decrease the number of active peers | ||
expect(sampler.activePeerCount).toBe(1); | ||
|
||
expect(sampler.getPeerForRequest(0)).toBe(peers[1]); | ||
}); | ||
|
||
it('distributes requests according to documentation example', () => { | ||
let callCount = 0; | ||
mockRandomSampler.random.mockImplementation(() => { | ||
if (callCount < 3) { | ||
return callCount++; | ||
} | ||
return 0; | ||
}); | ||
|
||
// Example from doc comment: | ||
// Peers: [P1] [P2] [P3] | ||
// Requests: 0,1,2,9 | 3,4,5 | 6,7,8 | ||
const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 10, /* maxPeers */ 3); | ||
|
||
expect(sampler.activePeerCount).toBe(3); | ||
expect(sampler.requestsPerBucket).toBe(3); // floor(10/3) = 3 | ||
|
||
// P1's bucket (0-2) plus wraparound (9) | ||
expect(sampler.getPeerForRequest(0)).toBe(peers[0]); | ||
expect(sampler.getPeerForRequest(1)).toBe(peers[0]); | ||
expect(sampler.getPeerForRequest(2)).toBe(peers[0]); | ||
expect(sampler.getPeerForRequest(9)).toBe(peers[0]); // Wraparound | ||
|
||
// P2's bucket (3-5) | ||
expect(sampler.getPeerForRequest(3)).toBe(peers[1]); | ||
expect(sampler.getPeerForRequest(4)).toBe(peers[1]); | ||
expect(sampler.getPeerForRequest(5)).toBe(peers[1]); | ||
|
||
// P3's bucket (6-8) | ||
expect(sampler.getPeerForRequest(6)).toBe(peers[2]); | ||
expect(sampler.getPeerForRequest(7)).toBe(peers[2]); | ||
expect(sampler.getPeerForRequest(8)).toBe(peers[2]); | ||
}); | ||
|
||
it('same number of requests per peers', () => { | ||
let callCount = 0; | ||
mockRandomSampler.random.mockImplementation(() => callCount++ % 2); | ||
|
||
const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 2, /* maxPeers */ 2); | ||
expect(sampler.requestsPerBucket).toBe(1); | ||
expect(sampler.activePeerCount).toBe(2); | ||
|
||
expect(sampler.getPeerForRequest(0)).toBe(peers[0]); | ||
expect(sampler.getPeerForRequest(1)).toBe(peers[1]); | ||
}); | ||
|
||
it('handles edge cases, 0 peers, smaller batch than max peers', () => { | ||
mockRandomSampler.random.mockImplementation(() => 0); | ||
libp2p.getPeers.mockReturnValue([]); | ||
|
||
const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 5, /* maxPeers */ 2); | ||
expect(sampler.activePeerCount).toBe(0); | ||
expect(sampler.getPeerForRequest(0)).toBeUndefined(); | ||
|
||
let i = 0; | ||
mockRandomSampler.random.mockImplementation(() => i++ % 3); | ||
|
||
libp2p.getPeers.mockReturnValue(peers); | ||
const samplerWithMorePeers = new BatchConnectionSampler(connectionSampler, /* batchSize */ 2, /* maxPeers */ 3); | ||
expect(samplerWithMorePeers.requestsPerBucket).toBe(1); // floor(2/3) = 0 | ||
// First two requests go to first two peers | ||
expect(samplerWithMorePeers.getPeerForRequest(0)).toBe(peers[0]); | ||
expect(samplerWithMorePeers.getPeerForRequest(1)).toBe(peers[1]); | ||
}); | ||
}); |
94 changes: 94 additions & 0 deletions
94
yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import { createLogger } from '@aztec/foundation/log'; | ||
|
||
import { type PeerId } from '@libp2p/interface'; | ||
|
||
import { type ConnectionSampler } from './connection_sampler.js'; | ||
|
||
/** | ||
* Manages batches of peers for parallel request processing. | ||
* Tracks active peers and provides deterministic peer assignment for requests. | ||
* | ||
* Example with 3 peers and 10 requests: | ||
* | ||
* Peers: [P1] [P2] [P3] | ||
* ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ | ||
* Requests: 0,1,2,9 | 3,4,5 | 6,7,8 | ||
* | ||
* Each peer handles a bucket of consecutive requests. | ||
* If a peer fails, it is replaced while maintaining the same bucket. | ||
*/ | ||
export class BatchConnectionSampler { | ||
private readonly logger = createLogger('p2p:reqresp:batch-connection-sampler'); | ||
private readonly batch: PeerId[] = []; | ||
private readonly requestsPerPeer: number; | ||
|
||
constructor(private readonly connectionSampler: ConnectionSampler, batchSize: number, maxPeers: number) { | ||
if (maxPeers <= 0) { | ||
throw new Error('Max peers cannot be 0'); | ||
} | ||
if (batchSize <= 0) { | ||
throw new Error('Batch size cannot be 0'); | ||
} | ||
|
||
// Calculate how many requests each peer should handle, cannot be 0 | ||
this.requestsPerPeer = Math.max(1, Math.floor(batchSize / maxPeers)); | ||
|
||
// Sample initial peers | ||
this.batch = this.connectionSampler.samplePeersBatch(maxPeers); | ||
} | ||
|
||
/** | ||
* Gets the peer responsible for handling a specific request index | ||
* | ||
* @param index - The request index | ||
* @returns The peer assigned to handle this request | ||
*/ | ||
getPeerForRequest(index: number): PeerId | undefined { | ||
if (this.batch.length === 0) { | ||
return undefined; | ||
} | ||
|
||
// Calculate which peer bucket this index belongs to | ||
const peerIndex = Math.floor(index / this.requestsPerPeer) % this.batch.length; | ||
return this.batch[peerIndex]; | ||
} | ||
|
||
/** | ||
* Removes a peer and replaces it with a new one, maintaining the same position | ||
* in the batch array to keep request distribution consistent | ||
* | ||
* @param peerId - The peer to remove and replace | ||
*/ | ||
removePeerAndReplace(peerId: PeerId): void { | ||
const index = this.batch.findIndex(p => p === peerId); | ||
if (index === -1) { | ||
return; | ||
} | ||
|
||
const excluding = new Map([[peerId, true]]); | ||
const newPeer = this.connectionSampler.getPeer(excluding); | ||
|
||
if (newPeer) { | ||
this.batch[index] = newPeer; | ||
this.logger.trace(`Replaced peer ${peerId} with ${newPeer}`, { peerId, newPeer }); | ||
} else { | ||
// If we couldn't get a replacement, remove the peer and compact the array | ||
this.batch.splice(index, 1); | ||
this.logger.trace(`Removed peer ${peerId}`, { peerId }); | ||
} | ||
} | ||
|
||
/** | ||
* Gets the number of active peers | ||
*/ | ||
get activePeerCount(): number { | ||
return this.batch.length; | ||
} | ||
|
||
/** | ||
* Gets the number of requests each peer is assigned to handle | ||
*/ | ||
get requestsPerBucket(): number { | ||
return this.requestsPerPeer; | ||
} | ||
} |
Oops, something went wrong.