From 99fbf1d3012c48d67efa96de7a9303afd6b22a87 Mon Sep 17 00:00:00 2001 From: Jason Kuhrt Date: Wed, 5 Jun 2024 00:32:47 -0400 Subject: [PATCH 1/5] feat: anyware hook retries --- src/lib/anyware/main.test.ts | 26 ++++++++++- src/lib/anyware/main.ts | 86 ++++++++++++++++++++++++++++++------ src/lib/prelude.ts | 8 +++- 3 files changed, 105 insertions(+), 15 deletions(-) diff --git a/src/lib/anyware/main.test.ts b/src/lib/anyware/main.test.ts index 6184a8090..0c433659f 100644 --- a/src/lib/anyware/main.test.ts +++ b/src/lib/anyware/main.test.ts @@ -203,7 +203,7 @@ describe(`errors`, () => { `) }) - test(`implementation throws`, async () => { + test(`if implementation fails, without extensions, result is the error`, async () => { core.hooks.a.mockReset().mockRejectedValueOnce(oops) const result = await run() as ContextualError expect({ @@ -221,4 +221,28 @@ describe(`errors`, () => { } `) }) + test('calling a hook twice leads to clear error', async () => { + const result = await run(async ({ a }) => { + a() + a() + }) as ContextualError + const cause = result.cause as ContextualError + expect(cause.message).toMatchInlineSnapshot( + `"You already invoked hook "a". Hooks can only be invoked multiple times if the previous attempt failed."`, + ) + expect(cause.context).toEqual({ hookName: 'a' }) + }) + test('if implementation fails, hook results in an error', async () => { + core.hooks.a.mockReset().mockRejectedValueOnce(oops).mockResolvedValueOnce(1) + const result = await run(async ({ a }) => { + console.log('pre result1') + const result1 = await a() + expect(result1).toEqual(oops) + const result2 = await a() + expect(typeof result2.b).toEqual('function') + expect(result2.b.input).toEqual(1) + return result2.b.input + }) + expect(result).toEqual(1) + }) }) diff --git a/src/lib/anyware/main.ts b/src/lib/anyware/main.ts index 7a0cb55db..69c0520f7 100644 --- a/src/lib/anyware/main.ts +++ b/src/lib/anyware/main.ts @@ -132,6 +132,19 @@ type HookDoneData = type HookDoneResolver = (input: HookDoneData) => void +// const runHookErroring = async <$HookName extends string>( +// { core, error, name, done, originalInput, currentHookStack, nextHookStack }: { +// core: Core +// error: Error +// name: $HookName +// done: HookDoneResolver +// originalInput: unknown +// currentHookStack: Extension[] +// nextHookStack: Extension[] +// }, +// ) => { +// } + const runHook = async <$HookName extends string>( { core, name, done, originalInput, currentHookStack, nextHookStack }: { core: Core @@ -149,31 +162,69 @@ const runHook = async <$HookName extends string>( if (pausedExtension) { const hookUsedDeferred = createDeferred() + let previousAttemptErrored = false - debug(`${name}: extension ${pausedExtension.name}`) + debug(`${name}: ${pausedExtension.name}: start`) // The extension is responsible for calling the next hook. // If no input is passed that means use the original input. const hook = createHook(originalInput, (maybeNextOriginalInput?: object) => { // Once called, the extension is paused again and we continue down the current hook stack. - hookUsedDeferred.resolve(true) - debug(`${name}: ${pausedExtension.name}: pause`) + const nextPausedExtension: Extension = { ...pausedExtension, currentChunk: createDeferred(), } const nextNextHookStack = [...nextHookStack, nextPausedExtension] // tempting to mutate here but simpler to think about as copy. - void runHook({ - core, - name, - done, - originalInput: maybeNextOriginalInput ?? originalInput, - currentHookStack: nextCurrentHookStack, - nextHookStack: nextNextHookStack, - }) + if (hookUsedDeferred.isResolved()) { + if (previousAttemptErrored) { + const d = createDeferred() + debug(`${name}: ${pausedExtension.name}: retry`) + void runHook({ + core, + name, + done, + originalInput: maybeNextOriginalInput ?? originalInput, + // currentHookStack, + currentHookStack: [{ + ...pausedExtension, + currentChunk: d, + }, ...nextCurrentHookStack], + nextHookStack: nextNextHookStack, + }) + // automate/forward the retry + void d.promise.then(envelope => envelope[name](maybeNextOriginalInput ?? originalInput)) + return nextPausedExtension.currentChunk.promise + } else { + throw new ContextualError( + `You already invoked hook "${name}". Hooks can only be invoked multiple times if the previous attempt failed.`, + { + hookName: name, + }, + ) + } + } else { + hookUsedDeferred.resolve(true) + + void runHook({ + core, + name, + done, + originalInput: maybeNextOriginalInput ?? originalInput, + currentHookStack: nextCurrentHookStack, + nextHookStack: nextNextHookStack, + }) - return nextPausedExtension.currentChunk.promise + return nextPausedExtension.currentChunk.promise.then(_ => { + // console.log({ _ }) + if (_ instanceof Error) { + console.log(`chunk resolved with error`) + previousAttemptErrored = true + } + return _ + }) + } }) // The extension is resumed. It is responsible for calling the next hook. @@ -190,6 +241,7 @@ const runHook = async <$HookName extends string>( hookUsedDeferred.promise.then(result => { return { branch: `hook`, result } as const }).catch((e: unknown) => ({ branch: `hookError`, result: e } as const)), + // rename branch to "extension" pausedExtension.body.promise.then(result => { return { branch: `body`, result } as const }).catch((e: unknown) => ({ branch: `bodyError`, result: e } as const)), @@ -223,6 +275,7 @@ const runHook = async <$HookName extends string>( return } case `hookError`: + // todo rename source to "hook" done({ type: `error`, hookName: name, source: `implementation`, error: errorFromMaybeError(result) }) return case `hook`: { @@ -238,6 +291,7 @@ const runHook = async <$HookName extends string>( // --------------------------- // Run core to get result + debug(`${name}: run core`) const implementation = core.hooks[name] if (!implementation) { @@ -247,7 +301,13 @@ const runHook = async <$HookName extends string>( try { result = await implementation(originalInput as any) } catch (error) { - done({ type: `error`, hookName: name, source: `implementation`, error: errorFromMaybeError(error) }) + if (nextHookStack.length) { + debug(`${name}: hook error`) + // send back up the stack + nextHookStack[0]?.currentChunk.resolve(error) + } else { + done({ type: `error`, hookName: name, source: `implementation`, error: errorFromMaybeError(error) }) + } return } diff --git a/src/lib/prelude.ts b/src/lib/prelude.ts index 3ad6f4f3d..3d2e187b5 100644 --- a/src/lib/prelude.ts +++ b/src/lib/prelude.ts @@ -245,11 +245,13 @@ export type SomeMaybeAsyncFunction = (...args: unknown[]) => MaybePromise = { promise: Promise + isResolved: () => boolean resolve: (value: T) => void reject: (error: unknown) => void } export const createDeferred = <$T>(): Deferred<$T> => { + let isResolved = false let resolve: (value: $T) => void let reject: (error: unknown) => void @@ -260,7 +262,11 @@ export const createDeferred = <$T>(): Deferred<$T> => { return { promise, - resolve: (value) => resolve(value), + isResolved: () => isResolved, + resolve: (value) => { + isResolved = true + resolve(value) + }, reject: (error) => reject(error), } } From 10a6d3a218be9fd13ec5a13e9bb028cd4c416b7e Mon Sep 17 00:00:00 2001 From: Jason Kuhrt Date: Thu, 6 Jun 2024 20:43:36 -0400 Subject: [PATCH 2/5] progress --- src/lib/anyware/main.test.ts | 17 +- src/lib/anyware/main.ts | 293 +-------------------------------- src/lib/anyware/runHook.ts | 236 ++++++++++++++++++++++++++ src/lib/anyware/runPipeline.ts | 170 +++++++++++++++++++ src/lib/prelude.ts | 4 + 5 files changed, 434 insertions(+), 286 deletions(-) create mode 100644 src/lib/anyware/runHook.ts create mode 100644 src/lib/anyware/runPipeline.ts diff --git a/src/lib/anyware/main.test.ts b/src/lib/anyware/main.test.ts index 0c433659f..06de4ca28 100644 --- a/src/lib/anyware/main.test.ts +++ b/src/lib/anyware/main.test.ts @@ -1,5 +1,6 @@ /* eslint-disable */ +import console from 'node:console' import { describe, expect, test, vi } from 'vitest' import type { ContextualError } from '../errors/ContextualError.js' import { core, initialInput, oops, run, runWithOptions } from './specHelpers.js' @@ -232,10 +233,9 @@ describe(`errors`, () => { ) expect(cause.context).toEqual({ hookName: 'a' }) }) - test('if implementation fails, hook results in an error', async () => { + test.only('if hook fails, extension can retry, then short-circuit', async () => { core.hooks.a.mockReset().mockRejectedValueOnce(oops).mockResolvedValueOnce(1) const result = await run(async ({ a }) => { - console.log('pre result1') const result1 = await a() expect(result1).toEqual(oops) const result2 = await a() @@ -245,4 +245,17 @@ describe(`errors`, () => { }) expect(result).toEqual(1) }) + test.skip('if hook fails, extension can retry it, then continue to next hook.', async () => { + core.hooks.a.mockReset().mockRejectedValueOnce(oops).mockResolvedValueOnce(1) + const result = await run( + async function foo({ a }) { + const resultA1 = await a() + const resultA2 = await a() + const resultB1 = await resultA2.b() + return resultB1 + }, + ) + console.log(result) + expect(result).toEqual(1) + }) }) diff --git a/src/lib/anyware/main.ts b/src/lib/anyware/main.ts index 69c0520f7..9f4bf55e1 100644 --- a/src/lib/anyware/main.ts +++ b/src/lib/anyware/main.ts @@ -1,9 +1,9 @@ import { Errors } from '../errors/__.js' import { partitionAndAggregateErrors } from '../errors/ContextualAggregateError.js' -import { ContextualError } from '../errors/ContextualError.js' import type { Deferred, FindValueAfter, IsLastValue, MaybePromise } from '../prelude.js' -import { casesExhausted, createDeferred, debug, errorFromMaybeError } from '../prelude.js' +import { casesExhausted, createDeferred } from '../prelude.js' import { getEntrypoint } from './getEntrypoint.js' +import { runPipeline } from './runPipeline.js' type HookSequence = readonly [string, ...string[]] @@ -33,7 +33,7 @@ type CoreInitialInput<$Core extends Core> = const PrivateTypesSymbol = Symbol(`private`) -type PrivateTypesSymbol = typeof PrivateTypesSymbol +export type PrivateTypesSymbol = typeof PrivateTypesSymbol const hookSymbol = Symbol(`hook`) @@ -104,17 +104,7 @@ export type Core< export type HookName = string -const createHook = <$X, $F extends (...args: any[]) => any>( - originalInput: $X, - fn: $F, -): $F & { input: $X } => { - // @ts-expect-error - fn.input = originalInput - // @ts-expect-error - return fn -} - -type Extension = { +export type Extension = { name: string entrypoint: string body: Deferred @@ -124,199 +114,6 @@ type Extension = { // export type ExtensionInput<$Input extends object = object> = (input: $Input) => MaybePromise export type ExtensionInput<$Input extends object = any> = (input: $Input) => MaybePromise -type HookDoneData = - | { type: 'completed'; result: unknown; nextHookStack: Extension[] } - | { type: 'shortCircuited'; result: unknown } - | { type: 'error'; hookName: string; source: 'implementation'; error: Error } - | { type: 'error'; hookName: string; source: 'extension'; error: Error; extensionName: string } - -type HookDoneResolver = (input: HookDoneData) => void - -// const runHookErroring = async <$HookName extends string>( -// { core, error, name, done, originalInput, currentHookStack, nextHookStack }: { -// core: Core -// error: Error -// name: $HookName -// done: HookDoneResolver -// originalInput: unknown -// currentHookStack: Extension[] -// nextHookStack: Extension[] -// }, -// ) => { -// } - -const runHook = async <$HookName extends string>( - { core, name, done, originalInput, currentHookStack, nextHookStack }: { - core: Core - name: $HookName - done: HookDoneResolver - originalInput: unknown - currentHookStack: Extension[] - nextHookStack: Extension[] - }, -) => { - const [pausedExtension, ...nextCurrentHookStack] = currentHookStack - - // Going down the stack - // -------------------- - - if (pausedExtension) { - const hookUsedDeferred = createDeferred() - let previousAttemptErrored = false - - debug(`${name}: ${pausedExtension.name}: start`) - // The extension is responsible for calling the next hook. - // If no input is passed that means use the original input. - const hook = createHook(originalInput, (maybeNextOriginalInput?: object) => { - // Once called, the extension is paused again and we continue down the current hook stack. - debug(`${name}: ${pausedExtension.name}: pause`) - - const nextPausedExtension: Extension = { - ...pausedExtension, - currentChunk: createDeferred(), - } - const nextNextHookStack = [...nextHookStack, nextPausedExtension] // tempting to mutate here but simpler to think about as copy. - - if (hookUsedDeferred.isResolved()) { - if (previousAttemptErrored) { - const d = createDeferred() - debug(`${name}: ${pausedExtension.name}: retry`) - void runHook({ - core, - name, - done, - originalInput: maybeNextOriginalInput ?? originalInput, - // currentHookStack, - currentHookStack: [{ - ...pausedExtension, - currentChunk: d, - }, ...nextCurrentHookStack], - nextHookStack: nextNextHookStack, - }) - // automate/forward the retry - void d.promise.then(envelope => envelope[name](maybeNextOriginalInput ?? originalInput)) - return nextPausedExtension.currentChunk.promise - } else { - throw new ContextualError( - `You already invoked hook "${name}". Hooks can only be invoked multiple times if the previous attempt failed.`, - { - hookName: name, - }, - ) - } - } else { - hookUsedDeferred.resolve(true) - - void runHook({ - core, - name, - done, - originalInput: maybeNextOriginalInput ?? originalInput, - currentHookStack: nextCurrentHookStack, - nextHookStack: nextNextHookStack, - }) - - return nextPausedExtension.currentChunk.promise.then(_ => { - // console.log({ _ }) - if (_ instanceof Error) { - console.log(`chunk resolved with error`) - previousAttemptErrored = true - } - return _ - }) - } - }) - - // The extension is resumed. It is responsible for calling the next hook. - - debug(`${name}: ${pausedExtension.name}: resume`) - const envelope = { [name]: hook } - pausedExtension.currentChunk.resolve(envelope) - - // If the extension does not return, it wants to tap into more hooks. - // If the extension returns the hook envelope, it wants the rest of the pipeline - // to pass through it. - // If the extension returns a non-hook-envelope value, it wants to short-circuit the pipeline. - const { branch, result } = await Promise.race([ - hookUsedDeferred.promise.then(result => { - return { branch: `hook`, result } as const - }).catch((e: unknown) => ({ branch: `hookError`, result: e } as const)), - // rename branch to "extension" - pausedExtension.body.promise.then(result => { - return { branch: `body`, result } as const - }).catch((e: unknown) => ({ branch: `bodyError`, result: e } as const)), - ]) - - debug(`${name}: ${pausedExtension.name}: branch`, branch) - switch (branch) { - case `body`: { - if (result === envelope) { - void runHook({ - core, - name, - done, - originalInput, - currentHookStack: nextCurrentHookStack, - nextHookStack, - }) - } else { - done({ type: `shortCircuited`, result }) - } - return - } - case `bodyError`: { - done({ - type: `error`, - hookName: name, - source: `extension`, - error: errorFromMaybeError(result), - extensionName: pausedExtension.name, - }) - return - } - case `hookError`: - // todo rename source to "hook" - done({ type: `error`, hookName: name, source: `implementation`, error: errorFromMaybeError(result) }) - return - case `hook`: { - // do nothing, hook is making the processing continue. - return - } - default: - throw casesExhausted(branch) - } - } - - // Reached bottom of the stack - // --------------------------- - - // Run core to get result - debug(`${name}: run core`) - - const implementation = core.hooks[name] - if (!implementation) { - throw new Errors.ContextualError(`Implementation not found for hook name ${name}`, { hookName: name }) - } - let result - try { - result = await implementation(originalInput as any) - } catch (error) { - if (nextHookStack.length) { - debug(`${name}: hook error`) - // send back up the stack - nextHookStack[0]?.currentChunk.resolve(error) - } else { - done({ type: `error`, hookName: name, source: `implementation`, error: errorFromMaybeError(error) }) - } - return - } - - // Return to root with the next result and hook stack - - done({ type: `completed`, result, nextHookStack }) - return -} - const ResultEnvelopeSymbol = Symbol(`resultEnvelope`) type ResultEnvelopeSymbol = typeof ResultEnvelopeSymbol @@ -326,84 +123,11 @@ export type ResultEnvelop = { result: T } -const createResultEnvelope = (result: T): ResultEnvelop => ({ +export const createResultEnvelope = (result: T): ResultEnvelop => ({ [ResultEnvelopeSymbol]: ResultEnvelopeSymbol, result, }) -const run = async ( - { core, initialInput, initialHookStack }: { core: Core; initialInput: unknown; initialHookStack: Extension[] }, -): Promise | ContextualError> => { - let currentInput = initialInput - let currentHookStack = initialHookStack - - for (const hookName of core.hookNamesOrderedBySequence) { - debug(`running hook`, hookName) - const doneDeferred = createDeferred() - void runHook({ - core, - name: hookName, - done: doneDeferred.resolve, - originalInput: currentInput, - currentHookStack, - nextHookStack: [], - }) - - const signal = await doneDeferred.promise - - switch (signal.type) { - case `completed`: { - const { result, nextHookStack } = signal - currentInput = result - currentHookStack = nextHookStack - break - } - case `shortCircuited`: { - debug(`signal: shortCircuited`) - const { result } = signal - return createResultEnvelope(result) - } - case `error`: { - debug(`signal: error`) - // todo type test for this possible return value - switch (signal.source) { - case `extension`: { - const nameTip = signal.extensionName ? ` (use named functions to improve this error message)` : `` - const message = - `There was an error in the extension "${signal.extensionName}"${nameTip} while running hook "${signal.hookName}".` - return new ContextualError(message, { - hookName: signal.hookName, - source: signal.source, - extensionName: signal.extensionName, - }, signal.error) - } - case `implementation`: { - const message = `There was an error in the core implementation of hook "${signal.hookName}".` - return new ContextualError(message, { hookName: signal.hookName, source: signal.source }, signal.error) - } - default: - throw casesExhausted(signal) - } - } - default: - throw casesExhausted(signal) - } - } - - debug(`ending`) - - let currentResult = currentInput - for (const hook of currentHookStack) { - debug(`end: ${hook.name}`) - hook.currentChunk.resolve(currentResult) - currentResult = await hook.body.promise - } - - debug(`returning`) - - return createResultEnvelope(currentResult) // last loop result -} - const createPassthrough = (hookName: string) => async (hookEnvelope: SomeHookEnvelope) => { const hook = hookEnvelope[hookName] if (!hook) { @@ -462,11 +186,12 @@ export const create = < return error } - const result = await run({ + const result = await runPipeline({ core, - initialInput, + hookNamesOrderedBySequence: core.hookNamesOrderedBySequence, + originalInput: initialInput, // @ts-expect-error fixme - initialHookStack, + extensionsStack: initialHookStack, }) if (result instanceof Error) return result diff --git a/src/lib/anyware/runHook.ts b/src/lib/anyware/runHook.ts new file mode 100644 index 000000000..51322e714 --- /dev/null +++ b/src/lib/anyware/runHook.ts @@ -0,0 +1,236 @@ +import { Errors } from '../errors/__.js' +import { casesExhausted, createDeferred, debug, debugSub, errorFromMaybeError } from '../prelude.js' +import type { Core, Extension } from './main.js' + +type HookDoneResolver = (input: HookDoneData) => void + +export type HookDoneData = + | { type: 'completed'; result: unknown; nextExtensionsStack: Extension[] } + | { type: 'shortCircuited'; result: unknown } + | { type: 'error'; hookName: string; source: 'implementation'; error: Error } + | { type: 'error'; hookName: string; source: 'extension'; error: Error; extensionName: string } + +type Input = { + core: Core + name: string + done: HookDoneResolver + originalInput: unknown + isRetry?: boolean + /** + * The extensions that are at this hook awaiting. + */ + extensionsStack: readonly Extension[] + /** + * The extensions that have advanced past this hook, to their next hook, + * and are now awaiting. + * + * @remarks every extension popped off the stack is added here (except those + * that short-circuit the pipeline or enter passthrough mode). + */ + nextExtensionsStack: readonly Extension[] +} + +export const runHook = async ( + { core, name, done, originalInput, extensionsStack, nextExtensionsStack, isRetry }: Input, +) => { + const debugHook = debugSub(`hook ${name}:`) + + debugHook(`advance to next extension`) + + const [extension, ...extensionsStackRest] = extensionsStack + + /** + * If extension is defined then that means there + * are still extensions to run for this hook. + * + * Otherwise we can run the core implementation. + */ + + if (extension) { + const debugExtension = debugSub(`hook ${name}: extension ${extension.name}:`) + const hookInvokedDeferred = createDeferred() + let previousAttemptErrored = false + + debugExtension(`start`) + // The extension is responsible for calling the next hook. + // If no input is passed that means use the original input. + const hook = createHook(originalInput, (maybeNextOriginalInput?: object) => { + // Once called, the extension is paused again and we continue down the current hook stack. + debugExtension(`extension runs this hook from envelope`) + + const inputResolved = maybeNextOriginalInput ?? originalInput + + if (hookInvokedDeferred.isResolved()) { + if (previousAttemptErrored) { + const d = createDeferred() + const extensionRetry = { + ...extension, + currentChunk: d, + } + // automate/forward the retry + void d.promise.then(envelope => envelope[name](maybeNextOriginalInput ?? originalInput)) + const currentHookStack = [extensionRetry, ...extensionsStackRest] + + const extensionWithNextChunk: Extension = { + ...extension, + currentChunk: createDeferred(), + } + const nextNextHookStack = [...nextExtensionsStack, extensionWithNextChunk] // tempting to mutate here but simpler to think about as copy. + // debug(1, nextHookStack) + debug(`${name}: ${extension.name}: execute branch: retry`) + void runHook({ + core, + name, + done, + originalInput: inputResolved, + extensionsStack, + nextExtensionsStack: nextNextHookStack, + // isRetry: true, + }) + return // extensionWithNextChunk.currentChunk.promise + } else { + throw new Errors.ContextualError( + `You already invoked hook "${name}". Hooks can only be invoked multiple times if the previous attempt failed.`, + { + hookName: name, + }, + ) + } + } else { + let extensionWithNextChunk: Extension + let nextNextHookStack: Extension[] + if (isRetry) { + nextNextHookStack = nextExtensionsStack + extensionWithNextChunk = nextNextHookStack[nextNextHookStack.length - 1]! + debug(`is-retry`) + } else { + extensionWithNextChunk = { + ...extension, + currentChunk: createDeferred(), + } + nextNextHookStack = [...nextExtensionsStack, extensionWithNextChunk] // tempting to mutate here but simpler to think about as copy. + } + + hookInvokedDeferred.resolve(true) + + void runHook({ + core, + name, + done, + originalInput: inputResolved, + extensionsStack: extensionsStackRest, + nextExtensionsStack: nextNextHookStack, + }) + + return extensionWithNextChunk.currentChunk.promise.then(_ => { + if (_ instanceof Error) { + debugExtension(`received hook error`) + previousAttemptErrored = true + } + return _ + }) + } + }) + + // The extension is resumed. It is responsible for calling the next hook. + + debug(`${name}: ${extension.name}: advance with envelope`) + const envelope = { + [name]: hook, + } + extension.currentChunk.resolve(envelope) + + // If the extension does not return, it wants to tap into more hooks. + // If the extension returns the hook envelope, it wants the rest of the pipeline + // to pass through it. + // If the extension returns a non-hook-envelope value, it wants to short-circuit the pipeline. + const { branch, result } = await Promise.race([ + hookInvokedDeferred.promise.then(result => { + return { branch: `hookInvoked`, result } as const + }).catch((e: unknown) => ({ branch: `hookInvokedButThrew`, result: e } as const)), + // rename branch to "extension" + extension.body.promise.then(result => { + return { branch: `extensionReturned`, result } as const + }).catch((e: unknown) => ({ branch: `extensionThrew`, result: e } as const)), + ]) + + switch (branch) { + case `hookInvoked`: { + debug(`hookInvoked`) + // do nothing, hook is making the processing continue. + return + } + case `extensionReturned`: { + debug(`${name}: ${extension.name}: extension returned`) + if (result === envelope) { + void runHook({ + core, + name, + done, + originalInput, + extensionsStack: extensionsStackRest, + nextExtensionsStack, + }) + } else { + done({ type: `shortCircuited`, result }) + } + return + } + case `extensionThrew`: { + debug(`${name}: ${extension.name}: extension threw`) + done({ + type: `error`, + hookName: name, + source: `extension`, + error: errorFromMaybeError(result), + extensionName: extension.name, + }) + return + } + case `hookInvokedButThrew`: + debug(`${name}: ${extension.name}: hook error`) + // todo rename source to "hook" + done({ type: `error`, hookName: name, source: `implementation`, error: errorFromMaybeError(result) }) + return + default: + throw casesExhausted(branch) + } + } /* reached core for this hook */ else { + debugHook(`no more extensions to advance, run implementation`) + + const implementation = core.hooks[name] + if (!implementation) { + throw new Errors.ContextualError(`Implementation not found for hook name ${name}`, { hookName: name }) + } + + let result + try { + result = await implementation(originalInput as any) + } catch (error) { + if (nextExtensionsStack.length) { + debugHook(`implementation error`) + // send back up the stack + nextExtensionsStack[nextExtensionsStack.length - 1]!.currentChunk.resolve(error) + } else { + done({ type: `error`, hookName: name, source: `implementation`, error: errorFromMaybeError(error) }) + } + return + } + + // Return to root with the next result and hook stack + + debugHook(`completed`) + + done({ type: `completed`, result, nextExtensionsStack }) + } +} + +const createHook = <$X, $F extends (...args: any[]) => any>( + originalInput: $X, + fn: $F, +): $F & { input: $X } => { + // @ts-expect-error + fn.input = originalInput + // @ts-expect-error + return fn +} diff --git a/src/lib/anyware/runPipeline.ts b/src/lib/anyware/runPipeline.ts new file mode 100644 index 000000000..63dc9ffb7 --- /dev/null +++ b/src/lib/anyware/runPipeline.ts @@ -0,0 +1,170 @@ +import type { Errors } from '../errors/__.js' +import { ContextualError } from '../errors/ContextualError.js' +import { casesExhausted, createDeferred, debug } from '../prelude.js' +import type { Core, Extension, PrivateTypesSymbol, ResultEnvelop } from './main.js' +import { createResultEnvelope } from './main.js' +import type { HookDoneData } from './runHook.js' +import { runHook } from './runHook.js' + +export const runPipeline = async ( + { core, hookNamesOrderedBySequence, originalInput, extensionsStack }: { + core: Core + hookNamesOrderedBySequence: readonly string[] + originalInput: unknown + extensionsStack: readonly Extension[] + }, +): Promise | Errors.ContextualError> => { + const [hookName, ...hookNamesRest] = hookNamesOrderedBySequence + + if (!hookName) { + debug(`pipeline: ending`) + const result = await runPipelineEnd({ extensionsStack, result: originalInput }) + debug(`pipeline: returning`) + return createResultEnvelope(result) + } + + debug(`hook ${hookName}: start`) + + const doneDeferred = createDeferred() + + void runHook({ + core, + name: hookName, + done: doneDeferred.resolve, + originalInput, + extensionsStack, + nextExtensionsStack: [], + }) + + const signal = await doneDeferred.promise + + switch (signal.type) { + case `completed`: { + const { result, nextExtensionsStack } = signal + return await runPipeline({ + core, + hookNamesOrderedBySequence: hookNamesRest, + originalInput: result, + extensionsStack: nextExtensionsStack, + }) + } + case `shortCircuited`: { + debug(`signal: shortCircuited`) + const { result } = signal + return createResultEnvelope(result) + } + case `error`: { + debug(`signal: error`) + // todo type test for this possible return value + switch (signal.source) { + case `extension`: { + const nameTip = signal.extensionName ? ` (use named functions to improve this error message)` : `` + const message = + `There was an error in the extension "${signal.extensionName}"${nameTip} while running hook "${signal.hookName}".` + return new ContextualError(message, { + hookName: signal.hookName, + source: signal.source, + extensionName: signal.extensionName, + }, signal.error) + } + case `implementation`: { + const message = `There was an error in the core implementation of hook "${signal.hookName}".` + return new ContextualError(message, { hookName: signal.hookName, source: signal.source }, signal.error) + } + default: + throw casesExhausted(signal) + } + } + default: + throw casesExhausted(signal) + } +} + +const runPipelineEnd = async ({ + extensionsStack, + result, +}: { result: unknown; extensionsStack: readonly Extension[] }): Promise => { + const [extension, ...extensionsRest] = extensionsStack + if (!extension) return result + + debug(`extension ${extension.name}: end`) + extension.currentChunk.resolve(result) + const nextResult = await extension.body.promise + return await runPipelineEnd({ + extensionsStack: extensionsRest, + result: nextResult, + }) +} + +// const runPipelineOld = async ( +// { core, initialInput, initialHookStack }: { core: Core; initialInput: unknown; initialHookStack: Extension[] }, +// ): Promise | ContextualError> => { +// let currentInput = initialInput +// let currentHookStack = initialHookStack + +// for (const hookName of core.hookNamesOrderedBySequence) { +// debug(`hook ${hookName}: start`) +// const doneDeferred = createDeferred() +// void runHook({ +// core, +// name: hookName, +// done: doneDeferred.resolve, +// originalInput: currentInput, +// extensionsStack: currentHookStack, +// nextExtensionsStack: [], +// }) + +// const signal = await doneDeferred.promise + +// switch (signal.type) { +// case `completed`: { +// const { result, nextExtensionsStack } = signal +// currentInput = result +// currentHookStack = nextExtensionsStack +// break +// } +// case `shortCircuited`: { +// debug(`signal: shortCircuited`) +// const { result } = signal +// return createResultEnvelope(result) +// } +// case `error`: { +// debug(`signal: error`) +// // todo type test for this possible return value +// switch (signal.source) { +// case `extension`: { +// const nameTip = signal.extensionName ? ` (use named functions to improve this error message)` : `` +// const message = +// `There was an error in the extension "${signal.extensionName}"${nameTip} while running hook "${signal.hookName}".` +// return new ContextualError(message, { +// hookName: signal.hookName, +// source: signal.source, +// extensionName: signal.extensionName, +// }, signal.error) +// } +// case `implementation`: { +// const message = `There was an error in the core implementation of hook "${signal.hookName}".` +// return new ContextualError(message, { hookName: signal.hookName, source: signal.source }, signal.error) +// } +// default: +// throw casesExhausted(signal) +// } +// } +// default: +// throw casesExhausted(signal) +// } +// } + +// debug(`pipeline: ending`) + +// let currentResult = currentInput +// for (const extension of currentHookStack) { +// debug(`extension ${extension.name}: end`) +// extension.currentChunk.resolve(currentResult) +// currentResult = await extension.body.promise +// } + +// debug(`pipeline: returning`) + +// return createResultEnvelope(currentResult) // last loop result +// } diff --git a/src/lib/prelude.ts b/src/lib/prelude.ts index 3d2e187b5..ca788f368 100644 --- a/src/lib/prelude.ts +++ b/src/lib/prelude.ts @@ -277,6 +277,10 @@ export const debug = (...args: any[]) => { } } +export const debugSub = (...args: any[]) => (...subArgs: any[]) => { + debug(...args, ...subArgs) +} + export type PlusOneUpToTen = n extends 0 ? 1 : n extends 1 ? 2 : n extends 2 ? 3 From 06e6786438a42048c3730ac1f015b36ee13d9898 Mon Sep 17 00:00:00 2001 From: Jason Kuhrt Date: Sat, 8 Jun 2024 21:30:00 -0400 Subject: [PATCH 3/5] work --- src/lib/anyware/lib.ts | 1 + src/lib/anyware/main.test.ts | 91 +++++++++++++---- src/lib/anyware/main.ts | 57 ++++++++--- src/lib/anyware/runHook.ts | 179 ++++++++++++++++++++------------- src/lib/anyware/runPipeline.ts | 109 +++++--------------- src/lib/prelude.ts | 5 +- 6 files changed, 255 insertions(+), 187 deletions(-) create mode 100644 src/lib/anyware/lib.ts diff --git a/src/lib/anyware/lib.ts b/src/lib/anyware/lib.ts new file mode 100644 index 000000000..48fadee4a --- /dev/null +++ b/src/lib/anyware/lib.ts @@ -0,0 +1 @@ +export const defaultFunctionName = `anonymous` diff --git a/src/lib/anyware/main.test.ts b/src/lib/anyware/main.test.ts index 06de4ca28..8be1e4642 100644 --- a/src/lib/anyware/main.test.ts +++ b/src/lib/anyware/main.test.ts @@ -1,9 +1,10 @@ /* eslint-disable */ -import console from 'node:console' import { describe, expect, test, vi } from 'vitest' +import { Errors } from '../errors/__.js' import type { ContextualError } from '../errors/ContextualError.js' -import { core, initialInput, oops, run, runWithOptions } from './specHelpers.js' +import { createRetryingExtension } from './main.js' +import { core, oops, run, runWithOptions } from './specHelpers.js' describe(`no extensions`, () => { test(`passthrough to implementation`, async () => { @@ -223,39 +224,87 @@ describe(`errors`, () => { `) }) test('calling a hook twice leads to clear error', async () => { + let neverRan = true const result = await run(async ({ a }) => { - a() - a() + await a() + await a() + neverRan = false }) as ContextualError + expect(neverRan).toBe(true) const cause = result.cause as ContextualError expect(cause.message).toMatchInlineSnapshot( - `"You already invoked hook "a". Hooks can only be invoked multiple times if the previous attempt failed."`, + `"Only a retrying extension can retry hooks."`, ) - expect(cause.context).toEqual({ hookName: 'a' }) + expect(cause.context).toMatchInlineSnapshot(` + { + "extensionsAfter": [], + "hookName": "a", + } + `) }) - test.only('if hook fails, extension can retry, then short-circuit', async () => { +}) + +describe('retrying extension', () => { + test('if hook fails, extension can retry, then short-circuit', async () => { core.hooks.a.mockReset().mockRejectedValueOnce(oops).mockResolvedValueOnce(1) - const result = await run(async ({ a }) => { + const result = await run(createRetryingExtension(async function foo({ a }) { const result1 = await a() expect(result1).toEqual(oops) const result2 = await a() expect(typeof result2.b).toEqual('function') expect(result2.b.input).toEqual(1) return result2.b.input - }) + })) expect(result).toEqual(1) }) - test.skip('if hook fails, extension can retry it, then continue to next hook.', async () => { - core.hooks.a.mockReset().mockRejectedValueOnce(oops).mockResolvedValueOnce(1) - const result = await run( - async function foo({ a }) { - const resultA1 = await a() - const resultA2 = await a() - const resultB1 = await resultA2.b() - return resultB1 - }, - ) - console.log(result) - expect(result).toEqual(1) + + describe('errors', () => { + test('not last extension', async () => { + const result = await run( + createRetryingExtension(async function foo({ a }) { + return a() + }), + async function bar({ a }) { + return a() + }, + ) + expect(result).toMatchInlineSnapshot(`[ContextualError: Only the last extension can retry hooks.]`) + expect((result as Errors.ContextualError).context).toMatchInlineSnapshot(` + { + "extensionsAfter": [ + { + "name": "bar", + }, + ], + } + `) + }) + test('call hook twice even though it succeeded the first time', async () => { + let neverRan = true + const result = await run( + createRetryingExtension(async function foo({ a }) { + const result1 = await a() + expect('b' in result1).toBe(true) + await a() // <-- Extension bug here under test. + neverRan = false + }), + ) + expect(neverRan).toBe(true) + expect(result).toMatchInlineSnapshot( + `[ContextualError: There was an error in the extension "foo".]`, + ) + expect((result as Errors.ContextualError).context).toMatchInlineSnapshot( + ` + { + "extensionName": "foo", + "hookName": "a", + "source": "extension", + } + `, + ) + expect((result as Errors.ContextualError).cause).toMatchInlineSnapshot( + `[ContextualError: Only after failure can a hook be called again by a retrying extension.]`, + ) + }) }) }) diff --git a/src/lib/anyware/main.ts b/src/lib/anyware/main.ts index 9f4bf55e1..b22e5b085 100644 --- a/src/lib/anyware/main.ts +++ b/src/lib/anyware/main.ts @@ -3,6 +3,7 @@ import { partitionAndAggregateErrors } from '../errors/ContextualAggregateError. import type { Deferred, FindValueAfter, IsLastValue, MaybePromise } from '../prelude.js' import { casesExhausted, createDeferred } from '../prelude.js' import { getEntrypoint } from './getEntrypoint.js' +import type { HookResultErrorExtension } from './runHook.js' import { runPipeline } from './runPipeline.js' type HookSequence = readonly [string, ...string[]] @@ -39,9 +40,10 @@ const hookSymbol = Symbol(`hook`) type HookSymbol = typeof hookSymbol -type SomeHookEnvelope = { +export type SomeHookEnvelope = { [name: string]: SomeHook } + export type SomeHook any = (input: any) => any> = fn & { [hookSymbol]: HookSymbol // todo the result is unknown, but if we build a EndEnvelope, then we can work with this type more logically and put it here. @@ -104,21 +106,50 @@ export type Core< export type HookName = string -export type Extension = { +export type Extension = NonRetryingExtension | RetryingExtension + +export type NonRetryingExtension = { + retrying: false + name: string + entrypoint: string + body: Deferred + currentChunk: Deferred +} + +export type RetryingExtension = { + retrying: true name: string entrypoint: string body: Deferred - currentChunk: Deferred + currentChunk: Deferred +} + +export const createRetryingExtension = (extension: NonRetryingExtensionInput): RetryingExtensionInput => { + return { + retrying: true, + run: extension, + } } // export type ExtensionInput<$Input extends object = object> = (input: $Input) => MaybePromise -export type ExtensionInput<$Input extends object = any> = (input: $Input) => MaybePromise +export type ExtensionInput<$Input extends object = any> = + | NonRetryingExtensionInput<$Input> + | RetryingExtensionInput<$Input> + +export type NonRetryingExtensionInput<$Input extends object = any> = ( + input: $Input, +) => MaybePromise + +export type RetryingExtensionInput<$Input extends object = any> = { + retrying: boolean + run: (input: $Input) => MaybePromise +} const ResultEnvelopeSymbol = Symbol(`resultEnvelope`) type ResultEnvelopeSymbol = typeof ResultEnvelopeSymbol -export type ResultEnvelop = { +export type ResultEnvelop = { [ResultEnvelopeSymbol]: ResultEnvelopeSymbol result: T } @@ -181,17 +212,16 @@ export const create = < toInternalExtension(core, resolveOptions(options), extension) ) const [initialHookStack, error] = partitionAndAggregateErrors(initialHookStackAndErrors) + if (error) return error - if (error) { - return error - } - + const asyncErrorDeferred = createDeferred({ strict: false }) const result = await runPipeline({ core, hookNamesOrderedBySequence: core.hookNamesOrderedBySequence, originalInput: initialInput, // @ts-expect-error fixme extensionsStack: initialHookStack, + asyncErrorDeferred, }) if (result instanceof Error) return result @@ -205,16 +235,18 @@ export const create = < const toInternalExtension = (core: Core, config: Config, extension: ExtensionInput) => { const currentChunk = createDeferred() const body = createDeferred() + const extensionRun = typeof extension === `function` ? extension : extension.run + const retrying = typeof extension === `function` ? false : extension.retrying const applyBody = async (input: object) => { try { - const result = await extension(input) + const result = await extensionRun(input) body.resolve(result) } catch (error) { body.reject(error) } } - const extensionName = extension.name || `anonymous` + const extensionName = extensionRun.name || `anonymous` switch (config.entrypointSelectionMode) { case `off`: { @@ -228,7 +260,7 @@ const toInternalExtension = (core: Core, config: Config, extension: ExtensionInp } case `optional`: case `required`: { - const entrypoint = getEntrypoint(core.hookNamesOrderedBySequence, extension) + const entrypoint = getEntrypoint(core.hookNamesOrderedBySequence, extensionRun) if (entrypoint instanceof Error) { if (config.entrypointSelectionMode === `required`) { return entrypoint @@ -257,6 +289,7 @@ const toInternalExtension = (core: Core, config: Config, extension: ExtensionInp void currentChunkPromiseChain.then(applyBody) return { + retrying, name: extensionName, entrypoint, body, diff --git a/src/lib/anyware/runHook.ts b/src/lib/anyware/runHook.ts index 51322e714..a28048f4b 100644 --- a/src/lib/anyware/runHook.ts +++ b/src/lib/anyware/runHook.ts @@ -1,21 +1,39 @@ import { Errors } from '../errors/__.js' +import type { Deferred } from '../prelude.js' import { casesExhausted, createDeferred, debug, debugSub, errorFromMaybeError } from '../prelude.js' -import type { Core, Extension } from './main.js' +import type { Core, Extension, ResultEnvelop, SomeHookEnvelope } from './main.js' -type HookDoneResolver = (input: HookDoneData) => void +type HookDoneResolver = (input: HookResult) => void -export type HookDoneData = - | { type: 'completed'; result: unknown; nextExtensionsStack: Extension[] } +export type HookResultErrorAsync = Deferred + +export type HookResult = + | { type: 'completed'; result: unknown; nextExtensionsStack: readonly Extension[] } | { type: 'shortCircuited'; result: unknown } - | { type: 'error'; hookName: string; source: 'implementation'; error: Error } - | { type: 'error'; hookName: string; source: 'extension'; error: Error; extensionName: string } + | { type: 'error'; hookName: string; source: 'user'; error: Errors.ContextualError; extensionName: string } + | HookResultErrorImplementation + | HookResultErrorExtension + +export type HookResultErrorExtension = { + type: 'error' + hookName: string + source: 'extension' + error: Error + extensionName: string +} + +export type HookResultErrorImplementation = { + type: 'error' + hookName: string + source: 'implementation' + error: Error +} type Input = { core: Core name: string done: HookDoneResolver originalInput: unknown - isRetry?: boolean /** * The extensions that are at this hook awaiting. */ @@ -28,16 +46,33 @@ type Input = { * that short-circuit the pipeline or enter passthrough mode). */ nextExtensionsStack: readonly Extension[] + asyncErrorDeferred: HookResultErrorAsync } +const createExecutableChunk = <$Extension extends Extension>(extension: $Extension) => ({ + ...extension, + currentChunk: createDeferred(), +}) + export const runHook = async ( - { core, name, done, originalInput, extensionsStack, nextExtensionsStack, isRetry }: Input, + { core, name, done, originalInput, extensionsStack, nextExtensionsStack, asyncErrorDeferred }: Input, ) => { const debugHook = debugSub(`hook ${name}:`) debugHook(`advance to next extension`) const [extension, ...extensionsStackRest] = extensionsStack + const isLastExtension = extensionsStackRest.length === 0 + if (!isLastExtension && extension?.retrying) { + done({ + type: `error`, + source: `user`, + extensionName: extension.name, // must be defined because is NOT last extension + hookName: name, + // dprint-ignore + error: new Errors.ContextualError(`Only the last extension can retry hooks.`, { extensionsAfter: extensionsStackRest.map(_=>({ name: _.name })) }), + }) + } /** * If extension is defined then that means there @@ -49,74 +84,77 @@ export const runHook = async ( if (extension) { const debugExtension = debugSub(`hook ${name}: extension ${extension.name}:`) const hookInvokedDeferred = createDeferred() - let previousAttemptErrored = false debugExtension(`start`) - // The extension is responsible for calling the next hook. - // If no input is passed that means use the original input. - const hook = createHook(originalInput, (maybeNextOriginalInput?: object) => { - // Once called, the extension is paused again and we continue down the current hook stack. - debugExtension(`extension runs this hook from envelope`) + let hookFailed = false + const hook = createHook(originalInput, (extensionInput) => { + debugExtension(`extension calls this hook`) - const inputResolved = maybeNextOriginalInput ?? originalInput + const inputResolved = extensionInput ?? originalInput + // [1] + // Never resolve this hook call, the extension is in an invalid state and should not continue executing. + // While it is possible the extension could continue by not await this hook at least if they are awaiting + // it and so have code depending on its result it will never run. if (hookInvokedDeferred.isResolved()) { - if (previousAttemptErrored) { - const d = createDeferred() - const extensionRetry = { - ...extension, - currentChunk: d, - } - // automate/forward the retry - void d.promise.then(envelope => envelope[name](maybeNextOriginalInput ?? originalInput)) - const currentHookStack = [extensionRetry, ...extensionsStackRest] - - const extensionWithNextChunk: Extension = { - ...extension, - currentChunk: createDeferred(), - } - const nextNextHookStack = [...nextExtensionsStack, extensionWithNextChunk] // tempting to mutate here but simpler to think about as copy. - // debug(1, nextHookStack) - debug(`${name}: ${extension.name}: execute branch: retry`) + if (!extension.retrying) { + asyncErrorDeferred.resolve({ + type: `error`, + source: `extension`, + extensionName: extension.name, + hookName: name, + error: new Errors.ContextualError(`Only a retrying extension can retry hooks.`, { + hookName: name, + extensionsAfter: extensionsStackRest.map(_ => ({ name: _.name })), + }), + }) + return createDeferred().promise // [1] + } else if (!hookFailed) { + asyncErrorDeferred.resolve({ + type: `error`, + source: `extension`, + extensionName: extension.name, + hookName: name, + error: new Errors.ContextualError( + `Only after failure can a hook be called again by a retrying extension.`, + { + hookName: name, + extensionName: extension.name, + }, + ), + }) + return createDeferred().promise // [1] + } else { + debugExtension(`execute branch: retry`) + const extensionRetry = createExecutableChunk(extension) void runHook({ core, name, done, - originalInput: inputResolved, - extensionsStack, - nextExtensionsStack: nextNextHookStack, - // isRetry: true, + originalInput, + asyncErrorDeferred, + extensionsStack: [extensionRetry], + nextExtensionsStack, + }) + return extensionRetry.currentChunk.promise.then(async (envelope) => { + const envelop_ = envelope as SomeHookEnvelope // todo ... better way? + const hook = envelop_[name] + if (!hook) throw new Error(`Hook not found in envelope: ${name}`) + const result = await hook(extensionInput ?? originalInput) as Promise< + SomeHookEnvelope | Error | ResultEnvelop + > + return result }) - return // extensionWithNextChunk.currentChunk.promise - } else { - throw new Errors.ContextualError( - `You already invoked hook "${name}". Hooks can only be invoked multiple times if the previous attempt failed.`, - { - hookName: name, - }, - ) } } else { - let extensionWithNextChunk: Extension - let nextNextHookStack: Extension[] - if (isRetry) { - nextNextHookStack = nextExtensionsStack - extensionWithNextChunk = nextNextHookStack[nextNextHookStack.length - 1]! - debug(`is-retry`) - } else { - extensionWithNextChunk = { - ...extension, - currentChunk: createDeferred(), - } - nextNextHookStack = [...nextExtensionsStack, extensionWithNextChunk] // tempting to mutate here but simpler to think about as copy. - } - + const extensionWithNextChunk = createExecutableChunk(extension) + const nextNextHookStack = [...nextExtensionsStack, extensionWithNextChunk] // tempting to mutate here but simpler to think about as copy. hookInvokedDeferred.resolve(true) - void runHook({ core, name, done, + asyncErrorDeferred, originalInput: inputResolved, extensionsStack: extensionsStackRest, nextExtensionsStack: nextNextHookStack, @@ -125,7 +163,7 @@ export const runHook = async ( return extensionWithNextChunk.currentChunk.promise.then(_ => { if (_ instanceof Error) { debugExtension(`received hook error`) - previousAttemptErrored = true + hookFailed = true } return _ }) @@ -134,8 +172,9 @@ export const runHook = async ( // The extension is resumed. It is responsible for calling the next hook. - debug(`${name}: ${extension.name}: advance with envelope`) - const envelope = { + debugExtension(`advance with envelope`) + // @ts-expect-error fixme + const envelope: SomeHookEnvelope = { [name]: hook, } extension.currentChunk.resolve(envelope) @@ -144,6 +183,7 @@ export const runHook = async ( // If the extension returns the hook envelope, it wants the rest of the pipeline // to pass through it. // If the extension returns a non-hook-envelope value, it wants to short-circuit the pipeline. + debugHook(`start race between extension returning or invoking next hook`) const { branch, result } = await Promise.race([ hookInvokedDeferred.promise.then(result => { return { branch: `hookInvoked`, result } as const @@ -156,7 +196,7 @@ export const runHook = async ( switch (branch) { case `hookInvoked`: { - debug(`hookInvoked`) + debugExtension(`invoked next hook (or retrying extension got error pushed through)`) // do nothing, hook is making the processing continue. return } @@ -168,6 +208,7 @@ export const runHook = async ( name, done, originalInput, + asyncErrorDeferred, extensionsStack: extensionsStackRest, nextExtensionsStack, }) @@ -207,10 +248,10 @@ export const runHook = async ( try { result = await implementation(originalInput as any) } catch (error) { - if (nextExtensionsStack.length) { - debugHook(`implementation error`) - // send back up the stack - nextExtensionsStack[nextExtensionsStack.length - 1]!.currentChunk.resolve(error) + debugHook(`implementation error`) + const lastExtension = nextExtensionsStack[nextExtensionsStack.length - 1] + if (lastExtension && lastExtension.retrying) { + lastExtension.currentChunk.resolve(errorFromMaybeError(error)) } else { done({ type: `error`, hookName: name, source: `implementation`, error: errorFromMaybeError(error) }) } @@ -221,11 +262,11 @@ export const runHook = async ( debugHook(`completed`) - done({ type: `completed`, result, nextExtensionsStack }) + done({ type: `completed`, result, nextExtensionsStack: nextExtensionsStack }) } } -const createHook = <$X, $F extends (...args: any[]) => any>( +const createHook = <$X, $F extends (input?: object) => any>( originalInput: $X, fn: $F, ): $F & { input: $X } => { diff --git a/src/lib/anyware/runPipeline.ts b/src/lib/anyware/runPipeline.ts index 63dc9ffb7..6118de2a6 100644 --- a/src/lib/anyware/runPipeline.ts +++ b/src/lib/anyware/runPipeline.ts @@ -1,19 +1,21 @@ import type { Errors } from '../errors/__.js' import { ContextualError } from '../errors/ContextualError.js' import { casesExhausted, createDeferred, debug } from '../prelude.js' -import type { Core, Extension, PrivateTypesSymbol, ResultEnvelop } from './main.js' +import { defaultFunctionName } from './lib.js' +import type { Core, Extension, ResultEnvelop } from './main.js' import { createResultEnvelope } from './main.js' -import type { HookDoneData } from './runHook.js' +import type { HookResult, HookResultErrorAsync } from './runHook.js' import { runHook } from './runHook.js' export const runPipeline = async ( - { core, hookNamesOrderedBySequence, originalInput, extensionsStack }: { + { core, hookNamesOrderedBySequence, originalInput, extensionsStack, asyncErrorDeferred }: { core: Core hookNamesOrderedBySequence: readonly string[] originalInput: unknown extensionsStack: readonly Extension[] + asyncErrorDeferred: HookResultErrorAsync }, -): Promise | Errors.ContextualError> => { +): Promise => { const [hookName, ...hookNamesRest] = hookNamesOrderedBySequence if (!hookName) { @@ -25,18 +27,21 @@ export const runPipeline = async ( debug(`hook ${hookName}: start`) - const doneDeferred = createDeferred() + const done = createDeferred({ strict: false }) void runHook({ core, name: hookName, - done: doneDeferred.resolve, + done: done.resolve, originalInput, extensionsStack, + asyncErrorDeferred, nextExtensionsStack: [], }) - const signal = await doneDeferred.promise + const signal = await Promise.race( + [done.promise, asyncErrorDeferred.promise], + ) switch (signal.type) { case `completed`: { @@ -46,6 +51,7 @@ export const runPipeline = async ( hookNamesOrderedBySequence: hookNamesRest, originalInput: result, extensionsStack: nextExtensionsStack, + asyncErrorDeferred, }) } case `shortCircuited`: { @@ -55,12 +61,17 @@ export const runPipeline = async ( } case `error`: { debug(`signal: error`) + const wasAsync = asyncErrorDeferred.isResolved() // todo type test for this possible return value switch (signal.source) { case `extension`: { - const nameTip = signal.extensionName ? ` (use named functions to improve this error message)` : `` - const message = - `There was an error in the extension "${signal.extensionName}"${nameTip} while running hook "${signal.hookName}".` + // todo test these 2 branches explicitly + const nameTip = signal.extensionName === defaultFunctionName + ? ` (use named functions to improve this error message)` + : `` + const message = wasAsync + ? `There was an error in the extension "${signal.extensionName}"${nameTip}.` + : `There was an error in the extension "${signal.extensionName}"${nameTip} while running hook "${signal.hookName}".` return new ContextualError(message, { hookName: signal.hookName, source: signal.source, @@ -71,6 +82,9 @@ export const runPipeline = async ( const message = `There was an error in the core implementation of hook "${signal.hookName}".` return new ContextualError(message, { hookName: signal.hookName, source: signal.source }, signal.error) } + case `user`: { + return signal.error + } default: throw casesExhausted(signal) } @@ -88,83 +102,10 @@ const runPipelineEnd = async ({ if (!extension) return result debug(`extension ${extension.name}: end`) - extension.currentChunk.resolve(result) + extension.currentChunk.resolve(result as any) const nextResult = await extension.body.promise return await runPipelineEnd({ extensionsStack: extensionsRest, result: nextResult, }) } - -// const runPipelineOld = async ( -// { core, initialInput, initialHookStack }: { core: Core; initialInput: unknown; initialHookStack: Extension[] }, -// ): Promise | ContextualError> => { -// let currentInput = initialInput -// let currentHookStack = initialHookStack - -// for (const hookName of core.hookNamesOrderedBySequence) { -// debug(`hook ${hookName}: start`) -// const doneDeferred = createDeferred() -// void runHook({ -// core, -// name: hookName, -// done: doneDeferred.resolve, -// originalInput: currentInput, -// extensionsStack: currentHookStack, -// nextExtensionsStack: [], -// }) - -// const signal = await doneDeferred.promise - -// switch (signal.type) { -// case `completed`: { -// const { result, nextExtensionsStack } = signal -// currentInput = result -// currentHookStack = nextExtensionsStack -// break -// } -// case `shortCircuited`: { -// debug(`signal: shortCircuited`) -// const { result } = signal -// return createResultEnvelope(result) -// } -// case `error`: { -// debug(`signal: error`) -// // todo type test for this possible return value -// switch (signal.source) { -// case `extension`: { -// const nameTip = signal.extensionName ? ` (use named functions to improve this error message)` : `` -// const message = -// `There was an error in the extension "${signal.extensionName}"${nameTip} while running hook "${signal.hookName}".` -// return new ContextualError(message, { -// hookName: signal.hookName, -// source: signal.source, -// extensionName: signal.extensionName, -// }, signal.error) -// } -// case `implementation`: { -// const message = `There was an error in the core implementation of hook "${signal.hookName}".` -// return new ContextualError(message, { hookName: signal.hookName, source: signal.source }, signal.error) -// } -// default: -// throw casesExhausted(signal) -// } -// } -// default: -// throw casesExhausted(signal) -// } -// } - -// debug(`pipeline: ending`) - -// let currentResult = currentInput -// for (const extension of currentHookStack) { -// debug(`extension ${extension.name}: end`) -// extension.currentChunk.resolve(currentResult) -// currentResult = await extension.body.promise -// } - -// debug(`pipeline: returning`) - -// return createResultEnvelope(currentResult) // last loop result -// } diff --git a/src/lib/prelude.ts b/src/lib/prelude.ts index ca788f368..109a9784e 100644 --- a/src/lib/prelude.ts +++ b/src/lib/prelude.ts @@ -250,7 +250,7 @@ export type Deferred = { reject: (error: unknown) => void } -export const createDeferred = <$T>(): Deferred<$T> => { +export const createDeferred = <$T>(options?: { strict?: boolean }): Deferred<$T> => { let isResolved = false let resolve: (value: $T) => void let reject: (error: unknown) => void @@ -265,6 +265,9 @@ export const createDeferred = <$T>(): Deferred<$T> => { isResolved: () => isResolved, resolve: (value) => { isResolved = true + if (options?.strict && isResolved) { + throw new Error(`Deferred is already resolved. Attempted to resolve with: ${JSON.stringify(value)}`) + } resolve(value) }, reject: (error) => reject(error), From 8a8b7816e13e3dda4bdc54551113b8fb1e046113 Mon Sep 17 00:00:00 2001 From: Jason Kuhrt Date: Sat, 8 Jun 2024 21:33:15 -0400 Subject: [PATCH 4/5] type error --- src/lib/anyware/getEntrypoint.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib/anyware/getEntrypoint.ts b/src/lib/anyware/getEntrypoint.ts index 394daa1c4..2680954d5 100644 --- a/src/lib/anyware/getEntrypoint.ts +++ b/src/lib/anyware/getEntrypoint.ts @@ -1,7 +1,7 @@ // import type { Extension, HookName } from '../../layers/5_client/extension/types.js' import { analyzeFunction } from '../analyzeFunction.js' import { ContextualError } from '../errors/ContextualError.js' -import type { ExtensionInput, HookName } from './main.js' +import type { HookName, NonRetryingExtensionInput } from './main.js' export class ErrorAnywareExtensionEntrypoint extends ContextualError< 'ErrorGraffleExtensionEntryHook', @@ -25,7 +25,7 @@ export type ExtensionEntryHookIssue = typeof ExtensionEntryHookIssue[keyof typeo export const getEntrypoint = ( hookNames: readonly string[], - extension: ExtensionInput, + extension: NonRetryingExtensionInput, ): ErrorAnywareExtensionEntrypoint | HookName => { const x = analyzeFunction(extension) if (x.parameters.length > 1) { From 7e00a939ab967d9cfb4499f9d0aacf9695fdba0c Mon Sep 17 00:00:00 2001 From: Jason Kuhrt Date: Sun, 9 Jun 2024 00:12:46 -0400 Subject: [PATCH 5/5] work --- src/layers/5_client/client.extend.test.ts | 26 +++++++++++- src/layers/5_client/client.ts | 10 ++++- src/lib/anyware/__.test-d.ts | 11 +++++ src/lib/anyware/main.ts | 49 +++++++++++++++-------- 4 files changed, 77 insertions(+), 19 deletions(-) diff --git a/src/layers/5_client/client.extend.test.ts b/src/layers/5_client/client.extend.test.ts index f2595a101..41dec82de 100644 --- a/src/layers/5_client/client.extend.test.ts +++ b/src/layers/5_client/client.extend.test.ts @@ -1,10 +1,9 @@ /* eslint-disable */ -import { ExecutionResult } from 'graphql' import { describe, expect } from 'vitest' import { db } from '../../../tests/_/db.js' import { createResponse, test } from '../../../tests/_/helpers.js' import { Graffle } from '../../../tests/_/schema/generated/__.js' -import { GraphQLExecutionResult } from '../../legacy/lib/graphql.js' +import { oops } from '../../lib/anyware/specHelpers.js' const client = Graffle.create({ schema: 'https://foo', returnMode: 'dataAndErrors' }) const headers = { 'x-foo': 'bar' } @@ -38,3 +37,26 @@ describe(`entrypoint request`, () => { expect(await client2.query.id()).toEqual(db.id) }) }) + +test('can retry failed request', async ({ fetch }) => { + fetch + .mockImplementationOnce(async () => { + throw oops + }) + .mockImplementationOnce(async () => { + throw oops + }) + .mockImplementationOnce(async () => { + return createResponse({ data: { id: db.id } }) + }) + const client2 = client.retry(async ({ exchange }) => { + let result = await exchange() + while (result instanceof Error) { + result = await exchange() + } + return result + }) + const result = await client2.query.id() + expect(result).toEqual(db.id) + expect(fetch.mock.calls.length).toEqual(3) +}) diff --git a/src/layers/5_client/client.ts b/src/layers/5_client/client.ts index 04e785d48..6ba7ca985 100644 --- a/src/layers/5_client/client.ts +++ b/src/layers/5_client/client.ts @@ -39,6 +39,7 @@ export type SelectionSetOrIndicator = 0 | 1 | boolean | object export type SelectionSetOrArgs = object export interface Context { + retry: undefined | Anyware.Extension2 extensions: Anyware.Extension2[] config: Config } @@ -65,6 +66,7 @@ export type Client<$Index extends Schema.Index | null, $Config extends Config> = ) & { extend: (extension: Anyware.Extension2) => Client<$Index, $Config> + retry: (extension: Anyware.Extension2) => Client<$Index, $Config> } export type ClientTyped<$Index extends Schema.Index, $Config extends Config> = @@ -147,9 +149,10 @@ type Create = < export const create: Create = ( input_, -) => createInternal(input_, { extensions: [] }) +) => createInternal(input_, { extensions: [], retry: undefined }) interface CreateState { + retry?: Anyware.Extension2 extensions: Anyware.Extension2[] } @@ -251,6 +254,7 @@ export const createInternal = ( } const context: Context = { + retry: state.retry, extensions: state.extensions, config: { returnMode, @@ -260,6 +264,7 @@ export const createInternal = ( const run = async (context: Context, initialInput: HookInputEncode) => { const result = await Core.anyware.run({ initialInput, + retryingExtension: context.retry, extensions: context.extensions, }) as GraffleExecutionResult return handleReturn(context, result) @@ -296,6 +301,9 @@ export const createInternal = ( // todo test that adding extensions returns a copy of client return createInternal(input, { extensions: [...state.extensions, extension] }) }, + retry: (extension: Anyware.Extension2) => { + return createInternal(input, { ...state, retry: extension }) + }, } // todo extract this into constructor "create typed client" diff --git a/src/lib/anyware/__.test-d.ts b/src/lib/anyware/__.test-d.ts index c8ec9371e..6d05d19ac 100644 --- a/src/lib/anyware/__.test-d.ts +++ b/src/lib/anyware/__.test-d.ts @@ -1,5 +1,6 @@ /* eslint-disable */ +import { run } from 'node:test' import { expectTypeOf, test } from 'vitest' import { Result } from '../../../tests/_/schema/generated/SchemaRuntime.js' import { ContextualError } from '../errors/ContextualError.js' @@ -32,6 +33,16 @@ test('run', () => { (input: { initialInput: InputA options?: Anyware.Options + retryingExtension?: (input: { + a: SomeHook< + (input?: InputA) => MaybePromise< + Error | { + b: SomeHook<(input?: InputB) => MaybePromise> + } + > + > + b: SomeHook<(input?: InputB) => MaybePromise> + }) => Promise extensions: ((input: { a: SomeHook< (input?: InputA) => MaybePromise<{ diff --git a/src/lib/anyware/main.ts b/src/lib/anyware/main.ts index b22e5b085..c221d8bc5 100644 --- a/src/lib/anyware/main.ts +++ b/src/lib/anyware/main.ts @@ -8,13 +8,19 @@ import { runPipeline } from './runPipeline.js' type HookSequence = readonly [string, ...string[]] +type ExtensionOptions = { + retrying: boolean +} + export type Extension2< $Core extends Core = Core, + $Options extends ExtensionOptions = ExtensionOptions, > = ( hooks: ExtensionHooks< $Core[PrivateTypesSymbol]['hookSequence'], $Core[PrivateTypesSymbol]['hookMap'], - $Core[PrivateTypesSymbol]['result'] + $Core[PrivateTypesSymbol]['result'], + $Options >, ) => Promise< | $Core[PrivateTypesSymbol]['result'] @@ -25,8 +31,9 @@ type ExtensionHooks< $HookSequence extends HookSequence, $HookMap extends Record<$HookSequence[number], object> = Record<$HookSequence[number], object>, $Result = unknown, + $Options extends ExtensionOptions = ExtensionOptions, > = { - [$HookName in $HookSequence[number]]: Hook<$HookSequence, $HookMap, $Result, $HookName> + [$HookName in $HookSequence[number]]: Hook<$HookSequence, $HookMap, $Result, $HookName, $Options> } type CoreInitialInput<$Core extends Core> = @@ -65,24 +72,32 @@ type Hook< $HookMap extends HookMap<$HookSequence> = HookMap<$HookSequence>, $Result = unknown, $Name extends $HookSequence[number] = $HookSequence[number], -> = (<$$Input extends $HookMap[$Name]>(input?: $$Input) => HookReturn<$HookSequence, $HookMap, $Result, $Name>) & { - [hookSymbol]: HookSymbol - input: $HookMap[$Name] -} + $Options extends ExtensionOptions = ExtensionOptions, +> = + & (<$$Input extends $HookMap[$Name]>( + input?: $$Input, + ) => HookReturn<$HookSequence, $HookMap, $Result, $Name, $Options>) + & { + [hookSymbol]: HookSymbol + input: $HookMap[$Name] + } type HookReturn< $HookSequence extends HookSequence, $HookMap extends HookMap<$HookSequence> = HookMap<$HookSequence>, $Result = unknown, $Name extends $HookSequence[number] = $HookSequence[number], -> = IsLastValue<$Name, $HookSequence> extends true ? $Result : { - [$NameNext in FindValueAfter<$Name, $HookSequence>]: Hook< - $HookSequence, - $HookMap, - $Result, - $NameNext - > -} + $Options extends ExtensionOptions = ExtensionOptions, +> = + | ($Options['retrying'] extends true ? Error : never) + | (IsLastValue<$Name, $HookSequence> extends true ? $Result : { + [$NameNext in FindValueAfter<$Name, $HookSequence>]: Hook< + $HookSequence, + $HookMap, + $Result, + $NameNext + > + }) export type Core< $HookSequence extends HookSequence = HookSequence, @@ -188,6 +203,7 @@ export type Builder<$Core extends Core = Core> = { { initialInput, extensions, options }: { initialInput: CoreInitialInput<$Core> extensions: Extension2<$Core>[] + retryingExtension?: Extension2<$Core, { retrying: true }> options?: Options }, ) => Promise<$Core[PrivateTypesSymbol]['result'] | Errors.ContextualError> @@ -207,8 +223,9 @@ export const create = < const builder: Builder<$Core> = { core, run: async (input) => { - const { initialInput, extensions, options } = input - const initialHookStackAndErrors = extensions.map(extension => + const { initialInput, extensions, options, retryingExtension } = input + const extensions_ = retryingExtension ? [...extensions, createRetryingExtension(retryingExtension)] : extensions + const initialHookStackAndErrors = extensions_.map(extension => toInternalExtension(core, resolveOptions(options), extension) ) const [initialHookStack, error] = partitionAndAggregateErrors(initialHookStackAndErrors)