Skip to content

Commit

Permalink
Merge pull request #213 from lsst-sqre/tickets/DM-45138
Browse files Browse the repository at this point in the history
DM-45138: Wait for jobs to finish before cancelling them
  • Loading branch information
rra authored Jul 16, 2024
2 parents defb71d + afdd195 commit 6910894
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 9 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20240715_170552_rra_DM_45138.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### New features

- Worker pods now wait for 30 seconds (UWS database workers) or 55 seconds (cutout workers) for jobs to finish on shutdown before cancelling them.
13 changes: 11 additions & 2 deletions src/vocutouts/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ class Config(BaseSettings):
None, title="Password for UWS job database"
)

grace_period: timedelta = Field(
timedelta(seconds=30),
title="Grace period for jobs",
description=(
"How long to wait for a job to finish on shutdown before"
" canceling it"
),
)

lifetime: timedelta = Field(
timedelta(days=7), title="Lifetime of cutout job results"
)
Expand Down Expand Up @@ -194,12 +203,12 @@ def _parse_timedelta(cls, v: str | float | timedelta) -> float | timedelta:
except ValueError:
return parse_timedelta(v)

@field_validator("timeout", mode="before")
@field_validator("grace_period", "timeout", mode="before")
@classmethod
def _parse_timedelta_seconds(
cls, v: str | float | timedelta
) -> float | timedelta:
"""Support human-readable timedeltas."""
"""Support number of seconds as a string."""
if not isinstance(v, str):
return v
return int(v)
Expand Down
1 change: 1 addition & 0 deletions src/vocutouts/uws/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ async def shutdown(ctx: dict[Any, Any]) -> None:
],
functions=[uws_job_started, uws_job_completed],
redis_settings=self._config.arq_redis_settings,
job_completion_wait=UWS_DATABASE_TIMEOUT,
job_timeout=UWS_DATABASE_TIMEOUT,
max_jobs=10,
queue_name=UWS_QUEUE_NAME,
Expand Down
10 changes: 8 additions & 2 deletions src/vocutouts/uws/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
"""How long to wait for a job to stop before giving up."""

UWS_DATABASE_TIMEOUT = timedelta(seconds=30)
"""Timeout on workers that update the UWS database."""
"""Timeout on workers that update the UWS database.
This should match the default Kubernetes grace period for a pod to shut down.
"""

