Skip to content

Commit

Permalink
Add cause for WorkQueueTaskFailure if that exception exists (#2910)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Oct 23, 2023
1 parent 65cdfb1 commit 9ed5f5e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
6 changes: 4 additions & 2 deletions parsl/executors/workqueue/errors.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from parsl.errors import ParslError
from parsl.app.errors import AppException

Expand All @@ -7,10 +9,10 @@ class WorkQueueTaskFailure(AppException):
Contains:
reason(string)
status(int)
status(optional exception)
"""

def __init__(self, reason, status):
def __init__(self, reason: str, status: Optional[Exception]):
self.reason = reason
self.status = status

Expand Down
12 changes: 9 additions & 3 deletions parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@
'id category cores memory disk gpus priority running_time_min env_pkg map_file function_file result_file input_files output_files')

# Support structure to communicate final status of work queue tasks to parsl
# result is only valid if result_received is True
# reason and status are only valid if result_received is False
# if result_received is True:
# result is the result
# if result_received is False:
# reason and status are only valid if result_received is False
# result is either None or an exception raised while looking for a result
WqTaskToParsl = namedtuple('WqTaskToParsl', 'id result_received result reason status')

# Support structure to report parsl filenames to work queue.
Expand Down Expand Up @@ -731,7 +734,10 @@ def _collect_work_queue_results(self):
else:
# If there are no results, then the task failed according to one of
# work queue modes, such as resource exhaustion.
future.set_exception(WorkQueueTaskFailure(task_report.reason, task_report.result))
ex = WorkQueueTaskFailure(task_report.reason, task_report.result)
if task_report.result is not None:
ex.__cause__ = task_report.result
future.set_exception(ex)
finally:
logger.debug("Marking all outstanding tasks as failed")
logger.debug("Acquiring tasks_lock")
Expand Down

0 comments on commit 9ed5f5e

Please sign in to comment.