Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go] improve doc for genkit package #266

Merged
merged 1 commit into from
May 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions go/genkit/genkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,23 @@ import (

// DefineFlow creates a Flow that runs fn, and registers it as an action.
//
// fn takes an input of type I and returns an output of type O, optionally
// streaming values of type S incrementally by invoking a callback.
// If the callback is non-nil and the function supports streaming, it should
// fn takes an input of type In and returns an output of type Out, optionally
// streaming values of type Stream incrementally by invoking a callback.
// Pass [NoStream] for functions that do not support streaming.
//
// If the function supports streaming and the callback is non-nil, it should
// stream the results by invoking the callback periodically, ultimately returning
// with a final return value. Otherwise, it should ignore the StreamingCallback and
// with a final return value. Otherwise, it should ignore the callback and
// just return a result.
func DefineFlow[I, O, S any](
func DefineFlow[In, Out, Stream any](
name string,
fn func(ctx context.Context, input I, callback func(context.Context, S) error) (O, error),
) *core.Flow[I, O, S] {
return core.DefineFlow(name, core.Func[I, O, S](fn))
fn func(ctx context.Context, input In, callback func(context.Context, Stream) error) (Out, error),
) *core.Flow[In, Out, Stream] {
return core.DefineFlow(name, core.Func[In, Out, Stream](fn))
}

// Run runs the function f in the context of the current flow.
// Run runs the function f in the context of the current flow
// and returns what f returns.
// It returns an error if no flow is active.
//
// Each call to Run results in a new step in the flow.
Expand All @@ -50,17 +53,18 @@ func Run[T any](ctx context.Context, name string, f func() (T, error)) (T, error

// RunFlow runs flow in the context of another flow. The flow must run to completion when started
// (that is, it must not have interrupts).
func RunFlow[I, O, S any](ctx context.Context, flow *core.Flow[I, O, S], input I) (O, error) {
func RunFlow[In, Out, Stream any](ctx context.Context, flow *core.Flow[In, Out, Stream], input In) (Out, error) {
return core.RunFlow(ctx, flow, input)
}

// NoStream indicates that a flow does not support streaming.
type NoStream = core.NoStream

// StreamFlowValue is either a streamed value or a final output of a flow.
type StreamFlowValue[O, S any] struct {
type StreamFlowValue[Out, Stream any] struct {
Done bool
Output O // valid if Done is true
Stream S // valid if Done is false
Output Out // valid if Done is true
Stream Stream // valid if Done is false
}

// StreamFlow runs flow on input and delivers both the streamed values and the final output.
Expand All @@ -77,13 +81,13 @@ type StreamFlowValue[O, S any] struct {
// again.
//
// Otherwise the Stream field of the passed [StreamFlowValue] holds a streamed result.
func StreamFlow[I, O, S any](ctx context.Context, flow *core.Flow[I, O, S], input I) func(func(*StreamFlowValue[O, S], error) bool) {
return func(yield func(*StreamFlowValue[O, S], error) bool) {
cb := func(ctx context.Context, s S) error {
func StreamFlow[In, Out, Stream any](ctx context.Context, flow *core.Flow[In, Out, Stream], input In) func(func(*StreamFlowValue[Out, Stream], error) bool) {
return func(yield func(*StreamFlowValue[Out, Stream], error) bool) {
cb := func(ctx context.Context, s Stream) error {
if ctx.Err() != nil {
return ctx.Err()
}
if !yield(&StreamFlowValue[O, S]{Stream: s}, nil) {
if !yield(&StreamFlowValue[Out, Stream]{Stream: s}, nil) {
return errStop
}
return nil
Expand All @@ -92,7 +96,7 @@ func StreamFlow[I, O, S any](ctx context.Context, flow *core.Flow[I, O, S], inpu
if err != nil {
yield(nil, err)
} else {
yield(&StreamFlowValue[O, S]{Done: true, Output: output}, nil)
yield(&StreamFlowValue[Out, Stream]{Done: true, Output: output}, nil)
}
}
}
Expand Down
Loading