Skip to content

Commit f3b0912

Browse files
authored
Merge pull request #2368 from oesteban/fix/multi-proc-cwd
[FIX] MultiProc starting workers at dubious wd
2 parents 6096f92 + 5b5547b commit f3b0912

File tree

4 files changed

+26
-8
lines changed

4 files changed

+26
-8
lines changed

CHANGES

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
Upcoming release (0.14.1)
22
=========================
33

4+
* FIX: MultiProc starting workers at dubious wd (https://github.com/nipy/nipype/pull/2368)
45
* REF+FIX: Move BIDSDataGrabber to `interfaces.io` + fix correct default behavior (https://github.com/nipy/nipype/pull/2336)
56
* ENH: Add AFNI interface for 3dConvertDset (https://github.com/nipy/nipype/pull/2337)
67
* MAINT: Cleanup EngineBase (https://github.com/nipy/nipype/pull/2376)

nipype/pipeline/engine/nodes.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ def __init__(self,
172172
self.synchronize = synchronize
173173
self.itersource = itersource
174174
self.overwrite = overwrite
175-
self.parameterization = None
175+
self.parameterization = []
176176
self.input_source = {}
177177
self.plugin_args = {}
178178

@@ -242,9 +242,10 @@ def n_procs(self, value):
242242

243243
@property
244244
def itername(self):
245+
"""Name for expanded iterable"""
245246
itername = self._id
246247
if self._hierarchy:
247-
itername = self._hierarchy + '.' + self._id
248+
itername = '%s.%s' % (self._hierarchy, self._id)
248249
return itername
249250

250251
def output_dir(self):

nipype/pipeline/engine/utils.py

+1
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ def write_report(node, report_type=None, is_mapnode=False):
188188
rst_dict = {
189189
'hostname': result.runtime.hostname,
190190
'duration': result.runtime.duration,
191+
'working_dir': result.runtime.cwd,
191192
}
192193

193194
if hasattr(result.runtime, 'cmdline'):

nipype/pipeline/plugins/multiproc.py

+21-6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
absolute_import)
1111

1212
# Import packages
13+
import os
1314
from multiprocessing import Process, Pool, cpu_count, pool
1415
from traceback import format_exception
1516
import sys
@@ -50,6 +51,8 @@ def run_node(node, updatehash, taskid):
5051
the node to run
5152
updatehash : boolean
5253
flag for updating hash
54+
taskid : int
55+
an identifier for this task
5356
5457
Returns
5558
-------
@@ -63,7 +66,7 @@ def run_node(node, updatehash, taskid):
6366
# Try and execute the node via node.run()
6467
try:
6568
result['result'] = node.run(updatehash=updatehash)
66-
except:
69+
except: # noqa: E722, intendedly catch all here
6770
result['traceback'] = format_exception(*sys.exc_info())
6871
result['result'] = node.result
6972

@@ -131,6 +134,10 @@ def __init__(self, plugin_args=None):
131134
self._task_obj = {}
132135
self._taskid = 0
133136

137+
# Cache current working directory and make sure we
138+
# change to it when workers are set up
139+
self._cwd = os.getcwd()
140+
134141
# Read in options or set defaults.
135142
non_daemon = self.plugin_args.get('non_daemon', True)
136143
maxtasks = self.plugin_args.get('maxtasksperchild', 10)
@@ -143,19 +150,28 @@ def __init__(self, plugin_args=None):
143150

144151
# Instantiate different thread pools for non-daemon processes
145152
logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, '
146-
'mem_gb=%0.2f)', 'non' * int(non_daemon), self.processors,
147-
self.memory_gb)
153+
'mem_gb=%0.2f, cwd=%s)', 'non' * int(non_daemon),
154+
self.processors, self.memory_gb, self._cwd)
148155

149156
NipypePool = NonDaemonPool if non_daemon else Pool
150157
try:
151158
self.pool = NipypePool(
152-
processes=self.processors, maxtasksperchild=maxtasks)
159+
processes=self.processors,
160+
maxtasksperchild=maxtasks,
161+
initializer=os.chdir,
162+
initargs=(self._cwd,)
163+
)
153164
except TypeError:
165+
# Python < 3.2 does not have maxtasksperchild
166+
# When maxtasksperchild is not set, initializer is not to be
167+
# called
154168
self.pool = NipypePool(processes=self.processors)
155169

156170
self._stats = None
157171

158172
def _async_callback(self, args):
173+
# Make sure runtime is not left at a dubious working directory
174+
os.chdir(self._cwd)
159175
self._taskresult[args['taskid']] = args
160176

161177
def _get_result(self, taskid):
@@ -360,7 +376,6 @@ def _sort_jobs(self, jobids, scheduler='tsort'):
360376
if scheduler == 'mem_thread':
361377
return sorted(
362378
jobids,
363-
key=
364-
lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs)
379+
key=lambda item: (self.procs[item].mem_gb, self.procs[item].n_procs)
365380
)
366381
return jobids

0 commit comments

Comments
 (0)