Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go] move tracing to an internal package #224

Merged
merged 1 commit into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions go/genkit/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/firebase/genkit/go/internal"
"github.com/firebase/genkit/go/internal/tracing"
"github.com/invopop/jsonschema"
)

Expand Down Expand Up @@ -56,7 +57,7 @@ type NoStream = StreamingCallback[struct{}]
type Action[I, O, S any] struct {
name string
fn Func[I, O, S]
tstate *tracingState
tstate *tracing.State
inputSchema *jsonschema.Schema
outputSchema *jsonschema.Schema
// optional
Expand Down Expand Up @@ -89,18 +90,18 @@ func NewStreamingAction[I, O, S any](name string, metadata map[string]any, fn Fu
// Name returns the Action's name.
func (a *Action[I, O, S]) Name() string { return a.name }

// setTracingState sets the action's tracingState.
func (a *Action[I, O, S]) setTracingState(tstate *tracingState) { a.tstate = tstate }
// setTracingState sets the action's tracing.State.
func (a *Action[I, O, S]) setTracingState(tstate *tracing.State) { a.tstate = tstate }

// Run executes the Action's function in a new trace span.
func (a *Action[I, O, S]) Run(ctx context.Context, input I, cb StreamingCallback[S]) (output O, err error) {
// TODO: validate input against JSONSchema for I.
// TODO: validate output against JSONSchema for O.
Logger(ctx).Debug("Action.Run",
internal.Logger(ctx).Debug("Action.Run",
"name", a.name,
"input", fmt.Sprintf("%#v", input))
defer func() {
Logger(ctx).Debug("Action.Run",
internal.Logger(ctx).Debug("Action.Run",
"name", a.name,
"output", fmt.Sprintf("%#v", output),
"err", err)
Expand All @@ -110,7 +111,7 @@ func (a *Action[I, O, S]) Run(ctx context.Context, input I, cb StreamingCallback
// This action has probably not been registered.
tstate = globalRegistry.tstate
}
return runInNewSpan(ctx, tstate, a.name, "action", false, input,
return tracing.RunInNewSpan(ctx, tstate, a.name, "action", false, input,
func(ctx context.Context, input I) (O, error) {
start := time.Now()
out, err := a.fn(ctx, input, cb)
Expand Down Expand Up @@ -163,8 +164,8 @@ type action interface {
// the registry will set.
desc() actionDesc

// setTracingState set's the action's tracingState.
setTracingState(*tracingState)
// setTracingState set's the action's tracing.State.
setTracingState(*tracing.State)
}

// An actionDesc is a description of an Action.
Expand Down
27 changes: 27 additions & 0 deletions go/genkit/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,30 @@ func TestActionStreaming(t *testing.T) {
t.Errorf("got %d, want %d", got, n)
}
}

func TestActionTracing(t *testing.T) {
ctx := context.Background()
const actionName = "TestTracing-inc"
a := NewAction(actionName, nil, inc)
if _, err := a.Run(context.Background(), 3, nil); err != nil {
t.Fatal(err)
}
// The dev TraceStore is registered by Init, called from TestMain.
ts := globalRegistry.lookupTraceStore(EnvironmentDev)
tds, _, err := ts.List(ctx, nil)
if err != nil {
t.Fatal(err)
}
// The same trace store is used for all tests, so there might be several traces.
// Look for this one, which has a unique name.
for _, td := range tds {
if td.DisplayName == actionName {
// Spot check: expect a single span.
if g, w := len(td.Spans), 1; g != w {
t.Errorf("got %d spans, want %d", g, w)
}
return
}
}
t.Fatalf("did not find trace named %q", actionName)
}
21 changes: 11 additions & 10 deletions go/genkit/dev_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (
"syscall"
"time"

"github.com/firebase/genkit/go/gtrace"
"github.com/firebase/genkit/go/internal"
"github.com/firebase/genkit/go/internal/tracing"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -160,7 +161,7 @@ func (s *devServer) handleRunAction(w http.ResponseWriter, r *http.Request) erro
return err
}
}
Logger(ctx).Debug("running action",
internal.Logger(ctx).Debug("running action",
"key", body.Key,
"stream", stream)
var callback StreamingCallback[json.RawMessage]
Expand Down Expand Up @@ -193,8 +194,8 @@ func runAction(ctx context.Context, reg *registry, key string, input json.RawMes
return nil, &httpError{http.StatusNotFound, fmt.Errorf("no action with key %q", key)}
}
var traceID string
output, err := runInNewSpan(ctx, reg.tstate, "dev-run-action-wrapper", "", true, input, func(ctx context.Context, input json.RawMessage) (json.RawMessage, error) {
SetCustomMetadataAttr(ctx, "genkit-dev-internal", "true")
output, err := tracing.RunInNewSpan(ctx, reg.tstate, "dev-run-action-wrapper", "", true, input, func(ctx context.Context, input json.RawMessage) (json.RawMessage, error) {
tracing.SetCustomMetadataAttr(ctx, "genkit-dev-internal", "true")
traceID = trace.SpanContextFromContext(ctx).TraceID().String()
return action.runJSON(ctx, input, cb)
})
Expand Down Expand Up @@ -251,22 +252,22 @@ func (s *devServer) handleListTraces(w http.ResponseWriter, r *http.Request) err
}
}
ctoken := r.FormValue("continuationToken")
tds, ctoken, err := ts.List(r.Context(), &gtrace.Query{Limit: limit, ContinuationToken: ctoken})
if errors.Is(err, gtrace.ErrBadQuery) {
tds, ctoken, err := ts.List(r.Context(), &tracing.Query{Limit: limit, ContinuationToken: ctoken})
if errors.Is(err, tracing.ErrBadQuery) {
return &httpError{http.StatusBadRequest, err}
}
if err != nil {
return err
}
if tds == nil {
tds = []*gtrace.Data{}
tds = []*tracing.Data{}
}
return writeJSON(r.Context(), w, listTracesResult{tds, ctoken})
}

type listTracesResult struct {
Traces []*gtrace.Data `json:"traces"`
ContinuationToken string `json:"continuationToken"`
Traces []*tracing.Data `json:"traces"`
ContinuationToken string `json:"continuationToken"`
}

func (s *devServer) handleListFlowStates(w http.ResponseWriter, r *http.Request) error {
Expand All @@ -286,7 +287,7 @@ func writeJSON(ctx context.Context, w http.ResponseWriter, value any) error {
}
_, err = w.Write(data)
if err != nil {
Logger(ctx).Error("writing output", "err", err)
internal.Logger(ctx).Error("writing output", "err", err)
}
return nil
}
16 changes: 8 additions & 8 deletions go/genkit/dev_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"strings"
"testing"

"github.com/firebase/genkit/go/gtrace"
"github.com/firebase/genkit/go/internal/tracing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)
Expand Down Expand Up @@ -114,13 +114,13 @@ func checkActionTrace(t *testing.T, reg *registry, tid, name string) {
t.Fatal(err)
}
rootSpan := findRootSpan(t, td.Spans)
want := &gtrace.SpanData{
want := &tracing.SpanData{
TraceID: tid,
DisplayName: "dev-run-action-wrapper",
SpanKind: "INTERNAL",
SameProcessAsParentSpan: gtrace.BoolValue{Value: true},
Status: gtrace.Status{Code: 0},
InstrumentationLibrary: gtrace.InstrumentationLibrary{
SameProcessAsParentSpan: tracing.BoolValue{Value: true},
Status: tracing.Status{Code: 0},
InstrumentationLibrary: tracing.InstrumentationLibrary{
Name: "genkit-tracer",
Version: "v1",
},
Expand All @@ -134,17 +134,17 @@ func checkActionTrace(t *testing.T, reg *registry, tid, name string) {
"genkit:state": "success",
},
}
diff := cmp.Diff(want, rootSpan, cmpopts.IgnoreFields(gtrace.SpanData{}, "SpanID", "StartTime", "EndTime"))
diff := cmp.Diff(want, rootSpan, cmpopts.IgnoreFields(tracing.SpanData{}, "SpanID", "StartTime", "EndTime"))
if diff != "" {
t.Errorf("mismatch (-want, +got):\n%s", diff)
}
}

// findRootSpan finds the root span in spans.
// It also verifies that it is unique.
func findRootSpan(t *testing.T, spans map[string]*gtrace.SpanData) *gtrace.SpanData {
func findRootSpan(t *testing.T, spans map[string]*tracing.SpanData) *tracing.SpanData {
t.Helper()
var root *gtrace.SpanData
var root *tracing.SpanData
for _, sd := range spans {
if sd.ParentSpanID == "" {
if root != nil {
Expand Down
3 changes: 2 additions & 1 deletion go/genkit/dotprompt/genkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/firebase/genkit/go/ai"
"github.com/firebase/genkit/go/genkit"
"github.com/firebase/genkit/go/internal/tracing"
)

// ActionInput is the input type of a prompt action.
Expand Down Expand Up @@ -160,7 +161,7 @@ func (p *Prompt) Register() error {
// passes the rendered template to the AI generator specified by
// the prompt.
func (p *Prompt) Execute(ctx context.Context, input *ActionInput) (*ai.GenerateResponse, error) {
genkit.SetCustomMetadataAttr(ctx, "subtype", "prompt")
tracing.SetCustomMetadataAttr(ctx, "subtype", "prompt")

genReq, err := p.buildRequest(input)
if err != nil {
Expand Down
35 changes: 18 additions & 17 deletions go/genkit/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/firebase/genkit/go/gtime"
"github.com/firebase/genkit/go/internal"
"github.com/firebase/genkit/go/internal/tracing"
"github.com/google/uuid"
otrace "go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -89,7 +90,7 @@ type Flow[I, O, S any] struct {
name string // The last component of the flow's key in the registry.
fn Func[I, O, S] // The function to run.
stateStore FlowStateStore // Where FlowStates are stored, to support resumption.
tstate *tracingState // set from the action when the flow is defined
tstate *tracing.State // set from the action when the flow is defined
// TODO(jba): scheduler
// TODO(jba): experimentalDurable
// TODO(jba): authPolicy
Expand Down Expand Up @@ -239,7 +240,7 @@ type FlowResult[O any] struct {
// action creates an action for the flow. See the comment at the top of this file for more information.
func (f *Flow[I, O, S]) action() *Action[*flowInstruction[I], *flowState[I, O], S] {
return NewStreamingAction(f.name, nil, func(ctx context.Context, inst *flowInstruction[I], cb StreamingCallback[S]) (*flowState[I, O], error) {
spanMetaKey.fromContext(ctx).SetAttr("flow:wrapperAction", "true")
tracing.SpanMetaKey.FromContext(ctx).SetAttr("flow:wrapperAction", "true")
return f.runInstruction(ctx, inst, cb)
})
}
Expand Down Expand Up @@ -289,10 +290,10 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis
defer func() {
if err := fctx.finish(ctx); err != nil {
// TODO(jba): do something more with this error?
Logger(ctx).Error("flowContext.finish", "err", err.Error())
internal.Logger(ctx).Error("flowContext.finish", "err", err.Error())
}
}()
ctx = flowContextKey.newContext(ctx, fctx)
ctx = flowContextKey.NewContext(ctx, fctx)
exec := &flowExecution{
StartTime: gtime.ToMilliseconds(time.Now()),
}
Expand All @@ -301,8 +302,8 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis
state.mu.Unlock()
// TODO(jba): retrieve the JSON-marshaled SpanContext from state.traceContext.
// TODO(jba): add a span link to the context.
output, err := runInNewSpan(ctx, fctx.tracingState(), f.name, "flow", true, state.Input, func(ctx context.Context, input I) (O, error) {
spanMeta := spanMetaKey.fromContext(ctx)
output, err := tracing.RunInNewSpan(ctx, fctx.tracingState(), f.name, "flow", true, state.Input, func(ctx context.Context, input I) (O, error) {
spanMeta := tracing.SpanMetaKey.FromContext(ctx)
spanMeta.SetAttr("flow:execution", strconv.Itoa(len(state.Executions)-1))
// TODO(jba): put labels into span metadata.
spanMeta.SetAttr("flow:name", f.name)
Expand All @@ -318,14 +319,14 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis
latency := time.Since(start)
if err != nil {
// TODO(jba): handle InterruptError
Logger(ctx).Error("flow failed",
internal.Logger(ctx).Error("flow failed",
"path", spanMeta.Path,
"err", err.Error(),
)
writeFlowFailure(ctx, f.name, latency, err)
spanMeta.SetAttr("flow:state", "error")
} else {
Logger(ctx).Info("flow succeeded", "path", spanMeta.Path)
internal.Logger(ctx).Info("flow succeeded", "path", spanMeta.Path)
writeFlowSuccess(ctx, f.name, latency)
spanMeta.SetAttr("flow:state", "done")

Expand Down Expand Up @@ -363,7 +364,7 @@ func generateFlowID() (string, error) {
type flowContext[I, O any] struct {
state *flowState[I, O]
stateStore FlowStateStore
tstate *tracingState
tstate *tracing.State
mu sync.Mutex
seenSteps map[string]int // number of times each name appears, to avoid duplicate names
// TODO(jba): auth
Expand All @@ -373,19 +374,19 @@ type flowContext[I, O any] struct {
type flowContexter interface {
uniqueStepName(string) string
stater() flowStater
tracingState() *tracingState
tracingState() *tracing.State
}

func newFlowContext[I, O any](state *flowState[I, O], store FlowStateStore, tstate *tracingState) *flowContext[I, O] {
func newFlowContext[I, O any](state *flowState[I, O], store FlowStateStore, tstate *tracing.State) *flowContext[I, O] {
return &flowContext[I, O]{
state: state,
stateStore: store,
tstate: tstate,
seenSteps: map[string]int{},
}
}
func (fc *flowContext[I, O]) stater() flowStater { return fc.state }
func (fc *flowContext[I, O]) tracingState() *tracingState { return fc.tstate }
func (fc *flowContext[I, O]) stater() flowStater { return fc.state }
func (fc *flowContext[I, O]) tracingState() *tracing.State { return fc.tstate }

// finish is called at the end of a flow execution.
func (fc *flowContext[I, O]) finish(ctx context.Context) error {
Expand All @@ -408,7 +409,7 @@ func (fc *flowContext[I, O]) uniqueStepName(name string) string {
return fmt.Sprintf("%s-%d", name, n)
}

var flowContextKey = newContextKey[flowContexter]()
var flowContextKey = internal.NewContextKey[flowContexter]()

// Run runs the function f in the context of the current flow.
// It returns an error if no flow is active.
Expand All @@ -418,16 +419,16 @@ var flowContextKey = newContextKey[flowContexter]()
// is restarted, f will not be called a second time.
func Run[T any](ctx context.Context, name string, f func() (T, error)) (T, error) {
// from js/flow/src/steps.ts
fc := flowContextKey.fromContext(ctx)
fc := flowContextKey.FromContext(ctx)
if fc == nil {
var z T
return z, fmt.Errorf("genkit.Run(%q): must be called from a flow", name)
}
// TODO(jba): The input here is irrelevant. Perhaps runInNewSpan should have only a result type param,
// as in the js.
return runInNewSpan(ctx, fc.tracingState(), name, "flowStep", false, 0, func(ctx context.Context, _ int) (T, error) {
return tracing.RunInNewSpan(ctx, fc.tracingState(), name, "flowStep", false, 0, func(ctx context.Context, _ int) (T, error) {
uName := fc.uniqueStepName(name)
spanMeta := spanMetaKey.fromContext(ctx)
spanMeta := tracing.SpanMetaKey.FromContext(ctx)
spanMeta.SetAttr("flow:stepType", "run")
spanMeta.SetAttr("flow:stepName", name)
spanMeta.SetAttr("flow:resolvedStepName", uName)
Expand Down
3 changes: 2 additions & 1 deletion go/genkit/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/firebase/genkit/go/internal"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand All @@ -37,7 +38,7 @@ var fetchInstruments = sync.OnceValue(func() *metricInstruments {
insts, err := initInstruments()
if err != nil {
// Do not stop the program because we can't collect metrics.
Logger(context.Background()).Error("metric initialization failed; no metrics will be collected", "err", err)
internal.Logger(context.Background()).Error("metric initialization failed; no metrics will be collected", "err", err)
return nil
}
return insts
Expand Down
Loading
Loading