diff --git a/go/genkit/genkit.go b/go/genkit/genkit.go index e13c8b2e4..af094454c 100644 --- a/go/genkit/genkit.go +++ b/go/genkit/genkit.go @@ -25,20 +25,23 @@ import ( // DefineFlow creates a Flow that runs fn, and registers it as an action. // -// fn takes an input of type I and returns an output of type O, optionally -// streaming values of type S incrementally by invoking a callback. -// If the callback is non-nil and the function supports streaming, it should +// fn takes an input of type In and returns an output of type Out, optionally +// streaming values of type Stream incrementally by invoking a callback. +// Pass [NoStream] for functions that do not support streaming. +// +// If the function supports streaming and the callback is non-nil, it should // stream the results by invoking the callback periodically, ultimately returning -// with a final return value. Otherwise, it should ignore the StreamingCallback and +// with a final return value. Otherwise, it should ignore the callback and // just return a result. -func DefineFlow[I, O, S any]( +func DefineFlow[In, Out, Stream any]( name string, - fn func(ctx context.Context, input I, callback func(context.Context, S) error) (O, error), -) *core.Flow[I, O, S] { - return core.DefineFlow(name, core.Func[I, O, S](fn)) + fn func(ctx context.Context, input In, callback func(context.Context, Stream) error) (Out, error), +) *core.Flow[In, Out, Stream] { + return core.DefineFlow(name, core.Func[In, Out, Stream](fn)) } -// Run runs the function f in the context of the current flow. +// Run runs the function f in the context of the current flow +// and returns what f returns. // It returns an error if no flow is active. // // Each call to Run results in a new step in the flow. @@ -50,17 +53,18 @@ func Run[T any](ctx context.Context, name string, f func() (T, error)) (T, error // RunFlow runs flow in the context of another flow. The flow must run to completion when started // (that is, it must not have interrupts). -func RunFlow[I, O, S any](ctx context.Context, flow *core.Flow[I, O, S], input I) (O, error) { +func RunFlow[In, Out, Stream any](ctx context.Context, flow *core.Flow[In, Out, Stream], input In) (Out, error) { return core.RunFlow(ctx, flow, input) } +// NoStream indicates that a flow does not support streaming. type NoStream = core.NoStream // StreamFlowValue is either a streamed value or a final output of a flow. -type StreamFlowValue[O, S any] struct { +type StreamFlowValue[Out, Stream any] struct { Done bool - Output O // valid if Done is true - Stream S // valid if Done is false + Output Out // valid if Done is true + Stream Stream // valid if Done is false } // StreamFlow runs flow on input and delivers both the streamed values and the final output. @@ -77,13 +81,13 @@ type StreamFlowValue[O, S any] struct { // again. // // Otherwise the Stream field of the passed [StreamFlowValue] holds a streamed result. -func StreamFlow[I, O, S any](ctx context.Context, flow *core.Flow[I, O, S], input I) func(func(*StreamFlowValue[O, S], error) bool) { - return func(yield func(*StreamFlowValue[O, S], error) bool) { - cb := func(ctx context.Context, s S) error { +func StreamFlow[In, Out, Stream any](ctx context.Context, flow *core.Flow[In, Out, Stream], input In) func(func(*StreamFlowValue[Out, Stream], error) bool) { + return func(yield func(*StreamFlowValue[Out, Stream], error) bool) { + cb := func(ctx context.Context, s Stream) error { if ctx.Err() != nil { return ctx.Err() } - if !yield(&StreamFlowValue[O, S]{Stream: s}, nil) { + if !yield(&StreamFlowValue[Out, Stream]{Stream: s}, nil) { return errStop } return nil @@ -92,7 +96,7 @@ func StreamFlow[I, O, S any](ctx context.Context, flow *core.Flow[I, O, S], inpu if err != nil { yield(nil, err) } else { - yield(&StreamFlowValue[O, S]{Done: true, Output: output}, nil) + yield(&StreamFlowValue[Out, Stream]{Done: true, Output: output}, nil) } } }