Skip to content

Commit

Permalink
feat(p2p): activate gossipsub tx validators (#10695)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maddiaa0 authored Dec 17, 2024
1 parent 923826a commit 9cce2c6
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 80 deletions.
4 changes: 1 addition & 3 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -855,9 +855,7 @@ export class AztecNodeService implements AztecNode, Traceable {
new DataTxValidator(),
new MetadataTxValidator(new Fr(this.l1ChainId), new Fr(blockNumber)),
new DoubleSpendTxValidator({
getNullifierIndex(nullifier) {
return db.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]).then(x => x[0]);
},
getNullifierIndices: nullifiers => db.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers),
}),
];

Expand Down
218 changes: 156 additions & 62 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { createPeerScoreParams, createTopicScoreParams } from '@chainsafe/libp2p
import { noise } from '@chainsafe/libp2p-noise';
import { yamux } from '@chainsafe/libp2p-yamux';
import { identify } from '@libp2p/identify';
import type { PeerId } from '@libp2p/interface';
import { type Message, type PeerId, TopicValidatorResult } from '@libp2p/interface';
import '@libp2p/kad-dht';
import { mplex } from '@libp2p/mplex';
import { tcp } from '@libp2p/tcp';
Expand Down Expand Up @@ -62,6 +62,21 @@ import {
import { ReqResp } from '../reqresp/reqresp.js';
import type { P2PService, PeerDiscoveryService } from '../service.js';

interface MessageValidator {
validator: {
validateTx(tx: Tx): Promise<boolean>;
};
severity: PeerErrorSeverity;
}

interface ValidationResult {
name: string;
isValid: boolean;
severity: PeerErrorSeverity;
}

type ValidationOutcome = { allPassed: true } | { allPassed: false; failure: ValidationResult };

/**
* Lib P2P implementation of the P2PService interface.
*/
Expand Down Expand Up @@ -137,12 +152,15 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
this.subscribeToTopic(TopicTypeMap[topic].p2pTopic);
}

// Add p2p topic validators
this.node.services.pubsub.topicValidators.set(Tx.p2pTopic, this.validatePropagatedTxFromMessage.bind(this));

// add GossipSub listener
this.node.services.pubsub.addEventListener('gossipsub:message', async e => {
const { msg, propagationSource: peerId } = e.detail;
const { msg } = e.detail;
this.logger.trace(`Received PUBSUB message.`);

await this.jobQueue.put(() => this.handleNewGossipMessage(msg, peerId));
await this.jobQueue.put(() => this.handleNewGossipMessage(msg));
});

