Skip to content

Commit

Permalink
fix some async loops
Browse files Browse the repository at this point in the history
  • Loading branch information
Sheraff committed Jun 27, 2024
1 parent a5296b7 commit 74c073f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 38 deletions.
4 changes: 2 additions & 2 deletions src/v3/lib/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ export const registration = new AsyncLocalStorage<RegistrationContext>()

export interface ExecutionContext {
run<Out extends Data>(options: RunOptions, fn: () => Out | Promise<Out>): Promise<Out>
sleep(ms: number): Promise<void>
waitFor(instance: Job | Pipe, event: string, options: WaitForOptions<InputData>): Promise<Data>
sleep(ms: number): Promise<void> | void
waitFor(instance: Job | Pipe, event: string, options: WaitForOptions<InputData>): Promise<Data> | void
invoke(job: Job, data: InputData): Promise<Data>
dispatch(instance: Job | Pipe, data: InputData): Promise<void>
cancel(instance: Job, input: InputData, reason: CancelReason): Promise<void>
Expand Down
56 changes: 22 additions & 34 deletions src/v3/lib/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export type RunOptions = {
* Defaults to a list of delays that increase with each attempt: `"100ms", "30s", "2m", "10m", "30m", "1h", "2h", "12h", "1d"`
*/
backoff?: number | Duration | ((attempt: number) => number | Duration) | number[] | Duration[]
// TODO: timeout
// timeout?: number | Duration
// TODO: concurrency
// ...
}
Expand Down Expand Up @@ -317,7 +317,7 @@ export class Job<
}

