Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add abortPromise as a helper alongside abortSignal #512

Merged
merged 8 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ Read more:
output via logging (thanks @wineTGH).
- Fix race condition when multiple workers attempt to initialise the database at
the same time
- `helpers.abortSignal` is no longer typed as `| undefined`. It is still
experimental!
- `helpers.abortPromise` added; will reject when `abortSignal` aborts (useful
for `Promise.race()`)

## v0.16.6

Expand Down
30 changes: 22 additions & 8 deletions __tests__/getTasks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import { makeMockJob, withPgClient } from "./helpers";

const options: WorkerSharedOptions = {};

const neverAbortController = new AbortController();
const abortSignal = neverAbortController.signal;
const abortPromise = new Promise<void>((_, reject) => {
abortSignal.addEventListener("abort", reject);
});

describe("commonjs", () => {
test("gets tasks from folder", () =>
withPgClient(async (client) => {
Expand All @@ -32,7 +38,8 @@ Array [
withPgClient: makeEnhancedWithPgClient(
makeWithPgClientFromClient(client),
),
abortSignal: undefined,
abortSignal,
abortPromise,
},
);
expect(await tasks.wouldyoulike!(helpers.job.payload, helpers)).toEqual(
Expand Down Expand Up @@ -68,7 +75,8 @@ Array [
withPgClient: makeEnhancedWithPgClient(
makeWithPgClientFromClient(client),
),
abortSignal: undefined,
abortSignal,
abortPromise,
},
);
expect(await tasks.task1!(helpers.job.payload, helpers)).toEqual("hi");
Expand Down Expand Up @@ -98,7 +106,8 @@ Array [
withPgClient: makeEnhancedWithPgClient(
makeWithPgClientFromClient(client),
),
abortSignal: undefined,
abortSignal,
abortPromise,
},
);
expect(await tasks.task1!(helpers.job.payload, helpers)).toEqual("hi");
Expand Down Expand Up @@ -127,7 +136,8 @@ Array [
withPgClient: makeEnhancedWithPgClient(
makeWithPgClientFromClient(client),
),
abortSignal: undefined,
abortSignal,
abortPromise,
});
expect(await tasks.t1!(helpers.job.payload, helpers)).toEqual(
"come with me",
Expand Down Expand Up @@ -157,7 +167,8 @@ Array [
withPgClient: makeEnhancedWithPgClient(
makeWithPgClientFromClient(client),
),
abortSignal: undefined,
abortSignal,
abortPromise,
});
expect(await tasks.t1!(helpers.job.payload, helpers)).toEqual(
"come with me, TS",
Expand Down Expand Up @@ -191,7 +202,8 @@ Array [
withPgClient: makeEnhancedWithPgClient(
makeWithPgClientFromClient(client),
),
abortSignal: undefined,
abortSignal,
abortPromise,
},
);
expect(await tasks.wouldyoulike!(helpers.job.payload, helpers)).toEqual(
Expand Down Expand Up @@ -224,7 +236,8 @@ Array [
withPgClient: makeEnhancedWithPgClient(
makeWithPgClientFromClient(client),
),
abortSignal: undefined,
abortSignal,
abortPromise,
},
);
expect(await tasks.task1!(helpers.job.payload, helpers)).toEqual("hi");
Expand All @@ -251,7 +264,8 @@ Array [
withPgClient: makeEnhancedWithPgClient(
makeWithPgClientFromClient(client),
),
abortSignal: undefined,
abortSignal,
abortPromise,
});
expect(await tasks.t1!(helpers.job.payload, helpers)).toEqual(
"come with me",
Expand Down
28 changes: 19 additions & 9 deletions __tests__/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,16 @@ export async function withPgPool<T>(
cb: (pool: pg.Pool) => Promise<T>,
): Promise<T> {
const { TEST_CONNECTION_STRING } = databaseDetails!;
const pool = new pg.Pool({
const pgPool = new pg.Pool({
connectionString: TEST_CONNECTION_STRING,
max: 100,
});
pgPool.on("error", () => {});
pgPool.on("connect", () => {});
try {
return await cb(pool);
return await cb(pgPool);
} finally {
pool.end();
pgPool.end();
}
}

Expand Down Expand Up @@ -298,14 +300,22 @@ export function makeMockJob(taskIdentifier: string): Job {

export async function makeSelectionOfJobs(
utils: WorkerUtils,
pgClient: pg.PoolClient,
pgClient: pg.PoolClient | pg.Pool,
) {
const future = new Date(Date.now() + 60 * 60 * 1000);
const failedJob: DbJob = await utils.addJob("job3", { a: 1, runAt: future });
const regularJob1 = await utils.addJob("job3", { a: 2, runAt: future });
const lockedJob: DbJob = await utils.addJob("job3", { a: 3, runAt: future });
const regularJob2 = await utils.addJob("job3", { a: 4, runAt: future });
const untouchedJob = await utils.addJob("job3", { a: 5, runAt: future });
const failedJob: DbJob = await utils.addJob(
"job3",
{ a: 1 },
{ runAt: future },
);
const regularJob1 = await utils.addJob("job3", { a: 2 }, { runAt: future });
const lockedJob: DbJob = await utils.addJob(
"job3",
{ a: 3 },
{ runAt: future },
);
const regularJob2 = await utils.addJob("job3", { a: 4 }, { runAt: future });
const untouchedJob = await utils.addJob("job3", { a: 5 }, { runAt: future });
const {
rows: [lockedJobUpdate],
} = await pgClient.query<DbJob>(
Expand Down
35 changes: 34 additions & 1 deletion __tests__/main.runTaskList.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import { Pool } from "pg";

import deferred, { Deferred } from "../src/deferred";
import { Task, TaskList, WorkerSharedOptions } from "../src/interfaces";
import { Job, Task, TaskList, WorkerSharedOptions } from "../src/interfaces";
import { runTaskList } from "../src/main";
import {
ESCAPED_GRAPHILE_WORKER_SCHEMA,
expectJobCount,
getJobs,
reset,
sleep,
sleepUntil,
Expand Down Expand Up @@ -100,3 +101,35 @@ test("doesn't bail on deprecated `debug` function", () =>
}
}
}));

test("gracefulShutdown", async () =>
withPgPool(async (pgPool) => {
let jobStarted = false;
const tasks: TaskList = {
job1(payload, helpers) {
jobStarted = true;
return Promise.race([sleep(100000, true), helpers.abortPromise]);
},
};
const workerPool = runTaskList(
{ concurrency: 3, gracefulShutdownAbortTimeout: 20, useNodeTime: true },
tasks,
pgPool,
);
await addJob(pgPool);
await sleepUntil(() => jobStarted);
await workerPool.gracefulShutdown();
await workerPool.promise;
let jobs: Job[] = [];
for (let attempts = 0; attempts < 10; attempts++) {
jobs = await getJobs(pgPool);
if (jobs[0]?.last_error) {
break;
} else {
await sleep(25 * attempts);
}
}
expect(jobs).toHaveLength(1);
const [job] = jobs;
expect(job.last_error).toBeTruthy();
}));
108 changes: 105 additions & 3 deletions __tests__/runner.runOnce.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
import { Pool } from "pg";

import { makeWorkerPresetWorkerOptions } from "../src/config";
import { RunnerOptions } from "../src/interfaces";
import { Job, RunnerOptions, WorkerUtils } from "../src/interfaces";
import { _allWorkerPools } from "../src/main";
import { WorkerPreset } from "../src/preset";
import { runOnce } from "../src/runner";
import { databaseDetails, withPgPool } from "./helpers";
import { makeWorkerUtils } from "../src/workerUtils";
import {
databaseDetails,
getJobs,
makeSelectionOfJobs,
reset,
sleep,
sleepUntil,
withPgPool,
} from "./helpers";

delete process.env.DATABASE_URL;
delete process.env.PGDATABASE;
Expand Down Expand Up @@ -83,10 +93,13 @@ test("at least a connectionString, a pgPool, the DATABASE_URL or PGDATABASE envv
});

test("connectionString and a pgPool cannot provided a the same time", async () => {
const pgPool = new Pool();
pgPool.on("error", () => {});
pgPool.on("connect", () => {});
const options: RunnerOptions = {
taskList: { task: () => {} },
connectionString: databaseDetails!.TEST_CONNECTION_STRING,
pgPool: new Pool(),
pgPool,
};
await runOnceErrorAssertion(
options,
Expand Down Expand Up @@ -141,3 +154,92 @@ test("providing just a pgPool is possible", async () =>
expect.assertions(0);
await runOnce(options);
}));

let utils: WorkerUtils | null = null;
afterEach(async () => {
await utils?.release();
utils = null;
});

test("runs all available tasks and then exits", async () =>
withPgPool(async (pgPool) => {
const options: RunnerOptions = {
taskList: { job1: () => {}, job2: () => {}, job3: () => {} },
pgPool: pgPool,
useNodeTime: true,
};
utils = await makeWorkerUtils(options);
await utils.addJob("job1", { id: "PRE_SELECTION_1" });
await utils.addJob("job2", { id: "PRE_SELECTION_2" });
await utils.addJob("job3", { id: "PRE_SELECTION_3" });
const unavailableJobs = Object.values(
await makeSelectionOfJobs(utils, pgPool),
);
await utils.addJob("job1", { id: "POST_SELECTION_1" });
await utils.addJob("job2", { id: "POST_SELECTION_2" });
await utils.addJob("job3", { id: "POST_SELECTION_3" });
{
const jobs = await getJobs(pgPool);
expect(jobs).toHaveLength(unavailableJobs.length + 6);
}
await runOnce(options);
{
const unavailableJobIds = unavailableJobs.map((j) => j.id);
let jobs!: Job[];
for (let attempts = 0; attempts < 10; attempts++) {
jobs = await getJobs(pgPool);
if (jobs.length === unavailableJobs.length) {
break;
} else {
await sleep(attempts * 50);
}
}
expect(jobs).toHaveLength(unavailableJobs.length);
expect(
jobs.filter((j) => !unavailableJobIds.includes(j.id)),
).toHaveLength(0);
}
}));

test("gracefulShutdown", async () =>
withPgPool(async (pgPool) => {
let jobStarted = false;
const options: RunnerOptions = {
taskList: {
job1(payload, helpers) {
jobStarted = true;
return Promise.race([sleep(100000, true), helpers.abortPromise]);
},
},
pgPool,
preset: {
worker: {
gracefulShutdownAbortTimeout: 20,
useNodeTime: true,
},
},
};
await reset(pgPool, options);
utils = await makeWorkerUtils(options);
await utils.addJob("job1", { id: "test sleep" });
expect(_allWorkerPools).toHaveLength(0);
const promise = runOnce(options);
await sleepUntil(() => _allWorkerPools.length === 1);
expect(_allWorkerPools).toHaveLength(1);
const pool = _allWorkerPools[0];
await sleepUntil(() => jobStarted);
await pool.gracefulShutdown();
await promise;
let jobs: Job[] = [];
for (let attempts = 0; attempts < 10; attempts++) {
jobs = await getJobs(pgPool);
if (jobs[0]?.last_error) {
break;
} else {
await sleep(25 * attempts);
}
}
expect(jobs).toHaveLength(1);
const [job] = jobs;
expect(job.last_error).toBeTruthy();
}));
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"prettier:check": "prettier --cache --ignore-path .eslintignore --check '**/*.{js,jsx,ts,tsx,graphql,md,json}'",
"test": "yarn prepack && yarn depcheck && yarn test:setupdb && yarn test:only",
"test:setupdb": "./scripts/setup_template_db.sh",
"test:only": "node --experimental-vm-modules node_modules/.bin/jest",
"test:only": "NO_LOG_SUCCESS=1 node --experimental-vm-modules node_modules/.bin/jest",
"depcheck": "depcheck --ignores='graphile-worker,faktory-worker,@google-cloud/tasks,bullmq,jest-environment-node,@docusaurus/*,@fortawesome/*,@mdx-js/*,@types/jest,clsx,eslint_d,graphile,juice,postcss-nested,prism-react-renderer,react,react-dom,svgo,ts-node,@types/debug,tslib'",
"db:dump": "./scripts/dump_db",
"perfTest": "cd perfTest && node ./run.js",
Expand Down Expand Up @@ -84,7 +84,7 @@
"eslint_d": "^13.0.0",
"graphile": "^5.0.0-beta.16",
"jest": "^26.0.0",
"jest-time-helpers": "0.1.0",
"jest-time-helpers": "0.1.1",
"juice": "5.2.0",
"pg-connection-string": "^2.6.2",
"postcss-nested": "^6.0.1",
Expand Down
5 changes: 4 additions & 1 deletion src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,12 @@ export function makeJobHelpers(
{
withPgClient,
abortSignal,
abortPromise,
logger: overrideLogger,
}: {
withPgClient: EnhancedWithPgClient;
abortSignal: AbortSignal | undefined;
abortSignal: AbortSignal;
abortPromise: Promise<void>;
logger?: Logger;
},
): JobHelpers {
Expand All @@ -240,6 +242,7 @@ export function makeJobHelpers(
});
const helpers: JobHelpers = {
abortSignal,
abortPromise,
job,
getQueueName(queueId = job.job_queue_id) {
return getQueueName(compiledSharedOptions, withPgClient, queueId);
Expand Down
Loading
Loading