Skip to content

Commit

Permalink
check to make sure helpers need to be killed before sending kill sign…
Browse files Browse the repository at this point in the history
…al (#89)

* check to make sure helpers need to be killed before sending kill signal

* add test for new Status.from_json classmethod

* refactor repeated url building, move status/kill/finish methods to Helper class

* Update sidecar/app/helpers.py
  • Loading branch information
eriktaubeneck authored Sep 27, 2024
1 parent d3253a8 commit 3046a27
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 51 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ py-modules = ["sidecar",]
profile = "black"
py_version=39
skip_glob=["server"]
extra_standard_library = ["tomllib"]


[tool.pylint.format]
max-line-length = "88"
Expand Down
69 changes: 67 additions & 2 deletions sidecar/app/helpers.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import tomllib
from dataclasses import dataclass
from enum import IntEnum
from json import JSONDecodeError
from pathlib import Path
from urllib.parse import ParseResult, urlparse
from urllib.parse import ParseResult, urlparse, urlunparse

import tomllib
import httpx
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.x509 import load_pem_x509_certificate

from .query.step import Status


class Role(IntEnum):
COORDINATOR = 0
Expand All @@ -22,6 +26,67 @@ class Helper:
sidecar_url: ParseResult
public_key: EllipticCurvePublicKey

def query_status_url(self, query_id: str) -> str:
return str(
urlunparse(
self.sidecar_url._replace(
scheme="https", path=f"/start/{query_id}/status"
),
)
)

def query_finish_url(self, query_id: str) -> str:
return str(
urlunparse(
self.sidecar_url._replace(
scheme="https", path=f"/stop/finish/{query_id}"
),
)
)

def query_kill_url(self, query_id: str) -> str:
return str(
urlunparse(
self.sidecar_url._replace(
scheme="https", path=f"/stop/kill/{query_id}"
),
)
)

def get_current_query_status(self, query_id: str) -> Status:
try:
r = httpx.get(self.query_status_url(query_id))
except httpx.RequestError:
return Status.UNKNOWN
try:
j = r.json()
except JSONDecodeError:
return Status.UNKNOWN

return Status.from_json(j)

def kill_query(self, query_id: str) -> str:
status = self.get_current_query_status(query_id)
if status >= Status.COMPLETE:
return (
f"not sending kill signal. helper {self.role} "
f"already has status {status}"
)
r = httpx.post(self.query_kill_url(query_id))
return f"sent kill signal for query({query_id}) to helper {self.role}: {r.text}"

def finish_query(self, query_id: str) -> str:
status = self.get_current_query_status(query_id)
if status >= Status.COMPLETE:
return (
f"not sending finish signal. helper {self.role} "
f"already has status {status}"
)
r = httpx.post(self.query_finish_url(query_id))
return (
f"sent finish signal for query({query_id}) to helper {self.role}: {r.text}"
)


def load_helpers_from_network_config(network_config_path: Path) -> dict[Role, Helper]:
with network_config_path.open("rb") as f:
Expand Down
78 changes: 29 additions & 49 deletions sidecar/app/query/ipa.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@
from enum import StrEnum
from pathlib import Path
from typing import ClassVar
from urllib.parse import urlunparse

import httpx
import loguru

from ..helpers import Role
from ..local_paths import Paths
from ..settings import get_settings
from .base import Query
Expand All @@ -31,17 +28,9 @@ class IPAQuery(Query):
def send_kill_signals(self):
self.logger.info("sending kill signals")
settings = get_settings()
for helper in settings.helpers.values():
if helper.role == self.role:
continue
finish_url = urlunparse(
helper.sidecar_url._replace(
scheme="https", path=f"/stop/kill/{self.query_id}"
),
)

r = httpx.post(finish_url)
self.logger.info(f"sent post request: {r.text}")
for helper in settings.other_helpers:
response = helper.kill_query(self.query_id)
self.logger.info(response)

def crash(self):
super().crash()
Expand Down Expand Up @@ -248,32 +237,31 @@ def build_from_query(cls, query: IPAQuery):

def run(self):
settings = get_settings()
sidecar_urls = [
helper.sidecar_url
for helper in settings.helpers.values()
if helper.role != Role.COORDINATOR
]
for sidecar_url in sidecar_urls:
url = urlunparse(
sidecar_url._replace(
scheme="https", path=f"/start/{self.query_id}/status"
),
)
for helper in settings.other_helpers:
max_unknonwn_status_wait_time = 100
current_unknown_status_wait_time = 0
loop_wait_time = 1
while True:
r = httpx.get(url).json()
status = r.get("status")
status = helper.get_current_query_status(self.query_id)
match status:
case Status.IN_PROGRESS.name:
case Status.IN_PROGRESS:
break
case Status.KILLED.name:
self.success = False
return
case Status.NOT_FOUND.name:
self.success = False
return
case Status.CRASHED.name:
case Status.KILLED | Status.NOT_FOUND | Status.CRASHED:
self.success = False
return
case Status.STARTING | Status.COMPILING | Status.WAITING_TO_START:
# keep waiting while it's in a startup state
continue
case Status.UNKNOWN | Status.NOT_FOUND:
# eventually fail if the status is unknown or not found
# for ~100 seconds
current_unknown_status_wait_time += loop_wait_time
if (
current_unknown_status_wait_time
>= max_unknonwn_status_wait_time
):
self.success = False
return

time.sleep(1)
time.sleep(3) # allow enough time for the command to start
Expand Down Expand Up @@ -352,24 +340,16 @@ class IPACoordinatorQuery(IPAQuery):
IPACoordinatorStartStep,
]

