From 76a23ebc933dca0bb83254f92a2a9ad9a5d36a2f Mon Sep 17 00:00:00 2001 From: Maddiaa <47148561+Maddiaa0@users.noreply.github.com> Date: Sat, 21 Dec 2024 07:08:11 +0800 Subject: [PATCH] feat(p2p): timeout peers, disconnect from badly scored peers (#10907) fixes: https://github.com/AztecProtocol/aztec-packages/issues/10878 --- yarn-project/p2p/src/bootstrap/bootstrap.ts | 45 +-- .../p2p/src/services/discv5/discV5_service.ts | 36 +- .../p2p/src/services/libp2p/libp2p_service.ts | 31 +- .../peer-scoring/peer_scoring.test.ts | 37 ++- .../src/services/peer-scoring/peer_scoring.ts | 21 ++ .../p2p/src/services/peer_manager.test.ts | 314 ++++++++++++++++++ yarn-project/p2p/src/services/peer_manager.ts | 195 ++++++++--- .../reqresp/reqresp.integration.test.ts | 2 +- yarn-project/p2p/src/services/types.ts | 44 +++ 9 files changed, 632 insertions(+), 93 deletions(-) create mode 100644 yarn-project/p2p/src/services/peer_manager.test.ts create mode 100644 yarn-project/p2p/src/services/types.ts diff --git a/yarn-project/p2p/src/bootstrap/bootstrap.ts b/yarn-project/p2p/src/bootstrap/bootstrap.ts index 32aa21faf3e..371d5789a4a 100644 --- a/yarn-project/p2p/src/bootstrap/bootstrap.ts +++ b/yarn-project/p2p/src/bootstrap/bootstrap.ts @@ -4,19 +4,19 @@ import { type AztecKVStore } from '@aztec/kv-store'; import { OtelMetricsAdapter, type TelemetryClient } from '@aztec/telemetry-client'; import { Discv5, type Discv5EventEmitter } from '@chainsafe/discv5'; -import { SignableENR } from '@chainsafe/enr'; +import { type ENR, SignableENR } from '@chainsafe/enr'; import type { PeerId } from '@libp2p/interface'; import { type Multiaddr, multiaddr } from '@multiformats/multiaddr'; import type { BootnodeConfig } from '../config.js'; -import { AZTEC_ENR_KEY, AZTEC_NET } from '../services/discv5/discV5_service.js'; +import { AZTEC_ENR_KEY, AZTEC_NET } from '../services/types.js'; import { convertToMultiaddr, createLibP2PPeerIdFromPrivateKey, getPeerIdPrivateKey } from '../util.js'; /** * Encapsulates a 'Bootstrap' node, used for the purpose of assisting new joiners in acquiring peers. */ export class BootstrapNode implements P2PBootstrapApi { - private node?: Discv5 = undefined; + private node?: Discv5 & Discv5EventEmitter = undefined; private peerId?: PeerId; constructor( @@ -49,6 +49,7 @@ export class BootstrapNode implements P2PBootstrapApi { enr.set(AZTEC_ENR_KEY, Uint8Array.from([AZTEC_NET])); this.logger.debug(`Starting bootstrap node ${peerId} listening on ${listenAddrUdp.toString()}`); + const metricsRegistry = new OtelMetricsAdapter(this.telemetry); this.node = Discv5.create({ enr, @@ -61,10 +62,10 @@ export class BootstrapNode implements P2PBootstrapApi { metricsRegistry, }); - (this.node as Discv5EventEmitter).on('multiaddrUpdated', (addr: Multiaddr) => { + this.node.on('multiaddrUpdated', (addr: Multiaddr) => { this.logger.info('Advertised socket address updated', { addr: addr.toString() }); }); - (this.node as Discv5EventEmitter).on('discovered', async (enr: SignableENR) => { + this.node.on('discovered', async (enr: SignableENR) => { const addr = await enr.getFullMultiaddr('udp'); this.logger.verbose(`Discovered new peer`, { enr: enr.encodeTxt(), addr: addr?.toString() }); }); @@ -88,35 +89,39 @@ export class BootstrapNode implements P2PBootstrapApi { this.logger.info('Bootstrap node stopped'); } + private assertNodeStarted() { + if (!this.node) { + throw new Error('Node not started'); + } + } + + private assertPeerId() { + if (!this.peerId) { + throw new Error('No peerId found'); + } + } + /** * Returns the peerId of this node. * @returns The node's peer Id */ public getPeerId() { - if (!this.peerId) { - throw new Error('Node not started'); - } - return this.peerId; + this.assertPeerId(); + return this.peerId!; } public getENR() { - if (!this.node) { - throw new Error('Node not started'); - } + this.assertNodeStarted(); return this.node?.enr.toENR(); } public getEncodedEnr() { - if (!this.node) { - throw new Error('Node not started'); - } - return Promise.resolve(this.node.enr.encodeTxt()); + this.assertNodeStarted(); + return Promise.resolve(this.node!.enr.encodeTxt()); } public getRoutingTable() { - if (!this.node) { - throw new Error('Node not started'); - } - return Promise.resolve(this.node.kadValues().map(enr => enr.encodeTxt())); + this.assertNodeStarted(); + return Promise.resolve(this.node!.kadValues().map((enr: ENR) => enr.encodeTxt())); } } diff --git a/yarn-project/p2p/src/services/discv5/discV5_service.ts b/yarn-project/p2p/src/services/discv5/discV5_service.ts index 3ce7e499ae6..b629f1807b2 100644 --- a/yarn-project/p2p/src/services/discv5/discV5_service.ts +++ b/yarn-project/p2p/src/services/discv5/discV5_service.ts @@ -11,26 +11,16 @@ import EventEmitter from 'events'; import type { P2PConfig } from '../../config.js'; import { convertToMultiaddr } from '../../util.js'; import { type PeerDiscoveryService, PeerDiscoveryState } from '../service.js'; - -export const AZTEC_ENR_KEY = 'aztec_network'; +import { AZTEC_ENR_KEY, AZTEC_NET, Discv5Event, PeerEvent } from '../types.js'; const delayBeforeStart = 2000; // 2sec -export enum AztecENR { - devnet = 0x01, - testnet = 0x02, - mainnet = 0x03, -} - -// TODO: Make this an env var -export const AZTEC_NET = AztecENR.devnet; - /** * Peer discovery service using Discv5. */ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService { /** The Discv5 instance */ - private discv5: Discv5; + private discv5: Discv5 & Discv5EventEmitter; /** This instance's ENR */ private enr: SignableENR; @@ -88,13 +78,8 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService metricsRegistry, }); - (this.discv5 as Discv5EventEmitter).on('discovered', (enr: ENR) => this.onDiscovered(enr)); - (this.discv5 as Discv5EventEmitter).on('enrAdded', async (enr: ENR) => { - const multiAddrTcp = await enr.getFullMultiaddr('tcp'); - const multiAddrUdp = await enr.getFullMultiaddr('udp'); - this.logger.debug(`Added ENR ${enr.encodeTxt()}`, { multiAddrTcp, multiAddrUdp, nodeId: enr.nodeId }); - this.onDiscovered(enr); - }); + this.discv5.on(Discv5Event.DISCOVERED, this.onDiscovered.bind(this)); + this.discv5.on(Discv5Event.ENR_ADDED, this.onEnrAdded.bind(this)); } public async start(): Promise { @@ -168,10 +153,21 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService } public async stop(): Promise { + await this.discv5.off(Discv5Event.DISCOVERED, this.onDiscovered); + await this.discv5.off(Discv5Event.ENR_ADDED, this.onEnrAdded); + await this.discv5.stop(); + this.currentState = PeerDiscoveryState.STOPPED; } + private async onEnrAdded(enr: ENR) { + const multiAddrTcp = await enr.getFullMultiaddr('tcp'); + const multiAddrUdp = await enr.getFullMultiaddr('udp'); + this.logger.debug(`Added ENR ${enr.encodeTxt()}`, { multiAddrTcp, multiAddrUdp, nodeId: enr.nodeId }); + this.onDiscovered(enr); + } + private onDiscovered(enr: ENR) { // check the peer is an aztec peer const value = enr.kvs.get(AZTEC_ENR_KEY); @@ -179,7 +175,7 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService const network = value[0]; // check if the peer is on the same network if (network === AZTEC_NET) { - this.emit('peer:discovered', enr); + this.emit(PeerEvent.DISCOVERED, enr); } } } diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index 2056cf80f28..6b052af024a 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -26,7 +26,12 @@ import type { AztecKVStore } from '@aztec/kv-store'; import { Attributes, OtelMetricsAdapter, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client'; import { type ENR } from '@chainsafe/enr'; -import { type GossipSub, type GossipSubComponents, gossipsub } from '@chainsafe/libp2p-gossipsub'; +import { + type GossipSub, + type GossipSubComponents, + type GossipsubMessage, + gossipsub, +} from '@chainsafe/libp2p-gossipsub'; import { createPeerScoreParams, createTopicScoreParams } from '@chainsafe/libp2p-gossipsub/score'; import { noise } from '@chainsafe/libp2p-noise'; import { yamux } from '@chainsafe/libp2p-yamux'; @@ -64,6 +69,7 @@ import { } from '../reqresp/interface.js'; import { ReqResp } from '../reqresp/reqresp.js'; import type { P2PService, PeerDiscoveryService } from '../service.js'; +import { GossipSubEvent } from '../types.js'; interface MessageValidator { validator: { @@ -119,7 +125,7 @@ export class LibP2PService extends WithTracer implement ) { super(telemetry, 'LibP2PService'); - this.peerManager = new PeerManager(node, peerDiscoveryService, config, this.tracer, logger); + this.peerManager = new PeerManager(node, peerDiscoveryService, config, telemetry, logger); this.node.services.pubsub.score.params.appSpecificScore = (peerId: string) => { return this.peerManager.getPeerScore(peerId); }; @@ -179,12 +185,7 @@ export class LibP2PService extends WithTracer implement } // add GossipSub listener - this.node.services.pubsub.addEventListener('gossipsub:message', async e => { - const { msg } = e.detail; - this.logger.trace(`Received PUBSUB message.`); - - await this.jobQueue.put(() => this.handleNewGossipMessage(msg)); - }); + this.node.services.pubsub.addEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this)); // Start running promise for peer discovery this.discoveryRunningPromise = new RunningPromise( @@ -212,6 +213,13 @@ export class LibP2PService extends WithTracer implement * @returns An empty promise. */ public async stop() { + // Remove gossip sub listener + this.node.services.pubsub.removeEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this)); + + // Stop peer manager + this.logger.debug('Stopping peer manager...'); + this.peerManager.stop(); + this.logger.debug('Stopping job queue...'); await this.jobQueue.end(); this.logger.debug('Stopping running promise...'); @@ -365,6 +373,13 @@ export class LibP2PService extends WithTracer implement return this.peerManager.getPeers(includePending); } + private async handleGossipSubEvent(e: CustomEvent) { + const { msg } = e.detail; + this.logger.trace(`Received PUBSUB message.`); + + await this.jobQueue.put(() => this.handleNewGossipMessage(msg)); + } + /** * Send Request via the ReqResp service * The subprotocol defined will determine the request and response types diff --git a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.test.ts b/yarn-project/p2p/src/services/peer-scoring/peer_scoring.test.ts index a8e38b6e42b..4fb13df549b 100644 --- a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.test.ts +++ b/yarn-project/p2p/src/services/peer-scoring/peer_scoring.test.ts @@ -3,7 +3,7 @@ import { PeerErrorSeverity } from '@aztec/circuit-types'; import { jest } from '@jest/globals'; import { getP2PDefaultConfig } from '../../config.js'; -import { PeerScoring } from './peer_scoring.js'; +import { PeerScoreState, PeerScoring } from './peer_scoring.js'; describe('PeerScoring', () => { let peerScoring: PeerScoring; @@ -103,4 +103,39 @@ describe('PeerScoring', () => { peerScoring.updateScore(testPeerId, -peerScoring.peerPenalties[PeerErrorSeverity.LowToleranceError]); expect(peerScoring.getScore(testPeerId)).toBe(-63); }); + + test('should correctly determine peer score state', () => { + const testPeerId = 'testPeerState'; + + // Test Healthy state (default) + expect(peerScoring.getScoreState(testPeerId)).toBe(PeerScoreState.Healthy); + + // Test Disconnect state (score between -100 and -50) + peerScoring.updateScore(testPeerId, -60); + expect(peerScoring.getScoreState(testPeerId)).toBe(PeerScoreState.Disconnect); + + // Test Banned state (score below -100) + peerScoring.updateScore(testPeerId, -50); // Total now -110 + expect(peerScoring.getScoreState(testPeerId)).toBe(PeerScoreState.Banned); + + // Test return to Healthy state + peerScoring.updateScore(testPeerId, 120); // Total now +10 + expect(peerScoring.getScoreState(testPeerId)).toBe(PeerScoreState.Healthy); + }); + + test('should handle score state transitions with decay', () => { + const testPeerId = 'testPeerStateDecay'; + + // Put peer in Disconnect state + peerScoring.updateScore(testPeerId, -60); + expect(peerScoring.getScoreState(testPeerId)).toBe(PeerScoreState.Disconnect); + + // Advance time by 10 minutes (should decay score significantly) + jest.advanceTimersByTime(10 * 60 * 1000); + peerScoring.decayAllScores(); + + // Score should have decayed enough to return to Healthy state + // -60 * (0.9^10) ≈ -23.2, which is above the Disconnect threshold + expect(peerScoring.getScoreState(testPeerId)).toBe(PeerScoreState.Healthy); + }); }); diff --git a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts b/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts index 930a23b12d9..34233435b86 100644 --- a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts +++ b/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts @@ -9,6 +9,16 @@ const DefaultPeerPenalties = { [PeerErrorSeverity.HighToleranceError]: 2, }; +export enum PeerScoreState { + Banned, + Disconnect, + Healthy, +} + +// TODO: move into config / constants +const MIN_SCORE_BEFORE_BAN = -100; +const MIN_SCORE_BEFORE_DISCONNECT = -50; + export class PeerScoring { private scores: Map = new Map(); private lastUpdateTime: Map = new Map(); @@ -65,6 +75,17 @@ export class PeerScoring { return this.scores.get(peerId) || 0; } + getScoreState(peerId: string) { + // TODO: permanently store banned peers??? + const score = this.getScore(peerId); + if (score < MIN_SCORE_BEFORE_BAN) { + return PeerScoreState.Banned; + } else if (score < MIN_SCORE_BEFORE_DISCONNECT) { + return PeerScoreState.Disconnect; + } + return PeerScoreState.Healthy; + } + getStats(): { medianScore: number } { return { medianScore: median(Array.from(this.scores.values())) ?? 0 }; } diff --git a/yarn-project/p2p/src/services/peer_manager.test.ts b/yarn-project/p2p/src/services/peer_manager.test.ts new file mode 100644 index 00000000000..2feba167ffd --- /dev/null +++ b/yarn-project/p2p/src/services/peer_manager.test.ts @@ -0,0 +1,314 @@ +import { PeerErrorSeverity } from '@aztec/circuit-types'; +import { createLogger } from '@aztec/foundation/log'; +import { sleep } from '@aztec/foundation/sleep'; +import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; + +import { type ENR, SignableENR } from '@chainsafe/enr'; +import { jest } from '@jest/globals'; +import { createSecp256k1PeerId } from '@libp2p/peer-id-factory'; +import { multiaddr } from '@multiformats/multiaddr'; + +import { getP2PDefaultConfig } from '../config.js'; +import { PeerManager } from './peer_manager.js'; +import { PeerEvent } from './types.js'; + +describe('PeerManager', () => { + const mockLibP2PNode: any = { + addEventListener: jest.fn(), + removeEventListener: jest.fn(), + getPeers: jest.fn().mockReturnValue([]), + getDialQueue: jest.fn().mockReturnValue([]), + getConnections: jest.fn().mockReturnValue([]), + peerStore: { + merge: jest.fn(), + }, + dial: jest.fn(), + hangUp: jest.fn(), + }; + + const mockPeerDiscoveryService: any = { + on: jest.fn(), + off: jest.fn(), + isBootstrapPeer: jest.fn().mockReturnValue(false), + runRandomNodesQuery: jest.fn(), + }; + + let peerManager: PeerManager; + // The function provided to the discovery servive callback will be run here + let discoveredPeerCallback: (enr: ENR) => Promise; + + beforeEach(() => { + // Reset all mocks + jest.clearAllMocks(); + + // Setup default mock returns + mockLibP2PNode.getPeers.mockReturnValue([]); + mockLibP2PNode.getDialQueue.mockReturnValue([]); + mockLibP2PNode.getConnections.mockReturnValue([]); + + // Capture the callback for discovered peers + mockPeerDiscoveryService.on.mockImplementation((event: string, callback: any) => { + if (event === PeerEvent.DISCOVERED) { + discoveredPeerCallback = callback; + } + }); + + peerManager = new PeerManager( + mockLibP2PNode, + mockPeerDiscoveryService, + getP2PDefaultConfig(), + new NoopTelemetryClient(), + createLogger('test'), + ); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + const createMockENR = async () => { + const peerId = await createSecp256k1PeerId(); + const enr = SignableENR.createFromPeerId(peerId); + // Add required TCP multiaddr + enr.setLocationMultiaddr(multiaddr('/ip4/127.0.0.1/tcp/8000')); + return enr.toENR(); + }; + + describe('peer management', () => { + it('should return connected peers', async () => { + const peerId = await createSecp256k1PeerId(); + mockLibP2PNode.getPeers.mockReturnValue([peerId]); + + const peers = peerManager.getPeers(); + expect(peers).toHaveLength(1); + expect(peers[0].id).toBe(peerId.toString()); + expect(peers[0].status).toBe('connected'); + }); + + it('should return peers in dial queue', async () => { + const peerId = await createSecp256k1PeerId(); + mockLibP2PNode.getDialQueue.mockReturnValue([ + { + peerId, + status: 'queued', + multiaddrs: [multiaddr('/ip4/127.0.0.1/tcp/8000')], + }, + ]); + + const peers = peerManager.getPeers(true); + expect(peers).toHaveLength(1); + expect(peers[0].id).toBe(peerId.toString()); + expect(peers[0].status).toBe('dialing'); + }); + + it('should penalize peers', async () => { + const peerId = await createSecp256k1PeerId(); + + peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError); + + const score = peerManager.getPeerScore(peerId.toString()); + expect(score).toBeLessThan(0); + }); + + it('should handle heartbeat', () => { + // Mock some connected peers + const connections = [{ remotePeer: 'peer1' }, { remotePeer: 'peer2' }]; + mockLibP2PNode.getConnections.mockReturnValue(connections); + + peerManager.heartbeat(); + + // Verify that discover was called + expect(mockPeerDiscoveryService.runRandomNodesQuery).toHaveBeenCalled(); + }); + }); + + describe('peer timeout functionality', () => { + it('should attempt to dial a discovered peer', async () => { + const enr = await createMockENR(); + await discoveredPeerCallback(enr); + + expect(mockLibP2PNode.dial).toHaveBeenCalled(); + }); + + it('should retry failed dials up to MAX_DIAL_ATTEMPTS', async () => { + const enr = await createMockENR(); + mockLibP2PNode.dial.mockRejectedValue(new Error('Connection failed')); + + // First attempt - adds it to the cache + await discoveredPeerCallback(enr); + expect(mockLibP2PNode.dial).toHaveBeenCalledTimes(1); + + // dial peer happens asynchronously, so we need to wait + await sleep(100); + + // Second attempt + await (peerManager as any).discover(); + expect(mockLibP2PNode.dial).toHaveBeenCalledTimes(2); + + // dial peer happens asynchronously, so we need to wait + await sleep(100); + + // Third attempt + await (peerManager as any).discover(); + expect(mockLibP2PNode.dial).toHaveBeenCalledTimes(3); + + // dial peer happens asynchronously, so we need to wait + await sleep(100); + + // After the third attempt, the peer should be removed from + // the cache, and placed in timeout + await discoveredPeerCallback(enr); + expect(mockLibP2PNode.dial).toHaveBeenCalledTimes(3); + }); + + const triggerTimeout = async (enr: ENR) => { + // First attempt - adds it to the cache + await discoveredPeerCallback(enr); + await sleep(100); + // Second attempt - on heartbeat + await (peerManager as any).discover(); + await sleep(100); + // Third attempt - on heartbeat + await (peerManager as any).discover(); + }; + + it('should timeout a peer after max dial attempts and ignore it for the timeout period', async () => { + const enr = await createMockENR(); + mockLibP2PNode.dial.mockRejectedValue(new Error('Connection failed')); + + // Fail three times to trigger timeout + await triggerTimeout(enr); + + expect(mockLibP2PNode.dial).toHaveBeenCalledTimes(3); + + jest.useFakeTimers(); + + // Try to dial immediately after timeout - should be ignored + mockLibP2PNode.dial.mockClear(); + await discoveredPeerCallback(enr); + expect(mockLibP2PNode.dial).not.toHaveBeenCalled(); + + // Advance time by 4 minutes (less than timeout period) + jest.advanceTimersByTime(4 * 60 * 1000); + await discoveredPeerCallback(enr); + expect(mockLibP2PNode.dial).not.toHaveBeenCalled(); + + // Advance time to complete 5 minute timeout + jest.advanceTimersByTime(1 * 60 * 1000); + await discoveredPeerCallback(enr); + expect(mockLibP2PNode.dial).toHaveBeenCalled(); + }); + + it('should cleanup expired timeouts during heartbeat', async () => { + const enr = await createMockENR(); + mockLibP2PNode.dial.mockRejectedValue(new Error('Connection failed')); + + // Fail three times to trigger timeout + await triggerTimeout(enr); + + // Verify peer is timed out + mockLibP2PNode.dial.mockClear(); + await discoveredPeerCallback(enr); + expect(mockLibP2PNode.dial).not.toHaveBeenCalled(); + + jest.useFakeTimers(); + + // Advance time past timeout period and trigger heartbeat + jest.advanceTimersByTime(5 * 60 * 1000); + peerManager.heartbeat(); + + // Peer should now be allowed to dial again + await discoveredPeerCallback(enr); + expect(mockLibP2PNode.dial).toHaveBeenCalled(); + }); + + it('should include timed out peers in getPeers when includePending is true', async () => { + const enr = await createMockENR(); + const peerId = await enr.peerId(); + mockLibP2PNode.dial.mockRejectedValue(new Error('Connection failed')); + + // Fail three times to trigger timeout + for (let i = 0; i < 3; i++) { + await discoveredPeerCallback(enr); + } + + const peers = peerManager.getPeers(true); + const timedOutPeer = peers.find(p => p.id === peerId.toString()); + expect(timedOutPeer).toBeDefined(); + expect(timedOutPeer?.status).toBe('cached'); + }); + + it('should handle multiple peer discoveries and timeouts', async () => { + const enr1 = await createMockENR(); + const enr2 = await createMockENR(); + const peerId1 = await enr1.peerId(); + const peerId2 = await enr2.peerId(); + mockLibP2PNode.dial.mockRejectedValue(new Error('Connection failed')); + + // Fail peer1 three times + for (let i = 0; i < 3; i++) { + await discoveredPeerCallback(enr1); + } + + // Try peer2 once + await discoveredPeerCallback(enr2); + await sleep(100); + + const peers = peerManager.getPeers(true); + expect(peers).toHaveLength(2); + + const peer1 = peers.find(p => p.id === peerId1.toString()); + const peer2 = peers.find(p => p.id === peerId2.toString()); + + expect(peer1?.status).toBe('cached'); // timed out + expect(peer2?.status).toBe('cached'); // in dial queue + }); + + it('should disconnect from unhealthy peers during heartbeat', async () => { + // Create two peers with different states + const bannedPeerId = await createSecp256k1PeerId(); + const disconnectPeerId = await createSecp256k1PeerId(); + const healthyPeerId = await createSecp256k1PeerId(); + + // Mock the connections to return our test peers + mockLibP2PNode.getConnections.mockReturnValue([ + { remotePeer: bannedPeerId }, + { remotePeer: disconnectPeerId }, + { remotePeer: healthyPeerId }, + ]); + + // Set the peer scores to trigger different states + peerManager.penalizePeer(bannedPeerId, PeerErrorSeverity.LowToleranceError); // Will set score below -100 + peerManager.penalizePeer(bannedPeerId, PeerErrorSeverity.LowToleranceError); // Additional penalty to ensure banned state + + peerManager.penalizePeer(disconnectPeerId, PeerErrorSeverity.LowToleranceError); // Will set score between -100 and -50 + peerManager.penalizePeer(disconnectPeerId, PeerErrorSeverity.HighToleranceError); + + // Trigger heartbeat which should call pruneUnhealthyPeers + peerManager.heartbeat(); + + await sleep(100); + + // Verify that hangUp was called for both unhealthy peers + expect(mockLibP2PNode.hangUp).toHaveBeenCalledWith(bannedPeerId); + expect(mockLibP2PNode.hangUp).toHaveBeenCalledWith(disconnectPeerId); + + // Verify that hangUp was not called for the healthy peer + expect(mockLibP2PNode.hangUp).not.toHaveBeenCalledWith(healthyPeerId); + + // Verify hangUp was called exactly twice (once for each unhealthy peer) + expect(mockLibP2PNode.hangUp).toHaveBeenCalledTimes(2); + }); + + it('should properly clean up peers on stop', async () => { + const enr = await createMockENR(); + await discoveredPeerCallback(enr); + + peerManager.stop(); + + expect(mockLibP2PNode.removeEventListener).toHaveBeenCalledWith(PeerEvent.CONNECTED, expect.any(Function)); + expect(mockLibP2PNode.removeEventListener).toHaveBeenCalledWith(PeerEvent.DISCONNECTED, expect.any(Function)); + expect(mockPeerDiscoveryService.off).toHaveBeenCalledWith(PeerEvent.DISCOVERED, expect.any(Function)); + }); + }); +}); diff --git a/yarn-project/p2p/src/services/peer_manager.ts b/yarn-project/p2p/src/services/peer_manager.ts index cd4451b55cd..59052dfb9ad 100644 --- a/yarn-project/p2p/src/services/peer_manager.ts +++ b/yarn-project/p2p/src/services/peer_manager.ts @@ -1,71 +1,116 @@ import { type PeerErrorSeverity, type PeerInfo } from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; -import { type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; +import { type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client'; import { type ENR } from '@chainsafe/enr'; -import { type PeerId } from '@libp2p/interface'; +import { type Connection, type PeerId } from '@libp2p/interface'; import { type Multiaddr } from '@multiformats/multiaddr'; import { inspect } from 'util'; import { type P2PConfig } from '../config.js'; import { type PubSubLibp2p } from '../util.js'; -import { PeerScoring } from './peer-scoring/peer_scoring.js'; +import { PeerScoreState, PeerScoring } from './peer-scoring/peer_scoring.js'; import { type PeerDiscoveryService } from './service.js'; +import { PeerEvent } from './types.js'; const MAX_DIAL_ATTEMPTS = 3; const MAX_CACHED_PEERS = 100; +const MAX_CACHED_PEER_AGE_MS = 5 * 60 * 1000; // 5 minutes +const FAILED_PEER_BAN_TIME_MS = 5 * 60 * 1000; // 5 minutes timeout after failing MAX_DIAL_ATTEMPTS type CachedPeer = { peerId: PeerId; enr: ENR; multiaddrTcp: Multiaddr; dialAttempts: number; + addedUnixMs: number; }; -export class PeerManager implements Traceable { +type TimedOutPeer = { + peerId: string; + timeoutUntilMs: number; +}; + +export class PeerManager extends WithTracer { private cachedPeers: Map = new Map(); private peerScoring: PeerScoring; private heartbeatCounter: number = 0; + private displayPeerCountsPeerHeartbeat: number = 0; + private timedOutPeers: Map = new Map(); constructor( private libP2PNode: PubSubLibp2p, private peerDiscoveryService: PeerDiscoveryService, private config: P2PConfig, - public readonly tracer: Tracer, + telemetryClient: TelemetryClient, private logger = createLogger('p2p:peer-manager'), ) { + super(telemetryClient, 'PeerManager'); + this.peerScoring = new PeerScoring(config); // Handle new established connections - this.libP2PNode.addEventListener('peer:connect', evt => { - const peerId = evt.detail; - if (this.peerDiscoveryService.isBootstrapPeer(peerId)) { - this.logger.verbose(`Connected to bootstrap peer ${peerId.toString()}`); - } else { - this.logger.verbose(`Connected to transaction peer ${peerId.toString()}`); - } - }); - + this.libP2PNode.addEventListener(PeerEvent.CONNECTED, this.handleConnectedPeerEvent.bind(this)); // Handle lost connections - this.libP2PNode.addEventListener('peer:disconnect', evt => { - const peerId = evt.detail; - if (this.peerDiscoveryService.isBootstrapPeer(peerId)) { - this.logger.verbose(`Disconnected from bootstrap peer ${peerId.toString()}`); - } else { - this.logger.verbose(`Disconnected from transaction peer ${peerId.toString()}`); - } - }); + this.libP2PNode.addEventListener(PeerEvent.DISCONNECTED, this.handleDisconnectedPeerEvent.bind(this)); // Handle Discovered peers - this.peerDiscoveryService.on('peer:discovered', async (enr: ENR) => { - await this.handleDiscoveredPeer(enr); - }); + this.peerDiscoveryService.on(PeerEvent.DISCOVERED, this.handleDiscoveredPeer.bind(this)); + + // Display peer counts every 60 seconds + this.displayPeerCountsPeerHeartbeat = Math.floor(60_000 / this.config.peerCheckIntervalMS); } @trackSpan('PeerManager.heartbeat') public heartbeat() { this.heartbeatCounter++; - this.discover(); this.peerScoring.decayAllScores(); + + this.cleanupExpiredTimeouts(); + + this.discover(); + } + + /** + * Cleans up expired timeouts. + * + * When peers fail to dial after a number of retries, they are temporarily timed out. + * This function removes any peers that have been in the timed out state for too long. + * To give them a chance to reconnect. + */ + private cleanupExpiredTimeouts() { + // Clean up expired timeouts + const now = Date.now(); + for (const [peerId, timedOutPeer] of this.timedOutPeers.entries()) { + if (now >= timedOutPeer.timeoutUntilMs) { + this.timedOutPeers.delete(peerId); + } + } + } + + /** + * Simply logs the type of connected peer. + * @param e - The connected peer event. + */ + private handleConnectedPeerEvent(e: CustomEvent) { + const peerId = e.detail; + if (this.peerDiscoveryService.isBootstrapPeer(peerId)) { + this.logger.verbose(`Connected to bootstrap peer ${peerId.toString()}`); + } else { + this.logger.verbose(`Connected to transaction peer ${peerId.toString()}`); + } + } + + /** + * Simply logs the type of disconnected peer. + * @param e - The disconnected peer event. + */ + private handleDisconnectedPeerEvent(e: CustomEvent) { + const peerId = e.detail; + if (this.peerDiscoveryService.isBootstrapPeer(peerId)) { + this.logger.verbose(`Disconnected from bootstrap peer ${peerId.toString()}`); + } else { + this.logger.verbose(`Disconnected from transaction peer ${peerId.toString()}`); + } } public penalizePeer(peerId: PeerId, penalty: PeerErrorSeverity) { @@ -116,13 +161,14 @@ export class PeerManager implements Traceable { * Discovers peers. */ private discover() { - // Get current connections const connections = this.libP2PNode.getConnections(); + const healthyConnections = this.pruneUnhealthyPeers(connections); + // Calculate how many connections we're looking to make - const peersToConnect = this.config.maxPeerCount - connections.length; + const peersToConnect = this.config.maxPeerCount - healthyConnections.length; - const logLevel = this.heartbeatCounter % 60 === 0 ? 'info' : 'debug'; + const logLevel = this.heartbeatCounter % this.displayPeerCountsPeerHeartbeat === 0 ? 'info' : 'debug'; this.logger[logLevel](`Connected to ${connections.length} peers`, { connections: connections.length, maxPeerCount: this.config.maxPeerCount, @@ -146,7 +192,12 @@ export class PeerManager implements Traceable { for (const [id, peerData] of this.cachedPeers.entries()) { // if already dialling or connected to, remove from cache - if (pendingDials.has(id) || connections.some(conn => conn.remotePeer.equals(peerData.peerId))) { + if ( + pendingDials.has(id) || + healthyConnections.some(conn => conn.remotePeer.equals(peerData.peerId)) || + // if peer has been in cache for the max cache age, remove from cache + Date.now() - peerData.addedUnixMs > MAX_CACHED_PEER_AGE_MS + ) { this.cachedPeers.delete(id); } else { // cachedPeersToDial.set(id, enr); @@ -158,6 +209,7 @@ export class PeerManager implements Traceable { cachedPeersToDial.reverse(); for (const peer of cachedPeersToDial) { + // We remove from the cache before, as dialling will add it back if it fails this.cachedPeers.delete(peer.peerId.toString()); void this.dialPeer(peer); } @@ -169,35 +221,74 @@ export class PeerManager implements Traceable { } } + private pruneUnhealthyPeers(connections: Connection[]): Connection[] { + const connectedHealthyPeers: Connection[] = []; + + for (const peer of connections) { + const score = this.peerScoring.getScoreState(peer.remotePeer.toString()); + switch (score) { + // TODO: add goodbye and give reasons + case PeerScoreState.Banned: + case PeerScoreState.Disconnect: + void this.disconnectPeer(peer.remotePeer); + break; + case PeerScoreState.Healthy: + connectedHealthyPeers.push(peer); + } + } + + return connectedHealthyPeers; + } + + // TODO: send a goodbye with a reason to the peer + private async disconnectPeer(peer: PeerId) { + this.logger.debug(`Disconnecting peer ${peer.toString()}`); + await this.libP2PNode.hangUp(peer); + } + /** * Handles a discovered peer. * @param enr - The discovered peer's ENR. */ private async handleDiscoveredPeer(enr: ENR) { - // TODO: Will be handling peer scoring here + // Check that the peer has not already been banned + const peerId = await enr.peerId(); + const peerIdString = peerId.toString(); + + // Check if peer is temporarily timed out + const timedOutPeer = this.timedOutPeers.get(peerIdString); + if (timedOutPeer) { + if (Date.now() < timedOutPeer.timeoutUntilMs) { + this.logger.trace(`Skipping timed out peer ${peerId}`); + return; + } + // Timeout period expired, remove from timed out peers + this.timedOutPeers.delete(peerIdString); + } - // check if peer is already connected - const [peerId, multiaddrTcp] = await Promise.all([enr.peerId(), enr.getFullMultiaddr('tcp')]); + if (this.peerScoring.getScoreState(peerIdString) != PeerScoreState.Healthy) { + return; + } - this.logger.trace( - `Handling discovered peer ${peerId.toString()} at ${multiaddrTcp?.toString() ?? 'undefined address'}`, - ); + const [multiaddrTcp] = await Promise.all([enr.getFullMultiaddr('tcp')]); + + this.logger.trace(`Handling discovered peer ${peerId} at ${multiaddrTcp?.toString() ?? 'undefined address'}`); - // throw if no tcp addr in multiaddr + // stop if no tcp addr in multiaddr if (!multiaddrTcp) { this.logger.debug(`No TCP address in discovered node's multiaddr ${enr.encodeTxt()}`); return; } + // check if peer is already connected const connections = this.libP2PNode.getConnections(); if (connections.some(conn => conn.remotePeer.equals(peerId))) { - this.logger.trace(`Already connected to peer ${peerId.toString()}`); + this.logger.trace(`Already connected to peer ${peerId}`); return; } // check if peer is already in cache - const id = peerId.toString(); - if (this.cachedPeers.has(id)) { - this.logger.trace(`Peer already in cache ${id}`); + if (this.cachedPeers.has(peerIdString)) { + this.logger.trace(`Peer already in cache ${peerIdString}`); return; } @@ -207,14 +298,15 @@ export class PeerManager implements Traceable { enr, multiaddrTcp, dialAttempts: 0, + addedUnixMs: Date.now(), }; // Determine if we should dial immediately or not if (this.shouldDialPeer()) { void this.dialPeer(cachedPeer); } else { - this.logger.trace(`Caching peer ${id}`); - this.cachedPeers.set(id, cachedPeer); + this.logger.trace(`Caching peer ${peerIdString}`); + this.cachedPeers.set(peerIdString, cachedPeer); // Prune set of cached peers this.pruneCachedPeers(); } @@ -222,6 +314,8 @@ export class PeerManager implements Traceable { private async dialPeer(peer: CachedPeer) { const id = peer.peerId.toString(); + + // Add to the address book before dialing await this.libP2PNode.peerStore.merge(peer.peerId, { multiaddrs: [peer.multiaddrTcp] }); this.logger.trace(`Dialing peer ${id}`); @@ -236,6 +330,11 @@ export class PeerManager implements Traceable { formatLibp2pDialError(error as Error); this.logger.debug(`Failed to dial peer ${id} (dropping)`, { error: inspect(error) }); this.cachedPeers.delete(id); + // Add to timed out peers + this.timedOutPeers.set(id, { + peerId: id, + timeoutUntilMs: Date.now() + FAILED_PEER_BAN_TIME_MS, + }); } } } @@ -267,6 +366,16 @@ export class PeerManager implements Traceable { } } } + + /** + * Stops the peer manager. + * Removing all event listeners. + */ + public stop() { + this.libP2PNode.removeEventListener(PeerEvent.CONNECTED, this.handleConnectedPeerEvent); + this.libP2PNode.removeEventListener(PeerEvent.DISCONNECTED, this.handleDisconnectedPeerEvent); + this.peerDiscoveryService.off(PeerEvent.DISCOVERED, this.handleDiscoveredPeer); + } } /** diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts index d930341268c..70e6c6b51ba 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.integration.test.ts @@ -29,8 +29,8 @@ import { type AttestationPool } from '../../mem_pools/attestation_pool/attestati import { type EpochProofQuotePool } from '../../mem_pools/epoch_proof_quote_pool/epoch_proof_quote_pool.js'; import { type TxPool } from '../../mem_pools/tx_pool/index.js'; import { AlwaysFalseCircuitVerifier, AlwaysTrueCircuitVerifier } from '../../mocks/index.js'; +import { AZTEC_ENR_KEY, AZTEC_NET } from '../../services/types.js'; import { convertToMultiaddr, createLibP2PPeerIdFromPrivateKey } from '../../util.js'; -import { AZTEC_ENR_KEY, AZTEC_NET } from '../discv5/discV5_service.js'; const TEST_TIMEOUT = 80000; diff --git a/yarn-project/p2p/src/services/types.ts b/yarn-project/p2p/src/services/types.ts new file mode 100644 index 00000000000..16a67a724b2 --- /dev/null +++ b/yarn-project/p2p/src/services/types.ts @@ -0,0 +1,44 @@ +/*************************************************** + * Events + ***************************************************/ + +/** + * Events emitted from the libp2p node. + */ +export enum PeerEvent { + DISCOVERED = 'peer:discovered', + CONNECTED = 'peer:connect', + DISCONNECTED = 'peer:disconnect', +} + +/** + * Events emitted from the Discv5 service. + */ +export enum Discv5Event { + DISCOVERED = 'discovered', + ENR_ADDED = 'enrAdded', +} + +/** + * Events emitted from the GossipSub protocol. + */ +export enum GossipSubEvent { + MESSAGE = 'gossipsub:message', +} + +/*************************************************** + * Types + ***************************************************/ +/** + * Aztec network specific types + */ +export const AZTEC_ENR_KEY = 'aztec_network'; + +export enum AztecENR { + devnet = 0x01, + testnet = 0x02, + mainnet = 0x03, +} + +// TODO: Make this an env var +export const AZTEC_NET = AztecENR.devnet;