Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 37 additions & 30 deletions js/core/src/tracing/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,43 +87,23 @@ export async function runInNewSpan<T>(
async (otSpan) => {
if (opts.labels) otSpan.setAttributes(opts.labels);
try {
const parentPath = parentStep?.path || '';
const stepType =
opts.labels && opts.labels['genkit:type']
? `,t:${opts.labels['genkit:type']}`
: '';
opts.metadata.path = parentPath + `/{${opts.metadata.name}${stepType}}`;

const pathCount = getCurrentPathCount();
opts.metadata.path = buildPath(
opts.metadata.name,
parentStep?.path || '',
opts.labels
);

const output = await spanMetadataAls.run(opts.metadata, () =>
fn(opts.metadata, otSpan, isInRoot)
);
if (opts.metadata.state !== 'error') {
opts.metadata.state = 'success';
}

opts.metadata.path = decoratePathWithSubtype(opts.metadata);
if (pathCount == getCurrentPathCount()) {
const now = performance.now();
const start = traceMetadataAls.getStore()?.timestamp || now;
traceMetadataAls.getStore()?.paths?.add({
path: opts.metadata.path,
status: 'success',
latency: now - start,
});
}

recordPath(opts.metadata);
return output;
} catch (e) {
opts.metadata.path = decoratePathWithSubtype(opts.metadata);
const now = performance.now();
const start = traceMetadataAls.getStore()?.timestamp || now;
traceMetadataAls.getStore()?.paths?.add({
path: opts.metadata.path,
status: 'failure',
error: (e as any).name,
latency: now - start,
});
recordPath(opts.metadata, e);
opts.metadata.state = 'error';
otSpan.setStatus({
code: SpanStatusCode.ERROR,
Expand Down Expand Up @@ -212,8 +192,35 @@ function getCurrentSpan(): SpanMetadata {
return step;
}

function getCurrentPathCount(): number {
return traceMetadataAls.getStore()?.paths?.size || 0;
function buildPath(
name: string,
parentPath: string,
labels?: Record<string, string>
) {
const stepType =
labels && labels['genkit:type'] ? `,t:${labels['genkit:type']}` : '';
return parentPath + `/{${name}${stepType}}`;
}

function recordPath(spanMeta: SpanMetadata, err?: any) {
const path = spanMeta.path || '';
const decoratedPath = decoratePathWithSubtype(spanMeta);
// Only add the path if a child has not already been added. In the event that
// an error is rethrown, we don't want to add each step in the unwind.
const paths = Array.from(
traceMetadataAls.getStore()?.paths || new Set<PathMetadata>()
);
if (!paths.some((p) => p.path.startsWith(path))) {
const now = performance.now();
const start = traceMetadataAls.getStore()?.timestamp || now;
traceMetadataAls.getStore()?.paths?.add({
path: decoratedPath,
status: err ? 'failure' : 'success',
error: err?.name,
latency: now - start,
});
}
spanMeta.path = decoratedPath;
}

function decoratePathWithSubtype(metadata: SpanMetadata): string {
Expand Down
43 changes: 42 additions & 1 deletion js/plugins/google-cloud/tests/metrics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ describe('GoogleCloudMetrics', () => {
});
});

it('writes flow path failure metrics', async () => {
it('writes flow path failure metrics in root', async () => {
const flow = createFlow('testFlow', async () => {
const subPath = await run('sub-action', async () => {
return 'done';
Expand Down Expand Up @@ -440,6 +440,47 @@ describe('GoogleCloudMetrics', () => {
]);
});

it('writes flow path failure metrics in subaction', async () => {
const flow = createFlow('testFlow', async () => {
const subPath1 = await run('sub-action-1', async () => {
const subPath2 = await run('sub-action-2', async () => {
return Promise.reject(new Error('failed'));
});
return 'done';
});
return 'done';
});

assert.rejects(async () => {
await runFlow(flow);
});

const reqPoints = await getCounterDataPoints('genkit/flow/path/requests');
const reqStatuses = reqPoints.map((p) => [
p.attributes.path,
p.attributes.status,
]);
assert.deepEqual(reqStatuses, [
[
'/{testFlow,t:flow}/{sub-action-1,t:flowStep}/{sub-action-2,t:flowStep}',
'failure',
],
]);
const latencyPoints = await getHistogramDataPoints(
'genkit/flow/path/latency'
);
const latencyStatuses = latencyPoints.map((p) => [
p.attributes.path,
p.attributes.status,
]);
assert.deepEqual(latencyStatuses, [
[
'/{testFlow,t:flow}/{sub-action-1,t:flowStep}/{sub-action-2,t:flowStep}',
'failure',
],
]);
});

it('writes flow path failure in sub-action metrics', async () => {
const flow = createFlow('testFlow', async () => {
const subPath1 = await run('sub-action-1', async () => {
Expand Down