Skip to content

Commit

Permalink
[Go] improve doc for genkit package
Browse files Browse the repository at this point in the history
  • Loading branch information
jba committed May 28, 2024
1 parent d367216 commit 7518996
Showing 1 changed file with 22 additions and 18 deletions.
40 changes: 22 additions & 18 deletions go/genkit/genkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,23 @@ import (

// DefineFlow creates a Flow that runs fn, and registers it as an action.
//
// fn takes an input of type I and returns an output of type O, optionally
// streaming values of type S incrementally by invoking a callback.
// If the callback is non-nil and the function supports streaming, it should
// fn takes an input of type In and returns an output of type Out, optionally
// streaming values of type Stream incrementally by invoking a callback.
// Pass [NoStream] for functions that do not support streaming.
//
// If the function supports streaming and the callback is non-nil, it should
// stream the results by invoking the callback periodically, ultimately returning
// with a final return value. Otherwise, it should ignore the StreamingCallback and
// with a final return value. Otherwise, it should ignore the callback and
// just return a result.
func DefineFlow[I, O, S any](
func DefineFlow[In, Out, Stream any](
name string,
fn func(ctx context.Context, input I, callback func(context.Context, S) error) (O, error),
) *core.Flow[I, O, S] {
return core.DefineFlow(name, core.Func[I, O, S](fn))
fn func(ctx context.Context, input In, callback func(context.Context, Stream) error) (Out, error),
) *core.Flow[In, Out, Stream] {
return core.DefineFlow(name, core.Func[In, Out, Stream](fn))
}

// Run runs the function f in the context of the current flow.
// Run runs the function f in the context of the current flow
// and returns what f returns.
// It returns an error if no flow is active.
//
// Each call to Run results in a new step in the flow.
Expand All @@ -50,17 +53,18 @@ func Run[T any](ctx context.Context, name string, f func() (T, error)) (T, error

// RunFlow runs flow in the context of another flow. The flow must run to completion when started
// (that is, it must not have interrupts).
func RunFlow[I, O, S any](ctx context.Context, flow *core.Flow[I, O, S], input I) (O, error) {
func RunFlow[In, Out, Stream any](ctx context.Context, flow *core.Flow[In, Out, Stream], input In) (Out, error) {
return core.RunFlow(ctx, flow, input)
}

// NoStream indicates that a flow does not support streaming.
type NoStream = core.NoStream

// StreamFlowValue is either a streamed value or a final output of a flow.
type StreamFlowValue[O, S any] struct {
type StreamFlowValue[Out, Stream any] struct {
Done bool
Output O // valid if Done is true
Stream S // valid if Done is false
Output Out // valid if Done is true
Stream Stream // valid if Done is false
}

// StreamFlow runs flow on input and delivers both the streamed values and the final output.
Expand All @@ -77,13 +81,13 @@ type StreamFlowValue[O, S any] struct {
// again.
//
// Otherwise the Stream field of the passed [StreamFlowValue] holds a streamed result.
func StreamFlow[I, O, S any](ctx context.Context, flow *core.Flow[I, O, S], input I) func(func(*StreamFlowValue[O, S], error) bool) {
return func(yield func(*StreamFlowValue[O, S], error) bool) {
cb := func(ctx context.Context, s S) error {
func StreamFlow[In, Out, Stream any](ctx context.Context, flow *core.Flow[In, Out, Stream], input In) func(func(*StreamFlowValue[Out, Stream], error) bool) {
return func(yield func(*StreamFlowValue[Out, Stream], error) bool) {
cb := func(ctx context.Context, s Stream) error {
if ctx.Err() != nil {
return ctx.Err()
}
if !yield(&StreamFlowValue[O, S]{Stream: s}, nil) {
if !yield(&StreamFlowValue[Out, Stream]{Stream: s}, nil) {
return errStop
}
return nil
Expand All @@ -92,7 +96,7 @@ func StreamFlow[I, O, S any](ctx context.Context, flow *core.Flow[I, O, S], inpu
if err != nil {
yield(nil, err)
} else {
yield(&StreamFlowValue[O, S]{Done: true, Output: output}, nil)
yield(&StreamFlowValue[Out, Stream]{Done: true, Output: output}, nil)
}
}
}
Expand Down

0 comments on commit 7518996

Please sign in to comment.