diff --git a/js/core/src/tracing/instrumentation.ts b/js/core/src/tracing/instrumentation.ts index b1ff3b49b5..9f68565518 100644 --- a/js/core/src/tracing/instrumentation.ts +++ b/js/core/src/tracing/instrumentation.ts @@ -87,14 +87,12 @@ export async function runInNewSpan( async (otSpan) => { if (opts.labels) otSpan.setAttributes(opts.labels); try { - const parentPath = parentStep?.path || ''; - const stepType = - opts.labels && opts.labels['genkit:type'] - ? `,t:${opts.labels['genkit:type']}` - : ''; - opts.metadata.path = parentPath + `/{${opts.metadata.name}${stepType}}`; - - const pathCount = getCurrentPathCount(); + opts.metadata.path = buildPath( + opts.metadata.name, + parentStep?.path || '', + opts.labels + ); + const output = await spanMetadataAls.run(opts.metadata, () => fn(opts.metadata, otSpan, isInRoot) ); @@ -102,28 +100,10 @@ export async function runInNewSpan( opts.metadata.state = 'success'; } - opts.metadata.path = decoratePathWithSubtype(opts.metadata); - if (pathCount == getCurrentPathCount()) { - const now = performance.now(); - const start = traceMetadataAls.getStore()?.timestamp || now; - traceMetadataAls.getStore()?.paths?.add({ - path: opts.metadata.path, - status: 'success', - latency: now - start, - }); - } - + recordPath(opts.metadata); return output; } catch (e) { - opts.metadata.path = decoratePathWithSubtype(opts.metadata); - const now = performance.now(); - const start = traceMetadataAls.getStore()?.timestamp || now; - traceMetadataAls.getStore()?.paths?.add({ - path: opts.metadata.path, - status: 'failure', - error: (e as any).name, - latency: now - start, - }); + recordPath(opts.metadata, e); opts.metadata.state = 'error'; otSpan.setStatus({ code: SpanStatusCode.ERROR, @@ -212,8 +192,35 @@ function getCurrentSpan(): SpanMetadata { return step; } -function getCurrentPathCount(): number { - return traceMetadataAls.getStore()?.paths?.size || 0; +function buildPath( + name: string, + parentPath: string, + labels?: Record +) { + const stepType = + labels && labels['genkit:type'] ? `,t:${labels['genkit:type']}` : ''; + return parentPath + `/{${name}${stepType}}`; +} + +function recordPath(spanMeta: SpanMetadata, err?: any) { + const path = spanMeta.path || ''; + const decoratedPath = decoratePathWithSubtype(spanMeta); + // Only add the path if a child has not already been added. In the event that + // an error is rethrown, we don't want to add each step in the unwind. + const paths = Array.from( + traceMetadataAls.getStore()?.paths || new Set() + ); + if (!paths.some((p) => p.path.startsWith(path))) { + const now = performance.now(); + const start = traceMetadataAls.getStore()?.timestamp || now; + traceMetadataAls.getStore()?.paths?.add({ + path: decoratedPath, + status: err ? 'failure' : 'success', + error: err?.name, + latency: now - start, + }); + } + spanMeta.path = decoratedPath; } function decoratePathWithSubtype(metadata: SpanMetadata): string { diff --git a/js/plugins/google-cloud/tests/metrics_test.ts b/js/plugins/google-cloud/tests/metrics_test.ts index f278e269ab..f3253c7ee8 100644 --- a/js/plugins/google-cloud/tests/metrics_test.ts +++ b/js/plugins/google-cloud/tests/metrics_test.ts @@ -406,7 +406,7 @@ describe('GoogleCloudMetrics', () => { }); }); - it('writes flow path failure metrics', async () => { + it('writes flow path failure metrics in root', async () => { const flow = createFlow('testFlow', async () => { const subPath = await run('sub-action', async () => { return 'done'; @@ -440,6 +440,47 @@ describe('GoogleCloudMetrics', () => { ]); }); + it('writes flow path failure metrics in subaction', async () => { + const flow = createFlow('testFlow', async () => { + const subPath1 = await run('sub-action-1', async () => { + const subPath2 = await run('sub-action-2', async () => { + return Promise.reject(new Error('failed')); + }); + return 'done'; + }); + return 'done'; + }); + + assert.rejects(async () => { + await runFlow(flow); + }); + + const reqPoints = await getCounterDataPoints('genkit/flow/path/requests'); + const reqStatuses = reqPoints.map((p) => [ + p.attributes.path, + p.attributes.status, + ]); + assert.deepEqual(reqStatuses, [ + [ + '/{testFlow,t:flow}/{sub-action-1,t:flowStep}/{sub-action-2,t:flowStep}', + 'failure', + ], + ]); + const latencyPoints = await getHistogramDataPoints( + 'genkit/flow/path/latency' + ); + const latencyStatuses = latencyPoints.map((p) => [ + p.attributes.path, + p.attributes.status, + ]); + assert.deepEqual(latencyStatuses, [ + [ + '/{testFlow,t:flow}/{sub-action-1,t:flowStep}/{sub-action-2,t:flowStep}', + 'failure', + ], + ]); + }); + it('writes flow path failure in sub-action metrics', async () => { const flow = createFlow('testFlow', async () => { const subPath1 = await run('sub-action-1', async () => {