diff --git a/changelog.d/11618.misc b/changelog.d/11618.misc new file mode 100644 index 000000000000..4076b30bf7c1 --- /dev/null +++ b/changelog.d/11618.misc @@ -0,0 +1 @@ +Improve opentracing support for incoming HTTP requests. diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py index dc39e3537bf6..da1fbf8b6361 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py @@ -22,13 +22,11 @@ from synapse.http.server import HttpServer, ServletCallback from synapse.http.servlet import parse_json_object_from_request from synapse.http.site import SynapseRequest -from synapse.logging import opentracing from synapse.logging.context import run_in_background from synapse.logging.opentracing import ( - SynapseTags, - start_active_span, - start_active_span_from_request, - tags, + set_tag, + span_context_from_request, + start_active_span_follows_from, whitelisted_homeserver, ) from synapse.server import HomeServer @@ -279,30 +277,19 @@ async def new_func( logger.warning("authenticate_request failed: %s", e) raise - request_tags = { - SynapseTags.REQUEST_ID: request.get_request_id(), - tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, - tags.HTTP_METHOD: request.get_method(), - tags.HTTP_URL: request.get_redacted_uri(), - tags.PEER_HOST_IPV6: request.getClientIP(), - "authenticated_entity": origin, - "servlet_name": request.request_metrics.name, - } - - # Only accept the span context if the origin is authenticated - # and whitelisted + # update the active opentracing span with the authenticated entity + set_tag("authenticated_entity", origin) + + # if the origin is authenticated and whitelisted, link to its span context + context = None if origin and whitelisted_homeserver(origin): - scope = start_active_span_from_request( - request, "incoming-federation-request", tags=request_tags - ) - else: - scope = start_active_span( - "incoming-federation-request", tags=request_tags - ) + context = span_context_from_request(request) - with scope: - opentracing.inject_response_headers(request.responseHeaders) + scope = start_active_span_follows_from( + "incoming-federation-request", contexts=(context,) if context else () + ) + with scope: if origin and self.RATELIMIT: with ratelimiter.ratelimit(origin) as d: await d diff --git a/synapse/http/site.py b/synapse/http/site.py index 9f68d7e19174..80f7a2ff58e5 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -14,7 +14,7 @@ import contextlib import logging import time -from typing import Any, Generator, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Generator, Optional, Tuple, Union import attr from zope.interface import implementer @@ -35,6 +35,9 @@ ) from synapse.types import Requester +if TYPE_CHECKING: + import opentracing + logger = logging.getLogger(__name__) _next_request_seq = 0 @@ -81,6 +84,10 @@ def __init__( # server name, for client requests this is the Requester object. self._requester: Optional[Union[Requester, str]] = None + # An opentracing span for this request. Will be closed when the request is + # completely processed. + self._opentracing_span: "Optional[opentracing.Span]" = None + # we can't yet create the logcontext, as we don't know the method. self.logcontext: Optional[LoggingContext] = None @@ -148,6 +155,13 @@ def requester(self, value: Union[Requester, str]) -> None: # If there's no authenticated entity, it was the requester. self.logcontext.request.authenticated_entity = authenticated_entity or requester + def set_opentracing_span(self, span: "opentracing.Span") -> None: + """attach an opentracing span to this request + + Doing so will cause the span to be closed when we finish processing the request + """ + self._opentracing_span = span + def get_request_id(self) -> str: return "%s-%i" % (self.get_method(), self.request_seq) @@ -286,6 +300,9 @@ async def handle_request(request): self._processing_finished_time = time.time() self._is_processing = False + if self._opentracing_span: + self._opentracing_span.log_kv({"event": "finished processing"}) + # if we've already sent the response, log it now; otherwise, we wait for the # response to be sent. if self.finish_time is not None: @@ -299,6 +316,8 @@ def finish(self) -> None: """ self.finish_time = time.time() Request.finish(self) + if self._opentracing_span: + self._opentracing_span.log_kv({"event": "response sent"}) if not self._is_processing: assert self.logcontext is not None with PreserveLoggingContext(self.logcontext): @@ -333,6 +352,11 @@ def connectionLost(self, reason: Union[Failure, Exception]) -> None: with PreserveLoggingContext(self.logcontext): logger.info("Connection from client lost before response was sent") + if self._opentracing_span: + self._opentracing_span.log_kv( + {"event": "client connection lost", "reason": str(reason.value)} + ) + if not self._is_processing: self._finished_processing() @@ -421,6 +445,10 @@ def _finished_processing(self) -> None: usage.evt_db_fetch_count, ) + # complete the opentracing span, if any. + if self._opentracing_span: + self._opentracing_span.finish() + try: self.request_metrics.stop(self.finish_time, self.code, self.sentLength) except Exception as e: diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 5d93ab07f181..63642906151e 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -173,6 +173,7 @@ def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"): import attr from twisted.internet import defer +from twisted.web.http import Request from twisted.web.http_headers import Headers from synapse.config import ConfigError @@ -490,48 +491,6 @@ def start_active_span_follows_from( return scope -def start_active_span_from_request( - request, - operation_name, - references=None, - tags=None, - start_time=None, - ignore_active_span=False, - finish_on_close=True, -): - """ - Extracts a span context from a Twisted Request. - args: - headers (twisted.web.http.Request) - - For the other args see opentracing.tracer - - returns: - span_context (opentracing.span.SpanContext) - """ - # Twisted encodes the values as lists whereas opentracing doesn't. - # So, we take the first item in the list. - # Also, twisted uses byte arrays while opentracing expects strings. - - if opentracing is None: - return noop_context_manager() # type: ignore[unreachable] - - header_dict = { - k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders() - } - context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict) - - return opentracing.tracer.start_active_span( - operation_name, - child_of=context, - references=references, - tags=tags, - start_time=start_time, - ignore_active_span=ignore_active_span, - finish_on_close=finish_on_close, - ) - - def start_active_span_from_edu( edu_content, operation_name, @@ -743,6 +702,20 @@ def active_span_context_as_string(): return json_encoder.encode(carrier) +def span_context_from_request(request: Request) -> "Optional[opentracing.SpanContext]": + """Extract an opentracing context from the headers on an HTTP request + + This is useful when we have received an HTTP request from another part of our + system, and want to link our spans to those of the remote system. + """ + if not opentracing: + return None + header_dict = { + k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders() + } + return opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict) + + @only_if_tracing def span_context_from_string(carrier): """ @@ -882,10 +855,13 @@ def trace_servlet(request: "SynapseRequest", extract_context: bool = False): } request_name = request.request_metrics.name - if extract_context: - scope = start_active_span_from_request(request, request_name) - else: - scope = start_active_span(request_name) + context = span_context_from_request(request) if extract_context else None + + # we configure the scope not to finish the span immediately on exit, and instead + # pass the span into the SynapseRequest, which will finish it once we've finished + # sending the response to the client. + scope = start_active_span(request_name, child_of=context, finish_on_close=False) + request.set_opentracing_span(scope.span) with scope: inject_response_headers(request.responseHeaders)