diff --git a/js/ai/src/index.ts b/js/ai/src/index.ts index 5dda23e438..e4c111246c 100644 --- a/js/ai/src/index.ts +++ b/js/ai/src/index.ts @@ -33,6 +33,16 @@ export { type GenerateStreamOptions, type GenerateStreamResponse, } from './generate.js'; +export { + GenerateRequest, + GenerateRequestData, + GenerateResponseData, + GenerationUsage, + MediaPart, + Part, + ToolRequestPart, + ToolResponsePart, +} from './model.js'; export { definePrompt, renderPrompt, type PromptAction } from './prompt.js'; export { index, diff --git a/js/ai/src/model.ts b/js/ai/src/model.ts index bd527bc138..d98bf9c67b 100644 --- a/js/ai/src/model.ts +++ b/js/ai/src/model.ts @@ -31,7 +31,6 @@ import { conformOutput, validateSupport, } from './model/middleware.js'; -import * as telemetry from './telemetry.js'; // // IMPORTANT: Please keep type definitions in sync with @@ -196,6 +195,7 @@ export const GenerateRequestSchema = z.object({ context: z.array(DocumentDataSchema).optional(), candidates: z.number().optional(), }); +export type GenerateRequestData = z.infer<typeof GenerateRequestSchema>; export interface GenerateRequest< CustomOptionsSchema extends z.ZodTypeAny = z.ZodTypeAny, @@ -322,25 +322,15 @@ export function defineModel< use: middleware, }, (input) => { - telemetry.recordGenerateActionInputLogs(options.name, input); const startTimeMs = performance.now(); - return runner(input, getStreamingCallback()) - .then((response) => { - const timedResponse = { - ...response, - latencyMs: performance.now() - startTimeMs, - }; - telemetry.recordGenerateActionOutputLogs(options.name, response); - telemetry.recordGenerateActionMetrics(options.name, input, { - response: timedResponse, - }); - return timedResponse; - }) - .catch((err) => { - telemetry.recordGenerateActionMetrics(options.name, input, { err }); - throw err; - }); + return runner(input, getStreamingCallback()).then((response) => { + const timedResponse = { + ...response, + latencyMs: performance.now() - startTimeMs, + }; + return timedResponse; + }); } ); 
Object.assign(act, { diff --git a/js/ai/src/telemetry.ts b/js/ai/src/telemetry.ts deleted file mode 100644 index 90a41018c7..0000000000 --- a/js/ai/src/telemetry.ts +++ /dev/null @@ -1,373 +0,0 @@ -/** - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { GENKIT_VERSION } from '@genkit-ai/core'; -import { logger } from '@genkit-ai/core/logging'; -import { - internalMetricNamespaceWrap, - MetricCounter, - MetricHistogram, -} from '@genkit-ai/core/metrics'; -import { - spanMetadataAls, - toDisplayPath, - traceMetadataAls, -} from '@genkit-ai/core/tracing'; -import { ValueType } from '@opentelemetry/api'; -import { createHash } from 'crypto'; -import { - GenerateRequest, - GenerateResponseData, - GenerationUsage, - MediaPart, - Part, - ToolRequestPart, - ToolResponsePart, -} from './model.js'; - -/** The maximum length (in characters) of a logged prompt message. */ -const MAX_LOG_CONTENT_CHARS = 128_000; - -/** - * Wraps the declared metrics in a Genkit-specific, internal namespace. 
- */ -const _N = internalMetricNamespaceWrap.bind(null, 'ai'); - -const generateActionCounter = new MetricCounter(_N('generate/requests'), { - description: 'Counts calls to genkit generate actions.', - valueType: ValueType.INT, -}); - -const generateActionLatencies = new MetricHistogram(_N('generate/latency'), { - description: 'Latencies when interacting with a Genkit model.', - valueType: ValueType.DOUBLE, - unit: 'ms', -}); - -const generateActionInputCharacters = new MetricCounter( - _N('generate/input/characters'), - { - description: 'Counts input characters to any Genkit model.', - valueType: ValueType.INT, - } -); - -const generateActionInputTokens = new MetricCounter( - _N('generate/input/tokens'), - { - description: 'Counts input tokens to a Genkit model.', - valueType: ValueType.INT, - } -); - -const generateActionInputImages = new MetricCounter( - _N('generate/input/images'), - { - description: 'Counts input images to a Genkit model.', - valueType: ValueType.INT, - } -); - -const generateActionInputVideos = new MetricCounter( - _N('generate/input/videos'), - { - description: 'Counts input videos to a Genkit model.', - valueType: ValueType.INT, - } -); - -const generateActionInputAudio = new MetricCounter(_N('generate/input/audio'), { - description: 'Counts input audio files to a Genkit model.', - valueType: ValueType.INT, -}); - -const generateActionOutputCharacters = new MetricCounter( - _N('generate/output/characters'), - { - description: 'Counts output characters from a Genkit model.', - valueType: ValueType.INT, - } -); - -const generateActionOutputTokens = new MetricCounter( - _N('generate/output/tokens'), - { - description: 'Counts output tokens from a Genkit model.', - valueType: ValueType.INT, - } -); - -const generateActionOutputImages = new MetricCounter( - _N('generate/output/images'), - { - description: 'Count output images from a Genkit model.', - valueType: ValueType.INT, - } -); - -const generateActionOutputVideos = new MetricCounter( - 
_N('generate/output/videos'), - { - description: 'Count output videos from a Genkit model.', - valueType: ValueType.INT, - } -); - -const generateActionOutputAudio = new MetricCounter( - _N('generate/output/audio'), - { - description: 'Count output audio files from a Genkit model.', - valueType: ValueType.INT, - } -); - -type SharedDimensions = { - modelName?: string; - flowName?: string; - path?: string; - temperature?: number; - topK?: number; - topP?: number; - status?: string; - source?: string; - sourceVersion?: string; -}; - -export function recordGenerateActionMetrics( - modelName: string, - input: GenerateRequest, - opts: { - response?: GenerateResponseData; - err?: any; - } -) { - doRecordGenerateActionMetrics(modelName, opts.response?.usage || {}, { - temperature: input.config?.temperature, - topK: input.config?.topK, - topP: input.config?.topP, - maxOutputTokens: input.config?.maxOutputTokens, - flowName: traceMetadataAls?.getStore()?.flowName, - path: spanMetadataAls?.getStore()?.path, - latencyMs: opts.response?.latencyMs, - err: opts.err, - source: 'ts', - sourceVersion: GENKIT_VERSION, - }); -} - -export function recordGenerateActionInputLogs( - model: string, - input: GenerateRequest -) { - const flowName = traceMetadataAls?.getStore()?.flowName; - const qualifiedPath = spanMetadataAls?.getStore()?.path || ''; - const path = toDisplayPath(qualifiedPath); - const sharedMetadata = { model, path, qualifiedPath, flowName }; - logger.logStructured(`Config[${path}, ${model}]`, { - ...sharedMetadata, - temperature: input.config?.temperature, - topK: input.config?.topK, - topP: input.config?.topP, - maxOutputTokens: input.config?.maxOutputTokens, - stopSequences: input.config?.stopSequences, - source: 'ts', - sourceVersion: GENKIT_VERSION, - }); - - const messages = input.messages.length; - input.messages.forEach((msg, msgIdx) => { - const parts = msg.content.length; - msg.content.forEach((part, partIdx) => { - const partCounts = toPartCounts(partIdx, 
parts, msgIdx, messages); - logger.logStructured(`Input[${path}, ${model}] ${partCounts}`, { - ...sharedMetadata, - content: toPartLogContent(part), - partIndex: partIdx, - totalParts: parts, - messageIndex: msgIdx, - totalMessages: messages, - role: msg.role, - }); - }); - }); -} - -export function recordGenerateActionOutputLogs( - model: string, - output: GenerateResponseData -) { - const flowName = traceMetadataAls?.getStore()?.flowName; - const qualifiedPath = spanMetadataAls?.getStore()?.path || ''; - const path = toDisplayPath(qualifiedPath); - const sharedMetadata = { model, path, qualifiedPath, flowName }; - const candidates = output.candidates.length; - output.candidates.forEach((cand, candIdx) => { - const parts = cand.message.content.length; - cand.message.content.forEach((part, partIdx) => { - const partCounts = toPartCounts(partIdx, parts, candIdx, candidates); - const initial = cand.finishMessage - ? { finishMessage: toPartLogText(cand.finishMessage) } - : {}; - logger.logStructured(`Output[${path}, ${model}] ${partCounts}`, { - ...initial, - ...sharedMetadata, - content: toPartLogContent(part), - partIndex: partIdx, - totalParts: parts, - candidateIndex: candIdx, - totalCandidates: candidates, - messageIndex: cand.index, - finishReason: cand.finishReason, - role: cand.message.role, - }); - }); - }); -} - -function toPartCounts( - partOrdinal: number, - parts: number, - msgOrdinal: number, - messages: number -): string { - if (parts > 1 && messages > 1) { - return `(part ${xOfY(partOrdinal, parts)} in message ${xOfY( - msgOrdinal, - messages - )})`; - } - if (parts > 1) { - return `(part ${xOfY(partOrdinal, parts)})`; - } - if (messages > 1) { - return `(message ${xOfY(msgOrdinal, messages)})`; - } - return ''; -} - -function xOfY(x: number, y: number): string { - return `${x} of ${y}`; -} - -function toPartLogContent(part: Part): string { - if (part.text) { - return toPartLogText(part.text); - } - if (part.media) { - return toPartLogMedia(part); - } 
- if (part.toolRequest) { - return toPartLogToolRequest(part); - } - if (part.toolResponse) { - return toPartLogToolResponse(part); - } - return ''; -} - -function toPartLogText(text: string): string { - return text.substring(0, MAX_LOG_CONTENT_CHARS); -} - -function toPartLogMedia(part: MediaPart): string { - if (part.media.url.startsWith('data:')) { - const splitIdx = part.media.url.indexOf('base64,'); - if (splitIdx < 0) { - return ''; - } - const prefix = part.media.url.substring(0, splitIdx + 7); - const hashedContent = createHash('sha256') - .update(part.media.url.substring(splitIdx + 7)) - .digest('hex'); - return `${prefix}`; - } - return toPartLogText(part.media.url); -} - -function toPartLogToolRequest(part: ToolRequestPart): string { - const inputText = - typeof part.toolRequest.input === 'string' - ? part.toolRequest.input - : JSON.stringify(part.toolRequest.input); - return toPartLogText( - `Tool request: ${part.toolRequest.name}, ref: ${part.toolRequest.ref}, input: ${inputText}` - ); -} - -function toPartLogToolResponse(part: ToolResponsePart): string { - const outputText = - typeof part.toolResponse.output === 'string' - ? part.toolResponse.output - : JSON.stringify(part.toolResponse.output); - return toPartLogText( - `Tool response: ${part.toolResponse.name}, ref: ${part.toolResponse.ref}, output: ${outputText}` - ); -} - -/** - * - * Records all metrics associated with performing a GenerateAction. 
- */ -function doRecordGenerateActionMetrics( - modelName: string, - usage: GenerationUsage, - dimensions: { - flowName?: string; - path?: string; - temperature?: number; - maxOutputTokens?: number; - topK?: number; - topP?: number; - latencyMs?: number; - err?: any; - source?: string; - sourceVersion: string; - } -) { - const shared: SharedDimensions = { - modelName: modelName, - flowName: dimensions.flowName, - path: dimensions.path, - temperature: dimensions.temperature, - topK: dimensions.topK, - topP: dimensions.topP, - source: dimensions.source, - sourceVersion: dimensions.sourceVersion, - status: dimensions.err ? 'failure' : 'success', - }; - - generateActionCounter.add(1, { - maxOutputTokens: dimensions.maxOutputTokens, - error: dimensions.err?.name, - ...shared, - }); - - generateActionLatencies.record(dimensions.latencyMs, shared); - - // inputs - generateActionInputTokens.add(usage.inputTokens, shared); - generateActionInputCharacters.add(usage.inputCharacters, shared); - generateActionInputImages.add(usage.inputImages, shared); - generateActionInputVideos.add(usage.inputVideos, shared); - generateActionInputAudio.add(usage.inputAudioFiles, shared); - - // outputs - generateActionOutputTokens.add(usage.outputTokens, shared); - generateActionOutputCharacters.add(usage.outputCharacters, shared); - generateActionOutputImages.add(usage.outputImages, shared); - generateActionOutputVideos.add(usage.outputVideos, shared); - generateActionOutputAudio.add(usage.outputAudioFiles, shared); -} diff --git a/js/core/src/action.ts b/js/core/src/action.ts index a78c8a724f..3019889667 100644 --- a/js/core/src/action.ts +++ b/js/core/src/action.ts @@ -16,11 +16,9 @@ import { JSONSchema7 } from 'json-schema'; import { AsyncLocalStorage } from 'node:async_hooks'; -import { performance } from 'node:perf_hooks'; import * as z from 'zod'; import { ActionType, lookupPlugin, registerAction } from './registry.js'; import { parseSchema } from './schema.js'; -import * as telemetry 
from './telemetry.js'; import { SPAN_TYPE_ATTR, runInNewSpan, @@ -138,23 +136,11 @@ export function action< async (metadata) => { metadata.name = actionName; metadata.input = input; - const startTimeMs = performance.now(); - try { - const output = await fn(input); - metadata.output = JSON.stringify(output); - telemetry.writeActionSuccess( - metadata.name, - performance.now() - startTimeMs - ); - return output; - } catch (e) { - telemetry.writeActionFailure( - metadata.name, - performance.now() - startTimeMs, - e - ); - throw e; - } + + const output = await fn(input); + + metadata.output = JSON.stringify(output); + return output; } ); output = parseSchema(output, { diff --git a/js/core/src/telemetry.ts b/js/core/src/telemetry.ts deleted file mode 100644 index b22efabc3e..0000000000 --- a/js/core/src/telemetry.ts +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { ValueType } from '@opentelemetry/api'; -import { GENKIT_VERSION } from './index.js'; -import { - internalMetricNamespaceWrap, - MetricCounter, - MetricHistogram, -} from './metrics.js'; -import { - spanMetadataAls, - traceMetadataAls, -} from './tracing/instrumentation.js'; - -/** - * Wraps the declared metrics in a Genkit-specific, internal namespace. 
- */ -const _N = internalMetricNamespaceWrap.bind(null, 'action'); - -const actionCounter = new MetricCounter(_N('requests'), { - description: 'Counts calls to genkit actions.', - valueType: ValueType.INT, -}); - -const actionLatencies = new MetricHistogram(_N('latency'), { - description: 'Latencies when calling Genkit actions.', - valueType: ValueType.DOUBLE, - unit: 'ms', -}); - -export function writeActionSuccess(actionName: string, latencyMs: number) { - const dimensions = { - name: actionName, - flowName: traceMetadataAls?.getStore()?.flowName, - path: spanMetadataAls?.getStore()?.path, - status: 'success', - source: 'ts', - sourceVersion: GENKIT_VERSION, - }; - actionCounter.add(1, dimensions); - actionLatencies.record(latencyMs, dimensions); -} - -export function writeActionFailure( - actionName: string, - latencyMs: number, - err: any -) { - const dimensions = { - name: actionName, - flowName: traceMetadataAls?.getStore()?.flowName, - path: spanMetadataAls?.getStore()?.path, - source: 'ts', - sourceVersion: GENKIT_VERSION, - status: 'failure', - error: err?.name, - }; - actionCounter.add(1, dimensions); - actionLatencies.record(latencyMs, dimensions); -} diff --git a/js/flow/src/flow.ts b/js/flow/src/flow.ts index 8d318a1c33..09b533d24a 100644 --- a/js/flow/src/flow.ts +++ b/js/flow/src/flow.ts @@ -49,7 +49,6 @@ import { getErrorStack, InterruptError, } from './errors.js'; -import * as telemetry from './telemetry.js'; import { FlowActionInputSchema, FlowInvokeEnvelopeMessage, @@ -447,10 +446,6 @@ export class Flow< const output = await handler(input, streamingCallback); metadata.output = JSON.stringify(output); setCustomMetadataAttribute(metadataPrefix('state'), 'done'); - telemetry.writeFlowSuccess( - ctx.flow.name, - performance.now() - startTimeMs - ); return output; } catch (e) { if (e instanceof InterruptError) { @@ -475,13 +470,6 @@ export class Flow< error: getErrorMessage(e), stacktrace: getErrorStack(e), } as FlowError; - - 
telemetry.recordError(e); - telemetry.writeFlowFailure( - ctx.flow.name, - performance.now() - startTimeMs, - e - ); } errored = true; } @@ -499,7 +487,6 @@ export class Flow< req: express.Request, res: express.Response ): Promise { - telemetry.logRequest(this.name, req); if (req.query.stream === 'true') { const respBody = { error: { @@ -508,7 +495,6 @@ export class Flow< }, }; res.status(400).send(respBody).end(); - telemetry.logResponse(this.name, 400, respBody); return; } @@ -521,11 +507,9 @@ export class Flow< try { const state = await this.runEnvelope(envMsg); res.status(200).send(state.operation).end(); - telemetry.logResponse(this.name, 200, state.operation); } catch (e) { // Pass errors as operations instead of a standard API error // (https://cloud.google.com/apis/design/errors#http_mapping) - telemetry.recordError(e); const respBody = { done: true, result: { @@ -537,7 +521,6 @@ export class Flow< .status(500) .send(respBody as Operation) .end(); - telemetry.logResponse(this.name, 500, respBody); } } @@ -545,7 +528,6 @@ export class Flow< req: __RequestWithAuth, res: express.Response ): Promise { - telemetry.logRequest(this.name, req); const { stream } = req.query; const auth = req.auth; @@ -554,7 +536,6 @@ export class Flow< try { await this.authPolicy?.(auth, input); } catch (e: any) { - telemetry.recordError(e); const respBody = { error: { status: 'PERMISSION_DENIED', @@ -562,7 +543,6 @@ export class Flow< }, }; res.status(403).send(respBody).end(); - telemetry.logResponse(this.name, 403, respBody); return; } @@ -580,10 +560,8 @@ export class Flow< }); res.write(JSON.stringify(state.operation)); res.end(); - telemetry.logResponse(this.name, 200, state.operation); } catch (e) { // Errors while streaming are also passed back as operations - telemetry.recordError(e); const respBody = { done: true, result: { @@ -593,7 +571,6 @@ export class Flow< }; res.write(JSON.stringify(respBody as Operation)); res.end(); - telemetry.logResponse(this.name, 500, 
respBody); } } else { try { @@ -609,11 +586,9 @@ export class Flow< result: state.operation.result?.response, }) .end(); - telemetry.logResponse(this.name, 200, state.operation); } catch (e) { // Errors for non-durable, non-streaming flows are passed back as // standard API errors. - telemetry.recordError(e); res .status(500) .send({ diff --git a/js/flow/src/telemetry.ts b/js/flow/src/telemetry.ts deleted file mode 100644 index 39e4ec879f..0000000000 --- a/js/flow/src/telemetry.ts +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { GENKIT_VERSION } from '@genkit-ai/core'; -import { logger } from '@genkit-ai/core/logging'; -import { - internalMetricNamespaceWrap, - MetricCounter, - MetricHistogram, -} from '@genkit-ai/core/metrics'; -import { - PathMetadata, - spanMetadataAls, - toDisplayPath, - traceMetadataAls, -} from '@genkit-ai/core/tracing'; -import { ValueType } from '@opentelemetry/api'; -import express from 'express'; - -/** - * Wraps the declared metrics in a Genkit-specific, internal namespace. 
- */ -const _N = internalMetricNamespaceWrap.bind(null, 'flow'); - -const flowCounter = new MetricCounter(_N('requests'), { - description: 'Counts calls to genkit flows.', - valueType: ValueType.INT, -}); - -const pathCounter = new MetricCounter(_N('path/requests'), { - description: 'Tracks unique flow paths per flow.', - valueType: ValueType.INT, -}); - -const pathLatencies = new MetricHistogram(_N('path/latency'), { - description: 'Latencies per flow path.', - ValueType: ValueType.DOUBLE, - unit: 'ms', -}); - -const flowLatencies = new MetricHistogram(_N('latency'), { - description: 'Latencies when calling Genkit flows.', - valueType: ValueType.DOUBLE, - unit: 'ms', -}); - -export function recordError(err: any) { - const paths = traceMetadataAls?.getStore()?.paths || new Set(); - const failedPath = - Array.from(paths).find((p) => p.status === 'failure')?.path || - spanMetadataAls?.getStore()?.path || - ''; - const displayPath = toDisplayPath(failedPath); - logger.logStructuredError(`Error[${displayPath}, ${err.name}]`, { - path: displayPath, - qualifiedPath: failedPath, - name: err.name, - message: err.message, - stack: err.stack, - source: 'ts', - sourceVersion: GENKIT_VERSION, - }); -} - -export function writeFlowSuccess(flowName: string, latencyMs: number) { - const dimensions = { - name: flowName, - status: 'success', - source: 'ts', - sourceVersion: GENKIT_VERSION, - }; - flowCounter.add(1, dimensions); - flowLatencies.record(latencyMs, dimensions); - - writePathMetrics(flowName, latencyMs); -} - -export function writeFlowFailure( - flowName: string, - latencyMs: number, - err: any -) { - const dimensions = { - name: flowName, - status: 'failure', - source: 'ts', - sourceVersion: GENKIT_VERSION, - error: err.name, - }; - flowCounter.add(1, dimensions); - flowLatencies.record(latencyMs, dimensions); - - writePathMetrics(flowName, latencyMs, err); -} - -export function logRequest(flowName: string, req: express.Request) { - const qualifiedPath = 
spanMetadataAls?.getStore()?.path || ''; - const path = toDisplayPath(qualifiedPath); - logger.logStructured(`Request[${flowName}]`, { - flowName: flowName, - headers: { - ...req.headers, - authorization: '', - }, - params: req.params, - body: req.body, - query: req.query, - originalUrl: req.originalUrl, - path, - qualifiedPath, - source: 'ts', - sourceVersion: GENKIT_VERSION, - }); -} - -export function logResponse(flowName: string, respCode: number, respBody: any) { - const qualifiedPath = spanMetadataAls?.getStore()?.path || ''; - const path = toDisplayPath(qualifiedPath); - logger.logStructured(`Response[${flowName}]`, { - flowName: flowName, - path, - qualifiedPath, - code: respCode, - body: respBody, - source: 'ts', - sourceVersion: GENKIT_VERSION, - }); -} - -/** Writes all path-level metrics stored in the current flow execution. */ -function writePathMetrics(flowName: string, latencyMs: number, err?: any) { - const paths = traceMetadataAls.getStore()?.paths || new Set(); - const flowPaths = Array.from(paths).filter((meta) => - meta.path.includes(flowName) - ); - if (flowPaths) { - logger.logStructured(`Paths[${flowName}]`, { - flowName: flowName, - paths: flowPaths.map((p) => toDisplayPath(p.path)), - }); - - flowPaths.forEach((p) => writePathMetric(flowName, p)); - // If we're writing a failure, but none of the stored paths have failed, - // this means the root flow threw the error. 
- if (err && !flowPaths.some((p) => p.status === 'failure')) { - writePathMetric(flowName, { - status: 'failure', - path: spanMetadataAls?.getStore()?.path || '', - error: err, - latency: latencyMs, - }); - } - } -} - -/** Writes metrics for a single PathMetadata */ -function writePathMetric(flowName: string, meta: PathMetadata) { - const pathDimensions = { - flowName: flowName, - status: meta.status, - error: meta.error, - path: meta.path, - source: 'ts', - sourceVersion: GENKIT_VERSION, - }; - pathCounter.add(1, pathDimensions); - pathLatencies.record(meta.latency, pathDimensions); -} diff --git a/js/plugins/google-cloud/src/gcpOpenTelemetry.ts b/js/plugins/google-cloud/src/gcpOpenTelemetry.ts index 68dfff4dfc..e2c0947778 100644 --- a/js/plugins/google-cloud/src/gcpOpenTelemetry.ts +++ b/js/plugins/google-cloud/src/gcpOpenTelemetry.ts @@ -20,7 +20,11 @@ import { TraceExporter } from '@google-cloud/opentelemetry-cloud-trace-exporter' import { GcpDetectorSync } from '@google-cloud/opentelemetry-resource-util'; import { Span, SpanStatusCode, TraceFlags } from '@opentelemetry/api'; import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node'; -import { ExportResult } from '@opentelemetry/core'; +import { + ExportResult, + hrTimeDuration, + hrTimeToMilliseconds, +} from '@opentelemetry/core'; import { Instrumentation } from '@opentelemetry/instrumentation'; import { PinoInstrumentation } from '@opentelemetry/instrumentation-pino'; import { WinstonInstrumentation } from '@opentelemetry/instrumentation-winston'; @@ -42,7 +46,14 @@ import { ReadableSpan, SpanExporter, } from '@opentelemetry/sdk-trace-base'; + +import { extractErrorName } from './utils'; + +import { PathMetadata } from '@genkit-ai/core/tracing'; import { PluginOptions } from './index.js'; +import { actionTelemetry } from './telemetry/action.js'; +import { flowsTelemetry } from './telemetry/flow.js'; +import { generateTelemetry } from './telemetry/generate.js'; let 
metricExporter: PushMetricExporter; let spanProcessor: BatchSpanProcessor; @@ -61,12 +72,14 @@ export class GcpOpenTelemetry implements TelemetryConfig { * required by GCP. */ private gcpTraceLogHook = (span: Span, record: any) => { - const isSampled = !!(span.spanContext().traceFlags & TraceFlags.SAMPLED); - record['logging.googleapis.com/trace'] = `projects/${ - this.options.projectId - }/traces/${span.spanContext().traceId}`; - record['logging.googleapis.com/spanId'] = span.spanContext().spanId; - record['logging.googleapis.com/trace_sampled'] = isSampled ? '1' : '0'; + const spanContext = span.spanContext(); + const isSampled = !!(spanContext.traceFlags & TraceFlags.SAMPLED); + const projectId = this.options.projectId; + + record['logging.googleapis.com/trace'] ??= + `projects/${projectId}/traces/${spanContext.traceId}`; + record['logging.googleapis.com/trace_sampled'] ??= isSampled ? '1' : '0'; + record['logging.googleapis.com/spanId'] ??= spanContext.spanId; }; constructor(options?: PluginOptions) { @@ -93,7 +106,8 @@ export class GcpOpenTelemetry implements TelemetryConfig { ? new TraceExporter({ credentials: this.options.credentials, }) - : new InMemorySpanExporter() + : new InMemorySpanExporter(), + this.options.projectId ); return spanExporter; } @@ -178,7 +192,10 @@ export class GcpOpenTelemetry implements TelemetryConfig { * error spans that marks them as error in GCP. */ class AdjustingTraceExporter implements SpanExporter { - constructor(private exporter: SpanExporter) {} + constructor( + private exporter: SpanExporter, + private projectId?: string + ) {} export( spans: ReadableSpan[], @@ -203,7 +220,38 @@ class AdjustingTraceExporter implements SpanExporter { } private adjust(spans: ReadableSpan[]): ReadableSpan[] { + const allPaths = spans + .filter((span) => span.attributes['genkit:path']) + .map( + (span) => + ({ + path: span.attributes['genkit:path'] as string, + status: + (span.attributes['genkit:state'] as string) === 'error' + ? 
'failure' + : 'success', + error: extractErrorName(span.events), + latency: hrTimeToMilliseconds( + hrTimeDuration(span.startTime, span.endTime) + ), + }) as PathMetadata + ); + + const allLeafPaths = new Set( + allPaths.filter((leafPath) => + allPaths.every( + (path) => + path.path === leafPath.path || + !path.path.startsWith(leafPath.path) || + (path.path.startsWith(leafPath.path) && + path.status !== leafPath.status) + ) + ) + ); + return spans.map((span) => { + this.tickTelemetry(span, allLeafPaths); + span = this.redactPii(span); span = this.markErrorSpanAsError(span); span = this.normalizeLabels(span); @@ -211,6 +259,31 @@ class AdjustingTraceExporter implements SpanExporter { }); } + private tickTelemetry(span: ReadableSpan, paths: Set<PathMetadata>) { + const attributes = span.attributes; + + if (!Object.keys(attributes).includes('genkit:type')) { + return; /* no genkit:type attribute, so there is nothing to record */ + } + + const type = attributes['genkit:type'] as string; + const subtype = attributes['genkit:metadata:subtype'] as string; + + if (type === 'flow') { + flowsTelemetry.tick(span, paths, this.projectId); + return; + } + + if (type === 'action' && subtype === 'model') { + generateTelemetry.tick(span, paths, this.projectId); + return; + } + + if (type === 'action' || type === 'flowStep') { + actionTelemetry.tick(span, paths, this.projectId); + } + } + private redactPii(span: ReadableSpan): ReadableSpan { const hasInput = 'genkit:input' in span.attributes; const hasOutput = 'genkit:output' in span.attributes; diff --git a/js/core/src/metrics.ts b/js/plugins/google-cloud/src/metrics.ts similarity index 93% rename from js/core/src/metrics.ts rename to js/plugins/google-cloud/src/metrics.ts index 1398eb1838..b659de9405 100644 --- a/js/core/src/metrics.ts +++ b/js/plugins/google-cloud/src/metrics.ts @@ -14,7 +14,9 @@ * limitations under the License. 
*/ +import { PathMetadata } from '@genkit-ai/core/tracing'; import { Counter, Histogram, Meter, metrics } from '@opentelemetry/api'; +import { ReadableSpan } from '@opentelemetry/sdk-trace-base'; export const METER_NAME = 'genkit'; export const METRIC_NAME_PREFIX = 'genkit'; @@ -110,3 +112,7 @@ function truncateDimensions(opts?: any) { }); } } + +export interface Telemetry { + tick(span: ReadableSpan, paths?: Set<PathMetadata>, projectId?: string): void; +} diff --git a/js/plugins/google-cloud/src/telemetry/action.ts b/js/plugins/google-cloud/src/telemetry/action.ts new file mode 100644 index 0000000000..8e8fbe16e9 --- /dev/null +++ b/js/plugins/google-cloud/src/telemetry/action.ts @@ -0,0 +1,118 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { GENKIT_VERSION } from '@genkit-ai/core'; +import { logger } from '@genkit-ai/core/logging'; +import { PathMetadata } from '@genkit-ai/core/tracing'; +import { ValueType } from '@opentelemetry/api'; +import { hrTimeDuration, hrTimeToMilliseconds } from '@opentelemetry/core'; +import { + MetricCounter, + MetricHistogram, + Telemetry, + internalMetricNamespaceWrap, +} from '../metrics.js'; +import { extractErrorName, extractOuterFlowNameFromPath } from '../utils'; + +import { ReadableSpan } from '@opentelemetry/sdk-trace-base'; + +class ActionTelemetry implements Telemetry { + /** + * Wraps the declared metrics in a Genkit-specific, internal namespace. 
+ */ + private _N = internalMetricNamespaceWrap.bind(null, 'action'); + + private actionCounter = new MetricCounter(this._N('requests'), { + description: 'Counts calls to genkit actions.', + valueType: ValueType.INT, + }); + + private actionLatencies = new MetricHistogram(this._N('latency'), { + description: 'Latencies when calling Genkit actions.', + valueType: ValueType.DOUBLE, + unit: 'ms', + }); + + tick( + span: ReadableSpan, + paths?: Set<PathMetadata>, + projectId?: string + ): void { + const attributes = span.attributes; + + const actionName = (attributes['genkit:name'] as string) || ''; + const path = (attributes['genkit:path'] as string) || ''; + const flowName = + (attributes['genkit:metadata:flow:name'] as string) || + extractOuterFlowNameFromPath(path); + const state = attributes['genkit:state'] || 'success'; + const latencyMs = hrTimeToMilliseconds( + hrTimeDuration(span.startTime, span.endTime) + ); + const errorName = extractErrorName(span.events); + + if (state === 'success') { + this.writeSuccess(actionName, flowName, path, latencyMs); + return; + } + if (state === 'error') { + this.writeFailure(actionName, flowName, path, latencyMs, errorName); + return; /* failure is recorded; do not fall through to the warning */ + } + logger.warn(`Unknown action state; ${state}`); + } + + private writeSuccess( + actionName: string, + flowName: string, + path: string, + latencyMs: number + ) { + const dimensions = { + name: actionName, + flowName, + path, + status: 'success', + source: 'ts', + sourceVersion: GENKIT_VERSION, + }; + this.actionCounter.add(1, dimensions); + this.actionLatencies.record(latencyMs, dimensions); + } + + private writeFailure( + actionName: string, + flowName: string, + path: string, + latencyMs: number, + errorName?: string + ) { + const dimensions = { + name: actionName, + flowName, + path, + source: 'ts', + sourceVersion: GENKIT_VERSION, + status: 'failure', + error: errorName, + }; + this.actionCounter.add(1, dimensions); + this.actionLatencies.record(latencyMs, dimensions); + } +} + +const actionTelemetry = new 
ActionTelemetry(); +export { actionTelemetry }; diff --git a/js/plugins/google-cloud/src/telemetry/flow.ts b/js/plugins/google-cloud/src/telemetry/flow.ts new file mode 100644 index 0000000000..57f128576c --- /dev/null +++ b/js/plugins/google-cloud/src/telemetry/flow.ts @@ -0,0 +1,256 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { GENKIT_VERSION } from '@genkit-ai/core'; +import { logger } from '@genkit-ai/core/logging'; +import { PathMetadata, toDisplayPath } from '@genkit-ai/core/tracing'; +import { ValueType } from '@opentelemetry/api'; +import { hrTimeDuration, hrTimeToMilliseconds } from '@opentelemetry/core'; +import { ReadableSpan } from '@opentelemetry/sdk-trace-base'; +import { + MetricCounter, + MetricHistogram, + Telemetry, + internalMetricNamespaceWrap, +} from '../metrics'; +import { + createCommonLogAttributes, + extractErrorMessage, + extractErrorName, + extractErrorStack, +} from '../utils'; + +class FlowsTelemetry implements Telemetry { + /** + * Wraps the declared metrics in a Genkit-specific, internal namespace. 
+ */ + private _N = internalMetricNamespaceWrap.bind(null, 'flow'); + + private flowCounter = new MetricCounter(this._N('requests'), { + description: 'Counts calls to genkit flows.', + valueType: ValueType.INT, + }); + + private pathCounter = new MetricCounter(this._N('path/requests'), { + description: 'Tracks unique flow paths per flow.', + valueType: ValueType.INT, + }); + + private pathLatencies = new MetricHistogram(this._N('path/latency'), { + description: 'Latencies per flow path.', + ValueType: ValueType.DOUBLE, + unit: 'ms', + }); + + private flowLatencies = new MetricHistogram(this._N('latency'), { + description: 'Latencies when calling Genkit flows.', + valueType: ValueType.DOUBLE, + unit: 'ms', + }); + + tick( + span: ReadableSpan, + paths?: Set, + projectId?: string + ): void { + const attributes = span.attributes; + const name = attributes['genkit:name'] as string; + const path = attributes['genkit:path'] as string; + const latencyMs = hrTimeToMilliseconds( + hrTimeDuration(span.startTime, span.endTime) + ); + const isRoot = (attributes['genkit:isRoot'] as boolean) || false; + const state = attributes['genkit:state'] as string; + + if (state === 'success') { + this.writeFlowSuccess( + span, + paths!, + name, + path, + latencyMs, + isRoot, + projectId + ); + return; + } + + if (state === 'error') { + const errorName = extractErrorName(span.events) || ''; + const errorMessage = extractErrorMessage(span.events) || ''; + const errorStack = extractErrorStack(span.events) || ''; + + this.writeFlowFailure( + span, + paths!, + name, + path, + latencyMs, + errorName, + isRoot, + projectId + ); + this.recordError( + span, + path, + errorName, + errorMessage, + errorStack, + projectId + ); + return; + } + + logger.warn(`Unknown flow state; ${state}`); + } + + private recordError( + span: ReadableSpan, + path: string, + errorName: string, + errorMessage: string, + errorStack: string, + projectId?: string + ) { + const displayPath = toDisplayPath(path); + 
logger.logStructuredError(`Error[${displayPath}, ${errorName}]`, { + ...createCommonLogAttributes(span, projectId), + path: displayPath, + qualifiedPath: path, + name: errorName, + message: errorMessage, + stack: errorStack, + source: 'ts', + sourceVersion: GENKIT_VERSION, + }); + } + + private writeFlowSuccess( + span: ReadableSpan, + paths: Set, + flowName: string, + path: string, + latencyMs: number, + isRoot: boolean, + projectId?: string + ) { + const dimensions = { + name: flowName, + status: 'success', + source: 'ts', + sourceVersion: GENKIT_VERSION, + }; + this.flowCounter.add(1, dimensions); + this.flowLatencies.record(latencyMs, dimensions); + + if (isRoot) { + this.writePathMetrics( + span, + path, + paths, + flowName, + latencyMs, + undefined, + projectId + ); + } + } + + private writeFlowFailure( + span: ReadableSpan, + paths: Set, + flowName: string, + path: string, + latencyMs: number, + errorName: string, + isRoot: boolean, + projectId?: string + ) { + const dimensions = { + name: flowName, + status: 'failure', + source: 'ts', + sourceVersion: GENKIT_VERSION, + error: errorName, + }; + this.flowCounter.add(1, dimensions); + this.flowLatencies.record(latencyMs, dimensions); + + if (isRoot) { + this.writePathMetrics( + span, + path, + paths, + flowName, + latencyMs, + errorName, + projectId + ); + } + } + + /** Writes all path-level metrics stored in the current flow execution. 
*/ + private writePathMetrics( + span: ReadableSpan, + rootPath: string, + paths: Set, + flowName: string, + latencyMs: number, + err?: string, + projectId?: string + ) { + const flowPaths = Array.from(paths).filter((meta) => + meta.path.includes(flowName) + ); + + if (flowPaths) { + logger.logStructured(`Paths[${flowName}]`, { + ...createCommonLogAttributes(span, projectId), + flowName: flowName, + paths: flowPaths.map((p) => toDisplayPath(p.path)), + }); + + flowPaths.forEach((p) => this.writePathMetric(flowName, p)); + // If we're writing a failure, but none of the stored paths have failed, + // this means the root flow threw the error. + if (err && !flowPaths.some((p) => p.status === 'failure')) { + this.writePathMetric(flowName, { + status: 'failure', + path: rootPath, + error: err, + latency: latencyMs, + }); + } + } + } + + /** Writes metrics for a single PathMetadata */ + private writePathMetric(flowName: string, meta: PathMetadata) { + const pathDimensions = { + flowName: flowName, + status: meta.status, + error: meta.error, + path: meta.path, + source: 'ts', + sourceVersion: GENKIT_VERSION, + }; + this.pathCounter.add(1, pathDimensions); + this.pathLatencies.record(meta.latency, pathDimensions); + } +} + +const flowsTelemetry = new FlowsTelemetry(); +export { flowsTelemetry }; diff --git a/js/plugins/google-cloud/src/telemetry/generate.ts b/js/plugins/google-cloud/src/telemetry/generate.ts new file mode 100644 index 0000000000..6cb949d9d9 --- /dev/null +++ b/js/plugins/google-cloud/src/telemetry/generate.ts @@ -0,0 +1,433 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {
+  GenerateRequestData,
+  GenerateResponseData,
+  GenerationUsage,
+  MediaPart,
+  Part,
+  ToolRequestPart,
+  ToolResponsePart,
+} from '@genkit-ai/ai';
+import { GENKIT_VERSION } from '@genkit-ai/core';
+import { logger } from '@genkit-ai/core/logging';
+import { PathMetadata, toDisplayPath } from '@genkit-ai/core/tracing';
+import { ValueType } from '@opentelemetry/api';
+import { ReadableSpan } from '@opentelemetry/sdk-trace-base';
+import { createHash } from 'crypto';
+import {
+  MetricCounter,
+  MetricHistogram,
+  Telemetry,
+  internalMetricNamespaceWrap,
+} from '../metrics';
+import {
+  createCommonLogAttributes,
+  extractErrorName,
+  extractOuterFlowNameFromPath,
+} from '../utils';
+
+type SharedDimensions = {
+  modelName?: string;
+  flowName?: string;
+  path?: string;
+  temperature?: number;
+  topK?: number;
+  topP?: number;
+  status?: string;
+  source?: string;
+  sourceVersion?: string;
+};
+
+/** Writes metrics and structured logs for model (generate) action spans. */
+class GenerateTelemetry implements Telemetry {
+  /** Wraps the declared metrics in a Genkit-specific, internal namespace. */
+  private _N = internalMetricNamespaceWrap.bind(null, 'ai');
+
+  /** The maximum length (in characters) of a logged prompt message. */
+  private MAX_LOG_CONTENT_CHARS = 128_000;
+
+  private actionCounter = new MetricCounter(this._N('generate/requests'), {
+    description: 'Counts calls to genkit generate actions.',
+    valueType: ValueType.INT,
+  });
+
+  private latencies = new MetricHistogram(this._N('generate/latency'), {
+    description: 'Latencies when interacting with a Genkit model.',
+    valueType: ValueType.DOUBLE,
+    unit: 'ms',
+  });
+
+  private inputCharacters = new MetricCounter(
+    this._N('generate/input/characters'),
+    {
+      description: 'Counts input characters to any Genkit model.',
+      valueType: ValueType.INT,
+    }
+  );
+
+  private inputTokens = new MetricCounter(this._N('generate/input/tokens'), {
+    description: 'Counts input tokens to a Genkit model.',
+    valueType: ValueType.INT,
+  });
+
+  private inputImages = new MetricCounter(this._N('generate/input/images'), {
+    description: 'Counts input images to a Genkit model.',
+    valueType: ValueType.INT,
+  });
+
+  private inputVideos = new MetricCounter(this._N('generate/input/videos'), {
+    description: 'Counts input videos to a Genkit model.',
+    valueType: ValueType.INT,
+  });
+
+  private inputAudio = new MetricCounter(this._N('generate/input/audio'), {
+    description: 'Counts input audio files to a Genkit model.',
+    valueType: ValueType.INT,
+  });
+
+  private outputCharacters = new MetricCounter(
+    this._N('generate/output/characters'),
+    {
+      description: 'Counts output characters from a Genkit model.',
+      valueType: ValueType.INT,
+    }
+  );
+
+  private outputTokens = new MetricCounter(this._N('generate/output/tokens'), {
+    description: 'Counts output tokens from a Genkit model.',
+    valueType: ValueType.INT,
+  });
+
+  private outputImages = new MetricCounter(this._N('generate/output/images'), {
+    description: 'Count output images from a Genkit model.',
+    valueType: ValueType.INT,
+  });
+
+  private outputVideos = new MetricCounter(this._N('generate/output/videos'), {
+    description: 'Count output videos from a Genkit model.',
+    valueType: ValueType.INT,
+  });
+
+  private outputAudio = new MetricCounter(this._N('generate/output/audio'), {
+    description: 'Count output audio files from a Genkit model.',
+    valueType: ValueType.INT,
+  });
+
+  /** Records metrics and logs from one span's serialized input and output. */
+  tick(
+    span: ReadableSpan,
+    paths?: Set<PathMetadata>,
+    projectId?: string
+  ): void {
+    const attributes = span.attributes;
+    const modelName = attributes['genkit:name'] as string;
+    const path = (attributes['genkit:path'] as string) || '';
+    const input =
+      'genkit:input' in attributes
+        ? (JSON.parse(
+            attributes['genkit:input']! as string
+          ) as GenerateRequestData)
+        : undefined;
+    const output =
+      'genkit:output' in attributes
+        ? (JSON.parse(
+            attributes['genkit:output']! as string
+          ) as GenerateResponseData)
+        : undefined;
+
+    const errName = extractErrorName(span.events);
+    const flowName = extractOuterFlowNameFromPath(path);
+
+    if (input) {
+      this.recordGenerateActionMetrics(modelName, flowName, path, input, {
+        response: output,
+        errName,
+      });
+      this.recordGenerateActionInputLogs(
+        span,
+        modelName,
+        flowName,
+        path,
+        input,
+        projectId
+      );
+    }
+
+    if (output) {
+      this.recordGenerateActionOutputLogs(
+        span,
+        modelName,
+        flowName,
+        path,
+        output,
+        projectId
+      );
+    }
+  }
+
+  private recordGenerateActionMetrics(
+    modelName: string,
+    flowName: string | undefined,
+    path: string,
+    input: GenerateRequestData,
+    opts: {
+      response?: GenerateResponseData;
+      errName?: string;
+    }
+  ) {
+    this.doRecordGenerateActionMetrics(modelName, opts.response?.usage || {}, {
+      temperature: input.config?.temperature,
+      topK: input.config?.topK,
+      topP: input.config?.topP,
+      maxOutputTokens: input.config?.maxOutputTokens,
+      flowName,
+      path,
+      latencyMs: opts.response?.latencyMs,
+      errName: opts.errName,
+      source: 'ts',
+      sourceVersion: GENKIT_VERSION,
+    });
+  }
+
+  /** Logs the request config and each input message part. */
+  private recordGenerateActionInputLogs(
+    span: ReadableSpan,
+    model: string,
+    flowName: string | undefined,
+    qualifiedPath: string,
+    input: GenerateRequestData,
+    projectId?: string
+  ) {
+    const path = toDisplayPath(qualifiedPath);
+    const sharedMetadata = {
+      ...createCommonLogAttributes(span, projectId),
+      model,
+      path,
+      qualifiedPath,
+      flowName,
+    };
+    logger.logStructured(`Config[${path}, ${model}]`, {
+      ...sharedMetadata,
+      temperature: input.config?.temperature,
+      topK: input.config?.topK,
+      topP: input.config?.topP,
+      maxOutputTokens: input.config?.maxOutputTokens,
+      stopSequences: input.config?.stopSequences,
+      source: 'ts',
+      sourceVersion: GENKIT_VERSION,
+    });
+
+    const messages = input.messages.length;
+    input.messages.forEach((msg, msgIdx) => {
+      const parts = msg.content.length;
+      msg.content.forEach((part, partIdx) => {
+        const partCounts = this.toPartCounts(partIdx, parts, msgIdx, messages);
+        logger.logStructured(`Input[${path}, ${model}] ${partCounts}`, {
+          ...sharedMetadata,
+          content: this.toPartLogContent(part),
+          partIndex: partIdx,
+          totalParts: parts,
+          messageIndex: msgIdx,
+          totalMessages: messages,
+        });
+      });
+    });
+  }
+
+  /** Logs each part of every candidate message in the response. */
+  private recordGenerateActionOutputLogs(
+    span: ReadableSpan,
+    model: string,
+    flowName: string | undefined,
+    qualifiedPath: string,
+    output: GenerateResponseData,
+    projectId?: string
+  ) {
+    const path = toDisplayPath(qualifiedPath);
+    const sharedMetadata = {
+      ...createCommonLogAttributes(span, projectId),
+      model,
+      path,
+      qualifiedPath,
+      flowName,
+    };
+    const candidates = output.candidates.length;
+
+    output.candidates.forEach((cand, candIdx) => {
+      const parts = cand.message.content.length;
+      cand.message.content.forEach((part, partIdx) => {
+        const partCounts = this.toPartCounts(
+          partIdx,
+          parts,
+          candIdx,
+          candidates
+        );
+        const initial = cand.finishMessage
+          ? { finishMessage: this.toPartLogText(cand.finishMessage) }
+          : {};
+        logger.logStructured(`Output[${path}, ${model}] ${partCounts}`, {
+          ...initial,
+          ...sharedMetadata,
+          content: this.toPartLogContent(part),
+          partIndex: partIdx,
+          totalParts: parts,
+          candidateIndex: candIdx,
+          totalCandidates: candidates,
+          messageIndex: cand.index,
+          finishReason: cand.finishReason,
+        });
+      });
+    });
+  }
+
+  private toPartCounts(
+    partOrdinal: number,
+    parts: number,
+    msgOrdinal: number,
+    messages: number
+  ): string {
+    if (parts > 1 && messages > 1) {
+      return `(part ${this.xOfY(partOrdinal, parts)} in message ${this.xOfY(
+        msgOrdinal,
+        messages
+      )})`;
+    }
+    if (parts > 1) {
+      return `(part ${this.xOfY(partOrdinal, parts)})`;
+    }
+    if (messages > 1) {
+      return `(message ${this.xOfY(msgOrdinal, messages)})`;
+    }
+    return '';
+  }
+
+  private xOfY(x: number, y: number): string {
+    return `${x} of ${y}`;
+  }
+
+  private toPartLogContent(part: Part): string {
+    if (part.text) {
+      return this.toPartLogText(part.text);
+    }
+    if (part.media) {
+      return this.toPartLogMedia(part);
+    }
+    if (part.toolRequest) {
+      return this.toPartLogToolRequest(part);
+    }
+    if (part.toolResponse) {
+      return this.toPartLogToolResponse(part);
+    }
+    return '';
+  }
+
+  private toPartLogText(text: string): string {
+    return text.substring(0, this.MAX_LOG_CONTENT_CHARS);
+  }
+
+  /** Logs media by URL; inline base64 data is replaced with its SHA-256. */
+  private toPartLogMedia(part: MediaPart): string {
+    if (part.media.url.startsWith('data:')) {
+      const splitIdx = part.media.url.indexOf('base64,');
+      if (splitIdx < 0) {
+        return '';
+      }
+      const prefix = part.media.url.substring(0, splitIdx + 7);
+      const hashedContent = createHash('sha256')
+        .update(part.media.url.substring(splitIdx + 7))
+        .digest('hex');
+      return `${prefix}<sha256(${hashedContent})>`;
+    }
+    return this.toPartLogText(part.media.url);
+  }
+
+  private toPartLogToolRequest(part: ToolRequestPart): string {
+    const inputText =
+      typeof part.toolRequest.input === 'string'
+        ? part.toolRequest.input
+        : JSON.stringify(part.toolRequest.input);
+    return this.toPartLogText(
+      `Tool request: ${part.toolRequest.name}, ref: ${part.toolRequest.ref}, input: ${inputText}`
+    );
+  }
+
+  private toPartLogToolResponse(part: ToolResponsePart): string {
+    const outputText =
+      typeof part.toolResponse.output === 'string'
+        ? part.toolResponse.output
+        : JSON.stringify(part.toolResponse.output);
+    return this.toPartLogText(
+      `Tool response: ${part.toolResponse.name}, ref: ${part.toolResponse.ref}, output: ${outputText}`
+    );
+  }
+
+  /** Records all metrics associated with performing a GenerateAction. */
+  private doRecordGenerateActionMetrics(
+    modelName: string,
+    usage: GenerationUsage,
+    dimensions: {
+      flowName?: string;
+      path?: string;
+      temperature?: number;
+      maxOutputTokens?: number;
+      topK?: number;
+      topP?: number;
+      latencyMs?: number;
+      errName?: string;
+      source?: string;
+      sourceVersion: string;
+    }
+  ) {
+    const shared: SharedDimensions = {
+      modelName: modelName,
+      flowName: dimensions.flowName,
+      path: dimensions.path,
+      temperature: dimensions.temperature,
+      topK: dimensions.topK,
+      topP: dimensions.topP,
+      source: dimensions.source,
+      sourceVersion: dimensions.sourceVersion,
+      status: dimensions.errName ? 'failure' : 'success',
+    };
+
+    this.actionCounter.add(1, {
+      maxOutputTokens: dimensions.maxOutputTokens,
+      error: dimensions.errName,
+      ...shared,
+    });
+
+    this.latencies.record(dimensions.latencyMs, shared);
+
+    // inputs
+    this.inputTokens.add(usage.inputTokens, shared);
+    this.inputCharacters.add(usage.inputCharacters, shared);
+    this.inputImages.add(usage.inputImages, shared);
+    this.inputVideos.add(usage.inputVideos, shared);
+    this.inputAudio.add(usage.inputAudioFiles, shared);
+
+    // outputs
+    this.outputTokens.add(usage.outputTokens, shared);
+    this.outputCharacters.add(usage.outputCharacters, shared);
+    this.outputImages.add(usage.outputImages, shared);
+    this.outputVideos.add(usage.outputVideos, shared);
+    this.outputAudio.add(usage.outputAudioFiles, shared);
+  }
+}
+
+const generateTelemetry = new GenerateTelemetry();
+export { generateTelemetry };
diff --git a/js/plugins/google-cloud/src/utils.ts b/js/plugins/google-cloud/src/utils.ts
new file mode 100644
index 0000000000..d8ea43cde4
--- /dev/null
+++ b/js/plugins/google-cloud/src/utils.ts
@@ -0,0 +1,76 @@
+/**
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { TraceFlags } from '@opentelemetry/api';
+import { ReadableSpan, TimedEvent } from '@opentelemetry/sdk-trace-base';
+
+/**
+ * Returns the name of the outermost `{name,t:flow}` segment in a qualified
+ * path, or '' when the path is empty or contains no flow segment.
+ */
+export function extractOuterFlowNameFromPath(path: string): string {
+  if (!path) {
+    return '';
+  }
+  // The capture must not cross braces: a greedy `.+` would run on to the last
+  // `,t:flow}` of a nested-flow path instead of stopping at the first one.
+  const flowName = path.match(/\/\{([^{}]+),t:flow\}/);
+  return flowName ? flowName[1] : '';
+}
+
+/**
+ * Returns the named attribute of the first 'exception' event; '' if that
+ * event has no attributes, undefined if there is no exception event.
+ */
+function firstExceptionAttribute(
+  events: TimedEvent[],
+  key: string
+): string | undefined {
+  const exception = events.find((event) => event.name === 'exception');
+  if (!exception) {
+    return undefined;
+  }
+  return exception.attributes ? (exception.attributes[key] as string) : '';
+}
+
+/** Type of the first exception recorded in the given events, if any. */
+export function extractErrorName(events: TimedEvent[]): string | undefined {
+  return firstExceptionAttribute(events, 'exception.type');
+}
+
+/** Message of the first exception recorded in the given events, if any. */
+export function extractErrorMessage(events: TimedEvent[]): string | undefined {
+  return firstExceptionAttribute(events, 'exception.message');
+}
+
+/** Stack trace of the first exception recorded in the given events, if any. */
+export function extractErrorStack(events: TimedEvent[]): string | undefined {
+  return firstExceptionAttribute(events, 'exception.stacktrace');
+}
+
+/** Builds the `logging.googleapis.com/*` span and trace fields for a log. */
+export function createCommonLogAttributes(
+  span: ReadableSpan,
+  projectId?: string
+) {
+  const spanContext = span.spanContext();
+  const isSampled = !!(spanContext.traceFlags & TraceFlags.SAMPLED);
+  return {
+    'logging.googleapis.com/spanId': spanContext.spanId,
+    'logging.googleapis.com/trace': `projects/${projectId}/traces/${spanContext.traceId}`,
+    'logging.googleapis.com/trace_sampled': isSampled ? '1' : '0',
+  };
+}
diff --git a/js/plugins/google-cloud/tests/logs_test.ts b/js/plugins/google-cloud/tests/logs_test.ts
index d81b447770..c21f01cc0b 100644
--- a/js/plugins/google-cloud/tests/logs_test.ts
+++ b/js/plugins/google-cloud/tests/logs_test.ts
@@ -27,6 +27,8 @@ import { registerFlowStateStore } from '@genkit-ai/core/registry';
 import { defineFlow, run, runFlow } from '@genkit-ai/flow';
 import {
   __addTransportStreamForTesting,
+  __forceFlushSpansForTesting,
+  __getSpanExporterForTesting,
   googleCloud,
 } from '@genkit-ai/google-cloud';
 import assert from 'node:assert';
@@ -70,6 +72,7 @@ describe('GoogleCloudLogs', () => {
   });
   beforeEach(async () => {
     logLines = '';
+    __getSpanExporterForTesting().reset();
   });
 
   it('writes path logs', async () => {
@@ -77,6 +80,8 @@ describe('GoogleCloudLogs', () => {
 
     await runFlow(testFlow);
 
+    await getExportedSpans();
+
     const logMessages = await getLogs();
     assert.equal(logMessages.includes('[info] Paths[testFlow]'), true);
   });
@@ -91,6 +96,8 @@ describe('GoogleCloudLogs', () => {
       await runFlow(testFlow);
     });
 
+    await getExportedSpans();
+
     const logMessages = await getLogs();
     assert.equal(
       logMessages.includes(
@@ -147,6 +154,8 @@ describe('GoogleCloudLogs', () => {
 
     await runFlow(testFlow);
 
+    await getExportedSpans();
+
     const logMessages = await getLogs();
     assert.equal(
       logMessages.includes(
@@ -217,6 +226,22 @@ describe('GoogleCloudLogs', () => {
   }
 });
 
+/** Polls the in-memory span exporter until at least one span is exported.
+ */
+async function getExportedSpans(maxAttempts: number = 200) {
+  __forceFlushSpansForTesting();
+  // The flush is not awaited, so poll until the exporter reports spans.
+  let attempts = 0;
+  while (attempts++ < maxAttempts) {
+    await new Promise((resolve) => setTimeout(resolve, 50));
+    const found = __getSpanExporterForTesting().getFinishedSpans();
+    if (found.length > 0) {
+      return found;
+    }
+  }
+  assert.fail(`Timed out while waiting for spans to be exported.`);
+}
+
 class NoOpFlowStateStore implements FlowStateStore {
   state: Record = {};
 
diff --git a/js/plugins/google-cloud/tests/metrics_test.ts b/js/plugins/google-cloud/tests/metrics_test.ts
index 658bfdfa08..e64cf9b0ac 100644
--- a/js/plugins/google-cloud/tests/metrics_test.ts
+++ b/js/plugins/google-cloud/tests/metrics_test.ts
@@ -27,7 +27,9 @@ import {
 import { registerFlowStateStore } from '@genkit-ai/core/registry';
 import { defineFlow, run, runAction, runFlow } from '@genkit-ai/flow';
 import {
+  __forceFlushSpansForTesting,
   __getMetricExporterForTesting,
+  __getSpanExporterForTesting,
   GcpOpenTelemetry,
   googleCloud,
 } from '@genkit-ai/google-cloud';
@@ -68,6 +70,7 @@ describe('GoogleCloudMetrics', () => {
   });
   beforeEach(async () => {
     __getMetricExporterForTesting().reset();
+    __getSpanExporterForTesting().reset();
   });
 
   it('writes flow metrics', async () => {
@@ -76,6 +79,8 @@ describe('GoogleCloudMetrics', () => {
     await runFlow(testFlow);
     await runFlow(testFlow);
 
+    await getExportedSpans();
+
     const requestCounter = await getCounterMetric('genkit/flow/requests');
     const latencyHistogram = await getHistogramMetric('genkit/flow/latency');
     assert.equal(requestCounter.value, 2);
@@ -100,6 +105,8 @@ describe('GoogleCloudMetrics', () => {
       await runFlow(testFlow);
     });
 
+    await getExportedSpans();
+
     const requestCounter = await getCounterMetric('genkit/flow/requests');
     assert.equal(requestCounter.value, 1);
     assert.equal(requestCounter.attributes.name, 'testFlow');
@@ -121,6 +128,8 @@ describe('GoogleCloudMetrics', () => {
     await runFlow(testFlow);
     await runFlow(testFlow);
 
+    await getExportedSpans();
+
     const requestCounter = await getCounterMetric('genkit/action/requests');
     const latencyHistogram = await getHistogramMetric('genkit/action/latency');
     assert.equal(requestCounter.value, 6);
@@ -140,6 +149,8 @@ describe('GoogleCloudMetrics', () => {
 
     await runFlow(testFlow);
 
+    await getExportedSpans();
+
     const requestCounter = await getCounterMetric('genkit/flow/requests');
     const latencyHistogram = await getHistogramMetric('genkit/flow/latency');
     assert.equal(
@@ -165,6 +176,8 @@ describe('GoogleCloudMetrics', () => {
       await runFlow(testFlow);
     });
 
+    await getExportedSpans();
+
     const requestCounter = await getCounterMetric('genkit/action/requests');
     assert.equal(requestCounter.value, 1);
     assert.equal(requestCounter.attributes.name, 'testActionWithFailure');
@@ -212,6 +225,8 @@ describe('GoogleCloudMetrics', () => {
       },
     });
 
+    await getExportedSpans();
+
     const requestCounter = await getCounterMetric(
       'genkit/ai/generate/requests'
     );
@@ -284,6 +299,8 @@ describe('GoogleCloudMetrics', () => {
       });
     });
 
+    await getExportedSpans();
+
     const requestCounter = await getCounterMetric(
       'genkit/ai/generate/requests'
     );
@@ -306,6 +323,8 @@ describe('GoogleCloudMetrics', () => {
 
     await runFlow(flow);
 
+    await getExportedSpans();
+
     const requestCounter = await getCounterMetric('genkit/action/requests');
     const latencyHistogram = await getHistogramMetric('genkit/action/latency');
     assert.equal(requestCounter.attributes.flowName, 'flowNameLabelTestFlow');
@@ -348,6 +367,8 @@ describe('GoogleCloudMetrics', () => {
 
     await runFlow(flow);
 
+    await getExportedSpans();
+
     const metrics = [
       await getCounterMetric('genkit/ai/generate/requests'),
       await getCounterMetric('genkit/ai/generate/input/tokens'),
@@ -376,6 +397,8 @@ describe('GoogleCloudMetrics', () => {
 
     await runFlow(flow);
 
+    await getExportedSpans();
+
     const expectedPaths = new Set([
       '/{pathTestFlow,t:flow}/{step2,t:flowStep}',
       '/{pathTestFlow,t:flow}/{step1,t:flowStep}/{substep_a,t:flowStep}/{substep_b,t:flowStep}',
@@ -418,6 +441,8 @@ describe('GoogleCloudMetrics', () => {
       await runFlow(flow);
     });
 
+    await getExportedSpans();
+
     const reqPoints = await getCounterDataPoints('genkit/flow/path/requests');
     const reqStatuses = reqPoints.map((p) => [
       p.attributes.path,
@@ -455,6 +480,8 @@ describe('GoogleCloudMetrics', () => {
       await runFlow(flow);
     });
 
+    await getExportedSpans();
+
     const reqPoints = await getCounterDataPoints('genkit/flow/path/requests');
     const reqStatuses = reqPoints.map((p) => [
       p.attributes.path,
@@ -496,6 +523,8 @@ describe('GoogleCloudMetrics', () => {
       await runFlow(flow);
     });
 
+    await getExportedSpans();
+
     const reqPoints = await getCounterDataPoints('genkit/flow/path/requests');
     const reqStatuses = reqPoints.map((p) => [
       p.attributes.path,
@@ -539,6 +568,8 @@ describe('GoogleCloudMetrics', () => {
       await runFlow(flow);
     });
 
+    await getExportedSpans();
+
     const reqPoints = await getCounterDataPoints('genkit/flow/path/requests');
     const reqStatuses = reqPoints.map((p) => [
       p.attributes.path,
@@ -588,7 +619,7 @@ describe('GoogleCloudMetrics', () => {
 
   /** Polls the in memory metric exporter until the genkit scope is found. */
   async function getGenkitMetrics(
-    name: string = 'genkit',
+    name: string,
     maxAttempts: number = 100
   ): promise {
     var attempts = 0;
@@ -604,11 +635,27 @@ describe('GoogleCloudMetrics', () => {
     assert.fail(`Waiting for metric ${name} but it has not been written.`);
   }
 
+  /** Polls the in-memory span exporter until at least one span is exported.
+   */
+  async function getExportedSpans(maxAttempts: number = 200) {
+    __forceFlushSpansForTesting();
+    // The flush is not awaited, so poll until the exporter reports spans.
+    let attempts = 0;
+    while (attempts++ < maxAttempts) {
+      await new Promise((resolve) => setTimeout(resolve, 50));
+      const found = __getSpanExporterForTesting().getFinishedSpans();
+      if (found.length > 0) {
+        return found;
+      }
+    }
+    assert.fail(`Timed out while waiting for spans to be exported.`);
+  }
+
   /** Finds all datapoints for a counter metric with the given name in the in memory exporter */
   async function getCounterDataPoints(
     metricName: string
   ): Promise>> {
-    const genkitMetrics = await getGenkitMetrics();
+    const genkitMetrics = await getGenkitMetrics('genkit');
     const counterMetric: SumMetricData = genkitMetrics.metrics.find(
       (e) => e.descriptor.name === metricName && e.descriptor.type === 'COUNTER'
     );
@@ -634,7 +681,7 @@ describe('GoogleCloudMetrics', () => {
   async function getHistogramDataPoints(
     metricName: string
   ): Promise>> {
-    const genkitMetrics = await getGenkitMetrics();
+    const genkitMetrics = await getGenkitMetrics('genkit');
     const histogramMetric: HistogramMetricData = genkitMetrics.metrics.find(
       (e) =>
         e.descriptor.name === metricName && e.descriptor.type === 'HISTOGRAM'
diff --git a/js/plugins/google-cloud/tests/traces_test.ts b/js/plugins/google-cloud/tests/traces_test.ts
index bcdefa9dce..e2687b526e 100644
--- a/js/plugins/google-cloud/tests/traces_test.ts
+++ b/js/plugins/google-cloud/tests/traces_test.ts
@@ -134,8 +134,7 @@ describe('GoogleCloudTracing', () => {
 
   /** Polls the in memory metric exporter until the genkit scope is found.
*/ async function getExportedSpans( - name: string = 'genkit', - maxAttempts: number = 100 + maxAttempts: number = 200 ): promise { __forceFlushSpansForTesting(); var attempts = 0; @@ -146,7 +145,7 @@ describe('GoogleCloudTracing', () => { return found; } } - assert.fail(`Waiting for metric ${name} but it has not been written.`); + assert.fail(`Timed out while waiting for spans to be exported.`); } });