Skip to content

Commit

Permalink
Merge pull request #2037 from opsmill/fac-improve-tracing
Browse files Browse the repository at this point in the history
feat(backend): add useful e2e tracing
  • Loading branch information
fatih-acar authored Apr 10, 2024
2 parents bb765ca + 44cbe7b commit d8d35c7
Show file tree
Hide file tree
Showing 14 changed files with 227 additions and 170 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,16 @@ jobs:
run: echo "INFRAHUB_DB_BACKUP_PORT=$(shuf -n 1 -i 10000-30000)" >> $GITHUB_ENV
- name: Select vmagent port
run: echo "VMAGENT_PORT=$(shuf -n 1 -i 10000-30000)" >> $GITHUB_ENV

# Export tracing configuration to the job environment so the e2e run
# ships spans to the shared collector, tagged with this workflow run.
- name: Enable tracing
  run: echo "INFRAHUB_TRACE_ENABLE=true" >> $GITHUB_ENV
- name: Require secure (TLS) trace transport
  run: echo "INFRAHUB_TRACE_INSECURE=false" >> $GITHUB_ENV
- name: Set trace exporter endpoint
  run: echo "INFRAHUB_TRACE_EXPORTER_ENDPOINT=${{ secrets.TRACING_ENDPOINT }}" >> $GITHUB_ENV
- name: Tag traces with the workflow run id
  run: echo "OTEL_RESOURCE_ATTRIBUTES=github.run_id=${GITHUB_RUN_ID}" >> $GITHUB_ENV

- name: "Store start time"
run: echo TEST_START_TIME=$(date +%s)000 >> $GITHUB_ENV

Expand Down
13 changes: 12 additions & 1 deletion backend/infrahub/cli/git_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from prometheus_client import start_http_server
from rich.logging import RichHandler

from infrahub import config
from infrahub import __version__, config
from infrahub.components import ComponentType
from infrahub.core.initialization import initialization
from infrahub.database import InfrahubDatabase, get_db
Expand All @@ -20,6 +20,7 @@
from infrahub.services import InfrahubServices
from infrahub.services.adapters.cache.redis import RedisCache
from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus
from infrahub.trace import configure_trace

app = typer.Typer()

Expand Down Expand Up @@ -66,6 +67,16 @@ async def _start(debug: bool, port: int) -> None:
client = await InfrahubClient.init(address=config.SETTINGS.main.internal_address, retry_on_failure=True, log=log)
await client.branch.all()

# Initialize trace
if config.SETTINGS.trace.enable:
configure_trace(
service="infrahub-git-agent",
version=__version__,
exporter_type=config.SETTINGS.trace.exporter_type,
exporter_endpoint=config.SETTINGS.trace.exporter_endpoint,
exporter_protocol=config.SETTINGS.trace.exporter_protocol,
)

# Initialize the lock
initialize_lock()

Expand Down
29 changes: 0 additions & 29 deletions backend/infrahub/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,35 +279,6 @@ class TraceSettings(BaseSettings):
default=TraceTransportProtocol.GRPC, description="Protocol to be used for exporting traces"
)
exporter_endpoint: Optional[str] = Field(default=None, description="OTLP endpoint for exporting traces")
exporter_port: Optional[int] = Field(
default=None, ge=1, le=65535, description="Specified if running on a non default port (4317)"
)

@property
def service_port(self) -> int:
if self.exporter_protocol == TraceTransportProtocol.GRPC:
default_port = 4317
elif self.exporter_protocol == TraceTransportProtocol.HTTP_PROTOBUF:
default_port = 4318
else:
default_port = 4317

return self.exporter_port or default_port

@property
def trace_endpoint(self) -> Optional[str]:
if not self.exporter_endpoint:
return None
if self.insecure:
scheme = "http://"
else:
scheme = "https://"
endpoint = str(self.exporter_endpoint) + ":" + str(self.service_port)

if self.exporter_protocol == TraceTransportProtocol.HTTP_PROTOBUF:
endpoint += "/v1/traces"

return scheme + endpoint


