From 701a6828299d3e1022bfd304eb07b587f8b10682 Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Mon, 1 Apr 2024 15:04:51 -0700 Subject: [PATCH 01/12] wip --- app/schema.graphql | 2 +- src/phoenix/core/database.py | 9 + src/phoenix/core/traces.py | 76 ++++- src/phoenix/db/__init__.py | 0 src/phoenix/db/database.py | 356 ++++++++++++++++++++++++ src/phoenix/server/api/types/Project.py | 35 ++- src/phoenix/server/api/types/Trace.py | 10 +- src/phoenix/server/main.py | 5 +- src/phoenix/session/session.py | 40 ++- src/phoenix/trace/otel.py | 4 +- src/phoenix/trace/schemas.py | 12 +- 11 files changed, 506 insertions(+), 43 deletions(-) create mode 100644 src/phoenix/core/database.py create mode 100644 src/phoenix/db/__init__.py create mode 100644 src/phoenix/db/database.py diff --git a/app/schema.graphql b/app/schema.graphql index 0236fc879f..e5ff7dbcae 100644 --- a/app/schema.graphql +++ b/app/schema.graphql @@ -543,7 +543,7 @@ type Project implements Node { endTime: DateTime recordCount(timeRange: TimeRange): Int! traceCount(timeRange: TimeRange): Int! - tokenCountTotal: Int! + tokenCountTotal(timeRange: TimeRange): Int! latencyMsP50: Float latencyMsP99: Float trace(traceId: ID!): Trace diff --git a/src/phoenix/core/database.py b/src/phoenix/core/database.py new file mode 100644 index 0000000000..174e67ba34 --- /dev/null +++ b/src/phoenix/core/database.py @@ -0,0 +1,9 @@ +from typing import Iterator, Protocol, Tuple + +from phoenix.trace.schemas import Span + + +class Database(Protocol): + def insert_span(self, span: Span, project_name: str) -> None: ... + def get_projects(self) -> Iterator[Tuple[int, str]]: ... + def get_spans(self, project_name: str) -> Iterator[Span]: ... diff --git a/src/phoenix/core/traces.py b/src/phoenix/core/traces.py index 0681e5479e..6fff53e469 100644 --- a/src/phoenix/core/traces.py +++ b/src/phoenix/core/traces.py @@ -1,9 +1,10 @@ import weakref from collections import defaultdict +from datetime import datetime from queue import SimpleQueue from threading import RLock, Thread from types import MethodType -from typing import DefaultDict, Iterator, Optional, Tuple, Union +from typing import DefaultDict, Iterator, Optional, Protocol, Tuple, Union from typing_extensions import assert_never @@ -12,16 +13,45 @@ from phoenix.core.project import ( END_OF_QUEUE, Project, + WrappedSpan, _ProjectName, ) -from phoenix.trace.schemas import Span +from phoenix.trace.schemas import ComputedAttributes, ComputedValues, Span, TraceID _SpanItem = Tuple[Span, _ProjectName] _EvalItem = Tuple[pb.Evaluation, _ProjectName] +class Database(Protocol): + def insert_span(self, span: Span, project_name: str) -> None: ... + + def trace_count( + self, + project_name: str, + start_time: Optional[datetime] = None, + stop_time: Optional[datetime] = None, + ) -> int: ... + + def span_count( + self, + project_name: str, + start_time: Optional[datetime] = None, + stop_time: Optional[datetime] = None, + ) -> int: ... + + def llm_token_count_total( + self, + project_name: str, + start_time: Optional[datetime] = None, + stop_time: Optional[datetime] = None, + ) -> int: ... + + def get_trace(self, trace_id: TraceID) -> Iterator[Tuple[Span, ComputedValues]]: ... + + class Traces: - def __init__(self) -> None: + def __init__(self, database: Database) -> None: + self._database = database self._span_queue: "SimpleQueue[Optional[_SpanItem]]" = SimpleQueue() self._eval_queue: "SimpleQueue[Optional[_EvalItem]]" = SimpleQueue() # Putting `None` as the sentinel value for queue termination. @@ -34,6 +64,45 @@ def __init__(self) -> None: ) self._start_consumers() + def trace_count( + self, + project_name: str, + start_time: Optional[datetime] = None, + stop_time: Optional[datetime] = None, + ) -> int: + return self._database.trace_count(project_name, start_time, stop_time) + + def span_count( + self, + project_name: str, + start_time: Optional[datetime] = None, + stop_time: Optional[datetime] = None, + ) -> int: + return self._database.span_count(project_name, start_time, stop_time) + + def llm_token_count_total( + self, + project_name: str, + start_time: Optional[datetime] = None, + stop_time: Optional[datetime] = None, + ) -> int: + return self._database.llm_token_count_total(project_name, start_time, stop_time) + + def get_trace(self, trace_id: TraceID) -> Iterator[WrappedSpan]: + for span, computed_values in self._database.get_trace(trace_id): + wrapped_span = WrappedSpan(span) + wrapped_span[ComputedAttributes.LATENCY_MS] = computed_values.latency_ms + wrapped_span[ComputedAttributes.CUMULATIVE_LLM_TOKEN_COUNT_PROMPT] = ( + computed_values.cumulative_llm_token_count_prompt + ) + wrapped_span[ComputedAttributes.CUMULATIVE_LLM_TOKEN_COUNT_COMPLETION] = ( + computed_values.cumulative_llm_token_count_completion + ) + wrapped_span[ComputedAttributes.CUMULATIVE_LLM_TOKEN_COUNT_TOTAL] = ( + computed_values.cumulative_llm_token_count_total + ) + yield wrapped_span + def get_project(self, project_name: str) -> Optional["Project"]: with self._lock: return self._projects.get(project_name) @@ -84,6 +153,7 @@ def _start_consumers(self) -> None: def _consume_spans(self, queue: "SimpleQueue[Optional[_SpanItem]]") -> None: while (item := queue.get()) is not END_OF_QUEUE: span, project_name = item + self._database.insert_span(span, project_name=project_name) with self._lock: project = self._projects[project_name] project.add_span(span) diff --git a/src/phoenix/db/__init__.py b/src/phoenix/db/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/phoenix/db/database.py b/src/phoenix/db/database.py new file mode 100644 index 0000000000..b5b030b107 --- /dev/null +++ b/src/phoenix/db/database.py @@ -0,0 +1,356 @@ +import json +import sqlite3 +from datetime import datetime +from pathlib import Path +from typing import Any, Iterator, Optional, Tuple, cast + +import numpy as np +from openinference.semconv.trace import SpanAttributes + +from phoenix.trace.schemas import ComputedValues, Span, SpanContext, SpanKind, SpanStatusCode + +_CONFIG = """ +PRAGMA foreign_keys = ON; +PRAGMA journal_mode = WAL2; +PRAGMA synchronous = OFF; +PRAGMA cache_size = -32000; +""" + +_INIT_DB = """ +BEGIN; +CREATE TABLE projects ( + rowid INTEGER PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + description TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +INSERT INTO projects(name) VALUES('default'); +CREATE TABLE traces ( + rowid INTEGER PRIMARY KEY, + trace_id TEXT UNIQUE NOT NULL, + project_rowid INTEGER NOT NULL, + session_id TEXT, + start_time TIMESTAMP NOT NULL, + end_time TIMESTAMP NOT NULL, + FOREIGN KEY(project_rowid) REFERENCES projects(rowid) +); +CREATE INDEX idx_trace_start_time ON traces(start_time); +CREATE TABLE spans ( + rowid INTEGER PRIMARY KEY, + span_id TEXT UNIQUE NOT NULL, + trace_rowid INTEGER NOT NULL, + parent_span_id TEXT, + kind TEXT NOT NULL, + name TEXT NOT NULL, + start_time TIMESTAMP NOT NULL, + end_time TIMESTAMP NOT NULL, + attributes JSON, + events JSON, + status TEXT CHECK(status IN ('UNSET','OK','ERROR')) NOT NULL DEFAULT('UNSET'), + status_message TEXT, + latency_ms REAL NOT NULL DEFAULT 0.0, + cumulative_error_count INTEGER NOT NULL DEFAULT 0, + cumulative_llm_token_count_prompt INTEGER NOT NULL DEFAULT 0, + cumulative_llm_token_count_completion INTEGER NOT NULL DEFAULT 0, + FOREIGN KEY(trace_rowid) REFERENCES traces(rowid) +); +CREATE INDEX idx_parent_span_id ON spans(parent_span_id); +PRAGMA user_version = 1; +COMMIT; +""" + + +class SqliteDatabase: + def __init__(self, database: Optional[Path] = None) -> None: + """ + :param database: The path to the database file to be opened. + """ + self.con = sqlite3.connect( + database=database or ":memory:", + uri=True, + check_same_thread=False, + ) + # self.con.set_trace_callback(print) + self.cur = self.con.cursor() + self.cur.executescript(_CONFIG) + if int(self.cur.execute("PRAGMA user_version;").fetchone()[0]) < 1: + self.cur.executescript(_INIT_DB) + + def insert_span(self, span: Span, project_name: str) -> None: + self.cur.execute("BEGIN;") + try: + if not ( + projects := self.cur.execute( + "SELECT rowid FROM projects WHERE name = ?;", + (project_name,), + ).fetchone() + ): + projects = self.cur.execute( + "INSERT INTO projects(name) VALUES(?) RETURNING rowid;", + (project_name,), + ).fetchone() + project_rowid = projects[0] + if ( + trace_row := self.cur.execute( + """ + INSERT INTO traces(trace_id, project_rowid, session_id, start_time, end_time) + VALUES(?,?,?,?,?) + ON CONFLICT DO UPDATE SET + start_time = CASE WHEN excluded.start_time < start_time THEN excluded.start_time ELSE start_time END, + end_time = CASE WHEN end_time < excluded.end_time THEN excluded.end_time ELSE end_time END + WHERE excluded.start_time < start_time OR end_time < excluded.end_time + RETURNING rowid; + """, # noqa E501 + ( + span.context.trace_id, + project_rowid, + None, + span.start_time, + span.end_time, + ), + ).fetchone() + ) is None: + trace_row = self.cur.execute( + "SELECT rowid from traces where trace_id = ?", (span.context.trace_id,) + ).fetchone() + trace_rowid = trace_row[0] + latency_ms = (span.end_time - span.start_time).total_seconds() * 1000 + cumulative_error_count = int(span.status_code is SpanStatusCode.ERROR) + cumulative_llm_token_count_prompt = cast( + int, span.attributes.get(SpanAttributes.LLM_TOKEN_COUNT_PROMPT, 0) + ) + cumulative_llm_token_count_completion = cast( + int, span.attributes.get(SpanAttributes.LLM_TOKEN_COUNT_COMPLETION, 0) + ) + if accumulation := self.cur.execute( + """ + SELECT + sum(cumulative_error_count), + sum(cumulative_llm_token_count_prompt), + sum(cumulative_llm_token_count_completion) + FROM spans + WHERE parent_span_id = ? + """, # noqa E501 + (span.context.span_id,), + ).fetchone(): + cumulative_error_count += cast(int, accumulation[0] or 0) + cumulative_llm_token_count_prompt += cast(int, accumulation[1] or 0) + cumulative_llm_token_count_completion += cast(int, accumulation[2] or 0) + self.cur.execute( + """ + INSERT INTO spans(span_id, trace_rowid, parent_span_id, kind, name, start_time, end_time, attributes, events, status, status_message, latency_ms, cumulative_error_count, cumulative_llm_token_count_prompt, cumulative_llm_token_count_completion) + VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) + RETURNING rowid; + """, # noqa E501 + ( + span.context.span_id, + trace_rowid, + span.parent_id, + span.span_kind.value, + span.name, + span.start_time, + span.end_time, + json.dumps(span.attributes, cls=_Encoder), + json.dumps(span.events, cls=_Encoder), + span.status_code.value, + span.status_message, + latency_ms, + cumulative_error_count, + cumulative_llm_token_count_prompt, + cumulative_llm_token_count_completion, + ), + ) + parent_id = span.parent_id + while parent_id: + if parent_span := self.cur.execute( + """ + SELECT rowid, parent_span_id + FROM spans + WHERE span_id = ? + """, + (parent_id,), + ).fetchone(): + rowid, parent_id = parent_span[0], parent_span[1] + self.cur.execute( + """ + UPDATE spans SET + cumulative_error_count = cumulative_error_count + ?, + cumulative_llm_token_count_prompt = cumulative_llm_token_count_prompt + ?, + cumulative_llm_token_count_completion = cumulative_llm_token_count_completion + ? + WHERE rowid = ?; + """, # noqa E501 + ( + cumulative_error_count, + cumulative_llm_token_count_prompt, + cumulative_llm_token_count_completion, + rowid, + ), + ) + else: + break + except Exception: + self.cur.execute("ROLLBACK;") + else: + self.cur.execute("COMMIT;") + + def get_projects(self) -> Iterator[Tuple[int, str]]: + for project in self.cur.execute("SELECT rowid, name FROM projects;").fetchall(): + yield cast(Tuple[int, str], (project[0], project[1])) + + def trace_count( + self, + project_name: str, + start_time: Optional[datetime] = None, + stop_time: Optional[datetime] = None, + ) -> int: + query = """ + SELECT COUNT(*) + FROM traces + JOIN projects ON projects.rowid = traces.project_rowid + WHERE projects.name = ? + """ + if start_time and stop_time: + cur = self.cur.execute( + query + " AND ? <= start_time AND start_time < ?;", + (project_name, start_time, stop_time), + ) + elif start_time: + cur = self.cur.execute( + query + " AND ? <= traces.start_time;", (project_name, start_time) + ) + elif start_time: + cur = self.cur.execute(query + " AND traces.start_time < ?;", (project_name, stop_time)) + else: + cur = self.cur.execute(query + ";", (project_name,)) + if res := cur.fetchone(): + return cast(int, res[0]) + return 0 + + def span_count( + self, + project_name: str, + start_time: Optional[datetime] = None, + stop_time: Optional[datetime] = None, + ) -> int: + query = """ + SELECT COUNT(*) + FROM spans + JOIN traces ON traces.rowid = spans.trace_rowid + JOIN projects ON projects.rowid = traces.project_rowid + WHERE projects.name = ? + """ + if start_time and stop_time: + cur = self.cur.execute( + query + " AND ? <= start_time AND start_time < ?;", + (project_name, start_time, stop_time), + ) + elif start_time: + cur = self.cur.execute( + query + " AND ? <= spans.start_time;", (project_name, start_time) + ) + elif start_time: + cur = self.cur.execute(query + " AND spans.start_time < ?;", (project_name, stop_time)) + else: + cur = self.cur.execute(query + ";", (project_name,)) + if res := cur.fetchone(): + return cast(int, res[0]) + return 0 + + def llm_token_count_total( + self, + project_name: str, + start_time: Optional[datetime] = None, + stop_time: Optional[datetime] = None, + ) -> int: + query = """ + SELECT + SUM(COALESCE(json_extract(spans.attributes, '$."llm.token_count.prompt"'), 0) + COALESCE(json_extract(spans.attributes, '$."llm.token_count.completion"'), 0)) + FROM spans + JOIN traces ON traces.rowid = spans.trace_rowid + JOIN projects ON projects.rowid = traces.project_rowid + WHERE projects.name = ? + """ # noqa E501 + if start_time and stop_time: + cur = self.cur.execute( + query + " AND ? <= start_time AND start_time < ?", + (project_name, start_time, stop_time), + ) + elif start_time: + cur = self.cur.execute( + query + " AND ? <= traces.start_time", (project_name, start_time) + ) + elif start_time: + cur = self.cur.execute(query + " AND traces.start_time < ?", (project_name, stop_time)) + else: + cur = self.cur.execute(query, (project_name,)) + if res := cur.fetchone(): + return cast(int, res[0]) + return 0 + + def get_trace(self, trace_id: str) -> Iterator[Tuple[Span, ComputedValues]]: + for span in self.cur.execute( + """ + SELECT + spans.span_id, + traces.trace_id, + spans.parent_span_id, + spans.kind, + spans.name, + spans.start_time, + spans.end_time, + spans.attributes, + spans.events, + spans.status, + spans.status_message, + spans.cumulative_error_count, + spans.cumulative_llm_token_count_prompt, + spans.cumulative_llm_token_count_completion + FROM spans + JOIN traces ON traces.rowid = spans.trace_rowid + WHERE traces.trace_id = ? + """, + (trace_id,), + ).fetchall(): + start_time = datetime.fromisoformat(span[5]) + end_time = datetime.fromisoformat(span[6]) + latency_ms = (end_time - start_time).total_seconds() * 1000 + yield ( + Span( + name=span[4], + context=SpanContext(trace_id=span[1], span_id=span[0]), + parent_id=span[2], + span_kind=SpanKind(span[3]), + start_time=start_time, + end_time=end_time, + attributes=json.loads(span[7]), + events=json.loads(span[8]), + status_code=SpanStatusCode(span[9]), + status_message=span[10], + conversation=None, + ), + ComputedValues( + latency_ms=latency_ms, + cumulative_error_count=span[11], + cumulative_llm_token_count_prompt=span[12], + cumulative_llm_token_count_completion=span[13], + cumulative_llm_token_count_total=span[12] + span[13], + ), + ) + + +class _Encoder(json.JSONEncoder): + def default(self, obj: Any) -> Any: + if isinstance(obj, datetime): + return obj.isoformat() + elif isinstance(obj, np.ndarray): + return list(obj) + elif isinstance(obj, np.integer): + return int(obj) + elif isinstance(obj, np.floating): + return float(obj) + return super().default(obj) + + +if __name__ == "__main__": + _DB = SqliteDatabase() diff --git a/src/phoenix/server/api/types/Project.py b/src/phoenix/server/api/types/Project.py index 926d28296b..671d74cd93 100644 --- a/src/phoenix/server/api/types/Project.py +++ b/src/phoenix/server/api/types/Project.py @@ -4,9 +4,11 @@ import strawberry from strawberry import ID, UNSET +from strawberry.types import Info from phoenix.core.project import Project as CoreProject from phoenix.metrics.retrieval_metrics import RetrievalMetrics +from phoenix.server.api.context import Context from phoenix.server.api.input_types.SpanSort import SpanSort from phoenix.server.api.input_types.TimeRange import TimeRange from phoenix.server.api.types.DocumentEvaluationSummary import DocumentEvaluationSummary @@ -43,24 +45,41 @@ def end_time(self) -> Optional[datetime]: @strawberry.field def record_count( self, + info: Info[Context, None], time_range: Optional[TimeRange] = UNSET, ) -> int: - if not time_range: - return self.project.span_count() - return self.project.span_count(time_range.start, time_range.end) + if not (traces := info.context.traces): + return 0 + start_time, stop_time = ( + (None, None) if not time_range else (time_range.start, time_range.end) + ) + return traces.span_count(self.name, start_time, stop_time) @strawberry.field def trace_count( self, + info: Info[Context, None], time_range: Optional[TimeRange] = UNSET, ) -> int: - if not time_range: - return self.project.trace_count() - return self.project.trace_count(time_range.start, time_range.end) + if not (traces := info.context.traces): + return 0 + start_time, stop_time = ( + (None, None) if not time_range else (time_range.start, time_range.end) + ) + return traces.trace_count(self.name, start_time, stop_time) @strawberry.field - def token_count_total(self) -> int: - return self.project.token_count_total + def token_count_total( + self, + info: Info[Context, None], + time_range: Optional[TimeRange] = UNSET, + ) -> int: + if not (traces := info.context.traces): + return 0 + start_time, stop_time = ( + (None, None) if not time_range else (time_range.start, time_range.end) + ) + return traces.llm_token_count_total(self.name, start_time, stop_time) @strawberry.field def latency_ms_p50(self) -> Optional[float]: diff --git a/src/phoenix/server/api/types/Trace.py b/src/phoenix/server/api/types/Trace.py index 4a0b3331ba..a7d440bc52 100644 --- a/src/phoenix/server/api/types/Trace.py +++ b/src/phoenix/server/api/types/Trace.py @@ -2,8 +2,10 @@ import strawberry from strawberry import ID, UNSET, Private +from strawberry.types import Info from phoenix.core.project import Project +from phoenix.server.api.context import Context from phoenix.server.api.types.Evaluation import TraceEvaluation from phoenix.server.api.types.pagination import ( Connection, @@ -23,6 +25,7 @@ class Trace: @strawberry.field def spans( self, + info: Info[Context, None], first: Optional[int] = 50, last: Optional[int] = UNSET, after: Optional[Cursor] = UNSET, @@ -34,10 +37,9 @@ def spans( last=last, before=before if isinstance(before, Cursor) else None, ) - spans = sorted( - self.project.get_trace(TraceID(self.trace_id)), - key=lambda span: span.start_time, - ) + if not (traces := info.context.traces): + return connection_from_list(data=[], args=args) + spans = traces.get_trace(TraceID(self.trace_id)) data = [to_gql_span(span, self.project) for span in spans] return connection_from_list(data=data, args=args) diff --git a/src/phoenix/server/main.py b/src/phoenix/server/main.py index 070a745f46..0f32637119 100644 --- a/src/phoenix/server/main.py +++ b/src/phoenix/server/main.py @@ -16,11 +16,13 @@ get_env_host, get_env_port, get_pids_path, + get_working_dir, ) from phoenix.core.model_schema_adapter import create_model_from_datasets from phoenix.core.traces import Traces from phoenix.datasets.dataset import EMPTY_DATASET, Dataset from phoenix.datasets.fixtures import FIXTURES, get_datasets +from phoenix.db.database import SqliteDatabase from phoenix.pointcloud.umap_parameters import ( DEFAULT_MIN_DIST, DEFAULT_N_NEIGHBORS, @@ -193,7 +195,8 @@ def _load_items( primary_dataset, reference_dataset, ) - traces = Traces() + db = SqliteDatabase(get_working_dir() / "phoenix.db") + traces = Traces(db) if span_store := get_span_store(): Thread(target=load_traces_data_from_store, args=(traces, span_store), daemon=True).start() if trace_dataset_name is not None: diff --git a/src/phoenix/session/session.py b/src/phoenix/session/session.py index 4a69d6da76..d072ff2bc9 100644 --- a/src/phoenix/session/session.py +++ b/src/phoenix/session/session.py @@ -37,6 +37,7 @@ from phoenix.core.model_schema_adapter import create_model_from_datasets from phoenix.core.traces import Traces from phoenix.datasets.dataset import EMPTY_DATASET, Dataset +from phoenix.db.database import SqliteDatabase from phoenix.pointcloud.umap_parameters import get_umap_parameters from phoenix.server.app import create_app from phoenix.server.thread_server import ThreadServer @@ -118,27 +119,6 @@ def __init__( self.corpus_dataset = corpus_dataset self.trace_dataset = trace_dataset self.umap_parameters = get_umap_parameters(default_umap_parameters) - self.model = create_model_from_datasets( - primary_dataset, - reference_dataset, - ) - - self.corpus = ( - create_model_from_datasets( - corpus_dataset, - ) - if corpus_dataset is not None - else None - ) - - self.traces = Traces() - if trace_dataset: - for span in trace_dataset.to_spans(): - self.traces.put(span) - for evaluations in trace_dataset.evaluations: - for pb_evaluation in encode_evaluations(evaluations): - self.traces.put(pb_evaluation) - self.host = host or get_env_host() self.port = port or get_env_port() self.temp_dir = TemporaryDirectory() @@ -304,6 +284,24 @@ def __init__( port=port, notebook_env=notebook_env, ) + self.model = create_model_from_datasets( + primary_dataset, + reference_dataset, + ) + self.corpus = ( + create_model_from_datasets( + corpus_dataset, + ) + if corpus_dataset is not None + else None + ) + self.traces = Traces(SqliteDatabase()) + if trace_dataset: + for span in trace_dataset.to_spans(): + self.traces.put(span) + for evaluations in trace_dataset.evaluations: + for pb_evaluation in encode_evaluations(evaluations): + self.traces.put(pb_evaluation) if span_store := get_span_store(): Thread( target=load_traces_data_from_store, diff --git a/src/phoenix/trace/otel.py b/src/phoenix/trace/otel.py index 12ec0b4104..9b0f1ab106 100644 --- a/src/phoenix/trace/otel.py +++ b/src/phoenix/trace/otel.py @@ -61,9 +61,7 @@ def decode(otlp_span: otlp.Span) -> Span: parent_id = _decode_identifier(otlp_span.parent_span_id) start_time = _decode_unix_nano(otlp_span.start_time_unix_nano) - end_time = ( - _decode_unix_nano(otlp_span.end_time_unix_nano) if otlp_span.end_time_unix_nano else None - ) + end_time = _decode_unix_nano(otlp_span.end_time_unix_nano) attributes = dict(_unflatten(_load_json_strings(_decode_key_values(otlp_span.attributes)))) span_kind = SpanKind(attributes.pop(OPENINFERENCE_SPAN_KIND, None)) diff --git a/src/phoenix/trace/schemas.py b/src/phoenix/trace/schemas.py index efebeea439..7caeab9bf4 100644 --- a/src/phoenix/trace/schemas.py +++ b/src/phoenix/trace/schemas.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from datetime import datetime from enum import Enum -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, NamedTuple, Optional, Union from uuid import UUID EXCEPTION_TYPE = "exception.type" @@ -142,7 +142,7 @@ class Span: "If the parent_id is None, this is the root span" parent_id: Optional[SpanID] start_time: datetime - end_time: Optional[datetime] + end_time: datetime status_code: SpanStatusCode status_message: str """ @@ -202,3 +202,11 @@ class ComputedAttributes(Enum): CUMULATIVE_LLM_TOKEN_COUNT_COMPLETION = "cumulative_token_count.completion" ERROR_COUNT = "error_count" CUMULATIVE_ERROR_COUNT = "cumulative_error_count" + + +class ComputedValues(NamedTuple): + latency_ms: float + cumulative_error_count: int + cumulative_llm_token_count_prompt: int + cumulative_llm_token_count_completion: int + cumulative_llm_token_count_total: int From eda5706b27b80c7f1cba4a4ecae836ea19c0c4e6 Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Mon, 1 Apr 2024 15:06:36 -0700 Subject: [PATCH 02/12] clean up --- src/phoenix/db/database.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/phoenix/db/database.py b/src/phoenix/db/database.py index b5b030b107..54e2f1c7c2 100644 --- a/src/phoenix/db/database.py +++ b/src/phoenix/db/database.py @@ -350,7 +350,3 @@ def default(self, obj: Any) -> Any: elif isinstance(obj, np.floating): return float(obj) return super().default(obj) - - -if __name__ == "__main__": - _DB = SqliteDatabase() From 2b5a8ebb2084f52cf6fa96573e432fed6d7c05e2 Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Mon, 1 Apr 2024 15:09:29 -0700 Subject: [PATCH 03/12] clean up --- src/phoenix/core/database.py | 9 --------- 1 file changed, 9 deletions(-) delete mode 100644 src/phoenix/core/database.py diff --git a/src/phoenix/core/database.py b/src/phoenix/core/database.py deleted file mode 100644 index 174e67ba34..0000000000 --- a/src/phoenix/core/database.py +++ /dev/null @@ -1,9 +0,0 @@ -from typing import Iterator, Protocol, Tuple - -from phoenix.trace.schemas import Span - - -class Database(Protocol): - def insert_span(self, span: Span, project_name: str) -> None: ... - def get_projects(self) -> Iterator[Tuple[int, str]]: ... - def get_spans(self, project_name: str) -> Iterator[Span]: ... From 83ddac8748bb2909c4fbda30afc48bb83674d16c Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Mon, 1 Apr 2024 15:13:14 -0700 Subject: [PATCH 04/12] clean up --- src/phoenix/db/database.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/phoenix/db/database.py b/src/phoenix/db/database.py index 54e2f1c7c2..c86d858dfd 100644 --- a/src/phoenix/db/database.py +++ b/src/phoenix/db/database.py @@ -242,7 +242,7 @@ def span_count( """ if start_time and stop_time: cur = self.cur.execute( - query + " AND ? <= start_time AND start_time < ?;", + query + " AND ? <= spans.start_time AND spans.start_time < ?;", (project_name, start_time, stop_time), ) elif start_time: @@ -273,17 +273,17 @@ def llm_token_count_total( """ # noqa E501 if start_time and stop_time: cur = self.cur.execute( - query + " AND ? <= start_time AND start_time < ?", + query + " AND ? <= traces.start_time AND traces.start_time < ?;", (project_name, start_time, stop_time), ) elif start_time: cur = self.cur.execute( - query + " AND ? <= traces.start_time", (project_name, start_time) + query + " AND ? <= traces.start_time;", (project_name, start_time) ) elif start_time: - cur = self.cur.execute(query + " AND traces.start_time < ?", (project_name, stop_time)) + cur = self.cur.execute(query + " AND traces.start_time < ?;", (project_name, stop_time)) else: - cur = self.cur.execute(query, (project_name,)) + cur = self.cur.execute(query + ";", (project_name,)) if res := cur.fetchone(): return cast(int, res[0]) return 0 From 949bd392c627cd410cc236debe43631d0d3f3b88 Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Mon, 1 Apr 2024 15:23:57 -0700 Subject: [PATCH 05/12] clean up --- src/phoenix/db/database.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/phoenix/db/database.py b/src/phoenix/db/database.py index c86d858dfd..ad413f20ed 100644 --- a/src/phoenix/db/database.py +++ b/src/phoenix/db/database.py @@ -49,7 +49,6 @@ events JSON, status TEXT CHECK(status IN ('UNSET','OK','ERROR')) NOT NULL DEFAULT('UNSET'), status_message TEXT, - latency_ms REAL NOT NULL DEFAULT 0.0, cumulative_error_count INTEGER NOT NULL DEFAULT 0, cumulative_llm_token_count_prompt INTEGER NOT NULL DEFAULT 0, cumulative_llm_token_count_completion INTEGER NOT NULL DEFAULT 0, @@ -115,7 +114,6 @@ def insert_span(self, span: Span, project_name: str) -> None: "SELECT rowid from traces where trace_id = ?", (span.context.trace_id,) ).fetchone() trace_rowid = trace_row[0] - latency_ms = (span.end_time - span.start_time).total_seconds() * 1000 cumulative_error_count = int(span.status_code is SpanStatusCode.ERROR) cumulative_llm_token_count_prompt = cast( int, span.attributes.get(SpanAttributes.LLM_TOKEN_COUNT_PROMPT, 0) @@ -139,8 +137,8 @@ def insert_span(self, span: Span, project_name: str) -> None: cumulative_llm_token_count_completion += cast(int, accumulation[2] or 0) self.cur.execute( """ - INSERT INTO spans(span_id, trace_rowid, parent_span_id, kind, name, start_time, end_time, attributes, events, status, status_message, latency_ms, cumulative_error_count, cumulative_llm_token_count_prompt, cumulative_llm_token_count_completion) - VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) + INSERT INTO spans(span_id, trace_rowid, parent_span_id, kind, name, start_time, end_time, attributes, events, status, status_message, cumulative_error_count, cumulative_llm_token_count_prompt, cumulative_llm_token_count_completion) + VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?) RETURNING rowid; """, # noqa E501 ( @@ -155,7 +153,6 @@ def insert_span(self, span: Span, project_name: str) -> None: json.dumps(span.events, cls=_Encoder), span.status_code.value, span.status_message, - latency_ms, cumulative_error_count, cumulative_llm_token_count_prompt, cumulative_llm_token_count_completion, @@ -191,6 +188,7 @@ def insert_span(self, span: Span, project_name: str) -> None: break except Exception: self.cur.execute("ROLLBACK;") + raise else: self.cur.execute("COMMIT;") @@ -224,7 +222,7 @@ def trace_count( else: cur = self.cur.execute(query + ";", (project_name,)) if res := cur.fetchone(): - return cast(int, res[0]) + return cast(int, res[0] or 0) return 0 def span_count( @@ -254,7 +252,7 @@ def span_count( else: cur = self.cur.execute(query + ";", (project_name,)) if res := cur.fetchone(): - return cast(int, res[0]) + return cast(int, res[0] or 0) return 0 def llm_token_count_total( @@ -265,7 +263,8 @@ def llm_token_count_total( ) -> int: query = """ SELECT - SUM(COALESCE(json_extract(spans.attributes, '$."llm.token_count.prompt"'), 0) + COALESCE(json_extract(spans.attributes, '$."llm.token_count.completion"'), 0)) + SUM(COALESCE(json_extract(spans.attributes, '$."llm.token_count.prompt"'), 0) + + COALESCE(json_extract(spans.attributes, '$."llm.token_count.completion"'), 0)) FROM spans JOIN traces ON traces.rowid = spans.trace_rowid JOIN projects ON projects.rowid = traces.project_rowid @@ -285,7 +284,7 @@ def llm_token_count_total( else: cur = self.cur.execute(query + ";", (project_name,)) if res := cur.fetchone(): - return cast(int, res[0]) + return cast(int, res[0] or 0) return 0 def get_trace(self, trace_id: str) -> Iterator[Tuple[Span, ComputedValues]]: From 7b531cf82d23dfde76c737bfdee0c3765d1ef4d7 Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Mon, 1 Apr 2024 15:31:42 -0700 Subject: [PATCH 06/12] clean up --- src/phoenix/db/database.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/phoenix/db/database.py b/src/phoenix/db/database.py index ad413f20ed..b6ec16795e 100644 --- a/src/phoenix/db/database.py +++ b/src/phoenix/db/database.py @@ -188,7 +188,6 @@ def insert_span(self, span: Span, project_name: str) -> None: break except Exception: self.cur.execute("ROLLBACK;") - raise else: self.cur.execute("COMMIT;") @@ -272,15 +271,15 @@ def llm_token_count_total( """ # noqa E501 if start_time and stop_time: cur = self.cur.execute( - query + " AND ? <= traces.start_time AND traces.start_time < ?;", + query + " AND ? <= spans.start_time AND spans.start_time < ?;", (project_name, start_time, stop_time), ) elif start_time: cur = self.cur.execute( - query + " AND ? <= traces.start_time;", (project_name, start_time) + query + " AND ? <= spans.start_time;", (project_name, start_time) ) elif start_time: - cur = self.cur.execute(query + " AND traces.start_time < ?;", (project_name, stop_time)) + cur = self.cur.execute(query + " AND spans.start_time < ?;", (project_name, stop_time)) else: cur = self.cur.execute(query + ";", (project_name,)) if res := cur.fetchone(): From 8361625e7078729d7ab0155ac20ae169cd1f4a66 Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Mon, 1 Apr 2024 15:33:02 -0700 Subject: [PATCH 07/12] clean up --- src/phoenix/db/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/phoenix/db/database.py b/src/phoenix/db/database.py index b6ec16795e..966f601c1e 100644 --- a/src/phoenix/db/database.py +++ b/src/phoenix/db/database.py @@ -209,7 +209,7 @@ def trace_count( """ if start_time and stop_time: cur = self.cur.execute( - query + " AND ? <= start_time AND start_time < ?;", + query + " AND ? <= traces.start_time AND traces.start_time < ?;", (project_name, start_time, stop_time), ) elif start_time: From af9c71a5ea2b41f66d6c729a717b1ed08073c24e Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Mon, 1 Apr 2024 15:39:45 -0700 Subject: [PATCH 08/12] clean up --- src/phoenix/db/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/phoenix/db/database.py b/src/phoenix/db/database.py index 966f601c1e..76cab2f182 100644 --- a/src/phoenix/db/database.py +++ b/src/phoenix/db/database.py @@ -11,7 +11,7 @@ _CONFIG = """ PRAGMA foreign_keys = ON; -PRAGMA journal_mode = WAL2; +PRAGMA journal_mode = WAL; PRAGMA synchronous = OFF; PRAGMA cache_size = -32000; """ From f306fb77f60fc90b8f1a9b5e5d8726be8e5fdf01 Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Mon, 1 Apr 2024 15:53:52 -0700 Subject: [PATCH 09/12] clean up --- src/phoenix/db/database.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/phoenix/db/database.py b/src/phoenix/db/database.py index 76cab2f182..6dc3f8cbed 100644 --- a/src/phoenix/db/database.py +++ b/src/phoenix/db/database.py @@ -14,6 +14,7 @@ PRAGMA journal_mode = WAL; PRAGMA synchronous = OFF; PRAGMA cache_size = -32000; +PRAGMA busy_timeout = 10000; """ _INIT_DB = """ From 941798e9bbf130795c40e766947f1f9340961f52 Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Mon, 1 Apr 2024 16:26:34 -0700 Subject: [PATCH 10/12] use local cursor --- src/phoenix/db/database.py | 70 +++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/src/phoenix/db/database.py b/src/phoenix/db/database.py index 6dc3f8cbed..171d65d0bf 100644 --- a/src/phoenix/db/database.py +++ b/src/phoenix/db/database.py @@ -72,27 +72,28 @@ def __init__(self, database: Optional[Path] = None) -> None: check_same_thread=False, ) # self.con.set_trace_callback(print) - self.cur = self.con.cursor() - self.cur.executescript(_CONFIG) - if int(self.cur.execute("PRAGMA user_version;").fetchone()[0]) < 1: - self.cur.executescript(_INIT_DB) + cur = self.con.cursor() + cur.executescript(_CONFIG) + if int(cur.execute("PRAGMA user_version;").fetchone()[0]) < 1: + cur.executescript(_INIT_DB) def insert_span(self, span: Span, project_name: str) -> None: - self.cur.execute("BEGIN;") + cur = self.con.cursor() + cur.execute("BEGIN;") try: if not ( - projects := self.cur.execute( + projects := cur.execute( "SELECT rowid FROM projects WHERE name = ?;", (project_name,), ).fetchone() ): - projects = self.cur.execute( + projects = cur.execute( "INSERT INTO projects(name) VALUES(?) RETURNING rowid;", (project_name,), ).fetchone() project_rowid = projects[0] if ( - trace_row := self.cur.execute( + trace_row := cur.execute( """ INSERT INTO traces(trace_id, project_rowid, session_id, start_time, end_time) VALUES(?,?,?,?,?) @@ -111,7 +112,7 @@ def insert_span(self, span: Span, project_name: str) -> None: ), ).fetchone() ) is None: - trace_row = self.cur.execute( + trace_row = cur.execute( "SELECT rowid from traces where trace_id = ?", (span.context.trace_id,) ).fetchone() trace_rowid = trace_row[0] @@ -122,7 +123,7 @@ def insert_span(self, span: Span, project_name: str) -> None: cumulative_llm_token_count_completion = cast( int, span.attributes.get(SpanAttributes.LLM_TOKEN_COUNT_COMPLETION, 0) ) - if accumulation := self.cur.execute( + if accumulation := cur.execute( """ SELECT sum(cumulative_error_count), @@ -136,7 +137,7 @@ def insert_span(self, span: Span, project_name: str) -> None: cumulative_error_count += cast(int, accumulation[0] or 0) cumulative_llm_token_count_prompt += cast(int, accumulation[1] or 0) cumulative_llm_token_count_completion += cast(int, accumulation[2] or 0) - self.cur.execute( + cur.execute( """ INSERT INTO spans(span_id, trace_rowid, parent_span_id, kind, name, start_time, end_time, attributes, events, status, status_message, cumulative_error_count, cumulative_llm_token_count_prompt, cumulative_llm_token_count_completion) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?) @@ -161,7 +162,7 @@ def insert_span(self, span: Span, project_name: str) -> None: ) parent_id = span.parent_id while parent_id: - if parent_span := self.cur.execute( + if parent_span := cur.execute( """ SELECT rowid, parent_span_id FROM spans @@ -170,7 +171,7 @@ def insert_span(self, span: Span, project_name: str) -> None: (parent_id,), ).fetchone(): rowid, parent_id = parent_span[0], parent_span[1] - self.cur.execute( + cur.execute( """ UPDATE spans SET cumulative_error_count = cumulative_error_count + ?, @@ -188,12 +189,13 @@ def insert_span(self, span: Span, project_name: str) -> None: else: break except Exception: - self.cur.execute("ROLLBACK;") + cur.execute("ROLLBACK;") else: - self.cur.execute("COMMIT;") + cur.execute("COMMIT;") def get_projects(self) -> Iterator[Tuple[int, str]]: - for project in self.cur.execute("SELECT rowid, name FROM projects;").fetchall(): + cur = self.con.cursor() + for project in cur.execute("SELECT rowid, name FROM projects;").fetchall(): yield cast(Tuple[int, str], (project[0], project[1])) def trace_count( @@ -208,19 +210,18 @@ def trace_count( JOIN projects ON projects.rowid = traces.project_rowid WHERE projects.name = ? """ + cur = self.con.cursor() if start_time and stop_time: - cur = self.cur.execute( + cur = cur.execute( query + " AND ? <= traces.start_time AND traces.start_time < ?;", (project_name, start_time, stop_time), ) elif start_time: - cur = self.cur.execute( - query + " AND ? <= traces.start_time;", (project_name, start_time) - ) + cur = cur.execute(query + " AND ? <= traces.start_time;", (project_name, start_time)) elif start_time: - cur = self.cur.execute(query + " AND traces.start_time < ?;", (project_name, stop_time)) + cur = cur.execute(query + " AND traces.start_time < ?;", (project_name, stop_time)) else: - cur = self.cur.execute(query + ";", (project_name,)) + cur = cur.execute(query + ";", (project_name,)) if res := cur.fetchone(): return cast(int, res[0] or 0) return 0 @@ -238,19 +239,18 @@ def span_count( JOIN projects ON projects.rowid = traces.project_rowid WHERE projects.name = ? """ + cur = self.con.cursor() if start_time and stop_time: - cur = self.cur.execute( + cur = cur.execute( query + " AND ? <= spans.start_time AND spans.start_time < ?;", (project_name, start_time, stop_time), ) elif start_time: - cur = self.cur.execute( - query + " AND ? <= spans.start_time;", (project_name, start_time) - ) + cur = cur.execute(query + " AND ? <= spans.start_time;", (project_name, start_time)) elif start_time: - cur = self.cur.execute(query + " AND spans.start_time < ?;", (project_name, stop_time)) + cur = cur.execute(query + " AND spans.start_time < ?;", (project_name, stop_time)) else: - cur = self.cur.execute(query + ";", (project_name,)) + cur = cur.execute(query + ";", (project_name,)) if res := cur.fetchone(): return cast(int, res[0] or 0) return 0 @@ -270,25 +270,25 @@ def llm_token_count_total( JOIN projects ON projects.rowid = traces.project_rowid WHERE projects.name = ? """ # noqa E501 + cur = self.con.cursor() if start_time and stop_time: - cur = self.cur.execute( + cur = cur.execute( query + " AND ? <= spans.start_time AND spans.start_time < ?;", (project_name, start_time, stop_time), ) elif start_time: - cur = self.cur.execute( - query + " AND ? <= spans.start_time;", (project_name, start_time) - ) + cur = cur.execute(query + " AND ? <= spans.start_time;", (project_name, start_time)) elif start_time: - cur = self.cur.execute(query + " AND spans.start_time < ?;", (project_name, stop_time)) + cur = cur.execute(query + " AND spans.start_time < ?;", (project_name, stop_time)) else: - cur = self.cur.execute(query + ";", (project_name,)) + cur = cur.execute(query + ";", (project_name,)) if res := cur.fetchone(): return cast(int, res[0] or 0) return 0 def get_trace(self, trace_id: str) -> Iterator[Tuple[Span, ComputedValues]]: - for span in self.cur.execute( + cur = self.con.cursor() + for span in cur.execute( """ SELECT spans.span_id, From 701e94d39d65dcf5bb5390d40c740ac47aad4f07 Mon Sep 17 00:00:00 2001 From: Roger Yang Date: Tue, 2 Apr 2024 13:00:22 -0700 Subject: [PATCH 11/12] clean up --- src/phoenix/db/database.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/phoenix/db/database.py b/src/phoenix/db/database.py index 171d65d0bf..3e3486cca8 100644 --- a/src/phoenix/db/database.py +++ b/src/phoenix/db/database.py @@ -1,6 +1,7 @@ import json import sqlite3 from datetime import datetime +from enum import Enum from pathlib import Path from typing import Any, Iterator, Optional, Tuple, cast @@ -218,7 +219,7 @@ def trace_count( ) elif start_time: cur = cur.execute(query + " AND ? <= traces.start_time;", (project_name, start_time)) - elif start_time: + elif stop_time: cur = cur.execute(query + " AND traces.start_time < ?;", (project_name, stop_time)) else: cur = cur.execute(query + ";", (project_name,)) @@ -247,7 +248,7 @@ def span_count( ) elif start_time: cur = cur.execute(query + " AND ? <= spans.start_time;", (project_name, start_time)) - elif start_time: + elif stop_time: cur = cur.execute(query + " AND spans.start_time < ?;", (project_name, stop_time)) else: cur = cur.execute(query + ";", (project_name,)) @@ -278,7 +279,7 @@ def llm_token_count_total( ) elif start_time: cur = cur.execute(query + " AND ? <= spans.start_time;", (project_name, start_time)) - elif start_time: + elif stop_time: cur = cur.execute(query + " AND spans.start_time < ?;", (project_name, stop_time)) else: cur = cur.execute(query + ";", (project_name,)) @@ -342,6 +343,8 @@ class _Encoder(json.JSONEncoder): def default(self, obj: Any) -> Any: if isinstance(obj, datetime): return obj.isoformat() + elif isinstance(obj, Enum): + return obj.value elif isinstance(obj, np.ndarray): return list(obj) elif isinstance(obj, np.integer): From 60985f49646798f745b225f1226658d80a19a369 Mon Sep 17 00:00:00 2001 From: Mikyo King Date: Tue, 2 Apr 2024 14:38:04 -0600 Subject: [PATCH 12/12] fix mimetype encoding --- src/phoenix/server/api/types/MimeType.py | 4 ++-- src/phoenix/server/main.py | 19 ++++++++++++------- src/phoenix/trace/otel.py | 5 ++--- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/phoenix/server/api/types/MimeType.py b/src/phoenix/server/api/types/MimeType.py index 4c33572994..9dbe40f477 100644 --- a/src/phoenix/server/api/types/MimeType.py +++ b/src/phoenix/server/api/types/MimeType.py @@ -8,8 +8,8 @@ @strawberry.enum class MimeType(Enum): - text = trace_schemas.MimeType.TEXT - json = trace_schemas.MimeType.JSON + text = trace_schemas.MimeType.TEXT.value + json = trace_schemas.MimeType.JSON.value @classmethod def _missing_(cls, v: Any) -> Optional["MimeType"]: diff --git a/src/phoenix/server/main.py b/src/phoenix/server/main.py index 0f32637119..1b43921264 100644 --- a/src/phoenix/server/main.py +++ b/src/phoenix/server/main.py @@ -50,7 +50,7 @@ ██████╔╝███████║██║ ██║█████╗ ██╔██╗ ██║██║ ╚███╔╝ ██╔═══╝ ██╔══██║██║ ██║██╔══╝ ██║╚██╗██║██║ ██╔██╗ ██║ ██║ ██║╚██████╔╝███████╗██║ ╚████║██║██╔╝ ██╗ -╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝╚═╝ ╚═══╝╚═╝╚═╝ ╚═╝ v{0} +╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝╚═╝ ╚═══╝╚═╝╚═╝ ╚═╝ v{version} | | 🌎 Join our Community 🌎 @@ -63,9 +63,9 @@ | https://docs.arize.com/phoenix | | 🚀 Phoenix Server 🚀 -| Phoenix UI: http://{1}:{2} +| Phoenix UI: http://{host}:{port} | Log traces: /v1/traces over HTTP -| +| Storage location: {working_dir} """ @@ -195,7 +195,8 @@ def _load_items( primary_dataset, reference_dataset, ) - db = SqliteDatabase(get_working_dir() / "phoenix.db") + working_dir = get_working_dir() + db = SqliteDatabase(working_dir / "phoenix.db") traces = Traces(db) if span_store := get_span_store(): Thread(target=load_traces_data_from_store, args=(traces, span_store), daemon=True).start() @@ -249,9 +250,13 @@ def _load_items( # Print information about the server phoenix_version = pkg_resources.get_distribution("arize-phoenix").version - print( - _WELCOME_MESSAGE.format(phoenix_version, host if host != "0.0.0.0" else "localhost", port) - ) + config = { + "version": phoenix_version, + "host": host, + "port": port, + "working_dir": working_dir, + } + print(_WELCOME_MESSAGE.format(**config)) # Start the server server.run() diff --git a/src/phoenix/trace/otel.py b/src/phoenix/trace/otel.py index 9b0f1ab106..21bb417b4a 100644 --- a/src/phoenix/trace/otel.py +++ b/src/phoenix/trace/otel.py @@ -33,7 +33,6 @@ EXCEPTION_MESSAGE, EXCEPTION_STACKTRACE, EXCEPTION_TYPE, - MimeType, Span, SpanContext, SpanEvent, @@ -68,7 +67,7 @@ def decode(otlp_span: otlp.Span) -> Span: for mime_type in (INPUT_MIME_TYPE, OUTPUT_MIME_TYPE): if mime_type in attributes: - attributes[mime_type] = MimeType(attributes[mime_type]) + attributes[mime_type] = attributes[mime_type] status_code, status_message = _decode_status(otlp_span.status) events = [_decode_event(event) for event in otlp_span.events] @@ -318,7 +317,7 @@ def encode(span: Span) -> otlp.Span: for mime_type in (INPUT_MIME_TYPE, OUTPUT_MIME_TYPE): if mime_type in attributes: - attributes[mime_type] = attributes[mime_type].value + attributes[mime_type] = attributes[mime_type] for key, value in span.attributes.items(): if value is None: