From 63b997874c7b22f5ddc6a63d989d0a30e58b3836 Mon Sep 17 00:00:00 2001
From: oesteban
Date: Fri, 3 Nov 2017 11:29:16 -0700
Subject: [PATCH 1/6] add memory annotation to RobustMNINormalization

---
 fmriprep/workflows/anatomical.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/fmriprep/workflows/anatomical.py b/fmriprep/workflows/anatomical.py
index afe7b4e82..59a8573c6 100644
--- a/fmriprep/workflows/anatomical.py
+++ b/fmriprep/workflows/anatomical.py
@@ -219,7 +219,8 @@ def init_anat_preproc_wf(skull_strip_template, output_spaces, template, debug,
             flavor='testing' if debug else 'precise',
         ),
         name='t1_2_mni',
-        n_procs=omp_nthreads
+        n_procs=omp_nthreads,
+        mem_gb=2
     )
 
     # Resample the brain mask and the tissue probability maps into mni space

From f324aaa992b7252c3376bddd31f3db2e3cbae148 Mon Sep 17 00:00:00 2001
From: oesteban
Date: Fri, 3 Nov 2017 12:57:51 -0700
Subject: [PATCH 2/6] annotate node autorecon2_vol

---
 fmriprep/workflows/anatomical.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/fmriprep/workflows/anatomical.py b/fmriprep/workflows/anatomical.py
index 59a8573c6..0f6f5d75b 100644
--- a/fmriprep/workflows/anatomical.py
+++ b/fmriprep/workflows/anatomical.py
@@ -670,8 +670,7 @@ def init_autorecon_resume_wf(omp_nthreads, name='autorecon_resume_wf'):
 
     autorecon2_vol = pe.Node(
         fs.ReconAll(directive='autorecon2-volonly', openmp=omp_nthreads),
-        n_procs=omp_nthreads,
-        name='autorecon2_vol')
+        n_procs=omp_nthreads, mem_gb=5, name='autorecon2_vol')
 
     autorecon_surfs = pe.MapNode(
         fs.ReconAll(

From f6e057ff51dfdd719dcbe1c5708ae000012f19e2 Mon Sep 17 00:00:00 2001
From: oesteban
Date: Fri, 3 Nov 2017 19:15:43 -0700
Subject: [PATCH 3/6] enable psutil

---
 fmriprep/info.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/fmriprep/info.py b/fmriprep/info.py
index f7abb95ca..cfba881a6 100644
--- a/fmriprep/info.py
+++ b/fmriprep/info.py
@@ -88,6 +88,7 @@
     'tests': TESTS_REQUIRES,
     'duecredit': ['duecredit'],
     'datalad': ['datalad'],
+    'resmon': ['psutil>=5.4.0'],
 }
 
 # Enable a handle to install all extra dependencies at once

From 6b3d7970c79177d482820110f27f1d300a99feff Mon Sep 17 00:00:00 2001
From: oesteban
Date: Sat, 4 Nov 2017 09:46:26 -0700
Subject: [PATCH 4/6] use threading for these pools

---
 fmriprep/interfaces/itk.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/fmriprep/interfaces/itk.py b/fmriprep/interfaces/itk.py
index acc3e215a..fb7b8cfb2 100644
--- a/fmriprep/interfaces/itk.py
+++ b/fmriprep/interfaces/itk.py
@@ -55,8 +55,8 @@ def _run_interface(self, runtime):
         with TemporaryDirectory(prefix='tmp-', dir=runtime.cwd) as tmp_folder:
             # Inputs are ready to run in parallel
             if num_threads is None or num_threads > 1:
-                from multiprocessing import Pool
-                pool = Pool(processes=num_threads, maxtasksperchild=100)
+                from multiprocessing.dummy import Pool
+                pool = Pool(num_threads)
                 itk_outs = pool.map(_mat2itk, [
                     (in_mat, self.inputs.in_reference, self.inputs.in_source, i, tmp_folder)
                     for i, in_mat in enumerate(self.inputs.in_files)]
                 )
@@ -138,8 +138,8 @@ def _run_interface(self, runtime):
                 for i, (in_file, in_xfm) in enumerate(zip(in_files, xfms_list))
             ]
         else:
-            from multiprocessing import Pool
-            pool = Pool(processes=num_threads, maxtasksperchild=100)
+            from multiprocessing.dummy import Pool
+            pool = Pool(num_threads)
             out_files = pool.map(
                 _applytfms, [(in_file, in_xfm, ifargs, i, runtime.cwd)
                              for i, (in_file, in_xfm) in enumerate(zip(in_files, xfms_list))]
             )
@@ -236,7 +236,7 @@ def _mat2itk(args):
 
     # Run c3d_affine_tool
     C3dAffineTool(transform_file=in_file, reference_file=in_ref, source_file=in_src,
-                  fsl2ras=True, itk_transform=out_file).run()
+                  fsl2ras=True, itk_transform=out_file, resource_monitor=False).run()
     transform = '#Transform %d\n' % index
     with open(out_file) as itkfh:
         transform += ''.join(itkfh.readlines()[2:])
@@ -262,6 +262,7 @@ def _applytfms(args):
     xfm = ApplyTransforms(
         input_image=in_file, transforms=in_xform, output_image=out_file, **ifargs)
     xfm.terminal_output = 'allatonce'
+    xfm.resource_monitor = False
     runtime = xfm.run().runtime
 
     if copy_dtype:

From a4b353ab1abb0e0052f93e3411faf4d66d9b4452 Mon Sep 17 00:00:00 2001
From: oesteban
Date: Mon, 6 Nov 2017 12:59:23 -0800
Subject: [PATCH 5/6] indexed_gzip>=0.7.0
 (https://github.com/poldracklab/fmriprep/issues/801#issuecomment-341965807)

---
 fmriprep/info.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fmriprep/info.py b/fmriprep/info.py
index cfba881a6..72c47e970 100644
--- a/fmriprep/info.py
+++ b/fmriprep/info.py
@@ -71,7 +71,7 @@
     'statsmodels',
     'nipype',
     'seaborn',
-    'indexed_gzip>=0.6.1',
+    'indexed_gzip>=0.7.0',
 ]
 
 LINKS_REQUIRES = [

From 24e09dd96f1d19995d6943cca3ec7d5bfd795897 Mon Sep 17 00:00:00 2001
From: oesteban
Date: Mon, 6 Nov 2017 22:13:59 -0800
Subject: [PATCH 6/6] replace inner pool with
 `concurrent.futures.ThreadPoolExecutor`

Hopefully, it will avoid an army of zombies
(https://stackoverflow.com/questions/17223301/python-multiprocessing-is-it-possible-to-have-a-pool-inside-of-a-pool)
---
 fmriprep/interfaces/itk.py | 28 ++++++++++++----------------
 1 file changed, 12 insertions(+), 16 deletions(-)

diff --git a/fmriprep/interfaces/itk.py b/fmriprep/interfaces/itk.py
index fb7b8cfb2..8fc1554b6 100644
--- a/fmriprep/interfaces/itk.py
+++ b/fmriprep/interfaces/itk.py
@@ -55,14 +55,12 @@ def _run_interface(self, runtime):
         with TemporaryDirectory(prefix='tmp-', dir=runtime.cwd) as tmp_folder:
             # Inputs are ready to run in parallel
             if num_threads is None or num_threads > 1:
-                from multiprocessing.dummy import Pool
-                pool = Pool(num_threads)
-                itk_outs = pool.map(_mat2itk, [
-                    (in_mat, self.inputs.in_reference, self.inputs.in_source, i, tmp_folder)
-                    for i, in_mat in enumerate(self.inputs.in_files)]
-                )
-                pool.close()
-                pool.join()
+                from concurrent.futures import ThreadPoolExecutor
+                with ThreadPoolExecutor(max_workers=num_threads) as pool:
+                    itk_outs = list(pool.map(_mat2itk, [
+                        (in_mat, self.inputs.in_reference, self.inputs.in_source, i, tmp_folder)
+                        for i, in_mat in enumerate(self.inputs.in_files)]
+                    ))
             else:
                 itk_outs = [_mat2itk((
                     in_mat, self.inputs.in_reference, self.inputs.in_source, i, tmp_folder))
@@ -138,14 +136,12 @@ def _run_interface(self, runtime):
                 for i, (in_file, in_xfm) in enumerate(zip(in_files, xfms_list))
             ]
         else:
-            from multiprocessing.dummy import Pool
-            pool = Pool(num_threads)
-            out_files = pool.map(
-                _applytfms, [(in_file, in_xfm, ifargs, i, runtime.cwd)
-                             for i, (in_file, in_xfm) in enumerate(zip(in_files, xfms_list))]
-            )
-            pool.close()
-            pool.join()
+            from concurrent.futures import ThreadPoolExecutor
+            with ThreadPoolExecutor(max_workers=num_threads) as pool:
+                out_files = list(pool.map(_applytfms, [
+                    (in_file, in_xfm, ifargs, i, runtime.cwd)
+                    for i, (in_file, in_xfm) in enumerate(zip(in_files, xfms_list))]
+                ))
         tmp_folder.cleanup()
 
         # Collect output file names, after sorting by index
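
For reference, here is a minimal, standalone sketch (not part of the patch series above) of the pattern PATCH 6/6 adopts: replacing a manually managed worker pool with `concurrent.futures.ThreadPoolExecutor` used as a context manager. The `_square` helper and the `items` list are hypothetical stand-ins for `_mat2itk`/`_applytfms` and their argument tuples in fmriprep/interfaces/itk.py.

# Hypothetical sketch, not fmriprep code: run a helper over a list of
# argument tuples with a thread pool that is cleaned up automatically.
from concurrent.futures import ThreadPoolExecutor


def _square(args):
    """Stand-in for _mat2itk/_applytfms: unpack one argument tuple, return a result."""
    index, value = args
    return index, value * value


items = [(i, i + 1) for i in range(10)]

# Workers are threads rather than forked processes, so nothing is left behind
# as a zombie when this runs inside an outer multiprocessing pool; leaving the
# `with` block joins the workers, so no explicit close()/join() is needed.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(_square, items))

print(results)

Executor.map returns a lazy iterator, which is why the patch wraps pool.map(...) in list() to materialize the outputs; the pool.close()/pool.join() calls removed by the patch are covered by the context manager's shutdown on exit.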