From 9cce2c6fbae00008451940157690e0b5b99d9e59 Mon Sep 17 00:00:00 2001 From: Maddiaa <47148561+Maddiaa0@users.noreply.github.com> Date: Wed, 18 Dec 2024 00:17:17 +0800 Subject: [PATCH] feat(p2p): activate gossipsub tx validators (#10695) --- .../aztec-node/src/aztec-node/server.ts | 4 +- .../p2p/src/services/libp2p/libp2p_service.ts | 218 +++++++++++++----- .../double_spend_validator.test.ts | 6 +- .../tx_validator/double_spend_validator.ts | 14 +- .../src/tx_validator/tx_validator_factory.ts | 9 +- 5 files changed, 171 insertions(+), 80 deletions(-) diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index dd29b991f0c..60e2ce1ca22 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -855,9 +855,7 @@ export class AztecNodeService implements AztecNode, Traceable { new DataTxValidator(), new MetadataTxValidator(new Fr(this.l1ChainId), new Fr(blockNumber)), new DoubleSpendTxValidator({ - getNullifierIndex(nullifier) { - return db.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]).then(x => x[0]); - }, + getNullifierIndices: nullifiers => db.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers), }), ]; diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index e4a6449a539..bfbf5b7df20 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -29,7 +29,7 @@ import { createPeerScoreParams, createTopicScoreParams } from '@chainsafe/libp2p import { noise } from '@chainsafe/libp2p-noise'; import { yamux } from '@chainsafe/libp2p-yamux'; import { identify } from '@libp2p/identify'; -import type { PeerId } from '@libp2p/interface'; +import { type Message, type PeerId, TopicValidatorResult } from '@libp2p/interface'; import '@libp2p/kad-dht'; import { mplex } from '@libp2p/mplex'; import { tcp } from '@libp2p/tcp'; @@ -62,6 +62,21 @@ import { import { ReqResp } from '../reqresp/reqresp.js'; import type { P2PService, PeerDiscoveryService } from '../service.js'; +interface MessageValidator { + validator: { + validateTx(tx: Tx): Promise; + }; + severity: PeerErrorSeverity; +} + +interface ValidationResult { + name: string; + isValid: boolean; + severity: PeerErrorSeverity; +} + +type ValidationOutcome = { allPassed: true } | { allPassed: false; failure: ValidationResult }; + /** * Lib P2P implementation of the P2PService interface. */ @@ -137,12 +152,15 @@ export class LibP2PService extends WithTracer implement this.subscribeToTopic(TopicTypeMap[topic].p2pTopic); } + // Add p2p topic validators + this.node.services.pubsub.topicValidators.set(Tx.p2pTopic, this.validatePropagatedTxFromMessage.bind(this)); + // add GossipSub listener this.node.services.pubsub.addEventListener('gossipsub:message', async e => { - const { msg, propagationSource: peerId } = e.detail; + const { msg } = e.detail; this.logger.trace(`Received PUBSUB message.`); - await this.jobQueue.put(() => this.handleNewGossipMessage(msg, peerId)); + await this.jobQueue.put(() => this.handleNewGossipMessage(msg)); }); // Start running promise for peer discovery @@ -256,6 +274,7 @@ export class LibP2PService extends WithTracer implement dataTransform: new SnappyTransform(), metricsRegister: otelMetricsAdapter, metricsTopicStrToLabel: metricsTopicStrToLabels(), + asyncValidation: true, scoreParams: createPeerScoreParams({ topics: { [Tx.p2pTopic]: createTopicScoreParams({ @@ -382,10 +401,10 @@ export class LibP2PService extends WithTracer implement * @param topic - The message's topic. * @param data - The message data */ - private async handleNewGossipMessage(message: RawGossipMessage, peerId: PeerId) { + private async handleNewGossipMessage(message: RawGossipMessage) { if (message.topic === Tx.p2pTopic) { const tx = Tx.fromBuffer(Buffer.from(message.data)); - await this.processTxFromPeer(tx, peerId); + await this.processTxFromPeer(tx); } if (message.topic === BlockAttestation.p2pTopic && this.clientType === P2PClientType.Full) { const attestation = BlockAttestation.fromBuffer(Buffer.from(message.data)); @@ -474,16 +493,11 @@ export class LibP2PService extends WithTracer implement }); } - private async processTxFromPeer(tx: Tx, peerId: PeerId): Promise { + private async processTxFromPeer(tx: Tx): Promise { const txHash = tx.getTxHash(); const txHashString = txHash.toString(); this.logger.verbose(`Received tx ${txHashString} from external peer.`); - - const isValidTx = await this.validatePropagatedTx(tx, peerId); - - if (isValidTx) { - await this.mempools.txPool.addTxs([tx]); - } + await this.mempools.txPool.addTxs([tx]); } /** @@ -523,70 +537,150 @@ export class LibP2PService extends WithTracer implement return true; } + private async validatePropagatedTxFromMessage( + propagationSource: PeerId, + msg: Message, + ): Promise { + const tx = Tx.fromBuffer(Buffer.from(msg.data)); + const isValid = await this.validatePropagatedTx(tx, propagationSource); + this.logger.trace(`validatePropagatedTx: ${isValid}`, { + [Attributes.TX_HASH]: tx.getTxHash().toString(), + [Attributes.P2P_ID]: propagationSource.toString(), + }); + return isValid ? TopicValidatorResult.Accept : TopicValidatorResult.Reject; + } + + /** + * Validate a tx that has been propagated from a peer. + * @param tx - The tx to validate. + * @param peerId - The peer ID of the peer that sent the tx. + * @returns True if the tx is valid, false otherwise. + */ @trackSpan('Libp2pService.validatePropagatedTx', tx => ({ [Attributes.TX_HASH]: tx.getTxHash().toString(), })) private async validatePropagatedTx(tx: Tx, peerId: PeerId): Promise { const blockNumber = (await this.l2BlockSource.getBlockNumber()) + 1; - // basic data validation - const dataValidator = new DataTxValidator(); - const validData = await dataValidator.validateTx(tx); - if (!validData) { - // penalize - this.node.services.pubsub.score.markInvalidMessageDelivery(peerId.toString(), Tx.p2pTopic); - return false; + const messageValidators = this.createMessageValidators(blockNumber); + const outcome = await this.runValidations(tx, messageValidators); + + if (outcome.allPassed) { + return true; } - // metadata validation - const metadataValidator = new MetadataTxValidator(new Fr(this.config.l1ChainId), new Fr(blockNumber)); - const validMetadata = await metadataValidator.validateTx(tx); - if (!validMetadata) { - // penalize - this.node.services.pubsub.score.markInvalidMessageDelivery(peerId.toString(), Tx.p2pTopic); - return false; + const { name, severity } = outcome.failure; + + // Double spend validator has a special case handler + if (name === 'doubleSpendValidator') { + const isValid = await this.handleDoubleSpendFailure(tx, blockNumber, peerId); + if (isValid) { + return true; + } } - // double spend validation - const doubleSpendValidator = new DoubleSpendTxValidator({ - getNullifierIndex: async (nullifier: Fr) => { - const merkleTree = this.worldStateSynchronizer.getCommitted(); - const index = (await merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]))[0]; - return index; + this.peerManager.penalizePeer(peerId, severity); + return false; + } + + /** + * Create message validators for the given block number. + * + * Each validator is a pair of a validator and a severity. + * If a validator fails, the peer is penalized with the severity of the validator. + * + * @param blockNumber - The block number to create validators for. + * @returns The message validators. + */ + private createMessageValidators(blockNumber: number): Record { + return { + dataValidator: { + validator: new DataTxValidator(), + severity: PeerErrorSeverity.HighToleranceError, }, - }); - const validDoubleSpend = await doubleSpendValidator.validateTx(tx); - if (!validDoubleSpend) { - // check if nullifier is older than 20 blocks - if (blockNumber - this.config.severePeerPenaltyBlockLength > 0) { - const snapshotValidator = new DoubleSpendTxValidator({ - getNullifierIndex: async (nullifier: Fr) => { - const merkleTree = this.worldStateSynchronizer.getSnapshot( - blockNumber - this.config.severePeerPenaltyBlockLength, - ); - const index = (await merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]))[0]; - return index; + metadataValidator: { + validator: new MetadataTxValidator(new Fr(this.config.l1ChainId), new Fr(blockNumber)), + severity: PeerErrorSeverity.HighToleranceError, + }, + proofValidator: { + validator: new TxProofValidator(this.proofVerifier), + severity: PeerErrorSeverity.MidToleranceError, + }, + doubleSpendValidator: { + validator: new DoubleSpendTxValidator({ + getNullifierIndices: (nullifiers: Buffer[]) => { + const merkleTree = this.worldStateSynchronizer.getCommitted(); + return merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers); }, - }); - - const validSnapshot = await snapshotValidator.validateTx(tx); - // High penalty if nullifier is older than 20 blocks - if (!validSnapshot) { - // penalize - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError); - return false; - } - } - // penalize - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError); + }), + severity: PeerErrorSeverity.HighToleranceError, + }, + }; + } + + /** + * Run validations on a tx. + * @param tx - The tx to validate. + * @param messageValidators - The message validators to run. + * @returns The validation outcome. + */ + private async runValidations( + tx: Tx, + messageValidators: Record, + ): Promise { + const validationPromises = Object.entries(messageValidators).map(async ([name, { validator, severity }]) => { + const isValid = await validator.validateTx(tx); + return { name, isValid, severity }; + }); + + // A promise that resolves when all validations have been run + const allValidations = Promise.all(validationPromises); + + // A promise that resolves when the first validation fails + const firstFailure = Promise.race( + validationPromises.map(async promise => { + const result = await promise; + return result.isValid ? new Promise(() => {}) : result; + }), + ); + + // Wait for the first validation to fail or all validations to pass + const result = await Promise.race([ + allValidations.then(() => ({ allPassed: true as const })), + firstFailure.then(failure => ({ allPassed: false as const, failure: failure as ValidationResult })), + ]); + + // If all validations pass, allPassed will be true, if failed, then the failure will be the first validation to fail + return result; + } + + /** + * Handle a double spend failure. + * + * Double spend failures are managed on their own because they are a special case. + * We must check if the double spend is recent or old, if it is past a threshold, then we heavily penalize the peer. + * + * @param tx - The tx that failed the double spend validator. + * @param blockNumber - The block number of the tx. + * @param peerId - The peer ID of the peer that sent the tx. + * @returns True if the tx is valid, false otherwise. + */ + private async handleDoubleSpendFailure(tx: Tx, blockNumber: number, peerId: PeerId): Promise { + if (blockNumber <= this.config.severePeerPenaltyBlockLength) { return false; } - // proof validation - const proofValidator = new TxProofValidator(this.proofVerifier); - const validProof = await proofValidator.validateTx(tx); - if (!validProof) { - // penalize - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); + const snapshotValidator = new DoubleSpendTxValidator({ + getNullifierIndices: (nullifiers: Buffer[]) => { + const merkleTree = this.worldStateSynchronizer.getSnapshot( + blockNumber - this.config.severePeerPenaltyBlockLength, + ); + return merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers); + }, + }); + + const validSnapshot = await snapshotValidator.validateTx(tx); + if (!validSnapshot) { + this.peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError); return false; } diff --git a/yarn-project/p2p/src/tx_validator/double_spend_validator.test.ts b/yarn-project/p2p/src/tx_validator/double_spend_validator.test.ts index 1c123319f33..7b0fbb13974 100644 --- a/yarn-project/p2p/src/tx_validator/double_spend_validator.test.ts +++ b/yarn-project/p2p/src/tx_validator/double_spend_validator.test.ts @@ -10,8 +10,8 @@ describe('DoubleSpendTxValidator', () => { beforeEach(() => { nullifierSource = mock({ - getNullifierIndex: mockFn().mockImplementation(() => { - return Promise.resolve(undefined); + getNullifierIndices: mockFn().mockImplementation(() => { + return Promise.resolve([undefined]); }), }); txValidator = new DoubleSpendTxValidator(nullifierSource); @@ -48,7 +48,7 @@ describe('DoubleSpendTxValidator', () => { it('rejects duplicates against history', async () => { const badTx = mockTx(); - nullifierSource.getNullifierIndex.mockReturnValueOnce(Promise.resolve(1n)); + nullifierSource.getNullifierIndices.mockReturnValueOnce(Promise.resolve([1n])); await expect(txValidator.validateTxs([badTx])).resolves.toEqual([[], [badTx]]); }); }); diff --git a/yarn-project/p2p/src/tx_validator/double_spend_validator.ts b/yarn-project/p2p/src/tx_validator/double_spend_validator.ts index 5bb06bf1fa9..9f735e197b0 100644 --- a/yarn-project/p2p/src/tx_validator/double_spend_validator.ts +++ b/yarn-project/p2p/src/tx_validator/double_spend_validator.ts @@ -1,9 +1,8 @@ import { type AnyTx, Tx, type TxValidator } from '@aztec/circuit-types'; -import { Fr } from '@aztec/circuits.js'; import { createLogger } from '@aztec/foundation/log'; export interface NullifierSource { - getNullifierIndex: (nullifier: Fr) => Promise; + getNullifierIndices: (nullifiers: Buffer[]) => Promise<(bigint | undefined)[]>; } export class DoubleSpendTxValidator implements TxValidator { @@ -36,9 +35,7 @@ export class DoubleSpendTxValidator implements TxValidator { } async #uniqueNullifiers(tx: AnyTx, thisBlockNullifiers: Set): Promise { - const nullifiers = (tx instanceof Tx ? tx.data.getNonEmptyNullifiers() : tx.txEffect.nullifiers).map(x => - x.toBigInt(), - ); + const nullifiers = tx instanceof Tx ? tx.data.getNonEmptyNullifiers() : tx.txEffect.nullifiers; // Ditch this tx if it has repeated nullifiers const uniqueNullifiers = new Set(nullifiers); @@ -49,16 +46,17 @@ export class DoubleSpendTxValidator implements TxValidator { if (this.isValidatingBlock) { for (const nullifier of nullifiers) { - if (thisBlockNullifiers.has(nullifier)) { + const nullifierBigInt = nullifier.toBigInt(); + if (thisBlockNullifiers.has(nullifierBigInt)) { this.#log.warn(`Rejecting tx ${Tx.getHash(tx)} for repeating a nullifier in the same block`); return false; } - thisBlockNullifiers.add(nullifier); + thisBlockNullifiers.add(nullifierBigInt); } } - const nullifierIndexes = await Promise.all(nullifiers.map(n => this.#nullifierSource.getNullifierIndex(new Fr(n)))); + const nullifierIndexes = await this.#nullifierSource.getNullifierIndices(nullifiers.map(n => n.toBuffer())); const hasDuplicates = nullifierIndexes.some(index => index !== undefined); if (hasDuplicates) { diff --git a/yarn-project/sequencer-client/src/tx_validator/tx_validator_factory.ts b/yarn-project/sequencer-client/src/tx_validator/tx_validator_factory.ts index a3647b2d710..59b6baab1cf 100644 --- a/yarn-project/sequencer-client/src/tx_validator/tx_validator_factory.ts +++ b/yarn-project/sequencer-client/src/tx_validator/tx_validator_factory.ts @@ -29,8 +29,10 @@ export class TxValidatorFactory { private enforceFees: boolean, ) { this.nullifierSource = { - getNullifierIndex: nullifier => - this.committedDb.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]).then(x => x[0]), + getNullifierIndices: nullifiers => + this.committedDb + .findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers) + .then(x => x.filter(index => index !== undefined) as bigint[]), }; this.publicStateSource = { @@ -57,8 +59,7 @@ export class TxValidatorFactory { validatorForProcessedTxs(fork: MerkleTreeReadOperations): TxValidator { return new DoubleSpendTxValidator({ - getNullifierIndex: nullifier => - fork.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]).then(x => x[0]), + getNullifierIndices: nullifiers => fork.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers), }); } }