Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go] rename type params in core package #269

Merged
merged 2 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions go/core/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ import (
)

// Func is the type of function that Actions and Flows execute.
// It takes an input of type I and returns an output of type O, optionally
// streaming values of type S incrementally by invoking a callback.
// It takes an input of type Int and returns an output of type Out, optionally
// streaming values of type Stream incrementally by invoking a callback.
// If the StreamingCallback is non-nil and the function supports streaming, it should
// stream the results by invoking the callback periodically, ultimately returning
// with a final return value. Otherwise, it should ignore the StreamingCallback and
// just return a result.
type Func[I, O, S any] func(context.Context, I, func(context.Context, S) error) (O, error)
type Func[In, Out, Stream any] func(context.Context, In, func(context.Context, Stream) error) (Out, error)

// TODO(jba): use a generic type alias for the above when they become available?

Expand All @@ -44,7 +44,7 @@ type Func[I, O, S any] func(context.Context, I, func(context.Context, S) error)
// Such a function corresponds to a Flow[I, O, struct{}].
type NoStream = func(context.Context, struct{}) error

type streamingCallback[S any] func(context.Context, S) error
type streamingCallback[Stream any] func(context.Context, Stream) error

// An Action is a named, observable operation.
// It consists of a function that takes an input of type I and returns an output
Expand All @@ -53,10 +53,10 @@ type streamingCallback[S any] func(context.Context, S) error
// and JSON Schemas for its input and output.
//
// Each time an Action is run, it results in a new trace span.
type Action[I, O, S any] struct {
type Action[In, Out, Stream any] struct {
name string
atype ActionType
fn Func[I, O, S]
fn Func[In, Out, Stream]
tstate *tracing.State
inputSchema *jsonschema.Schema
outputSchema *jsonschema.Schema
Expand All @@ -68,20 +68,20 @@ type Action[I, O, S any] struct {
// See js/common/src/types.ts

// NewAction creates a new Action with the given name and non-streaming function.
func NewAction[I, O any](name string, atype ActionType, metadata map[string]any, fn func(context.Context, I) (O, error)) *Action[I, O, struct{}] {
return NewStreamingAction(name, atype, metadata, func(ctx context.Context, in I, cb NoStream) (O, error) {
func NewAction[In, Out any](name string, atype ActionType, metadata map[string]any, fn func(context.Context, In) (Out, error)) *Action[In, Out, struct{}] {
return NewStreamingAction(name, atype, metadata, func(ctx context.Context, in In, cb NoStream) (Out, error) {
return fn(ctx, in)
})
}

// NewStreamingAction creates a new Action with the given name and streaming function.
func NewStreamingAction[I, O, S any](name string, atype ActionType, metadata map[string]any, fn Func[I, O, S]) *Action[I, O, S] {
var i I
var o O
return &Action[I, O, S]{
func NewStreamingAction[In, Out, Stream any](name string, atype ActionType, metadata map[string]any, fn Func[In, Out, Stream]) *Action[In, Out, Stream] {
var i In
var o Out
return &Action[In, Out, Stream]{
name: name,
atype: atype,
fn: func(ctx context.Context, input I, sc func(context.Context, S) error) (O, error) {
fn: func(ctx context.Context, input In, sc func(context.Context, Stream) error) (Out, error) {
tracing.SetCustomMetadataAttr(ctx, "subtype", string(atype))
return fn(ctx, input, sc)
},
Expand All @@ -92,15 +92,15 @@ func NewStreamingAction[I, O, S any](name string, atype ActionType, metadata map
}

// Name returns the Action's name.
func (a *Action[I, O, S]) Name() string { return a.name }
func (a *Action[In, Out, Stream]) Name() string { return a.name }

func (a *Action[I, O, S]) actionType() ActionType { return a.atype }
func (a *Action[In, Out, Stream]) actionType() ActionType { return a.atype }

// setTracingState sets the action's tracing.State.
func (a *Action[I, O, S]) setTracingState(tstate *tracing.State) { a.tstate = tstate }
func (a *Action[In, Out, Stream]) setTracingState(tstate *tracing.State) { a.tstate = tstate }

// Run executes the Action's function in a new trace span.
func (a *Action[I, O, S]) Run(ctx context.Context, input I, cb func(context.Context, S) error) (output O, err error) {
func (a *Action[In, Out, Stream]) Run(ctx context.Context, input In, cb func(context.Context, Stream) error) (output Out, err error) {
// TODO: validate input against JSONSchema for I.
// TODO: validate output against JSONSchema for O.
logger.FromContext(ctx).Debug("Action.Run",
Expand All @@ -118,27 +118,27 @@ func (a *Action[I, O, S]) Run(ctx context.Context, input I, cb func(context.Cont
tstate = globalRegistry.tstate
}
return tracing.RunInNewSpan(ctx, tstate, a.name, "action", false, input,
func(ctx context.Context, input I) (O, error) {
func(ctx context.Context, input In) (Out, error) {
start := time.Now()
out, err := a.fn(ctx, input, cb)
latency := time.Since(start)
if err != nil {
writeActionFailure(ctx, a.name, latency, err)
return internal.Zero[O](), err
return internal.Zero[Out](), err
}
writeActionSuccess(ctx, a.name, latency)
return out, nil
})
}

func (a *Action[I, O, S]) runJSON(ctx context.Context, input json.RawMessage, cb func(context.Context, json.RawMessage) error) (json.RawMessage, error) {
var in I
func (a *Action[In, Out, Stream]) runJSON(ctx context.Context, input json.RawMessage, cb func(context.Context, json.RawMessage) error) (json.RawMessage, error) {
var in In
if err := json.Unmarshal(input, &in); err != nil {
return nil, err
}
var callback func(context.Context, S) error
var callback func(context.Context, Stream) error
if cb != nil {
callback = func(ctx context.Context, s S) error {
callback = func(ctx context.Context, s Stream) error {
bytes, err := json.Marshal(s)
if err != nil {
return err
Expand Down
Loading
Loading