Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: incorporate schema for postgresql and add integration test #4474

Merged
merged 5 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions .github/workflows/python-CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ name: Python CI

on:
push:
branches: [main, auth]
branches: [ main, auth ]
pull_request:
paths:
- "src/**"
- "tests/**"
- "integration_tests/server/**"
- "integration_tests/**"
- "tutorials/**"
- "pyproject.toml"
- "packages/**"
Expand Down Expand Up @@ -39,7 +39,7 @@ jobs:
phoenix:
- "src/**"
- "tests/**"
- "integration_tests/server/**"
- "integration_tests/**"
- "tutorials/**"
- "pyproject.toml"
packages:
Expand All @@ -61,7 +61,7 @@ jobs:
3.12
- uses: yezz123/setup-uv@v4
with:
uv-version: 0.3.0
uv-version: 0.4.2
uv-venv: ${{ github.job }}-${{ github.run_number }}
- run: uv pip install tox==4.18.0 tox-uv==1.11.2
- run: tox run-parallel --parallel-no-spinner -e py38-ci-pkg-phoenix_evals,py312-ci-pkg-phoenix_evals
Expand Down Expand Up @@ -113,8 +113,8 @@ jobs:
if: ${{ needs.changes.outputs.phoenix == 'true' }}
strategy:
matrix:
os: [macos-12, windows-latest, ubuntu-latest]
python-version: ["3.8"]
os: [ macos-12, windows-latest, ubuntu-latest ]
python-version: [ "3.8" ]
steps:
- name: Checkout Repository
uses: actions/checkout@v4
Expand Down Expand Up @@ -149,16 +149,29 @@ jobs:
if: ${{ needs.changes.outputs.phoenix == 'true' }}
strategy:
matrix:
os: [ubuntu-latest, windows-latest, windows-2019, macos-12]
os: [ ubuntu-latest, windows-latest, windows-2019, macos-12 ]
services:
postgres:
# Applying this workaround: https://github.com/actions/runner/issues/822
image: ${{ (matrix.os == 'ubuntu-latest') && 'postgres:12' || '' }}
env:
POSTGRES_PASSWORD: phoenix
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setting a distinct password to avoid making inadvertent connections when running locally

options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: 3.8
- uses: yezz123/setup-uv@v4
with:
uv-version: 0.3.0
uv-version: 0.4.2
uv-venv: ${{ github.job }}-${{ github.run_number }}
- run: uv pip install tox==4.18.0 tox-uv==1.11.2
- run: tox run -e ci-integration_tests -- server
timeout-minutes: 10
- run: tox run -e ci-integration_tests
timeout-minutes: 15
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: "v0.6.1"
rev: "v0.6.3"
hooks:
- id: ruff-format
- id: ruff
Expand Down
5 changes: 3 additions & 2 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mypy==1.11.1
ruff==0.6.1
mypy==1.11.2
ruff==0.6.3
pytest==8.3.2
pytest-xdist==3.6.1
pytest-asyncio==0.23.8
uvloop; platform_system != 'Windows'
288 changes: 288 additions & 0 deletions integration_tests/conftest.py
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copied from #4470

Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
import os
import sys
from contextlib import ExitStack, contextmanager
from subprocess import PIPE, STDOUT
from threading import Lock, Thread
from time import sleep, time
from typing import Any, Callable, ContextManager, Dict, Iterator, List, Optional, Protocol, cast
from unittest import mock
from urllib.parse import urljoin
from urllib.request import urlopen

import httpx
import pytest
from _pytest.fixtures import SubRequest
from _pytest.tmpdir import TempPathFactory
from faker import Faker
from openinference.semconv.resource import ResourceAttributes
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, SpanExporter
from opentelemetry.trace import Span, Tracer
from phoenix.config import (
ENV_PHOENIX_GRPC_PORT,
ENV_PHOENIX_PORT,
ENV_PHOENIX_SQL_DATABASE_SCHEMA,
ENV_PHOENIX_SQL_DATABASE_URL,
ENV_PHOENIX_WORKING_DIR,
get_base_url,
get_env_grpc_port,
get_env_host,
)
from portpicker import pick_unused_port # type: ignore[import-untyped]
from psutil import STATUS_ZOMBIE, Popen
from sqlalchemy import URL, create_engine, make_url, text
from sqlalchemy.exc import OperationalError
from typing_extensions import TypeAlias

