From da711bf201bde1921f46728e996ddb88f0e18708 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Fri, 15 Nov 2024 13:27:04 +0000 Subject: [PATCH] feat: new proving broker implementation (#9400) Reopening of #8609, which was closed/merged by mistake. This PR is stacked on top of #9391 This PR adds ProvingBroker which implements a new interface for distributing proving jobs to workers as specified in https://github.com/AztecProtocol/aztec-packages/issues/8495 --- .../src/interfaces/proving-job.ts | 144 +++ .../private_kernel_empty_inputs.test.ts | 34 +- .../kernel/private_kernel_empty_inputs.ts | 34 +- .../src/proving_broker/proving_broker.test.ts | 1086 +++++++++++++++++ .../src/proving_broker/proving_broker.ts | 372 ++++++ .../proving_broker/proving_broker_database.ts | 81 ++ .../proving_broker_interface.ts | 74 ++ 7 files changed, 1821 insertions(+), 4 deletions(-) create mode 100644 yarn-project/prover-client/src/proving_broker/proving_broker.test.ts create mode 100644 yarn-project/prover-client/src/proving_broker/proving_broker.ts create mode 100644 yarn-project/prover-client/src/proving_broker/proving_broker_database.ts create mode 100644 yarn-project/prover-client/src/proving_broker/proving_broker_interface.ts diff --git a/yarn-project/circuit-types/src/interfaces/proving-job.ts b/yarn-project/circuit-types/src/interfaces/proving-job.ts index ea4851fb3fa..7c0643192c8 100644 --- a/yarn-project/circuit-types/src/interfaces/proving-job.ts +++ b/yarn-project/circuit-types/src/interfaces/proving-job.ts @@ -237,3 +237,147 @@ export const ProvingRequestResultSchema = z.discriminatedUnion('type', [ result: schemaForRecursiveProofAndVerificationKey(TUBE_PROOF_LENGTH), }), ]) satisfies ZodFor; + +export const V2ProvingJobId = z.string().brand('ProvingJobId'); +export type V2ProvingJobId = z.infer; + +export const V2ProvingJob = z.discriminatedUnion('type', [ + z.object({ + id: V2ProvingJobId, + blockNumber: z.number(), + type: z.literal(ProvingRequestType.PUBLIC_VM), 
+ inputs: AvmCircuitInputs.schema, + }), + z.object({ + id: V2ProvingJobId, + blockNumber: z.number(), + type: z.literal(ProvingRequestType.BASE_PARITY), + inputs: BaseParityInputs.schema, + }), + z.object({ + id: V2ProvingJobId, + blockNumber: z.number(), + type: z.literal(ProvingRequestType.ROOT_PARITY), + inputs: RootParityInputs.schema, + }), + z.object({ + id: V2ProvingJobId, + blockNumber: z.number(), + type: z.literal(ProvingRequestType.PRIVATE_BASE_ROLLUP), + inputs: PrivateBaseRollupInputs.schema, + }), + z.object({ + id: V2ProvingJobId, + blockNumber: z.number(), + type: z.literal(ProvingRequestType.PUBLIC_BASE_ROLLUP), + inputs: PublicBaseRollupInputs.schema, + }), + z.object({ + id: V2ProvingJobId, + blockNumber: z.number(), + type: z.literal(ProvingRequestType.MERGE_ROLLUP), + inputs: MergeRollupInputs.schema, + }), + z.object({ + id: V2ProvingJobId, + blockNumber: z.number(), + type: z.literal(ProvingRequestType.BLOCK_ROOT_ROLLUP), + inputs: BlockRootRollupInputs.schema, + }), + z.object({ + id: V2ProvingJobId, + blockNumber: z.number(), + type: z.literal(ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP), + inputs: EmptyBlockRootRollupInputs.schema, + }), + z.object({ + id: V2ProvingJobId, + blockNumber: z.number(), + type: z.literal(ProvingRequestType.BLOCK_MERGE_ROLLUP), + inputs: BlockMergeRollupInputs.schema, + }), + z.object({ + id: V2ProvingJobId, + blockNumber: z.number(), + type: z.literal(ProvingRequestType.ROOT_ROLLUP), + inputs: RootRollupInputs.schema, + }), + z.object({ + id: V2ProvingJobId, + blockNumber: z.number(), + type: z.literal(ProvingRequestType.PRIVATE_KERNEL_EMPTY), + inputs: PrivateKernelEmptyInputData.schema, + }), + z.object({ + id: V2ProvingJobId, + blockNumber: z.number(), + type: z.literal(ProvingRequestType.TUBE_PROOF), + inputs: TubeInputs.schema, + }), +]); +export type V2ProvingJob = z.infer; + +export const V2ProofOutput = z.discriminatedUnion('type', [ + z.object({ + type: 
z.literal(ProvingRequestType.PRIVATE_KERNEL_EMPTY), + value: schemaForPublicInputsAndRecursiveProof(KernelCircuitPublicInputs.schema), + }), + z.object({ + type: z.literal(ProvingRequestType.PUBLIC_VM), + value: schemaForRecursiveProofAndVerificationKey(AVM_PROOF_LENGTH_IN_FIELDS), + }), + z.object({ + type: z.literal(ProvingRequestType.PRIVATE_BASE_ROLLUP), + value: schemaForPublicInputsAndRecursiveProof(BaseOrMergeRollupPublicInputs.schema), + }), + z.object({ + type: z.literal(ProvingRequestType.PUBLIC_BASE_ROLLUP), + value: schemaForPublicInputsAndRecursiveProof(BaseOrMergeRollupPublicInputs.schema), + }), + z.object({ + type: z.literal(ProvingRequestType.MERGE_ROLLUP), + value: schemaForPublicInputsAndRecursiveProof(BaseOrMergeRollupPublicInputs.schema), + }), + z.object({ + type: z.literal(ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP), + value: schemaForPublicInputsAndRecursiveProof(BlockRootOrBlockMergePublicInputs.schema), + }), + z.object({ + type: z.literal(ProvingRequestType.BLOCK_ROOT_ROLLUP), + value: schemaForPublicInputsAndRecursiveProof(BlockRootOrBlockMergePublicInputs.schema), + }), + z.object({ + type: z.literal(ProvingRequestType.BLOCK_MERGE_ROLLUP), + value: schemaForPublicInputsAndRecursiveProof(BlockRootOrBlockMergePublicInputs.schema), + }), + z.object({ + type: z.literal(ProvingRequestType.ROOT_ROLLUP), + value: schemaForPublicInputsAndRecursiveProof(RootRollupPublicInputs.schema), + }), + z.object({ + type: z.literal(ProvingRequestType.BASE_PARITY), + value: schemaForPublicInputsAndRecursiveProof(ParityPublicInputs.schema, RECURSIVE_PROOF_LENGTH), + }), + z.object({ + type: z.literal(ProvingRequestType.ROOT_PARITY), + value: schemaForPublicInputsAndRecursiveProof(ParityPublicInputs.schema, NESTED_RECURSIVE_PROOF_LENGTH), + }), + z.object({ + type: z.literal(ProvingRequestType.TUBE_PROOF), + value: schemaForRecursiveProofAndVerificationKey(TUBE_PROOF_LENGTH), + }), +]); + +export type V2ProofOutput = z.infer; + +export const 
V2ProvingJobStatus = z.discriminatedUnion('status', [ + z.object({ status: z.literal('in-queue') }), + z.object({ status: z.literal('in-progress') }), + z.object({ status: z.literal('not-found') }), + z.object({ status: z.literal('resolved'), value: V2ProofOutput }), + z.object({ status: z.literal('rejected'), error: z.string() }), +]); +export type V2ProvingJobStatus = z.infer; + +export const V2ProvingJobResult = z.union([z.object({ value: V2ProofOutput }), z.object({ error: z.string() })]); +export type V2ProvingJobResult = z.infer; diff --git a/yarn-project/circuits.js/src/structs/kernel/private_kernel_empty_inputs.test.ts b/yarn-project/circuits.js/src/structs/kernel/private_kernel_empty_inputs.test.ts index c6652414529..1eb9eb51fb9 100644 --- a/yarn-project/circuits.js/src/structs/kernel/private_kernel_empty_inputs.test.ts +++ b/yarn-project/circuits.js/src/structs/kernel/private_kernel_empty_inputs.test.ts @@ -1,7 +1,14 @@ import { Fr } from '@aztec/foundation/fields'; +import { NESTED_RECURSIVE_PROOF_LENGTH } from '../../constants.gen.js'; import { makeHeader } from '../../tests/factories.js'; -import { PrivateKernelEmptyInputData } from './private_kernel_empty_inputs.js'; +import { makeRecursiveProof } from '../recursive_proof.js'; +import { VerificationKeyAsFields } from '../verification_key.js'; +import { + EmptyNestedData, + PrivateKernelEmptyInputData, + PrivateKernelEmptyInputs, +} from './private_kernel_empty_inputs.js'; describe('PrivateKernelEmptyInputData', () => { it('serializes and deserializes', () => { @@ -9,3 +16,28 @@ describe('PrivateKernelEmptyInputData', () => { expect(PrivateKernelEmptyInputData.fromString(obj.toString())).toEqual(obj); }); }); + +describe('PrivateKernelEmptyInputs', () => { + it('serializes and deserializes', () => { + const obj = new PrivateKernelEmptyInputs( + new EmptyNestedData(makeRecursiveProof(NESTED_RECURSIVE_PROOF_LENGTH), VerificationKeyAsFields.makeFakeHonk()), + makeHeader(), + Fr.random(), + Fr.random(), + 
Fr.random(), + Fr.random(), + ); + + expect(PrivateKernelEmptyInputs.fromBuffer(obj.toBuffer())).toEqual(obj); + }); +}); + +describe('EmptyNestedData', () => { + it('serializes and deserializes', () => { + const obj = new EmptyNestedData( + makeRecursiveProof(NESTED_RECURSIVE_PROOF_LENGTH), + VerificationKeyAsFields.makeFakeHonk(), + ); + expect(EmptyNestedData.fromBuffer(obj.toBuffer())).toEqual(obj); + }); +}); diff --git a/yarn-project/circuits.js/src/structs/kernel/private_kernel_empty_inputs.ts b/yarn-project/circuits.js/src/structs/kernel/private_kernel_empty_inputs.ts index 85340aa1aff..0a97d999581 100644 --- a/yarn-project/circuits.js/src/structs/kernel/private_kernel_empty_inputs.ts +++ b/yarn-project/circuits.js/src/structs/kernel/private_kernel_empty_inputs.ts @@ -3,10 +3,10 @@ import { hexSchemaFor } from '@aztec/foundation/schemas'; import { BufferReader, serializeToBuffer } from '@aztec/foundation/serialize'; import { type FieldsOf } from '@aztec/foundation/types'; -import { type RECURSIVE_PROOF_LENGTH } from '../../constants.gen.js'; +import { RECURSIVE_PROOF_LENGTH } from '../../constants.gen.js'; import { Header } from '../header.js'; -import { type RecursiveProof } from '../recursive_proof.js'; -import { type VerificationKeyAsFields } from '../verification_key.js'; +import { RecursiveProof } from '../recursive_proof.js'; +import { VerificationKeyAsFields } from '../verification_key.js'; export class PrivateKernelEmptyInputData { constructor( @@ -92,6 +92,18 @@ export class PrivateKernelEmptyInputs { fields.protocolContractTreeRoot, ); } + + static fromBuffer(buf: Buffer | BufferReader): PrivateKernelEmptyInputs { + const reader = BufferReader.asReader(buf); + return new PrivateKernelEmptyInputs( + reader.readObject(EmptyNestedData), + reader.readObject(Header), + reader.readObject(Fr), + reader.readObject(Fr), + reader.readObject(Fr), + reader.readObject(Fr), + ); + } } export class EmptyNestedCircuitInputs { @@ -109,4 +121,20 @@ export class 
EmptyNestedData { toBuffer(): Buffer { return serializeToBuffer(this.proof, this.vk); } + + static fromBuffer(buf: Buffer | BufferReader): EmptyNestedData { + const reader = BufferReader.asReader(buf); + const recursiveProof = reader.readObject(RecursiveProof); + + if (recursiveProof.proof.length !== RECURSIVE_PROOF_LENGTH) { + throw new TypeError( + `Invalid proof length. Expected: ${RECURSIVE_PROOF_LENGTH} got: ${recursiveProof.proof.length}`, + ); + } + + return new EmptyNestedData( + recursiveProof as RecursiveProof, + reader.readObject(VerificationKeyAsFields), + ); + } } diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts new file mode 100644 index 00000000000..c992f18c87f --- /dev/null +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts @@ -0,0 +1,1086 @@ +import { + ProvingRequestType, + type V2ProvingJob, + type V2ProvingJobId, + makePublicInputsAndRecursiveProof, +} from '@aztec/circuit-types'; +import { RECURSIVE_PROOF_LENGTH, VerificationKeyData, makeRecursiveProof } from '@aztec/circuits.js'; +import { + makeBaseOrMergeRollupPublicInputs, + makeBaseParityInputs, + makeParityPublicInputs, + makePrivateBaseRollupInputs, + makeRootParityInputs, +} from '@aztec/circuits.js/testing'; +import { randomBytes } from '@aztec/foundation/crypto'; + +import { jest } from '@jest/globals'; + +import { ProvingBroker } from './proving_broker.js'; +import { InMemoryDatabase } from './proving_broker_database.js'; + +beforeAll(() => { + jest.useFakeTimers(); +}); + +describe('ProvingBroker', () => { + let database: InMemoryDatabase; + let broker: ProvingBroker; + let jobTimeoutSec: number; + let maxRetries: number; + + const now = () => Math.floor(Date.now() / 1000); + + beforeEach(() => { + jobTimeoutSec = 10; + maxRetries = 2; + database = new InMemoryDatabase(); + broker = new ProvingBroker(database, { + jobTimeoutSec: jobTimeoutSec, + 
timeoutIntervalSec: jobTimeoutSec / 4, + maxRetries, + }); + }); + + describe('Producer API', () => { + beforeEach(async () => { + await broker.start(); + }); + + afterEach(async () => { + await broker.stop(); + }); + + it('enqueues jobs', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + blockNumber: 1, + type: ProvingRequestType.BASE_PARITY, + inputs: makeBaseParityInputs(), + }); + expect(await broker.getProvingJobStatus(id)).toEqual({ status: 'in-queue' }); + + const id2 = makeProvingJobId(); + await broker.enqueueProvingJob({ + id: id2, + blockNumber: 1, + type: ProvingRequestType.PRIVATE_BASE_ROLLUP, + inputs: makePrivateBaseRollupInputs(), + }); + expect(await broker.getProvingJobStatus(id2)).toEqual({ status: 'in-queue' }); + }); + + it('ignores duplicate jobs', async () => { + const provingJob: V2ProvingJob = { + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }; + + await broker.enqueueProvingJob(provingJob); + await expect(broker.enqueueProvingJob(provingJob)).resolves.toBeUndefined(); + await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({ status: 'in-queue' }); + }); + + it('throws an error in case of duplicate job IDs', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + blockNumber: 1, + type: ProvingRequestType.BASE_PARITY, + inputs: makeBaseParityInputs(1), + }); + await expect( + broker.enqueueProvingJob({ + id, + blockNumber: 1, + type: ProvingRequestType.BASE_PARITY, + inputs: makeBaseParityInputs(2), + }), + ).rejects.toThrow('Duplicate proving job ID'); + }); + + it('returns not-found status for non-existing jobs', async () => { + const status = await broker.getProvingJobStatus(makeProvingJobId()); + expect(status).toEqual({ status: 'not-found' }); + }); + + it('cancels jobs in queue', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + 
blockNumber: 1, + type: ProvingRequestType.BASE_PARITY, + inputs: makeBaseParityInputs(), + }); + await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'in-queue' }); + + await broker.removeAndCancelProvingJob(id); + + await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'not-found' }); + }); + + it('cancels jobs in-progress', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + blockNumber: 1, + type: ProvingRequestType.BASE_PARITY, + inputs: makeBaseParityInputs(), + }); + await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'in-queue' }); + await broker.getProvingJob(); + await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'in-progress' }); + await broker.removeAndCancelProvingJob(id); + await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ status: 'not-found' }); + }); + + it('returns job result if successful', async () => { + const provingJob: V2ProvingJob = { + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }; + + await broker.enqueueProvingJob(provingJob); + const value = makePublicInputsAndRecursiveProof( + makeParityPublicInputs(RECURSIVE_PROOF_LENGTH), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFake(), + ); + await broker.reportProvingJobSuccess(provingJob.id, { type: ProvingRequestType.BASE_PARITY, value }); + + const status = await broker.getProvingJobStatus(provingJob.id); + expect(status).toEqual({ status: 'resolved', value: { type: ProvingRequestType.BASE_PARITY, value } }); + }); + + it('returns job error if failed', async () => { + const provingJob: V2ProvingJob = { + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }; + + await broker.enqueueProvingJob(provingJob); + const error = new Error('test error'); + await broker.reportProvingJobError(provingJob.id, 
error); + + const status = await broker.getProvingJobStatus(provingJob.id); + expect(status).toEqual({ status: 'rejected', error: String(error) }); + }); + }); + + describe('Consumer API', () => { + beforeEach(async () => { + await broker.start(); + }); + + afterEach(async () => { + await broker.stop(); + }); + + it('returns undefined if no jobs are available', async () => { + const provingJob = await broker.getProvingJob({ allowList: [ProvingRequestType.BASE_PARITY] }); + expect(provingJob).toBeUndefined(); + }); + + it('returns jobs in priority order', async () => { + const provingJob1: V2ProvingJob = { + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }; + + const provingJob2: V2ProvingJob = { + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 2, + inputs: makeBaseParityInputs(), + }; + + const provingJob3: V2ProvingJob = { + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 3, + inputs: makeBaseParityInputs(), + }; + + await broker.enqueueProvingJob(provingJob2); + await broker.enqueueProvingJob(provingJob3); + await broker.enqueueProvingJob(provingJob1); + + await getAndAssertNextJobId(provingJob1.id, ProvingRequestType.BASE_PARITY); + }); + + it('returns undefined if no jobs are available for the given allowList', async () => { + await broker.enqueueProvingJob({ + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + + await expect( + broker.getProvingJob({ allowList: [ProvingRequestType.PRIVATE_BASE_ROLLUP] }), + ).resolves.toBeUndefined(); + }); + + it('returns a job if it is in the allowList', async () => { + const baseParity1 = makeProvingJobId(); + await broker.enqueueProvingJob({ + id: baseParity1, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + + const baseRollup1 = makeProvingJobId(); + await 
broker.enqueueProvingJob({ + id: baseRollup1, + type: ProvingRequestType.PRIVATE_BASE_ROLLUP, + blockNumber: 1, + inputs: makePrivateBaseRollupInputs(), + }); + + const baseRollup2 = makeProvingJobId(); + await broker.enqueueProvingJob({ + id: baseRollup2, + type: ProvingRequestType.PRIVATE_BASE_ROLLUP, + blockNumber: 2, + inputs: makePrivateBaseRollupInputs(), + }); + + const rootParity1 = makeProvingJobId(); + await broker.enqueueProvingJob({ + id: rootParity1, + type: ProvingRequestType.ROOT_PARITY, + blockNumber: 1, + inputs: makeRootParityInputs(), + }); + + await getAndAssertNextJobId(baseParity1, ProvingRequestType.BASE_PARITY); + }); + + it('returns the most important job if it is in the allowList', async () => { + const baseParity1 = makeProvingJobId(); + await broker.enqueueProvingJob({ + id: baseParity1, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + + const baseRollup1 = makeProvingJobId(); + await broker.enqueueProvingJob({ + id: baseRollup1, + type: ProvingRequestType.PRIVATE_BASE_ROLLUP, + blockNumber: 1, + inputs: makePrivateBaseRollupInputs(), + }); + + const baseRollup2 = makeProvingJobId(); + await broker.enqueueProvingJob({ + id: baseRollup2, + type: ProvingRequestType.PRIVATE_BASE_ROLLUP, + blockNumber: 2, + inputs: makePrivateBaseRollupInputs(), + }); + + const rootParity1 = makeProvingJobId(); + await broker.enqueueProvingJob({ + id: rootParity1, + type: ProvingRequestType.ROOT_PARITY, + blockNumber: 1, + inputs: makeRootParityInputs(), + }); + + await getAndAssertNextJobId( + baseRollup1, + ProvingRequestType.BASE_PARITY, + ProvingRequestType.PRIVATE_BASE_ROLLUP, + ProvingRequestType.ROOT_PARITY, + ); + }); + + it('returns a new job when reporting progress if current one is cancelled', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + await 
broker.getProvingJob(); + await assertJobStatus(id, 'in-progress'); + await broker.removeAndCancelProvingJob(id); + await assertJobStatus(id, 'not-found'); + + const id2 = makeProvingJobId(); + await broker.enqueueProvingJob({ + id: id2, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + await expect( + broker.reportProvingJobProgress(id, now(), { allowList: [ProvingRequestType.BASE_PARITY] }), + ).resolves.toEqual({ job: expect.objectContaining({ id: id2 }), time: expect.any(Number) }); + }); + + it('returns a new job if job is already in progress elsewhere', async () => { + // this test simulates the broker crashing and when it comes back online it has two agents working the same job + const job1: V2ProvingJob = { + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }; + + const job2: V2ProvingJob = { + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 2, + inputs: makeBaseParityInputs(), + }; + + await broker.enqueueProvingJob(job1); + await broker.enqueueProvingJob(job2); + + const { job: firstAgentJob, time: firstAgentStartedAt } = (await broker.getProvingJob({ + allowList: [ProvingRequestType.BASE_PARITY], + }))!; + + expect(firstAgentJob).toEqual(job1); + await assertJobStatus(job1.id, 'in-progress'); + + await jest.advanceTimersByTimeAsync(jobTimeoutSec / 2); + await expect( + broker.reportProvingJobProgress(job1.id, firstAgentStartedAt, { + allowList: [ProvingRequestType.BASE_PARITY], + }), + ).resolves.toBeUndefined(); + + // restart the broker! 
+ await broker.stop(); + + // fake some time passing while the broker restarts + await jest.advanceTimersByTimeAsync(10_000); + + broker = new ProvingBroker(database); + await broker.start(); + + await assertJobStatus(job1.id, 'in-queue'); + + const { job: secondAgentJob, time: secondAgentStartedAt } = (await broker.getProvingJob({ + allowList: [ProvingRequestType.BASE_PARITY], + }))!; + + // should be the same job! + expect(secondAgentJob).toEqual(job1); + await assertJobStatus(job1.id, 'in-progress'); + + // original agent should still be able to report progress + // and it should take over the job from the second agent + await expect( + broker.reportProvingJobProgress(job1.id, firstAgentStartedAt, { + allowList: [ProvingRequestType.BASE_PARITY], + }), + ).resolves.toBeUndefined(); + + // second agent should get a new job now + await expect( + broker.reportProvingJobProgress(job1.id, secondAgentStartedAt, { + allowList: [ProvingRequestType.BASE_PARITY], + }), + ).resolves.toEqual({ job: job2, time: expect.any(Number) }); + }); + + it('avoids sending the same job to a new agent after a restart', async () => { + // this test simulates the broker crashing and when it comes back online it has two agents working the same job + const job1: V2ProvingJob = { + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }; + + const job2: V2ProvingJob = { + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 2, + inputs: makeBaseParityInputs(), + }; + + await broker.enqueueProvingJob(job1); + await broker.enqueueProvingJob(job2); + + const { job: firstAgentJob, time: firstAgentStartedAt } = (await broker.getProvingJob({ + allowList: [ProvingRequestType.BASE_PARITY], + }))!; + + expect(firstAgentJob).toEqual(job1); + await assertJobStatus(job1.id, 'in-progress'); + + // restart the broker! 
+ await broker.stop(); + + // fake some time passing while the broker restarts + await jest.advanceTimersByTimeAsync(10_000); + + broker = new ProvingBroker(database); + await broker.start(); + + await assertJobStatus(job1.id, 'in-queue'); + + // original agent should still be able to report progress + // and it should take over the job from the second agent + await expect( + broker.reportProvingJobProgress(job1.id, firstAgentStartedAt, { + allowList: [ProvingRequestType.BASE_PARITY], + }), + ).resolves.toBeUndefined(); + + const { job: secondAgentJob } = (await broker.getProvingJob({ + allowList: [ProvingRequestType.BASE_PARITY], + }))!; + + // should be the same job! + expect(secondAgentJob).toEqual(job2); + await assertJobStatus(job1.id, 'in-progress'); + await assertJobStatus(job2.id, 'in-progress'); + }); + + it('avoids sending a completed job to a new agent after a restart', async () => { + // this test simulates the broker crashing and when it comes back online it has two agents working the same job + const job1: V2ProvingJob = { + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }; + + const job2: V2ProvingJob = { + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 2, + inputs: makeBaseParityInputs(), + }; + + await broker.enqueueProvingJob(job1); + await broker.enqueueProvingJob(job2); + + await getAndAssertNextJobId(job1.id); + await assertJobStatus(job1.id, 'in-progress'); + + // restart the broker! 
+ await broker.stop(); + + // fake some time passing while the broker restarts + await jest.advanceTimersByTimeAsync(100 * jobTimeoutSec * 1000); + + broker = new ProvingBroker(database); + await broker.start(); + await assertJobStatus(job1.id, 'in-queue'); + + // after the restart the new broker thinks job1 is available + // inform the agent of the job completion + + await expect( + broker.reportProvingJobSuccess(job1.id, { + type: ProvingRequestType.BASE_PARITY, + value: makePublicInputsAndRecursiveProof( + makeParityPublicInputs(), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFake(), + ), + }), + ).resolves.toBeUndefined(); + await assertJobStatus(job1.id, 'resolved'); + + // make sure the the broker sends the next job to the agent + await getAndAssertNextJobId(job2.id); + + await assertJobStatus(job1.id, 'resolved'); + await assertJobStatus(job2.id, 'in-progress'); + }); + + it('tracks job result if in progress', async () => { + const id1 = makeProvingJobId(); + const id2 = makeProvingJobId(); + await broker.enqueueProvingJob({ + id: id1, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + await broker.enqueueProvingJob({ + id: id2, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 2, + inputs: makeBaseParityInputs(), + }); + + await getAndAssertNextJobId(id1); + await assertJobStatus(id1, 'in-progress'); + await broker.reportProvingJobSuccess(id1, { + type: ProvingRequestType.BASE_PARITY, + value: makePublicInputsAndRecursiveProof( + makeParityPublicInputs(), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFake(), + ), + }); + await assertJobStatus(id1, 'resolved'); + + await getAndAssertNextJobId(id2); + await assertJobStatus(id2, 'in-progress'); + await broker.reportProvingJobError(id2, new Error('test error')); + await assertJobStatus(id2, 'rejected'); + }); + + it('tracks job result even if job is in queue', async () => { + const id1 = 
makeProvingJobId(); + const id2 = makeProvingJobId(); + await broker.enqueueProvingJob({ + id: id1, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + await broker.enqueueProvingJob({ + id: id2, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 2, + inputs: makeBaseParityInputs(), + }); + + await broker.reportProvingJobSuccess(id1, { + type: ProvingRequestType.BASE_PARITY, + value: makePublicInputsAndRecursiveProof( + makeParityPublicInputs(), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFake(), + ), + }); + await assertJobStatus(id1, 'resolved'); + + await broker.reportProvingJobError(id2, new Error('test error')); + await assertJobStatus(id2, 'rejected'); + }); + + it('ignores reported job error if unknown job', async () => { + const id = makeProvingJobId(); + await assertJobStatus(id, 'not-found'); + await broker.reportProvingJobError(id, new Error('test error')); + await assertJobStatus(id, 'not-found'); + }); + + it('ignores job result if unknown job', async () => { + const id = makeProvingJobId(); + await assertJobStatus(id, 'not-found'); + await broker.reportProvingJobSuccess(id, { + type: ProvingRequestType.BASE_PARITY, + value: makePublicInputsAndRecursiveProof( + makeParityPublicInputs(), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFake(), + ), + }); + await assertJobStatus(id, 'not-found'); + }); + }); + + describe('Timeouts', () => { + beforeEach(async () => { + await broker.start(); + }); + + afterEach(async () => { + await broker.stop(); + }); + + it('tracks in progress jobs', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + + await assertJobStatus(id, 'in-queue'); + await getAndAssertNextJobId(id); + await assertJobStatus(id, 'in-progress'); + }); + + it('re-enqueues jobs that time out', async () => { + 
const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + + await assertJobStatus(id, 'in-queue'); + await getAndAssertNextJobId(id); + await assertJobStatus(id, 'in-progress'); + + // advance time so job times out because of no heartbeats + await jest.advanceTimersByTimeAsync(jobTimeoutSec * 1000); + + // should be back in the queue now + await assertJobStatus(id, 'in-queue'); + }); + + it('keeps the jobs in progress while it is alive', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + + await assertJobStatus(id, 'in-queue'); + const { job, time } = (await broker.getProvingJob())!; + expect(job.id).toEqual(id); + await assertJobStatus(id, 'in-progress'); + + // advance the time slightly, not enough for the request to timeout + await jest.advanceTimersByTimeAsync((jobTimeoutSec * 1000) / 2); + + await assertJobStatus(id, 'in-progress'); + + // send a heartbeat + await broker.reportProvingJobProgress(id, time); + + // advance the time again + await jest.advanceTimersByTimeAsync((jobTimeoutSec * 1000) / 2); + + // should still be our request to process + await assertJobStatus(id, 'in-progress'); + + // advance the time again and lose the request + await jest.advanceTimersByTimeAsync(jobTimeoutSec * 1000); + await assertJobStatus(id, 'in-queue'); + }); + }); + + describe('Retries', () => { + it('retries jobs', async () => { + const provingJob: V2ProvingJob = { + id: makeProvingJobId(), + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }; + + await broker.enqueueProvingJob(provingJob); + + await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({ + status: 'in-queue', + }); + + await expect(broker.getProvingJob()).resolves.toEqual({ job: provingJob, time: 
expect.any(Number) }); + + await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({ + status: 'in-progress', + }); + + await broker.reportProvingJobError(provingJob.id, new Error('test error'), true); + + await expect(broker.getProvingJobStatus(provingJob.id)).resolves.toEqual({ + status: 'in-queue', + }); + }); + + it('retries up to a maximum number of times', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + + for (let i = 0; i < maxRetries; i++) { + await assertJobStatus(id, 'in-queue'); + await getAndAssertNextJobId(id); + await assertJobStatus(id, 'in-progress'); + await broker.reportProvingJobError(id, new Error('test error'), true); + } + + await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ + status: 'rejected', + error: String(new Error('test error')), + }); + }); + + it('passing retry=false does not retry', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + + await getAndAssertNextJobId(id); + await assertJobStatus(id, 'in-progress'); + await broker.reportProvingJobError(id, new Error('test error'), false); + await expect(broker.getProvingJobStatus(id)).resolves.toEqual({ + status: 'rejected', + error: String(new Error('test error')), + }); + }); + }); + + describe('Database management', () => { + afterEach(async () => { + await broker.stop(); + }); + + it('re-enqueues proof requests on start', async () => { + const id1 = makeProvingJobId(); + + await database.addProvingJob({ + id: id1, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + + const id2 = makeProvingJobId(); + await database.addProvingJob({ + id: id2, + type: ProvingRequestType.PRIVATE_BASE_ROLLUP, + blockNumber: 2, + inputs: 
makePrivateBaseRollupInputs(), + }); + + await broker.start(); + + await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({ status: 'in-queue' }); + await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ status: 'in-queue' }); + + await expect(broker.getProvingJob({ allowList: [ProvingRequestType.BASE_PARITY] })).resolves.toEqual({ + job: { + id: id1, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: expect.any(Object), + }, + time: expect.any(Number), + }); + + await expect(broker.getProvingJob()).resolves.toEqual({ + job: { + id: id2, + type: ProvingRequestType.PRIVATE_BASE_ROLLUP, + blockNumber: 2, + inputs: expect.any(Object), + }, + time: expect.any(Number), + }); + + await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({ + status: 'in-progress', + }); + await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ + status: 'in-progress', + }); + }); + + it('restores proof results on start', async () => { + const id1 = makeProvingJobId(); + + await database.addProvingJob({ + id: id1, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + + const id2 = makeProvingJobId(); + await database.addProvingJob({ + id: id2, + type: ProvingRequestType.PRIVATE_BASE_ROLLUP, + blockNumber: 2, + inputs: makePrivateBaseRollupInputs(), + }); + + await database.setProvingJobResult(id1, { + type: ProvingRequestType.BASE_PARITY, + value: makePublicInputsAndRecursiveProof( + makeParityPublicInputs(RECURSIVE_PROOF_LENGTH), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFake(), + ), + }); + + await database.setProvingJobResult(id2, { + type: ProvingRequestType.PRIVATE_BASE_ROLLUP, + value: makePublicInputsAndRecursiveProof( + makeBaseOrMergeRollupPublicInputs(), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFake(), + ), + }); + + await broker.start(); + + await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({ + status: 
'resolved', + value: expect.any(Object), + }); + + await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ + status: 'resolved', + value: expect.any(Object), + }); + }); + + it('only re-enqueues unfinished jobs', async () => { + const id1 = makeProvingJobId(); + + await database.addProvingJob({ + id: id1, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + await database.setProvingJobResult(id1, { + type: ProvingRequestType.BASE_PARITY, + value: makePublicInputsAndRecursiveProof( + makeParityPublicInputs(RECURSIVE_PROOF_LENGTH), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFake(), + ), + }); + + const id2 = makeProvingJobId(); + await database.addProvingJob({ + id: id2, + type: ProvingRequestType.PRIVATE_BASE_ROLLUP, + blockNumber: 2, + inputs: makePrivateBaseRollupInputs(), + }); + + await broker.start(); + + await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({ + status: 'resolved', + value: expect.any(Object), + }); + + await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ status: 'in-queue' }); + await getAndAssertNextJobId(id2); + }); + + it('clears job state when job is removed', async () => { + const id1 = makeProvingJobId(); + + await database.addProvingJob({ + id: id1, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + await database.setProvingJobResult(id1, { + type: ProvingRequestType.BASE_PARITY, + value: makePublicInputsAndRecursiveProof( + makeParityPublicInputs(RECURSIVE_PROOF_LENGTH), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFake(), + ), + }); + + const id2 = makeProvingJobId(); + await database.addProvingJob({ + id: id2, + type: ProvingRequestType.PRIVATE_BASE_ROLLUP, + blockNumber: 2, + inputs: makePrivateBaseRollupInputs(), + }); + + expect(database.getProvingJob(id1)).not.toBeUndefined(); + expect(database.getProvingJobResult(id1)).not.toBeUndefined(); + 
expect(database.getProvingJob(id2)).not.toBeUndefined(); + + await broker.start(); + + await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({ + status: 'resolved', + value: expect.any(Object), + }); + + await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ status: 'in-queue' }); + + await broker.removeAndCancelProvingJob(id1); + await broker.removeAndCancelProvingJob(id2); + + await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({ status: 'not-found' }); + await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ status: 'not-found' }); + + expect(database.getProvingJob(id1)).toBeUndefined(); + expect(database.getProvingJobResult(id1)).toBeUndefined(); + expect(database.getProvingJob(id2)).toBeUndefined(); + }); + + it('saves job when enqueued', async () => { + await broker.start(); + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + expect(database.getProvingJob(id)).not.toBeUndefined(); + }); + + it('does not retain job if database fails to save', async () => { + await broker.start(); + + jest.spyOn(database, 'addProvingJob').mockRejectedValue(new Error('db error')); + const id = makeProvingJobId(); + await expect( + broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }), + ).rejects.toThrow(new Error('db error')); + await assertJobStatus(id, 'not-found'); + }); + + it('saves job result', async () => { + await broker.start(); + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + await broker.reportProvingJobSuccess(id, { + type: ProvingRequestType.BASE_PARITY, + value: makePublicInputsAndRecursiveProof( + makeParityPublicInputs(RECURSIVE_PROOF_LENGTH), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + 
VerificationKeyData.makeFake(), + ), + }); + await assertJobStatus(id, 'resolved'); + expect(database.getProvingJobResult(id)).toEqual({ value: expect.any(Object) }); + }); + + it('does not retain job result if database fails to save', async () => { + await broker.start(); + jest.spyOn(database, 'setProvingJobResult').mockRejectedValue(new Error('db error')); + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + await expect( + broker.reportProvingJobSuccess(id, { + type: ProvingRequestType.BASE_PARITY, + value: makePublicInputsAndRecursiveProof( + makeParityPublicInputs(RECURSIVE_PROOF_LENGTH), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFake(), + ), + }), + ).rejects.toThrow(new Error('db error')); + await assertJobStatus(id, 'in-queue'); + expect(database.getProvingJobResult(id)).toBeUndefined(); + }); + + it('saves job error', async () => { + await broker.start(); + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + const error = new Error('test error'); + await broker.reportProvingJobError(id, error); + await assertJobStatus(id, 'rejected'); + expect(database.getProvingJobResult(id)).toEqual({ error: String(error) }); + }); + + it('does not retain job error if database fails to save', async () => { + await broker.start(); + jest.spyOn(database, 'setProvingJobError').mockRejectedValue(new Error('db error')); + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + type: ProvingRequestType.BASE_PARITY, + blockNumber: 1, + inputs: makeBaseParityInputs(), + }); + await expect(broker.reportProvingJobError(id, new Error())).rejects.toThrow(new Error('db error')); + await assertJobStatus(id, 'in-queue'); + expect(database.getProvingJobResult(id)).toBeUndefined(); + }); + + it('does 
not save job result if job is unknown', async () => { + await broker.start(); + const id = makeProvingJobId(); + + expect(database.getProvingJob(id)).toBeUndefined(); + expect(database.getProvingJobResult(id)).toBeUndefined(); + + await broker.reportProvingJobSuccess(id, { + type: ProvingRequestType.BASE_PARITY, + value: makePublicInputsAndRecursiveProof( + makeParityPublicInputs(RECURSIVE_PROOF_LENGTH), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFake(), + ), + }); + + expect(database.getProvingJob(id)).toBeUndefined(); + expect(database.getProvingJobResult(id)).toBeUndefined(); + }); + + it('does not save job error if job is unknown', async () => { + await broker.start(); + const id = makeProvingJobId(); + + expect(database.getProvingJob(id)).toBeUndefined(); + expect(database.getProvingJobResult(id)).toBeUndefined(); + + await broker.reportProvingJobError(id, new Error('test error')); + + expect(database.getProvingJob(id)).toBeUndefined(); + expect(database.getProvingJobResult(id)).toBeUndefined(); + }); + }); + + async function assertJobStatus(id: V2ProvingJobId, status: string) { + await expect(broker.getProvingJobStatus(id)).resolves.toEqual(expect.objectContaining({ status })); + } + + async function getAndAssertNextJobId(id: V2ProvingJobId, ...allowList: ProvingRequestType[]) { + await expect(broker.getProvingJob(allowList.length > 0 ? 
{ allowList } : undefined)).resolves.toEqual( + expect.objectContaining({ job: expect.objectContaining({ id }) }), + ); + } +}); + +function makeProvingJobId(): V2ProvingJobId { + return randomBytes(8).toString('hex') as V2ProvingJobId; +} diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts new file mode 100644 index 00000000000..396bbfb48ec --- /dev/null +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -0,0 +1,372 @@ +import { + ProvingRequestType, + type V2ProofOutput, + type V2ProvingJob, + type V2ProvingJobId, + type V2ProvingJobResult, + type V2ProvingJobStatus, +} from '@aztec/circuit-types'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { RunningPromise } from '@aztec/foundation/promise'; +import { PriorityMemoryQueue } from '@aztec/foundation/queue'; + +import assert from 'assert'; + +import { type ProvingBrokerDatabase } from './proving_broker_database.js'; +import type { ProvingJobConsumer, ProvingJobFilter, ProvingJobProducer } from './proving_broker_interface.js'; + +type InProgressMetadata = { + id: V2ProvingJobId; + startedAt: number; + lastUpdatedAt: number; +}; + +type ProofRequestBrokerConfig = { + timeoutIntervalSec?: number; + jobTimeoutSec?: number; + maxRetries?: number; +}; + +/** + * A broker that manages proof requests and distributes them to workers based on their priority. + * It takes a backend that is responsible for storing and retrieving proof requests and results. 
+ */ +export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { + private queues: ProvingQueues = { + [ProvingRequestType.PUBLIC_VM]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.TUBE_PROOF]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.PRIVATE_KERNEL_EMPTY]: new PriorityMemoryQueue(provingJobComparator), + + [ProvingRequestType.PRIVATE_BASE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.PUBLIC_BASE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.MERGE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + + [ProvingRequestType.BLOCK_MERGE_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.BLOCK_ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP]: new PriorityMemoryQueue(provingJobComparator), + + [ProvingRequestType.BASE_PARITY]: new PriorityMemoryQueue(provingJobComparator), + [ProvingRequestType.ROOT_PARITY]: new PriorityMemoryQueue(provingJobComparator), + }; + + // holds a copy of the database in memory in order to quickly fulfill requests + // this is fine because this broker is the only one that can modify the database + private jobsCache = new Map(); + // as above, but for results + private resultsCache = new Map(); + + // keeps track of which jobs are currently being processed + // in the event of a crash this information is lost, but that's ok + // the next time the broker starts it will recreate jobsCache and still + // accept results from the workers + private inProgress = new Map(); + + // keep track of which proving job has been retried + private retries = new Map(); + + private timeoutPromise: RunningPromise; + private timeSource = () => Math.floor(Date.now() / 1000); + private jobTimeoutSec: number; + private maxRetries: number; + + public constructor( + 
private database: ProvingBrokerDatabase, + { jobTimeoutSec = 30, timeoutIntervalSec = 10, maxRetries = 3 }: ProofRequestBrokerConfig = {}, + private logger = createDebugLogger('aztec:prover-client:proof-request-broker'), + ) { + this.timeoutPromise = new RunningPromise(this.timeoutCheck, timeoutIntervalSec * 1000); + this.jobTimeoutSec = jobTimeoutSec; + this.maxRetries = maxRetries; + } + + // eslint-disable-next-line require-await + public async start(): Promise { + for (const [item, result] of this.database.allProvingJobs()) { + this.logger.info(`Restoring proving job id=${item.id} settled=${!!result}`); + + this.jobsCache.set(item.id, item); + if (result) { + this.resultsCache.set(item.id, result); + } else { + this.logger.debug(`Re-enqueuing proving job id=${item.id}`); + this.enqueueJobInternal(item); + } + } + + this.timeoutPromise.start(); + } + + public stop(): Promise { + return this.timeoutPromise.stop(); + } + + public async enqueueProvingJob(job: V2ProvingJob): Promise { + if (this.jobsCache.has(job.id)) { + const existing = this.jobsCache.get(job.id); + assert.deepStrictEqual(job, existing, 'Duplicate proving job ID'); + return; + } + + await this.database.addProvingJob(job); + this.jobsCache.set(job.id, job); + this.enqueueJobInternal(job); + } + + public async removeAndCancelProvingJob(id: V2ProvingJobId): Promise { + this.logger.info(`Cancelling job id=${id}`); + await this.database.deleteProvingJobAndResult(id); + + this.jobsCache.delete(id); + this.resultsCache.delete(id); + this.inProgress.delete(id); + this.retries.delete(id); + } + + // eslint-disable-next-line require-await + public async getProvingJobStatus(id: V2ProvingJobId): Promise { + const result = this.resultsCache.get(id); + if (!result) { + // no result yet, check if we know the item + const item = this.jobsCache.get(id); + + if (!item) { + this.logger.warn(`Proving job id=${id} not found`); + return Promise.resolve({ status: 'not-found' }); + } + + return Promise.resolve({ status: 
this.inProgress.has(id) ? 'in-progress' : 'in-queue' }); + } else if ('value' in result) { + return Promise.resolve({ status: 'resolved', value: result.value }); + } else { + return Promise.resolve({ status: 'rejected', error: result.error }); + } + } + + // eslint-disable-next-line require-await + async getProvingJob( + filter: ProvingJobFilter = {}, + ): Promise<{ job: V2ProvingJob; time: number } | undefined> { + const allowedProofs: ProvingRequestType[] = filter.allowList + ? [...filter.allowList] + : Object.values(ProvingRequestType).filter((x): x is ProvingRequestType => typeof x === 'number'); + allowedProofs.sort(proofTypeComparator); + + for (const proofType of allowedProofs) { + const queue = this.queues[proofType]; + let job: V2ProvingJob | undefined; + // exhaust the queue and make sure we're not sending a job that's already in progress + // or has already been completed + // this can happen if the broker crashes and restarts + // it's possible agents will report progress or results for jobs that are no longer in the queue + while ((job = queue.getImmediate())) { + if (!this.inProgress.has(job.id) && !this.resultsCache.has(job.id)) { + const time = this.timeSource(); + this.inProgress.set(job.id, { + id: job.id, + startedAt: time, + lastUpdatedAt: time, + }); + + return { job, time }; + } + } + } + + return undefined; + } + + async reportProvingJobError(id: V2ProvingJobId, err: Error, retry = false): Promise { + const info = this.inProgress.get(id); + const item = this.jobsCache.get(id); + const retries = this.retries.get(id) ?? 
0; + + if (!item) { + this.logger.warn(`Proving job id=${id} not found`); + return; + } + + if (!info) { + this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`); + } else { + this.inProgress.delete(id); + } + + if (retry && retries + 1 < this.maxRetries) { + this.logger.info(`Retrying proving job id=${id} type=${ProvingRequestType[item.type]} retry=${retries + 1}`); + this.retries.set(id, retries + 1); + this.enqueueJobInternal(item); + return; + } + + this.logger.debug( + `Marking proving job id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${retries + 1} as failed`, + ); + await this.database.setProvingJobError(id, err); + this.resultsCache.set(id, { error: String(err) }); + } + + reportProvingJobProgress( + id: V2ProvingJobId, + startedAt: number, + filter?: ProvingJobFilter, + ): Promise<{ job: V2ProvingJob; time: number } | undefined> { + const job = this.jobsCache.get(id); + if (!job) { + this.logger.warn(`Proving job id=${id} does not exist`); + return filter ? this.getProvingJob(filter) : Promise.resolve(undefined); + } + + const metadata = this.inProgress.get(id); + const now = this.timeSource(); + if (!metadata) { + this.logger.warn( + `Proving job id=${id} type=${ProvingRequestType[job.type]} not found in the in-progress cache, adding it`, + ); + // the queue will still contain the item at this point! 
+ // we need to be careful when popping off the queue to make sure we're not sending + // a job that's already in progress + this.inProgress.set(id, { + id, + startedAt, + lastUpdatedAt: this.timeSource(), + }); + return Promise.resolve(undefined); + } else if (startedAt <= metadata.startedAt) { + if (startedAt < metadata.startedAt) { + this.logger.debug( + `Proving job id=${id} type=${ProvingRequestType[job.type]} startedAt=${startedAt} older agent has taken job`, + ); + } else { + this.logger.debug(`Proving job id=${id} type=${ProvingRequestType[job.type]} heartbeat`); + } + metadata.startedAt = startedAt; + metadata.lastUpdatedAt = now; + return Promise.resolve(undefined); + } else if (filter) { + this.logger.warn( + `Proving job id=${id} type=${ + ProvingRequestType[job.type] + } already being worked on by another agent. Sending new one`, + ); + return this.getProvingJob(filter); + } else { + return Promise.resolve(undefined); + } + } + + async reportProvingJobSuccess(id: V2ProvingJobId, value: V2ProofOutput): Promise { + const info = this.inProgress.get(id); + const item = this.jobsCache.get(id); + const retries = this.retries.get(id) ?? 0; + if (!item) { + this.logger.warn(`Proving job id=${id} not found`); + return; + } + + if (!info) { + this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`); + } else { + this.inProgress.delete(id); + } + + this.logger.debug( + `Proving job complete id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${retries + 1}`, + ); + await this.database.setProvingJobResult(id, value); + this.resultsCache.set(id, { value }); + } + + private timeoutCheck = () => { + const inProgressEntries = Array.from(this.inProgress.entries()); + for (const [id, metadata] of inProgressEntries) { + const item = this.jobsCache.get(id); + if (!item) { + this.logger.warn(`Proving job id=${id} not found. 
Removing it from the queue.`); + this.inProgress.delete(id); + continue; + } + + const secondsSinceLastUpdate = this.timeSource() - metadata.lastUpdatedAt; + if (secondsSinceLastUpdate >= this.jobTimeoutSec) { + this.logger.warn(`Proving job id=${id} timed out. Adding it back to the queue.`); + this.inProgress.delete(id); + this.enqueueJobInternal(item); + } + } + }; + + private enqueueJobInternal(job: V2ProvingJob): void { + this.queues[job.type].put(job); + this.logger.debug(`Enqueued new proving job id=${job.id}`); + } +} + +type ProvingQueues = { + [K in ProvingRequestType]: PriorityMemoryQueue; +}; + +/** + * Compares two proving jobs and selects which one's more important + * @param a - A proving job + * @param b - Another proving job + * @returns A number indicating the relative priority of the two proving jobs + */ +function provingJobComparator(a: V2ProvingJob, b: V2ProvingJob): -1 | 0 | 1 { + if (a.blockNumber < b.blockNumber) { + return -1; + } else if (a.blockNumber > b.blockNumber) { + return 1; + } else { + return 0; + } +} + +/** + * Compares two proofs and selects which one's more important. + * If some proofs does not exist in the priority array then it's considered the least important. + * + * @param a - A proof type + * @param b - Another proof type + * @returns A number indicating the relative priority of the two proof types + */ +function proofTypeComparator(a: ProvingRequestType, b: ProvingRequestType): -1 | 0 | 1 { + const indexOfA = PROOF_TYPES_IN_PRIORITY_ORDER.indexOf(a); + const indexOfB = PROOF_TYPES_IN_PRIORITY_ORDER.indexOf(b); + if (indexOfA === indexOfB) { + return 0; + } else if (indexOfA === -1) { + // a is some new proof that didn't get added to the array + // b is more important because we know about it + return 1; + } else if (indexOfB === -1) { + // the opposite of the previous if branch + return -1; + } else if (indexOfA < indexOfB) { + return -1; + } else { + return 1; + } +} + +/** + * Relative priority of each proof type. 
Proofs higher up on the list are more important and should be prioritized
 * over proofs lower on the list.
 *
 * The aim is that this will speed up block proving as the closer we get to a block's root proof the more likely it
 * is to get picked up by agents
 */
const PROOF_TYPES_IN_PRIORITY_ORDER: ProvingRequestType[] = [
  ProvingRequestType.BLOCK_ROOT_ROLLUP,
  ProvingRequestType.BLOCK_MERGE_ROLLUP,
  ProvingRequestType.ROOT_ROLLUP,
  ProvingRequestType.MERGE_ROLLUP,
  ProvingRequestType.PUBLIC_BASE_ROLLUP,
  ProvingRequestType.PRIVATE_BASE_ROLLUP,
  ProvingRequestType.PUBLIC_VM,
  ProvingRequestType.TUBE_PROOF,
  ProvingRequestType.ROOT_PARITY,
  ProvingRequestType.BASE_PARITY,
  ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP,
  ProvingRequestType.PRIVATE_KERNEL_EMPTY,
];
diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker_database.ts b/yarn-project/prover-client/src/proving_broker/proving_broker_database.ts
new file mode 100644
index 00000000000..0d7983f5252
--- /dev/null
+++ b/yarn-project/prover-client/src/proving_broker/proving_broker_database.ts
@@ -0,0 +1,81 @@
import {
  type V2ProofOutput,
  type V2ProvingJob,
  type V2ProvingJobId,
  type V2ProvingJobResult,
} from '@aztec/circuit-types';

/**
 * Persistence layer used by the proving broker to store jobs and their outcomes.
 */
export interface ProvingBrokerDatabase {
  /**
   * Saves a proof request so it can be retrieved later
   * @param request - The proof request to save
   */
  addProvingJob(request: V2ProvingJob): Promise<void>;

  /**
   * Removes a proof request, and any result stored for it, from the backend
   * @param id - The ID of the proof request to remove
   */
  deleteProvingJobAndResult(id: V2ProvingJobId): Promise<void>;

  /**
   * Returns an iterator over all saved proving jobs, each paired with its result if one has been stored
   */
  allProvingJobs(): Iterable<[V2ProvingJob, V2ProvingJobResult | undefined]>;

  /**
   * Saves the result of a proof request
   * @param id - The ID of the proof request to save the result for
   * @param value - The result of the proof request
   */
  setProvingJobResult(id: V2ProvingJobId, value: V2ProofOutput): Promise<void>;

  /**
   * Saves an error that occurred while processing a proof request
   * @param id - The ID of the proof request to save the error for
   * @param err - The error that occurred while processing the proof request
   */
  setProvingJobError(id: V2ProvingJobId, err: Error): Promise<void>;
}

/**
 * A ProvingBrokerDatabase backed by two in-memory maps. Nothing is persisted across process restarts.
 */
export class InMemoryDatabase implements ProvingBrokerDatabase {
  private jobs = new Map<V2ProvingJobId, V2ProvingJob>();
  private results = new Map<V2ProvingJobId, V2ProvingJobResult>();

  /** Returns the stored job with the given ID, or undefined if there is none. */
  getProvingJob(id: V2ProvingJobId): V2ProvingJob | undefined {
    return this.jobs.get(id);
  }

  /** Returns the stored result (value or error) for the given job ID, or undefined if there is none. */
  getProvingJobResult(id: V2ProvingJobId): V2ProvingJobResult | undefined {
    return this.results.get(id);
  }

  /** Stores the job under its ID, replacing any job previously stored under the same ID. */
  addProvingJob(request: V2ProvingJob): Promise<void> {
    this.jobs.set(request.id, request);
    return Promise.resolve();
  }

  /** Stores a successful result for the given job ID. */
  setProvingJobResult(id: V2ProvingJobId, value: V2ProofOutput): Promise<void> {
    this.results.set(id, { value });
    return Promise.resolve();
  }

  /** Stores a failure for the given job ID. The error is stored in its string form. */
  setProvingJobError(id: V2ProvingJobId, error: Error): Promise<void> {
    this.results.set(id, { error: String(error) });
    return Promise.resolve();
  }

  /** Removes both the job and its result. */
  deleteProvingJobAndResult(id: V2ProvingJobId): Promise<void> {
    this.jobs.delete(id);
    this.results.delete(id);
    return Promise.resolve();
  }

  /** Yields every stored job together with its result, or undefined when the job has no result yet. */
  *allProvingJobs(): Iterable<[V2ProvingJob, V2ProvingJobResult | undefined]> {
    for (const item of this.jobs.values()) {
      // the declared return type already types this literal as a tuple
      yield [item, this.results.get(item.id)];
    }
  }
}
diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker_interface.ts b/yarn-project/prover-client/src/proving_broker/proving_broker_interface.ts
new file mode 100644
index 00000000000..493cab538a5
--- /dev/null
+++ b/yarn-project/prover-client/src/proving_broker/proving_broker_interface.ts
@@ -0,0 +1,74 @@
import {
  type ProvingRequestType,
  type V2ProofOutput,
  type V2ProvingJob,
  type V2ProvingJobId,
  type V2ProvingJobStatus,
} from
'@aztec/circuit-types';

/**
 * An interface for the proving orchestrator. The producer uses this to enqueue jobs for agents
 */
export interface ProvingJobProducer {
  /**
   * Enqueues a proving job
   * @param job - The job to enqueue
   */
  enqueueProvingJob(job: V2ProvingJob): Promise<void>;

  /**
   * Cancels a proving job and clears all of its state
   * @param id - The ID of the job to cancel
   */
  removeAndCancelProvingJob(id: V2ProvingJobId): Promise<void>;

  /**
   * Returns the current status of the proving job
   * @param id - The ID of the job to get the status of
   */
  getProvingJobStatus(id: V2ProvingJobId): Promise<V2ProvingJobStatus>;
}

/**
 * Restricts which proof types a consumer is willing to receive.
 */
export interface ProvingJobFilter<T extends ProvingRequestType[]> {
  /** Proof types the consumer accepts. When omitted there is no restriction. */
  allowList?: T;
}

/**
 * An interface for proving agents to request jobs and report results
 */
export interface ProvingJobConsumer {
  /**
   * Gets a proving job to work on
   * @param filter - Optional filter for the type of job to get
   * @returns The job and the time it was started at, or undefined if no job is available
   */
  getProvingJob<T extends ProvingRequestType[]>(
    filter?: ProvingJobFilter<T>,
  ): Promise<{ job: V2ProvingJob; time: number } | undefined>;

  /**
   * Marks a proving job as successful
   * @param id - The ID of the job to report success for
   * @param result - The result of the job
   */
  reportProvingJobSuccess(id: V2ProvingJobId, result: V2ProofOutput): Promise<void>;

  /**
   * Marks a proving job as errored
   * @param id - The ID of the job to report an error for
   * @param err - The error that occurred while processing the job
   * @param retry - Whether to retry the job
   */
  reportProvingJobError(id: V2ProvingJobId, err: Error, retry?: boolean): Promise<void>;

  /**
   * Sends a heartbeat to the broker to indicate that the agent is still working on the given proving job
   * @param id - The ID of the job to report progress for
   * @param startedAt - The unix epoch when the job was started
   * @param filter - Optional filter for the type of job to get
   * @returns A different job to work on if the broker hands one out, otherwise undefined
   */
  reportProvingJobProgress<F extends ProvingRequestType[]>(
    id: V2ProvingJobId,
    startedAt: number,
    filter?: ProvingJobFilter<F>,
  ): Promise<{ job: V2ProvingJob; time: number } | undefined>;
}