fix(openai): async streaming instrumentation (#245)
galkleinman authored Dec 12, 2023
1 parent 553e5e7 commit 3976bbf
Showing 2 changed files with 57 additions and 33 deletions.
@@ -131,6 +131,7 @@ def _set_response_attributes(span, response):
 
 
 def is_streaming_response(response):
-    return isinstance(response, types.GeneratorType) or (
-        is_openai_v1() and isinstance(response, openai.Stream)
-    )
+    if is_openai_v1():
+        return isinstance(response, openai.Stream)
+
+    return isinstance(response, types.GeneratorType) or isinstance(response, types.AsyncGeneratorType)
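
The crux of the fix is visible here: `types.GeneratorType` does not match async generators, so the old check sent async streaming responses down the non-streaming path. A minimal standalone sketch (not part of the commit) of the type distinction:

import types

def sync_gen():
    yield "chunk"

async def async_gen():
    yield "chunk"

# A sync generator object matches GeneratorType; an async one does not.
print(isinstance(sync_gen(), types.GeneratorType))        # True
print(isinstance(async_gen(), types.GeneratorType))       # False
print(isinstance(async_gen(), types.AsyncGeneratorType))  # True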

@@ -7,8 +7,7 @@
 
 from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
 from opentelemetry.instrumentation.openai.utils import (
-    _with_tracer_wrapper,
-    start_as_current_span_async,
+    _with_tracer_wrapper
 )
 from opentelemetry.instrumentation.openai.shared import (
     _set_request_attributes,
@@ -43,9 +42,8 @@ def chat_wrapper(tracer, wrapped, instance, args, kwargs):
     if is_streaming_response(response):
         # span will be closed after the generator is done
         return _build_from_streaming_response(span, response)
-    else:
-        _handle_response(response, span)
 
+    _handle_response(response, span)
     span.end()
 
     return response
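
For context, this is the calling pattern the sync wrapper has to support (a hedged sketch using the openai v1 client; the model name and prompt are placeholders): chunks only arrive as the caller iterates, so response attributes cannot be recorded before the wrapper returns.

from openai import OpenAI

client = OpenAI()
stream = client.chat.completions.create(
    model="gpt-3.5-turbo",  # placeholder model
    messages=[{"role": "user", "content": "Say hi"}],
    stream=True,
)
for chunk in stream:  # the instrumentation span stays open during this loop
    print(chunk.choices[0].delta.content or "", end="")
# the wrapped generator ends the span once the stream is exhausted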
@@ -56,14 +54,18 @@ async def achat_wrapper(tracer, wrapped, instance, args, kwargs):
     if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
         return wrapped(*args, **kwargs)
 
-    async with start_as_current_span_async(
-        tracer=tracer, name=SPAN_NAME, kind=SpanKind.CLIENT
-    ) as span:
-        _handle_request(span, kwargs)
-        response = await wrapped(*args, **kwargs)
-        _handle_response(response, span)
+    span = tracer.start_span(SPAN_NAME, kind=SpanKind.CLIENT)
+    _handle_request(span, kwargs)
+    response = await wrapped(*args, **kwargs)
+
+    if is_streaming_response(response):
+        # span will be closed after the generator is done
+        return _abuild_from_streaming_response(span, response)
+
+    _handle_response(response, span)
+    span.end()
 
-        return response
+    return response
 
 
 def _handle_request(span, kwargs):
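
The achat_wrapper change is the heart of the commit. The old `start_as_current_span_async` helper ended the span when its block exited, which for a streaming call is before the caller has read a single chunk; starting the span manually lets the returned async generator own the span's lifetime. A simplified before/after sketch (assuming a standard OpenTelemetry tracer; names are illustrative):

async def broken_wrapper(tracer, wrapped, *args, **kwargs):
    with tracer.start_as_current_span("openai.chat") as span:
        response = await wrapped(*args, **kwargs)
        return response  # span ends when the block exits, before any chunk is read

async def fixed_wrapper(tracer, wrapped, *args, **kwargs):
    span = tracer.start_span("openai.chat")
    response = await wrapped(*args, **kwargs)
    return _consume_then_end(span, response)

async def _consume_then_end(span, stream):
    async for chunk in stream:
        yield chunk
    span.end()  # runs only after the caller exhausts the stream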
@@ -139,25 +141,24 @@ def _build_from_streaming_response(span, response):
     complete_response = {"choices": [], "model": ""}
     for item in response:
         item_to_yield = item
-        if is_openai_v1():
-            item = item.model_dump()
-
-        for choice in item.get("choices"):
-            index = choice.get("index")
-            if len(complete_response.get("choices")) <= index:
-                complete_response["choices"].append(
-                    {"index": index, "message": {"content": "", "role": ""}}
-                )
-            complete_choice = complete_response.get("choices")[index]
-            if choice.get("finish_reason"):
-                complete_choice["finish_reason"] = choice.get("finish_reason")
-
-            delta = choice.get("delta")
-
-            if delta.get("content"):
-                complete_choice["message"]["content"] += delta.get("content")
-            if delta.get("role"):
-                complete_choice["message"]["role"] = delta.get("role")
+        _accumulate_stream_items(item, complete_response)
 
         yield item_to_yield
 
+    _set_response_attributes(span, complete_response)
+
+    if should_send_prompts():
+        _set_completions(span, complete_response.get("choices"))
+
+    span.set_status(Status(StatusCode.OK))
+    span.end()
+
+
+async def _abuild_from_streaming_response(span, response):
+    complete_response = {"choices": [], "model": ""}
+    async for item in response:
+        item_to_yield = item
+        _accumulate_stream_items(item, complete_response)
+
+        yield item_to_yield
+
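
The builder had to be duplicated rather than parameterized: in Python, `yield` inside `def` produces a sync generator and inside `async def` an async generator, and one function cannot be both, so only the per-chunk bookkeeping could be shared, via `_accumulate_stream_items` in the next hunk. A two-function illustration:

def sync_passthrough(items):
    for item in items:
        yield item  # sync generator, consumed with `for`

async def async_passthrough(items):
    async for item in items:
        yield item  # async generator, consumed only with `async for`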
@@ -168,3 +169,25 @@ def _build_from_streaming_response(span, response):
 
     span.set_status(Status(StatusCode.OK))
     span.end()
+
+
+def _accumulate_stream_items(item, complete_response):
+    if is_openai_v1():
+        item = item.model_dump()
+
+    for choice in item.get("choices"):
+        index = choice.get("index")
+        if len(complete_response.get("choices")) <= index:
+            complete_response["choices"].append(
+                {"index": index, "message": {"content": "", "role": ""}}
+            )
+        complete_choice = complete_response.get("choices")[index]
+        if choice.get("finish_reason"):
+            complete_choice["finish_reason"] = choice.get("finish_reason")
+
+        delta = choice.get("delta")
+
+        if delta.get("content"):
+            complete_choice["message"]["content"] += delta.get("content")
+        if delta.get("role"):
+            complete_choice["message"]["role"] = delta.get("role")
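
A worked example of the extracted helper, assuming legacy (pre-v1) dict-shaped chunks so that `is_openai_v1()` is False and `model_dump()` is never called; the chunk contents are made up:

complete_response = {"choices": [], "model": ""}
chunks = [
    {"choices": [{"index": 0, "delta": {"role": "assistant"}}]},
    {"choices": [{"index": 0, "delta": {"content": "Hello"}}]},
    {"choices": [{"index": 0, "delta": {"content": " world"}, "finish_reason": "stop"}]},
]
for chunk in chunks:
    _accumulate_stream_items(chunk, complete_response)

assert complete_response["choices"][0] == {
    "index": 0,
    "message": {"content": "Hello world", "role": "assistant"},
    "finish_reason": "stop",
}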
