Fix integration test for span cleanup #1374

Merged · 1 commit · Feb 7, 2025
truss/templates/server/model_wrapper.py (12 changes: 6 additions & 6 deletions)
```diff
@@ -641,7 +641,7 @@ async def _stream_with_background_task(
         generator: Union[Generator[bytes, None, None], AsyncGenerator[bytes, None]],
         span: trace.Span,
         trace_ctx: trace.Context,
-        release_and_end: Callable[[], None],
+        cleanup_fn: Callable[[], None],
     ) -> AsyncGenerator[bytes, None]:
         # The streaming read timeout is the amount of time in between streamed chunk
         # before a timeout is triggered.
@@ -661,7 +661,7 @@ async def _stream_with_background_task(
             self._write_response_to_queue(response_queue, async_generator, span)
         )
         # Defer the release of the semaphore until the write_response_to_queue task.
-        gen_task.add_done_callback(lambda _: release_and_end())
+        gen_task.add_done_callback(lambda _: cleanup_fn())

         # The gap between responses in a stream must be < streaming_read_timeout
         # TODO: this whole buffering might be superfluous and sufficiently done by
@@ -717,7 +717,7 @@ async def _process_model_fn(

         if inspect.isgenerator(result) or inspect.isasyncgen(result):
             return await self._handle_generator_response(
-                request, result, fn_span, detached_ctx, release_and_end=lambda: None
+                request, result, fn_span, detached_ctx
             )

         return result
@@ -738,13 +738,13 @@ async def _handle_generator_response(
         generator: Union[Generator[bytes, None, None], AsyncGenerator[bytes, None]],
         span: trace.Span,
         trace_ctx: trace.Context,
-        release_and_end: Callable[[], None],
+        get_cleanup_fn: Callable[[], Callable[[], None]] = lambda: lambda: None,
     ):
         if self._should_gather_generator(request):
             return await _gather_generator(generator)
         else:
             return await self._stream_with_background_task(
-                generator, span, trace_ctx, release_and_end
+                generator, span, trace_ctx, cleanup_fn=get_cleanup_fn()
             )

     async def completions(
@@ -824,7 +824,7 @@ async def __call__(
                 predict_result,
                 span_predict,
                 detached_ctx,
-                release_and_end=get_defer_fn(),
+                get_cleanup_fn=get_defer_fn,
             )

         if isinstance(predict_result, starlette.responses.Response):
```
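Note the shape of the new parameter: `_handle_generator_response` now takes a factory, `Callable[[], Callable[[], None]]`, and the default `lambda: lambda: None` is a factory that returns a no-op. A minimal standalone illustration (the variable names here are hypothetical, not truss code):

```python
from typing import Callable

# Hypothetical illustration of the factory default: calling the factory
# "requests" a cleanup function; the default request yields a no-op.
get_cleanup_fn: Callable[[], Callable[[], None]] = lambda: lambda: None

cleanup_fn = get_cleanup_fn()  # requested only where it will actually run
cleanup_fn()                   # no-op under the default
```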
@nnarayen (Contributor, Author) commented on the `get_cleanup_fn=get_defer_fn` line, Feb 7, 2025:

Previously, we'd only request the defer_fn if we knew we were going to execute it explicitly, since requesting it prevents the default behavior of releasing the semaphore (and ending a span). My refactor unintentionally changed this behavior, so we wouldn't release the semaphore when the new shared helper was invoked. Very lucky the integration test caught this.

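The comment explains why the call site now passes `get_defer_fn` rather than `get_defer_fn()`: requesting the defer fn transfers ownership of the semaphore release, so it must only be requested on a path that is guaranteed to invoke it. Below is a minimal sketch of that ownership-transfer pattern, assuming hypothetical names (`SemaphoreWrapper`, `get_defer_fn`, `handle`); it is not truss's actual implementation:

```python
import asyncio


class SemaphoreWrapper:
    """Sketch: release on exit by default, unless a defer fn was requested."""

    def __init__(self) -> None:
        self._sem = asyncio.Semaphore(1)
        self._deferred = False

    async def __aenter__(self) -> "SemaphoreWrapper":
        await self._sem.acquire()
        return self

    async def __aexit__(self, *exc) -> None:
        # Default behavior: release on exit. Once a defer fn has been
        # requested, the deferred callback owns the release instead.
        if not self._deferred:
            self._sem.release()

    def get_defer_fn(self):
        # Requesting the defer fn transfers release ownership to the caller.
        self._deferred = True
        return self._sem.release


async def handle(stream: bool, get_cleanup_fn) -> str:
    if not stream:
        # Gather path: the factory is never called, so the wrapper's
        # default release still fires in __aexit__.
        return "gathered"
    cleanup_fn = get_cleanup_fn()  # requested only where it is sure to run
    task = asyncio.create_task(asyncio.sleep(0))
    task.add_done_callback(lambda _: cleanup_fn())
    await task
    return "streamed"


async def main() -> None:
    wrapper = SemaphoreWrapper()
    async with wrapper:
        # Pass the factory, not its result. The buggy version did the
        # equivalent of get_cleanup_fn=wrapper.get_defer_fn() here, which
        # suppressed the default release even on the gather path and
        # leaked the semaphore.
        await handle(stream=False, get_cleanup_fn=wrapper.get_defer_fn)
    assert not wrapper._sem.locked()  # released by the default path


asyncio.run(main())
```

On the gather path the factory is never invoked, so the default release still fires; on the streaming path the done callback guarantees the requested cleanup runs.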