Skip to content

Commit

Permalink
Update path variant logic to store paths in a new trace-scoped async …
Browse files Browse the repository at this point in the history
…local storage (#297)

* This allows us to keep the path variants written to metrics to be scoped to a single trace.
  • Loading branch information
bryanatkinson authored Jun 10, 2024
1 parent b9cfd14 commit 937f2b9
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 59 deletions.
44 changes: 27 additions & 17 deletions js/core/src/tracing/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import {
trace,
} from '@opentelemetry/api';
import { AsyncLocalStorage } from 'node:async_hooks';
import { SpanMetadata } from './types.js';
import { SpanMetadata, TraceMetadata } from './types.js';

export const spanMetadataAls = new AsyncLocalStorage<SpanMetadata>();
export const pathVariants = new Set<string>();
export const traceMetadataAls = new AsyncLocalStorage<TraceMetadata>();

export const ATTR_PREFIX = 'genkit';
export const SPAN_TYPE_ATTR = ATTR_PREFIX + ':type';
Expand All @@ -42,18 +42,23 @@ export async function newTrace<T>(
},
fn: (metadata: SpanMetadata, rootSpan: ApiSpan) => Promise<T>
) {
return await runInNewSpan(
{
metadata: {
name: opts.name,
isRoot: true,
const traceMetadata = traceMetadataAls.getStore() || {
paths: new Set<string>(),
};
return await traceMetadataAls.run(traceMetadata, () =>
runInNewSpan(
{
metadata: {
name: opts.name,
isRoot: true,
},
labels: opts.labels,
links: opts.links,
},
labels: opts.labels,
links: opts.links,
},
async (metadata, otSpan) => {
return await fn(metadata, otSpan);
}
async (metadata, otSpan) => {
return await fn(metadata, otSpan);
}
)
);
}

Expand Down Expand Up @@ -84,7 +89,7 @@ export async function runInNewSpan<T>(
: '';
opts.metadata.path = parentPath + `/{${opts.metadata.name}${stepType}}`;

const pathVariantCount = pathVariants.size;
const pathCount = getCurrentPathCount();
const output = await spanMetadataAls.run(opts.metadata, () =>
fn(opts.metadata, otSpan, isInRoot)
);
Expand All @@ -93,13 +98,14 @@ export async function runInNewSpan<T>(
}

opts.metadata.path = decoratePathWithSubtype(opts.metadata);

if (pathVariantCount == pathVariants.size) {
pathVariants.add(opts.metadata.path);
if (pathCount == getCurrentPathCount()) {
traceMetadataAls.getStore()?.paths?.add(opts.metadata.path);
}

return output;
} catch (e) {
opts.metadata.path = decoratePathWithSubtype(opts.metadata);
traceMetadataAls.getStore()?.paths?.add(opts.metadata.path);
opts.metadata.state = 'error';
otSpan.setStatus({
code: SpanStatusCode.ERROR,
Expand Down Expand Up @@ -182,6 +188,10 @@ function getCurrentSpan(): SpanMetadata {
return step;
}

function getCurrentPathCount(): number {
return traceMetadataAls.getStore()?.paths?.size || 0;
}

function decoratePathWithSubtype(metadata: SpanMetadata): string {
if (!metadata.path) {
return '';
Expand Down
5 changes: 5 additions & 0 deletions js/core/src/tracing/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ export interface TraceStore {
list(query?: TraceQuery): Promise<TraceQueryResponse>;
}

export const TraceMetadataSchema = z.object({
paths: z.set(z.string()).optional(),
});
export type TraceMetadata = z.infer<typeof TraceMetadataSchema>;

export const SpanMetadataSchema = z.object({
name: z.string(),
state: z.enum(['success', 'error']).optional(),
Expand Down
4 changes: 0 additions & 4 deletions js/flow/src/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ import {
runWithActiveContext,
} from './utils.js';

import { pathVariants } from '@genkit-ai/core/tracing';

const streamDelimiter = '\n';

const CREATED_FLOWS = 'genkit__CREATED_FLOWS';
Expand Down Expand Up @@ -451,7 +449,6 @@ export class Flow<
setCustomMetadataAttribute(metadataPrefix('state'), 'done');
telemetry.writeFlowSuccess(
ctx.flow.name,
pathVariants,
performance.now() - startTimeMs
);
return output;
Expand Down Expand Up @@ -482,7 +479,6 @@ export class Flow<
telemetry.recordError(e);
telemetry.writeFlowFailure(
ctx.flow.name,
pathVariants,
performance.now() - startTimeMs,
e
);
Expand Down
82 changes: 44 additions & 38 deletions js/flow/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
MetricCounter,
MetricHistogram,
} from '@genkit-ai/core/metrics';
import { spanMetadataAls } from '@genkit-ai/core/tracing';
import { spanMetadataAls, traceMetadataAls } from '@genkit-ai/core/tracing';
import { ValueType } from '@opentelemetry/api';
import express from 'express';

Expand Down Expand Up @@ -58,11 +58,7 @@ export function recordError(err: any) {
});
}

export function writeFlowSuccess(
flowName: string,
variants: Set<string>,
latencyMs: number
) {
export function writeFlowSuccess(flowName: string, latencyMs: number) {
const dimensions = {
name: flowName,
source: 'ts',
Expand All @@ -71,27 +67,29 @@ export function writeFlowSuccess(
flowCounter.add(1, dimensions);
flowLatencies.record(latencyMs, dimensions);

const relevantVariants = Array.from(variants).filter((variant) =>
variant.includes(flowName)
);
const paths = traceMetadataAls.getStore()?.paths || new Set<string>();
if (paths) {
const relevantVariants = Array.from(paths).filter((path) =>
path.includes(flowName)
);

logger.logStructured(`Variants[/${flowName}]`, {
flowName: flowName,
variants: relevantVariants,
});

relevantVariants.forEach((variant) =>
variantCounter.add(1, {
...dimensions,
success: 'success',
variant,
})
);
logger.logStructured(`Variants[/${flowName}]`, {
flowName: flowName,
variants: relevantVariants,
});

relevantVariants.forEach((variant) =>
variantCounter.add(1, {
...dimensions,
success: 'success',
variant,
})
);
}
}

export function writeFlowFailure(
flowName: string,
variants: Set<string>,
latencyMs: number,
err: any
) {
Expand All @@ -104,25 +102,33 @@ export function writeFlowFailure(
flowCounter.add(1, dimensions);
flowLatencies.record(latencyMs, dimensions);

const path = spanMetadataAls?.getStore()?.path;
const relevantVariants = Array.from(variants).filter(
(variant) => variant.includes(flowName) && variant !== path
);
const allPaths = traceMetadataAls.getStore()?.paths || new Set<string>();
if (allPaths) {
const failPath = spanMetadataAls?.getStore()?.path;
const relevantVariants = Array.from(allPaths).filter(
(path) => path.includes(flowName) && path !== failPath
);

// All variants that have succeeded need to be tracked as succeeded.
relevantVariants.forEach((variant) =>
variantCounter.add(1, {
logger.logStructured(`Variants[/${flowName}]`, {
flowName: flowName,
success: 'success',
variant: variant,
})
);
variants: relevantVariants,
});

// All variants that have succeeded need to be tracked as succeeded.
relevantVariants.forEach((variant) =>
variantCounter.add(1, {
flowName: flowName,
success: 'success',
variant: variant,
})
);

variantCounter.add(1, {
flowName: flowName,
success: 'failure',
variant: path,
});
variantCounter.add(1, {
flowName: flowName,
success: 'failure',
variant: failPath,
});
}
}

export function logRequest(flowName: string, req: express.Request) {
Expand Down

0 comments on commit 937f2b9

Please sign in to comment.