@dataclass
Expand Down
21 changes: 14 additions & 7 deletions backend/infrahub/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Record,
)
from neo4j.exceptions import ClientError, Neo4jError, ServiceUnavailable, TransientError
from opentelemetry import trace
from typing_extensions import Self

from infrahub import config
Expand Down Expand Up @@ -186,17 +187,23 @@ async def close(self):
async def execute_query(
    self, query: str, params: Optional[Dict[str, Any]] = None, name: Optional[str] = "undefined"
) -> List[Record]:
    """Run a query and return all of its result records.

    The execution is wrapped in an OpenTelemetry span tagged with the raw
    query text, and its duration is recorded in QUERY_EXECUTION_METRICS
    under the current session mode and the caller-supplied query name.
    """
    with trace.get_tracer(__name__).start_as_current_span("execute_db_query") as span:
        span.set_attribute("query", query)

        with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time():
            response = await self.run_query(query=query, params=params)
            return [item async for item in response]

async def execute_query_with_metadata(
    self, query: str, params: Optional[Dict[str, Any]] = None, name: Optional[str] = "undefined"
) -> Tuple[List[Record], Dict[str, Any]]:
    """Run a query and return (records, result metadata).

    Same tracing/metrics wrapping as execute_query; additionally returns
    the driver's result metadata (empty dict when the driver provides none).
    """
    with trace.get_tracer(__name__).start_as_current_span("execute_db_query_with_metadata") as span:
        span.set_attribute("query", query)

        with QUERY_EXECUTION_METRICS.labels(str(self._session_mode), name).time():
            response = await self.run_query(query=query, params=params)
            results = [item async for item in response]
            # response._metadata is private driver state — presumably stable
            # across neo4j driver versions; verify on driver upgrades.
            return results, response._metadata or {}

async def run_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> AsyncResult:
if self.is_transaction:
Expand Down
26 changes: 15 additions & 11 deletions backend/infrahub/graphql/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from graphql.utilities import (
get_operation_ast,
)
from opentelemetry import trace
from starlette.datastructures import UploadFile
from starlette.requests import HTTPConnection, Request
from starlette.responses import JSONResponse, Response
Expand Down Expand Up @@ -222,17 +223,20 @@ async def _handle_http_request(
"query_id": "",
}

with GRAPHQL_DURATION_METRICS.labels(**labels).time():
result = await graphql(
schema=graphql_params.schema,
source=query,
context_value=graphql_params.context,
root_value=self.root_value,
middleware=self.middleware,
variable_values=variable_values,
operation_name=operation_name,
execution_context_class=self.execution_context_class,
)
with trace.get_tracer(__name__).start_as_current_span("execute_graphql") as span:
span.set_attributes(labels)

with GRAPHQL_DURATION_METRICS.labels(**labels).time():
result = await graphql(
schema=graphql_params.schema,
source=query,
context_value=graphql_params.context,
root_value=self.root_value,
middleware=self.middleware,
variable_values=variable_values,
operation_name=operation_name,
execution_context_class=self.execution_context_class,
)

response: Dict[str, Any] = {"data": result.data}
if result.errors:
Expand Down
2 changes: 2 additions & 0 deletions backend/infrahub/graphql/mutations/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pydantic
from graphene import Boolean, Field, InputObjectType, List, Mutation, String
from infrahub_sdk.utils import extract_fields, extract_fields_first_node
from opentelemetry import trace
from typing_extensions import Self

from infrahub import config, lock
Expand Down Expand Up @@ -55,6 +56,7 @@ class Arguments:

@classmethod
@retry_db_transaction(name="branch_create")
@trace.get_tracer(__name__).start_as_current_span("branch_create")
async def mutate(
cls, root: dict, info: GraphQLResolveInfo, data: BranchCreateInput, background_execution: bool = False
) -> Self:
Expand Down
36 changes: 22 additions & 14 deletions backend/infrahub/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from infrahub_sdk.timestamp import TimestampFormatError
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor, Span
from pydantic import ValidationError
from starlette_exporter import PrometheusMiddleware, handle_metrics