# Type aliases used throughout the fixtures below for readability.
_ProjectName: TypeAlias = str
_SpanName: TypeAlias = str
_Headers: TypeAlias = Dict[str, Any]  # header map passed to OTLP exporters


class _GetGqlSpans(Protocol):
    """Callable that fetches spans over GraphQL, grouped by project name.

    *keys* are the GraphQL span-node field names to request; the result maps
    each project name to a list of span-node dicts containing those fields.
    """

    def __call__(self, *keys: str) -> Dict[_ProjectName, List[Dict[str, Any]]]: ...


class _SpanExporterFactory(Protocol):
    """Callable that builds a ``SpanExporter``, optionally with request headers."""

    def __call__(
        self,
        *,
        headers: Optional[_Headers] = None,
    ) -> SpanExporter: ...


class _GetTracer(Protocol):
    """Callable that builds a ``Tracer`` exporting spans for *project_name*."""

    def __call__(
        self,
        *,
        project_name: _ProjectName,
        exporter: SpanExporter,
    ) -> Tracer: ...


class _StartSpan(Protocol):
    """Callable that starts (and returns) a single named span for a project."""

    def __call__(
        self,
        *,
        project_name: _ProjectName,
        span_name: _SpanName,
        exporter: SpanExporter,
    ) -> Span: ...


@pytest.fixture(scope="class")
def fake() -> Faker:
    """Class-scoped ``Faker`` instance for generating random test data."""
    faker_instance = Faker()
    return faker_instance


@pytest.fixture(autouse=True, scope="class")
def env(tmp_path_factory: TempPathFactory) -> Iterator[None]:
    """Point Phoenix at fresh unused ports and a temp working dir per test class.

    The environment is patched for the lifetime of the class and restored
    automatically afterwards.
    """
    base_dir = tmp_path_factory.getbasetemp()
    overrides = {
        ENV_PHOENIX_PORT: str(pick_unused_port()),
        ENV_PHOENIX_GRPC_PORT: str(pick_unused_port()),
        ENV_PHOENIX_WORKING_DIR: str(base_dir),
    }
    with mock.patch.dict(os.environ, overrides):
        yield


@pytest.fixture(
    scope="class",
    params=[
        pytest.param("sqlite:///:memory:", id="sqlite"),
        pytest.param(
            "postgresql://127.0.0.1:5432/postgres?user=postgres&password=phoenix",
            id="postgresql",
        ),
    ],
)
def sql_database_url(request: SubRequest) -> URL:
    """Parse the parametrized database URL string into a SQLAlchemy ``URL``."""
    raw_url: str = request.param
    return make_url(raw_url)


@pytest.fixture(autouse=True, scope="class")
def env_phoenix_sql_database_url(
    sql_database_url: URL,
    fake: Faker,
) -> Iterator[None]:
    """Export the database URL (and, for PostgreSQL, a random schema) to the env.

    For PostgreSQL backends a throwaway schema name is provisioned via
    ``_random_schema`` and exported alongside the URL; SQLite runs export
    only the URL.
    """
    env_values = [(ENV_PHOENIX_SQL_DATABASE_URL, sql_database_url.render_as_string())]
    if sql_database_url.get_backend_name().startswith("postgresql"):
        # Schema context stays open for the class so teardown drops the schema.
        with _random_schema(sql_database_url, fake) as schema:
            env_values.append((ENV_PHOENIX_SQL_DATABASE_SCHEMA, schema))
            with mock.patch.dict(os.environ, env_values):
                yield
    else:
        with mock.patch.dict(os.environ, env_values):
            yield


@pytest.fixture(autouse=True, scope="class")
def env_phoenix_sql_database_schema(
    fake: Faker,
) -> Iterator[None]:
    # Sets ENV_PHOENIX_SQL_DATABASE_SCHEMA to a random string for the class.
    #
    # NOTE(review): `env_phoenix_sql_database_url` also sets this same variable
    # for PostgreSQL runs (with a schema that actually gets dropped at
    # teardown), while this fixture's schema is never created or dropped.
    # Which value wins depends on autouse fixture ordering — confirm the
    # overlap is intentional, and that a schema value is wanted for SQLite.
    schema = fake.unique.pystr()
    values = ((ENV_PHOENIX_SQL_DATABASE_SCHEMA, schema),)
    with mock.patch.dict(os.environ, values):
        yield


