RF: Submitter logic. #524
Codecov Report
@@            Coverage Diff             @@
##           master     #524      +/-   ##
==========================================
- Coverage   79.04%   76.98%   -2.07%
==========================================
  Files          20       20
  Lines        4348     4279      -69
  Branches     1231     1204      -27
==========================================
- Hits         3437     3294     -143
- Misses        720      799      +79
+ Partials      191      186       -5
I went through and left some thoughts / talking points. This is ready for review.
@@ -1227,21 +1232,31 @@ def create_dotfile(self, type="simple", export=None, name=None):
        formatted_dot.append(self.graph.export_graph(dotfile=dotfile, ext=ext))
        return dotfile, formatted_dot

    def _connect_and_propagate_to_tasks(self):
    def _connect_and_propagate_to_tasks(
We repeat the "iterate through the graph, assign connections, and do something to each node based on a workflow attribute" pattern in a few places, so let's factor it into a method.
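As a rough illustration of the pattern being factored out, here is a minimal, self-contained sketch. `Node`, `MiniWorkflow`, `cache_dir`, and the `override_task_caches` keyword are hypothetical stand-ins rather than pydra's actual classes or signature; only the method name and the `propagate_rerun` keyword appear in this PR.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical stand-in for a task held in a workflow graph."""
    name: str
    task_rerun: bool = False
    cache_dir: str = ""
    connected: bool = False


@dataclass
class MiniWorkflow:
    """Hypothetical stand-in for a workflow; not pydra's Workflow class."""
    nodes: list = field(default_factory=list)
    cache_dir: str = "/tmp/cache"

    def _connect_and_propagate_to_tasks(
        self, *, propagate_rerun=False, override_task_caches=False
    ):
        # One pass over the graph: wire up each node, then apply only the
        # workflow-level settings the caller explicitly asked for.
        for node in self.nodes:
            node.connected = True  # placeholder for assigning connections
            if propagate_rerun:
                node.task_rerun = True
            if override_task_caches:  # assumed extra flag, for illustration
                node.cache_dir = self.cache_dir
```

Keeping the flags keyword-only makes each call site state which attribute it propagates, which is the point of having a single helper.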
    self.worker = SGEWorker(**kwargs)
else:
    raise Exception(f"plugin {self.plugin} not available")
try:
It seemed a little cleaner to just attempt to index the plugin here.
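A minimal sketch of the lookup-by-index idea, assuming `WORKERS` is a plain mapping from plugin name to worker class. The stub classes, the `"cf"` and `"sge"` keys, the `make_worker` helper, and the choice of `NotImplementedError` are all illustrative assumptions, not code taken from the PR.

```python
class StubCFWorker:
    """Hypothetical stand-in for a concurrent-futures worker."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs


class StubSGEWorker:
    """Hypothetical stand-in for an SGE worker."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs


# Assumed shape of the registry: plugin name -> worker class.
WORKERS = {"cf": StubCFWorker, "sge": StubSGEWorker}


def make_worker(plugin, **kwargs):
    """Index the registry directly instead of walking an if/elif chain."""
    try:
        worker_cls = WORKERS[plugin]
    except KeyError:
        # Unknown plugin names surface as one clear error.
        raise NotImplementedError(f"No worker for plugin {plugin!r}") from None
    return worker_cls(**kwargs)
```

With this shape, supporting a new plugin means adding one registry entry rather than another branch.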
if is_workflow(runnable):
    # resetting all connections with LazyFields
    runnable._reset()
self.loop.run_until_complete(self.submit_from_call(runnable, rerun))
Trying to keep the task.run() successor as simple as possible.
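The shape described here, a synchronous entry point that only drives one coroutine to completion, can be sketched as follows. `MiniSubmitter` and the body of `submit_from_call` are hypothetical; only the `self.loop.run_until_complete(self.submit_from_call(runnable, rerun))` call comes from the diff.

```python
import asyncio


class MiniSubmitter:
    """Hypothetical stand-in; not pydra's Submitter."""

    def __init__(self):
        # Assumption: the submitter owns its own event loop.
        self.loop = asyncio.new_event_loop()

    async def submit_from_call(self, runnable, rerun):
        # Placeholder for the real async submission logic.
        await asyncio.sleep(0)
        return runnable(rerun)

    def __call__(self, runnable, rerun=False):
        # All branching lives in the coroutine; the sync wrapper stays trivial.
        return self.loop.run_until_complete(self.submit_from_call(runnable, rerun))

    def close(self):
        self.loop.close()
```

A caller would then write `MiniSubmitter()(my_callable, rerun=True)` and never touch the event loop directly.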
    DaskWorker,
    SGEWorker,
)
from .workers import WORKERS
from .core import is_workflow
from .helpers import get_open_loop, load_and_run_async
Given that these functions are only used here, it might help readability to just have them here.
agree
@@ -110,41 +98,37 @@ async def submit(self, runnable, wait=False, rerun=False):
    Coroutines for :class:`~pydra.engine.core.TaskBase` execution.

"""
if runnable.plugin and runnable.plugin != self.plugin:
    raise NotImplementedError()
This was removed since it wasn't being tested.
original:
# dj: this is not tested!!! TODO
await self.worker.run_el(workflow, rerun=rerun)
# job has no state anymore
futures.add(
    # This unpickles and runs workflow - why are we pickling?
    asyncio.create_task(load_and_run_async(task_pkl, sidx, self, rerun))
I think this bit is still excessively confusing and could really benefit from a rework / simplification. Worker.run_el should have a streamlined behavior; currently it handles two pathways (state vs. stateless).
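One possible reading of that suggestion, sketched under loud assumptions: keep `run_el` as a thin dispatcher and give each pathway its own coroutine. `MiniWorker`, the `_run_stateless` and `_run_stateful` names, and the convention that a stateful job arrives as an `(index, task)` tuple are all hypothetical and not taken from pydra.

```python
import asyncio


class MiniWorker:
    """Hypothetical worker; the names here are illustrative only."""

    async def run_el(self, runnable, rerun=False):
        # Thin dispatcher: choose a pathway and do nothing else.
        if isinstance(runnable, tuple):  # assumed marker for a stateful job
            index, task = runnable
            return await self._run_stateful(index, task, rerun)
        return await self._run_stateless(runnable, rerun)

    async def _run_stateless(self, task, rerun):
        # A single task with no state: just run it.
        return task(rerun)

    async def _run_stateful(self, index, task, rerun):
        # One element of a split task: run it and keep its state index.
        return index, task(rerun)
```

Each pathway can then be tested and simplified on its own, and `run_el` no longer needs to know how either one loads or executes its task.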
if self.task_rerun and self.propagate_rerun:
    task.task_rerun = self.task_rerun
if propagate_rerun:
    task.task_rerun = True
I'm wondering if this should always be True, even if self.task_rerun is False?
Inside that branch, self.task_rerun should never be False, since we call this as:
self._connect_and_propagate_to_tasks(
propagate_rerun=self.task_rerun and self.propagate_rerun
)
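To make the argument concrete, the small check below compares the pre-refactor and post-refactor rules over all four flag combinations. It assumes both flags are plain booleans and that a child task starts with `task_rerun = False`; `old_rule` and `new_rule` are hypothetical helpers written for this comparison only.

```python
from itertools import product


def old_rule(task_rerun, propagate_rerun):
    """Pre-refactor: copy the workflow's flag inside the guarded branch."""
    child = False  # assumed starting value of the child's task_rerun
    if task_rerun and propagate_rerun:
        child = task_rerun
    return child


def new_rule(task_rerun, propagate_rerun):
    """Post-refactor: the caller combines the flags, the helper sets True."""
    child = False  # assumed starting value of the child's task_rerun
    combined = task_rerun and propagate_rerun  # what the call site passes in
    if combined:
        child = True
    return child


# Map each (task_rerun, propagate_rerun) pair to (old result, new result).
table = {
    (t, p): (old_rule(t, p), new_rule(t, p))
    for t, p in product([False, True], repeat=2)
}
```

Under these assumptions the two rules agree on every row, because the branch is only entered when `task_rerun` is already True.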
@mgxd - thanks a lot! Any idea why one of the SLURM tests is failing? (It seems to be consistent.) If not, I can try to debug tomorrow.
I haven't looked into the SLURM failure too much, but it seems like it is submitting each FunctionTask within the workflow through SLURM directly, instead of CF (which I think it was doing before?).
…lugin than submitter
adding one more case to submit_from_call
OK, I'm merging this for now, unless someone stops me. We can have another round of refactoring in another PR at some point. Thanks @mgxd!
A quick pass to try and clean up some of the logic in the Submitter class. This can definitely be reduced further, but hopefully this starts the untangling.