diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts
new file mode 100644
index 00000000000..4908ed6c97e
--- /dev/null
+++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts
@@ -0,0 +1,178 @@
+import { describe, expect, it, jest } from '@jest/globals';
+import { createSecp256k1PeerId } from '@libp2p/peer-id-factory';
+import { type Libp2p } from 'libp2p';
+
+import { BatchConnectionSampler } from './batch_connection_sampler.js';
+import { ConnectionSampler, type RandomSampler } from './connection_sampler.js';
+
+describe('BatchConnectionSampler', () => {
+  const mockRandomSampler = {
+    random: jest.fn(),
+  } as jest.Mocked<RandomSampler>;
+
+  let peers: Awaited<ReturnType<typeof createSecp256k1PeerId>>[];
+  let libp2p: jest.Mocked<Libp2p>;
+  let connectionSampler: ConnectionSampler;
+
+  beforeEach(async () => {
+    jest.clearAllMocks();
+
+    // Create a set of test peers
+    peers = await Promise.all(new Array(5).fill(0).map(() => createSecp256k1PeerId()));
+
+    // Mock libp2p to return our test peers
+    libp2p = {
+      getPeers: jest.fn().mockReturnValue(peers),
+    } as unknown as jest.Mocked<Libp2p>;
+
+    // Create a real connection sampler with mocked random sampling
+    connectionSampler = new ConnectionSampler(libp2p, 1000, mockRandomSampler);
+  });
+
+  afterEach(async () => {
+    await connectionSampler.stop();
+  });
+
+  it('initializes with correct number of peers and request distribution', () => {
+    // Mock random to return sequential indices
+    mockRandomSampler.random.mockImplementation(_ => 0);
+
+    const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 10, /* maxPeers */ 3);
+
+    expect(sampler.activePeerCount).toBe(3);
+    expect(sampler.requestsPerBucket).toBe(3); // floor(10/3) = 3
+  });
+
+  it('assigns requests to peers deterministically with wraparound', () => {
+    // Mock to return first two peers
+    let callCount = 0;
+    mockRandomSampler.random.mockImplementation(() => callCount++ % 2);
+
+    // With 5 requests and 2 peers:
+    // floor(5/2) = 2 requests per peer
+    // Peer 0: 0,1,4 (gets extra from wraparound)
+    // Peer 1: 2,3
+    const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 5, /* maxPeers */ 2);
+    const assignments = new Array(5).fill(0).map((_, i) => sampler.getPeerForRequest(i));
+
+    // First peer gets first bucket and wraparound
+    expect(assignments[0]).toBe(peers[0]); // First bucket
+    expect(assignments[1]).toBe(peers[0]); // First bucket
+    expect(assignments[4]).toBe(peers[0]); // Wraparound
+
+    // Second peer gets middle bucket
+    expect(assignments[2]).toBe(peers[1]);
+    expect(assignments[3]).toBe(peers[1]);
+  });
+
+  it('handles peer removal and replacement', () => {
+    mockRandomSampler.random.mockImplementation(_ => {
+      return 2; // Return index 2 for replacement peer
+    });
+
+    // With 4 requests and 2 peers:
+    // floor(4/2) = 2 requests per peer
+    // Initial distribution:
+    // Peer 0: 0,1
+    // Peer 1: 2,3
+    const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 4, /* maxPeers */ 2);
+
+    const initialPeer = sampler.getPeerForRequest(0);
+    expect(initialPeer).toBe(peers[0]);
+
+    sampler.removePeerAndReplace(peers[0]);
+
+    // After replacement:
+    // Replacement peer should handle the same bucket
+    const newPeer = sampler.getPeerForRequest(0);
+    expect(newPeer).toBe(peers[2]);
+    expect(sampler.getPeerForRequest(1)).toBe(peers[2]);
+
+    // Other peer's bucket remains unchanged
+    expect(sampler.getPeerForRequest(2)).toBe(peers[1]);
+    expect(sampler.getPeerForRequest(3)).toBe(peers[1]);
+  });
+
+  it('handles peer removal and replacement - no replacement available', () => {
+    mockRandomSampler.random.mockImplementation(() => 2);
+    const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 4, /* maxPeers */ 2);
+
+    expect(sampler.activePeerCount).toBe(2);
+    expect(sampler.getPeerForRequest(0)).toBe(peers[0]);
+
+    // Will sample no peers
+    libp2p.getPeers.mockReturnValue([]);
+
+    // Remove peer 0, its requests will be distributed to peer 1
+    sampler.removePeerAndReplace(peers[0]);
+    // Decrease the number of active peers
+    expect(sampler.activePeerCount).toBe(1);
+
+    expect(sampler.getPeerForRequest(0)).toBe(peers[1]);
+  });
+
+  it('distributes requests according to documentation example', () => {
+    let callCount = 0;
+    mockRandomSampler.random.mockImplementation(() => {
+      if (callCount < 3) {
+        return callCount++;
+      }
+      return 0;
+    });
+
+    // Example from doc comment:
+    // Peers:    [P1]      [P2]    [P3]
+    // Requests: 0,1,2,9 | 3,4,5 | 6,7,8
+    const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 10, /* maxPeers */ 3);
+
+    expect(sampler.activePeerCount).toBe(3);
+    expect(sampler.requestsPerBucket).toBe(3); // floor(10/3) = 3
+
+    // P1's bucket (0-2) plus wraparound (9)
+    expect(sampler.getPeerForRequest(0)).toBe(peers[0]);
+    expect(sampler.getPeerForRequest(1)).toBe(peers[0]);
+    expect(sampler.getPeerForRequest(2)).toBe(peers[0]);
+    expect(sampler.getPeerForRequest(9)).toBe(peers[0]); // Wraparound
+
+    // P2's bucket (3-5)
+    expect(sampler.getPeerForRequest(3)).toBe(peers[1]);
+    expect(sampler.getPeerForRequest(4)).toBe(peers[1]);
+    expect(sampler.getPeerForRequest(5)).toBe(peers[1]);
+
+    // P3's bucket (6-8)
+    expect(sampler.getPeerForRequest(6)).toBe(peers[2]);
+    expect(sampler.getPeerForRequest(7)).toBe(peers[2]);
+    expect(sampler.getPeerForRequest(8)).toBe(peers[2]);
+  });
+
+  it('same number of requests per peer', () => {
+    let callCount = 0;
+    mockRandomSampler.random.mockImplementation(() => callCount++ % 2);
+
+    const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 2, /* maxPeers */ 2);
+    expect(sampler.requestsPerBucket).toBe(1);
+    expect(sampler.activePeerCount).toBe(2);
+
+    expect(sampler.getPeerForRequest(0)).toBe(peers[0]);
+    expect(sampler.getPeerForRequest(1)).toBe(peers[1]);
+  });
+
+  it('handles edge cases, 0 peers, smaller batch than max peers', () => {
+    mockRandomSampler.random.mockImplementation(() => 0);
+    libp2p.getPeers.mockReturnValue([]);
+
+    const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 5, /* maxPeers */ 2);
+    expect(sampler.activePeerCount).toBe(0);
+    expect(sampler.getPeerForRequest(0)).toBeUndefined();
+
+    let i = 0;
+    mockRandomSampler.random.mockImplementation(() => i++ % 3);
+
+    libp2p.getPeers.mockReturnValue(peers);
+    const samplerWithMorePeers = new BatchConnectionSampler(connectionSampler, /* batchSize */ 2, /* maxPeers */ 3);
+    expect(samplerWithMorePeers.requestsPerBucket).toBe(1); // max(1, floor(2/3)) = 1
+    // First two requests go to first two peers
+    expect(samplerWithMorePeers.getPeerForRequest(0)).toBe(peers[0]);
+    expect(samplerWithMorePeers.getPeerForRequest(1)).toBe(peers[1]);
+  });
+});
diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts
new file mode 100644
index 00000000000..665d706a01f
--- /dev/null
+++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts
@@ -0,0 +1,94 @@
+import { createLogger } from '@aztec/foundation/log';
+
+import { type PeerId } from '@libp2p/interface';
+
+import { type ConnectionSampler } from './connection_sampler.js';
+
+/**
+ * Manages batches of peers for parallel request processing.
+ * Tracks active peers and provides deterministic peer assignment for requests.
+ *
+ * Example with 3 peers and 10 requests:
+ *
+ * Peers:      [P1]      [P2]    [P3]
+ *            ↓ ↓ ↓ ↓   ↓ ↓ ↓   ↓ ↓ ↓
+ * Requests:  0,1,2,9 | 3,4,5 | 6,7,8
+ *
+ * Each peer handles a bucket of consecutive requests.
+ * If a peer fails, it is replaced while maintaining the same bucket.
+ */
+export class BatchConnectionSampler {
+  private readonly logger = createLogger('p2p:reqresp:batch-connection-sampler');
+  private readonly batch: PeerId[] = [];
+  private readonly requestsPerPeer: number;
+
+  constructor(private readonly connectionSampler: ConnectionSampler, batchSize: number, maxPeers: number) {
+    if (maxPeers <= 0) {
+      throw new Error('Max peers cannot be 0');
+    }
+    if (batchSize <= 0) {
+      throw new Error('Batch size cannot be 0');
+    }
+
+    // Calculate how many requests each peer should handle, cannot be 0
+    this.requestsPerPeer = Math.max(1, Math.floor(batchSize / maxPeers));
+
+    // Sample initial peers
+    this.batch = this.connectionSampler.samplePeersBatch(maxPeers);
+  }
+
+  /**
+   * Gets the peer responsible for handling a specific request index
+   *
+   * @param index - The request index
+   * @returns The peer assigned to handle this request
+   */
+  getPeerForRequest(index: number): PeerId | undefined {
+    if (this.batch.length === 0) {
+      return undefined;
+    }
+
+    // Calculate which peer bucket this index belongs to
+    const peerIndex = Math.floor(index / this.requestsPerPeer) % this.batch.length;
+    return this.batch[peerIndex];
+  }
+
+  /**
+   * Removes a peer and replaces it with a new one, maintaining the same position
+   * in the batch array to keep request distribution consistent
+   *
+   * @param peerId - The peer to remove and replace
+   */
+  removePeerAndReplace(peerId: PeerId): void {
+    const index = this.batch.findIndex(p => p === peerId);
+    if (index === -1) {
+      return;
+    }
+
+    const excluding = new Map([[peerId, true]]);
+    const newPeer = this.connectionSampler.getPeer(excluding);
+
+    if (newPeer) {
+      this.batch[index] = newPeer;
+      this.logger.trace(`Replaced peer ${peerId} with ${newPeer}`, { peerId, newPeer });
+    } else {
+      // If we couldn't get a replacement, remove the peer and compact the array
+      this.batch.splice(index, 1);
+      this.logger.trace(`Removed peer ${peerId}`, { peerId });
+    }
+  }
+
+  /**
+   * Gets the number of active peers
+   */
+  get activePeerCount(): number {
+    return this.batch.length;
+  }
+
+  /**
+   * Gets the number of requests each peer is assigned to handle
+   */
+  get requestsPerBucket(): number {
+    return this.requestsPerPeer;
+  }
+}
diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts
index 04a975e1d05..b718c835390 100644
--- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts
+++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.test.ts
@@ -11,6 +11,7 @@ describe('ConnectionSampler', () => {
   let sampler: ConnectionSampler;
   let mockLibp2p: any;
   let peers: PeerId[];
+  let excluding: Map<PeerId, boolean>;
   let mockRandomSampler: MockProxy<RandomSampler>;
 
   beforeEach(async () => {
@@ -27,6 +28,7 @@ describe('ConnectionSampler', () => {
     mockRandomSampler.random.mockReturnValue(0);
     sampler = new ConnectionSampler(mockLibp2p, 500, mockRandomSampler);
+    excluding = new Map();
   });
 
   afterEach(async () => {
@@ -35,10 +37,16 @@ describe('ConnectionSampler', () => {
   describe('getPeer', () => {
     it('returns a random peer from the list', () => {
-      const peer = sampler.getPeer();
+      const peer = sampler.getPeer(excluding);
       expect(peers).toContain(peer);
     });
 
+    it('returns undefined if no peers are available', () => {
+      mockLibp2p.getPeers.mockReturnValue([]);
+      const peer = sampler.getPeer(excluding);
+      expect(peer).toBeUndefined();
+    });
+
     it('attempts to find peer with no active connections', async () => {
       // Setup: Create active connection to first two peers
       const mockStream1: Partial<Stream> = { id: '1', close: jest.fn() } as Partial<Stream>;
@@ -52,10 +60,23 @@ describe('ConnectionSampler', () => {
       // Force Math.random to return values that would select the first two peers
       mockRandomSampler.random.mockReturnValueOnce(0).mockReturnValueOnce(1).mockReturnValueOnce(2);
 
-      const selectedPeer = sampler.getPeer();
+      const selectedPeer = sampler.getPeer(excluding);
 
       // Should select peers[2] as it has no active connections
      expect(selectedPeer).toBe(peers[2]);
     });
+
+    it('should not sample a peer that is being excluded', () => {
+      // Sample the excluded peer multiple times, but it should not be selected
+      mockRandomSampler.random
+        .mockReturnValueOnce(0)
+        .mockReturnValueOnce(0)
+        .mockReturnValueOnce(0)
+        .mockReturnValueOnce(1);
+
+      excluding.set(peers[0], true);
+      const selectedPeer = sampler.getPeer(excluding);
+      expect(selectedPeer).toBe(peers[1]);
+    });
   });
 
   describe('connection management', () => {
@@ -167,4 +188,95 @@ describe('ConnectionSampler', () => {
       expect((sampler as any).streams.size).toBe(0);
     });
   });
+
+  describe('samplePeersBatch', () => {
+    beforeEach(async () => {
+      // Create test peers
+      peers = await Promise.all(new Array(5).fill(0).map(() => createSecp256k1PeerId()));
+
+      // Mock libp2p
+      mockLibp2p = {
+        getPeers: jest.fn().mockReturnValue(peers),
+        dialProtocol: jest.fn(),
+      };
+
+      mockRandomSampler = mock<RandomSampler>();
+      sampler = new ConnectionSampler(mockLibp2p, 1000, mockRandomSampler);
+    });
+
+    it('prioritizes peers without active connections', () => {
+      // Set up some peers with active connections
+      sampler['activeConnectionsCount'].set(peers[3], 1);
+      sampler['activeConnectionsCount'].set(peers[4], 2);
+
+      // Sample 3 peers
+      const sampledPeers = sampler.samplePeersBatch(3);
+
+      // Should get peers[0,1,2] first as they have no connections
+      expect(sampledPeers).toHaveLength(3);
+      expect(sampledPeers).toContain(peers[0]);
+      expect(sampledPeers).toContain(peers[1]);
+      expect(sampledPeers).toContain(peers[2]);
+      // Should not include peers with active connections when enough peers without connections exist
+      expect(sampledPeers).not.toContain(peers[3]);
+      expect(sampledPeers).not.toContain(peers[4]);
+    });
+
+    it('falls back to peers with connections when needed', () => {
+      // Set up most peers with active connections
+      sampler['activeConnectionsCount'].set(peers[1], 1);
+      sampler['activeConnectionsCount'].set(peers[2], 1);
+      sampler['activeConnectionsCount'].set(peers[3], 1);
+      sampler['activeConnectionsCount'].set(peers[4], 1);
+
+      mockRandomSampler.random.mockReturnValue(0); // Always pick first available peer
+
+      const sampledPeers = sampler.samplePeersBatch(3);
+
+      // Should get peers[0] first (no connections), then some with connections
+      expect(sampledPeers).toHaveLength(3);
+      expect(sampledPeers[0]).toBe(peers[0]); // The only peer without connections
+      expect(sampledPeers.slice(1)).toEqual(expect.arrayContaining([peers[1]])); // Should include some peers with connections
+    });
+
+    it('handles case when all peers have active connections', () => {
+      // Set up all peers with active connections
+      peers.forEach(peer => sampler['activeConnectionsCount'].set(peer, 1));
+
+      mockRandomSampler.random.mockReturnValue(0); // Always pick first available peer
+
+      const sampledPeers = sampler.samplePeersBatch(3);
+
+      expect(sampledPeers).toHaveLength(3);
+      expect(sampledPeers).toEqual(expect.arrayContaining([peers[0], peers[1], peers[2]]));
+    });
+
+    it('handles case when fewer peers available than requested', () => {
+      // Mock libp2p to return fewer peers
+      const fewPeers = peers.slice(0, 2);
+      mockLibp2p.getPeers.mockReturnValue(fewPeers);
+
+      const sampledPeers = sampler.samplePeersBatch(5);
+
+      expect(sampledPeers).toHaveLength(2); // Should only return available peers
+      expect(sampledPeers).toEqual(expect.arrayContaining(fewPeers));
+    });
+
+    it('handles case when no peers available', () => {
+      mockLibp2p.getPeers.mockReturnValue([]);
+
+      const sampledPeers = sampler.samplePeersBatch(3);
+
+      expect(sampledPeers).toHaveLength(0);
+    });
+
+    it('returns exactly the number of peers requested when available', () => {
+      const sampledPeers = sampler.samplePeersBatch(3);
+
+      expect(sampledPeers).toHaveLength(3);
+      // Verify all peers are unique
+      const uniquePeers = new Set(sampledPeers);
+      expect(uniquePeers.size).toBe(3);
+    });
+  });
 });
diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts
index a44164eed09..4c18816330a 100644
--- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts
+++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts
@@ -1,4 +1,5 @@
 import { createLogger } from '@aztec/foundation/log';
+import { SerialQueue } from '@aztec/foundation/queue';
 import { RunningPromise } from '@aztec/foundation/running-promise';
 
 import { type Libp2p, type PeerId, type Stream } from '@libp2p/interface';
@@ -25,42 +26,62 @@ export class RandomSampler {
  */
 export class ConnectionSampler {
   private readonly logger = createLogger('p2p:reqresp:connection-sampler');
-  private cleanupJob?: RunningPromise;
+  private cleanupJob: RunningPromise;
 
   private readonly activeConnectionsCount: Map<PeerId, number> = new Map();
   private readonly streams: Map<string, StreamAndPeerId> = new Map();
 
+  // Serial queue to ensure that we only dial one peer at a time
+  private dialQueue: SerialQueue = new SerialQueue();
+
   constructor(
     private readonly libp2p: Libp2p,
     private readonly cleanupIntervalMs: number = 60000, // Default to 1 minute
-
-    // Random sampler provided so that it can be mocked
-    private readonly sampler: RandomSampler = new RandomSampler(),
+    private readonly sampler: RandomSampler = new RandomSampler(), // Allow randomness to be mocked for testing
   ) {
     this.cleanupJob = new RunningPromise(() => this.cleanupStaleConnections(), this.logger, this.cleanupIntervalMs);
     this.cleanupJob.start();
+
+    this.dialQueue.start();
   }
 
   /**
   * Stops the cleanup job and closes all active connections
   */
  async stop() {
-    await this.cleanupJob?.stop();
+    this.logger.info('Stopping connection sampler');
+    await this.cleanupJob.stop();
+    await this.dialQueue.end();
 
    // Close all active streams
    const closePromises = Array.from(this.streams.keys()).map(streamId => this.close(streamId));
-
    await Promise.all(closePromises);
+    this.logger.info('Connection sampler stopped');
  }
 
-  getPeer(): PeerId {
+  /**
+   * Samples a random peer from the libp2p node, preferring peers without active connections.
+   *
+   * @param excluding - The peers to exclude from the sampling.
+   *                    This is to prevent sampling with replacement.
+   * @returns A sampled peer, or undefined if no peers are available.
+   */
+  getPeer(excluding?: Map<PeerId, boolean>): PeerId | undefined {
    const peers = this.libp2p.getPeers();
+    if (peers.length === 0) {
+      return undefined;
+    }
+
    let randomIndex = this.sampler.random(peers.length);
    let attempts = 0;
-    // If the active connections count is greater than 0, then we already have a connection open
-    // So we try to sample a different peer, but only MAX_SAMPLE_ATTEMPTS times
-    while ((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 && attempts < MAX_SAMPLE_ATTEMPTS) {
+
+    // Keep sampling while:
+    // - we haven't exceeded max attempts AND
+    // - either the peer has active connections OR is in the exclusion list
+    while (
+      attempts < MAX_SAMPLE_ATTEMPTS &&
+      ((this.activeConnectionsCount.get(peers[randomIndex]) ?? 0) > 0 || (excluding?.get(peers[randomIndex]) ?? false))
+    ) {
      randomIndex = this.sampler.random(peers.length);
      attempts++;
    }
@@ -72,6 +93,44 @@ export class ConnectionSampler {
 
    return peers[randomIndex];
  }
 
+  /**
+   * Samples a batch of unique peers from the libp2p node, prioritizing peers without active connections
+   *
+   * @param numberToSample - The number of peers to sample
+   * @returns Array of unique sampled peers, prioritizing those without active connections
+   */
+  samplePeersBatch(numberToSample: number): PeerId[] {
+    const peers = this.libp2p.getPeers();
+    const sampledPeers: PeerId[] = [];
+    const peersWithConnections: PeerId[] = []; // Hold onto peers with active connections in case we need to sample more
+
+    for (const peer of peers) {
+      const activeConnections = this.activeConnectionsCount.get(peer) ?? 0;
+      if (activeConnections === 0) {
+        if (sampledPeers.push(peer) === numberToSample) {
+          return sampledPeers;
+        }
+      } else {
+        peersWithConnections.push(peer);
+      }
+    }
+
+    // If we still need more peers, sample from those with connections
+    while (sampledPeers.length < numberToSample && peersWithConnections.length > 0) {
+      const randomIndex = this.sampler.random(peersWithConnections.length);
+      const [peer] = peersWithConnections.splice(randomIndex, 1);
+      sampledPeers.push(peer);
+    }
+
+    this.logger.trace(`Batch sampled ${sampledPeers.length} unique peers`, {
+      peers: sampledPeers,
+      withoutConnections: sampledPeers.length - peersWithConnections.length,
+      withConnections: peersWithConnections.length,
+    });
+
+    return sampledPeers;
+  }
+
  // Set of passthrough functions to keep track of active connections
 
  /**
@@ -82,9 +141,11 @@ export class ConnectionSampler {
   * @returns The stream
   */
  async dialProtocol(peerId: PeerId, protocol: string): Promise<Stream> {
-    const stream = await this.libp2p.dialProtocol(peerId, protocol);
-    this.streams.set(stream.id, { stream, peerId });
+    // Dialling at the same time can cause race conditions where two different streams
+    // end up with the same id, hence a serial queue
+    const stream = await this.dialQueue.put(() => this.libp2p.dialProtocol(peerId, protocol));
 
+    this.streams.set(stream.id, { stream, peerId });
    const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 0) + 1;
    this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount);
 
@@ -117,7 +178,7 @@ export class ConnectionSampler {
      await stream?.close();
    } catch (error) {
-      this.logger.error(`Failed to close connection to peer ${streamId}`, { error });
+      this.logger.warn(`Failed to close connection to peer with stream id ${streamId}`);
    } finally {
      this.streams.delete(streamId);
    }
diff --git a/yarn-project/p2p/src/services/reqresp/metrics.ts b/yarn-project/p2p/src/services/reqresp/metrics.ts
new file mode 100644
index 00000000000..e32b4cdb4f4
--- /dev/null
+++ b/yarn-project/p2p/src/services/reqresp/metrics.ts
@@ -0,0 +1,57 @@
+// Request response metrics
+import { Attributes, Metrics, ValueType } from '@aztec/telemetry-client';
+import { type TelemetryClient, type Tracer, type UpDownCounter } from '@aztec/telemetry-client';
+
+export class ReqRespMetrics {
+  public readonly tracer: Tracer;
+
+  private readonly sentRequests: UpDownCounter;
+  private readonly receivedRequests: UpDownCounter;
+
+  private readonly failedOutboundRequests: UpDownCounter;
+  private readonly failedInboundRequests: UpDownCounter;
+
+  constructor(readonly telemetryClient: TelemetryClient, name = 'ReqResp') {
+    this.tracer = telemetryClient.getTracer(name);
+
+    const meter = telemetryClient.getMeter(name);
+    this.sentRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_SENT_REQUESTS, {
+      description: 'Number of requests sent to peers',
+      unit: 'requests',
+      valueType: ValueType.INT,
+    });
+    this.receivedRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_RECEIVED_REQUESTS, {
+      description: 'Number of requests received from peers',
+      unit: 'requests',
+      valueType: ValueType.INT,
+    });
+
+    this.failedOutboundRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_FAILED_OUTBOUND_REQUESTS, {
+      description: 'Number of failed outbound requests - nodes not getting valid responses',
+      unit: 'requests',
+      valueType: ValueType.INT,
+    });
+
+    this.failedInboundRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_FAILED_INBOUND_REQUESTS, {
+      description: 'Number of failed inbound requests - node failing to respond to requests',
+      unit: 'requests',
+      valueType: ValueType.INT,
+    });
+  }
+
+  public recordRequestSent(protocol: string) {
+    this.sentRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol });
+  }
+
+  public recordRequestReceived(protocol: string) {
+    this.receivedRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol });
+  }
+
+  public recordRequestError(protocol: string) {
+    this.failedOutboundRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol });
+  }
+
+  public recordResponseError(protocol: string) {
+    this.failedInboundRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol });
+  }
+}
diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts
index 5758b7a58bd..2fbdfce7456 100644
--- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts
+++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts
@@ -1,4 +1,5 @@
 import { PeerErrorSeverity, TxHash, mockTx } from '@aztec/circuit-types';
+import { createLogger } from '@aztec/foundation/log';
 import { sleep } from '@aztec/foundation/sleep';
 
 import { describe, expect, it, jest } from '@jest/globals';
@@ -28,6 +29,7 @@ describe('ReqResp', () => {
   let peerManager: MockProxy<PeerManager>;
   let peerScoring: MockProxy<PeerScoring>;
   let nodes: ReqRespNode[];
+  const logger = createLogger('test:reqresp.test.ts');
 
   beforeEach(() => {
     peerScoring = mock<PeerScoring>();
@@ -96,6 +98,7 @@ describe('ReqResp', () => {
       if (!res) {
         // The peer chosen is randomly selected, and the node above wont respond, so if
         // we wait and try again, there will only be one node to chose from
+        logger.debug('No response from node, retrying');
         await sleep(500);
         res = await nodes[0].req.sendRequest(ReqRespSubProtocol.PING, PING_REQUEST);
       }
@@ -337,4 +340,60 @@ describe('ReqResp', () => {
       expect(response).toEqual(Buffer.from([0x0]));
     });
   });
+
+  describe('Batch requests', () => {
+    it('should send a batch request between many peers', async () => {
+      const batchSize = 9;
+      nodes = await createNodes(peerScoring, 3);
+
+      await startNodes(nodes);
+      await sleep(500);
+      await connectToPeers(nodes);
+      await sleep(500);
+
+      const sendRequestToPeerSpy = jest.spyOn(nodes[0].req, 'sendRequestToPeer');
+
+      const requests = Array.from({ length: batchSize }, _ => RequestableBuffer.fromBuffer(Buffer.from(`ping`)));
+      const expectResponses = Array.from({ length: batchSize }, _ => RequestableBuffer.fromBuffer(Buffer.from(`pong`)));
+
+      const res = await nodes[0].req.sendBatchRequest(ReqRespSubProtocol.PING, requests);
+      expect(res).toEqual(expectResponses);
+
+      // Expect the requests to have been sent, distributed across the peers
+      expect(sendRequestToPeerSpy).toHaveBeenCalledTimes(batchSize);
+      expect(sendRequestToPeerSpy).toHaveBeenCalledWith(
+        expect.objectContaining({
+          publicKey: nodes[1].p2p.peerId.publicKey,
+        }),
+        ReqRespSubProtocol.PING,
+        Buffer.from('ping'),
+      );
+      expect(sendRequestToPeerSpy).toHaveBeenCalledWith(
+        expect.objectContaining({
+          publicKey: nodes[2].p2p.peerId.publicKey,
+        }),
+        ReqRespSubProtocol.PING,
+        Buffer.from('ping'),
+      );
+    });
+
+    it('should stop after max retry attempts', async () => {
+      const batchSize = 12;
+      nodes = await createNodes(peerScoring, 3);
+
+      await startNodes(nodes);
+      await sleep(500);
+      await connectToPeers(nodes);
+      await sleep(500);
+
+      const requests = Array.from({ length: batchSize }, _ => RequestableBuffer.fromBuffer(Buffer.from(`ping`)));
+      // We will fail two of the responses - due to hitting the ping rate limit on the responding nodes
+      const expectResponses = Array.from({ length: batchSize - 2 }, _ =>
+        RequestableBuffer.fromBuffer(Buffer.from(`pong`)),
+      );
+
+      const res = await nodes[0].req.sendBatchRequest(ReqRespSubProtocol.PING, requests);
+      expect(res).toEqual(expectResponses);
+    });
+  });
 });
diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts
index 604c5eaf3cd..ff2f01195cd 100644
--- a/yarn-project/p2p/src/services/reqresp/reqresp.ts
+++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts
@@ -2,6 +2,7 @@ import { PeerErrorSeverity } from '@aztec/circuit-types';
 import { type Logger, createLogger } from '@aztec/foundation/log';
 import { executeTimeout } from '@aztec/foundation/timer';
+import { Attributes, type TelemetryClient, getTelemetryClient, trackSpan } from '@aztec/telemetry-client';
 
 import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface';
 import { pipe } from 'it-pipe';
@@ -16,6 +17,7 @@ import {
 import { SnappyTransform } from '../encoding.js';
 import { type PeerScoring } from '../peer-manager/peer_scoring.js';
 import { type P2PReqRespConfig } from './config.js';
+import { BatchConnectionSampler } from './connection-sampler/batch_connection_sampler.js';
 import { ConnectionSampler } from './connection-sampler/connection_sampler.js';
 import {
   DEFAULT_SUB_PROTOCOL_HANDLERS,
@@ -26,6 +28,7 @@ import {
   type SubProtocolMap,
   subProtocolMap,
 } from './interface.js';
+import { ReqRespMetrics } from './metrics.js';
 import { RequestResponseRateLimiter } from './rate-limiter/rate_limiter.js';
 
 /**
@@ -52,13 +55,19 @@ export class ReqResp {
   private subProtocolHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS;
   private subProtocolValidators: ReqRespSubProtocolValidators = DEFAULT_SUB_PROTOCOL_VALIDATORS;
 
+  private connectionSampler: ConnectionSampler;
   private rateLimiter: RequestResponseRateLimiter;
   private snappyTransform: SnappyTransform;
 
-  private connectionSampler: ConnectionSampler;
+  private metrics: ReqRespMetrics;
 
-  constructor(config: P2PReqRespConfig, private libp2p: Libp2p, private peerScoring: PeerScoring) {
+  constructor(
+    config: P2PReqRespConfig,
+    private libp2p: Libp2p,
+    private peerScoring: PeerScoring,
+    telemetryClient: TelemetryClient = getTelemetryClient(),
+  ) {
     this.logger = createLogger('p2p:reqresp');
 
     this.overallRequestTimeoutMs = config.overallRequestTimeoutMs;
@@ -70,6 +79,11 @@ export class ReqResp {
     this.connectionSampler = new ConnectionSampler(libp2p);
 
     this.snappyTransform = new SnappyTransform();
+    this.metrics = new ReqRespMetrics(telemetryClient);
+  }
+
+  get tracer() {
+    return this.metrics.tracer;
   }
 
   /**
@@ -96,6 +110,9 @@ export class ReqResp {
     }
 
     // Close all active connections
+    await this.connectionSampler.stop();
+    this.logger.debug('ReqResp: Connection sampler stopped');
+
     const closeStreamPromises = this.libp2p.getConnections().map(connection => connection.close());
     await Promise.all(closeStreamPromises);
     this.logger.debug('ReqResp: All active streams closed');
@@ -103,9 +120,6 @@ export class ReqResp {
     this.rateLimiter.stop();
     this.logger.debug('ReqResp: Rate limiter stopped');
 
-    await this.connectionSampler.stop();
-    this.logger.debug('ReqResp: Connection sampler stopped');
-
     // NOTE: We assume libp2p instance is managed by the caller
   }
 
@@ -123,8 +137,8 @@ export class ReqResp {
    * If no response is received from any peer, it returns undefined.
    *
    * The method performs the following steps:
-   * - Iterates over all active peers.
-   * - Opens a stream with each peer using the specified sub-protocol.
+   * - Sample a peer to send the request to.
+   * - Opens a stream with the peer using the specified sub-protocol.
    *
   * When a response is received, it is validated using the given sub protocols response validator.
   * To see the interface for the response validator - see `interface.ts`
@@ -142,16 +156,29 @@ export class ReqResp {
     subProtocol: SubProtocol,
     request: InstanceType<SubProtocolMap[SubProtocol]['request']>,
   ): Promise<InstanceType<SubProtocolMap[SubProtocol]['response']> | undefined> {
-    const requestFunction = async () => {
-      const responseValidator = this.subProtocolValidators[subProtocol];
-      const requestBuffer = request.toBuffer();
+    const responseValidator = this.subProtocolValidators[subProtocol];
+    const requestBuffer = request.toBuffer();
+
+    const requestFunction = async () => {
       // Attempt to ask all of our peers, but sampled in a random order
       // This function is wrapped in a timeout, so we will exit the loop if we have not received a response
       const numberOfPeers = this.libp2p.getPeers().length;
+
+      if (numberOfPeers === 0) {
+        this.logger.debug('No active peers to send requests to');
+        return undefined;
+      }
+
+      const attemptedPeers: Map<PeerId, boolean> = new Map();
       for (let i = 0; i < numberOfPeers; i++) {
         // Sample a peer to make a request to
-        const peer = this.connectionSampler.getPeer();
+        const peer = this.connectionSampler.getPeer(attemptedPeers);
+        if (!peer) {
+          this.logger.debug('No peers available to send requests to');
+          return undefined;
+        }
+
+        attemptedPeers.set(peer, true);
         this.logger.trace(`Sending request to peer: ${peer.toString()}`);
         const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffer);
@@ -182,6 +209,152 @@ export class ReqResp {
     }
   }
 
+  /**
+   * Requests multiple messages over the same sub-protocol, balancing the requests across peers.
+   *
+   * @devnote
+   * - Requests are sent to peers in parallel, but requests assigned to the same peer are sent in series.
+   * - The function prioritizes sending requests to free peers using a batch sampling strategy.
+   * - If a peer fails to respond or returns an invalid response, it is removed from the sampling pool and replaced.
+   * - The function stops retrying once all requests are processed, no active peers remain, or the maximum retry attempts are reached.
+   * - Responses are validated using the custom validator for the sub-protocol.
+   *
+   * @param subProtocol - The sub-protocol to make the requests on
+   * @param requests - The requests to send
+   * @param timeoutMs - The overall timeout for the whole batch
+   * @param maxPeers - The maximum number of peers to spread the requests across
+   * @param maxRetryAttempts - The maximum number of retry rounds for unfulfilled requests
+   * @returns The validated responses, positioned to match the order of the requests
+   *
+   * @throws {CollectiveReqRespTimeoutError} - If the request batch exceeds the specified timeout (`timeoutMs`).
+   */
+  @trackSpan(
+    'ReqResp.sendBatchRequest',
+    (subProtocol: ReqRespSubProtocol, requests: InstanceType<SubProtocolMap[ReqRespSubProtocol]['request']>[]) => ({
+      [Attributes.P2P_REQ_RESP_PROTOCOL]: subProtocol,
+      [Attributes.P2P_REQ_RESP_BATCH_REQUESTS_COUNT]: requests.length,
+    }),
+  )
+  async sendBatchRequest<SubProtocol extends ReqRespSubProtocol>(
+    subProtocol: SubProtocol,
+    requests: InstanceType<SubProtocolMap[SubProtocol]['request']>[],
+    timeoutMs = 10000,
+    maxPeers = Math.min(10, requests.length),
+    maxRetryAttempts = 3,
+  ): Promise<InstanceType<SubProtocolMap[SubProtocol]['response']>[]> {
+    const responseValidator = this.subProtocolValidators[subProtocol];
+    const responses: InstanceType<SubProtocolMap[SubProtocol]['response']>[] = new Array(requests.length);
+    const requestBuffers = requests.map(req => req.toBuffer());
+
+    const requestFunction = async () => {
+      // Track which requests still need to be processed
+      const pendingRequestIndices = new Set(requestBuffers.map((_, i) => i));
+
+      // Create batch sampler with the total number of requests and max peers
+      const batchSampler = new BatchConnectionSampler(this.connectionSampler, requests.length, maxPeers);
+
+      if (batchSampler.activePeerCount === 0) {
+        this.logger.debug('No active peers to send requests to');
+        return [];
+      }
+
+      // This is where it gets fun.
+      // The outer loop is the retry loop: we keep retrying the indices we have not received a response for,
+      // until every request is processed or we hit the max retry attempts.
+
+      // The inner loop is the batch loop: requests are processed for each peer in parallel,
+      // the results are collected, and any peers that failed to respond are resampled and replaced.
+
+      let retryAttempts = 0;
+      while (pendingRequestIndices.size > 0 && batchSampler.activePeerCount > 0 && retryAttempts < maxRetryAttempts) {
+        // Process requests in parallel for each available peer
+        const requestBatches = new Map<PeerId, number[]>();
+
+        // Group requests by peer
+        for (const requestIndex of pendingRequestIndices) {
+          const peer = batchSampler.getPeerForRequest(requestIndex);
+          if (!peer) {
+            break;
+          }
+
+          if (!requestBatches.has(peer)) {
+            requestBatches.set(peer, []);
+          }
+          requestBatches.get(peer)!.push(requestIndex);
+        }
+
+        // Make parallel requests for each peer's batch
+        // A batch entry will look something like this:
+        // PeerId0: [0, 1, 2, 3]
+        // PeerId1: [4, 5, 6, 7]
+
+        // Peer Id 0 will send requests 0, 1, 2, 3 in series,
+        // while simultaneously Peer Id 1 will send requests 4, 5, 6, 7 in series.
+
+        const batchResults = await Promise.all(
+          Array.from(requestBatches.entries()).map(async ([peer, indices]) => {
+            try {
+              // Requests all going to the same peer are sent sequentially
+              const peerResults: { index: number; response: InstanceType<SubProtocolMap[SubProtocol]['response']> }[] =
+                [];
+              for (const index of indices) {
+                const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffers[index]);
+
+                if (response && response.length > 0) {
+                  const object = subProtocolMap[subProtocol].response.fromBuffer(response);
+                  const isValid = await responseValidator(requests[index], object, peer);
+
+                  if (isValid) {
+                    peerResults.push({ index, response: object });
+                  }
+                }
+              }
+
+              return { peer, results: peerResults };
+            } catch (error) {
+              this.logger.debug(`Failed batch request to peer ${peer.toString()}:`, error);
+              batchSampler.removePeerAndReplace(peer);
+              return { peer, results: [] };
+            }
+          }),
+        );
+
+        // Process results
+        for (const { results } of batchResults) {
+          for (const { index, response } of results) {
+            if (response) {
+              responses[index] = response;
+              pendingRequestIndices.delete(index);
+            }
+          }
+        }
+
+        retryAttempts++;
+      }
+
+      if (retryAttempts >= maxRetryAttempts) {
+        this.logger.debug(`Max retry attempts ${maxRetryAttempts} reached for batch request`);
+      }
+
+      return responses;
+    };
+
+    try {
+      return await executeTimeout<InstanceType<SubProtocolMap[SubProtocol]['response']>[]>(
+        requestFunction,
+        timeoutMs,
+        () => new CollectiveReqRespTimeoutError(),
+      );
+    } catch (e: any) {
+      this.logger.debug(`${e.message} | subProtocol: ${subProtocol}`);
+      return [];
+    }
+  }
+
   /**
    * Sends a request to a specific peer
    *
@@ -206,6 +379,10 @@ export class ReqResp {
    * If the stream is not closed by the dialled peer, and a timeout occurs, then
    * the stream is closed on the requester's end and sender (us) updates its peer score
    */
+  @trackSpan('ReqResp.sendRequestToPeer', (peerId: PeerId, subProtocol: ReqRespSubProtocol, _: Buffer) => ({
+    [Attributes.P2P_ID]: peerId.toString(),
+    [Attributes.P2P_REQ_RESP_PROTOCOL]: subProtocol,
+  }))
   public async sendRequestToPeer(
     peerId: PeerId,
     subProtocol: ReqRespSubProtocol,
@@ -213,8 +390,9 @@ export class ReqResp {
   ): Promise<Buffer | undefined> {
     let stream: Stream | undefined;
     try {
+      this.metrics.recordRequestSent(subProtocol);
+
       stream = await this.connectionSampler.dialProtocol(peerId, subProtocol);
-
       this.logger.trace(`Stream opened with ${peerId.toString()} for ${subProtocol}`);
 
       // Open the stream with a timeout
       const result = await executeTimeout(
@@ -225,8 +403,10 @@ export class ReqResp {
 
       return result;
     } catch (e: any) {
+      this.metrics.recordRequestError(subProtocol);
       this.handleResponseError(e, peerId, subProtocol);
     } finally {
+      // Only close the stream if we created it
       if (stream) {
         try {
           await this.connectionSampler.close(stream.id);
@@ -331,7 +511,13 @@ export class ReqResp {
    * We check rate limits for each peer, note the peer will be penalised within the rate limiter implementation
    * if they exceed their peer specific limits.
    */
+  @trackSpan('ReqResp.streamHandler', (protocol: ReqRespSubProtocol, { connection }: IncomingStreamData) => ({
+    [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol,
+    [Attributes.P2P_ID]: connection.remotePeer.toString(),
+  }))
   private async streamHandler(protocol: ReqRespSubProtocol, { stream, connection }: IncomingStreamData) {
+    this.metrics.recordRequestReceived(protocol);
+
     // Store a reference to from this for the async generator
     if (!this.rateLimiter.allow(protocol, connection.remotePeer)) {
       this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`);
@@ -358,6 +544,7 @@ export class ReqResp {
       );
     } catch (e: any) {
       this.logger.warn(e);
+      this.metrics.recordResponseError(protocol);
     } finally {
       await stream.close();
     }
diff --git a/yarn-project/telemetry-client/src/attributes.ts b/yarn-project/telemetry-client/src/attributes.ts
index 15fdeafa62f..3df53eaf7b6 100644
--- a/yarn-project/telemetry-client/src/attributes.ts
+++ b/yarn-project/telemetry-client/src/attributes.ts
@@ -84,6 +84,8 @@ export const ROLLUP_PROVER_ID = 'aztec.rollup.prover_id';
 export const PROOF_TIMED_OUT = 'aztec.proof.timed_out';
 
 export const P2P_ID = 'aztec.p2p.id';
+export const P2P_REQ_RESP_PROTOCOL = 'aztec.p2p.req_resp.protocol';
+export const P2P_REQ_RESP_BATCH_REQUESTS_COUNT = 'aztec.p2p.req_resp.batch_requests_count';
 export const POOL_NAME = 'aztec.pool.name';
 
 export const SEQUENCER_STATE = 'aztec.sequencer.state';
diff --git a/yarn-project/telemetry-client/src/metrics.ts b/yarn-project/telemetry-client/src/metrics.ts
index f755bbde8f4..5314a203968 100644
--- a/yarn-project/telemetry-client/src/metrics.ts
+++ b/yarn-project/telemetry-client/src/metrics.ts
@@ -71,6 +71,11 @@ export const L1_PUBLISHER_TX_BLOBDATA_GAS_COST = 'aztec.l1_publisher.tx_blobdata
 export const PEER_MANAGER_GOODBYES_SENT = 'aztec.peer_manager.goodbyes_sent';
 export const PEER_MANAGER_GOODBYES_RECEIVED = 'aztec.peer_manager.goodbyes_received';
 
+export const P2P_REQ_RESP_SENT_REQUESTS = 'aztec.p2p.req_resp.sent_requests';
+export const P2P_REQ_RESP_RECEIVED_REQUESTS = 'aztec.p2p.req_resp.received_requests';
+export const P2P_REQ_RESP_FAILED_OUTBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_outbound_requests';
+export const P2P_REQ_RESP_FAILED_INBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_inbound_requests';
+
 export const PUBLIC_PROCESSOR_TX_DURATION = 'aztec.public_processor.tx_duration';
 export const PUBLIC_PROCESSOR_TX_COUNT = 'aztec.public_processor.tx_count';
 export const PUBLIC_PROCESSOR_TX_PHASE_COUNT = 'aztec.public_processor.tx_phase_count';