Add docstrings to parallel and error checking when parallel=0
AlistairSymonds committed Sep 2, 2022
1 parent ddaa218 commit 07d1d9d
Showing 3 changed files with 85 additions and 9 deletions.
5 changes: 5 additions & 0 deletions reproject/interpolation/high_level.py
@@ -70,6 +70,11 @@ def reproject_interp(input_data, output_projection, shape_out=None, hdu_in=0,
        reprojected to one block at a time; this is useful for memory-limited
        scenarios such as dealing with very large arrays or high resolution
        output spaces.
    parallel : bool or int
        Flag for parallel implementation. If ``True``, a parallel implementation
        is chosen, with the number of processes set automatically to the number
        of logical CPUs detected on the machine. If ``False``, a serial
        implementation is chosen. If the flag is a positive integer ``n``, a
        parallel implementation using ``n`` processes is chosen.
    roundtrip_coords : bool
        Whether to verify that coordinate transformations are defined in both
        directions.
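For context, a minimal sketch of how the ``parallel`` and ``block_size`` options documented above might be exercised; the WCS values, shapes, and data here are illustrative assumptions, not part of this commit:

import numpy as np
from astropy.wcs import WCS
from reproject import reproject_interp

if __name__ == '__main__':
    wcs_in = WCS(naxis=2)
    wcs_in.wcs.ctype = ['RA---TAN', 'DEC--TAN']
    wcs_in.wcs.crpix = [128, 128]
    wcs_in.wcs.cdelt = [-0.01, 0.01]

    # Output WCS shifted by a few pixels so the reprojection does real work
    wcs_out = wcs_in.deepcopy()
    wcs_out.wcs.crpix = [130, 126]

    data = np.random.random((256, 256))

    # parallel=True -> one process per logical CPU; parallel=4 -> four
    # processes; parallel=False -> serial. Each 64x64 output block is
    # reprojected independently.
    array_out, footprint = reproject_interp((data, wcs_in), wcs_out,
                                            shape_out=(256, 256),
                                            block_size=[64, 64], parallel=4)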
2 changes: 1 addition & 1 deletion reproject/interpolation/tests/test_core.py
@@ -583,7 +583,7 @@ def test_identity_with_offset(roundtrip_coords):
    assert_allclose(expected, array_out, atol=1e-10)


-@pytest.mark.parametrize('parallel', [True, 0, 2, False])
+@pytest.mark.parametrize('parallel', [True, 2, False])
@pytest.mark.parametrize('block_size', [[10, 10], [500, 500], [500, 100], None])
def test_blocked_against_single(parallel, block_size):

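Dropping ``0`` from the parametrize list matches the new check in reproject/utils.py below: ``parallel=0`` is now an error rather than a silent serial fallback. A hedged sketch of a test for the new behavior (not part of this commit, and assuming ``parallel`` is forwarded to the blocked implementation) might look like:

import numpy as np
import pytest
from astropy.wcs import WCS
from reproject import reproject_interp

def test_parallel_zero_raises():
    wcs = WCS(naxis=2)
    data = np.zeros((10, 10))
    # The ValueError is raised before any per-block work starts
    with pytest.raises(ValueError):
        reproject_interp((data, wcs), wcs, shape_out=(10, 10),
                         block_size=[5, 5], parallel=0)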
87 changes: 79 additions & 8 deletions reproject/utils.py
@@ -141,7 +141,31 @@ def parse_output_projection(output_projection, shape_out=None, output_array=None

def _block(reproject_func, array_in, wcs_in, wcs_out_sub, shape_out, i_range, j_range,
           return_footprint):
-    # i and j range must be passed through for multiprocessing to know where to reinsert patches
"""
Implementation function that handles reprojecting subsets blocks of pixels
from an input image and holds metadata about where to reinsert when done.
Parameters
----------
reproject_func
One the existing reproject functions implementing a reprojection algorithm
that that will be used be used to perform reprojection
array_in
Data following the same format as expected by underlying reproject_func,
expected to `~numpy.ndarray` when used from reproject_blocked()
wcs_in: `~astropy.wcs.WCS`
WCS object corresponding to array_in
wcs_out_sub:
Output WCS image will be projected to. Normally will correspond to subset of
total output image when used by repoject_blocked()
shape_out:
Passed to reproject_func() alongside WCS out to determine image size
i_range:
Passed through unmodified, used to determine where to reinsert block
j_range:
Passed through unmodified, used to determine where to reinsert block
"""

    result = reproject_func(array_in, wcs_in, wcs_out_sub,
                            shape_out=shape_out, return_footprint=return_footprint)
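
To illustrate why ``i_range`` and ``j_range`` must round-trip through the worker call, here is a small self-contained sketch (the toy function, data, and shapes are assumptions for illustration): results from a process pool arrive in completion order, so each block must carry its own coordinates back for reinsertion.

from concurrent import futures
import numpy as np

def toy_block(arr, i_range, j_range):
    # Stand-in for _block: process a patch and return it together with its
    # location, so the parent process knows where to reinsert it.
    patch = arr[i_range[0]:i_range[1], j_range[0]:j_range[1]] * 2.0
    return {'i': i_range, 'j': j_range, 'res_arr': patch}

if __name__ == '__main__':
    arr = np.arange(16.0).reshape(4, 4)
    out = np.zeros_like(arr)
    with futures.ProcessPoolExecutor(max_workers=2) as pool:
        fs = [pool.submit(toy_block, arr, (i, i + 2), (j, j + 2))
              for i in (0, 2) for j in (0, 2)]
        # Blocks complete in arbitrary order; the stored ranges say where
        # each result belongs.
        for f in futures.as_completed(fs):
            block = f.result()
            (i0, i1), (j0, j1) = block['i'], block['j']
            out[i0:i1, j0:j1] = block['res_arr']
    print(out)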

@@ -159,6 +183,47 @@ def _block(reproject_func, array_in, wcs_in, wcs_out_sub, shape_out, i_range, j_range,
def reproject_blocked(reproject_func, array_in, wcs_in, shape_out, wcs_out, block_size,
                      output_array=None,
                      return_footprint=True, output_footprint=None, parallel=True):
"""
Implementation function that handles reprojecting subsets blocks of pixels
from an input image and holds metadata about where to reinsert when done.
Parameters
----------
reproject_func
One the existing reproject functions implementing a reprojection algorithm
that that will be used be used to perform reprojection
array_in
Data following the same format as expected by underlying reproject_func,
expected to `~numpy.ndarray` when used from reproject_blocked()
wcs_in: `~astropy.wcs.WCS`
WCS object corresponding to array_in
shape_out: tuple
Passed to reproject_func() alongside WCS out to determine image size
wcs_out: `~astropy.wcs.WCS`
Output WCS image will be projected to. Normally will correspond to subset of
total output image when used by repoject_blocked()
block_size: tuple
The size of blocks in terms of output array pixels that each block will handle
reprojecting. Extending out from (0,0) coords positively, block sizes
are clamped to output space edges when a block would extend past edge
output_array : None or `~numpy.ndarray`
An array in which to store the reprojected data. This can be any numpy
array including a memory map, which may be helpful when dealing with
extremely large files.
return_footprint : bool
Whether to return the footprint in addition to the output array.
output_footprint : None or `~numpy.ndarray`
An array in which to store the footprint of reprojected data. This can be
any numpy array including a memory map, which may be helpful when dealing with
extremely large files.
parallel : bool or int
Flag for parallel implementation. If ``True``, a parallel implementation
is chosen, the number of processes selected automatically to be equal to
the number of logical CPUs detected on the machine. If ``False``, a
serial implementation is chosen. If the flag is a positive integer ``n``
greater than one, a parallel implementation using ``n`` processes is chosen.
"""

    if output_array is None:
        output_array = np.zeros(shape_out, dtype=float)
    if output_footprint is None and return_footprint:
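A quick sketch of the block clamping described in the docstring above, using illustrative shapes; this mirrors the ``min()`` arithmetic in the loop later in this file:

shape_out = (500, 500)
block_size = (200, 200)
for imin in range(0, shape_out[0], block_size[0]):
    # The last block is clamped to the edge: (0, 200), (200, 400), (400, 500)
    imax = min(imin + block_size[0], shape_out[0])
    print(imin, imax)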
@@ -169,12 +234,18 @@ def reproject_blocked(reproject_func, array_in, wcs_in, shape_out, wcs_out, block_size,
    blocks_futures = []

    if parallel or type(parallel) is int:
-        if type(parallel) is int and parallel > 0:
-            proc_pool = futures.ProcessPoolExecutor(max_workers=parallel)
+        if type(parallel) is int:
+            if parallel <= 0:
+                raise ValueError("The number of processors to use must be strictly positive")
+            else:
+                proc_pool = futures.ProcessPoolExecutor(max_workers=parallel)
+        else:
+            proc_pool = futures.ProcessPoolExecutor()

-    sequential_blocks_done = 0
+    # This will iterate over the output space, generating slices of that WCS
+    # and either processing and reinserting each block immediately, or, when
+    # running in parallel, submitting the blocks to worker processes and then
+    # reinserting them as the workers complete each block.
    for imin in range(0, output_array.shape[0], block_size[0]):
        imax = min(imin + block_size[0], output_array.shape[0])
        for jmin in range(0, output_array.shape[1], block_size[1]):
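One subtlety in the dispatch above: ``bool`` is a subclass of ``int`` in Python, so the code checks ``type(parallel) is int`` rather than using ``isinstance``, keeping ``parallel=True`` on the automatic pool-size path instead of treating it as ``max_workers=1``. A two-line demonstration:

# bool is a subclass of int, so an isinstance check would treat True as the
# integer 1; the exact type check keeps True on the automatic-size path.
print(isinstance(True, int))   # True
print(type(True) is int)       # False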
@@ -200,7 +271,6 @@ def reproject_blocked(reproject_func, array_in, wcs_in, shape_out, wcs_out, block_size,
                if return_footprint:
                    output_footprint[imin:imax, jmin:jmax] = completed_block['res_fp'][:]

-                sequential_blocks_done += 1
            else:
                # if parallel, just submit all work items and move on to
                # waiting for them to be done
                future = proc_pool.submit(_block, reproject_func=reproject_func, array_in=array_in,
@@ -211,15 +281,16 @@ def reproject_blocked(reproject_func, array_in, wcs_in, shape_out, wcs_out, block_size,
                blocks_futures.append(future)

    # If a parallel implementation is being used, that means the
-    # blocks have not been reassembled yet and must be done now
+    # blocks have not been reassembled yet and must be done now, as their
+    # _block calls complete in the worker processes
    if proc_pool is not None:
        completed_future_count = 0
        for completed_future in futures.as_completed(blocks_futures):
            completed_block = completed_future.result()
            i_range = completed_block['i']
            j_range = completed_block['j']
-            output_array[i_range[0]:i_range[1], j_range[0]:j_range[1]] \
-                = completed_block['res_arr'][:]
+            output_array[i_range[0]:i_range[1], j_range[0]:j_range[1]] = (
+                completed_block['res_arr'][:])

            if return_footprint:
                footprint_block = completed_block['res_fp'][:]