/** @public */
static sleep(ms: number | Duration): Promise<void> {
static async sleep(ms: number | Duration): Promise<void> {
const e = getExecutionContext()
if (typeof ms === 'string') ms = parseDuration(ms)
return e.sleep(ms)
Expand All @@ -344,7 +344,7 @@ export class Job<
}

/** @public */
static dispatch<I extends Job | Pipe>(instance: I, data: I['in']): Promise<void> {
static async dispatch<I extends Job | Pipe>(instance: I, data: I['in']): Promise<void> {
const e = getExecutionContext()
return e.dispatch(instance, data)
}
Expand Down Expand Up @@ -463,10 +463,9 @@ function makeExecutionContext(registrationContext: RegistrationContext, task: Ta
return i
}

const run: ExecutionContext['run'] = async (options, fn) => {
const run: ExecutionContext['run'] = (options, fn) => {
if (cancelled) {
await Promise.resolve()
throw interrupt
return Promise.reject(interrupt)
}
const index = getIndex(options.id, options[system] ?? false)
const step = `${options[system] ? 'system' : 'user'}/${options.id}#${index}`
Expand All @@ -481,8 +480,7 @@ function makeExecutionContext(registrationContext: RegistrationContext, task: Ta
} else if (entry.status === 'stalled') {
if (entry.sleep_done === null) throw new Error('Sleep step already created, but no duration found')
if (!entry.sleep_done) {
await Promise.resolve()
throw interrupt
return Promise.reject(interrupt)
}
}
}
Expand Down Expand Up @@ -566,17 +564,15 @@ function makeExecutionContext(registrationContext: RegistrationContext, task: Ta
}

if (delegateToNextTick) {
await Promise.resolve() // let parallel tasks resolve too
throw interrupt
return Promise.reject(interrupt) // let parallel tasks resolve too
}
if (syncError) throw syncError
return syncResult
}

const sleep: ExecutionContext['sleep'] = async (ms) => {
const sleep: ExecutionContext['sleep'] = (ms) => {
if (cancelled) {
await Promise.resolve()
throw interrupt
return Promise.reject(interrupt)
}
const index = getIndex('sleep', true)
const step = `system/sleep#${index}`
Expand All @@ -586,8 +582,7 @@ function makeExecutionContext(registrationContext: RegistrationContext, task: Ta
if (entry.sleep_done === null) throw new Error('Sleep step already created, but no duration found')
if (entry.sleep_done) throw new Error('Sleep step already completed')
if (!entry.sleep_done) {
await Promise.resolve()
throw interrupt
return Promise.reject(interrupt)
}
}
const status = ms <= 0 ? 'completed' : 'stalled'
Expand All @@ -601,14 +596,12 @@ function makeExecutionContext(registrationContext: RegistrationContext, task: Ta
if (isPromise(maybePromise)) {
promises.push(maybePromise)
}
await Promise.resolve()
throw interrupt
return Promise.reject(interrupt)
}

const waitFor: ExecutionContext['waitFor'] = async (instance, event, options) => {
const waitFor: ExecutionContext['waitFor'] = (instance, event, options) => {
if (cancelled) {
await Promise.resolve()
throw interrupt
return Promise.reject(interrupt)
}
const name = `waitFor::${instance.type}::${instance.id}::${event}`
const index = getIndex(name, true)
Expand All @@ -623,8 +616,7 @@ function makeExecutionContext(registrationContext: RegistrationContext, task: Ta
if (!entry.data) throw new Error('Step marked as failed in storage, but no error data found')
throw hydrateError(entry.data)
} else if (entry.status === 'waiting') {
await Promise.resolve()
throw interrupt
return Promise.reject(interrupt)
} else {
throw new Error(`Unexpected waitFor step status ${entry.status}`)
}
Expand Down Expand Up @@ -652,16 +644,14 @@ function makeExecutionContext(registrationContext: RegistrationContext, task: Ta
if (isPromise(maybePromise)) {
promises.push(maybePromise)
}
await Promise.resolve()
throw interrupt
return Promise.reject(interrupt)
}

const dispatch: ExecutionContext['dispatch'] = async (instance, data) => {
const dispatch: ExecutionContext['dispatch'] = (instance, data) => {
if (cancelled) {
await Promise.resolve()
throw interrupt
return Promise.reject(interrupt)
}
run({
return run({
id: `dispatch-${instance.type}-${instance.id}`,
[system]: true,
retry: 0,
Expand All @@ -672,8 +662,7 @@ function makeExecutionContext(registrationContext: RegistrationContext, task: Ta

const invoke: ExecutionContext['invoke'] = async (job, input) => {
if (cancelled) {
await Promise.resolve()
throw interrupt
return Promise.reject(interrupt)
}
const promise = waitFor(job, 'settled', { filter: input })
await dispatch(job, input)
Expand All @@ -682,12 +671,11 @@ function makeExecutionContext(registrationContext: RegistrationContext, task: Ta
return result
}

const cancel: ExecutionContext['cancel'] = async (instance, data, reason) => {
const cancel: ExecutionContext['cancel'] = (instance, data, reason) => {
if (cancelled) {
await Promise.resolve()
throw interrupt
return Promise.reject(interrupt)
}
run({
return run({
id: `cancel-${instance.type}-${instance.id}`,
[system]: true,
retry: 0
Expand Down
4 changes: 2 additions & 2 deletions src/v3/tests/benchmark.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ test.describe('benchmark', {
performance.mark('end')
const duration = performance.measure('hello', 'start', 'end').duration
t.diagnostic(`10000 sync steps took ${duration.toFixed(2)}ms (< 150ms)`)
t.diagnostic(`Overall: ${(duration / 10000).toFixed(4)} ms/step`)
t.diagnostic(`Overall: ${(duration / 10).toFixed(2)} µs/step`)
assert(duration < 150, `Benchmark took ${duration.toFixed(2)}ms, expected less than 150ms`)

await queue.close()
Expand All @@ -53,7 +53,7 @@ test.describe('benchmark', {
performance.mark('end')
const duration = performance.measure('hello', 'start', 'end').duration
t.diagnostic(`100 async steps took ${duration.toFixed(2)}ms (< 100ms)`)
t.diagnostic(`Overall: ${(duration / 100).toFixed(4)} ms/step`)
t.diagnostic(`Overall: ${(duration * 10).toFixed(2)} µs/step`)
assert(duration < 100, `Benchmark took ${duration}ms, expected less than 100ms`)

await queue.close()
Expand Down

0 comments on commit 74c073f

Please sign in to comment.