Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hold reports in the collector service until they are uploaded to the UI service. #1241

Merged
merged 4 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
19 changes: 19 additions & 0 deletions examples/service/collector/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
services:
collector:
image: evidently/evidently-service:0.4.33
entrypoint: ["evidently", "collector"]
command: ["--config-path", "/config/collector.json"]
ports:
- 8001:8001
volumes:
- ./config:/config # Persistent storage for the collector configuration.
# After the configuration is updated via the API, it is saved persistently in this folder.
ui:
image: evidently/evidently-service:0.4.33
ports:
- 8000:8000
command: ["--workspace", "/data"]
volumes:
- data:/data # Where the UI service stores its data. In this example it is a Docker volume.
volumes:
data:
53 changes: 53 additions & 0 deletions examples/service/collector/generate_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import pandas as pd

from evidently.collector.client import CollectorClient
from evidently.report import Report
from evidently.metric_preset import DataQualityPreset

from evidently.collector.config import CollectorConfig, IntervalTrigger, ReportConfig
from evidently.ui.workspace import RemoteWorkspace


def get_or_create_project_by_name(ui_api_url: str, project_name: str):
    """Return the ID (as a string) of the project named ``project_name``.

    Connects to the UI service at ``ui_api_url`` and searches for projects by
    name. The first search result is used; when the search returns nothing,
    a new project with that name is created instead.
    """
    workspace = RemoteWorkspace(ui_api_url)
    matches = workspace.search_project(project_name)
    project = matches[0] if len(matches) > 0 else workspace.create_project(project_name)
    return str(project.id)


# Project ID to upload data to.
# It can also be obtained via the UI after creating a project; here it is
# looked up (or created) by name through the UI service API.
project_id = get_or_create_project_by_name("http://localhost:8000", "My Test Project")

# Address of the UI service used for snapshot upload.
# It must be an address that is reachable from the collector service.
# For docker compose: http://ui.:8000
api_url = "http://ui.:8000"

# Generate the report configuration.
# Create a sample report to derive the configuration from. It should contain:
# - the expected metrics
# - a sample dataset (with the required columns)

report = Report(metrics=[DataQualityPreset()])

sample_data = pd.DataFrame(data={"a": [1, 2, 3, 4]})

report.run(current_data=sample_data, reference_data=None)

# Create the collector configuration.
config = CollectorConfig(
id="main",
trigger=IntervalTrigger(interval=10),
report_config=ReportConfig.from_report(report),
reference_path=None,
project_id=project_id,
api_url=api_url,
)

# Register the configuration with the collector service under the id "main".
client = CollectorClient("http://localhost:8001")
client.create_collector("main", config)
# After this call the collector configuration is saved in the ./config folder.
46 changes: 34 additions & 12 deletions src/evidently/collector/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
from evidently.collector.config import CollectorConfig
from evidently.collector.config import CollectorServiceConfig
from evidently.collector.storage import CollectorStorage
from evidently.collector.storage import CreateSnapshotEvent
from evidently.collector.storage import LogEvent
from evidently.collector.storage import UploadSnapshotEvent
from evidently.telemetry import DO_NOT_TRACK_ENV
from evidently.telemetry import event_logger
from evidently.ui.components.security import NoSecurityComponent
Expand Down Expand Up @@ -126,6 +128,7 @@ async def check_snapshots_factory(service: CollectorServiceConfig, storage: Coll
if not collector.trigger.is_ready(collector, storage):
continue
await create_snapshot(collector, storage)
await send_snapshot(collector, storage)


async def create_snapshot(collector: CollectorConfig, storage: CollectorStorage) -> None:
Expand All @@ -141,23 +144,42 @@ async def create_snapshot(collector: CollectorConfig, storage: CollectorStorage)
report.run, reference_data=collector.reference, current_data=current, column_mapping=ColumnMapping()
) # FIXME: sync function
report._inner_suite.raise_for_error()
snapshot = report.to_snapshot()
except Exception as e:
logger.exception(f"Error running report: {e}")
storage.log(
collector.id, LogEvent(ok=False, error=f"Error running report: {e.__class__.__name__}: {e.args}")
collector.id,
CreateSnapshotEvent(
snapshot_id=str(report.id),
ok=False,
error=f"Error running report: {e.__class__.__name__}: {e.args}",
),
)
return
try:
await sync_to_thread(
collector.workspace.add_snapshot, collector.project_id, report.to_snapshot()
) # FIXME: sync function
except Exception as e:
logger.exception(f"Error saving snapshot: {e}")
storage.log(
collector.id, LogEvent(ok=False, error=f"Error saving snapshot: {e.__class__.__name__}: {e.args}")
)
return
storage.log(collector.id, LogEvent(ok=True))
storage.add_snapshot(collector.id, snapshot)
storage.log(collector.id, CreateSnapshotEvent(snapshot_id=str(snapshot.id), ok=True))


async def send_snapshot(collector: CollectorConfig, storage: CollectorStorage) -> None:
    """Upload the snapshots held in ``storage`` for ``collector`` to its workspace.

    Runs under the storage lock for the collector. Each upload attempt is
    recorded with an ``UploadSnapshotEvent``. The first failed upload is
    logged and ends the run; because the exception leaves the ``with`` block,
    the storage item's ``__exit__`` sees it (``SnapshotPopper`` puts the
    snapshot back at the head of its list).
    """
    collector_id = collector.id
    async with storage.lock(collector_id):
        for pending in storage.take_snapshots(collector_id):
            try:
                with pending as snapshot:
                    # FIXME: sync function
                    await sync_to_thread(collector.workspace.add_snapshot, collector.project_id, snapshot)
            except Exception as e:
                logger.exception(f"Error saving snapshot: {e}")
                failure = UploadSnapshotEvent(
                    snapshot_id=str(snapshot.id),
                    ok=False,
                    error=f"Error saving snapshot: {e.__class__.__name__}: {e.args}",
                )
                storage.log(collector_id, failure)
                return
            else:
                storage.log(collector_id, UploadSnapshotEvent(snapshot_id=str(snapshot.id), ok=True))


