@@ -24,9 +24,9 @@ import (
24
24
"sync"
25
25
"time"
26
26
27
+ "github.com/firebase/genkit/go/core/tracing"
27
28
"github.com/firebase/genkit/go/gtime"
28
29
"github.com/firebase/genkit/go/internal"
29
- "github.com/firebase/genkit/go/internal/tracing"
30
30
"github.com/google/uuid"
31
31
otrace "go.opentelemetry.io/otel/trace"
32
32
)
@@ -251,7 +251,7 @@ func (f *Flow[I, O, S]) action() *Action[*flowInstruction[I], *flowState[I, O],
251
251
"outputSchema" : inferJSONSchema (o ),
252
252
}
253
253
cback := func (ctx context.Context , inst * flowInstruction [I ], cb func (context.Context , S ) error ) (* flowState [I , O ], error ) {
254
- tracing .SpanMetaKey . FromContext (ctx ). SetAttr ( "flow:wrapperAction" , "true" )
254
+ tracing .SetCustomMetadataAttr (ctx , "flow:wrapperAction" , "true" )
255
255
return f .runInstruction (ctx , inst , streamingCallback [S ](cb ))
256
256
}
257
257
return NewStreamingAction (f .name , ActionTypeFlow , metadata , cback )
@@ -359,12 +359,11 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis
359
359
// TODO(jba): retrieve the JSON-marshaled SpanContext from state.traceContext.
360
360
// TODO(jba): add a span link to the context.
361
361
output , err := tracing .RunInNewSpan (ctx , fctx .tracingState (), f .name , "flow" , true , state .Input , func (ctx context.Context , input I ) (O , error ) {
362
- spanMeta := tracing .SpanMetaKey .FromContext (ctx )
363
- spanMeta .SetAttr ("flow:execution" , strconv .Itoa (len (state .Executions )- 1 ))
362
+ tracing .SetCustomMetadataAttr (ctx , "flow:execution" , strconv .Itoa (len (state .Executions )- 1 ))
364
363
// TODO(jba): put labels into span metadata.
365
- spanMeta . SetAttr ( "flow:name" , f .name )
366
- spanMeta . SetAttr ( "flow:id" , state .FlowID )
367
- spanMeta . SetAttr ( "flow:dispatchType" , dispatchType )
364
+ tracing . SetCustomMetadataAttr ( ctx , "flow:name" , f .name )
365
+ tracing . SetCustomMetadataAttr ( ctx , "flow:id" , state .FlowID )
366
+ tracing . SetCustomMetadataAttr ( ctx , "flow:dispatchType" , dispatchType )
368
367
rootSpanContext := otrace .SpanContextFromContext (ctx )
369
368
traceID := rootSpanContext .TraceID ().String ()
370
369
exec .TraceIDs = append (exec .TraceIDs , traceID )
@@ -376,15 +375,15 @@ func (f *Flow[I, O, S]) execute(ctx context.Context, state *flowState[I, O], dis
376
375
if err != nil {
377
376
// TODO(jba): handle InterruptError
378
377
internal .Logger (ctx ).Error ("flow failed" ,
379
- "path" , spanMeta . Path ,
378
+ "path" , tracing . SpanPath ( ctx ) ,
380
379
"err" , err .Error (),
381
380
)
382
381
writeFlowFailure (ctx , f .name , latency , err )
383
- spanMeta . SetAttr ( "flow:state" , "error" )
382
+ tracing . SetCustomMetadataAttr ( ctx , "flow:state" , "error" )
384
383
} else {
385
- internal .Logger (ctx ).Info ("flow succeeded" , "path" , spanMeta . Path )
384
+ internal .Logger (ctx ).Info ("flow succeeded" , "path" , tracing . SpanPath ( ctx ) )
386
385
writeFlowSuccess (ctx , f .name , latency )
387
- spanMeta . SetAttr ( "flow:state" , "done" )
386
+ tracing . SetCustomMetadataAttr ( ctx , "flow:state" , "done" )
388
387
389
388
}
390
389
// TODO(jba): telemetry
@@ -485,10 +484,9 @@ func Run[T any](ctx context.Context, name string, f func() (T, error)) (T, error
485
484
// as in the js.
486
485
return tracing .RunInNewSpan (ctx , fc .tracingState (), name , "flowStep" , false , 0 , func (ctx context.Context , _ int ) (T , error ) {
487
486
uName := fc .uniqueStepName (name )
488
- spanMeta := tracing .SpanMetaKey .FromContext (ctx )
489
- spanMeta .SetAttr ("flow:stepType" , "run" )
490
- spanMeta .SetAttr ("flow:stepName" , name )
491
- spanMeta .SetAttr ("flow:resolvedStepName" , uName )
487
+ tracing .SetCustomMetadataAttr (ctx , "flow:stepType" , "run" )
488
+ tracing .SetCustomMetadataAttr (ctx , "flow:stepName" , name )
489
+ tracing .SetCustomMetadataAttr (ctx , "flow:resolvedStepName" , uName )
492
490
// Memoize the function call, using the cache in the flowState.
493
491
// The locking here prevents corruption of the cache from concurrent access, but doesn't
494
492
// prevent two goroutines racing to check the cache and call f. However, that shouldn't
@@ -503,7 +501,7 @@ func Run[T any](ctx context.Context, name string, f func() (T, error)) (T, error
503
501
if err := json .Unmarshal (j , & t ); err != nil {
504
502
return internal .Zero [T ](), err
505
503
}
506
- spanMeta . SetAttr ( "flow:state" , "cached" )
504
+ tracing . SetCustomMetadataAttr ( ctx , "flow:state" , "cached" )
507
505
return t , nil
508
506
}
509
507
t , err := f ()
@@ -517,7 +515,7 @@ func Run[T any](ctx context.Context, name string, f func() (T, error)) (T, error
517
515
fs .lock ()
518
516
fs .cache ()[uName ] = json .RawMessage (bytes )
519
517
fs .unlock ()
520
- spanMeta . SetAttr ( "flow:state" , "run" )
518
+ tracing . SetCustomMetadataAttr ( ctx , "flow:state" , "run" )
521
519
return t , nil
522
520
})
523
521
}
0 commit comments