Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- `opentelemetry-instrumentation-aiohttp-server`: Support passing `TracerProvider` when instrumenting.
([#3819](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3819))
### Added

- `opentelemetry-instrumentation-aiohttp-client`: add support for url exclusions via `OTEL_PYTHON_EXCLUDED_URLS` / `OTEL_PYTHON_AIOHTTP_CLIENT_EXCLUDED_URLS`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@
from opentelemetry.instrumentation.aiohttp_server import (
AioHttpServerInstrumentor
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

AioHttpServerInstrumentor().instrument()
resource = Resource(attributes={"service.name": "my-aiohttp-service"})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! I would also add a comment so newer/quickstart users know that these are optional:

Suggested change
resource = Resource(attributes={"service.name": "my-aiohttp-service"})
# Optional: configure non-default TracerProvider, resource, sampler
resource = Resource(attributes={"service.name": "my-aiohttp-service"})

sampler = ParentBased(root=TraceIdRatioBased(rate=0.25)) # sample 25% of traces
AioHttpServerInstrumentor().instrument(tracer_provider=TracerProvider(resource=resource, sampler=sampler))

async def hello(request):
return web.Response(text="Hello, world")
Expand Down Expand Up @@ -248,63 +253,87 @@ def keys(self, carrier: Dict) -> List:
getter = AiohttpGetter()


@web.middleware
async def middleware(request, handler):
"""Middleware for aiohttp implementing tracing logic"""
if not is_http_instrumentation_enabled() or _excluded_urls.url_disabled(
request.url.path
):
return await handler(request)
def create_aiohttp_middleware(
tracer_provider: trace.TracerProvider | None = None,
):
_tracer = (
tracer_provider.get_tracer(__name__, __version__)
if tracer_provider
else tracer
)

span_name, additional_attributes = get_default_span_details(request)
@web.middleware
async def _middleware(request, handler):
"""Middleware for aiohttp implementing tracing logic"""
if (
not is_http_instrumentation_enabled()
or _excluded_urls.url_disabled(request.url.path)
):
return await handler(request)

span_name, additional_attributes = get_default_span_details(request)

req_attrs = collect_request_attributes(request)
duration_attrs = _parse_duration_attrs(req_attrs)
active_requests_count_attrs = _parse_active_request_count_attrs(
req_attrs
)

req_attrs = collect_request_attributes(request)
duration_attrs = _parse_duration_attrs(req_attrs)
active_requests_count_attrs = _parse_active_request_count_attrs(req_attrs)
duration_histogram = meter.create_histogram(
name=MetricInstruments.HTTP_SERVER_DURATION,
unit="ms",
description="Measures the duration of inbound HTTP requests.",
)

duration_histogram = meter.create_histogram(
name=MetricInstruments.HTTP_SERVER_DURATION,
unit="ms",
description="Measures the duration of inbound HTTP requests.",
)
active_requests_counter = meter.create_up_down_counter(
name=MetricInstruments.HTTP_SERVER_ACTIVE_REQUESTS,
unit="requests",
description="measures the number of concurrent HTTP requests those are currently in flight",
)

with _tracer.start_as_current_span(
span_name,
context=extract(request, getter=getter),
kind=trace.SpanKind.SERVER,
) as span:
attributes = collect_request_attributes(request)
attributes.update(additional_attributes)
span.set_attributes(attributes)
start = default_timer()
active_requests_counter.add(1, active_requests_count_attrs)
try:
resp = await handler(request)
set_status_code(span, resp.status)
except web.HTTPException as ex:
set_status_code(span, ex.status_code)
raise
finally:
duration = max((default_timer() - start) * 1000, 0)
duration_histogram.record(duration, duration_attrs)
active_requests_counter.add(-1, active_requests_count_attrs)
return resp

return _middleware

active_requests_counter = meter.create_up_down_counter(
name=MetricInstruments.HTTP_SERVER_ACTIVE_REQUESTS,
unit="requests",
description="measures the number of concurrent HTTP requests those are currently in flight",
)

with tracer.start_as_current_span(
span_name,
context=extract(request, getter=getter),
kind=trace.SpanKind.SERVER,
) as span:
attributes = collect_request_attributes(request)
attributes.update(additional_attributes)
span.set_attributes(attributes)
start = default_timer()
active_requests_counter.add(1, active_requests_count_attrs)
try:
resp = await handler(request)
set_status_code(span, resp.status)
except web.HTTPException as ex:
set_status_code(span, ex.status_code)
raise
finally:
duration = max((default_timer() - start) * 1000, 0)
duration_histogram.record(duration, duration_attrs)
active_requests_counter.add(-1, active_requests_count_attrs)
return resp


class _InstrumentedApplication(web.Application):
"""Insert tracing middleware"""

def __init__(self, *args, **kwargs):
middlewares = kwargs.pop("middlewares", [])
middlewares.insert(0, middleware)
kwargs["middlewares"] = middlewares
super().__init__(*args, **kwargs)
middleware = create_aiohttp_middleware() # for backwards compatibility


def create_instrumented_application(
tracer_provider: trace.TracerProvider | None = None,
):
_middleware = create_aiohttp_middleware(tracer_provider=tracer_provider)

class _InstrumentedApplication(web.Application):
"""Insert tracing middleware"""

def __init__(self, *args, **kwargs):
middlewares = kwargs.pop("middlewares", [])
middlewares.insert(0, _middleware)
kwargs["middlewares"] = middlewares
super().__init__(*args, **kwargs)

return _InstrumentedApplication


class AioHttpServerInstrumentor(BaseInstrumentor):
Expand All @@ -315,6 +344,10 @@ class AioHttpServerInstrumentor(BaseInstrumentor):
"""

def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider", None)
assert tracer_provider is None or isinstance(
tracer_provider, trace.TracerProvider
)
# update global values at instrument time so we can test them
global _excluded_urls # pylint: disable=global-statement
_excluded_urls = get_excluded_urls("AIOHTTP_SERVER")
Expand All @@ -326,6 +359,10 @@ def _instrument(self, **kwargs):
meter = metrics.get_meter(__name__, __version__)

self._original_app = web.Application

_InstrumentedApplication = create_instrumented_application(
tracer_provider=tracer_provider
)
setattr(web, "Application", _InstrumentedApplication)

def _uninstrument(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
AioHttpServerInstrumentor,
)
from opentelemetry.instrumentation.utils import suppress_http_instrumentation
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased
from opentelemetry.semconv._incubating.attributes.http_attributes import (
HTTP_METHOD,
HTTP_STATUS_CODE,
Expand Down Expand Up @@ -94,9 +95,9 @@ def fixture_suppress():

@pytest_asyncio.fixture(name="server_fixture")
async def fixture_server_fixture(tracer, aiohttp_server, suppress):
_, memory_exporter = tracer
tracer_provider, memory_exporter = tracer

AioHttpServerInstrumentor().instrument()
AioHttpServerInstrumentor().instrument(tracer_provider=tracer_provider)

app = aiohttp.web.Application()
app.add_routes([aiohttp.web.get("/test-path", default_handler)])
Expand Down Expand Up @@ -221,20 +222,6 @@ async def handler(request):
memory_exporter.clear()


def _get_sorted_metrics(metrics_data):
resource_metrics = metrics_data.resource_metrics if metrics_data else []

all_metrics = []
for metrics in resource_metrics:
for scope_metrics in metrics.scope_metrics:
all_metrics.extend(scope_metrics.metrics)

return sorted(
all_metrics,
key=lambda m: m.name,
)


@pytest.mark.asyncio
@pytest.mark.parametrize(
"env_var",
Expand Down Expand Up @@ -272,3 +259,56 @@ async def handler(request):
assert len(metrics) == 0

AioHttpServerInstrumentor().uninstrument()


@pytest.mark.asyncio
@pytest.mark.parametrize(
"tracer",
[
TestBase().create_tracer_provider(
sampler=ParentBased(TraceIdRatioBased(0.05))
)
],
)
async def test_non_global_tracer_provider(
tracer,
server_fixture,
aiohttp_client,
):
n_requests = 1000
collection_ratio = 0.05
n_expected_trace_ids = n_requests * collection_ratio

_, memory_exporter = tracer
server, _ = server_fixture

assert len(memory_exporter.get_finished_spans()) == 0

client = await aiohttp_client(server)
for _ in range(n_requests):
await client.get("/test-path")

trace_ids = {
span.context.trace_id
for span in memory_exporter.get_finished_spans()
if span.context is not None
}
assert (
0.5 * n_expected_trace_ids
<= len(trace_ids)
<= 1.5 * n_expected_trace_ids
)


def _get_sorted_metrics(metrics_data):
resource_metrics = metrics_data.resource_metrics if metrics_data else []

all_metrics = []
for metrics in resource_metrics:
for scope_metrics in metrics.scope_metrics:
all_metrics.extend(scope_metrics.metrics)

return sorted(
all_metrics,
key=lambda m: m.name,
)
Loading