feat(backend): add e2e useful tracing
Enabled the tracing component inside the git agent in order to have end-to-end
spans for a single request.
Added spans at the most useful places, such as the message bus' execute_message
and the database's execute_query.

Signed-off-by: Fatih Acar <fatih@opsmill.com>
fatih-acar committed Mar 31, 2024
1 parent 7281010 commit 7da266e
Showing 10 changed files with 201 additions and 141 deletions.
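For reference, the pattern applied throughout this commit is the stock OpenTelemetry one: fetch a tracer for the current module, open a named span around the unit of work, and attach attributes for later inspection. A minimal sketch of that pattern (the helper function and query value are illustrative, not taken from the commit):

    from opentelemetry import trace

    async def run_db_query(query: str) -> None:
        # The span becomes a child of whichever span is currently active
        # (e.g. the FastAPI request span), which is what yields the
        # end-to-end view of a single request.
        with trace.get_tracer(__name__).start_as_current_span("execute_db_query") as span:
            span.set_attribute("query", query)
            ...  # run the query while the span is recording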
13 changes: 12 additions & 1 deletion backend/infrahub/cli/git_agent.py
@@ -8,7 +8,7 @@
from prometheus_client import start_http_server
from rich.logging import RichHandler

from infrahub import config
from infrahub import __version__, config
from infrahub.components import ComponentType
from infrahub.core.initialization import initialization
from infrahub.database import InfrahubDatabase, get_db
@@ -20,6 +20,7 @@
from infrahub.services import InfrahubServices
from infrahub.services.adapters.cache.redis import RedisCache
from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus
from infrahub.trace import configure_trace

app = typer.Typer()

@@ -66,6 +67,16 @@ async def _start(debug: bool, port: int) -> None:
client = await InfrahubClient.init(address=config.SETTINGS.main.internal_address, retry_on_failure=True, log=log)
await client.branch.all()

# Initialize trace
if config.SETTINGS.trace.enable:
configure_trace(
service="infrahub-git-agent",
version=__version__,
exporter_type=config.SETTINGS.trace.exporter_type,
exporter_endpoint=config.SETTINGS.trace.trace_endpoint,
exporter_protocol=config.SETTINGS.trace.exporter_protocol,
)

# Initialize the lock
initialize_lock()

21 changes: 14 additions & 7 deletions backend/infrahub/database/__init__.py
@@ -14,6 +14,7 @@
Record,
)
from neo4j.exceptions import ClientError, Neo4jError, ServiceUnavailable, TransientError
from opentelemetry import trace
from typing_extensions import Self

from infrahub import config
@@ -167,17 +168,23 @@ async def close(self):
async def execute_query(
self, query: str, params: Optional[Dict[str, Any]] = None, name: Optional[str] = "undefined"
) -> List[Record]:
with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time():
response = await self.run_query(query=query, params=params)
return [item async for item in response]
with trace.get_tracer(__name__).start_as_current_span("execute_db_query") as span:
span.set_attribute("query", query)

with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time():
response = await self.run_query(query=query, params=params)
return [item async for item in response]

async def execute_query_with_metadata(
self, query: str, params: Optional[Dict[str, Any]] = None, name: Optional[str] = "undefined"
) -> Tuple[List[Record], Dict[str, Any]]:
with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time():
response = await self.run_query(query=query, params=params)
results = [item async for item in response]
return results, response._metadata or {}
with trace.get_tracer(__name__).start_as_current_span("execute_db_query_with_metadata") as span:
span.set_attribute("query", query)

with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time():
response = await self.run_query(query=query, params=params)
results = [item async for item in response]
return results, response._metadata or {}

async def run_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> AsyncResult:
if self.is_transaction:
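In both methods above, the existing QUERY_EXECUTION_METRICS timer block is nested inside the new span (indentation is flattened in this view), so the span's duration covers the full query execution and the raw Cypher text is recorded on the span via the "query" attribute.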
2 changes: 2 additions & 0 deletions backend/infrahub/graphql/mutations/branch.py
@@ -5,6 +5,7 @@
import pydantic
from graphene import Boolean, Field, InputObjectType, List, Mutation, String
from infrahub_sdk.utils import extract_fields, extract_fields_first_node
from opentelemetry import trace
from typing_extensions import Self

from infrahub import config, lock
@@ -55,6 +56,7 @@ class Arguments:

@classmethod
@retry_db_transaction(name="branch_create")
@trace.get_tracer(__name__).start_as_current_span("branch_create")
async def mutate(
cls, root: dict, info: GraphQLResolveInfo, data: BranchCreateInput, background_execution: bool = False
) -> Self:
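The mutation above relies on start_as_current_span being usable as a decorator as well as a context manager, so the span covers the whole mutate call. A rough sketch of the two equivalent forms (function and span names here are illustrative; recent opentelemetry-python releases accept async functions in the decorator form as well):

    from opentelemetry import trace

    tracer = trace.get_tracer(__name__)

    @tracer.start_as_current_span("branch_create")
    def create_branch(name: str) -> None:
        ...  # runs inside the "branch_create" span

    # roughly equivalent explicit form
    def create_branch_explicit(name: str) -> None:
        with tracer.start_as_current_span("branch_create"):
            ...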
34 changes: 21 additions & 13 deletions backend/infrahub/server.py
@@ -14,7 +14,7 @@
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from infrahub_sdk.timestamp import TimestampFormatError
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor, Span
from pydantic import ValidationError
from starlette_exporter import PrometheusMiddleware, handle_metrics

@@ -33,7 +33,7 @@
from infrahub.services import InfrahubServices, services
from infrahub.services.adapters.cache.redis import RedisCache
from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus
from infrahub.trace import add_span_exception, configure_trace, get_traceid, get_tracer
from infrahub.trace import add_span_exception, configure_trace, get_traceid
from infrahub.worker import WORKER_IDENTITY


@@ -43,6 +43,7 @@ async def app_initialization(application: FastAPI) -> None:
# Initialize trace
if config.SETTINGS.trace.enable:
configure_trace(
service="infrahub-server",
version=__version__,
exporter_type=config.SETTINGS.trace.exporter_type,
exporter_endpoint=config.SETTINGS.trace.trace_endpoint,
@@ -93,8 +94,13 @@ async def lifespan(application: FastAPI) -> AsyncGenerator:
redoc_url="/api/redoc",
)

FastAPIInstrumentor().instrument_app(app, excluded_urls=".*/metrics")
tracer = get_tracer()

def server_request_hook(span: Span, scope: dict) -> None: # pylint: disable=unused-argument
if span and span.is_recording():
span.set_attribute("worker", WORKER_IDENTITY)


FastAPIInstrumentor().instrument_app(app, excluded_urls=".*/metrics", server_request_hook=server_request_hook)

FRONTEND_DIRECTORY = os.environ.get("INFRAHUB_FRONTEND_DIRECTORY", os.path.abspath("frontend"))
FRONTEND_ASSET_DIRECTORY = f"{FRONTEND_DIRECTORY}/dist/assets"
@@ -116,15 +122,17 @@ async def lifespan(application: FastAPI) -> AsyncGenerator:
async def logging_middleware(request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
clear_log_context()
request_id = correlation_id.get()
with tracer.start_as_current_span(f"processing request {request_id}"):
trace_id = get_traceid()
set_log_data(key="request_id", value=request_id)
set_log_data(key="app", value="infrahub.api")
set_log_data(key="worker", value=WORKER_IDENTITY)
if trace_id:
set_log_data(key="trace_id", value=trace_id)
response = await call_next(request)
return response

set_log_data(key="request_id", value=request_id)
set_log_data(key="app", value="infrahub.api")
set_log_data(key="worker", value=WORKER_IDENTITY)

trace_id = get_traceid()
if trace_id:
set_log_data(key="trace_id", value=trace_id)

response = await call_next(request)
return response


@app.middleware("http")
29 changes: 29 additions & 0 deletions backend/infrahub/services/adapters/message_bus/rabbitmq.py
@@ -4,8 +4,11 @@
from typing import TYPE_CHECKING, Awaitable, Callable, List, MutableMapping, Optional, Type, TypeVar

import aio_pika
import opentelemetry.instrumentation.aio_pika.span_builder
import ujson
from infrahub_sdk import UUIDT
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
from opentelemetry.semconv.trace import SpanAttributes

from infrahub import config
from infrahub.components import ComponentType
@@ -24,6 +27,7 @@
AbstractQueue,
AbstractRobustConnection,
)
from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder

from infrahub.config import BrokerSettings
from infrahub.services import InfrahubServices
@@ -32,6 +36,29 @@
ResponseClass = TypeVar("ResponseClass")


AioPikaInstrumentor().instrument()


# TODO: remove this once https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1835 is resolved
def patch_spanbuilder_set_channel() -> None:
"""
The default SpanBuilder.set_channel does not work with aio_pika 9.1 and the refactored connection
attribute
"""

def set_channel(self: SpanBuilder, channel: AbstractChannel) -> None:
if hasattr(channel, "_connection"):
url = channel._connection.url
self._attributes.update(
{
SpanAttributes.NET_PEER_NAME: url.host,
SpanAttributes.NET_PEER_PORT: url.port,
}
)

opentelemetry.instrumentation.aio_pika.span_builder.SpanBuilder.set_channel = set_channel # type: ignore
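The replacement set_channel reads the broker URL from the channel's refactored _connection attribute so that spans produced by AioPikaInstrumentor keep their peer host and port attributes under aio_pika 9.x; patch_spanbuilder_set_channel() is invoked in initialize() below, before the first connection is opened.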


async def _add_request_id(message: InfrahubMessage) -> None:
log_data = get_log_data()
message.meta.request_id = log_data.get("request_id", "")
@@ -54,6 +81,8 @@ def __init__(self, settings: Optional[BrokerSettings] = None) -> None:
self.futures: MutableMapping[str, asyncio.Future] = {}

async def initialize(self, service: InfrahubServices) -> None:
patch_spanbuilder_set_channel()

self.service = service
self.connection = await aio_pika.connect_robust(
host=self.settings.address,
58 changes: 27 additions & 31 deletions backend/infrahub/trace.py
@@ -11,14 +11,35 @@
from opentelemetry.trace import StatusCode


def get_tracer(name: str = "infrahub") -> trace.Tracer:
return trace.get_tracer(name)


def get_current_span_with_context() -> trace.Span:
return trace.get_current_span()


def create_tracer_provider(
service: str, version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None
) -> TracerProvider:
# Create a BatchSpanProcessor exporter based on the type
if exporter_type == "console":
exporter = ConsoleSpanExporter()
elif exporter_type == "otlp":
if not exporter_endpoint:
raise ValueError("Exporter type is set to otlp but endpoint is not set")
if exporter_protocol == "http/protobuf":
exporter = HTTPSpanExporter(endpoint=exporter_endpoint)
elif exporter_protocol == "grpc":
exporter = GRPCSpanExporter(endpoint=exporter_endpoint)
else:
raise ValueError("Exporter type unsupported by Infrahub")

# Resource can be required for some backends, e.g. Jaeger
resource = Resource(attributes={"service.name": service, "service.version": version})
span_processor = BatchSpanProcessor(exporter)
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(span_processor)

return tracer_provider


def get_traceid() -> str:
current_span = get_current_span_with_context()
trace_id = current_span.get_span_context().trace_id
@@ -54,42 +75,17 @@ def add_span_exception(exception: Exception) -> None:
current_span.record_exception(exception)


def create_tracer_provider(
version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None
) -> TracerProvider:
# Create a BatchSpanProcessor exporter based on the type
if exporter_type == "console":
exporter = ConsoleSpanExporter()
elif exporter_type == "otlp":
if not exporter_endpoint:
raise ValueError("Exporter type is set to otlp but endpoint is not set")
if exporter_protocol == "http/protobuf":
exporter = HTTPSpanExporter(endpoint=exporter_endpoint)
elif exporter_protocol == "grpc":
exporter = GRPCSpanExporter(endpoint=exporter_endpoint)
else:
raise ValueError("Exporter type unsupported by Infrahub")

# Resource can be required for some backends, e.g. Jaeger
resource = Resource(attributes={"service.name": "infrahub", "service.version": version})
span_processor = BatchSpanProcessor(exporter)
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(span_processor)

return tracer_provider


def configure_trace(
version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None
service: str, version: str, exporter_type: str, exporter_endpoint: str | None = None, exporter_protocol: str = None
) -> None:
# Create a trace provider with the exporter
tracer_provider = create_tracer_provider(
service=service,
version=version,
exporter_type=exporter_type,
exporter_endpoint=exporter_endpoint,
exporter_protocol=exporter_protocol,
)
tracer_provider.get_tracer(__name__)

# Register the trace provider
trace.set_tracer_provider(tracer_provider)
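Net effect of the trace.py rework: create_tracer_provider and configure_trace now take an explicit service name, so the API server and the git agent register themselves as "infrahub-server" and "infrahub-git-agent" instead of a shared "infrahub" resource, and the get_tracer() helper is dropped since callers use trace.get_tracer(__name__) directly.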
9 changes: 8 additions & 1 deletion development/docker-compose.override.yml.tmp
@@ -3,7 +3,7 @@ version: "3.4"
services:
# --------------------------------------------------------------------------------
# - Prometheus to collect all metrics endpoints
# - Tempo to receive traces
# - Tempo or Jaeger to receive traces
# - Grafana to visualize these metrics
# - Loki to receive logs from promtail
# - Promtail to parse logs from different source
@@ -43,6 +43,13 @@ services:
ports:
- "3200:3200"

# jaeger:
# image: jaegertracing/all-in-one:1.53
# environment:
# COLLECTOR_ZIPKIN_HOST_PORT: ":9411"
# ports:
# - "16686:16686"

prometheus:
image: prom/prometheus:latest
volumes:
2 changes: 1 addition & 1 deletion development/infrahub.toml
@@ -30,7 +30,7 @@ enable = false
insecure = "True"
exporter_type = "otlp"
exporter_protocol = "grpc"
exporter_endpoint = "tempo"
exporter_endpoint = "jaeger"
exporter_port = 4317
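Together with the commented-out Jaeger service in the compose override above, this points the OTLP/gRPC exporter at the Jaeger all-in-one container on the compose network (Jaeger 1.53 ships a built-in OTLP receiver on 4317, assuming it is left enabled), so no separate collector is required.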

