Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@
"@opentelemetry/sdk-trace-base": "^1.28.0",
"@opentelemetry/sdk-trace-node": "^1.28.0",
"@opentelemetry/semantic-conventions": "^1.28.0",
"@types/form-data": "^2.5.2",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a stub types definition. form-data provides its own type definitions, so you do not need this installed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"@types/pidusage": "^2.0.5",
"commander": "^12.0.0",
"fluent-ffmpeg": "^2.1.3",
"form-data": "^4.0.5",
"heap-js": "^2.6.0",
"json-schema": "^0.4.0",
"livekit-server-sdk": "^2.14.1",
Expand Down
39 changes: 30 additions & 9 deletions agents/src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { AsyncLocalStorage } from 'node:async_hooks';
import type { Logger } from 'pino';
import type { InferenceExecutor } from './ipc/inference_executor.js';
import { log } from './log.js';
import { setupCloudTracer } from './telemetry/index.js';
import { setupCloudTracer, uploadSessionReport } from './telemetry/index.js';
import { isCloud } from './utils.js';
import type { AgentSession } from './voice/agent_session.js';
import { type SessionReport, createSessionReport } from './voice/report.js';
Expand Down Expand Up @@ -252,7 +252,6 @@ export class JobContext {
}

// TODO(brian): implement and check recorder io
// TODO(brian): PR5 - Ensure chat history serialization includes all required fields (use sessionReportToJSON helper)

return createSessionReport({
jobId: this.job.id,
Expand All @@ -275,14 +274,36 @@ export class JobContext {

// TODO(brian): Implement CLI/console

// TODO(brian): PR5 - Call uploadSessionReport() if report.enableUserDataTraining is true
// TODO(brian): PR5 - Upload includes: multipart form with header (protobuf), chat_history (JSON), and audio recording (if available)
// Upload session report to LiveKit Cloud if enabled
const url = new URL(this.#info.url);

this.#logger.debug('Session ended, report generated', {
jobId: report.jobId,
roomId: report.roomId,
eventsCount: report.events.length,
});
if (report.enableRecording && isCloud(url)) {
try {
await uploadSessionReport({
agentName: this.job.agentName,
cloudHostname: url.hostname,
report,
});
this.#logger.info(
{
jobId: report.jobId,
roomId: report.roomId,
},
'Session report uploaded to LiveKit Cloud',
);
} catch (error) {
this.#logger.error({ error }, 'Failed to upload session report');
}
}

this.#logger.debug(
{
jobId: report.jobId,
roomId: report.roomId,
eventsCount: report.events.length,
},
'Session ended, report generated',
);
}

