Skip to content

Commit

Permalink
feat: Validator deadline for reexecution
Browse files Browse the repository at this point in the history
Adds a reexecution deadline for validators, so they abort trying to
reexecute public txs while block building if too late into the slot.

Fixes #10959
  • Loading branch information
spalladino committed Jan 3, 2025
1 parent 1575590 commit 3d9c959
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 68 deletions.
1 change: 1 addition & 0 deletions yarn-project/end-to-end/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"@aztec/simulator": "workspace:^",
"@aztec/telemetry-client": "workspace:^",
"@aztec/types": "workspace:^",
"@aztec/validator-client": "workspace:^",
"@aztec/world-state": "workspace:^",
"@iarna/toml": "^2.2.5",
"@jest/globals": "^29.5.0",
Expand Down
169 changes: 116 additions & 53 deletions yarn-project/end-to-end/src/e2e_p2p/reex.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { type SentTx, sleep } from '@aztec/aztec.js';

/* eslint-disable-next-line no-restricted-imports */
import { BlockProposal, SignatureDomainSeperator, getHashedSignaturePayload } from '@aztec/circuit-types';
import { type PublicTxSimulator } from '@aztec/simulator';
import { ReExFailedTxsError, ReExStateMismatchError, ReExTimeoutError } from '@aztec/validator-client/errors';

import { beforeAll, describe, it, jest } from '@jest/globals';
import fs from 'fs';
Expand Down Expand Up @@ -31,6 +33,7 @@ describe('e2e_p2p_reex', () => {
basePort: BOOT_NODE_UDP_PORT,
// To collect metrics - run in aztec-packages `docker compose --profile metrics up` and set COLLECT_METRICS=true
metricsPort: shouldCollectMetrics(),
initialConfig: { enforceTimeTable: true },
});

t.logger.verbose('Setup account');
Expand All @@ -55,30 +58,10 @@ describe('e2e_p2p_reex', () => {
}
});

