[FIX] MultiProc starting workers at dubious wd #2368

Merged: 10 commits, Jan 19, 2018
1 change: 1 addition & 0 deletions CHANGES
@@ -1,6 +1,7 @@
Upcoming release (0.14.1)
=========================

* FIX: MultiProc starting workers at dubious wd (https://github.com/nipy/nipype/pull/2368)
* REF+FIX: Move BIDSDataGrabber to `interfaces.io` + fix correct default behavior (https://github.com/nipy/nipype/pull/2336)
* ENH: Add AFNI interface for 3dConvertDset (https://github.com/nipy/nipype/pull/2337)
* MAINT: Cleanup EngineBase (https://github.com/nipy/nipype/pull/2376)
5 changes: 3 additions & 2 deletions nipype/pipeline/engine/nodes.py
@@ -172,7 +172,7 @@ def __init__(self,
self.synchronize = synchronize
self.itersource = itersource
self.overwrite = overwrite
self.parameterization = None
self.parameterization = []
self.input_source = {}
self.plugin_args = {}

@@ -242,9 +242,10 @@ def n_procs(self, value):

@property
def itername(self):
"""Name for expanded iterable"""
itername = self._id
if self._hierarchy:
itername = self._hierarchy + '.' + self._id
itername = '%s.%s' % (self._hierarchy, self._id)
return itername

def output_dir(self):
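For context, a minimal standalone sketch of what the touched lines do (the class and node names below are hypothetical, not nipype's actual Node): parameterization now defaults to an empty list instead of None, and itername composes the hierarchy and node id as '<hierarchy>.<id>'.

# Simplified, hypothetical stand-in for the Node attributes involved in this hunk.
class NodeNameSketch(object):
    def __init__(self, node_id, hierarchy=None):
        self._id = node_id
        self._hierarchy = hierarchy
        self.parameterization = []  # list default avoids special-casing None later

    @property
    def itername(self):
        """Name for expanded iterable"""
        itername = self._id
        if self._hierarchy:
            itername = '%s.%s' % (self._hierarchy, self._id)
        return itername

print(NodeNameSketch('smooth').itername)                  # smooth
print(NodeNameSketch('smooth', 'preproc.func').itername)  # preproc.func.smooth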
1 change: 1 addition & 0 deletions nipype/pipeline/engine/utils.py
@@ -188,6 +188,7 @@ def write_report(node, report_type=None, is_mapnode=False):
rst_dict = {
'hostname': result.runtime.hostname,
'duration': result.runtime.duration,
'working_dir': result.runtime.cwd,
}

if hasattr(result.runtime, 'cmdline'):
27 changes: 21 additions & 6 deletions nipype/pipeline/plugins/multiproc.py
@@ -10,6 +10,7 @@
absolute_import)

# Import packages
import os
from multiprocessing import Process, Pool, cpu_count, pool
from traceback import format_exception
import sys
@@ -50,6 +51,8 @@ def run_node(node, updatehash, taskid):
the node to run
updatehash : boolean
flag for updating hash
taskid : int
an identifier for this task

Returns
-------
@@ -63,7 +66,7 @@ def run_node(node, updatehash, taskid):
# Try and execute the node via node.run()
try:
result['result'] = node.run(updatehash=updatehash)
except:
except: # noqa: E722, intentionally catch all here
result['traceback'] = format_exception(*sys.exc_info())
result['result'] = node.result

@@ -131,6 +134,10 @@ def __init__(self, plugin_args=None):
self._task_obj = {}
self._taskid = 0

# Cache current working directory and make sure we
# change to it when workers are set up
self._cwd = os.getcwd()
Member:
Is this the workflow base directory or the CWD of the shell? Could this cause things to dump into the local directory?

Contributor Author:
It shouldn't, because interfaces handle their own WD. I think this is fixing an edge case for fmriprep, where we are spinning up and killing workers all the time.

Contributor Author:
OK, it turns out that some SimpleInterfaces are writing to the workflow base directory :(

Member:
Well, that's... suboptimal.

Contributor Author:
It's actually bothersome, because it means that without this patch those interfaces are being run in some other, unexpected path.

Contributor Author:
Apparently it is a problem only with SimpleInterface.
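For illustration only, here is a minimal standalone sketch of the pattern this patch applies, using nothing beyond the standard library (the function and variable names below are hypothetical): every worker the pool starts, including ones respawned because of maxtasksperchild, first runs os.chdir() back to the cached directory, and the result callback re-pins the parent process as well.

import os
from multiprocessing import Pool

def report_cwd(tag):
    # Stand-in for run_node(): report where this worker ended up.
    return tag, os.getpid(), os.getcwd()

if __name__ == '__main__':
    cwd = os.getcwd()  # cache the working directory up front

    # initializer/initargs make every (re)spawned worker chdir to the cached path
    pool = Pool(processes=2, maxtasksperchild=10,
                initializer=os.chdir, initargs=(cwd,))

    # the callback runs in the parent process, so it keeps the parent at cwd too
    results = [pool.apply_async(report_cwd, (i,),
                                callback=lambda _res: os.chdir(cwd))
               for i in range(4)]
    for res in results:
        print(res.get())

    pool.close()
    pool.join()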


# Read in options or set defaults.
non_daemon = self.plugin_args.get('non_daemon', True)
maxtasks = self.plugin_args.get('maxtasksperchild', 10)
@@ -143,19 +150,28 @@ def __init__(self, plugin_args=None):

# Instantiate different thread pools for non-daemon processes
logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, '
'mem_gb=%0.2f)', 'non' * int(non_daemon), self.processors,
self.memory_gb)
'mem_gb=%0.2f, cwd=%s)', 'non' * int(non_daemon),
self.processors, self.memory_gb, self._cwd)

NipypePool = NonDaemonPool if non_daemon else Pool
try:
self.pool = NipypePool(
processes=self.processors, maxtasksperchild=maxtasks)
processes=self.processors,
maxtasksperchild=maxtasks,
initializer=os.chdir,
initargs=(self._cwd,)
)
except TypeError:
# Python < 3.2 does not have maxtasksperchild
# When maxtasksperchild is not set, initializer is not to be
# called
self.pool = NipypePool(processes=self.processors)

self._stats = None

def _async_callback(self, args):
# Make sure runtime is not left at a dubious working directory
os.chdir(self._cwd)
self._taskresult[args['taskid']] = args

def _get_result(self, taskid):
@@ -360,7 +376,6 @@ def _sort_jobs(self, jobids, scheduler='tsort'):
if scheduler == 'mem_thread':
return sorted(
jobids,
key=
lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs)
key=lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs)
)
return jobids