Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update telemetry to properly attribute failures to correct path. #563

Merged
merged 4 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions js/core/src/tracing/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export async function runInNewSpan<T>(
const start = traceMetadataAls.getStore()?.timestamp || now;
traceMetadataAls.getStore()?.paths?.add({
path: opts.metadata.path,
status: 'success',
latency: now - start,
});
}
Expand All @@ -119,6 +120,8 @@ export async function runInNewSpan<T>(
const start = traceMetadataAls.getStore()?.timestamp || now;
traceMetadataAls.getStore()?.paths?.add({
path: opts.metadata.path,
status: 'failure',
error: (e as any).name,
latency: now - start,
});
opts.metadata.state = 'error';
Expand Down
2 changes: 2 additions & 0 deletions js/core/src/tracing/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export interface TraceStore {

export const PathMetadataSchema = z.object({
path: z.string(),
status: z.string(),
error: z.string().optional(),
latency: z.number(),
});
export type PathMetadata = z.infer<typeof PathMetadataSchema>;
Expand Down
134 changes: 51 additions & 83 deletions js/flow/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ const flowLatencies = new MetricHistogram(_N('latency'), {
});

export function recordError(err: any) {
const qualifiedPath = spanMetadataAls?.getStore()?.path || '';
const path = toDisplayPath(qualifiedPath);
logger.logStructuredError(`Error[${path}, ${err.name}]`, {
path,
qualifiedPath,
const paths = traceMetadataAls?.getStore()?.paths || new Set<PathMetadata>();
const failedPath =
Array.from(paths).find((p) => p.status === 'failure')?.path ||
spanMetadataAls?.getStore()?.path ||
'';
const displayPath = toDisplayPath(failedPath);
logger.logStructuredError(`Error[${displayPath}, ${err.name}]`, {
path: displayPath,
qualifiedPath: failedPath,
name: err.name,
message: err.message,
stack: err.stack,
Expand All @@ -81,35 +85,7 @@ export function writeFlowSuccess(flowName: string, latencyMs: number) {
flowCounter.add(1, dimensions);
flowLatencies.record(latencyMs, dimensions);

const paths = traceMetadataAls.getStore()?.paths || new Set<PathMetadata>();
if (paths) {
const pathDimensions = {
flowName: flowName,
status: 'success',
source: 'ts',
sourceVersion: GENKIT_VERSION,
};
const relevantPaths = Array.from(paths).filter((meta) =>
meta.path.includes(flowName)
);

logger.logStructured(`Paths[${flowName}]`, {
flowName: flowName,
paths: relevantPaths.map((p) => p.path),
});

relevantPaths.forEach((p) => {
pathCounter.add(1, {
...pathDimensions,
path: p.path,
});

pathLatencies.record(p.latency, {
...pathDimensions,
path: p.path,
});
});
}
writePathMetrics(flowName, latencyMs);
}

export function writeFlowFailure(
Expand All @@ -127,55 +103,7 @@ export function writeFlowFailure(
flowCounter.add(1, dimensions);
flowLatencies.record(latencyMs, dimensions);

const allQualifiedPaths =
traceMetadataAls.getStore()?.paths || new Set<PathMetadata>();
if (allQualifiedPaths) {
const qualifiedFailPath = spanMetadataAls?.getStore()?.path || '';
const failPath = toDisplayPath(qualifiedFailPath);
const relevantPaths = Array.from(allQualifiedPaths).filter(
(meta) => meta.path.includes(flowName) && meta.path !== qualifiedFailPath
);

logger.logStructured(`Paths[${flowName}]`, {
flowName: flowName,
paths: relevantPaths.map((p) => toDisplayPath(p.path)),
});

const pathDimensions = {
flowName: flowName,
source: 'ts',
sourceVersion: GENKIT_VERSION,
};
// All paths that have succeeded need to be tracked as succeeded.
relevantPaths.forEach((p) => {
const path = toDisplayPath(p.path);
pathCounter.add(1, {
...pathDimensions,
status: 'success',
path: p.path,
});

pathLatencies.record(p.latency, {
...pathDimensions,
status: 'success',
path: p.path,
});
});

pathCounter.add(1, {
...pathDimensions,
status: 'failure',
error: err.name,
path: qualifiedFailPath,
});

pathLatencies.record(latencyMs, {
...pathDimensions,
status: 'failure',
error: err.name,
path: qualifiedFailPath,
});
}
writePathMetrics(flowName, latencyMs, err);
}

export function logRequest(flowName: string, req: express.Request) {
Expand Down Expand Up @@ -211,3 +139,43 @@ export function logResponse(flowName: string, respCode: number, respBody: any) {
sourceVersion: GENKIT_VERSION,
});
}

/** Writes all path-level metrics stored in the current flow execution. */
function writePathMetrics(flowName: string, latencyMs: number, err?: any) {
const paths = traceMetadataAls.getStore()?.paths || new Set<PathMetadata>();
const flowPaths = Array.from(paths).filter((meta) =>
meta.path.includes(flowName)
);
if (flowPaths) {
logger.logStructured(`Paths[${flowName}]`, {
flowName: flowName,
paths: flowPaths.map((p) => toDisplayPath(p.path)),
});

flowPaths.forEach((p) => writePathMetric(flowName, p));
// If we're writing a failure, but none of the stored paths have failed,
// this means the root flow threw the error.
if (err && !flowPaths.some((p) => p.status === 'failure')) {
writePathMetric(flowName, {
status: 'failure',
path: spanMetadataAls?.getStore()?.path || '',
error: err,
latency: latencyMs,
});
}
}
}

/** Writes metrics for a single PathMetadata */
function writePathMetric(flowName: string, meta: PathMetadata) {
const pathDimensions = {
flowName: flowName,
status: meta.status,
error: meta.error,
path: meta.path,
source: 'ts',
sourceVersion: GENKIT_VERSION,
};
pathCounter.add(1, pathDimensions);
pathLatencies.record(meta.latency, pathDimensions);
}
43 changes: 40 additions & 3 deletions js/plugins/google-cloud/tests/metrics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@
import { generate } from '@genkit-ai/ai';
import { defineModel } from '@genkit-ai/ai/model';
import {
configureGenkit,
defineAction,
FlowState,
FlowStateQuery,
FlowStateQueryResponse,
FlowStateStore,
configureGenkit,
defineAction,
} from '@genkit-ai/core';
import { registerFlowStateStore } from '@genkit-ai/core/registry';
import { defineFlow, run, runAction, runFlow } from '@genkit-ai/flow';
import {
GcpOpenTelemetry,
__getMetricExporterForTesting,
GcpOpenTelemetry,
googleCloud,
} from '@genkit-ai/google-cloud';
import {
Expand Down Expand Up @@ -440,6 +440,43 @@ describe('GoogleCloudMetrics', () => {
]);
});

it('writes flow path failure in sub-action metrics', async () => {
const flow = createFlow('testFlow', async () => {
const subPath1 = await run('sub-action-1', async () => {
return 'done';
});
const subPath2 = await run('sub-action-2', async () => {
return Promise.reject(new Error('failed'));
});
return 'done';
});

assert.rejects(async () => {
await runFlow(flow);
});

const reqPoints = await getCounterDataPoints('genkit/flow/path/requests');
const reqStatuses = reqPoints.map((p) => [
p.attributes.path,
p.attributes.status,
]);
assert.deepEqual(reqStatuses, [
['/{testFlow,t:flow}/{sub-action-1,t:flowStep}', 'success'],
['/{testFlow,t:flow}/{sub-action-2,t:flowStep}', 'failure'],
]);
const latencyPoints = await getHistogramDataPoints(
'genkit/flow/path/latency'
);
const latencyStatuses = latencyPoints.map((p) => [
p.attributes.path,
p.attributes.status,
]);
assert.deepEqual(latencyStatuses, [
['/{testFlow,t:flow}/{sub-action-1,t:flowStep}', 'success'],
['/{testFlow,t:flow}/{sub-action-2,t:flowStep}', 'failure'],
]);
});

describe('Configuration', () => {
it('should export only traces', async () => {
const telemetry = new GcpOpenTelemetry({
Expand Down
Loading