From dc792860778002b3e9bab3cf7f42b8cc30de0dbd Mon Sep 17 00:00:00 2001 From: Dustin Ngo Date: Wed, 10 Apr 2024 11:48:18 -0400 Subject: [PATCH 1/2] Begin refactoring API Flesh out V2 API Lint and clean up type annotations --- src/phoenix/server/api/routers/v1/__init__.py | 23 +++ .../routers/{ => v1}/evaluation_handler.py | 0 .../api/routers/{ => v1}/span_handler.py | 0 .../api/routers/{ => v1}/trace_handler.py | 0 src/phoenix/server/api/routers/v2/__init__.py | 10 ++ .../server/api/routers/v2/evaluations.py | 142 ++++++++++++++++++ src/phoenix/server/api/routers/v2/spans.py | 85 +++++++++++ src/phoenix/server/api/routers/v2/traces.py | 84 +++++++++++ src/phoenix/server/app.py | 40 +++-- 9 files changed, 362 insertions(+), 22 deletions(-) create mode 100644 src/phoenix/server/api/routers/v1/__init__.py rename src/phoenix/server/api/routers/{ => v1}/evaluation_handler.py (100%) rename src/phoenix/server/api/routers/{ => v1}/span_handler.py (100%) rename src/phoenix/server/api/routers/{ => v1}/trace_handler.py (100%) create mode 100644 src/phoenix/server/api/routers/v2/__init__.py create mode 100644 src/phoenix/server/api/routers/v2/evaluations.py create mode 100644 src/phoenix/server/api/routers/v2/spans.py create mode 100644 src/phoenix/server/api/routers/v2/traces.py diff --git a/src/phoenix/server/api/routers/v1/__init__.py b/src/phoenix/server/api/routers/v1/__init__.py new file mode 100644 index 0000000000..d11ff203fe --- /dev/null +++ b/src/phoenix/server/api/routers/v1/__init__.py @@ -0,0 +1,23 @@ +from typing import List, Optional + +from starlette.routing import Route + +from phoenix.core.traces import Traces +from phoenix.storage.span_store import SpanStore + +from .evaluation_handler import EvaluationHandler +from .span_handler import SpanHandler +from .trace_handler import TraceHandler + + +def v1_routes(traces: Traces, span_store: Optional[SpanStore]) -> List[Route]: + return [ + Route("/v1/spans", type("SpanEndpoint", (SpanHandler,), {"traces": traces})), + Route( + "/v1/traces", + type("TraceEndpoint", (TraceHandler,), {"traces": traces, "store": span_store}), + ), + Route( + "/v1/evaluations", type("EvaluationEndpoint", (EvaluationHandler,), {"traces": traces}) + ), + ] diff --git a/src/phoenix/server/api/routers/evaluation_handler.py b/src/phoenix/server/api/routers/v1/evaluation_handler.py similarity index 100% rename from src/phoenix/server/api/routers/evaluation_handler.py rename to src/phoenix/server/api/routers/v1/evaluation_handler.py diff --git a/src/phoenix/server/api/routers/span_handler.py b/src/phoenix/server/api/routers/v1/span_handler.py similarity index 100% rename from src/phoenix/server/api/routers/span_handler.py rename to src/phoenix/server/api/routers/v1/span_handler.py diff --git a/src/phoenix/server/api/routers/trace_handler.py b/src/phoenix/server/api/routers/v1/trace_handler.py similarity index 100% rename from src/phoenix/server/api/routers/trace_handler.py rename to src/phoenix/server/api/routers/v1/trace_handler.py diff --git a/src/phoenix/server/api/routers/v2/__init__.py b/src/phoenix/server/api/routers/v2/__init__.py new file mode 100644 index 0000000000..1cb3c859d0 --- /dev/null +++ b/src/phoenix/server/api/routers/v2/__init__.py @@ -0,0 +1,10 @@ +from starlette.routing import Route + +from . import evaluations, spans, traces + +V2_ROUTES = [ + Route("/v2/evaluations", evaluations.post_evaluation, methods=["POST"]), + Route("/v2/evaluations", evaluations.get_evaluations, methods=["GET"]), + Route("/v2/traces", traces.post_traces, methods=["POST"]), + Route("/v2/spans", spans.get_spans, methods=["GET"]), +] diff --git a/src/phoenix/server/api/routers/v2/evaluations.py b/src/phoenix/server/api/routers/v2/evaluations.py new file mode 100644 index 0000000000..76c8227c1e --- /dev/null +++ b/src/phoenix/server/api/routers/v2/evaluations.py @@ -0,0 +1,142 @@ +import asyncio +import gzip +from typing import AsyncIterator + +import pyarrow as pa +from google.protobuf.message import DecodeError +from starlette.background import BackgroundTask +from starlette.requests import Request +from starlette.responses import Response, StreamingResponse +from starlette.status import ( + HTTP_403_FORBIDDEN, + HTTP_404_NOT_FOUND, + HTTP_415_UNSUPPORTED_MEDIA_TYPE, + HTTP_422_UNPROCESSABLE_ENTITY, +) + +import phoenix.trace.v1 as pb +from phoenix.config import DEFAULT_PROJECT_NAME +from phoenix.core.traces import Traces +from phoenix.server.api.routers.utils import table_to_bytes +from phoenix.session.evaluation import encode_evaluations +from phoenix.trace.span_evaluations import Evaluations + + +async def post_evaluation(request: Request) -> Response: + """ + summary: Add an evaluation to Phoenix + operationId: addEvaluation + tags: + - evaluations + parameters: + - name: project_name + in: query + schema: + type: string + description: The project name to add the evaluation to + required: false + requestBody: + required: true + content: + application/x-protobuf: + schema: + type: string + format: binary + application/x-pandas-arrow: + schema: + type: string + format: binary + responses: + 200: + description: Success + 403: + description: Forbidden + 415: + description: Unsupported content type, only gzipped protobuf and pandas-arrow are supported + 422: + description: Request body is invalid + """ + if request.app.state.read_only: + return Response(status_code=HTTP_403_FORBIDDEN) + traces: Traces = request.app.state.traces + content_type = request.headers.get("content-type") + project_name = request.query_params.get("project_name", DEFAULT_PROJECT_NAME) + if content_type == "application/x-pandas-arrow": + return await _process_pyarrow(request, project_name, traces) + if content_type != "application/x-protobuf": + return Response("Unsupported content type", status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE) + body = await request.body() + content_encoding = request.headers.get("content-encoding") + if content_encoding == "gzip": + body = gzip.decompress(body) + elif content_encoding: + return Response("Unsupported content encoding", status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE) + evaluation = pb.Evaluation() + try: + evaluation.ParseFromString(body) + except DecodeError: + return Response("Request body is invalid", status_code=HTTP_422_UNPROCESSABLE_ENTITY) + traces.put(evaluation, project_name=project_name) + return Response() + + +async def get_evaluations(request: Request) -> Response: + """ + summary: Get evaluations from Phoenix + operationId: getEvaluation + tags: + - evaluations + parameters: + - name: project_name + in: query + schema: + type: string + description: The project name to get evaluations from + required: false + responses: + 200: + description: Success + 404: + description: Not found + """ + traces: Traces = request.app.state.traces + project_name = request.query_params.get("project_name", DEFAULT_PROJECT_NAME) + project = traces.get_project(project_name) + if not project: + return Response(status_code=HTTP_404_NOT_FOUND) + loop = asyncio.get_running_loop() + results = await loop.run_in_executor(None, project.export_evaluations) + if not results: + return Response(status_code=HTTP_404_NOT_FOUND) + + async def content() -> AsyncIterator[bytes]: + for result in results: + yield await loop.run_in_executor( + None, lambda: table_to_bytes(result.to_pyarrow_table()) + ) + + return StreamingResponse(content=content(), media_type="application/x-pandas-arrow") + + +async def _process_pyarrow(request: Request, project_name: str, traces: Traces) -> Response: + body = await request.body() + try: + reader = pa.ipc.open_stream(body) + except pa.ArrowInvalid: + return Response( + content="Request body is not valid pyarrow", + status_code=HTTP_422_UNPROCESSABLE_ENTITY, + ) + try: + evaluations = Evaluations.from_pyarrow_reader(reader) + except Exception: + return Response( + content="Invalid data in request body", + status_code=HTTP_422_UNPROCESSABLE_ENTITY, + ) + return Response(background=BackgroundTask(_add_evaluations, evaluations, project_name, traces)) + + +async def _add_evaluations(evaluations: Evaluations, project_name: str, traces: Traces) -> None: + for evaluation in encode_evaluations(evaluations): + traces.put(evaluation, project_name=project_name) diff --git a/src/phoenix/server/api/routers/v2/spans.py b/src/phoenix/server/api/routers/v2/spans.py new file mode 100644 index 0000000000..d654b98b7b --- /dev/null +++ b/src/phoenix/server/api/routers/v2/spans.py @@ -0,0 +1,85 @@ +import asyncio +from functools import partial +from typing import AsyncIterator + +from starlette.requests import Request +from starlette.responses import Response, StreamingResponse +from starlette.status import HTTP_404_NOT_FOUND, HTTP_422_UNPROCESSABLE_ENTITY + +from phoenix.config import DEFAULT_PROJECT_NAME +from phoenix.core.traces import Traces +from phoenix.server.api.routers.utils import df_to_bytes, from_iso_format +from phoenix.trace.dsl import SpanQuery +from phoenix.utilities import query_spans + + +async def get_spans(request: Request) -> Response: + """ + summary: Get gets from Phoenix + operationId: getSpans + tags: + - spans + parameters: + - name: project_name + in: query + schema: + type: string + description: The project name to get evaluations from + required: false + responses: + 200: + description: Success + 404: + description: Not found + """ + traces: Traces = request.app.state.traces + payload = await request.json() + queries = payload.pop("queries", []) + project_name = request.query_params.get("project_name", DEFAULT_PROJECT_NAME) + if not (project := traces.get_project(project_name)): + return Response(status_code=HTTP_404_NOT_FOUND) + loop = asyncio.get_running_loop() + valid_eval_names = ( + await loop.run_in_executor( + None, + project.get_span_evaluation_names, + ) + if project + else () + ) + try: + span_queries = [ + SpanQuery.from_dict( + query, + evals=project, + valid_eval_names=valid_eval_names, + ) + for query in queries + ] + except Exception as e: + return Response( + status_code=HTTP_422_UNPROCESSABLE_ENTITY, + content=f"Invalid query: {e}", + ) + results = await loop.run_in_executor( + None, + partial( + query_spans, + project, + *span_queries, + start_time=from_iso_format(payload.get("start_time")), + stop_time=from_iso_format(payload.get("stop_time")), + root_spans_only=payload.get("root_spans_only"), + ), + ) + if not results: + return Response(status_code=HTTP_404_NOT_FOUND) + + async def content() -> AsyncIterator[bytes]: + for result in results: + yield df_to_bytes(result) + + return StreamingResponse( + content=content(), + media_type="application/x-pandas-arrow", + ) diff --git a/src/phoenix/server/api/routers/v2/traces.py b/src/phoenix/server/api/routers/v2/traces.py new file mode 100644 index 0000000000..a8164fc726 --- /dev/null +++ b/src/phoenix/server/api/routers/v2/traces.py @@ -0,0 +1,84 @@ +import asyncio +import gzip +import zlib + +from google.protobuf.message import DecodeError +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( + ExportTraceServiceRequest, +) +from opentelemetry.proto.trace.v1.trace_pb2 import TracesData +from starlette.requests import Request +from starlette.responses import Response +from starlette.status import ( + HTTP_403_FORBIDDEN, + HTTP_415_UNSUPPORTED_MEDIA_TYPE, + HTTP_422_UNPROCESSABLE_ENTITY, +) + +from phoenix.core.traces import Traces +from phoenix.storage.span_store import SpanStore +from phoenix.trace.otel import decode +from phoenix.utilities.project import get_project_name + + +async def post_traces(request: Request) -> Response: + """ + summary: Send traces to Phoenix + operationId: addTraces + tags: + - traces + requestBody: + required: true + content: + application/x-protobuf: + schema: + type: string + format: binary + responses: + 200: + description: Success + 403: + description: Forbidden + 415: + description: Unsupported content type, only gzipped protobuf + 422: + description: Request body is invalid + """ + if request.app.state.read_only: + return Response(status_code=HTTP_403_FORBIDDEN) + traces: Traces = request.app.state.traces + store: SpanStore = request.app.state.store + content_type = request.headers.get("content-type") + if content_type != "application/x-protobuf": + return Response( + content=f"Unsupported content type: {content_type}", + status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE, + ) + content_encoding = request.headers.get("content-encoding") + if content_encoding and content_encoding not in ("gzip", "deflate"): + return Response( + content=f"Unsupported content encoding: {content_encoding}", + status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE, + ) + body = await request.body() + if content_encoding == "gzip": + body = gzip.decompress(body) + elif content_encoding == "deflate": + body = zlib.decompress(body) + req = ExportTraceServiceRequest() + try: + req.ParseFromString(body) + except DecodeError: + return Response( + content="Request body is invalid ExportTraceServiceRequest", + status_code=HTTP_422_UNPROCESSABLE_ENTITY, + ) + if store: + store.save(TracesData(resource_spans=req.resource_spans)) + for resource_spans in req.resource_spans: + project_name = get_project_name(resource_spans.resource.attributes) + for scope_span in resource_spans.scope_spans: + for span in scope_span.spans: + traces.put(decode(span), project_name=project_name) + await asyncio.sleep(0) + return Response() diff --git a/src/phoenix/server/app.py b/src/phoenix/server/app.py index 2a125b57e1..03e6eec072 100644 --- a/src/phoenix/server/app.py +++ b/src/phoenix/server/app.py @@ -28,6 +28,7 @@ from starlette.requests import Request from starlette.responses import FileResponse, PlainTextResponse, Response from starlette.routing import Mount, Route +from starlette.schemas import SchemaGenerator from starlette.staticfiles import StaticFiles from starlette.templating import Jinja2Templates from starlette.types import Scope, StatefulLifespan @@ -49,9 +50,8 @@ LatencyMsQuantileDataLoader, SpanEvaluationsDataLoader, ) -from phoenix.server.api.routers.evaluation_handler import EvaluationHandler -from phoenix.server.api.routers.span_handler import SpanHandler -from phoenix.server.api.routers.trace_handler import TraceHandler +from phoenix.server.api.routers.v1 import v1_routes +from phoenix.server.api.routers.v2 import V2_ROUTES from phoenix.server.api.schema import schema from phoenix.storage.span_store import SpanStore from phoenix.trace.schemas import Span @@ -60,6 +60,8 @@ templates = Jinja2Templates(directory=SERVER_DIR / "templates") +schemas = SchemaGenerator({"openapi": "3.0.0", "info": {"title": "Example API", "version": "1.0"}}) + class AppConfig(NamedTuple): has_inferences: bool @@ -211,6 +213,10 @@ async def check_healthz(_: Request) -> PlainTextResponse: return PlainTextResponse("OK") +async def openapi_schema(request: Request) -> Response: + return schemas.OpenAPIResponse(request=request) + + def create_app( database: str, export_path: Path, @@ -251,32 +257,18 @@ def create_app( prometheus_middlewares = [Middleware(PrometheusMiddleware)] else: prometheus_middlewares = [] - return Starlette( + + app = Starlette( lifespan=_lifespan(db, initial_batch_of_spans, initial_batch_of_evaluations), middleware=[ Middleware(HeadersMiddleware), *prometheus_middlewares, ], debug=debug, - routes=( - [] - if traces is None or read_only - else [ - Route( - "/v1/spans", - type("SpanEndpoint", (SpanHandler,), {"traces": traces}), - ), - Route( - "/v1/traces", - type("TraceEndpoint", (TraceHandler,), {"traces": traces, "store": span_store}), - ), - Route( - "/v1/evaluations", - type("EvaluationEndpoint", (EvaluationHandler,), {"traces": traces}), - ), - ] - ) + routes=([] if traces is None or read_only else v1_routes(traces, span_store)) + + V2_ROUTES + [ + Route("/schema", endpoint=openapi_schema, include_in_schema=False), Route("/arize_phoenix_version", version), Route("/healthz", check_healthz), Route( @@ -307,3 +299,7 @@ def create_app( ), ], ) + app.state.traces = traces + app.state.store = span_store + app.state.read_only = read_only + return app From 85676998c3ccf3b5eafe42c2c621031b219c6240 Mon Sep 17 00:00:00 2001 From: Dustin Ngo Date: Thu, 11 Apr 2024 18:38:34 -0400 Subject: [PATCH 2/2] Update V2 routes for persistence Remove busywait Swap out v2 routes for v1 routes! Update Client API usage Fill out get_spans body schema Change reading spans from GET to POST Update tests for new `/spans` route method Update more tests for new `/spans` route method --- src/phoenix/server/api/routers/v1/__init__.py | 27 ++-- .../api/routers/v1/evaluation_handler.py | 115 ------------------ .../api/routers/{v2 => v1}/evaluations.py | 12 +- .../server/api/routers/v1/span_handler.py | 70 ----------- .../server/api/routers/{v2 => v1}/spans.py | 39 +++++- .../server/api/routers/v1/trace_handler.py | 68 ----------- .../server/api/routers/{v2 => v1}/traces.py | 14 ++- src/phoenix/server/api/routers/v2/__init__.py | 10 -- src/phoenix/server/app.py | 6 +- src/phoenix/session/client.py | 10 +- tests/session/test_client.py | 12 +- 11 files changed, 77 insertions(+), 306 deletions(-) delete mode 100644 src/phoenix/server/api/routers/v1/evaluation_handler.py rename src/phoenix/server/api/routers/{v2 => v1}/evaluations.py (92%) delete mode 100644 src/phoenix/server/api/routers/v1/span_handler.py rename src/phoenix/server/api/routers/{v2 => v1}/spans.py (69%) delete mode 100644 src/phoenix/server/api/routers/v1/trace_handler.py rename src/phoenix/server/api/routers/{v2 => v1}/traces.py (80%) delete mode 100644 src/phoenix/server/api/routers/v2/__init__.py diff --git a/src/phoenix/server/api/routers/v1/__init__.py b/src/phoenix/server/api/routers/v1/__init__.py index d11ff203fe..35d65f5591 100644 --- a/src/phoenix/server/api/routers/v1/__init__.py +++ b/src/phoenix/server/api/routers/v1/__init__.py @@ -1,23 +1,10 @@ -from typing import List, Optional - from starlette.routing import Route -from phoenix.core.traces import Traces -from phoenix.storage.span_store import SpanStore - -from .evaluation_handler import EvaluationHandler -from .span_handler import SpanHandler -from .trace_handler import TraceHandler - +from . import evaluations, spans, traces -def v1_routes(traces: Traces, span_store: Optional[SpanStore]) -> List[Route]: - return [ - Route("/v1/spans", type("SpanEndpoint", (SpanHandler,), {"traces": traces})), - Route( - "/v1/traces", - type("TraceEndpoint", (TraceHandler,), {"traces": traces, "store": span_store}), - ), - Route( - "/v1/evaluations", type("EvaluationEndpoint", (EvaluationHandler,), {"traces": traces}) - ), - ] +V1_ROUTES = [ + Route("/v1/evaluations", evaluations.post_evaluation, methods=["POST"]), + Route("/v1/evaluations", evaluations.get_evaluations, methods=["GET"]), + Route("/v1/traces", traces.post_traces, methods=["POST"]), + Route("/v1/spans", spans.read_spans, methods=["POST"]), +] diff --git a/src/phoenix/server/api/routers/v1/evaluation_handler.py b/src/phoenix/server/api/routers/v1/evaluation_handler.py deleted file mode 100644 index 0c73d33889..0000000000 --- a/src/phoenix/server/api/routers/v1/evaluation_handler.py +++ /dev/null @@ -1,115 +0,0 @@ -import asyncio -import gzip -from typing import AsyncIterator - -import pyarrow as pa -from google.protobuf.message import DecodeError -from starlette.background import BackgroundTask -from starlette.datastructures import State -from starlette.endpoints import HTTPEndpoint -from starlette.requests import Request -from starlette.responses import Response, StreamingResponse -from starlette.status import ( - HTTP_404_NOT_FOUND, - HTTP_415_UNSUPPORTED_MEDIA_TYPE, - HTTP_422_UNPROCESSABLE_ENTITY, -) - -import phoenix.trace.v1 as pb -from phoenix.config import DEFAULT_PROJECT_NAME -from phoenix.core.traces import Traces -from phoenix.server.api.routers.utils import table_to_bytes -from phoenix.session.evaluation import encode_evaluations -from phoenix.trace.span_evaluations import Evaluations - - -class EvaluationHandler(HTTPEndpoint): - traces: Traces - - async def post(self, request: Request) -> Response: - content_type = request.headers.get("content-type") - project_name = request.headers.get("project-name", DEFAULT_PROJECT_NAME) - if content_type == "application/x-pandas-arrow": - return await self._process_pyarrow(request, project_name) - if content_type != "application/x-protobuf": - return Response( - content="Unsupported content type", - status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE, - ) - body = await request.body() - content_encoding = request.headers.get("content-encoding") - if content_encoding == "gzip": - body = gzip.decompress(body) - elif content_encoding: - return Response( - content="Unsupported content encoding", - status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE, - ) - evaluation = pb.Evaluation() - try: - evaluation.ParseFromString(body) - except DecodeError: - return Response( - content="Request body is invalid", - status_code=HTTP_422_UNPROCESSABLE_ENTITY, - ) - self.traces.put(evaluation, project_name=project_name) - return Response() - - async def get(self, request: Request) -> Response: - payload = await request.json() - project_name = payload.pop("project_name", None) or DEFAULT_PROJECT_NAME - project = self.traces.get_project(project_name) - if not project: - return Response(status_code=HTTP_404_NOT_FOUND) - loop = asyncio.get_running_loop() - results = await loop.run_in_executor( - None, - project.export_evaluations, - ) - if not results: - return Response(status_code=HTTP_404_NOT_FOUND) - - async def content() -> AsyncIterator[bytes]: - for result in results: - yield await loop.run_in_executor( - None, - lambda: table_to_bytes(result.to_pyarrow_table()), - ) - - return StreamingResponse( - content=content(), - media_type="application/x-pandas-arrow", - ) - - async def _process_pyarrow(self, request: Request, project_name: str) -> Response: - body = await request.body() - try: - reader = pa.ipc.open_stream(body) - except pa.ArrowInvalid: - return Response( - content="Request body is not valid pyarrow", - status_code=HTTP_422_UNPROCESSABLE_ENTITY, - ) - try: - evaluations = Evaluations.from_pyarrow_reader(reader) - except Exception: - return Response( - content="Invalid data in request body", - status_code=HTTP_422_UNPROCESSABLE_ENTITY, - ) - return Response( - background=BackgroundTask( - self._add_evaluations, - request.state, - evaluations, - project_name, - ) - ) - - async def _add_evaluations( - self, state: State, evaluations: Evaluations, project_name: str - ) -> None: - for evaluation in encode_evaluations(evaluations): - state.queue_evaluation_for_bulk_insert(evaluation) - self.traces.put(evaluation, project_name=project_name) diff --git a/src/phoenix/server/api/routers/v2/evaluations.py b/src/phoenix/server/api/routers/v1/evaluations.py similarity index 92% rename from src/phoenix/server/api/routers/v2/evaluations.py rename to src/phoenix/server/api/routers/v1/evaluations.py index 76c8227c1e..7833703eb5 100644 --- a/src/phoenix/server/api/routers/v2/evaluations.py +++ b/src/phoenix/server/api/routers/v1/evaluations.py @@ -5,6 +5,7 @@ import pyarrow as pa from google.protobuf.message import DecodeError from starlette.background import BackgroundTask +from starlette.datastructures import State from starlette.requests import Request from starlette.responses import Response, StreamingResponse from starlette.status import ( @@ -134,9 +135,16 @@ async def _process_pyarrow(request: Request, project_name: str, traces: Traces) content="Invalid data in request body", status_code=HTTP_422_UNPROCESSABLE_ENTITY, ) - return Response(background=BackgroundTask(_add_evaluations, evaluations, project_name, traces)) + return Response( + background=BackgroundTask( + _add_evaluations, request.state, evaluations, project_name, traces + ) + ) -async def _add_evaluations(evaluations: Evaluations, project_name: str, traces: Traces) -> None: +async def _add_evaluations( + state: State, evaluations: Evaluations, project_name: str, traces: Traces +) -> None: for evaluation in encode_evaluations(evaluations): + state.queue_evaluation_for_bulk_insert(evaluation) traces.put(evaluation, project_name=project_name) diff --git a/src/phoenix/server/api/routers/v1/span_handler.py b/src/phoenix/server/api/routers/v1/span_handler.py deleted file mode 100644 index b1069b7ae0..0000000000 --- a/src/phoenix/server/api/routers/v1/span_handler.py +++ /dev/null @@ -1,70 +0,0 @@ -import asyncio -from functools import partial -from typing import AsyncIterator - -from starlette.endpoints import HTTPEndpoint -from starlette.requests import Request -from starlette.responses import Response, StreamingResponse -from starlette.status import HTTP_404_NOT_FOUND, HTTP_422_UNPROCESSABLE_ENTITY - -from phoenix.config import DEFAULT_PROJECT_NAME -from phoenix.core.traces import Traces -from phoenix.server.api.routers.utils import df_to_bytes, from_iso_format -from phoenix.trace.dsl import SpanQuery -from phoenix.utilities import query_spans - - -class SpanHandler(HTTPEndpoint): - traces: Traces - - async def get(self, request: Request) -> Response: - payload = await request.json() - queries = payload.pop("queries", []) - project_name = payload.pop("project_name", None) or DEFAULT_PROJECT_NAME - if not (project := self.traces.get_project(project_name)): - return Response(status_code=HTTP_404_NOT_FOUND) - loop = asyncio.get_running_loop() - valid_eval_names = ( - await loop.run_in_executor( - None, - project.get_span_evaluation_names, - ) - if project - else () - ) - try: - span_queries = [ - SpanQuery.from_dict( - query, - evals=project, - valid_eval_names=valid_eval_names, - ) - for query in queries - ] - except Exception as e: - return Response( - status_code=HTTP_422_UNPROCESSABLE_ENTITY, - content=f"Invalid query: {e}", - ) - results = await loop.run_in_executor( - None, - partial( - query_spans, - project, - *span_queries, - start_time=from_iso_format(payload.get("start_time")), - stop_time=from_iso_format(payload.get("stop_time")), - root_spans_only=payload.get("root_spans_only"), - ), - ) - if not results: - return Response(status_code=HTTP_404_NOT_FOUND) - - async def content() -> AsyncIterator[bytes]: - for result in results: - yield df_to_bytes(result) - - return StreamingResponse( - content=content(), - media_type="application/x-pandas-arrow", - ) diff --git a/src/phoenix/server/api/routers/v2/spans.py b/src/phoenix/server/api/routers/v1/spans.py similarity index 69% rename from src/phoenix/server/api/routers/v2/spans.py rename to src/phoenix/server/api/routers/v1/spans.py index d654b98b7b..496c8c1613 100644 --- a/src/phoenix/server/api/routers/v2/spans.py +++ b/src/phoenix/server/api/routers/v1/spans.py @@ -13,10 +13,11 @@ from phoenix.utilities import query_spans -async def get_spans(request: Request) -> Response: +# TODO: Add property details to SpanQuery schema +async def read_spans(request: Request) -> Response: """ summary: Get gets from Phoenix - operationId: getSpans + operationId: readSpans tags: - spans parameters: @@ -26,11 +27,45 @@ async def get_spans(request: Request) -> Response: type: string description: The project name to get evaluations from required: false + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + queries: + type: array + items: + type: object + properties: + select: + type: object + filter: + type: object + explode: + type: object + concat: + type: object + rename: + type: object + index: + type: object + start_time: + type: string + format: date-time + stop_time: + type: string + format: date-time + root_spans_only: + type: boolean responses: 200: description: Success 404: description: Not found + 422: + description: Request body is invalid """ traces: Traces = request.app.state.traces payload = await request.json() diff --git a/src/phoenix/server/api/routers/v1/trace_handler.py b/src/phoenix/server/api/routers/v1/trace_handler.py deleted file mode 100644 index c50efcae96..0000000000 --- a/src/phoenix/server/api/routers/v1/trace_handler.py +++ /dev/null @@ -1,68 +0,0 @@ -import asyncio -import gzip -import zlib -from typing import Optional - -from google.protobuf.message import DecodeError -from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( - ExportTraceServiceRequest, -) -from opentelemetry.proto.trace.v1.trace_pb2 import TracesData -from starlette.endpoints import HTTPEndpoint -from starlette.requests import Request -from starlette.responses import Response -from starlette.status import HTTP_415_UNSUPPORTED_MEDIA_TYPE, HTTP_422_UNPROCESSABLE_ENTITY - -from phoenix.core.traces import Traces -from phoenix.storage.span_store import SpanStore -from phoenix.trace.otel import decode -from phoenix.utilities.project import get_project_name - - -class TraceHandler(HTTPEndpoint): - traces: Traces - store: Optional[SpanStore] - - async def post(self, request: Request) -> Response: - content_type = request.headers.get("content-type") - if content_type != "application/x-protobuf": - return Response( - content=f"Unsupported content type: {content_type}", - status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE, - ) - content_encoding = request.headers.get("content-encoding") - if content_encoding and content_encoding not in ("gzip", "deflate"): - return Response( - content=f"Unsupported content encoding: {content_encoding}", - status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE, - ) - body = await request.body() - if content_encoding == "gzip": - body = gzip.decompress(body) - elif content_encoding == "deflate": - body = zlib.decompress(body) - req = ExportTraceServiceRequest() - try: - req.ParseFromString(body) - except DecodeError: - return Response( - content="Request body is invalid ExportTraceServiceRequest", - status_code=HTTP_422_UNPROCESSABLE_ENTITY, - ) - if self.store: - self.store.save(TracesData(resource_spans=req.resource_spans)) - for resource_spans in req.resource_spans: - project_name = get_project_name(resource_spans.resource.attributes) - for scope_span in resource_spans.scope_spans: - for otlp_span in scope_span.spans: - span = decode(otlp_span) - # TODO(persistence): Decide which one is better: delayed - # bulk-insert or insert each request immediately, i.e. one - # transaction per request. The bulk-insert is more efficient, - # but it queues data in volatile (buffer) memory (for a short - # period of time), so the 200 response is not a genuine - # confirmation of data persistence. - request.state.queue_span_for_bulk_insert(span, project_name) - self.traces.put(span, project_name=project_name) - await asyncio.sleep(0) - return Response() diff --git a/src/phoenix/server/api/routers/v2/traces.py b/src/phoenix/server/api/routers/v1/traces.py similarity index 80% rename from src/phoenix/server/api/routers/v2/traces.py rename to src/phoenix/server/api/routers/v1/traces.py index a8164fc726..5a4dd814d0 100644 --- a/src/phoenix/server/api/routers/v2/traces.py +++ b/src/phoenix/server/api/routers/v1/traces.py @@ -1,4 +1,3 @@ -import asyncio import gzip import zlib @@ -78,7 +77,14 @@ async def post_traces(request: Request) -> Response: for resource_spans in req.resource_spans: project_name = get_project_name(resource_spans.resource.attributes) for scope_span in resource_spans.scope_spans: - for span in scope_span.spans: - traces.put(decode(span), project_name=project_name) - await asyncio.sleep(0) + for otlp_span in scope_span.spans: + span = decode(otlp_span) + # TODO(persistence): Decide which one is better: delayed + # bulk-insert or insert each request immediately, i.e. one + # transaction per request. The bulk-insert is more efficient, + # but it queues data in volatile (buffer) memory (for a short + # period of time), so the 200 response is not a genuine + # confirmation of data persistence. + request.state.queue_span_for_bulk_insert(span, project_name) + traces.put(span, project_name=project_name) return Response() diff --git a/src/phoenix/server/api/routers/v2/__init__.py b/src/phoenix/server/api/routers/v2/__init__.py deleted file mode 100644 index 1cb3c859d0..0000000000 --- a/src/phoenix/server/api/routers/v2/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -from starlette.routing import Route - -from . import evaluations, spans, traces - -V2_ROUTES = [ - Route("/v2/evaluations", evaluations.post_evaluation, methods=["POST"]), - Route("/v2/evaluations", evaluations.get_evaluations, methods=["GET"]), - Route("/v2/traces", traces.post_traces, methods=["POST"]), - Route("/v2/spans", spans.get_spans, methods=["GET"]), -] diff --git a/src/phoenix/server/app.py b/src/phoenix/server/app.py index 03e6eec072..eea94f3363 100644 --- a/src/phoenix/server/app.py +++ b/src/phoenix/server/app.py @@ -50,8 +50,7 @@ LatencyMsQuantileDataLoader, SpanEvaluationsDataLoader, ) -from phoenix.server.api.routers.v1 import v1_routes -from phoenix.server.api.routers.v2 import V2_ROUTES +from phoenix.server.api.routers.v1 import V1_ROUTES from phoenix.server.api.schema import schema from phoenix.storage.span_store import SpanStore from phoenix.trace.schemas import Span @@ -265,8 +264,7 @@ def create_app( *prometheus_middlewares, ], debug=debug, - routes=([] if traces is None or read_only else v1_routes(traces, span_store)) - + V2_ROUTES + routes=([] if traces is None else V1_ROUTES) + [ Route("/schema", endpoint=openapi_schema, include_in_schema=False), Route("/arize_phoenix_version", version), diff --git a/src/phoenix/session/client.py b/src/phoenix/session/client.py index a401e56d34..3565580dbe 100644 --- a/src/phoenix/session/client.py +++ b/src/phoenix/session/client.py @@ -88,14 +88,14 @@ def query_spans( root_spans_only=root_spans_only, project_name=project_name, ) - response = self._session.get( + response = self._session.post( url=urljoin(self._base_url, "/v1/spans"), + params={"project_name": project_name}, json={ "queries": [q.to_dict() for q in queries], "start_time": _to_iso_format(start_time), "stop_time": _to_iso_format(stop_time), "root_spans_only": root_spans_only, - "project_name": project_name, }, ) if response.status_code == 404: @@ -138,7 +138,7 @@ def get_evaluations( return session.get_evaluations(project_name=project_name) response = self._session.get( urljoin(self._base_url, "/v1/evaluations"), - json={"project_name": project_name}, + params={"project_name": project_name}, ) if response.status_code == 404: logger.info("No evaluations found.") @@ -183,13 +183,13 @@ def log_evaluations(self, *evals: Evaluations, project_name: Optional[str] = Non table = evaluation.to_pyarrow_table() sink = pa.BufferOutputStream() headers = {"content-type": "application/x-pandas-arrow"} - if project_name: - headers["project-name"] = project_name + params = {"project-name": project_name} with pa.ipc.new_stream(sink, table.schema) as writer: writer.write_table(table) self._session.post( urljoin(self._base_url, "/v1/evaluations"), data=cast(bytes, sink.getvalue().to_pybytes()), + params=params, headers=headers, ).raise_for_status() diff --git a/tests/session/test_client.py b/tests/session/test_client.py index 65bf56294e..5481f2afa1 100644 --- a/tests/session/test_client.py +++ b/tests/session/test_client.py @@ -15,11 +15,11 @@ def test_get_spans_dataframe(client: Client, endpoint: str, dataframe: pd.DataFrame): url = urljoin(endpoint, "v1/spans") - responses.get(url, body=_df_to_bytes(dataframe)) + responses.post(url, body=_df_to_bytes(dataframe)) df = client.get_spans_dataframe() assert_frame_equal(df, dataframe) - responses.get(url, status=404) + responses.post(url, status=404) assert client.get_spans_dataframe() is None @@ -28,20 +28,20 @@ def test_query_spans(client: Client, endpoint: str, dataframe: pd.DataFrame): df0, df1 = dataframe.iloc[:1, :], dataframe.iloc[1:, :] url = urljoin(endpoint, "v1/spans") - responses.get(url, body=b"".join([_df_to_bytes(df0), _df_to_bytes(df1)])) + responses.post(url, body=b"".join([_df_to_bytes(df0), _df_to_bytes(df1)])) query = SpanQuery() dfs = client.query_spans(query, query) assert len(dfs) == 2 assert_frame_equal(dfs[0], df0) assert_frame_equal(dfs[1], df1) - responses.get(url, status=404) + responses.post(url, status=404) assert client.query_spans(query) is None - responses.get(url, body=_df_to_bytes(df0)) + responses.post(url, body=_df_to_bytes(df0)) assert_frame_equal(client.query_spans(query), df0) - responses.get(url, body=_df_to_bytes(df1)) + responses.post(url, body=_df_to_bytes(df1)) assert_frame_equal(client.query_spans(), df1)