From d9acd2465ec65ec28c7f63d64b8f5f2a05dfe05b Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Wed, 31 Jul 2024 14:49:59 -0400 Subject: [PATCH 1/9] feat: do not allow defining new actions from within other actions/flows --- docs/errors/no_new_actions_at_runtime.md | 22 ++++++++++++++++++++++ js/core/src/action.ts | 24 +++++++++++++++++++++++- js/flow/src/flow.ts | 2 +- js/flow/src/utils.ts | 3 ++- 4 files changed, 48 insertions(+), 3 deletions(-) create mode 100644 docs/errors/no_new_actions_at_runtime.md diff --git a/docs/errors/no_new_actions_at_runtime.md b/docs/errors/no_new_actions_at_runtime.md new file mode 100644 index 000000000..ce1ad52ea --- /dev/null +++ b/docs/errors/no_new_actions_at_runtime.md @@ -0,0 +1,22 @@ +# No new actions at runtime error + +Defining new action at runtime is not alowed. + +✅ DO: + +```ts +const prompt = defineDotprompt({...}) + +const flow = defineFlow({...}, async (input) => { + await prompt.generate(...); +}) +``` + +❌ DON'T: + +```ts +const flow = defineFlow({...}, async (input) => { + const prompt = defineDotprompt({...}) + prompt.generate(...); +}) +``` diff --git a/js/core/src/action.ts b/js/core/src/action.ts index a78c8a724..f00eed714 100644 --- a/js/core/src/action.ts +++ b/js/core/src/action.ts @@ -216,9 +216,15 @@ export function defineAction< }, fn: (input: z.infer) => Promise> ): Action { + if (isInRuntimeContext()) { + throw new Error( + 'Cannot define new actions at runtime.\n' + + 'See: https://github.com/firebase/genkit/blob/main/docs/errors/no_new_actions_at_runtime.md' + ); + } const act = action(config, (i: I): Promise> => { setCustomMetadataAttributes({ subtype: config.actionType }); - return fn(i); + return runInActionRuntimeContext(() => fn(i)); }); act.__action.actionType = config.actionType; registerAction(config.actionType, act); @@ -252,3 +258,19 @@ export function getStreamingCallback(): StreamingCallback | undefined { } return cb; } + +const runtimeCtxAls = new AsyncLocalStorage(); + +/** + * Checks whether the caller is currently in the runtime context of an action. + */ +function isInRuntimeContext() { + return !!runtimeCtxAls.getStore(); +} + +/** + * Execute the provided function in the action runtime context. + */ +export function runInActionRuntimeContext(fn: () => R) { + return runtimeCtxAls.run('runtime', fn); +} diff --git a/js/flow/src/flow.ts b/js/flow/src/flow.ts index 8d318a1c3..cc6ba8904 100644 --- a/js/flow/src/flow.ts +++ b/js/flow/src/flow.ts @@ -153,7 +153,7 @@ export function defineFlow< setTimeout(() => flow.runEnvelope(msg), delay * 1000); }, }, - steps + (input, streamingCallback) => steps(input, streamingCallback) ); createdFlows().push(f); wrapAsAction(f); diff --git a/js/flow/src/utils.ts b/js/flow/src/utils.ts index 774d81c4c..411019517 100644 --- a/js/flow/src/utils.ts +++ b/js/flow/src/utils.ts @@ -14,6 +14,7 @@ * limitations under the License. */ +import { runInActionRuntimeContext } from '@genkit-ai/core'; import { AsyncLocalStorage } from 'node:async_hooks'; import { v4 as uuidv4 } from 'uuid'; import z from 'zod'; @@ -45,7 +46,7 @@ export function runWithActiveContext( ctx: Context, fn: () => R ) { - return ctxAsyncLocalStorage.run(ctx, fn); + return ctxAsyncLocalStorage.run(ctx, () => runInActionRuntimeContext(fn)); } /** From 2c7f52bbbfc6fda843ad7098b3235e94ff751d7c Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Wed, 31 Jul 2024 15:19:12 -0400 Subject: [PATCH 2/9] proactively inialize all plugins on first run --- js/core/src/action.ts | 9 +++++---- js/core/src/plugin.ts | 8 +++++++- js/core/src/registry.ts | 17 ++++++++++++++--- js/flow/src/flow.ts | 2 ++ 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/js/core/src/action.ts b/js/core/src/action.ts index f00eed714..4d3b46be3 100644 --- a/js/core/src/action.ts +++ b/js/core/src/action.ts @@ -18,7 +18,7 @@ import { JSONSchema7 } from 'json-schema'; import { AsyncLocalStorage } from 'node:async_hooks'; import { performance } from 'node:perf_hooks'; import * as z from 'zod'; -import { ActionType, lookupPlugin, registerAction } from './registry.js'; +import { ActionType, lookupPlugin, maybeInitializeAllPlugins, registerAction } from './registry.js'; import { parseSchema } from './schema.js'; import * as telemetry from './telemetry.js'; import { @@ -222,9 +222,10 @@ export function defineAction< 'See: https://github.com/firebase/genkit/blob/main/docs/errors/no_new_actions_at_runtime.md' ); } - const act = action(config, (i: I): Promise> => { + const act = action(config, async (i: I): Promise> => { setCustomMetadataAttributes({ subtype: config.actionType }); - return runInActionRuntimeContext(() => fn(i)); + await maybeInitializeAllPlugins(); + return await runInActionRuntimeContext(() => fn(i)); }); act.__action.actionType = config.actionType; registerAction(config.actionType, act); @@ -264,7 +265,7 @@ const runtimeCtxAls = new AsyncLocalStorage(); /** * Checks whether the caller is currently in the runtime context of an action. */ -function isInRuntimeContext() { +export function isInRuntimeContext() { return !!runtimeCtxAls.getStore(); } diff --git a/js/core/src/plugin.ts b/js/core/src/plugin.ts index a21a3a5f3..97955ce00 100644 --- a/js/core/src/plugin.ts +++ b/js/core/src/plugin.ts @@ -15,7 +15,7 @@ */ import { z } from 'zod'; -import { Action } from './action.js'; +import { Action, isInRuntimeContext } from './action.js'; import { FlowStateStore } from './flowTypes.js'; import { LoggerConfig, TelemetryConfig } from './telemetryTypes.js'; import { TraceStore } from './tracing.js'; @@ -60,6 +60,12 @@ export function genkitPlugin( pluginName: string, initFn: T ): Plugin> { + if (isInRuntimeContext()) { + throw new Error( + 'Cannot define new plugins at runtime.\n' + + 'See: https://github.com/firebase/genkit/blob/main/docs/errors/no_new_actions_at_runtime.md' + ); + } return (...args: Parameters) => ({ name: pluginName, initializer: async () => { diff --git a/js/core/src/registry.ts b/js/core/src/registry.ts index 94647c92b..a19cab22c 100644 --- a/js/core/src/registry.ts +++ b/js/core/src/registry.ts @@ -15,7 +15,7 @@ */ import * as z from 'zod'; -import { Action } from './action.js'; +import { Action, isInRuntimeContext } from './action.js'; import { FlowStateStore } from './flowTypes.js'; import { logger } from './logging.js'; import { PluginProvider } from './plugin.js'; @@ -126,10 +126,19 @@ type ActionsRecord = Record>; * Returns all actions in the registry. */ export async function listActions(): Promise { + maybeInitializeAllPlugins(); + return Object.assign({}, actionsById()); +} + +let initializedAllPlugins = false; +export async function maybeInitializeAllPlugins() { + if (initializedAllPlugins) { + return; + } for (const pluginName of Object.keys(pluginsByName())) { await initializePlugin(pluginName); } - return Object.assign({}, actionsById()); + initializedAllPlugins = true; } /** @@ -196,13 +205,15 @@ export async function lookupFlowStateStore( */ export function registerPluginProvider(name: string, provider: PluginProvider) { let cached; + let isInitialized = false; pluginsByName()[name] = { name: provider.name, initializer: () => { - if (cached) { + if (isInitialized) { return cached; } cached = provider.initializer(); + isInitialized = true; return cached; }, }; diff --git a/js/flow/src/flow.ts b/js/flow/src/flow.ts index cc6ba8904..ffcf26256 100644 --- a/js/flow/src/flow.ts +++ b/js/flow/src/flow.ts @@ -28,6 +28,7 @@ import { StreamingCallback, } from '@genkit-ai/core'; import { logger } from '@genkit-ai/core/logging'; +import { maybeInitializeAllPlugins } from '@genkit-ai/core/registry'; import { toJsonSchema } from '@genkit-ai/core/schema'; import { newTrace, @@ -391,6 +392,7 @@ export class Flow< labels: Record | undefined ) { const startTimeMs = performance.now(); + await maybeInitializeAllPlugins(); await runWithActiveContext(ctx, async () => { let traceContext; if (ctx.state.traceContext) { From 8170b20cb8da10b7ace53a0d9699f3d319963a11 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Wed, 31 Jul 2024 15:19:59 -0400 Subject: [PATCH 3/9] undo --- js/flow/src/flow.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/flow/src/flow.ts b/js/flow/src/flow.ts index ffcf26256..0b43344d0 100644 --- a/js/flow/src/flow.ts +++ b/js/flow/src/flow.ts @@ -154,7 +154,7 @@ export function defineFlow< setTimeout(() => flow.runEnvelope(msg), delay * 1000); }, }, - (input, streamingCallback) => steps(input, streamingCallback) + steps ); createdFlows().push(f); wrapAsAction(f); From 48fe7133f92c4d3d2d5a3169db916bc8256ad68f Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Wed, 31 Jul 2024 15:20:59 -0400 Subject: [PATCH 4/9] await maybeInitializeAllPlugins --- js/core/src/registry.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/core/src/registry.ts b/js/core/src/registry.ts index a19cab22c..25c590e52 100644 --- a/js/core/src/registry.ts +++ b/js/core/src/registry.ts @@ -126,7 +126,7 @@ type ActionsRecord = Record>; * Returns all actions in the registry. */ export async function listActions(): Promise { - maybeInitializeAllPlugins(); + await maybeInitializeAllPlugins(); return Object.assign({}, actionsById()); } From 03aa6dff900f64fa831c702f9b2bfa6e476e45c0 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Wed, 31 Jul 2024 15:38:30 -0400 Subject: [PATCH 5/9] format --- js/core/src/action.ts | 7 ++++++- js/core/src/registry.ts | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/js/core/src/action.ts b/js/core/src/action.ts index 4d3b46be3..bd4af19fe 100644 --- a/js/core/src/action.ts +++ b/js/core/src/action.ts @@ -18,7 +18,12 @@ import { JSONSchema7 } from 'json-schema'; import { AsyncLocalStorage } from 'node:async_hooks'; import { performance } from 'node:perf_hooks'; import * as z from 'zod'; -import { ActionType, lookupPlugin, maybeInitializeAllPlugins, registerAction } from './registry.js'; +import { + ActionType, + lookupPlugin, + maybeInitializeAllPlugins, + registerAction, +} from './registry.js'; import { parseSchema } from './schema.js'; import * as telemetry from './telemetry.js'; import { diff --git a/js/core/src/registry.ts b/js/core/src/registry.ts index 25c590e52..cfd24f775 100644 --- a/js/core/src/registry.ts +++ b/js/core/src/registry.ts @@ -15,7 +15,7 @@ */ import * as z from 'zod'; -import { Action, isInRuntimeContext } from './action.js'; +import { Action } from './action.js'; import { FlowStateStore } from './flowTypes.js'; import { logger } from './logging.js'; import { PluginProvider } from './plugin.js'; From 874c5373fb1b015c40c2e8cebf1f46ee0397b918 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Wed, 31 Jul 2024 15:47:01 -0400 Subject: [PATCH 6/9] fix tests --- js/core/src/registry.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/js/core/src/registry.ts b/js/core/src/registry.ts index cfd24f775..d0b93f406 100644 --- a/js/core/src/registry.ts +++ b/js/core/src/registry.ts @@ -204,6 +204,7 @@ export async function lookupFlowStateStore( * Registers a flow state store for the given environment. */ export function registerPluginProvider(name: string, provider: PluginProvider) { + initializedAllPlugins = false; let cached; let isInitialized = false; pluginsByName()[name] = { From c97bf3f06f02cbdddbe0a00223c02b579e983100 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Wed, 31 Jul 2024 15:48:14 -0400 Subject: [PATCH 7/9] typos --- docs/errors/no_new_actions_at_runtime.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/errors/no_new_actions_at_runtime.md b/docs/errors/no_new_actions_at_runtime.md index ce1ad52ea..cfed39eac 100644 --- a/docs/errors/no_new_actions_at_runtime.md +++ b/docs/errors/no_new_actions_at_runtime.md @@ -1,6 +1,6 @@ # No new actions at runtime error -Defining new action at runtime is not alowed. +Defining new actions at runtime is not allowed. ✅ DO: From adbbd5728b0a5a5c05ffa96feddf9e80db95ad8d Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Thu, 8 Aug 2024 21:06:53 -0400 Subject: [PATCH 8/9] initializeAllPlugins --- js/core/src/action.ts | 4 ++-- js/core/src/registry.ts | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/js/core/src/action.ts b/js/core/src/action.ts index bd4af19fe..182995b8f 100644 --- a/js/core/src/action.ts +++ b/js/core/src/action.ts @@ -20,8 +20,8 @@ import { performance } from 'node:perf_hooks'; import * as z from 'zod'; import { ActionType, + initializeAllPlugins, lookupPlugin, - maybeInitializeAllPlugins, registerAction, } from './registry.js'; import { parseSchema } from './schema.js'; @@ -229,7 +229,7 @@ export function defineAction< } const act = action(config, async (i: I): Promise> => { setCustomMetadataAttributes({ subtype: config.actionType }); - await maybeInitializeAllPlugins(); + await initializeAllPlugins(); return await runInActionRuntimeContext(() => fn(i)); }); act.__action.actionType = config.actionType; diff --git a/js/core/src/registry.ts b/js/core/src/registry.ts index d0b93f406..b46b636ba 100644 --- a/js/core/src/registry.ts +++ b/js/core/src/registry.ts @@ -126,19 +126,19 @@ type ActionsRecord = Record>; * Returns all actions in the registry. */ export async function listActions(): Promise { - await maybeInitializeAllPlugins(); + await initializeAllPlugins(); return Object.assign({}, actionsById()); } -let initializedAllPlugins = false; -export async function maybeInitializeAllPlugins() { - if (initializedAllPlugins) { +let allPluginsInitialized = false; +export async function initializeAllPlugins() { + if (allPluginsInitialized) { return; } for (const pluginName of Object.keys(pluginsByName())) { await initializePlugin(pluginName); } - initializedAllPlugins = true; + allPluginsInitialized = true; } /** @@ -204,7 +204,7 @@ export async function lookupFlowStateStore( * Registers a flow state store for the given environment. */ export function registerPluginProvider(name: string, provider: PluginProvider) { - initializedAllPlugins = false; + allPluginsInitialized = false; let cached; let isInitialized = false; pluginsByName()[name] = { From 5a059d1ddcdd95a39ffb8e42c721f7eceb2c23dd Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Thu, 8 Aug 2024 21:08:36 -0400 Subject: [PATCH 9/9] initializeAllPlugins --- js/flow/src/flow.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/js/flow/src/flow.ts b/js/flow/src/flow.ts index 0b43344d0..1eb51fb7b 100644 --- a/js/flow/src/flow.ts +++ b/js/flow/src/flow.ts @@ -28,7 +28,7 @@ import { StreamingCallback, } from '@genkit-ai/core'; import { logger } from '@genkit-ai/core/logging'; -import { maybeInitializeAllPlugins } from '@genkit-ai/core/registry'; +import { initializeAllPlugins } from '@genkit-ai/core/registry'; import { toJsonSchema } from '@genkit-ai/core/schema'; import { newTrace, @@ -392,7 +392,7 @@ export class Flow< labels: Record | undefined ) { const startTimeMs = performance.now(); - await maybeInitializeAllPlugins(); + await initializeAllPlugins(); await runWithActiveContext(ctx, async () => { let traceContext; if (ctx.state.traceContext) {