From b6cb89dda4714aaec3f301d84ddb46199709f1ad Mon Sep 17 00:00:00 2001 From: Fatih Acar Date: Thu, 25 Jan 2024 18:56:04 +0100 Subject: [PATCH 1/2] feat(backend): add e2e useful tracing Enabled tracing component inside git agent in order to have end to end spans for a single request. Added spans at most useful places such as message bus' execute_message and database execute_query. Signed-off-by: Fatih Acar --- backend/infrahub/cli/git_agent.py | 13 +- backend/infrahub/database/__init__.py | 21 ++- backend/infrahub/graphql/app.py | 26 +-- backend/infrahub/graphql/mutations/branch.py | 2 + backend/infrahub/server.py | 34 ++-- .../services/adapters/message_bus/rabbitmq.py | 29 +++ backend/infrahub/trace.py | 58 +++--- development/docker-compose.override.yml.tmp | 9 +- development/infrahub.toml | 2 +- poetry.lock | 165 +++++++++--------- pyproject.toml | 9 +- 11 files changed, 216 insertions(+), 152 deletions(-) diff --git a/backend/infrahub/cli/git_agent.py b/backend/infrahub/cli/git_agent.py index e71e8c83e7..04c3266a03 100644 --- a/backend/infrahub/cli/git_agent.py +++ b/backend/infrahub/cli/git_agent.py @@ -8,7 +8,7 @@ from prometheus_client import start_http_server from rich.logging import RichHandler -from infrahub import config +from infrahub import __version__, config from infrahub.components import ComponentType from infrahub.core.initialization import initialization from infrahub.database import InfrahubDatabase, get_db @@ -20,6 +20,7 @@ from infrahub.services import InfrahubServices from infrahub.services.adapters.cache.redis import RedisCache from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus +from infrahub.trace import configure_trace app = typer.Typer() @@ -66,6 +67,16 @@ async def _start(debug: bool, port: int) -> None: client = await InfrahubClient.init(address=config.SETTINGS.main.internal_address, retry_on_failure=True, log=log) await client.branch.all() + # Initialize trace + if config.SETTINGS.trace.enable: + configure_trace( + service="infrahub-git-agent", + version=__version__, + exporter_type=config.SETTINGS.trace.exporter_type, + exporter_endpoint=config.SETTINGS.trace.trace_endpoint, + exporter_protocol=config.SETTINGS.trace.exporter_protocol, + ) + # Initialize the lock initialize_lock() diff --git a/backend/infrahub/database/__init__.py b/backend/infrahub/database/__init__.py index b083997f5b..f5556dc721 100644 --- a/backend/infrahub/database/__init__.py +++ b/backend/infrahub/database/__init__.py @@ -15,6 +15,7 @@ Record, ) from neo4j.exceptions import ClientError, Neo4jError, ServiceUnavailable, TransientError +from opentelemetry import trace from typing_extensions import Self from infrahub import config @@ -186,17 +187,23 @@ async def close(self): async def execute_query( self, query: str, params: Optional[Dict[str, Any]] = None, name: Optional[str] = "undefined" ) -> List[Record]: - with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time(): - response = await self.run_query(query=query, params=params) - return [item async for item in response] + with trace.get_tracer(__name__).start_as_current_span("execute_db_query") as span: + span.set_attribute("query", query) + + with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time(): + response = await self.run_query(query=query, params=params) + return [item async for item in response] async def execute_query_with_metadata( self, query: str, params: Optional[Dict[str, Any]] = None, name: Optional[str] = "undefined" ) -> Tuple[List[Record], Dict[str, Any]]: - with 
QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time(): - response = await self.run_query(query=query, params=params) - results = [item async for item in response] - return results, response._metadata or {} + with trace.get_tracer(__name__).start_as_current_span("execute_db_query_with_metadata") as span: + span.set_attribute("query", query) + + with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time(): + response = await self.run_query(query=query, params=params) + results = [item async for item in response] + return results, response._metadata or {} async def run_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> AsyncResult: if self.is_transaction: diff --git a/backend/infrahub/graphql/app.py b/backend/infrahub/graphql/app.py index 1d413932db..808b4b33ba 100644 --- a/backend/infrahub/graphql/app.py +++ b/backend/infrahub/graphql/app.py @@ -38,6 +38,7 @@ from graphql.utilities import ( get_operation_ast, ) +from opentelemetry import trace from starlette.datastructures import UploadFile from starlette.requests import HTTPConnection, Request from starlette.responses import JSONResponse, Response @@ -222,17 +223,20 @@ async def _handle_http_request( "query_id": "", } - with GRAPHQL_DURATION_METRICS.labels(**labels).time(): - result = await graphql( - schema=graphql_params.schema, - source=query, - context_value=graphql_params.context, - root_value=self.root_value, - middleware=self.middleware, - variable_values=variable_values, - operation_name=operation_name, - execution_context_class=self.execution_context_class, - ) + with trace.get_tracer(__name__).start_as_current_span("execute_graphql") as span: + span.set_attributes(labels) + + with GRAPHQL_DURATION_METRICS.labels(**labels).time(): + result = await graphql( + schema=graphql_params.schema, + source=query, + context_value=graphql_params.context, + root_value=self.root_value, + middleware=self.middleware, + variable_values=variable_values, + operation_name=operation_name, + execution_context_class=self.execution_context_class, + ) response: Dict[str, Any] = {"data": result.data} if result.errors: diff --git a/backend/infrahub/graphql/mutations/branch.py b/backend/infrahub/graphql/mutations/branch.py index 75453c2e56..9f749d889e 100644 --- a/backend/infrahub/graphql/mutations/branch.py +++ b/backend/infrahub/graphql/mutations/branch.py @@ -5,6 +5,7 @@ import pydantic from graphene import Boolean, Field, InputObjectType, List, Mutation, String from infrahub_sdk.utils import extract_fields, extract_fields_first_node +from opentelemetry import trace from typing_extensions import Self from infrahub import config, lock @@ -55,6 +56,7 @@ class Arguments: @classmethod @retry_db_transaction(name="branch_create") + @trace.get_tracer(__name__).start_as_current_span("branch_create") async def mutate( cls, root: dict, info: GraphQLResolveInfo, data: BranchCreateInput, background_execution: bool = False ) -> Self: diff --git a/backend/infrahub/server.py b/backend/infrahub/server.py index a7b5009737..30dfe2361e 100644 --- a/backend/infrahub/server.py +++ b/backend/infrahub/server.py @@ -14,7 +14,7 @@ from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from infrahub_sdk.timestamp import TimestampFormatError -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor, Span from pydantic import ValidationError from starlette_exporter import PrometheusMiddleware, handle_metrics @@ -34,7 
+34,7 @@ from infrahub.services import InfrahubServices, services from infrahub.services.adapters.cache.redis import RedisCache from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus -from infrahub.trace import add_span_exception, configure_trace, get_traceid, get_tracer +from infrahub.trace import add_span_exception, configure_trace, get_traceid from infrahub.worker import WORKER_IDENTITY @@ -44,6 +44,7 @@ async def app_initialization(application: FastAPI) -> None: # Initialize trace if config.SETTINGS.trace.enable: configure_trace( + service="infrahub-server", version=__version__, exporter_type=config.SETTINGS.trace.exporter_type, exporter_endpoint=config.SETTINGS.trace.trace_endpoint, @@ -95,8 +96,13 @@ async def lifespan(application: FastAPI) -> AsyncGenerator: redoc_url="/api/redoc", ) -FastAPIInstrumentor().instrument_app(app, excluded_urls=".*/metrics") -tracer = get_tracer() + +def server_request_hook(span: Span, scope: dict) -> None: # pylint: disable=unused-argument + if span and span.is_recording(): + span.set_attribute("worker", WORKER_IDENTITY) + + +FastAPIInstrumentor().instrument_app(app, excluded_urls=".*/metrics", server_request_hook=server_request_hook) FRONTEND_DIRECTORY = os.environ.get("INFRAHUB_FRONTEND_DIRECTORY", os.path.abspath("frontend")) FRONTEND_ASSET_DIRECTORY = f"{FRONTEND_DIRECTORY}/dist/assets" @@ -118,15 +124,17 @@ async def lifespan(application: FastAPI) -> AsyncGenerator: async def logging_middleware(request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response: clear_log_context() request_id = correlation_id.get() - with tracer.start_as_current_span(f"processing request {request_id}"): - trace_id = get_traceid() - set_log_data(key="request_id", value=request_id) - set_log_data(key="app", value="infrahub.api") - set_log_data(key="worker", value=WORKER_IDENTITY) - if trace_id: - set_log_data(key="trace_id", value=trace_id) - response = await call_next(request) - return response + + set_log_data(key="request_id", value=request_id) + set_log_data(key="app", value="infrahub.api") + set_log_data(key="worker", value=WORKER_IDENTITY) + + trace_id = get_traceid() + if trace_id: + set_log_data(key="trace_id", value=trace_id) + + response = await call_next(request) + return response @app.middleware("http") diff --git a/backend/infrahub/services/adapters/message_bus/rabbitmq.py b/backend/infrahub/services/adapters/message_bus/rabbitmq.py index cf20e07ab4..1911687162 100644 --- a/backend/infrahub/services/adapters/message_bus/rabbitmq.py +++ b/backend/infrahub/services/adapters/message_bus/rabbitmq.py @@ -4,8 +4,11 @@ from typing import TYPE_CHECKING, Awaitable, Callable, List, MutableMapping, Optional, Type, TypeVar import aio_pika +import opentelemetry.instrumentation.aio_pika.span_builder import ujson from infrahub_sdk import UUIDT +from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor +from opentelemetry.semconv.trace import SpanAttributes from infrahub import config from infrahub.components import ComponentType @@ -24,6 +27,7 @@ AbstractQueue, AbstractRobustConnection, ) + from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder from infrahub.config import BrokerSettings from infrahub.services import InfrahubServices @@ -32,6 +36,29 @@ ResponseClass = TypeVar("ResponseClass") +AioPikaInstrumentor().instrument() + + +# TODO: remove this once https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1835 is resolved +def patch_spanbuilder_set_channel() -> None: + """ + 
The default SpanBuilder.set_channel does not work with aio_pika 9.1 and the refactored connection + attribute + """ + + def set_channel(self: SpanBuilder, channel: AbstractChannel) -> None: + if hasattr(channel, "_connection"): + url = channel._connection.url + self._attributes.update( + { + SpanAttributes.NET_PEER_NAME: url.host, + SpanAttributes.NET_PEER_PORT: url.port, + } + ) + + opentelemetry.instrumentation.aio_pika.span_builder.SpanBuilder.set_channel = set_channel # type: ignore + + async def _add_request_id(message: InfrahubMessage) -> None: log_data = get_log_data() message.meta.request_id = log_data.get("request_id", "") @@ -54,6 +81,8 @@ def __init__(self, settings: Optional[BrokerSettings] = None) -> None: self.futures: MutableMapping[str, asyncio.Future] = {} async def initialize(self, service: InfrahubServices) -> None: + patch_spanbuilder_set_channel() + self.service = service self.connection = await aio_pika.connect_robust( host=self.settings.address, diff --git a/backend/infrahub/trace.py b/backend/infrahub/trace.py index 689b1f7aac..3a8588a43d 100644 --- a/backend/infrahub/trace.py +++ b/backend/infrahub/trace.py @@ -11,14 +11,35 @@ from opentelemetry.trace import StatusCode -def get_tracer(name: str = "infrahub") -> trace.Tracer: - return trace.get_tracer(name) - - def get_current_span_with_context() -> trace.Span: return trace.get_current_span() +def create_tracer_provider( + service: str, version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None +) -> TracerProvider: + # Create a BatchSpanProcessor exporter based on the type + if exporter_type == "console": + exporter = ConsoleSpanExporter() + elif exporter_type == "otlp": + if not exporter_endpoint: + raise ValueError("Exporter type is set to otlp but endpoint is not set") + if exporter_protocol == "http/protobuf": + exporter = HTTPSpanExporter(endpoint=exporter_endpoint) + elif exporter_protocol == "grpc": + exporter = GRPCSpanExporter(endpoint=exporter_endpoint) + else: + raise ValueError("Exporter type unsupported by Infrahub") + + # Resource can be required for some backends, e.g. Jaeger + resource = Resource(attributes={"service.name": service, "service.version": version}) + span_processor = BatchSpanProcessor(exporter) + tracer_provider = TracerProvider(resource=resource) + tracer_provider.add_span_processor(span_processor) + + return tracer_provider + + def get_traceid() -> str: current_span = get_current_span_with_context() trace_id = current_span.get_span_context().trace_id @@ -54,42 +75,17 @@ def add_span_exception(exception: Exception) -> None: current_span.record_exception(exception) -def create_tracer_provider( - version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None -) -> TracerProvider: - # Create a BatchSpanProcessor exporter based on the type - if exporter_type == "console": - exporter = ConsoleSpanExporter() - elif exporter_type == "otlp": - if not exporter_endpoint: - raise ValueError("Exporter type is set to otlp but endpoint is not set") - if exporter_protocol == "http/protobuf": - exporter = HTTPSpanExporter(endpoint=exporter_endpoint) - elif exporter_protocol == "grpc": - exporter = GRPCSpanExporter(endpoint=exporter_endpoint) - else: - raise ValueError("Exporter type unsupported by Infrahub") - - # Resource can be required for some backends, e.g. 
Jaeger - resource = Resource(attributes={"service.name": "infrahub", "service.version": version}) - span_processor = BatchSpanProcessor(exporter) - tracer_provider = TracerProvider(resource=resource) - tracer_provider.add_span_processor(span_processor) - - return tracer_provider - - def configure_trace( - version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None + service: str, version: str, exporter_type: str, exporter_endpoint: str | None = None, exporter_protocol: str = None ) -> None: # Create a trace provider with the exporter tracer_provider = create_tracer_provider( + service=service, version=version, exporter_type=exporter_type, exporter_endpoint=exporter_endpoint, exporter_protocol=exporter_protocol, ) - tracer_provider.get_tracer(__name__) # Register the trace provider trace.set_tracer_provider(tracer_provider) diff --git a/development/docker-compose.override.yml.tmp b/development/docker-compose.override.yml.tmp index 81bc6b1d2f..0967decb3d 100644 --- a/development/docker-compose.override.yml.tmp +++ b/development/docker-compose.override.yml.tmp @@ -2,7 +2,7 @@ services: # -------------------------------------------------------------------------------- # - Prometheus to collect all metrics endpoints - # - Tempo to receive traces + # - Tempo or Jaeger to receive traces # - Grafana to visualize these metrics # - Loki to receive logs from promtail # - Promtail to parse logs from different source @@ -42,6 +42,13 @@ services: ports: - "3200:3200" + # jaeger: + # image: jaegertracing/all-in-one:1.53 + # environment: + # COLLECTOR_ZIPKIN_HOST_PORT: ":9411" + # ports: + # - "16686:16686" + prometheus: image: prom/prometheus:latest volumes: diff --git a/development/infrahub.toml b/development/infrahub.toml index 7aa6b1121f..c8aec827fc 100644 --- a/development/infrahub.toml +++ b/development/infrahub.toml @@ -30,7 +30,7 @@ enable = false insecure = "True" exporter_type = "otlp" exporter_protocol = "grpc" -exporter_endpoint = "tempo" +exporter_endpoint = "jaeger" exporter_port = 4317 diff --git a/poetry.lock b/poetry.lock index f75c877940..69098aeed4 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.1 and should not be changed by hand. 
[[package]] name = "aio-pika" @@ -179,17 +179,6 @@ files = [ {file = "backcall-0.2.0.tar.gz", hash = "sha256:5cbdbf27be5e7cfadb448baf0aa95508f91f2bbc6c6437cd9cd06e2a4c215e1e"}, ] -[[package]] -name = "backoff" -version = "2.2.1" -description = "Function decoration for backoff and retry" -optional = false -python-versions = ">=3.7,<4.0" -files = [ - {file = "backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8"}, - {file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"}, -] - [[package]] name = "backports-zoneinfo" version = "0.2.1" @@ -2728,91 +2717,85 @@ files = [ [[package]] name = "opentelemetry-api" -version = "1.21.0" +version = "1.24.0" description = "OpenTelemetry Python API" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "opentelemetry_api-1.21.0-py3-none-any.whl", hash = "sha256:4bb86b28627b7e41098f0e93280fe4892a1abed1b79a19aec6f928f39b17dffb"}, - {file = "opentelemetry_api-1.21.0.tar.gz", hash = "sha256:d6185fd5043e000075d921822fd2d26b953eba8ca21b1e2fa360dd46a7686316"}, + {file = "opentelemetry_api-1.24.0-py3-none-any.whl", hash = "sha256:0f2c363d98d10d1ce93330015ca7fd3a65f60be64e05e30f557c61de52c80ca2"}, + {file = "opentelemetry_api-1.24.0.tar.gz", hash = "sha256:42719f10ce7b5a9a73b10a4baf620574fb8ad495a9cbe5c18d76b75d8689c67e"}, ] [package.dependencies] deprecated = ">=1.2.6" -importlib-metadata = ">=6.0,<7.0" +importlib-metadata = ">=6.0,<=7.0" [[package]] name = "opentelemetry-exporter-otlp-proto-common" -version = "1.21.0" +version = "1.24.0" description = "OpenTelemetry Protobuf encoding" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "opentelemetry_exporter_otlp_proto_common-1.21.0-py3-none-any.whl", hash = "sha256:97b1022b38270ec65d11fbfa348e0cd49d12006485c2321ea3b1b7037d42b6ec"}, - {file = "opentelemetry_exporter_otlp_proto_common-1.21.0.tar.gz", hash = "sha256:61db274d8a68d636fb2ec2a0f281922949361cdd8236e25ff5539edf942b3226"}, + {file = "opentelemetry_exporter_otlp_proto_common-1.24.0-py3-none-any.whl", hash = "sha256:e51f2c9735054d598ad2df5d3eca830fecfb5b0bda0a2fa742c9c7718e12f641"}, + {file = "opentelemetry_exporter_otlp_proto_common-1.24.0.tar.gz", hash = "sha256:5d31fa1ff976cacc38be1ec4e3279a3f88435c75b38b1f7a099a1faffc302461"}, ] [package.dependencies] -backoff = {version = ">=1.10.0,<3.0.0", markers = "python_version >= \"3.7\""} -opentelemetry-proto = "1.21.0" +opentelemetry-proto = "1.24.0" [[package]] name = "opentelemetry-exporter-otlp-proto-grpc" -version = "1.21.0" +version = "1.24.0" description = "OpenTelemetry Collector Protobuf over gRPC Exporter" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "opentelemetry_exporter_otlp_proto_grpc-1.21.0-py3-none-any.whl", hash = "sha256:ab37c63d6cb58d6506f76d71d07018eb1f561d83e642a8f5aa53dddf306087a4"}, - {file = "opentelemetry_exporter_otlp_proto_grpc-1.21.0.tar.gz", hash = "sha256:a497c5611245a2d17d9aa1e1cbb7ab567843d53231dcc844a62cea9f0924ffa7"}, + {file = "opentelemetry_exporter_otlp_proto_grpc-1.24.0-py3-none-any.whl", hash = "sha256:f40d62aa30a0a43cc1657428e59fcf82ad5f7ea8fff75de0f9d9cb6f739e0a3b"}, + {file = "opentelemetry_exporter_otlp_proto_grpc-1.24.0.tar.gz", hash = "sha256:217c6e30634f2c9797999ea9da29f7300479a94a610139b9df17433f915e7baa"}, ] [package.dependencies] -backoff = {version = ">=1.10.0,<3.0.0", markers = "python_version >= \"3.7\""} deprecated = 
">=1.2.6" googleapis-common-protos = ">=1.52,<2.0" grpcio = ">=1.0.0,<2.0.0" opentelemetry-api = ">=1.15,<2.0" -opentelemetry-exporter-otlp-proto-common = "1.21.0" -opentelemetry-proto = "1.21.0" -opentelemetry-sdk = ">=1.21.0,<1.22.0" +opentelemetry-exporter-otlp-proto-common = "1.24.0" +opentelemetry-proto = "1.24.0" +opentelemetry-sdk = ">=1.24.0,<1.25.0" [package.extras] test = ["pytest-grpc"] [[package]] name = "opentelemetry-exporter-otlp-proto-http" -version = "1.21.0" +version = "1.24.0" description = "OpenTelemetry Collector Protobuf over HTTP Exporter" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "opentelemetry_exporter_otlp_proto_http-1.21.0-py3-none-any.whl", hash = "sha256:56837773de6fb2714c01fc4895caebe876f6397bbc4d16afddf89e1299a55ee2"}, - {file = "opentelemetry_exporter_otlp_proto_http-1.21.0.tar.gz", hash = "sha256:19d60afa4ae8597f7ef61ad75c8b6c6b7ef8cb73a33fb4aed4dbc86d5c8d3301"}, + {file = "opentelemetry_exporter_otlp_proto_http-1.24.0-py3-none-any.whl", hash = "sha256:25af10e46fdf4cd3833175e42f4879a1255fc01655fe14c876183a2903949836"}, + {file = "opentelemetry_exporter_otlp_proto_http-1.24.0.tar.gz", hash = "sha256:704c066cc96f5131881b75c0eac286cd73fc735c490b054838b4513254bd7850"}, ] [package.dependencies] -backoff = {version = ">=1.10.0,<3.0.0", markers = "python_version >= \"3.7\""} deprecated = ">=1.2.6" googleapis-common-protos = ">=1.52,<2.0" opentelemetry-api = ">=1.15,<2.0" -opentelemetry-exporter-otlp-proto-common = "1.21.0" -opentelemetry-proto = "1.21.0" -opentelemetry-sdk = ">=1.21.0,<1.22.0" +opentelemetry-exporter-otlp-proto-common = "1.24.0" +opentelemetry-proto = "1.24.0" +opentelemetry-sdk = ">=1.24.0,<1.25.0" requests = ">=2.7,<3.0" -[package.extras] -test = ["responses (==0.22.0)"] - [[package]] name = "opentelemetry-instrumentation" -version = "0.42b0" +version = "0.45b0" description = "Instrumentation Tools & Auto Instrumentation for OpenTelemetry Python" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "opentelemetry_instrumentation-0.42b0-py3-none-any.whl", hash = "sha256:65ae54ddb90ca2d05d2d16bf6863173e7141eba1bbbf41fc9bbb02446adbe369"}, - {file = "opentelemetry_instrumentation-0.42b0.tar.gz", hash = "sha256:6a653a1fed0f76eea32885321d77c750483e987eeefa4cbf219fc83559543198"}, + {file = "opentelemetry_instrumentation-0.45b0-py3-none-any.whl", hash = "sha256:06c02e2c952c1b076e8eaedf1b82f715e2937ba7eeacab55913dd434fbcec258"}, + {file = "opentelemetry_instrumentation-0.45b0.tar.gz", hash = "sha256:6c47120a7970bbeb458e6a73686ee9ba84b106329a79e4a4a66761f933709c7e"}, ] [package.dependencies] @@ -2820,59 +2803,76 @@ opentelemetry-api = ">=1.4,<2.0" setuptools = ">=16.0" wrapt = ">=1.0.0,<2.0.0" +[[package]] +name = "opentelemetry-instrumentation-aio-pika" +version = "0.45b0" +description = "OpenTelemetry Aio-pika instrumentation" +optional = false +python-versions = ">=3.8" +files = [ + {file = "opentelemetry_instrumentation_aio_pika-0.45b0-py3-none-any.whl", hash = "sha256:c1d114ac06f1ea87a9f00786c351ba349472c151dab4dae8c341e0c0348e4571"}, + {file = "opentelemetry_instrumentation_aio_pika-0.45b0.tar.gz", hash = "sha256:a05543034f402211f83629e1e8a03a90c44aeeea79caca476d49a08471d3ddfa"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.5,<2.0" +opentelemetry-instrumentation = "0.45b0" +wrapt = ">=1.0.0,<2.0.0" + +[package.extras] +instruments = ["aio-pika (>=7.2.0,<10.0.0)"] + [[package]] name = "opentelemetry-instrumentation-asgi" -version = "0.42b0" +version = 
"0.45b0" description = "ASGI instrumentation for OpenTelemetry" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "opentelemetry_instrumentation_asgi-0.42b0-py3-none-any.whl", hash = "sha256:79b7278fb614aba1bf2211060960d3e8501c1d7d9314b857b30ad80ba34a2805"}, - {file = "opentelemetry_instrumentation_asgi-0.42b0.tar.gz", hash = "sha256:da1d5dd4f172c44c6c100dae352e1fd0ae36dc4f266b3fed68ce9d5ab94c9146"}, + {file = "opentelemetry_instrumentation_asgi-0.45b0-py3-none-any.whl", hash = "sha256:8be1157ed62f0db24e45fdf7933c530c4338bd025c5d4af7830e903c0756021b"}, + {file = "opentelemetry_instrumentation_asgi-0.45b0.tar.gz", hash = "sha256:97f55620f163fd3d20323e9fd8dc3aacc826c03397213ff36b877e0f4b6b08a6"}, ] [package.dependencies] asgiref = ">=3.0,<4.0" opentelemetry-api = ">=1.12,<2.0" -opentelemetry-instrumentation = "0.42b0" -opentelemetry-semantic-conventions = "0.42b0" -opentelemetry-util-http = "0.42b0" +opentelemetry-instrumentation = "0.45b0" +opentelemetry-semantic-conventions = "0.45b0" +opentelemetry-util-http = "0.45b0" [package.extras] instruments = ["asgiref (>=3.0,<4.0)"] -test = ["opentelemetry-instrumentation-asgi[instruments]", "opentelemetry-test-utils (==0.42b0)"] [[package]] name = "opentelemetry-instrumentation-fastapi" -version = "0.42b0" +version = "0.45b0" description = "OpenTelemetry FastAPI Instrumentation" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "opentelemetry_instrumentation_fastapi-0.42b0-py3-none-any.whl", hash = "sha256:d53a26c4859767d5ba67109038cabc7165d97a8a8b7654ccde4ce290036d1725"}, - {file = "opentelemetry_instrumentation_fastapi-0.42b0.tar.gz", hash = "sha256:7181d4886e57182e93477c4b797a7cd5467820b93c238eeb3e7d27a563c176e8"}, + {file = "opentelemetry_instrumentation_fastapi-0.45b0-py3-none-any.whl", hash = "sha256:77d9c123a363129148f5f66d44094f3d67aaaa2b201396d94782b4a7f9ce4314"}, + {file = "opentelemetry_instrumentation_fastapi-0.45b0.tar.gz", hash = "sha256:5a6b91e1c08a01601845fcfcfdefd0a2aecdb3c356d4a436a3210cb58c21487e"}, ] [package.dependencies] opentelemetry-api = ">=1.12,<2.0" -opentelemetry-instrumentation = "0.42b0" -opentelemetry-instrumentation-asgi = "0.42b0" -opentelemetry-semantic-conventions = "0.42b0" -opentelemetry-util-http = "0.42b0" +opentelemetry-instrumentation = "0.45b0" +opentelemetry-instrumentation-asgi = "0.45b0" +opentelemetry-semantic-conventions = "0.45b0" +opentelemetry-util-http = "0.45b0" [package.extras] instruments = ["fastapi (>=0.58,<1.0)"] -test = ["httpx (>=0.22,<1.0)", "opentelemetry-instrumentation-fastapi[instruments]", "opentelemetry-test-utils (==0.42b0)", "requests (>=2.23,<3.0)"] [[package]] name = "opentelemetry-proto" -version = "1.21.0" +version = "1.24.0" description = "OpenTelemetry Python Proto" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "opentelemetry_proto-1.21.0-py3-none-any.whl", hash = "sha256:32fc4248e83eebd80994e13963e683f25f3b443226336bb12b5b6d53638f50ba"}, - {file = "opentelemetry_proto-1.21.0.tar.gz", hash = "sha256:7d5172c29ed1b525b5ecf4ebe758c7138a9224441b3cfe683d0a237c33b1941f"}, + {file = "opentelemetry_proto-1.24.0-py3-none-any.whl", hash = "sha256:bcb80e1e78a003040db71ccf83f2ad2019273d1e0828089d183b18a1476527ce"}, + {file = "opentelemetry_proto-1.24.0.tar.gz", hash = "sha256:ff551b8ad63c6cabb1845ce217a6709358dfaba0f75ea1fa21a61ceddc78cab8"}, ] [package.dependencies] @@ -2880,40 +2880,40 @@ protobuf = ">=3.19,<5.0" [[package]] name = "opentelemetry-sdk" 
-version = "1.21.0" +version = "1.24.0" description = "OpenTelemetry Python SDK" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "opentelemetry_sdk-1.21.0-py3-none-any.whl", hash = "sha256:9fe633243a8c655fedace3a0b89ccdfc654c0290ea2d8e839bd5db3131186f73"}, - {file = "opentelemetry_sdk-1.21.0.tar.gz", hash = "sha256:3ec8cd3020328d6bc5c9991ccaf9ae820ccb6395a5648d9a95d3ec88275b8879"}, + {file = "opentelemetry_sdk-1.24.0-py3-none-any.whl", hash = "sha256:fa731e24efe832e98bcd90902085b359dcfef7d9c9c00eb5b9a18587dae3eb59"}, + {file = "opentelemetry_sdk-1.24.0.tar.gz", hash = "sha256:75bc0563affffa827700e0f4f4a68e1e257db0df13372344aebc6f8a64cde2e5"}, ] [package.dependencies] -opentelemetry-api = "1.21.0" -opentelemetry-semantic-conventions = "0.42b0" +opentelemetry-api = "1.24.0" +opentelemetry-semantic-conventions = "0.45b0" typing-extensions = ">=3.7.4" [[package]] name = "opentelemetry-semantic-conventions" -version = "0.42b0" +version = "0.45b0" description = "OpenTelemetry Semantic Conventions" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "opentelemetry_semantic_conventions-0.42b0-py3-none-any.whl", hash = "sha256:5cd719cbfec448af658860796c5d0fcea2fdf0945a2bed2363f42cb1ee39f526"}, - {file = "opentelemetry_semantic_conventions-0.42b0.tar.gz", hash = "sha256:44ae67a0a3252a05072877857e5cc1242c98d4cf12870159f1a94bec800d38ec"}, + {file = "opentelemetry_semantic_conventions-0.45b0-py3-none-any.whl", hash = "sha256:a4a6fb9a7bacd9167c082aa4681009e9acdbfa28ffb2387af50c2fef3d30c864"}, + {file = "opentelemetry_semantic_conventions-0.45b0.tar.gz", hash = "sha256:7c84215a44ac846bc4b8e32d5e78935c5c43482e491812a0bb8aaf87e4d92118"}, ] [[package]] name = "opentelemetry-util-http" -version = "0.42b0" +version = "0.45b0" description = "Web util for OpenTelemetry" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "opentelemetry_util_http-0.42b0-py3-none-any.whl", hash = "sha256:764069ed2f7e9a98ed1a7a87111f838000484e388e81f467405933be4b0306c6"}, - {file = "opentelemetry_util_http-0.42b0.tar.gz", hash = "sha256:665e7d372837811aa08cbb9102d4da862441d1c9b1795d649ef08386c8a3cbbd"}, + {file = "opentelemetry_util_http-0.45b0-py3-none-any.whl", hash = "sha256:6628868b501b3004e1860f976f410eeb3d3499e009719d818000f24ce17b6e33"}, + {file = "opentelemetry_util_http-0.45b0.tar.gz", hash = "sha256:4ce08b6a7d52dd7c96b7705b5b4f06fdb6aa3eac1233b3b0bfef8a0cab9a92cd"}, ] [[package]] @@ -3974,6 +3974,7 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = 
"sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -5203,4 +5204,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"] [metadata] lock-version = "2.0" python-versions = "^3.8, < 3.13" -content-hash = "7a9630bc79694d42fc63196bf2fd46fe9a5b67d7fe9607193afbf64e2b5732ae" +content-hash = "eb875d34c2fbf10e9e9cb33cc206ac8d9a2a30745b206fd6c90dcdc06458a163" diff --git a/pyproject.toml b/pyproject.toml index 182862f34d..106db3958d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,11 +24,6 @@ structlog = "24.1.0" boto3 = "~1.33.11" email-validator = "~2.1" redis = { version = "^5.0.0", extras = ["hiredis"]} -opentelemetry-api = "~1.21" -opentelemetry-sdk = "~1.21" -opentelemetry-instrumentation-fastapi = "^0.42b0" -opentelemetry-exporter-otlp-proto-http = "~1.21" -opentelemetry-exporter-otlp-proto-grpc = "~1.21" typer = "~0.7" # Dependencies specific to the API Server @@ -43,6 +38,10 @@ asgi-correlation-id = "4.2.0" # Middleware for FastAPI to generate ID per bcrypt = "~4.1" # Used to hash and validate password pyjwt = "~2.8" # Used to manage JWT tokens uvicorn = {version = "~0.27", extras = ["standard"]} +opentelemetry-instrumentation-aio-pika = "^0.45b0" +opentelemetry-instrumentation-fastapi = "^0.45b0" +opentelemetry-exporter-otlp-proto-grpc = "^1.24.0" +opentelemetry-exporter-otlp-proto-http = "^1.24.0" [tool.poetry.group.dev.dependencies] yamllint = "*" From 44cbe7bb38f99fb4fe2301e3b624ddacf2068f7d Mon Sep 17 00:00:00 2001 From: Fatih Acar Date: Sun, 31 Mar 2024 14:12:32 +0200 Subject: [PATCH 2/2] feat(ci): enable tracing during e2e tests Signed-off-by: Fatih Acar --- .github/workflows/ci.yml | 10 +++++ backend/infrahub/cli/git_agent.py | 2 +- backend/infrahub/config.py | 29 -------------- backend/infrahub/server.py | 2 +- backend/infrahub/trace.py | 65 +++++++++++++++++++------------ development/docker-compose.yml | 10 +++++ development/infrahub.toml | 9 ----- pyproject.toml | 4 -- 8 files changed, 62 insertions(+), 69 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 704851eabf..81c9bdf99f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -606,6 +606,16 @@ jobs: run: echo "INFRAHUB_DB_BACKUP_PORT=$(shuf -n 1 -i 10000-30000)" >> $GITHUB_ENV - name: Select vmagent port run: echo "VMAGENT_PORT=$(shuf -n 1 -i 10000-30000)" >> $GITHUB_ENV + + - name: Enable tracing + run: echo "INFRAHUB_TRACE_ENABLE=true" >> $GITHUB_ENV + - name: Set tracing configuration + run: echo "INFRAHUB_TRACE_INSECURE=false" >> $GITHUB_ENV + - name: Set tracing configuration + run: echo "INFRAHUB_TRACE_EXPORTER_ENDPOINT=${{ secrets.TRACING_ENDPOINT }}" >> $GITHUB_ENV + - name: Set tracing configuration + run: echo "OTEL_RESOURCE_ATTRIBUTES=github.run_id=${GITHUB_RUN_ID}" >> $GITHUB_ENV + - name: "Store start time" run: echo TEST_START_TIME=$(date +%s)000 >> $GITHUB_ENV diff --git a/backend/infrahub/cli/git_agent.py b/backend/infrahub/cli/git_agent.py index 04c3266a03..fc68c9e925 100644 --- a/backend/infrahub/cli/git_agent.py +++ b/backend/infrahub/cli/git_agent.py @@ -73,7 +73,7 @@ async def _start(debug: bool, port: int) -> None: service="infrahub-git-agent", version=__version__, exporter_type=config.SETTINGS.trace.exporter_type, - exporter_endpoint=config.SETTINGS.trace.trace_endpoint, + exporter_endpoint=config.SETTINGS.trace.exporter_endpoint, exporter_protocol=config.SETTINGS.trace.exporter_protocol, ) diff --git a/backend/infrahub/config.py b/backend/infrahub/config.py index a968f56d1d..ea0c6d1c17 100644 --- 
a/backend/infrahub/config.py +++ b/backend/infrahub/config.py @@ -279,35 +279,6 @@ class TraceSettings(BaseSettings): default=TraceTransportProtocol.GRPC, description="Protocol to be used for exporting traces" ) exporter_endpoint: Optional[str] = Field(default=None, description="OTLP endpoint for exporting traces") - exporter_port: Optional[int] = Field( - default=None, ge=1, le=65535, description="Specified if running on a non default port (4317)" - ) - - @property - def service_port(self) -> int: - if self.exporter_protocol == TraceTransportProtocol.GRPC: - default_port = 4317 - elif self.exporter_protocol == TraceTransportProtocol.HTTP_PROTOBUF: - default_port = 4318 - else: - default_port = 4317 - - return self.exporter_port or default_port - - @property - def trace_endpoint(self) -> Optional[str]: - if not self.exporter_endpoint: - return None - if self.insecure: - scheme = "http://" - else: - scheme = "https://" - endpoint = str(self.exporter_endpoint) + ":" + str(self.service_port) - - if self.exporter_protocol == TraceTransportProtocol.HTTP_PROTOBUF: - endpoint += "/v1/traces" - - return scheme + endpoint @dataclass diff --git a/backend/infrahub/server.py b/backend/infrahub/server.py index 30dfe2361e..5b98f7641c 100644 --- a/backend/infrahub/server.py +++ b/backend/infrahub/server.py @@ -47,7 +47,7 @@ async def app_initialization(application: FastAPI) -> None: service="infrahub-server", version=__version__, exporter_type=config.SETTINGS.trace.exporter_type, - exporter_endpoint=config.SETTINGS.trace.trace_endpoint, + exporter_endpoint=config.SETTINGS.trace.exporter_endpoint, exporter_protocol=config.SETTINGS.trace.exporter_protocol, ) diff --git a/backend/infrahub/trace.py b/backend/infrahub/trace.py index 3a8588a43d..c332380b61 100644 --- a/backend/infrahub/trace.py +++ b/backend/infrahub/trace.py @@ -1,3 +1,5 @@ +import os + from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( OTLPSpanExporter as GRPCSpanExporter, @@ -10,36 +12,13 @@ from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from opentelemetry.trace import StatusCode +from infrahub.worker import WORKER_IDENTITY + def get_current_span_with_context() -> trace.Span: return trace.get_current_span() -def create_tracer_provider( - service: str, version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None -) -> TracerProvider: - # Create a BatchSpanProcessor exporter based on the type - if exporter_type == "console": - exporter = ConsoleSpanExporter() - elif exporter_type == "otlp": - if not exporter_endpoint: - raise ValueError("Exporter type is set to otlp but endpoint is not set") - if exporter_protocol == "http/protobuf": - exporter = HTTPSpanExporter(endpoint=exporter_endpoint) - elif exporter_protocol == "grpc": - exporter = GRPCSpanExporter(endpoint=exporter_endpoint) - else: - raise ValueError("Exporter type unsupported by Infrahub") - - # Resource can be required for some backends, e.g. 
Jaeger - resource = Resource(attributes={"service.name": service, "service.version": version}) - span_processor = BatchSpanProcessor(exporter) - tracer_provider = TracerProvider(resource=resource) - tracer_provider.add_span_processor(span_processor) - - return tracer_provider - - def get_traceid() -> str: current_span = get_current_span_with_context() trace_id = current_span.get_span_context().trace_id @@ -75,6 +54,42 @@ def add_span_exception(exception: Exception) -> None: current_span.record_exception(exception) +def create_tracer_provider( + service: str, version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None +) -> TracerProvider: + # Create a BatchSpanProcessor exporter based on the type + if exporter_type == "console": + exporter = ConsoleSpanExporter() + elif exporter_type == "otlp": + if not exporter_endpoint: + raise ValueError("Exporter type is set to otlp but endpoint is not set") + if exporter_protocol == "http/protobuf": + exporter = HTTPSpanExporter(endpoint=exporter_endpoint) + elif exporter_protocol == "grpc": + exporter = GRPCSpanExporter(endpoint=exporter_endpoint) + else: + raise ValueError("Exporter type unsupported by Infrahub") + + extra_attributes = {} + if os.getenv("OTEL_RESOURCE_ATTRIBUTES"): + extra_attributes = dict(attr.split("=") for attr in os.getenv("OTEL_RESOURCE_ATTRIBUTES").split(",")) + + # Resource can be required for some backends, e.g. Jaeger + resource = Resource( + attributes={ + "service.name": service, + "service.version": version, + "worker.id": WORKER_IDENTITY, + **extra_attributes, + } + ) + span_processor = BatchSpanProcessor(exporter) + tracer_provider = TracerProvider(resource=resource) + tracer_provider.add_span_processor(span_processor) + + return tracer_provider + + def configure_trace( service: str, version: str, exporter_type: str, exporter_endpoint: str | None = None, exporter_protocol: str = None ) -> None: diff --git a/development/docker-compose.yml b/development/docker-compose.yml index 2281185917..5a30ae3526 100644 --- a/development/docker-compose.yml +++ b/development/docker-compose.yml @@ -24,6 +24,11 @@ services: - "INFRAHUB_SECURITY_INITIAL_ADMIN_TOKEN=06438eb2-8019-4776-878c-0941b1f1d1ec" - "INFRAHUB_SECURITY_SECRET_KEY=327f747f-efac-42be-9e73-999f08f86b92" - "INFRAHUB_ALLOW_ANONYMOUS_ACCESS=true" + - "INFRAHUB_TRACE_ENABLE=${INFRAHUB_TRACE_ENABLE:-false}" + - "INFRAHUB_TRACE_INSECURE=${INFRAHUB_TRACE_INSECURE:-true}" + - "INFRAHUB_TRACE_EXPORTER_TYPE=${INFRAHUB_TRACE_EXPORTER_TYPE:-otlp}" + - "INFRAHUB_TRACE_EXPORTER_ENDPOINT=${INFRAHUB_TRACE_EXPORTER_ENDPOINT:-http://jaeger:4317}" + - "OTEL_RESOURCE_ATTRIBUTES=${OTEL_RESOURCE_ATTRIBUTES:-}" - "INFRAHUB_DB_TYPE=${INFRAHUB_DB_TYPE}" volumes: - "storage_data:/opt/infrahub/storage" @@ -58,6 +63,11 @@ services: - "INFRAHUB_LOG_LEVEL=DEBUG" - "INFRAHUB_SDK_API_TOKEN=06438eb2-8019-4776-878c-0941b1f1d1ec" - "INFRAHUB_SDK_TIMEOUT=20" + - "INFRAHUB_TRACE_ENABLE=${INFRAHUB_TRACE_ENABLE:-false}" + - "INFRAHUB_TRACE_INSECURE=${INFRAHUB_TRACE_INSECURE:-true}" + - "INFRAHUB_TRACE_EXPORTER_TYPE=${INFRAHUB_TRACE_EXPORTER_TYPE:-otlp}" + - "INFRAHUB_TRACE_EXPORTER_ENDPOINT=${INFRAHUB_TRACE_EXPORTER_ENDPOINT:-http://jaeger:4317}" + - "OTEL_RESOURCE_ATTRIBUTES=${OTEL_RESOURCE_ATTRIBUTES:-}" - "INFRAHUB_DB_TYPE=${INFRAHUB_DB_TYPE}" volumes: - "git_data:/opt/infrahub/git" diff --git a/development/infrahub.toml b/development/infrahub.toml index c8aec827fc..3a22ebe91a 100644 --- a/development/infrahub.toml +++ b/development/infrahub.toml @@ -25,14 +25,5 @@ 
driver = "local" [storage.local] path = "/opt/infrahub/storage" -[trace] -enable = false -insecure = "True" -exporter_type = "otlp" -exporter_protocol = "grpc" -exporter_endpoint = "jaeger" -exporter_port = 4317 - - # [experimental_features] # pull_request = true diff --git a/pyproject.toml b/pyproject.toml index 106db3958d..83a64e2e11 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -380,10 +380,6 @@ ignore_errors = true module = "infrahub.message_bus.operations.*" ignore_errors = true -[[tool.mypy.overrides]] -module = "infrahub.server" -ignore_errors = true - [[tool.mypy.overrides]] module = "infrahub.tasks.registry" ignore_errors = true