diff --git a/src/phoenix/server/api/routers/evaluation_handler.py b/src/phoenix/server/api/routers/evaluation_handler.py deleted file mode 100644 index 0c73d33889..0000000000 --- a/src/phoenix/server/api/routers/evaluation_handler.py +++ /dev/null @@ -1,115 +0,0 @@ -import asyncio -import gzip -from typing import AsyncIterator - -import pyarrow as pa -from google.protobuf.message import DecodeError -from starlette.background import BackgroundTask -from starlette.datastructures import State -from starlette.endpoints import HTTPEndpoint -from starlette.requests import Request -from starlette.responses import Response, StreamingResponse -from starlette.status import ( - HTTP_404_NOT_FOUND, - HTTP_415_UNSUPPORTED_MEDIA_TYPE, - HTTP_422_UNPROCESSABLE_ENTITY, -) - -import phoenix.trace.v1 as pb -from phoenix.config import DEFAULT_PROJECT_NAME -from phoenix.core.traces import Traces -from phoenix.server.api.routers.utils import table_to_bytes -from phoenix.session.evaluation import encode_evaluations -from phoenix.trace.span_evaluations import Evaluations - - -class EvaluationHandler(HTTPEndpoint): - traces: Traces - - async def post(self, request: Request) -> Response: - content_type = request.headers.get("content-type") - project_name = request.headers.get("project-name", DEFAULT_PROJECT_NAME) - if content_type == "application/x-pandas-arrow": - return await self._process_pyarrow(request, project_name) - if content_type != "application/x-protobuf": - return Response( - content="Unsupported content type", - status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE, - ) - body = await request.body() - content_encoding = request.headers.get("content-encoding") - if content_encoding == "gzip": - body = gzip.decompress(body) - elif content_encoding: - return Response( - content="Unsupported content encoding", - status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE, - ) - evaluation = pb.Evaluation() - try: - evaluation.ParseFromString(body) - except DecodeError: - return Response( - 
content="Request body is invalid", - status_code=HTTP_422_UNPROCESSABLE_ENTITY, - ) - self.traces.put(evaluation, project_name=project_name) - return Response() - - async def get(self, request: Request) -> Response: - payload = await request.json() - project_name = payload.pop("project_name", None) or DEFAULT_PROJECT_NAME - project = self.traces.get_project(project_name) - if not project: - return Response(status_code=HTTP_404_NOT_FOUND) - loop = asyncio.get_running_loop() - results = await loop.run_in_executor( - None, - project.export_evaluations, - ) - if not results: - return Response(status_code=HTTP_404_NOT_FOUND) - - async def content() -> AsyncIterator[bytes]: - for result in results: - yield await loop.run_in_executor( - None, - lambda: table_to_bytes(result.to_pyarrow_table()), - ) - - return StreamingResponse( - content=content(), - media_type="application/x-pandas-arrow", - ) - - async def _process_pyarrow(self, request: Request, project_name: str) -> Response: - body = await request.body() - try: - reader = pa.ipc.open_stream(body) - except pa.ArrowInvalid: - return Response( - content="Request body is not valid pyarrow", - status_code=HTTP_422_UNPROCESSABLE_ENTITY, - ) - try: - evaluations = Evaluations.from_pyarrow_reader(reader) - except Exception: - return Response( - content="Invalid data in request body", - status_code=HTTP_422_UNPROCESSABLE_ENTITY, - ) - return Response( - background=BackgroundTask( - self._add_evaluations, - request.state, - evaluations, - project_name, - ) - ) - - async def _add_evaluations( - self, state: State, evaluations: Evaluations, project_name: str - ) -> None: - for evaluation in encode_evaluations(evaluations): - state.queue_evaluation_for_bulk_insert(evaluation) - self.traces.put(evaluation, project_name=project_name) diff --git a/src/phoenix/server/api/routers/span_handler.py b/src/phoenix/server/api/routers/span_handler.py deleted file mode 100644 index b1069b7ae0..0000000000 --- 
a/src/phoenix/server/api/routers/span_handler.py +++ /dev/null @@ -1,70 +0,0 @@ -import asyncio -from functools import partial -from typing import AsyncIterator - -from starlette.endpoints import HTTPEndpoint -from starlette.requests import Request -from starlette.responses import Response, StreamingResponse -from starlette.status import HTTP_404_NOT_FOUND, HTTP_422_UNPROCESSABLE_ENTITY - -from phoenix.config import DEFAULT_PROJECT_NAME -from phoenix.core.traces import Traces -from phoenix.server.api.routers.utils import df_to_bytes, from_iso_format -from phoenix.trace.dsl import SpanQuery -from phoenix.utilities import query_spans - - -class SpanHandler(HTTPEndpoint): - traces: Traces - - async def get(self, request: Request) -> Response: - payload = await request.json() - queries = payload.pop("queries", []) - project_name = payload.pop("project_name", None) or DEFAULT_PROJECT_NAME - if not (project := self.traces.get_project(project_name)): - return Response(status_code=HTTP_404_NOT_FOUND) - loop = asyncio.get_running_loop() - valid_eval_names = ( - await loop.run_in_executor( - None, - project.get_span_evaluation_names, - ) - if project - else () - ) - try: - span_queries = [ - SpanQuery.from_dict( - query, - evals=project, - valid_eval_names=valid_eval_names, - ) - for query in queries - ] - except Exception as e: - return Response( - status_code=HTTP_422_UNPROCESSABLE_ENTITY, - content=f"Invalid query: {e}", - ) - results = await loop.run_in_executor( - None, - partial( - query_spans, - project, - *span_queries, - start_time=from_iso_format(payload.get("start_time")), - stop_time=from_iso_format(payload.get("stop_time")), - root_spans_only=payload.get("root_spans_only"), - ), - ) - if not results: - return Response(status_code=HTTP_404_NOT_FOUND) - - async def content() -> AsyncIterator[bytes]: - for result in results: - yield df_to_bytes(result) - - return StreamingResponse( - content=content(), - media_type="application/x-pandas-arrow", - ) diff --git 
a/src/phoenix/server/api/routers/trace_handler.py b/src/phoenix/server/api/routers/trace_handler.py deleted file mode 100644 index c50efcae96..0000000000 --- a/src/phoenix/server/api/routers/trace_handler.py +++ /dev/null @@ -1,68 +0,0 @@ -import asyncio -import gzip -import zlib -from typing import Optional - -from google.protobuf.message import DecodeError -from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( - ExportTraceServiceRequest, -) -from opentelemetry.proto.trace.v1.trace_pb2 import TracesData -from starlette.endpoints import HTTPEndpoint -from starlette.requests import Request -from starlette.responses import Response -from starlette.status import HTTP_415_UNSUPPORTED_MEDIA_TYPE, HTTP_422_UNPROCESSABLE_ENTITY - -from phoenix.core.traces import Traces -from phoenix.storage.span_store import SpanStore -from phoenix.trace.otel import decode -from phoenix.utilities.project import get_project_name - - -class TraceHandler(HTTPEndpoint): - traces: Traces - store: Optional[SpanStore] - - async def post(self, request: Request) -> Response: - content_type = request.headers.get("content-type") - if content_type != "application/x-protobuf": - return Response( - content=f"Unsupported content type: {content_type}", - status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE, - ) - content_encoding = request.headers.get("content-encoding") - if content_encoding and content_encoding not in ("gzip", "deflate"): - return Response( - content=f"Unsupported content encoding: {content_encoding}", - status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE, - ) - body = await request.body() - if content_encoding == "gzip": - body = gzip.decompress(body) - elif content_encoding == "deflate": - body = zlib.decompress(body) - req = ExportTraceServiceRequest() - try: - req.ParseFromString(body) - except DecodeError: - return Response( - content="Request body is invalid ExportTraceServiceRequest", - status_code=HTTP_422_UNPROCESSABLE_ENTITY, - ) - if self.store: - 
self.store.save(TracesData(resource_spans=req.resource_spans)) - for resource_spans in req.resource_spans: - project_name = get_project_name(resource_spans.resource.attributes) - for scope_span in resource_spans.scope_spans: - for otlp_span in scope_span.spans: - span = decode(otlp_span) - # TODO(persistence): Decide which one is better: delayed - # bulk-insert or insert each request immediately, i.e. one - # transaction per request. The bulk-insert is more efficient, - # but it queues data in volatile (buffer) memory (for a short - # period of time), so the 200 response is not a genuine - # confirmation of data persistence. - request.state.queue_span_for_bulk_insert(span, project_name) - self.traces.put(span, project_name=project_name) - await asyncio.sleep(0) - return Response() diff --git a/src/phoenix/server/api/routers/v1/__init__.py b/src/phoenix/server/api/routers/v1/__init__.py new file mode 100644 index 0000000000..a3f111f633 --- /dev/null +++ b/src/phoenix/server/api/routers/v1/__init__.py @@ -0,0 +1,11 @@ +from starlette.routing import Route + +from . 
import evaluations, spans, traces + +V1_ROUTES = [ + Route("/v1/evaluations", evaluations.post_evaluations, methods=["POST"]), + Route("/v1/evaluations", evaluations.get_evaluations, methods=["GET"]), + Route("/v1/traces", traces.post_traces, methods=["POST"]), + Route("/v1/spans", spans.query_spans_handler, methods=["POST"]), + Route("/v1/spans", spans.get_spans_handler, methods=["GET"]), +] diff --git a/src/phoenix/server/api/routers/v1/evaluations.py b/src/phoenix/server/api/routers/v1/evaluations.py new file mode 100644 index 0000000000..ef951ba413 --- /dev/null +++ b/src/phoenix/server/api/routers/v1/evaluations.py @@ -0,0 +1,160 @@ +import asyncio +import gzip +from typing import AsyncIterator + +import pyarrow as pa +from google.protobuf.message import DecodeError +from starlette.background import BackgroundTask +from starlette.datastructures import State +from starlette.requests import Request +from starlette.responses import Response, StreamingResponse +from starlette.status import ( + HTTP_403_FORBIDDEN, + HTTP_404_NOT_FOUND, + HTTP_415_UNSUPPORTED_MEDIA_TYPE, + HTTP_422_UNPROCESSABLE_ENTITY, +) + +import phoenix.trace.v1 as pb +from phoenix.config import DEFAULT_PROJECT_NAME +from phoenix.core.traces import Traces +from phoenix.server.api.routers.utils import table_to_bytes +from phoenix.session.evaluation import encode_evaluations +from phoenix.trace.span_evaluations import Evaluations + + +async def post_evaluations(request: Request) -> Response: + """ + summary: Add evaluations to a span, trace, or document + operationId: addEvaluations + tags: + - evaluations + parameters: + - name: project-name + in: query + schema: + type: string + description: The project name to add the evaluation to + default: default + requestBody: + required: true + content: + application/x-protobuf: + schema: + type: string + format: binary + application/x-pandas-arrow: + schema: + type: string + format: binary + responses: + 200: + description: Success + 403: + description: 
Forbidden + 415: + description: Unsupported content type, only gzipped protobuf and pandas-arrow are supported + 422: + description: Request body is invalid + """ + if request.app.state.read_only: + return Response(status_code=HTTP_403_FORBIDDEN) + traces: Traces = request.app.state.traces + content_type = request.headers.get("content-type") + project_name = ( + request.query_params.get("project-name") + # read from headers for backwards compatibility + or request.headers.get("project-name") + or DEFAULT_PROJECT_NAME + ) + if content_type == "application/x-pandas-arrow": + return await _process_pyarrow(request, project_name, traces) + if content_type != "application/x-protobuf": + return Response("Unsupported content type", status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE) + body = await request.body() + content_encoding = request.headers.get("content-encoding") + if content_encoding == "gzip": + body = gzip.decompress(body) + elif content_encoding: + return Response("Unsupported content encoding", status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE) + evaluation = pb.Evaluation() + try: + evaluation.ParseFromString(body) + except DecodeError: + return Response("Request body is invalid", status_code=HTTP_422_UNPROCESSABLE_ENTITY) + traces.put(evaluation, project_name=project_name) + return Response() + + +async def get_evaluations(request: Request) -> Response: + """ + summary: Get evaluations from Phoenix + operationId: getEvaluation + tags: + - evaluations + parameters: + - name: project-name + in: query + schema: + type: string + description: The project name to get evaluations from + default: default + responses: + 200: + description: Success + 404: + description: Not found + """ + traces: Traces = request.app.state.traces + project_name = ( + request.query_params.get("project-name") + # read from headers for backwards compatibility + or request.headers.get("project-name") + or DEFAULT_PROJECT_NAME + ) + project = traces.get_project(project_name) + if not project: + return 
Response(status_code=HTTP_404_NOT_FOUND) + loop = asyncio.get_running_loop() + results = await loop.run_in_executor(None, project.export_evaluations) + if not results: + return Response(status_code=HTTP_404_NOT_FOUND) + + async def content() -> AsyncIterator[bytes]: + for result in results: + yield await loop.run_in_executor( + None, lambda: table_to_bytes(result.to_pyarrow_table()) + ) + + return StreamingResponse(content=content(), media_type="application/x-pandas-arrow") + + +async def _process_pyarrow(request: Request, project_name: str, traces: Traces) -> Response: + body = await request.body() + try: + reader = pa.ipc.open_stream(body) + except pa.ArrowInvalid: + return Response( + content="Request body is not valid pyarrow", + status_code=HTTP_422_UNPROCESSABLE_ENTITY, + ) + try: + evaluations = Evaluations.from_pyarrow_reader(reader) + except Exception: + return Response( + content="Invalid data in request body", + status_code=HTTP_422_UNPROCESSABLE_ENTITY, + ) + return Response( + background=BackgroundTask( + _add_evaluations, request.state, evaluations, project_name, traces + ) + ) + + +async def _add_evaluations( + state: State, evaluations: Evaluations, project_name: str, traces: Traces +) -> None: + for evaluation in encode_evaluations(evaluations): + state.queue_evaluation_for_bulk_insert(evaluation) + traces.put(evaluation, project_name=project_name) diff --git a/src/phoenix/server/api/routers/v1/spans.py b/src/phoenix/server/api/routers/v1/spans.py new file mode 100644 index 0000000000..403fb6f4a3 --- /dev/null +++ b/src/phoenix/server/api/routers/v1/spans.py @@ -0,0 +1,184 @@ +import asyncio +from functools import partial +from typing import AsyncIterator + +from starlette.requests import Request +from starlette.responses import Response, StreamingResponse +from starlette.status import HTTP_404_NOT_FOUND, HTTP_422_UNPROCESSABLE_ENTITY + +from phoenix.config import DEFAULT_PROJECT_NAME +from phoenix.core.traces import Traces +from 
phoenix.server.api.routers.utils import df_to_bytes, from_iso_format +from phoenix.trace.dsl import SpanQuery +from phoenix.utilities import query_spans + + +# TODO: Add property details to SpanQuery schema +async def query_spans_handler(request: Request) -> Response: + """ + summary: Query spans using query DSL + operationId: querySpans + tags: + - spans + parameters: + - name: project-name + in: query + schema: + type: string + description: The project name to query spans from + default: default + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + queries: + type: array + items: + type: object + properties: + select: + type: object + filter: + type: object + explode: + type: object + concat: + type: object + rename: + type: object + index: + type: object + start_time: + type: string + format: date-time + stop_time: + type: string + format: date-time + root_spans_only: + type: boolean + responses: + 200: + description: Success + 404: + description: Not found + 422: + description: Request body is invalid + """ + traces: Traces = request.app.state.traces + payload = await request.json() + queries = payload.pop("queries", []) + project_name = ( + request.query_params.get("project-name") + # read from headers for backwards compatibility + or request.headers.get("project-name") + or DEFAULT_PROJECT_NAME + ) + if not (project := traces.get_project(project_name)): + return Response(status_code=HTTP_404_NOT_FOUND) + loop = asyncio.get_running_loop() + valid_eval_names = ( + await loop.run_in_executor( + None, + project.get_span_evaluation_names, + ) + if project + else () + ) + try: + span_queries = [ + SpanQuery.from_dict( + query, + evals=project, + valid_eval_names=valid_eval_names, + ) + for query in queries + ] + except Exception as e: + return Response( + status_code=HTTP_422_UNPROCESSABLE_ENTITY, + content=f"Invalid query: {e}", + ) + results = await loop.run_in_executor( + None, + partial( + query_spans, + 
 + project, + *span_queries, + start_time=from_iso_format(payload.get("start_time")), + stop_time=from_iso_format(payload.get("stop_time")), + root_spans_only=payload.get("root_spans_only"), + ), + ) + if not results: + return Response(status_code=HTTP_404_NOT_FOUND) + + async def content() -> AsyncIterator[bytes]: + for result in results: + yield df_to_bytes(result) + + return StreamingResponse( + content=content(), + media_type="application/x-pandas-arrow", + ) + + +async def get_spans_handler(request: Request) -> Response: + """ + summary: Deprecated route for querying for spans, use the POST method instead + operationId: legacyQuerySpans + deprecated: true + """ + traces: Traces = request.app.state.traces + payload = await request.json() + queries = payload.pop("queries", []) + # legacy clients send project_name in the JSON body, not as a query param + project_name = payload.pop("project_name", None) or DEFAULT_PROJECT_NAME + if not (project := traces.get_project(project_name)): + return Response(status_code=HTTP_404_NOT_FOUND) + loop = asyncio.get_running_loop() + valid_eval_names = ( + await loop.run_in_executor( + None, + project.get_span_evaluation_names, + ) + if project + else () + ) + try: + span_queries = [ + SpanQuery.from_dict( + query, + evals=project, + valid_eval_names=valid_eval_names, + ) + for query in queries + ] + except Exception as e: + return Response( + status_code=HTTP_422_UNPROCESSABLE_ENTITY, + content=f"Invalid query: {e}", + ) + results = await loop.run_in_executor( + None, + partial( + query_spans, + project, + *span_queries, + start_time=from_iso_format(payload.get("start_time")), + stop_time=from_iso_format(payload.get("stop_time")), + root_spans_only=payload.get("root_spans_only"), + ), + ) + if not results: + return Response(status_code=HTTP_404_NOT_FOUND) + + async def content() -> AsyncIterator[bytes]: + for result in results: + yield df_to_bytes(result) + + return StreamingResponse( + content=content(), + media_type="application/x-pandas-arrow", + ) diff --git 
