Skip to content

Commit

Permalink
feat: do not allow defining new actions from within other actions/flo…
Browse files Browse the repository at this point in the history
…ws (#725)
  • Loading branch information
pavelgj authored Aug 9, 2024
1 parent d063e26 commit 1dba5f0
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 7 deletions.
22 changes: 22 additions & 0 deletions docs/errors/no_new_actions_at_runtime.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# No new actions at runtime error

Defining new actions at runtime is not allowed.

✅ DO:

```ts
const prompt = defineDotprompt({...})

const flow = defineFlow({...}, async (input) => {
await prompt.generate(...);
})
```

❌ DON'T:

```ts
const flow = defineFlow({...}, async (input) => {
const prompt = defineDotprompt({...})
prompt.generate(...);
})
```
34 changes: 31 additions & 3 deletions js/core/src/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ import { JSONSchema7 } from 'json-schema';
import { AsyncLocalStorage } from 'node:async_hooks';
import { performance } from 'node:perf_hooks';
import * as z from 'zod';
import { ActionType, lookupPlugin, registerAction } from './registry.js';
import {
ActionType,
initializeAllPlugins,
lookupPlugin,
registerAction,
} from './registry.js';
import { parseSchema } from './schema.js';
import * as telemetry from './telemetry.js';
import {
Expand Down Expand Up @@ -216,9 +221,16 @@ export function defineAction<
},
fn: (input: z.infer<I>) => Promise<z.infer<O>>
): Action<I, O> {
const act = action(config, (i: I): Promise<z.infer<O>> => {
if (isInRuntimeContext()) {
throw new Error(
'Cannot define new actions at runtime.\n' +
'See: https://github.com/firebase/genkit/blob/main/docs/errors/no_new_actions_at_runtime.md'
);
}
const act = action(config, async (i: I): Promise<z.infer<O>> => {
setCustomMetadataAttributes({ subtype: config.actionType });
return fn(i);
await initializeAllPlugins();
return await runInActionRuntimeContext(() => fn(i));
});
act.__action.actionType = config.actionType;
registerAction(config.actionType, act);
Expand Down Expand Up @@ -252,3 +264,19 @@ export function getStreamingCallback<S>(): StreamingCallback<S> | undefined {
}
return cb;
}

const runtimeCtxAls = new AsyncLocalStorage<any>();

/**
* Checks whether the caller is currently in the runtime context of an action.
*/
export function isInRuntimeContext() {
return !!runtimeCtxAls.getStore();
}

/**
* Execute the provided function in the action runtime context.
*/
export function runInActionRuntimeContext<R>(fn: () => R) {
return runtimeCtxAls.run('runtime', fn);
}
8 changes: 7 additions & 1 deletion js/core/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { z } from 'zod';
import { Action } from './action.js';
import { Action, isInRuntimeContext } from './action.js';
import { FlowStateStore } from './flowTypes.js';
import { LoggerConfig, TelemetryConfig } from './telemetryTypes.js';
import { TraceStore } from './tracing.js';
Expand Down Expand Up @@ -60,6 +60,12 @@ export function genkitPlugin<T extends PluginInit>(
pluginName: string,
initFn: T
): Plugin<Parameters<T>> {
if (isInRuntimeContext()) {
throw new Error(
'Cannot define new plugins at runtime.\n' +
'See: https://github.com/firebase/genkit/blob/main/docs/errors/no_new_actions_at_runtime.md'
);
}
return (...args: Parameters<T>) => ({
name: pluginName,
initializer: async () => {
Expand Down
16 changes: 14 additions & 2 deletions js/core/src/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,19 @@ type ActionsRecord = Record<string, Action<z.ZodTypeAny, z.ZodTypeAny>>;
* Returns all actions in the registry.
*/
export async function listActions(): Promise<ActionsRecord> {
await initializeAllPlugins();
return Object.assign({}, actionsById());
}

let allPluginsInitialized = false;
export async function initializeAllPlugins() {
if (allPluginsInitialized) {
return;
}
for (const pluginName of Object.keys(pluginsByName())) {
await initializePlugin(pluginName);
}
return Object.assign({}, actionsById());
allPluginsInitialized = true;
}

/**
Expand Down Expand Up @@ -195,14 +204,17 @@ export async function lookupFlowStateStore(
* Registers a flow state store for the given environment.
*/
export function registerPluginProvider(name: string, provider: PluginProvider) {
allPluginsInitialized = false;
let cached;
let isInitialized = false;
pluginsByName()[name] = {
name: provider.name,
initializer: () => {
if (cached) {
if (isInitialized) {
return cached;
}
cached = provider.initializer();
isInitialized = true;
return cached;
},
};
Expand Down
2 changes: 2 additions & 0 deletions js/flow/src/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
StreamingCallback,
} from '@genkit-ai/core';
import { logger } from '@genkit-ai/core/logging';
import { initializeAllPlugins } from '@genkit-ai/core/registry';
import { toJsonSchema } from '@genkit-ai/core/schema';
import {
newTrace,
Expand Down Expand Up @@ -391,6 +392,7 @@ export class Flow<
labels: Record<string, string> | undefined
) {
const startTimeMs = performance.now();
await initializeAllPlugins();
await runWithActiveContext(ctx, async () => {
let traceContext;
if (ctx.state.traceContext) {
Expand Down
3 changes: 2 additions & 1 deletion js/flow/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

import { runInActionRuntimeContext } from '@genkit-ai/core';
import { AsyncLocalStorage } from 'node:async_hooks';
import { v4 as uuidv4 } from 'uuid';
import z from 'zod';
Expand Down Expand Up @@ -45,7 +46,7 @@ export function runWithActiveContext<R>(
ctx: Context<z.ZodTypeAny, z.ZodTypeAny, z.ZodTypeAny>,
fn: () => R
) {
return ctxAsyncLocalStorage.run(ctx, fn);
return ctxAsyncLocalStorage.run(ctx, () => runInActionRuntimeContext(fn));
}

/**
Expand Down

0 comments on commit 1dba5f0

Please sign in to comment.