diff --git a/packages/phoenix-otel/README.md b/packages/phoenix-otel/README.md index 3b0eeb4966..2b9c323051 100644 --- a/packages/phoenix-otel/README.md +++ b/packages/phoenix-otel/README.md @@ -3,7 +3,7 @@ Provides a lightweight wrapper around OpenTelemetry primitives with Phoenix-aware defaults. These defaults are aware of the `PHOENIX_COLLECTOR_ENDPOINT`, `PHOENIX_PROJECT_NAME`, and -``PHOENIX_CLIENT_HEADERS` environment variables. +`PHOENIX_CLIENT_HEADERS` environment variables. # Examples @@ -11,12 +11,62 @@ The `phoenix.otel` module provides a high-level `register` function to configure tracing by setting a global `TracerProvider`. The register function can also configure headers and whether or not to process spans one by one or by batch. + +## Quickstart + +``` +from phoenix.otel import register +tracer_provider = register() +``` + +This is all you need to get started using OTel with Phoenix! `register` defaults to sending spans +to an endpoint at `http://localhost:4317` using gRPC. + +### Configuring the collector endpoint + +There are two ways to configure the collector endpoint: + - Using environment variables + - Using the `endpoint` keyword argument + +#### Using environment variables + +If you're setting the `PHOENIX_COLLECTOR_ENDPOINT` environment variable, `register` will +automatically try to send spans to your Phoenix server using gRPC. + ``` +# export PHOENIX_COLLECTOR_ENDPOINT=https://your-phoenix.com:6006 + from phoenix.otel import register +tracer_provider = register() +``` + +#### Specifying the `endpoint` directly -tracer_provider = register(endpoint="http://localhost:6006/v1/traces", project_name="test") +When passing in the `endpoint` argument, **you must specify the fully qualified endpoint**. For +example, in order to export spans via HTTP to localhost, use Phoenix's HTTP collector endpoint: +`http://localhost:6006/v1/traces`. The gRPC endpoint is different: `http://localhost:4317`. 
+ +``` +from phoenix.otel import register +tracer_provider = register(endpoint="http://localhost:6006/v1/traces") +``` + +### Additional configuration + +`register` can be configured with different keyword arguments: +- `project_name`: The Phoenix project name (or `PHOENIX_PROJECT_NAME` env. var) +- `headers`: Headers to send along with each span payload (or `PHOENIX_CLIENT_HEADERS` env. var) +- `batch`: Whether or not to process spans in batch + +``` +from phoenix.otel import register +tracer_provider = register( + project_name="otel-test", headers={"Authorization": "Bearer TOKEN"}, batch=True +) ``` +## A drop-in replacement for OTel primitives + For more granular tracing configuration, these wrappers can be used as drop-in replacements for OTel primitives: @@ -26,28 +76,20 @@ from phoenix.otel import HTTPSpanExporter, TracerProvider, SimpleSpanProcessor tracer_provider = TracerProvider() span_exporter = HTTPSpanExporter(endpoint="http://localhost:6006/v1/traces") -span_processor = SimpleSpanProcessor(exporter=span_exporter) +span_processor = SimpleSpanProcessor(span_exporter=span_exporter) tracer_provider.add_span_processor(span_processor) trace_api.set_tracer_provider(tracer_provider) ``` -Wrappers have Phoenix-aware defaults to greatly simplify the OTel configuration process. +Wrappers have Phoenix-aware defaults to greatly simplify the OTel configuration process. A special +`endpoint` keyword argument can be passed to either a `TracerProvider`, `SimpleSpanProcessor` or +`BatchSpanProcessor` in order to automatically infer which `SpanExporter` to use to simplify setup. 
-``` -# export PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006/v1/traces -from opentelemetry import trace as trace_api -from phoenix.otel import TracerProvider +### Using environment variables -tracer_provider = TracerProvider() -trace_api.set_tracer_provider(tracer_provider) ``` +# export PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006 -Phoenix supports sending traces via either an HTTP or gRPC protocol, if possible, the exporter -will be inferred from the endpoint URL. In the following example, tracing is configured to -export traces via the gRPC protocol based on the `PHOENIX_COLLECTOR_ENDPOINT` URL. - -``` -# export PHOENIX_COLLECTOR_ENDPOINT=http://localhost:4317 from opentelemetry import trace as trace_api from phoenix.otel import TracerProvider @@ -55,21 +97,25 @@ tracer_provider = TracerProvider() trace_api.set_tracer_provider(tracer_provider) ``` -The collector endpoint can be passed directly to the tracer provider. +#### Specifying the `endpoint` directly ``` from opentelemetry import trace as trace_api from phoenix.otel import TracerProvider -tracer_provider = TracerProvider(endpoint="http://localhost:6006/v1/traces") +tracer_provider = TracerProvider(endpoint="http://localhost:4317") trace_api.set_tracer_provider(tracer_provider) ``` +### Further examples + Users can gradually add OTel components as desired: -## Adding resources +## Configuring resources + ``` -# export PHOENIX_COLLECTOR_ENDPOINT=http://localhost:4317 +# export PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006 + from opentelemetry import trace as trace_api from phoenix.otel import Resource, PROJECT_NAME, TracerProvider @@ -78,8 +124,10 @@ trace_api.set_tracer_provider(tracer_provider) ``` ## Using a BatchSpanProcessor + ``` -# export PHOENIX_COLLECTOR_ENDPOINT=http://localhost:4317 +# export PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006 + from opentelemetry import trace as trace_api from phoenix.otel import TracerProvider, BatchSpanProcessor @@ -87,3 +135,16 @@ tracer_provider = 
TracerProvider() batch_processor = BatchSpanProcessor() tracer_provider.add_span_processor(batch_processor) ``` + +## Specifying a custom GRPC endpoint + +``` +from opentelemetry import trace as trace_api +from phoenix.otel import TracerProvider, BatchSpanProcessor, GRPCSpanExporter + +tracer_provider = TracerProvider() +batch_processor = BatchSpanProcessor( + span_exporter=GRPCSpanExporter(endpoint="http://custom-endpoint.com") +) +tracer_provider.add_span_processor(batch_processor) +``` diff --git a/packages/phoenix-otel/src/phoenix/otel/__init__.py b/packages/phoenix-otel/src/phoenix/otel/__init__.py index fbc7d2f117..f68b70e1ec 100644 --- a/packages/phoenix-otel/src/phoenix/otel/__init__.py +++ b/packages/phoenix-otel/src/phoenix/otel/__init__.py @@ -18,5 +18,5 @@ "GRPCSpanExporter", "Resource", "PROJECT_NAME", - register, + "register", ] diff --git a/packages/phoenix-otel/src/phoenix/otel/otel.py b/packages/phoenix-otel/src/phoenix/otel/otel.py index 280725db07..900074bb54 100644 --- a/packages/phoenix-otel/src/phoenix/otel/otel.py +++ b/packages/phoenix-otel/src/phoenix/otel/otel.py @@ -1,6 +1,7 @@ import inspect +import os import warnings -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional, Tuple, Union, cast from urllib.parse import ParseResult, urlparse from openinference.semconv.resource import ResourceAttributes as _ResourceAttributes @@ -22,17 +23,20 @@ PROJECT_NAME = _ResourceAttributes.PROJECT_NAME +_DEFAULT_GRPC_PORT = 4317 + def register( *, endpoint: Optional[str] = None, project_name: Optional[str] = None, batch: bool = False, - set_global=True, - headers=None, + set_global_tracer: bool = True, + headers: Optional[Dict[str, str]] = None, + verbose: bool = True, ) -> _TracerProvider: """ - Globally sets an OpenTelemetry TracerProvider for enabling OpenInference tracing. + Creates an OpenTelemetry TracerProvider for enabling OpenInference tracing. 
For futher configuration, the `phoenix.otel` module provides drop-in replacements for OpenTelemetry TracerProvider, SimpleSpanProcessor, BatchSpanProcessor, HTTPSpanExporter, and @@ -47,28 +51,43 @@ def register( not provided, the `PHOENIX_PROJECT_NAME` environment variable will be used. batch (bool): If True, spans will be processed using a BatchSpanprocessor. If False, spans will be processed one at a time using a SimpleSpanProcessor. - set_global (bool): If False, the TracerProvider will not be set as the global provider. - Defaults to True. + set_global_tracer (bool): If False, the TracerProvider will not be set as the global + tracer provider. Defaults to True. headers (dict, optional): Optional headers to include in the HTTP request to the collector. + verbose (bool): If True, configuration details will be printed to stdout. """ project_name = project_name or get_env_project_name() resource = Resource.create({PROJECT_NAME: project_name}) - tracer_provider = _TracerProvider(resource=resource) + tracer_provider = TracerProvider(resource=resource, verbose=False) span_processor: SpanProcessor if batch: span_processor = BatchSpanProcessor(endpoint=endpoint, headers=headers) else: span_processor = SimpleSpanProcessor(endpoint=endpoint, headers=headers) tracer_provider.add_span_processor(span_processor) + tracer_provider._default_processor = True - if set_global: + if set_global_tracer: trace_api.set_tracer_provider(tracer_provider) + global_provider_msg = ( + "| \n" + "| `register` has set this TracerProvider as the global OpenTelemetry default.\n" + "| To disable this behavior, call `register` with `set_global_tracer=False`.\n" + ) + else: + global_provider_msg = "" + + details = tracer_provider._tracing_details() + if verbose: + print(f"{details}" f"{global_provider_msg}") return tracer_provider class TracerProvider(_TracerProvider): - def __init__(self, *args: Any, endpoint: Optional[str] = None, **kwargs: Any): + def __init__( + self, *args: Any, endpoint: 
Optional[str] = None, verbose: bool = True, **kwargs: Any + ): sig = inspect.signature(_TracerProvider) bound_args = sig.bind_partial(*args, **kwargs) bound_args.apply_defaults() @@ -78,80 +97,107 @@ def __init__(self, *args: Any, endpoint: Optional[str] = None, **kwargs: Any): ) super().__init__(**bound_args.arguments) - endpoint = endpoint or get_env_collector_endpoint() - parsed_url = urlparse(endpoint) - assert isinstance(parsed_url, ParseResult) + parsed_url, endpoint = _normalized_endpoint(endpoint) self._default_processor = False if _maybe_http_endpoint(parsed_url): - print("Exporting spans via HTTP.") http_exporter: SpanExporter = HTTPSpanExporter(endpoint=endpoint) - self.add_span_processor(SimpleSpanProcessor(exporter=http_exporter)) + self.add_span_processor(SimpleSpanProcessor(span_exporter=http_exporter)) self._default_processor = True elif _maybe_grpc_endpoint(parsed_url): - print("Exporting spans via GRPC.") grpc_exporter: SpanExporter = GRPCSpanExporter(endpoint=endpoint) - self.add_span_processor(SimpleSpanProcessor(exporter=grpc_exporter)) + self.add_span_processor(SimpleSpanProcessor(span_exporter=grpc_exporter)) self._default_processor = True - else: - warnings.warn( - "Could not infer exporter to use. Use `add_span_processor` to configure span " - "processing and export." 
- ) + if verbose: + print(self._tracing_details()) def add_span_processor(self, *args: Any, **kwargs: Any) -> None: if self._default_processor: - print("Overriding default span processor.") self._active_span_processor.shutdown() self._active_span_processor._span_processors = tuple() # remove default processors self._default_processor = False return super().add_span_processor(*args, **kwargs) + def _tracing_details(self) -> str: + project = self.resource.attributes.get(PROJECT_NAME) + processor_name: Optional[str] = None + endpoint: Optional[str] = None + transport: Optional[str] = None + headers: Optional[Union[Dict[str, str], str]] = None + + if self._active_span_processor: + if processors := self._active_span_processor._span_processors: + if len(processors) == 1: + span_processor = self._active_span_processor._span_processors[0] + if exporter := getattr(span_processor, "span_exporter"): + processor_name = span_processor.__class__.__name__ + endpoint = exporter._endpoint + transport = _exporter_transport(exporter) + headers = _printable_headers(exporter._headers) + else: + processor_name = "Multiple Span Processors" + endpoint = "Multiple Span Exporters" + transport = "Multiple Span Exporters" + headers = "Multiple Span Exporters" + + if os.name == "nt": + details_header = "OpenTelemetry Tracing Details" + else: + details_header = "🔭 OpenTelemetry Tracing Details 🔭" + + configuration_msg = ( + "| Using a default SpanProcessor. 
`add_span_processor` will overwrite this default.\n" + ) + + details_msg = ( + f"{details_header}\n" + f"| Phoenix Project: {project}\n" + f"| Span Processor: {processor_name}\n" + f"| Collector Endpoint: {endpoint}\n" + f"| Transport: {transport}\n" + f"| Transport Headers: {headers}\n" + "| \n" + f"{configuration_msg if self._default_processor else ''}" + ) + return details_msg + class SimpleSpanProcessor(_SimpleSpanProcessor): def __init__( self, - exporter: Optional[SpanExporter] = None, + span_exporter: Optional[SpanExporter] = None, endpoint: Optional[str] = None, headers: Optional[Dict[str, str]] = None, ): - if exporter is None: - endpoint = endpoint or get_env_collector_endpoint() - parsed_url = urlparse(endpoint) - assert isinstance(parsed_url, ParseResult) + if span_exporter is None: + parsed_url, endpoint = _normalized_endpoint(endpoint) if _maybe_http_endpoint(parsed_url): - print("Exporting spans via HTTP.") - exporter = HTTPSpanExporter(endpoint=endpoint, headers=headers) + span_exporter = HTTPSpanExporter(endpoint=endpoint, headers=headers) elif _maybe_grpc_endpoint(parsed_url): - print("Exporting spans via GRPC.") - exporter = GRPCSpanExporter(endpoint=endpoint, headers=headers) + span_exporter = GRPCSpanExporter(endpoint=endpoint, headers=headers) else: warnings.warn("Could not infer collector endpoint protocol, defaulting to HTTP.") - exporter = HTTPSpanExporter(endpoint=endpoint, headers=headers) - super().__init__(exporter) + span_exporter = HTTPSpanExporter(endpoint=endpoint, headers=headers) + super().__init__(span_exporter) class BatchSpanProcessor(_BatchSpanProcessor): def __init__( self, - exporter: Optional[SpanExporter] = None, + span_exporter: Optional[SpanExporter] = None, endpoint: Optional[str] = None, headers: Optional[Dict[str, str]] = None, ): - if exporter is None: - endpoint = endpoint or get_env_collector_endpoint() - parsed_url = urlparse(endpoint) - assert isinstance(parsed_url, ParseResult) + if span_exporter is None: + 
parsed_url, endpoint = _normalized_endpoint(endpoint) if _maybe_http_endpoint(parsed_url): - print("Exporting spans via HTTP.") - exporter = HTTPSpanExporter(endpoint=endpoint, headers=headers) + span_exporter = HTTPSpanExporter(endpoint=endpoint, headers=headers) elif _maybe_grpc_endpoint(parsed_url): - print("Exporting spans via GRPC.") - exporter = GRPCSpanExporter(endpoint=endpoint, headers=headers) + span_exporter = GRPCSpanExporter(endpoint=endpoint, headers=headers) else: warnings.warn("Could not infer collector endpoint protocol, defaulting to HTTP.") - exporter = HTTPSpanExporter(endpoint=endpoint, headers=headers) - super().__init__(exporter) + span_exporter = HTTPSpanExporter(endpoint=endpoint, headers=headers) + super().__init__(span_exporter) class HTTPSpanExporter(_HTTPSpanExporter): @@ -164,7 +210,8 @@ def __init__(self, *args: Any, **kwargs: Any): bound_args.arguments["headers"] = get_env_client_headers() if bound_args.arguments.get("endpoint") is None: - bound_args.arguments["endpoint"] = get_env_collector_endpoint() + _, endpoint = _normalized_endpoint(None) + bound_args.arguments["endpoint"] = endpoint super().__init__(**bound_args.arguments) @@ -178,7 +225,8 @@ def __init__(self, *args: Any, **kwargs: Any): bound_args.arguments["headers"] = get_env_client_headers() if bound_args.arguments.get("endpoint") is None: - bound_args.arguments["endpoint"] = get_env_collector_endpoint() + _, endpoint = _normalized_endpoint(None) + bound_args.arguments["endpoint"] = endpoint super().__init__(*args, **kwargs) @@ -192,3 +240,45 @@ def _maybe_grpc_endpoint(parsed_endpoint: ParseResult) -> bool: if not parsed_endpoint.path and parsed_endpoint.port == 4317: return True return False + + +def _exporter_transport(exporter: SpanExporter) -> str: + if isinstance(exporter, _HTTPSpanExporter): + return "HTTP" + if isinstance(exporter, _GRPCSpanExporter): + return "gRPC" + else: + return exporter.__class__.__name__ + + +def _printable_headers(headers: 
Union[List[Tuple[str, str]], Dict[str, str]]) -> Dict[str, str]: + if isinstance(headers, dict): + return {key.lower(): "****" for key, _ in headers.items()} + return {key.lower(): "****" for key, _ in headers} + + +def _construct_http_endpoint(parsed_endpoint: ParseResult) -> ParseResult: + return parsed_endpoint._replace(path="/v1/traces") + + +def _construct_grpc_endpoint(parsed_endpoint: ParseResult) -> ParseResult: + return parsed_endpoint._replace(netloc=f"{parsed_endpoint.hostname}:{_DEFAULT_GRPC_PORT}") + + +_KNOWN_PROVIDERS = { + "app.phoenix.arize.com": _construct_http_endpoint, +} + + +def _normalized_endpoint(endpoint: Optional[str]) -> Tuple[ParseResult, str]: + if endpoint is None: + base_endpoint = get_env_collector_endpoint() or "http://localhost:6006" + parsed = urlparse(base_endpoint) + if parsed.hostname in _KNOWN_PROVIDERS: + parsed = _KNOWN_PROVIDERS[parsed.hostname](parsed) + else: + parsed = _construct_grpc_endpoint(parsed) + else: + parsed = urlparse(endpoint) + parsed = cast(ParseResult, parsed) + return parsed, parsed.geturl()