From 1b60805e8b87a6d006e0bdef98280ac06650d5f9 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Tue, 4 Jun 2024 14:03:26 +0000 Subject: [PATCH 1/2] feat: track timeout status of proving jobs --- .../src/interfaces/prover-client.ts | 4 + .../src/interfaces/proving-job.ts | 2 + yarn-project/foundation/src/error/index.ts | 2 +- .../foundation/src/promise/running-promise.ts | 2 +- yarn-project/prover-client/src/config.ts | 19 ++- .../prover-client/src/mocks/test_context.ts | 1 + .../src/orchestrator/orchestrator.ts | 4 +- .../agent-queue-integration.test.ts | 99 ++++++++++++++ .../prover-agent/memory-proving-queue.test.ts | 48 ++++++- .../src/prover-agent/memory-proving-queue.ts | 125 ++++++++++++++++-- .../src/prover-agent/prover-agent.test.ts | 2 + .../src/prover-agent/prover-agent.ts | 55 +++++--- .../prover-client/src/tx-prover/tx-prover.ts | 5 +- 13 files changed, 327 insertions(+), 41 deletions(-) create mode 100644 yarn-project/prover-client/src/prover-agent/agent-queue-integration.test.ts diff --git a/yarn-project/circuit-types/src/interfaces/prover-client.ts b/yarn-project/circuit-types/src/interfaces/prover-client.ts index 939ccc8f350..800a923e9ca 100644 --- a/yarn-project/circuit-types/src/interfaces/prover-client.ts +++ b/yarn-project/circuit-types/src/interfaces/prover-client.ts @@ -18,6 +18,10 @@ export type ProverConfig = { proverAgentPollInterval: number; /** The maximum number of proving jobs to be run in parallel */ proverAgentConcurrency: number; + /** Jobs are retried if not kept alive for this long */ + proverJobTimeoutMs: number; + /** The interval to check job health status */ + proverJobPollIntervalMs: number; }; /** diff --git a/yarn-project/circuit-types/src/interfaces/proving-job.ts b/yarn-project/circuit-types/src/interfaces/proving-job.ts index 8b187c47d9d..edb8a0cdad9 100644 --- a/yarn-project/circuit-types/src/interfaces/proving-job.ts +++ b/yarn-project/circuit-types/src/interfaces/proving-job.ts @@ -124,6 +124,8 @@ export type ProvingRequestResult = ProvingRequestP export interface ProvingJobSource { getProvingJob(): Promise | undefined>; + heartbeat(jobId: string): Promise; + resolveProvingJob(jobId: string, result: ProvingRequestResult): Promise; rejectProvingJob(jobId: string, reason: Error): Promise; diff --git a/yarn-project/foundation/src/error/index.ts b/yarn-project/foundation/src/error/index.ts index 1986e2dec0b..825e2bc9378 100644 --- a/yarn-project/foundation/src/error/index.ts +++ b/yarn-project/foundation/src/error/index.ts @@ -13,4 +13,4 @@ export class TimeoutError extends Error {} /** * Represents an error thrown when an operation is aborted. */ -export class AbortedError extends Error {} +export class AbortError extends Error {} diff --git a/yarn-project/foundation/src/promise/running-promise.ts b/yarn-project/foundation/src/promise/running-promise.ts index 2c3c7f15c33..c522662235d 100644 --- a/yarn-project/foundation/src/promise/running-promise.ts +++ b/yarn-project/foundation/src/promise/running-promise.ts @@ -10,7 +10,7 @@ export class RunningPromise { private runningPromise = Promise.resolve(); private interruptibleSleep = new InterruptibleSleep(); - constructor(private fn: () => Promise, private pollingIntervalMS = 10000) {} + constructor(private fn: () => void | Promise, private pollingIntervalMS = 10000) {} /** * Starts the running promise. diff --git a/yarn-project/prover-client/src/config.ts b/yarn-project/prover-client/src/config.ts index 4e375807695..3ebf5cf6626 100644 --- a/yarn-project/prover-client/src/config.ts +++ b/yarn-project/prover-client/src/config.ts @@ -34,16 +34,16 @@ export function getProverEnvVars(): ProverClientConfig { PROVER_AGENT_CONCURRENCY = PROVER_AGENTS, PROVER_AGENT_POLL_INTERVAL_MS = '100', PROVER_REAL_PROOFS = '', + PROVER_JOB_TIMEOUT_MS = '60000', + PROVER_JOB_POLL_INTERVAL_MS = '100', } = process.env; const realProofs = ['1', 'true'].includes(PROVER_REAL_PROOFS); const proverAgentEnabled = ['1', 'true'].includes(PROVER_AGENT_ENABLED); - const parsedProverConcurrency = parseInt(PROVER_AGENT_CONCURRENCY, 10); - const proverAgentConcurrency = Number.isSafeInteger(parsedProverConcurrency) ? parsedProverConcurrency : 1; - const parsedProverAgentPollInterval = parseInt(PROVER_AGENT_POLL_INTERVAL_MS, 10); - const proverAgentPollInterval = Number.isSafeInteger(parsedProverAgentPollInterval) - ? parsedProverAgentPollInterval - : 100; + const proverAgentConcurrency = safeParseNumber(PROVER_AGENT_CONCURRENCY, 1); + const proverAgentPollInterval = safeParseNumber(PROVER_AGENT_POLL_INTERVAL_MS, 100); + const proverJobTimeoutMs = safeParseNumber(PROVER_JOB_TIMEOUT_MS, 60000); + const proverJobPollIntervalMs = safeParseNumber(PROVER_JOB_POLL_INTERVAL_MS, 100); return { acvmWorkingDirectory: ACVM_WORKING_DIRECTORY, @@ -55,5 +55,12 @@ export function getProverEnvVars(): ProverClientConfig { proverAgentPollInterval, proverAgentConcurrency, nodeUrl: AZTEC_NODE_URL, + proverJobPollIntervalMs, + proverJobTimeoutMs, }; } + +function safeParseNumber(value: string, defaultValue: number): number { + const parsedValue = parseInt(value, 10); + return Number.isSafeInteger(parsedValue) ? parsedValue : defaultValue; +} diff --git a/yarn-project/prover-client/src/mocks/test_context.ts b/yarn-project/prover-client/src/mocks/test_context.ts index ae895ddb2d9..1af8c556c48 100644 --- a/yarn-project/prover-client/src/mocks/test_context.ts +++ b/yarn-project/prover-client/src/mocks/test_context.ts @@ -127,6 +127,7 @@ export class TestContext { const orchestrator = new ProvingOrchestrator(actualDb, queue); const agent = new ProverAgent(localProver, proverCount); + queue.start(); agent.start(queue); return new this( diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator.ts b/yarn-project/prover-client/src/orchestrator/orchestrator.ts index afdeef02fc5..a270cbb0628 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator.ts @@ -48,7 +48,7 @@ import { } from '@aztec/circuits.js'; import { makeTuple } from '@aztec/foundation/array'; import { padArrayEnd } from '@aztec/foundation/collection'; -import { AbortedError } from '@aztec/foundation/error'; +import { AbortError } from '@aztec/foundation/error'; import { createDebugLogger } from '@aztec/foundation/log'; import { promiseWithResolvers } from '@aztec/foundation/promise'; import { BufferReader, type Tuple } from '@aztec/foundation/serialize'; @@ -475,7 +475,7 @@ export class ProvingOrchestrator { await callback(result); } catch (err) { - if (err instanceof AbortedError) { + if (err instanceof AbortError) { // operation was cancelled, probably because the block was cancelled // drop this result return; diff --git a/yarn-project/prover-client/src/prover-agent/agent-queue-integration.test.ts b/yarn-project/prover-client/src/prover-agent/agent-queue-integration.test.ts new file mode 100644 index 00000000000..ca8b80573c4 --- /dev/null +++ b/yarn-project/prover-client/src/prover-agent/agent-queue-integration.test.ts @@ -0,0 +1,99 @@ +import { type ServerCircuitProver } from '@aztec/circuit-types'; +import { RECURSIVE_PROOF_LENGTH, type RootParityInput } from '@aztec/circuits.js'; +import { makeBaseParityInputs, makeRootParityInput } from '@aztec/circuits.js/testing'; +import { AbortError } from '@aztec/foundation/error'; +import { promiseWithResolvers } from '@aztec/foundation/promise'; +import { sleep } from '@aztec/foundation/sleep'; + +import { type MockProxy, mock } from 'jest-mock-extended'; + +import { MemoryProvingQueue } from './memory-proving-queue.js'; +import { ProverAgent } from './prover-agent.js'; + +describe('Prover agent <-> queue integration', () => { + let queue: MemoryProvingQueue; + let agent: ProverAgent; + let prover: MockProxy; + let agentPollInterval: number; + let queuePollInterval: number; + let queueJobTimeout: number; + + beforeEach(() => { + prover = mock(); + + queueJobTimeout = 100; + queuePollInterval = 10; + queue = new MemoryProvingQueue(queueJobTimeout, queuePollInterval); + + agentPollInterval = 10; + agent = new ProverAgent(prover, 1, agentPollInterval); + + queue.start(); + agent.start(queue); + }); + + afterEach(async () => { + await agent.stop(); + await queue.stop(); + }); + + it('picks up jobs from the queue', async () => { + const { promise, resolve } = promiseWithResolvers>(); + const output = makeRootParityInput(RECURSIVE_PROOF_LENGTH, 1); + prover.getBaseParityProof.mockResolvedValueOnce(promise); + const proofPromise = queue.getBaseParityProof(makeBaseParityInputs()); + + await sleep(agentPollInterval); + resolve(output); + await expect(proofPromise).resolves.toEqual(output); + }); + + it('keeps job alive', async () => { + const { promise, resolve } = promiseWithResolvers>(); + const output = makeRootParityInput(RECURSIVE_PROOF_LENGTH, 1); + prover.getBaseParityProof.mockResolvedValueOnce(promise); + const proofPromise = queue.getBaseParityProof(makeBaseParityInputs()); + + await sleep(2 * queueJobTimeout); + resolve(output); + await expect(proofPromise).resolves.toEqual(output); + }); + + it('reports cancellations', async () => { + const { promise, resolve } = promiseWithResolvers>(); + const output = makeRootParityInput(RECURSIVE_PROOF_LENGTH, 1); + prover.getBaseParityProof.mockResolvedValueOnce(promise); + const controller = new AbortController(); + const proofPromise = queue.getBaseParityProof(makeBaseParityInputs(), controller.signal); + await sleep(agentPollInterval); + controller.abort(); + resolve(output); + await expect(proofPromise).rejects.toThrow(AbortError); + }); + + it('re-queues timed out jobs', async () => { + const firstRun = promiseWithResolvers>(); + const output = makeRootParityInput(RECURSIVE_PROOF_LENGTH, 1); + prover.getBaseParityProof.mockResolvedValueOnce(firstRun.promise); + const proofPromise = queue.getBaseParityProof(makeBaseParityInputs()); + + // stop the agent to simulate a machine going down + await agent.stop(); + + // give the queue a chance to figure out the node is timed out and re-queue the job + await sleep(queueJobTimeout); + // reset the mock + const secondRun = promiseWithResolvers>(); + prover.getBaseParityProof.mockResolvedValueOnce(secondRun.promise); + const newAgent = new ProverAgent(prover, 1, agentPollInterval); + newAgent.start(queue); + // test that the job is re-queued and kept alive by the new agent + await sleep(queueJobTimeout * 2); + secondRun.resolve(output); + await expect(proofPromise).resolves.toEqual(output); + + firstRun.reject(new Error('stop this promise otherwise it hangs jest')); + + await newAgent.stop(); + }); +}); diff --git a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.test.ts b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.test.ts index dc3d77ab7dc..1aed172cc54 100644 --- a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.test.ts +++ b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.test.ts @@ -6,14 +6,25 @@ import { makeRecursiveProof, } from '@aztec/circuits.js'; import { makeBaseParityInputs, makeBaseRollupInputs, makeParityPublicInputs } from '@aztec/circuits.js/testing'; +import { AbortError } from '@aztec/foundation/error'; +import { sleep } from '@aztec/foundation/sleep'; import { MemoryProvingQueue } from './memory-proving-queue.js'; describe('MemoryProvingQueue', () => { let queue: MemoryProvingQueue; + let jobTimeoutMs: number; + let pollingIntervalMs: number; beforeEach(() => { - queue = new MemoryProvingQueue(); + jobTimeoutMs = 100; + pollingIntervalMs = 10; + queue = new MemoryProvingQueue(jobTimeoutMs, pollingIntervalMs); + queue.start(); + }); + + afterEach(async () => { + await queue.stop(); }); it('returns jobs in order', async () => { @@ -68,4 +79,39 @@ describe('MemoryProvingQueue', () => { await expect(promise).rejects.toEqual(error); }); + + it('reaps timed out jobs', async () => { + const controller = new AbortController(); + const promise = queue.getBaseParityProof(makeBaseParityInputs(), controller.signal); + const job = await queue.getProvingJob(); + + expect(queue.isJobRunning(job!.id)).toBe(true); + await sleep(jobTimeoutMs + 2 * pollingIntervalMs); + expect(queue.isJobRunning(job!.id)).toBe(false); + + controller.abort(); + await expect(promise).rejects.toThrow(AbortError); + }); + + it('keeps jobs running while heartbeat is called', async () => { + const promise = queue.getBaseParityProof(makeBaseParityInputs()); + const job = await queue.getProvingJob(); + + expect(queue.isJobRunning(job!.id)).toBe(true); + await sleep(pollingIntervalMs); + expect(queue.isJobRunning(job!.id)).toBe(true); + + await queue.heartbeat(job!.id); + expect(queue.isJobRunning(job!.id)).toBe(true); + await sleep(pollingIntervalMs); + expect(queue.isJobRunning(job!.id)).toBe(true); + + const output = new RootParityInput( + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyAsFields.makeFake(), + makeParityPublicInputs(), + ); + await queue.resolveProvingJob(job!.id, output); + await expect(promise).resolves.toEqual(output); + }); }); diff --git a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts index 094b1de0da4..2face42d04e 100644 --- a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts +++ b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts @@ -27,21 +27,23 @@ import type { RootRollupPublicInputs, } from '@aztec/circuits.js'; import { randomBytes } from '@aztec/foundation/crypto'; -import { AbortedError, TimeoutError } from '@aztec/foundation/error'; +import { AbortError, TimeoutError } from '@aztec/foundation/error'; import { MemoryFifo } from '@aztec/foundation/fifo'; import { createDebugLogger } from '@aztec/foundation/log'; -import { type PromiseWithResolvers, promiseWithResolvers } from '@aztec/foundation/promise'; +import { type PromiseWithResolvers, RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise'; type ProvingJobWithResolvers = { id: string; request: T; signal?: AbortSignal; attempts: number; + heartbeat: number; } & PromiseWithResolvers>; const MAX_RETRIES = 3; const defaultIdGenerator = () => randomBytes(4).toString('hex'); +const defaultTimeSource = () => Date.now(); /** * A helper class that sits in between services that need proofs created and agents that can create them. @@ -52,20 +54,55 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource private queue = new MemoryFifo(); private jobsInProgress = new Map(); - constructor(private generateId = defaultIdGenerator) {} + private runningPromise: RunningPromise; + + constructor( + /** Timeout the job if an agent doesn't report back in this time */ + private jobTimeoutMs = 60 * 1000, + /** How often to check for timed out jobs */ + pollingIntervalMs = 1000, + private generateId = defaultIdGenerator, + private timeSource = defaultTimeSource, + ) { + this.runningPromise = new RunningPromise(this.poll, pollingIntervalMs); + } + + public start() { + if (this.runningPromise.isRunning()) { + this.log.warn('Proving queue is already running'); + return; + } + + this.runningPromise.start(); + this.log.info('Proving queue started'); + } + + public async stop() { + if (!this.runningPromise.isRunning()) { + this.log.warn('Proving queue is already stopped'); + return; + } + + await this.runningPromise.stop(); + this.log.info('Proving queue stopped'); + } + + public async getProvingJob({ timeoutSec = 1 } = {}): Promise | undefined> { + if (!this.runningPromise.isRunning()) { + throw new Error('Proving queue is not running. Start the queue before getting jobs.'); + } - async getProvingJob({ timeoutSec = 1 } = {}): Promise | undefined> { try { const job = await this.queue.get(timeoutSec); if (!job) { return undefined; } - if (job.signal?.aborted) { - this.log.debug(`Job ${job.id} type=${job.request.type} has been aborted`); + if (this.rejectIfAborted(job)) { return undefined; } + job.heartbeat = this.timeSource(); this.jobsInProgress.set(job.id, job); return { id: job.id, @@ -81,30 +118,40 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource } resolveProvingJob(jobId: string, result: ProvingRequestResult): Promise { + if (!this.runningPromise.isRunning()) { + throw new Error('Proving queue is not running.'); + } + const job = this.jobsInProgress.get(jobId); if (!job) { - return Promise.reject(new Error('Job not found')); + this.log.warn(`Job id=${jobId} not found. Can't resolve`); + return Promise.resolve(); } this.jobsInProgress.delete(jobId); - if (job.signal?.aborted) { + if (this.rejectIfAborted(job)) { + return Promise.resolve(); + } else { + job.resolve(result); return Promise.resolve(); } - - job.resolve(result); - return Promise.resolve(); } rejectProvingJob(jobId: string, err: any): Promise { + if (!this.runningPromise.isRunning()) { + throw new Error('Proving queue is not running.'); + } + const job = this.jobsInProgress.get(jobId); if (!job) { - return Promise.reject(new Error('Job not found')); + this.log.warn(`Job id=${jobId} not found. Can't reject`); + return Promise.resolve(); } this.jobsInProgress.delete(jobId); - if (job.signal?.aborted) { + if (this.rejectIfAborted(job)) { return Promise.resolve(); } @@ -123,10 +170,49 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource return Promise.resolve(); } + public heartbeat(jobId: string): Promise { + if (!this.runningPromise.isRunning()) { + throw new Error('Proving queue is not running.'); + } + + const job = this.jobsInProgress.get(jobId); + if (job) { + job.heartbeat = this.timeSource(); + } + + return Promise.resolve(); + } + + public isJobRunning(jobId: string): boolean { + return this.jobsInProgress.has(jobId); + } + + private poll = () => { + const now = this.timeSource(); + + for (const job of this.jobsInProgress.values()) { + if (this.rejectIfAborted(job)) { + continue; + } + + if (job.heartbeat + this.jobTimeoutMs < now) { + this.log.warn(`Job ${job.id} type=${ProvingRequestType[job.request.type]} has timed out`); + + this.jobsInProgress.delete(job.id); + job.heartbeat = 0; + this.queue.put(job); + } + } + }; + private enqueue( request: T, signal?: AbortSignal, ): Promise> { + if (!this.runningPromise.isRunning()) { + return Promise.reject(new Error('Proving queue is not running.')); + } + const { promise, resolve, reject } = promiseWithResolvers>(); const item: ProvingJobWithResolvers = { id: this.generateId(), @@ -136,10 +222,11 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource resolve, reject, attempts: 1, + heartbeat: 0, }; if (signal) { - signal.addEventListener('abort', () => reject(new AbortedError('Operation has been aborted'))); + signal.addEventListener('abort', () => reject(new AbortError('Operation has been aborted'))); } this.log.debug( @@ -300,4 +387,14 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource verifyProof(): Promise { return Promise.reject('not implemented'); } + + private rejectIfAborted(job: ProvingJobWithResolvers): boolean { + if (job.signal?.aborted) { + this.log.debug(`Job ${job.id} type=${ProvingRequestType[job.request.type]} has been aborted`); + job.reject(new AbortError('Proving job has been aborted')); + return true; + } + + return false; + } } diff --git a/yarn-project/prover-client/src/prover-agent/prover-agent.test.ts b/yarn-project/prover-client/src/prover-agent/prover-agent.test.ts index b63e37ad34e..3c52df01acc 100644 --- a/yarn-project/prover-client/src/prover-agent/prover-agent.test.ts +++ b/yarn-project/prover-client/src/prover-agent/prover-agent.test.ts @@ -24,11 +24,13 @@ describe('ProverAgent', () => { }); beforeEach(() => { + queue.start(); agent.start(queue); }); afterEach(async () => { await agent.stop(); + await queue.stop(); }); it('takes jobs from the queue', async () => { diff --git a/yarn-project/prover-client/src/prover-agent/prover-agent.ts b/yarn-project/prover-client/src/prover-agent/prover-agent.ts index d0480f7a9f5..14cb93aa74b 100644 --- a/yarn-project/prover-client/src/prover-agent/prover-agent.ts +++ b/yarn-project/prover-client/src/prover-agent/prover-agent.ts @@ -16,7 +16,7 @@ import { ProvingError } from './proving-error.js'; * A helper class that encapsulates a circuit prover and connects it to a job source. */ export class ProverAgent { - private inFlightPromises = new Set>(); + private inFlightPromises = new Map>(); private runningPromise?: RunningPromise; constructor( @@ -50,20 +50,28 @@ export class ProverAgent { } this.runningPromise = new RunningPromise(async () => { + for (const jobId of this.inFlightPromises.keys()) { + await jobSource.heartbeat(jobId); + } + while (this.inFlightPromises.size < this.maxConcurrency) { - const job = await jobSource.getProvingJob(); - if (!job) { - // job source is fully drained, sleep for a bit and try again - return; + try { + const job = await jobSource.getProvingJob(); + if (!job) { + // job source is fully drained, sleep for a bit and try again + return; + } + + const promise = this.work(jobSource, job).finally(() => this.inFlightPromises.delete(job.id)); + this.inFlightPromises.set(job.id, promise); + } catch (err) { + this.log.warn(`Error processing job: ${err}`); } - - const promise = this.work(jobSource, job).finally(() => this.inFlightPromises.delete(promise)); - this.inFlightPromises.add(promise); } }, this.pollIntervalMs); this.runningPromise.start(); - this.log.info('Agent started'); + this.log.info(`Agent started with concurrency=${this.maxConcurrency}`); } async stop(): Promise { @@ -79,14 +87,31 @@ export class ProverAgent { private async work(jobSource: ProvingJobSource, job: ProvingJob): Promise { try { + this.log.debug(`Picked up proving job id=${job.id} type=${ProvingRequestType[job.request.type]}`); const [time, result] = await elapsed(this.getProof(job.request)); - await jobSource.resolveProvingJob(job.id, result); - this.log.debug( - `Processed proving job id=${job.id} type=${ProvingRequestType[job.request.type]} duration=${time}ms`, - ); + if (this.isRunning()) { + this.log.debug( + `Processed proving job id=${job.id} type=${ProvingRequestType[job.request.type]} duration=${time}ms`, + ); + await jobSource.resolveProvingJob(job.id, result); + } else { + this.log.debug( + `Dropping proving job id=${job.id} type=${ + ProvingRequestType[job.request.type] + } duration=${time}ms: agent stopped`, + ); + } } catch (err) { - this.log.error(`Error processing proving job id=${job.id} type=${ProvingRequestType[job.request.type]}: ${err}`); - await jobSource.rejectProvingJob(job.id, new ProvingError((err as any)?.message ?? String(err))); + if (this.isRunning()) { + this.log.error( + `Error processing proving job id=${job.id} type=${ProvingRequestType[job.request.type]}: ${err}`, + ); + await jobSource.rejectProvingJob(job.id, new ProvingError((err as any)?.message ?? String(err))); + } else { + this.log.debug( + `Dropping proving job id=${job.id} type=${ProvingRequestType[job.request.type]}: agent stopped: ${err}`, + ); + } } } diff --git a/yarn-project/prover-client/src/tx-prover/tx-prover.ts b/yarn-project/prover-client/src/tx-prover/tx-prover.ts index 4a0b06f7d81..6008cfe9db5 100644 --- a/yarn-project/prover-client/src/tx-prover/tx-prover.ts +++ b/yarn-project/prover-client/src/tx-prover/tx-prover.ts @@ -21,7 +21,7 @@ import { ProverAgent } from '../prover-agent/prover-agent.js'; */ export class TxProver implements ProverClient { private orchestrator: ProvingOrchestrator; - private queue = new MemoryProvingQueue(); + private queue: MemoryProvingQueue; private running = false; private constructor( @@ -31,6 +31,7 @@ export class TxProver implements ProverClient { private agent?: ProverAgent, initialHeader?: Header, ) { + this.queue = new MemoryProvingQueue(config.proverJobTimeoutMs, config.proverJobPollIntervalMs); this.orchestrator = new ProvingOrchestrator(worldStateSynchronizer.getLatest(), this.queue, initialHeader); } @@ -66,6 +67,7 @@ export class TxProver implements ProverClient { } this.running = true; + this.queue.start(); this.agent?.start(this.queue); return Promise.resolve(); } @@ -79,6 +81,7 @@ export class TxProver implements ProverClient { } this.running = false; await this.agent?.stop(); + await this.queue.stop(); } /** From 0a63107a7e52baa94ddc02e26139b8fb5c124e9f Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Tue, 4 Jun 2024 14:52:44 +0000 Subject: [PATCH 2/2] refactor: remove rejectIfAborted --- .../src/interfaces/proving-job.ts | 19 ++++++++++++++ yarn-project/prover-client/src/config.ts | 4 +-- .../src/prover-agent/memory-proving-queue.ts | 25 ++++++------------- 3 files changed, 28 insertions(+), 20 deletions(-) diff --git a/yarn-project/circuit-types/src/interfaces/proving-job.ts b/yarn-project/circuit-types/src/interfaces/proving-job.ts index edb8a0cdad9..e68eff265ac 100644 --- a/yarn-project/circuit-types/src/interfaces/proving-job.ts +++ b/yarn-project/circuit-types/src/interfaces/proving-job.ts @@ -122,11 +122,30 @@ export type ProvingRequestPublicInputs = { export type ProvingRequestResult = ProvingRequestPublicInputs[T]; export interface ProvingJobSource { + /** + * Gets the next proving job. `heartbeat` must be called periodically to keep the job alive. + * @returns The proving job, or undefined if there are no jobs available. + */ getProvingJob(): Promise | undefined>; + /** + * Keeps the job alive. If this isn't called regularly then the job will be + * considered abandoned and re-queued for another consumer to pick up + * @param jobId The ID of the job to heartbeat. + */ heartbeat(jobId: string): Promise; + /** + * Resolves a proving job. + * @param jobId - The ID of the job to resolve. + * @param result - The result of the proving job. + */ resolveProvingJob(jobId: string, result: ProvingRequestResult): Promise; + /** + * Rejects a proving job. + * @param jobId - The ID of the job to reject. + * @param reason - The reason for rejecting the job. + */ rejectProvingJob(jobId: string, reason: Error): Promise; } diff --git a/yarn-project/prover-client/src/config.ts b/yarn-project/prover-client/src/config.ts index 3ebf5cf6626..5f93b0b0e25 100644 --- a/yarn-project/prover-client/src/config.ts +++ b/yarn-project/prover-client/src/config.ts @@ -35,7 +35,7 @@ export function getProverEnvVars(): ProverClientConfig { PROVER_AGENT_POLL_INTERVAL_MS = '100', PROVER_REAL_PROOFS = '', PROVER_JOB_TIMEOUT_MS = '60000', - PROVER_JOB_POLL_INTERVAL_MS = '100', + PROVER_JOB_POLL_INTERVAL_MS = '1000', } = process.env; const realProofs = ['1', 'true'].includes(PROVER_REAL_PROOFS); @@ -43,7 +43,7 @@ export function getProverEnvVars(): ProverClientConfig { const proverAgentConcurrency = safeParseNumber(PROVER_AGENT_CONCURRENCY, 1); const proverAgentPollInterval = safeParseNumber(PROVER_AGENT_POLL_INTERVAL_MS, 100); const proverJobTimeoutMs = safeParseNumber(PROVER_JOB_TIMEOUT_MS, 60000); - const proverJobPollIntervalMs = safeParseNumber(PROVER_JOB_POLL_INTERVAL_MS, 100); + const proverJobPollIntervalMs = safeParseNumber(PROVER_JOB_POLL_INTERVAL_MS, 1000); return { acvmWorkingDirectory: ACVM_WORKING_DIRECTORY, diff --git a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts index 2face42d04e..ba9935faca9 100644 --- a/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts +++ b/yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts @@ -98,7 +98,7 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource return undefined; } - if (this.rejectIfAborted(job)) { + if (job.signal?.aborted) { return undefined; } @@ -129,13 +129,11 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource } this.jobsInProgress.delete(jobId); - - if (this.rejectIfAborted(job)) { - return Promise.resolve(); - } else { + if (!job.signal?.aborted) { job.resolve(result); - return Promise.resolve(); } + + return Promise.resolve(); } rejectProvingJob(jobId: string, err: any): Promise { @@ -151,7 +149,7 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource this.jobsInProgress.delete(jobId); - if (this.rejectIfAborted(job)) { + if (job.signal?.aborted) { return Promise.resolve(); } @@ -191,7 +189,8 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource const now = this.timeSource(); for (const job of this.jobsInProgress.values()) { - if (this.rejectIfAborted(job)) { + if (job.signal?.aborted) { + this.jobsInProgress.delete(job.id); continue; } @@ -387,14 +386,4 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource verifyProof(): Promise { return Promise.reject('not implemented'); } - - private rejectIfAborted(job: ProvingJobWithResolvers): boolean { - if (job.signal?.aborted) { - this.log.debug(`Job ${job.id} type=${ProvingRequestType[job.request.type]} has been aborted`); - job.reject(new AbortError('Proving job has been aborted')); - return true; - } - - return false; - } }