diff --git a/README.md b/README.md index 35329481..d37a4ead 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,7 @@ https://graphile.org/support/ - Adding jobs to same named queue runs them in series - Automatically re-attempts failed jobs with exponential back-off - Customisable retry count (default: 25 attempts over ~3 days) +- Crontab-like scheduling feature for recurring tasks (with optional backfill) - Task de-duplication via unique `job_key` - Flexible runtime controls that can be used for complex rate limiting (e.g. via (graphile-worker-rate-limiter)[https://github.com/politics-rewired/graphile-worker-rate-limiter]) @@ -1146,6 +1147,201 @@ This method can be used to postpone or advance job execution, or to schedule a previously failed or permanently failed job for execution. The updated jobs will be returned (note that this may be fewer jobs than you requested). +## Recurring tasks (crontab) + +**Stability: _experimental_**; we may make breaking changes to this +functionality in a minor release, so pay close attention to the changelog when +upgrading. + +Graphile Worker supports triggering recurring tasks according to a cron-like +schedule. This is designed for recurring tasks such as sending a weekly email, +running database maintenance tasks every day, performing data roll-ups hourly, +downloading external data every 20 minutes, etc. + +Graphile Worker's crontab support: + +- guarantees (thanks to ACID-compliant transactions) that no duplicate task + schedules will occur +- can backfill missed jobs if desired (e.g. if the Worker wasn't running when + the job was due to be scheduled) +- schedules tasks using Graphile Worker's regular job queue, so you get all the + regular features such as exponential back-off on failure. + +**NOTE**: It is not intended that you add recurring tasks for each of your +individual application users, instead you should have relatively few recurring +tasks, and those tasks can create additional jobs for the individual users (or +process multiple users) if necessary. + +Tasks are by default read from a `crontab` file next to the `tasks/` folder (but +this is configurable in library mode). Please note that our syntax is not 100% +compatible with cron's, and our task payload differs. We only handle timestamps +in UTC. The following diagram details the parts of a Graphile Worker crontab +schedule: + +```crontab +# ┌───────────── UTC minute (0 - 59) +# │ ┌───────────── UTC hour (0 - 23) +# │ │ ┌───────────── UTC day of the month (1 - 31) +# │ │ │ ┌───────────── UTC month (1 - 12) +# │ │ │ │ ┌───────────── UTC day of the week (0 - 6) (Sunday to Saturday) +# │ │ │ │ │ ┌───────────── task (identifier) to schedule +# │ │ │ │ │ │ ┌────────── optional scheduling options +# │ │ │ │ │ │ │ ┌────── optional payload to merge +# │ │ │ │ │ │ │ │ +# │ │ │ │ │ │ │ │ +# * * * * * task ?opts {payload} +``` + +Comment lines start with a `#`. + +For the first 5 fields we support an explicit numeric value, `*` to represent +all valid values, `*/n` (where `n` is a positive integer) to represent all valid +values divisible by `n`, range syntax such as `1-5`, and any combination of +these separated by commas. + +The task identifier should match the following regexp +`/^[_a-zA-Z][_a-zA-Z0-9:_-]*$/` (namely it should start with an alphabetic +character and it should only contain alphanumeric characters, colon, underscore +and hyphen). It should be the name of one of your Graphile Worker tasks. 
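As an illustration of the field syntax (the `generate_report` task name here is
just a placeholder), the following entry combines a divisor wildcard with a
range: it would run at minutes 0, 15, 30 and 45 of every hour from 09:00 to
17:45 UTC, Monday to Friday:

```
*/15 9-17 * * 1-5 generate_report
```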
+ +The `opts` must always be prefixed with a `?` if provided and details +configuration for the task such as what should be done in the event that the +previous event was not scheduled (e.g. because the Worker wasn't running). +Options are specified using HTTP query string syntax (with `&` separator). + +Currently we support the following `opts`: + +- `id=UID` where UID is a unique alphanumeric case-sensitive identifier starting + with a letter - specify an identifier for this crontab entry; by default this + will use the task identifier, but if you want more than one schedule for the + same task (e.g. with different payload, or different times) then you will need + to supply a unique identifier explicitly. +- `fill=t` where `t` is a "time phrase" (see below) - backfill any entries from + the last time period `t`, for example if the worker was not running when they + were due to be executed (by default, no backfilling). +- `max=n` where `n` is a small positive integer - override the `max_attempts` of + the job. +- `queue=name` where `name` is an alphanumeric queue name - add the job to a + named queue so it executes serially. +- `priority=n` where `n` is a relatively small integer - override the priority + of the job. + +**NOTE**: changing the identifier (e.g. via `id`) can result in duplicate +executions, so we recommend that you explicitly set it and never change it. + +**NOTE**: using `fill` will not backfill new tasks, only tasks that were +previously known. + +**NOTE**: the higher you set the `fill` parameter, the longer the worker startup +time will be; when used you should set it to be slightly larger than the longest +period of downtime you expect for your worker. + +Time phrases are comprised of a sequence of number-letter combinations, where +the number represents a quantity and the letter represents a time period, e.g. +`5d` for `five days`, or `3h` for `three hours`; e.g. `4w3d2h1m` represents +`4 weeks, 3 days, 2 hours and 1 minute` (i.e. a period of 44761 minutes). The +following time periods are supported: + +- `s` - one second (1000 milliseconds) +- `m` - one minute (60 seconds) +- `h` - one hour (60 minutes) +- `d` - one day (24 hours) +- `w` - one week (7 days) + +The `payload` is a JSON5 object; it must start with a `{`, must not contain +newlines or carriage returns (`\n` or `\r`), and must not contain trailing +whitespace. It will be merged into the default crontab payload properties. + +Each crontab job will have a JSON object payload containing the key `_cron` with +the value being an object with the following entries: + +- `ts` - ISO8601 timestamp representing when this job was due to execute +- `backfilled` - true if the task was "backfilled" (i.e. it wasn't scheduled on + time), false otherwise + +### Crontab examples + +The following schedules the `send_weekly_email` task at 4:30am (UTC) every +Monday: + +``` +30 4 * * 1 send_weekly_email +``` + +The following does similar, but also will backfill any tasks over the last two +days (`2d`), sets max attempts to `10` and merges in `{"onboarding": false}` +into the task payload: + +``` +30 4 * * 1 send_weekly_email ?fill=2d&max=10 {onboarding:false} +``` + +The following triggers the `rollup` task every 4 hours on the hour: + +``` +0 */4 * * * rollup +``` + +### Limiting backfill + +When you ask Graphile Worker to backfill jobs, it will do so for all jobs +matching that specification that should have been scheduled over the backfill +period. 
Other than the period itself, you cannot place limits on the backfilling (for
example, you cannot say "backfill at most one job" or "only backfill if the
next job isn't due within the next 3 hours"); this is because we've determined
that there are many situations (back-off, overloaded worker, serially executed
jobs, etc.) in which this behaviour might result in outcomes that the user did
not expect.

If you need these kinds of constraints on backfilled jobs, you should implement
them _at runtime_ (rather than at scheduling time) in the task executor itself,
which could use the `payload._cron.ts` property to determine whether execution
should continue or not.

### Specifying cron items in library mode

You have three options for specifying cron tasks in library mode:

1. `crontab`: a crontab string (like the contents of a crontab file)
2. `crontabFile`: the (string) path to a crontab file, from which to read the
   rules
3. `parsedCronItems`: explicit parsed cron items (see below)

#### parsedCronItems

The Graphile Worker internal format for cron items lists all the matching
minutes/hours/etc. uniquely and in numerically ascending order. It also has
other requirements and is to be treated as an opaque type, so you must not
construct this value manually.

Instead, you may specify the parsedCronItems using one of the helper functions:

1. `parseCrontab`: pass a crontab string and it will be converted into a list
   of `ParsedCronItem`s
2. `parseCronItems`: pass a list of `CronItem`s and it will be converted into a
   list of `ParsedCronItem`s

The `CronItem` type is designed to be written by humans (and their scripts) and
has the following properties (see the sketch after this list for an example):

- `task` (required): the string identifier of the task that should be executed
  (same as the first argument to `add_job`)
- `pattern` (required): a cron pattern (e.g. `* * * * *`) describing when to
  run this task
- `options`: optional options influencing backfilling, etc.
  - `backfillPeriod`: how long (in milliseconds) to backfill (see above)
  - `maxAttempts`: the maximum number of attempts we'll give the job
  - `queueName`: if you want the job to run serially, you can add it to a named
    queue
  - `priority`: optionally override the priority of the job
- `payload`: an optional payload object to merge into the generated payload for
  the job
- `identifier`: an optional string to give this cron item a permanent
  identifier; if not given we will use the `task`. This is particularly useful
  if you want to schedule the same task multiple times, perhaps on different
  time patterns or with different payloads or other options (since every cron
  item must have a unique identifier).
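To make this concrete, here is a minimal library-mode sketch using
`parseCronItems`; the `send_weekly_email` task, the connection string, and the
import of `parseCronItems`/`run` from the package root are illustrative
assumptions rather than requirements of this API:

```js
import { parseCronItems, run } from "graphile-worker";

async function main() {
  const runner = await run({
    // Assumed connection details; configure these for your own database.
    connectionString: "postgres:///my_db",
    taskList: {
      // Hypothetical task executor; the payload it receives will include the
      // `_cron: { ts, backfilled }` key merged with the payload given below.
      send_weekly_email: async (payload, helpers) => {
        helpers.logger.info("Sending weekly email", { payload });
      },
    },
    // Convert the human-friendly CronItems into opaque ParsedCronItems.
    parsedCronItems: parseCronItems([
      {
        task: "send_weekly_email",
        pattern: "30 4 * * 1", // 4:30am UTC every Monday
        options: {
          backfillPeriod: 2 * 24 * 60 * 60 * 1000, // equivalent to `fill=2d`
          maxAttempts: 10,
        },
        payload: { onboarding: false },
        identifier: "weekly_email",
      },
    ]),
  });
  await runner.promise;
}

main().catch((e) => {
  console.error(e);
  process.exit(1);
});
```

Using `crontab` or `crontabFile` instead follows the same shape; only the
property used to supply the schedule changes.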
+ ## Forbidden flags When a job is created (or updated via `job_key`), you may set its `flags` to a diff --git a/__tests__/__snapshots__/crontab.test.ts.snap b/__tests__/__snapshots__/crontab.test.ts.snap new file mode 100644 index 00000000..6e5f394f --- /dev/null +++ b/__tests__/__snapshots__/crontab.test.ts.snap @@ -0,0 +1,632 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`parses crontab file correctly 1`] = ` +Array [ + Object { + "dates": Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + 28, + 29, + 30, + 31, + ], + "dows": Array [ + 0, + 1, + 2, + 3, + 4, + 5, + 6, + ], + "hours": Array [ + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + ], + "identifier": "simple", + "minutes": Array [ + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + 28, + 29, + 30, + 31, + 32, + 33, + 34, + 35, + 36, + 37, + 38, + 39, + 40, + 41, + 42, + 43, + 44, + 45, + 46, + 47, + 48, + 49, + 50, + 51, + 52, + 53, + 54, + 55, + 56, + 57, + 58, + 59, + ], + "months": Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + ], + "options": Object { + "backfillPeriod": 0, + "maxAttempts": undefined, + "priority": undefined, + "queueName": undefined, + }, + "payload": null, + "task": "simple", + }, + Object { + "dates": Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + 28, + 29, + 30, + 31, + ], + "dows": Array [ + 0, + 1, + 2, + 3, + 4, + 5, + 6, + ], + "hours": Array [ + 4, + ], + "identifier": "every_day_at_4_am", + "minutes": Array [ + 0, + ], + "months": Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + ], + "options": Object { + "backfillPeriod": 0, + "maxAttempts": undefined, + "priority": undefined, + "queueName": undefined, + }, + "payload": null, + "task": "every_day_at_4_am", + }, + Object { + "dates": Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + 28, + 29, + 30, + 31, + ], + "dows": Array [ + 0, + ], + "hours": Array [ + 4, + ], + "identifier": "every_sunday_at_4_am", + "minutes": Array [ + 0, + ], + "months": Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + ], + "options": Object { + "backfillPeriod": 0, + "maxAttempts": undefined, + "priority": undefined, + "queueName": undefined, + }, + "payload": null, + "task": "every_sunday_at_4_am", + }, + Object { + "dates": Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + 28, + 29, + 30, + 31, + ], + "dows": Array [ + 0, + ], + "hours": Array [ + 4, + ], + "identifier": "sunday_7", + "minutes": Array [ + 0, + ], + "months": Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + ], + "options": Object { + "backfillPeriod": 0, + "maxAttempts": undefined, + "priority": undefined, + "queueName": undefined, + }, + "payload": null, + "task": "every_sunday_at_4_am", + }, + Object { + "dates": Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 
25, + 26, + 27, + 28, + 29, + 30, + 31, + ], + "dows": Array [ + 2, + ], + "hours": Array [ + 4, + ], + "identifier": "every_tuesday_at_4_am", + "minutes": Array [ + 0, + ], + "months": Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + ], + "options": Object { + "backfillPeriod": 0, + "maxAttempts": undefined, + "priority": undefined, + "queueName": undefined, + }, + "payload": Object { + "isTuesday": true, + }, + "task": "every_tuesday_at_4_am", + }, + Object { + "dates": Array [ + 1, + ], + "dows": Array [ + 1, + ], + "hours": Array [ + 1, + ], + "identifier": "stuff", + "minutes": Array [ + 0, + 7, + 10, + 20, + 30, + 40, + 50, + 56, + 57, + 58, + 59, + ], + "months": Array [ + 1, + ], + "options": Object { + "backfillPeriod": 2685660000, + "maxAttempts": 3, + "priority": 3, + "queueName": "my_queue", + }, + "payload": Object { + "myExtraPayload": Object { + "stuff": "here with # hash char", + }, + }, + "task": "one", + }, + Object { + "dates": Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + 28, + 29, + 30, + 31, + ], + "dows": Array [ + 0, + 1, + 2, + 3, + 4, + 5, + 6, + ], + "hours": Array [ + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + ], + "identifier": "lots_of_spaces", + "minutes": Array [ + 0, + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 17, + 18, + 19, + 20, + 21, + 22, + 23, + 24, + 25, + 26, + 27, + 28, + 29, + 30, + 31, + 32, + 33, + 34, + 35, + 36, + 37, + 38, + 39, + 40, + 41, + 42, + 43, + 44, + 45, + 46, + 47, + 48, + 49, + 50, + 51, + 52, + 53, + 54, + 55, + 56, + 57, + 58, + 59, + ], + "months": Array [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11, + 12, + ], + "options": Object { + "backfillPeriod": 0, + "maxAttempts": undefined, + "priority": undefined, + "queueName": undefined, + }, + "payload": null, + "task": "lots_of_spaces", + }, +] +`; diff --git a/__tests__/cron-timing.test.ts b/__tests__/cron-timing.test.ts new file mode 100644 index 00000000..a02314f3 --- /dev/null +++ b/__tests__/cron-timing.test.ts @@ -0,0 +1,141 @@ +import { run, Runner } from "../src"; +import { + EventMonitor, + getJobs, + HOUR, + MINUTE, + reset, + SECOND, + setupFakeTimers, + sleep, + withOptions, +} from "./helpers"; + +const { setTime } = setupFakeTimers(); +const REFERENCE_TIMESTAMP = 1609459200000; /* 1st January 2021, 00:00:00 UTC */ + +// Even on test fail we need the runner to shut down, so clean up after each test (rather than during). 
+let runner: null | Runner = null; +afterEach(() => { + if (runner) { + const promise = runner.stop(); + runner = null; + return promise; + } +}); + +test("check timestamp is correct", () => { + setTime(REFERENCE_TIMESTAMP); + expect(new Date().toISOString()).toMatch(/^2021-01-01T00:00:0.*Z$/); +}); + +test("executes job when expected", () => + withOptions(async (options) => { + await setTime(REFERENCE_TIMESTAMP + HOUR); // 1am + const { pgPool } = options; + await reset(pgPool, options); + const eventMonitor = new EventMonitor(); + const cronFinishedBackfilling = eventMonitor.awaitNext("cron:started"); + const poolReady = eventMonitor.awaitNext("pool:listen:success"); + const cronScheduleCalls = eventMonitor.count("cron:schedule"); + runner = await run({ + ...options, + crontab: `0 */4 * * * my_task`, + events: eventMonitor.events, + }); + await cronFinishedBackfilling; + await poolReady; + expect(cronScheduleCalls.count).toEqual(0); + + await setTime(REFERENCE_TIMESTAMP + 3 * HOUR + 1 * SECOND); // 3:00:01am + expect(cronScheduleCalls.count).toEqual(0); + + const cronScheduleComplete = eventMonitor.awaitNext("cron:scheduled"); + await setTime(REFERENCE_TIMESTAMP + 4 * HOUR + 1 * SECOND); // 4:00:01am + expect(cronScheduleCalls.count).toEqual(1); + + const { timestamp, jobsAndIdentifiers } = cronScheduleCalls.lastEvent!; + expect(timestamp).toEqual(REFERENCE_TIMESTAMP + 4 * HOUR); + expect(jobsAndIdentifiers).toHaveLength(1); + expect(jobsAndIdentifiers[0].job.task).toEqual("my_task"); + + // After this, the jobs should exist in the DB + await cronScheduleComplete; + await sleep(50); + + { + const jobs = await getJobs(pgPool); + expect(jobs).toHaveLength(1); + } + })); + +test("doesn't schedule tasks twice when system clock reverses", () => + withOptions(async (options) => { + await setTime(REFERENCE_TIMESTAMP + HOUR); // 1am + const { pgPool } = options; + await reset(pgPool, options); + const eventMonitor = new EventMonitor(); + const cronFinishedBackfilling = eventMonitor.awaitNext("cron:started"); + const poolReady = eventMonitor.awaitNext("pool:listen:success"); + const cronScheduleCalls = eventMonitor.count("cron:schedule"); + runner = await run({ + ...options, + crontab: `0 */4 * * * my_task`, + events: eventMonitor.events, + }); + await cronFinishedBackfilling; + await poolReady; + expect(cronScheduleCalls.count).toEqual(0); + + await setTime(REFERENCE_TIMESTAMP + 3 * HOUR + 1 * SECOND); // 3:00:01am + expect(cronScheduleCalls.count).toEqual(0); + + await setTime(REFERENCE_TIMESTAMP + 4 * HOUR + 1 * SECOND); // 4:00:01am + expect(cronScheduleCalls.count).toEqual(1); + + // REWIND TIME! 
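+    // (i.e. simulate the system clock being wound backwards, e.g. by an NTP
+    // correction)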
+ await setTime(REFERENCE_TIMESTAMP + 3 * HOUR + 1 * SECOND); + // Advance time again + await setTime(REFERENCE_TIMESTAMP + 4 * HOUR + 1 * SECOND); + // Although the time was matched again, no tasks should have been scheduled + expect(cronScheduleCalls.count).toEqual(1); + })); + +test("clock skew doesn't prevent task from being scheduled at the right time", () => + withOptions(async (options) => { + await setTime(REFERENCE_TIMESTAMP + HOUR); // 1am + const { pgPool } = options; + await reset(pgPool, options); + const eventMonitor = new EventMonitor(); + const cronFinishedBackfilling = eventMonitor.awaitNext("cron:started"); + const poolReady = eventMonitor.awaitNext("pool:listen:success"); + const cronScheduleCalls = eventMonitor.count("cron:schedule"); + runner = await run({ + ...options, + crontab: `0 */4 * * * my_task`, + events: eventMonitor.events, + }); + await cronFinishedBackfilling; + await poolReady; + expect(cronScheduleCalls.count).toEqual(0); + + await setTime(REFERENCE_TIMESTAMP + 3 * HOUR + 1 * SECOND); // 3:00:01am + expect(cronScheduleCalls.count).toEqual(0); + + // Advance time + await setTime(REFERENCE_TIMESTAMP + 4 * HOUR - 30 * SECOND); // 3:59:30am + expect(cronScheduleCalls.count).toEqual(0); + + // Jump back and forward a few times + for (let i = 0; i < 10; i++) { + await setTime(REFERENCE_TIMESTAMP + 4 * HOUR - 1 * MINUTE); // 3:59:00am + expect(cronScheduleCalls.count).toEqual(0); + + await setTime(REFERENCE_TIMESTAMP + 4 * HOUR - 30 * SECOND); // 3:59:30am + expect(cronScheduleCalls.count).toEqual(0); + } + + // Finally advance the clock to cron firing + await setTime(REFERENCE_TIMESTAMP + 4 * HOUR + 1 * SECOND); // 4:00:01am + expect(cronScheduleCalls.count).toEqual(1); + })); diff --git a/__tests__/cron.test.ts b/__tests__/cron.test.ts new file mode 100644 index 00000000..1f8788ed --- /dev/null +++ b/__tests__/cron.test.ts @@ -0,0 +1,146 @@ +import { run } from "../src"; +import { + ESCAPED_GRAPHILE_WORKER_SCHEMA, + EventMonitor, + getJobs, + getKnown, + reset, + withOptions, +} from "./helpers"; + +const CRONTAB_DO_IT = ` +0 */4 * * * do_it ?fill=1d +`; +const FOUR_HOURS = 4 * 60 * 60 * 1000; + +test("registers identifiers", () => + withOptions(async (options) => { + const { pgPool } = options; + await reset(pgPool, options); + { + const known = await getKnown(pgPool); + expect(known).toHaveLength(0); + } + const runner = await run({ + ...options, + crontab: CRONTAB_DO_IT, + }); + await runner.stop(); + { + const known = await getKnown(pgPool); + expect(known).toHaveLength(1); + expect(known[0].identifier).toEqual("do_it"); + expect(known[0].known_since).not.toBeNull(); + expect(known[0].last_execution).toBeNull(); + const jobs = await getJobs(pgPool); + expect(jobs).toHaveLength(0); + } + })); + +test("backfills if identifier already registered (5h)", () => + withOptions(async (options) => { + const { pgPool } = options; + await reset(pgPool, options); + const now = Date.now(); + const expectedTime = now - (now % FOUR_HOURS); + await pgPool.query( + ` + insert into ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.known_crontabs ( + identifier, + known_since, + last_execution + ) + values ( + 'do_it', + NOW() - interval '14 days', + NOW() - interval '5 hours' + ) + `, + ); + const eventMonitor = new EventMonitor(); + const cronFinishedBackfilling = eventMonitor.awaitNext("cron:started"); + const runner = await run({ + ...options, + crontab: CRONTAB_DO_IT, + events: eventMonitor.events, + }); + await cronFinishedBackfilling; + await runner.stop(); + { + const known = await 
getKnown(pgPool); + expect(known).toHaveLength(1); + expect(known[0].identifier).toEqual("do_it"); + expect(known[0].known_since).not.toBeNull(); + if (!known[0].last_execution) { + throw new Error("Expected last_execution to exist"); + } + // There's a small window every 4 hours where the expect might fail due + // to the clock advancing, so we account for that by checking both of the + // expected times. + const lx = +known[0].last_execution; + if (lx !== expectedTime && lx !== expectedTime + FOUR_HOURS) { + // If we get here, then neither of the above were okay. + expect(+known[0].last_execution).toEqual(expectedTime); + } + const jobs = await getJobs(pgPool); + // It's a 5 hour window for a job that runs every 4 hours, there should + // be 1 or 2 jobs. + expect(jobs.length).toBeGreaterThanOrEqual(1); + expect(jobs.length).toBeLessThanOrEqual(2); + expect(jobs[0].task_identifier).toEqual("do_it"); + } + })); + +test("backfills if identifier already registered (25h)", () => + withOptions(async (options) => { + const { pgPool } = options; + await reset(pgPool, options); + const now = Date.now(); + const expectedTime = now - (now % FOUR_HOURS); + await pgPool.query( + ` + insert into ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.known_crontabs ( + identifier, + known_since, + last_execution + ) + values ( + 'do_it', + NOW() - interval '14 days', + NOW() - interval '25 hours' + ) + `, + ); + const eventMonitor = new EventMonitor(); + const cronFinishedBackfilling = eventMonitor.awaitNext("cron:started"); + const runner = await run({ + ...options, + crontab: CRONTAB_DO_IT, + events: eventMonitor.events, + }); + await cronFinishedBackfilling; + await runner.stop(); + { + const known = await getKnown(pgPool); + expect(known).toHaveLength(1); + expect(known[0].identifier).toEqual("do_it"); + expect(known[0].known_since).not.toBeNull(); + if (!known[0].last_execution) { + throw new Error("Expected last_execution to exist"); + } + // There's a small window every 4 hours where the expect might fail due + // to the clock advancing, so we account for that by checking both of the + // expected times. + const lx = +known[0].last_execution; + if (lx !== expectedTime && lx !== expectedTime + FOUR_HOURS) { + // If we get here, then neither of the above were okay. 
+ expect(+known[0].last_execution).toEqual(expectedTime); + } + const jobs = await getJobs(pgPool); + // It's a 25 hour window for a job that runs every 4 hours, there should + // be 6 or 7 jobs + expect(jobs.length).toBeGreaterThanOrEqual(6); + expect(jobs.length).toBeLessThanOrEqual(7); + expect(jobs.every((j) => j.task_identifier === "do_it")).toBe(true); + } + })); diff --git a/__tests__/crontab.test.ts b/__tests__/crontab.test.ts new file mode 100644 index 00000000..6496a090 --- /dev/null +++ b/__tests__/crontab.test.ts @@ -0,0 +1,189 @@ +import { CronItemOptions } from "../src"; +import { parseCrontab } from "../src/crontab"; + +// 0...59 +const ALL_MINUTES = Array.from(Array(60).keys()); +// 0...23 +const ALL_HOURS = Array.from(Array(24).keys()); +// 1...31 +const ALL_DATES = Array.from(Array(32).keys()).slice(1); +// 1...12 +const ALL_MONTHS = Array.from(Array(13).keys()).slice(1); +// 0...6 +const ALL_DOWS = [0, 1, 2, 3, 4, 5, 6]; + +const MINUTE = 60000; +const HOUR = 60 * MINUTE; +const DAY = 24 * HOUR; +const WEEK = 7 * DAY; + +test("parses crontab file correctly", () => { + const exampleCrontab = `\ +# ┌───────────── UTC minute (0 - 59) +# │ ┌───────────── UTC hour (0 - 23) +# │ │ ┌───────────── UTC day of the month (1 - 31) +# │ │ │ ┌───────────── UTC month (1 - 12) +# │ │ │ │ ┌───────────── UTC day of the week (0 - 6) (Sunday to Saturday) +# │ │ │ │ │ ┌───────────── task (identifier) to schedule +# │ │ │ │ │ │ ┌────────── optional scheduling options +# │ │ │ │ │ │ │ ┌────── optional payload to merge +# │ │ │ │ │ │ │ │ +# │ │ │ │ │ │ │ │ +# * * * * * task ?opts {payload} + +* * * * * simple +0 4 * * * every_day_at_4_am +0 4 * * 0 every_sunday_at_4_am +0 4 * * 7 every_sunday_at_4_am ?id=sunday_7 + + + + +0 4 * * 2 every_tuesday_at_4_am {isTuesday: true} +*/10,7,56-59 1 1 1 1 one ?id=stuff&fill=4w3d2h1m&max=3&queue=my_queue&priority=3 {myExtraPayload:{stuff:"here with # hash char"}} + * * * * * lots_of_spaces +`; + const parsed = parseCrontab(exampleCrontab); + + expect(parsed[0].task).toEqual("simple"); + expect(parsed[0].identifier).toEqual("simple"); + expect(parsed[0].minutes).toEqual(ALL_MINUTES); + expect(parsed[0].hours).toEqual(ALL_HOURS); + expect(parsed[0].dates).toEqual(ALL_DATES); + expect(parsed[0].months).toEqual(ALL_MONTHS); + expect(parsed[0].dows).toEqual(ALL_DOWS); + expect(parsed[0].options).toEqual({ backfillPeriod: 0 }); + expect(parsed[0].payload).toEqual(null); + + expect(parsed[1].task).toEqual("every_day_at_4_am"); + expect(parsed[1].identifier).toEqual("every_day_at_4_am"); + expect(parsed[1].minutes).toEqual([0]); + expect(parsed[1].hours).toEqual([4]); + expect(parsed[1].dates).toEqual(ALL_DATES); + expect(parsed[1].months).toEqual(ALL_MONTHS); + expect(parsed[1].dows).toEqual(ALL_DOWS); + expect(parsed[1].options).toEqual({ backfillPeriod: 0 }); + expect(parsed[1].payload).toEqual(null); + + expect(parsed[2].task).toEqual("every_sunday_at_4_am"); + expect(parsed[2].identifier).toEqual("every_sunday_at_4_am"); + expect(parsed[2].minutes).toEqual([0]); + expect(parsed[2].hours).toEqual([4]); + expect(parsed[2].dates).toEqual(ALL_DATES); + expect(parsed[2].months).toEqual(ALL_MONTHS); + expect(parsed[2].dows).toEqual([0]); + expect(parsed[2].options).toEqual({ backfillPeriod: 0 }); + expect(parsed[2].payload).toEqual(null); + + expect(parsed[3].task).toEqual("every_sunday_at_4_am"); + expect(parsed[3].identifier).toEqual("sunday_7"); + expect(parsed[3].minutes).toEqual([0]); + expect(parsed[3].hours).toEqual([4]); + 
expect(parsed[3].dates).toEqual(ALL_DATES); + expect(parsed[3].months).toEqual(ALL_MONTHS); + expect(parsed[3].dows).toEqual([0]); + expect(parsed[3].options).toEqual({ backfillPeriod: 0 }); + expect(parsed[3].payload).toEqual(null); + + expect(parsed[4].task).toEqual("every_tuesday_at_4_am"); + expect(parsed[4].identifier).toEqual("every_tuesday_at_4_am"); + expect(parsed[4].minutes).toEqual([0]); + expect(parsed[4].hours).toEqual([4]); + expect(parsed[4].dates).toEqual(ALL_DATES); + expect(parsed[4].months).toEqual(ALL_MONTHS); + expect(parsed[4].dows).toEqual([2]); + expect(parsed[4].options).toEqual({ backfillPeriod: 0 }); + expect(parsed[4].payload).toEqual({ isTuesday: true }); + + // */10,7,56-59 1 1 1 1 one ?id=stuff&fill=4w3d2h1m&max=3&queue=my_queue&priority=3 {myExtraPayload:{stuff:"here with # hash char"}} + expect(parsed[5].task).toEqual("one"); + expect(parsed[5].identifier).toEqual("stuff"); + expect(parsed[5].minutes).toEqual([0, 7, 10, 20, 30, 40, 50, 56, 57, 58, 59]); + expect(parsed[5].hours).toEqual([1]); + expect(parsed[5].dates).toEqual([1]); + expect(parsed[5].months).toEqual([1]); + expect(parsed[5].dows).toEqual([1]); + expect(parsed[5].options).toEqual({ + backfillPeriod: 4 * WEEK + 3 * DAY + 2 * HOUR + 1 * MINUTE, + maxAttempts: 3, + priority: 3, + queueName: "my_queue", + } as CronItemOptions); + expect(parsed[5].payload).toEqual({ + myExtraPayload: { stuff: "here with # hash char" }, + }); + + expect(parsed[6].task).toEqual("lots_of_spaces"); + expect(parsed[6].identifier).toEqual("lots_of_spaces"); + expect(parsed[6].minutes).toEqual(ALL_MINUTES); + expect(parsed[6].hours).toEqual(ALL_HOURS); + expect(parsed[6].dates).toEqual(ALL_DATES); + expect(parsed[6].months).toEqual(ALL_MONTHS); + expect(parsed[6].dows).toEqual(ALL_DOWS); + expect(parsed[6].options).toEqual({ backfillPeriod: 0 }); + expect(parsed[6].payload).toEqual(null); + + expect(parsed).toMatchSnapshot(); +}); + +describe("gives error on syntax error", () => { + test("too few parameters", () => { + expect(() => + parseCrontab(`\ +* * * * too_few_parameters +`), + ).toThrowErrorMatchingInlineSnapshot( + `"Could not process line '1' of crontab: '* * * * too_few_parameters'"`, + ); + }); + + test("invalid command (two parts)", () => { + expect(() => + parseCrontab(`\ +* * * * * two tasks +`), + ).toThrowErrorMatchingInlineSnapshot( + `"Invalid command specification in line 1 of crontab."`, + ); + }); + + test("range exceeded", () => { + expect(() => + parseCrontab(`\ +1,60 * * * * out_of_range +`), + ).toThrowErrorMatchingInlineSnapshot( + `"Too large value '60' in minutes range in line 1 of crontab: expected values in the range 0-59."`, + ); + }); + + test("invalid wildcard divisor", () => { + expect(() => + parseCrontab(`\ +*/0 * * * * division_by_zero +`), + ).toThrowErrorMatchingInlineSnapshot( + `"Invalid wildcard expression '*/0' in minutes range in line 1 of crontab: divisor '0' expected to be greater than zero"`, + ); + }); + + test("unknown option", () => { + expect(() => + parseCrontab(`\ +* * * * * invalid_options ?unknown=3 +`), + ).toThrowErrorMatchingInlineSnapshot( + `"Options on line 1 of crontab contains unsupported key 'unknown'; supported keys are: 'id', 'fill', 'max', 'queue', 'priority'."`, + ); + }); + + test("invalid JSON5 syntax", () => { + expect(() => + parseCrontab(`\ +* * * * * json_syntax_error {invalidJson=true} +`), + ).toThrowErrorMatchingInlineSnapshot( + `"Failed to parse JSON5 payload on line 1 of crontab: JSON5: invalid character '=' at 1:13"`, + ); + }); +}); diff 
--git a/__tests__/helpers.ts b/__tests__/helpers.ts index dbec8996..d715bc16 100644 --- a/__tests__/helpers.ts +++ b/__tests__/helpers.ts @@ -1,9 +1,29 @@ +import { EventEmitter } from "events"; import * as pg from "pg"; import { parse } from "pg-connection-string"; -import { Job, WorkerPoolOptions, WorkerUtils } from "../src/interfaces"; +import defer from "../src/deferred"; +import { + Job, + KnownCrontab, + RunnerOptions, + WorkerEventMap, + WorkerPoolOptions, + WorkerUtils, +} from "../src/interfaces"; import { migrate } from "../src/migrate"; +export { + sleep, + sleepUntil, + SECOND, + MINUTE, + HOUR, + DAY, + WEEK, + setupFakeTimers, +} from "jest-time-helpers"; + // Sometimes CI's clock can get interrupted (it is shared infra!) so this // extends the default timeout just in case. jest.setTimeout(15000); @@ -82,7 +102,7 @@ export async function reset( try { await migrate(options, client); } finally { - await client.release(); + client.release(); } } } @@ -98,6 +118,20 @@ export async function jobCount( return row ? row.count || 0 : 0; } +export async function getKnown(pgPool: pg.Pool) { + const { rows } = await pgPool.query( + `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.known_crontabs`, + ); + return rows; +} + +export async function getJobs(pgPool: pg.Pool) { + const { rows } = await pgPool.query( + `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.jobs`, + ); + return rows; +} + export function makeMockJob(taskIdentifier: string): Job { const createdAt = new Date(Date.now() - 12345678); return { @@ -120,22 +154,6 @@ export function makeMockJob(taskIdentifier: string): Job { }; } -export const sleep = (ms: number) => - new Promise((resolve) => setTimeout(resolve, ms)); - -export async function sleepUntil(condition: () => boolean, maxDuration = 2000) { - const start = Date.now(); - // Wait up to a second for the job to be executed - while (!condition() && Date.now() - start < maxDuration) { - await sleep(2); - } - if (!condition()) { - throw new Error( - `Slept for ${Date.now() - start}ms but condition never passed`, - ); - } -} - export async function makeSelectionOfJobs( utils: WorkerUtils, pgClient: pg.PoolClient, @@ -167,3 +185,51 @@ export async function makeSelectionOfJobs( untouchedJob, }; } + +interface EventCounter { + count: number; + lastEvent: null | WorkerEventMap[TEventName]; +} + +export class EventMonitor { + public events: EventEmitter; + + constructor(eventEmitter = new EventEmitter()) { + this.events = eventEmitter; + } + + awaitNext(eventName: keyof WorkerEventMap): Promise { + const d = defer(); + this.events.once(eventName, () => d.resolve()); + return d; + } + + count( + eventName: TEventName, + ): EventCounter { + const counter: EventCounter = { count: 0, lastEvent: null }; + this.events.on(eventName, (payload) => { + counter.count++; + counter.lastEvent = payload; + }); + return counter; + } + + release() {} +} + +export function withOptions( + callback: (options: RunnerOptions & { pgPool: pg.Pool }) => Promise, +) { + return withPgPool((pgPool) => + callback({ + pgPool, + taskList: { + /* DO NOT ADD do_it HERE! 
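+           Several cron tests schedule `do_it` via the crontab and then count
+           the resulting rows in the `jobs` table; if a `do_it` executor were
+           defined here the worker would execute (and remove) those jobs.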
*/ + do_something_else(payload, helpers) { + helpers.logger.debug("do_something_else called", { payload }); + }, + }, + }), + ); +} diff --git a/__tests__/migrate.test.ts b/__tests__/migrate.test.ts index 3a0c209f..e03def0e 100644 --- a/__tests__/migrate.test.ts +++ b/__tests__/migrate.test.ts @@ -33,7 +33,7 @@ test("migration installs schema; second migration does no harm", async () => { const { rows: migrationRows } = await pgClient.query( `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations`, ); - expect(migrationRows).toHaveLength(7); + expect(migrationRows).toHaveLength(8); const migration = migrationRows[0]; expect(migration.id).toEqual(1); diff --git a/__tests__/schema.sql b/__tests__/schema.sql index 90e99cce..79484926 100644 --- a/__tests__/schema.sql +++ b/__tests__/schema.sql @@ -386,6 +386,11 @@ CREATE SEQUENCE graphile_worker.jobs_id_seq NO MAXVALUE CACHE 1; ALTER SEQUENCE graphile_worker.jobs_id_seq OWNED BY graphile_worker.jobs.id; +CREATE TABLE graphile_worker.known_crontabs ( + identifier text NOT NULL, + known_since timestamp with time zone NOT NULL, + last_execution timestamp with time zone +); CREATE TABLE graphile_worker.migrations ( id integer NOT NULL, ts timestamp with time zone DEFAULT now() NOT NULL @@ -397,6 +402,8 @@ ALTER TABLE ONLY graphile_worker.jobs ADD CONSTRAINT jobs_key_key UNIQUE (key); ALTER TABLE ONLY graphile_worker.jobs ADD CONSTRAINT jobs_pkey PRIMARY KEY (id); +ALTER TABLE ONLY graphile_worker.known_crontabs + ADD CONSTRAINT known_crontabs_pkey PRIMARY KEY (identifier); ALTER TABLE ONLY graphile_worker.migrations ADD CONSTRAINT migrations_pkey PRIMARY KEY (id); CREATE INDEX jobs_priority_run_at_id_locked_at_without_failures_idx ON graphile_worker.jobs USING btree (priority, run_at, id, locked_at) WHERE (attempts < max_attempts); @@ -408,3 +415,4 @@ CREATE TRIGGER _500_increase_job_queue_count_update AFTER UPDATE OF queue_name O CREATE TRIGGER _900_notify_worker AFTER INSERT ON graphile_worker.jobs FOR EACH STATEMENT EXECUTE PROCEDURE graphile_worker.tg_jobs__notify_new_jobs(); ALTER TABLE graphile_worker.job_queues ENABLE ROW LEVEL SECURITY; ALTER TABLE graphile_worker.jobs ENABLE ROW LEVEL SECURITY; +ALTER TABLE graphile_worker.known_crontabs ENABLE ROW LEVEL SECURITY; diff --git a/package.json b/package.json index 6fd9b1a9..c3b91ff2 100644 --- a/package.json +++ b/package.json @@ -44,12 +44,15 @@ "@types/pg": "^7.14.3", "chokidar": "^3.4.0", "cosmiconfig": "^6.0.0", + "json5": "^2.1.3", "pg": ">=6.5 <9", "tslib": "^1.11.1", "yargs": "^15.1.0" }, "devDependencies": { "@types/jest": "^25.2.1", + "@types/json5": "^0.0.30", + "@types/node": "^14.14.14", "@typescript-eslint/eslint-plugin": "^2.29.0", "@typescript-eslint/parser": "^2.29.0", "depcheck": "^0.9.2", @@ -60,6 +63,7 @@ "eslint-plugin-simple-import-sort": "^5.0.2", "eslint_d": "^8.1.1", "jest": "^25.4.0", + "jest-time-helpers": "^0.1.0", "pg-connection-string": "^2.2.0", "prettier": "^2.0.5", "ts-jest": "^25.4.0", diff --git a/sql/000008.sql b/sql/000008.sql new file mode 100644 index 00000000..c0aecd47 --- /dev/null +++ b/sql/000008.sql @@ -0,0 +1,6 @@ +create table :GRAPHILE_WORKER_SCHEMA.known_crontabs ( + identifier text not null primary key, + known_since timestamptz not null, + last_execution timestamptz +); +alter table :GRAPHILE_WORKER_SCHEMA.known_crontabs enable row level security; diff --git a/src/cli.ts b/src/cli.ts index a2b2b900..ed3cb9ce 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -2,6 +2,7 @@ import * as yargs from "yargs"; import { defaults } from "./config"; +import 
getCronItems from "./getCronItems"; import getTasks from "./getTasks"; import { run, runOnce } from "./index"; import { RunnerOptions } from "./interfaces"; @@ -121,11 +122,20 @@ async function main() { } const watchedTasks = await getTasks(options, `${process.cwd()}/tasks`, WATCH); + const watchedCronItems = await getCronItems( + options, + `${process.cwd()}/crontab`, + WATCH, + ); if (ONCE) { await runOnce(options, watchedTasks.tasks); } else { - const { promise } = await run(options, watchedTasks.tasks); + const { promise } = await run( + options, + watchedTasks.tasks, + watchedCronItems.items, + ); // Continue forever(ish) await promise; } diff --git a/src/cron.ts b/src/cron.ts new file mode 100644 index 00000000..4d8c4987 --- /dev/null +++ b/src/cron.ts @@ -0,0 +1,589 @@ +import * as assert from "assert"; +import { Pool } from "pg"; + +import { parseCrontab } from "./crontab"; +import defer from "./deferred"; +import getCronItems from "./getCronItems"; +import { + Cron, + CronJob, + JobAndCronIdentifier, + KnownCrontab, + ParsedCronItem, + RunnerOptions, + WorkerEvents, +} from "./interfaces"; +import { processSharedOptions, Releasers } from "./lib"; + +interface CronRequirements { + pgPool: Pool; + events: WorkerEvents; +} + +/** + * This function looks through all the cron items we have (e.g. from our + * crontab file) and compares them to the items we already know about. If the + * item is not previously know, we add it to the list `unknownIdentifiers` so + * that it can be recorded in the database (i.e. it will be "known" from now + * on). If the item was previously known, we add an entry to + * `backfillItemsAndDates` indicating the `item` and earliest time + * (`notBefore`) that a backfill should operate from. This is later compared + * to the configuration to see how much backfilling to do. + */ +function getBackfillAndUnknownItems( + parsedCronItems: ParsedCronItem[], + knownCrontabs: KnownCrontab[], +) { + const backfillItemsAndDates: Array<{ + item: ParsedCronItem; + notBefore: Date; + }> = []; + const unknownIdentifiers: string[] = []; + for (const item of parsedCronItems) { + const known = knownCrontabs.find( + (record) => record.identifier === item.identifier, + ); + if (known) { + // We only back-fill for tasks we already know about + const notBefore = known.last_execution || known.known_since; + backfillItemsAndDates.push({ + item, + notBefore, + }); + } else { + unknownIdentifiers.push(item.identifier); + } + } + return { backfillItemsAndDates, unknownIdentifiers }; +} + +/** + * Rounds the incoming date to the nearest minute (either rounding up or down). + * Tagged "unsafe" because it mutates the argument, this is desired for + * performance but may be unexpected. + */ +function unsafeRoundToMinute(ts: Date, roundUp = false): Date { + if (ts.getUTCSeconds() > 0 || ts.getUTCMilliseconds() > 0) { + ts.setUTCSeconds(0); + ts.setUTCMilliseconds(0); + if (roundUp) { + ts.setUTCMinutes(ts.getUTCMinutes() + 1); + } + } + return ts; +} + +function makeJobForItem( + item: ParsedCronItem, + ts: string, + backfilled = false, +): CronJob { + return { + task: item.task, + payload: { + ...item.payload, + _cron: { + ts, + backfilled, + }, + }, + queueName: item.options.queueName, + runAt: ts, + maxAttempts: item.options.maxAttempts, + priority: item.options.priority, + }; +} + +/** + * Schedules a list of cron jobs all due at the same timestamp. Jobs that were + * already scheduled (e.g. via a different Worker instance) will be skipped + * automatically. 
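+ *
+ * The `known_crontabs` table provides the deduplication: the `locks` CTE only
+ * advances `last_execution` for identifiers whose stored value is older than
+ * this timestamp, and only the identifiers it returns are joined against when
+ * calling `add_job`, so a timestamp that another worker has already handled
+ * is skipped.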
+ */ +async function scheduleCronJobs( + pgPool: Pool, + escapedWorkerSchema: string, + jobsAndIdentifiers: JobAndCronIdentifier[], + ts: string, +) { + // Note that `identifier` is guaranteed to be unique for every record + // in `specs`. + await pgPool.query( + ` + with specs as ( + select + index, + (json->>'identifier')::text as identifier, + ((json->'job')->>'task')::text as task, + ((json->'job')->'payload')::json as payload, + ((json->'job')->>'queueName')::text as queue_name, + ((json->'job')->>'runAt')::timestamptz as run_at, + ((json->'job')->>'maxAttempts')::int as max_attempts, + ((json->'job')->>'priority')::int as priority + from json_array_elements($1::json) with ordinality AS entries (json, index) + ), + locks as ( + insert into ${escapedWorkerSchema}.known_crontabs (identifier, known_since, last_execution) + select + specs.identifier, + $2 as known_since, + $2 as last_execution + from specs + on conflict (identifier) + do update set last_execution = excluded.last_execution + where (known_crontabs.last_execution is null or known_crontabs.last_execution < excluded.last_execution) + returning known_crontabs.identifier + ) + select + ${escapedWorkerSchema}.add_job( + specs.task, + specs.payload, + specs.queue_name, + specs.run_at, + specs.max_attempts, + null, -- job key + specs.priority + ) + from specs + inner join locks on (locks.identifier = specs.identifier) + order by specs.index asc + `, + [JSON.stringify(jobsAndIdentifiers), ts], + ); +} + +/** + * Marks any previously unknown crontab identifiers as now being known. Then + * performs backfilling on any crontab tasks that need it. + */ +async function registerAndBackfillItems( + { pgPool, events, cron }: { pgPool: Pool; events: WorkerEvents; cron: Cron }, + escapedWorkerSchema: string, + parsedCronItems: ParsedCronItem[], + startTime: Date, +) { + // First, scan the DB to get our starting point. + const { rows } = await pgPool.query( + `SELECT * FROM ${escapedWorkerSchema}.known_crontabs`, + ); + + const { + backfillItemsAndDates, + unknownIdentifiers, + } = getBackfillAndUnknownItems(parsedCronItems, rows); + + if (unknownIdentifiers.length) { + // They're known now. + await pgPool.query( + ` + INSERT INTO ${escapedWorkerSchema}.known_crontabs (identifier, known_since) + SELECT identifier, $2 + FROM unnest($1::text[]) AS unnest (identifier) + ON CONFLICT DO NOTHING + `, + [unknownIdentifiers, startTime.toISOString()], + ); + } + + // If any jobs are overdue, trigger them. + // NOTE: this is not the fastest algorithm, we can definitely optimise this later. + // First find out the largest backfill period: + const largestBackfill = parsedCronItems.reduce( + (largest, item) => Math.max(item.options.backfillPeriod, largest), + 0, + ); + // Then go back this period in time and fill forward from there. + if (largestBackfill > 0) { + // Unsafe because we mutate it during the loop (for performance); be sure + // to take a copy of it (or convert to string) when used in places where + // later mutation would cause issues. + const unsafeTs = new Date(+startTime - largestBackfill); + + // Round up to the nearest minute. + unsafeRoundToMinute(unsafeTs, true); + + // We're `await`-ing inside this loop: serialization is desired. If we were + // to parallelize this (e.g. with `Promise.all`) then race conditions could + // mean that backfilling of earlier tasks is skipped because + // known_crontabs.last_execution may be advanced for a later backfill + // before an earlier backfill occurs. 
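+    // Walk the backfill window forward one minute at a time (up to, but not
+    // including, `startTime`), scheduling each item that both allows
+    // backfilling this far back and matches the minute being considered.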
+ while (unsafeTs < startTime) { + const timeAgo = +startTime - +unsafeTs; + // Note: `ts` and `digest` are both safe. + const ts = unsafeTs.toISOString(); + const digest = digestTimestamp(unsafeTs); + + // The identifiers in this array are guaranteed to be unique, since cron + // items are guaranteed to have unique identifiers. + const itemsToBackfill: Array = []; + + // See if anything needs backfilling for this timestamp + for (const { item, notBefore } of backfillItemsAndDates) { + if ( + item.options.backfillPeriod >= timeAgo && + unsafeTs >= notBefore && + cronItemMatches(item, digest) + ) { + itemsToBackfill.push({ + identifier: item.identifier, + job: makeJobForItem(item, ts, true), + }); + } + } + + if (itemsToBackfill.length) { + // We're currently backfilling once per timestamp (rather than + // gathering them all together and doing a single statement) due to + // the way the last_execution column of the known_crontabs table works. + // At this time it's not expected that backfilling will be sufficiently + // expensive to justify optimising this further. + events.emit("cron:backfill", { + cron, + itemsToBackfill, + timestamp: ts, + }); + await scheduleCronJobs( + pgPool, + escapedWorkerSchema, + itemsToBackfill, + ts, + ); + } + + // Advance our counter (or risk infinite loop!). + unsafeTs.setUTCMinutes(unsafeTs.getUTCMinutes() + 1); + } + } +} + +/** One minute in milliseconds */ +const ONE_MINUTE = 60 * 1000; + +/** + * Executes our scheduled jobs as required. + * + * This is not currently intended for usage directly; use `run` instead. + * + * @internal + * + * @param options - the common options + * @param parsedCronItems - MUTABLE list of _parsed_ cron items to monitor. Do not assume this is static. + * @param requirements - the helpers that this task needs + */ +export const runCron = ( + options: RunnerOptions, + parsedCronItems: ParsedCronItem[], + requirements: CronRequirements, +): Cron => { + const { pgPool } = requirements; + const { logger, escapedWorkerSchema, events } = processSharedOptions(options); + + const promise = defer(); + let released = false; + let timeout: NodeJS.Timer | null = null; + + let stopCalled = false; + function stop(e?: Error) { + if (!stopCalled) { + stopCalled = true; + if (timeout) { + clearTimeout(timeout); + timeout = null; + } + if (e) { + promise.reject(e); + } else { + promise.resolve(); + } + } else { + logger.error( + "Graphile Worker internal bug in src/cron.ts: calling `stop()` more than once shouldn't be possible. Please report this.", + ); + } + } + + async function cronMain() { + if (released) { + return stop(); + } + + const start = new Date(); + events.emit("cron:starting", { cron: this, start }); + + // We must backfill BEFORE scheduling any new jobs otherwise backfill won't + // work due to known_crontabs.last_execution having been updated. + await registerAndBackfillItems( + { pgPool, events, cron: this }, + escapedWorkerSchema, + parsedCronItems, + new Date(+start), + ); + + events.emit("cron:started", { cron: this, start }); + + if (released) { + return stop(); + } + + // The backfill may have taken a moment, we should continue from where the + // worker started and catch up as quickly as we can. This does **NOT** + // count as a backfill. + let nextTimestamp = unsafeRoundToMinute(new Date(+start), true); + + const scheduleNextLoop = () => { + if (released) { + return stop(); + } + // + 1 millisecond to try and ensure this happens in the next minute + // rather than at the end of the previous. 
+ timeout = setTimeout(() => { + timeout = null; + loop(); + }, Math.max(+nextTimestamp - Date.now() + 1, 1)); + }; + + async function loop() { + try { + if (released) { + return stop(); + } + + // THIS MUST COME BEFORE nextTimestamp IS MUTATED + const digest = digestTimestamp(nextTimestamp); + const ts = nextTimestamp.toISOString(); + const expectedTimestamp = +nextTimestamp; + + // With seconds and milliseconds + const currentTimestamp = Date.now(); + // Round to beginning of current minute; should match expectedTimestamp + const roundedCurrentTimestamp = + currentTimestamp - (currentTimestamp % ONE_MINUTE); + + /* + * In the event of clock skew, or overloaded runloop causing delays, + * it's possible that expectedTimestamp and roundedCurrentTimestamp + * might not match up. If we've not hit expectedTimestamp yet, we + * should just reschedule. If we've gone past expectedTimestamp then we + * should do as much work as is necessary to catch up, ignoring + * backfill since this is never expected to be a large period. + */ + if (roundedCurrentTimestamp < expectedTimestamp) { + logger.warn( + `Graphile Worker Cron fired ${( + (expectedTimestamp - currentTimestamp) / + 1000 + ).toFixed(3)}s too early (clock skew?); rescheduling`, + ); + events.emit("cron:prematureTimer", { + cron: this, + currentTimestamp, + expectedTimestamp, + }); + // NOTE: we must NOT have mutated nextTimestamp before here in `loop()`. + scheduleNextLoop(); + return; + } else if (roundedCurrentTimestamp > expectedTimestamp) { + logger.warn( + `Graphile Worker Cron fired too late; catching up (${Math.floor( + (currentTimestamp - expectedTimestamp) / ONE_MINUTE, + )}m${Math.floor( + ((currentTimestamp - expectedTimestamp) % ONE_MINUTE) / 1000, + )}s behind)`, + ); + events.emit("cron:overdueTimer", { + cron: this, + currentTimestamp, + expectedTimestamp, + }); + } + + // The identifiers in this array are guaranteed to be unique. + const jobsAndIdentifiers: Array = []; + + // Gather the relevant jobs + for (const item of parsedCronItems) { + if (cronItemMatches(item, digest)) { + jobsAndIdentifiers.push({ + identifier: item.identifier, + job: makeJobForItem(item, ts), + }); + } + } + + // Finally actually run the jobs. + if (jobsAndIdentifiers.length) { + events.emit("cron:schedule", { + cron: this, + timestamp: expectedTimestamp, + jobsAndIdentifiers, + }); + await scheduleCronJobs( + pgPool, + escapedWorkerSchema, + jobsAndIdentifiers, + ts, + ); + events.emit("cron:scheduled", { + cron: this, + timestamp: expectedTimestamp, + jobsAndIdentifiers, + }); + + if (released) { + return stop(); + } + } + + // MUTATE nextTimestamp: advance by a minute ready for the next run. + nextTimestamp.setUTCMinutes(nextTimestamp.getUTCMinutes() + 1); + + // This must come at the very end (otherwise we might accidentally skip + // timestamps on error). + scheduleNextLoop(); + } catch (e) { + // If something goes wrong; abort. The calling code should re-schedule + // which will re-trigger the backfilling code. 
+ return stop(e); + } + } + + scheduleNextLoop(); + } + + cronMain().catch(stop); + + return { + release() { + if (!released) { + released = true; + if (timeout) { + // Next loop is queued; lets cancel it early + stop(); + } + } + return promise; + }, + promise, + }; +}; + +export async function getParsedCronItemsFromOptions( + options: RunnerOptions, + releasers: Releasers, +): Promise> { + const { crontabFile, parsedCronItems, crontab } = options; + + if (!crontabFile && !parsedCronItems && !crontab) { + return []; + } + + if (crontab) { + assert( + !crontabFile, + "`crontab` and `crontabFile` must not be set at the same time.", + ); + assert( + !parsedCronItems, + "`crontab` and `parsedCronItems` must not be set at the same time.", + ); + + return parseCrontab(crontab); + } else if (crontabFile) { + assert( + !parsedCronItems, + "`crontabFile` and `parsedCronItems` must not be set at the same time.", + ); + + const watchedCronItems = await getCronItems(options, crontabFile, false); + releasers.push(() => watchedCronItems.release()); + return watchedCronItems.items; + } else { + assert(parsedCronItems != null, "Expected `parsedCronItems` to be set."); + // Basic check to ensure that users remembered to call + // `parseCronItems`/`parseCrontab`; not intended to be a full check, just a + // quick one to catch the obvious errors. Keep in mind that + // `parsedCronItems` is mutable so it may be changed later to contain more + // entries; we can't keep performing these checks everywhere for + // performance reasons. + assert( + Array.isArray(parsedCronItems), + "Expected `parsedCronItems` to be an array; you must use a helper e.g. `parseCrontab()` or `parseCronItems()` to produce this value.", + ); + const firstItem = parsedCronItems[0]; + if (firstItem) { + if ( + !Array.isArray(firstItem.minutes) || + !Array.isArray(firstItem.hours) || + !Array.isArray(firstItem.dates) || + !Array.isArray(firstItem.months) || + !Array.isArray(firstItem.dows) + ) { + throw new Error( + "Invalid `parsedCronItems`; you must use a helper e.g. `parseCrontab()` or `parseCronItems()` to produce this value.", + ); + } + } + + return parsedCronItems; + } +} + +/** + * The digest of a timestamp into the component parts that a cron schedule cares about. + */ +interface TimestampDigest { + min: number; + hour: number; + date: number; + month: number; + dow: number; +} + +/** + * Digests a timestamp into its min/hour/date/month/dow components that are + * needed for cron matching. + * + * WARNING: the timestamp passed into this function might be mutated later, so + * **do not memoize** using the value itself. If memoization is necessary, it + * could be done using `+ts` as the key. + */ +function digestTimestamp(ts: Date): TimestampDigest { + const min = ts.getUTCMinutes(); + const hour = ts.getUTCHours(); + const date = ts.getUTCDate(); + const month = ts.getUTCMonth() + 1; + const dow = ts.getUTCDay(); + return { min, hour, date, month, dow }; +} + +/** + * Returns true if the cronItem should fire for the given timestamp digest, + * false otherwise. 
+ */
+export function cronItemMatches(
+  cronItem: ParsedCronItem,
+  digest: TimestampDigest,
+): boolean {
+  const { min, hour, date, month, dow } = digest;
+
+  if (
+    // If minute, hour and month match
+    cronItem.minutes.includes(min) &&
+    cronItem.hours.includes(hour) &&
+    cronItem.months.includes(month)
+  ) {
+    const dateIsExclusionary = cronItem.dates.length !== 31;
+    const dowIsExclusionary = cronItem.dows.length !== 7;
+    if (dateIsExclusionary && dowIsExclusionary) {
+      // Cron has a special behaviour: if both date and day of week are
+      // exclusionary (i.e. not "*") then a match for *either* passes.
+      return cronItem.dates.includes(date) || cronItem.dows.includes(dow);
+    } else if (dateIsExclusionary) {
+      return cronItem.dates.includes(date);
+    } else if (dowIsExclusionary) {
+      return cronItem.dows.includes(dow);
+    } else {
+      return true;
+    }
+  }
+  return false;
+}
diff --git a/src/crontab.ts b/src/crontab.ts
new file mode 100644
index 00000000..bd622b95
--- /dev/null
+++ b/src/crontab.ts
@@ -0,0 +1,455 @@
+import * as JSON5 from "json5";
+import { parse } from "querystring";
+
+import { CronItem, CronItemOptions, ParsedCronItem } from "./interfaces";
+
+/** One second in milliseconds */
+const SECOND = 1000;
+/** One minute in milliseconds */
+const MINUTE = 60 * SECOND;
+/** One hour in milliseconds */
+const HOUR = 60 * MINUTE;
+/** One day in milliseconds */
+const DAY = 24 * HOUR;
+/** One week in milliseconds */
+const WEEK = 7 * DAY;
+
+// A (non-comment, non-empty) line in the crontab file
+/** Separates crontab line into the minute, hour, day of month, month, day of week and command parts. */
+const CRONTAB_LINE_PARTS = /^([0-9*/,-]+)\s+([0-9*/,-]+)\s+([0-9*/,-]+)\s+([0-9*/,-]+)\s+([0-9*/,-]+)\s+(.*)$/;
+/** Just the time expression from CRONTAB_LINE_PARTS */
+const CRONTAB_TIME_PARTS = /^([0-9*/,-]+)\s+([0-9*/,-]+)\s+([0-9*/,-]+)\s+([0-9*/,-]+)\s+([0-9*/,-]+)$/;
+
+// Crontab ranges from the minute, hour, day of month, month and day of week parts of the crontab line
+/** Matches an explicit numeric value */
+const CRONTAB_NUMBER = /^([0-9]+)$/;
+/** Matches a range of numeric values */
+const CRONTAB_RANGE = /^([0-9]+)-([0-9]+)$/;
+/** Matches a numeric wildcard, optionally with a divisor */
+const CRONTAB_WILDCARD = /^\*(?:\/([0-9]+))?$/;
+
+// The command from the crontab line
+/** Splits the command from the crontab line into the task, options and payload. */
+const CRONTAB_COMMAND = /^([_a-zA-Z][_a-zA-Z0-9:_-]*)(?:\s+\?([^\s]+))?(?:\s+(\{.*\}))?$/;
+
+// Crontab command options
+/** Matches the id=UID option, capturing the unique identifier */
+const CRONTAB_OPTIONS_ID = /^([_a-zA-Z][-_a-zA-Z0-9]*)$/;
+/** Matches the fill=t option, capturing the time phrase */
+const CRONTAB_OPTIONS_BACKFILL = /^((?:[0-9]+[smhdw])+)$/;
+/** Matches the max=n option, capturing the max executions number */
+const CRONTAB_OPTIONS_MAX = /^([0-9]+)$/;
+/** Matches the queue=name option, capturing the queue name */
+const CRONTAB_OPTIONS_QUEUE = /^([-a-zA-Z0-9_:]+)$/;
+/** Matches the priority=n option, capturing the priority value */
+const CRONTAB_OPTIONS_PRIORITY = /^(-?[0-9]+)$/;
+
+/**
+ * Parses a range from a crontab line; a comma separated list of:
+ *
+ * - exact number
+ * - wildcard `*` optionally with `/n` divisor
+ * - range `a-b`
+ *
+ * Returns an ordered list of unique numbers in the range `min` to `max` that match the given range.
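+ *
+ * For example, with `min = 0` and `max = 59` the range `3-5,10,59,10` would
+ * produce `[3, 4, 5, 10, 59]` (sorted, with duplicates removed).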
+ * + * If `wrap` is true, then the number `max + 1` will be replaced by the number + * `min`; this is specifically to handle the value `7` being used to represent + * Sunday (as opposed to `0` which is correct). + */ +const parseCrontabRange = ( + locationForError: string, + range: string, + min: number, + max: number, + wrap = false, +): number[] => { + const parts = range.split(","); + const numbers: number[] = []; + + /** + * Adds a number to our numbers array after wrapping it (if necessary) and + * checking it's in the valid range. + */ + function add(number: number) { + const wrappedNumber = wrap && number === max + 1 ? min : number; + if (wrappedNumber > max) { + throw new Error( + `Too large value '${number}' in ${locationForError}: expected values in the range ${min}-${max}.`, + ); + } else if (wrappedNumber < min) { + throw new Error( + `Too small value '${number}' in ${locationForError}: expected values in the range ${min}-${max}.`, + ); + } else { + numbers.push(wrappedNumber); + } + } + + for (const part of parts) { + { + const matches = CRONTAB_NUMBER.exec(part); + if (matches) { + add(parseInt(matches[1], 10)); + continue; + } + } + { + const matches = CRONTAB_RANGE.exec(part); + if (matches) { + const a = parseInt(matches[1], 10); + const b = parseInt(matches[2], 10); + if (b <= a) { + throw new Error( + `Invalid range '${part}' in ${locationForError}: destination is not larger than source`, + ); + } + for (let i = a; i <= b; i++) { + add(i); + } + continue; + } + } + { + const matches = CRONTAB_WILDCARD.exec(part); + if (matches) { + const divisor = matches[1] ? parseInt(matches[1], 10) : 1; + if (divisor >= 1) { + for (let i = min; i <= max; i += divisor) { + // We know this is fine, so no need to call `add` + numbers.push(i); + } + } else { + throw new Error( + `Invalid wildcard expression '${part}' in ${locationForError}: divisor '${matches[1]}' expected to be greater than zero`, + ); + } + continue; + } + } + throw new Error( + `Unsupported syntax '${part}' in ${locationForError}: this doesn't appear to be a number, range or wildcard`, + ); + } + + numbers.sort((a, b) => a - b); + + // Filter out numbers that are identical to the previous number + const uniqueNumbers = numbers.filter( + (currentNumber, idx) => idx === 0 || numbers[idx - 1] !== currentNumber, + ); + + return uniqueNumbers; +}; + +/** Matches the quantity and period string at the beginning of a timephrase */ +const TIMEPHRASE_PART = /^([0-9]+)([smhdw])/; +const PERIOD_DURATIONS = { + s: SECOND, + m: MINUTE, + h: HOUR, + d: DAY, + w: WEEK, +}; + +/** + * Returns a period of time in milliseconds representing the time phrase given. + * + * Time phrases are comprised of a sequence of number-letter combinations, + * where the number represents a quantity and the letter represents a time + * period, e.g. `5d` for `five days`, or `3h` for `three hours`; e.g. + * `4w3d2h1m` represents `four weeks, three days, 2 hours and 1 minute` (i.e. a + * period of 44761 minutes). 
The following time periods are supported: + * + * - `s` - one second (1000 milliseconds) + * - `m` - one minute (60 seconds) + * - `h` - one hour (60 minutes) + * - `d` - on day (24 hours) + * - `w` - one week (7 days) + */ +const parseTimePhrase = (timePhrase: string): number => { + let remaining = timePhrase; + let milliseconds = 0; + while (remaining.length) { + const matches = TIMEPHRASE_PART.exec(remaining); + if (!matches) { + throw new Error( + `Invalid time phrase '${timePhrase}', did not understand '${remaining}'`, + ); + } + const [wholeMatch, quantity, period] = matches; + const periodDuration = PERIOD_DURATIONS[period] || 0; + milliseconds += parseInt(quantity, 10) * periodDuration; + remaining = remaining.substr(wholeMatch.length); + } + return milliseconds; +}; + +const parseCrontabOptions = ( + lineNumber: number, + optionsString: string | undefined, +): { options: CronItemOptions; identifier: string | undefined } => { + const parsed = optionsString != null ? parse(optionsString) : {}; + let backfillPeriod: number | undefined = undefined; + let maxAttempts: number | undefined = undefined; + let identifier: string | undefined = undefined; + let queueName: string | undefined = undefined; + let priority: number | undefined = undefined; + + type MatcherTuple = [RegExp, (matches: RegExpExecArray) => void]; + + const matchers: { [key: string]: MatcherTuple } = { + id: [ + CRONTAB_OPTIONS_ID, + (matches) => { + identifier = matches[1]; + }, + ], + fill: [ + CRONTAB_OPTIONS_BACKFILL, + (matches) => { + backfillPeriod = parseTimePhrase(matches[1]); + }, + ], + max: [ + CRONTAB_OPTIONS_MAX, + (matches) => { + maxAttempts = parseInt(matches[1], 10); + }, + ], + queue: [ + CRONTAB_OPTIONS_QUEUE, + (matches) => { + queueName = matches[1]; + }, + ], + priority: [ + CRONTAB_OPTIONS_PRIORITY, + (matches) => { + priority = parseInt(matches[1], 10); + }, + ], + }; + + function match(matcher: MatcherTuple, key: string, value: string) { + const [regex, set] = matcher; + const matches = regex.exec(value); + if (matches) { + set(matches); + } else { + throw new Error( + `Options on line ${lineNumber} of crontab contains invalid value for '${key}', value '${value}' is not compatible with this option.`, + ); + } + } + + Object.entries(parsed).forEach(([key, value]) => { + if (typeof value !== "string") { + throw new Error( + `Options on line ${lineNumber} of crontab contains invalid value for '${key}', did you specify it more than once?`, + ); + } + const matcher = Object.prototype.hasOwnProperty.call(matchers, key) + ? 
matchers[key] + : null; + if (matcher) { + match(matcher, key, value); + } else { + throw new Error( + `Options on line ${lineNumber} of crontab contains unsupported key '${key}'; supported keys are: '${Object.keys( + matchers, + ).join("', '")}'.`, + ); + } + }); + + if (!backfillPeriod) { + backfillPeriod = 0; + } + + return { + options: { backfillPeriod, maxAttempts, queueName, priority }, + identifier, + }; +}; + +const parseCrontabPayload = ( + lineNumber: number, + payloadString: string | undefined, +): any => { + if (!payloadString) { + return null; + } + try { + return JSON5.parse(payloadString); + } catch (e) { + throw new Error( + `Failed to parse JSON5 payload on line ${lineNumber} of crontab: ${e.message}`, + ); + } +}; + +const parseCrontabCommand = ( + lineNumber: number, + command: string, +): Pick => { + const matches = CRONTAB_COMMAND.exec(command); + if (!matches) { + throw new Error( + `Invalid command specification in line ${lineNumber} of crontab.`, + ); + } + const [, task, optionsString, payloadString] = matches; + const { options, identifier = task } = parseCrontabOptions( + lineNumber, + optionsString, + ); + const payload = parseCrontabPayload(lineNumber, payloadString); + return { task, options, payload, identifier }; +}; + +function parseCrontabRanges(matches: string[], source: string) { + const minutes = parseCrontabRange( + `minutes range in ${source}`, + matches[1], + 0, + 59, + ); + const hours = parseCrontabRange( + `hours range in ${source}`, + matches[2], + 0, + 23, + ); + const dates = parseCrontabRange( + `dates range in ${source}`, + matches[3], + 1, + 31, + ); + const months = parseCrontabRange( + `months range in ${source}`, + matches[4], + 1, + 12, + ); + const dows = parseCrontabRange( + `days of week range in ${source}`, + matches[5], + 0, + 6, + true, + ); + return { minutes, hours, dates, months, dows }; +} + +/** + * Parses a line from a crontab file, such as `* * * * * my_task` + */ +export const parseCrontabLine = ( + crontabLine: string, + lineNumber: number, +): ParsedCronItem => { + const matches = CRONTAB_LINE_PARTS.exec(crontabLine); + if (!matches) { + throw new Error( + `Could not process line '${lineNumber}' of crontab: '${crontabLine}'`, + ); + } + const { minutes, hours, dates, months, dows } = parseCrontabRanges( + matches, + `line ${lineNumber} of crontab`, + ); + const { task, options, payload, identifier } = parseCrontabCommand( + lineNumber, + matches[6], + ); + + return { + minutes, + hours, + dates, + months, + dows, + task, + options, + payload, + identifier, + }; +}; + +export const parseCrontab = (crontab: string): Array => { + const lines = crontab.split(/\r?\n/); + const items: ParsedCronItem[] = []; + for ( + let lineNumber = 1, numberOfLines = lines.length; + lineNumber <= numberOfLines; + lineNumber++ + ) { + const line = lines[lineNumber - 1].trim(); + if (line.startsWith("#") || line === "") { + // Ignore comment lines and empty lines + continue; + } + items.push(parseCrontabLine(line, lineNumber)); + } + + // Assert that identifiers are unique + const identifiers = items.map((i) => i.identifier); + identifiers.sort(); + const duplicates = identifiers.filter( + (id, i) => i > 0 && id === identifiers[i - 1], + ); + if (duplicates.length) { + throw new Error( + `Invalid crontab; duplicate identifiers found: '${duplicates.join( + "', '", + )}' - please use '?id=...' 
to specify unique identifiers for your cron items`, + ); + } + + return items; +}; + +/** + * Parses a list of `CronItem`s into a list of `ParsedCronItem`s, ensuring the + * results comply with all the expectations of the `ParsedCronItem` type + * (including those that cannot be encoded in TypeScript). + */ +export const parseCronItems = (items: CronItem[]): ParsedCronItem[] => { + return items.map( + ( + { + pattern, + task, + options = {} as CronItemOptions, + payload = {}, + identifier = task, + }, + idx, + ) => { + const matches = CRONTAB_TIME_PARTS.exec(pattern); + if (!matches) { + throw new Error( + `Invalid cron pattern '${pattern}' in item ${idx} of parseCronItems call`, + ); + } + const { minutes, hours, dates, months, dows } = parseCrontabRanges( + matches, + `item ${idx} of parseCronItems call`, + ); + const item: ParsedCronItem = { + minutes, + hours, + dates, + months, + dows, + task, + options, + payload, + identifier, + }; + return item; + }, + ); +}; diff --git a/src/getCronItems.ts b/src/getCronItems.ts new file mode 100644 index 00000000..0fc70c20 --- /dev/null +++ b/src/getCronItems.ts @@ -0,0 +1,70 @@ +import * as chokidar from "chokidar"; +import { promises as fsp } from "fs"; + +import { parseCrontab } from "./crontab"; +import { ParsedCronItem, SharedOptions, WatchedCronItems } from "./interfaces"; +import { processSharedOptions } from "./lib"; +import { Logger } from "./logger"; + +async function loadCrontabIntoCronItems( + logger: Logger, + items: Array, + filename: string, +) { + const contents = await fsp.readFile(filename, "utf8").catch((e) => { + if (e.code !== "ENOENT") { + // Only log error if it's not a "file doesn't exist" error + logger.error(`Failed to read crontab file '${filename}': ${e}`); + } + return ""; + }); + if (contents != null) { + const parsed = parseCrontab(contents); + // Overwrite items' contents with the new cron items + items.splice(0, items.length, ...parsed); + } +} + +export default async function getCronItems( + options: SharedOptions, + crontabPath: string, + watch = false, +): Promise { + const { logger } = processSharedOptions(options); + + let watcher: chokidar.FSWatcher | null = null; + const items: Array = []; + + if (watch) { + const watchLogger = logger.scope({ label: "watch" }); + watcher = chokidar + .watch(crontabPath, { ignoreInitial: true }) + .on("all", () => { + loadCrontabIntoCronItems(watchLogger, items, crontabPath).catch( + (error) => { + watchLogger.error(`Error in ${crontabPath}: ${error.message}`, { + crontabPath, + error, + }); + }, + ); + }); + } + + // Try and require it + await loadCrontabIntoCronItems(logger, items, crontabPath); + + let released = false; + return { + items, + release: () => { + if (released) { + return; + } + released = true; + if (watcher) { + watcher.close(); + } + }, + }; +} diff --git a/src/getTasks.ts b/src/getTasks.ts index c44c3d99..6a7353cb 100644 --- a/src/getTasks.ts +++ b/src/getTasks.ts @@ -81,7 +81,7 @@ export default async function getTasks( taskPath: string, watch = false, ): Promise { - const { logger } = await processSharedOptions(options); + const { logger } = processSharedOptions(options); const pathStat = await tryStat(taskPath); if (!pathStat) { throw new Error( @@ -179,9 +179,14 @@ export default async function getTasks( } taskNames = Object.keys(tasks).sort(); + let released = false; return { tasks, release: () => { + if (released) { + return; + } + released = true; watchers.forEach((watcher) => watcher.close()); }, }; diff --git a/src/index.ts b/src/index.ts index 
8fc76603..723abf38 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,5 +5,6 @@ export { runTaskList, runTaskListOnce } from "./main"; export { run, runOnce, runMigrations } from "./runner"; export { makeWorkerUtils, quickAddJob } from "./workerUtils"; export { Logger, LogFunctionFactory, consoleLogFactory } from "./logger"; +export { parseCronItems, parseCrontab } from "./crontab"; export { getTasks }; diff --git a/src/interfaces.ts b/src/interfaces.ts index 09f357c2..931023c7 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -162,6 +162,98 @@ export interface WatchedTaskList { release: () => void; } +export interface WatchedCronItems { + items: Array; + release: () => void; +} + +/** + * a.k.a. `opts`, this allows you to change the behaviour when scheduling a cron task. + */ +export interface CronItemOptions { + /** How far back (in milliseconds) should we backfill jobs when worker starts? (Only backfills since when the identifier was first used.) */ + backfillPeriod: number; + + /** Optionally override the default job max_attempts */ + maxAttempts?: number; + + /** Optionally set the job queue_name to enforce that the jobs run serially */ + queueName?: string; + + /** Optionally set the job priority */ + priority?: number; +} + +/** + * A recurring task schedule; this may represent a line in the `crontab` file, + * or may be the result of calling `parseCronItems` on a list of `CronItem`s + * the user has specified. + * + * You should use this as an opaque type; you should **not** read values from + * inside it, and you should not construct it manually, use `parseCrontab` or + * `parseCronItems` instead. The definition of this type may change + * dramatically between minor releases of Graphile Worker, these changes are + * not seen as breaking changes. + * + * @internal + * + * **WARNING**: it is assumed that values of this type adhere to the constraints in + * the comments below (many of these cannot be asserted by TypeScript). If you + * construct this type manually and do not adhere to these constraints then you + * may get unexpected behaviours. Graphile Worker enforces these rules when + * constructing `ParsedCronItem`s internally, you should use the Graphile + * Worker helpers to construct this type. + */ +export interface ParsedCronItem { + /** Minutes (0-59) on which to run the item; must contain unique numbers from the allowed range, ordered ascending. */ + minutes: number[]; + /** Hours (0-23) on which to run the item; must contain unique numbers from the allowed range, ordered ascending. */ + hours: number[]; + /** Dates (1-31) on which to run the item; must contain unique numbers from the allowed range, ordered ascending. */ + dates: number[]; + /** Months (1-12) on which to run the item; must contain unique numbers from the allowed range, ordered ascending. */ + months: number[]; + /** Days of the week (0-6) on which to run the item; must contain unique numbers from the allowed range, ordered ascending. */ + dows: number[]; + + /** The identifier of the task to execute */ + task: string; + + /** Options influencing backfilling and properties of the scheduled job */ + options: CronItemOptions; + + /** A payload object to merge into the default cron payload object for the scheduled job */ + payload: { [key: string]: any }; + + /** An identifier so that we can prevent double-scheduling of a task and determine whether or not to backfill. 
*/ + identifier: string; +} + +/** + * A description of a cron item detailing a task to run, when to run it, and + * any additional options necessary. This is the human-writable form, it must + * be parsed via `parseCronItems` before being fed to a worker. (ParsedCronItem + * has strict rules and should only be constructed via Graphile Worker's + * helpers to ensure compliance.) + */ +export interface CronItem { + /** The identifier of the task to execute */ + task: string; + + /** Cron pattern (e.g. `* * * * *`) to detail when the task should be executed */ + pattern: string; + + /** Options influencing backfilling and properties of the scheduled job */ + options?: CronItemOptions; + + /** A payload object to merge into the default cron payload object for the scheduled job */ + payload?: { [key: string]: any }; + + /** An identifier so that we can prevent double-scheduling of a task and determine whether or not to backfill. */ + identifier?: string; +} + +/** Represents records in the `jobs` table */ export interface Job { id: string; queue_name: string | null; @@ -181,6 +273,13 @@ export interface Job { flags: { [flag: string]: true } | null; } +/** Represents records in the `known_crontabs` table */ +export interface KnownCrontab { + identifier: string; + known_since: Date; + last_execution: Date | null; +} + export interface Worker { nudge: () => boolean; workerId: string; @@ -202,6 +301,11 @@ export interface Runner { events: WorkerEvents; } +export interface Cron { + release(): Promise; + promise: Promise; +} + export interface TaskSpec { /** * The queue to run this task under (only specify if you want jobs in this @@ -348,6 +452,42 @@ export interface RunnerOptions extends WorkerPoolOptions { * Each file in this directory will be used as a task handler */ taskDirectory?: string; + + /** + * A crontab string to use instead of reading a crontab file + */ + crontab?: string; + + /** + * Path to the crontab file. Defaults to `crontab` + */ + crontabFile?: string; + + /** + * Programmatically generated cron items. **BE VERY CAREFUL** if you use this + * manually, there are requirements on this type that TypeScript cannot + * express, and if you don't adhere to them then you'll get unexpected + * behaviours. + */ + parsedCronItems?: Array; +} + +/** Spec for a job created from cron */ +export interface CronJob { + task: string; + payload: { + _cron: { ts: string; backfilled?: boolean }; + [key: string]: unknown; + }; + queueName?: string; + runAt: string; + maxAttempts?: number; + priority?: number; +} + +export interface JobAndCronIdentifier { + job: CronJob; + identifier: string; } export interface WorkerUtilsOptions extends SharedOptions {} @@ -389,7 +529,7 @@ interface TypedEventEmitter /** * These are the events that a worker instance supports. */ -export type WorkerEvents = TypedEventEmitter<{ +export type WorkerEventMap = { /** * When a worker pool is created */ @@ -491,6 +631,62 @@ export type WorkerEvents = TypedEventEmitter<{ */ "job:complete": { worker: Worker; job: Job; error: any }; + /** **Experimental** When the cron starts working (before backfilling) */ + "cron:starting": { cron: Cron; start: Date }; + + /** **Experimental** When the cron starts working (after backfilling completes) */ + "cron:started": { cron: Cron; start: Date }; + + /** **Experimental** When a number of jobs need backfilling for a particular timestamp. 
*/ + "cron:backfill": { + cron: Cron; + itemsToBackfill: JobAndCronIdentifier[]; + timestamp: string; + }; + + /** + * **Experimental** When it seems that time went backwards (e.g. the system + * clock was adjusted) and we try again a little later. + */ + "cron:prematureTimer": { + cron: Cron; + currentTimestamp: number; + expectedTimestamp: number; + }; + + /** + * **Experimental** When it seems that time jumped forwards (e.g. the system + * was overloaded and couldn't fire the timer on time, or perhaps the system + * went to sleep) and we need to catch up. + */ + "cron:overdueTimer": { + cron: Cron; + currentTimestamp: number; + expectedTimestamp: number; + }; + + /** + * **Experimental** When 1 or more cron items match the current timestamp and + * will be scheduled into the database. (Like cron:scheduled but before the + * database write.) + */ + "cron:schedule": { + cron: Cron; + timestamp: number; + jobsAndIdentifiers: JobAndCronIdentifier[]; + }; + + /** + * **Experimental** When 1 or more cron items match the current timestamp and + * were scheduled into the database. (Like cron:schedule but after the + * database write.) + */ + "cron:scheduled": { + cron: Cron; + timestamp: number; + jobsAndIdentifiers: JobAndCronIdentifier[]; + }; + /** * When the runner is terminated by a signal */ @@ -500,4 +696,6 @@ export type WorkerEvents = TypedEventEmitter<{ * When the runner is stopped */ stop: {}; -}>; +}; + +export type WorkerEvents = TypedEventEmitter; diff --git a/src/main.ts b/src/main.ts index 3dbb2942..0547d073 100644 --- a/src/main.ts +++ b/src/main.ts @@ -127,10 +127,12 @@ export function runTaskList( await client.release(); } }; + let active = true; // This is a representation of us that can be interacted with externally const workerPool: WorkerPool = { release: async () => { + active = false; events.emit("pool:release", { pool: this }); unlistenForChanges(); promise.resolve(); @@ -189,6 +191,11 @@ export function runTaskList( client: PoolClient, release: () => void, ) => { + if (!active) { + // We were released, release this new client and abort + release(); + return; + } if (err) { events.emit("pool:listen:error", { workerPool, client, error: err }); logger.error( diff --git a/src/runner.ts b/src/runner.ts index 5cfdf156..27b8d471 100644 --- a/src/runner.ts +++ b/src/runner.ts @@ -1,7 +1,8 @@ import * as assert from "assert"; +import { getParsedCronItemsFromOptions, runCron } from "./cron"; import getTasks from "./getTasks"; -import { Runner, RunnerOptions, TaskList } from "./interfaces"; +import { ParsedCronItem, Runner, RunnerOptions, TaskList } from "./interfaces"; import { getUtilsAndReleasersFromOptions, Releasers } from "./lib"; import { runTaskList, runTaskListOnce } from "./main"; import { migrate } from "./migrate"; @@ -21,23 +22,21 @@ async function assertTaskList( options: RunnerOptions, releasers: Releasers, ): Promise { - let taskList: TaskList; assert( !options.taskDirectory || !options.taskList, "Exactly one of either `taskDirectory` or `taskList` should be set", ); if (options.taskList) { - taskList = options.taskList; + return options.taskList; } else if (options.taskDirectory) { const watchedTasks = await getTasks(options, options.taskDirectory, false); releasers.push(() => watchedTasks.release()); - taskList = watchedTasks.tasks; + return watchedTasks.tasks; } else { throw new Error( "You must specify either `options.taskList` or `options.taskDirectory`", ); } - return taskList; } export const runOnce = async ( @@ -69,6 +68,7 @@ export const runOnce = async 
( export const run = async ( options: RunnerOptions, overrideTaskList?: TaskList, + overrideParsedCronItems?: Array, ): Promise => { const { pgPool, @@ -82,6 +82,13 @@ export const run = async ( const taskList = overrideTaskList || (await assertTaskList(options, releasers)); + const parsedCronItems = + overrideParsedCronItems || + (await getParsedCronItemsFromOptions(options, releasers)); + + const cron = runCron(options, parsedCronItems, { pgPool, events }); + releasers.push(() => cron.release()); + const workerPool = runTaskList(options, taskList, pgPool); releasers.push(() => workerPool.release()); diff --git a/yarn.lock b/yarn.lock index cddc803a..0437d6ca 100644 --- a/yarn.lock +++ b/yarn.lock @@ -532,11 +532,21 @@ resolved "https://registry.yarnpkg.com/@types/json-schema/-/json-schema-7.0.4.tgz#38fd73ddfd9b55abb1e1b2ed578cb55bd7b7d339" integrity sha512-8+KAKzEvSUdeo+kmqnKrqgeE+LcA0tjYWFY7RPProVYwnqDjukzO+3b6dLD56rYX5TdWejnEOLJYOIeh4CXKuA== +"@types/json5@^0.0.30": + version "0.0.30" + resolved "https://registry.yarnpkg.com/@types/json5/-/json5-0.0.30.tgz#44cb52f32a809734ca562e685c6473b5754a7818" + integrity sha512-sqm9g7mHlPY/43fcSNrCYfOeX9zkTTK+euO5E6+CVijSMm5tTjkVdwdqRkY3ljjIAf8679vps5jKUoJBCLsMDA== + "@types/node@*": version "13.13.4" resolved "https://registry.yarnpkg.com/@types/node/-/node-13.13.4.tgz#1581d6c16e3d4803eb079c87d4ac893ee7501c2c" integrity sha512-x26ur3dSXgv5AwKS0lNfbjpCakGIduWU1DU91Zz58ONRWrIKGunmZBNv4P7N+e27sJkiGDsw/3fT4AtsqQBrBA== +"@types/node@^14.14.14": + version "14.14.14" + resolved "https://registry.yarnpkg.com/@types/node/-/node-14.14.14.tgz#f7fd5f3cc8521301119f63910f0fb965c7d761ae" + integrity sha512-UHnOPWVWV1z+VV8k6L1HhG7UbGBgIdghqF3l9Ny9ApPghbjICXkUJSd/b9gOgQfjM1r+37cipdw/HJ3F6ICEnQ== + "@types/normalize-package-data@^2.4.0": version "2.4.0" resolved "https://registry.yarnpkg.com/@types/normalize-package-data/-/normalize-package-data-2.4.0.tgz#e486d0d97396d79beedd0a6e33f4534ff6b4973e" @@ -2723,6 +2733,11 @@ jest-snapshot@^25.4.0: pretty-format "^25.4.0" semver "^6.3.0" +jest-time-helpers@^0.1.0: + version "0.1.0" + resolved "https://registry.yarnpkg.com/jest-time-helpers/-/jest-time-helpers-0.1.0.tgz#0d28164b4109035ce5010bfc5375b78700bfe793" + integrity sha512-rj3g5CPey4t1n/6HCEWL5epe7b33bPKurGOFmdDG7A6PoO6jtPfaCWNtRangFUCX7Z9aPr7D6nQfga635j1Fuw== + jest-util@^25.4.0: version "25.4.0" resolved "https://registry.yarnpkg.com/jest-util/-/jest-util-25.4.0.tgz#6a093d09d86d2b41ef583e5fe7dd3976346e1acd" @@ -2854,7 +2869,7 @@ json-stringify-safe@~5.0.1: resolved "https://registry.yarnpkg.com/json-stringify-safe/-/json-stringify-safe-5.0.1.tgz#1296a2d58fd45f19a0f6ce01d65701e2c735b6eb" integrity sha1-Epai1Y/UXxmg9s4B1lcB4sc1tus= -json5@2.x, json5@^2.1.2: +json5@2.x, json5@^2.1.2, json5@^2.1.3: version "2.1.3" resolved "https://registry.yarnpkg.com/json5/-/json5-2.1.3.tgz#c9b0f7fa9233bfe5807fe66fcf3a5617ed597d43" integrity sha512-KXPvOm8K9IJKFM0bmdn8QXh7udDh1g/giieX0NLCaMnb4hEiVFqnop2ImTXCc5e0/oHz3LTqmHGtExn5hfMkOA==
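
Usage sketch (not part of the diff above): the following illustrates how the `parseCronItems` export and the `parsedCronItems` runner option introduced here might be combined in library mode. It assumes a `DATABASE_URL` environment variable, an inline `send_weekly_email` task handler, and the pre-existing `connectionString`, `concurrency` and `taskList` runner options; treat it as an illustrative sketch rather than a definitive API reference.

```ts
import { parseCronItems, run } from "graphile-worker";

async function main() {
  // Convert human-writable CronItems into ParsedCronItems; the runner's
  // `parsedCronItems` option expects values produced by this helper (or by
  // `parseCrontab`), not hand-built objects.
  const parsedCronItems = parseCronItems([
    {
      task: "send_weekly_email",
      pattern: "30 4 * * 1", // 04:30 UTC every Monday
      options: {
        backfillPeriod: 2 * 24 * 60 * 60 * 1000, // backfill up to 2 days
        maxAttempts: 10,
      },
      payload: { onboarding: false },
    },
  ]);

  const runner = await run({
    connectionString: process.env.DATABASE_URL,
    concurrency: 5,
    parsedCronItems,
    taskList: {
      send_weekly_email: async (payload: any, helpers) => {
        // Jobs scheduled by cron get a `_cron` key merged into their payload.
        const { ts, backfilled } = payload._cron;
        helpers.logger.info(
          `Sending weekly email for ${ts}${backfilled ? " (backfilled)" : ""}`,
        );
      },
    },
  });

  await runner.promise;
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
```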