
feat: add trace and document evals to GET v1/evaluations #2910

Merged
merged 9 commits into from
Apr 17, 2024
Changes from 4 commits
118 changes: 102 additions & 16 deletions src/phoenix/server/api/routers/v1/evaluations.py
@@ -1,6 +1,6 @@
import asyncio
import gzip
from typing import AsyncContextManager, AsyncIterator, Callable
from itertools import chain
from typing import AsyncContextManager, Callable, Iterator, Tuple

import pandas as pd
import pyarrow as pa
@@ -29,7 +29,12 @@
from phoenix.db import models
from phoenix.server.api.routers.utils import table_to_bytes
from phoenix.session.evaluation import encode_evaluations
from phoenix.trace.span_evaluations import Evaluations, SpanEvaluations
from phoenix.trace.span_evaluations import (
    DocumentEvaluations,
    Evaluations,
    SpanEvaluations,
    TraceEvaluations,
)

EvaluationName: TypeAlias = str

@@ -130,21 +135,36 @@ async def get_evaluations(request: Request) -> Response:
            _read_sql_span_evaluations_into_dataframe,
            project_name,
        )
    if span_evals_dataframe.empty:
        trace_evals_dataframe = await connection.run_sync(
            _read_sql_trace_evaluations_into_dataframe,
            project_name,
        )
        document_evals_dataframe = await connection.run_sync(
            _read_sql_document_evaluations_into_dataframe,
            project_name,
        )
    if (
        trace_evals_dataframe.empty
        and span_evals_dataframe.empty
        and document_evals_dataframe.empty
    ):
        return Response(status_code=HTTP_404_NOT_FOUND)

    loop = asyncio.get_running_loop()

    async def content() -> AsyncIterator[bytes]:
        for eval_name, span_evals_dataframe_for_name in span_evals_dataframe.groupby(
            "name", as_index=False
        ):
            span_evals = SpanEvaluations(str(eval_name), span_evals_dataframe_for_name)
            yield await loop.run_in_executor(
                None, lambda: table_to_bytes(span_evals.to_pyarrow_table())
            )

Comment on lines -136 to -145
Contributor Author


Starlette will ensure that this is non-blocking, even if we pass a synchronous stream as content.

https://github.com/encode/starlette/blob/eb76cae6fdb6c1b0bfcace17d0dec946fe767f84/starlette/responses.py#L230

    return StreamingResponse(content=content(), media_type="application/x-pandas-arrow")
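As the comment above notes, StreamingResponse happily consumes a synchronous iterator, which is why the replacement code below can hand it a plain map object. A minimal standalone sketch of that behavior (not from this PR; the route and chunk contents are illustrative):

# Hedged sketch, not part of this PR: StreamingResponse with a synchronous
# generator. Starlette detects the sync iterator and iterates it in a worker
# thread, so producing chunks does not block the event loop.
from starlette.applications import Starlette
from starlette.responses import StreamingResponse
from starlette.routing import Route


def sync_chunks():
    # A plain synchronous generator of byte chunks (illustrative data).
    for i in range(3):
        yield f"chunk-{i}\n".encode()


async def stream_endpoint(request):
    return StreamingResponse(content=sync_chunks(), media_type="text/plain")


app = Starlette(routes=[Route("/stream", stream_endpoint)])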
    evals = chain[Evaluations](
        map(
            lambda args: TraceEvaluations(*args),
            _groupby_eval_name(trace_evals_dataframe),
        ),
        map(
            lambda args: SpanEvaluations(*args),
            _groupby_eval_name(span_evals_dataframe),
        ),
        map(
            lambda args: DocumentEvaluations(*args),
            _groupby_eval_name(document_evals_dataframe),
        ),
    )
    bytestream = map(lambda evals: table_to_bytes(evals.to_pyarrow_table()), evals)
    return StreamingResponse(
        content=bytestream,
        media_type="application/x-pandas-arrow",
    )
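For context on the serialization step: a helper like table_to_bytes above plausibly writes each pyarrow Table in the Arrow IPC stream format. This standalone sketch shows that pattern; the actual phoenix implementation may differ.

# Hedged sketch of Arrow IPC stream serialization, the pattern a helper
# like table_to_bytes plausibly uses; not copied from phoenix.
import pyarrow as pa


def table_to_ipc_bytes(table: pa.Table) -> bytes:
    sink = pa.BufferOutputStream()
    # new_stream writes the schema header; write_table appends record batches.
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()


example = pa.table({"score": [0.9, 0.1], "label": ["relevant", "irrelevant"]})
payload = table_to_ipc_bytes(example)  # bytes suitable for a streamed response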


async def _process_pyarrow(request: Request, project_name: str, traces: Traces) -> Response:
@@ -178,6 +198,33 @@ async def _add_evaluations(
        traces.put(evaluation, project_name=project_name)


def _read_sql_trace_evaluations_into_dataframe(
    connectable: Connectable,
    project_name: str,
) -> DataFrame:
    """
    This function passes a synchronous connection to pandas.read_sql, which
    does not support async connections.

    For more information, see:

    https://stackoverflow.com/questions/70848256/how-can-i-use-pandas-read-sql-on-an-async-connection
    """
    return pd.read_sql(
        select(models.TraceAnnotation, models.Trace.trace_id)
        .join(models.Trace)
        .join(models.Project)
        .where(
            and_(
                models.Project.name == project_name,
                models.TraceAnnotation.annotator_kind == "LLM",
            )
        ),
        connectable,
        index_col="trace_id",
    )
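Because pandas.read_sql is sync-only, callers bridge from async SQLAlchemy via run_sync, which hands a helper like the one above a synchronous Connection. A minimal standalone sketch of that pattern; the engine URL and query are illustrative assumptions, not phoenix's actual configuration.

# Hedged sketch of the run_sync bridge used by the helpers in this file.
# Assumes the aiosqlite driver is installed; the URL and query are illustrative.
import asyncio

import pandas as pd
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine


def _read_df(sync_conn, query: str) -> pd.DataFrame:
    # run_sync passes a synchronous Connection, which pandas accepts.
    return pd.read_sql(text(query), sync_conn)


async def main() -> None:
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with engine.connect() as connection:
        df = await connection.run_sync(_read_df, "SELECT 1 AS one")
    print(df)


asyncio.run(main())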


def _read_sql_span_evaluations_into_dataframe(
    connectable: Connectable,
    project_name: str,
@@ -204,3 +251,42 @@ def _read_sql_span_evaluations_into_dataframe(
        connectable,
        index_col="span_id",
    )


def _read_sql_document_evaluations_into_dataframe(
    connectable: Connectable,
    project_name: str,
) -> DataFrame:
    """
    This function passes a synchronous connection to pandas.read_sql, which
    does not support async connections.

    For more information, see:

    https://stackoverflow.com/questions/70848256/how-can-i-use-pandas-read-sql-on-an-async-connection
    """
    return pd.read_sql(
        select(
            models.DocumentAnnotation,
            models.DocumentAnnotation.document_index.label("document_position"),
            models.Span.span_id,
        )
        .join(models.Span)
        .join(models.Trace)
        .join(models.Project)
        .where(
            and_(
                models.Project.name == project_name,
                models.DocumentAnnotation.annotator_kind == "LLM",
            )
        ),
        connectable,
        index_col=["span_id", "document_position"],
    )


def _groupby_eval_name(
    evals_dataframe: DataFrame,
) -> Iterator[Tuple[EvaluationName, DataFrame]]:
    for eval_name, evals_dataframe_for_name in evals_dataframe.groupby("name", as_index=False):
        yield str(eval_name), evals_dataframe_for_name
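A toy illustration (not from the PR) of what the grouping helper yields: one (name, frame) pair per distinct evaluation name.

# Toy data; the "name" column mirrors the column the helper groups on.
import pandas as pd

df = pd.DataFrame(
    {
        "name": ["qa_correctness", "qa_correctness", "hallucination"],
        "score": [1.0, 0.5, 0.0],
    }
)
for eval_name, group in df.groupby("name", as_index=False):
    print(str(eval_name), len(group))  # each group is its own DataFrame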
13 changes: 13 additions & 0 deletions src/phoenix/utilities/async_helpers.py
@@ -0,0 +1,13 @@
from typing import AsyncIterable, TypeVar

GenericType = TypeVar("GenericType")


async def achain(*aiterables: AsyncIterable[GenericType]) -> AsyncIterable[GenericType]:
"""
Chains together multiple async iterables into a single async iterable. The
async analogue of itertools.chain.
"""
for aiterable in aiterables:
async for value in aiterable:
yield value
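A small usage sketch of achain with two illustrative async generators; items from the first are exhausted before the second begins, mirroring itertools.chain. The helper is redefined here so the sketch runs standalone.

# Usage sketch for achain; the generators are illustrative.
import asyncio
from typing import AsyncIterable, TypeVar

GenericType = TypeVar("GenericType")


async def achain(*aiterables: AsyncIterable[GenericType]) -> AsyncIterable[GenericType]:
    for aiterable in aiterables:
        async for value in aiterable:
            yield value


async def letters(prefix: str) -> AsyncIterable[str]:
    for i in range(2):
        yield f"{prefix}{i}"


async def main() -> None:
    async for item in achain(letters("a"), letters("b")):
        print(item)  # a0, a1, b0, b1


asyncio.run(main())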