Expand All @@ -34,7 +34,7 @@
from infrahub.services import InfrahubServices, services
from infrahub.services.adapters.cache.redis import RedisCache
from infrahub.services.adapters.message_bus.rabbitmq import RabbitMQMessageBus
from infrahub.trace import add_span_exception, configure_trace, get_traceid, get_tracer
from infrahub.trace import add_span_exception, configure_trace, get_traceid
from infrahub.worker import WORKER_IDENTITY


Expand All @@ -44,9 +44,10 @@ async def app_initialization(application: FastAPI) -> None:
# Initialize trace
if config.SETTINGS.trace.enable:
configure_trace(
service="infrahub-server",
version=__version__,
exporter_type=config.SETTINGS.trace.exporter_type,
exporter_endpoint=config.SETTINGS.trace.trace_endpoint,
exporter_endpoint=config.SETTINGS.trace.exporter_endpoint,
exporter_protocol=config.SETTINGS.trace.exporter_protocol,
)

Expand Down Expand Up @@ -95,8 +96,13 @@ async def lifespan(application: FastAPI) -> AsyncGenerator:
redoc_url="/api/redoc",
)

def server_request_hook(span: Span, scope: dict) -> None:  # pylint: disable=unused-argument
    """Tag each incoming-request span with the worker identity.

    Lets traces be attributed to a specific Infrahub worker process when
    several workers serve the same endpoint.
    """
    if span and span.is_recording():
        span.set_attribute("worker", WORKER_IDENTITY)


FastAPIInstrumentor().instrument_app(app, excluded_urls=".*/metrics", server_request_hook=server_request_hook)

FRONTEND_DIRECTORY = os.environ.get("INFRAHUB_FRONTEND_DIRECTORY", os.path.abspath("frontend"))
FRONTEND_ASSET_DIRECTORY = f"{FRONTEND_DIRECTORY}/dist/assets"
Expand All @@ -118,15 +124,17 @@ async def lifespan(application: FastAPI) -> AsyncGenerator:
async def logging_middleware(request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
    """Populate the log context for this request, then dispatch it.

    Binds request_id, app name and worker identity to the structured-log
    context, and — when tracing is active — the current trace id so log
    lines can be correlated with the span created by FastAPIInstrumentor.
    """
    clear_log_context()
    request_id = correlation_id.get()

    set_log_data(key="request_id", value=request_id)
    set_log_data(key="app", value="infrahub.api")
    set_log_data(key="worker", value=WORKER_IDENTITY)

    # get_traceid() returns a falsy value when no span is recording,
    # so untraced deployments simply omit the trace_id field.
    trace_id = get_traceid()
    if trace_id:
        set_log_data(key="trace_id", value=trace_id)

    response = await call_next(request)
    return response


@app.middleware("http")
Expand Down
29 changes: 29 additions & 0 deletions backend/infrahub/services/adapters/message_bus/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
from typing import TYPE_CHECKING, Awaitable, Callable, List, MutableMapping, Optional, Type, TypeVar

import aio_pika
import opentelemetry.instrumentation.aio_pika.span_builder
import ujson
from infrahub_sdk import UUIDT
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
from opentelemetry.semconv.trace import SpanAttributes

from infrahub import config
from infrahub.components import ComponentType
Expand All @@ -24,6 +27,7 @@
AbstractQueue,
AbstractRobustConnection,
)
from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder

from infrahub.config import BrokerSettings
from infrahub.services import InfrahubServices
Expand All @@ -32,6 +36,29 @@
ResponseClass = TypeVar("ResponseClass")


AioPikaInstrumentor().instrument()


