diff --git a/reproject/interpolation/core.py b/reproject/interpolation/core.py index 63e2decd0..7fe3a0a9a 100644 --- a/reproject/interpolation/core.py +++ b/reproject/interpolation/core.py @@ -74,6 +74,7 @@ def _reproject_full( array_out=None, return_footprint=True, roundtrip_coords=True, + output_footprint=None, ): """ Reproject n-dimensional data to a new projection using interpolation. @@ -100,6 +101,9 @@ def _reproject_full( if array_out is None: array_out = np.empty(shape_out) + if output_footprint is None: + output_footprint = np.empty(shape_out) + array_out_loopable = array_out if len(array.shape) == wcs_in.low_level_wcs.pixel_n_dim: # We don't need to broadcast the transformation over any extra @@ -149,6 +153,7 @@ def _reproject_full( # also contains this data and has the user's desired output shape. if return_footprint: - return array_out, (~np.isnan(array_out)).astype(float) + output_footprint[:] = (~np.isnan(array_out)).astype(float) + return array_out, output_footprint else: return array_out diff --git a/reproject/interpolation/high_level.py b/reproject/interpolation/high_level.py index a00daa01c..dfd91bddb 100644 --- a/reproject/interpolation/high_level.py +++ b/reproject/interpolation/high_level.py @@ -3,7 +3,7 @@ from astropy.utils import deprecated_renamed_argument -from ..utils import parse_input_data, parse_output_projection, reproject_blocked +from ..utils import _reproject_blocked, parse_input_data, parse_output_projection from .core import _reproject_full __all__ = ["reproject_interp"] @@ -116,20 +116,7 @@ def reproject_interp( # if either of these are not default, it means a blocked method must be used if block_size is not None or parallel is not False: - # if parallel is set but block size isn't, we'll choose - # block size so each thread gets one block each - if parallel is not False and block_size is None: - block_size = list(shape_out) - # each thread gets an equal sized strip of output area to process - block_size[-2] = shape_out[-2] // os.cpu_count() - - # given we have cases where modern system have many cpu cores some sanity clamping is - # to avoid 0 length block sizes when num_cpu_cores is greater than the side of the image - for dim_idx in range(min(len(shape_out), 2)): - if block_size[dim_idx] == 0: - block_size[dim_idx] = shape_out[dim_idx] - - return reproject_blocked( + return _reproject_blocked( _reproject_full, array_in=array_in, wcs_in=wcs_in, @@ -151,4 +138,5 @@ def reproject_interp( array_out=output_array, return_footprint=return_footprint, roundtrip_coords=roundtrip_coords, + output_footprint=output_footprint, ) diff --git a/reproject/interpolation/tests/test_core.py b/reproject/interpolation/tests/test_core.py index 82be075c1..69db845db 100644 --- a/reproject/interpolation/tests/test_core.py +++ b/reproject/interpolation/tests/test_core.py @@ -718,9 +718,11 @@ def test_blocked_broadcast_reprojection(input_extra_dims, output_shape, parallel @pytest.mark.parametrize("parallel", [True, 2, False]) -@pytest.mark.parametrize("block_size", [[40, 40], [500, 500], [500, 100], None]) +@pytest.mark.parametrize("block_size", [[500, 500], [500, 100], None]) +@pytest.mark.parametrize("return_footprint", [False, True]) +@pytest.mark.parametrize("existing_outputs", [False, True]) @pytest.mark.remote_data -def test_blocked_against_single(parallel, block_size): +def test_blocked_against_single(parallel, block_size, return_footprint, existing_outputs): # Ensure when we break a reprojection down into multiple discrete blocks # it has the same result as if all pixels where reprejcted at once @@ -729,6 +731,19 @@ def test_blocked_against_single(parallel, block_size): array_test = None footprint_test = None + shape_out = (720, 721) + + if existing_outputs: + output_array_test = np.zeros(shape_out) + output_footprint_test = np.zeros(shape_out) + output_array_reference = np.zeros(shape_out) + output_footprint_reference = np.zeros(shape_out) + else: + output_array_test = None + output_footprint_test = None + output_array_reference = None + output_footprint_reference = None + # the warning import and ignore is needed to keep pytest happy when running with # older versions of astropy which don't have this fix: # https://github.com/astropy/astropy/pull/12844 @@ -738,72 +753,40 @@ def test_blocked_against_single(parallel, block_size): with warnings.catch_warnings(): warnings.simplefilter("ignore", category=FITSFixedWarning) - - # this one is needed to avoid the following warning from when the np.as_strided() is - # called in wcs_utils.unbroadcast(), only shows up with py3.8, numpy1.17, astropy 4.0.*: - # DeprecationWarning: Numpy has detected that you (may be) writing to an array with - # overlapping memory from np.broadcast_arrays. If this is intentional - # set the WRITEABLE flag True or make a copy immediately before writing. - # We do call as_strided with writeable=True as it recommends and only shows up with the 10px - # testcase so assuming a numpy bug in the detection code which was fixed in later version. - # The pixel values all still match in the end, only shows up due to pytest clearing - # the standard python warning filters by default and failing as the warnings are now - # treated as the exceptions they're implemented on - if block_size == [10, 10]: - warnings.simplefilter("ignore", category=DeprecationWarning) - - array_test, footprint_test = reproject_interp( - hdu2, hdu1.header, parallel=parallel, block_size=block_size + result_test = reproject_interp( + hdu2, + hdu1.header, + parallel=parallel, + block_size=block_size, + return_footprint=return_footprint, + output_array=output_array_test, + output_footprint=output_footprint_test, ) - array_reference, footprint_reference = reproject_interp( - hdu2, hdu1.header, parallel=False, block_size=None + result_reference = reproject_interp( + hdu2, + hdu1.header, + parallel=False, + block_size=None, + return_footprint=return_footprint, + output_array=output_array_reference, + output_footprint=output_footprint_reference, ) - np.testing.assert_allclose(array_test, array_reference, equal_nan=True) - np.testing.assert_allclose(footprint_test, footprint_reference, equal_nan=True) - - -@pytest.mark.remote_data -def test_blocked_corner_cases(): - """ - When doing blocked there are a few checks designed to sanity clamp/preserve - values. Even though the blocking process only tiles in a 2d manner 3d information - about the image needs to be preserved and transformed correctly. Additonally - when automatically determining block size based on CPU cores zeros can appear on - machines where num_cores > x or y dim of output image. So make sure it correctly - functions when 0 block size goes in - """ - - # Read in the input cube - hdu_in = fits.open(get_pkg_data_filename("data/equatorial_3d.fits", package="reproject.tests"))[ - 0 - ] - - # Define the output header - this should be the same for all versions of - # this test to make sure we can use a single reference file. - header_out = hdu_in.header.copy() - header_out["NAXIS1"] = 10 - header_out["NAXIS2"] = 9 - header_out["CTYPE1"] = "GLON-SIN" - header_out["CTYPE2"] = "GLAT-SIN" - header_out["CRVAL1"] = 163.16724 - header_out["CRVAL2"] = -15.777405 - header_out["CRPIX1"] = 6 - header_out["CRPIX2"] = 5 - - array_reference = reproject_interp(hdu_in, header_out, return_footprint=False) - - array_test = None - - # same reason as test above for FITSFixedWarning - import warnings - - with warnings.catch_warnings(): - warnings.simplefilter("ignore", category=FITSFixedWarning) + if return_footprint: + array_test, footprint_test = result_test + array_reference, footprint_reference = result_reference + else: + array_test = result_test + array_reference = result_reference - array_test = reproject_interp( - hdu_in, header_out, parallel=True, block_size=[0, 4], return_footprint=False - ) + if existing_outputs: + assert array_test is output_array_test + assert array_reference is output_array_reference + if return_footprint: + assert footprint_test is output_footprint_test + assert footprint_reference is output_footprint_reference - np.testing.assert_allclose(array_test, array_reference, equal_nan=True, verbose=True) + np.testing.assert_allclose(array_test, array_reference, equal_nan=True) + if return_footprint: + np.testing.assert_allclose(footprint_test, footprint_reference, equal_nan=True) diff --git a/reproject/utils.py b/reproject/utils.py index 7c536e93d..eaeba857e 100644 --- a/reproject/utils.py +++ b/reproject/utils.py @@ -1,12 +1,16 @@ +import tempfile from concurrent import futures import astropy.nddata +import dask +import dask.array as da import numpy as np from astropy.io import fits from astropy.io.fits import CompImageHDU, HDUList, Header, ImageHDU, PrimaryHDU from astropy.wcs import WCS from astropy.wcs.wcsapi import BaseHighLevelWCS, SlicedLowLevelWCS from astropy.wcs.wcsapi.high_level_wcs_wrapper import HighLevelWCSWrapper +from dask.utils import SerializableLock __all__ = [ "parse_input_data", @@ -162,50 +166,7 @@ def parse_output_projection(output_projection, shape_in=None, shape_out=None, ou return wcs_out, shape_out -def _block( - reproject_func, array_in, wcs_in, wcs_out_sub, shape_out, i_range, j_range, return_footprint -): - """ - Implementation function that handles reprojecting subsets blocks of pixels - from an input image and holds metadata about where to reinsert when done. - - Parameters - ---------- - reproject_func - One the existing reproject functions implementing a reprojection algorithm - that that will be used be used to perform reprojection - array_in - Data following the same format as expected by underlying reproject_func, - expected to `~numpy.ndarray` when used from reproject_blocked() - wcs_in: `~astropy.wcs.WCS` - WCS object corresponding to array_in - wcs_out_sub: - Output WCS image will be projected to. Normally will correspond to subset of - total output image when used by repoject_blocked() - shape_out: - Passed to reproject_func() alongside WCS out to determine image size - i_range: - Passed through unmodified, used to determine where to reinsert block - j_range: - Passed through unmodified, used to determine where to reinsert block - """ - - result = reproject_func( - array_in, wcs_in, wcs_out_sub, shape_out=shape_out, return_footprint=return_footprint - ) - - res_arr = None - res_fp = None - - if return_footprint: - res_arr, res_fp = result - else: - res_arr = result - - return {"i": i_range, "j": j_range, "res_arr": res_arr, "res_fp": res_fp} - - -def reproject_blocked( +def _reproject_blocked( reproject_func, array_in, wcs_in, @@ -228,14 +189,14 @@ def reproject_blocked( that that will be used be used to perform reprojection array_in Data following the same format as expected by underlying reproject_func, - expected to `~numpy.ndarray` when used from reproject_blocked() + expected to `~numpy.ndarray` when used from _reproject_blocked() wcs_in: `~astropy.wcs.WCS` WCS object corresponding to array_in shape_out: tuple Passed to reproject_func() alongside WCS out to determine image size wcs_out: `~astropy.wcs.WCS` Output WCS image will be projected to. Normally will correspond to subset of - total output image when used by repoject_blocked() + total output image when used by _reproject_blocked() block_size: tuple The size of blocks in terms of output array pixels that each block will handle reprojecting. Extending out from (0,0) coords positively, block sizes @@ -263,96 +224,85 @@ def reproject_blocked( if output_footprint is None and return_footprint: output_footprint = np.zeros(shape_out, dtype=float) - # setup variables needed for multiprocessing if required - proc_pool = None - blocks_futures = [] - - if parallel or type(parallel) is int: - if type(parallel) is int: - if parallel <= 0: - raise ValueError("The number of processors to use must be strictly positive") - else: - proc_pool = futures.ProcessPoolExecutor(max_workers=parallel) + if block_size is not None and len(block_size) < len(shape_out): + block_size = [-1] * (len(shape_out) - len(block_size)) + list(block_size) + + shape_in = array_in.shape + + # When in parallel mode, we want to make sure we avoid having to copy the + # input array to all processes for each chunk, so instead we write out + # the input array to a Numpy memory map and load it in inside each process + # as a memory-mapped array. We need to be careful how this gets passed to + # reproject_single_block so we pass a variable that can be either a string + # or the array itself (for synchronous mode). + if parallel: + array_in_or_path = tempfile.mktemp() + array_in_memmapped = np.memmap( + array_in_or_path, dtype=float, shape=array_in.shape, mode="w+" + ) + array_in_memmapped[:] = array_in[:] + else: + array_in_or_path = array_in + + def reproject_single_block(a, block_info=None): + if a.ndim == 0 or block_info is None or block_info == []: + return np.array([a, a]) + slices = [slice(*x) for x in block_info[None]["array-location"][-wcs_out.pixel_n_dim :]] + wcs_out_sub = HighLevelWCSWrapper(SlicedLowLevelWCS(wcs_out, slices=slices)) + if isinstance(array_in_or_path, str): + array_in = np.memmap(array_in_or_path, dtype=float, shape=shape_in) else: - proc_pool = futures.ProcessPoolExecutor() - - # This will iterate over the output space, generating slices of that - # WCS and either processing and reinserting them immediately, - # or when doing parallel impl submit them to workers then wait and reinsert as - # the workers complete each block - for imin in range(0, output_array.shape[-2], block_size[0]): - imax = min(imin + block_size[0], output_array.shape[-2]) - for jmin in range(0, output_array.shape[-1], block_size[1]): - jmax = min(jmin + block_size[1], output_array.shape[-1]) - shape_out_sub = (imax - imin, jmax - jmin) - # if the output has more than two dims, apply our blocking to only the last two - shape_out_sub = output_array.shape[:-2] + shape_out_sub - - slices = [slice(imin, imax), slice(jmin, jmax)] - if wcs_out.low_level_wcs.pixel_n_dim > 2: - slices = [Ellipsis] + slices - wcs_out_sub = HighLevelWCSWrapper(SlicedLowLevelWCS(wcs_out, slices=slices)) - - if proc_pool is None: - # if sequential input data and reinsert block into main array immediately - completed_block = _block( - reproject_func=reproject_func, - array_in=array_in, - wcs_in=wcs_in, - wcs_out_sub=wcs_out_sub, - shape_out=shape_out_sub, - return_footprint=return_footprint, - j_range=(jmin, jmax), - i_range=(imin, imax), - ) - - output_array[..., imin:imax, jmin:jmax] = completed_block["res_arr"][:] - if return_footprint: - output_footprint[..., imin:imax, jmin:jmax] = completed_block["res_fp"][:] - - else: - # if parallel just submit all work items and move on to waiting for them to be done - future = proc_pool.submit( - _block, - reproject_func=reproject_func, - array_in=array_in, - wcs_in=wcs_in, - wcs_out_sub=wcs_out_sub, - shape_out=shape_out_sub, - return_footprint=return_footprint, - j_range=(jmin, jmax), - i_range=(imin, imax), - ) - blocks_futures.append(future) - - # If a parallel implementation is being used that means the - # blocks have not been reassembled yet and must be done now as their - # block call completes in the worker processes - if proc_pool is not None: - completed_future_count = 0 - for completed_future in futures.as_completed(blocks_futures): - completed_block = completed_future.result() - i_range = completed_block["i"] - j_range = completed_block["j"] - output_array[..., i_range[0] : i_range[1], j_range[0] : j_range[1]] = completed_block[ - "res_arr" - ][:] - - if return_footprint: - footprint_block = completed_block["res_fp"][:] - output_footprint[ - ..., i_range[0] : i_range[1], j_range[0] : j_range[1] - ] = footprint_block + array_in = array_in_or_path + array, footprint = reproject_func( + array_in, wcs_in, wcs_out_sub, block_info[None]["chunk-shape"][1:] + ) + return np.array([array, footprint]) + + # NOTE: the following array is just used to set up the iteration in map_blocks + # but isn't actually used otherwise - this is deliberate. + output_array_dask = da.empty(shape_out, chunks=block_size or "auto") + + result = da.map_blocks( + reproject_single_block, + output_array_dask, + dtype=float, + new_axis=0, + chunks=(2,) + output_array_dask.chunksize, + ) - completed_future_count += 1 - idx = blocks_futures.index(completed_future) - # ensure memory used by returned data is freed - completed_future._result = None - del blocks_futures[idx], completed_future - proc_pool.shutdown() - del blocks_futures + # Truncate extra elements + result = result[tuple([slice(None)] + [slice(s) for s in shape_out])] + + if parallel: + # As discussed in https://github.com/dask/dask/issues/9556, da.store + # will not work well in multiprocessing mode when the destination is a + # Numpy array. Instead, in this case we save the dask array to a zarr + # array on disk which can be done in parallel, and re-load it as a dask + # array. We can then use da.store in the next step using the + # 'synchronous' scheduler since that is I/O limited so does not need + # to be done in parallel. + filename = tempfile.mktemp() + if isinstance(parallel, int): + workers = {"num_workers": parallel} + else: + workers = {} + with dask.config.set(scheduler="processes", **workers): + result.to_zarr(filename) + result = da.from_zarr(filename) if return_footprint: + da.store( + [result[0], result[1]], + [output_array, output_footprint], + compute=True, + scheduler="synchronous", + ) return output_array, output_footprint else: + da.store( + result[0], + output_array, + compute=True, + scheduler="synchronous", + ) return output_array diff --git a/setup.cfg b/setup.cfg index bf5c97cae..62f11003d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -17,10 +17,14 @@ packages = find: python_requires = >=3.8 setup_requires = setuptools_scm install_requires = - numpy>=1.17 + numpy>=1.20 astropy>=4.0 astropy-healpix>=0.6 - scipy>=1.3 + scipy>=1.5 + dask[array]>=2020 + cloudpickle + zarr + fsspec [options.extras_require] test = diff --git a/tox.ini b/tox.ini index e448c4d0a..8ed4136d2 100644 --- a/tox.ini +++ b/tox.ini @@ -20,10 +20,11 @@ changedir = deps = numpy121: numpy==1.21.* - oldestdeps: numpy==1.17.3 + oldestdeps: numpy==1.20.* oldestdeps: astropy==4.0.* oldestdeps: astropy-healpix==0.6 - oldestdeps: scipy==1.3.2 + oldestdeps: scipy==1.5.* + oldestdeps: dask==2020.12.* devdeps: numpy>=0.0.dev0 devdeps: scipy>=0.0.dev0