diff --git a/reproject/common.py b/reproject/common.py
index 8050910c8..e9de813ac 100644
--- a/reproject/common.py
+++ b/reproject/common.py
@@ -1,3 +1,4 @@
+import logging
 import os
 import tempfile
 import uuid
@@ -15,10 +16,29 @@
 __all__ = ["_reproject_dispatcher"]


+class _ArrayContainer:
+    # When we set up as_delayed_memmap_path, if we pass a dask array to it,
+    # dask will actually compute the array before we get to the code inside
+    # as_delayed_memmap_path, so as a workaround we wrap any array we
+    # pass in using _ArrayContainer to make sure dask doesn't try and be smart.
+    def __init__(self, array):
+        self._array = array
+
+
 @delayed(pure=True)
 def as_delayed_memmap_path(array, tmp_dir):
+
+    # Extract array from _ArrayContainer
+    if isinstance(array, _ArrayContainer):
+        array = array._array
+    else:
+        raise TypeError("Expected _ArrayContainer in as_delayed_memmap_path")
+
+    logger = logging.getLogger(__name__)
     if isinstance(array, da.core.Array):
+        logger.info("Computing input dask array to Numpy memory-mapped array")
         array_path, _ = _dask_to_numpy_memmap(array, tmp_dir)
+        logger.info(f"Numpy memory-mapped array is now at {array_path}")
     else:
         array_path = os.path.join(tmp_dir, f"{uuid.uuid4()}.npy")
         array_memmapped = np.memmap(
@@ -95,6 +115,8 @@ def _reproject_dispatcher(
         Whether to return numpy or dask arrays - defaults to 'numpy'.
     """

+    logger = logging.getLogger(__name__)
+
     if return_type is None:
         return_type = "numpy"
     elif return_type not in ("numpy", "dask"):
@@ -138,7 +160,11 @@ def _reproject_dispatcher(
             )

         if isinstance(array_in, da.core.Array):
-            _, array_in = _dask_to_numpy_memmap(array_in, local_tmp_dir)
+            logger.info("Computing input dask array to Numpy memory-mapped array")
+            array_path, array_in = _dask_to_numpy_memmap(array_in, local_tmp_dir)
+            logger.info(f"Numpy memory-mapped array is now at {array_path}")
+
+        logger.info(f"Calling {reproject_func.__name__} in non-dask mode")

         try:
             return reproject_func(
@@ -180,7 +206,7 @@ def _reproject_dispatcher(
             tmp_dir = tempfile.mkdtemp()
         else:
             tmp_dir = local_tmp_dir
-        array_in_or_path = as_delayed_memmap_path(array_in, tmp_dir)
+        array_in_or_path = as_delayed_memmap_path(_ArrayContainer(array_in), tmp_dir)
     else:
         # Here we could set array_in_or_path to array_in_path if it has
         # been set previously, but in synchronous and threaded mode it is
@@ -190,7 +216,13 @@ def _reproject_dispatcher(
         array_in_or_path = array_in

     def reproject_single_block(a, array_or_path, block_info=None):
-        if a.ndim == 0 or block_info is None or block_info == []:
+
+        if (
+            a.ndim == 0
+            or block_info is None
+            or block_info == []
+            or (isinstance(block_info, np.ndarray) and block_info.tolist() == [])
+        ):
             return np.array([a, a])

         # The WCS class from astropy is not thread-safe, see e.g.
@@ -262,6 +294,8 @@ def reproject_single_block(a, array_or_path, block_info=None):
         array_out_dask = da.empty(shape_out)
         array_out_dask = array_out_dask.rechunk(block_size_limit=64 * 1024**2, **rechunk_kwargs)

+    logger.info("Setting up output dask array with map_blocks")
+
     result = da.map_blocks(
         reproject_single_block,
         array_out_dask,
@@ -297,6 +331,8 @@ def reproject_single_block(a, array_or_path, block_info=None):

         zarr_path = os.path.join(local_tmp_dir, f"{uuid.uuid4()}.zarr")

+        logger.info(f"Computing output array directly to zarr array at {zarr_path}")
+
         if parallel == "current-scheduler":
             # Just use whatever is the current active scheduler, which can
             # be used for e.g. dask.distributed
@@ -317,6 +353,8 @@ def reproject_single_block(a, array_or_path, block_info=None):

         result = da.from_zarr(zarr_path)

+        logger.info("Copying output zarr array into output Numpy arrays")
+
         if return_footprint:
             da.store(
                 [result[0], result[1]],
diff --git a/reproject/mosaicking/coadd.py b/reproject/mosaicking/coadd.py
index 6058a2d9b..56ddb209b 100644
--- a/reproject/mosaicking/coadd.py
+++ b/reproject/mosaicking/coadd.py
@@ -3,6 +3,7 @@
 import sys
 import tempfile
 import uuid
+from logging import getLogger

 import numpy as np
 from astropy.wcs import WCS
@@ -143,6 +144,8 @@ def reproject_and_coadd(
     # up memory usage. We could probably still have references to array
     # objects, but we'd just make sure these were memory mapped

+    logger = getLogger(__name__)
+
     # Validate inputs

     if combine_function not in ("mean", "sum", "first", "last", "min", "max"):
@@ -176,12 +179,19 @@ def reproject_and_coadd(
             f"the output shape {shape_out}"
         )

+    logger.info(f"Output mosaic will have shape {shape_out}")
+
     # Define 'on-the-fly' mode: in the case where we don't need to match
     # the backgrounds and we are combining with 'mean' or 'sum', we don't
     # have to keep track of the intermediate arrays and can just modify
     # the output array on-the-fly
     on_the_fly = not match_background and combine_function in ("mean", "sum")

+    on_the_fly_prefix = "Using" if on_the_fly else "Not using"
+    logger.info(
+        f"{on_the_fly_prefix} on-the-fly mode for adding individual reprojected images to output array"
+    )
+
     # Start off by reprojecting individual images to the final projection

     if not on_the_fly:
@@ -190,6 +200,9 @@ def reproject_and_coadd(

     with tempfile.TemporaryDirectory(ignore_cleanup_errors=IS_WIN) as local_tmp_dir:
         for idata in progress_bar(range(len(input_data))):
+
+            logger.info(f"Processing input data {idata + 1} of {len(input_data)}")
+
             # We need to pre-parse the data here since we need to figure out how to
             # optimize/minimize the size of each output tile (see below).
             array_in, wcs_in = parse_input_data(input_data[idata], hdu_in=hdu_in)
@@ -238,6 +251,9 @@ def reproject_and_coadd(
                     break

             if skip_data:
+                logger.info(
+                    "Skipping reprojection as no predicted overlap with final mosaic header"
+                )
                 continue

             slice_out = tuple([slice(imin, imax) for (imin, imax) in bounds])
@@ -265,6 +281,10 @@ def reproject_and_coadd(

                 array_path = os.path.join(local_tmp_dir, f"array_{uuid.uuid4()}.np")

+                logger.info(
+                    f"Creating memory-mapped array with shape {shape_out_indiv} at {array_path}"
+                )
+
                 array = np.memmap(
                     array_path,
                     shape=shape_out_indiv,
@@ -274,6 +294,10 @@ def reproject_and_coadd(

                 footprint_path = os.path.join(local_tmp_dir, f"footprint_{uuid.uuid4()}.np")

+                logger.info(
+                    f"Creating memory-mapped footprint with shape {shape_out_indiv} at {footprint_path}"
+                )
+
                 footprint = np.memmap(
                     footprint_path,
                     shape=shape_out_indiv,
@@ -285,6 +309,8 @@ def reproject_and_coadd(

                 array = footprint = None

+            logger.info(f"Calling {reproject_function.__name__} with shape_out={shape_out_indiv}")
+
             array, footprint = reproject_function(
                 (array_in, wcs_in),
                 output_projection=wcs_out_indiv,
@@ -301,6 +327,10 @@ def reproject_and_coadd(

                     weights_path = os.path.join(local_tmp_dir, f"weights_{uuid.uuid4()}.np")

+                    logger.info(
+                        f"Creating memory-mapped weights with shape {shape_out_indiv} at {weights_path}"
+                    )
+
                     weights = np.memmap(
                         weights_path,
                         shape=shape_out_indiv,
@@ -312,6 +342,10 @@ def reproject_and_coadd(

                     weights = None

+                logger.info(
+                    f"Calling {reproject_function.__name__} with shape_out={shape_out_indiv} for weights"
+                )
+
                 weights = reproject_function(
                     (weights_in, wcs_in),
                     output_projection=wcs_out_indiv,
@@ -335,6 +369,7 @@ def reproject_and_coadd(

                 if intermediate_memmap:
                     # Remove the reference to the memmap before trying to remove the file itself
+                    logger.info("Removing memory-mapped weight array")
                     weights = None
                     try:
                         os.remove(weights_path)
@@ -347,6 +382,7 @@ def reproject_and_coadd(
             # output image is empty (due e.g. to no overlap).

             if on_the_fly:
+                logger.info("Adding reprojected array to final array")
                 # By default, values outside of the footprint are set to NaN
                 # but we set these to 0 here to avoid getting NaNs in the
                 # means/sums.
@@ -362,6 +398,7 @@ def reproject_and_coadd(
                 if intermediate_memmap:
                     # Remove the references to the memmaps themselves before
                     # trying to remove the files themselves.
+                    logger.info("Removing memory-mapped array and footprint arrays")
                     array = None
                     footprint = None
                     try:
@@ -371,10 +408,12 @@ def reproject_and_coadd(
                         pass
             else:
+                logger.info("Adding reprojected array to list to combine later")
                 arrays.append(array)

     # If requested, try and match the backgrounds.

     if match_background and len(arrays) > 1:
+        logger.info("Matching backgrounds")
         offset_matrix = determine_offset_matrix(arrays)
         corrections = solve_corrections_sgd(offset_matrix)
         if background_reference:
@@ -384,6 +423,7 @@ def reproject_and_coadd(

     if combine_function in ("mean", "sum"):
         if match_background:
+            logger.info(f"Combining reprojected arrays with function {combine_function}")
             # if we're not matching the background, this part has already been done
             for array in arrays:
                 # By default, values outside of the footprint are set to NaN
@@ -394,10 +434,12 @@ def reproject_and_coadd(
                 output_footprint[array.view_in_original_array] += array.footprint

         if combine_function == "mean":
+            logger.info("Handling normalization of output array")
             with np.errstate(invalid="ignore"):
                 output_array /= output_footprint

     elif combine_function in ("first", "last", "min", "max"):
+        logger.info(f"Combining reprojected arrays with function {combine_function}")
         if combine_function == "min":
             output_array[...] = np.inf
         elif combine_function == "max":
@@ -433,6 +475,7 @@ def reproject_and_coadd(

     # We need to avoid potentially large memory allocation from output == 0 so
     # we operate in chunks.
+    logger.info(f"Resetting invalid pixels to {blank_pixel_value}")
     for chunk in iterate_chunks(output_array.shape, max_chunk_size=256 * 1024**2):
         output_array[chunk][output_footprint[chunk] == 0] = blank_pixel_value
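
Note (not part of the patch): the messages added above go through standard Python loggers named after each reproject module, so a minimal sketch for surfacing them in a user script, assuming the default root-logger configuration is acceptable, is:

    import logging

    # Show INFO-level messages from the loggers added in this patch
    logging.basicConfig(level=logging.INFO, format="%(name)s: %(message)s")

    # Any subsequent call into reproject (e.g. reproject_and_coadd) will now
    # print progress messages such as "Output mosaic will have shape ...".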