Skip to content

Commit

Permalink
feat(manager): call shutdown endpoint before workflow stop (#559)
Browse files Browse the repository at this point in the history
Call the shutdown endpoint of reana-job-controller before stopping a
running workflow, so that running jobs are correctly cleaned up.

Closes #252
Closes #546
  • Loading branch information
mdonadoni committed Feb 1, 2024
1 parent 5992034 commit 719fa37
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 25 deletions.
3 changes: 3 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@
JOB_CONTROLLER_CONTAINER_PORT = 5000
"""Default container port for REANA Job Controller sidecar."""

JOB_CONTROLLER_SHUTDOWN_ENDPOINT = "/shutdown"
"""Endpoint of reana-job-controller used to stop all the jobs."""

JOB_CONTROLLER_NAME = "job-controller"
"""Default job controller container name."""

Expand Down
23 changes: 11 additions & 12 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@

from reana_workflow_controller.config import ( # isort:skip
IMAGE_PULL_SECRETS,
JOB_CONTROLLER_CONTAINER_PORT,
JOB_CONTROLLER_SHUTDOWN_ENDPOINT,
REANA_KUBERNETES_JOBS_MAX_USER_MEMORY_LIMIT,
REANA_KUBERNETES_JOBS_MEMORY_LIMIT,
REANA_KUBERNETES_JOBS_TIMEOUT_LIMIT,
Expand Down Expand Up @@ -438,18 +440,6 @@ def _delete_k8s_job_quiet(self, job_name):

def stop_batch_workflow_run(self):
"""Stop a batch workflow run along with all its dependent jobs."""
jobs_to_delete = self.get_workflow_running_jobs()

for job in jobs_to_delete:
job_id = job.backend_job_id
if self._delete_k8s_job_quiet(job_id):
job.status = JobStatus.stopped
Session.add(job)

# Commit the session once all the jobs have been processed.
Session.commit()

# Delete the workflow run batch job
workflow_run_name = self._workflow_run_name_generator("batch")
self._delete_k8s_job_quiet(workflow_run_name)

Expand Down Expand Up @@ -575,6 +565,15 @@ def _create_job_spec(
command=["/bin/bash", "-c"],
args=self._create_job_controller_startup_cmd(user),
ports=[],
# Make sure that all the jobs are stopped before the deletion of the run-batch pod
lifecycle=client.V1Lifecycle(
pre_stop=client.V1Handler(
http_get=client.V1HTTPGetAction(
port=JOB_CONTROLLER_CONTAINER_PORT,
path=JOB_CONTROLLER_SHUTDOWN_ENDPOINT,
)
)
),
)

job_controller_env_vars.extend(
Expand Down
20 changes: 7 additions & 13 deletions tests/test_workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,25 +110,19 @@ def test_stop_workflow_backend_only_kubernetes(
"""Test deletion of workflows with only Kubernetes based jobs."""
workflow = sample_serial_workflow_in_db
workflow.status = RunStatus.running
workflow_jobs = add_kubernetes_jobs_to_workflow(workflow)
backend_job_ids = [job.backend_job_id for job in workflow_jobs]
with patch(
"reana_workflow_controller.workflow_run_manager."
"current_k8s_batchv1_api_client"
) as api_client:
kwrm = KubernetesWorkflowRunManager(workflow)
kwrm.stop_batch_workflow_run()
for delete_call in api_client.delete_namespaced_job.call_args_list:
job_id = delete_call.args[0]
if job_id in backend_job_ids:
del backend_job_ids[backend_job_ids.index(job_id)]
# Check that the status of the job with that ID in the database is set to stopped
assert (
Job.query.filter_by(backend_job_id=job_id).one().status
== JobStatus.stopped
)

assert not backend_job_ids
# Jobs are deleted by reana-job-controller, so delete_namespaced_job should
# be called only once, to delete the run-batch job itself.
api_client.delete_namespaced_job.assert_called_once()
assert (
api_client.delete_namespaced_job.call_args.args[0]
== f"reana-run-batch-{workflow.id_}"
)


def test_interactive_session_closure(sample_serial_workflow_in_db, session):
Expand Down

0 comments on commit 719fa37

Please sign in to comment.