def create_app(config_path: str = CONFIG_PATH, secret: Optional[str] = None, debug: bool = False) -> Litestar:
Expand Down
43 changes: 43 additions & 0 deletions src/evidently/collector/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,43 @@
from typing import Any
from typing import Dict
from typing import List
from typing import Sequence

import pandas as pd

from evidently._pydantic_compat import BaseModel
from evidently.pydantic_utils import PolymorphicModel
from evidently.suite.base_suite import Snapshot


# Base record for one entry in a collector's log.
class LogEvent(BaseModel):
    # Event kind; the subclasses in this module set a fixed value.
    type: str
    # String form of the ID of the snapshot (or report) the event refers to.
    snapshot_id: str
    # True when the operation succeeded.
    ok: bool
    # Error description for failed operations; empty by default.
    error: str = ""


# Log entry for an attempt to build a snapshot from a report run.
class CreateSnapshotEvent(LogEvent):
    type: str = "CreateSnapshot"


# Log entry for an attempt to upload a held snapshot to the workspace.
class UploadSnapshotEvent(LogEvent):
    type: str = "UploadSnapshot"


class SnapshotPopper:
    """Context manager that restores a snapshot to a list when its block fails.

    Entering yields the wrapped snapshot. If the ``with`` body raises, the
    snapshot is inserted at the front of ``snapshot_list``; on a clean exit
    the list is left untouched. The exception is never suppressed.
    """

    def __init__(self, value: Snapshot, snapshot_list: List[Snapshot]):
        self.value = value
        self.snapshot_list = snapshot_list

    def __enter__(self) -> Snapshot:
        return self.value

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            # Clean exit: nothing to restore.
            return
        # The body failed, so the snapshot goes back to the head of the list.
        self.snapshot_list.insert(0, self.value)


class CollectorStorage(PolymorphicModel):
class Config:
underscore_attrs_are_private = True
Expand Down Expand Up @@ -51,17 +76,27 @@ def log(self, id: str, event: LogEvent):
def get_logs(self, id: str) -> List[LogEvent]:
raise NotImplementedError

@abc.abstractmethod
def add_snapshot(self, id: str, report: Snapshot):
    """Hold ``report`` for the collector ``id``; subclasses must implement this."""
    raise NotImplementedError()

@abc.abstractmethod
def take_snapshots(self, id: str) -> Sequence[SnapshotPopper]:
    """Return ``SnapshotPopper`` items for the snapshots held for collector ``id``.

    Subclasses must implement this.
    """
    raise NotImplementedError()


class InMemoryStorage(CollectorStorage):
max_log_events: int = 10

_buffers: Dict[str, List[Any]] = {}
_logs: Dict[str, List[LogEvent]] = {}
_snapshots: Dict[str, List[Snapshot]] = {}

def init(self, id: str):
    """Run the base-class ``init`` and give collector ``id`` empty buffer, log and snapshot lists."""
    super().init(id)
    # Each store gets its own fresh list; any existing entry for this id is replaced.
    for store in (self._buffers, self._logs, self._snapshots):
        store[id] = []

def append(self, id: str, data: Any):
self._buffers[id].append(data)
Expand All @@ -84,3 +119,11 @@ def log(self, id: str, event: LogEvent):

def get_logs(self, id: str) -> List[LogEvent]:
    """Return the log events recorded for collector ``id``, or an empty list if there are none."""
    try:
        return self._logs[id]
    except KeyError:
        return []

def add_snapshot(self, id: str, report: Snapshot):
    """Append ``report`` to the list of snapshots held for collector ``id``.

    Raises ``KeyError`` when no snapshot list exists for ``id``.
    """
    held = self._snapshots[id]
    held.append(report)

def take_snapshots(self, id: str) -> Sequence[SnapshotPopper]:
    """Lazily hand out the snapshots held for collector ``id``, head of the list first.

    This is a generator. Each step pops the snapshot at the head of the list
    and yields it wrapped in a ``SnapshotPopper``. Nothing is yielded when
    ``id`` has no snapshot list.

    The popper re-inserts its snapshot at the head of the same list when the
    consumer's ``with`` block raises. A consumer that keeps iterating after
    such a failure will therefore receive the same snapshot again, so it
    should stop on the first error.

    The return annotation matches the abstract declaration on
    ``CollectorStorage`` (items are ``SnapshotPopper`` objects, not bare
    snapshots).
    """
    snapshot_list = self._snapshots.get(id, [])
    while snapshot_list:
        yield SnapshotPopper(snapshot_list.pop(0), snapshot_list)
7 changes: 6 additions & 1 deletion tests/collector/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,13 @@ def test_create_snapshot_and_get_logs(
)
assert len(project.list_snapshots()) == 1

snapshot_id = str(project.list_snapshots()[0].id)

r = collector_test_client.get("/new/logs")
r.raise_for_status()

data = r.json()
assert data == [{"error": "", "ok": True}]
assert data == [
{"error": "", "ok": True, "snapshot_id": snapshot_id, "type": "UploadSnapshot"},
{"error": "", "ok": True, "snapshot_id": snapshot_id, "type": "CreateSnapshot"},
]
Loading