it('validators should re-execute transactions before attesting', async () => {
// create the bootstrap node for the network
if (!t.bootstrapNodeEnr) {
throw new Error('Bootstrap node ENR is not available');
}

t.ctx.aztecNodeConfig.validatorReexecute = true;

nodes = await createNodes(
t.ctx.aztecNodeConfig,
t.ctx.dateProvider,
t.bootstrapNodeEnr,
NUM_NODES,
BOOT_NODE_UDP_PORT,
DATA_DIR,
// To collect metrics - run in aztec-packages `docker compose --profile metrics up` and set COLLECT_METRICS=true
shouldCollectMetrics(),
);

// Hook into the node and intercept re-execution logic, ensuring that it was infact called
const reExecutionSpies = [];
for (const node of nodes) {
// Make sure the nodes submit faulty proposals, in this case a faulty proposal is one where we remove one of the transactions
// Such that the calculated archive will be different!
describe('validators re-execute transactions before attesting', () => {
// Make sure the nodes submit faulty proposals, in this case a faulty proposal is one where we remove one of the transactions
// Such that the calculated archive will be different!
const interceptBroadcastProposal = (node: AztecNodeService) => {
jest.spyOn((node as any).p2pClient, 'broadcastProposal').mockImplementation(async (...args: unknown[]) => {
// We remove one of the transactions, therefore the block root will be different!
const proposal = args[0] as BlockProposal;
Expand All @@ -97,40 +80,120 @@ describe('e2e_p2p_reex', () => {

return (node as any).p2pClient.p2pService.propagate(newProposal);
});
};

// Intercepts the simulator within the tx processor within the processor factory with the given function
// Only the processor for validators is intercepted, the one for the proposer is left untouched
// We abuse the fact that the proposer will always run before the validators
let interceptTxProcessorSimulatorCallCount = 0;
const interceptTxProcessorSimulator = (
node: AztecNodeService,
intercept: (simulator: PublicTxSimulator) => void,
) => {
const processorFactory = (node as any).sequencer.sequencer.publicProcessorFactory;
const originalCreate = processorFactory.create.bind(processorFactory);
jest
.spyOn((node as any).sequencer.sequencer.publicProcessorFactory, 'create')
.mockImplementation(async (...args: unknown[]) => {
interceptTxProcessorSimulatorCallCount++;
const processor = await originalCreate(...args);
if (interceptTxProcessorSimulatorCallCount > 1) {
t.logger.warn('Creating mocked processor factory');
const simulator = (processor as any).publicTxSimulator;
intercept(simulator);
} else {
t.logger.warn('Creating vanilla processor factory');
}
return processor;
});
};

// Have the public tx processor take an extra long time to process the tx, so the validator times out
const interceptTxProcessorWithTimeout = (node: AztecNodeService) => {
interceptTxProcessorSimulator(node, simulator => {
const anySimulator: any = simulator;
const originalProcess = anySimulator.process.bind(simulator);
jest.spyOn(anySimulator, 'process').mockImplementation(async (...args: unknown[]) => {
t.logger.warn('Public tx simulator sleeping for 40s to simulate timeout');
await sleep(40_000);
return originalProcess(...args);
});
});
};

// Have the public tx processor throw when processing a tx
const interceptTxProcessorWithFailure = (node: AztecNodeService) => {
interceptTxProcessorSimulator(node, simulator => {
const anySimulator: any = simulator;
jest.spyOn(anySimulator, 'process').mockImplementation(async () => {
t.logger.warn('Public tx simulator failing');
await sleep(1);
throw new Error(`Fake tx failure`);
});
});
};

it.each([
[new ReExStateMismatchError().message, interceptBroadcastProposal],
[new ReExTimeoutError().message, interceptTxProcessorWithTimeout],
[new ReExFailedTxsError(1).message, interceptTxProcessorWithFailure],
])('rejects proposal with %s', async (errMsg: string, nodeInterceptor: (node: AztecNodeService) => void) => {
// create the bootstrap node for the network
if (!t.bootstrapNodeEnr) {
throw new Error('Bootstrap node ENR is not available');
}

// Store re-execution spys node -> sequencer Client -> seqeuncer -> validator
const spy = jest.spyOn((node as any).sequencer.sequencer.validatorClient, 'reExecuteTransactions');
reExecutionSpies.push(spy);
}
t.ctx.aztecNodeConfig.validatorReexecute = true;

nodes = await createNodes(
t.ctx.aztecNodeConfig,
t.ctx.dateProvider,
t.bootstrapNodeEnr,
NUM_NODES,
BOOT_NODE_UDP_PORT,
DATA_DIR,
// To collect metrics - run in aztec-packages `docker compose --profile metrics up` and set COLLECT_METRICS=true
shouldCollectMetrics(),
);

// wait a bit for peers to discover each other
await sleep(4000);
// Hook into the node and intercept re-execution logic
const reExecutionSpies = [];
for (const node of nodes) {
nodeInterceptor(node);
// Collect re-execution spies node -> sequencer client -> sequencer -> validator
const spy = jest.spyOn((node as any).sequencer.sequencer.validatorClient, 'reExecuteTransactions');
reExecutionSpies.push(spy);
}

// Wait a bit for peers to discover each other
await sleep(4000);

nodes.forEach(node => {
node.getSequencer()?.updateSequencerConfig({
minTxsPerBlock: NUM_TXS_PER_NODE,
maxTxsPerBlock: NUM_TXS_PER_NODE,
nodes.forEach(node => {
node.getSequencer()?.updateSequencerConfig({
minTxsPerBlock: NUM_TXS_PER_NODE,
maxTxsPerBlock: NUM_TXS_PER_NODE,
});
});
});
const txs = await submitComplexTxsTo(t.logger, t.spamContract!, NUM_TXS_PER_NODE);

// We ensure that the transactions are NOT mined
try {
await Promise.all(
txs.map(async (tx: SentTx, i: number) => {
t.logger.info(`Waiting for tx ${i}: ${await tx.getTxHash()} to be mined`);
return tx.wait();
}),
);
} catch (e) {
t.logger.info('Failed to mine all txs, as planned');
}
const txs = await submitComplexTxsTo(t.logger, t.spamContract!, NUM_TXS_PER_NODE, { callPublic: true });

// We ensure that the transactions are NOT mined
try {
await Promise.all(
txs.map(async (tx: SentTx, i: number) => {
t.logger.info(`Waiting for tx ${i}: ${await tx.getTxHash()} to be mined`);
return tx.wait();
}),
);
} catch (e) {
t.logger.info('Failed to mine all txs, as planned');
}

// Expect that all of the re-execution attempts failed with an invalid root
for (const spy of reExecutionSpies) {
for (const result of spy.mock.results) {
await expect(result.value).rejects.toThrow('Validator Error: Re-execution state mismatch');
// Expect that all of the re-execution attempts failed with an invalid root
for (const spy of reExecutionSpies) {
for (const result of spy.mock.results) {
await expect(result.value).rejects.toThrow(errMsg);
}
}
}
});
});
});
11 changes: 9 additions & 2 deletions yarn-project/end-to-end/src/e2e_p2p/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,20 @@ import { type PXEService, createPXEService, getPXEServiceConfig as getRpcConfig
import { type NodeContext } from '../fixtures/setup_p2p_test.js';

// submits a set of transactions to the provided Private eXecution Environment (PXE)
export const submitComplexTxsTo = async (logger: Logger, spamContract: SpamContract, numTxs: number) => {
export const submitComplexTxsTo = async (
logger: Logger,
spamContract: SpamContract,
numTxs: number,
opts: { callPublic?: boolean } = {},
) => {
const txs: SentTx[] = [];

const seed = 1234n;
const spamCount = 15;
for (let i = 0; i < numTxs; i++) {
const tx = spamContract.methods.spam(seed + BigInt(i * spamCount), spamCount, false).send();
const tx = spamContract.methods
.spam(seed + BigInt(i * spamCount), spamCount, !!opts.callPublic)
.send({ skipPublicSimulation: true });
const txHash = await tx.getTxHash();

logger.info(`Tx sent with hash ${txHash}`);
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/end-to-end/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@
{
"path": "../types"
},
{
"path": "../validator-client"
},
{
"path": "../world-state"
}
Expand Down
49 changes: 39 additions & 10 deletions yarn-project/sequencer-client/src/sequencer/sequencer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import { type PublicProcessorFactory } from '@aztec/simulator';
import { Attributes, type TelemetryClient, type Tracer, trackSpan } from '@aztec/telemetry-client';
import { type ValidatorClient } from '@aztec/validator-client';

import assert from 'assert';

import { type GlobalVariableBuilder } from '../global_variable_builder/global_builder.js';
import { type L1Publisher, VoteType } from '../publisher/l1-publisher.js';
import { prettyLogViemErrorMsg } from '../publisher/utils.js';
Expand Down Expand Up @@ -86,6 +88,7 @@ export class Sequencer {
private maxBlockSizeInBytes: number = 1024 * 1024;
private maxBlockGas: Gas = new Gas(10e9, 10e9);
private processTxTime: number = 12;
private attestationPropagationTime: number = 2;
private metrics: SequencerMetrics;
private isFlushing: boolean = false;

Expand Down Expand Up @@ -183,27 +186,29 @@ export class Sequencer {
// How long it takes to get ready to start building
const blockPrepareTime = 1;

// How long it takes to for attestations to travel across the p2p layer.
// How long it takes to for proposals and attestations to travel across the p2p layer (one-way)
const attestationPropagationTime = 2;
this.attestationPropagationTime = attestationPropagationTime;

// How long it takes to get a published block into L1. L1 builders typically accept txs up to 4 seconds into their slot,
// but we'll timeout sooner to give it more time to propagate (remember we also have blobs!). Still, when working in anvil,
// we can just post in the very last second of the L1 slot.
// we can just post in the very last second of the L1 slot and still expect the tx to be accepted.
const l1PublishingTime = this.l1Constants.ethereumSlotDuration - this.maxL1TxInclusionTimeIntoSlot;

// How much time we spend validating and processing a block after building it
// How much time we spend validating and processing a block after building it,
// and assembling the proposal to send to attestors
const blockValidationTime = 1;

// How much time we have left in the slot for actually processing txs and building the block.
const remainingTimeInSlot =
this.aztecSlotDuration -
initialTime -
blockPrepareTime -
l1PublishingTime -
blockValidationTime -
2 * attestationPropagationTime -
blockValidationTime;
l1PublishingTime;

// Check that numbers make sense
// Check that we actually have time left for processing txs
if (this.enforceTimeTable && remainingTimeInSlot < 0) {
throw new Error(`Not enough time for block building in ${this.aztecSlotDuration}s slot`);
}
Expand All @@ -212,6 +217,22 @@ export class Sequencer {
const processTxsTime = remainingTimeInSlot / 2;
this.processTxTime = processTxsTime;

// Sanity check
const totalSlotTime =
initialTime + // Archiver, world-state, and p2p sync
blockPrepareTime + // Setup globals, initial checks, etc
processTxsTime + // Processing public txs for building the block
blockValidationTime + // Validating the block produced
attestationPropagationTime + // Propagating the block proposal to validators
processTxsTime + // Validators run public txs before signing
attestationPropagationTime + // Attestations fly back to the proposer
l1PublishingTime; // The publish tx sits on the L1 mempool waiting to be picked up

assert(
totalSlotTime === this.aztecSlotDuration,
`Computed total slot time does not match slot duration: ${totalSlotTime}s`,
);

const newTimeTable: Record<SequencerState, number> = {
// No checks needed for any of these transitions
[SequencerState.STOPPED]: this.aztecSlotDuration,
Expand Down Expand Up @@ -493,12 +514,19 @@ export class Sequencer {
const blockBuilder = this.blockBuilderFactory.create(orchestratorFork);
await blockBuilder.startNewBlock(newGlobalVariables, l1ToL2Messages);

// We set the deadline for tx processing to the start of the CREATING_BLOCK phase, plus the expected time for tx processing.
// When building a block as a proposer, we set the deadline for tx processing to the start of the
// CREATING_BLOCK phase, plus the expected time for tx processing. When validating, we start counting
// the time for tx processing from the start of the COLLECTING_ATTESTATIONS phase plus the attestation
// propagation time. See the comments in setTimeTable for more details.
const processingEndTimeWithinSlot = opts.validateOnly
? this.timeTable[SequencerState.COLLECTING_ATTESTATIONS] + this.attestationPropagationTime + this.processTxTime
: this.timeTable[SequencerState.CREATING_BLOCK] + this.processTxTime;

// Deadline is only set if enforceTimeTable is enabled.
const processingEndTimeWithinSlot = this.timeTable[SequencerState.CREATING_BLOCK] + this.processTxTime;
const deadline = this.enforceTimeTable
? new Date((this.getSlotStartTimestamp(slot) + processingEndTimeWithinSlot) * 1000)
: undefined;

this.log.verbose(`Processing pending txs`, {
slot,
slotStart: new Date(this.getSlotStartTimestamp(slot) * 1000),
Expand All @@ -514,15 +542,15 @@ export class Sequencer {
this.allowedInSetup,
);

// REFACTOR: Public processor should just handle processing, one tx at a time. It should be responsibility
// TODO(#11000): Public processor should just handle processing, one tx at a time. It should be responsibility
// of the sequencer to update world state and iterate over txs. We should refactor this along with unifying the
// publicProcessorFork and orchestratorFork, to avoid doing tree insertions twice when building the block.
const limits = { deadline, maxTransactions: this.maxTxsPerBlock, maxBlockSize: this.maxBlockSizeInBytes };
const [publicProcessorDuration, [processedTxs, failedTxs]] = await elapsed(() =>
processor.process(pendingTxs, limits, validators),
);

if (failedTxs.length > 0) {
if (!opts.validateOnly && failedTxs.length > 0) {
const failedTxData = failedTxs.map(fail => fail.tx);
this.log.verbose(`Dropping failed txs ${Tx.getHashes(failedTxData).join(', ')}`);
await this.p2pClient.deleteTxs(Tx.getHashes(failedTxData));
Expand Down Expand Up @@ -555,6 +583,7 @@ export class Sequencer {
publicProcessorDuration,
numMsgs: l1ToL2Messages.length,
numTxs: processedTxs.length,
numFailedTxs: failedTxs.length,
blockBuildingTimer,
};
} finally {
Expand Down
3 changes: 2 additions & 1 deletion yarn-project/validator-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"type": "module",
"exports": {
".": "./dest/index.js",
"./config": "./dest/config.js"
"./config": "./dest/config.js",
"./errors": "./dest/errors/index.js"
},
"bin": "./dest/bin/index.js",
"typedocOptions": {
Expand Down
12 changes: 12 additions & 0 deletions yarn-project/validator-client/src/errors/validator.error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ export class ReExStateMismatchError extends ValidatorError {
}
}

export class ReExFailedTxsError extends ValidatorError {
constructor(numFailedTxs: number) {
super(`Re-execution failed to process ${numFailedTxs} txs`);
}
}

export class ReExTimeoutError extends ValidatorError {
constructor() {
super('Re-execution timed out or failed to process all txs in the proposal');
}
}

export class BlockBuilderNotProvidedError extends ValidatorError {
constructor() {
super('Block builder not provided');
Expand Down
Loading

0 comments on commit 3d9c959

Please sign in to comment.