diff --git a/js/core/src/tracing/instrumentation.ts b/js/core/src/tracing/instrumentation.ts index 9cf810a9b..0641eba17 100644 --- a/js/core/src/tracing/instrumentation.ts +++ b/js/core/src/tracing/instrumentation.ts @@ -21,10 +21,10 @@ import { trace, } from '@opentelemetry/api'; import { AsyncLocalStorage } from 'node:async_hooks'; -import { SpanMetadata } from './types.js'; +import { SpanMetadata, TraceMetadata } from './types.js'; export const spanMetadataAls = new AsyncLocalStorage(); -export const pathVariants = new Set(); +export const traceMetadataAls = new AsyncLocalStorage(); export const ATTR_PREFIX = 'genkit'; export const SPAN_TYPE_ATTR = ATTR_PREFIX + ':type'; @@ -42,18 +42,23 @@ export async function newTrace( }, fn: (metadata: SpanMetadata, rootSpan: ApiSpan) => Promise ) { - return await runInNewSpan( - { - metadata: { - name: opts.name, - isRoot: true, + const traceMetadata = traceMetadataAls.getStore() || { + paths: new Set(), + }; + return await traceMetadataAls.run(traceMetadata, () => + runInNewSpan( + { + metadata: { + name: opts.name, + isRoot: true, + }, + labels: opts.labels, + links: opts.links, }, - labels: opts.labels, - links: opts.links, - }, - async (metadata, otSpan) => { - return await fn(metadata, otSpan); - } + async (metadata, otSpan) => { + return await fn(metadata, otSpan); + } + ) ); } @@ -84,7 +89,7 @@ export async function runInNewSpan( : ''; opts.metadata.path = parentPath + `/{${opts.metadata.name}${stepType}}`; - const pathVariantCount = pathVariants.size; + const pathCount = getCurrentPathCount(); const output = await spanMetadataAls.run(opts.metadata, () => fn(opts.metadata, otSpan, isInRoot) ); @@ -93,13 +98,14 @@ export async function runInNewSpan( } opts.metadata.path = decoratePathWithSubtype(opts.metadata); - - if (pathVariantCount == pathVariants.size) { - pathVariants.add(opts.metadata.path); + if (pathCount == getCurrentPathCount()) { + traceMetadataAls.getStore()?.paths?.add(opts.metadata.path); } return output; } catch (e) { + opts.metadata.path = decoratePathWithSubtype(opts.metadata); + traceMetadataAls.getStore()?.paths?.add(opts.metadata.path); opts.metadata.state = 'error'; otSpan.setStatus({ code: SpanStatusCode.ERROR, @@ -182,6 +188,10 @@ function getCurrentSpan(): SpanMetadata { return step; } +function getCurrentPathCount(): number { + return traceMetadataAls.getStore()?.paths?.size || 0; +} + function decoratePathWithSubtype(metadata: SpanMetadata): string { if (!metadata.path) { return ''; diff --git a/js/core/src/tracing/types.ts b/js/core/src/tracing/types.ts index 632b0e30e..008a2e140 100644 --- a/js/core/src/tracing/types.ts +++ b/js/core/src/tracing/types.ts @@ -35,6 +35,11 @@ export interface TraceStore { list(query?: TraceQuery): Promise; } +export const TraceMetadataSchema = z.object({ + paths: z.set(z.string()).optional(), +}); +export type TraceMetadata = z.infer; + export const SpanMetadataSchema = z.object({ name: z.string(), state: z.enum(['success', 'error']).optional(), diff --git a/js/flow/src/flow.ts b/js/flow/src/flow.ts index 137f08328..e251f99b0 100644 --- a/js/flow/src/flow.ts +++ b/js/flow/src/flow.ts @@ -64,8 +64,6 @@ import { runWithActiveContext, } from './utils.js'; -import { pathVariants } from '@genkit-ai/core/tracing'; - const streamDelimiter = '\n'; const CREATED_FLOWS = 'genkit__CREATED_FLOWS'; @@ -451,7 +449,6 @@ export class Flow< setCustomMetadataAttribute(metadataPrefix('state'), 'done'); telemetry.writeFlowSuccess( ctx.flow.name, - pathVariants, performance.now() - startTimeMs ); return output; @@ -482,7 +479,6 @@ export class Flow< telemetry.recordError(e); telemetry.writeFlowFailure( ctx.flow.name, - pathVariants, performance.now() - startTimeMs, e ); diff --git a/js/flow/src/telemetry.ts b/js/flow/src/telemetry.ts index e4b1a192c..1e0492d3e 100644 --- a/js/flow/src/telemetry.ts +++ b/js/flow/src/telemetry.ts @@ -21,7 +21,7 @@ import { MetricCounter, MetricHistogram, } from '@genkit-ai/core/metrics'; -import { spanMetadataAls } from '@genkit-ai/core/tracing'; +import { spanMetadataAls, traceMetadataAls } from '@genkit-ai/core/tracing'; import { ValueType } from '@opentelemetry/api'; import express from 'express'; @@ -58,11 +58,7 @@ export function recordError(err: any) { }); } -export function writeFlowSuccess( - flowName: string, - variants: Set, - latencyMs: number -) { +export function writeFlowSuccess(flowName: string, latencyMs: number) { const dimensions = { name: flowName, source: 'ts', @@ -71,27 +67,29 @@ export function writeFlowSuccess( flowCounter.add(1, dimensions); flowLatencies.record(latencyMs, dimensions); - const relevantVariants = Array.from(variants).filter((variant) => - variant.includes(flowName) - ); + const paths = traceMetadataAls.getStore()?.paths || new Set(); + if (paths) { + const relevantVariants = Array.from(paths).filter((path) => + path.includes(flowName) + ); - logger.logStructured(`Variants[/${flowName}]`, { - flowName: flowName, - variants: relevantVariants, - }); - - relevantVariants.forEach((variant) => - variantCounter.add(1, { - ...dimensions, - success: 'success', - variant, - }) - ); + logger.logStructured(`Variants[/${flowName}]`, { + flowName: flowName, + variants: relevantVariants, + }); + + relevantVariants.forEach((variant) => + variantCounter.add(1, { + ...dimensions, + success: 'success', + variant, + }) + ); + } } export function writeFlowFailure( flowName: string, - variants: Set, latencyMs: number, err: any ) { @@ -104,25 +102,33 @@ export function writeFlowFailure( flowCounter.add(1, dimensions); flowLatencies.record(latencyMs, dimensions); - const path = spanMetadataAls?.getStore()?.path; - const relevantVariants = Array.from(variants).filter( - (variant) => variant.includes(flowName) && variant !== path - ); + const allPaths = traceMetadataAls.getStore()?.paths || new Set(); + if (allPaths) { + const failPath = spanMetadataAls?.getStore()?.path; + const relevantVariants = Array.from(allPaths).filter( + (path) => path.includes(flowName) && path !== failPath + ); - // All variants that have succeeded need to be tracked as succeeded. - relevantVariants.forEach((variant) => - variantCounter.add(1, { + logger.logStructured(`Variants[/${flowName}]`, { flowName: flowName, - success: 'success', - variant: variant, - }) - ); + variants: relevantVariants, + }); + + // All variants that have succeeded need to be tracked as succeeded. + relevantVariants.forEach((variant) => + variantCounter.add(1, { + flowName: flowName, + success: 'success', + variant: variant, + }) + ); - variantCounter.add(1, { - flowName: flowName, - success: 'failure', - variant: path, - }); + variantCounter.add(1, { + flowName: flowName, + success: 'failure', + variant: failPath, + }); + } } export function logRequest(flowName: string, req: express.Request) {