From bcc25fda5391fe4c32cf7933b8254e155c345311 Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 10 Jan 2018 12:31:07 -0800 Subject: [PATCH 1/4] [FIX] MultiProc starting workers at dubious wd When ``maxtasksperchild`` was set at a very low value, workers are created very often, sometimes at working directories deleted after the interface cleanup. That would trigger an ``OSError`` when calling ``os.getcwd()`` during ``nipype.config`` import. This PR sets an initializer for the workers that just changes to the appropriate working directory before the worker is spun up. Fixes poldracklab/fmriprep#868 --- nipype/pipeline/plugins/multiproc.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 86c021decd..bec01d04b2 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -9,6 +9,7 @@ from __future__ import print_function, division, unicode_literals, absolute_import # Import packages +import os from multiprocessing import Process, Pool, cpu_count, pool from traceback import format_exception import sys @@ -47,6 +48,8 @@ def run_node(node, updatehash, taskid): the node to run updatehash : boolean flag for updating hash + taskid : int + an identifier for this task Returns ------- @@ -68,6 +71,10 @@ def run_node(node, updatehash, taskid): return result +def _init_worker(cwd): + os.chdir(cwd) + + class NonDaemonProcess(Process): """A non-daemon process to support internal multiprocessing. """ @@ -128,6 +135,10 @@ def __init__(self, plugin_args=None): self._task_obj = {} self._taskid = 0 + # Cache current working directory and make sure we + # change to it when workers are set up + self._cwd = os.getcwd() + # Read in options or set defaults. non_daemon = self.plugin_args.get('non_daemon', True) maxtasks = self.plugin_args.get('maxtasksperchild', 10) @@ -140,19 +151,28 @@ def __init__(self, plugin_args=None): # Instantiate different thread pools for non-daemon processes logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, ' - 'mem_gb=%0.2f)', 'non' * int(non_daemon), self.processors, - self.memory_gb) + 'mem_gb=%0.2f, cwd=%s)', 'non' * int(non_daemon), + self.processors, self.memory_gb, self._cwd) NipypePool = NonDaemonPool if non_daemon else Pool try: - self.pool = NipypePool(processes=self.processors, - maxtasksperchild=maxtasks) + self.pool = NipypePool( + processes=self.processors, + maxtasksperchild=maxtasks, + initializer=_init_worker, + initargs=(self._cwd,) + ) except TypeError: + # Python < 3.2 does not have maxtasksperchild + # When maxtasksperchild is not set, initializer is not to be + # called. self.pool = NipypePool(processes=self.processors) self._stats = None def _async_callback(self, args): + # Make sure runtime is not left at a dubious working directory + os.chdir(self._cwd) self._taskresult[args['taskid']] = args def _get_result(self, taskid): From 70da04e5e849577ff056484f59a8d18d0d3db21f Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 10 Jan 2018 14:02:43 -0800 Subject: [PATCH 2/4] less verbose, update CHANGES --- CHANGES | 1 + nipype/pipeline/plugins/multiproc.py | 8 ++------ 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/CHANGES b/CHANGES index 01a09b735a..bf8b9b359f 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,7 @@ Upcoming release (0.14.1) ================ +* FIX: MultiProc starting workers at dubious wd (https://github.com/nipy/nipype/pull/2368) * MAINT: Cleaning / simplify ``Node`` (https://github.com/nipy/nipype/pull/#2325) 0.14.0 (November 29, 2017) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index bec01d04b2..025f8cf111 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -71,10 +71,6 @@ def run_node(node, updatehash, taskid): return result -def _init_worker(cwd): - os.chdir(cwd) - - class NonDaemonProcess(Process): """A non-daemon process to support internal multiprocessing. """ @@ -159,13 +155,13 @@ def __init__(self, plugin_args=None): self.pool = NipypePool( processes=self.processors, maxtasksperchild=maxtasks, - initializer=_init_worker, + initializer=os.chdir, initargs=(self._cwd,) ) except TypeError: # Python < 3.2 does not have maxtasksperchild # When maxtasksperchild is not set, initializer is not to be - # called. + # called self.pool = NipypePool(processes=self.processors) self._stats = None From a9b2609870d3ae8baf1136e15b27090ac7d7afbb Mon Sep 17 00:00:00 2001 From: oesteban Date: Thu, 18 Jan 2018 14:33:37 -0800 Subject: [PATCH 3/4] add runtime.cwd to report.rst --- nipype/pipeline/engine/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nipype/pipeline/engine/utils.py b/nipype/pipeline/engine/utils.py index b2bb7f7c75..892043b162 100644 --- a/nipype/pipeline/engine/utils.py +++ b/nipype/pipeline/engine/utils.py @@ -188,6 +188,7 @@ def write_report(node, report_type=None, is_mapnode=False): rst_dict = { 'hostname': result.runtime.hostname, 'duration': result.runtime.duration, + 'working_dir': result.runtime.cwd, } if hasattr(result.runtime, 'cmdline'): From af6c708110d0551a1042b25c84805b579011079a Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Thu, 18 Jan 2018 22:08:53 -0800 Subject: [PATCH 4/4] sty changes --- nipype/pipeline/engine/nodes.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 7f99810f68..b084e4d8ef 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -172,7 +172,7 @@ def __init__(self, self.synchronize = synchronize self.itersource = itersource self.overwrite = overwrite - self.parameterization = None + self.parameterization = [] self.input_source = {} self.plugin_args = {} @@ -242,9 +242,10 @@ def n_procs(self, value): @property def itername(self): + """Name for expanded iterable""" itername = self._id if self._hierarchy: - itername = self._hierarchy + '.' + self._id + itername = '%s.%s' % (self._hierarchy, self._id) return itername def output_dir(self):