Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions js/core/src/tracing/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import {
trace,
} from '@opentelemetry/api';
import { AsyncLocalStorage } from 'node:async_hooks';
import { SpanMetadata } from './types.js';
import { SpanMetadata, TraceMetadata } from './types.js';

export const spanMetadataAls = new AsyncLocalStorage<SpanMetadata>();
export const pathVariants = new Set<string>();
export const traceMetadataAls = new AsyncLocalStorage<TraceMetadata>();

export const ATTR_PREFIX = 'genkit';
export const SPAN_TYPE_ATTR = ATTR_PREFIX + ':type';
Expand All @@ -42,18 +42,23 @@ export async function newTrace<T>(
},
fn: (metadata: SpanMetadata, rootSpan: ApiSpan) => Promise<T>
) {
return await runInNewSpan(
{
metadata: {
name: opts.name,
isRoot: true,
const traceMetadata = traceMetadataAls.getStore() || {
paths: new Set<string>(),
};
return await traceMetadataAls.run(traceMetadata, () =>
runInNewSpan(
{
metadata: {
name: opts.name,
isRoot: true,
},
labels: opts.labels,
links: opts.links,
},
labels: opts.labels,
links: opts.links,
},
async (metadata, otSpan) => {
return await fn(metadata, otSpan);
}
async (metadata, otSpan) => {
return await fn(metadata, otSpan);
}
)
);
}

Expand Down Expand Up @@ -84,7 +89,7 @@ export async function runInNewSpan<T>(
: '';
opts.metadata.path = parentPath + `/{${opts.metadata.name}${stepType}}`;

const pathVariantCount = pathVariants.size;
const pathCount = getCurrentPathCount();
const output = await spanMetadataAls.run(opts.metadata, () =>
fn(opts.metadata, otSpan, isInRoot)
);
Expand All @@ -93,13 +98,14 @@ export async function runInNewSpan<T>(
}

opts.metadata.path = decoratePathWithSubtype(opts.metadata);

if (pathVariantCount == pathVariants.size) {
pathVariants.add(opts.metadata.path);
if (pathCount == getCurrentPathCount()) {
traceMetadataAls.getStore()?.paths?.add(opts.metadata.path);
}

return output;
} catch (e) {
opts.metadata.path = decoratePathWithSubtype(opts.metadata);
traceMetadataAls.getStore()?.paths?.add(opts.metadata.path);
opts.metadata.state = 'error';
otSpan.setStatus({
code: SpanStatusCode.ERROR,
Expand Down Expand Up @@ -182,6 +188,10 @@ function getCurrentSpan(): SpanMetadata {
return step;
}

function getCurrentPathCount(): number {
return traceMetadataAls.getStore()?.paths?.size || 0;
}

function decoratePathWithSubtype(metadata: SpanMetadata): string {
if (!metadata.path) {
return '';
Expand Down
5 changes: 5 additions & 0 deletions js/core/src/tracing/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ export interface TraceStore {
list(query?: TraceQuery): Promise<TraceQueryResponse>;
}

export const TraceMetadataSchema = z.object({
paths: z.set(z.string()).optional(),
});
export type TraceMetadata = z.infer<typeof TraceMetadataSchema>;

export const SpanMetadataSchema = z.object({
name: z.string(),
state: z.enum(['success', 'error']).optional(),
Expand Down
4 changes: 0 additions & 4 deletions js/flow/src/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ import {
runWithActiveContext,
} from './utils.js';

import { pathVariants } from '@genkit-ai/core/tracing';

const streamDelimiter = '\n';

const CREATED_FLOWS = 'genkit__CREATED_FLOWS';
Expand Down Expand Up @@ -451,7 +449,6 @@ export class Flow<
setCustomMetadataAttribute(metadataPrefix('state'), 'done');
telemetry.writeFlowSuccess(
ctx.flow.name,
pathVariants,
performance.now() - startTimeMs
);
return output;
Expand Down Expand Up @@ -482,7 +479,6 @@ export class Flow<
telemetry.recordError(e);
telemetry.writeFlowFailure(
ctx.flow.name,
pathVariants,
performance.now() - startTimeMs,
e
);
Expand Down
82 changes: 44 additions & 38 deletions js/flow/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
MetricCounter,
MetricHistogram,
} from '@genkit-ai/core/metrics';
import { spanMetadataAls } from '@genkit-ai/core/tracing';
import { spanMetadataAls, traceMetadataAls } from '@genkit-ai/core/tracing';
import { ValueType } from '@opentelemetry/api';
import express from 'express';

Expand Down Expand Up @@ -58,11 +58,7 @@ export function recordError(err: any) {
});
}

export function writeFlowSuccess(
flowName: string,
variants: Set<string>,
latencyMs: number
) {
export function writeFlowSuccess(flowName: string, latencyMs: number) {
const dimensions = {
name: flowName,
source: 'ts',
Expand All @@ -71,27 +67,29 @@ export function writeFlowSuccess(
flowCounter.add(1, dimensions);
flowLatencies.record(latencyMs, dimensions);

const relevantVariants = Array.from(variants).filter((variant) =>
variant.includes(flowName)
);
const paths = traceMetadataAls.getStore()?.paths || new Set<string>();
if (paths) {
const relevantVariants = Array.from(paths).filter((path) =>
path.includes(flowName)
);

logger.logStructured(`Variants[/${flowName}]`, {
flowName: flowName,
variants: relevantVariants,
});

relevantVariants.forEach((variant) =>
variantCounter.add(1, {
...dimensions,
success: 'success',
variant,
})
);
logger.logStructured(`Variants[/${flowName}]`, {
flowName: flowName,
variants: relevantVariants,
});

relevantVariants.forEach((variant) =>
variantCounter.add(1, {
...dimensions,
success: 'success',
variant,
})
);
}
}

export function writeFlowFailure(
flowName: string,
variants: Set<string>,
latencyMs: number,
err: any
) {
Expand All @@ -104,25 +102,33 @@ export function writeFlowFailure(
flowCounter.add(1, dimensions);
flowLatencies.record(latencyMs, dimensions);

const path = spanMetadataAls?.getStore()?.path;
const relevantVariants = Array.from(variants).filter(
(variant) => variant.includes(flowName) && variant !== path
);
const allPaths = traceMetadataAls.getStore()?.paths || new Set<string>();
if (allPaths) {
const failPath = spanMetadataAls?.getStore()?.path;
const relevantVariants = Array.from(allPaths).filter(
(path) => path.includes(flowName) && path !== failPath
);

// All variants that have succeeded need to be tracked as succeeded.
relevantVariants.forEach((variant) =>
variantCounter.add(1, {
logger.logStructured(`Variants[/${flowName}]`, {
flowName: flowName,
success: 'success',
variant: variant,
})
);
variants: relevantVariants,
});

// All variants that have succeeded need to be tracked as succeeded.
relevantVariants.forEach((variant) =>
variantCounter.add(1, {
flowName: flowName,
success: 'success',
variant: variant,
})
);

variantCounter.add(1, {
flowName: flowName,
success: 'failure',
variant: path,
});
variantCounter.add(1, {
flowName: flowName,
success: 'failure',
variant: failPath,
});
}
}

export function logRequest(flowName: string, req: express.Request) {
Expand Down