UWS_EXPIRE_JOBS_SCHEDULE = Options(
month=None,
Expand All @@ -35,4 +38,7 @@
"""Schedule for job expiration cron job, as `arq.cron.cron` parameters."""

UWS_QUEUE_NAME = "uws:queue"
"""Name of the arq queue for internal UWS messages."""
"""Name of the arq queue for internal UWS messages.
Must match ``_UWS_QUEUE_NAME`` in :mod:`vocutouts.uws.uwsworker`.
"""
25 changes: 21 additions & 4 deletions src/vocutouts/uws/uwsworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@
from safir.arq import ArqMode, ArqQueue, MockArqQueue, RedisArqQueue
from structlog.stdlib import BoundLogger

from .constants import UWS_QUEUE_NAME

T = TypeVar("T", bound="BaseModel")
"""Type for job parameters."""

_UWS_QUEUE_NAME = "uws:queue"
"""Name of the arq queue for internal UWS messages.
Must match `~vocutouts.uws.constants.UWS_QUEUE_NAME`.
"""

__all__ = [
"WorkerConfig",
Expand All @@ -40,6 +45,7 @@
"WorkerTimeoutError",
"WorkerTransientError",
"WorkerUsageError",
"T",
"build_worker",
]

Expand All @@ -57,6 +63,13 @@ class WorkerConfig(Generic[T]):
arq_queue_password: str | None
"""Password of the Redis arq queue."""

grace_period: timedelta
"""How long to wait for workers to shut down before cancelling them.
This should be set to somewhat less than the Kubernetes grace period for
terminating the pod (about five seconds less, for example).
"""

parameters_class: type[T]
"""Class of the parameters to pass to the backend worker."""

Expand Down Expand Up @@ -99,6 +112,9 @@ class returned by other functions.
redis_settings: RedisSettings
"""Redis configuration for arq."""

job_completion_wait: SecondsTimedelta
"""How long to wait for jobs to complete before cancelling them."""

job_timeout: SecondsTimedelta
"""Maximum timeout for all jobs."""

Expand Down Expand Up @@ -338,10 +354,10 @@ async def startup(ctx: dict[Any, Any]) -> None:
if config.arq_mode == ArqMode.production:
settings = config.arq_redis_settings
arq: ArqQueue = await RedisArqQueue.initialize(
settings, default_queue_name=UWS_QUEUE_NAME
settings, default_queue_name=_UWS_QUEUE_NAME
)
else:
arq = MockArqQueue(default_queue_name=UWS_QUEUE_NAME)
arq = MockArqQueue(default_queue_name=_UWS_QUEUE_NAME)

ctx["arq"] = arq
ctx["logger"] = logger
Expand Down Expand Up @@ -399,6 +415,7 @@ async def run(
return WorkerSettings(
functions=[func(run, name=worker.__qualname__)],
redis_settings=config.arq_redis_settings,
job_completion_wait=config.grace_period,
job_timeout=config.timeout,
max_jobs=1,
allow_abort_jobs=True,
Expand Down
7 changes: 7 additions & 0 deletions src/vocutouts/workers/cutout.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,19 @@ def cutout(
log_level=os.getenv("CUTOUT_LOG_LEVEL", "INFO"),
)

# Provide five seconds of time for arq to shut the worker down cleanly after
# cancelling any running job.
_grace_period = timedelta(seconds=int(os.environ["CUTOUT_GRACE_PERIOD"]))
if _grace_period > timedelta(seconds=5):
_grace_period -= timedelta(seconds=5)

WorkerSettings = build_worker(
cutout,
WorkerConfig(
arq_mode=ArqMode.production,
arq_queue_url=os.environ["CUTOUT_ARQ_QUEUE_URL"],
arq_queue_password=os.getenv("CUTOUT_ARQ_QUEUE_PASSWORD"),
grace_period=_grace_period,
parameters_class=WorkerCutout,
timeout=timedelta(seconds=int(os.environ["CUTOUT_TIMEOUT"])),
),
Expand Down
6 changes: 5 additions & 1 deletion tests/uws/workers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from vocutouts.uws.app import UWSApplication
from vocutouts.uws.config import UWSConfig
from vocutouts.uws.constants import UWS_QUEUE_NAME
from vocutouts.uws.constants import UWS_DATABASE_TIMEOUT, UWS_QUEUE_NAME
from vocutouts.uws.dependencies import UWSFactory
from vocutouts.uws.models import ErrorCode, UWSJobParameter, UWSJobResult
from vocutouts.uws.storage import JobStore
Expand Down Expand Up @@ -60,6 +60,7 @@ async def test_build_worker(
f"/{redis_settings.database}"
),
arq_queue_password=redis_settings.password,
grace_period=timedelta(seconds=60),
parameters_class=SimpleParameters,
timeout=uws_config.execution_duration,
)
Expand All @@ -70,6 +71,7 @@ async def test_build_worker(
assert settings.functions[0].name == hello.__qualname__
assert settings.redis_settings == uws_config.arq_redis_settings
assert settings.allow_abort_jobs
assert settings.job_completion_wait == timedelta(seconds=60)
assert settings.queue_name == default_queue_name
assert settings.on_startup
assert settings.on_shutdown
Expand Down Expand Up @@ -133,6 +135,7 @@ async def test_timeout(uws_config: UWSConfig, logger: BoundLogger) -> None:
f"/{redis_settings.database}"
),
arq_queue_password=redis_settings.password,
grace_period=timedelta(seconds=60),
parameters_class=SimpleParameters,
timeout=uws_config.execution_duration,
)
Expand Down Expand Up @@ -228,6 +231,7 @@ async def test_build_uws_worker(
assert callable(expire_jobs)
assert settings.redis_settings == uws_config.arq_redis_settings
assert not settings.allow_abort_jobs
assert settings.job_completion_wait == UWS_DATABASE_TIMEOUT
assert settings.queue_name == UWS_QUEUE_NAME
assert settings.on_startup
assert settings.on_shutdown
Expand Down

0 comments on commit 6910894

Please sign in to comment.