From c02a9417af8327010112817f4d88d9544f11d53a Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Wed, 22 May 2024 09:39:55 -0400 Subject: [PATCH] [Go] move tracing to an internal package End users should not need to look at traces, so move all the tracing machinery to an internal package. This requires that other symbols be moved to internal as well. --- go/genkit/action.go | 17 ++++--- go/genkit/action_test.go | 27 ++++++++++ go/genkit/dev_server.go | 21 ++++---- go/genkit/dev_server_test.go | 16 +++--- go/genkit/dotprompt/genkit.go | 3 +- go/genkit/flow.go | 35 ++++++------- go/genkit/metrics.go | 3 +- go/genkit/registry.go | 24 ++++----- .../util.go => internal/context_key.go} | 19 +++---- go/{genkit => internal}/logging.go | 6 +-- go/{gtrace => internal/tracing}/file_store.go | 2 +- .../tracing}/file_store_test.go | 2 +- go/{gtrace => internal/tracing}/store.go | 3 +- go/{gtrace => internal/tracing}/store_test.go | 2 +- .../tracing}/testdata/trace.json | 0 .../tracing}/trace_store_exporter.go | 41 ++++++++------- .../tracing}/trace_store_exporter_test.go | 17 +++---- go/{genkit => internal/tracing}/tracing.go | 50 +++++++++---------- .../tracing}/tracing_test.go | 30 +---------- go/plugins/localvec/localvec.go | 4 +- 20 files changed, 162 insertions(+), 160 deletions(-) rename go/{genkit/util.go => internal/context_key.go} (61%) rename go/{genkit => internal}/logging.go (90%) rename go/{gtrace => internal/tracing}/file_store.go (99%) rename go/{gtrace => internal/tracing}/file_store_test.go (99%) rename go/{gtrace => internal/tracing}/store.go (98%) rename go/{gtrace => internal/tracing}/store_test.go (99%) rename go/{gtrace => internal/tracing}/testdata/trace.json (100%) rename go/{genkit => internal/tracing}/trace_store_exporter.go (81%) rename go/{genkit => internal/tracing}/trace_store_exporter_test.go (89%) rename go/{genkit => internal/tracing}/tracing.go (81%) rename go/{genkit => internal/tracing}/tracing_test.go (63%) diff --git a/go/genkit/action.go b/go/genkit/action.go index 0711dee39..f9c1774e0 100644 --- a/go/genkit/action.go +++ b/go/genkit/action.go @@ -23,6 +23,7 @@ import ( "time" "github.com/firebase/genkit/go/internal" + "github.com/firebase/genkit/go/internal/tracing" "github.com/invopop/jsonschema" ) @@ -56,7 +57,7 @@ type NoStream = StreamingCallback[struct{}] type Action[I, O, S any] struct { name string fn Func[I, O, S] - tstate *tracingState + tstate *tracing.State inputSchema *jsonschema.Schema outputSchema *jsonschema.Schema // optional @@ -89,18 +90,18 @@ func NewStreamingAction[I, O, S any](name string, metadata map[string]any, fn Fu // Name returns the Action's name. func (a *Action[I, O, S]) Name() string { return a.name } -// setTracingState sets the action's tracingState. -func (a *Action[I, O, S]) setTracingState(tstate *tracingState) { a.tstate = tstate } +// setTracingState sets the action's tracing.State. +func (a *Action[I, O, S]) setTracingState(tstate *tracing.State) { a.tstate = tstate } // Run executes the Action's function in a new trace span. func (a *Action[I, O, S]) Run(ctx context.Context, input I, cb StreamingCallback[S]) (output O, err error) { // TODO: validate input against JSONSchema for I. // TODO: validate output against JSONSchema for O. - Logger(ctx).Debug("Action.Run", + internal.Logger(ctx).Debug("Action.Run", "name", a.name, "input", fmt.Sprintf("%#v", input)) defer func() { - Logger(ctx).Debug("Action.Run", + internal.Logger(ctx).Debug("Action.Run", "name", a.name, "output", fmt.Sprintf("%#v", output), "err", err) @@ -110,7 +111,7 @@ func (a *Action[I, O, S]) Run(ctx context.Context, input I, cb StreamingCallback // This action has probably not been registered. tstate = globalRegistry.tstate } - return runInNewSpan(ctx, tstate, a.name, "action", false, input, + return tracing.RunInNewSpan(ctx, tstate, a.name, "action", false, input, func(ctx context.Context, input I) (O, error) { start := time.Now() out, err := a.fn(ctx, input, cb) @@ -163,8 +164,8 @@ type action interface { // the registry will set. desc() actionDesc - // setTracingState set's the action's tracingState. - setTracingState(*tracingState) + // setTracingState set's the action's tracing.State. + setTracingState(*tracing.State) } // An actionDesc is a description of an Action. diff --git a/go/genkit/action_test.go b/go/genkit/action_test.go index ef28b6f57..d7e23bae1 100644 --- a/go/genkit/action_test.go +++ b/go/genkit/action_test.go @@ -97,3 +97,30 @@ func TestActionStreaming(t *testing.T) { t.Errorf("got %d, want %d", got, n) } } + +func TestActionTracing(t *testing.T) { + ctx := context.Background() + const actionName = "TestTracing-inc" + a := NewAction(actionName, nil, inc) + if _, err := a.Run(context.Background(), 3, nil); err != nil { + t.Fatal(err) + } + // The dev TraceStore is registered by Init, called from TestMain. + ts := globalRegistry.lookupTraceStore(EnvironmentDev) + tds, _, err := ts.List(ctx, nil) + if err != nil { + t.Fatal(err) + } + // The same trace store is used for all tests, so there might be several traces. + // Look for this one, which has a unique name. + for _, td := range tds { + if td.DisplayName == actionName { + // Spot check: expect a single span. + if g, w := len(td.Spans), 1; g != w { + t.Errorf("got %d spans, want %d", g, w) + } + return + } + } + t.Fatalf("did not find trace named %q", actionName) +} diff --git a/go/genkit/dev_server.go b/go/genkit/dev_server.go index 0e13a6fb5..a78216112 100644 --- a/go/genkit/dev_server.go +++ b/go/genkit/dev_server.go @@ -34,7 +34,8 @@ import ( "syscall" "time" - "github.com/firebase/genkit/go/gtrace" + "github.com/firebase/genkit/go/internal" + "github.com/firebase/genkit/go/internal/tracing" "go.opentelemetry.io/otel/trace" ) @@ -160,7 +161,7 @@ func (s *devServer) handleRunAction(w http.ResponseWriter, r *http.Request) erro return err } } - Logger(ctx).Debug("running action", + internal.Logger(ctx).Debug("running action", "key", body.Key, "stream", stream) var callback StreamingCallback[json.RawMessage] @@ -193,8 +194,8 @@ func runAction(ctx context.Context, reg *registry, key string, input json.RawMes return nil, &httpError{http.StatusNotFound, fmt.Errorf("no action with key %q", key)} } var traceID string - output, err := runInNewSpan(ctx, reg.tstate, "dev-run-action-wrapper", "", true, input, func(ctx context.Context, input json.RawMessage) (json.RawMessage, error) { - SetCustomMetadataAttr(ctx, "genkit-dev-internal", "true") + output, err := tracing.RunInNewSpan(ctx, reg.tstate, "dev-run-action-wrapper", "", true, input, func(ctx context.Context, input json.RawMessage) (json.RawMessage, error) { + tracing.SetCustomMetadataAttr(ctx, "genkit-dev-internal", "true") traceID = trace.SpanContextFromContext(ctx).TraceID().String() return action.runJSON(ctx, input, cb) }) @@ -251,22 +252,22 @@ func (s *devServer) handleListTraces(w http.ResponseWriter, r *http.Request) err } } ctoken := r.FormValue("continuationToken") - tds, ctoken, err := ts.List(r.Context(), >race.Query{Limit: limit, ContinuationToken: ctoken}) - if errors.Is(err, gtrace.ErrBadQuery) { + tds, ctoken, err := ts.List(r.Context(), &tracing.Query{Limit: limit, ContinuationToken: ctoken}) + if errors.Is(err, tracing.ErrBadQuery) { return &httpError{http.StatusBadRequest, err} } if err != nil { return err } if tds == nil { - tds = []*gtrace.Data{} + tds = []*tracing.Data{} } return writeJSON(r.Context(), w, listTracesResult{tds, ctoken}) } type listTracesResult struct { - Traces []*gtrace.Data `json:"traces"` - ContinuationToken string `json:"continuationToken"` + Traces []*tracing.Data `json:"traces"` + ContinuationToken string `json:"continuationToken"` } func (s *devServer) handleListFlowStates(w http.ResponseWriter, r *http.Request) error { @@ -286,7 +287,7 @@ func writeJSON(ctx context.Context, w http.ResponseWriter, value any) error { } _, err = w.Write(data) if err != nil { - Logger(ctx).Error("writing output", "err", err) + internal.Logger(ctx).Error("writing output", "err", err) } return nil } diff --git a/go/genkit/dev_server_test.go b/go/genkit/dev_server_test.go index 83c450178..77238dc50 100644 --- a/go/genkit/dev_server_test.go +++ b/go/genkit/dev_server_test.go @@ -24,7 +24,7 @@ import ( "strings" "testing" - "github.com/firebase/genkit/go/gtrace" + "github.com/firebase/genkit/go/internal/tracing" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" ) @@ -114,13 +114,13 @@ func checkActionTrace(t *testing.T, reg *registry, tid, name string) { t.Fatal(err) } rootSpan := findRootSpan(t, td.Spans) - want := >race.SpanData{ + want := &tracing.SpanData{ TraceID: tid, DisplayName: "dev-run-action-wrapper", SpanKind: "INTERNAL", - SameProcessAsParentSpan: gtrace.BoolValue{Value: true}, - Status: gtrace.Status{Code: 0}, - InstrumentationLibrary: gtrace.InstrumentationLibrary{ + SameProcessAsParentSpan: tracing.BoolValue{Value: true}, + Status: tracing.Status{Code: 0}, + InstrumentationLibrary: tracing.InstrumentationLibrary{ Name: "genkit-tracer", Version: "v1", }, @@ -134,7 +134,7 @@ func checkActionTrace(t *testing.T, reg *registry, tid, name string) { "genkit:state": "success", }, } - diff := cmp.Diff(want, rootSpan, cmpopts.IgnoreFields(gtrace.SpanData{}, "SpanID", "StartTime", "EndTime")) + diff := cmp.Diff(want, rootSpan, cmpopts.IgnoreFields(tracing.SpanData{}, "SpanID", "StartTime", "EndTime")) if diff != "" { t.Errorf("mismatch (-want, +got):\n%s", diff) } @@ -142,9 +142,9 @@ func checkActionTrace(t *testing.T, reg *registry, tid, name string) { // findRootSpan finds the root span in spans. // It also verifies that it is unique. -func findRootSpan(t *testing.T, spans map[string]*gtrace.SpanData) *gtrace.SpanData { +func findRootSpan(t *testing.T, spans map[string]*tracing.SpanData) *tracing.SpanData { t.Helper() - var root *gtrace.SpanData + var root *tracing.SpanData for _, sd := range spans { if sd.ParentSpanID == "" { if root != nil { diff --git a/go/genkit/dotprompt/genkit.go b/go/genkit/dotprompt/genkit.go index 172f650e9..fc4c9173a 100644 --- a/go/genkit/dotprompt/genkit.go +++ b/go/genkit/dotprompt/genkit.go @@ -22,6 +22,7 @@ import ( "github.com/firebase/genkit/go/ai" "github.com/firebase/genkit/go/genkit" + "github.com/firebase/genkit/go/internal/tracing" ) // ActionInput is the input type of a prompt action. @@ -160,7 +161,7 @@ func (p *Prompt) Register() error { // passes the rendered template to the AI generator specified by // the prompt. func (p *Prompt) Execute(ctx context.Context, input *ActionInput) (*ai.GenerateResponse, error) { - genkit.SetCustomMetadataAttr(ctx, "subtype", "prompt") + tracing.SetCustomMetadataAttr(ctx, "subtype", "prompt") genReq, err := p.buildRequest(input) if err != nil { diff --git a/go/genkit/flow.go b/go/genkit/flow.go index 5839e5b6a..86c330424 100644 --- a/go/genkit/flow.go +++ b/go/genkit/flow.go @@ -25,6 +25,7 @@ import ( "github.com/firebase/genkit/go/gtime" "github.com/firebase/genkit/go/internal" + "github.com/firebase/genkit/go/internal/tracing" "github.com/google/uuid" otrace "go.opentelemetry.io/otel/trace" ) @@ -89,7 +90,7 @@ type Flow[I, O, S any] struct { name string // The last component of the flow's key in the registry. fn Func[I, O, S] // The function to run. stateStore FlowStateStore // Where FlowStates are stored, to support resumption. - tstate *tracingState // set from the action when the flow is defined + tstate *tracing.State // set from the action when the flow is defined // TODO(jba): scheduler // TODO(jba): experimentalDurable // TODO(jba): authPolicy @@ -239,7 +240,7 @@ type FlowResult[O any] struct { // action creates an action for the flow. See the comment at the top of this file for more information. func (f *Flow[I, O, S]) action() *Action[*flowInstruction[I], *flowState[I, O], S] { return NewStreamingAction(f.name, nil, func(ctx context.Context, inst *flowInstruction[I], cb StreamingCallback[S]) (*flowState[I, O], error) { - spanMetaKey.fromContext(ctx).SetAttr("flow:wrapperAction", "true") + tracing.SpanMetaKey.FromContext(ctx).SetAttr("flow:wrapperAction", "true") return f.runInstruction(ctx, inst, cb) }) } @@ -289,10 +290,10 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis defer func() { if err := fctx.finish(ctx); err != nil { // TODO(jba): do something more with this error? - Logger(ctx).Error("flowContext.finish", "err", err.Error()) + internal.Logger(ctx).Error("flowContext.finish", "err", err.Error()) } }() - ctx = flowContextKey.newContext(ctx, fctx) + ctx = flowContextKey.NewContext(ctx, fctx) exec := &flowExecution{ StartTime: gtime.ToMilliseconds(time.Now()), } @@ -301,8 +302,8 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis state.mu.Unlock() // TODO(jba): retrieve the JSON-marshaled SpanContext from state.traceContext. // TODO(jba): add a span link to the context. - output, err := runInNewSpan(ctx, fctx.tracingState(), f.name, "flow", true, state.Input, func(ctx context.Context, input I) (O, error) { - spanMeta := spanMetaKey.fromContext(ctx) + output, err := tracing.RunInNewSpan(ctx, fctx.tracingState(), f.name, "flow", true, state.Input, func(ctx context.Context, input I) (O, error) { + spanMeta := tracing.SpanMetaKey.FromContext(ctx) spanMeta.SetAttr("flow:execution", strconv.Itoa(len(state.Executions)-1)) // TODO(jba): put labels into span metadata. spanMeta.SetAttr("flow:name", f.name) @@ -318,14 +319,14 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis latency := time.Since(start) if err != nil { // TODO(jba): handle InterruptError - Logger(ctx).Error("flow failed", + internal.Logger(ctx).Error("flow failed", "path", spanMeta.Path, "err", err.Error(), ) writeFlowFailure(ctx, f.name, latency, err) spanMeta.SetAttr("flow:state", "error") } else { - Logger(ctx).Info("flow succeeded", "path", spanMeta.Path) + internal.Logger(ctx).Info("flow succeeded", "path", spanMeta.Path) writeFlowSuccess(ctx, f.name, latency) spanMeta.SetAttr("flow:state", "done") @@ -363,7 +364,7 @@ func generateFlowID() (string, error) { type flowContext[I, O any] struct { state *flowState[I, O] stateStore FlowStateStore - tstate *tracingState + tstate *tracing.State mu sync.Mutex seenSteps map[string]int // number of times each name appears, to avoid duplicate names // TODO(jba): auth @@ -373,10 +374,10 @@ type flowContext[I, O any] struct { type flowContexter interface { uniqueStepName(string) string stater() flowStater - tracingState() *tracingState + tracingState() *tracing.State } -func newFlowContext[I, O any](state *flowState[I, O], store FlowStateStore, tstate *tracingState) *flowContext[I, O] { +func newFlowContext[I, O any](state *flowState[I, O], store FlowStateStore, tstate *tracing.State) *flowContext[I, O] { return &flowContext[I, O]{ state: state, stateStore: store, @@ -384,8 +385,8 @@ func newFlowContext[I, O any](state *flowState[I, O], store FlowStateStore, tsta seenSteps: map[string]int{}, } } -func (fc *flowContext[I, O]) stater() flowStater { return fc.state } -func (fc *flowContext[I, O]) tracingState() *tracingState { return fc.tstate } +func (fc *flowContext[I, O]) stater() flowStater { return fc.state } +func (fc *flowContext[I, O]) tracingState() *tracing.State { return fc.tstate } // finish is called at the end of a flow execution. func (fc *flowContext[I, O]) finish(ctx context.Context) error { @@ -408,7 +409,7 @@ func (fc *flowContext[I, O]) uniqueStepName(name string) string { return fmt.Sprintf("%s-%d", name, n) } -var flowContextKey = newContextKey[flowContexter]() +var flowContextKey = internal.NewContextKey[flowContexter]() // Run runs the function f in the context of the current flow. // It returns an error if no flow is active. @@ -418,16 +419,16 @@ var flowContextKey = newContextKey[flowContexter]() // is restarted, f will not be called a second time. func Run[T any](ctx context.Context, name string, f func() (T, error)) (T, error) { // from js/flow/src/steps.ts - fc := flowContextKey.fromContext(ctx) + fc := flowContextKey.FromContext(ctx) if fc == nil { var z T return z, fmt.Errorf("genkit.Run(%q): must be called from a flow", name) } // TODO(jba): The input here is irrelevant. Perhaps runInNewSpan should have only a result type param, // as in the js. - return runInNewSpan(ctx, fc.tracingState(), name, "flowStep", false, 0, func(ctx context.Context, _ int) (T, error) { + return tracing.RunInNewSpan(ctx, fc.tracingState(), name, "flowStep", false, 0, func(ctx context.Context, _ int) (T, error) { uName := fc.uniqueStepName(name) - spanMeta := spanMetaKey.fromContext(ctx) + spanMeta := tracing.SpanMetaKey.FromContext(ctx) spanMeta.SetAttr("flow:stepType", "run") spanMeta.SetAttr("flow:stepName", name) spanMeta.SetAttr("flow:resolvedStepName", uName) diff --git a/go/genkit/metrics.go b/go/genkit/metrics.go index 66861d1d2..c29ec9b97 100644 --- a/go/genkit/metrics.go +++ b/go/genkit/metrics.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/firebase/genkit/go/internal" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -37,7 +38,7 @@ var fetchInstruments = sync.OnceValue(func() *metricInstruments { insts, err := initInstruments() if err != nil { // Do not stop the program because we can't collect metrics. - Logger(context.Background()).Error("metric initialization failed; no metrics will be collected", "err", err) + internal.Logger(context.Background()).Error("metric initialization failed; no metrics will be collected", "err", err) return nil } return insts diff --git a/go/genkit/registry.go b/go/genkit/registry.go index bb755f38b..50ac53d9c 100644 --- a/go/genkit/registry.go +++ b/go/genkit/registry.go @@ -22,7 +22,7 @@ import ( "slices" "sync" - "github.com/firebase/genkit/go/gtrace" + "github.com/firebase/genkit/go/internal/tracing" sdktrace "go.opentelemetry.io/otel/sdk/trace" "golang.org/x/exp/maps" ) @@ -43,27 +43,27 @@ func init() { } type registry struct { - tstate *tracingState + tstate *tracing.State mu sync.Mutex actions map[string]action // TraceStores, at most one for each [Environment]. // Only the prod trace store is actually registered; the dev one is // always created automatically. But it's simpler if we keep them together here. - traceStores map[Environment]gtrace.Store + traceStores map[Environment]tracing.Store } func newRegistry() (*registry, error) { r := ®istry{ actions: map[string]action{}, - traceStores: map[Environment]gtrace.Store{}, + traceStores: map[Environment]tracing.Store{}, } - tstore, err := newDevTraceStore() + tstore, err := tracing.NewDevStore() if err != nil { return nil, err } r.registerTraceStore(EnvironmentDev, tstore) - r.tstate = newTracingState() - r.tstate.addTraceStoreImmediate(tstore) + r.tstate = tracing.NewState() + r.tstate.AddTraceStoreImmediate(tstore) return r, nil } @@ -149,12 +149,12 @@ func (r *registry) listActions() []actionDesc { // The returned function should be called before the program ends to ensure that // all pending data is stored. // RegisterTraceStore panics if called more than once. -func RegisterTraceStore(ts gtrace.Store) (shutdown func(context.Context) error) { +func RegisterTraceStore(ts tracing.Store) (shutdown func(context.Context) error) { globalRegistry.registerTraceStore(EnvironmentProd, ts) - return globalRegistry.tstate.addTraceStoreBatch(ts) + return globalRegistry.tstate.AddTraceStoreBatch(ts) } -func (r *registry) registerTraceStore(env Environment, ts gtrace.Store) { +func (r *registry) registerTraceStore(env Environment, ts tracing.Store) { r.mu.Lock() defer r.mu.Unlock() if _, ok := r.traceStores[env]; ok { @@ -163,7 +163,7 @@ func (r *registry) registerTraceStore(env Environment, ts gtrace.Store) { r.traceStores[env] = ts } -func (r *registry) lookupTraceStore(env Environment) gtrace.Store { +func (r *registry) lookupTraceStore(env Environment) tracing.Store { r.mu.Lock() defer r.mu.Unlock() return r.traceStores[env] @@ -177,5 +177,5 @@ func RegisterSpanProcessor(sp sdktrace.SpanProcessor) { func (r *registry) registerSpanProcessor(sp sdktrace.SpanProcessor) { r.mu.Lock() defer r.mu.Unlock() - r.tstate.registerSpanProcessor(sp) + r.tstate.RegisterSpanProcessor(sp) } diff --git a/go/genkit/util.go b/go/internal/context_key.go similarity index 61% rename from go/genkit/util.go rename to go/internal/context_key.go index 424deeda8..15740c129 100644 --- a/go/genkit/util.go +++ b/go/internal/context_key.go @@ -12,29 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -package genkit +package internal import ( "context" ) -// A contextKey is a unique, typed key for a value stored in a context. -type contextKey[T any] struct { +// A ContextKey is a unique, typed key for a value stored in a context. +type ContextKey[T any] struct { key *int } -func newContextKey[T any]() contextKey[T] { - return contextKey[T]{key: new(int)} +// NewContextKey returns a context key for a value of type T. +func NewContextKey[T any]() ContextKey[T] { + return ContextKey[T]{key: new(int)} } -// newContext returns ctx augmented with this key and the given value. -func (k contextKey[T]) newContext(ctx context.Context, value T) context.Context { +// NewContext returns ctx augmented with this key and the given value. +func (k ContextKey[T]) NewContext(ctx context.Context, value T) context.Context { return context.WithValue(ctx, k.key, value) } -// fromContext returns the value associated with this key in the context, +// FromContext returns the value associated with this key in the context, // or the internal.Zero value for T if the key is not present. -func (k contextKey[T]) fromContext(ctx context.Context) T { +func (k ContextKey[T]) FromContext(ctx context.Context) T { t, _ := ctx.Value(k.key).(T) return t } diff --git a/go/genkit/logging.go b/go/internal/logging.go similarity index 90% rename from go/genkit/logging.go rename to go/internal/logging.go index 1edbafaa7..2e8425649 100644 --- a/go/genkit/logging.go +++ b/go/internal/logging.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package genkit +package internal import ( "context" @@ -29,12 +29,12 @@ func init() { slog.SetDefault(h) } -var loggerKey = newContextKey[*slog.Logger]() +var loggerKey = NewContextKey[*slog.Logger]() // Logger returns the Logger in ctx, or the default Logger // if there is none. func Logger(ctx context.Context) *slog.Logger { - if l := loggerKey.fromContext(ctx); l != nil { + if l := loggerKey.FromContext(ctx); l != nil { return l } return slog.Default() diff --git a/go/gtrace/file_store.go b/go/internal/tracing/file_store.go similarity index 99% rename from go/gtrace/file_store.go rename to go/internal/tracing/file_store.go index dd5fa42f3..49df688f1 100644 --- a/go/gtrace/file_store.go +++ b/go/internal/tracing/file_store.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package gtrace +package tracing import ( "context" diff --git a/go/gtrace/file_store_test.go b/go/internal/tracing/file_store_test.go similarity index 99% rename from go/gtrace/file_store_test.go rename to go/internal/tracing/file_store_test.go index b56635242..431e89de0 100644 --- a/go/gtrace/file_store_test.go +++ b/go/internal/tracing/file_store_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package gtrace +package tracing import ( "context" diff --git a/go/gtrace/store.go b/go/internal/tracing/store.go similarity index 98% rename from go/gtrace/store.go rename to go/internal/tracing/store.go index 33968aa73..0d5725792 100644 --- a/go/gtrace/store.go +++ b/go/internal/tracing/store.go @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// The trace package provides support for storing and exporting traces. -package gtrace +package tracing import ( "context" diff --git a/go/gtrace/store_test.go b/go/internal/tracing/store_test.go similarity index 99% rename from go/gtrace/store_test.go rename to go/internal/tracing/store_test.go index e62a2b2ab..426c7f76d 100644 --- a/go/gtrace/store_test.go +++ b/go/internal/tracing/store_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package gtrace +package tracing import ( "encoding/json" diff --git a/go/gtrace/testdata/trace.json b/go/internal/tracing/testdata/trace.json similarity index 100% rename from go/gtrace/testdata/trace.json rename to go/internal/tracing/testdata/trace.json diff --git a/go/genkit/trace_store_exporter.go b/go/internal/tracing/trace_store_exporter.go similarity index 81% rename from go/genkit/trace_store_exporter.go rename to go/internal/tracing/trace_store_exporter.go index 9f6865e5e..991a6b4a3 100644 --- a/go/genkit/trace_store_exporter.go +++ b/go/internal/tracing/trace_store_exporter.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package genkit +package tracing import ( "context" @@ -20,7 +20,6 @@ import ( "strings" "github.com/firebase/genkit/go/gtime" - "github.com/firebase/genkit/go/gtrace" "go.opentelemetry.io/otel/attribute" sdktrace "go.opentelemetry.io/otel/sdk/trace" otrace "go.opentelemetry.io/otel/trace" @@ -29,10 +28,10 @@ import ( // A traceStoreExporter is an OpenTelemetry SpanExporter that // writes spans to a TraceStore. type traceStoreExporter struct { - store gtrace.Store + store Store } -func newTraceStoreExporter(store gtrace.Store) *traceStoreExporter { +func newTraceStoreExporter(store Store) *traceStoreExporter { return &traceStoreExporter{store} } @@ -65,8 +64,8 @@ func (e *traceStoreExporter) ExportSpans(ctx context.Context, spans []sdktrace.R // convertTrace converts a list of spans to a TraceData. // The spans must all have the same trace ID. -func convertTrace(spans []sdktrace.ReadOnlySpan) (*gtrace.Data, error) { - td := >race.Data{Spans: map[string]*gtrace.SpanData{}} +func convertTrace(spans []sdktrace.ReadOnlySpan) (*Data, error) { + td := &Data{Spans: map[string]*SpanData{}} for _, span := range spans { cspan := convertSpan(span) // The unique span with no parent determines @@ -85,9 +84,9 @@ func convertTrace(spans []sdktrace.ReadOnlySpan) (*gtrace.Data, error) { } // convertSpan converts an OpenTelemetry span to a SpanData. -func convertSpan(span sdktrace.ReadOnlySpan) *gtrace.SpanData { +func convertSpan(span sdktrace.ReadOnlySpan) *SpanData { sc := span.SpanContext() - sd := >race.SpanData{ + sd := &SpanData{ SpanID: sc.SpanID().String(), TraceID: sc.TraceID().String(), StartTime: gtime.ToMilliseconds(span.StartTime()), @@ -95,9 +94,9 @@ func convertSpan(span sdktrace.ReadOnlySpan) *gtrace.SpanData { Attributes: attributesToMap(span.Attributes()), DisplayName: span.Name(), Links: convertLinks(span.Links()), - InstrumentationLibrary: gtrace.InstrumentationLibrary(span.InstrumentationLibrary()), + InstrumentationLibrary: InstrumentationLibrary(span.InstrumentationLibrary()), SpanKind: strings.ToUpper(span.SpanKind().String()), - SameProcessAsParentSpan: gtrace.BoolValue{!sc.IsRemote()}, + SameProcessAsParentSpan: BoolValue{!sc.IsRemote()}, Status: convertStatus(span.Status()), } if p := span.Parent(); p.HasSpanID() { @@ -115,10 +114,10 @@ func attributesToMap(attrs []attribute.KeyValue) map[string]any { return m } -func convertLinks(links []sdktrace.Link) []*gtrace.Link { - var cls []*gtrace.Link +func convertLinks(links []sdktrace.Link) []*Link { + var cls []*Link for _, l := range links { - cl := >race.Link{ + cl := &Link{ SpanContext: convertSpanContext(l.SpanContext), Attributes: attributesToMap(l.Attributes), DroppedAttributesCount: l.DroppedAttributeCount, @@ -128,8 +127,8 @@ func convertLinks(links []sdktrace.Link) []*gtrace.Link { return cls } -func convertSpanContext(sc otrace.SpanContext) gtrace.SpanContext { - return gtrace.SpanContext{ +func convertSpanContext(sc otrace.SpanContext) SpanContext { + return SpanContext{ TraceID: sc.TraceID().String(), SpanID: sc.SpanID().String(), IsRemote: sc.IsRemote(), @@ -137,12 +136,12 @@ func convertSpanContext(sc otrace.SpanContext) gtrace.SpanContext { } } -func convertEvents(evs []sdktrace.Event) []gtrace.TimeEvent { - var tes []gtrace.TimeEvent +func convertEvents(evs []sdktrace.Event) []TimeEvent { + var tes []TimeEvent for _, e := range evs { - tes = append(tes, gtrace.TimeEvent{ + tes = append(tes, TimeEvent{ Time: gtime.ToMilliseconds(e.Time), - Annotation: gtrace.Annotation{ + Annotation: Annotation{ Description: e.Name, Attributes: attributesToMap(e.Attributes), }, @@ -151,8 +150,8 @@ func convertEvents(evs []sdktrace.Event) []gtrace.TimeEvent { return tes } -func convertStatus(s sdktrace.Status) gtrace.Status { - return gtrace.Status{ +func convertStatus(s sdktrace.Status) Status { + return Status{ Code: uint32(s.Code), Description: s.Description, } diff --git a/go/genkit/trace_store_exporter_test.go b/go/internal/tracing/trace_store_exporter_test.go similarity index 89% rename from go/genkit/trace_store_exporter_test.go rename to go/internal/tracing/trace_store_exporter_test.go index 529733938..29333212d 100644 --- a/go/genkit/trace_store_exporter_test.go +++ b/go/internal/tracing/trace_store_exporter_test.go @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package genkit +package tracing import ( "testing" "time" "github.com/firebase/genkit/go/gtime" - "github.com/firebase/genkit/go/gtrace" "github.com/google/go-cmp/cmp" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -85,7 +84,7 @@ func TestConvertSpan(t *testing.T) { SchemaURL: "surl", }, } - want := >race.SpanData{ + want := &SpanData{ DisplayName: "name", TraceID: traceID, SpanID: spanID1, @@ -94,15 +93,15 @@ func TestConvertSpan(t *testing.T) { StartTime: gtime.Milliseconds(1e3), EndTime: gtime.Milliseconds(2e3), Attributes: map[string]any{"k": "v"}, - TimeEvents: gtrace.TimeEvents{TimeEvent: []gtrace.TimeEvent{{ + TimeEvents: TimeEvents{TimeEvent: []TimeEvent{{ Time: gtime.Milliseconds(3e3), - Annotation: gtrace.Annotation{ + Annotation: Annotation{ Attributes: map[string]any{"k2": "v2"}, Description: "ename", }, }}}, - Links: []*gtrace.Link{{ - SpanContext: gtrace.SpanContext{ + Links: []*Link{{ + SpanContext: SpanContext{ TraceID: traceID, SpanID: spanID1, IsRemote: true, @@ -111,8 +110,8 @@ func TestConvertSpan(t *testing.T) { Attributes: map[string]any{"k3": "v3"}, DroppedAttributesCount: 1, }}, - Status: gtrace.Status{Code: 2, Description: "desc"}, - InstrumentationLibrary: gtrace.InstrumentationLibrary{ + Status: Status{Code: 2, Description: "desc"}, + InstrumentationLibrary: InstrumentationLibrary{ Name: "iname", Version: "version", SchemaURL: "surl", diff --git a/go/genkit/tracing.go b/go/internal/tracing/tracing.go similarity index 81% rename from go/genkit/tracing.go rename to go/internal/tracing/tracing.go index 94b585667..acdebb0fc 100644 --- a/go/genkit/tracing.go +++ b/go/internal/tracing/tracing.go @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -package genkit +// The tracing package provides support for execution traces. +package tracing import ( "context" @@ -22,7 +23,6 @@ import ( "path/filepath" "sync" - "github.com/firebase/genkit/go/gtrace" "github.com/firebase/genkit/go/internal" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -30,51 +30,51 @@ import ( "go.opentelemetry.io/otel/trace" ) -// tracingState holds OpenTelemetry values for creating traces. -type tracingState struct { - tp *sdktrace.TracerProvider // references gtrace.Stores +// State holds OpenTelemetry values for creating traces. +type State struct { + tp *sdktrace.TracerProvider // references Stores tracer trace.Tracer // returned from tp.Tracer(), cached } -func newTracingState() *tracingState { +func NewState() *State { tp := sdktrace.NewTracerProvider() - return &tracingState{ + return &State{ tp: tp, tracer: tp.Tracer("genkit-tracer", trace.WithInstrumentationVersion("v1")), } } -func (ts *tracingState) registerSpanProcessor(sp sdktrace.SpanProcessor) { +func (ts *State) RegisterSpanProcessor(sp sdktrace.SpanProcessor) { ts.tp.RegisterSpanProcessor(sp) } -// addTraceStoreImmediate adds tstore to the tracingState. +// AddTraceStoreImmediate adds tstore to the tracingState. // Traces are saved immediately as they are finshed. // Use this for a gtrace.Store with a fast Save method, // such as one that writes to a file. -func (ts *tracingState) addTraceStoreImmediate(tstore gtrace.Store) { +func (ts *State) AddTraceStoreImmediate(tstore Store) { e := newTraceStoreExporter(tstore) // Adding a SimpleSpanProcessor is like using the WithSyncer option. - ts.registerSpanProcessor(sdktrace.NewSimpleSpanProcessor(e)) + ts.RegisterSpanProcessor(sdktrace.NewSimpleSpanProcessor(e)) // Ignore tracerProvider.Shutdown. It shouldn't be needed when using WithSyncer. // Confirmed for OTel packages as of v1.24.0. // Also requires traceStoreExporter.Shutdown to be a no-op. } -// addTraceStoreBatch adds ts to the tracingState. +// AddTraceStoreBatch adds ts to the tracingState. // Traces are batched before being sent for processing. // Use this for a gtrace.Store with a potentially expensive Save method, // such as one that makes an RPC. // Callers must invoke the returned function at the end of the program to flush the final batch // and perform other cleanup. -func (ts *tracingState) addTraceStoreBatch(tstore gtrace.Store) (shutdown func(context.Context) error) { +func (ts *State) AddTraceStoreBatch(tstore Store) (shutdown func(context.Context) error) { e := newTraceStoreExporter(tstore) // Adding a BatchSpanProcessor is like using the WithBatcher option. - ts.registerSpanProcessor(sdktrace.NewBatchSpanProcessor(e)) + ts.RegisterSpanProcessor(sdktrace.NewBatchSpanProcessor(e)) return ts.tp.Shutdown } -func newDevTraceStore() (gtrace.Store, error) { +func NewDevStore() (Store, error) { programName := filepath.Base(os.Args[0]) rootHash := fmt.Sprintf("%02x", md5.Sum([]byte(programName))) dir := filepath.Join(os.TempDir(), ".genkit", rootHash, "traces") @@ -82,7 +82,7 @@ func newDevTraceStore() (gtrace.Store, error) { return nil, err } // Don't remove the temp directory, for post-mortem debugging. - return gtrace.NewFileStore(dir) + return NewFileStore(dir) } // The rest of this file contains code translated from js/common/src/tracing/*.ts. @@ -92,18 +92,18 @@ const ( spanTypeAttr = attrPrefix + ":type" ) -// runInNewSpan runs f on input in a new span with the given name. +// RunInNewSpan runs f on input in a new span with the given name. // The attrs map provides the span's initial attributes. -func runInNewSpan[I, O any]( +func RunInNewSpan[I, O any]( ctx context.Context, - tstate *tracingState, + tstate *State, name, spanType string, isRoot bool, input I, f func(context.Context, I) (O, error), ) (O, error) { // TODO(jba): support span links. - log := Logger(ctx) + log := internal.Logger(ctx) log.Debug("span start", "name", name) defer log.Debug("span end", "name", name) sm := &spanMetadata{ @@ -111,7 +111,7 @@ func runInNewSpan[I, O any]( Input: input, IsRoot: isRoot, } - parentSpanMeta := spanMetaKey.fromContext(ctx) + parentSpanMeta := SpanMetaKey.FromContext(ctx) var parentPath string if parentSpanMeta != nil { parentPath = parentSpanMeta.Path @@ -126,7 +126,7 @@ func runInNewSpan[I, O any]( // At the end, copy some of the spanMetadata to the OpenTelemetry span. defer func() { span.SetAttributes(sm.attributes()...) }() // Add the spanMetadata to the context, so the function can access it. - ctx = spanMetaKey.newContext(ctx, sm) + ctx = SpanMetaKey.NewContext(ctx, sm) // Run the function. output, err := f(ctx, input) @@ -197,10 +197,10 @@ func (sm *spanMetadata) attributes() []attribute.KeyValue { return kvs } -// spanMetaKey is for storing spanMetadatas in a context. -var spanMetaKey = newContextKey[*spanMetadata]() +// SpanMetaKey is for storing spanMetadatas in a context. +var SpanMetaKey = internal.NewContextKey[*spanMetadata]() // SetCustomMetadataAttr records a key in the current span metadata. func SetCustomMetadataAttr(ctx context.Context, key, value string) { - spanMetaKey.fromContext(ctx).SetAttr(key, value) + SpanMetaKey.FromContext(ctx).SetAttr(key, value) } diff --git a/go/genkit/tracing_test.go b/go/internal/tracing/tracing_test.go similarity index 63% rename from go/genkit/tracing_test.go rename to go/internal/tracing/tracing_test.go index e573d18e4..6a6df1294 100644 --- a/go/genkit/tracing_test.go +++ b/go/internal/tracing/tracing_test.go @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -package genkit +package tracing import ( - "context" "slices" "strconv" "testing" @@ -52,30 +51,3 @@ func TestSpanMetadata(t *testing.T) { t.Errorf("\ngot %v\nwant %v", got, want) } } - -func TestTracing(t *testing.T) { - ctx := context.Background() - const actionName = "TestTracing-inc" - a := NewAction(actionName, nil, inc) - if _, err := a.Run(context.Background(), 3, nil); err != nil { - t.Fatal(err) - } - // The dev TraceStore is registered by Init, called from TestMain. - ts := globalRegistry.lookupTraceStore(EnvironmentDev) - tds, _, err := ts.List(ctx, nil) - if err != nil { - t.Fatal(err) - } - // The same trace store is used for all tests, so there might be several traces. - // Look for this one, which has a unique name. - for _, td := range tds { - if td.DisplayName == actionName { - // Spot check: expect a single span. - if g, w := len(td.Spans), 1; g != w { - t.Errorf("got %d spans, want %d", g, w) - } - return - } - } - t.Fatalf("did not find trace named %q", actionName) -} diff --git a/go/plugins/localvec/localvec.go b/go/plugins/localvec/localvec.go index 168e02bc5..79305fd43 100644 --- a/go/plugins/localvec/localvec.go +++ b/go/plugins/localvec/localvec.go @@ -31,7 +31,7 @@ import ( "slices" "github.com/firebase/genkit/go/ai" - "github.com/firebase/genkit/go/genkit" + "github.com/firebase/genkit/go/internal" ) // New returns a new local vector database. This will register a new @@ -110,7 +110,7 @@ func (r *retriever) Index(ctx context.Context, req *ai.IndexerRequest) error { } if _, ok := r.data[id]; ok { - genkit.Logger(ctx).Debug("localvec skipping document because already present", "id", id) + internal.Logger(ctx).Debug("localvec skipping document because already present", "id", id) continue }