diff --git a/CHANGES b/CHANGES index 29e34a1854..fa17166884 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,7 @@ Upcoming release (0.14.1) ========================= +* FIX: MultiProc starting workers at dubious wd (https://github.com/nipy/nipype/pull/2368) * REF+FIX: Move BIDSDataGrabber to `interfaces.io` + fix correct default behavior (https://github.com/nipy/nipype/pull/2336) * ENH: Add AFNI interface for 3dConvertDset (https://github.com/nipy/nipype/pull/2337) * MAINT: Cleanup EngineBase (https://github.com/nipy/nipype/pull/2376) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 5dfc122011..e013702f8a 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -172,7 +172,7 @@ def __init__(self, self.synchronize = synchronize self.itersource = itersource self.overwrite = overwrite - self.parameterization = None + self.parameterization = [] self.input_source = {} self.plugin_args = {} @@ -242,9 +242,10 @@ def n_procs(self, value): @property def itername(self): + """Name for expanded iterable""" itername = self._id if self._hierarchy: - itername = self._hierarchy + '.' 
+ self._id + itername = '%s.%s' % (self._hierarchy, self._id) return itername def output_dir(self): diff --git a/nipype/pipeline/engine/utils.py b/nipype/pipeline/engine/utils.py index d42cc1cbe6..ec8d0e42ec 100644 --- a/nipype/pipeline/engine/utils.py +++ b/nipype/pipeline/engine/utils.py @@ -188,6 +188,7 @@ def write_report(node, report_type=None, is_mapnode=False): rst_dict = { 'hostname': result.runtime.hostname, 'duration': result.runtime.duration, + 'working_dir': result.runtime.cwd, } if hasattr(result.runtime, 'cmdline'): diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index f40dd49e02..36c2cfd996 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -10,6 +10,7 @@ absolute_import) # Import packages +import os from multiprocessing import Process, Pool, cpu_count, pool from traceback import format_exception import sys @@ -50,6 +51,8 @@ def run_node(node, updatehash, taskid): the node to run updatehash : boolean flag for updating hash + taskid : int + an identifier for this task Returns ------- @@ -63,7 +66,7 @@ def run_node(node, updatehash, taskid): # Try and execute the node via node.run() try: result['result'] = node.run(updatehash=updatehash) - except: + except: # noqa: E722, intentionally catch all here result['traceback'] = format_exception(*sys.exc_info()) result['result'] = node.result @@ -131,6 +134,10 @@ def __init__(self, plugin_args=None): self._task_obj = {} self._taskid = 0 + # Cache current working directory and make sure we + # change to it when workers are set up + self._cwd = os.getcwd() + # Read in options or set defaults.
non_daemon = self.plugin_args.get('non_daemon', True) maxtasks = self.plugin_args.get('maxtasksperchild', 10) @@ -143,19 +150,28 @@ def __init__(self, plugin_args=None): # Instantiate different thread pools for non-daemon processes logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, ' - 'mem_gb=%0.2f)', 'non' * int(non_daemon), self.processors, - self.memory_gb) + 'mem_gb=%0.2f, cwd=%s)', 'non' * int(non_daemon), + self.processors, self.memory_gb, self._cwd) NipypePool = NonDaemonPool if non_daemon else Pool try: self.pool = NipypePool( - processes=self.processors, maxtasksperchild=maxtasks) + processes=self.processors, + maxtasksperchild=maxtasks, + initializer=os.chdir, + initargs=(self._cwd,) + ) except TypeError: + # Python < 3.2 does not have maxtasksperchild + # When maxtasksperchild is not set, initializer is not to be + # called self.pool = NipypePool(processes=self.processors) self._stats = None def _async_callback(self, args): + # Make sure runtime is not left at a dubious working directory + os.chdir(self._cwd) self._taskresult[args['taskid']] = args def _get_result(self, taskid): @@ -360,7 +376,6 @@ def _sort_jobs(self, jobids, scheduler='tsort'): if scheduler == 'mem_thread': return sorted( jobids, - key= - lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs) + key=lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs) ) return jobids