Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add persisted database of proving jobs #9942

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 64 additions & 36 deletions yarn-project/prover-client/src/proving_broker/proving_broker.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
ProvingRequestType,
type V2ProofOutput,
type V2ProvingJob,
type V2ProvingJobId,
makePublicInputsAndRecursiveProof,
Expand All @@ -13,35 +14,54 @@ import {
makeRootParityInputs,
} from '@aztec/circuits.js/testing';
import { randomBytes } from '@aztec/foundation/crypto';
import { AztecLmdbStore } from '@aztec/kv-store/lmdb';

import { jest } from '@jest/globals';

import { ProvingBroker } from './proving_broker.js';
import { InMemoryDatabase } from './proving_broker_database.js';
import { type ProvingJobDatabase } from './proving_job_database.js';
import { InMemoryDatabase } from './proving_job_database/memory.js';
import { PersistedProvingJobDatabase } from './proving_job_database/persisted.js';

beforeAll(() => {
jest.useFakeTimers();
});

describe('ProvingBroker', () => {
let database: InMemoryDatabase;
describe.each([
() => ({ database: new InMemoryDatabase(), cleanup: undefined }),
() => {
const store = AztecLmdbStore.open(undefined, true);
const database = new PersistedProvingJobDatabase(store);
const cleanup = () => store.close();
return { database, cleanup };
},
])('ProvingBroker', createDb => {
let broker: ProvingBroker;
let jobTimeoutSec: number;
let maxRetries: number;
let database: ProvingJobDatabase;
let cleanup: undefined | (() => Promise<void> | void);

const now = () => Math.floor(Date.now() / 1000);

beforeEach(() => {
jobTimeoutSec = 10;
maxRetries = 2;
database = new InMemoryDatabase();
({ database, cleanup } = createDb());

broker = new ProvingBroker(database, {
jobTimeoutSec: jobTimeoutSec,
timeoutIntervalSec: jobTimeoutSec / 4,
maxRetries,
});
});

afterEach(async () => {
if (cleanup) {
await cleanup();
}
});

describe('Producer API', () => {
beforeEach(async () => {
await broker.start();
Expand Down Expand Up @@ -909,10 +929,6 @@ describe('ProvingBroker', () => {
inputs: makePrivateBaseRollupInputs(),
});

expect(database.getProvingJob(id1)).not.toBeUndefined();
expect(database.getProvingJobResult(id1)).not.toBeUndefined();
expect(database.getProvingJob(id2)).not.toBeUndefined();

await broker.start();

await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({
Expand All @@ -922,27 +938,31 @@ describe('ProvingBroker', () => {

await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ status: 'in-queue' });

jest.spyOn(database, 'deleteProvingJobAndResult');

await broker.removeAndCancelProvingJob(id1);
await broker.removeAndCancelProvingJob(id2);

expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id1);
expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id2);

await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({ status: 'not-found' });
await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ status: 'not-found' });

expect(database.getProvingJob(id1)).toBeUndefined();
expect(database.getProvingJobResult(id1)).toBeUndefined();
expect(database.getProvingJob(id2)).toBeUndefined();
});

it('saves job when enqueued', async () => {
await broker.start();
const id = makeProvingJobId();
await broker.enqueueProvingJob({
id,
const job: V2ProvingJob = {
id: makeProvingJobId(),
type: ProvingRequestType.BASE_PARITY,
blockNumber: 1,
inputs: makeBaseParityInputs(),
});
expect(database.getProvingJob(id)).not.toBeUndefined();
};

jest.spyOn(database, 'addProvingJob');
await broker.enqueueProvingJob(job);

expect(database.addProvingJob).toHaveBeenCalledWith(job);
});

it('does not retain job if database fails to save', async () => {
Expand All @@ -963,23 +983,29 @@ describe('ProvingBroker', () => {

it('saves job result', async () => {
await broker.start();
const id = makeProvingJobId();
await broker.enqueueProvingJob({
id,

const job: V2ProvingJob = {
id: makeProvingJobId(),
type: ProvingRequestType.BASE_PARITY,
blockNumber: 1,
inputs: makeBaseParityInputs(),
});
await broker.reportProvingJobSuccess(id, {
};
jest.spyOn(database, 'setProvingJobResult');

await broker.enqueueProvingJob(job);

const result: V2ProofOutput = {
type: ProvingRequestType.BASE_PARITY,
value: makePublicInputsAndRecursiveProof(
makeParityPublicInputs(RECURSIVE_PROOF_LENGTH),
makeRecursiveProof(RECURSIVE_PROOF_LENGTH),
VerificationKeyData.makeFake(),
),
});
await assertJobStatus(id, 'resolved');
expect(database.getProvingJobResult(id)).toEqual({ value: expect.any(Object) });
};
await broker.reportProvingJobSuccess(job.id, result);

await assertJobStatus(job.id, 'resolved');
expect(database.setProvingJobResult).toHaveBeenCalledWith(job.id, result);
});

it('does not retain job result if database fails to save', async () => {
Expand All @@ -1003,22 +1029,25 @@ describe('ProvingBroker', () => {
}),
).rejects.toThrow(new Error('db error'));
await assertJobStatus(id, 'in-queue');
expect(database.getProvingJobResult(id)).toBeUndefined();
});

it('saves job error', async () => {
await broker.start();

const id = makeProvingJobId();
jest.spyOn(database, 'setProvingJobError');

await broker.enqueueProvingJob({
id,
type: ProvingRequestType.BASE_PARITY,
blockNumber: 1,
inputs: makeBaseParityInputs(),
});

const error = new Error('test error');
await broker.reportProvingJobError(id, error);
await assertJobStatus(id, 'rejected');
expect(database.getProvingJobResult(id)).toEqual({ error: String(error) });
expect(database.setProvingJobError).toHaveBeenCalledWith(id, error);
});

it('does not retain job error if database fails to save', async () => {
Expand All @@ -1033,15 +1062,14 @@ describe('ProvingBroker', () => {
});
await expect(broker.reportProvingJobError(id, new Error())).rejects.toThrow(new Error('db error'));
await assertJobStatus(id, 'in-queue');
expect(database.getProvingJobResult(id)).toBeUndefined();
});

it('does not save job result if job is unknown', async () => {
await broker.start();
const id = makeProvingJobId();

expect(database.getProvingJob(id)).toBeUndefined();
expect(database.getProvingJobResult(id)).toBeUndefined();
jest.spyOn(database, 'setProvingJobResult');
jest.spyOn(database, 'addProvingJob');

await broker.reportProvingJobSuccess(id, {
type: ProvingRequestType.BASE_PARITY,
Expand All @@ -1052,21 +1080,21 @@ describe('ProvingBroker', () => {
),
});

expect(database.getProvingJob(id)).toBeUndefined();
expect(database.getProvingJobResult(id)).toBeUndefined();
expect(database.setProvingJobResult).not.toHaveBeenCalled();
expect(database.addProvingJob).not.toHaveBeenCalled();
});

it('does not save job error if job is unknown', async () => {
await broker.start();
const id = makeProvingJobId();

expect(database.getProvingJob(id)).toBeUndefined();
expect(database.getProvingJobResult(id)).toBeUndefined();
jest.spyOn(database, 'setProvingJobError');
jest.spyOn(database, 'addProvingJob');

await broker.reportProvingJobError(id, new Error('test error'));

expect(database.getProvingJob(id)).toBeUndefined();
expect(database.getProvingJobResult(id)).toBeUndefined();
expect(database.setProvingJobError).not.toHaveBeenCalled();
expect(database.addProvingJob).not.toHaveBeenCalled();
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import { PriorityMemoryQueue } from '@aztec/foundation/queue';

import assert from 'assert';

import { type ProvingBrokerDatabase } from './proving_broker_database.js';
import type { ProvingJobConsumer, ProvingJobFilter, ProvingJobProducer } from './proving_broker_interface.js';
import { type ProvingJobDatabase } from './proving_job_database.js';

type InProgressMetadata = {
id: V2ProvingJobId;
Expand Down Expand Up @@ -71,7 +71,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer {
private maxRetries: number;

public constructor(
private database: ProvingBrokerDatabase,
private database: ProvingJobDatabase,
{ jobTimeoutSec = 30, timeoutIntervalSec = 10, maxRetries = 3 }: ProofRequestBrokerConfig = {},
private logger = createDebugLogger('aztec:prover-client:proof-request-broker'),
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import {
type V2ProvingJobResult,
} from '@aztec/circuit-types';

export interface ProvingBrokerDatabase {
/**
* A database for storing proof requests and their results
*/
export interface ProvingJobDatabase {
/**
* Saves a proof request so it can be retrieved later
* @param request - The proof request to save
Expand Down Expand Up @@ -39,43 +42,3 @@ export interface ProvingBrokerDatabase {
*/
setProvingJobError(id: V2ProvingJobId, err: Error): Promise<void>;
}

export class InMemoryDatabase implements ProvingBrokerDatabase {
private jobs = new Map<V2ProvingJobId, V2ProvingJob>();
private results = new Map<V2ProvingJobId, V2ProvingJobResult>();

getProvingJob(id: V2ProvingJobId): V2ProvingJob | undefined {
return this.jobs.get(id);
}

getProvingJobResult(id: V2ProvingJobId): V2ProvingJobResult | undefined {
return this.results.get(id);
}

addProvingJob(request: V2ProvingJob): Promise<void> {
this.jobs.set(request.id, request);
return Promise.resolve();
}

setProvingJobResult(id: V2ProvingJobId, value: V2ProofOutput): Promise<void> {
this.results.set(id, { value });
return Promise.resolve();
}

setProvingJobError(id: V2ProvingJobId, error: Error): Promise<void> {
this.results.set(id, { error: String(error) });
return Promise.resolve();
}

deleteProvingJobAndResult(id: V2ProvingJobId): Promise<void> {
this.jobs.delete(id);
this.results.delete(id);
return Promise.resolve();
}

*allProvingJobs(): Iterable<[V2ProvingJob, V2ProvingJobResult | undefined]> {
for (const item of this.jobs.values()) {
yield [item, this.results.get(item.id)] as const;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import type { V2ProofOutput, V2ProvingJob, V2ProvingJobId, V2ProvingJobResult } from '@aztec/circuit-types';

import { type ProvingJobDatabase } from '../proving_job_database.js';

export class InMemoryDatabase implements ProvingJobDatabase {
private jobs = new Map<V2ProvingJobId, V2ProvingJob>();
private results = new Map<V2ProvingJobId, V2ProvingJobResult>();

getProvingJob(id: V2ProvingJobId): V2ProvingJob | undefined {
return this.jobs.get(id);
}

getProvingJobResult(id: V2ProvingJobId): V2ProvingJobResult | undefined {
return this.results.get(id);
}

addProvingJob(request: V2ProvingJob): Promise<void> {
this.jobs.set(request.id, request);
return Promise.resolve();
}

setProvingJobResult(id: V2ProvingJobId, value: V2ProofOutput): Promise<void> {
this.results.set(id, { value });
return Promise.resolve();
}

setProvingJobError(id: V2ProvingJobId, error: Error): Promise<void> {
this.results.set(id, { error: String(error) });
return Promise.resolve();
}

deleteProvingJobAndResult(id: V2ProvingJobId): Promise<void> {
this.jobs.delete(id);
this.results.delete(id);
return Promise.resolve();
}

*allProvingJobs(): Iterable<[V2ProvingJob, V2ProvingJobResult | undefined]> {
for (const item of this.jobs.values()) {
yield [item, this.results.get(item.id)] as const;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { type V2ProofOutput, V2ProvingJob, type V2ProvingJobId, V2ProvingJobResult } from '@aztec/circuit-types';
import { type AztecKVStore, type AztecMap } from '@aztec/kv-store';

import { type ProvingJobDatabase } from '../proving_job_database.js';

export class PersistedProvingJobDatabase implements ProvingJobDatabase {
private jobs: AztecMap<V2ProvingJobId, string>;
private jobResults: AztecMap<V2ProvingJobId, string>;

constructor(private store: AztecKVStore) {
this.jobs = store.openMap('proving_jobs');
this.jobResults = store.openMap('proving_job_results');
}

async addProvingJob(job: V2ProvingJob): Promise<void> {
await this.jobs.set(job.id, JSON.stringify(job));
}

*allProvingJobs(): Iterable<[V2ProvingJob, V2ProvingJobResult | undefined]> {
for (const jobStr of this.jobs.values()) {
const job = V2ProvingJob.parse(JSON.parse(jobStr));
const resultStr = this.jobResults.get(job.id);
const result = resultStr ? V2ProvingJobResult.parse(JSON.parse(resultStr)) : undefined;
yield [job, result];
}
}

deleteProvingJobAndResult(id: V2ProvingJobId): Promise<void> {
return this.store.transaction(() => {
void this.jobs.delete(id);
void this.jobResults.delete(id);
});
}

async setProvingJobError(id: V2ProvingJobId, err: Error): Promise<void> {
const res: V2ProvingJobResult = { error: err.message };
await this.jobResults.set(id, JSON.stringify(res));
}

async setProvingJobResult(id: V2ProvingJobId, value: V2ProofOutput): Promise<void> {
const res: V2ProvingJobResult = { value };
await this.jobResults.set(id, JSON.stringify(res));
}
}