Skip to content

Commit 5f21198

Browse files
authored
[JS] Split out defineFlow into stream/non-stream and make flows callable as functions. (#795)
1 parent 62c241e commit 5f21198

File tree

22 files changed

+420
-314
lines changed

22 files changed

+420
-314
lines changed

genkit-tools/cli/config/nextjs.genkit.ts.template

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import * as z from 'zod';
55
// Import the Genkit core libraries and plugins.
66
import { generate } from '@genkit-ai/ai';
77
import { configureGenkit } from '@genkit-ai/core';
8-
import { defineFlow, runFlow } from '@genkit-ai/flow';
8+
import { defineFlow } from '@genkit-ai/flow';
99
$GENKIT_CONFIG_IMPORTS
1010
$GENKIT_MODEL_IMPORT
1111

@@ -47,6 +47,6 @@ const menuSuggestionFlow = defineFlow(
4747
export async function callMenuSuggestionFlow() {
4848
// Invoke the flow. The value you pass as the second parameter must conform to
4949
// your flow's input schema.
50-
const flowResponse = await runFlow(menuSuggestionFlow, 'banana');
50+
const flowResponse = await menuSuggestionFlow('banana');
5151
console.log(flowResponse);
5252
}

js/flow/src/context.ts

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ import { metadataPrefix } from './utils.js';
3232
* Context object encapsulates flow execution state at runtime.
3333
*/
3434
export class Context<
35-
I extends z.ZodTypeAny,
36-
O extends z.ZodTypeAny,
37-
S extends z.ZodTypeAny,
35+
I extends z.ZodTypeAny = z.ZodTypeAny,
36+
O extends z.ZodTypeAny = z.ZodTypeAny,
37+
S extends z.ZodTypeAny = z.ZodTypeAny,
3838
> {
3939
private seenSteps: Record<string, number> = {};
4040

@@ -73,7 +73,9 @@ export class Context<
7373
}
7474
}
7575

76-
// Runs provided function in the current context. The config can specify retry and other behaviors.
76+
/**
77+
* Runs provided function in the current context. The config can specify retry and other behaviors.
78+
*/
7779
async run<T>(
7880
config: RunStepConfig,
7981
input: any | undefined,
@@ -124,8 +126,10 @@ export class Context<
124126
return name;
125127
}
126128

127-
// Executes interrupt step in the current context.
128-
async interrupt<I extends z.ZodTypeAny, O>(
129+
/**
130+
* Executes interrupt step.
131+
*/
132+
async interrupt(
129133
stepName: string,
130134
func: (payload: I) => Promise<O>,
131135
responseSchema: I | null,
@@ -189,17 +193,18 @@ export class Context<
189193
);
190194
}
191195

192-
// Sleep for the specified number of seconds.
193-
async sleep<I extends z.ZodTypeAny, O extends z.ZodTypeAny>(
194-
stepName: string,
195-
seconds: number
196-
): Promise<O> {
196+
/**
197+
* Sleep for the specified number of seconds.
198+
*/
199+
async sleep(stepName: string, seconds: number): Promise<O> {
200+
if (!this.flow.scheduler) {
201+
throw new Error('Cannot sleep in a flow with no scheduler.');
202+
}
197203
const resolvedStepName = this.resolveStepName(stepName);
198204
if (this.isCached(resolvedStepName)) {
199205
setCustomMetadataAttribute(metadataPrefix('state'), 'skipped');
200206
return this.getCached(resolvedStepName);
201207
}
202-
203208
await this.flow.scheduler(
204209
this.flow,
205210
{
@@ -227,6 +232,9 @@ export class Context<
227232
flowIds: string[];
228233
pollingConfig?: PollingConfig;
229234
}): Promise<Operation[]> {
235+
if (!this.flow.scheduler) {
236+
throw new Error('Cannot wait for a flow with no scheduler.');
237+
}
230238
const resolvedStepName = this.resolveStepName(opts.stepName);
231239
if (this.isCached(resolvedStepName)) {
232240
return this.getCached(resolvedStepName);

js/flow/src/experimental.ts

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import {
2424
FlowStillRunningError,
2525
} from './errors.js';
2626
import {
27+
CallableFlow,
2728
Flow,
28-
FlowWrapper,
2929
RunStepConfig,
3030
StepsFunction,
3131
defineFlow,
@@ -39,30 +39,27 @@ import { getActiveContext } from './utils.js';
3939
export function durableFlow<
4040
I extends z.ZodTypeAny = z.ZodTypeAny,
4141
O extends z.ZodTypeAny = z.ZodTypeAny,
42-
S extends z.ZodTypeAny = z.ZodTypeAny,
4342
>(
4443
config: {
4544
name: string;
4645
inputSchema?: I;
4746
outputSchema?: O;
48-
streamSchema?: S;
49-
invoker?: Invoker<I, O, S>;
50-
scheduler?: Scheduler<I, O, S>;
47+
invoker?: Invoker<I, O, z.ZodVoid>;
48+
scheduler?: Scheduler<I, O>;
5149
},
52-
steps: StepsFunction<I, O, S>
53-
): Flow<I, O, S> {
50+
steps: StepsFunction<I, O, z.ZodVoid>
51+
): Flow<I, O, z.ZodVoid> {
5452
return defineFlow(
5553
{
5654
name: config.name,
5755
inputSchema: config.inputSchema,
5856
outputSchema: config.outputSchema,
59-
streamSchema: config.streamSchema,
6057
invoker: config.invoker,
6158
experimentalScheduler: config.scheduler,
6259
experimentalDurable: true,
6360
},
6461
steps
65-
);
62+
).flow;
6663
}
6764

6865
/**
@@ -71,9 +68,8 @@ export function durableFlow<
7168
export async function scheduleFlow<
7269
I extends z.ZodTypeAny = z.ZodTypeAny,
7370
O extends z.ZodTypeAny = z.ZodTypeAny,
74-
S extends z.ZodTypeAny = z.ZodTypeAny,
7571
>(
76-
flow: Flow<I, O, S> | FlowWrapper<I, O, S>,
72+
flow: Flow<I, O, z.ZodVoid> | CallableFlow<I, O>,
7773
payload: z.infer<I>,
7874
delaySeconds?: number
7975
): Promise<Operation> {
@@ -97,7 +93,7 @@ export async function resumeFlow<
9793
O extends z.ZodTypeAny = z.ZodTypeAny,
9894
S extends z.ZodTypeAny = z.ZodTypeAny,
9995
>(
100-
flow: Flow<I, O, S> | FlowWrapper<I, O, S>,
96+
flow: Flow<I, O, z.ZodVoid> | CallableFlow<I, O>,
10197
flowId: string,
10298
payload: any
10399
): Promise<Operation> {
@@ -118,9 +114,8 @@ export async function resumeFlow<
118114
export async function getFlowState<
119115
I extends z.ZodTypeAny,
120116
O extends z.ZodTypeAny,
121-
S extends z.ZodTypeAny,
122117
>(
123-
flow: Flow<I, O, S> | FlowWrapper<I, O, S>,
118+
flow: Flow<I, O, z.ZodVoid> | CallableFlow<I, O>,
124119
flowId: string
125120
): Promise<Operation> {
126121
if (!(flow instanceof Flow)) {
@@ -163,9 +158,8 @@ export function runAction<I extends z.ZodTypeAny, O extends z.ZodTypeAny>(
163158
export async function waitFlowToComplete<
164159
I extends z.ZodTypeAny = z.ZodTypeAny,
165160
O extends z.ZodTypeAny = z.ZodTypeAny,
166-
S extends z.ZodTypeAny = z.ZodTypeAny,
167161
>(
168-
flow: Flow<I, O, S> | FlowWrapper<I, O, S>,
162+
flow: Flow<I, O, z.ZodVoid> | CallableFlow<I, O>,
169163
flowId: string
170164
): Promise<z.infer<O>> {
171165
if (!(flow instanceof Flow)) {

0 commit comments

Comments
 (0)