Skip to content

Commit

Permalink
Capture httpx response JSON bodies (#700)
Browse files Browse the repository at this point in the history
Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
  • Loading branch information
alexmojaki and Kludex authored Dec 20, 2024
1 parent cac58a9 commit 930fa87
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 10 deletions.
93 changes: 85 additions & 8 deletions logfire/_internal/integrations/httpx.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import httpx

from logfire.propagate import attach_context, get_context

try:
from opentelemetry.instrumentation.httpx import (
AsyncRequestHook,
Expand Down Expand Up @@ -64,6 +66,7 @@ def instrument_httpx(
capture_request_headers: bool,
capture_response_headers: bool,
capture_request_json_body: bool,
capture_response_json_body: bool,
**kwargs: Unpack[ClientKwargs],
) -> None: ...

Expand All @@ -74,6 +77,7 @@ def instrument_httpx(
capture_request_headers: bool,
capture_response_headers: bool,
capture_request_json_body: bool,
capture_response_json_body: bool,
**kwargs: Unpack[AsyncClientKwargs],
) -> None: ...

Expand All @@ -84,6 +88,7 @@ def instrument_httpx(
capture_request_headers: bool,
capture_response_headers: bool,
capture_request_json_body: bool,
capture_response_json_body: bool,
**kwargs: Unpack[HTTPXInstrumentKwargs],
) -> None: ...

Expand All @@ -94,6 +99,7 @@ def instrument_httpx(
capture_request_headers: bool,
capture_response_headers: bool,
capture_request_json_body: bool,
capture_response_json_body: bool,
**kwargs: Any,
) -> None:
"""Instrument the `httpx` module so that spans are automatically created for each request.
Expand All @@ -108,6 +114,7 @@ def instrument_httpx(
del kwargs # make sure only final_kwargs is used

instrumentor = HTTPXClientInstrumentor()
logfire_instance = logfire_instance.with_settings(custom_scope_suffix='httpx')

if client is None:
request_hook = cast('RequestHook | None', final_kwargs.get('request_hook'))
Expand All @@ -117,11 +124,15 @@ def instrument_httpx(
final_kwargs['request_hook'] = make_request_hook(
request_hook, capture_request_headers, capture_request_json_body
)
final_kwargs['response_hook'] = make_response_hook(response_hook, capture_response_headers)
final_kwargs['response_hook'] = make_response_hook(
response_hook, capture_response_headers, capture_response_json_body, logfire_instance
)
final_kwargs['async_request_hook'] = make_async_request_hook(
async_request_hook, capture_request_headers, capture_request_json_body
)
final_kwargs['async_response_hook'] = make_async_response_hook(async_response_hook, capture_response_headers)
final_kwargs['async_response_hook'] = make_async_response_hook(
async_response_hook, capture_response_headers, capture_response_json_body, logfire_instance
)

instrumentor.instrument(**final_kwargs)
else:
Expand All @@ -130,13 +141,17 @@ def instrument_httpx(
response_hook = cast('ResponseHook | AsyncResponseHook | None', final_kwargs.get('response_hook'))

request_hook = make_async_request_hook(request_hook, capture_request_headers, capture_request_json_body)
response_hook = make_async_response_hook(response_hook, capture_response_headers)
response_hook = make_async_response_hook(
response_hook, capture_response_headers, capture_response_json_body, logfire_instance
)
else:
request_hook = cast('RequestHook | None', final_kwargs.get('request_hook'))
response_hook = cast('ResponseHook | None', final_kwargs.get('response_hook'))

request_hook = make_request_hook(request_hook, capture_request_headers, capture_request_json_body)
response_hook = make_response_hook(response_hook, capture_response_headers)
response_hook = make_response_hook(
response_hook, capture_response_headers, capture_response_json_body, logfire_instance
)

tracer_provider = final_kwargs['tracer_provider']
instrumentor.instrument_client(client, tracer_provider, request_hook, response_hook)
Expand Down Expand Up @@ -176,34 +191,96 @@ async def new_hook(span: Span, request: RequestInfo) -> None:
return new_hook


def make_response_hook(hook: ResponseHook | None, should_capture_headers: bool) -> ResponseHook | None:
if not should_capture_headers and not hook:
def make_response_hook(
hook: ResponseHook | None, should_capture_headers: bool, should_capture_json: bool, logfire_instance: Logfire
) -> ResponseHook | None:
if not should_capture_headers and not should_capture_json and not hook:
return None

def new_hook(span: Span, request: RequestInfo, response: ResponseInfo) -> None:
with handle_internal_errors():
if should_capture_headers:
capture_response_headers(span, response)
if should_capture_json:
capture_response_json(logfire_instance, response, False)
run_hook(hook, span, request, response)

return new_hook


def make_async_response_hook(
hook: ResponseHook | AsyncResponseHook | None, should_capture_headers: bool
hook: ResponseHook | AsyncResponseHook | None,
should_capture_headers: bool,
should_capture_json: bool,
logfire_instance: Logfire,
) -> AsyncResponseHook | None:
if not should_capture_headers and not hook:
if not should_capture_headers and not should_capture_json and not hook:
return None

async def new_hook(span: Span, request: RequestInfo, response: ResponseInfo) -> None:
with handle_internal_errors():
if should_capture_headers:
capture_response_headers(span, response)
if should_capture_json:
capture_response_json(logfire_instance, response, True)
await run_async_hook(hook, span, request, response)

return new_hook


def capture_response_json(logfire_instance: Logfire, response_info: ResponseInfo, is_async: bool) -> None:
headers = cast('httpx.Headers', response_info.headers)
if not headers.get('content-type', '').lower().startswith('application/json'):
return

frame = inspect.currentframe().f_back.f_back # type: ignore
while frame:
response = frame.f_locals.get('response')
frame = frame.f_back
if isinstance(response, httpx.Response): # pragma: no branch
break
else: # pragma: no cover
return

ctx = get_context()
attr_name = 'http.response.body.json'

if is_async: # these two branches should be kept almost identical
original_aread = response.aread

async def aread(*args: Any, **kwargs: Any):
try:
# Only log the body the first time it's read
return response.content
except httpx.ResponseNotRead:
pass
with attach_context(ctx), logfire_instance.span('Reading response body') as span:
content = await original_aread(*args, **kwargs)
span.set_attribute(attr_name, {}) # Set the JSON schema
# Set the attribute to the raw text so that the backend can parse it
span._span.set_attribute(attr_name, response.text) # type: ignore
return content

response.aread = aread
else:
original_read = response.read

def read(*args: Any, **kwargs: Any):
try:
# Only log the body the first time it's read
return response.content
except httpx.ResponseNotRead:
pass
with attach_context(ctx), logfire_instance.span('Reading response body') as span:
content = original_read(*args, **kwargs)
span.set_attribute(attr_name, {}) # Set the JSON schema
# Set the attribute to the raw text so that the backend can parse it
span._span.set_attribute(attr_name, response.text) # type: ignore
return content

response.read = read


async def run_async_hook(hook: Callable[P, Any] | None, *args: P.args, **kwargs: P.kwargs) -> None:
if hook:
result = hook(*args, **kwargs)
Expand Down
6 changes: 6 additions & 0 deletions logfire/_internal/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,7 @@ def instrument_httpx(
capture_request_headers: bool = False,
capture_response_headers: bool = False,
capture_request_json_body: bool = False,
capture_response_json_body: bool = False,
**kwargs: Unpack[ClientKwargs],
) -> None: ...

Expand All @@ -1175,6 +1176,7 @@ def instrument_httpx(
capture_request_headers: bool = False,
capture_response_headers: bool = False,
capture_request_json_body: bool = False,
capture_response_json_body: bool = False,
**kwargs: Unpack[AsyncClientKwargs],
) -> None: ...

Expand All @@ -1185,6 +1187,7 @@ def instrument_httpx(
capture_request_headers: bool = False,
capture_response_headers: bool = False,
capture_request_json_body: bool = False,
capture_response_json_body: bool = False,
**kwargs: Unpack[HTTPXInstrumentKwargs],
) -> None: ...

Expand All @@ -1194,6 +1197,7 @@ def instrument_httpx(
capture_request_headers: bool = False,
capture_response_headers: bool = False,
capture_request_json_body: bool = False,
capture_response_json_body: bool = False,
**kwargs: Any,
) -> None:
"""Instrument the `httpx` module so that spans are automatically created for each request.
Expand All @@ -1210,6 +1214,7 @@ def instrument_httpx(
capture_request_headers: Set to `True` to capture all request headers.
capture_response_headers: Set to `True` to capture all response headers.
capture_request_json_body: Set to `True` to capture the request JSON body.
capture_response_json_body: Set to `True` to capture the response JSON body.
**kwargs: Additional keyword arguments to pass to the OpenTelemetry `instrument` method, for future compatibility.
"""
from .integrations.httpx import instrument_httpx
Expand All @@ -1221,6 +1226,7 @@ def instrument_httpx(
capture_request_headers,
capture_response_headers,
capture_request_json_body=capture_request_json_body,
capture_response_json_body=capture_response_json_body,
**kwargs,
)

Expand Down
Loading

0 comments on commit 930fa87

Please sign in to comment.