@pytest.fixture
def get_gql_spans(
    httpx_client: httpx.Client,
) -> _GetGqlSpans:
    """Return a callable that queries the GraphQL API for spans per project."""

    def _(*keys: str) -> Dict[_ProjectName, List[Dict[str, Any]]]:
        # Request the given span-node fields for every project.
        selection = " ".join(keys)
        span_fields = "name spans{edges{node{" + selection + "}}}"
        payload = {"query": "query{projects{edges{node{" + span_fields + "}}}}"}
        response = httpx_client.post(urljoin(get_base_url(), "graphql"), json=payload)
        response.raise_for_status()
        body = response.json()
        assert not body.get("errors")
        result: Dict[_ProjectName, List[Dict[str, Any]]] = {}
        for project in body["data"]["projects"]["edges"]:
            node = project["node"]
            result[node["name"]] = [edge["node"] for edge in node["spans"]["edges"]]
        return result

    return _


@pytest.fixture(scope="session")
def http_span_exporter() -> _SpanExporterFactory:
    """Factory for OTLP/HTTP span exporters targeting the local server."""

    def _(
        *,
        headers: Optional[_Headers] = None,
    ) -> SpanExporter:
        # Local import defers loading the http exporter until actually used.
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

        traces_endpoint = urljoin(get_base_url(), "v1/traces")
        exporter = OTLPSpanExporter(endpoint=traces_endpoint, headers=headers, timeout=1)
        # HACK: private attribute — shortens the retry window so failures
        # surface quickly in tests.
        exporter._MAX_RETRY_TIMEOUT = 2
        return exporter

    return _


@pytest.fixture(scope="session")
def grpc_span_exporter() -> _SpanExporterFactory:
    """Factory for OTLP/gRPC span exporters targeting the local server."""

    def _(
        *,
        headers: Optional[_Headers] = None,
    ) -> SpanExporter:
        # Local import defers loading the grpc exporter until actually used.
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

        # A wildcard bind address is not a connectable target; use loopback.
        host = get_env_host()
        if host == "0.0.0.0":
            host = "127.0.0.1"
        target = "http://{}:{}".format(host, get_env_grpc_port())
        return OTLPSpanExporter(endpoint=target, headers=headers, timeout=1)

    return _


@pytest.fixture(scope="session", params=["http", "grpc"])
def span_exporter(request: SubRequest) -> _SpanExporterFactory:
    """Parametrize tests over both OTLP transports (http and grpc)."""
    fixture_by_transport = {
        "http": "http_span_exporter",
        "grpc": "grpc_span_exporter",
    }
    if request.param not in fixture_by_transport:
        raise ValueError(f"Unknown exporter: {request.param}")
    fixture_name = fixture_by_transport[request.param]
    return cast(_SpanExporterFactory, request.getfixturevalue(fixture_name))


@pytest.fixture(scope="session")
def get_tracer() -> _GetTracer:
    """Factory producing a ``Tracer`` that exports synchronously via *exporter*."""

    def _(
        *,
        project_name: str,
        exporter: SpanExporter,
    ) -> Tracer:
        # Tag spans with the project name so the server files them under it.
        provider = TracerProvider(
            resource=Resource({ResourceAttributes.PROJECT_NAME: project_name})
        )
        provider.add_span_processor(SimpleSpanProcessor(exporter))
        return provider.get_tracer(__name__)

    return _


@pytest.fixture(scope="session")
def start_span(
    get_tracer: _GetTracer,
) -> _StartSpan:
    """Convenience factory: build a tracer and start a single named span."""

    def _(
        *,
        project_name: str,
        span_name: str,
        exporter: SpanExporter,
    ) -> Span:
        tracer = get_tracer(project_name=project_name, exporter=exporter)
        return tracer.start_span(span_name)

    return _


