Skip to content

Commit

Permalink
feat(p2p): timeout peers, disconnect from badly scored peers (#10907)
Browse files Browse the repository at this point in the history
fixes: #10878
  • Loading branch information
Maddiaa0 authored Dec 20, 2024
1 parent 84a4005 commit 76a23eb
Show file tree
Hide file tree
Showing 9 changed files with 632 additions and 93 deletions.
45 changes: 25 additions & 20 deletions yarn-project/p2p/src/bootstrap/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import { type AztecKVStore } from '@aztec/kv-store';
import { OtelMetricsAdapter, type TelemetryClient } from '@aztec/telemetry-client';

import { Discv5, type Discv5EventEmitter } from '@chainsafe/discv5';
import { SignableENR } from '@chainsafe/enr';
import { type ENR, SignableENR } from '@chainsafe/enr';
import type { PeerId } from '@libp2p/interface';
import { type Multiaddr, multiaddr } from '@multiformats/multiaddr';

import type { BootnodeConfig } from '../config.js';
import { AZTEC_ENR_KEY, AZTEC_NET } from '../services/discv5/discV5_service.js';
import { AZTEC_ENR_KEY, AZTEC_NET } from '../services/types.js';
import { convertToMultiaddr, createLibP2PPeerIdFromPrivateKey, getPeerIdPrivateKey } from '../util.js';

/**
* Encapsulates a 'Bootstrap' node, used for the purpose of assisting new joiners in acquiring peers.
*/
export class BootstrapNode implements P2PBootstrapApi {
private node?: Discv5 = undefined;
private node?: Discv5 & Discv5EventEmitter = undefined;
private peerId?: PeerId;

constructor(
Expand Down Expand Up @@ -49,6 +49,7 @@ export class BootstrapNode implements P2PBootstrapApi {
enr.set(AZTEC_ENR_KEY, Uint8Array.from([AZTEC_NET]));

this.logger.debug(`Starting bootstrap node ${peerId} listening on ${listenAddrUdp.toString()}`);

const metricsRegistry = new OtelMetricsAdapter(this.telemetry);
this.node = Discv5.create({
enr,
Expand All @@ -61,10 +62,10 @@ export class BootstrapNode implements P2PBootstrapApi {
metricsRegistry,
});

(this.node as Discv5EventEmitter).on('multiaddrUpdated', (addr: Multiaddr) => {
this.node.on('multiaddrUpdated', (addr: Multiaddr) => {
this.logger.info('Advertised socket address updated', { addr: addr.toString() });
});
(this.node as Discv5EventEmitter).on('discovered', async (enr: SignableENR) => {
this.node.on('discovered', async (enr: SignableENR) => {
const addr = await enr.getFullMultiaddr('udp');
this.logger.verbose(`Discovered new peer`, { enr: enr.encodeTxt(), addr: addr?.toString() });
});
Expand All @@ -88,35 +89,39 @@ export class BootstrapNode implements P2PBootstrapApi {
this.logger.info('Bootstrap node stopped');
}

private assertNodeStarted() {
if (!this.node) {
throw new Error('Node not started');
}
}

private assertPeerId() {
if (!this.peerId) {
throw new Error('No peerId found');
}
}

/**
* Returns the peerId of this node.
* @returns The node's peer Id
*/
public getPeerId() {
if (!this.peerId) {
throw new Error('Node not started');
}
return this.peerId;
this.assertPeerId();
return this.peerId!;
}

public getENR() {
if (!this.node) {
throw new Error('Node not started');
}
this.assertNodeStarted();
return this.node?.enr.toENR();
}

public getEncodedEnr() {
if (!this.node) {
throw new Error('Node not started');
}
return Promise.resolve(this.node.enr.encodeTxt());
this.assertNodeStarted();
return Promise.resolve(this.node!.enr.encodeTxt());
}

public getRoutingTable() {
if (!this.node) {
throw new Error('Node not started');
}
return Promise.resolve(this.node.kadValues().map(enr => enr.encodeTxt()));
this.assertNodeStarted();
return Promise.resolve(this.node!.kadValues().map((enr: ENR) => enr.encodeTxt()));
}
}
36 changes: 16 additions & 20 deletions yarn-project/p2p/src/services/discv5/discV5_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,16 @@ import EventEmitter from 'events';
import type { P2PConfig } from '../../config.js';
import { convertToMultiaddr } from '../../util.js';
import { type PeerDiscoveryService, PeerDiscoveryState } from '../service.js';

export const AZTEC_ENR_KEY = 'aztec_network';
import { AZTEC_ENR_KEY, AZTEC_NET, Discv5Event, PeerEvent } from '../types.js';

const delayBeforeStart = 2000; // 2sec

export enum AztecENR {
devnet = 0x01,
testnet = 0x02,
mainnet = 0x03,
}

// TODO: Make this an env var
export const AZTEC_NET = AztecENR.devnet;

/**
* Peer discovery service using Discv5.
*/
export class DiscV5Service extends EventEmitter implements PeerDiscoveryService {
/** The Discv5 instance */
private discv5: Discv5;
private discv5: Discv5 & Discv5EventEmitter;

/** This instance's ENR */
private enr: SignableENR;
Expand Down Expand Up @@ -88,13 +78,8 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService
metricsRegistry,
});

(this.discv5 as Discv5EventEmitter).on('discovered', (enr: ENR) => this.onDiscovered(enr));
(this.discv5 as Discv5EventEmitter).on('enrAdded', async (enr: ENR) => {
const multiAddrTcp = await enr.getFullMultiaddr('tcp');
const multiAddrUdp = await enr.getFullMultiaddr('udp');
this.logger.debug(`Added ENR ${enr.encodeTxt()}`, { multiAddrTcp, multiAddrUdp, nodeId: enr.nodeId });
this.onDiscovered(enr);
});
this.discv5.on(Discv5Event.DISCOVERED, this.onDiscovered.bind(this));
this.discv5.on(Discv5Event.ENR_ADDED, this.onEnrAdded.bind(this));
}

public async start(): Promise<void> {
Expand Down Expand Up @@ -168,18 +153,29 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService
}

public async stop(): Promise<void> {
await this.discv5.off(Discv5Event.DISCOVERED, this.onDiscovered);
await this.discv5.off(Discv5Event.ENR_ADDED, this.onEnrAdded);

await this.discv5.stop();

this.currentState = PeerDiscoveryState.STOPPED;
}

private async onEnrAdded(enr: ENR) {
const multiAddrTcp = await enr.getFullMultiaddr('tcp');
const multiAddrUdp = await enr.getFullMultiaddr('udp');
this.logger.debug(`Added ENR ${enr.encodeTxt()}`, { multiAddrTcp, multiAddrUdp, nodeId: enr.nodeId });
this.onDiscovered(enr);
}

private onDiscovered(enr: ENR) {
// check the peer is an aztec peer
const value = enr.kvs.get(AZTEC_ENR_KEY);
if (value) {
const network = value[0];
// check if the peer is on the same network
if (network === AZTEC_NET) {
this.emit('peer:discovered', enr);
this.emit(PeerEvent.DISCOVERED, enr);
}
}
}
Expand Down
31 changes: 23 additions & 8 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ import type { AztecKVStore } from '@aztec/kv-store';
import { Attributes, OtelMetricsAdapter, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client';

import { type ENR } from '@chainsafe/enr';
import { type GossipSub, type GossipSubComponents, gossipsub } from '@chainsafe/libp2p-gossipsub';
import {
type GossipSub,
type GossipSubComponents,
type GossipsubMessage,
gossipsub,
} from '@chainsafe/libp2p-gossipsub';
import { createPeerScoreParams, createTopicScoreParams } from '@chainsafe/libp2p-gossipsub/score';
import { noise } from '@chainsafe/libp2p-noise';
import { yamux } from '@chainsafe/libp2p-yamux';
Expand Down Expand Up @@ -64,6 +69,7 @@ import {
} from '../reqresp/interface.js';
import { ReqResp } from '../reqresp/reqresp.js';
import type { P2PService, PeerDiscoveryService } from '../service.js';
import { GossipSubEvent } from '../types.js';

interface MessageValidator {
validator: {
Expand Down Expand Up @@ -119,7 +125,7 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
) {
super(telemetry, 'LibP2PService');

this.peerManager = new PeerManager(node, peerDiscoveryService, config, this.tracer, logger);
this.peerManager = new PeerManager(node, peerDiscoveryService, config, telemetry, logger);
this.node.services.pubsub.score.params.appSpecificScore = (peerId: string) => {
return this.peerManager.getPeerScore(peerId);
};
Expand Down Expand Up @@ -179,12 +185,7 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
}

// add GossipSub listener
this.node.services.pubsub.addEventListener('gossipsub:message', async e => {
const { msg } = e.detail;
this.logger.trace(`Received PUBSUB message.`);

await this.jobQueue.put(() => this.handleNewGossipMessage(msg));
});
this.node.services.pubsub.addEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this));

// Start running promise for peer discovery
this.discoveryRunningPromise = new RunningPromise(
Expand Down Expand Up @@ -212,6 +213,13 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
* @returns An empty promise.
*/
public async stop() {
// Remove gossip sub listener
this.node.services.pubsub.removeEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this));

// Stop peer manager
this.logger.debug('Stopping peer manager...');
this.peerManager.stop();

this.logger.debug('Stopping job queue...');
await this.jobQueue.end();
this.logger.debug('Stopping running promise...');
Expand Down Expand Up @@ -365,6 +373,13 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
return this.peerManager.getPeers(includePending);
}

private async handleGossipSubEvent(e: CustomEvent<GossipsubMessage>) {
const { msg } = e.detail;
this.logger.trace(`Received PUBSUB message.`);

await this.jobQueue.put(() => this.handleNewGossipMessage(msg));
}

/**
* Send Request via the ReqResp service
* The subprotocol defined will determine the request and response types
Expand Down
37 changes: 36 additions & 1 deletion yarn-project/p2p/src/services/peer-scoring/peer_scoring.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { PeerErrorSeverity } from '@aztec/circuit-types';
import { jest } from '@jest/globals';

import { getP2PDefaultConfig } from '../../config.js';
import { PeerScoring } from './peer_scoring.js';
import { PeerScoreState, PeerScoring } from './peer_scoring.js';

describe('PeerScoring', () => {
let peerScoring: PeerScoring;
Expand Down Expand Up @@ -103,4 +103,39 @@ describe('PeerScoring', () => {
peerScoring.updateScore(testPeerId, -peerScoring.peerPenalties[PeerErrorSeverity.LowToleranceError]);
expect(peerScoring.getScore(testPeerId)).toBe(-63);
});

test('should correctly determine peer score state', () => {
const testPeerId = 'testPeerState';

// Test Healthy state (default)
expect(peerScoring.getScoreState(testPeerId)).toBe(PeerScoreState.Healthy);

// Test Disconnect state (score between -100 and -50)
peerScoring.updateScore(testPeerId, -60);
expect(peerScoring.getScoreState(testPeerId)).toBe(PeerScoreState.Disconnect);

// Test Banned state (score below -100)
peerScoring.updateScore(testPeerId, -50); // Total now -110
expect(peerScoring.getScoreState(testPeerId)).toBe(PeerScoreState.Banned);

// Test return to Healthy state
peerScoring.updateScore(testPeerId, 120); // Total now +10
expect(peerScoring.getScoreState(testPeerId)).toBe(PeerScoreState.Healthy);
});

test('should handle score state transitions with decay', () => {
const testPeerId = 'testPeerStateDecay';

// Put peer in Disconnect state
peerScoring.updateScore(testPeerId, -60);
expect(peerScoring.getScoreState(testPeerId)).toBe(PeerScoreState.Disconnect);

// Advance time by 10 minutes (should decay score significantly)
jest.advanceTimersByTime(10 * 60 * 1000);
peerScoring.decayAllScores();

// Score should have decayed enough to return to Healthy state
// -60 * (0.9^10) ≈ -23.2, which is above the Disconnect threshold
expect(peerScoring.getScoreState(testPeerId)).toBe(PeerScoreState.Healthy);
});
});
21 changes: 21 additions & 0 deletions yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ const DefaultPeerPenalties = {
[PeerErrorSeverity.HighToleranceError]: 2,
};

export enum PeerScoreState {
Banned,
Disconnect,
Healthy,
}

// TODO: move into config / constants
const MIN_SCORE_BEFORE_BAN = -100;
const MIN_SCORE_BEFORE_DISCONNECT = -50;

export class PeerScoring {
private scores: Map<string, number> = new Map();
private lastUpdateTime: Map<string, number> = new Map();
Expand Down Expand Up @@ -65,6 +75,17 @@ export class PeerScoring {
return this.scores.get(peerId) || 0;
}

getScoreState(peerId: string) {
// TODO: permanently store banned peers???
const score = this.getScore(peerId);
if (score < MIN_SCORE_BEFORE_BAN) {
return PeerScoreState.Banned;
} else if (score < MIN_SCORE_BEFORE_DISCONNECT) {
return PeerScoreState.Disconnect;
}
return PeerScoreState.Healthy;
}

getStats(): { medianScore: number } {
return { medianScore: median(Array.from(this.scores.values())) ?? 0 };
}
Expand Down
Loading

0 comments on commit 76a23eb

Please sign in to comment.