Feat/forbidden flags #128

Closed · wants to merge 3 commits
162 changes: 162 additions & 0 deletions __tests__/forbiddenFlags.test.ts
@@ -0,0 +1,162 @@
import {
makeWorkerUtils,
runTaskListOnce,
Task,
WorkerSharedOptions,
} from "../src/index";
import {
ESCAPED_GRAPHILE_WORKER_SCHEMA,
reset,
TEST_CONNECTION_STRING,
withPgClient,
withPgPool,
} from "./helpers";

const options: WorkerSharedOptions = {};

test("supports the flags API", () =>
withPgClient(async (pgClient) => {
await reset(pgClient, options);

// Schedule a job
const utils = await makeWorkerUtils({
connectionString: TEST_CONNECTION_STRING,
});
await utils.addJob("job1", { a: 1 }, { flags: ["a", "b"] });
await utils.release();

// Assert that it has an entry in jobs, with both flags recorded
const { rows: jobs } = await pgClient.query(
`select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.jobs`,
);
expect(jobs).toHaveLength(1);
expect(jobs[0]).toHaveProperty("flags");
expect(jobs[0].flags).toHaveLength(2);

const task: Task = jest.fn();
const taskList = { task };
await runTaskListOnce(options, taskList, pgClient);
}));

test("get_job skips forbidden flags with string[] arg", () =>
withPgPool(async (pgPool) => {
await reset(pgPool, options);

const badFlag = "d";

const shouldRun = jest.fn();
const shouldSkip = jest.fn();

const job: Task = async (_payload, helpers) => {
const flags = helpers.job.flags || [];

if (flags.includes(badFlag)) {
shouldSkip();
} else {
shouldRun();
}
};

// Schedule two jobs; the second carries the soon-to-be-forbidden flag "d"
const utils = await makeWorkerUtils({ pgPool });

await utils.addJob("flag-test", { a: 1 }, { flags: ["a", "b"] });
await utils.addJob("flag-test", { a: 1 }, { flags: ["c", "d"] });
await utils.release();

// Run the task list once; the job flagged "d" should be left untouched
const pgClient = await pgPool.connect();

await runTaskListOnce(
{ forbiddenFlags: ["d"] },
{ "flag-test": job },
pgClient,
);

await pgClient.release();

expect(shouldRun).toHaveBeenCalled();
expect(shouldSkip).not.toHaveBeenCalled();
}));

test("get_job skips forbidden flags with () => string[] arg", () =>
withPgPool(async (pgPool) => {
await reset(pgPool, options);

const badFlag = "d";

const shouldRun = jest.fn();
const shouldSkip = jest.fn();

const job: Task = async (_payload, helpers) => {
const flags = helpers.job.flags || [];

if (flags.includes(badFlag)) {
shouldSkip();
} else {
shouldRun();
}
};

// Schedule two jobs; the second carries the soon-to-be-forbidden flag "d"
const utils = await makeWorkerUtils({ pgPool });

await utils.addJob("flag-test", { a: 1 }, { flags: ["a", "b"] });
await utils.addJob("flag-test", { a: 1 }, { flags: ["c", "d"] });
await utils.release();

// Run the task list once; the job flagged "d" should be left untouched
const pgClient = await pgPool.connect();

await runTaskListOnce(
{ forbiddenFlags: () => ["d"] },
{ "flag-test": job },
pgClient,
);

await pgClient.release();

expect(shouldRun).toHaveBeenCalled();
expect(shouldSkip).not.toHaveBeenCalled();
}));

test("get_job skips forbidden flags with () => Promise<string[]> arg", () =>
withPgPool(async (pgPool) => {
await reset(pgPool, options);

const badFlag = "d";

const shouldRun = jest.fn();
const shouldSkip = jest.fn();

const job: Task = async (_payload, helpers) => {
const flags = helpers.job.flags || [];

if (flags.includes(badFlag)) {
shouldSkip();
} else {
shouldRun();
}
};

// Schedule two jobs; the second carries the soon-to-be-forbidden flag "d"
const utils = await makeWorkerUtils({ pgPool });

await utils.addJob("flag-test", { a: 1 }, { flags: ["a", "b"] });
await utils.addJob("flag-test", { a: 1 }, { flags: ["c", "d"] });
await utils.release();

// Run the task list once; the job flagged "d" should be left untouched
const pgClient = await pgPool.connect();

await runTaskListOnce(
{ forbiddenFlags: async () => ["d"] },
{ "flag-test": job },
pgClient,
);

await pgClient.release();

expect(shouldRun).toHaveBeenCalled();
expect(shouldSkip).not.toHaveBeenCalled();
}));
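
Taken together, the three tests above cover the shapes the forbiddenFlags option accepts: a plain string[], a synchronous () => string[], and an async () => Promise<string[]>. As a rough sketch (not part of this diff) of why the callback forms matter, the forbidden set can be recomputed at runtime, for example to throttle a noisy tenant. The throttledTenants set, the send_email task, and the assumption that the callback is consulted on each fetch are all hypothetical here:

import { PoolClient } from "pg";
import { runTaskListOnce, Task } from "../src/index";

// Hypothetical mutable state maintained elsewhere in the application.
const throttledTenants = new Set<string>(["tenant:42"]);

const sendEmail: Task = async (payload, helpers) => {
  helpers.logger.info(`sending email: ${JSON.stringify(payload)}`);
};

// pgClient is a connected client, as in the tests above.
export async function runOnceSkippingThrottled(pgClient: PoolClient) {
  await runTaskListOnce(
    // Jobs whose flags overlap the returned array are left in the queue.
    { forbiddenFlags: async () => Array.from(throttledTenants) },
    { send_email: sendEmail },
    pgClient,
  );
}
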
1 change: 1 addition & 0 deletions __tests__/helpers.ts
@@ -109,6 +109,7 @@ export function makeMockJob(taskIdentifier: string): Job {
locked_at: null,
locked_by: null,
key: null,
flags: null,
};
}

2 changes: 1 addition & 1 deletion __tests__/migrate.test.ts
@@ -33,7 +33,7 @@ test("migration installs schema; second migration does no harm", async () => {
const { rows: migrationRows } = await pgClient.query(
`select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations`,
);
expect(migrationRows).toHaveLength(4);
expect(migrationRows).toHaveLength(5);
const migration = migrationRows[0];
expect(migration.id).toEqual(1);

178 changes: 178 additions & 0 deletions sql/000005.sql
@@ -0,0 +1,178 @@
alter table :GRAPHILE_WORKER_SCHEMA.jobs add column revision int default 0 not null;
alter table :GRAPHILE_WORKER_SCHEMA.jobs add column flags text[] default null;

drop function :GRAPHILE_WORKER_SCHEMA.add_job;
create function :GRAPHILE_WORKER_SCHEMA.add_job(
identifier text,
payload json = null,
queue_name text = null,
run_at timestamptz = null,
max_attempts int = null,
job_key text = null,
priority int = null,
flags text[] = null
) returns :GRAPHILE_WORKER_SCHEMA.jobs as $$
declare
v_job :GRAPHILE_WORKER_SCHEMA.jobs;
begin
-- Apply rationality checks
if length(identifier) > 128 then
raise exception 'Task identifier is too long (max length: 128).' using errcode = 'GWBID';
end if;
if queue_name is not null and length(queue_name) > 128 then
raise exception 'Job queue name is too long (max length: 128).' using errcode = 'GWBQN';
end if;
if job_key is not null and length(job_key) > 512 then
raise exception 'Job key is too long (max length: 512).' using errcode = 'GWBJK';
end if;
if max_attempts < 1 then
raise exception 'Job maximum attempts must be at least 1' using errcode = 'GWBMA';
end if;

if job_key is not null then
-- Upsert job
insert into :GRAPHILE_WORKER_SCHEMA.jobs (
task_identifier,
payload,
queue_name,
run_at,
max_attempts,
key,
priority,
flags
)
values(
identifier,
coalesce(payload, '{}'::json),
queue_name,
coalesce(run_at, now()),
coalesce(max_attempts, 25),
job_key,
coalesce(priority, 0),
flags
)
on conflict (key) do update set
task_identifier=excluded.task_identifier,
payload=excluded.payload,
queue_name=excluded.queue_name,
max_attempts=excluded.max_attempts,
run_at=excluded.run_at,
priority=excluded.priority,
revision=jobs.revision + 1,
flags=excluded.flags,
-- always reset error/retry state
attempts=0,
last_error=null
where jobs.locked_at is null
returning *
into v_job;

-- If upsert succeeded (insert or update), return early
if not (v_job is null) then
return v_job;
end if;

-- Upsert failed -> there must be an existing job that is locked. Remove
-- existing key to allow a new one to be inserted, and prevent any
-- subsequent retries by bumping attempts to the max allowed.
update :GRAPHILE_WORKER_SCHEMA.jobs
set
key = null,
attempts = jobs.max_attempts
where key = job_key;
end if;

-- insert the new job. Assume no conflicts due to the update above
insert into :GRAPHILE_WORKER_SCHEMA.jobs(
task_identifier,
payload,
queue_name,
run_at,
max_attempts,
key,
priority,
flags
)
values(
identifier,
coalesce(payload, '{}'::json),
queue_name,
coalesce(run_at, now()),
coalesce(max_attempts, 25),
job_key,
coalesce(priority, 0),
flags
)
returning *
into v_job;

return v_job;
end;
$$ language plpgsql volatile;

drop function :GRAPHILE_WORKER_SCHEMA.get_job;

create function :GRAPHILE_WORKER_SCHEMA.get_job(
worker_id text,
task_identifiers text[] = null,
job_expiry interval = interval '4 hours',
forbidden_flags text[] = null
) returns :GRAPHILE_WORKER_SCHEMA.jobs as $$
declare
v_job_id bigint;
v_queue_name text;
v_row :GRAPHILE_WORKER_SCHEMA.jobs;
v_now timestamptz = now();
begin
if worker_id is null or length(worker_id) < 10 then
raise exception 'invalid worker id';
end if;

select jobs.queue_name, jobs.id into v_queue_name, v_job_id
from :GRAPHILE_WORKER_SCHEMA.jobs
where (jobs.locked_at is null or jobs.locked_at < (v_now - job_expiry))
and (
jobs.queue_name is null
or
exists (
select 1
from :GRAPHILE_WORKER_SCHEMA.job_queues
where job_queues.queue_name = jobs.queue_name
and (job_queues.locked_at is null or job_queues.locked_at < (v_now - job_expiry))
for update
skip locked
)
)
and run_at <= v_now
and attempts < max_attempts
and (task_identifiers is null or task_identifier = any(task_identifiers))
and (forbidden_flags is null or (forbidden_flags && flags) is not true)
order by priority asc, run_at asc, id asc
limit 1
for update
skip locked;

if v_job_id is null then
return null;
end if;

if v_queue_name is not null then
update :GRAPHILE_WORKER_SCHEMA.job_queues
set
locked_by = worker_id,
locked_at = v_now
where job_queues.queue_name = v_queue_name;
end if;

update :GRAPHILE_WORKER_SCHEMA.jobs
set
attempts = attempts + 1,
locked_by = worker_id,
locked_at = v_now
where id = v_job_id
returning * into v_row;

return v_row;
end;
$$ language plpgsql volatile;
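
The important addition to get_job is the forbidden_flags check: && is the Postgres array-overlap operator, so a job is skipped whenever any of its flags appears in the forbidden list, while jobs with no flags remain eligible. A sketch, not part of the PR, of exercising the function directly through pg; it assumes the migration has been applied under the default graphile_worker schema name and respects the requirement that worker_id be at least 10 characters:

import { Pool } from "pg";

// Sketch only: fetch (and lock) the next runnable job while skipping any job
// carrying one of the forbidden flags.
export async function getNextJobSkipping(pool: Pool, forbidden: string[]) {
  const { rows } = await pool.query(
    `select * from graphile_worker.get_job(
       worker_id => $1::text,
       forbidden_flags => $2::text[]
     )`,
    ["manual-worker-1", forbidden],
  );
  // get_job returns a single composite row; every column is null when no job
  // matched, so the id distinguishes "no job available".
  const job = rows[0];
  return job && job.id !== null ? job : null;
}

// e.g. await getNextJobSkipping(pool, ["d"]);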

4 changes: 3 additions & 1 deletion src/helpers.ts
@@ -27,7 +27,8 @@ export function makeAddJob(
run_at => $4::timestamptz,
max_attempts => $5::int,
job_key => $6::text,
priority => $7::int
priority => $7::int,
flags => $8::text[]
);
`,
[
@@ -38,6 +39,7 @@
spec.maxAttempts || null,
spec.jobKey || null,
spec.priority || null,
spec.flags || null,
],
);
const job: Job = rows[0];
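
With flags threaded through makeAddJob, anything built on top of it can tag jobs at creation time. The sketch below rests on two assumptions not shown in this diff: that TaskSpec exposes the same flags field the tests pass to addJob, and that the in-task helpers.addJob is backed by makeAddJob as elsewhere in graphile-worker. The flag naming scheme is made up; any strings work.

import { Task } from "../src/index";

// Sketch: a task that fans out a follow-up job, tagging it with flags so a
// worker configured with forbiddenFlags can defer it.
export const processSignup: Task = async (rawPayload, helpers) => {
  const payload = rawPayload as { userId: string };
  await helpers.addJob(
    "send_welcome_email",
    { userId: payload.userId },
    { flags: [`user:${payload.userId}`, "email"] },
  );
};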