@pytest.fixture(scope="session")
def httpx_client() -> httpx.Client:
    """Session-wide HTTP client.

    No timeout is set so requests don't abort while paused in a debugger.
    """
    client = httpx.Client(timeout=None)
    return client


@pytest.fixture(scope="session")
def server() -> Callable[[], ContextManager[None]]:
    """Context-manager factory that runs ``phoenix.server.main serve`` as a subprocess.

    Waits (up to 60 seconds) for the ``/healthz`` endpoint to respond before
    yielding, echoes the child's combined stdout/stderr, and always terminates
    the child on exit — even when the test body raises.
    """

    @contextmanager
    def _() -> Iterator[None]:
        command = f"{sys.executable} -m phoenix.server.main serve"
        process = Popen(command.split(), stdout=PIPE, stderr=STDOUT, text=True, env=os.environ)
        log: List[str] = []
        lock: Lock = Lock()
        # Background thread drains the pipe so the child never blocks on a full buffer.
        Thread(target=capture_stdout, args=(process, log, lock), daemon=True).start()
        t = 60
        time_limit = time() + t
        timed_out = False
        url = urljoin(get_base_url(), "healthz")
        # Poll the health endpoint until it answers, the deadline passes,
        # or the child process dies.
        while not timed_out and is_alive(process):
            sleep(0.1)
            try:
                urlopen(url)
                break
            except BaseException:
                timed_out = time() > time_limit
        try:
            if timed_out:
                raise TimeoutError(f"Server did not start within {t} seconds.")
            assert is_alive(process)
            with lock:
                for line in log:
                    print(line, end="")
                log.clear()
            yield
        finally:
            # BUG FIX: terminate in `finally` so the child is reaped even when
            # the test body (or the timeout above) raises; previously a failing
            # test leaked a running server process.
            process.terminate()
            process.wait(10)
            # Flush whatever the capture thread collected after startup.
            for line in log:
                print(line, end="")

    return _


def is_alive(process: Popen) -> bool:
    """Return True while *process* is running and has not turned into a zombie."""
    if not process.is_running():
        return False
    return process.status() != STATUS_ZOMBIE


def capture_stdout(process: Popen, log: List[str], lock: Lock) -> None:
    """Thread target: drain *process* stdout into *log* under *lock*.

    Non-empty lines are always recorded; an empty read is recorded only when
    it differs from the last entry, so consecutive empty reads (e.g. at EOF)
    are collapsed.
    """
    while is_alive(process):
        text_line = process.stdout.readline()
        worth_keeping = bool(text_line) or bool(log and log[-1] != text_line)
        if worth_keeping:
            with lock:
                log.append(text_line)


@contextmanager
def _random_schema(url: URL, fake: Faker) -> Iterator[str]:
    """Yield a random schema name for the PostgreSQL database at *url*.

    Skips the calling test(s) when the server is unreachable. The schema is
    not created here — presumably the application creates it (TODO confirm);
    teardown drops it with CASCADE so anything created under it is removed.
    """
    engine = create_engine(url.set(drivername="postgresql+psycopg"))
    try:
        probe = engine.connect()
    except OperationalError as ex:
        # BUG FIX: release the connection pool before skipping.
        engine.dispose()
        pytest.skip(f"PostgreSQL unavailable: {ex}")
    # BUG FIX: the reachability-probe connection was previously never closed.
    probe.close()
    schema = fake.unique.pystr().lower()
    try:
        yield schema
    finally:
        # BUG FIX: run teardown in `finally` so the schema is dropped even
        # when the test body raises.
        with engine.connect() as conn:
            conn.execute(text(f"DROP SCHEMA IF EXISTS {schema} CASCADE;"))
            conn.commit()
        engine.dispose()
1 change: 1 addition & 0 deletions integration_tests/mypy.ini
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[mypy]
strict = true
explicit_package_bases = true
exclude = (^evals|^notebooks)
2 changes: 1 addition & 1 deletion integration_tests/pytest.ini
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[pytest]
addopts = -raP -l
addopts = -rA -l --ignore=evals --ignore=notebooks
1 change: 1 addition & 0 deletions integration_tests/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
faker
httpx
openinference-semantic-conventions
opentelemetry-sdk
portpicker
Expand Down
Loading
Loading