Skip to content

Commit

Permalink
feat: ingest pyarrow span evals into sqlite (#2837)
Browse files Browse the repository at this point in the history
adds sqlite support for span evals ingested via pyarrow
  • Loading branch information
axiomofjoy authored Apr 10, 2024
1 parent 48d4643 commit 3a6666c
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 6 deletions.
55 changes: 52 additions & 3 deletions src/phoenix/db/bulk_inserter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from sqlalchemy import func, insert, select, update
from sqlalchemy.ext.asyncio import AsyncSession

import phoenix.trace.v1 as pb
from phoenix.db import models
from phoenix.trace.schemas import Span, SpanStatusCode

Expand Down Expand Up @@ -37,26 +38,34 @@ def __init__(
self._spans: List[Tuple[Span, str]] = (
[] if initial_batch_of_spans is None else list(initial_batch_of_spans)
)
self._evaluations: List[pb.Evaluation] = []
self._task: Optional[asyncio.Task[None]] = None

async def __aenter__(self) -> Callable[[Span, str], None]:
async def __aenter__(
self,
) -> Tuple[Callable[[Span, str], None], Callable[[pb.Evaluation], None]]:
self._running = True
self._task = asyncio.create_task(self._bulk_insert())
return self._queue_span
return self._queue_span, self._queue_evaluation

async def __aexit__(self, *args: Any) -> None:
self._running = False

def _queue_span(self, span: Span, project_name: str) -> None:
self._spans.append((span, project_name))

def _queue_evaluation(self, evaluation: pb.Evaluation) -> None:
self._evaluations.append(evaluation)

async def _bulk_insert(self) -> None:
next_run_at = time() + self._run_interval_seconds
while self._spans or self._running:
while self._spans or self._evaluations or self._running:
await asyncio.sleep(next_run_at - time())
next_run_at = time() + self._run_interval_seconds
if self._spans:
await self._insert_spans()
if self._evaluations:
await self._insert_evaluations()

async def _insert_spans(self) -> None:
spans = self._spans
Expand All @@ -75,6 +84,46 @@ async def _insert_spans(self) -> None:
except Exception:
logger.exception("Failed to insert spans")

async def _insert_evaluations(self) -> None:
evaluations = self._evaluations
self._evaluations = []
for i in range(0, len(evaluations), self._max_num_per_transaction):
try:
async with self._db() as session:
for evaluation in islice(evaluations, i, i + self._max_num_per_transaction):
try:
async with session.begin_nested():
await _insert_evaluation(session, evaluation)
except Exception:
logger.exception(
"Failed to insert evaluation "
f"for span_id={evaluation.SubjectId.span_id}"
)
except Exception:
logger.exception("Failed to insert evaluations")


async def _insert_evaluation(session: AsyncSession, evaluation: pb.Evaluation) -> None:
if not (
span_rowid := await session.scalar(
select(models.Span.id).where(models.Span.span_id == evaluation.subject_id.span_id)
)
):
return
await session.scalar(
insert(models.SpanAnnotation)
.values(
span_rowid=span_rowid,
name=evaluation.name,
label=evaluation.result.label.value,
score=evaluation.result.score.value,
explanation=evaluation.result.explanation.value,
metadata_={},
annotator_kind="LLM",
)
.returning(models.SpanAnnotation.id)
)


async def _insert_span(session: AsyncSession, span: Span, project_name: str) -> None:
if await session.scalar(select(1).where(models.Span.span_id == span.context.span_id)):
Expand Down
7 changes: 6 additions & 1 deletion src/phoenix/server/api/routers/evaluation_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pyarrow as pa
from google.protobuf.message import DecodeError
from starlette.background import BackgroundTask
from starlette.datastructures import State
from starlette.endpoints import HTTPEndpoint
from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
Expand Down Expand Up @@ -100,11 +101,15 @@ async def _process_pyarrow(self, request: Request, project_name: str) -> Respons
return Response(
background=BackgroundTask(
self._add_evaluations,
request.state,
evaluations,
project_name,
)
)

async def _add_evaluations(self, evaluations: Evaluations, project_name: str) -> None:
async def _add_evaluations(
self, state: State, evaluations: Evaluations, project_name: str
) -> None:
for evaluation in encode_evaluations(evaluations):
state.queue_evaluation_for_bulk_insert(evaluation)
self.traces.put(evaluation, project_name=project_name)
7 changes: 5 additions & 2 deletions src/phoenix/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,11 @@ def _lifespan(
) -> StatefulLifespan[Starlette]:
@contextlib.asynccontextmanager
async def lifespan(_: Starlette) -> AsyncIterator[Dict[str, Any]]:
async with BulkInserter(db, initial_batch_of_spans) as queue_span:
yield {"queue_span_for_bulk_insert": queue_span}
async with BulkInserter(db, initial_batch_of_spans) as (queue_span, queue_evaluation):
yield {
"queue_span_for_bulk_insert": queue_span,
"queue_evaluation_for_bulk_insert": queue_evaluation,
}

return lifespan

Expand Down

0 comments on commit 3a6666c

Please sign in to comment.