/**
Expand Down
8 changes: 7 additions & 1 deletion agents/src/telemetry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,11 @@
export { ExtraDetailsProcessor, MetadataLogProcessor } from './logging.js';
export { enablePinoOTELInstrumentation } from './pino_bridge.js';
export * as traceTypes from './trace_types.js';
export { setTracerProvider, setupCloudTracer, tracer, type StartSpanOptions } from './traces.js';
export {
setTracerProvider,
setupCloudTracer,
tracer,
uploadSessionReport,
type StartSpanOptions,
} from './traces.js';
export { recordException, recordRealtimeMetrics } from './utils.js';
281 changes: 280 additions & 1 deletion agents/src/telemetry/traces.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { MetricsRecordingHeader } from '@livekit/protocol';
import {
type Attributes,
type Context,
Expand All @@ -11,7 +12,7 @@ import {
context as otelContext,
trace,
} from '@opentelemetry/api';
import { logs } from '@opentelemetry/api-logs';
import { SeverityNumber, logs } from '@opentelemetry/api-logs';
import { OTLPLogExporter } from '@opentelemetry/exporter-logs-otlp-http';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { CompressionAlgorithm } from '@opentelemetry/otlp-exporter-base';
Expand All @@ -21,6 +22,7 @@ import type { ReadableSpan, SpanProcessor } from '@opentelemetry/sdk-trace-base'
import { BatchSpanProcessor, NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions';
import { AccessToken } from 'livekit-server-sdk';
import type { SessionReport } from '../voice/report.js';
import { ExtraDetailsProcessor, MetadataLogProcessor } from './logging.js';
import { enablePinoOTELInstrumentation } from './pino_bridge.js';

Expand Down Expand Up @@ -274,3 +276,280 @@ export async function setupCloudTracer(options: {
throw error;
}
}

/**
* Convert ChatItem to proto-compatible dictionary format.
* Matches Python's _to_proto_chat_item implementation.
*
* TODO: Use actual agent_session proto types once @livekit/protocol v1.43.1+ is published
*
* @internal
*/
function chatItemToProto(item: any): Record<string, any> {
const itemDict: Record<string, any> = {};

if (item.type === 'message') {
const roleMap: Record<string, string> = {
developer: 'DEVELOPER',
system: 'SYSTEM',
user: 'USER',
assistant: 'ASSISTANT',
};

const msg: Record<string, any> = {
id: item.id,
role: roleMap[item.role] || item.role.toUpperCase(),
content: item.content.map((c: string) => ({ text: c })),
interrupted: item.interrupted,
extra: item.extra || {},
createdAt: toRFC3339(item.createdAt),
};

if (item.transcriptConfidence !== undefined && item.transcriptConfidence !== null) {
msg.transcriptConfidence = item.transcriptConfidence;
}

// Add metrics if available
const metrics = item.metrics || {};
if (Object.keys(metrics).length > 0) {
msg.metrics = {};
if (metrics.started_speaking_at) {
msg.metrics.startedSpeakingAt = toRFC3339(metrics.started_speaking_at);
}
if (metrics.stopped_speaking_at) {
msg.metrics.stoppedSpeakingAt = toRFC3339(metrics.stopped_speaking_at);
}
if (metrics.transcription_delay !== undefined) {
msg.metrics.transcriptionDelay = metrics.transcription_delay;
}
if (metrics.end_of_turn_delay !== undefined) {
msg.metrics.endOfTurnDelay = metrics.end_of_turn_delay;
}
if (metrics.on_user_turn_completed_delay !== undefined) {
msg.metrics.onUserTurnCompletedDelay = metrics.on_user_turn_completed_delay;
}
if (metrics.llm_node_ttft !== undefined) {
msg.metrics.llmNodeTtft = metrics.llm_node_ttft;
}
if (metrics.tts_node_ttfb !== undefined) {
msg.metrics.ttsNodeTtfb = metrics.tts_node_ttfb;
}
if (metrics.e2e_latency !== undefined) {
msg.metrics.e2eLatency = metrics.e2e_latency;
}
}

itemDict.message = msg;
} else if (item.type === 'function_call') {
itemDict.functionCall = {
id: item.id,
callId: item.callId,
arguments: item.arguments,
name: item.name,
createdAt: toRFC3339(item.createdAt),
};
} else if (item.type === 'function_call_output') {
itemDict.functionCallOutput = {
id: item.id,
name: item.name,
callId: item.callId,
output: item.output,
isError: item.isError,
createdAt: toRFC3339(item.createdAt),
};
} else if (item.type === 'agent_handoff') {
itemDict.agentHandoff = {
id: item.id,
oldAgentId: item.oldAgentId,
newAgentId: item.newAgentId,
createdAt: toRFC3339(item.createdAt),
};
}

// Patch arguments & output to make them indexable (parse JSON strings)
try {
if (item.type === 'function_call' && typeof itemDict.functionCall?.arguments === 'string') {
itemDict.functionCall.arguments = JSON.parse(itemDict.functionCall.arguments);
} else if (
item.type === 'function_call_output' &&
typeof itemDict.functionCallOutput?.output === 'string'
) {
itemDict.functionCallOutput.output = JSON.parse(itemDict.functionCallOutput.output);
}
} catch {
// ignore parsing errors
}

return itemDict;
}

/**
* Convert timestamp to RFC3339 format matching Python's _to_rfc3339.
* @internal
*/
function toRFC3339(value: number | Date): string {
const dt = value instanceof Date ? value : new Date(value * 1000);
// Truncate to milliseconds
const ms = Math.floor(dt.getMilliseconds() / 1000) * 1000;
const truncated = new Date(dt);
truncated.setMilliseconds(ms);
return truncated.toISOString();
}

