Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REF] More memory annotations and considerations #816

Merged
merged 6 commits into the base branch from the source branch
Nov 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion fmriprep/info.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -71,7 +71,7 @@
'statsmodels',
'nipype',
'seaborn',
'indexed_gzip>=0.6.1',
'indexed_gzip>=0.7.0',
]

LINKS_REQUIRES = [
Expand All @@ -88,6 +88,7 @@
'tests': TESTS_REQUIRES,
'duecredit': ['duecredit'],
'datalad': ['datalad'],
'resmon': ['psutil>=5.4.0'],
}

# Enable a handle to install all extra dependencies at once
Expand Down
31 changes: 14 additions & 17 deletions fmriprep/interfaces/itk.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,12 @@ def _run_interface(self, runtime):
with TemporaryDirectory(prefix='tmp-', dir=runtime.cwd) as tmp_folder:
# Inputs are ready to run in parallel
if num_threads is None or num_threads > 1:
from multiprocessing import Pool
pool = Pool(processes=num_threads, maxtasksperchild=100)
itk_outs = pool.map(_mat2itk, [
(in_mat, self.inputs.in_reference, self.inputs.in_source, i, tmp_folder)
for i, in_mat in enumerate(self.inputs.in_files)]
)
pool.close()
pool.join()
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=num_threads) as pool:
itk_outs = list(pool.map(_mat2itk, [
(in_mat, self.inputs.in_reference, self.inputs.in_source, i, tmp_folder)
for i, in_mat in enumerate(self.inputs.in_files)]
))
else:
itk_outs = [_mat2itk((
in_mat, self.inputs.in_reference, self.inputs.in_source, i, tmp_folder))
Expand Down Expand Up @@ -138,14 +136,12 @@ def _run_interface(self, runtime):
for i, (in_file, in_xfm) in enumerate(zip(in_files, xfms_list))
]
else:
from multiprocessing import Pool
pool = Pool(processes=num_threads, maxtasksperchild=100)
out_files = pool.map(
_applytfms, [(in_file, in_xfm, ifargs, i, runtime.cwd)
for i, (in_file, in_xfm) in enumerate(zip(in_files, xfms_list))]
)
pool.close()
pool.join()
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=num_threads) as pool:
out_files = list(pool.map(_applytfms, [
(in_file, in_xfm, ifargs, i, runtime.cwd)
for i, (in_file, in_xfm) in enumerate(zip(in_files, xfms_list))]
))
tmp_folder.cleanup()

# Collect output file names, after sorting by index
Expand Down Expand Up @@ -236,7 +232,7 @@ def _mat2itk(args):

# Run c3d_affine_tool
C3dAffineTool(transform_file=in_file, reference_file=in_ref, source_file=in_src,
fsl2ras=True, itk_transform=out_file).run()
fsl2ras=True, itk_transform=out_file, resource_monitor=False).run()
transform = '#Transform %d\n' % index
with open(out_file) as itkfh:
transform += ''.join(itkfh.readlines()[2:])
Expand All @@ -262,6 +258,7 @@ def _applytfms(args):
xfm = ApplyTransforms(
input_image=in_file, transforms=in_xform, output_image=out_file, **ifargs)
xfm.terminal_output = 'allatonce'
xfm.resource_monitor = False
runtime = xfm.run().runtime

if copy_dtype:
Expand Down
6 changes: 3 additions & 3 deletions fmriprep/workflows/anatomical.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ def init_anat_preproc_wf(skull_strip_template, output_spaces, template, debug,
flavor='testing' if debug else 'precise',
),
name='t1_2_mni',
n_procs=omp_nthreads
n_procs=omp_nthreads,
mem_gb=2
)

# Resample the brain mask and the tissue probability maps into mni space
Expand Down Expand Up @@ -669,8 +670,7 @@ def init_autorecon_resume_wf(omp_nthreads, name='autorecon_resume_wf'):

autorecon2_vol = pe.Node(
fs.ReconAll(directive='autorecon2-volonly', openmp=omp_nthreads),
n_procs=omp_nthreads,
name='autorecon2_vol')
n_procs=omp_nthreads, mem_gb=5, name='autorecon2_vol')

autorecon_surfs = pe.MapNode(
fs.ReconAll(
Expand Down