From 1eb91b87b215abb8b956c72137d6f7da2cfe94e6 Mon Sep 17 00:00:00 2001 From: Teja Date: Mon, 2 Feb 2026 00:33:47 -0500 Subject: [PATCH 1/2] feat: add OpenTelemetry support for telemetry metrics and tracing - Introduced optional telemetry configuration in `pyproject.toml` for OpenTelemetry dependencies. - Added `TelemetryOptions` class to manage telemetry settings. - Integrated telemetry into the `ClaudeSDKClient`, `Query`, and `SubprocessCLITransport` classes for tracing and metrics. - Updated README with instructions for enabling telemetry and examples of span names and metrics emitted. - Added tests for telemetry utilities to ensure proper functionality. This enhancement allows users to monitor and trace SDK operations, improving observability and debugging capabilities. --- README.md | 44 + pyproject.toml | 6 +- src/claude_agent_sdk/__init__.py | 1 + src/claude_agent_sdk/_internal/client.py | 155 ++-- src/claude_agent_sdk/_internal/query.py | 792 +++++++++++++----- src/claude_agent_sdk/_internal/telemetry.py | 167 ++++ .../_internal/transport/subprocess_cli.py | 447 ++++++---- src/claude_agent_sdk/client.py | 219 +++-- src/claude_agent_sdk/types.py | 31 + tests/test_telemetry.py | 298 +++++++ 10 files changed, 1601 insertions(+), 559 deletions(-) create mode 100644 src/claude_agent_sdk/_internal/telemetry.py create mode 100644 tests/test_telemetry.py diff --git a/README.md b/README.md index 790c9249..51efb490 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,50 @@ Python SDK for Claude Agent. See the [Claude Agent SDK documentation](https://pl pip install claude-agent-sdk ``` +### Telemetry (OpenTelemetry) + +Install the optional OpenTelemetry dependencies to enable tracing/metrics: + +```bash +pip install "claude-agent-sdk[telemetry]" +``` + +Then enable telemetry in options: + +```python +from claude_agent_sdk import ClaudeAgentOptions, TelemetryOptions + +options = ClaudeAgentOptions( + telemetry=TelemetryOptions(enabled=True), +) +``` + +### Telemetry span names + +Spans use the `claude_agent_sdk..` convention. Examples: + +- `claude_agent_sdk.client.connect`, `claude_agent_sdk.client.query`, `claude_agent_sdk.client.disconnect` +- `claude_agent_sdk.query.lifecycle`, `claude_agent_sdk.query.initialize`, `claude_agent_sdk.query.read_messages`, `claude_agent_sdk.query.stream_input`, `claude_agent_sdk.query.close` +- `claude_agent_sdk.transport.connect`, `claude_agent_sdk.transport.read_messages`, `claude_agent_sdk.transport.write`, `claude_agent_sdk.transport.close` +- `claude_agent_sdk.permission.can_use_tool`, `claude_agent_sdk.hooks.callback` +- `claude_agent_sdk.mcp.request`, `claude_agent_sdk.mcp.tool_call` +- `claude_agent_sdk.cli.tool_call` + +### Telemetry metrics + +When enabled, the SDK emits the following metrics (all prefixed with `claude_agent_sdk.`): + +- `messages`, `results`, `errors` +- `invocations` +- `tokens.prompt`, `tokens.completion`, `tokens.total` +- `model.latency_ms`, `model.errors` +- `result.duration_ms`, `result.cost_usd` +- `cost.total_usd` +- `response.size_bytes` +- `throttled` (rate-limit events) + +Most metrics include a `session.id` attribute to allow per-session grouping. + **Prerequisites:** - Python 3.10+ diff --git a/pyproject.toml b/pyproject.toml index 14d299e1..e94999ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,10 @@ dev = [ "mypy>=1.0.0", "ruff>=0.1.0", ] +telemetry = [ + "opentelemetry-api>=1.0.0", + "opentelemetry-sdk>=1.0.0", +] [project.urls] Homepage = "https://github.com/anthropics/claude-agent-sdk-python" @@ -106,4 +110,4 @@ ignore = [ ] [tool.ruff.lint.isort] -known-first-party = ["claude_agent_sdk"] \ No newline at end of file +known-first-party = ["claude_agent_sdk"] diff --git a/src/claude_agent_sdk/__init__.py b/src/claude_agent_sdk/__init__.py index 611ad352..56a3a55b 100644 --- a/src/claude_agent_sdk/__init__.py +++ b/src/claude_agent_sdk/__init__.py @@ -50,6 +50,7 @@ StopHookInput, SubagentStopHookInput, SystemMessage, + TelemetryOptions, TextBlock, ThinkingBlock, ToolPermissionContext, diff --git a/src/claude_agent_sdk/_internal/client.py b/src/claude_agent_sdk/_internal/client.py index 52466272..3fbf1369 100644 --- a/src/claude_agent_sdk/_internal/client.py +++ b/src/claude_agent_sdk/_internal/client.py @@ -12,6 +12,7 @@ ) from .message_parser import parse_message from .query import Query +from .telemetry import get_otel_tracer, span_kind_client, traced_span_async from .transport import Transport from .transport.subprocess_cli import SubprocessCLITransport @@ -48,77 +49,93 @@ async def process_query( ) -> AsyncIterator[Message]: """Process a query through transport and Query.""" - # Validate and configure permission settings (matching TypeScript SDK logic) - configured_options = options - if options.can_use_tool: - # canUseTool callback requires streaming mode (AsyncIterable prompt) - if isinstance(prompt, str): - raise ValueError( - "can_use_tool callback requires streaming mode. " - "Please provide prompt as an AsyncIterable instead of a string." + tracer = None + if options.telemetry and options.telemetry.enabled: + tracer = options.telemetry.tracer or get_otel_tracer("claude_agent_sdk.query") + async with traced_span_async( + tracer, + "claude_agent_sdk.query.lifecycle", + kind=span_kind_client(), + attributes={ + "query.streaming": not isinstance(prompt, str), + "query.has_hooks": bool(options.hooks), + "query.has_mcp_servers": bool(options.mcp_servers), + }, + ) as span: + # Validate and configure permission settings (matching TypeScript SDK logic) + configured_options = options + if options.can_use_tool: + # canUseTool callback requires streaming mode (AsyncIterable prompt) + if isinstance(prompt, str): + raise ValueError( + "can_use_tool callback requires streaming mode. " + "Please provide prompt as an AsyncIterable instead of a string." + ) + + # canUseTool and permission_prompt_tool_name are mutually exclusive + if options.permission_prompt_tool_name: + raise ValueError( + "can_use_tool callback cannot be used with permission_prompt_tool_name. " + "Please use one or the other." + ) + + # Automatically set permission_prompt_tool_name to "stdio" for control protocol + configured_options = replace(options, permission_prompt_tool_name="stdio") + + # Use provided transport or create subprocess transport + if transport is not None: + chosen_transport = transport + else: + chosen_transport = SubprocessCLITransport( + prompt=prompt, + options=configured_options, ) + if span: + span.set_attribute("transport.type", type(chosen_transport).__name__) + + # Connect transport (instrumentation handled by transport implementation) + await chosen_transport.connect() + + # Extract SDK MCP servers from configured options + sdk_mcp_servers = {} + if configured_options.mcp_servers and isinstance( + configured_options.mcp_servers, dict + ): + for name, config in configured_options.mcp_servers.items(): + if isinstance(config, dict) and config.get("type") == "sdk": + sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item] + + # Create Query to handle control protocol + is_streaming = not isinstance(prompt, str) + query = Query( + transport=chosen_transport, + is_streaming_mode=is_streaming, + can_use_tool=configured_options.can_use_tool, + hooks=self._convert_hooks_to_internal_format(configured_options.hooks) + if configured_options.hooks + else None, + sdk_mcp_servers=sdk_mcp_servers, + telemetry=configured_options.telemetry, + ) - # canUseTool and permission_prompt_tool_name are mutually exclusive - if options.permission_prompt_tool_name: - raise ValueError( - "can_use_tool callback cannot be used with permission_prompt_tool_name. " - "Please use one or the other." - ) + try: + # Start reading messages + await query.start() - # Automatically set permission_prompt_tool_name to "stdio" for control protocol - configured_options = replace(options, permission_prompt_tool_name="stdio") + # Initialize if streaming + if is_streaming: + await query.initialize() - # Use provided transport or create subprocess transport - if transport is not None: - chosen_transport = transport - else: - chosen_transport = SubprocessCLITransport( - prompt=prompt, - options=configured_options, - ) + # Stream input if it's an AsyncIterable + if isinstance(prompt, AsyncIterable) and query._tg: + # Start streaming in background + # Create a task that will run in the background + query._tg.start_soon(query.stream_input, prompt) + # For string prompts, the prompt is already passed via CLI args + + # Yield parsed messages + async for data in query.receive_messages(): + yield parse_message(data) - # Connect transport - await chosen_transport.connect() - - # Extract SDK MCP servers from configured options - sdk_mcp_servers = {} - if configured_options.mcp_servers and isinstance( - configured_options.mcp_servers, dict - ): - for name, config in configured_options.mcp_servers.items(): - if isinstance(config, dict) and config.get("type") == "sdk": - sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item] - - # Create Query to handle control protocol - is_streaming = not isinstance(prompt, str) - query = Query( - transport=chosen_transport, - is_streaming_mode=is_streaming, - can_use_tool=configured_options.can_use_tool, - hooks=self._convert_hooks_to_internal_format(configured_options.hooks) - if configured_options.hooks - else None, - sdk_mcp_servers=sdk_mcp_servers, - ) - - try: - # Start reading messages - await query.start() - - # Initialize if streaming - if is_streaming: - await query.initialize() - - # Stream input if it's an AsyncIterable - if isinstance(prompt, AsyncIterable) and query._tg: - # Start streaming in background - # Create a task that will run in the background - query._tg.start_soon(query.stream_input, prompt) - # For string prompts, the prompt is already passed via CLI args - - # Yield parsed messages - async for data in query.receive_messages(): - yield parse_message(data) - - finally: - await query.close() + finally: + await query.close() diff --git a/src/claude_agent_sdk/_internal/query.py b/src/claude_agent_sdk/_internal/query.py index 6bf5a73c..4ba46797 100644 --- a/src/claude_agent_sdk/_internal/query.py +++ b/src/claude_agent_sdk/_internal/query.py @@ -5,6 +5,7 @@ import os from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable from contextlib import suppress +from dataclasses import dataclass from typing import TYPE_CHECKING, Any import anyio @@ -21,12 +22,22 @@ SDKControlRequest, SDKControlResponse, SDKHookCallbackRequest, + TelemetryOptions, ToolPermissionContext, ) +from .telemetry import ( + get_otel_meter, + get_otel_tracer, + record_span_exception, + span_kind_client, + span_kind_internal, + traced_span_async, +) from .transport import Transport if TYPE_CHECKING: from mcp.server import Server as McpServer + from opentelemetry.metrics import Counter, Histogram, Meter logger = logging.getLogger(__name__) @@ -50,6 +61,53 @@ def _convert_hook_output_for_cli(hook_output: dict[str, Any]) -> dict[str, Any]: return converted +@dataclass +class QueryMetrics: + """Container for Query telemetry metrics.""" + + message_counter: "Counter | None" = None + result_counter: "Counter | None" = None + error_counter: "Counter | None" = None + result_duration_ms: "Histogram | None" = None + result_cost_usd: "Histogram | None" = None + cost_total_usd: "Counter | None" = None + token_prompt: "Histogram | None" = None + token_completion: "Histogram | None" = None + token_total: "Histogram | None" = None + model_latency_ms: "Histogram | None" = None + model_error_counter: "Counter | None" = None + rate_limit_counter: "Counter | None" = None + invocation_counter: "Counter | None" = None + response_size_bytes: "Histogram | None" = None + + @classmethod + def from_meter(cls, meter: "Meter | None") -> "QueryMetrics": + if not meter: + return cls() + return cls( + message_counter=meter.create_counter("claude_agent_sdk.messages"), + result_counter=meter.create_counter("claude_agent_sdk.results"), + error_counter=meter.create_counter("claude_agent_sdk.errors"), + result_duration_ms=meter.create_histogram( + "claude_agent_sdk.result.duration_ms" + ), + result_cost_usd=meter.create_histogram("claude_agent_sdk.result.cost_usd"), + cost_total_usd=meter.create_counter("claude_agent_sdk.cost.total_usd"), + token_prompt=meter.create_histogram("claude_agent_sdk.tokens.prompt"), + token_completion=meter.create_histogram("claude_agent_sdk.tokens.completion"), + token_total=meter.create_histogram("claude_agent_sdk.tokens.total"), + model_latency_ms=meter.create_histogram( + "claude_agent_sdk.model.latency_ms" + ), + model_error_counter=meter.create_counter("claude_agent_sdk.model.errors"), + rate_limit_counter=meter.create_counter("claude_agent_sdk.throttled"), + invocation_counter=meter.create_counter("claude_agent_sdk.invocations"), + response_size_bytes=meter.create_histogram( + "claude_agent_sdk.response.size_bytes" + ), + ) + + class Query: """Handles bidirectional control protocol on top of Transport. @@ -73,6 +131,7 @@ def __init__( hooks: dict[str, list[dict[str, Any]]] | None = None, sdk_mcp_servers: dict[str, "McpServer"] | None = None, initialize_timeout: float = 60.0, + telemetry: TelemetryOptions | None = None, ): """Initialize Query with transport and callbacks. @@ -90,6 +149,16 @@ def __init__( self.can_use_tool = can_use_tool self.hooks = hooks or {} self.sdk_mcp_servers = sdk_mcp_servers or {} + self._telemetry = telemetry + self._tracer = None + self._meter = None + self._metrics = QueryMetrics() + self._tool_spans: dict[str, Any] = {} + + if telemetry and telemetry.enabled: + self._tracer = telemetry.tracer or get_otel_tracer("claude_agent_sdk") + self._meter = telemetry.meter or get_otel_meter("claude_agent_sdk") + self._metrics = QueryMetrics.from_meter(self._meter) # Control protocol state self.pending_control_responses: dict[str, anyio.Event] = {} @@ -150,9 +219,18 @@ async def initialize(self) -> dict[str, Any] | None: } # Use longer timeout for initialize since MCP servers may take time to start - response = await self._send_control_request( - request, timeout=self._initialize_timeout - ) + async with traced_span_async( + self._tracer, + "claude_agent_sdk.query.initialize", + kind=span_kind_client(), + attributes={ + "has_hooks": bool(self.hooks), + "has_mcp_servers": bool(self.sdk_mcp_servers), + }, + ): + response = await self._send_control_request( + request, timeout=self._initialize_timeout + ) self._initialized = True self._initialization_result = response # Store for later access return response @@ -166,64 +244,70 @@ async def start(self) -> None: async def _read_messages(self) -> None: """Read messages from transport and route them.""" - try: - async for message in self.transport.read_messages(): - if self._closed: - break - - msg_type = message.get("type") - - # Route control messages - if msg_type == "control_response": - response = message.get("response", {}) - request_id = response.get("request_id") - if request_id in self.pending_control_responses: - event = self.pending_control_responses[request_id] - if response.get("subtype") == "error": - self.pending_control_results[request_id] = Exception( - response.get("error", "Unknown error") - ) - else: - self.pending_control_results[request_id] = response + async with traced_span_async( + self._tracer, + "claude_agent_sdk.query.read_messages", + kind=span_kind_internal(), + ) as span: + try: + async for message in self.transport.read_messages(): + if self._closed: + break + + msg_type = message.get("type") + + # Route control messages + if msg_type == "control_response": + response = message.get("response", {}) + request_id = response.get("request_id") + if request_id in self.pending_control_responses: + event = self.pending_control_responses[request_id] + if response.get("subtype") == "error": + self.pending_control_results[request_id] = Exception( + response.get("error", "Unknown error") + ) + else: + self.pending_control_results[request_id] = response + event.set() + continue + + elif msg_type == "control_request": + # Handle incoming control requests from CLI + # Cast message to SDKControlRequest for type safety + request: SDKControlRequest = message # type: ignore[assignment] + if self._tg: + self._tg.start_soon(self._handle_control_request, request) + continue + + elif msg_type == "control_cancel_request": + # Handle cancel requests + # TODO: Implement cancellation support + continue + + # Track results for proper stream closure + if msg_type == "result": + self._first_result_event.set() + + # Regular SDK messages go to the stream + await self._message_send.send(message) + + except anyio.get_cancelled_exc_class(): + # Task was cancelled - this is expected behavior + logger.debug("Read task cancelled") + raise # Re-raise to properly handle cancellation + except Exception as e: + record_span_exception(span, e) + logger.error(f"Fatal error in message reader: {e}") + # Signal all pending control requests so they fail fast instead of timing out + for request_id, event in list(self.pending_control_responses.items()): + if request_id not in self.pending_control_results: + self.pending_control_results[request_id] = e event.set() - continue - - elif msg_type == "control_request": - # Handle incoming control requests from CLI - # Cast message to SDKControlRequest for type safety - request: SDKControlRequest = message # type: ignore[assignment] - if self._tg: - self._tg.start_soon(self._handle_control_request, request) - continue - - elif msg_type == "control_cancel_request": - # Handle cancel requests - # TODO: Implement cancellation support - continue - - # Track results for proper stream closure - if msg_type == "result": - self._first_result_event.set() - - # Regular SDK messages go to the stream - await self._message_send.send(message) - - except anyio.get_cancelled_exc_class(): - # Task was cancelled - this is expected behavior - logger.debug("Read task cancelled") - raise # Re-raise to properly handle cancellation - except Exception as e: - logger.error(f"Fatal error in message reader: {e}") - # Signal all pending control requests so they fail fast instead of timing out - for request_id, event in list(self.pending_control_responses.items()): - if request_id not in self.pending_control_results: - self.pending_control_results[request_id] = e - event.set() - # Put error in stream so iterators can handle it - await self._message_send.send({"type": "error", "error": str(e)}) - finally: - # Always signal end of stream - await self._message_send.send({"type": "end"}) + # Put error in stream so iterators can handle it + await self._message_send.send({"type": "error", "error": str(e)}) + finally: + # Always signal end of stream + await self._message_send.send({"type": "end"}) async def _handle_control_request(self, request: SDKControlRequest) -> None: """Handle incoming control request from CLI.""" @@ -247,11 +331,17 @@ async def _handle_control_request(self, request: SDKControlRequest) -> None: or [], ) - response = await self.can_use_tool( - permission_request["tool_name"], - permission_request["input"], - context, - ) + async with traced_span_async( + self._tracer, + "claude_agent_sdk.permission.can_use_tool", + kind=span_kind_internal(), + attributes={"tool.name": permission_request["tool_name"]}, + ) as span: + response = await self.can_use_tool( + permission_request["tool_name"], + permission_request["input"], + context, + ) # Convert PermissionResult to expected dict format if isinstance(response, PermissionResultAllow): @@ -268,10 +358,27 @@ async def _handle_control_request(self, request: SDKControlRequest) -> None: permission.to_dict() for permission in response.updated_permissions ] + if span: + span.set_attribute("permission.behavior", "allow") + span.set_attribute( + "permission.updated_input", response.updated_input is not None + ) + span.set_attribute( + "permission.updated_permissions_count", + len(response.updated_permissions) + if response.updated_permissions is not None + else 0, + ) elif isinstance(response, PermissionResultDeny): response_data = {"behavior": "deny", "message": response.message} if response.interrupt: response_data["interrupt"] = response.interrupt + if span: + span.set_attribute("permission.behavior", "deny") + span.set_attribute( + "permission.interrupt", + bool(response.interrupt) if response.interrupt is not None else False, + ) else: raise TypeError( f"Tool permission callback must return PermissionResult (PermissionResultAllow or PermissionResultDeny), got {type(response)}" @@ -285,11 +392,24 @@ async def _handle_control_request(self, request: SDKControlRequest) -> None: if not callback: raise Exception(f"No hook callback found for ID: {callback_id}") - hook_output = await callback( - request_data.get("input"), - request_data.get("tool_use_id"), - {"signal": None}, # TODO: Add abort signal support - ) + hook_input = request_data.get("input") + hook_event_name = None + if isinstance(hook_input, dict): + hook_event_name = hook_input.get("hook_event_name") + async with traced_span_async( + self._tracer, + "claude_agent_sdk.hooks.callback", + kind=span_kind_internal(), + attributes={ + "hook.callback_id": callback_id, + "hook.event": hook_event_name, + }, + ): + hook_output = await callback( + hook_input, + request_data.get("tool_use_id"), + {"signal": None}, # TODO: Add abort signal support + ) # Convert Python-safe field names (async_, continue_) to CLI-expected names (async, continue) response_data = _convert_hook_output_for_cli(hook_output) @@ -363,25 +483,44 @@ async def _send_control_request( "request": request, } - await self.transport.write(json.dumps(control_request) + "\n") - - # Wait for response - try: - with anyio.fail_after(timeout): - await event.wait() - - result = self.pending_control_results.pop(request_id) - self.pending_control_responses.pop(request_id, None) - - if isinstance(result, Exception): - raise result + async with traced_span_async( + self._tracer, + "claude_agent_sdk.control.request", + kind=span_kind_client(), + attributes={ + "control.request_id": request_id, + "control.subtype": request.get("subtype"), + "control.timeout_s": timeout, + }, + ) as span: + if span and request.get("subtype") == "set_permission_mode": + span.set_attribute("permission.mode", request.get("mode")) + await self.transport.write(json.dumps(control_request) + "\n") + + # Wait for response + try: + with anyio.fail_after(timeout): + await event.wait() + + result = self.pending_control_results.pop(request_id) + self.pending_control_responses.pop(request_id, None) + + if isinstance(result, Exception): + raise result + + if span: + span.set_attribute( + "control.response_subtype", result.get("subtype") + ) - response_data = result.get("response", {}) - return response_data if isinstance(response_data, dict) else {} - except TimeoutError as e: - self.pending_control_responses.pop(request_id, None) - self.pending_control_results.pop(request_id, None) - raise Exception(f"Control request timeout: {request.get('subtype')}") from e + response_data = result.get("response", {}) + return response_data if isinstance(response_data, dict) else {} + except TimeoutError as e: + self.pending_control_responses.pop(request_id, None) + self.pending_control_results.pop(request_id, None) + raise Exception( + f"Control request timeout: {request.get('subtype')}" + ) from e async def _handle_sdk_mcp_request( self, server_name: str, message: dict[str, Any] @@ -399,123 +538,159 @@ async def _handle_sdk_mcp_request( Returns: The response message """ - if server_name not in self.sdk_mcp_servers: - return { - "jsonrpc": "2.0", - "id": message.get("id"), - "error": { - "code": -32601, - "message": f"Server '{server_name}' not found", - }, - } - - server = self.sdk_mcp_servers[server_name] - method = message.get("method") - params = message.get("params", {}) - - try: - # TODO: Python MCP SDK lacks the Transport abstraction that TypeScript has. - # TypeScript: server.connect(transport) allows custom transports - # Python: server.run(read_stream, write_stream) requires actual streams - # - # This forces us to manually route methods. When Python MCP adds Transport - # support, we can refactor to match the TypeScript approach. - if method == "initialize": - # Handle MCP initialization - hardcoded for tools only, no listChanged - return { - "jsonrpc": "2.0", - "id": message.get("id"), - "result": { - "protocolVersion": "2024-11-05", - "capabilities": { - "tools": {} # Tools capability without listChanged - }, - "serverInfo": { - "name": server.name, - "version": server.version or "1.0.0", - }, - }, - } - - elif method == "tools/list": - request = ListToolsRequest(method=method) - handler = server.request_handlers.get(ListToolsRequest) - if handler: - result = await handler(request) - # Convert MCP result to JSONRPC response - tools_data = [ - { - "name": tool.name, - "description": tool.description, - "inputSchema": ( - tool.inputSchema.model_dump() - if hasattr(tool.inputSchema, "model_dump") - else tool.inputSchema - ) - if tool.inputSchema - else {}, - } - for tool in result.root.tools # type: ignore[union-attr] - ] + async with traced_span_async( + self._tracer, + "claude_agent_sdk.mcp.request", + kind=span_kind_internal(), + attributes={ + "mcp.server": server_name, + "mcp.method": message.get("method"), + }, + ) as span: + try: + if server_name not in self.sdk_mcp_servers: return { "jsonrpc": "2.0", "id": message.get("id"), - "result": {"tools": tools_data}, + "error": { + "code": -32601, + "message": f"Server '{server_name}' not found", + }, } - elif method == "tools/call": - call_request = CallToolRequest( - method=method, - params=CallToolRequestParams( - name=params.get("name"), arguments=params.get("arguments", {}) - ), - ) - handler = server.request_handlers.get(CallToolRequest) - if handler: - result = await handler(call_request) - # Convert MCP result to JSONRPC response - content = [] - for item in result.root.content: # type: ignore[union-attr] - if hasattr(item, "text"): - content.append({"type": "text", "text": item.text}) - elif hasattr(item, "data") and hasattr(item, "mimeType"): - content.append( - { - "type": "image", - "data": item.data, - "mimeType": item.mimeType, - } - ) - - response_data = {"content": content} - if hasattr(result.root, "is_error") and result.root.is_error: - response_data["is_error"] = True # type: ignore[assignment] - + server = self.sdk_mcp_servers[server_name] + method = message.get("method") + params = message.get("params", {}) + + # TODO: Python MCP SDK lacks the Transport abstraction that TypeScript has. + # TypeScript: server.connect(transport) allows custom transports + # Python: server.run(read_stream, write_stream) requires actual streams + # + # This forces us to manually route methods. When Python MCP adds Transport + # support, we can refactor to match the TypeScript approach. + if method == "initialize": + # Handle MCP initialization - hardcoded for tools only, no listChanged return { "jsonrpc": "2.0", "id": message.get("id"), - "result": response_data, + "result": { + "protocolVersion": "2024-11-05", + "capabilities": { + "tools": {} # Tools capability without listChanged + }, + "serverInfo": { + "name": server.name, + "version": server.version or "1.0.0", + }, + }, } - elif method == "notifications/initialized": - # Handle initialized notification - just acknowledge it - return {"jsonrpc": "2.0", "result": {}} + elif method == "tools/list": + request = ListToolsRequest(method=method) + handler = server.request_handlers.get(ListToolsRequest) + if handler: + result = await handler(request) + # Convert MCP result to JSONRPC response + tools_data = [ + { + "name": tool.name, + "description": tool.description, + "inputSchema": ( + tool.inputSchema.model_dump() + if hasattr(tool.inputSchema, "model_dump") + else tool.inputSchema + ) + if tool.inputSchema + else {}, + } + for tool in result.root.tools # type: ignore[union-attr] + ] + if span: + span.set_attribute("mcp.result.items", len(tools_data)) + return { + "jsonrpc": "2.0", + "id": message.get("id"), + "result": {"tools": tools_data}, + } - # Add more methods here as MCP SDK adds them (resources, prompts, etc.) - # This is the limitation Ashwin pointed out - we have to manually update + elif method == "tools/call": + tool_name = params.get("name") + async with traced_span_async( + self._tracer, + "claude_agent_sdk.mcp.tool_call", + kind=span_kind_internal(), + attributes={ + "mcp.server": server_name, + "mcp.tool.name": tool_name, + "mcp.request_id": message.get("id"), + }, + ): + call_request = CallToolRequest( + method=method, + params=CallToolRequestParams( + name=tool_name, + arguments=params.get("arguments", {}), + ), + ) + handler = server.request_handlers.get(CallToolRequest) + if handler: + result = await handler(call_request) + # Convert MCP result to JSONRPC response + content = [] + for item in result.root.content: # type: ignore[union-attr] + if hasattr(item, "text"): + content.append({"type": "text", "text": item.text}) + elif hasattr(item, "data") and hasattr( + item, "mimeType" + ): + content.append( + { + "type": "image", + "data": item.data, + "mimeType": item.mimeType, + } + ) + + response_data = {"content": content} + is_error = ( + bool(getattr(result.root, "is_error", False)) # type: ignore[union-attr] + ) + if is_error: + response_data["is_error"] = True # type: ignore[assignment] + if span: + span.set_attribute("mcp.tool.name", tool_name) + span.set_attribute("mcp.tool.is_error", is_error) + span.set_attribute("mcp.result.items", len(content)) + + return { + "jsonrpc": "2.0", + "id": message.get("id"), + "result": response_data, + } + + elif method == "notifications/initialized": + # Handle initialized notification - just acknowledge it + return {"jsonrpc": "2.0", "result": {}} + + # Add more methods here as MCP SDK adds them (resources, prompts, etc.) + # This is the limitation Ashwin pointed out - we have to manually update - return { - "jsonrpc": "2.0", - "id": message.get("id"), - "error": {"code": -32601, "message": f"Method '{method}' not found"}, - } + return { + "jsonrpc": "2.0", + "id": message.get("id"), + "error": { + "code": -32601, + "message": f"Method '{method}' not found", + }, + } - except Exception as e: - return { - "jsonrpc": "2.0", - "id": message.get("id"), - "error": {"code": -32603, "message": str(e)}, - } + except Exception as e: + record_span_exception(span, e) + return { + "jsonrpc": "2.0", + "id": message.get("id"), + "error": {"code": -32603, "message": str(e)}, + } async def get_mcp_status(self) -> dict[str, Any]: """Get current MCP server connection status.""" @@ -565,30 +740,39 @@ async def stream_input(self, stream: AsyncIterable[dict[str, Any]]) -> None: before closing stdin to allow bidirectional control protocol communication. """ try: - async for message in stream: - if self._closed: - break - await self.transport.write(json.dumps(message) + "\n") - - # If we have SDK MCP servers or hooks that need bidirectional communication, - # wait for first result before closing the channel - has_hooks = bool(self.hooks) - if self.sdk_mcp_servers or has_hooks: - logger.debug( - f"Waiting for first result before closing stdin " - f"(sdk_mcp_servers={len(self.sdk_mcp_servers)}, has_hooks={has_hooks})" - ) - try: - with anyio.move_on_after(self._stream_close_timeout): - await self._first_result_event.wait() - logger.debug("Received first result, closing input stream") - except Exception: + async with traced_span_async( + self._tracer, + "claude_agent_sdk.query.stream_input", + kind=span_kind_client(), + attributes={ + "has_hooks": bool(self.hooks), + "has_mcp_servers": bool(self.sdk_mcp_servers), + }, + ): + async for message in stream: + if self._closed: + break + await self.transport.write(json.dumps(message) + "\n") + + # If we have SDK MCP servers or hooks that need bidirectional communication, + # wait for first result before closing the channel + has_hooks = bool(self.hooks) + if self.sdk_mcp_servers or has_hooks: logger.debug( - "Timed out waiting for first result, closing input stream" + f"Waiting for first result before closing stdin " + f"(sdk_mcp_servers={len(self.sdk_mcp_servers)}, has_hooks={has_hooks})" ) - - # After all messages sent (and result received if needed), end input - await self.transport.end_input() + try: + with anyio.move_on_after(self._stream_close_timeout): + await self._first_result_event.wait() + logger.debug("Received first result, closing input stream") + except Exception: + logger.debug( + "Timed out waiting for first result, closing input stream" + ) + + # After all messages sent (and result received if needed), end input + await self.transport.end_input() except Exception as e: logger.debug(f"Error streaming input: {e}") @@ -597,21 +781,120 @@ async def receive_messages(self) -> AsyncIterator[dict[str, Any]]: async for message in self._message_receive: # Check for special messages if message.get("type") == "end": + self._close_open_tool_spans("stream_end") break elif message.get("type") == "error": + self._close_open_tool_spans("stream_error") + if self._metrics.error_counter: + self._metrics.error_counter.add(1, {"error.source": "stream"}) raise Exception(message.get("error", "Unknown error")) + self._handle_tool_blocks(message) + + msg_type = message.get("type") or "unknown" + session_id = message.get("session_id") + message_attributes = {"message.type": msg_type} + if isinstance(session_id, str): + message_attributes["session.id"] = session_id + + if self._metrics.message_counter: + self._metrics.message_counter.add(1, message_attributes) + + if self._metrics.response_size_bytes: + try: + size_bytes = len(json.dumps(message, ensure_ascii=False).encode("utf-8")) + self._metrics.response_size_bytes.record( + size_bytes, message_attributes + ) + except Exception: + pass + + if self._metrics.rate_limit_counter and message.get("type") == "system": + subtype = message.get("subtype") + if isinstance(subtype, str) and subtype.lower() == "rate_limit": + rate_limit_attrs = {"event": "rate_limit"} + if isinstance(session_id, str): + rate_limit_attrs["session.id"] = session_id + self._metrics.rate_limit_counter.add(1, rate_limit_attrs) + + if message.get("type") == "result": + result_attributes = {"message.type": "result"} + if isinstance(session_id, str): + result_attributes["session.id"] = session_id + stop_reason = message.get("stop_reason") + if isinstance(stop_reason, str): + result_attributes["result.stop_reason"] = stop_reason + model = message.get("model") + if isinstance(model, str): + result_attributes["result.model"] = model + + if self._metrics.result_counter: + self._metrics.result_counter.add(1, result_attributes) + if self._metrics.invocation_counter: + self._metrics.invocation_counter.add(1, result_attributes) + if self._metrics.result_duration_ms: + duration_ms = message.get("duration_ms") + if isinstance(duration_ms, (int, float)): + self._metrics.result_duration_ms.record( + duration_ms, result_attributes + ) + if self._metrics.model_latency_ms: + duration_api_ms = message.get("duration_api_ms") + if isinstance(duration_api_ms, (int, float)): + self._metrics.model_latency_ms.record( + duration_api_ms, result_attributes + ) + if self._metrics.result_cost_usd: + total_cost_usd = message.get("total_cost_usd") + if isinstance(total_cost_usd, (int, float)): + self._metrics.result_cost_usd.record( + total_cost_usd, result_attributes + ) + if self._metrics.cost_total_usd: + total_cost_usd = message.get("total_cost_usd") + if isinstance(total_cost_usd, (int, float)): + self._metrics.cost_total_usd.add( + total_cost_usd, result_attributes + ) + usage = message.get("usage") + if isinstance(usage, dict): + prompt_tokens = usage.get("input_tokens", usage.get("prompt_tokens")) + completion_tokens = usage.get( + "output_tokens", usage.get("completion_tokens") + ) + total_tokens = usage.get("total_tokens") + if self._metrics.token_prompt and isinstance(prompt_tokens, (int, float)): + self._metrics.token_prompt.record(prompt_tokens, result_attributes) + if self._metrics.token_completion and isinstance( + completion_tokens, (int, float) + ): + self._metrics.token_completion.record( + completion_tokens, result_attributes + ) + if self._metrics.token_total and isinstance(total_tokens, (int, float)): + self._metrics.token_total.record(total_tokens, result_attributes) + if self._metrics.model_error_counter: + is_error = message.get("is_error") + if isinstance(is_error, bool) and is_error: + self._metrics.model_error_counter.add(1, result_attributes) + yield message async def close(self) -> None: """Close the query and transport.""" - self._closed = True - if self._tg: - self._tg.cancel_scope.cancel() - # Wait for task group to complete cancellation - with suppress(anyio.get_cancelled_exc_class()): - await self._tg.__aexit__(None, None, None) - await self.transport.close() + async with traced_span_async( + self._tracer, + "claude_agent_sdk.query.close", + kind=span_kind_internal(), + ): + self._closed = True + self._close_open_tool_spans("close") + if self._tg: + self._tg.cancel_scope.cancel() + # Wait for task group to complete cancellation + with suppress(anyio.get_cancelled_exc_class()): + await self._tg.__aexit__(None, None, None) + await self.transport.close() # Make Query an async iterator def __aiter__(self) -> AsyncIterator[dict[str, Any]]: @@ -623,3 +906,70 @@ async def __anext__(self) -> dict[str, Any]: async for message in self.receive_messages(): return message raise StopAsyncIteration + + def _handle_tool_blocks(self, message: dict[str, Any]) -> None: + """Start/end tool spans for CLI tool_use/tool_result blocks.""" + msg = message.get("message") + if not isinstance(msg, dict): + return + content = msg.get("content") + if not isinstance(content, list): + return + msg_type = message.get("type") + for block in content: + if not isinstance(block, dict): + continue + block_type = block.get("type") + if block_type == "tool_use": + tool_use_id = block.get("id") + tool_name = block.get("name") + if isinstance(tool_use_id, str): + self._start_tool_span(tool_use_id, tool_name, msg_type) + elif block_type == "tool_result": + tool_use_id = block.get("tool_use_id") + is_error = block.get("is_error") + if isinstance(tool_use_id, str): + self._end_tool_span(tool_use_id, is_error) + + def _start_tool_span( + self, tool_use_id: str, tool_name: str | None, msg_type: str | None + ) -> None: + if not self._tracer: + return + if tool_use_id in self._tool_spans: + logger.warning( + "Duplicate tool_use_id %s encountered; closing prior span", + tool_use_id, + ) + self._end_tool_span(tool_use_id, is_error=None) + span = self._tracer.start_span( + "claude_agent_sdk.cli.tool_call", + kind=span_kind_internal(), + ) + span.set_attribute("tool.use_id", tool_use_id) + span.set_attribute("tool.source", "cli") + if tool_name is not None: + span.set_attribute("tool.name", tool_name) + if msg_type is not None: + span.set_attribute("tool.message_type", msg_type) + self._tool_spans[tool_use_id] = span + + def _end_tool_span(self, tool_use_id: str, is_error: Any) -> None: + span = self._tool_spans.pop(tool_use_id, None) + if not span: + return + if isinstance(is_error, bool): + span.set_attribute("tool.result.is_error", is_error) + span.end() + + def _close_open_tool_spans(self, reason: str) -> None: + if not self._tool_spans: + return + for tool_use_id, span in list(self._tool_spans.items()): + try: + span.set_attribute("tool.span_incomplete", True) + span.set_attribute("tool.span_end_reason", reason) + span.end() + except Exception: + pass + self._tool_spans.pop(tool_use_id, None) diff --git a/src/claude_agent_sdk/_internal/telemetry.py b/src/claude_agent_sdk/_internal/telemetry.py new file mode 100644 index 00000000..3467c024 --- /dev/null +++ b/src/claude_agent_sdk/_internal/telemetry.py @@ -0,0 +1,167 @@ +"""Telemetry utilities for OpenTelemetry integration.""" + +from contextlib import asynccontextmanager, contextmanager +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from opentelemetry.metrics import Meter + from opentelemetry.trace import Span, SpanKind, Tracer + +try: + from opentelemetry.trace import SpanKind as _SpanKind +except ImportError: + _SpanKind = None + + +def get_otel_tracer(name: str) -> "Tracer | None": + """Get OpenTelemetry tracer, returns None if not available. + + Args: + name: The name of the tracer (typically the module path) + + Returns: + OpenTelemetry Tracer if available, None otherwise + """ + try: + from opentelemetry import trace + + return trace.get_tracer(name) + except ImportError: + return None + + +def get_otel_meter(name: str) -> "Meter | None": + """Get OpenTelemetry meter, returns None if not available. + + Args: + name: The name of the meter (typically the module path) + + Returns: + OpenTelemetry Meter if available, None otherwise + """ + try: + from opentelemetry import metrics + + return metrics.get_meter(name) + except ImportError: + return None + + +def record_span_exception(span: "Span | None", exc: Exception) -> None: + """Record an exception on a span and mark it as error.""" + if not span: + return + + try: + from opentelemetry.trace import Status, StatusCode + except ImportError: + return + + span.record_exception(exc) + span.set_status(Status(StatusCode.ERROR, description=str(exc))) + + +def span_kind_client() -> "SpanKind | None": + return _SpanKind.CLIENT if _SpanKind is not None else None + + +def span_kind_internal() -> "SpanKind | None": + return _SpanKind.INTERNAL if _SpanKind is not None else None + + +@contextmanager +def traced_span( + tracer: "Tracer | None", + span_name: str, + kind: "SpanKind | None" = None, + attributes: dict[str, Any] | None = None, +): + """Context manager for creating traced spans with proper error handling. + + Args: + tracer: OpenTelemetry tracer or None + span_name: Name of the span + kind: SpanKind (defaults to INTERNAL if not specified) + attributes: Optional dictionary of span attributes + + Yields: + Span if tracer is available, None otherwise + """ + if not tracer: + yield None + return + + try: + from opentelemetry.trace import Status, StatusCode + except ImportError: + yield None + return + + if kind is None and _SpanKind is not None: + kind = _SpanKind.INTERNAL + + kwargs = {"name": span_name} + if kind is not None: + kwargs["kind"] = kind + + with tracer.start_as_current_span(**kwargs) as span: + if attributes: + for key, value in attributes.items(): + if value is not None: # Skip None values + span.set_attribute(key, value) + try: + yield span + span.set_status(Status(StatusCode.OK)) + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, description=str(e))) + raise + + +@asynccontextmanager +async def traced_span_async( + tracer: "Tracer | None", + span_name: str, + kind: "SpanKind | None" = None, + attributes: dict[str, Any] | None = None, +): + """Async context manager for creating traced spans with proper error handling. + + Args: + tracer: OpenTelemetry tracer or None + span_name: Name of the span + kind: SpanKind (defaults to INTERNAL if not specified) + attributes: Optional dictionary of span attributes + + Yields: + Span if tracer is available, None otherwise + """ + if not tracer: + yield None + return + + try: + from opentelemetry.trace import Status, StatusCode + except ImportError: + yield None + return + + if kind is None and _SpanKind is not None: + kind = _SpanKind.INTERNAL + + kwargs = {"name": span_name} + if kind is not None: + kwargs["kind"] = kind + + with tracer.start_as_current_span(**kwargs) as span: + if attributes: + for key, value in attributes.items(): + if value is not None: # Skip None values + span.set_attribute(key, value) + try: + yield span + span.set_status(Status(StatusCode.OK)) + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, description=str(e))) + raise diff --git a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py index a4882db1..03c919d4 100644 --- a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py @@ -24,6 +24,7 @@ from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError from ..._version import __version__ from ...types import ClaudeAgentOptions +from ..telemetry import get_otel_tracer, span_kind_client, traced_span_async from . import Transport logger = logging.getLogger(__name__) @@ -66,6 +67,12 @@ def __init__( ) self._temp_files: list[str] = [] # Track temporary files for cleanup self._write_lock: anyio.Lock = anyio.Lock() + self._telemetry = options.telemetry + self._tracer = None + if self._telemetry and self._telemetry.enabled: + self._tracer = self._telemetry.tracer or get_otel_tracer( + "claude_agent_sdk.transport" + ) def _find_cli(self) -> str: """Find Claude Code CLI binary.""" @@ -371,80 +378,90 @@ async def connect(self) -> None: if self._process: return - if not os.environ.get("CLAUDE_AGENT_SDK_SKIP_VERSION_CHECK"): - await self._check_claude_version() - - cmd = self._build_command() - try: - # Merge environment variables: system -> user -> SDK required - process_env = { - **os.environ, - **self._options.env, # User-provided env vars - "CLAUDE_CODE_ENTRYPOINT": "sdk-py", - "CLAUDE_AGENT_SDK_VERSION": __version__, - } - - # Enable file checkpointing if requested - if self._options.enable_file_checkpointing: - process_env["CLAUDE_CODE_ENABLE_SDK_FILE_CHECKPOINTING"] = "true" - - if self._cwd: - process_env["PWD"] = self._cwd - - # Pipe stderr if we have a callback OR debug mode is enabled - should_pipe_stderr = ( - self._options.stderr is not None - or "debug-to-stderr" in self._options.extra_args - ) + async with traced_span_async( + self._tracer, + "claude_agent_sdk.transport.connect", + kind=span_kind_client(), + attributes={ + "transport.type": "subprocess_cli", + "transport.streaming": self._is_streaming, + "transport.cwd": self._cwd, + }, + ): + try: + if not os.environ.get("CLAUDE_AGENT_SDK_SKIP_VERSION_CHECK"): + await self._check_claude_version() + + cmd = self._build_command() + # Merge environment variables: system -> user -> SDK required + process_env = { + **os.environ, + **self._options.env, # User-provided env vars + "CLAUDE_CODE_ENTRYPOINT": "sdk-py", + "CLAUDE_AGENT_SDK_VERSION": __version__, + } + + # Enable file checkpointing if requested + if self._options.enable_file_checkpointing: + process_env["CLAUDE_CODE_ENABLE_SDK_FILE_CHECKPOINTING"] = "true" + + if self._cwd: + process_env["PWD"] = self._cwd + + # Pipe stderr if we have a callback OR debug mode is enabled + should_pipe_stderr = ( + self._options.stderr is not None + or "debug-to-stderr" in self._options.extra_args + ) - # For backward compat: use debug_stderr file object if no callback and debug is on - stderr_dest = PIPE if should_pipe_stderr else None - - self._process = await anyio.open_process( - cmd, - stdin=PIPE, - stdout=PIPE, - stderr=stderr_dest, - cwd=self._cwd, - env=process_env, - user=self._options.user, - ) + # For backward compat: use debug_stderr file object if no callback and debug is on + stderr_dest = PIPE if should_pipe_stderr else None - if self._process.stdout: - self._stdout_stream = TextReceiveStream(self._process.stdout) - - # Setup stderr stream if piped - if should_pipe_stderr and self._process.stderr: - self._stderr_stream = TextReceiveStream(self._process.stderr) - # Start async task to read stderr - self._stderr_task_group = anyio.create_task_group() - await self._stderr_task_group.__aenter__() - self._stderr_task_group.start_soon(self._handle_stderr) - - # Setup stdin for streaming mode - if self._is_streaming and self._process.stdin: - self._stdin_stream = TextSendStream(self._process.stdin) - elif not self._is_streaming and self._process.stdin: - # String mode: close stdin immediately - await self._process.stdin.aclose() - - self._ready = True - - except FileNotFoundError as e: - # Check if the error comes from the working directory or the CLI - if self._cwd and not Path(self._cwd).exists(): - error = CLIConnectionError( - f"Working directory does not exist: {self._cwd}" + self._process = await anyio.open_process( + cmd, + stdin=PIPE, + stdout=PIPE, + stderr=stderr_dest, + cwd=self._cwd, + env=process_env, + user=self._options.user, ) + + if self._process.stdout: + self._stdout_stream = TextReceiveStream(self._process.stdout) + + # Setup stderr stream if piped + if should_pipe_stderr and self._process.stderr: + self._stderr_stream = TextReceiveStream(self._process.stderr) + # Start async task to read stderr + self._stderr_task_group = anyio.create_task_group() + await self._stderr_task_group.__aenter__() + self._stderr_task_group.start_soon(self._handle_stderr) + + # Setup stdin for streaming mode + if self._is_streaming and self._process.stdin: + self._stdin_stream = TextSendStream(self._process.stdin) + elif not self._is_streaming and self._process.stdin: + # String mode: close stdin immediately + await self._process.stdin.aclose() + + self._ready = True + + except FileNotFoundError as e: + # Check if the error comes from the working directory or the CLI + if self._cwd and not Path(self._cwd).exists(): + error = CLIConnectionError( + f"Working directory does not exist: {self._cwd}" + ) + self._exit_error = error + raise error from e + error = CLINotFoundError(f"Claude Code not found at: {self._cli_path}") + self._exit_error = error + raise error from e + except Exception as e: + error = CLIConnectionError(f"Failed to start Claude Code: {e}") self._exit_error = error raise error from e - error = CLINotFoundError(f"Claude Code not found at: {self._cli_path}") - self._exit_error = error - raise error from e - except Exception as e: - error = CLIConnectionError(f"Failed to start Claude Code: {e}") - self._exit_error = error - raise error from e async def _handle_stderr(self) -> None: """Handle stderr stream - read and invoke callbacks.""" @@ -476,76 +493,123 @@ async def _handle_stderr(self) -> None: async def close(self) -> None: """Close the transport and clean up resources.""" - # Clean up temporary files first (before early return) - for temp_file in self._temp_files: - with suppress(Exception): - Path(temp_file).unlink(missing_ok=True) - self._temp_files.clear() - - if not self._process: - self._ready = False - return + async with traced_span_async( + self._tracer, + "claude_agent_sdk.transport.close", + kind=span_kind_client(), + attributes={ + "transport.type": "subprocess_cli", + "transport.had_process": self._process is not None, + }, + ) as span: + cleanup_failed = False + + def _record_cleanup_error(err: Exception) -> None: + nonlocal cleanup_failed + cleanup_failed = True + if span: + span.record_exception(err) + + # Clean up temporary files first (before early return) + for temp_file in self._temp_files: + try: + Path(temp_file).unlink(missing_ok=True) + except Exception as e: + _record_cleanup_error(e) + self._temp_files.clear() - # Close stderr task group if active - if self._stderr_task_group: - with suppress(Exception): - self._stderr_task_group.cancel_scope.cancel() - await self._stderr_task_group.__aexit__(None, None, None) - self._stderr_task_group = None + if not self._process: + self._ready = False + return - # Close stdin stream (acquire lock to prevent race with concurrent writes) - async with self._write_lock: - self._ready = False # Set inside lock to prevent TOCTOU with write() - if self._stdin_stream: - with suppress(Exception): - await self._stdin_stream.aclose() - self._stdin_stream = None + # Close stderr task group if active + if self._stderr_task_group: + try: + self._stderr_task_group.cancel_scope.cancel() + await self._stderr_task_group.__aexit__(None, None, None) + except Exception as e: + _record_cleanup_error(e) + self._stderr_task_group = None + + # Close stdin stream (acquire lock to prevent race with concurrent writes) + async with self._write_lock: + self._ready = False # Set inside lock to prevent TOCTOU with write() + if self._stdin_stream: + try: + await self._stdin_stream.aclose() + except Exception as e: + _record_cleanup_error(e) + self._stdin_stream = None - if self._stderr_stream: - with suppress(Exception): - await self._stderr_stream.aclose() - self._stderr_stream = None + if self._stderr_stream: + try: + await self._stderr_stream.aclose() + except Exception as e: + _record_cleanup_error(e) + self._stderr_stream = None - # Terminate and wait for process - if self._process.returncode is None: - with suppress(ProcessLookupError): - self._process.terminate() - # Wait for process to finish with timeout - with suppress(Exception): - # Just try to wait, but don't block if it fails - await self._process.wait() + # Terminate and wait for process + if self._process.returncode is None: + try: + self._process.terminate() + # Wait for process to finish with timeout + try: + # Just try to wait, but don't block if it fails + await self._process.wait() + except Exception as e: + _record_cleanup_error(e) + except ProcessLookupError: + pass + except Exception as e: + _record_cleanup_error(e) + + self._process = None + self._stdout_stream = None + self._stdin_stream = None + self._stderr_stream = None + self._exit_error = None - self._process = None - self._stdout_stream = None - self._stdin_stream = None - self._stderr_stream = None - self._exit_error = None + if cleanup_failed and span: + span.set_attribute("transport.cleanup_failed", True) async def write(self, data: str) -> None: """Write raw data to the transport.""" - async with self._write_lock: - # All checks inside lock to prevent TOCTOU races with close()/end_input() - if not self._ready or not self._stdin_stream: - raise CLIConnectionError("ProcessTransport is not ready for writing") + async with traced_span_async( + self._tracer, + "claude_agent_sdk.transport.write", + kind=span_kind_client(), + attributes={ + "transport.type": "subprocess_cli", + "transport.write.bytes": len(data), + }, + ): + async with self._write_lock: + # All checks inside lock to prevent TOCTOU races with close()/end_input() + if not self._ready or not self._stdin_stream: + raise CLIConnectionError( + "ProcessTransport is not ready for writing" + ) - if self._process and self._process.returncode is not None: - raise CLIConnectionError( - f"Cannot write to terminated process (exit code: {self._process.returncode})" - ) + if self._process and self._process.returncode is not None: + raise CLIConnectionError( + "Cannot write to terminated process " + f"(exit code: {self._process.returncode})" + ) - if self._exit_error: - raise CLIConnectionError( - f"Cannot write to process that exited with error: {self._exit_error}" - ) from self._exit_error + if self._exit_error: + raise CLIConnectionError( + "Cannot write to process that exited with error: " + f"{self._exit_error}" + ) from self._exit_error - try: - await self._stdin_stream.send(data) - except Exception as e: - self._ready = False - self._exit_error = CLIConnectionError( - f"Failed to write to process stdin: {e}" - ) - raise self._exit_error from e + try: + await self._stdin_stream.send(data) + except Exception as e: + self._ready = False + self._exit_error = CLIConnectionError( + f"Failed to write to process stdin: {e}" + ) + raise self._exit_error from e async def end_input(self) -> None: """End the input stream (close stdin).""" @@ -564,68 +628,79 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]: if not self._process or not self._stdout_stream: raise CLIConnectionError("Not connected") - json_buffer = "" - - # Process stdout messages - try: - async for line in self._stdout_stream: - line_str = line.strip() - if not line_str: - continue - - # Accumulate partial JSON until we can parse it - # Note: TextReceiveStream can truncate long lines, so we need to buffer - # and speculatively parse until we get a complete JSON object - json_lines = line_str.split("\n") - - for json_line in json_lines: - json_line = json_line.strip() - if not json_line: - continue - - # Keep accumulating partial JSON until we can parse it - json_buffer += json_line - - if len(json_buffer) > self._max_buffer_size: - buffer_length = len(json_buffer) - json_buffer = "" - raise SDKJSONDecodeError( - f"JSON message exceeded maximum buffer size of {self._max_buffer_size} bytes", - ValueError( - f"Buffer size {buffer_length} exceeds limit {self._max_buffer_size}" - ), - ) + async with traced_span_async( + self._tracer, + "claude_agent_sdk.transport.read_messages", + kind=span_kind_client(), + attributes={ + "transport.type": "subprocess_cli", + "transport.max_buffer_size": self._max_buffer_size, + }, + ): + json_buffer = "" - try: - data = json.loads(json_buffer) - json_buffer = "" - yield data - except json.JSONDecodeError: - # We are speculatively decoding the buffer until we get - # a full JSON object. If there is an actual issue, we - # raise an error after exceeding the configured limit. + # Process stdout messages + try: + async for line in self._stdout_stream: + line_str = line.strip() + if not line_str: continue - except anyio.ClosedResourceError: - pass - except GeneratorExit: - # Client disconnected - pass + # Accumulate partial JSON until we can parse it + # Note: TextReceiveStream can truncate long lines, so we need to buffer + # and speculatively parse until we get a complete JSON object + json_lines = line_str.split("\n") + + for json_line in json_lines: + json_line = json_line.strip() + if not json_line: + continue + + # Keep accumulating partial JSON until we can parse it + json_buffer += json_line + + if len(json_buffer) > self._max_buffer_size: + buffer_length = len(json_buffer) + json_buffer = "" + raise SDKJSONDecodeError( + "JSON message exceeded maximum buffer size of " + f"{self._max_buffer_size} bytes", + ValueError( + "Buffer size " + f"{buffer_length} exceeds limit {self._max_buffer_size}" + ), + ) - # Check process completion and handle errors - try: - returncode = await self._process.wait() - except Exception: - returncode = -1 - - # Use exit code for error detection - if returncode is not None and returncode != 0: - self._exit_error = ProcessError( - f"Command failed with exit code {returncode}", - exit_code=returncode, - stderr="Check stderr output for details", - ) - raise self._exit_error + try: + data = json.loads(json_buffer) + json_buffer = "" + yield data + except json.JSONDecodeError: + # We are speculatively decoding the buffer until we get + # a full JSON object. If there is an actual issue, we + # raise an error after exceeding the configured limit. + continue + + except anyio.ClosedResourceError: + pass + except GeneratorExit: + # Client disconnected + pass + + # Check process completion and handle errors + try: + returncode = await self._process.wait() + except Exception: + returncode = -1 + + # Use exit code for error detection + if returncode is not None and returncode != 0: + self._exit_error = ProcessError( + f"Command failed with exit code {returncode}", + exit_code=returncode, + stderr="Check stderr output for details", + ) + raise self._exit_error async def _check_claude_version(self) -> None: """Check Claude Code version and warn if below minimum.""" diff --git a/src/claude_agent_sdk/client.py b/src/claude_agent_sdk/client.py index 445285f8..d73fe0d1 100644 --- a/src/claude_agent_sdk/client.py +++ b/src/claude_agent_sdk/client.py @@ -8,6 +8,11 @@ from . import Transport from ._errors import CLIConnectionError +from ._internal.telemetry import ( + get_otel_tracer, + span_kind_client, + traced_span_async, +) from .types import ClaudeAgentOptions, HookEvent, HookMatcher, Message, ResultMessage @@ -102,70 +107,91 @@ async def _empty_stream() -> AsyncIterator[dict[str, Any]]: actual_prompt = _empty_stream() if prompt is None else prompt - # Validate and configure permission settings (matching TypeScript SDK logic) - if self.options.can_use_tool: - # canUseTool callback requires streaming mode (AsyncIterable prompt) - if isinstance(prompt, str): - raise ValueError( - "can_use_tool callback requires streaming mode. " - "Please provide prompt as an AsyncIterable instead of a string." - ) - - # canUseTool and permission_prompt_tool_name are mutually exclusive - if self.options.permission_prompt_tool_name: - raise ValueError( - "can_use_tool callback cannot be used with permission_prompt_tool_name. " - "Please use one or the other." + tracer = None + if self.options.telemetry and self.options.telemetry.enabled: + tracer = self.options.telemetry.tracer or get_otel_tracer( + "claude_agent_sdk.client" + ) + # Span kind helper handles missing OpenTelemetry dependency. + + async def _do_connect() -> None: + # Validate and configure permission settings (matching TypeScript SDK logic) + if self.options.can_use_tool: + # canUseTool callback requires streaming mode (AsyncIterable prompt) + if isinstance(prompt, str): + raise ValueError( + "can_use_tool callback requires streaming mode. " + "Please provide prompt as an AsyncIterable instead of a string." + ) + + # canUseTool and permission_prompt_tool_name are mutually exclusive + if self.options.permission_prompt_tool_name: + raise ValueError( + "can_use_tool callback cannot be used with permission_prompt_tool_name. " + "Please use one or the other." + ) + + # Automatically set permission_prompt_tool_name to "stdio" for control protocol + options = replace(self.options, permission_prompt_tool_name="stdio") + else: + options = self.options + + # Use provided custom transport or create subprocess transport + if self._custom_transport: + self._transport = self._custom_transport + else: + self._transport = SubprocessCLITransport( + prompt=actual_prompt, + options=options, ) - - # Automatically set permission_prompt_tool_name to "stdio" for control protocol - options = replace(self.options, permission_prompt_tool_name="stdio") - else: - options = self.options - - # Use provided custom transport or create subprocess transport - if self._custom_transport: - self._transport = self._custom_transport - else: - self._transport = SubprocessCLITransport( - prompt=actual_prompt, - options=options, + await self._transport.connect() + + # Extract SDK MCP servers from options + sdk_mcp_servers = {} + if self.options.mcp_servers and isinstance(self.options.mcp_servers, dict): + for name, config in self.options.mcp_servers.items(): + if isinstance(config, dict) and config.get("type") == "sdk": + sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item] + + # Calculate initialize timeout from CLAUDE_CODE_STREAM_CLOSE_TIMEOUT env var if set + # CLAUDE_CODE_STREAM_CLOSE_TIMEOUT is in milliseconds, convert to seconds + initialize_timeout_ms = int( + os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000") + ) + initialize_timeout = max(initialize_timeout_ms / 1000.0, 60.0) + + # Create Query to handle control protocol + self._query = Query( + transport=self._transport, + is_streaming_mode=True, # ClaudeSDKClient always uses streaming mode + can_use_tool=self.options.can_use_tool, + hooks=self._convert_hooks_to_internal_format(self.options.hooks) + if self.options.hooks + else None, + sdk_mcp_servers=sdk_mcp_servers, + initialize_timeout=initialize_timeout, + telemetry=self.options.telemetry, ) - await self._transport.connect() - - # Extract SDK MCP servers from options - sdk_mcp_servers = {} - if self.options.mcp_servers and isinstance(self.options.mcp_servers, dict): - for name, config in self.options.mcp_servers.items(): - if isinstance(config, dict) and config.get("type") == "sdk": - sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item] - - # Calculate initialize timeout from CLAUDE_CODE_STREAM_CLOSE_TIMEOUT env var if set - # CLAUDE_CODE_STREAM_CLOSE_TIMEOUT is in milliseconds, convert to seconds - initialize_timeout_ms = int( - os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000") - ) - initialize_timeout = max(initialize_timeout_ms / 1000.0, 60.0) - - # Create Query to handle control protocol - self._query = Query( - transport=self._transport, - is_streaming_mode=True, # ClaudeSDKClient always uses streaming mode - can_use_tool=self.options.can_use_tool, - hooks=self._convert_hooks_to_internal_format(self.options.hooks) - if self.options.hooks - else None, - sdk_mcp_servers=sdk_mcp_servers, - initialize_timeout=initialize_timeout, - ) - - # Start reading messages and initialize - await self._query.start() - await self._query.initialize() - - # If we have an initial prompt stream, start streaming it - if prompt is not None and isinstance(prompt, AsyncIterable) and self._query._tg: - self._query._tg.start_soon(self._query.stream_input, prompt) + + # Start reading messages and initialize + await self._query.start() + await self._query.initialize() + + # If we have an initial prompt stream, start streaming it + if prompt is not None and isinstance(prompt, AsyncIterable) and self._query._tg: + self._query._tg.start_soon(self._query.stream_input, prompt) + + async with traced_span_async( + tracer, + "claude_agent_sdk.client.connect", + kind=span_kind_client(), + attributes={ + "client.streaming": not isinstance(actual_prompt, str), + "client.has_hooks": bool(self.options.hooks), + "client.has_mcp_servers": bool(self.options.mcp_servers), + }, + ): + await _do_connect() async def receive_messages(self) -> AsyncIterator[Message]: """Receive all messages from Claude.""" @@ -190,22 +216,40 @@ async def query( if not self._query or not self._transport: raise CLIConnectionError("Not connected. Call connect() first.") - # Handle string prompts - if isinstance(prompt, str): - message = { - "type": "user", - "message": {"role": "user", "content": prompt}, - "parent_tool_use_id": None, - "session_id": session_id, - } - await self._transport.write(json.dumps(message) + "\n") - else: - # Handle AsyncIterable prompts - stream them - async for msg in prompt: - # Ensure session_id is set on each message - if "session_id" not in msg: - msg["session_id"] = session_id - await self._transport.write(json.dumps(msg) + "\n") + tracer = None + if self.options.telemetry and self.options.telemetry.enabled: + tracer = self.options.telemetry.tracer or get_otel_tracer( + "claude_agent_sdk.client" + ) + + async def _do_query() -> None: + # Handle string prompts + if isinstance(prompt, str): + message = { + "type": "user", + "message": {"role": "user", "content": prompt}, + "parent_tool_use_id": None, + "session_id": session_id, + } + await self._transport.write(json.dumps(message) + "\n") + else: + # Handle AsyncIterable prompts - stream them + async for msg in prompt: + # Ensure session_id is set on each message + if "session_id" not in msg: + msg["session_id"] = session_id + await self._transport.write(json.dumps(msg) + "\n") + + async with traced_span_async( + tracer, + "claude_agent_sdk.client.query", + kind=span_kind_client(), + attributes={ + "client.streaming": not isinstance(prompt, str), + "client.session_id": session_id, + }, + ): + await _do_query() async def interrupt(self) -> None: """Send interrupt signal (only works with streaming mode).""" @@ -387,10 +431,21 @@ async def receive_response(self) -> AsyncIterator[Message]: async def disconnect(self) -> None: """Disconnect from Claude.""" - if self._query: - await self._query.close() - self._query = None - self._transport = None + tracer = None + if self.options.telemetry and self.options.telemetry.enabled: + tracer = self.options.telemetry.tracer or get_otel_tracer( + "claude_agent_sdk.client" + ) + + async with traced_span_async( + tracer, + "claude_agent_sdk.client.disconnect", + kind=span_kind_client(), + ): + if self._query: + await self._query.close() + self._query = None + self._transport = None async def __aenter__(self) -> "ClaudeSDKClient": """Enter async context - automatically connects with empty stream for interactive use.""" diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index 08b1e023..ea3e7715 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -10,9 +10,13 @@ if TYPE_CHECKING: from mcp.server import Server as McpServer + from opentelemetry.metrics import Meter + from opentelemetry.trace import Tracer else: # Runtime placeholder for forward reference resolution in Pydantic 2.12+ McpServer = Any + Meter = Any + Tracer = Any # Permission modes PermissionMode = Literal["default", "acceptEdits", "plan", "bypassPermissions"] @@ -701,6 +705,33 @@ class ClaudeAgentOptions: # using `ClaudeSDKClient.rewind_files()`. enable_file_checkpointing: bool = False + # Telemetry configuration (tracing/metrics). + telemetry: "TelemetryOptions | None" = None + + +@dataclass +class TelemetryOptions: + """Telemetry configuration for tracing and metrics.""" + + enabled: bool = False + tracer: Tracer | None = None + meter: Meter | None = None + + def __post_init__(self) -> None: + if self.tracer is not None and not hasattr( + self.tracer, "start_as_current_span" + ): + raise TypeError( + "TelemetryOptions.tracer must provide start_as_current_span()" + ) + if self.meter is not None and not ( + hasattr(self.meter, "create_counter") + and hasattr(self.meter, "create_histogram") + ): + raise TypeError( + "TelemetryOptions.meter must provide create_counter() and create_histogram()" + ) + # SDK Control Protocol class SDKControlInterruptRequest(TypedDict): diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py new file mode 100644 index 00000000..5139063f --- /dev/null +++ b/tests/test_telemetry.py @@ -0,0 +1,298 @@ +"""Tests for telemetry helpers.""" + +from __future__ import annotations + +import pytest + +from claude_agent_sdk._internal.telemetry import ( + get_otel_meter, + get_otel_tracer, + record_span_exception, + span_kind_client, + span_kind_internal, + traced_span, + traced_span_async, +) + +# Import OTel test utilities - these tests will be skipped if OTel isn't installed +pytest.importorskip("opentelemetry") + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.trace import StatusCode + + +@pytest.fixture +def span_exporter(): + """Create an in-memory span exporter for testing.""" + return InMemorySpanExporter() + + +@pytest.fixture +def tracer(span_exporter): + """Create a tracer with in-memory exporter for testing.""" + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(span_exporter)) + return provider.get_tracer("test.tracer") + + +# --- No-op tests (when tracer is None) --- + + +def test_traced_span_no_tracer_is_noop() -> None: + """traced_span should no-op when tracer is None.""" + with traced_span(None, "test.span") as span: + assert span is None + + +@pytest.mark.asyncio +async def test_traced_span_async_no_tracer_is_noop() -> None: + """traced_span_async should no-op when tracer is None.""" + async with traced_span_async(None, "test.span") as span: + assert span is None + + +# --- Span creation tests --- + + +def test_traced_span_creates_span(tracer, span_exporter) -> None: + """traced_span should create a span with correct name.""" + with traced_span(tracer, "test.operation") as span: + assert span is not None + span.set_attribute("custom.attr", "value") + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "test.operation" + assert spans[0].status.status_code == StatusCode.OK + + +@pytest.mark.asyncio +async def test_traced_span_async_creates_span(tracer, span_exporter) -> None: + """traced_span_async should create a span with correct name.""" + async with traced_span_async(tracer, "test.async.operation") as span: + assert span is not None + span.set_attribute("custom.attr", "async_value") + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "test.async.operation" + assert spans[0].status.status_code == StatusCode.OK + + +# --- Attributes tests --- + + +def test_traced_span_sets_attributes(tracer, span_exporter) -> None: + """traced_span should set initial attributes on the span.""" + attributes = { + "string.attr": "hello", + "int.attr": 42, + "bool.attr": True, + "none.attr": None, # Should be skipped + } + + with traced_span(tracer, "test.with.attrs", attributes=attributes): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span_attrs = dict(spans[0].attributes) + assert span_attrs["string.attr"] == "hello" + assert span_attrs["int.attr"] == 42 + assert span_attrs["bool.attr"] is True + assert "none.attr" not in span_attrs # None values should be skipped + + +@pytest.mark.asyncio +async def test_traced_span_async_sets_attributes(tracer, span_exporter) -> None: + """traced_span_async should set initial attributes on the span.""" + attributes = { + "query.type": "streaming", + "query.timeout": 30, + } + + async with traced_span_async(tracer, "test.async.attrs", attributes=attributes): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span_attrs = dict(spans[0].attributes) + assert span_attrs["query.type"] == "streaming" + assert span_attrs["query.timeout"] == 30 + + +# --- SpanKind tests --- + + +def test_traced_span_with_client_kind(tracer, span_exporter) -> None: + """traced_span should accept SpanKind.""" + with traced_span(tracer, "test.client.span", kind=span_kind_client()): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].kind == trace.SpanKind.CLIENT + + +def test_traced_span_defaults_to_internal_kind(tracer, span_exporter) -> None: + """traced_span should default to INTERNAL SpanKind.""" + with traced_span(tracer, "test.internal.span"): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].kind == trace.SpanKind.INTERNAL + + +@pytest.mark.asyncio +async def test_traced_span_async_with_internal_kind(tracer, span_exporter) -> None: + """traced_span_async should accept SpanKind.""" + async with traced_span_async( + tracer, "test.async.internal", kind=span_kind_internal() + ): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].kind == trace.SpanKind.INTERNAL + + +# --- Error handling tests --- + + +def test_traced_span_records_exception(tracer, span_exporter) -> None: + """traced_span should record exceptions and set error status.""" + with pytest.raises(ValueError, match="test error"): + with traced_span(tracer, "test.error.span"): + raise ValueError("test error") + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.ERROR + assert "test error" in spans[0].status.description + + # Check exception was recorded (may be recorded multiple times by OTel internals) + events = spans[0].events + assert len(events) >= 1 + exception_events = [e for e in events if e.name == "exception"] + assert len(exception_events) >= 1 + + +@pytest.mark.asyncio +async def test_traced_span_async_records_exception(tracer, span_exporter) -> None: + """traced_span_async should record exceptions and set error status.""" + with pytest.raises(RuntimeError, match="async error"): + async with traced_span_async(tracer, "test.async.error"): + raise RuntimeError("async error") + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.ERROR + assert "async error" in spans[0].status.description + + # Check exception was recorded (may be recorded multiple times by OTel internals) + events = spans[0].events + assert len(events) >= 1 + exception_events = [e for e in events if e.name == "exception"] + assert len(exception_events) >= 1 + + +# --- record_span_exception tests --- + + +def test_record_span_exception_on_active_span(tracer, span_exporter) -> None: + """record_span_exception should record exception on span.""" + with tracer.start_as_current_span("test.manual.span") as span: + exc = ValueError("manual exception") + record_span_exception(span, exc) + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.ERROR + assert "manual exception" in spans[0].status.description + + +def test_record_span_exception_noop_on_none() -> None: + """record_span_exception should no-op when span is None.""" + # Should not raise + record_span_exception(None, ValueError("ignored")) + + +# --- SpanKind helper tests --- + + +def test_span_kind_client_returns_client() -> None: + """span_kind_client should return SpanKind.CLIENT.""" + kind = span_kind_client() + assert kind == trace.SpanKind.CLIENT + + +def test_span_kind_internal_returns_internal() -> None: + """span_kind_internal should return SpanKind.INTERNAL.""" + kind = span_kind_internal() + assert kind == trace.SpanKind.INTERNAL + + +# --- get_otel_tracer / get_otel_meter tests --- + + +def test_get_otel_tracer_returns_tracer() -> None: + """get_otel_tracer should return a tracer when OTel is available.""" + tracer = get_otel_tracer("test.module") + assert tracer is not None + # Should have start_as_current_span method + assert hasattr(tracer, "start_as_current_span") + + +def test_get_otel_meter_returns_meter() -> None: + """get_otel_meter should return a meter when OTel is available.""" + meter = get_otel_meter("test.module") + assert meter is not None + # Should have create_counter method + assert hasattr(meter, "create_counter") + + +# --- Nested spans test --- + + +def test_nested_traced_spans(tracer, span_exporter) -> None: + """Nested traced_span calls should create parent-child relationship.""" + with traced_span(tracer, "parent.span") as parent: + parent.set_attribute("level", "parent") + with traced_span(tracer, "child.span") as child: + child.set_attribute("level", "child") + + spans = span_exporter.get_finished_spans() + assert len(spans) == 2 + + # Spans are exported in order of completion (child first) + child_span = spans[0] + parent_span = spans[1] + + assert child_span.name == "child.span" + assert parent_span.name == "parent.span" + + # Child should have parent as parent + assert child_span.parent.span_id == parent_span.context.span_id + + +@pytest.mark.asyncio +async def test_nested_traced_spans_async(tracer, span_exporter) -> None: + """Nested traced_span_async calls should create parent-child relationship.""" + async with traced_span_async(tracer, "async.parent") as parent: + parent.set_attribute("level", "parent") + async with traced_span_async(tracer, "async.child") as child: + child.set_attribute("level", "child") + + spans = span_exporter.get_finished_spans() + assert len(spans) == 2 + + child_span = spans[0] + parent_span = spans[1] + + assert child_span.name == "async.child" + assert parent_span.name == "async.parent" + assert child_span.parent.span_id == parent_span.context.span_id From 15f55a9c25787fb3da1203441c776227062e86b0 Mon Sep 17 00:00:00 2001 From: Teja Date: Mon, 2 Feb 2026 22:47:02 -0500 Subject: [PATCH 2/2] feat: enhance SDK installation and testing with telemetry support - Updated Dockerfile and GitHub Actions workflow to install SDK with telemetry dependencies. - Modified README to clarify installation instructions and added details about telemetry tests. - Introduced new end-to-end tests for telemetry integration, verifying metrics and tracing functionality. - Adjusted `conftest.py` to improve test marker documentation. These changes improve the observability of SDK operations and ensure proper testing of telemetry features. --- .github/workflows/test.yml | 4 +- Dockerfile.test | 4 +- e2e-tests/README.md | 26 +- e2e-tests/conftest.py | 4 +- e2e-tests/test_telemetry.py | 534 ++++++++++++++++++++++++++++++++++ src/claude_agent_sdk/types.py | 4 +- 6 files changed, 566 insertions(+), 10 deletions(-) create mode 100644 e2e-tests/test_telemetry.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7f7c9ea8..867d8184 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,7 +24,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install -e ".[dev]" + pip install -e ".[dev,telemetry]" - name: Run tests run: | @@ -73,7 +73,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install -e ".[dev]" + pip install -e ".[dev,telemetry]" - name: Run end-to-end tests with real API env: diff --git a/Dockerfile.test b/Dockerfile.test index 22adf2ec..c327af4d 100644 --- a/Dockerfile.test +++ b/Dockerfile.test @@ -19,8 +19,8 @@ WORKDIR /app # Copy the SDK source COPY . . -# Install SDK with dev dependencies -RUN pip install -e ".[dev]" +# Install SDK with dev + telemetry dependencies +RUN pip install -e ".[dev,telemetry]" # Verify CLI installation RUN claude -v diff --git a/e2e-tests/README.md b/e2e-tests/README.md index 6dfe374e..d1d6107d 100644 --- a/e2e-tests/README.md +++ b/e2e-tests/README.md @@ -16,12 +16,14 @@ export ANTHROPIC_API_KEY="your-api-key-here" ### Dependencies -Install the development dependencies: +Install the development dependencies including telemetry support: ```bash -pip install -e ".[dev]" +pip install -e ".[dev,telemetry]" ``` +> **Note**: Telemetry tests (`test_telemetry.py`) require the `telemetry` extra. Without it, these tests will be silently skipped via `pytest.importorskip()`. + ## Running the Tests ### Run all e2e tests: @@ -52,6 +54,24 @@ python -m pytest e2e-tests/test_mcp_calculator.py::test_basic_addition -v ## Test Coverage +### Telemetry Tests (`test_telemetry.py`) + +Tests OpenTelemetry tracing and metrics integration: + +- **test_telemetry_tracing_spans_emitted**: Verifies core spans are emitted during SDK session +- **test_telemetry_metrics_emitted**: Verifies core metrics (messages, tokens, cost) are recorded +- **test_telemetry_token_metrics_detailed**: Tests prompt/completion token split metrics +- **test_telemetry_tool_use**: Validates tool use spans for both CLI and SDK MCP tools +- **test_telemetry_disabled_no_crash**: Ensures SDK works when telemetry is disabled +- **test_telemetry_not_provided_no_crash**: Ensures SDK works without telemetry config +- **test_telemetry_enabled_without_tracer_or_meter**: Tests fallback to default tracer/meter +- **test_telemetry_options_invalid_tracer/meter**: Validates type checking on TelemetryOptions +- **test_telemetry_hook_spans**: Verifies hook callback spans are emitted +- **test_telemetry_permission_callback_spans**: Verifies permission callback spans +- **test_telemetry_error_recording_invalid_cwd**: Tests error recording on spans +- **test_telemetry_duration_metrics**: Validates duration-related metrics +- **test_telemetry_invocation_counter**: Verifies invocation counter increments + ### MCP Calculator Tests (`test_mcp_calculator.py`) Tests the MCP (Model Context Protocol) integration with calculator tools: @@ -99,4 +119,4 @@ When adding new e2e tests: 2. Use the `api_key` fixture to ensure API key is available 3. Keep prompts simple to minimize costs 4. Verify actual tool execution, not just mocked responses -5. Document any special setup requirements in this README \ No newline at end of file +5. Document any special setup requirements in this README diff --git a/e2e-tests/conftest.py b/e2e-tests/conftest.py index 392c213a..ea419acb 100644 --- a/e2e-tests/conftest.py +++ b/e2e-tests/conftest.py @@ -27,4 +27,6 @@ def event_loop_policy(): def pytest_configure(config): """Add e2e marker.""" - config.addinivalue_line("markers", "e2e: marks tests as e2e tests requiring API key") \ No newline at end of file + config.addinivalue_line( + "markers", "e2e: marks tests as e2e tests requiring API key" + ) diff --git a/e2e-tests/test_telemetry.py b/e2e-tests/test_telemetry.py new file mode 100644 index 00000000..8193271c --- /dev/null +++ b/e2e-tests/test_telemetry.py @@ -0,0 +1,534 @@ +"""End-to-end tests for telemetry integration with real Claude API calls.""" + +from typing import Any + +import pytest + +from claude_agent_sdk import ( + ClaudeAgentOptions, + ClaudeSDKClient, + HookMatcher, + PermissionResultAllow, + ResultMessage, + TelemetryOptions, + create_sdk_mcp_server, + tool, +) +from claude_agent_sdk._errors import CLIConnectionError + +pytest.importorskip("opentelemetry") +pytest.importorskip("opentelemetry.sdk") +pytest.importorskip("opentelemetry.sdk.metrics") +pytest.importorskip("opentelemetry.sdk.metrics.export") + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.trace import StatusCode + +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_metric_names(metrics_data: Any) -> set[str]: + """Extract metric names from OpenTelemetry metrics data.""" + names: set[str] = set() + for resource_metrics in metrics_data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + for metric in scope_metrics.metrics: + names.add(metric.name) + return names + + +def get_metric_by_name(metrics_data: Any, name: str) -> Any: + """Get a specific metric by name from OpenTelemetry metrics data.""" + for resource_metrics in metrics_data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + for metric in scope_metrics.metrics: + if metric.name == name: + return metric + return None + + +# ============================================================================= +# Fixtures +# ============================================================================= + + +@pytest.fixture +def telemetry_tracer(): + """Create a tracer and span exporter for test assertions.""" + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + tracer = provider.get_tracer("e2e.telemetry") + return tracer, exporter + + +@pytest.fixture +def telemetry_meter(): + """Create a meter and metric reader for test assertions.""" + reader = InMemoryMetricReader() + provider = MeterProvider(metric_readers=[reader]) + meter = provider.get_meter("e2e.telemetry") + return meter, reader, provider + + +# ============================================================================= +# Core Span Tests +# ============================================================================= + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_telemetry_tracing_spans_emitted(api_key, telemetry_tracer): + """Verify that core spans are emitted during a real SDK session.""" + tracer, exporter = telemetry_tracer + options = ClaudeAgentOptions( + telemetry=TelemetryOptions(enabled=True, tracer=tracer), + ) + + async with ClaudeSDKClient(options=options) as client: + await client.query("Respond with the single word 'OK'.") + async for _ in client.receive_response(): + pass + + spans = exporter.get_finished_spans() + span_names = {span.name for span in spans} + expected = { + "claude_agent_sdk.client.connect", + "claude_agent_sdk.client.query", + "claude_agent_sdk.client.disconnect", + "claude_agent_sdk.query.initialize", + "claude_agent_sdk.query.read_messages", + "claude_agent_sdk.transport.connect", + "claude_agent_sdk.transport.close", + } + missing = expected - span_names + assert not missing, f"Missing spans: {sorted(missing)}" + + # Check specific attributes on initialize span + init_span = next(s for s in spans if s.name == "claude_agent_sdk.query.initialize") + assert "has_hooks" in init_span.attributes + assert "has_mcp_servers" in init_span.attributes + + # Verify no spans have ERROR status (OK or UNSET is fine) + for span in spans: + if span.name in expected: + assert span.status.status_code != StatusCode.ERROR, ( + f"Span {span.name} should not have ERROR status" + ) + + +# ============================================================================= +# Core Metrics Tests +# ============================================================================= + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_telemetry_metrics_emitted(api_key, telemetry_tracer, telemetry_meter): + """Verify that core metrics are recorded during a real SDK session.""" + tracer, _ = telemetry_tracer + meter, reader, provider = telemetry_meter + options = ClaudeAgentOptions( + telemetry=TelemetryOptions(enabled=True, tracer=tracer, meter=meter), + ) + + async with ClaudeSDKClient(options=options) as client: + await client.query("Respond with the single word 'OK'.") + async for _ in client.receive_response(): + pass + + provider.force_flush() + metrics_data = reader.get_metrics_data() + metric_names = get_metric_names(metrics_data) + + # Core metrics that should always be recorded + assert "claude_agent_sdk.messages" in metric_names + assert "claude_agent_sdk.results" in metric_names + + # Cost metric should be recorded (may be 0 but should exist) + assert "claude_agent_sdk.cost.total_usd" in metric_names + + total_cost = get_metric_by_name(metrics_data, "claude_agent_sdk.cost.total_usd") + assert total_cost is not None + assert len(total_cost.data.data_points) > 0 + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_telemetry_token_metrics_detailed( + api_key, telemetry_tracer, telemetry_meter +): + """Verify detailed token metrics (prompt/completion split) are recorded.""" + tracer, _ = telemetry_tracer + meter, reader, provider = telemetry_meter + options = ClaudeAgentOptions( + telemetry=TelemetryOptions(enabled=True, tracer=tracer, meter=meter), + ) + + async with ClaudeSDKClient(options=options) as client: + await client.query("Respond with a paragraph about Python programming.") + async for _ in client.receive_response(): + pass + + provider.force_flush() + metrics_data = reader.get_metrics_data() + metric_names = get_metric_names(metrics_data) + + # Check that we have some metrics recorded + assert len(metric_names) > 0, "Expected at least some metrics to be recorded" + + # Duration metrics should always be recorded + assert "claude_agent_sdk.result.duration_ms" in metric_names + + # If token metrics are available, verify they have data + # (Token metrics depend on usage data being in the result) + if "claude_agent_sdk.tokens.total" in metric_names: + total_tokens = get_metric_by_name(metrics_data, "claude_agent_sdk.tokens.total") + if total_tokens and total_tokens.data.data_points: + assert len(total_tokens.data.data_points) > 0 + + if "claude_agent_sdk.tokens.prompt" in metric_names: + prompt_tokens = get_metric_by_name( + metrics_data, "claude_agent_sdk.tokens.prompt" + ) + if prompt_tokens and prompt_tokens.data.data_points: + assert len(prompt_tokens.data.data_points) > 0 + + +# ============================================================================= +# Tool Use Span Tests +# ============================================================================= + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_telemetry_tool_use(api_key, telemetry_tracer): + """Verify that tool use spans are emitted.""" + tracer, exporter = telemetry_tracer + executions = [] + + @tool("echo", "Echo back the input text", {"text": str}) + async def echo_tool(args: dict[str, Any]) -> dict[str, Any]: + """Echo back whatever text is provided.""" + executions.append("echo") + return {"content": [{"type": "text", "text": f"Echo: {args['text']}"}]} + + server = create_sdk_mcp_server( + name="test", + version="1.0.0", + tools=[echo_tool], + ) + + options = ClaudeAgentOptions( + telemetry=TelemetryOptions(enabled=True, tracer=tracer), + mcp_servers={"test": server}, + allowed_tools=["mcp__test__echo"], + ) + + async with ClaudeSDKClient(options=options) as client: + await client.query("Call the mcp__test__echo tool with text 'hello'") + async for _ in client.receive_response(): + pass + + assert "echo" in executions + + spans = exporter.get_finished_spans() + span_names = {span.name for span in spans} + + # Check for CLI tool call span (the outer span from CLI perspective) + assert "claude_agent_sdk.cli.tool_call" in span_names + + # Check for SDK MCP tool call span (the inner span from SDK executing the tool) + assert "claude_agent_sdk.mcp.tool_call" in span_names + + # Check attributes for CLI tool span + cli_tool_span = next(s for s in spans if s.name == "claude_agent_sdk.cli.tool_call") + assert cli_tool_span.attributes.get("tool.source") == "cli" + tool_name = cli_tool_span.attributes.get("tool.name") + assert tool_name is not None, "tool.name attribute should be set" + assert "echo" in tool_name, f"Expected 'echo' in tool name, got: {tool_name}" + + # Check attributes for SDK MCP span + mcp_tool_span = next(s for s in spans if s.name == "claude_agent_sdk.mcp.tool_call") + assert mcp_tool_span.attributes.get("mcp.server") == "test" + assert mcp_tool_span.attributes.get("mcp.tool.name") == "echo" + + +# ============================================================================= +# Telemetry Disabled/Not Configured Tests +# ============================================================================= + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_telemetry_disabled_no_crash(api_key): + """Verify SDK works correctly when telemetry is explicitly disabled.""" + options = ClaudeAgentOptions( + telemetry=TelemetryOptions(enabled=False), + ) + async with ClaudeSDKClient(options=options) as client: + await client.query("Respond with 'OK'.") + messages = [msg async for msg in client.receive_response()] + + # Verify we got a result message + assert any(isinstance(m, ResultMessage) for m in messages), ( + "Expected a ResultMessage" + ) + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_telemetry_not_provided_no_crash(api_key): + """Verify SDK works correctly when telemetry option is not provided at all.""" + options = ClaudeAgentOptions() # No telemetry config + async with ClaudeSDKClient(options=options) as client: + await client.query("Respond with 'OK'.") + messages = [msg async for msg in client.receive_response()] + + # Verify we got a result message + assert any(isinstance(m, ResultMessage) for m in messages), ( + "Expected a ResultMessage" + ) + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_telemetry_enabled_without_tracer_or_meter(api_key): + """Verify SDK works when telemetry is enabled but no tracer/meter provided.""" + # This tests the fallback to get_otel_tracer/get_otel_meter + options = ClaudeAgentOptions( + telemetry=TelemetryOptions(enabled=True), # No tracer or meter + ) + async with ClaudeSDKClient(options=options) as client: + await client.query("Respond with 'OK'.") + messages = [msg async for msg in client.receive_response()] + + # Verify we got a result message + assert any(isinstance(m, ResultMessage) for m in messages), ( + "Expected a ResultMessage" + ) + + +# ============================================================================= +# TelemetryOptions Validation Tests +# ============================================================================= + + +@pytest.mark.e2e +def test_telemetry_options_invalid_tracer(): + """Verify TelemetryOptions rejects invalid tracer (missing start_as_current_span).""" + with pytest.raises(TypeError, match="start_as_current_span"): + TelemetryOptions(enabled=True, tracer=object()) + + +@pytest.mark.e2e +def test_telemetry_options_invalid_meter(): + """Verify TelemetryOptions rejects invalid meter (missing create_counter/create_histogram).""" + with pytest.raises(TypeError, match="create_counter"): + TelemetryOptions(enabled=True, meter=object()) + + +@pytest.mark.e2e +def test_telemetry_options_valid_none_tracer_meter(): + """Verify TelemetryOptions accepts None for tracer and meter.""" + # This should not raise - None is valid (will use defaults) + options = TelemetryOptions(enabled=True, tracer=None, meter=None) + assert options.enabled is True + assert options.tracer is None + assert options.meter is None + + +# ============================================================================= +# Hook Callback Span Tests +# ============================================================================= + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_telemetry_hook_spans(api_key, telemetry_tracer): + """Verify that hook callback spans are emitted when hooks are invoked.""" + tracer, exporter = telemetry_tracer + hook_invocations = [] + + async def pre_tool_hook(input_data, tool_use_id, context): + hook_invocations.append(input_data.get("hook_event_name")) + return {"continue_": True} + + options = ClaudeAgentOptions( + telemetry=TelemetryOptions(enabled=True, tracer=tracer), + hooks={ + "PreToolUse": [HookMatcher(matcher=None, hooks=[pre_tool_hook])], + }, + permission_mode="bypassPermissions", + ) + + async with ClaudeSDKClient(options=options) as client: + # Request an action that will trigger tool use + await client.query("List files in the current directory using the Bash tool") + async for _ in client.receive_response(): + pass + + # Only check for hook spans if hooks were actually called + if hook_invocations: + spans = exporter.get_finished_spans() + span_names = {span.name for span in spans} + assert "claude_agent_sdk.hooks.callback" in span_names, ( + f"Expected hooks.callback span. Got spans: {sorted(span_names)}" + ) + + hook_span = next( + s for s in spans if s.name == "claude_agent_sdk.hooks.callback" + ) + assert "hook.callback_id" in hook_span.attributes + assert "hook.event" in hook_span.attributes + + +# ============================================================================= +# Permission Callback Span Tests +# ============================================================================= + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_telemetry_permission_callback_spans(api_key, telemetry_tracer): + """Verify that permission callback spans are emitted when can_use_tool is invoked.""" + tracer, exporter = telemetry_tracer + permission_calls = [] + + async def can_use_tool(tool_name, tool_input, context): + permission_calls.append(tool_name) + return PermissionResultAllow() + + options = ClaudeAgentOptions( + telemetry=TelemetryOptions(enabled=True, tracer=tracer), + can_use_tool=can_use_tool, + ) + + async def prompt_stream(): + yield { + "type": "user", + "message": { + "role": "user", + "content": "List files in the current directory", + }, + "session_id": "test-session", + } + + async with ClaudeSDKClient(options=options) as client: + await client.query(prompt_stream()) + async for _ in client.receive_response(): + pass + + # Only check for permission spans if permissions were actually requested + if permission_calls: + spans = exporter.get_finished_spans() + span_names = {span.name for span in spans} + assert "claude_agent_sdk.permission.can_use_tool" in span_names, ( + f"Expected permission.can_use_tool span. Got spans: {sorted(span_names)}" + ) + + perm_span = next( + s for s in spans if s.name == "claude_agent_sdk.permission.can_use_tool" + ) + assert "tool.name" in perm_span.attributes + assert "permission.behavior" in perm_span.attributes + assert perm_span.attributes.get("permission.behavior") == "allow" + + +# ============================================================================= +# Error Recording Tests +# ============================================================================= + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_telemetry_error_recording_invalid_cwd(api_key, telemetry_tracer): + """Verify that errors are properly recorded on spans when connection fails.""" + tracer, exporter = telemetry_tracer + options = ClaudeAgentOptions( + telemetry=TelemetryOptions(enabled=True, tracer=tracer), + cwd="/nonexistent/path/that/does/not/exist/at/all", + ) + + with pytest.raises(CLIConnectionError): + async with ClaudeSDKClient(options=options) as client: + await client.query("test") + async for _ in client.receive_response(): + pass + + spans = exporter.get_finished_spans() + # Find any span with an error status + error_spans = [s for s in spans if s.status.status_code == StatusCode.ERROR] + assert len(error_spans) > 0, ( + f"Expected at least one span with error status. " + f"Got spans: {[(s.name, s.status.status_code) for s in spans]}" + ) + + +# ============================================================================= +# Additional Metrics Tests +# ============================================================================= + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_telemetry_duration_metrics(api_key, telemetry_tracer, telemetry_meter): + """Verify duration-related metrics are recorded.""" + tracer, _ = telemetry_tracer + meter, reader, provider = telemetry_meter + options = ClaudeAgentOptions( + telemetry=TelemetryOptions(enabled=True, tracer=tracer, meter=meter), + ) + + async with ClaudeSDKClient(options=options) as client: + await client.query("Respond with 'OK'.") + async for _ in client.receive_response(): + pass + + provider.force_flush() + metrics_data = reader.get_metrics_data() + metric_names = get_metric_names(metrics_data) + + # Check for duration metrics + assert "claude_agent_sdk.result.duration_ms" in metric_names + assert "claude_agent_sdk.model.latency_ms" in metric_names + + # Verify duration > 0 + duration = get_metric_by_name(metrics_data, "claude_agent_sdk.result.duration_ms") + if duration and duration.data.data_points: + # Histogram data points have different structure + assert len(duration.data.data_points) > 0 + + +@pytest.mark.e2e +@pytest.mark.asyncio +async def test_telemetry_invocation_counter(api_key, telemetry_tracer, telemetry_meter): + """Verify invocation counter is incremented.""" + tracer, _ = telemetry_tracer + meter, reader, provider = telemetry_meter + options = ClaudeAgentOptions( + telemetry=TelemetryOptions(enabled=True, tracer=tracer, meter=meter), + ) + + async with ClaudeSDKClient(options=options) as client: + await client.query("Respond with 'OK'.") + async for _ in client.receive_response(): + pass + + provider.force_flush() + metrics_data = reader.get_metrics_data() + metric_names = get_metric_names(metrics_data) + + assert "claude_agent_sdk.invocations" in metric_names + + invocations = get_metric_by_name(metrics_data, "claude_agent_sdk.invocations") + assert invocations is not None + # Should have at least 1 invocation + total = sum(p.value for p in invocations.data.data_points) + assert total >= 1, f"Expected at least 1 invocation, got {total}" diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index ea3e7715..97ac4b52 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -714,8 +714,8 @@ class TelemetryOptions: """Telemetry configuration for tracing and metrics.""" enabled: bool = False - tracer: Tracer | None = None - meter: Meter | None = None + tracer: "Tracer | None" = None + meter: "Meter | None" = None def __post_init__(self) -> None: if self.tracer is not None and not hasattr(