diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py
index 0f1e33ef59..9327e0604c 100644
--- a/pydra/engine/submitter.py
+++ b/pydra/engine/submitter.py
@@ -169,13 +169,22 @@ async def expand_workflow(self, wf, rerun=False):
                     await asyncio.sleep(1)
                     if ii > 60:
                         blocked = _list_blocked_tasks(graph_copy)
-                        get_runnable_tasks(graph_copy)
+                        # get_runnable_tasks(graph_copy)  # Uncomment to debug `get_runnable_tasks`
                         raise Exception(
                             "graph is not empty, but not able to get more tasks "
                             "- something may have gone wrong when retrieving the results "
-                            "of predecessor tasks caused by a file-system error or a bug "
-                            "in the internal workflow logic.\n\nBlocked tasks\n-------------\n"
-                            + "\n".join(blocked)
+                            "of predecessor tasks. This could be caused by a file-system "
+                            "error or a bug in the internal workflow logic, but is likely "
+                            "to be caused by the hash of an upstream node being unstable."
+                            "\n\nHash instability can be caused by an input of the node being "
+                            "modified in place, or by pseudo-random ordering of `set` or "
+                            "`frozenset` inputs (or nested attributes of inputs) in the hash "
+                            "calculation. To ensure that sets are hashed consistently you "
+                            "can try setting the environment variable PYTHONHASHSEED=0 for "
+                            "all processes, but it is best to try to identify where the set "
+                            "objects are occurring and manually hash their sorted elements "
+                            "(or use list objects instead)."
+                            "\n\nBlocked tasks\n-------------\n" + "\n".join(blocked)
                         )
             for task in tasks:
                 # grab inputs if needed
@@ -307,7 +316,7 @@ def _list_blocked_tasks(graph):
                                 matching_name.append(
                                     f"{saved_tsk.name} ({tsk_work_dir.name})"
                                 )
-                blocking.append(pred, ", ".join(matching_name))
+                blocking.append((pred, ", ".join(matching_name)))
         if blocking:
             blocked.append(
                 f"\n{tsk.name} ({tsk.checksum}) is blocked by "
diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py
index d1823f1fc9..49b10c2b26 100644
--- a/pydra/engine/tests/test_submitter.py
+++ b/pydra/engine/tests/test_submitter.py
@@ -1,6 +1,5 @@
 from dateutil import parser
 import re
-import shutil
 import subprocess as sp
 import time
 
@@ -14,10 +13,11 @@
     gen_basic_wf_with_threadcount_concurrent,
 )
 from ..core import Workflow
-from ..submitter import Submitter
+from ..submitter import Submitter, get_runnable_tasks, _list_blocked_tasks
 from ... import mark
 from pathlib import Path
 from datetime import datetime
+from pydra.engine.specs import LazyField
 
 
 @mark.task
@@ -575,3 +575,43 @@ def test_sge_no_limit_maxthreads(tmpdir):
         out_job2_dict["start_time"][0], f"%a %b %d %H:%M:%S %Y"
     )
     assert job_1_endtime > job_2_starttime
+
+
+def test_wf_with_blocked_tasks(tmpdir):
+    wf = Workflow(name="wf_with_blocked_tasks", input_spec=["x"])
+    wf.add(identity(name="taska", x=wf.lzin.x))
+    wf.add(alter_input(name="taskb", x=wf.taska.lzout.out))
+    wf.add(to_tuple(name="taskc", x=wf.taska.lzout.out, y=wf.taskb.lzout.out))
+    wf.set_output([("out", wf.taskb.lzout.out)])
+
+    wf.inputs.x = A(1)
+
+    wf.cache_dir = tmpdir
+
+    with pytest.raises(Exception, match="graph is not empty,"):
+        with Submitter("serial") as sub:
+            sub(wf)
+
+
+class A:
+    def __init__(self, a):
+        self.a = a
+
+    def __hash__(self):
+        return hash(self.a)
+
+
+@mark.task
+def identity(x):
+    return x
+
+
+@mark.task
+def alter_input(x):
+    x.a = 2
+    return x
+
+
+@mark.task
+def to_tuple(x, y):
+    return (x, y)
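
For context on the failure mode the new error message describes: the added test exercises the in-place-modification half (`alter_input` mutates `x.a`, changing the upstream node's hash), while the `set`/`frozenset` half comes from Python salting `str` and `bytes` hashes per process (PYTHONHASHSEED), which makes set iteration order, and any digest derived from it, differ between workers. Below is a minimal standalone sketch of that second case; it is hypothetical and not part of the patch (the filename and names are illustrative, not pydra's hashing code).

# hash_stability_demo.py -- hypothetical sketch, not part of the patch.
# Run it in two separate interpreter invocations: the "unstable" digest can
# change between runs because the per-process hash salt changes set iteration
# order; the "stable" digest is identical on every run.
import hashlib

inputs = {"sub-01", "sub-02", "sub-03"}  # e.g. a set passed as a task input

# Unstable: digests elements in set iteration order, which follows the
# per-process hash salt -- two worker processes may disagree on this checksum.
unstable = hashlib.sha256("".join(inputs).encode()).hexdigest()

# Stable: hash the sorted elements, the workaround the error message suggests
# (or carry the values as a list with a fixed order in the first place).
stable = hashlib.sha256("".join(sorted(inputs)).encode()).hexdigest()

print("unstable:", unstable)
print("stable:  ", stable)

Pinning PYTHONHASHSEED=0 for every process also makes the first digest reproducible, which is why the message offers it as a blunt fallback, but sorting (or using lists) fixes the ambiguity at the source.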