-
Notifications
You must be signed in to change notification settings - Fork 5
Clean up Jobqueues, minor fixes for S3 Queue #451
Conversation
@@ -62,8 +64,10 @@ export class JobQueueBase implements JobQueue { | |||
resumeConsumer(): void | |||
// eslint-disable-next-line @typescript-eslint/require-await | |||
async resumeConsumer(): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resume is called on drain
events emitted by Piscina, which results in multiple calls to sync state. This is okay, as long as dequeue code is idempotent (which it isn't, thus leading to race conditions).
We didn't face this with graphile because the runner object is unchanged over multiple sync state calls.
@@ -77,8 +81,8 @@ export class JobQueueBase implements JobQueue { | |||
clearTimeout(this.timeout) | |||
} | |||
// eslint-disable-next-line @typescript-eslint/await-thenable | |||
const hadSomething = await this.readState() | |||
this.timeout = setTimeout(() => this.syncState(), hadSomething ? 0 : this.intervalSeconds * 1000) | |||
await this.readState() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given existing semantics, readState
is supposed to finish everything required with the jobs seen so far. So, it doesn't make sense to poll again right after, if it found some jobs to process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea here is that if you have 10k jobs to run, you shouldn't wait 5 sec between them (or between every 100 of them) :). So if we got a job with the last run, immediately see if there's another one. Otherwise wait a few seconds and then try again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, that makes sense... assuming there's batching involved. I haven't seen it so far (am I missing something?) - so didn't assume such a state was possible.
Will revert.
@@ -284,5 +308,44 @@ describe('job queues', () => { | |||
Key: `prefix/2020-01-01/20200101-123456.123Z-deadbeef.json.gz`, | |||
}) | |||
}) | |||
|
|||
test('polls for new jobs', async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these tests would fail after 60s (jest timeout) if there's no polling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some thoughts below, still haven't ran it locally
@@ -77,8 +81,8 @@ export class JobQueueBase implements JobQueue { | |||
clearTimeout(this.timeout) | |||
} | |||
// eslint-disable-next-line @typescript-eslint/await-thenable | |||
const hadSomething = await this.readState() | |||
this.timeout = setTimeout(() => this.syncState(), hadSomething ? 0 : this.intervalSeconds * 1000) | |||
await this.readState() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea here is that if you have 10k jobs to run, you shouldn't wait 5 sec between them (or between every 100 of them) :). So if we got a job with the last run, immediately see if there's another one. Otherwise wait a few seconds and then try again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These changes make sense
…/plugin-server#451) * fix s3 consumer, cleanup graphile queue * tests * address comments
Changes
Ensures S3 queue consumer runs.
Makes GraphileQueue adhere to JobQueueBase.
Checklist