Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: Type error in blocking task list, detailed exception message #623

Merged
merged 5 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions pydra/engine/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,22 @@ async def expand_workflow(self, wf, rerun=False):
await asyncio.sleep(1)
if ii > 60:
blocked = _list_blocked_tasks(graph_copy)
get_runnable_tasks(graph_copy)
# get_runnable_tasks(graph_copy) # Uncomment to debug `get_runnable_tasks`
raise Exception(
"graph is not empty, but not able to get more tasks "
"- something may have gone wrong when retrieving the results "
"of predecessor tasks caused by a file-system error or a bug "
"in the internal workflow logic.\n\nBlocked tasks\n-------------\n"
+ "\n".join(blocked)
"of predecessor tasks. This could be caused by a file-system "
"error or a bug in the internal workflow logic, but is likely "
"to be caused by the hash of an upstream node being unstable."
" \n\nHash instability can be caused by an input of the node being "
"modified in place, or by pseudo-random ordering of `set` or "
"`frozenset` inputs (or nested attributes of inputs) in the hash "
"calculation. To ensure that sets are hashed consistently you can "
"try setting the environment variable PYTHONHASHSEED=0 for "
"all processes, but it is best to try to identify where the set "
"objects are occurring and manually hash their sorted elements. "
"(or use list objects instead)"
"\n\nBlocked tasks\n-------------\n" + "\n".join(blocked)
)
for task in tasks:
# grab inputs if needed
Expand Down Expand Up @@ -307,7 +316,7 @@ def _list_blocked_tasks(graph):
matching_name.append(
f"{saved_tsk.name} ({tsk_work_dir.name})"
)
blocking.append(pred, ", ".join(matching_name))
blocking.append((pred, ", ".join(matching_name)))
if blocking:
blocked.append(
f"\n{tsk.name} ({tsk.checksum}) is blocked by "
Expand Down
44 changes: 42 additions & 2 deletions pydra/engine/tests/test_submitter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from dateutil import parser
import re
import shutil
import subprocess as sp
import time

Expand All @@ -14,10 +13,11 @@
gen_basic_wf_with_threadcount_concurrent,
)
from ..core import Workflow
from ..submitter import Submitter
from ..submitter import Submitter, get_runnable_tasks, _list_blocked_tasks
from ... import mark
from pathlib import Path
from datetime import datetime
from pydra.engine.specs import LazyField


@mark.task
Expand Down Expand Up @@ -575,3 +575,43 @@ def test_sge_no_limit_maxthreads(tmpdir):
out_job2_dict["start_time"][0], f"%a %b %d %H:%M:%S %Y"
)
assert job_1_endtime > job_2_starttime


def test_wf_with_blocked_tasks(tmpdir):
    """Verify that a workflow with an unstable upstream hash fails loudly.

    ``alter_input`` mutates its argument in place, so the hash of ``taska``'s
    output changes mid-run and downstream tasks can never be matched to their
    predecessors.  The submitter must raise the "graph is not empty" error
    rather than hang.
    """
    workflow = Workflow(name="wf_with_blocked_tasks", input_spec=["x"])
    workflow.add(identity(name="taska", x=workflow.lzin.x))
    workflow.add(alter_input(name="taskb", x=workflow.taska.lzout.out))
    workflow.add(
        to_tuple(
            name="taskc",
            x=workflow.taska.lzout.out,
            y=workflow.taskb.lzout.out,
        )
    )
    workflow.set_output([("out", workflow.taskb.lzout.out)])

    workflow.inputs.x = A(1)
    workflow.cache_dir = tmpdir

    with pytest.raises(Exception, match="graph is not empty,"):
        with Submitter("serial") as sub:
            sub(workflow)


class A:
    """Minimal value type used to demonstrate hash instability.

    The hash is derived from the mutable attribute ``a``, so mutating an
    instance in place changes its hash — exactly the failure mode the
    blocked-tasks test exercises.
    """

    def __init__(self, a):
        # `a` is the single value this object wraps; it drives both
        # equality and hashing.
        self.a = a

    def __hash__(self):
        return hash(self.a)

    # __eq__ is defined alongside __hash__ so that equal values compare
    # equal (eq/hash contract); hashing behaviour is unchanged.
    def __eq__(self, other):
        if isinstance(other, A):
            return self.a == other.a
        return NotImplemented

    def __repr__(self):
        return f"A({self.a!r})"


@mark.task
def identity(x):
    """Return *x* unchanged (pass-through task)."""
    return x


@mark.task
def alter_input(x):
    """Set ``x.a = 2`` in place and return *x*.

    The in-place mutation deliberately changes the hash of the upstream
    node's output (see class ``A``), triggering the blocked-tasks error
    path under test.
    """
    x.a = 2
    return x


@mark.task
def to_tuple(x, y):
    """Pack the two arguments into a pair ``(x, y)``."""
    return x, y