From 1e24fd47ff01be5141afb44e9d5745eaa36ee932 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Wed, 19 Mar 2025 17:04:54 +0100 Subject: [PATCH 1/5] openai: inherit StreamWrapper from wrapt's ObjectProxy So we have whatever attribute the callers expect. --- .../opentelemetry/instrumentation/openai/wrappers.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py index a9193b8..10e418e 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py @@ -28,13 +28,14 @@ from opentelemetry.trace import Span from opentelemetry.trace.status import StatusCode from opentelemetry.util.types import Attributes +from wrapt import ObjectProxy EVENT_GEN_AI_CONTENT_COMPLETION = "gen_ai.content.completion" logger = logging.getLogger(__name__) -class StreamWrapper: +class StreamWrapper(ObjectProxy): def __init__( self, stream, @@ -47,7 +48,8 @@ def __init__( token_usage_metric: Histogram, operation_duration_metric: Histogram, ): - self.stream = stream + super().__init__(stream) + self.span = span self.span_attributes = span_attributes self.capture_message_content = capture_message_content @@ -121,7 +123,7 @@ def __aiter__(self): def __next__(self): try: - chunk = next(self.stream) + chunk = next(self.__wrapped__) self.process_chunk(chunk) return chunk except Exception as exc: @@ -130,7 +132,7 @@ def __next__(self): async def __anext__(self): try: - chunk = await self.stream.__anext__() + chunk = await self.__wrapped__.__anext__() self.process_chunk(chunk) return chunk except Exception as exc: From 6a16f04a9343ec8cc81833b8fc79ca6bc7534f1d Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Wed, 19 Mar 2025 17:51:39 +0100 Subject: [PATCH 2/5] Try to map StreamWrapper to actual code --- .../instrumentation/openai/wrappers.py | 30 +++++++------------ 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py index 10e418e..c41284f 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py @@ -109,32 +109,24 @@ def process_chunk(self, chunk): if hasattr(chunk, "service_tier"): self.service_tier = chunk.service_tier - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.end(exc_value) - def __iter__(self): - return self - - def __aiter__(self): - return self - - def __next__(self): try: - chunk = next(self.__wrapped__) - self.process_chunk(chunk) - return chunk + for chunk in self.__wrapped__: + self.process_chunk(chunk) + yield chunk except Exception as exc: self.end(exc) raise + self.end() - async def __anext__(self): + async def __aiter__(self): try: - chunk = await self.__wrapped__.__anext__() - self.process_chunk(chunk) - return chunk + async for chunk in self.__wrapped__: + print("chunk") + 
self.process_chunk(chunk) + yield chunk except Exception as exc: + print("exc!", exc) self.end(exc) raise + self.end() From 111b322c0cfbb8aea6a7d21c27dbf7064aa6ba0b Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Wed, 19 Mar 2025 18:07:19 +0100 Subject: [PATCH 3/5] Remove dead code --- .../src/opentelemetry/instrumentation/openai/wrappers.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py index c41284f..d7e9e6f 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py @@ -66,8 +66,7 @@ def __init__( self.service_tier = None def end(self, exc=None): - # StopIteration is not an error, it signals that we have consumed all the stream - if exc is not None and not isinstance(exc, (StopIteration, StopAsyncIteration)): + if exc is not None: self.span.set_status(StatusCode.ERROR, str(exc)) self.span.set_attribute(ERROR_TYPE, exc.__class__.__qualname__) self.span.end() From 01f2ab21cdc4ec049f423aab1a91a4d16dd207ba Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Fri, 21 Mar 2025 12:18:34 +0100 Subject: [PATCH 4/5] Add tests --- .../instrumentation/openai/__init__.py | 14 + .../instrumentation/openai/wrappers.py | 15 +- .../test_chat_stream_with_raw_response.yaml | 110 ++++++++ .../test_chat_with_raw_response.yaml | 137 ++++++++++ .../tests/test_chat_completions.py | 248 ++++++++++++++++++ 5 files changed, 520 insertions(+), 4 deletions(-) create mode 100644 instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_raw_response.yaml create mode 100644 instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_with_raw_response.yaml diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py index 2042ef9..4837d4f 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py @@ -163,6 +163,8 @@ def _uninstrument(self, **kwargs): unwrap(openai.resources.embeddings.AsyncEmbeddings, "create") def _chat_completion_wrapper(self, wrapped, instance, args, kwargs): + from openai._legacy_response import LegacyAPIResponse + logger.debug(f"{wrapped} kwargs: {kwargs}") span_attributes = _get_attributes_from_wrapper(instance, kwargs) @@ -195,6 +197,7 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs): _record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time) raise + is_raw_response = isinstance(result, LegacyAPIResponse) if kwargs.get("stream"): return StreamWrapper( stream=result, @@ -206,10 +209,14 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs): start_time=start_time, token_usage_metric=self.token_usage_metric, operation_duration_metric=self.operation_duration_metric, + is_raw_response=is_raw_response, ) 
logger.debug(f"openai.resources.chat.completions.Completions.create result: {result}") + # if the caller is using with_raw_response we need to parse the output to get the response class we expect + if is_raw_response: + result = result.parse() response_attributes = _get_attributes_from_response( result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None) ) @@ -233,6 +240,8 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs): return result async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs): + from openai._legacy_response import LegacyAPIResponse + logger.debug(f"openai.resources.chat.completions.AsyncCompletions.create kwargs: {kwargs}") span_attributes = _get_attributes_from_wrapper(instance, kwargs) @@ -265,6 +274,7 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs): _record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time) raise + is_raw_response = isinstance(result, LegacyAPIResponse) if kwargs.get("stream"): return StreamWrapper( stream=result, @@ -276,10 +286,14 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs): start_time=start_time, token_usage_metric=self.token_usage_metric, operation_duration_metric=self.operation_duration_metric, + is_raw_response=is_raw_response, ) logger.debug(f"openai.resources.chat.completions.AsyncCompletions.create result: {result}") + # if the caller is using with_raw_response we need to parse the output to get the response class we expect + if is_raw_response: + result = result.parse() response_attributes = _get_attributes_from_response( result.id, result.model, result.choices, result.usage, getattr(result, "service_tier", None) ) diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py index d7e9e6f..8256822 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/wrappers.py @@ -47,7 +47,9 @@ def __init__( start_time: float, token_usage_metric: Histogram, operation_duration_metric: Histogram, + is_raw_response: bool, ): + # we need to wrap the original response even in case of raw_responses super().__init__(stream) self.span = span @@ -58,6 +60,7 @@ def __init__( self.token_usage_metric = token_usage_metric self.operation_duration_metric = operation_duration_metric self.start_time = start_time + self.is_raw_response = is_raw_response self.response_id = None self.model = None @@ -109,8 +112,11 @@ def process_chunk(self, chunk): self.service_tier = chunk.service_tier def __iter__(self): + stream = self.__wrapped__ try: - for chunk in self.__wrapped__: + if self.is_raw_response: + stream = stream.parse() + for chunk in stream: self.process_chunk(chunk) yield chunk except Exception as exc: @@ -119,13 +125,14 @@ def __iter__(self): self.end() async def __aiter__(self): + stream = self.__wrapped__ try: - async for chunk in self.__wrapped__: - print("chunk") + if self.is_raw_response: + stream = stream.parse() + async for chunk in stream: self.process_chunk(chunk) yield chunk except Exception as exc: - print("exc!", exc) self.end(exc) raise self.end() diff --git 
a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_raw_response.yaml b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_raw_response.yaml new file mode 100644 index 0000000..0f44a59 --- /dev/null +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_stream_with_raw_response.yaml @@ -0,0 +1,110 @@ +interactions: +- request: + body: |- + { + "messages": [ + { + "role": "user", + "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?" + } + ], + "model": "gpt-4o-mini", + "stream": true + } + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + authorization: + - Bearer test_openai_api_key + connection: + - keep-alive + content-length: + - '147' + content-type: + - application/json + host: + - api.openai.com + user-agent: + - OpenAI/Python 1.54.4 + x-stainless-arch: + - x64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - Linux + x-stainless-package-version: + - 1.54.4 + x-stainless-raw-response: + - 'true' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.9 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: |+ + data: {"id":"chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT","object":"chat.completion.chunk","created":1742490984,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_b8bc95a0ac","choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]} + + data: {"id":"chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT","object":"chat.completion.chunk","created":1742490984,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_b8bc95a0ac","choices":[{"index":0,"delta":{"content":"Atlantic"},"logprobs":null,"finish_reason":null}]} + + data: {"id":"chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT","object":"chat.completion.chunk","created":1742490984,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_b8bc95a0ac","choices":[{"index":0,"delta":{"content":" Ocean"},"logprobs":null,"finish_reason":null}]} + + data: {"id":"chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT","object":"chat.completion.chunk","created":1742490984,"model":"gpt-4o-mini-2024-07-18","service_tier":"default","system_fingerprint":"fp_b8bc95a0ac","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]} + + data: [DONE] + + headers: + CF-RAY: + - 9236db6a7e55ed43-MXP + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Thu, 20 Mar 2025 17:16:24 GMT + Server: + - cloudflare + Set-Cookie: test_set_cookie + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: test_openai_org_id + openai-processing-ms: + - '284' + openai-version: + - '2020-10-01' + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + x-ratelimit-limit-requests: + - '10000' + x-ratelimit-limit-tokens: + - '200000' + x-ratelimit-remaining-requests: + - '9998' + x-ratelimit-remaining-tokens: + - '199983' + x-ratelimit-reset-requests: + - 16.088s + x-ratelimit-reset-tokens: + - 5ms + x-request-id: + - req_d6f6a5d19533f6596e408dd665f07ec5 + status: + code: 200 + message: OK +version: 1 diff --git 
a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_with_raw_response.yaml b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_with_raw_response.yaml new file mode 100644 index 0000000..77ef6bf --- /dev/null +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/cassettes/test_chat_with_raw_response.yaml @@ -0,0 +1,137 @@ +interactions: +- request: + body: |- + { + "messages": [ + { + "role": "user", + "content": "Answer in up to 3 words: Which ocean contains Bouvet Island?" + } + ], + "model": "gpt-4o-mini" + } + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + authorization: + - Bearer test_openai_api_key + connection: + - keep-alive + content-length: + - '131' + content-type: + - application/json + host: + - api.openai.com + user-agent: + - OpenAI/Python 1.54.4 + x-stainless-arch: + - x64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - Linux + x-stainless-package-version: + - 1.54.4 + x-stainless-raw-response: + - 'true' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.9 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: |- + { + "id": "chatcmpl-BDDnDacM4nUxi3Qsplkrewf7L7Y10", + "object": "chat.completion", + "created": 1742490983, + "model": "gpt-4o-mini-2024-07-18", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "South Atlantic Ocean.", + "refusal": null, + "annotations": [] + }, + "logprobs": null, + "finish_reason": "stop" + } + ], + "usage": { + "prompt_tokens": 22, + "completion_tokens": 5, + "total_tokens": 27, + "prompt_tokens_details": { + "cached_tokens": 0, + "audio_tokens": 0 + }, + "completion_tokens_details": { + "reasoning_tokens": 0, + "audio_tokens": 0, + "accepted_prediction_tokens": 0, + "rejected_prediction_tokens": 0 + } + }, + "service_tier": "default", + "system_fingerprint": "fp_b8bc95a0ac" + } + headers: + CF-RAY: + - 9236db5c4c0b0e0a-MXP + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Thu, 20 Mar 2025 17:16:23 GMT + Server: + - cloudflare + Set-Cookie: test_set_cookie + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + content-length: + - '827' + openai-organization: test_openai_org_id + openai-processing-ms: + - '751' + openai-version: + - '2020-10-01' + strict-transport-security: + - max-age=31536000; includeSubDomains; preload + x-ratelimit-limit-requests: + - '10000' + x-ratelimit-limit-tokens: + - '200000' + x-ratelimit-remaining-requests: + - '9999' + x-ratelimit-remaining-tokens: + - '199982' + x-ratelimit-reset-requests: + - 8.64s + x-ratelimit-reset-tokens: + - 5ms + x-request-id: + - req_5cd9e571d9b49c3af52c942f4e8df240 + status: + code: 200 + message: OK +version: 1 diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py index bcbffaf..8d8c1d5 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py @@ -130,6 +130,71 @@ def test_chat(default_openai_env, trace_exporter, metrics_reader, logs_exporter) ) 
+@pytest.mark.vcr() +def test_chat_with_raw_response(default_openai_env, trace_exporter, metrics_reader, logs_exporter): + client = openai.OpenAI() + + messages = [ + { + "role": "user", + "content": TEST_CHAT_INPUT, + } + ] + + chat_completion = client.chat.completions.with_raw_response.create(model=TEST_CHAT_MODEL, messages=messages) + + assert chat_completion.choices[0].message.content == "South Atlantic Ocean." + + spans = trace_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.name == f"chat {TEST_CHAT_MODEL}" + assert span.kind == SpanKind.CLIENT + assert span.status.status_code == StatusCode.UNSET + + address, port = address_and_port(client) + assert dict(span.attributes) == { + GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default", + GEN_AI_OPERATION_NAME: "chat", + GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL, + GEN_AI_SYSTEM: "openai", + GEN_AI_RESPONSE_ID: "chatcmpl-BDDnDacM4nUxi3Qsplkrewf7L7Y10", + GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL, + GEN_AI_RESPONSE_FINISH_REASONS: ("stop",), + GEN_AI_USAGE_INPUT_TOKENS: 22, + GEN_AI_USAGE_OUTPUT_TOKENS: 5, + SERVER_ADDRESS: address, + SERVER_PORT: port, + } + + logs = logs_exporter.get_finished_logs() + assert len(logs) == 2 + log_records = logrecords_from_logs(logs) + user_message, choice = log_records + assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert dict(user_message.body) == {} + + assert_stop_log_record(choice) + + operation_duration_metric, token_usage_metric = get_sorted_metrics(metrics_reader) + attributes = { + GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL, + GEN_AI_RESPONSE_MODEL: "gpt-4o-mini-2024-07-18", + } + assert_operation_duration_metric( + client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319 + ) + assert_token_usage_metric( + client, + "chat", + token_usage_metric, + attributes=attributes, + input_data_point=span.attributes[GEN_AI_USAGE_INPUT_TOKENS], + output_data_point=span.attributes[GEN_AI_USAGE_OUTPUT_TOKENS], + ) + + @pytest.mark.vcr() def test_chat_with_developer_role_message(default_openai_env, trace_exporter, metrics_reader, logs_exporter): client = openai.OpenAI() @@ -1013,6 +1078,64 @@ def test_chat_stream(default_openai_env, trace_exporter, metrics_reader, logs_ex ) +@pytest.mark.vcr() +def test_chat_stream_with_raw_response(default_openai_env, trace_exporter, metrics_reader, logs_exporter): + client = openai.OpenAI() + + messages = [ + { + "role": "user", + "content": TEST_CHAT_INPUT, + } + ] + + chat_completion = client.chat.completions.with_raw_response.create( + model=TEST_CHAT_MODEL, messages=messages, stream=True + ) + + chunks = [chunk.choices[0].delta.content or "" for chunk in chat_completion if chunk.choices] + assert "".join(chunks) == "Atlantic Ocean" + + spans = trace_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.name == f"chat {TEST_CHAT_MODEL}" + assert span.kind == SpanKind.CLIENT + assert span.status.status_code == StatusCode.UNSET + + address, port = address_and_port(client) + assert dict(span.attributes) == { + GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default", + GEN_AI_OPERATION_NAME: "chat", + GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL, + GEN_AI_SYSTEM: "openai", + GEN_AI_RESPONSE_ID: "chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT", + GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL, + GEN_AI_RESPONSE_FINISH_REASONS: ("stop",), + SERVER_ADDRESS: address, + SERVER_PORT: port, + } + + logs = logs_exporter.get_finished_logs() 
+ assert len(logs) == 2 + log_records = logrecords_from_logs(logs) + user_message, choice = log_records + assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert dict(user_message.body) == {} + + assert_stop_log_record(choice) + + (operation_duration_metric,) = get_sorted_metrics(metrics_reader) + attributes = { + GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL, + GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL, + } + assert_operation_duration_metric( + client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319 + ) + + @pytest.mark.skipif(OPENAI_VERSION < (1, 35, 0), reason="service tier added in 1.35.0") @pytest.mark.vcr() def test_chat_stream_all_the_client_options(default_openai_env, trace_exporter, metrics_reader, logs_exporter): @@ -1678,6 +1801,72 @@ async def test_chat_async(default_openai_env, trace_exporter, metrics_reader, lo ) +@pytest.mark.asyncio +@pytest.mark.vcr() +async def test_chat_async_with_raw_response(default_openai_env, trace_exporter, metrics_reader, logs_exporter): + client = openai.AsyncOpenAI() + + messages = [ + { + "role": "user", + "content": TEST_CHAT_INPUT, + } + ] + + chat_completion = await client.chat.completions.with_raw_response.create(model=TEST_CHAT_MODEL, messages=messages) + + assert chat_completion.choices[0].message.content == "South Atlantic Ocean." + + spans = trace_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.name == f"chat {TEST_CHAT_MODEL}" + assert span.kind == SpanKind.CLIENT + assert span.status.status_code == StatusCode.UNSET + + address, port = address_and_port(client) + assert dict(span.attributes) == { + GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default", + GEN_AI_OPERATION_NAME: "chat", + GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL, + GEN_AI_SYSTEM: "openai", + GEN_AI_RESPONSE_ID: "chatcmpl-BDDnDacM4nUxi3Qsplkrewf7L7Y10", + GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL, + GEN_AI_RESPONSE_FINISH_REASONS: ("stop",), + GEN_AI_USAGE_INPUT_TOKENS: 22, + GEN_AI_USAGE_OUTPUT_TOKENS: 5, + SERVER_ADDRESS: address, + SERVER_PORT: port, + } + + logs = logs_exporter.get_finished_logs() + assert len(logs) == 2 + log_records = logrecords_from_logs(logs) + user_message, choice = log_records + assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert dict(user_message.body) == {} + + assert_stop_log_record(choice) + + operation_duration_metric, token_usage_metric = get_sorted_metrics(metrics_reader) + attributes = { + GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL, + GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL, + } + assert_operation_duration_metric( + client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319 + ) + assert_token_usage_metric( + client, + "chat", + token_usage_metric, + attributes=attributes, + input_data_point=span.attributes[GEN_AI_USAGE_INPUT_TOKENS], + output_data_point=span.attributes[GEN_AI_USAGE_OUTPUT_TOKENS], + ) + + @pytest.mark.asyncio @pytest.mark.vcr() async def test_chat_async_with_capture_message_content( @@ -1880,6 +2069,65 @@ async def test_chat_async_stream(default_openai_env, trace_exporter, metrics_rea ) +@pytest.mark.vcr() +@pytest.mark.asyncio +async def test_chat_async_stream_with_raw_response(default_openai_env, trace_exporter, metrics_reader, logs_exporter): + client = openai.AsyncOpenAI() + + messages = [ + { + "role": "user", + "content": TEST_CHAT_INPUT, + } + ] + + chat_completion = await 
client.chat.completions.with_raw_response.create( + model=TEST_CHAT_MODEL, messages=messages, stream=True + ) + + chunks = [chunk.choices[0].delta.content or "" async for chunk in chat_completion if chunk.choices] + assert "".join(chunks) == "Atlantic Ocean" + + spans = trace_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.name == f"chat {TEST_CHAT_MODEL}" + assert span.kind == SpanKind.CLIENT + assert span.status.status_code == StatusCode.UNSET + + address, port = address_and_port(client) + assert dict(span.attributes) == { + GEN_AI_OPENAI_RESPONSE_SERVICE_TIER: "default", + GEN_AI_OPERATION_NAME: "chat", + GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL, + GEN_AI_SYSTEM: "openai", + GEN_AI_RESPONSE_ID: "chatcmpl-BDDnEHqYLBd36X8hHNTQfPKx4KMJT", + GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL, + GEN_AI_RESPONSE_FINISH_REASONS: ("stop",), + SERVER_ADDRESS: address, + SERVER_PORT: port, + } + + logs = logs_exporter.get_finished_logs() + assert len(logs) == 2 + log_records = logrecords_from_logs(logs) + user_message, choice = log_records + assert dict(user_message.attributes) == {"gen_ai.system": "openai", "event.name": "gen_ai.user.message"} + assert dict(user_message.body) == {} + + assert_stop_log_record(choice) + + (operation_duration_metric,) = get_sorted_metrics(metrics_reader) + attributes = { + GEN_AI_REQUEST_MODEL: TEST_CHAT_MODEL, + GEN_AI_RESPONSE_MODEL: TEST_CHAT_RESPONSE_MODEL, + } + assert_operation_duration_metric( + client, "chat", operation_duration_metric, attributes=attributes, min_data_point=0.006761051714420319 + ) + + @pytest.mark.vcr() @pytest.mark.asyncio async def test_chat_async_stream_with_capture_message_content( From bd701fa997f47c96ab81628945cba48304df4ca6 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Fri, 21 Mar 2025 14:19:54 +0100 Subject: [PATCH 5/5] Run with_raw_response only with openai > 1.8.0 And move the check for a raw response to an helper --- .../opentelemetry/instrumentation/openai/__init__.py | 9 +++------ .../opentelemetry/instrumentation/openai/helpers.py | 10 ++++++++++ .../tests/test_chat_completions.py | 4 ++++ 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py index 4837d4f..c1d312d 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/__init__.py @@ -30,6 +30,7 @@ _get_embeddings_attributes_from_response, _get_embeddings_attributes_from_wrapper, _get_event_attributes, + _is_raw_response, _record_operation_duration_metric, _record_token_usage_metrics, _send_log_events_from_choices, @@ -163,8 +164,6 @@ def _uninstrument(self, **kwargs): unwrap(openai.resources.embeddings.AsyncEmbeddings, "create") def _chat_completion_wrapper(self, wrapped, instance, args, kwargs): - from openai._legacy_response import LegacyAPIResponse - logger.debug(f"{wrapped} kwargs: {kwargs}") span_attributes = _get_attributes_from_wrapper(instance, kwargs) @@ -197,7 +196,7 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs): _record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time) raise - is_raw_response = isinstance(result, LegacyAPIResponse) + 
is_raw_response = _is_raw_response(result) if kwargs.get("stream"): return StreamWrapper( stream=result, @@ -240,8 +239,6 @@ def _chat_completion_wrapper(self, wrapped, instance, args, kwargs): return result async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs): - from openai._legacy_response import LegacyAPIResponse - logger.debug(f"openai.resources.chat.completions.AsyncCompletions.create kwargs: {kwargs}") span_attributes = _get_attributes_from_wrapper(instance, kwargs) @@ -274,7 +271,7 @@ async def _async_chat_completion_wrapper(self, wrapped, instance, args, kwargs): _record_operation_duration_metric(self.operation_duration_metric, error_attributes, start_time) raise - is_raw_response = isinstance(result, LegacyAPIResponse) + is_raw_response = _is_raw_response(result) if kwargs.get("stream"): return StreamWrapper( stream=result, diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/helpers.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/helpers.py index 73144f5..98d862f 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/helpers.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/src/opentelemetry/instrumentation/openai/helpers.py @@ -373,3 +373,13 @@ def _send_log_events_from_stream_choices( trace_flags=ctx.trace_flags, ) event_logger.emit(event) + + +def _is_raw_response(response): + try: + # available since 1.8.0 + from openai._legacy_response import LegacyAPIResponse + except ImportError: + return False + + return isinstance(response, LegacyAPIResponse) diff --git a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py index 8d8c1d5..5ce1a35 100644 --- a/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py +++ b/instrumentation/elastic-opentelemetry-instrumentation-openai/tests/test_chat_completions.py @@ -130,6 +130,7 @@ def test_chat(default_openai_env, trace_exporter, metrics_reader, logs_exporter) ) +@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available") @pytest.mark.vcr() def test_chat_with_raw_response(default_openai_env, trace_exporter, metrics_reader, logs_exporter): client = openai.OpenAI() @@ -1078,6 +1079,7 @@ def test_chat_stream(default_openai_env, trace_exporter, metrics_reader, logs_ex ) +@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available") @pytest.mark.vcr() def test_chat_stream_with_raw_response(default_openai_env, trace_exporter, metrics_reader, logs_exporter): client = openai.OpenAI() @@ -1801,6 +1803,7 @@ async def test_chat_async(default_openai_env, trace_exporter, metrics_reader, lo ) +@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available") @pytest.mark.asyncio @pytest.mark.vcr() async def test_chat_async_with_raw_response(default_openai_env, trace_exporter, metrics_reader, logs_exporter): @@ -2069,6 +2072,7 @@ async def test_chat_async_stream(default_openai_env, trace_exporter, metrics_rea ) +@pytest.mark.skipif(OPENAI_VERSION < (1, 8, 0), reason="LegacyAPIResponse available") @pytest.mark.vcr() @pytest.mark.asyncio async def test_chat_async_stream_with_raw_response(default_openai_env, trace_exporter, metrics_reader, logs_exporter):
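
Note for reviewers (not part of the patches): the series leans on `wrapt.ObjectProxy` delegation — patch 1 makes `StreamWrapper` a proxy so callers using `with_raw_response` still see the attributes of the underlying response object, and patches 2–5 only intercept iteration and `parse()`. The snippet below is a minimal, hypothetical sketch of that proxy behaviour under assumed names (`EchoProxy`, the toy list); it is not code from this PR.

```python
# Minimal sketch (not from the patches) of the wrapt.ObjectProxy behaviour that
# StreamWrapper relies on: unknown attribute access falls through to the wrapped
# object, while the subclass can override iteration and keep its own state.
from wrapt import ObjectProxy


class EchoProxy(ObjectProxy):
    def __init__(self, wrapped):
        super().__init__(wrapped)
        self._self_seen = 0  # the "_self_" prefix keeps this attribute on the proxy

    def __iter__(self):
        # Overridden iteration, mirroring how StreamWrapper processes each chunk
        # before yielding it back to the caller.
        for item in self.__wrapped__:
            self._self_seen += 1
            yield item


proxy = EchoProxy(["a", "b", "c"])
assert list(proxy) == ["a", "b", "c"]  # iteration goes through the override
assert proxy.count("a") == 1           # unknown attributes delegate to the wrapped list
assert isinstance(proxy, list)         # the proxy also passes isinstance checks
```

The same delegation is what lets the wrapped `LegacyAPIResponse` keep exposing `.parse()`, headers, and the rest of its surface while the instrumentation only hooks `__iter__`/`__aiter__`.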