// Start running promise for peer discovery
Expand Down Expand Up @@ -256,6 +274,7 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
dataTransform: new SnappyTransform(),
metricsRegister: otelMetricsAdapter,
metricsTopicStrToLabel: metricsTopicStrToLabels(),
asyncValidation: true,
scoreParams: createPeerScoreParams({
topics: {
[Tx.p2pTopic]: createTopicScoreParams({
Expand Down Expand Up @@ -382,10 +401,10 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
* @param topic - The message's topic.
* @param data - The message data
*/
private async handleNewGossipMessage(message: RawGossipMessage, peerId: PeerId) {
private async handleNewGossipMessage(message: RawGossipMessage) {
if (message.topic === Tx.p2pTopic) {
const tx = Tx.fromBuffer(Buffer.from(message.data));
await this.processTxFromPeer(tx, peerId);
await this.processTxFromPeer(tx);
}
if (message.topic === BlockAttestation.p2pTopic && this.clientType === P2PClientType.Full) {
const attestation = BlockAttestation.fromBuffer(Buffer.from(message.data));
Expand Down Expand Up @@ -474,16 +493,11 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
});
}

private async processTxFromPeer(tx: Tx, peerId: PeerId): Promise<void> {
private async processTxFromPeer(tx: Tx): Promise<void> {
const txHash = tx.getTxHash();
const txHashString = txHash.toString();
this.logger.verbose(`Received tx ${txHashString} from external peer.`);

const isValidTx = await this.validatePropagatedTx(tx, peerId);

if (isValidTx) {
await this.mempools.txPool.addTxs([tx]);
}
await this.mempools.txPool.addTxs([tx]);
}

/**
Expand Down Expand Up @@ -523,70 +537,150 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
return true;
}

private async validatePropagatedTxFromMessage(
propagationSource: PeerId,
msg: Message,
): Promise<TopicValidatorResult> {
const tx = Tx.fromBuffer(Buffer.from(msg.data));
const isValid = await this.validatePropagatedTx(tx, propagationSource);
this.logger.trace(`validatePropagatedTx: ${isValid}`, {
[Attributes.TX_HASH]: tx.getTxHash().toString(),
[Attributes.P2P_ID]: propagationSource.toString(),
});
return isValid ? TopicValidatorResult.Accept : TopicValidatorResult.Reject;
}

/**
* Validate a tx that has been propagated from a peer.
* @param tx - The tx to validate.
* @param peerId - The peer ID of the peer that sent the tx.
* @returns True if the tx is valid, false otherwise.
*/
@trackSpan('Libp2pService.validatePropagatedTx', tx => ({
[Attributes.TX_HASH]: tx.getTxHash().toString(),
}))
private async validatePropagatedTx(tx: Tx, peerId: PeerId): Promise<boolean> {
const blockNumber = (await this.l2BlockSource.getBlockNumber()) + 1;
// basic data validation
const dataValidator = new DataTxValidator();
const validData = await dataValidator.validateTx(tx);
if (!validData) {
// penalize
this.node.services.pubsub.score.markInvalidMessageDelivery(peerId.toString(), Tx.p2pTopic);
return false;
const messageValidators = this.createMessageValidators(blockNumber);
const outcome = await this.runValidations(tx, messageValidators);

if (outcome.allPassed) {
return true;
}

// metadata validation
const metadataValidator = new MetadataTxValidator(new Fr(this.config.l1ChainId), new Fr(blockNumber));
const validMetadata = await metadataValidator.validateTx(tx);
if (!validMetadata) {
// penalize
this.node.services.pubsub.score.markInvalidMessageDelivery(peerId.toString(), Tx.p2pTopic);
return false;
const { name, severity } = outcome.failure;

// Double spend validator has a special case handler
if (name === 'doubleSpendValidator') {
const isValid = await this.handleDoubleSpendFailure(tx, blockNumber, peerId);
if (isValid) {
return true;
}
}

// double spend validation
const doubleSpendValidator = new DoubleSpendTxValidator({
getNullifierIndex: async (nullifier: Fr) => {
const merkleTree = this.worldStateSynchronizer.getCommitted();
const index = (await merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]))[0];
return index;
this.peerManager.penalizePeer(peerId, severity);
return false;
}

/**
* Create message validators for the given block number.
*
* Each validator is a pair of a validator and a severity.
* If a validator fails, the peer is penalized with the severity of the validator.
*
* @param blockNumber - The block number to create validators for.
* @returns The message validators.
*/
private createMessageValidators(blockNumber: number): Record<string, MessageValidator> {
return {
dataValidator: {
validator: new DataTxValidator(),
severity: PeerErrorSeverity.HighToleranceError,
},
});
const validDoubleSpend = await doubleSpendValidator.validateTx(tx);
if (!validDoubleSpend) {
// check if nullifier is older than 20 blocks
if (blockNumber - this.config.severePeerPenaltyBlockLength > 0) {
const snapshotValidator = new DoubleSpendTxValidator({
getNullifierIndex: async (nullifier: Fr) => {
const merkleTree = this.worldStateSynchronizer.getSnapshot(
blockNumber - this.config.severePeerPenaltyBlockLength,
);
const index = (await merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]))[0];
return index;
metadataValidator: {
validator: new MetadataTxValidator(new Fr(this.config.l1ChainId), new Fr(blockNumber)),
severity: PeerErrorSeverity.HighToleranceError,
},
proofValidator: {
validator: new TxProofValidator(this.proofVerifier),
severity: PeerErrorSeverity.MidToleranceError,
},
doubleSpendValidator: {
validator: new DoubleSpendTxValidator({
getNullifierIndices: (nullifiers: Buffer[]) => {
const merkleTree = this.worldStateSynchronizer.getCommitted();
return merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers);
},
});

const validSnapshot = await snapshotValidator.validateTx(tx);
// High penalty if nullifier is older than 20 blocks
if (!validSnapshot) {
// penalize
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError);
return false;
}
}
// penalize
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError);
}),
severity: PeerErrorSeverity.HighToleranceError,
},
};
}

/**
* Run validations on a tx.
* @param tx - The tx to validate.
* @param messageValidators - The message validators to run.
* @returns The validation outcome.
*/
private async runValidations(
tx: Tx,
messageValidators: Record<string, MessageValidator>,
): Promise<ValidationOutcome> {
const validationPromises = Object.entries(messageValidators).map(async ([name, { validator, severity }]) => {
const isValid = await validator.validateTx(tx);
return { name, isValid, severity };
});

// A promise that resolves when all validations have been run
const allValidations = Promise.all(validationPromises);

// A promise that resolves when the first validation fails
const firstFailure = Promise.race(
validationPromises.map(async promise => {
const result = await promise;
return result.isValid ? new Promise(() => {}) : result;
}),
);

// Wait for the first validation to fail or all validations to pass
const result = await Promise.race([
allValidations.then(() => ({ allPassed: true as const })),
firstFailure.then(failure => ({ allPassed: false as const, failure: failure as ValidationResult })),
]);

// If all validations pass, allPassed will be true, if failed, then the failure will be the first validation to fail
return result;
}

/**
* Handle a double spend failure.
*
* Double spend failures are managed on their own because they are a special case.
* We must check if the double spend is recent or old, if it is past a threshold, then we heavily penalize the peer.
*
* @param tx - The tx that failed the double spend validator.
* @param blockNumber - The block number of the tx.
* @param peerId - The peer ID of the peer that sent the tx.
* @returns True if the tx is valid, false otherwise.
*/
private async handleDoubleSpendFailure(tx: Tx, blockNumber: number, peerId: PeerId): Promise<boolean> {
if (blockNumber <= this.config.severePeerPenaltyBlockLength) {
return false;
}

// proof validation
const proofValidator = new TxProofValidator(this.proofVerifier);
const validProof = await proofValidator.validateTx(tx);
if (!validProof) {
// penalize
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError);
const snapshotValidator = new DoubleSpendTxValidator({
getNullifierIndices: (nullifiers: Buffer[]) => {
const merkleTree = this.worldStateSynchronizer.getSnapshot(
blockNumber - this.config.severePeerPenaltyBlockLength,
);
return merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers);
},
});

const validSnapshot = await snapshotValidator.validateTx(tx);
if (!validSnapshot) {
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ describe('DoubleSpendTxValidator', () => {

beforeEach(() => {
nullifierSource = mock<NullifierSource>({
getNullifierIndex: mockFn().mockImplementation(() => {
return Promise.resolve(undefined);
getNullifierIndices: mockFn().mockImplementation(() => {
return Promise.resolve([undefined]);
}),
});
txValidator = new DoubleSpendTxValidator(nullifierSource);
Expand Down Expand Up @@ -48,7 +48,7 @@ describe('DoubleSpendTxValidator', () => {

it('rejects duplicates against history', async () => {
const badTx = mockTx();
nullifierSource.getNullifierIndex.mockReturnValueOnce(Promise.resolve(1n));
nullifierSource.getNullifierIndices.mockReturnValueOnce(Promise.resolve([1n]));
await expect(txValidator.validateTxs([badTx])).resolves.toEqual([[], [badTx]]);
});
});
14 changes: 6 additions & 8 deletions yarn-project/p2p/src/tx_validator/double_spend_validator.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { type AnyTx, Tx, type TxValidator } from '@aztec/circuit-types';
import { Fr } from '@aztec/circuits.js';
import { createLogger } from '@aztec/foundation/log';

export interface NullifierSource {
getNullifierIndex: (nullifier: Fr) => Promise<bigint | undefined>;
getNullifierIndices: (nullifiers: Buffer[]) => Promise<(bigint | undefined)[]>;
}

export class DoubleSpendTxValidator<T extends AnyTx> implements TxValidator<T> {
Expand Down Expand Up @@ -36,9 +35,7 @@ export class DoubleSpendTxValidator<T extends AnyTx> implements TxValidator<T> {
}

async #uniqueNullifiers(tx: AnyTx, thisBlockNullifiers: Set<bigint>): Promise<boolean> {
const nullifiers = (tx instanceof Tx ? tx.data.getNonEmptyNullifiers() : tx.txEffect.nullifiers).map(x =>
x.toBigInt(),
);
const nullifiers = tx instanceof Tx ? tx.data.getNonEmptyNullifiers() : tx.txEffect.nullifiers;

// Ditch this tx if it has repeated nullifiers
const uniqueNullifiers = new Set(nullifiers);
Expand All @@ -49,16 +46,17 @@ export class DoubleSpendTxValidator<T extends AnyTx> implements TxValidator<T> {

if (this.isValidatingBlock) {
for (const nullifier of nullifiers) {
if (thisBlockNullifiers.has(nullifier)) {
const nullifierBigInt = nullifier.toBigInt();
if (thisBlockNullifiers.has(nullifierBigInt)) {
this.#log.warn(`Rejecting tx ${Tx.getHash(tx)} for repeating a nullifier in the same block`);
return false;
}

thisBlockNullifiers.add(nullifier);
thisBlockNullifiers.add(nullifierBigInt);
}
}

const nullifierIndexes = await Promise.all(nullifiers.map(n => this.#nullifierSource.getNullifierIndex(new Fr(n))));
const nullifierIndexes = await this.#nullifierSource.getNullifierIndices(nullifiers.map(n => n.toBuffer()));

const hasDuplicates = nullifierIndexes.some(index => index !== undefined);
if (hasDuplicates) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ export class TxValidatorFactory {
private enforceFees: boolean,
) {
this.nullifierSource = {
getNullifierIndex: nullifier =>
this.committedDb.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]).then(x => x[0]),
getNullifierIndices: nullifiers =>
this.committedDb
.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers)
.then(x => x.filter(index => index !== undefined) as bigint[]),
};

this.publicStateSource = {
Expand All @@ -57,8 +59,7 @@ export class TxValidatorFactory {

validatorForProcessedTxs(fork: MerkleTreeReadOperations): TxValidator<ProcessedTx> {
return new DoubleSpendTxValidator({
getNullifierIndex: nullifier =>
fork.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]).then(x => x[0]),
getNullifierIndices: nullifiers => fork.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers),
});
}
}

0 comments on commit 9cce2c6

Please sign in to comment.