From b139f40fbc04d95f8088fb47581c01f2793addca Mon Sep 17 00:00:00 2001 From: Fatih Acar Date: Thu, 25 Jan 2024 18:56:04 +0100 Subject: [PATCH] feat(backend): add e2e useful tracing Enabled tracing component inside git agent in order to have end to end spans for a single request. Added spans at most useful places such as message bus' execute_message and database execute_query. Signed-off-by: Fatih Acar --- backend/infrahub/cli/git_agent.py | 12 +++- backend/infrahub/database/__init__.py | 30 +++++----- backend/infrahub/graphql/mutations/branch.py | 2 + .../message_bus/operations/__init__.py | 37 +++++++----- backend/infrahub/server.py | 35 ++++++----- .../services/adapters/message_bus/__init__.py | 33 ++++++++++- .../services/adapters/message_bus/rabbitmq.py | 32 ++++++---- backend/infrahub/trace.py | 58 +++---------------- development/docker-compose.override.yml.tmp | 9 ++- development/infrahub.toml | 2 +- poetry.lock | 36 +++++++++++- pyproject.toml | 2 + 12 files changed, 179 insertions(+), 109 deletions(-) diff --git a/backend/infrahub/cli/git_agent.py b/backend/infrahub/cli/git_agent.py index 96113c2a67..4e9788439d 100644 --- a/backend/infrahub/cli/git_agent.py +++ b/backend/infrahub/cli/git_agent.py @@ -9,7 +9,7 @@ from prometheus_client import start_http_server from rich.logging import RichHandler -from infrahub import config +from infrahub import __version__, config from infrahub.components import ComponentType from infrahub.core.initialization import initialization from infrahub.database import InfrahubDatabase, get_db @@ -20,6 +20,7 @@ from infrahub.services import InfrahubServices from infrahub.services.adapters.cache.redis import RedisCache from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus +from infrahub.trace import configure_trace app = typer.Typer() @@ -65,6 +66,15 @@ async def _start(debug: bool, port: int) -> None: client = await InfrahubClient.init(address=config.SETTINGS.main.internal_address, retry_on_failure=True, log=log) await client.branch.all() + # Initialize trace + if config.SETTINGS.trace.enable: + configure_trace( + service="infrahub-git-agent", + version=__version__, + exporter_endpoint=config.SETTINGS.trace.trace_endpoint, + exporter_protocol=config.SETTINGS.trace.exporter_protocol, + ) + # Initialize the lock initialize_lock() diff --git a/backend/infrahub/database/__init__.py b/backend/infrahub/database/__init__.py index e7712123e8..adcc7601e6 100644 --- a/backend/infrahub/database/__init__.py +++ b/backend/infrahub/database/__init__.py @@ -15,6 +15,7 @@ # from contextlib import asynccontextmanager from neo4j.exceptions import ClientError, ServiceUnavailable +from otel_extensions import get_tracer from infrahub import config from infrahub.exceptions import DatabaseError @@ -192,19 +193,22 @@ async def close(self): async def execute_query( self, query: str, params: Optional[Dict[str, Any]] = None, name: Optional[str] = "undefined" ) -> List[Record]: - with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time(): - if self.is_transaction: - execution_method = await self.transaction() - else: - execution_method = await self.session() - - try: - response = await execution_method.run(query=query, parameters=params) - except ServiceUnavailable as exc: - log.error("Database Service unavailable", error=str(exc)) - raise DatabaseError(message="Unable to connect to the database") from exc - - return [item async for item in response] + with get_tracer(__name__).start_as_current_span("execute_db_query") as span: + span.set_attribute("query", query) + + with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time(): + if self.is_transaction: + execution_method = await self.transaction() + else: + execution_method = await self.session() + + try: + response = await execution_method.run(query=query, parameters=params) + except ServiceUnavailable as exc: + log.error("Database Service unavailable", error=str(exc)) + raise DatabaseError(message="Unable to connect to the database") from exc + + return [item async for item in response] def render_list_comprehension(self, items: str, item_name: str) -> str: if self.db_type == DatabaseType.MEMGRAPH: diff --git a/backend/infrahub/graphql/mutations/branch.py b/backend/infrahub/graphql/mutations/branch.py index 7e96bda278..c34a2c0ff9 100644 --- a/backend/infrahub/graphql/mutations/branch.py +++ b/backend/infrahub/graphql/mutations/branch.py @@ -4,6 +4,7 @@ from graphene import Boolean, Field, InputObjectType, List, Mutation, String from graphql import GraphQLResolveInfo from infrahub_sdk.utils import extract_fields +from otel_extensions import instrumented from infrahub import config, lock from infrahub.core import registry @@ -44,6 +45,7 @@ class Arguments: object = Field(BranchType) @classmethod + @instrumented async def mutate(cls, root: dict, info: GraphQLResolveInfo, data: BranchCreateInput, background_execution=False): db: InfrahubDatabase = info.context.get("infrahub_database") diff --git a/backend/infrahub/message_bus/operations/__init__.py b/backend/infrahub/message_bus/operations/__init__.py index 962c3a7005..8ec945c3c5 100644 --- a/backend/infrahub/message_bus/operations/__init__.py +++ b/backend/infrahub/message_bus/operations/__init__.py @@ -1,5 +1,7 @@ import json +from otel_extensions import get_tracer + from infrahub.log import get_logger from infrahub.message_bus import InfrahubResponse, messages from infrahub.message_bus.operations import check, event, finalize, git, refresh, requests, send, transform, trigger @@ -53,19 +55,22 @@ async def execute_message(routing_key: str, message_body: bytes, service: InfrahubServices): - message_data = json.loads(message_body) - message = messages.MESSAGE_MAP[routing_key](**message_data) - message.set_log_data(routing_key=routing_key) - try: - await COMMAND_MAP[routing_key](message=message, service=service) - except Exception as exc: # pylint: disable=broad-except - if message.reply_requested: - response = InfrahubResponse(passed=False, response_class="rpc_error", response_data={"error": str(exc)}) - await service.reply(message=response, initiator=message) - return - if message.reached_max_retries: - log.error("Message failed after maximum number of retries", error=str(exc)) - await set_check_status(message, conclusion="failure", service=service) - return - message.increase_retry_count() - await service.send(message, delay=MessageTTL.FIVE) + with get_tracer(__name__).start_as_current_span("execute_message") as span: + span.set_attribute("routing_key", routing_key) + + message_data = json.loads(message_body) + message = messages.MESSAGE_MAP[routing_key](**message_data) + message.set_log_data(routing_key=routing_key) + try: + await COMMAND_MAP[routing_key](message=message, service=service) + except Exception as exc: # pylint: disable=broad-except + if message.reply_requested: + response = InfrahubResponse(passed=False, response_class="rpc_error", response_data={"error": str(exc)}) + await service.reply(message=response, initiator=message) + return + if message.reached_max_retries: + log.error("Message failed after maximum number of retries", error=str(exc)) + await set_check_status(message, conclusion="failure", service=service) + return + message.increase_retry_count() + await service.send(message, delay=MessageTTL.FIVE) diff --git a/backend/infrahub/server.py b/backend/infrahub/server.py index d6abdf593c..5fc32e17fc 100644 --- a/backend/infrahub/server.py +++ b/backend/infrahub/server.py @@ -12,7 +12,7 @@ from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from infrahub_sdk.timestamp import TimestampFormatError -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor, Span from pydantic import ValidationError from starlette_exporter import PrometheusMiddleware, handle_metrics @@ -32,7 +32,7 @@ from infrahub.services import InfrahubServices, services from infrahub.services.adapters.cache.redis import RedisCache from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus -from infrahub.trace import add_span_exception, configure_trace, get_traceid, get_tracer +from infrahub.trace import add_span_exception, configure_trace, get_traceid from infrahub.worker import WORKER_IDENTITY @@ -46,8 +46,8 @@ async def app_initialization(application: FastAPI) -> None: # Initialize trace if config.SETTINGS.trace.enable: configure_trace( + service="infrahub-server", version=__version__, - exporter_type=config.SETTINGS.trace.exporter_type, exporter_endpoint=config.SETTINGS.trace.trace_endpoint, exporter_protocol=config.SETTINGS.trace.exporter_protocol, ) @@ -101,8 +101,13 @@ async def lifespan(application: FastAPI): redoc_url="/api/redoc", ) -FastAPIInstrumentor().instrument_app(app, excluded_urls=".*/metrics") -tracer = get_tracer() + +def server_request_hook(span: Span, scope: dict): # pylint: disable=unused-argument + if span and span.is_recording(): + span.set_attribute("worker", WORKER_IDENTITY) + + +FastAPIInstrumentor().instrument_app(app, excluded_urls=".*/metrics", server_request_hook=server_request_hook) FRONTEND_DIRECTORY = os.environ.get("INFRAHUB_FRONTEND_DIRECTORY", os.path.abspath("frontend")) FRONTEND_ASSET_DIRECTORY = f"{FRONTEND_DIRECTORY}/dist/assets" @@ -121,15 +126,17 @@ async def lifespan(application: FastAPI): async def logging_middleware(request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response: clear_log_context() request_id = correlation_id.get() - with tracer.start_as_current_span("processing request " + request_id): - trace_id = get_traceid() - set_log_data(key="request_id", value=request_id) - set_log_data(key="app", value="infrahub.api") - set_log_data(key="worker", value=WORKER_IDENTITY) - if trace_id: - set_log_data(key="trace_id", value=trace_id) - response = await call_next(request) - return response + + set_log_data(key="request_id", value=request_id) + set_log_data(key="app", value="infrahub.api") + set_log_data(key="worker", value=WORKER_IDENTITY) + + trace_id = get_traceid() + if trace_id: + set_log_data(key="trace_id", value=trace_id) + + response = await call_next(request) + return response @app.middleware("http") diff --git a/backend/infrahub/services/adapters/message_bus/__init__.py b/backend/infrahub/services/adapters/message_bus/__init__.py index f0251266b3..72d525b373 100644 --- a/backend/infrahub/services/adapters/message_bus/__init__.py +++ b/backend/infrahub/services/adapters/message_bus/__init__.py @@ -2,14 +2,45 @@ from typing import TYPE_CHECKING, Optional +import opentelemetry.instrumentation.aio_pika.span_builder +from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor +from opentelemetry.semconv.trace import SpanAttributes + if TYPE_CHECKING: - from aio_pika.abc import AbstractExchange + from aio_pika.abc import AbstractChannel, AbstractExchange + from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder from infrahub.message_bus import InfrahubMessage, InfrahubResponse from infrahub.message_bus.types import MessageTTL from infrahub.services import InfrahubServices +AioPikaInstrumentor().instrument() + + +# TODO: remove this once https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1835 is resolved +def patch_spanbuilder_set_channel() -> None: + """ + The default SpanBuilder.set_channel does not work with aio_pika 9.1 and the refactored connection + attribute + """ + + def set_channel(self: SpanBuilder, channel: AbstractChannel) -> None: + if hasattr(channel, "_connection"): + url = channel._connection.url + self._attributes.update( + { + SpanAttributes.NET_PEER_NAME: url.host, + SpanAttributes.NET_PEER_PORT: url.port, + } + ) + + opentelemetry.instrumentation.aio_pika.span_builder.SpanBuilder.set_channel = set_channel # type: ignore[misc] + + +patch_spanbuilder_set_channel() + + class InfrahubMessageBus: # This exchange attribute should be removed when the InfrahubRpcClient # class has been removed diff --git a/backend/infrahub/services/adapters/message_bus/rabbitmq.py b/backend/infrahub/services/adapters/message_bus/rabbitmq.py index 20773ac54e..9f9a9359a2 100644 --- a/backend/infrahub/services/adapters/message_bus/rabbitmq.py +++ b/backend/infrahub/services/adapters/message_bus/rabbitmq.py @@ -6,6 +6,7 @@ import aio_pika from infrahub_sdk import UUIDT +from opentelemetry import context, propagate from infrahub import config from infrahub.components import ComponentType @@ -183,17 +184,28 @@ async def subscribe(self) -> None: async for message in qiterator: try: async with message.process(requeue=False): + # auto instrumentation not supported yet for RPCs, do it ourselves... + token = None + headers = message.headers or {} + ctx = propagate.extract(headers) + if ctx is not None: + token = context.attach(ctx) + clear_log_context() - if message.routing_key in messages.MESSAGE_MAP: - await execute_message( - routing_key=message.routing_key, message_body=message.body, service=self.service - ) - else: - self.service.log.error( - "Unhandled routing key for message", - routing_key=message.routing_key, - message=message.body, - ) + try: + if message.routing_key in messages.MESSAGE_MAP: + await execute_message( + routing_key=message.routing_key, message_body=message.body, service=self.service + ) + else: + self.service.log.error( + "Unhandled routing key for message", + routing_key=message.routing_key, + message=message.body, + ) + finally: + if token is not None: + context.detach(token) except Exception: # pylint: disable=broad-except self.service.log.exception("Processing error for message %r" % message) diff --git a/backend/infrahub/trace.py b/backend/infrahub/trace.py index 689b1f7aac..a7ae89626d 100644 --- a/backend/infrahub/trace.py +++ b/backend/infrahub/trace.py @@ -1,18 +1,6 @@ from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( - OTLPSpanExporter as GRPCSpanExporter, -) -from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( - OTLPSpanExporter as HTTPSpanExporter, -) -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from opentelemetry.trace import StatusCode - - -def get_tracer(name: str = "infrahub") -> trace.Tracer: - return trace.get_tracer(name) +from otel_extensions import TelemetryOptions, init_telemetry_provider def get_current_span_with_context() -> trace.Span: @@ -54,42 +42,10 @@ def add_span_exception(exception: Exception) -> None: current_span.record_exception(exception) -def create_tracer_provider( - version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None -) -> TracerProvider: - # Create a BatchSpanProcessor exporter based on the type - if exporter_type == "console": - exporter = ConsoleSpanExporter() - elif exporter_type == "otlp": - if not exporter_endpoint: - raise ValueError("Exporter type is set to otlp but endpoint is not set") - if exporter_protocol == "http/protobuf": - exporter = HTTPSpanExporter(endpoint=exporter_endpoint) - elif exporter_protocol == "grpc": - exporter = GRPCSpanExporter(endpoint=exporter_endpoint) - else: - raise ValueError("Exporter type unsupported by Infrahub") - - # Resource can be required for some backends, e.g. Jaeger - resource = Resource(attributes={"service.name": "infrahub", "service.version": version}) - span_processor = BatchSpanProcessor(exporter) - tracer_provider = TracerProvider(resource=resource) - tracer_provider.add_span_processor(span_processor) - - return tracer_provider - - -def configure_trace( - version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None -) -> None: - # Create a trace provider with the exporter - tracer_provider = create_tracer_provider( - version=version, - exporter_type=exporter_type, - exporter_endpoint=exporter_endpoint, - exporter_protocol=exporter_protocol, +def configure_trace(service: str, version: str, exporter_endpoint: str = None, exporter_protocol: str = None) -> None: + options = TelemetryOptions( + OTEL_SERVICE_NAME=service, + OTEL_EXPORTER_OTLP_ENDPOINT=exporter_endpoint, + OTEL_EXPORTER_OTLP_PROTOCOL=exporter_protocol, ) - tracer_provider.get_tracer(__name__) - - # Register the trace provider - trace.set_tracer_provider(tracer_provider) + init_telemetry_provider(options, **{"service.version": version}) diff --git a/development/docker-compose.override.yml.tmp b/development/docker-compose.override.yml.tmp index aacccba4a6..33e077ec11 100644 --- a/development/docker-compose.override.yml.tmp +++ b/development/docker-compose.override.yml.tmp @@ -3,7 +3,7 @@ version: "3.4" services: # -------------------------------------------------------------------------------- # - Prometheus to collect all metrics endpoints - # - Tempo to receive traces + # - Tempo or Jaeger to receive traces # - Grafana to visualize these metrics # - Loki to receive logs from promtail # - Promtail to parse logs from different source @@ -43,6 +43,13 @@ services: ports: - "3200:3200" + # jaeger: + # image: jaegertracing/all-in-one:1.53 + # environment: + # COLLECTOR_ZIPKIN_HOST_PORT: ":9411" + # ports: + # - "16686:16686" + prometheus: image: prom/prometheus:latest volumes: diff --git a/development/infrahub.toml b/development/infrahub.toml index 7aa6b1121f..c8aec827fc 100644 --- a/development/infrahub.toml +++ b/development/infrahub.toml @@ -30,7 +30,7 @@ enable = false insecure = "True" exporter_type = "otlp" exporter_protocol = "grpc" -exporter_endpoint = "tempo" +exporter_endpoint = "jaeger" exporter_port = 4317 diff --git a/poetry.lock b/poetry.lock index a54ede17ed..7278cc9f9b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2438,6 +2438,25 @@ opentelemetry-api = ">=1.4,<2.0" setuptools = ">=16.0" wrapt = ">=1.0.0,<2.0.0" +[[package]] +name = "opentelemetry-instrumentation-aio-pika" +version = "0.43b0" +description = "OpenTelemetry Aio-pika instrumentation" +optional = false +python-versions = ">=3.7" +files = [ + {file = "opentelemetry_instrumentation_aio_pika-0.43b0-py3-none-any.whl", hash = "sha256:1ca914033c093d3c720bd74116456df61c3a67b46f90af6b0cb9afda377ae59f"}, + {file = "opentelemetry_instrumentation_aio_pika-0.43b0.tar.gz", hash = "sha256:90847c68f8fd4ff40818e694e7f6f26d01616e617c8aa50f6de5bfba33f33e3e"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.5,<2.0" +wrapt = ">=1.0.0,<2.0.0" + +[package.extras] +instruments = ["aio-pika (>=7.2.0,<10.0.0)"] +test = ["opentelemetry-instrumentation-aio-pika[instruments]", "opentelemetry-test-utils (==0.43b0)", "pytest", "wrapt (>=1.0.0,<2.0.0)"] + [[package]] name = "opentelemetry-instrumentation-asgi" version = "0.42b0" @@ -2548,6 +2567,21 @@ files = [ [package.extras] dev = ["black", "mypy", "pytest"] +[[package]] +name = "otel-extensions" +version = "1.0.1" +description = "Python extensions for OpenTelemetry" +optional = false +python-versions = ">=3.7" +files = [ + {file = "otel-extensions-1.0.1.tar.gz", hash = "sha256:b301ae271f37d7405fed648be92619f4af936e03fb96bf306195bdcf36478f2b"}, + {file = "otel_extensions-1.0.1-py2.py3-none-any.whl", hash = "sha256:45f548bf264424c5212be429c4d80687fcf59390cc0c6de32f91829d4a2bb8bb"}, +] + +[package.dependencies] +opentelemetry-api = "*" +opentelemetry-sdk = "*" + [[package]] name = "packaging" version = "23.2" @@ -4639,4 +4673,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"] [metadata] lock-version = "2.0" python-versions = "^3.8, < 3.12" -content-hash = "afb5efbd0f0bbc380dc76a04e10414de47507c6537c5265a9983dfb34f54d933" +content-hash = "c19c48936f71eb9a6af69366445a4c199f33ab4a8479aed62590a2939273a175" diff --git a/pyproject.toml b/pyproject.toml index b8c30e2575..5f28df0c96 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,8 @@ infrahub-sdk = {path = "python_sdk", develop=true} pydantic = "^2.5" pydantic-settings = "^2.1" fastapi-storages = "~0.2" +otel-extensions = "1.0.1" +opentelemetry-instrumentation-aio-pika = "^0.43b0" [tool.poetry.group.server.dependencies] fastapi = "~0.108"