Skip to content

Commit

Permalink
feat: Max pending jobs in prover node (#8045)
Browse files Browse the repository at this point in the history
Adds a max number of pending block-proving jobs to the prover node. When
reached, the prover node stops proving, and resumes once a job finishes.
Also adds error handling to the creation of a new proving job, so if a
block-proving-job fails to start, it is skipped instead of blocking the
prover node.
  • Loading branch information
spalladino authored Aug 16, 2024
1 parent abc6b19 commit c857604
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 22 deletions.
10 changes: 8 additions & 2 deletions yarn-project/end-to-end/src/e2e_prover_node.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
sleep,
} from '@aztec/aztec.js';
import { StatefulTestContract, TestContract } from '@aztec/noir-contracts.js';
import { createProverNode } from '@aztec/prover-node';
import { type ProverNodeConfig, createProverNode } from '@aztec/prover-node';

import { sendL1ToL2Message } from './fixtures/l1_to_l2_messaging.js';
import {
Expand Down Expand Up @@ -127,7 +127,13 @@ describe('e2e_prover_node', () => {
logger.info(`Creating prover node ${proverId.toString()}`);
// HACK: We have to use the existing archiver to fetch L2 data, since anvil's chain dump/load used by the
// snapshot manager does not include events nor txs, so a new archiver would not "see" old blocks.
const proverConfig = { ...ctx.aztecNodeConfig, txProviderNodeUrl: undefined, dataDirectory: undefined, proverId };
const proverConfig: ProverNodeConfig = {
...ctx.aztecNodeConfig,
txProviderNodeUrl: undefined,
dataDirectory: undefined,
proverId,
proverNodeMaxPendingJobs: 100,
};
const archiver = ctx.aztecNode.getBlockSource() as Archiver;
const proverNode = await createProverNode(proverConfig, { aztecNodeTxProvider: ctx.aztecNode, archiver });

Expand Down
4 changes: 3 additions & 1 deletion yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,6 @@ export type EnvVar =
| 'BB_SKIP_CLEANUP'
| 'PXE_PROVER_ENABLED'
| 'VALIDATOR_PRIVATE_KEY'
| 'VALIDATOR_DISABLED';
| 'VALIDATOR_DISABLED'
| 'PROVER_NODE_DISABLE_AUTOMATIC_PROVING'
| 'PROVER_NODE_MAX_PENDING_JOBS';
1 change: 1 addition & 0 deletions yarn-project/prover-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"@aztec/sequencer-client": "workspace:^",
"@aztec/simulator": "workspace:^",
"@aztec/telemetry-client": "workspace:^",
"@aztec/types": "workspace:^",
"@aztec/world-state": "workspace:^",
"source-map-support": "^0.5.21",
"tslib": "^2.4.0"
Expand Down
29 changes: 27 additions & 2 deletions yarn-project/prover-node/src/config.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { type ArchiverConfig, archiverConfigMappings, getArchiverConfigFromEnv } from '@aztec/archiver';
import { type ConfigMappingsType } from '@aztec/foundation/config';
import {
type ConfigMappingsType,
booleanConfigHelper,
getConfigFromMappings,
numberConfigHelper,
} from '@aztec/foundation/config';
import { type ProverClientConfig, getProverEnvVars, proverClientConfigMappings } from '@aztec/prover-client';
import {
type PublisherConfig,
Expand All @@ -18,7 +23,25 @@ export type ProverNodeConfig = ArchiverConfig &
WorldStateConfig &
PublisherConfig &
TxSenderConfig &
TxProviderConfig;
TxProviderConfig & {
proverNodeDisableAutomaticProving?: boolean;
proverNodeMaxPendingJobs?: number;
};

/**
 * Config mappings for the two options that are specific to the prover node
 * (`proverNodeDisableAutomaticProving` and `proverNodeMaxPendingJobs`).
 * Each entry names the environment variable the option is read from and a human-readable description.
 * NOTE(review): the values passed to `booleanConfigHelper` / `numberConfigHelper` (`false`, `100`) are
 * presumably the defaults used when the variable is unset; confirm against the helpers.
 */
const specificProverNodeConfigMappings: ConfigMappingsType<
Pick<ProverNodeConfig, 'proverNodeDisableAutomaticProving' | 'proverNodeMaxPendingJobs'>
> = {
proverNodeDisableAutomaticProving: {
env: 'PROVER_NODE_DISABLE_AUTOMATIC_PROVING',
description: 'Whether to disable automatic proving of pending blocks seen on L1',
...booleanConfigHelper(false),
},
proverNodeMaxPendingJobs: {
env: 'PROVER_NODE_MAX_PENDING_JOBS',
description: 'The maximum number of pending jobs for the prover node',
...numberConfigHelper(100),
},
};

export const proverNodeConfigMappings: ConfigMappingsType<ProverNodeConfig> = {
...archiverConfigMappings,
Expand All @@ -27,6 +50,7 @@ export const proverNodeConfigMappings: ConfigMappingsType<ProverNodeConfig> = {
...getPublisherConfigMappings('PROVER'),
...getTxSenderConfigMappings('PROVER'),
...txProviderConfigMappings,
...specificProverNodeConfigMappings,
};

export function getProverNodeConfigFromEnv(): ProverNodeConfig {
Expand All @@ -37,5 +61,6 @@ export function getProverNodeConfigFromEnv(): ProverNodeConfig {
...getPublisherConfigFromEnv('PROVER'),
...getTxSenderConfigFromEnv('PROVER'),
...getTxProviderConfigFromEnv(),
...getConfigFromMappings(specificProverNodeConfigMappings),
};
}
4 changes: 4 additions & 0 deletions yarn-project/prover-node/src/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,9 @@ export async function createProverNode(
txProvider,
simulationProvider,
telemetry,
{
disableAutomaticProving: config.proverNodeDisableAutomaticProving,
maxPendingJobs: config.proverNodeMaxPendingJobs,
},
);
}
29 changes: 22 additions & 7 deletions yarn-project/prover-node/src/job/block-proving-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import { createDebugLogger } from '@aztec/foundation/log';
import { type L1Publisher } from '@aztec/sequencer-client';
import { type PublicProcessor, type PublicProcessorFactory } from '@aztec/simulator';

import * as crypto from 'node:crypto';

/**
* Job that grabs a range of blocks from the unfinalised chain from L1, gets their txs given their hashes,
* re-executes their public calls, generates a rollup proof, and submits it to L1. This job will update the
Expand All @@ -22,6 +24,7 @@ import { type PublicProcessor, type PublicProcessorFactory } from '@aztec/simula
export class BlockProvingJob {
private state: BlockProvingJobState = 'initialized';
private log = createDebugLogger('aztec:block-proving-job');
private uuid: string;

constructor(
private prover: BlockProver,
Expand All @@ -30,8 +33,14 @@ export class BlockProvingJob {
private l2BlockSource: L2BlockSource,
private l1ToL2MessageSource: L1ToL2MessageSource,
private txProvider: TxProvider,
private cleanUp: () => Promise<void> = () => Promise.resolve(),
) {}
private cleanUp: (job: BlockProvingJob) => Promise<void> = () => Promise.resolve(),
) {
this.uuid = crypto.randomUUID();
}

/** Returns this job's identifier, i.e. the value the constructor stored in `uuid` from `crypto.randomUUID()`. */
public getId(): string {
return this.uuid;
}

public getState(): BlockProvingJobState {
return this.state;
Expand All @@ -42,7 +51,7 @@ export class BlockProvingJob {
throw new Error(`Block ranges are not yet supported`);
}

this.log.info(`Starting block proving job`, { fromBlock, toBlock });
this.log.info(`Starting block proving job`, { fromBlock, toBlock, uuid: this.uuid });
this.state = 'processing';
try {
let historicalHeader = (await this.l2BlockSource.getBlock(fromBlock - 1))?.header;
Expand All @@ -61,6 +70,7 @@ export class BlockProvingJob {
nullifierTreeRoot: block.header.state.partial.nullifierTree.root,
publicDataTreeRoot: block.header.state.partial.publicDataTree.root,
historicalHeader: historicalHeader?.hash(),
uuid: this.uuid,
...globalVariables,
});

Expand All @@ -75,6 +85,7 @@ export class BlockProvingJob {
this.log.verbose(`Processed all txs for block`, {
blockNumber: block.number,
blockHash: block.hash().toString(),
uuid: this.uuid,
});

await this.prover.setBlockCompleted();
Expand All @@ -90,7 +101,7 @@ export class BlockProvingJob {
}

const { block, aggregationObject, proof } = await this.prover.finaliseBlock();
this.log.info(`Finalised proof for block range`, { fromBlock, toBlock });
this.log.info(`Finalised proof for block range`, { fromBlock, toBlock, uuid: this.uuid });

this.state = 'publishing-proof';
await this.publisher.submitProof(
Expand All @@ -100,17 +111,21 @@ export class BlockProvingJob {
aggregationObject,
proof,
);
this.log.info(`Submitted proof for block range`, { fromBlock, toBlock });
this.log.info(`Submitted proof for block range`, { fromBlock, toBlock, uuid: this.uuid });

this.state = 'completed';
} catch (err) {
this.log.error(`Error running block prover job: ${err}`);
this.log.error(`Error running block prover job`, err, { uuid: this.uuid });
this.state = 'failed';
} finally {
await this.cleanUp();
await this.cleanUp(this);
}
}

/**
 * Stops this job by calling `cancelBlock` on the prover it was constructed with.
 * The result of that call is neither awaited nor returned.
 */
public stop() {
this.prover.cancelBlock();
}

private async getBlock(blockNumber: number): Promise<L2Block> {
const block = await this.l2BlockSource.getBlock(blockNumber);
if (!block) {
Expand Down
179 changes: 179 additions & 0 deletions yarn-project/prover-node/src/prover-node.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import {
type L1ToL2MessageSource,
type L2BlockSource,
type MerkleTreeOperations,
type ProverClient,
type TxProvider,
} from '@aztec/circuit-types';
import { type L1Publisher } from '@aztec/sequencer-client';
import { type PublicProcessorFactory, type SimulationProvider } from '@aztec/simulator';
import { type TelemetryClient } from '@aztec/telemetry-client';
import { type ContractDataSource } from '@aztec/types/contracts';
import { WorldStateRunningState, type WorldStateSynchronizer } from '@aztec/world-state';

import { type MockProxy, mock } from 'jest-mock-extended';

import { type BlockProvingJob } from './job/block-proving-job.js';
import { ProverNode } from './prover-node.js';

describe('prover-node', () => {
let prover: MockProxy<ProverClient>;
let publisher: MockProxy<L1Publisher>;
let l2BlockSource: MockProxy<L2BlockSource>;
let l1ToL2MessageSource: MockProxy<L1ToL2MessageSource>;
let contractDataSource: MockProxy<ContractDataSource>;
let worldState: MockProxy<WorldStateSynchronizer>;
let txProvider: MockProxy<TxProvider>;
let simulator: MockProxy<SimulationProvider>;
let telemetryClient: MockProxy<TelemetryClient>;

let proverNode: TestProverNode;

// List of all jobs ever created by the test prover node and their dependencies
let jobs: {
job: MockProxy<BlockProvingJob>;
cleanUp: (job: BlockProvingJob) => Promise<void>;
db: MerkleTreeOperations;
}[];

// Before every test: build a fresh mock for each dependency, reset the recorded jobs, and create a
// new TestProverNode from those mocks.
beforeEach(() => {
prover = mock<ProverClient>();
publisher = mock<L1Publisher>();
l2BlockSource = mock<L2BlockSource>();
l1ToL2MessageSource = mock<L1ToL2MessageSource>();
contractDataSource = mock<ContractDataSource>();
worldState = mock<WorldStateSynchronizer>();
txProvider = mock<TxProvider>();
simulator = mock<SimulationProvider>();
telemetryClient = mock<TelemetryClient>();

// World state returns a new mock db every time it is asked to fork
worldState.syncImmediateAndFork.mockImplementation(() => Promise.resolve(mock<MerkleTreeOperations>()));

// Start from an empty job list; TestProverNode.doCreateBlockProvingJob appends to it
jobs = [];
proverNode = new TestProverNode(
prover,
publisher,
l2BlockSource,
l1ToL2MessageSource,
contractDataSource,
worldState,
txProvider,
simulator,
telemetryClient,
// The tests below rely on this cap of 3 pending jobs
{ maxPendingJobs: 3, pollingIntervalMs: 10 },
);
});

// After every test: stop the prover node created in beforeEach and wait for stop() to settle.
afterEach(async () => {
await proverNode.stop();
});

// Configures the mocks for a test scenario: the block source reports `blockNumber` as the latest block
// and `provenBlockNumber` as the latest proven block, and the world state reports that it is running
// and synced up to `provenBlockNumber`.
const setBlockNumbers = (blockNumber: number, provenBlockNumber: number) => {
  l2BlockSource.getBlockNumber.mockResolvedValue(blockNumber);
  l2BlockSource.getProvenBlockNumber.mockResolvedValue(provenBlockNumber);

  const worldStateStatus = {
    syncedToL2Block: provenBlockNumber,
    state: WorldStateRunningState.RUNNING,
  };
  worldState.status.mockResolvedValue(worldStateStatus);
};

// With latest block 5 and proven block 3, the node is expected to start one job per unproven block
// (4, then 5). The third work() call is expected not to create a further job.
it('proves pending blocks', async () => {
setBlockNumbers(5, 3);

await proverNode.work();
await proverNode.work();
await proverNode.work();

expect(jobs.length).toEqual(2);
expect(jobs[0].job.run).toHaveBeenCalledWith(4, 4);
expect(jobs[1].job.run).toHaveBeenCalledWith(5, 5);
});

// Blocks 4..10 are unproven, but the node is built with maxPendingJobs: 3, and the mocked jobs always
// report 'processing'. Four work() calls are therefore expected to create only three jobs.
it('stops proving when maximum jobs are reached', async () => {
setBlockNumbers(10, 3);

await proverNode.work();
await proverNode.work();
await proverNode.work();
await proverNode.work();

expect(jobs.length).toEqual(3);
expect(jobs[0].job.run).toHaveBeenCalledWith(4, 4);
expect(jobs[1].job.run).toHaveBeenCalledWith(5, 5);
expect(jobs[2].job.run).toHaveBeenCalledWith(6, 6);
});

// Checks the shape of getJobs(): one { uuid, status } entry per pending job.
// The uuids '0' and '1' are the ids TestProverNode assigns (the job count at creation time), and
// 'processing' is what the mocked getState() returns.
it('reports on pending jobs', async () => {
setBlockNumbers(5, 3);

await proverNode.work();
await proverNode.work();

expect(jobs.length).toEqual(2);
expect(proverNode.getJobs().length).toEqual(2);
expect(proverNode.getJobs()).toEqual([
{ uuid: '0', status: 'processing' },
{ uuid: '1', status: 'processing' },
]);
});

// Fills the node up to its cap of 3 pending jobs, then invokes the first job's cleanUp callback by hand
// and checks that a slot is freed, the job's db fork is deleted, and the next work() call starts block 7.
it('cleans up jobs when completed', async () => {
setBlockNumbers(10, 3);

await proverNode.work();
await proverNode.work();
await proverNode.work();
await proverNode.work();

expect(jobs.length).toEqual(3);
expect(jobs[0].job.run).toHaveBeenCalledWith(4, 4);
expect(jobs[1].job.run).toHaveBeenCalledWith(5, 5);
expect(jobs[2].job.run).toHaveBeenCalledWith(6, 6);

expect(proverNode.getJobs().length).toEqual(3);

// Clean up the first job
// (cleanUp is the callback the prover node passed to doCreateBlockProvingJob; the mocked job never calls it itself)
await jobs[0].cleanUp(jobs[0].job);
expect(proverNode.getJobs().length).toEqual(2);
expect(jobs[0].db.delete).toHaveBeenCalled();

// Request another job to run and ensure it gets pushed
await proverNode.work();
expect(jobs.length).toEqual(4);
expect(jobs[3].job.run).toHaveBeenCalledWith(7, 7);
expect(proverNode.getJobs().length).toEqual(3);
// Job '0' was cleaned up, so only the remaining three are still reported as pending
expect(proverNode.getJobs().map(({ uuid }) => uuid)).toEqual(['1', '2', '3']);
});

// A block whose proving job cannot be created is expected to be skipped rather than retried forever:
// the first two work() calls create no job, and the third starts a job for block 6.
it('moves forward when proving fails', async () => {
setBlockNumbers(10, 3);

// We trigger an error by setting world state past the block that the prover node will try proving
worldState.status.mockResolvedValue({ syncedToL2Block: 5, state: WorldStateRunningState.RUNNING });

// These two calls should result in failures, so no job gets created
await proverNode.work();
await proverNode.work();
expect(jobs.length).toEqual(0);

// But now the prover node should move forward
await proverNode.work();
expect(jobs.length).toEqual(1);
expect(jobs[0].job.run).toHaveBeenCalledWith(6, 6);
});

/**
 * ProverNode subclass used by these tests. It replaces job creation with a mocked BlockProvingJob and
 * records every created job, together with its cleanUp callback and db, in the shared `jobs` array.
 */
class TestProverNode extends ProverNode {
/**
 * Returns a mocked job whose getState() always reports 'processing' and whose getId() returns the
 * number of jobs created before it, as a string ('0', '1', ...).
 */
protected override doCreateBlockProvingJob(
db: MerkleTreeOperations,
_publicProcessorFactory: PublicProcessorFactory,
cleanUp: (job: BlockProvingJob) => Promise<void>,
): BlockProvingJob {
const job = mock<BlockProvingJob>({ getState: () => 'processing' });
// The id is taken before the push, so the first job gets '0'
job.getId.mockReturnValue(jobs.length.toString());
jobs.push({ job, cleanUp, db });
return job;
}

// Re-declared as public so the tests can call work() directly
// (NOTE(review): presumably non-public in ProverNode; confirm against the base class).
public override work() {
return super.work();
}
}
});
Loading

0 comments on commit c857604

Please sign in to comment.