Skip to content

Commit

Permalink
resolve events sql: very minor perf improvement, also improves readab…
Browse files Browse the repository at this point in the history
…ility
  • Loading branch information
Sheraff committed Jun 30, 2024
1 parent 4f58aa9 commit 3ea93fd
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
29 changes: 14 additions & 15 deletions src/v3/lib/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,16 @@ export class SQLiteStorage implements Storage {
* update all steps that are waiting for an event.
*/
const resolveAllStepEventsStmt = this.#db.prepare<{ queue: string }, {}>(/* sql */ `
WITH base AS (
WITH waiting_steps AS (
SELECT
queue, status, wait_for, wait_filter, wait_from, id
FROM ${stepsTable} step
WHERE step.status = 'waiting'
AND step.queue = @queue
AND step.wait_for IS NOT NULL
AND step.wait_filter IS NOT NULL
),
base AS (
SELECT
step.id AS step_id,
step.wait_for,
Expand All @@ -711,13 +720,9 @@ export class SQLiteStorage implements Storage {
filter.type,
filter.fullKey,
filter.value
FROM ${stepsTable} step
FROM waiting_steps step
INNER JOIN json_tree(step.wait_filter) filter
ON filter.type != 'null'
WHERE step.status = 'waiting'
AND step.queue = @queue
AND step.wait_for IS NOT NULL
AND step.wait_filter IS NOT NULL
),
matches AS (
SELECT
Expand All @@ -731,7 +736,7 @@ export class SQLiteStorage implements Storage {
ON event.queue = @queue
AND event.key = base.wait_for
AND event.created_at >= base.wait_from
AND CASE base.type
WHERE CASE base.type
WHEN 'object' THEN (
json_extract(event.input, base.fullKey) IS NOT NULL
AND json_type(json_extract(event.input, base.fullKey)) = 'object'
Expand All @@ -753,15 +758,9 @@ export class SQLiteStorage implements Storage {
step.id,
matches.key,
matches.data as event_data
FROM ${stepsTable} step
FROM waiting_steps step
LEFT JOIN matches
ON step.id = matches.step_id
WHERE
-- TODO the conditions are repeated, maybe take steps out in their own reused CTE
step.status = 'waiting'
AND step.queue = @queue
AND step.wait_for IS NOT NULL
AND step.wait_filter IS NOT NULL
)
UPDATE ${stepsTable}
SET status = CASE
Expand All @@ -778,7 +777,7 @@ export class SQLiteStorage implements Storage {
results.id = ${stepsTable}.id
AND (
results.key IS NOT NULL
OR wait_from < unixepoch('subsec') - 0.01 -- if no event found, only update 'wait_from' in increments of 10ms minimum
OR wait_from < unixepoch('subsec') - 0.05 -- if no event found, only update 'wait_from' in increments of 50ms minimum
)
RETURNING 1
`)
Expand Down
7 changes: 4 additions & 3 deletions src/v3/tests/benchmark.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ test.describe('benchmark', {
}))

const COUNT = 250
for (let i = 0; i < COUNT * 2; i++) {
if (i % 2 === 0)
const pollutionRatio = 2
for (let i = 0; i < COUNT * pollutionRatio; i++) {
if (i % pollutionRatio === 0)
queue.jobs.pollution.dispatch({ i })
else
queue.jobs.hello.dispatch({ i })
Expand All @@ -172,7 +173,7 @@ test.describe('benchmark', {
performance.mark('pipe-end')

const duration = performance.measure('hello', 'pipe-start', 'pipe-end').duration
t.diagnostic(`Many wait for pipe took ${duration.toFixed(2)}ms (< 100ms) for ${COUNT} steps with ${COUNT} unrelated tasks in the database`)
t.diagnostic(`Many wait for pipe took ${duration.toFixed(2)}ms (< 100ms) for ${COUNT} steps with ${COUNT * pollutionRatio - COUNT} unrelated tasks in the database`)
t.diagnostic(`Overall: ${(duration / COUNT).toFixed(2)} ms/step`)

t.diagnostic(`Many wait for pipe result: ${res?.res}`)
Expand Down

0 comments on commit 3ea93fd

Please sign in to comment.