a/src/phoenix/server/api/routers/v1/traces.py b/src/phoenix/server/api/routers/v1/traces.py new file mode 100644 index 0000000000..5a4dd814d0 --- /dev/null +++ b/src/phoenix/server/api/routers/v1/traces.py @@ -0,0 +1,90 @@ +import gzip +import zlib + +from google.protobuf.message import DecodeError +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( + ExportTraceServiceRequest, +) +from opentelemetry.proto.trace.v1.trace_pb2 import TracesData +from starlette.requests import Request +from starlette.responses import Response +from starlette.status import ( + HTTP_403_FORBIDDEN, + HTTP_415_UNSUPPORTED_MEDIA_TYPE, + HTTP_422_UNPROCESSABLE_ENTITY, +) + +from phoenix.core.traces import Traces +from phoenix.storage.span_store import SpanStore +from phoenix.trace.otel import decode +from phoenix.utilities.project import get_project_name + + +async def post_traces(request: Request) -> Response: + """ + summary: Send traces to Phoenix + operationId: addTraces + tags: + - traces + requestBody: + required: true + content: + application/x-protobuf: + schema: + type: string + format: binary + responses: + 200: + description: Success + 403: + description: Forbidden + 415: + description: Unsupported content type, only gzipped protobuf + 422: + description: Request body is invalid + """ + if request.app.state.read_only: + return Response(status_code=HTTP_403_FORBIDDEN) + traces: Traces = request.app.state.traces + store: SpanStore = request.app.state.store + content_type = request.headers.get("content-type") + if content_type != "application/x-protobuf": + return Response( + content=f"Unsupported content type: {content_type}", + status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE, + ) + content_encoding = request.headers.get("content-encoding") + if content_encoding and content_encoding not in ("gzip", "deflate"): + return Response( + content=f"Unsupported content encoding: {content_encoding}", + status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE, + ) + body = await 
request.body() + if content_encoding == "gzip": + body = gzip.decompress(body) + elif content_encoding == "deflate": + body = zlib.decompress(body) + req = ExportTraceServiceRequest() + try: + req.ParseFromString(body) + except DecodeError: + return Response( + content="Request body is invalid ExportTraceServiceRequest", + status_code=HTTP_422_UNPROCESSABLE_ENTITY, + ) + if store: + store.save(TracesData(resource_spans=req.resource_spans)) + for resource_spans in req.resource_spans: + project_name = get_project_name(resource_spans.resource.attributes) + for scope_span in resource_spans.scope_spans: + for otlp_span in scope_span.spans: + span = decode(otlp_span) + # TODO(persistence): Decide which one is better: delayed + # bulk-insert or insert each request immediately, i.e. one + # transaction per request. The bulk-insert is more efficient, + # but it queues data in volatile (buffer) memory (for a short + # period of time), so the 200 response is not a genuine + # confirmation of data persistence. 
+ request.state.queue_span_for_bulk_insert(span, project_name) + traces.put(span, project_name=project_name) + return Response() diff --git a/src/phoenix/server/app.py b/src/phoenix/server/app.py index 2a125b57e1..f97d683f37 100644 --- a/src/phoenix/server/app.py +++ b/src/phoenix/server/app.py @@ -28,6 +28,7 @@ from starlette.requests import Request from starlette.responses import FileResponse, PlainTextResponse, Response from starlette.routing import Mount, Route +from starlette.schemas import SchemaGenerator from starlette.staticfiles import StaticFiles from starlette.templating import Jinja2Templates from starlette.types import Scope, StatefulLifespan @@ -49,9 +50,7 @@ LatencyMsQuantileDataLoader, SpanEvaluationsDataLoader, ) -from phoenix.server.api.routers.evaluation_handler import EvaluationHandler -from phoenix.server.api.routers.span_handler import SpanHandler -from phoenix.server.api.routers.trace_handler import TraceHandler +from phoenix.server.api.routers.v1 import V1_ROUTES from phoenix.server.api.schema import schema from phoenix.storage.span_store import SpanStore from phoenix.trace.schemas import Span @@ -60,6 +59,10 @@ templates = Jinja2Templates(directory=SERVER_DIR / "templates") +schemas = SchemaGenerator( + {"openapi": "3.0.0", "info": {"title": "ArizePhoenix API", "version": "1.0"}} +) + class AppConfig(NamedTuple): has_inferences: bool @@ -211,6 +214,10 @@ async def check_healthz(_: Request) -> PlainTextResponse: return PlainTextResponse("OK") +async def openapi_schema(request: Request) -> Response: + return schemas.OpenAPIResponse(request=request) + + def create_app( database: str, export_path: Path, @@ -251,32 +258,17 @@ def create_app( prometheus_middlewares = [Middleware(PrometheusMiddleware)] else: prometheus_middlewares = [] - return Starlette( + + app = Starlette( lifespan=_lifespan(db, initial_batch_of_spans, initial_batch_of_evaluations), middleware=[ Middleware(HeadersMiddleware), *prometheus_middlewares, ], debug=debug, - routes=( 
- [] - if traces is None or read_only - else [ - Route( - "/v1/spans", - type("SpanEndpoint", (SpanHandler,), {"traces": traces}), - ), - Route( - "/v1/traces", - type("TraceEndpoint", (TraceHandler,), {"traces": traces, "store": span_store}), - ), - Route( - "/v1/evaluations", - type("EvaluationEndpoint", (EvaluationHandler,), {"traces": traces}), - ), - ] - ) + routes=([] if traces is None else V1_ROUTES) + [ + Route("/schema", endpoint=openapi_schema, include_in_schema=False), Route("/arize_phoenix_version", version), Route("/healthz", check_healthz), Route( @@ -307,3 +299,7 @@ ), ], ) + app.state.traces = traces + app.state.store = span_store + app.state.read_only = read_only + return app diff --git a/src/phoenix/session/client.py b/src/phoenix/session/client.py index a401e56d34..3565580dbe 100644 --- a/src/phoenix/session/client.py +++ b/src/phoenix/session/client.py @@ -88,14 +88,14 @@ def query_spans( root_spans_only=root_spans_only, project_name=project_name, ) - response = self._session.get( + response = self._session.post( url=urljoin(self._base_url, "/v1/spans"), + params={"project-name": project_name}, json={ "queries": [q.to_dict() for q in queries], "start_time": _to_iso_format(start_time), "stop_time": _to_iso_format(stop_time), "root_spans_only": root_spans_only, - "project_name": project_name, }, ) if response.status_code == 404: @@ -138,7 +138,7 @@ def get_evaluations( return session.get_evaluations(project_name=project_name) response = self._session.get( urljoin(self._base_url, "/v1/evaluations"), - json={"project_name": project_name}, + params={"project-name": project_name}, ) if response.status_code == 404: logger.info("No evaluations found.") @@ -183,13 +183,13 @@ def log_evaluations(self, *evals: Evaluations, project_name: Optional[str] = Non table = evaluation.to_pyarrow_table() sink = pa.BufferOutputStream() headers = {"content-type": "application/x-pandas-arrow"} - if project_name: - headers["project-name"] = project_name + 
params = {"project-name": project_name} with pa.ipc.new_stream(sink, table.schema) as writer: writer.write_table(table) self._session.post( urljoin(self._base_url, "/v1/evaluations"), data=cast(bytes, sink.getvalue().to_pybytes()), + params=params, headers=headers, ).raise_for_status() diff --git a/tests/session/test_client.py b/tests/session/test_client.py index 65bf56294e..5481f2afa1 100644 --- a/tests/session/test_client.py +++ b/tests/session/test_client.py @@ -15,11 +15,11 @@ def test_get_spans_dataframe(client: Client, endpoint: str, dataframe: pd.DataFrame): url = urljoin(endpoint, "v1/spans") - responses.get(url, body=_df_to_bytes(dataframe)) + responses.post(url, body=_df_to_bytes(dataframe)) df = client.get_spans_dataframe() assert_frame_equal(df, dataframe) - responses.get(url, status=404) + responses.post(url, status=404) assert client.get_spans_dataframe() is None @@ -28,20 +28,20 @@ def test_query_spans(client: Client, endpoint: str, dataframe: pd.DataFrame): df0, df1 = dataframe.iloc[:1, :], dataframe.iloc[1:, :] url = urljoin(endpoint, "v1/spans") - responses.get(url, body=b"".join([_df_to_bytes(df0), _df_to_bytes(df1)])) + responses.post(url, body=b"".join([_df_to_bytes(df0), _df_to_bytes(df1)])) query = SpanQuery() dfs = client.query_spans(query, query) assert len(dfs) == 2 assert_frame_equal(dfs[0], df0) assert_frame_equal(dfs[1], df1) - responses.get(url, status=404) + responses.post(url, status=404) assert client.query_spans(query) is None - responses.get(url, body=_df_to_bytes(df0)) + responses.post(url, body=_df_to_bytes(df0)) assert_frame_equal(client.query_spans(query), df0) - responses.get(url, body=_df_to_bytes(df1)) + responses.post(url, body=_df_to_bytes(df1)) assert_frame_equal(client.query_spans(), df1)