Skip to content

Commit cdb1bb5

Browse files
bryanatkinsonrandall77
authored andcommitted
Update path variant logic to store paths in a new trace-scoped async local storage (#297)
* This allows us to keep the path variants written to metrics to be scoped to a single trace.
1 parent 009b6b8 commit cdb1bb5

File tree

4 files changed

+76
-59
lines changed

4 files changed

+76
-59
lines changed

js/core/src/tracing/instrumentation.ts

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ import {
2121
trace,
2222
} from '@opentelemetry/api';
2323
import { AsyncLocalStorage } from 'node:async_hooks';
24-
import { SpanMetadata } from './types.js';
24+
import { SpanMetadata, TraceMetadata } from './types.js';
2525

2626
export const spanMetadataAls = new AsyncLocalStorage<SpanMetadata>();
27-
export const pathVariants = new Set<string>();
27+
export const traceMetadataAls = new AsyncLocalStorage<TraceMetadata>();
2828

2929
export const ATTR_PREFIX = 'genkit';
3030
export const SPAN_TYPE_ATTR = ATTR_PREFIX + ':type';
@@ -42,18 +42,23 @@ export async function newTrace<T>(
4242
},
4343
fn: (metadata: SpanMetadata, rootSpan: ApiSpan) => Promise<T>
4444
) {
45-
return await runInNewSpan(
46-
{
47-
metadata: {
48-
name: opts.name,
49-
isRoot: true,
45+
const traceMetadata = traceMetadataAls.getStore() || {
46+
paths: new Set<string>(),
47+
};
48+
return await traceMetadataAls.run(traceMetadata, () =>
49+
runInNewSpan(
50+
{
51+
metadata: {
52+
name: opts.name,
53+
isRoot: true,
54+
},
55+
labels: opts.labels,
56+
links: opts.links,
5057
},
51-
labels: opts.labels,
52-
links: opts.links,
53-
},
54-
async (metadata, otSpan) => {
55-
return await fn(metadata, otSpan);
56-
}
58+
async (metadata, otSpan) => {
59+
return await fn(metadata, otSpan);
60+
}
61+
)
5762
);
5863
}
5964

@@ -84,7 +89,7 @@ export async function runInNewSpan<T>(
8489
: '';
8590
opts.metadata.path = parentPath + `/{${opts.metadata.name}${stepType}}`;
8691

87-
const pathVariantCount = pathVariants.size;
92+
const pathCount = getCurrentPathCount();
8893
const output = await spanMetadataAls.run(opts.metadata, () =>
8994
fn(opts.metadata, otSpan, isInRoot)
9095
);
@@ -93,13 +98,14 @@ export async function runInNewSpan<T>(
9398
}
9499

95100
opts.metadata.path = decoratePathWithSubtype(opts.metadata);
96-
97-
if (pathVariantCount == pathVariants.size) {
98-
pathVariants.add(opts.metadata.path);
101+
if (pathCount == getCurrentPathCount()) {
102+
traceMetadataAls.getStore()?.paths?.add(opts.metadata.path);
99103
}
100104

101105
return output;
102106
} catch (e) {
107+
opts.metadata.path = decoratePathWithSubtype(opts.metadata);
108+
traceMetadataAls.getStore()?.paths?.add(opts.metadata.path);
103109
opts.metadata.state = 'error';
104110
otSpan.setStatus({
105111
code: SpanStatusCode.ERROR,
@@ -182,6 +188,10 @@ function getCurrentSpan(): SpanMetadata {
182188
return step;
183189
}
184190

191+
function getCurrentPathCount(): number {
192+
return traceMetadataAls.getStore()?.paths?.size || 0;
193+
}
194+
185195
function decoratePathWithSubtype(metadata: SpanMetadata): string {
186196
if (!metadata.path) {
187197
return '';

js/core/src/tracing/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ export interface TraceStore {
3535
list(query?: TraceQuery): Promise<TraceQueryResponse>;
3636
}
3737

38+
export const TraceMetadataSchema = z.object({
39+
paths: z.set(z.string()).optional(),
40+
});
41+
export type TraceMetadata = z.infer<typeof TraceMetadataSchema>;
42+
3843
export const SpanMetadataSchema = z.object({
3944
name: z.string(),
4045
state: z.enum(['success', 'error']).optional(),

js/flow/src/flow.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ import {
6464
runWithActiveContext,
6565
} from './utils.js';
6666

67-
import { pathVariants } from '@genkit-ai/core/tracing';
68-
6967
const streamDelimiter = '\n';
7068

7169
const CREATED_FLOWS = 'genkit__CREATED_FLOWS';
@@ -451,7 +449,6 @@ export class Flow<
451449
setCustomMetadataAttribute(metadataPrefix('state'), 'done');
452450
telemetry.writeFlowSuccess(
453451
ctx.flow.name,
454-
pathVariants,
455452
performance.now() - startTimeMs
456453
);
457454
return output;
@@ -482,7 +479,6 @@ export class Flow<
482479
telemetry.recordError(e);
483480
telemetry.writeFlowFailure(
484481
ctx.flow.name,
485-
pathVariants,
486482
performance.now() - startTimeMs,
487483
e
488484
);

js/flow/src/telemetry.ts

Lines changed: 44 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import {
2121
MetricCounter,
2222
MetricHistogram,
2323
} from '@genkit-ai/core/metrics';
24-
import { spanMetadataAls } from '@genkit-ai/core/tracing';
24+
import { spanMetadataAls, traceMetadataAls } from '@genkit-ai/core/tracing';
2525
import { ValueType } from '@opentelemetry/api';
2626
import express from 'express';
2727

@@ -58,11 +58,7 @@ export function recordError(err: any) {
5858
});
5959
}
6060

61-
export function writeFlowSuccess(
62-
flowName: string,
63-
variants: Set<string>,
64-
latencyMs: number
65-
) {
61+
export function writeFlowSuccess(flowName: string, latencyMs: number) {
6662
const dimensions = {
6763
name: flowName,
6864
source: 'ts',
@@ -71,27 +67,29 @@ export function writeFlowSuccess(
7167
flowCounter.add(1, dimensions);
7268
flowLatencies.record(latencyMs, dimensions);
7369

74-
const relevantVariants = Array.from(variants).filter((variant) =>
75-
variant.includes(flowName)
76-
);
70+
const paths = traceMetadataAls.getStore()?.paths || new Set<string>();
71+
if (paths) {
72+
const relevantVariants = Array.from(paths).filter((path) =>
73+
path.includes(flowName)
74+
);
7775

78-
logger.logStructured(`Variants[/${flowName}]`, {
79-
flowName: flowName,
80-
variants: relevantVariants,
81-
});
82-
83-
relevantVariants.forEach((variant) =>
84-
variantCounter.add(1, {
85-
...dimensions,
86-
success: 'success',
87-
variant,
88-
})
89-
);
76+
logger.logStructured(`Variants[/${flowName}]`, {
77+
flowName: flowName,
78+
variants: relevantVariants,
79+
});
80+
81+
relevantVariants.forEach((variant) =>
82+
variantCounter.add(1, {
83+
...dimensions,
84+
success: 'success',
85+
variant,
86+
})
87+
);
88+
}
9089
}
9190

9291
export function writeFlowFailure(
9392
flowName: string,
94-
variants: Set<string>,
9593
latencyMs: number,
9694
err: any
9795
) {
@@ -104,25 +102,33 @@ export function writeFlowFailure(
104102
flowCounter.add(1, dimensions);
105103
flowLatencies.record(latencyMs, dimensions);
106104

107-
const path = spanMetadataAls?.getStore()?.path;
108-
const relevantVariants = Array.from(variants).filter(
109-
(variant) => variant.includes(flowName) && variant !== path
110-
);
105+
const allPaths = traceMetadataAls.getStore()?.paths || new Set<string>();
106+
if (allPaths) {
107+
const failPath = spanMetadataAls?.getStore()?.path;
108+
const relevantVariants = Array.from(allPaths).filter(
109+
(path) => path.includes(flowName) && path !== failPath
110+
);
111111

112-
// All variants that have succeeded need to be tracked as succeeded.
113-
relevantVariants.forEach((variant) =>
114-
variantCounter.add(1, {
112+
logger.logStructured(`Variants[/${flowName}]`, {
115113
flowName: flowName,
116-
success: 'success',
117-
variant: variant,
118-
})
119-
);
114+
variants: relevantVariants,
115+
});
116+
117+
// All variants that have succeeded need to be tracked as succeeded.
118+
relevantVariants.forEach((variant) =>
119+
variantCounter.add(1, {
120+
flowName: flowName,
121+
success: 'success',
122+
variant: variant,
123+
})
124+
);
120125

121-
variantCounter.add(1, {
122-
flowName: flowName,
123-
success: 'failure',
124-
variant: path,
125-
});
126+
variantCounter.add(1, {
127+
flowName: flowName,
128+
success: 'failure',
129+
variant: failPath,
130+
});
131+
}
126132
}
127133

128134
export function logRequest(flowName: string, req: express.Request) {

0 commit comments

Comments
 (0)