diff --git a/go/ai/gen.go b/go/ai/gen.go index b69af845d..11de30217 100644 --- a/go/ai/gen.go +++ b/go/ai/gen.go @@ -135,8 +135,8 @@ type ToolDefinition struct { OutputSchema map[string]any `json:"outputSchema,omitempty"` } -// A ToolRequest is a request from the model that the client should run -// a specific tool and pass a [ToolResponse] to the model on the next request it makes. +// A ToolRequest is a message from the model to the client that it should run a +// specific tool and pass a [ToolResponse] to the model on the next chat request it makes. // Any ToolRequest will correspond to some [ToolDefinition] previously sent by the client. type ToolRequest struct { // Input is a JSON object describing the input values to the tool. @@ -145,7 +145,7 @@ type ToolRequest struct { Name string `json:"name,omitempty"` } -// A ToolResponse is a response from the client to the model containing +// A ToolResponse is a message from the client to the model containing // the results of running a specific tool on the arguments passed to the client // by the model in a [ToolRequest]. type ToolResponse struct { diff --git a/go/genkit/action.go b/go/genkit/action.go index 0c27af2ce..13661f02c 100644 --- a/go/genkit/action.go +++ b/go/genkit/action.go @@ -22,19 +22,21 @@ import ( "reflect" "time" + "github.com/firebase/genkit/go/internal" "github.com/invopop/jsonschema" ) // Func is the type of function that Actions and Flows execute. // It takes an input of type I and returns an output of type O, optionally // streaming values of type S incrementally by invoking a callback. -// TODO(jba): use a generic type alias when they become available? // If the StreamingCallback is non-nil and the function supports streaming, it should // stream the results by invoking the callback periodically, ultimately returning // with a final return value. Otherwise, it should ignore the StreamingCallback and // just return a result. type Func[I, O, S any] func(context.Context, I, StreamingCallback[S]) (O, error) +// TODO(jba): use a generic type alias for the above when they become available? + // StreamingCallback is the type of streaming callbacks, which is passed to action // functions who should stream their responses. type StreamingCallback[S any] func(context.Context, S) error @@ -44,7 +46,7 @@ type StreamingCallback[S any] func(context.Context, S) error // Such a function corresponds to a Flow[I, O, struct{}]. type NoStream = StreamingCallback[struct{}] -// An Action is a function with a name. +// An Action is a named, observable operation. // It consists of a function that takes an input of type I and returns an output // of type O, optionally streaming values of type S incrementally by invoking a callback. // It optionally has other metadata, like a description @@ -90,7 +92,7 @@ func (a *Action[I, O, S]) Name() string { return a.name } // setTracingState sets the action's tracingState. func (a *Action[I, O, S]) setTracingState(tstate *tracingState) { a.tstate = tstate } -// Run executes the Action's function in a new span. +// Run executes the Action's function in a new trace span. func (a *Action[I, O, S]) Run(ctx context.Context, input I, cb StreamingCallback[S]) (output O, err error) { // TODO: validate input against JSONSchema for I. // TODO: validate output against JSONSchema for O. @@ -115,7 +117,7 @@ func (a *Action[I, O, S]) Run(ctx context.Context, input I, cb StreamingCallback latency := time.Since(start) if err != nil { writeActionFailure(ctx, a.name, latency, err) - return zero[O](), err + return internal.Zero[O](), err } writeActionSuccess(ctx, a.name, latency) return out, nil diff --git a/go/genkit/conformance_test.go b/go/genkit/conformance_test.go index c6f244847..c7e12ab21 100644 --- a/go/genkit/conformance_test.go +++ b/go/genkit/conformance_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + "github.com/firebase/genkit/go/internal" "golang.org/x/exp/maps" ) @@ -84,7 +85,7 @@ func TestFlowConformance(t *testing.T) { for _, filename := range testFiles { t.Run(strings.TrimSuffix(filepath.Base(filename), ".json"), func(t *testing.T) { var test conformanceTest - if err := readJSONFile(filename, &test); err != nil { + if err := internal.ReadJSONFile(filename, &test); err != nil { t.Fatal(err) } // Each test uses its own registry to avoid interference. @@ -111,7 +112,7 @@ func TestFlowConformance(t *testing.T) { } ts := r.lookupTraceStore(EnvironmentDev) var gotTrace any - if err := ts.loadAny(resp.Telemetry.TraceID, &gotTrace); err != nil { + if err := ts.LoadAny(resp.Telemetry.TraceID, &gotTrace); err != nil { t.Fatal(err) } renameSpans(t, gotTrace) diff --git a/go/genkit/dev_server.go b/go/genkit/dev_server.go index dcb1add3f..0015c4531 100644 --- a/go/genkit/dev_server.go +++ b/go/genkit/dev_server.go @@ -34,6 +34,7 @@ import ( "syscall" "time" + "github.com/firebase/genkit/go/gtrace" "go.opentelemetry.io/otel/trace" ) @@ -250,22 +251,22 @@ func (s *devServer) handleListTraces(w http.ResponseWriter, r *http.Request) err } } ctoken := r.FormValue("continuationToken") - tds, ctoken, err := ts.List(r.Context(), &TraceQuery{Limit: limit, ContinuationToken: ctoken}) - if errors.Is(err, errBadQuery) { + tds, ctoken, err := ts.List(r.Context(), >race.Query{Limit: limit, ContinuationToken: ctoken}) + if errors.Is(err, gtrace.ErrBadQuery) { return &httpError{http.StatusBadRequest, err} } if err != nil { return err } if tds == nil { - tds = []*TraceData{} + tds = []*gtrace.Data{} } return writeJSON(r.Context(), w, listTracesResult{tds, ctoken}) } type listTracesResult struct { - Traces []*TraceData `json:"traces"` - ContinuationToken string `json:"continuationToken"` + Traces []*gtrace.Data `json:"traces"` + ContinuationToken string `json:"continuationToken"` } func (s *devServer) handleListFlowStates(w http.ResponseWriter, r *http.Request) error { diff --git a/go/genkit/dev_server_test.go b/go/genkit/dev_server_test.go index 679f39cb5..83c450178 100644 --- a/go/genkit/dev_server_test.go +++ b/go/genkit/dev_server_test.go @@ -24,6 +24,7 @@ import ( "strings" "testing" + "github.com/firebase/genkit/go/gtrace" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" ) @@ -102,7 +103,7 @@ func TestDevServer(t *testing.T) { if err != nil { t.Fatal(err) } - // We may have any result, including zero traces, so don't check anything else. + // We may have any result, including internal.Zero traces, so don't check anything else. }) } @@ -113,13 +114,13 @@ func checkActionTrace(t *testing.T, reg *registry, tid, name string) { t.Fatal(err) } rootSpan := findRootSpan(t, td.Spans) - want := &SpanData{ + want := >race.SpanData{ TraceID: tid, DisplayName: "dev-run-action-wrapper", SpanKind: "INTERNAL", - SameProcessAsParentSpan: boolValue{Value: true}, - Status: Status{Code: 0}, - InstrumentationLibrary: InstrumentationLibrary{ + SameProcessAsParentSpan: gtrace.BoolValue{Value: true}, + Status: gtrace.Status{Code: 0}, + InstrumentationLibrary: gtrace.InstrumentationLibrary{ Name: "genkit-tracer", Version: "v1", }, @@ -133,7 +134,7 @@ func checkActionTrace(t *testing.T, reg *registry, tid, name string) { "genkit:state": "success", }, } - diff := cmp.Diff(want, rootSpan, cmpopts.IgnoreFields(SpanData{}, "SpanID", "StartTime", "EndTime")) + diff := cmp.Diff(want, rootSpan, cmpopts.IgnoreFields(gtrace.SpanData{}, "SpanID", "StartTime", "EndTime")) if diff != "" { t.Errorf("mismatch (-want, +got):\n%s", diff) } @@ -141,9 +142,9 @@ func checkActionTrace(t *testing.T, reg *registry, tid, name string) { // findRootSpan finds the root span in spans. // It also verifies that it is unique. -func findRootSpan(t *testing.T, spans map[string]*SpanData) *SpanData { +func findRootSpan(t *testing.T, spans map[string]*gtrace.SpanData) *gtrace.SpanData { t.Helper() - var root *SpanData + var root *gtrace.SpanData for _, sd := range spans { if sd.ParentSpanID == "" { if root != nil { diff --git a/go/genkit/file_flow_state_store.go b/go/genkit/file_flow_state_store.go index 0a5853597..442434212 100644 --- a/go/genkit/file_flow_state_store.go +++ b/go/genkit/file_flow_state_store.go @@ -18,6 +18,8 @@ import ( "context" "os" "path/filepath" + + "github.com/firebase/genkit/go/internal" ) // A FileFlowStateStore is a FlowStateStore that writes flowStates to files. @@ -37,9 +39,9 @@ func NewFileFlowStateStore(dir string) (*FileFlowStateStore, error) { func (s *FileFlowStateStore) Save(ctx context.Context, id string, fs flowStater) error { fs.lock() defer fs.unlock() - return writeJSONFile(filepath.Join(s.dir, clean(id)), fs) + return internal.WriteJSONFile(filepath.Join(s.dir, internal.Clean(id)), fs) } func (s *FileFlowStateStore) Load(ctx context.Context, id string, pfs any) error { - return readJSONFile(filepath.Join(s.dir, clean(id)), pfs) + return internal.ReadJSONFile(filepath.Join(s.dir, internal.Clean(id)), pfs) } diff --git a/go/genkit/flow.go b/go/genkit/flow.go index 0dcab4b78..5f43db1b6 100644 --- a/go/genkit/flow.go +++ b/go/genkit/flow.go @@ -12,7 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. -// This file contains code for flows. +package genkit + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "sync" + "time" + + "github.com/firebase/genkit/go/gtime" + "github.com/firebase/genkit/go/internal" + "github.com/google/uuid" + otrace "go.opentelemetry.io/otel/trace" +) + +// TODO(jba): support auth +// TODO(jba): provide a way to start a Flow from user code. + +// A Flow is a kind of Action that can be interrupted and resumed. // // A Flow[I, O, S] represents a function from I to O (the S parameter is described // under "Streaming" below). But the function may run in pieces, with interruptions @@ -62,25 +82,6 @@ // // Streaming is only supported for the "start" flow instruction. Currently there is // no way to schedule or resume a flow with streaming. -package genkit - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "strconv" - "sync" - "time" - - "github.com/google/uuid" - "go.opentelemetry.io/otel/trace" -) - -// TODO(jba): support auth -// TODO(jba): provide a way to start a Flow from user code. - -// A Flow is a kind of Action that can be interrupted and resumed. type Flow[I, O, S any] struct { name string // The last component of the flow's key in the registry. fn Func[I, O, S] // The function to run. @@ -110,6 +111,8 @@ func defineFlow[I, O, S any](r *registry, name string, fn Func[I, O, S]) *Flow[I return f } +// TODO(jba): use flowError? + // A flowInstruction is an instruction to follow with a flow. // It is the input for the flow's action. // Exactly one field will be non-nil. @@ -162,8 +165,8 @@ type flowState[I, O any] struct { FlowID string `json:"flowId,omitempty"` FlowName string `json:"name,omitempty"` // start time in milliseconds since the epoch - StartTime Milliseconds `json:"startTime,omitempty"` - Input I `json:"input,omitempty"` + StartTime gtime.Milliseconds `json:"startTime,omitempty"` + Input I `json:"input,omitempty"` mu sync.Mutex Cache map[string]json.RawMessage `json:"cache,omitempty"` @@ -179,7 +182,7 @@ func newFlowState[I, O any](id, name string, input I) *flowState[I, O] { FlowID: id, FlowName: name, Input: input, - StartTime: timeToMilliseconds(time.Now()), + StartTime: gtime.ToMilliseconds(time.Now()), Cache: map[string]json.RawMessage{}, Operation: &Operation[O]{ FlowID: id, @@ -283,7 +286,7 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis }() ctx = flowContextKey.newContext(ctx, fctx) exec := &FlowExecution{ - StartTime: timeToMilliseconds(time.Now()), + StartTime: gtime.ToMilliseconds(time.Now()), } state.mu.Lock() state.Executions = append(state.Executions, exec) @@ -297,7 +300,7 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis spanMeta.SetAttr("flow:name", f.name) spanMeta.SetAttr("flow:id", state.FlowID) spanMeta.SetAttr("flow:dispatchType", dispatchType) - rootSpanContext := trace.SpanContextFromContext(ctx) + rootSpanContext := otrace.SpanContextFromContext(ctx) traceID := rootSpanContext.TraceID().String() exec.TraceIDs = append(exec.TraceIDs, traceID) // TODO(jba): Save rootSpanContext in the state. @@ -432,18 +435,18 @@ func Run[T any](ctx context.Context, name string, f func() (T, error)) (T, error if ok { var t T if err := json.Unmarshal(j, &t); err != nil { - return zero[T](), err + return internal.Zero[T](), err } spanMeta.SetAttr("flow:state", "cached") return t, nil } t, err := f() if err != nil { - return zero[T](), err + return internal.Zero[T](), err } bytes, err := json.Marshal(t) if err != nil { - return zero[T](), err + return internal.Zero[T](), err } fs.lock() fs.cache()[uName] = json.RawMessage(bytes) @@ -458,7 +461,7 @@ func Run[T any](ctx context.Context, name string, f func() (T, error)) (T, error func RunFlow[I, O, S any](ctx context.Context, flow *Flow[I, O, S], input I) (O, error) { state, err := flow.start(ctx, input, nil) if err != nil { - return zero[O](), err + return internal.Zero[O](), err } return finishedOpResponse(state.Operation) } @@ -515,10 +518,10 @@ func StreamFlow[I, O, S any](ctx context.Context, flow *Flow[I, O, S], input I) func finishedOpResponse[O any](op *Operation[O]) (O, error) { if !op.Done { - return zero[O](), fmt.Errorf("flow %s did not finish execution", op.FlowID) + return internal.Zero[O](), fmt.Errorf("flow %s did not finish execution", op.FlowID) } if op.Result.Error != "" { - return zero[O](), fmt.Errorf("flow %s: %s", op.FlowID, op.Result.Error) + return internal.Zero[O](), fmt.Errorf("flow %s: %s", op.FlowID, op.Result.Error) } return op.Result.Response, nil } diff --git a/go/genkit/gen.go b/go/genkit/gen.go index 3ce8de077..153ef1620 100644 --- a/go/genkit/gen.go +++ b/go/genkit/gen.go @@ -16,50 +16,26 @@ package genkit -type FlowError struct { +import "github.com/firebase/genkit/go/gtime" + +type flowError struct { Error string `json:"error,omitempty"` Stacktrace string `json:"stacktrace,omitempty"` } -type FlowInvokeEnvelopeMessageResume struct { - FlowID string `json:"flowId,omitempty"` - Payload any `json:"payload,omitempty"` -} - -type FlowInvokeEnvelopeMessageRetry struct { - FlowID string `json:"flowId,omitempty"` -} - type FlowInvokeEnvelopeMessageRunScheduled struct { FlowID string `json:"flowId,omitempty"` } -type FlowInvokeEnvelopeMessageSchedule struct { - Delay float64 `json:"delay,omitempty"` - Input any `json:"input,omitempty"` -} - -type FlowInvokeEnvelopeMessageStart struct { - Input any `json:"input,omitempty"` - Labels map[string]string `json:"labels,omitempty"` -} - -type FlowInvokeEnvelopeMessageState struct { - FlowID string `json:"flowId,omitempty"` -} - -type FlowResponse struct { - Response any `json:"response,omitempty"` -} - type FlowExecution struct { // end time in milliseconds since the epoch - EndTime Milliseconds `json:"endTime,omitempty"` + EndTime gtime.Milliseconds `json:"endTime,omitempty"` // start time in milliseconds since the epoch - StartTime Milliseconds `json:"startTime,omitempty"` - TraceIDs []string `json:"traceIds,omitempty"` + StartTime gtime.Milliseconds `json:"startTime,omitempty"` + TraceIDs []string `json:"traceIds,omitempty"` } +// BlockedOnStep describes the step of the flow that the flow is blocked on. type BlockedOnStep struct { Name string `json:"name,omitempty"` Schema string `json:"schema,omitempty"` diff --git a/go/genkit/registry.go b/go/genkit/registry.go index 37ce88dd5..94a92ea34 100644 --- a/go/genkit/registry.go +++ b/go/genkit/registry.go @@ -22,6 +22,7 @@ import ( "slices" "sync" + "github.com/firebase/genkit/go/gtrace" sdktrace "go.opentelemetry.io/otel/sdk/trace" "golang.org/x/exp/maps" ) @@ -48,13 +49,13 @@ type registry struct { // TraceStores, at most one for each [Environment]. // Only the prod trace store is actually registered; the dev one is // always created automatically. But it's simpler if we keep them together here. - traceStores map[Environment]TraceStore + traceStores map[Environment]gtrace.Store } func newRegistry() (*registry, error) { r := ®istry{ actions: map[string]action{}, - traceStores: map[Environment]TraceStore{}, + traceStores: map[Environment]gtrace.Store{}, } tstore, err := newDevTraceStore() if err != nil { @@ -66,7 +67,7 @@ func newRegistry() (*registry, error) { return r, nil } -// An Environment is the development context that the program is running in. +// An Environment is the execution context that the program is running in. type Environment string const ( @@ -143,17 +144,17 @@ func (r *registry) listActions() []actionDesc { return ads } -// RegisterTraceStore uses the given TraceStore to record traces in the prod environment. -// (A TraceStore that writes to the local filesystem is always installed in the dev environment.) +// RegisterTraceStore uses the given trace.Store to record traces in the prod environment. +// (A trace.Store that writes to the local filesystem is always installed in the dev environment.) // The returned function should be called before the program ends to ensure that // all pending data is stored. // RegisterTraceStore panics if called more than once. -func RegisterTraceStore(ts TraceStore) (shutdown func(context.Context) error) { +func RegisterTraceStore(ts gtrace.Store) (shutdown func(context.Context) error) { globalRegistry.registerTraceStore(EnvironmentProd, ts) return globalRegistry.tstate.addTraceStoreBatch(ts) } -func (r *registry) registerTraceStore(env Environment, ts TraceStore) { +func (r *registry) registerTraceStore(env Environment, ts gtrace.Store) { r.mu.Lock() defer r.mu.Unlock() if _, ok := r.traceStores[env]; ok { @@ -162,7 +163,7 @@ func (r *registry) registerTraceStore(env Environment, ts TraceStore) { r.traceStores[env] = ts } -func (r *registry) lookupTraceStore(env Environment) TraceStore { +func (r *registry) lookupTraceStore(env Environment) gtrace.Store { r.mu.Lock() defer r.mu.Unlock() return r.traceStores[env] diff --git a/go/genkit/schemas.config b/go/genkit/schemas.config index 5f4b55829..bad4ecf1d 100644 --- a/go/genkit/schemas.config +++ b/go/genkit/schemas.config @@ -1,6 +1,8 @@ # This file holds configuration for the genkit-schema.json file # generated by the npm export:schemas script. +genkit import github.com/firebase/genkit/go/gtime + # DocumentData type was hand-written. DocumentData omit @@ -45,19 +47,32 @@ GenerationCommonConfig.topK type int GenerateRequestOutputFormat name OutputFormat OperationBlockedOnStep name BlockedOnStep +OperationBlockedOnStep doc +BlockedOnStep describes the step of the flow that the flow is blocked on. +. OutputFormatJson name OutputFormatJSON # Some flow types are generic, so they must be written manually. # There isn't enough information in the JSON Schema to determine the # generic type parameters. +# Other flow types are written manually. + FlowState omit +FlowResponse omit FlowInvokeEnvelopeMessage omit +FlowInvokeEnvelopeMessageResume omit +FlowInvokeEnvelopeMessageRetry omit +FlowInvokeEnvelopeMessageSchedule omit +FlowInvokeEnvelopeMessageStart omit +FlowInvokeEnvelopeMessageState omit Operation omit FlowStateExecution name FlowExecution -FlowStateExecution.startTime type Milliseconds -FlowStateExecution.endTime type Milliseconds +FlowStateExecution.startTime type gtime.Milliseconds +FlowStateExecution.endTime type gtime.Milliseconds + +FlowError name flowError GenerateRequest.messages doc Messages is a list of messages to pass to the model. The first n-1 Messages diff --git a/go/genkit/trace_store_exporter.go b/go/genkit/trace_store_exporter.go index ea3e15d03..9f6865e5e 100644 --- a/go/genkit/trace_store_exporter.go +++ b/go/genkit/trace_store_exporter.go @@ -19,18 +19,20 @@ import ( "errors" "strings" + "github.com/firebase/genkit/go/gtime" + "github.com/firebase/genkit/go/gtrace" "go.opentelemetry.io/otel/attribute" sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/trace" + otrace "go.opentelemetry.io/otel/trace" ) // A traceStoreExporter is an OpenTelemetry SpanExporter that // writes spans to a TraceStore. type traceStoreExporter struct { - store TraceStore + store gtrace.Store } -func newTraceStoreExporter(store TraceStore) *traceStoreExporter { +func newTraceStoreExporter(store gtrace.Store) *traceStoreExporter { return &traceStoreExporter{store} } @@ -39,7 +41,7 @@ func newTraceStoreExporter(store TraceStore) *traceStoreExporter { // Saving is not atomic: it is possible that some but not all spans will be saved. func (e *traceStoreExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { // Group spans by trace ID. - spansByTrace := map[trace.TraceID][]sdktrace.ReadOnlySpan{} + spansByTrace := map[otrace.TraceID][]sdktrace.ReadOnlySpan{} for _, span := range spans { tid := span.SpanContext().TraceID() spansByTrace[tid] = append(spansByTrace[tid], span) @@ -63,8 +65,8 @@ func (e *traceStoreExporter) ExportSpans(ctx context.Context, spans []sdktrace.R // convertTrace converts a list of spans to a TraceData. // The spans must all have the same trace ID. -func convertTrace(spans []sdktrace.ReadOnlySpan) (*TraceData, error) { - td := &TraceData{Spans: map[string]*SpanData{}} +func convertTrace(spans []sdktrace.ReadOnlySpan) (*gtrace.Data, error) { + td := >race.Data{Spans: map[string]*gtrace.SpanData{}} for _, span := range spans { cspan := convertSpan(span) // The unique span with no parent determines @@ -83,19 +85,19 @@ func convertTrace(spans []sdktrace.ReadOnlySpan) (*TraceData, error) { } // convertSpan converts an OpenTelemetry span to a SpanData. -func convertSpan(span sdktrace.ReadOnlySpan) *SpanData { +func convertSpan(span sdktrace.ReadOnlySpan) *gtrace.SpanData { sc := span.SpanContext() - sd := &SpanData{ + sd := >race.SpanData{ SpanID: sc.SpanID().String(), TraceID: sc.TraceID().String(), - StartTime: timeToMilliseconds(span.StartTime()), - EndTime: timeToMilliseconds(span.EndTime()), + StartTime: gtime.ToMilliseconds(span.StartTime()), + EndTime: gtime.ToMilliseconds(span.EndTime()), Attributes: attributesToMap(span.Attributes()), DisplayName: span.Name(), Links: convertLinks(span.Links()), - InstrumentationLibrary: InstrumentationLibrary(span.InstrumentationLibrary()), + InstrumentationLibrary: gtrace.InstrumentationLibrary(span.InstrumentationLibrary()), SpanKind: strings.ToUpper(span.SpanKind().String()), - SameProcessAsParentSpan: boolValue{!sc.IsRemote()}, + SameProcessAsParentSpan: gtrace.BoolValue{!sc.IsRemote()}, Status: convertStatus(span.Status()), } if p := span.Parent(); p.HasSpanID() { @@ -113,10 +115,10 @@ func attributesToMap(attrs []attribute.KeyValue) map[string]any { return m } -func convertLinks(links []sdktrace.Link) []*Link { - var cls []*Link +func convertLinks(links []sdktrace.Link) []*gtrace.Link { + var cls []*gtrace.Link for _, l := range links { - cl := &Link{ + cl := >race.Link{ SpanContext: convertSpanContext(l.SpanContext), Attributes: attributesToMap(l.Attributes), DroppedAttributesCount: l.DroppedAttributeCount, @@ -126,8 +128,8 @@ func convertLinks(links []sdktrace.Link) []*Link { return cls } -func convertSpanContext(sc trace.SpanContext) SpanContext { - return SpanContext{ +func convertSpanContext(sc otrace.SpanContext) gtrace.SpanContext { + return gtrace.SpanContext{ TraceID: sc.TraceID().String(), SpanID: sc.SpanID().String(), IsRemote: sc.IsRemote(), @@ -135,12 +137,12 @@ func convertSpanContext(sc trace.SpanContext) SpanContext { } } -func convertEvents(evs []sdktrace.Event) []TimeEvent { - var tes []TimeEvent +func convertEvents(evs []sdktrace.Event) []gtrace.TimeEvent { + var tes []gtrace.TimeEvent for _, e := range evs { - tes = append(tes, TimeEvent{ - Time: timeToMilliseconds(e.Time), - Annotation: annotation{ + tes = append(tes, gtrace.TimeEvent{ + Time: gtime.ToMilliseconds(e.Time), + Annotation: gtrace.Annotation{ Description: e.Name, Attributes: attributesToMap(e.Attributes), }, @@ -149,8 +151,8 @@ func convertEvents(evs []sdktrace.Event) []TimeEvent { return tes } -func convertStatus(s sdktrace.Status) Status { - return Status{ +func convertStatus(s sdktrace.Status) gtrace.Status { + return gtrace.Status{ Code: uint32(s.Code), Description: s.Description, } diff --git a/go/genkit/trace_store_exporter_test.go b/go/genkit/trace_store_exporter_test.go index 683740a6e..529733938 100644 --- a/go/genkit/trace_store_exporter_test.go +++ b/go/genkit/trace_store_exporter_test.go @@ -18,6 +18,8 @@ import ( "testing" "time" + "github.com/firebase/genkit/go/gtime" + "github.com/firebase/genkit/go/gtrace" "github.com/google/go-cmp/cmp" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -83,24 +85,24 @@ func TestConvertSpan(t *testing.T) { SchemaURL: "surl", }, } - want := &SpanData{ + want := >race.SpanData{ DisplayName: "name", TraceID: traceID, SpanID: spanID1, ParentSpanID: spanID2, SpanKind: "INTERNAL", - StartTime: Milliseconds(1e3), - EndTime: Milliseconds(2e3), + StartTime: gtime.Milliseconds(1e3), + EndTime: gtime.Milliseconds(2e3), Attributes: map[string]any{"k": "v"}, - TimeEvents: timeEvents{TimeEvent: []TimeEvent{{ - Time: Milliseconds(3e3), - Annotation: annotation{ + TimeEvents: gtrace.TimeEvents{TimeEvent: []gtrace.TimeEvent{{ + Time: gtime.Milliseconds(3e3), + Annotation: gtrace.Annotation{ Attributes: map[string]any{"k2": "v2"}, Description: "ename", }, }}}, - Links: []*Link{{ - SpanContext: SpanContext{ + Links: []*gtrace.Link{{ + SpanContext: gtrace.SpanContext{ TraceID: traceID, SpanID: spanID1, IsRemote: true, @@ -109,8 +111,8 @@ func TestConvertSpan(t *testing.T) { Attributes: map[string]any{"k3": "v3"}, DroppedAttributesCount: 1, }}, - Status: Status{2, "desc"}, - InstrumentationLibrary: InstrumentationLibrary{ + Status: gtrace.Status{Code: 2, Description: "desc"}, + InstrumentationLibrary: gtrace.InstrumentationLibrary{ Name: "iname", Version: "version", SchemaURL: "surl", diff --git a/go/genkit/tracing.go b/go/genkit/tracing.go index dfe4a996f..a1facf631 100644 --- a/go/genkit/tracing.go +++ b/go/genkit/tracing.go @@ -22,6 +22,8 @@ import ( "path/filepath" "sync" + "github.com/firebase/genkit/go/gtrace" + "github.com/firebase/genkit/go/internal" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -30,7 +32,7 @@ import ( // tracingState holds OpenTelemetry values for creating traces. type tracingState struct { - tp *sdktrace.TracerProvider // references TraceStores + tp *sdktrace.TracerProvider // references gtrace.Stores tracer trace.Tracer // returned from tp.Tracer(), cached } @@ -48,9 +50,9 @@ func (ts *tracingState) registerSpanProcessor(sp sdktrace.SpanProcessor) { // addTraceStoreImmediate adds tstore to the tracingState. // Traces are saved immediately as they are finshed. -// Use this for a TraceStore with a fast Save method, +// Use this for a gtrace.Store with a fast Save method, // such as one that writes to a file. -func (ts *tracingState) addTraceStoreImmediate(tstore TraceStore) { +func (ts *tracingState) addTraceStoreImmediate(tstore gtrace.Store) { e := newTraceStoreExporter(tstore) // Adding a SimpleSpanProcessor is like using the WithSyncer option. ts.registerSpanProcessor(sdktrace.NewSimpleSpanProcessor(e)) @@ -61,18 +63,18 @@ func (ts *tracingState) addTraceStoreImmediate(tstore TraceStore) { // addTraceStoreBatch adds ts to the tracingState. // Traces are batched before being sent for processing. -// Use this for a TraceStore with a potentially expensive Save method, +// Use this for a gtrace.Store with a potentially expensive Save method, // such as one that makes an RPC. // Callers must invoke the returned function at the end of the program to flush the final batch // and perform other cleanup. -func (ts *tracingState) addTraceStoreBatch(tstore TraceStore) (shutdown func(context.Context) error) { +func (ts *tracingState) addTraceStoreBatch(tstore gtrace.Store) (shutdown func(context.Context) error) { e := newTraceStoreExporter(tstore) // Adding a BatchSpanProcessor is like using the WithBatcher option. ts.registerSpanProcessor(sdktrace.NewBatchSpanProcessor(e)) return ts.tp.Shutdown } -func newDevTraceStore() (TraceStore, error) { +func newDevTraceStore() (gtrace.Store, error) { programName := filepath.Base(os.Args[0]) rootHash := fmt.Sprintf("%02x", md5.Sum([]byte(programName))) dir := filepath.Join(os.TempDir(), ".genkit", rootHash, "traces") @@ -80,7 +82,7 @@ func newDevTraceStore() (TraceStore, error) { return nil, err } // Don't remove the temp directory, for post-mortem debugging. - return NewFileTraceStore(dir) + return gtrace.NewFileStore(dir) } // The rest of this file contains code translated from js/common/src/tracing/*.ts. @@ -129,30 +131,30 @@ func runInNewSpan[I, O any]( output, err := f(ctx, input) if err != nil { - sm.State = SpanStateError + sm.State = spanStateError span.SetStatus(codes.Error, err.Error()) - return zero[O](), err + return internal.Zero[O](), err } // TODO(jba): the typescript code checks if sm.State == error here. Can that happen? - sm.State = SpanStateSuccess + sm.State = spanStateSuccess sm.Output = output return output, nil } -// SpanState is the completion status of a span. -// An empty SpanState indicates that the span has not ended. -type SpanState string +// spanState is the completion status of a span. +// An empty spanState indicates that the span has not ended. +type spanState string const ( - SpanStateSuccess SpanState = "success" - SpanStateError SpanState = "error" + spanStateSuccess spanState = "success" + spanStateError spanState = "error" ) // spanMetadata holds genkit-specific information about a span. type spanMetadata struct { Name string - State SpanState + State spanState IsRoot bool Input any Output any @@ -182,9 +184,9 @@ func (sm *spanMetadata) attributes() []attribute.KeyValue { kvs := []attribute.KeyValue{ attribute.String("genkit:name", sm.Name), attribute.String("genkit:state", string(sm.State)), - attribute.String("genkit:input", jsonString(sm.Input)), + attribute.String("genkit:input", internal.JSONString(sm.Input)), attribute.String("genkit:path", sm.Path), - attribute.String("genkit:output", jsonString(sm.Output)), + attribute.String("genkit:output", internal.JSONString(sm.Output)), } if sm.IsRoot { kvs = append(kvs, attribute.Bool("genkit:isRoot", sm.IsRoot)) diff --git a/go/genkit/tracing_test.go b/go/genkit/tracing_test.go index b3f117871..e573d18e4 100644 --- a/go/genkit/tracing_test.go +++ b/go/genkit/tracing_test.go @@ -32,7 +32,7 @@ func TestSpanMetadata(t *testing.T) { ) sm := &spanMetadata{ Name: "name", - State: SpanStateSuccess, + State: spanStateSuccess, Path: "parent/name", Input: testInput, Output: testOutput, diff --git a/go/genkit/util.go b/go/genkit/util.go index 8746924ef..424deeda8 100644 --- a/go/genkit/util.go +++ b/go/genkit/util.go @@ -16,8 +16,6 @@ package genkit import ( "context" - "net/url" - "time" ) // A contextKey is a unique, typed key for a value stored in a context. @@ -35,33 +33,8 @@ func (k contextKey[T]) newContext(ctx context.Context, value T) context.Context } // fromContext returns the value associated with this key in the context, -// or the zero value for T if the key is not present. +// or the internal.Zero value for T if the key is not present. func (k contextKey[T]) fromContext(ctx context.Context) T { t, _ := ctx.Value(k.key).(T) return t } - -// zero returns the zero value for T. -func zero[T any]() T { - var z T - return z -} - -// clean returns a valid filename for id. -func clean(id string) string { - return url.PathEscape(id) -} - -// Milliseconds represents a time as the number of milliseconds since the Unix epoch. -type Milliseconds float64 - -func timeToMilliseconds(t time.Time) Milliseconds { - nsec := t.UnixNano() - return Milliseconds(float64(nsec) / 1e6) -} - -func (m Milliseconds) time() time.Time { - sec := int64(m / 1e3) - nsec := int64((float64(m) - float64(sec*1e3)) * 1e6) - return time.Unix(sec, nsec) -} diff --git a/go/gtime/gtime.go b/go/gtime/gtime.go new file mode 100644 index 000000000..3b8c63cc9 --- /dev/null +++ b/go/gtime/gtime.go @@ -0,0 +1,34 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package gtime provides time functionality for Go Genkit. +package gtime + +import "time" + +// Milliseconds represents a time as the number of milliseconds since the Unix epoch. +type Milliseconds float64 + +// ToMilliseconds converts a time.Time to a Milliseconds. +func ToMilliseconds(t time.Time) Milliseconds { + nsec := t.UnixNano() + return Milliseconds(float64(nsec) / 1e6) +} + +// Time converts a Milliseconds to a time.Time. +func (m Milliseconds) Time() time.Time { + sec := int64(m / 1e3) + nsec := int64((float64(m) - float64(sec*1e3)) * 1e6) + return time.Unix(sec, nsec) +} diff --git a/go/gtime/gtime_test.go b/go/gtime/gtime_test.go new file mode 100644 index 000000000..94081bac4 --- /dev/null +++ b/go/gtime/gtime_test.go @@ -0,0 +1,37 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gtime + +import ( + "testing" + "time" +) + +func TestMilliseconds(t *testing.T) { + for _, tm := range []time.Time{ + time.Unix(0, 0), + time.Unix(1, 0), + time.Unix(100, 554), + time.Date(2024, time.March, 24, 1, 2, 3, 4, time.UTC), + } { + m := ToMilliseconds(tm) + got := m.Time() + // Compare to the nearest millisecond. Due to the floating-point operations in the above + // two functions, we can't be sure that the round trip is more accurate than that. + if !got.Round(time.Millisecond).Equal(tm.Round(time.Millisecond)) { + t.Errorf("got %v, want %v", got, tm) + } + } +} diff --git a/go/genkit/file_trace_store.go b/go/gtrace/file_store.go similarity index 72% rename from go/genkit/file_trace_store.go rename to go/gtrace/file_store.go index e61b9c9b4..dd5fa42f3 100644 --- a/go/genkit/file_trace_store.go +++ b/go/gtrace/file_store.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package genkit +package gtrace import ( "context" @@ -24,25 +24,27 @@ import ( "slices" "strconv" "time" + + "github.com/firebase/genkit/go/internal" ) -// A FileTraceStore is a TraceStore that writes traces to files. -type FileTraceStore struct { +// A FileStore is a Store that writes traces to files. +type FileStore struct { dir string } -// NewFileTraceStore creates a FileTraceStore that writes traces to the given +// NewFileStore creates a FileStore that writes traces to the given // directory. The directory is created if it does not exist. -func NewFileTraceStore(dir string) (*FileTraceStore, error) { +func NewFileStore(dir string) (*FileStore, error) { if err := os.MkdirAll(dir, 0o755); err != nil { return nil, err } - return &FileTraceStore{dir: dir}, nil + return &FileStore{dir: dir}, nil } -// Save implements [TraceStore.Save]. +// Save implements [Store.Save]. // It is not safe to call Save concurrently with the same ID. -func (s *FileTraceStore) Save(ctx context.Context, id string, td *TraceData) error { +func (s *FileStore) Save(ctx context.Context, id string, td *Data) error { existing, err := s.Load(ctx, id) if err == nil { // Merge the existing spans with the incoming ones. @@ -58,22 +60,22 @@ func (s *FileTraceStore) Save(ctx context.Context, id string, td *TraceData) err } else if !errors.Is(err, fs.ErrNotExist) { return err } - return writeJSONFile(filepath.Join(s.dir, clean(id)), td) + return internal.WriteJSONFile(filepath.Join(s.dir, internal.Clean(id)), td) } -// Load implements [TraceStore.Load]. -func (s *FileTraceStore) Load(ctx context.Context, id string) (*TraceData, error) { - var td *TraceData - if err := readJSONFile(filepath.Join(s.dir, clean(id)), &td); err != nil { +// Load implements [Store.Load]. +func (s *FileStore) Load(ctx context.Context, id string) (*Data, error) { + var td *Data + if err := internal.ReadJSONFile(filepath.Join(s.dir, internal.Clean(id)), &td); err != nil { return nil, err } return td, nil } -// List implements [TraceStore.List]. +// List implements [Store.List]. // The traces are returned in the order they were written, newest first. // The default limit is 10. -func (s *FileTraceStore) List(ctx context.Context, q *TraceQuery) ([]*TraceData, string, error) { +func (s *FileStore) List(ctx context.Context, q *Query) ([]*Data, string, error) { entries, err := os.ReadDir(s.dir) if err != nil { return nil, "", err @@ -97,10 +99,10 @@ func (s *FileTraceStore) List(ctx context.Context, q *TraceQuery) ([]*TraceData, return nil, "", err } - var ts []*TraceData + var ts []*Data for _, e := range entries[start:end] { - var t *TraceData - if err := readJSONFile(filepath.Join(s.dir, e.Name()), &t); err != nil { + var t *Data + if err := internal.ReadJSONFile(filepath.Join(s.dir, e.Name()), &t); err != nil { return nil, "", err } ts = append(ts, t) @@ -113,7 +115,7 @@ func (s *FileTraceStore) List(ctx context.Context, q *TraceQuery) ([]*TraceData, } // listRange returns the range of elements to return from a List call. -func listRange(q *TraceQuery, total int) (start, end int, err error) { +func listRange(q *Query, total int) (start, end int, err error) { const defaultLimit = 10 start = 0 end = total @@ -131,14 +133,14 @@ func listRange(q *TraceQuery, total int) (start, end int, err error) { // TODO(jba): consider using distance from the end (len(entries) - end). start, err = strconv.Atoi(ctoken) if err != nil { - return 0, 0, fmt.Errorf("%w: parsing continuation token: %v", errBadQuery, err) + return 0, 0, fmt.Errorf("%w: parsing continuation token: %v", ErrBadQuery, err) } if start < 0 || start >= total { - return 0, 0, fmt.Errorf("%w: continuation token out of range", errBadQuery) + return 0, 0, fmt.Errorf("%w: continuation token out of range", ErrBadQuery) } } if limit < 0 { - return 0, 0, fmt.Errorf("%w: negative limit", errBadQuery) + return 0, 0, fmt.Errorf("%w: negative limit", ErrBadQuery) } if limit == 0 { limit = defaultLimit @@ -150,6 +152,6 @@ func listRange(q *TraceQuery, total int) (start, end int, err error) { return start, end, nil } -func (s *FileTraceStore) loadAny(id string, p any) error { - return readJSONFile(filepath.Join(s.dir, clean(id)), p) +func (s *FileStore) LoadAny(id string, p any) error { + return internal.ReadJSONFile(filepath.Join(s.dir, internal.Clean(id)), p) } diff --git a/go/genkit/file_trace_store_test.go b/go/gtrace/file_store_test.go similarity index 78% rename from go/genkit/file_trace_store_test.go rename to go/gtrace/file_store_test.go index 04b0d851e..b56635242 100644 --- a/go/genkit/file_trace_store_test.go +++ b/go/gtrace/file_store_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package genkit +package gtrace import ( "context" @@ -23,9 +23,9 @@ import ( "github.com/google/go-cmp/cmp" ) -func TestFileTraceStore(t *testing.T) { +func TestFileStore(t *testing.T) { ctx := context.Background() - td1 := &TraceData{ + td1 := &Data{ DisplayName: "td1", StartTime: 10, EndTime: 20, @@ -34,7 +34,7 @@ func TestFileTraceStore(t *testing.T) { "s2": {SpanID: "sid2"}, }, } - ts, err := NewFileTraceStore(t.TempDir()) + ts, err := NewFileStore(t.TempDir()) if err != nil { t.Fatal(err) } @@ -51,7 +51,7 @@ func TestFileTraceStore(t *testing.T) { // Saving a span with the same ID merges spans and overrides the other // fields. - td2 := &TraceData{ + td2 := &Data{ DisplayName: "td2", StartTime: 30, EndTime: 40, @@ -62,7 +62,7 @@ func TestFileTraceStore(t *testing.T) { if err := ts.Save(ctx, "id1", td2); err != nil { t.Fatal(err) } - want := &TraceData{ + want := &Data{ TraceID: "id1", DisplayName: "td2", StartTime: 30, @@ -82,15 +82,15 @@ func TestFileTraceStore(t *testing.T) { } // Test List. - td3 := &TraceData{DisplayName: "td3"} + td3 := &Data{DisplayName: "td3"} time.Sleep(50 * time.Millisecond) // force different mtimes if err := ts.Save(ctx, "id3", td3); err != nil { t.Fatal(err) } gotTDs, gotCT, err := ts.List(ctx, nil) - // All the TraceDatas, in the expected order. - wantTDs := []*TraceData{td3, want} + // All the Datas, in the expected order. + wantTDs := []*Data{td3, want} if diff := cmp.Diff(wantTDs, gotTDs); diff != "" { t.Errorf("mismatch (-want, +got):\n%s", diff) } @@ -103,31 +103,31 @@ func TestListRange(t *testing.T) { // These tests assume the default limit is 10. total := 20 for _, test := range []struct { - q *TraceQuery + q *Query wantStart, wantEnd int wantErr bool }{ {nil, 0, 10, false}, { - &TraceQuery{Limit: 1}, + &Query{Limit: 1}, 0, 1, false, }, { - &TraceQuery{Limit: 5, ContinuationToken: "1"}, + &Query{Limit: 5, ContinuationToken: "1"}, 1, 6, false, }, { - &TraceQuery{ContinuationToken: "5"}, + &Query{ContinuationToken: "5"}, 5, 15, false, }, - {&TraceQuery{Limit: -1}, 0, 0, true}, // negative limit - {&TraceQuery{ContinuationToken: "x"}, 0, 0, true}, // not a number - {&TraceQuery{ContinuationToken: "-1"}, 0, 0, true}, // too small - {&TraceQuery{ContinuationToken: "21"}, 0, 0, true}, // too large + {&Query{Limit: -1}, 0, 0, true}, // negative limit + {&Query{ContinuationToken: "x"}, 0, 0, true}, // not a number + {&Query{ContinuationToken: "-1"}, 0, 0, true}, // too small + {&Query{ContinuationToken: "21"}, 0, 0, true}, // too large } { gotStart, gotEnd, err := listRange(test.q, total) if test.wantErr { - if !errors.Is(err, errBadQuery) { + if !errors.Is(err, ErrBadQuery) { t.Errorf("%+v: got err %v, want errBadQuery", test.q, err) } } else if gotStart != test.wantStart || gotEnd != test.wantEnd || err != nil { diff --git a/go/genkit/trace_store.go b/go/gtrace/store.go similarity index 73% rename from go/genkit/trace_store.go rename to go/gtrace/store.go index 667517602..33968aa73 100644 --- a/go/genkit/trace_store.go +++ b/go/gtrace/store.go @@ -12,42 +12,43 @@ // See the License for the specific language governing permissions and // limitations under the License. -package genkit - -// Types related to storing trace information. +// The trace package provides support for storing and exporting traces. +package gtrace import ( "context" "errors" + + "github.com/firebase/genkit/go/gtime" ) -// A TraceStore stores trace information. +// A Store stores trace information. // Every trace has a unique ID. -type TraceStore interface { - // Save saves the TraceData to the store. If a TraceData with the id already exists, +type Store interface { + // Save saves the Data to the store. If a Data with the id already exists, // the two are merged. - Save(ctx context.Context, id string, td *TraceData) error - // Load reads the TraceData with the given ID from the store. + Save(ctx context.Context, id string, td *Data) error + // Load reads the Data with the given ID from the store. // It returns an error that is fs.ErrNotExist if there isn't one. - Load(ctx context.Context, id string) (*TraceData, error) - // List returns all the TraceDatas in the store that satisfy q, in some deterministic + Load(ctx context.Context, id string) (*Data, error) + // List returns all the Datas in the store that satisfy q, in some deterministic // order. // It also returns a continuation token: an opaque string that can be passed // to the next call to List to resume the listing from where it left off. If // the listing reached the end, this is the empty string. - // If the TraceQuery is malformed, List returns an error that is errBadQuery. - List(ctx context.Context, q *TraceQuery) (tds []*TraceData, contToken string, err error) + // If the Query is malformed, List returns an error that is errBadQuery. + List(ctx context.Context, q *Query) (tds []*Data, contToken string, err error) - // loadAny is like Load, but accepts a pointer to any type. + // LoadAny is like Load, but accepts a pointer to any type. // It is for testing (see conformance_test.go). // TODO(jba): replace Load with this. - loadAny(id string, p any) error + LoadAny(id string, p any) error } -var errBadQuery = errors.New("bad TraceQuery") +var ErrBadQuery = errors.New("bad trace.Query") -// A TraceQuery filters the result of [TraceStore.List]. -type TraceQuery struct { +// A Query filters the result of [Store.List]. +type Query struct { // Maximum number of traces to return. If zero, a default value may be used. // Callers should not assume they will get the entire list; they should always // check the returned continuation token. @@ -57,12 +58,12 @@ type TraceQuery struct { ContinuationToken string } -// TraceData is information about a trace. -type TraceData struct { +// Data is information about a trace. +type Data struct { TraceID string `json:"traceId"` DisplayName string `json:"displayName"` - StartTime Milliseconds `json:"startTime"` - EndTime Milliseconds `json:"endTime"` + StartTime gtime.Milliseconds `json:"startTime"` + EndTime gtime.Milliseconds `json:"endTime"` Spans map[string]*SpanData `json:"spans"` } @@ -75,33 +76,33 @@ type SpanData struct { SpanID string `json:"spanId"` TraceID string `json:"traceId,omitempty"` ParentSpanID string `json:"parentSpanId,omitempty"` - StartTime Milliseconds `json:"startTime"` - EndTime Milliseconds `json:"endTime"` + StartTime gtime.Milliseconds `json:"startTime"` + EndTime gtime.Milliseconds `json:"endTime"` Attributes map[string]any `json:"attributes,omitempty"` DisplayName string `json:"displayName"` Links []*Link `json:"links,omitempty"` InstrumentationLibrary InstrumentationLibrary `json:"instrumentationLibrary,omitempty"` SpanKind string `json:"spanKind"` // trace.SpanKind as a string // This bool is in a separate struct, to match the js (and presumably the OTel) formats. - SameProcessAsParentSpan boolValue `json:"sameProcessAsParentSpan"` + SameProcessAsParentSpan BoolValue `json:"sameProcessAsParentSpan"` Status Status `json:"status"` - TimeEvents timeEvents `json:"timeEvents,omitempty"` + TimeEvents TimeEvents `json:"timeEvents,omitempty"` } -type timeEvents struct { +type TimeEvents struct { TimeEvent []TimeEvent `json:"timeEvent,omitempty"` } -type boolValue struct { +type BoolValue struct { Value bool `json:"value,omitempty"` } type TimeEvent struct { - Time Milliseconds `json:"time,omitempty"` - Annotation annotation `json:"annotation,omitempty"` + Time gtime.Milliseconds `json:"time,omitempty"` + Annotation Annotation `json:"annotation,omitempty"` } -type annotation struct { +type Annotation struct { Attributes map[string]any `json:"attributes,omitempty"` Description string `json:"description,omitempty"` } diff --git a/go/genkit/trace_store_test.go b/go/gtrace/store_test.go similarity index 77% rename from go/genkit/trace_store_test.go rename to go/gtrace/store_test.go index 534baf64d..e62a2b2ab 100644 --- a/go/genkit/trace_store_test.go +++ b/go/gtrace/store_test.go @@ -12,35 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -package genkit +package gtrace import ( "encoding/json" "os" "path/filepath" "testing" - "time" "github.com/google/go-cmp/cmp" ) -func TestMilliseconds(t *testing.T) { - for _, tm := range []time.Time{ - time.Unix(0, 0), - time.Unix(1, 0), - time.Unix(100, 554), - time.Date(2024, time.March, 24, 1, 2, 3, 4, time.UTC), - } { - m := timeToMilliseconds(tm) - got := m.time() - // Compare to the nearest millisecond. Due to the floating-point operations in the above - // two functions, we can't be sure that the round trip is more accurate than that. - if !got.Round(time.Millisecond).Equal(tm.Round(time.Millisecond)) { - t.Errorf("got %v, want %v", got, tm) - } - } -} - func TestTraceJSON(t *testing.T) { // We want to compare a JSON trace produced by the genkit javascript code, // in testdata/trace.json, with our own JSON output. @@ -56,7 +38,7 @@ func TestTraceJSON(t *testing.T) { if err != nil { t.Fatal(err) } - var td TraceData + var td Data if err := json.Unmarshal(jsBytes, &td); err != nil { t.Fatal(err) } diff --git a/go/genkit/testdata/trace.json b/go/gtrace/testdata/trace.json similarity index 100% rename from go/genkit/testdata/trace.json rename to go/gtrace/testdata/trace.json diff --git a/go/internal/cmd/jsonschemagen/jsonschemagen.go b/go/internal/cmd/jsonschemagen/jsonschemagen.go index cbaf2dc96..c6b92df67 100644 --- a/go/internal/cmd/jsonschemagen/jsonschemagen.go +++ b/go/internal/cmd/jsonschemagen/jsonschemagen.go @@ -265,6 +265,10 @@ func (g *generator) generate() ([]byte, error) { g.pr("// This file was generated by jsonschemagen. DO NOT EDIT.\n\n") g.pr("package %s\n\n", g.pkgName) + if pc := g.cfg.configFor(g.pkgName); pc != nil { + g.pr("import %q\n", pc.pkgPath) + } + // Sort the names so the output is deterministic. for _, name := range sortedKeys(g.schemas) { if ic := g.cfg.configFor(name); ic != nil && ic.omit { @@ -502,8 +506,8 @@ func (c config) configFor(name string) *itemConfig { return nil } -// itemConfig is configuration for one item, either a type or field. Not all itemConfig -// fields apply to both, but using one type simplifies the parser. +// itemConfig is configuration for one item: a type, a field or a package. +// Not all itemConfig fields apply to both, but using one type simplifies the parser. type itemConfig struct { omit bool name string @@ -515,8 +519,9 @@ type itemConfig struct { // parseConfigFile parses the config file. // The config file is line-oriented. Empty lines and lines beginning // with '#' are ignored. -// Other lines start with a name which is either TYPE or TYPE.FIELD. -// The names are always the original JSONSchema names, not Go names. +// Other lines start with a word which names either a package, a type or the +// field of a type (as TYPE.FIELD). +// Except for packages, the names are always the original JSONSchema names, not Go names. // The rest of the line is a directive; one of // // omit @@ -529,6 +534,8 @@ type itemConfig struct { // doc is following lines until the line "." // pkg // package path, relative to outdir (last component is package name) +// import +// path of package to import (for packages only) func parseConfigFile(filename string) (config, error) { c := config{ itemConfigs: map[string]*itemConfig{}, @@ -589,7 +596,11 @@ func parseConfigFile(filename string) (config, error) { return errf("need NAME pkg PATH") } ic.pkgPath = words[2] - + case "import": + if len(words) < 3 { + return errf("need NAME import PATH") + } + ic.pkgPath = words[2] default: return errf("unknown directive %q", words[1]) } diff --git a/go/genkit/json.go b/go/internal/json.go similarity index 73% rename from go/genkit/json.go rename to go/internal/json.go index a3f92eca7..49f7f1b7c 100644 --- a/go/genkit/json.go +++ b/go/internal/json.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package genkit +package internal import ( "encoding/json" @@ -21,9 +21,9 @@ import ( "os" ) -// jsonString returns json.Marshal(x) as a string. If json.Marshal returns +// JSONString returns json.Marshal(x) as a string. If json.Marshal returns // an error, jsonString returns the error text as a JSON string beginning "ERROR:". -func jsonString(x any) string { +func JSONString(x any) string { bytes, err := json.Marshal(x) if err != nil { bytes, _ = json.Marshal(fmt.Sprintf("ERROR: %v", err)) @@ -31,8 +31,8 @@ func jsonString(x any) string { return string(bytes) } -// writeJSONFile writes value to filename as JSON. -func writeJSONFile(filename string, value any) error { +// WriteJSONFile writes value to filename as JSON. +func WriteJSONFile(filename string, value any) error { f, err := os.Create(filename) if err != nil { return err @@ -41,12 +41,13 @@ func writeJSONFile(filename string, value any) error { err = errors.Join(err, f.Close()) }() enc := json.NewEncoder(f) - enc.SetIndent("", " ") // make the trace easy to read for debugging + enc.SetIndent("", " ") // make the value easy to read for debugging return enc.Encode(value) } -// readJSONFile JSON-decodes the contents of filename into pvalue. -func readJSONFile(filename string, pvalue any) error { +// ReadJSONFile JSON-decodes the contents of filename into pvalue, +// which must be a pointer. +func ReadJSONFile(filename string, pvalue any) error { f, err := os.Open(filename) if err != nil { return err diff --git a/go/internal/misc.go b/go/internal/misc.go new file mode 100644 index 000000000..d13cd5985 --- /dev/null +++ b/go/internal/misc.go @@ -0,0 +1,28 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import "net/url" + +// Zero returns the Zero value for T. +func Zero[T any]() T { + var z T + return z +} + +// Clean returns a valid filename for id. +func Clean(id string) string { + return url.PathEscape(id) +}