diff --git a/changelog.d/20240715_170552_rra_DM_45138.md b/changelog.d/20240715_170552_rra_DM_45138.md new file mode 100644 index 0000000..cd26495 --- /dev/null +++ b/changelog.d/20240715_170552_rra_DM_45138.md @@ -0,0 +1,3 @@ +### New features + +- Worker pods now wait for 30 seconds (UWS database workers) or 55 seconds (cutout workers) for jobs to finish on shutdown before cancelling them. diff --git a/src/vocutouts/config.py b/src/vocutouts/config.py index b42a015..6e46106 100644 --- a/src/vocutouts/config.py +++ b/src/vocutouts/config.py @@ -73,6 +73,15 @@ class Config(BaseSettings): None, title="Password for UWS job database" ) + grace_period: timedelta = Field( + timedelta(seconds=30), + title="Grace period for jobs", + description=( + "How long to wait for a job to finish on shutdown before" + " canceling it" + ), + ) + lifetime: timedelta = Field( timedelta(days=7), title="Lifetime of cutout job results" ) @@ -194,12 +203,12 @@ def _parse_timedelta(cls, v: str | float | timedelta) -> float | timedelta: except ValueError: return parse_timedelta(v) - @field_validator("timeout", mode="before") + @field_validator("grace_period", "timeout", mode="before") @classmethod def _parse_timedelta_seconds( cls, v: str | float | timedelta ) -> float | timedelta: - """Support human-readable timedeltas.""" + """Support number of seconds as a string.""" if not isinstance(v, str): return v return int(v) diff --git a/src/vocutouts/uws/app.py b/src/vocutouts/uws/app.py index bce84c3..b1c23cb 100644 --- a/src/vocutouts/uws/app.py +++ b/src/vocutouts/uws/app.py @@ -115,6 +115,7 @@ async def shutdown(ctx: dict[Any, Any]) -> None: ], functions=[uws_job_started, uws_job_completed], redis_settings=self._config.arq_redis_settings, + job_completion_wait=UWS_DATABASE_TIMEOUT, job_timeout=UWS_DATABASE_TIMEOUT, max_jobs=10, queue_name=UWS_QUEUE_NAME, diff --git a/src/vocutouts/uws/constants.py b/src/vocutouts/uws/constants.py index 323c502..ce30cff 100644 --- a/src/vocutouts/uws/constants.py +++ b/src/vocutouts/uws/constants.py @@ -21,7 +21,10 @@ """How long to wait for a job to stop before giving up.""" UWS_DATABASE_TIMEOUT = timedelta(seconds=30) -"""Timeout on workers that update the UWS database.""" +"""Timeout on workers that update the UWS database. + +This should match the default Kubernetes grace period for a pod to shut down. +""" UWS_EXPIRE_JOBS_SCHEDULE = Options( month=None, @@ -35,4 +38,7 @@ """Schedule for job expiration cron job, as `arq.cron.cron` parameters.""" UWS_QUEUE_NAME = "uws:queue" -"""Name of the arq queue for internal UWS messages.""" +"""Name of the arq queue for internal UWS messages. + +Must match ``_UWS_QUEUE_NAME`` in :mod:`vocutouts.uws.uwsworker`. +""" diff --git a/src/vocutouts/uws/uwsworker.py b/src/vocutouts/uws/uwsworker.py index 2b0b493..93103b5 100644 --- a/src/vocutouts/uws/uwsworker.py +++ b/src/vocutouts/uws/uwsworker.py @@ -25,9 +25,14 @@ from safir.arq import ArqMode, ArqQueue, MockArqQueue, RedisArqQueue from structlog.stdlib import BoundLogger -from .constants import UWS_QUEUE_NAME - T = TypeVar("T", bound="BaseModel") +"""Type for job parameters.""" + +_UWS_QUEUE_NAME = "uws:queue" +"""Name of the arq queue for internal UWS messages. + +Must match `~vocutouts.uws.constants.UWS_QUEUE_NAME`. +""" __all__ = [ "WorkerConfig", @@ -40,6 +45,7 @@ "WorkerTimeoutError", "WorkerTransientError", "WorkerUsageError", + "T", "build_worker", ] @@ -57,6 +63,13 @@ class WorkerConfig(Generic[T]): arq_queue_password: str | None """Password of the Redis arq queue.""" + grace_period: timedelta + """How long to wait for workers to shut down before cancelling them. + + This should be set to somewhat less than the Kubernetes grace period for + terminating the pod (about five seconds less, for example). + """ + parameters_class: type[T] """Class of the parameters to pass to the backend worker.""" @@ -99,6 +112,9 @@ class returned by other functions. redis_settings: RedisSettings """Redis configuration for arq.""" + job_completion_wait: SecondsTimedelta + """How long to wait for jobs to complete before cancelling them.""" + job_timeout: SecondsTimedelta """Maximum timeout for all jobs.""" @@ -338,10 +354,10 @@ async def startup(ctx: dict[Any, Any]) -> None: if config.arq_mode == ArqMode.production: settings = config.arq_redis_settings arq: ArqQueue = await RedisArqQueue.initialize( - settings, default_queue_name=UWS_QUEUE_NAME + settings, default_queue_name=_UWS_QUEUE_NAME ) else: - arq = MockArqQueue(default_queue_name=UWS_QUEUE_NAME) + arq = MockArqQueue(default_queue_name=_UWS_QUEUE_NAME) ctx["arq"] = arq ctx["logger"] = logger @@ -399,6 +415,7 @@ async def run( return WorkerSettings( functions=[func(run, name=worker.__qualname__)], redis_settings=config.arq_redis_settings, + job_completion_wait=config.grace_period, job_timeout=config.timeout, max_jobs=1, allow_abort_jobs=True, diff --git a/src/vocutouts/workers/cutout.py b/src/vocutouts/workers/cutout.py index 4649098..d728d88 100644 --- a/src/vocutouts/workers/cutout.py +++ b/src/vocutouts/workers/cutout.py @@ -195,12 +195,19 @@ def cutout( log_level=os.getenv("CUTOUT_LOG_LEVEL", "INFO"), ) +# Provide five seconds of time for arq to shut the worker down cleanly after +# cancelling any running job. +_grace_period = timedelta(seconds=int(os.environ["CUTOUT_GRACE_PERIOD"])) +if _grace_period > timedelta(seconds=5): + _grace_period -= timedelta(seconds=5) + WorkerSettings = build_worker( cutout, WorkerConfig( arq_mode=ArqMode.production, arq_queue_url=os.environ["CUTOUT_ARQ_QUEUE_URL"], arq_queue_password=os.getenv("CUTOUT_ARQ_QUEUE_PASSWORD"), + grace_period=_grace_period, parameters_class=WorkerCutout, timeout=timedelta(seconds=int(os.environ["CUTOUT_TIMEOUT"])), ), diff --git a/tests/uws/workers_test.py b/tests/uws/workers_test.py index 5f10496..1f02546 100644 --- a/tests/uws/workers_test.py +++ b/tests/uws/workers_test.py @@ -20,7 +20,7 @@ from vocutouts.uws.app import UWSApplication from vocutouts.uws.config import UWSConfig -from vocutouts.uws.constants import UWS_QUEUE_NAME +from vocutouts.uws.constants import UWS_DATABASE_TIMEOUT, UWS_QUEUE_NAME from vocutouts.uws.dependencies import UWSFactory from vocutouts.uws.models import ErrorCode, UWSJobParameter, UWSJobResult from vocutouts.uws.storage import JobStore @@ -60,6 +60,7 @@ async def test_build_worker( f"/{redis_settings.database}" ), arq_queue_password=redis_settings.password, + grace_period=timedelta(seconds=60), parameters_class=SimpleParameters, timeout=uws_config.execution_duration, ) @@ -70,6 +71,7 @@ async def test_build_worker( assert settings.functions[0].name == hello.__qualname__ assert settings.redis_settings == uws_config.arq_redis_settings assert settings.allow_abort_jobs + assert settings.job_completion_wait == timedelta(seconds=60) assert settings.queue_name == default_queue_name assert settings.on_startup assert settings.on_shutdown @@ -133,6 +135,7 @@ async def test_timeout(uws_config: UWSConfig, logger: BoundLogger) -> None: f"/{redis_settings.database}" ), arq_queue_password=redis_settings.password, + grace_period=timedelta(seconds=60), parameters_class=SimpleParameters, timeout=uws_config.execution_duration, ) @@ -228,6 +231,7 @@ async def test_build_uws_worker( assert callable(expire_jobs) assert settings.redis_settings == uws_config.arq_redis_settings assert not settings.allow_abort_jobs + assert settings.job_completion_wait == UWS_DATABASE_TIMEOUT assert settings.queue_name == UWS_QUEUE_NAME assert settings.on_startup assert settings.on_shutdown