Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor httpx instrumentation #577

Merged
merged 24 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5fc1b4e
Create changelog copy.yml
lzchen Apr 30, 2021
1882a70
test
lzchen Apr 30, 2021
c095206
name
lzchen Apr 30, 2021
e291d1e
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen May 4, 2021
f5cb0df
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen May 24, 2021
35fcccc
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen May 25, 2021
f1f099a
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen May 25, 2021
9226d8b
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen Jun 1, 2021
ae7ccd5
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen Jun 1, 2021
d5b1c7f
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen Jun 2, 2021
f3c5162
Delete contributing-message.yml
lzchen Jun 2, 2021
7397369
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen Jun 2, 2021
beecaf4
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen Jun 2, 2021
8c34bd5
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen Jun 4, 2021
0ee5067
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen Jun 17, 2021
2eb0d59
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen Jun 28, 2021
eb9f9e0
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen Jul 9, 2021
3d218b7
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
lzchen Jul 12, 2021
272e757
http
lzchen Jul 12, 2021
58e60ec
http
lzchen Jul 12, 2021
9f8036e
test
lzchen Jul 12, 2021
ffe6612
test
lzchen Jul 12, 2021
8afa4bc
lint
lzchen Jul 13, 2021
6f50d10
Merge branch 'main' into httpx
lzchen Jul 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.3.0-0.22b0...HEAD)
- `opentelemetry-sdk-extension-aws` Update AWS entry points to match spec
([#566](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/566))
- Include Flask 2.0 as compatible with existing flask instrumentation
([#545](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/545))
- `openelemetry-sdk-extension-aws` Take a dependency on `opentelemetry-sdk`
([#558](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/558))

### Changed
- `opentelemetry-instrumentation-tornado` properly instrument work done in tornado on_finish method.
Expand All @@ -36,6 +30,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#567](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/567))
- `opentelemetry-instrumentation-grpc` Fixed asynchonous unary call traces
([#536](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/536))
- `opentelemetry-sdk-extension-aws` Update AWS entry points to match spec
([#566](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/566))
- Include Flask 2.0 as compatible with existing flask instrumentation
([#545](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/545))
- `openelemetry-sdk-extension-aws` Take a dependency on `opentelemetry-sdk`
([#558](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/558))
- Change `opentelemetry-instrumentation-httpx` to replace `client` classes with instrumented versions.
([#577](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/577))

### Added
- `opentelemetry-instrumentation-httpx` Add `httpx` instrumentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import typing

import httpx
Expand All @@ -31,6 +32,8 @@
from opentelemetry.trace.span import Span
from opentelemetry.trace.status import Status

_logger = logging.getLogger(__name__)

URL = typing.Tuple[bytes, bytes, typing.Optional[int], bytes]
Headers = typing.List[typing.Tuple[bytes, bytes]]
RequestHook = typing.Callable[[Span, "RequestInfo"], None]
Expand Down Expand Up @@ -258,98 +261,48 @@ async def handle_async_request(
return status_code, headers, stream, extensions


def _instrument(
tracer_provider: TracerProvider = None,
request_hook: typing.Optional[RequestHook] = None,
response_hook: typing.Optional[ResponseHook] = None,
) -> None:
"""Enables tracing of all Client and AsyncClient instances
When a Client or AsyncClient gets created, a telemetry transport is passed
in to the instance.
"""
# pylint:disable=unused-argument
def instrumented_sync_send(wrapped, instance, args, kwargs):
if context.get_value("suppress_instrumentation"):
return wrapped(*args, **kwargs)
class _InstrumentedClient(httpx.Client):

transport = instance._transport or httpx.HTTPTransport()
telemetry_transport = SyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
request_hook=request_hook,
response_hook=response_hook,
)
_tracer_provider = None
_request_hook = None
_response_hook = None

instance._transport = telemetry_transport
return wrapped(*args, **kwargs)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

async def instrumented_async_send(wrapped, instance, args, kwargs):
if context.get_value("suppress_instrumentation"):
return await wrapped(*args, **kwargs)
self._original_transport = self._transport
self._is_instrumented_by_opentelemetry = True

transport = instance._transport or httpx.AsyncHTTPTransport()
telemetry_transport = AsyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
request_hook=request_hook,
response_hook=response_hook,
self._transport = SyncOpenTelemetryTransport(
self._transport,
tracer_provider=_InstrumentedClient._tracer_provider,
request_hook=_InstrumentedClient._request_hook,
response_hook=_InstrumentedClient._response_hook,
)

instance._transport = telemetry_transport
return await wrapped(*args, **kwargs)

wrapt.wrap_function_wrapper(httpx.Client, "send", instrumented_sync_send)
class _InstrumentedAsyncClient(httpx.AsyncClient):

wrapt.wrap_function_wrapper(
httpx.AsyncClient, "send", instrumented_async_send
)
_tracer_provider = None
_request_hook = None
_response_hook = None

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def _instrument_client(
client: typing.Union[httpx.Client, httpx.AsyncClient],
tracer_provider: TracerProvider = None,
request_hook: typing.Optional[RequestHook] = None,
response_hook: typing.Optional[ResponseHook] = None,
) -> None:
"""Enables instrumentation for the given Client or AsyncClient"""
# pylint: disable=protected-access
if isinstance(client, httpx.Client):
transport = client._transport or httpx.HTTPTransport()
telemetry_transport = SyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
request_hook=request_hook,
response_hook=response_hook,
)
elif isinstance(client, httpx.AsyncClient):
transport = client._transport or httpx.AsyncHTTPTransport()
telemetry_transport = AsyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
request_hook=request_hook,
response_hook=response_hook,
)
else:
raise TypeError("Invalid client provided")
client._transport = telemetry_transport
self._original_transport = self._transport
self._is_instrumented_by_opentelemetry = True


def _uninstrument() -> None:
"""Disables instrumenting for all newly created Client and AsyncClient instances"""
unwrap(httpx.Client, "send")
unwrap(httpx.AsyncClient, "send")


def _uninstrument_client(
client: typing.Union[httpx.Client, httpx.AsyncClient]
) -> None:
"""Disables instrumentation for the given Client or AsyncClient"""
# pylint: disable=protected-access
unwrap(client, "send")
self._transport = AsyncOpenTelemetryTransport(
self._transport,
tracer_provider=_InstrumentedAsyncClient._tracer_provider,
request_hook=_InstrumentedAsyncClient._request_hook,
response_hook=_InstrumentedAsyncClient._response_hook,
)


class HTTPXClientInstrumentor(BaseInstrumentor):
# pylint: disable=protected-access,attribute-defined-outside-init
"""An instrumentor for httpx Client and AsyncClient
See `BaseInstrumentor`
Expand All @@ -369,14 +322,31 @@ def _instrument(self, **kwargs):
``response_hook``: A hook that receives the span, request, and response
that is called right before the span ends
"""
_instrument(
tracer_provider=kwargs.get("tracer_provider"),
request_hook=kwargs.get("request_hook"),
response_hook=kwargs.get("response_hook"),
)
self._original_client = httpx.Client
self._original_async_client = httpx.AsyncClient
request_hook = kwargs.get("request_hook")
response_hook = kwargs.get("response_hook")
if callable(request_hook):
_InstrumentedClient._request_hook = request_hook
_InstrumentedAsyncClient._request_hook = request_hook
if callable(response_hook):
_InstrumentedClient._response_hook = response_hook
_InstrumentedAsyncClient._response_hook = response_hook
tracer_provider = kwargs.get("tracer_provider")
_InstrumentedClient._tracer_provider = tracer_provider
_InstrumentedAsyncClient._tracer_provider = tracer_provider
httpx.Client = _InstrumentedClient
httpx.AsyncClient = _InstrumentedAsyncClient

def _uninstrument(self, **kwargs):
_uninstrument()
httpx.Client = self._original_client
httpx.AsyncClient = self._original_async_client
_InstrumentedClient._tracer_provider = None
_InstrumentedClient._request_hook = None
_InstrumentedClient._response_hook = None
_InstrumentedAsyncClient._tracer_provider = None
_InstrumentedAsyncClient._request_hook = None
_InstrumentedAsyncClient._response_hook = None

@staticmethod
def instrument_client(
Expand All @@ -395,12 +365,34 @@ def instrument_client(
response_hook: A hook that receives the span, request, and response
that is called right before the span ends
"""
_instrument_client(
client,
tracer_provider=tracer_provider,
request_hook=request_hook,
response_hook=response_hook,
)
# pylint: disable=protected-access
if not hasattr(client, "_is_instrumented_by_opentelemetry"):
client._is_instrumented_by_opentelemetry = False

if not client._is_instrumented_by_opentelemetry:
if isinstance(client, httpx.Client):
client._original_transport = client._transport
transport = client._transport or httpx.HTTPTransport()
client._transport = SyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
request_hook=request_hook,
response_hook=response_hook,
)
client._is_instrumented_by_opentelemetry = True
if isinstance(client, httpx.AsyncClient):
transport = client._transport or httpx.AsyncHTTPTransport()
client._transport = AsyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
request_hook=request_hook,
response_hook=response_hook,
)
client._is_instrumented_by_opentelemetry = True
else:
_logger.warning(
"Attempting to instrument Httpx client while already instrumented"
)

@staticmethod
def uninstrument_client(
Expand All @@ -411,4 +403,12 @@ def uninstrument_client(
Args:
client: The httpx Client or AsyncClient instance
"""
_uninstrument_client(client)
if hasattr(client, "_original_transport"):
client._transport = client._original_transport
del client._original_transport
client._is_instrumented_by_opentelemetry = False
else:
_logger.warning(
"Attempting to uninstrument Httpx "
"client while already uninstrumented"
)
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ def test_basic(self):
span, opentelemetry.instrumentation.httpx
)

def test_basic_multiple(self):
self.perform_request(self.URL)
self.perform_request(self.URL)
self.assert_span(num_spans=2)

def test_not_foundbasic(self):
url_404 = "http://httpbin.org/status/404"

Expand Down Expand Up @@ -375,20 +380,16 @@ def create_client(
pass

def setUp(self):
self.client = self.create_client()
HTTPXClientInstrumentor().instrument()
super().setUp()

def tearDown(self):
super().tearDown()
HTTPXClientInstrumentor().instrument()
self.client = self.create_client()
HTTPXClientInstrumentor().uninstrument()

def test_custom_tracer_provider(self):
resource = resources.Resource.create({})
result = self.create_tracer_provider(resource=resource)
tracer_provider, exporter = result

HTTPXClientInstrumentor().uninstrument()
HTTPXClientInstrumentor().instrument(
tracer_provider=tracer_provider
)
Expand All @@ -398,9 +399,9 @@ def test_custom_tracer_provider(self):
self.assertEqual(result.text, "Hello!")
span = self.assert_span(exporter=exporter)
self.assertIs(span.resource, resource)
HTTPXClientInstrumentor().uninstrument()

def test_response_hook(self):
HTTPXClientInstrumentor().uninstrument()
HTTPXClientInstrumentor().instrument(
tracer_provider=self.tracer_provider,
response_hook=self.response_hook,
Expand All @@ -419,9 +420,9 @@ def test_response_hook(self):
HTTP_RESPONSE_BODY: "Hello!",
},
)
HTTPXClientInstrumentor().uninstrument()

def test_request_hook(self):
HTTPXClientInstrumentor().uninstrument()
HTTPXClientInstrumentor().instrument(
tracer_provider=self.tracer_provider,
request_hook=self.request_hook,
Expand All @@ -432,9 +433,9 @@ def test_request_hook(self):
self.assertEqual(result.text, "Hello!")
span = self.assert_span()
self.assertEqual(span.name, "GET" + self.URL)
HTTPXClientInstrumentor().uninstrument()

def test_request_hook_no_span_update(self):
HTTPXClientInstrumentor().uninstrument()
HTTPXClientInstrumentor().instrument(
tracer_provider=self.tracer_provider,
request_hook=self.no_update_request_hook,
Expand All @@ -445,10 +446,10 @@ def test_request_hook_no_span_update(self):
self.assertEqual(result.text, "Hello!")
span = self.assert_span()
self.assertEqual(span.name, "HTTP GET")
HTTPXClientInstrumentor().uninstrument()

def test_not_recording(self):
with mock.patch("opentelemetry.trace.INVALID_SPAN") as mock_span:
HTTPXClientInstrumentor().uninstrument()
HTTPXClientInstrumentor().instrument(
tracer_provider=trace._DefaultTracerProvider()
)
Expand All @@ -463,8 +464,10 @@ def test_not_recording(self):
self.assertTrue(mock_span.is_recording.called)
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)
HTTPXClientInstrumentor().uninstrument()

def test_suppress_instrumentation_new_client(self):
HTTPXClientInstrumentor().instrument()
token = context.attach(
context.set_value("suppress_instrumentation", True)
)
Expand All @@ -476,32 +479,22 @@ def test_suppress_instrumentation_new_client(self):
context.detach(token)

self.assert_span(num_spans=0)

def test_existing_client(self):
HTTPXClientInstrumentor().uninstrument()
client = self.create_client()
HTTPXClientInstrumentor().instrument()
result = self.perform_request(self.URL, client=client)
self.assertEqual(result.text, "Hello!")
self.assert_span(num_spans=1)

def test_instrument_client(self):
HTTPXClientInstrumentor().uninstrument()
client = self.create_client()
HTTPXClientInstrumentor().instrument_client(client)
result = self.perform_request(self.URL, client=client)
self.assertEqual(result.text, "Hello!")
self.assert_span(num_spans=1)
# instrument again to avoid annoying warning message
HTTPXClientInstrumentor().instrument()

def test_uninstrument(self):
HTTPXClientInstrumentor().instrument()
HTTPXClientInstrumentor().uninstrument()
result = self.perform_request(self.URL)
client = self.create_client()
result = self.perform_request(self.URL, client=client)
self.assertEqual(result.text, "Hello!")
self.assert_span(num_spans=0)
# instrument again to avoid annoying warning message
HTTPXClientInstrumentor().instrument()

def test_uninstrument_client(self):
HTTPXClientInstrumentor().uninstrument_client(self.client)
Expand All @@ -512,6 +505,7 @@ def test_uninstrument_client(self):
self.assert_span(num_spans=0)

def test_uninstrument_new_client(self):
HTTPXClientInstrumentor().instrument()
client1 = self.create_client()
HTTPXClientInstrumentor().uninstrument_client(client1)

Expand Down