/**
* Upload session report to LiveKit Cloud observability.
* Matches Python's _upload_session_report implementation.
*
* @param options - Configuration with agentName, cloudHostname, and report
* @internal
*/
export async function uploadSessionReport(options: {
agentName: string;
cloudHostname: string;
report: SessionReport;
}): Promise<void> {
const { agentName, cloudHostname, report } = options;

// Get OTEL logger for chat_history
const chatLogger = logs.getLoggerProvider().getLogger('chat_history');

// Helper to emit logs
const emitLog = (
body: string,
timestamp: number,
attributes: Record<string, any>,
severityNumber = 0, // UNSPECIFIED
severityText = 'unspecified',
) => {
// Add room/job metadata to attributes
const fullAttributes = {
...attributes,
room_id: report.roomId,
job_id: report.jobId,
room: report.room,
};

chatLogger.emit({
body,
timestamp: timestamp * 1000000, // Convert to nanoseconds
attributes: fullAttributes,
severityNumber,
severityText,
context: otelContext.active(),
});
};

// Log session report
emitLog('session report', Math.floor((report.timestamp || Date.now()) * 1000), {
'session.options': report.options,
'session.report_timestamp': report.timestamp,
agent_name: agentName,
});

// Log each chat item
for (const item of report.chatHistory.items) {
const itemLog = chatItemToProto(item);
let severityNumber = SeverityNumber.UNSPECIFIED;
let severityText = 'unspecified';

if (item.type === 'function_call_output' && item.isError) {
severityNumber = SeverityNumber.ERROR;
severityText = 'error';
}

emitLog(
'chat item',
Math.floor(item.createdAt * 1000),
{ 'chat.item': itemLog },
severityNumber,
severityText,
);
}

// Create JWT token for upload
const apiKey = process.env.LIVEKIT_API_KEY;
const apiSecret = process.env.LIVEKIT_API_SECRET;

if (!apiKey || !apiSecret) {
throw new Error('LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set for session upload');
}

const token = new AccessToken(apiKey, apiSecret, {
identity: 'livekit-agents-telemetry',
ttl: '6h',
});
token.addObservabilityGrant({ write: true });
const jwt = await token.toJwt();

// Create multipart form data with explicit header control (like Python's aiohttp.MultipartWriter)
const FormData = (await import('form-data')).default;
const formData = new FormData();

// Add header (protobuf MetricsRecordingHeader)
const headerMsg = new MetricsRecordingHeader({
roomId: report.roomId,
duration: BigInt(0), // TODO: Calculate actual duration from report
startTime: {
seconds: BigInt(Math.floor(report.timestamp / 1000)),
nanos: Math.floor((report.timestamp % 1000) * 1e6),
},
});

const headerBytes = Buffer.from(headerMsg.toBinary());
formData.append('header', headerBytes, {
filename: 'header.binpb',
contentType: 'application/protobuf',
knownLength: headerBytes.length,
header: {
'Content-Type': 'application/protobuf',
'Content-Length': headerBytes.length.toString(),
},
});

// Add chat_history JSON
const chatHistoryJson = JSON.stringify(report.chatHistory.toJSON({ excludeTimestamp: false }));
const chatHistoryBuffer = Buffer.from(chatHistoryJson, 'utf-8');
formData.append('chat_history', chatHistoryBuffer, {
filename: 'chat_history.json',
contentType: 'application/json',
knownLength: chatHistoryBuffer.length,
header: {
'Content-Type': 'application/json',
'Content-Length': chatHistoryBuffer.length.toString(),
},
});

// TODO(brian): Add audio recording file when recorder IO is implemented

// Upload to LiveKit Cloud using form-data's submit method
// This properly streams the multipart form with all headers including Content-Length
return new Promise<void>((resolve, reject) => {
formData.submit(
{
protocol: 'https:',
host: cloudHostname,
path: '/observability/recordings/v0',
method: 'POST',
headers: {
Authorization: `Bearer ${jwt}`,
},
},
(err, res) => {
if (err) {
reject(new Error(`Failed to upload session report: ${err.message}`));
return;
}

if (res.statusCode && res.statusCode >= 400) {
reject(
new Error(`Failed to upload session report: ${res.statusCode} ${res.statusMessage}`),
);
return;
}

res.resume(); // Drain the response
res.on('end', () => resolve());
},
);
});
}
Loading