Skip to content

Commit

Permalink
Event history refactor (#71)
Browse files Browse the repository at this point in the history
* store status changes with timestamps, show run time based on those timestamps

* fix some bugs

* refactor StatusHistory into it's own dataclass

* create new file specific to status

* add tests for StatusHistory

* add pytest to pre-commit

* attempt to fix github runner

* remove demo_test, accidentally added

* update .gitignore

* clean up mere commit
  • Loading branch information
eriktaubeneck authored Jul 12, 2024
1 parent 8b6b38d commit 771b88e
Show file tree
Hide file tree
Showing 14 changed files with 265 additions and 103 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pre-commit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:

- name: Install dependencies
run: |
pip install .
pip install -e .
- name: Setup node.js
uses: actions/setup-node@v4
with:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ tmp/
IGNORE-ME*
.pyre/*
.draft
.coverage*

# local env files
.env*.local
Expand Down
14 changes: 14 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ repos:
[
"-rn", # Only display messages
]
- id: pytest
name: pytest
language: python
entry: pytest
types: [python]
pass_filenames: false
args: [--cov=sidecar]
- id : pytest-coverage
name: coverage
language: python
entry: coverage report
types: [python]
pass_filenames: false
args: [--fail-under=9] # increase this over time
- id: pyre-check
name: pyre-check
entry: pyre check
Expand Down
3 changes: 2 additions & 1 deletion .pyre_configuration
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"site_package_search_strategy": "pep561",
"source_directories": [
"sidecar"
{"import_root": ".", "source": "sidecar"}
]

}
11 changes: 11 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ dependencies=[
"pre-commit",
"cryptography",
"httpx",
"pytest",
"pytest-cov",
]

[project.scripts]
Expand All @@ -50,6 +52,15 @@ disable = [
# "R0913",
]

[tool.pylint.main]
source-roots = ["sidecar"]

[tool.black]
target-version = ["py311", ]
include = '\.pyi?$'

[tool.pytest.ini_options]
addopts = [
"--import-mode=importlib",
]
pythonpath = "sidecar"
115 changes: 32 additions & 83 deletions sidecar/app/query/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
# pylint: disable=R0801
from __future__ import annotations

import time
from collections import namedtuple
from collections.abc import Iterable
from dataclasses import dataclass, field
from pathlib import Path
Expand All @@ -13,7 +10,8 @@
from ..helpers import Role
from ..logger import logger
from ..settings import settings
from .step import Status, Step
from .status import Status, StatusHistory
from .step import Step

# Dictionary to store queries
queries: dict[str, "Query"] = {}
Expand All @@ -23,30 +21,23 @@ class QueryExistsError(Exception):
pass


StatusChangeEvent = namedtuple("StatusChangeEvent", ["status", "timestamp"])


@dataclass
class Query:
# pylint: disable=too-many-instance-attributes
query_id: str
current_step: Optional[Step] = field(init=False, default=None, repr=True)
start_time: Optional[float] = field(init=False, default=None)
end_time: Optional[float] = field(init=False, default=None)
stopped: bool = field(init=False, default=False)
logger: loguru.Logger = field(init=False, repr=False)
_logger_id: int = field(init=False, repr=False)
_status_history: StatusHistory = field(init=False, repr=True)
step_classes: ClassVar[list[type[Step]]] = []
_log_dir: Path = settings.root_path / Path("logs")
_status_history: list[StatusChangeEvent] = field(
init=False, default_factory=list, repr=True
)
_status_dir: Path = settings.root_path / Path("status_semaphore")

def __post_init__(self):
self.logger = logger.bind(task=self.query_id)
status_dir = settings.root_path / Path("status")
status_dir.mkdir(exist_ok=True)
status_file_path = status_dir / Path(f"{self.query_id}")
self._status_history = StatusHistory(file_path=status_file_path, logger=logger)

self._log_dir.mkdir(exist_ok=True)
self._status_dir.mkdir(exist_ok=True)
self._logger_id = logger.add(
self.log_file_path,
serialize=True,
Expand All @@ -58,17 +49,21 @@ def __post_init__(self):
raise QueryExistsError(f"{self.query_id} already exists")
queries[self.query_id] = self

@property
def _log_dir(self) -> Path:
return settings.root_path / Path("logs")

@property
def role(self) -> Role:
return settings.role

@property
def started(self) -> bool:
return self.start_time is not None
return self.status >= Status.STARTING

@property
def finished(self) -> bool:
return self.end_time is not None
return self.status >= Status.COMPLETE

@classmethod
def get_from_query_id(cls, query_id) -> Optional["Query"]:
Expand All @@ -83,55 +78,22 @@ def get_from_query_id(cls, query_id) -> Optional["Query"]:
if query:
return query
raise e
query.load_history_from_file()
if query.status == Status.UNKNOWN:
return None
return query

def load_history_from_file(self):
if self.status_file_path.exists():
self.logger.debug(
f"Loading query {self.query_id} status history "
f"from file {self.status_file_path}"
)
with self.status_file_path.open("r") as f:
for line in f:
status_str, timestamp = line.split(",")
self._status_history.append(
StatusChangeEvent(
status=Status[status_str], timestamp=float(timestamp)
)
)

@property
def _last_status_event(self):
if not self._status_history:
return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time())
return self._status_history[-1]

@property
def status_event_json(self):
status_event = {
"status": self._last_status_event.status.name,
"start_time": self._last_status_event.timestamp,
}
if self.status >= Status.COMPLETE and len(self._status_history) >= 2:
status_event["start_time"] = self._status_history[-2].timestamp
status_event["end_time"] = self._last_status_event.timestamp
return status_event

@property
def status(self) -> Status:
return self._last_status_event.status
return self._status_history.current_status

@status.setter
def status(self, status: Status):
if self.status <= Status.COMPLETE:
now = time.time()
self._status_history.append(StatusChangeEvent(status=status, timestamp=now))
with self.status_file_path.open("a") as f:
self.logger.debug(f"updating status: {status=}")
f.write(f"{status.name},{now}\n")
if self.status != status and self.status <= Status.COMPLETE:
self._status_history.add(status)

@property
def status_event_json(self):
return self._status_history.status_event_json

@property
def running(self):
Expand All @@ -141,18 +103,12 @@ def running(self):
def log_file_path(self) -> Path:
return self._log_dir / Path(f"{self.query_id}.log")

@property
def status_file_path(self) -> Path:
return self._status_dir / Path(f"{self.query_id}")

@property
def steps(self) -> Iterable[Step]:
for step_class in self.step_classes:
if not self.stopped:
yield step_class.build_from_query(self)
yield step_class.build_from_query(self)

def start(self):
self.start_time = time.time()
try:
for step in self.steps:
if self.finished:
Expand Down Expand Up @@ -180,37 +136,30 @@ def finish(self):
self._cleanup()

def kill(self):
self.status = Status.KILLED
self.logger.info(f"Killing: {self=}")
if self.current_step:
self.current_step.terminate()
if self.running:
self.status = Status.KILLED
self.logger.info(f"Killing: {self=}")
if self.current_step:
self.current_step.terminate()
self._cleanup()

def crash(self):
self.status = Status.CRASHED
self.logger.info(f"CRASHING! {self=}")
if self.current_step:
self.current_step.kill()
if self.running:
self.status = Status.CRASHED
self.logger.info(f"CRASHING! {self=}")
if self.current_step:
self.current_step.kill()
self._cleanup()

def _cleanup(self):
self.current_step = None
self.end_time = time.time()
try:
logger.remove(self._logger_id)
except ValueError:
pass
if queries.get(self.query_id) is not None:
del queries[self.query_id]

@property
def run_time(self):
if not self.start_time:
return 0
if not self.end_time:
return time.time() - self.start_time
return self.end_time - self.start_time

@property
def cpu_usage_percent(self) -> float:
if self.current_step:
Expand Down
83 changes: 83 additions & 0 deletions sidecar/app/query/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from __future__ import annotations

import time
from dataclasses import dataclass, field
from enum import IntEnum, auto
from pathlib import Path
from typing import NamedTuple

import loguru


class Status(IntEnum):
UNKNOWN = auto()
NOT_FOUND = auto()
STARTING = auto()
COMPILING = auto()
WAITING_TO_START = auto()
IN_PROGRESS = auto()
COMPLETE = auto()
KILLED = auto()
CRASHED = auto()


StatusChangeEvent = NamedTuple(
"StatusChangeEvent", [("status", Status), ("timestamp", float)]
)


@dataclass
class StatusHistory:
file_path: Path = field(init=True, repr=False)
logger: loguru.Logger = field(init=True, repr=False, compare=False)
_status_history: list[StatusChangeEvent] = field(
init=False, default_factory=list, repr=True
)

def __post_init__(self):
if self.file_path.exists():
self.logger.debug(f"Loading status history from file {self.file_path}")
with self.file_path.open("r", encoding="utf8") as f:
for line in f:
status_str, timestamp = line.split(",")
self._status_history.append(
StatusChangeEvent(
status=Status[status_str], timestamp=float(timestamp)
)
)

@property
def locking_status(self):
"""Cannot add to history after this or higher status is reached"""
return Status.COMPLETE

def add(self, status: Status, timestamp: float = time.time()):
assert status > self.current_status
assert self.current_status < self.locking_status
self._status_history.append(
StatusChangeEvent(status=status, timestamp=timestamp)
)
with self.file_path.open("a", encoding="utf8") as f:
self.logger.debug(f"updating status: {status=}")
f.write(f"{status.name},{timestamp}\n")

@property
def current_status_event(self):
if not self._status_history:
return StatusChangeEvent(status=Status.UNKNOWN, timestamp=time.time())
return self._status_history[-1]

@property
def current_status(self):
return self.current_status_event.status

@property
def status_event_json(self):
status_event = {
"status": self.current_status_event.status.name,
"start_time": self.current_status_event.timestamp,
}
if self.current_status >= Status.COMPLETE and len(self._status_history) >= 2:
status_event["start_time"] = self._status_history[-2].timestamp
status_event["end_time"] = self.current_status_event.timestamp
return status_event
14 changes: 1 addition & 13 deletions sidecar/app/query/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,17 @@
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import IntEnum, auto
from typing import TYPE_CHECKING, ClassVar, Optional

import loguru

from .command import Command
from .status import Status

if TYPE_CHECKING:
from .base import QueryTypeT


class Status(IntEnum):
UNKNOWN = auto()
STARTING = auto()
COMPILING = auto()
WAITING_TO_START = auto()
IN_PROGRESS = auto()
COMPLETE = auto()
KILLED = auto()
NOT_FOUND = auto()
CRASHED = auto()


@dataclass(kw_only=True)
class Step(ABC):
skip: bool = field(init=False, default=False)
Expand Down
Loading

0 comments on commit 771b88e

Please sign in to comment.