def send_terminate_signals(self):
self.logger.info("sending terminate signals")
def send_finish_signals(self):
self.logger.info("sending finish signals")
settings = get_settings()
for helper in settings.helpers.values():
if helper.role == self.role:
continue
finish_url = urlunparse(
helper.sidecar_url._replace(
scheme="https", path=f"/stop/finish/{self.query_id}"
),
)

r = httpx.post(finish_url)
self.logger.info(f"sent post request: {finish_url}: {r.text}")
for helper in settings.other_helpers:
resp = helper.finish_query(self.query_id)
self.logger.info(resp)

def finish(self):
super().finish()
self.send_terminate_signals()
self.send_finish_signals()


@dataclass(kw_only=True)
Expand Down
8 changes: 8 additions & 0 deletions sidecar/app/query/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ class Status(IntEnum):
KILLED = auto()
CRASHED = auto()

@classmethod
def from_json(cls, response: dict[str, str]):
status_str = response.get("status", "")
try:
return cls[status_str]
except KeyError:
return cls.UNKNOWN


StatusChangeEvent = NamedTuple(
"StatusChangeEvent", [("status", Status), ("timestamp", float)]
Expand Down
4 changes: 4 additions & 0 deletions sidecar/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ def helper(self) -> Helper:
def helpers(self) -> dict[Role, Helper]:
return self._helpers

@property
def other_helpers(self) -> list[Helper]:
return [helper for helper in self._helpers.values() if helper.role != self.role]

@property
def status_dir_path(self) -> Path:
return self.root_path / Path("status")
Expand Down
14 changes: 14 additions & 0 deletions sidecar/tests/app/query/test_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,17 @@ def test_status_history_status_event_json(
"start_time": now,
"end_time": now2,
}


@pytest.mark.parametrize(
"json_input,expected_status",
[
({"status": "STARTING"}, Status.STARTING),
({"status": "UNKNOWN"}, Status.UNKNOWN),
({"status": "not-a-status"}, Status.UNKNOWN),
({}, Status.UNKNOWN), # Empty JSON
({"other_key": "value"}, Status.UNKNOWN), # Missing "status" key
],
)
def test_status_from_json(json_input, expected_status):
assert Status.from_json(json_input) == expected_status

0 comments on commit 3046a27

Please sign in to comment.