feat: phoenix client get_spans_dataframe() and query_spans() #2151

Merged · 22 commits · Feb 5, 2024
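In short, this pull request adds pyarrow as a dependency, a GET handler on /v1/spans that streams query results as Arrow IPC, and a phoenix.Client exposing query_spans() and get_spans_dataframe(). A minimal usage sketch, assuming a Phoenix server is already running and collecting traces (note that get_spans_dataframe() comes from the TraceDataExtractor base class, which is not part of this diff):

import phoenix as px
from phoenix.trace.dsl import SpanQuery

# Connect to a running Phoenix server; if no endpoint is given, the host and
# port are read from the environment (see get_env_host/get_env_port below).
client = px.Client(endpoint="http://localhost:6006")

# Provided by TraceDataExtractor (not shown in this diff): all spans as one DataFrame.
df = client.get_spans_dataframe()

# query_spans() accepts zero or more SpanQuery objects; with none given it
# defaults to a single SpanQuery() and returns one DataFrame (or None).
roots = client.query_spans(SpanQuery(), root_spans_only=True)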
1 change: 1 addition & 0 deletions pyproject.toml
@@ -42,6 +42,7 @@ dependencies = [
"requests",
"opentelemetry-sdk",
"opentelemetry-proto",
"pyarrow",
]
dynamic = ["version"]

2 changes: 2 additions & 0 deletions src/phoenix/__init__.py
@@ -1,6 +1,7 @@
from .datasets.dataset import Dataset
from .datasets.fixtures import ExampleDatasets, load_example
from .datasets.schema import EmbeddingColumnNames, RetrievalEmbeddingColumnNames, Schema
from .session.client import Client
from .session.evaluation import log_evaluations
from .session.session import NotebookEnvironment, Session, active_session, close_app, launch_app
from .trace.fixtures import load_example_traces
@@ -39,4 +40,5 @@
"TraceDataset",
"NotebookEnvironment",
"log_evaluations",
"Client",
]
14 changes: 1 addition & 13 deletions src/phoenix/core/traces.py
@@ -1,7 +1,6 @@
import weakref
from collections import defaultdict
from datetime import datetime, timezone
-from enum import Enum
from queue import SimpleQueue
from threading import RLock, Thread
from types import MethodType
@@ -32,6 +31,7 @@
    ATTRIBUTE_PREFIX,
    COMPUTED_PREFIX,
    CONTEXT_PREFIX,
+    ComputedAttributes,
    Span,
    SpanAttributes,
    SpanID,
@@ -55,18 +55,6 @@
LLM_TOKEN_COUNT_COMPLETION = ATTRIBUTE_PREFIX + semantic_conventions.LLM_TOKEN_COUNT_COMPLETION


-class ComputedAttributes(Enum):
-    # Enum value must be string prefixed by COMPUTED_PREFIX
-    LATENCY_MS = (
-        COMPUTED_PREFIX + "latency_ms"
-    )  # The latency (or duration) of the span in milliseconds
-    CUMULATIVE_LLM_TOKEN_COUNT_TOTAL = COMPUTED_PREFIX + "cumulative_token_count.total"
-    CUMULATIVE_LLM_TOKEN_COUNT_PROMPT = COMPUTED_PREFIX + "cumulative_token_count.prompt"
-    CUMULATIVE_LLM_TOKEN_COUNT_COMPLETION = COMPUTED_PREFIX + "cumulative_token_count.completion"
-    ERROR_COUNT = COMPUTED_PREFIX + "error_count"
-    CUMULATIVE_ERROR_COUNT = COMPUTED_PREFIX + "cumulative_error_count"


class ReadableSpan(ObjectProxy):  # type: ignore
    """
    A wrapped a protobuf Span, with access methods and ability to decode to
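The net effect of the hunks above is that ComputedAttributes now lives in phoenix.trace.schemas rather than phoenix.core.traces, and the modules below switch their imports accordingly. A small sketch of the new import path, assuming the enum members are unchanged from the deleted definition:

# Assumes the enum moved to phoenix.trace.schemas with its members intact.
from phoenix.trace.schemas import ComputedAttributes

# Values remain strings prefixed by COMPUTED_PREFIX, e.g. the key under which
# a span's computed latency is stored.
latency_key = ComputedAttributes.LATENCY_MS.value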
3 changes: 1 addition & 2 deletions src/phoenix/server/api/input_types/SpanSort.py
@@ -11,11 +11,10 @@
from phoenix.core.traces import (
    END_TIME,
    START_TIME,
-    ComputedAttributes,
)
from phoenix.server.api.types.SortDir import SortDir
from phoenix.trace import semantic_conventions
-from phoenix.trace.schemas import Span, SpanID
+from phoenix.trace.schemas import ComputedAttributes, Span, SpanID


@strawberry.enum
Empty file.
110 changes: 110 additions & 0 deletions src/phoenix/server/api/routers/span_handler.py
@@ -0,0 +1,110 @@
import asyncio
import gzip
from datetime import datetime
from functools import partial
from typing import AsyncIterator, Optional, cast

import opentelemetry.proto.trace.v1.trace_pb2 as otlp
import pandas as pd
import pyarrow as pa
from starlette.endpoints import HTTPEndpoint
from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from starlette.status import HTTP_404_NOT_FOUND, HTTP_422_UNPROCESSABLE_ENTITY

from phoenix.core.evals import Evals
from phoenix.core.traces import Traces
from phoenix.trace.dsl import SpanQuery
from phoenix.trace.otel import encode
from phoenix.trace.schemas import Span
from phoenix.trace.span_json_decoder import json_to_span
from phoenix.utilities import query_spans


class SpanHandler(HTTPEndpoint):
    traces: Traces
    evals: Optional[Evals] = None

    async def post(self, request: Request) -> Response:
        try:
            content_type = request.headers.get("content-type")
            if content_type == "application/x-protobuf":
                body = await request.body()
                content_encoding = request.headers.get("content-encoding")
                if content_encoding == "gzip":
                    body = gzip.decompress(body)
                otlp_span = otlp.Span()
                otlp_span.ParseFromString(body)
            else:
                span = json_to_span(await request.json())
                assert isinstance(span, Span)
                otlp_span = encode(span)
        except Exception:
            return Response(status_code=422)
        self.traces.put(otlp_span)
        return Response()

    async def get(self, request: Request) -> Response:
        payload = await request.json()
        queries = payload.pop("queries", [])
        loop = asyncio.get_running_loop()
        valid_eval_names = (
            await loop.run_in_executor(
                None,
                self.evals.get_span_evaluation_names,
            )
            if self.evals
            else ()
        )
        try:
            span_queries = [
                SpanQuery.from_dict(
                    query,
                    evals=self.evals,
                    valid_eval_names=valid_eval_names,
                )
                for query in queries
            ]
        except Exception as e:
            return Response(
                status_code=HTTP_422_UNPROCESSABLE_ENTITY,
                content=f"Invalid query: {e}",
            )
        results = await loop.run_in_executor(
            None,
            partial(
                query_spans,
                self.traces,
                *span_queries,
                start_time=_from_iso_format(payload.get("start_time")),
                stop_time=_from_iso_format(payload.get("stop_time")),
                root_spans_only=payload.get("root_spans_only"),
            ),
        )
        if not results:
            return Response(status_code=HTTP_404_NOT_FOUND)

        async def content() -> AsyncIterator[bytes]:
            for result in results:
                yield _df_to_bytes(result)

        return StreamingResponse(
            content=content(),
            media_type="application/x-pandas-arrow",
        )


def _from_iso_format(value: Optional[str]) -> Optional[datetime]:
    return datetime.fromisoformat(value) if value else None


def _df_to_bytes(df: pd.DataFrame) -> bytes:
    pa_table = pa.Table.from_pandas(df)
    return _table_to_bytes(pa_table)


def _table_to_bytes(table: pa.Table) -> bytes:
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return cast(bytes, sink.getvalue().to_pybytes())
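The GET handler above streams each query result back as its own Arrow IPC stream, concatenated into a single response body with the application/x-pandas-arrow media type. A sketch of that round trip in isolation, assuming only pandas and pyarrow: the writer side mirrors _df_to_bytes/_table_to_bytes, and the reader side mirrors how the client further down consumes the response.

from io import BytesIO

import pandas as pd
import pyarrow as pa
from pyarrow import ArrowInvalid


def df_to_ipc_bytes(df: pd.DataFrame) -> bytes:
    # Serialize one DataFrame as a self-contained Arrow IPC stream.
    table = pa.Table.from_pandas(df)
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()


# Two results concatenated back to back, as the StreamingResponse would emit.
body = df_to_ipc_bytes(pd.DataFrame({"a": [1]})) + df_to_ipc_bytes(pd.DataFrame({"b": [2.0]}))

# Read the streams one at a time until the buffer runs out of data.
source = BytesIO(body)
frames = []
while True:
    try:
        with pa.ipc.open_stream(source) as reader:
            frames.append(reader.read_pandas())
    except ArrowInvalid:  # no further IPC stream starts at the current offset
        break
print(len(frames))  # 2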
3 changes: 1 addition & 2 deletions src/phoenix/server/api/types/Span.py
@@ -9,13 +9,12 @@
from strawberry.types import Info

import phoenix.trace.schemas as trace_schema
-from phoenix.core.traces import ComputedAttributes
from phoenix.metrics.retrieval_metrics import RetrievalMetrics
from phoenix.server.api.context import Context
from phoenix.server.api.types.DocumentRetrievalMetrics import DocumentRetrievalMetrics
from phoenix.server.api.types.Evaluation import DocumentEvaluation, SpanEvaluation
from phoenix.server.api.types.MimeType import MimeType
-from phoenix.trace.schemas import SpanID
+from phoenix.trace.schemas import ComputedAttributes, SpanID
from phoenix.trace.semantic_conventions import (
    EMBEDDING_EMBEDDINGS,
    EMBEDDING_VECTOR,
12 changes: 6 additions & 6 deletions src/phoenix/server/app.py
@@ -25,10 +25,10 @@
from phoenix.core.traces import Traces
from phoenix.pointcloud.umap_parameters import UMAPParameters
from phoenix.server.api.context import Context
+from phoenix.server.api.routers.evaluation_handler import EvaluationHandler
+from phoenix.server.api.routers.span_handler import SpanHandler
+from phoenix.server.api.routers.trace_handler import TraceHandler
from phoenix.server.api.schema import schema
-from phoenix.server.evaluation_handler import EvaluationHandler
-from phoenix.server.span_handler import SpanHandler
-from phoenix.server.trace_handler import TraceHandler

logger = logging.getLogger(__name__)

@@ -171,7 +171,7 @@ def create_app(
            else [
                Route(
                    "/v1/spans",
-                    type("SpanEndpoint", (SpanHandler,), {"queue": traces}),
+                    type("SpanEndpoint", (SpanHandler,), {"traces": traces, "evals": evals}),
                ),
                Route(
                    "/v1/traces",
@@ -185,8 +185,8 @@
            else [
                Route(
                    "/v1/evaluations",
-                    type("SpanEndpoint", (EvaluationHandler,), {"queue": evals}),
-                )
+                    type("EvaluationsEndpoint", (EvaluationHandler,), {"queue": evals}),
+                ),
            ]
        )
        + [
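The route wiring above uses type() to create a per-application subclass of each handler with its class attributes (traces, evals) filled in, because Starlette instantiates HTTPEndpoint classes itself and offers no per-route constructor arguments. A minimal sketch of the same pattern with hypothetical names:

from starlette.applications import Starlette
from starlette.endpoints import HTTPEndpoint
from starlette.requests import Request
from starlette.responses import PlainTextResponse, Response
from starlette.routing import Route


class GreetingHandler(HTTPEndpoint):
    # Filled in per application via type(), mirroring SpanHandler.traces/evals.
    greeting: str = "hello"

    async def get(self, request: Request) -> Response:
        return PlainTextResponse(self.greeting)


# Equivalent to declaring: class FrenchEndpoint(GreetingHandler): greeting = "bonjour"
app = Starlette(
    routes=[
        Route("/greet", type("FrenchEndpoint", (GreetingHandler,), {"greeting": "bonjour"})),
    ]
)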
39 changes: 0 additions & 39 deletions src/phoenix/server/span_handler.py

This file was deleted.

100 changes: 100 additions & 0 deletions src/phoenix/session/client.py
@@ -0,0 +1,100 @@
import logging
import weakref
from datetime import datetime
from io import BytesIO
from typing import List, Optional, Union
from urllib.parse import urljoin

import pandas as pd
import pyarrow as pa
from pyarrow import ArrowInvalid
from requests import Session

import phoenix as px
from phoenix.config import get_env_host, get_env_port
from phoenix.session.data_extractor import TraceDataExtractor
from phoenix.trace.dsl import SpanQuery

logger = logging.getLogger(__name__)


class Client(TraceDataExtractor):
    def __init__(
        self,
        endpoint: Optional[str] = None,
        use_active_session_if_available: bool = True,
    ):
        """
        Client for connecting to a Phoenix server.

        Parameters
        ----------
        endpoint : str, optional
            Phoenix server endpoint, e.g. http://localhost:6006. If not provided, the
            endpoint will be inferred from the environment variables.
        use_active_session_if_available : bool, optional
            If an active session is available, use it instead of sending HTTP requests.
        """
        self._use_active_session_if_available = use_active_session_if_available
        self._base_url = endpoint or f"http://{get_env_host()}:{get_env_port()}"
        self._session = Session()
        weakref.finalize(self, self._session.close)
        if not (self._use_active_session_if_available and px.active_session()):
            self._warn_if_phoenix_is_not_running()

    def query_spans(
        self,
        *queries: SpanQuery,
        start_time: Optional[datetime] = None,
        stop_time: Optional[datetime] = None,
        root_spans_only: Optional[bool] = None,
    ) -> Optional[Union[pd.DataFrame, List[pd.DataFrame]]]:
        if not queries:
            queries = (SpanQuery(),)
        if self._use_active_session_if_available and (session := px.active_session()):
            return session.query_spans(
                *queries,
                start_time=start_time,
                stop_time=stop_time,
                root_spans_only=root_spans_only,
            )
        response = self._session.get(
            url=urljoin(self._base_url, "v1/spans"),
            json={
                "queries": [q.to_dict() for q in queries],
                "start_time": _to_iso_format(start_time),
                "stop_time": _to_iso_format(stop_time),
                "root_spans_only": root_spans_only,
            },
        )
        if response.status_code == 404:
            logger.info("No spans found.")
            return None
        elif response.status_code == 422:
            raise ValueError(response.content.decode())
        response.raise_for_status()
        source = BytesIO(response.content)
        results = []
        while True:
            try:
                with pa.ipc.open_stream(source) as reader:
                    results.append(reader.read_pandas())
            except ArrowInvalid:
                break
        if len(results) == 1:
            df = results[0]
            return None if df.shape == (0, 0) else df
        return results

    def _warn_if_phoenix_is_not_running(self) -> None:
        try:
            self._session.get(urljoin(self._base_url, "arize_phoenix_version")).raise_for_status()
        except Exception:
            logger.warning(
                f"Arize Phoenix is not running on {self._base_url}. Launch Phoenix "
                f"with `import phoenix as px; px.launch_app()`"
            )


def _to_iso_format(value: Optional[datetime]) -> Optional[str]:
    return value.isoformat() if value else None
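Note the return contract implemented above: a single query yields one DataFrame (or None when the frame is completely empty or the server returns 404), while multiple queries yield a list of DataFrames, and a 422 from the server is surfaced as a ValueError. A short sketch of handling both shapes, assuming a running Phoenix server (or active session) with some collected traces:

from datetime import datetime, timedelta, timezone

import phoenix as px
from phoenix.trace.dsl import SpanQuery

client = px.Client()  # endpoint inferred from the environment via get_env_host/get_env_port

# One query (the implicit default) -> a single DataFrame or None.
recent = client.query_spans(start_time=datetime.now(timezone.utc) - timedelta(hours=1))
if recent is None:
    print("no spans found in the last hour")

# Several queries -> a list of DataFrames, one per query, in order.
results = client.query_spans(SpanQuery(), SpanQuery(), root_spans_only=True)
if isinstance(results, list):
    first_df, second_df = results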