Skip to content

Commit

Permalink
status: retrieve job status from Job table
Browse files Browse the repository at this point in the history
* Reads job status from Job table to get the running jobs
  as the `job_progress` in the Workflow table is not correctly
  updated, finished jobs are reported as running creating an exception
  while trying to delete a job which doesn't exist (for more see
  #299)
  (closes reanahub/reana#266).

* Provides a user understandable message (closes reanahub/reana#266).
  • Loading branch information
Diego Rodriguez committed Feb 12, 2020
1 parent 9d6ec7d commit 1fe4606
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 10 deletions.
4 changes: 4 additions & 0 deletions reana_workflow_controller/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ class REANAExternalCallError(Exception):

class REANAWorkflowStatusError(Exception):
"""Error when trying to change workflow status."""


class REANAWorkflowStopError(Exception):
"""Error when trying to stop a workflow."""
30 changes: 21 additions & 9 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@
create_cvmfs_storage_class, format_cmd)
from reana_db.config import SQLALCHEMY_DATABASE_URI
from reana_db.database import Session
from reana_db.models import Job
from reana_db.models import Job, JobStatus

from reana_workflow_controller.errors import (REANAInteractiveSessionError,
REANAWorkflowControllerError)
REANAWorkflowControllerError,
REANAWorkflowStopError)
from reana_workflow_controller.k8s import (build_interactive_k8s_objects,
delete_k8s_ingress_object,
delete_k8s_objects_if_exist,
Expand Down Expand Up @@ -221,9 +222,9 @@ def _workflow_engine_env_vars(self):
def get_workflow_running_jobs_as_backend_ids(self):
"""Get all running jobs of a workflow as backend job IDs."""
session = Session.object_session(self.workflow)
job_list = self.workflow.job_progress.get(
'running', {}).get('job_ids', [])
rows = session.query(Job).filter(Job.id_.in_(job_list))
rows = session.query(Job).filter_by(
workflow_uuid=str(self.workflow.id_),
status=JobStatus.running)
backend_ids = [j.backend_job_id for j in rows.all()]
return backend_ids

Expand Down Expand Up @@ -341,11 +342,22 @@ def stop_batch_workflow_run(self):
workflow_run_name = self._workflow_run_name_generator('batch')
to_delete = self.get_workflow_running_jobs_as_backend_ids() + \
[workflow_run_name]
error = False
for job in to_delete:
current_k8s_batchv1_api_client.delete_namespaced_job(
job,
KubernetesWorkflowRunManager.default_namespace,
body=V1DeleteOptions(propagation_policy='Background'))
try:
current_k8s_batchv1_api_client.delete_namespaced_job(
job,
KubernetesWorkflowRunManager.default_namespace,
body=V1DeleteOptions(propagation_policy='Background'))
except ApiException:
logging.error(f'Error while trying to stop {self.workflow.id_}'
f': Kubernetes job {job} could not be deleted.',
exc_info=True)
error = True
continue
if error:
raise REANAWorkflowStopError(
f'Workflow {self.workflow.id_} could not be stopped.')

def _create_job_spec(self, name, command=None, image=None,
env_vars=None, overwrite_input_parameters=None,
Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def add_kubernetes_jobs_to_workflow_callable(workflow, backend=None,
backend_job_id = uuid.uuid4()
job = Job(id_=reana_job_id,
backend_job_id=str(backend_job_id),
workflow_uuid=workflow.id_)
workflow_uuid=workflow.id_,
status=JobStatus.running)
progress_dict[status]['job_ids'].append(str(job.id_))
progress_dict[status]['total'] += 1
session.add(job)
Expand Down

0 comments on commit 1fe4606

Please sign in to comment.