# TODO: remove this once https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1835 is resolved
def patch_spanbuilder_set_channel() -> None:
    """Monkeypatch SpanBuilder.set_channel for aio_pika >= 9.1.

    aio_pika 9.1 moved the channel's connection onto a private
    ``_connection`` attribute, which the instrumentation's stock
    implementation does not know about, so peer host/port attributes
    would otherwise be lost.
    """

    def set_channel(self: SpanBuilder, channel: AbstractChannel) -> None:
        # Channels without the refactored attribute are left untouched.
        if not hasattr(channel, "_connection"):
            return
        url = channel._connection.url
        self._attributes.update(
            {
                SpanAttributes.NET_PEER_NAME: url.host,
                SpanAttributes.NET_PEER_PORT: url.port,
            }
        )

    opentelemetry.instrumentation.aio_pika.span_builder.SpanBuilder.set_channel = set_channel  # type: ignore


async def _add_request_id(message: InfrahubMessage) -> None:
    """Stamp the message metadata with the request_id from the current log context.

    Falls back to an empty string when no request_id has been bound.
    """
    message.meta.request_id = get_log_data().get("request_id", "")
Expand All @@ -54,6 +81,8 @@ def __init__(self, settings: Optional[BrokerSettings] = None) -> None:
self.futures: MutableMapping[str, asyncio.Future] = {}

async def initialize(self, service: InfrahubServices) -> None:
patch_spanbuilder_set_channel()

self.service = service
self.connection = await aio_pika.connect_robust(
host=self.settings.address,
Expand Down
25 changes: 18 additions & 7 deletions backend/infrahub/trace.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter as GRPCSpanExporter,
Expand All @@ -10,9 +12,7 @@
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.trace import StatusCode


def get_tracer(name: str = "infrahub") -> trace.Tracer:
return trace.get_tracer(name)
from infrahub.worker import WORKER_IDENTITY


def get_current_span_with_context() -> trace.Span:
Expand Down Expand Up @@ -55,7 +55,7 @@ def add_span_exception(exception: Exception) -> None:


def create_tracer_provider(
version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None
service: str, version: str, exporter_type: str, exporter_endpoint: str = None, exporter_protocol: str = None
) -> TracerProvider:
# Create a BatchSpanProcessor exporter based on the type
if exporter_type == "console":
Expand All @@ -70,8 +70,19 @@ def create_tracer_provider(
else:
raise ValueError("Exporter type unsupported by Infrahub")

extra_attributes = {}
if os.getenv("OTEL_RESOURCE_ATTRIBUTES"):
extra_attributes = dict(attr.split("=") for attr in os.getenv("OTEL_RESOURCE_ATTRIBUTES").split(","))

# Resource can be required for some backends, e.g. Jaeger
resource = Resource(attributes={"service.name": "infrahub", "service.version": version})
resource = Resource(
attributes={
"service.name": service,
"service.version": version,
"worker.id": WORKER_IDENTITY,
**extra_attributes,
}
)
span_processor = BatchSpanProcessor(exporter)
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(span_processor)
Expand All @@ -80,16 +91,16 @@ def create_tracer_provider(


def configure_trace(
    service: str,
    version: str,
    exporter_type: str,
    exporter_endpoint: str | None = None,
    exporter_protocol: str | None = None,
) -> None:
    """Create and register the process-wide OpenTelemetry tracer provider.

    Args:
        service: Logical service name reported on every span
            (e.g. "infrahub-server", "infrahub-git-agent").
        version: Service version recorded in the trace resource.
        exporter_type: Exporter backend understood by create_tracer_provider
            (e.g. "console" or an OTLP variant).
        exporter_endpoint: Collector endpoint; None keeps the exporter default.
        exporter_protocol: Transport protocol for the exporter.
    """
    # Create a trace provider with the exporter
    tracer_provider = create_tracer_provider(
        service=service,
        version=version,
        exporter_type=exporter_type,
        exporter_endpoint=exporter_endpoint,
        exporter_protocol=exporter_protocol,
    )
    # NOTE(review): the returned tracer is discarded — presumably a warm-up
    # so provider misconfiguration surfaces here; confirm before removing.
    tracer_provider.get_tracer(__name__)

    # Register the trace provider so trace.get_tracer() everywhere uses it.
    trace.set_tracer_provider(tracer_provider)
Loading

0 comments on commit d8d35c7

Please sign in to comment.