From 2b681116c9846338fc6b926e4950a68ee9be90c2 Mon Sep 17 00:00:00 2001 From: Amaury Dehecq Date: Tue, 31 Jan 2023 02:45:05 +0100 Subject: [PATCH] Re-organize coreg.py (#329) * Add a CRS argument to all coreg.fit functions * Replace old slope/aspect calculation with xdem implementation * Add a CRS argument to all coreg.fit functions * Replace old slope/aspect calculation with xdem implementation * Replace subsampling with geoutils.spatial_tools implementation * Improve testing subsampling for deramp coreg * Remove duplicate of deramping * Add function to calculate standard ddem stats * Fix circular import in previous commit * Ensure fit/apply always take transform+crs as input, update tests accordingly. Make non-resampling possible for NuthKaab. * Fix issue with arg name in overloading. Remove optional type for transform/crs. * Raise an error if unproject CRS is used for NuthKaab * Remove unused functions * Move the functions in a more logical order (nothing is changed) * Remove dilate_mask option of function * Fix bug with option not taken into account in CoregPipeline * Fix bug with resampling option not taken into account in Coregpipeline * Revert to old slope/aspect calculation, more efficient * Add a function to create inlier_mask for coreg * Add inout to ignored codespell errors * Add tests for create_inlier_mask function * Update dem_coregistration function to use create_inlier_mask * Fix typo in previous commit and apply linting * Fix issue iwth NMAD improperly calculated * Add a filter to remove pixels with dh above an absolute threshold * Update comment * Implement resample=False in dem_coregistration * Remove the hmode/vmode. Update docstring of dem_coregistration. * Enable input src/ref dem as Raster and not only path to file * Implement resample=False for BiasCorr * Update tests for create_inlier_mask * Make sure option resample=False is flagged as not implemented for BlockWiseCoreg * Add tests for resample=False option * Linting * Update default coreg method and modify outputs of dem_coregistration * Add test suite for dem_coregistration function * Update pre-commit * Fix mypy issue * Account for PR comments * Remove NaN forcing * Linting * Remove unnecessary type * Rename fit_func * Improve docstrings * Simplify if statement * Make resampling algo an optional argument for Coreg.apply * Fix two bugs and add comment * Update NuthKaab shift in tests * Update tests in examples and coreg * Update test_spatialstats values with new coreg that outputs ddem with less nodata * Fix runtime warning in code example * Fix coregistration code snippet * Fix coreg example with new transform and crs argument of Coreg class --------- Co-authored-by: rhugonne --- .pre-commit-config.yaml | 2 +- .../code/coregistration_plot_nuth_kaab.py | 6 +- tests/test_coreg.py | 463 ++++++++- tests/test_examples.py | 3 +- tests/test_spatialstats.py | 6 +- xdem/coreg.py | 886 ++++++++++-------- xdem/ddem.py | 1 + xdem/examples.py | 2 +- 8 files changed, 932 insertions(+), 437 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index efea9dbc..c8452005 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,7 +14,7 @@ repos: hooks: - id: codespell args: [ - '--ignore-words-list', 'nd,alos', + '--ignore-words-list', 'nd,alos,inout', '--ignore-regex', '\bhist\b', '--' ] diff --git a/docs/source/code/coregistration_plot_nuth_kaab.py b/docs/source/code/coregistration_plot_nuth_kaab.py index 7fee16a1..1ba811ea 100644 --- a/docs/source/code/coregistration_plot_nuth_kaab.py +++ b/docs/source/code/coregistration_plot_nuth_kaab.py @@ -11,11 +11,11 @@ inlier_mask = ~outlines_1990.create_mask(dem_2009) nuth_kaab = xdem.coreg.NuthKaab() -nuth_kaab.fit(dem_2009.data, dem_1990.data, transform=dem_2009.transform, inlier_mask=inlier_mask) -dem_coreg = nuth_kaab.apply(dem_1990.data, transform=dem_1990.transform) +nuth_kaab.fit(dem_2009, dem_1990, inlier_mask=inlier_mask) +dem_coreg = nuth_kaab.apply(dem_1990) ddem_pre = (dem_2009.data - dem_1990.data).filled(np.nan).squeeze() -ddem_post = (dem_2009.data - dem_coreg).filled(np.nan).squeeze() +ddem_post = (dem_2009.data - dem_coreg.data).filled(np.nan).squeeze() nmad_pre = xdem.spatialstats.nmad(ddem_pre[inlier_mask.squeeze()]) nmad_post = xdem.spatialstats.nmad(ddem_post[inlier_mask.squeeze()]) diff --git a/tests/test_coreg.py b/tests/test_coreg.py index 1293456a..e7ea9aa9 100644 --- a/tests/test_coreg.py +++ b/tests/test_coreg.py @@ -2,12 +2,15 @@ from __future__ import annotations import copy +import os +import tempfile import warnings -from typing import Callable +from typing import Any, Callable import cv2 import geoutils as gu import numpy as np +import pandas as pd import pytest import rasterio as rio from geoutils import Raster, Vector @@ -42,6 +45,7 @@ class TestCoregClass: dem_to_be_aligned=tba.data, inlier_mask=inlier_mask, transform=ref.transform, + crs=ref.crs, verbose=False, ) # Create some 3D coordinates with Z coordinates being 0 to try the apply_pts functions. @@ -124,7 +128,7 @@ def test_bias(self) -> None: assert biascorr.apply_pts(self.points)[0, 2] == biascorr._meta["bias"] # Apply the model to correct the DEM - tba_unbiased = biascorr.apply(self.tba.data, self.ref.transform) + tba_unbiased, _ = biascorr.apply(self.tba.data, self.ref.transform, self.ref.crs) # Create a new bias correction model biascorr2 = coreg.BiasCorr() @@ -135,6 +139,7 @@ def test_bias(self) -> None: reference_dem=self.ref.data, dem_to_be_aligned=tba_unbiased, transform=self.ref.transform, + crs=self.ref.crs, inlier_mask=self.inlier_mask, ) # Test the bias @@ -150,6 +155,7 @@ def test_all_nans(self) -> None: dem1 = np.ones((50, 50), dtype=float) dem2 = dem1.copy() + np.nan affine = rio.transform.from_origin(0, 0, 1, 1) + crs = rio.crs.CRS.from_epsg(4326) biascorr = coreg.BiasCorr() icp = coreg.ICP() @@ -159,7 +165,7 @@ def test_all_nans(self) -> None: dem2[[3, 20, 40], [2, 21, 41]] = 1.2 - biascorr.fit(dem1, dem2, transform=affine) + biascorr.fit(dem1, dem2, transform=affine, crs=crs) pytest.raises(ValueError, icp.fit, dem1, dem2, transform=affine) @@ -169,22 +175,23 @@ def test_error_method(self) -> None: # Create a biased dem dem2 = dem1.copy() + 2.0 affine = rio.transform.from_origin(0, 0, 1, 1) + crs = rio.crs.CRS.from_epsg(4326) biascorr = coreg.BiasCorr() # Fit the bias - biascorr.fit(dem1, dem2, transform=affine) + biascorr.fit(dem1, dem2, transform=affine, crs=crs) # Check that the bias after coregistration is zero - assert biascorr.error(dem1, dem2, transform=affine, error_type="median") == 0 + assert biascorr.error(dem1, dem2, transform=affine, crs=crs, error_type="median") == 0 # Remove the bias fit and see what happens. biascorr._meta["bias"] = 0 # Now it should be equal to dem1 - dem2 - assert biascorr.error(dem1, dem2, transform=affine, error_type="median") == -2 + assert biascorr.error(dem1, dem2, transform=affine, crs=crs, error_type="median") == -2 # Create random noise and see if the standard deviation is equal (it should) dem3 = dem1.copy() + np.random.random(size=dem1.size).reshape(dem1.shape) - assert abs(biascorr.error(dem1, dem3, transform=affine, error_type="std") - np.std(dem3)) < 1e-6 + assert abs(biascorr.error(dem1, dem3, transform=affine, crs=crs, error_type="std") - np.std(dem3)) < 1e-6 def test_coreg_example(self) -> None: """ @@ -216,7 +223,11 @@ def test_nuth_kaab(self) -> None: # Fit the synthesized shifted DEM to the original nuth_kaab.fit( - self.ref.data.squeeze(), shifted_dem, transform=self.ref.transform, verbose=self.fit_params["verbose"] + self.ref.data.squeeze(), + shifted_dem, + transform=self.ref.transform, + crs=self.ref.crs, + verbose=self.fit_params["verbose"], ) # Make sure that the estimated offsets are similar to what was synthesized. @@ -225,7 +236,7 @@ def test_nuth_kaab(self) -> None: assert nuth_kaab._meta["bias"] == pytest.approx(-bias, 0.03) # Apply the estimated shift to "revert the DEM" to its original state. - unshifted_dem = nuth_kaab.apply(shifted_dem, transform=self.ref.transform) + unshifted_dem, _ = nuth_kaab.apply(shifted_dem, transform=self.ref.transform, crs=self.ref.crs) # Measure the difference (should be more or less zero) diff = self.ref.data.squeeze() - unshifted_dem diff = diff.compressed() # turn into a 1D array with only unmasked values @@ -252,8 +263,8 @@ def test_deramping(self) -> None: # Fit the data deramp.fit(**self.fit_params) - # Apply the deramping to a DEm - deramped_dem = deramp.apply(self.tba.data, self.ref.transform) + # Apply the deramping to a DEM + deramped_dem, _ = deramp.apply(self.tba.data, self.ref.transform, self.ref.crs) # Get the periglacial offset after deramping periglacial_offset = (self.ref.data.squeeze() - deramped_dem)[self.inlier_mask.squeeze()] @@ -288,7 +299,7 @@ def test_icp_opencv(self) -> None: icp = coreg.ICP(max_iterations=3) icp.fit(**self.fit_params) - aligned_dem = icp.apply(self.tba.data, self.ref.transform) + aligned_dem, _ = icp.apply(self.tba.data, self.ref.transform, self.ref.crs) assert aligned_dem.shape == self.ref.data.squeeze().shape @@ -299,7 +310,7 @@ def test_pipeline(self) -> None: pipeline = coreg.CoregPipeline([coreg.BiasCorr(), coreg.NuthKaab()]) pipeline.fit(**self.fit_params) - aligned_dem = pipeline.apply(self.tba.data, self.ref.transform) + aligned_dem, _ = pipeline.apply(self.tba.data, self.ref.transform, self.ref.crs) assert aligned_dem.shape == self.ref.data.squeeze().shape @@ -384,6 +395,24 @@ def test_subsample(self) -> None: # Check that the x/y/z differences do not exceed 30cm assert np.count_nonzero(matrix_diff > 0.3) == 0 + # Test subsampled deramping + degree = 1 + deramp_sub = coreg.Deramp(degree=degree) + + # Fit the bias using 50% of the unmasked data using a fraction + deramp_sub.fit(**self.fit_params, subsample=0.5) + # Do the same but specify the pixel count instead. + # They are not perfectly equal (np.count_nonzero(self.mask) // 2 would be exact) + # But this would just repeat the subsample code, so that makes little sense to test. + deramp_sub.fit(**self.fit_params, subsample=self.tba.data.size // 2) + + # Do full bias corr to compare + deramp_full = coreg.Deramp(degree=degree) + deramp_full.fit(**self.fit_params) + + # Check that the estimated biases are similar + assert deramp_sub._meta["coefficients"] == pytest.approx(deramp_full._meta["coefficients"], rel=1e-1) + def test_z_scale_corr(self) -> None: warnings.simplefilter("error") @@ -395,10 +424,10 @@ def test_z_scale_corr(self) -> None: scaled_dem = self.ref.data * factor # Fit the correction - zcorr.fit(self.ref.data, scaled_dem, transform=self.ref.transform) + zcorr.fit(self.ref.data, scaled_dem, transform=self.ref.transform, crs=self.ref.crs) # Apply the correction - unscaled_dem = zcorr.apply(scaled_dem, self.ref.transform) + unscaled_dem, _ = zcorr.apply(scaled_dem, self.ref.transform, self.ref.crs) # Make sure the difference is now minimal diff = (self.ref.data - unscaled_dem).filled(np.nan) @@ -438,8 +467,8 @@ def test_z_scale_corr(self) -> None: dem_with_nans += error_field * 3 # Try the fit now with the messed up DEM as reference. - zcorr.fit(dem_with_nans, scaled_dem, transform=self.ref.transform) - unscaled_dem = zcorr.apply(scaled_dem, self.ref.transform) + zcorr.fit(dem_with_nans, scaled_dem, transform=self.ref.transform, crs=self.ref.crs) + unscaled_dem, _ = zcorr.apply(scaled_dem, self.ref.transform, self.ref.crs) diff = (dem_with_nans - unscaled_dem).filled(np.nan) assert np.abs(np.nanmedian(diff)) < 0.05 @@ -448,10 +477,10 @@ def test_z_scale_corr(self) -> None: # Try to correct using a nonlinear correction. zcorr_nonlinear = coreg.ZScaleCorr(degree=2) - zcorr_nonlinear.fit(dem_with_nans, scaled_dem, transform=self.ref.transform) + zcorr_nonlinear.fit(dem_with_nans, scaled_dem, transform=self.ref.transform, crs=self.ref.crs) # Make sure the difference is minimal - unscaled_dem = zcorr_nonlinear.apply(scaled_dem, self.ref.transform) + unscaled_dem, _ = zcorr_nonlinear.apply(scaled_dem, self.ref.transform, self.ref.crs) diff = (dem_with_nans - unscaled_dem).filled(np.nan) assert np.abs(np.nanmedian(diff)) < 0.05 @@ -488,7 +517,7 @@ def test_blockwise_coreg(self, pipeline: coreg.Coreg, subdivision: int) -> None: chunk_numbers = [m["i"] for m in blockwise._meta["coreg_meta"]] assert np.unique(chunk_numbers).shape[0] == len(chunk_numbers) - transformed_dem = blockwise.apply(self.tba.data, self.tba.transform) + transformed_dem, _ = blockwise.apply(self.tba.data, self.tba.transform, self.tba.crs) ddem_pre = (self.ref.data - self.tba.data)[~self.inlier_mask].squeeze().filled(np.nan) ddem_post = (self.ref.data.squeeze() - transformed_dem)[~self.inlier_mask.squeeze()].filled(np.nan) @@ -532,7 +561,7 @@ def test_blockwise_coreg_large_gaps(self) -> None: # Align the DEM and apply the blockwise to a zero-array (to get the zshift) aligned = blockwise.fit(self.ref, tba).apply(tba) - zshift = blockwise.apply(np.zeros_like(tba.data), transform=tba.transform) + zshift, _ = blockwise.apply(np.zeros_like(tba.data), transform=tba.transform, crs=tba.crs) # Validate that the zshift is not something crazy high and that no negative values exist in the data. assert np.nanmax(np.abs(zshift)) < 50 @@ -567,7 +596,10 @@ def test_coreg_raster_and_ndarray_args(self) -> None: # Fit the data biascorr_r.fit(reference_dem=dem1, dem_to_be_aligned=dem2) biascorr_a.fit( - reference_dem=dem1.data, dem_to_be_aligned=dem2.reproject(dem1, silent=True).data, transform=dem1.transform + reference_dem=dem1.data, + dem_to_be_aligned=dem2.reproject(dem1, silent=True).data, + transform=dem1.transform, + crs=dem1.crs, ) # Validate that they ended up giving the same result. @@ -575,7 +607,7 @@ def test_coreg_raster_and_ndarray_args(self) -> None: # De-shift dem2 dem2_r = biascorr_r.apply(dem2) - dem2_a = biascorr_a.apply(dem2.data, dem2.transform) + dem2_a, _ = biascorr_a.apply(dem2.data, dem2.transform, dem2.crs) # Validate that the return formats were the expected ones, and that they are equal. # Issue - dem2_a does not have the same shape, the first dimension is being squeezed @@ -586,43 +618,136 @@ def test_coreg_raster_and_ndarray_args(self) -> None: # If apply on a masked_array was given without a transform, it should fail. with pytest.raises(ValueError, match="'transform' must be given"): - biascorr_a.apply(dem2.data) + biascorr_a.apply(dem2.data, crs=dem2.crs) + + # If apply on a masked_array was given without a crs, it should fail. + with pytest.raises(ValueError, match="'crs' must be given"): + biascorr_a.apply(dem2.data, transform=dem2.transform) + # If transform provided with input Raster, should raise a warning with pytest.warns(UserWarning, match="DEM .* overrides the given 'transform'"): biascorr_a.apply(dem2, transform=dem2.transform) + # If crs provided with input Raster, should raise a warning + with pytest.warns(UserWarning, match="DEM .* overrides the given 'crs'"): + biascorr_a.apply(dem2, crs=dem2.crs) + + # Inputs contain: coregistration method, is implemented, comparison is "strict" or "approx" + @pytest.mark.parametrize( + "inputs", + [ + [xdem.coreg.BiasCorr(), True, "strict"], + [xdem.coreg.Deramp(), True, "strict"], + [xdem.coreg.ZScaleCorr(), True, "strict"], + [xdem.coreg.NuthKaab(), True, "approx"], + [xdem.coreg.NuthKaab() + xdem.coreg.Deramp(), True, "approx"], + [xdem.coreg.BlockwiseCoreg(coreg=xdem.coreg.NuthKaab(), subdivision=16), False, ""], + [xdem.coreg.ICP(), False, ""], + ], + ) # type: ignore + def test_apply_resample(self, inputs: list[Any]) -> None: + """ + Test that the option resample of coreg.apply works as expected. + For vertical correction only (BiasCorr, Deramp...), option True or False should yield same results. + For horizontal shifts (NuthKaab etc), georef should differ, but DEMs should be the same after resampling. + For others, the method is not implemented. + """ + # Get test inputs + coreg_method, is_implemented, comp = inputs + ref_dem, tba_dem, outlines = load_examples() # Load example reference, to-be-aligned and mask. + + # Prepare coreg + inlier_mask = ~outlines.create_mask(ref_dem) + coreg_method.fit(tba_dem, ref_dem, inlier_mask=inlier_mask) + + # If not implemented, should raise an error + if not is_implemented: + with pytest.raises(NotImplementedError, match="Option `resample=False` not implemented for coreg method *"): + dem_coreg_noresample = coreg_method.apply(tba_dem, resample=False) + return + else: + dem_coreg_resample = coreg_method.apply(tba_dem) + dem_coreg_noresample = coreg_method.apply(tba_dem, resample=False) + + if comp == "strict": + # Both methods should yield the exact same output + assert dem_coreg_resample == dem_coreg_noresample + elif comp == "approx": + # The georef should be different + assert dem_coreg_noresample.transform != dem_coreg_resample.transform + + # After resampling, both results should be almost equal + dem_final = dem_coreg_noresample.reproject(dem_coreg_resample) + diff = dem_final - dem_coreg_resample + assert np.all(np.abs(diff.data) == pytest.approx(0, abs=1e-2)) + # assert np.count_nonzero(diff.data) == 0 + + # Test it works with different resampling algorithms + dem_coreg_resample = coreg_method.apply(tba_dem, resample=True, resampling=rio.warp.Resampling.nearest) + dem_coreg_resample = coreg_method.apply(tba_dem, resample=True, resampling=rio.warp.Resampling.cubic) + with pytest.raises(ValueError, match="`resampling` must be a rio.warp.Resampling algorithm"): + dem_coreg_resample = coreg_method.apply(tba_dem, resample=True, resampling=None) + @pytest.mark.parametrize( "combination", [ - ("dem1", "dem2", "None", "fit", "passes", ""), - ("dem1", "dem2", "None", "apply", "passes", ""), - ("dem1.data", "dem2.data", "dem1.transform", "fit", "passes", ""), - ("dem1.data", "dem2.data", "dem1.transform", "apply", "passes", ""), + ("dem1", "dem2", "None", "None", "fit", "passes", ""), + ("dem1", "dem2", "None", "None", "apply", "passes", ""), + ("dem1.data", "dem2.data", "dem1.transform", "dem1.crs", "fit", "passes", ""), + ("dem1.data", "dem2.data", "dem1.transform", "dem1.crs", "apply", "passes", ""), ( "dem1", "dem2.data", "dem1.transform", + "dem1.crs", "fit", "warns", "'reference_dem' .* overrides the given 'transform'", ), - ("dem1.data", "dem2", "dem1.transform", "fit", "warns", "'dem_to_be_aligned' .* overrides .*"), + ("dem1.data", "dem2", "dem1.transform", "None", "fit", "warns", "'dem_to_be_aligned' .* overrides .*"), ( "dem1.data", "dem2.data", "None", + "dem1.crs", "fit", "error", "'transform' must be given if both DEMs are array-like.", ), - ("dem1", "dem2.data", "None", "apply", "error", "'transform' must be given if DEM is array-like."), - ("dem1", "dem2", "dem2.transform", "apply", "warns", "DEM .* overrides the given 'transform'"), - ("None", "None", "None", "fit", "error", "Both DEMs need to be array-like"), - ("dem1 + np.nan", "dem2", "None", "fit", "error", "'reference_dem' had only NaNs"), - ("dem1", "dem2 + np.nan", "None", "fit", "error", "'dem_to_be_aligned' had only NaNs"), + ( + "dem1.data", + "dem2.data", + "dem1.transform", + "None", + "fit", + "error", + "'crs' must be given if both DEMs are array-like.", + ), + ( + "dem1", + "dem2.data", + "None", + "dem1.crs", + "apply", + "error", + "'transform' must be given if DEM is array-like.", + ), + ( + "dem1", + "dem2.data", + "dem1.transform", + "None", + "apply", + "error", + "'crs' must be given if DEM is array-like.", + ), + ("dem1", "dem2", "dem2.transform", "None", "apply", "warns", "DEM .* overrides the given 'transform'"), + ("None", "None", "None", "None", "fit", "error", "Both DEMs need to be array-like"), + ("dem1 + np.nan", "dem2", "None", "None", "fit", "error", "'reference_dem' had only NaNs"), + ("dem1", "dem2 + np.nan", "None", "None", "fit", "error", "'dem_to_be_aligned' had only NaNs"), ], ) # type: ignore - def test_coreg_raises(self, combination: tuple[str, str, str, str, str, str]) -> None: + def test_coreg_raises(self, combination: tuple[str, str, str, str, str, str, str]) -> None: """ Assert that the expected warnings/errors are triggered under different circumstances. @@ -630,13 +755,15 @@ def test_coreg_raises(self, combination: tuple[str, str, str, str, str, str]) -> 1. The reference_dem (will be eval'd) 2. The dem to be aligned (will be eval'd) 3. The transform to use (will be eval'd) - 4. Which coreg method to assess - 5. The expected outcome of the test. - 6. The error/warning message (if applicable) + 4. The CRS to use (will be eval'd) + 5. Which coreg method to assess + 6. The expected outcome of the test. + 7. The error/warning message (if applicable) """ warnings.simplefilter("error") - ref_dem, tba_dem, transform, testing_step, result, text = combination + ref_dem, tba_dem, transform, crs, testing_step, result, text = combination + # Create a small sample-DEM dem1 = xdem.DEM.from_array( np.arange(25, dtype="float64").reshape(5, 5), @@ -647,16 +774,16 @@ def test_coreg_raises(self, combination: tuple[str, str, str, str, str, str]) -> dem2 = dem1.copy() # noqa # Evaluate the parametrization (e.g. 'dem2.transform') - ref_dem, tba_dem, transform = map(eval, (ref_dem, tba_dem, transform)) + ref_dem, tba_dem, transform, crs = map(eval, (ref_dem, tba_dem, transform, crs)) # Use BiasCorr as a representative example. biascorr = xdem.coreg.BiasCorr() def fit_func() -> coreg.Coreg: - return biascorr.fit(ref_dem, tba_dem, transform=transform) + return biascorr.fit(ref_dem, tba_dem, transform=transform, crs=crs) def apply_func() -> NDArrayf: - return biascorr.apply(tba_dem, transform=transform) + return biascorr.apply(tba_dem, transform=transform, crs=crs) # Try running the methods in order and validate the result. for method, method_call in [("fit", fit_func), ("apply", apply_func)]: @@ -681,9 +808,12 @@ def test_coreg_oneliner(self) -> None: dem_arr = np.ones((5, 5), dtype="int32") dem_arr2 = dem_arr + 1 transform = rio.transform.from_origin(0, 5, 1, 1) + crs = rio.crs.CRS.from_epsg(4326) - dem_arr2_fixed = ( - coreg.BiasCorr().fit(dem_arr, dem_arr2, transform=transform).apply(dem_arr2, transform=transform) + dem_arr2_fixed, _ = ( + coreg.BiasCorr() + .fit(dem_arr, dem_arr2, transform=transform, crs=crs) + .apply(dem_arr2, transform=transform, crs=crs) ) assert np.array_equal(dem_arr, dem_arr2_fixed) @@ -718,14 +848,6 @@ def test_apply_matrix() -> None: matrix[2, 3] = -bias transformed_dem = coreg.apply_matrix(shifted_dem, ref.transform, matrix, resampling="bilinear") - - # Dilate the mask: this should remove the same edge pixels as done by skimage - transformed_dem_dilated = coreg.apply_matrix( - shifted_dem, ref.transform, matrix, resampling="bilinear", dilate_mask=True - ) - # Validate the same pixels were removed. - assert np.count_nonzero(np.isfinite(transformed_dem)) == np.count_nonzero(np.isfinite(transformed_dem_dilated)) - diff = np.asarray(ref_arr - transformed_dem) # Check that the median is very close to zero @@ -911,3 +1033,240 @@ def test_warp_dem() -> None: plt.subplot(144) plt.imshow(dem - untransformed_dem, cmap="coolwarm_r", vmin=-10, vmax=10) plt.show() + + +def test_create_inlier_mask() -> None: + """Test that the create_inlier_mask function works expectedly.""" + warnings.simplefilter("error") + + ref, tba, outlines = load_examples() # Load example reference, to-be-aligned and outlines + + # - Assert that without filtering create_inlier_mask behaves as if calling Vector.create_mask - # + # Masking inside - using Vector + inlier_mask_comp = ~outlines.create_mask(ref) + inlier_mask = xdem.coreg.create_inlier_mask( + tba, + ref, + [ + outlines, + ], + filtering=False, + ) + assert np.all(inlier_mask_comp == inlier_mask) + + # Masking inside - using string + inlier_mask = xdem.coreg.create_inlier_mask( + tba, + ref, + [ + outlines.name, + ], + filtering=False, + ) + assert np.all(inlier_mask_comp == inlier_mask) + + # Masking outside - using Vector + inlier_mask = xdem.coreg.create_inlier_mask( + tba, + ref, + [ + outlines, + ], + inout=[ + -1, + ], + filtering=False, + ) + assert np.all(~inlier_mask_comp == inlier_mask) + + # Masking outside - using string + inlier_mask = xdem.coreg.create_inlier_mask( + tba, + ref, + [ + outlines.name, + ], + inout=[-1], + filtering=False, + ) + assert np.all(~inlier_mask_comp == inlier_mask) + + # - Test filtering options only - # + # Test the slope filter only + slope = xdem.terrain.slope(ref) + slope_lim = [1, 50] + inlier_mask_comp2 = np.ones(tba.data.shape, dtype=bool) + inlier_mask_comp2[slope.data < slope_lim[0]] = False + inlier_mask_comp2[slope.data > slope_lim[1]] = False + inlier_mask = xdem.coreg.create_inlier_mask(tba, ref, filtering=True, slope_lim=slope_lim, nmad_factor=np.inf) + assert np.all(inlier_mask == inlier_mask_comp2) + + # Test the nmad_factor filter only + nmad_factor = 3 + ddem = tba - ref + inlier_mask_comp3 = (np.abs(ddem.data - np.median(ddem)) < nmad_factor * xdem.spatialstats.nmad(ddem)).filled(False) + inlier_mask = xdem.coreg.create_inlier_mask(tba, ref, filtering=True, slope_lim=[0, 90], nmad_factor=nmad_factor) + assert np.all(inlier_mask == inlier_mask_comp3) + + # Test the sum of both + inlier_mask = xdem.coreg.create_inlier_mask( + tba, ref, shp_list=[], inout=[], filtering=True, slope_lim=slope_lim, nmad_factor=nmad_factor + ) + inlier_mask_all = inlier_mask_comp2 & inlier_mask_comp3 + assert np.all(inlier_mask == inlier_mask_all) + + # Test the dh_max filter only + dh_max = 200 + inlier_mask_comp4 = (np.abs(ddem.data) < dh_max).filled(False) + inlier_mask = xdem.coreg.create_inlier_mask( + tba, ref, filtering=True, slope_lim=[0, 90], nmad_factor=np.inf, dh_max=dh_max + ) + assert np.all(inlier_mask == inlier_mask_comp4) + + # - Test the sum of outlines + dh_max + slope - # + # nmad_factor will have a different behavior because it calculates nmad from the inliers of previous filters + inlier_mask = xdem.coreg.create_inlier_mask( + tba, + ref, + shp_list=[ + outlines, + ], + inout=[ + -1, + ], + filtering=True, + slope_lim=slope_lim, + nmad_factor=np.inf, + dh_max=dh_max, + ) + inlier_mask_all = ~inlier_mask_comp & inlier_mask_comp2 & inlier_mask_comp4 + assert np.all(inlier_mask == inlier_mask_all) + + # - Test that proper errors are raised for wrong inputs - # + with pytest.raises(ValueError, match="`shp_list` must be a list/tuple"): + inlier_mask = xdem.coreg.create_inlier_mask(tba, ref, shp_list=outlines) + + with pytest.raises(ValueError, match="`shp_list` must be a list/tuple of strings or geoutils.Vector instance"): + inlier_mask = xdem.coreg.create_inlier_mask(tba, ref, shp_list=[1]) + + with pytest.raises(ValueError, match="`inout` must be a list/tuple"): + inlier_mask = xdem.coreg.create_inlier_mask( + tba, + ref, + shp_list=[ + outlines, + ], + inout=1, # type: ignore + ) + + with pytest.raises(ValueError, match="`inout` must contain only 1 and -1"): + inlier_mask = xdem.coreg.create_inlier_mask( + tba, + ref, + shp_list=[ + outlines, + ], + inout=[ + 0, + ], + ) + + with pytest.raises(ValueError, match="`inout` must be of same length as shp"): + inlier_mask = xdem.coreg.create_inlier_mask( + tba, + ref, + shp_list=[ + outlines, + ], + inout=[1, 1], + ) + + with pytest.raises(ValueError, match="`slope_lim` must be a list/tuple"): + inlier_mask = xdem.coreg.create_inlier_mask(tba, ref, filtering=True, slope_lim=1) # type: ignore + + with pytest.raises(ValueError, match="`slope_lim` must contain 2 elements"): + inlier_mask = xdem.coreg.create_inlier_mask(tba, ref, filtering=True, slope_lim=[30]) + + with pytest.raises(ValueError, match=r"`slope_lim` must be a tuple/list of 2 elements in the range \[0-90\]"): + inlier_mask = xdem.coreg.create_inlier_mask(tba, ref, filtering=True, slope_lim=[-1, 40]) + + with pytest.raises(ValueError, match=r"`slope_lim` must be a tuple/list of 2 elements in the range \[0-90\]"): + inlier_mask = xdem.coreg.create_inlier_mask(tba, ref, filtering=True, slope_lim=[1, 120]) + + +def test_dem_coregistration() -> None: + """ + Test that the dem_coregistration function works expectedly. + Tests the features that are specific to dem_coregistration. + For example, many features are tested in create_inlier_mask, so not tested again here. + TODO: Add DEMs with different projection/grid to test that regridding works as expected. + """ + # Load example reference, to-be-aligned and outlines + ref_dem, tba_dem, outlines = load_examples() + + # - Check that it works with default parameters - # + dem_coreg, coreg_method, coreg_stats, inlier_mask = xdem.coreg.dem_coregistration(tba_dem, ref_dem) + + # Assert that outputs have expected format + assert isinstance(dem_coreg, xdem.DEM) + assert isinstance(coreg_method, xdem.coreg.Coreg) + assert isinstance(coreg_stats, pd.DataFrame) + + # Assert that default coreg_method is as expected + assert hasattr(coreg_method, "pipeline") + assert isinstance(coreg_method.pipeline[0], xdem.coreg.NuthKaab) + assert isinstance(coreg_method.pipeline[1], xdem.coreg.BiasCorr) + + # The result should be similar to applying the same coreg by hand with: + # - DEMs converted to Float32 + # - default inlier_mask + # - no resampling + coreg_method_ref = xdem.coreg.NuthKaab() + xdem.coreg.BiasCorr() + inlier_mask = xdem.coreg.create_inlier_mask(tba_dem, ref_dem) + coreg_method_ref.fit(ref_dem.astype("float32"), tba_dem.astype("float32"), inlier_mask=inlier_mask) + dem_coreg_ref = coreg_method_ref.apply(tba_dem, resample=False) + assert dem_coreg == dem_coreg_ref + + # Assert that coregistration improved the residuals + assert abs(coreg_stats["med_orig"].values) > abs(coreg_stats["med_coreg"].values) + assert coreg_stats["nmad_orig"].values > coreg_stats["nmad_coreg"].values + + # - Check some alternative arguments - # + # Test with filename instead of DEMs + dem_coreg2, _, _, _ = xdem.coreg.dem_coregistration(tba_dem.filename, ref_dem.filename) + assert dem_coreg2 == dem_coreg + + # Test saving to file + outfile = tempfile.NamedTemporaryFile() + _, _, _, _ = xdem.coreg.dem_coregistration(tba_dem, ref_dem, out_dem_path=outfile.name) + dem_coreg2 = xdem.DEM(outfile.name) + assert dem_coreg2 == dem_coreg + outfile.close() + + # Test that shapefile is properly taken into account - inlier_mask should be False inside outlines + # Need to use resample=True, to ensure that dem_coreg has same georef as inlier_mask + dem_coreg, coreg_method, coreg_stats, inlier_mask = xdem.coreg.dem_coregistration( + tba_dem, + ref_dem, + shp_list=[ + outlines, + ], + resample=True, + ) + gl_mask = outlines.create_mask(dem_coreg) + assert np.all(~inlier_mask[gl_mask]) + + # Testing with plot + out_fig = tempfile.NamedTemporaryFile(suffix=".png") + assert os.path.getsize(out_fig.name) == 0 + _, _, _, _ = xdem.coreg.dem_coregistration(tba_dem, ref_dem, plot=True, out_fig=out_fig.name) + assert os.path.getsize(out_fig.name) > 0 + out_fig.close() + + # Testing different coreg method + dem_coreg, coreg_method, coreg_stats, inlier_mask = xdem.coreg.dem_coregistration( + tba_dem, ref_dem, coreg_method=xdem.coreg.Deramp(degree=1) + ) + assert isinstance(coreg_method, xdem.coreg.Deramp) + assert abs(coreg_stats["med_orig"].values) > abs(coreg_stats["med_coreg"].values) + assert coreg_stats["nmad_orig"].values > coreg_stats["nmad_coreg"].values diff --git a/tests/test_examples.py b/tests/test_examples.py index 00b6901b..f6af544b 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -55,7 +55,8 @@ def test_array_content(self, rst_and_truevals: tuple[Raster, NDArrayf]) -> None: assert values == pytest.approx(truevals) - @pytest.mark.parametrize("rst_and_truenodata", [(ref_dem, 0), (tba_dem, 0), (ddem, 2316)]) # type: ignore + # Note: Following PR #329, no gaps on DEM edges after coregistration + @pytest.mark.parametrize("rst_and_truenodata", [(ref_dem, 0), (tba_dem, 0), (ddem, 0)]) # type: ignore def test_array_nodata(self, rst_and_truenodata: tuple[Raster, int]) -> None: """Let's also check that the data arrays have always the same number of not finite values""" diff --git a/tests/test_spatialstats.py b/tests/test_spatialstats.py index f953423f..07107b49 100644 --- a/tests/test_spatialstats.py +++ b/tests/test_spatialstats.py @@ -390,9 +390,9 @@ def test_sample_multirange_variogram_default(self) -> None: # Check the variogram output is consistent for a random state df = xdem.spatialstats.sample_empirical_variogram(values=self.diff, subsample=10, random_state=42) - assert df["exp"][15] == pytest.approx(23.517837524414062, abs=1e-3) + assert df["exp"][15] == pytest.approx(5.087792205810548, abs=1e-3) assert df["lags"][15] == pytest.approx(5120) - assert df["count"][15] == 2 + assert df["count"][15] == 5 # With a single run, no error can be estimated assert all(np.isnan(df.err_exp.values)) @@ -1180,7 +1180,7 @@ def test_patches_method_loop_quadrant(self) -> None: assert all(df.columns == ["nmad", "nb_indep_patches", "exact_areas", "areas"]) # Check the sampling is fixed for a random state - assert df["nmad"][0] == pytest.approx(1.8663623135417342, abs=1e-3) + assert df["nmad"][0] == pytest.approx(1.8392861049330378, abs=1e-3) assert df["nb_indep_patches"][0] == 100 assert df["exact_areas"][0] == pytest.approx(df["areas"][0], rel=0.2) diff --git a/xdem/coreg.py b/xdem/coreg.py index 7576590b..88669b2d 100644 --- a/xdem/coreg.py +++ b/xdem/coreg.py @@ -27,7 +27,7 @@ import skimage.transform from geoutils import spatial_tools from geoutils._typing import AnyNumber -from geoutils.georaster import RasterType +from geoutils.georaster import RasterType, raster from rasterio import Affine from tqdm import tqdm, trange @@ -43,65 +43,29 @@ _HAS_P3D = False -def filter_by_range(ds: rio.DatasetReader, rangelim: tuple[float, float]) -> MArrayf: +def _calculate_slope_and_aspect_nuthkaab(dem: NDArrayf) -> tuple[NDArrayf, NDArrayf]: """ - Function to filter values using a range. - """ - print("Excluding values outside of range: {:f} to {:f}".format(*rangelim)) - out = np.ma.masked_outside(ds, *rangelim) - out.set_fill_value(ds.fill_value) - return out - - -def filtered_slope(ds_slope: rio.DatasetReader, slope_lim: tuple[float, float] = (0.1, 40)) -> MArrayf: - print("Slope filter: %0.2f - %0.2f" % slope_lim) - print("Initial count: %i" % ds_slope.count()) - flt_slope = filter_by_range(ds_slope, slope_lim) - print(flt_slope.count()) - return flt_slope + Calculate the tangent of slope and aspect of a DEM, in radians, as needed for the Nuth & Kaab algorithm. + :param dem: A numpy array of elevation values. -def apply_xy_shift(ds: rio.DatasetReader, dx: float, dy: float) -> NDArrayf: - """ - Apply horizontal shift to rio dataset using Transform affine matrix - :param ds: DEM - :param dx: dx shift value - :param dy: dy shift value - - Returns: - Rio Dataset with updated transform + :returns: The tangent of slope and aspect (in radians) of the DEM. """ - print("X shift: ", dx) - print("Y shift: ", dy) - - # Update geotransform - gt_orig = ds.transform - gt_align = Affine(gt_orig.a, gt_orig.b, gt_orig.c + dx, gt_orig.d, gt_orig.e, gt_orig.f + dy) - - print("Original transform:", gt_orig) - print("Updated transform:", gt_align) + # Old implementation + # # Calculate the gradient of the slope + gradient_y, gradient_x = np.gradient(dem) + slope_tan = np.sqrt(gradient_x**2 + gradient_y**2) + aspect = np.arctan2(-gradient_x, gradient_y) + aspect += np.pi - # Update ds Geotransform - ds_align = ds - meta_update = ds.meta.copy() - meta_update({"driver": "GTiff", "height": ds.shape[1], "width": ds.shape[2], "transform": gt_align, "crs": ds.crs}) - # to split this part in two? - with rasterio.open(ds_align, "w", **meta_update) as dest: - dest.write(ds_align) + # xdem implementation + # slope, aspect = xdem.terrain.get_terrain_attribute( + # dem, attribute=["slope", "aspect"], resolution=1, degrees=False + # ) + # slope_tan = np.tan(slope) + # aspect = (aspect + np.pi) % (2 * np.pi) - return ds_align - - -def apply_z_shift(ds: rio.DatasetReader, dz: float) -> float: - """ - Apply vertical shift to rio dataset using Transform affine matrix - :param ds: DEM - :param dz: dz shift value - """ - src_dem = rio.open(ds) - a = src_dem.read(1) - ds_shift = a + dz - return ds_shift + return slope_tan, aspect def get_horizontal_shift( @@ -203,69 +167,100 @@ def residuals(parameters: tuple[float, float, float], y_values: NDArrayf, x_valu return east_offset, north_offset, c_parameter -def calculate_slope_and_aspect(dem: NDArrayf) -> tuple[NDArrayf, NDArrayf]: +def apply_xy_shift(transform: rio.transform.Affine, dx: float, dy: float) -> rio.transform.Affine: """ - Calculate the slope and aspect of a DEM. - - :param dem: A numpy array of elevation values. + Apply horizontal shift to a rasterio Affine transform + :param transform: The Affine transform of the raster + :param dx: dx shift value + :param dy: dy shift value - :returns: The slope (in pixels??) and aspect (in radians) of the DEM. + Returns: Updated transform """ - # TODO: Figure out why slope is called slope_px. What unit is it in? - # TODO: Change accordingly in the get_horizontal_shift docstring. + transform_shifted = Affine(transform.a, transform.b, transform.c + dx, transform.d, transform.e, transform.f + dy) + return transform_shifted - # Calculate the gradient of the slope - gradient_y, gradient_x = np.gradient(dem) - slope_px = np.sqrt(gradient_x**2 + gradient_y**2) - aspect = np.arctan2(-gradient_x, gradient_y) - aspect += np.pi +def calculate_ddem_stats( + ddem: NDArrayf | MArrayf, + inlier_mask: NDArrayf | None = None, + stats_list: tuple[Callable[[NDArrayf], AnyNumber], ...] | None = None, + stats_labels: tuple[str, ...] | None = None, +) -> dict[str, float]: + """ + Calculate standard statistics of ddem, e.g., to be used to compare before/after coregistration. + Default statistics are: count, mean, median, NMAD and std. + + :param ddem: The DEM difference to be analyzed. + :param inlier_mask: 2D boolean array of areas to include in the analysis (inliers=True). + :param stats_list: Statistics to compute on the DEM difference. + :param stats_labels: Labels of the statistics to compute (same length as stats_list). - return slope_px, aspect + Returns: a dictionary containing the statistics + """ + # Default stats - Cannot be put in default args due to circular import with xdem.spatialstats.nmad. + if (stats_list is None) or (stats_labels is None): + stats_list = (np.size, np.mean, np.median, xdem.spatialstats.nmad, np.std) + stats_labels = ("count", "mean", "median", "nmad", "std") + + # Check that stats_list and stats_labels are correct + if len(stats_list) != len(stats_labels): + raise ValueError("Number of items in `stats_list` and `stats_labels` should be identical.") + for stat, label in zip(stats_list, stats_labels): + if not callable(stat): + raise ValueError(f"Item {stat} in `stats_list` should be a callable/function.") + if not isinstance(label, str): + raise ValueError(f"Item {label} in `stats_labels` should be a string.") + + # Get the mask of valid and inliers pixels + nan_mask = ~np.isfinite(ddem) + if inlier_mask is None: + inlier_mask = np.ones(ddem.shape, dtype="bool") + valid_ddem = ddem[~nan_mask & inlier_mask] + + # Calculate stats + stats = {} + for stat, label in zip(stats_list, stats_labels): + stats[label] = stat(valid_ddem) + + return stats def deramping( - elevation_difference: NDArrayf, - x_coordinates: NDArrayf, - y_coordinates: NDArrayf, + ddem: NDArrayf | MArrayf, + x_coords: NDArrayf, + y_coords: NDArrayf, degree: int, + subsample: float | int = 1.0, verbose: bool = False, - metadata: dict[str, Any] | None = None, -) -> Callable[[NDArrayf, NDArrayf], NDArrayf]: +) -> tuple[Callable[[NDArrayf, NDArrayf], NDArrayf], tuple[NDArrayf, int]]: """ - Calculate a deramping function to account for rotational and non-rigid components of the elevation difference. + Calculate a deramping function to remove spatially correlated elevation differences that can be explained by \ + a polynomial of degree `degree`. - :param elevation_difference: The elevation difference array to analyse. - :param x_coordinates: x-coordinates of the above array (must have the same shape as elevation_difference) - :param y_coordinates: y-coordinates of the above array (must have the same shape as elevation_difference) + :param ddem: The elevation difference array to analyse. + :param x_coords: x-coordinates of the above array (must have the same shape as elevation_difference) + :param y_coords: y-coordinates of the above array (must have the same shape as elevation_difference) :param degree: The polynomial degree to estimate the ramp. + :param subsample: Subsample the input to increase performance. <1 is parsed as a fraction. >1 is a pixel count. :param verbose: Print the least squares optimization progress. - :param metadata: Optional. A metadata dictionary that will be updated with the key "deramp". - :returns: A callable function to estimate the ramp. + :returns: A callable function to estimate the ramp and the output of scipy.optimize.leastsq """ - # warnings.warn("This function is deprecated in favour of the new Coreg class.", DeprecationWarning) - # Extract only the finite values of the elevation difference and corresponding coordinates. - valid_diffs = elevation_difference[np.isfinite(elevation_difference)] - valid_x_coords = x_coordinates[np.isfinite(elevation_difference)] - valid_y_coords = y_coordinates[np.isfinite(elevation_difference)] - - # Randomly subsample the values if there are more than 500,000 of them. - if valid_x_coords.shape[0] > 500_000: - random_indices = np.random.randint(0, valid_x_coords.shape[0] - 1, 500_000) - valid_diffs = valid_diffs[random_indices] - valid_x_coords = valid_x_coords[random_indices] - valid_y_coords = valid_y_coords[random_indices] - - # Create a function whose residuals will be attempted to minimise - def estimate_values( - x_coordinates: NDArrayf, y_coordinates: NDArrayf, coefficients: NDArrayf, degree: int - ) -> NDArrayf: + # Extract only valid pixels + valid_mask = np.isfinite(ddem) + ddem = ddem[valid_mask] + x_coords = x_coords[valid_mask] + y_coords = y_coords[valid_mask] + + # Formulate the 2D polynomial whose coefficients will be solved for. + def poly2d(x_coords: NDArrayf, y_coords: NDArrayf, coefficients: NDArrayf) -> NDArrayf: """ Estimate values from a 2D-polynomial. - :param x_coordinates: x-coordinates of the difference array (must have the same shape as elevation_difference). - :param y_coordinates: y-coordinates of the difference array (must have the same shape as elevation_difference). + :param x_coords: x-coordinates of the difference array (must have the same shape as + elevation_difference). + :param y_coords: y-coordinates of the difference array (must have the same shape as + elevation_difference). :param coefficients: The coefficients (a, b, c, etc.) of the polynomial. :param degree: The degree of the polynomial. @@ -278,10 +273,10 @@ def estimate_values( if len(coefficients) != coefficient_size: raise ValueError() - # Do Amaury's black magic to estimate the values. + # Build the polynomial of degree `degree` estimated_values = np.sum( [ - coefficients[k * (k + 1) // 2 + j] * x_coordinates ** (k - j) * y_coordinates**j + coefficients[k * (k + 1) // 2 + j] * x_coords ** (k - j) * y_coords**j for k in range(degree + 1) for j in range(k + 1) ], @@ -289,61 +284,39 @@ def estimate_values( ) return estimated_values # type: ignore - # Create the error function - def residuals( - coefficients: NDArrayf, values: NDArrayf, x_coordinates: NDArrayf, y_coordinates: NDArrayf, degree: int - ) -> NDArrayf: - """ - Calculate the difference between the estimated and measured values. - - :param coefficients: Coefficients for the estimation. - :param values: The measured values. - :param x_coordinates: The x-coordinates of the values. - :param y_coordinates: The y-coordinates of the values. - :param degree: The degree of the polynomial to estimate. - - :returns: An array of residuals. - """ - error = estimate_values(x_coordinates, y_coordinates, coefficients, degree) - values - error = error[np.isfinite(error)] + def residuals(coefs: NDArrayf, x_coords: NDArrayf, y_coords: NDArrayf, targets: NDArrayf) -> NDArrayf: + """Return the optimization residuals""" + res = targets - poly2d(x_coords, y_coords, coefs) + return res[np.isfinite(res)] - return error - - # Run a least-squares minimisation to estimate the correct coefficients. - # TODO: Maybe remove the full_output? - initial_guess = np.zeros(shape=((degree + 1) * (degree + 2) // 2)) if verbose: - print("Deramping...") - coefficients = scipy.optimize.least_squares( - fun=residuals, - x0=initial_guess, - args=(valid_diffs, valid_x_coords, valid_y_coords, degree), - verbose=2 if verbose and degree > 1 else 0, - ).x - - # Generate the return-function which can correctly estimate the ramp + print("Estimating deramp function...") + + # reduce number of elements for speed + rand_indices = gu.spatial_tools.subsample_raster(x_coords, subsample=subsample, return_indices=True) + x_coords = x_coords[rand_indices] + y_coords = y_coords[rand_indices] + ddem = ddem[rand_indices] + + # Optimize polynomial parameters + coefs = scipy.optimize.leastsq( + func=residuals, + x0=np.zeros(shape=((degree + 1) * (degree + 2) // 2)), + args=(x_coords, y_coords, ddem), + ) - def ramp(x_coordinates: NDArrayf, y_coordinates: NDArrayf) -> NDArrayf: + def fit_ramp(x: NDArrayf, y: NDArrayf) -> NDArrayf: """ - Get the values of the ramp that corresponds to given coordinates. + Get the elevation difference biases (ramp) at the given coordinates. :param x_coordinates: x-coordinates of interest. :param y_coordinates: y-coordinates of interest. - :returns: The estimated ramp offsets. + :returns: The estimated elevation difference bias. """ - return estimate_values(x_coordinates, y_coordinates, coefficients, degree) - - if metadata is not None: - metadata["deramp"] = { - "coefficients": coefficients, - "nmad": xdem.spatialstats.nmad( - residuals(coefficients, valid_diffs, valid_x_coords, valid_y_coords, degree) - ), - } + return poly2d(x, y, coefs[0]) - # Return the function which can be used later. - return ramp + return fit_ramp, coefs def mask_as_array( @@ -462,6 +435,7 @@ def fit( dem_to_be_aligned: NDArrayf | MArrayf | RasterType, inlier_mask: NDArrayf | None = None, transform: rio.transform.Affine | None = None, + crs: rio.crs.CRS | None = None, weights: NDArrayf | None = None, subsample: float | int = 1.0, verbose: bool = False, @@ -473,7 +447,8 @@ def fit( :param reference_dem: 2D array of elevation values acting reference. :param dem_to_be_aligned: 2D array of elevation values to be aligned. :param inlier_mask: Optional. 2D boolean array of areas to include in the analysis (inliers=True). - :param transform: Optional. Transform of the reference_dem. Mandatory in some cases. + :param transform: Optional. Transform of the reference_dem. Mandatory if DEM provided as array. + :param crs: Optional. CRS of the reference_dem. Mandatory if DEM provided as array. :param weights: Optional. Per-pixel weights for the coregistration. :param subsample: Subsample the input to increase performance. <1 is parsed as a fraction. >1 is a pixel count. :param verbose: Print progress messages to stdout. @@ -503,6 +478,10 @@ def fit( transform = dem.transform elif transform is not None: warnings.warn(f"'{name}' of type {type(dem)} overrides the given 'transform'") + if crs is None: + crs = dem.crs + elif crs is not None: + warnings.warn(f"'{name}' of type {type(dem)} overrides the given 'crs'") """ if name == "reference_dem": @@ -514,6 +493,9 @@ def fit( if transform is None: raise ValueError("'transform' must be given if both DEMs are array-like.") + if crs is None: + raise ValueError("'crs' must be given if both DEMs are array-like.") + ref_dem, ref_mask = spatial_tools.get_array_and_mask(reference_dem) tba_dem, tba_mask = spatial_tools.get_array_and_mask(dem_to_be_aligned) @@ -539,20 +521,11 @@ def fit( full_mask = ( ~ref_mask & ~tba_mask & (np.asarray(inlier_mask) if inlier_mask is not None else True) ).squeeze() - # If subsample is less than one, it is parsed as a fraction (e.g. 0.8 => retain 80% of the values) - if subsample < 1.0: - subsample = int(np.count_nonzero(full_mask) * (1 - subsample)) - - # Randomly pick N inliers in the full_mask where N=subsample - random_falses = np.random.choice(np.argwhere(full_mask.flatten()).squeeze(), int(subsample), replace=False) - # Convert the 1D indices to 2D indices - cols = (random_falses // full_mask.shape[0]).astype(int) - rows = random_falses % full_mask.shape[0] - # Set the N random inliers to be parsed as outliers instead. - full_mask[rows, cols] = False + random_indices = gu.spatial_tools.subsample_raster(full_mask, subsample=subsample, return_indices=True) + full_mask[random_indices] = False # Run the associated fitting function - self._fit_func(ref_dem=ref_dem, tba_dem=tba_dem, transform=transform, weights=weights, verbose=verbose) + self._fit_func(ref_dem=ref_dem, tba_dem=tba_dem, transform=transform, crs=crs, weights=weights, verbose=verbose) # Flag that the fitting function has been called. self._fit_called = True @@ -560,26 +533,57 @@ def fit( return self @overload - def apply(self, dem: MArrayf, transform: rio.transform.Affine | None = None, **kwargs: Any) -> MArrayf: + def apply( + self, + dem: MArrayf, + transform: rio.transform.Affine | None = None, + crs: rio.crs.CRS | None = None, + resample: bool = True, + **kwargs: Any, + ) -> tuple[MArrayf, rio.transform.Affine]: ... @overload - def apply(self, dem: NDArrayf, transform: rio.transform.Affine | None = None, **kwargs: Any) -> NDArrayf: + def apply( + self, + dem: NDArrayf, + transform: rio.transform.Affine | None = None, + crs: rio.crs.CRS | None = None, + resample: bool = True, + **kwargs: Any, + ) -> tuple[NDArrayf, rio.transform.Affine]: ... @overload - def apply(self, dem: RasterType, transform: rio.transform.Affine | None = None, **kwargs: Any) -> RasterType: + def apply( + self, + dem: RasterType, + transform: rio.transform.Affine | None = None, + crs: rio.crs.CRS | None = None, + resample: bool = True, + **kwargs: Any, + ) -> RasterType: ... def apply( - self, dem: RasterType | NDArrayf | MArrayf, transform: rio.transform.Affine | None = None, **kwargs: Any - ) -> RasterType | NDArrayf | MArrayf: + self, + dem: RasterType | NDArrayf | MArrayf, + transform: rio.transform.Affine | None = None, + crs: rio.crs.CRS | None = None, + resample: bool = True, + **kwargs: Any, + ) -> RasterType | tuple[NDArrayf, rio.transform.Affine] | tuple[MArrayf, rio.transform.Affine]: """ Apply the estimated transform to a DEM. :param dem: A DEM array or Raster to apply the transform on. - :param transform: The transform object of the DEM. Required if 'dem' is an array and not a Raster. + :param transform: Optional. The transform object of the DEM. Mandatory if 'dem' provided as array. + :param crs: Optional. CRS of the reference_dem. Mandatory if 'dem' provided as array. + :param resample: If set to True, will reproject output Raster on the same grid as input. Otherwise, \ + only the transform might be updated and no resampling is done. :param kwargs: Any optional arguments to be passed to either self._apply_func or apply_matrix. + Kwarg `resampling` can be set to any rio.warp.Resampling to use a different resampling in case \ + `resample` is True, default is bilinear. :returns: The transformed DEM. """ @@ -591,9 +595,16 @@ def apply( transform = dem.transform else: warnings.warn(f"DEM of type {type(dem)} overrides the given 'transform'") + if crs is None: + crs = dem.crs + else: + warnings.warn(f"DEM of type {type(dem)} overrides the given 'crs'") + else: if transform is None: raise ValueError("'transform' must be given if DEM is array-like.") + if crs is None: + raise ValueError("'crs' must be given if DEM is array-like.") # The array to provide the functions will be an ndarray with NaNs for masked out areas. dem_array, dem_mask = spatial_tools.get_array_and_mask(dem) @@ -603,17 +614,23 @@ def apply( # See if a _apply_func exists try: + # arg `resample` must be passed to _apply_func, otherwise will be overwritten in CoregPipeline + kwargs["resample"] = resample + # Run the associated apply function - applied_dem = self._apply_func(dem_array, transform, **kwargs) # pylint: disable=assignment-from-no-return + applied_dem, out_transform = self._apply_func( + dem_array, transform, crs, **kwargs + ) # pylint: disable=assignment-from-no-return + # If it doesn't exist, use apply_matrix() except NotImplementedError: + + # In this case, resampling is necessary + if not resample: + raise NotImplementedError(f"Option `resample=False` not implemented for coreg method {self.__class__}") + kwargs.pop("resample") # Need to removed before passing to apply_matrix + if self.is_affine: # This only works on it's affine, however. - # If dilate_mask is not specified, set it to True by default - if "dilate_mask" in kwargs.keys(): - dilate_mask = kwargs["dilate_mask"] - kwargs.pop("dilate_mask") - else: - dilate_mask = True # Apply the matrix around the centroid (if defined, otherwise just from the center). applied_dem = apply_matrix( @@ -621,17 +638,41 @@ def apply( transform=transform, matrix=self.to_matrix(), centroid=self._meta.get("centroid"), - dilate_mask=dilate_mask, **kwargs, ) + out_transform = transform else: raise ValueError("Coreg method is non-rigid but has no implemented _apply_func") # Ensure the dtype is OK applied_dem = applied_dem.astype("float32") + # Set default dst_nodata + if isinstance(dem, gu.Raster): + dst_nodata = dem.nodata + else: + dst_nodata = raster._default_nodata(applied_dem.dtype) + + # Resample the array on the original grid + if resample: + # Set default resampling method if not specified in kwargs + resampling = kwargs.get("resampling", rio.warp.Resampling.bilinear) + if not isinstance(resampling, rio.warp.Resampling): + raise ValueError("`resampling` must be a rio.warp.Resampling algorithm") + + applied_dem, out_transform = rio.warp.reproject( + applied_dem, + destination=applied_dem, + src_transform=out_transform, + dst_transform=transform, + src_crs=crs, + dst_crs=crs, + resampling=resampling, + dst_nodata=dst_nodata, + ) + # Calculate final mask - final_mask = ~np.isfinite(applied_dem) + final_mask = np.logical_or(~np.isfinite(applied_dem), applied_dem == dst_nodata) # If the DEM was a masked_array, copy the mask to the new DEM if isinstance(dem, (np.ma.masked_array, gu.Raster)): @@ -639,12 +680,12 @@ def apply( else: applied_dem[final_mask] = np.nan - # If the input was a Raster, return a Raster as well. + # If the input was a Raster, returns a Raster, else returns array and transform if isinstance(dem, gu.Raster): - return dem.copy(new_array=applied_dem) - # return dem.from_array(applied_dem, transform, dem.crs, nodata=dem.nodata) - - return applied_dem + out_dem = dem.from_array(applied_dem, out_transform, crs, nodata=dem.nodata) + return out_dem + else: + return applied_dem, out_transform def apply_pts(self, coords: NDArrayf) -> NDArrayf: """ @@ -718,6 +759,7 @@ def residuals( dem_to_be_aligned: NDArrayf, inlier_mask: NDArrayf | None = None, transform: rio.transform.Affine | None = None, + crs: rio.crs.CRS | None = None, ) -> NDArrayf: """ Calculate the residual offsets (the difference) between two DEMs after applying the transformation. @@ -726,11 +768,12 @@ def residuals( :param dem_to_be_aligned: 2D array of elevation values to be aligned. :param inlier_mask: Optional. 2D boolean array of areas to include in the analysis (inliers=True). :param transform: Optional. Transform of the reference_dem. Mandatory in some cases. + :param crs: Optional. CRS of the reference_dem. Mandatory in some cases. :returns: A 1D array of finite residuals. """ # Use the transform to correct the DEM to be aligned. - aligned_dem = self.apply(dem_to_be_aligned, transform=transform) + aligned_dem, _ = self.apply(dem_to_be_aligned, transform=transform, crs=crs) # Format the reference DEM ref_arr, ref_mask = spatial_tools.get_array_and_mask(reference_dem) @@ -759,6 +802,7 @@ def error( error_type: list[str], inlier_mask: NDArrayf | None = None, transform: rio.transform.Affine | None = None, + crs: rio.crs.CRS | None = None, ) -> list[np.floating[Any] | float | np.integer[Any] | int]: ... @@ -770,6 +814,7 @@ def error( error_type: str = "nmad", inlier_mask: NDArrayf | None = None, transform: rio.transform.Affine | None = None, + crs: rio.crs.CRS | None = None, ) -> np.floating[Any] | float | np.integer[Any] | int: ... @@ -780,6 +825,7 @@ def error( error_type: str | list[str] = "nmad", inlier_mask: NDArrayf | None = None, transform: rio.transform.Affine | None = None, + crs: rio.crs.CRS | None = None, ) -> np.floating[Any] | float | np.integer[Any] | int | list[np.floating[Any] | float | np.integer[Any] | int]: """ Calculate the error of a coregistration approach. @@ -798,6 +844,7 @@ def error( :param error_type: The type of error measure to calculate. May be a list of error types. :param inlier_mask: Optional. 2D boolean array of areas to include in the analysis (inliers=True). :param transform: Optional. Transform of the reference_dem. Mandatory in some cases. + :param crs: Optional. CRS of the reference_dem. Mandatory in some cases. :returns: The error measure of choice for the residuals. """ @@ -809,6 +856,7 @@ def error( dem_to_be_aligned=dem_to_be_aligned, inlier_mask=inlier_mask, transform=transform, + crs=crs, ) def rms(res: NDArrayf) -> np.floating[Any]: @@ -897,7 +945,8 @@ def _fit_func( self, ref_dem: NDArrayf, tba_dem: NDArrayf, - transform: rio.transform.Affine | None, + transform: rio.transform.Affine, + crs: rio.crs.CRS, weights: NDArrayf | None, verbose: bool = False, ) -> None: @@ -915,7 +964,9 @@ def _to_matrix_func(self) -> NDArrayf: raise NotImplementedError("This should be implemented by subclassing") - def _apply_func(self, dem: NDArrayf, transform: rio.transform.Affine, **kwargs: Any) -> NDArrayf: + def _apply_func( + self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + ) -> tuple[NDArrayf, rio.transform.Affine]: # FOR DEVELOPERS: This function is only needed for non-rigid transforms. raise NotImplementedError("This should have been implemented by subclassing") @@ -946,7 +997,8 @@ def _fit_func( self, ref_dem: NDArrayf, tba_dem: NDArrayf, - transform: rio.transform.Affine | None, + transform: rio.transform.Affine, + crs: rio.crs.CRS, weights: NDArrayf | None, verbose: bool = False, ) -> None: @@ -971,6 +1023,18 @@ def _fit_func( self._meta["bias"] = bias + def _apply_func( + self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + ) -> tuple[NDArrayf, rio.transform.Affine]: + """Apply the BiasCorr function to a DEM.""" + return dem + self._meta["bias"], transform + + def _apply_pts_func(self, coords: NDArrayf) -> NDArrayf: + """Apply the BiasCorr function to a set of points.""" + new_coords = coords.copy() + new_coords[:, 2] += self._meta["bias"] + return new_coords + def _to_matrix_func(self) -> NDArrayf: """Convert the bias to a transform matrix.""" empty_matrix = np.diag(np.ones(4, dtype=float)) @@ -1015,7 +1079,8 @@ def _fit_func( self, ref_dem: NDArrayf, tba_dem: NDArrayf, - transform: rio.transform.Affine | None, + transform: rio.transform.Affine, + crs: rio.crs.CRS, weights: NDArrayf | None, verbose: bool = False, ) -> None: @@ -1106,94 +1171,30 @@ def _fit_func( self, ref_dem: NDArrayf, tba_dem: NDArrayf, - transform: rio.transform.Affine | None, + transform: rio.transform.Affine, + crs: rio.crs.CRS, weights: NDArrayf | None, verbose: bool = False, ) -> None: """Fit the dDEM between the DEMs to a least squares polynomial equation.""" - x_coords, y_coords = _get_x_and_y_coords(ref_dem.shape, transform) - ddem = ref_dem - tba_dem - valid_mask = np.isfinite(ddem) - ddem = ddem[valid_mask] - x_coords = x_coords[valid_mask] - y_coords = y_coords[valid_mask] - - # Formulate the 2D polynomial whose coefficients will be solved for. - def poly2d(x_coordinates: NDArrayf, y_coordinates: NDArrayf, coefficients: NDArrayf) -> NDArrayf: - """ - Estimate values from a 2D-polynomial. - - :param x_coordinates: x-coordinates of the difference array (must have the same shape as - elevation_difference). - :param y_coordinates: y-coordinates of the difference array (must have the same shape as - elevation_difference). - :param coefficients: The coefficients (a, b, c, etc.) of the polynomial. - :param degree: The degree of the polynomial. - - :raises ValueError: If the length of the coefficients list is not compatible with the degree. - - :returns: The values estimated by the polynomial. - """ - # Check that the coefficient size is correct. - coefficient_size = (self.degree + 1) * (self.degree + 2) / 2 - if len(coefficients) != coefficient_size: - raise ValueError() - - # Do Amaury's black magic to formulate and calculate the polynomial equation. - estimated_values = np.sum( - [ - coefficients[k * (k + 1) // 2 + j] * x_coordinates ** (k - j) * y_coordinates**j - for k in range(self.degree + 1) - for j in range(k + 1) - ], - axis=0, - ) - return estimated_values # type: ignore - - def residuals(coefs: NDArrayf, x_coords: NDArrayf, y_coords: NDArrayf, targets: NDArrayf) -> NDArrayf: - res = targets - poly2d(x_coords, y_coords, coefs) - return res[np.isfinite(res)] - - if verbose: - print("Estimating deramp function...") - - # reduce number of elements for speed - # Get number of points to extract - max_points = np.size(x_coords) - if (self.subsample <= 1) & (self.subsample >= 0): - npoints = int(self.subsample * max_points) - elif self.subsample > 1: - npoints = int(self.subsample) - else: - raise ValueError("`subsample` must be >= 0") - - if max_points > npoints: - indices = np.random.choice(max_points, npoints, replace=False) - x_coords = x_coords[indices] - y_coords = y_coords[indices] - ddem = ddem[indices] - - # Optimize polynomial parameters - coefs = scipy.optimize.leastsq( - func=residuals, - x0=np.zeros(shape=((self.degree + 1) * (self.degree + 2) // 2)), - args=(x_coords, y_coords, ddem), + x_coords, y_coords = _get_x_and_y_coords(ref_dem.shape, transform) + fit_ramp, coefs = deramping( + ddem, x_coords, y_coords, degree=self.degree, subsample=self.subsample, verbose=verbose ) - def fit_func(x: NDArrayf, y: NDArrayf) -> NDArrayf: - return poly2d(x, y, coefs[0]) - self._meta["coefficients"] = coefs[0] - self._meta["func"] = fit_func + self._meta["func"] = fit_ramp - def _apply_func(self, dem: NDArrayf, transform: rio.transform.Affine, **kwargs: Any) -> NDArrayf: + def _apply_func( + self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + ) -> tuple[NDArrayf, rio.transform.Affine]: """Apply the deramp function to a DEM.""" x_coords, y_coords = _get_x_and_y_coords(dem.shape, transform) ramp = self._meta["func"](x_coords, y_coords) - return dem + ramp + return dem + ramp, transform def _apply_pts_func(self, coords: NDArrayf) -> NDArrayf: """Apply the deramp function to a set of points.""" @@ -1252,7 +1253,8 @@ def _fit_func( self, ref_dem: NDArrayf, tba_dem: NDArrayf, - transform: rio.transform.Affine | None, + transform: rio.transform.Affine, + crs: rio.crs.CRS, weights: NDArrayf | None, verbose: bool = False, ) -> None: @@ -1262,18 +1264,21 @@ def _fit_func( for i, coreg in enumerate(self.pipeline): if verbose: print(f"Running pipeline step: {i + 1} / {len(self.pipeline)}") - coreg._fit_func(ref_dem, tba_dem_mod, transform=transform, weights=weights, verbose=verbose) + coreg._fit_func(ref_dem, tba_dem_mod, transform=transform, crs=crs, weights=weights, verbose=verbose) coreg._fit_called = True - tba_dem_mod = coreg.apply(tba_dem_mod, transform) + tba_dem_mod, out_transform = coreg.apply(tba_dem_mod, transform, crs) - def _apply_func(self, dem: NDArrayf, transform: rio.transform.Affine, **kwargs: Any) -> NDArrayf: + def _apply_func( + self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + ) -> tuple[NDArrayf, rio.transform.Affine]: """Apply the coregistration steps sequentially to a DEM.""" dem_mod = dem.copy() + out_transform = copy.copy(transform) for coreg in self.pipeline: - dem_mod = coreg.apply(dem_mod, transform, **kwargs) + dem_mod, out_transform = coreg.apply(dem_mod, out_transform, crs, **kwargs) - return dem_mod + return dem_mod, out_transform def _apply_pts_func(self, coords: NDArrayf) -> NDArrayf: """Apply the coregistration steps sequentially to a set of points.""" @@ -1342,7 +1347,8 @@ def _fit_func( self, ref_dem: NDArrayf, tba_dem: NDArrayf, - transform: rio.transform.Affine | None, + transform: rio.transform.Affine, + crs: rio.crs.CRS, weights: NDArrayf | None, verbose: bool = False, ) -> None: @@ -1354,10 +1360,18 @@ def _fit_func( # Make a new DEM which will be modified inplace aligned_dem = tba_dem.copy() + # Check that DEM CRS is projected, otherwise slope is not correctly calculated + if not crs.is_projected: + raise NotImplementedError( + f"DEMs CRS is {crs}. NuthKaab coregistration only works with \ +projected CRS. First, reproject your DEMs in a local projected CRS, e.g. UTM, and re-run." + ) + # Calculate slope and aspect maps from the reference DEM if verbose: print(" Calculate slope and aspect") - slope, aspect = calculate_slope_and_aspect(ref_dem) + + slope_tan, aspect = _calculate_slope_and_aspect_nuthkaab(ref_dem) # Make index grids for the east and north dimensions east_grid = np.arange(ref_dem.shape[1]) @@ -1401,7 +1415,7 @@ def _fit_func( # Estimate the horizontal shift from the implementation by Nuth and Kääb (2011) east_diff, north_diff, _ = get_horizontal_shift( # type: ignore - elevation_difference=elevation_difference, slope=slope, aspect=aspect + elevation_difference=elevation_difference, slope=slope_tan, aspect=aspect ) if verbose: pbar.write(f" #{i + 1:d} - Offset in pixels : ({east_diff:.2f}, {north_diff:.2f})") @@ -1465,6 +1479,29 @@ def _to_matrix_func(self) -> NDArrayf: return matrix + def _apply_func( + self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + ) -> tuple[NDArrayf, rio.transform.Affine]: + """Apply the Nuth & Kaab shift to a DEM.""" + offset_east = self._meta["offset_east_px"] * self._meta["resolution"] + offset_north = self._meta["offset_north_px"] * self._meta["resolution"] + + updated_transform = apply_xy_shift(transform, -offset_east, -offset_north) + bias = self._meta["bias"] + return dem + bias, updated_transform + + def _apply_pts_func(self, coords: NDArrayf) -> NDArrayf: + """Apply the Nuth & Kaab shift to a set of points.""" + offset_east = self._meta["offset_east_px"] * self._meta["resolution"] + offset_north = self._meta["offset_north_px"] * self._meta["resolution"] + + new_coords = coords.copy() + new_coords[:, 0] += offset_east + new_coords[:, 1] += offset_north + new_coords[:, 2] += self._meta["bias"] + + return new_coords + def invert_matrix(matrix: NDArrayf) -> NDArrayf: """Invert a transformation matrix.""" @@ -1484,7 +1521,6 @@ def apply_matrix( invert: bool = False, centroid: tuple[float, float, float] | None = None, resampling: int | str = "bilinear", - dilate_mask: bool = False, fill_max_search: int = 0, ) -> NDArrayf: """ @@ -1506,7 +1542,6 @@ def apply_matrix( :param invert: Invert the transformation matrix. :param centroid: The X/Y/Z transformation centroid. Irrelevant for pure translations. Defaults to the midpoint (Z=0) :param resampling: The resampling method to use. Can be `nearest`, `bilinear`, `cubic` or an integer from 0-5. - :param dilate_mask: DEPRECATED - This option does not do anything anymore. Will be removed in the future. :param fill_max_search: Set to > 0 value to fill the DEM before applying the transformation, to avoid spreading\ gaps. The DEM will be filled with rasterio.fill.fillnodata with max_search_distance set to fill_max_search.\ This is experimental, use at your own risk ! @@ -1540,7 +1575,7 @@ def apply_matrix( if not _has_cv2: raise ValueError("Optional dependency needed. Install 'opencv'") - nan_mask = spatial_tools.get_mask(dem) + nan_mask = ~np.isfinite(dem) assert np.count_nonzero(~nan_mask) > 0, "Given DEM had all nans." # Optionally, fill DEM around gaps to reduce spread of gaps if fill_max_search > 0: @@ -1579,7 +1614,7 @@ def apply_matrix( ).reshape(point_cloud.shape) # Estimate the vertical difference of old and new point cloud elevations. - deramp = deramping( + deramp, coeffs = deramping( (point_cloud[:, :, 2] - transformed_points[:, :, 2])[~nan_mask].flatten(), point_cloud[:, :, 0][~nan_mask].flatten(), point_cloud[:, :, 1][~nan_mask].flatten(), @@ -1612,20 +1647,6 @@ def apply_matrix( transformed_dem = skimage.transform.warp( filled_dem, inds, order=resampling_order, mode="constant", cval=np.nan, preserve_range=True ) - # TODO: remove these lines when dilate_mask is deprecated - # # Warp the NaN mask, setting true to all values outside the new frame. - # tr_nan_mask = ( - # skimage.transform.warp( - # nan_mask.astype("uint8"), inds, order=resampling_order, mode="constant", cval=1, preserve_range=True - # ) - # > 0 - # ) - - # if dilate_mask: - # tr_nan_mask = scipy.ndimage.binary_dilation(tr_nan_mask, iterations=resampling_order) - - # # Apply the transformed nan_mask - # transformed_dem[tr_nan_mask] = np.nan assert np.count_nonzero(~np.isnan(transformed_dem)) > 0, "Transformed DEM has all nans." @@ -1658,7 +1679,8 @@ def _fit_func( self, ref_dem: NDArrayf, tba_dem: NDArrayf, - transform: rio.transform.Affine | None, + transform: rio.transform.Affine, + crs: rio.crs.CRS, weights: NDArrayf | None, verbose: bool = False, ) -> None: @@ -1672,11 +1694,13 @@ def _fit_func( coefficients = np.polyfit(medians.index.mid, medians.values, deg=self.degree) self._meta["coefficients"] = coefficients - def _apply_func(self, dem: NDArrayf, transform: rio.transform.Affine, **kwargs: Any) -> NDArrayf: + def _apply_func( + self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + ) -> tuple[NDArrayf, rio.transform.Affine]: """Apply the scaling model to a DEM.""" model = np.poly1d(self._meta["coefficients"]) - return dem + model(dem) + return dem + model(dem), transform def _apply_pts_func(self, coords: NDArrayf) -> NDArrayf: """Apply the scaling model to a set of points.""" @@ -1744,6 +1768,7 @@ def _fit_func( ref_dem: NDArrayf, tba_dem: NDArrayf, transform: rio.transform.Affine, + crs: rio.crs.CRS, weights: NDArrayf | None, verbose: bool = False, ) -> None: @@ -1788,14 +1813,15 @@ def coregister(i: int) -> dict[str, Any] | BaseException | None: dem_to_be_aligned=tba_subset, transform=transform_subset, inlier_mask=mask_subset, + crs=crs, ) - nmad, median = coreg.error( reference_dem=ref_subset, dem_to_be_aligned=tba_subset, error_type=["nmad", "median"], inlier_mask=mask_subset, transform=transform_subset, + crs=crs, ) except Exception as exception: return exception @@ -1979,7 +2005,13 @@ def subdivide_array(self, shape: tuple[int, ...]) -> NDArrayf: shape = (shape[1], shape[2]) return spatial_tools.subdivide_array(shape, count=self.subdivision) - def _apply_func(self, dem: NDArrayf, transform: rio.transform.Affine, **kwargs: Any) -> NDArrayf: + def _apply_func( + self, dem: NDArrayf, transform: rio.transform.Affine, crs: rio.crs.CRS, **kwargs: Any + ) -> tuple[NDArrayf, rio.transform.Affine]: + + # Other option than resample=True is not implemented for this case + if "resample" in kwargs and kwargs["resample"] is not True: + raise NotImplementedError() points = self.to_points() @@ -2007,7 +2039,7 @@ def _apply_func(self, dem: NDArrayf, transform: rio.transform.Affine, **kwargs: resampling="linear", ) - return warped_dem + return warped_dem, transform def _apply_pts_func(self, coords: NDArrayf) -> NDArrayf: """Apply the scaling model to a set of points.""" @@ -2155,134 +2187,236 @@ def warp_dem( return warped.reshape(dem.shape) -hmodes_dict = { - "nuth_kaab": NuthKaab(), - "nuth_kaab_block": BlockwiseCoreg(coreg=NuthKaab(), subdivision=16), - "icp": ICP(), -} +def create_inlier_mask( + src_dem: RasterType, + ref_dem: RasterType, + shp_list: list[str | gu.Vector | None] | tuple[str | gu.Vector] | tuple[()] = (), + inout: list[int] | tuple[int] | tuple[()] = (), + filtering: bool = True, + dh_max: AnyNumber = None, + nmad_factor: AnyNumber = 5, + slope_lim: list[AnyNumber] | tuple[AnyNumber, AnyNumber] = (0.1, 40), +) -> NDArrayf: + """ + Create a mask of inliers pixels to be used for coregistration. The following pixels can be excluded: + - pixels within polygons of file(s) in shp_list (with corresponding inout element set to 1) - useful for \ + masking unstable terrain like glaciers. + - pixels outside polygons of file(s) in shp_list (with corresponding inout element set to -1) - useful to \ +delineate a known stable area. + - pixels with absolute dh (=src-ref) are larger than a given threshold + - pixels where absolute dh differ from the mean dh by more than a set threshold (with \ +filtering=True and nmad_factor) + - pixels with low/high slope (with filtering=True and set slope_lim values) + + :param src_dem: the source DEM to be coregistered, as a Raster or DEM instance. + :param ref_dem: the reference DEM, must have same grid as src_dem. To be used for filtering only. + :param shp_list: a list of one or several paths to shapefiles to use for masking. Default is none. + :param inout: a list of same size as shp_list. For each shapefile, set to 1 (resp. -1) to specify whether \ +to mask inside (resp. outside) of the polygons. Defaults to masking inside polygons for all shapefiles. + :param filtering: if set to True, pixels will be removed based on dh values or slope (see next arguments). + :param dh_max: remove pixels where abs(src - ref) is more than this value. + :param nmad_factor: remove pixels where abs(src - ref) differ by nmad_factor * NMAD from the median. + :param slope_lim: a list/tuple of min and max slope values, in degrees. Pixels outside this slope range will \ +be excluded. + + :returns: an boolean array of same shape as src_dem set to True for inlier pixels + """ + # - Sanity check on inputs - # + # Check correct input type of shp_list + if not isinstance(shp_list, (list, tuple)): + raise ValueError("`shp_list` must be a list/tuple") + for el in shp_list: + if not isinstance(el, (str, gu.Vector)): + raise ValueError("`shp_list` must be a list/tuple of strings or geoutils.Vector instance") + + # Check correct input type of inout + if not isinstance(inout, (list, tuple)): + raise ValueError("`inout` must be a list/tuple") + + if len(shp_list) > 0: + if len(inout) == 0: + # Fill inout with 1 + inout = [1] * len(shp_list) + elif len(inout) == len(shp_list): + # Check that inout contains only 1 and -1 + not_valid = [el for el in np.unique(inout) if ((el != 1) & (el != -1))] + if len(not_valid) > 0: + raise ValueError("`inout` must contain only 1 and -1") + else: + raise ValueError("`inout` must be of same length as shp") + + # Check slope_lim type + if not isinstance(slope_lim, (list, tuple)): + raise ValueError("`slope_lim` must be a list/tuple") + if len(slope_lim) != 2: + raise ValueError("`slope_lim` must contain 2 elements") + for el in slope_lim: + if (not isinstance(el, (int, float, np.integer, np.floating))) or (el < 0) or (el > 90): + raise ValueError("`slope_lim` must be a tuple/list of 2 elements in the range [0-90]") + + # Initialize inlier_mask with no masked pixel + inlier_mask = np.ones(src_dem.data.shape, dtype="bool") + + # - Create mask based on shapefiles - # + if len(shp_list) > 0: + for k, shp in enumerate(shp_list): + if isinstance(shp, str): + outlines = gu.Vector(shp) + else: + outlines = shp + mask_temp = outlines.create_mask(src_dem).astype("bool") + + # Append mask for given shapefile to final mask + if inout[k] == 1: + inlier_mask[mask_temp] = False + elif inout[k] == -1: + inlier_mask[~mask_temp] = False + + # - Filter possible outliers - # + if filtering: + # Calculate dDEM + ddem = src_dem - ref_dem -vmodes_dict = { - "median": BiasCorr(bias_func=np.median), - "mean": BiasCorr(bias_func=np.mean), - "deramp": Deramp(), -} + # Remove gross blunders with absolute threshold + if dh_max is not None: + inlier_mask[np.abs(ddem.data) > dh_max] = False + + # Remove blunders where dh differ by nmad_factor * NMAD from the median + nmad = xdem.spatialstats.nmad(ddem.data[inlier_mask]) + med = np.ma.median(ddem.data[inlier_mask]) + inlier_mask = inlier_mask & (np.abs(ddem.data - med) < nmad_factor * nmad).filled(False) + + # Exclude steep slopes for coreg + slope = xdem.terrain.slope(ref_dem) + inlier_mask[slope.data < slope_lim[0]] = False + inlier_mask[slope.data > slope_lim[1]] = False + + return inlier_mask def dem_coregistration( - src_dem_path: str, - ref_dem_path: str, + src_dem_path: str | RasterType, + ref_dem_path: str | RasterType, out_dem_path: str | None = None, - shpfile: str | None = None, - coreg_method: Coreg | None = None, - hmode: str = "nuth_kaab", - vmode: str = "median", - deramp_degree: int = 1, + coreg_method: Coreg | None = NuthKaab() + BiasCorr(), grid: str = "ref", + resample: bool = False, + resampling: rio.warp.Resampling | None = rio.warp.Resampling.bilinear, + shp_list: list[str | gu.Vector] | tuple[str | gu.Vector] | tuple[()] = (), + inout: list[int] | tuple[int] | tuple[()] = (), filtering: bool = True, + dh_max: AnyNumber = None, + nmad_factor: AnyNumber = 5, slope_lim: list[AnyNumber] | tuple[AnyNumber, AnyNumber] = (0.1, 40), plot: bool = False, out_fig: str = None, verbose: bool = False, -) -> tuple[xdem.DEM, pd.DataFrame]: +) -> tuple[xdem.DEM, xdem.coreg.Coreg, pd.DataFrame, NDArrayf]: """ A one-line function to coregister a selected DEM to a reference DEM. - Reads both DEMs, reprojects them on the same grid, mask content of shpfile, filter steep slopes and outliers, \ -run the coregistration, returns the coregistered DEM and some statistics. + + Reads both DEMs, reprojects them on the same grid, mask pixels based on shapefile(s), filter steep slopes and \ +outliers, run the coregistration, returns the coregistered DEM and some statistics. Optionally, save the coregistered DEM to file and make a figure. + For details on masking options, see `create_inlier_mask` function. - :param src_dem_path: path to the input DEM to be coregistered - :param ref_dem: path to the reference DEM + :param src_dem_path: Path to the input DEM to be coregistered + :param ref_dem_path: Path to the reference DEM :param out_dem_path: Path where to save the coregistered DEM. If set to None (default), will not save to file. - :param shpfile: path to a vector file containing areas to be masked for coregistration - :param coreg_method: The xdem coregistration method, or pipeline. If set to None, DEMs will be resampled to \ -ref grid and optionally filtered, but not coregistered. Will be used in priority over hmode and vmode. - :param hmode: The method to be used for horizontally aligning the DEMs, e.g. Nuth & Kaab or ICP. Can be any \ -of {list(vmodes_dict.keys())}. - :param vmode: The method to be used for vertically aligning the DEMs, e.g. mean/median bias correction or \ -deramping. Can be any of {list(hmodes_dict.keys())}. - :param deramp_degree: The degree of the polynomial for deramping. - :param grid: the grid to be used during coregistration, set either to "ref" or "src". - :param filtering: if set to True, filtering will be applied prior to coregistration - :param plot: Set to True to plot a figure of elevation diff before/after coregistration + :param coreg_method: The xdem coregistration method, or pipeline. + :param grid: The grid to be used during coregistration, set either to "ref" or "src". + :param resample: If set to True, will reproject output Raster on the same grid as input. Otherwise, only \ +the array/transform will be updated (if possible) and no resampling is done. Useful to avoid spreading data gaps. + :param resampling: The resampling algorithm to be used if `resample` is True. Default is bilinear. + :param shp_list: A list of one or several paths to shapefiles to use for masking. + :param inout: A list of same size as shp_list. For each shapefile, set to 1 (resp. -1) to specify whether \ +to mask inside (resp. outside) of the polygons. Defaults to masking inside polygons for all shapefiles. + :param filtering: If set to True, filtering will be applied prior to coregistration. + :param dh_max: Remove pixels where abs(src - ref) is more than this value. + :param nmad_factor: Remove pixels where abs(src - ref) differ by nmad_factor * NMAD from the median. + :param slope_lim: A list/tuple of min and max slope values, in degrees. Pixels outside this slope range will \ +be excluded. + :param plot: Set to True to plot a figure of elevation diff before/after coregistration. :param out_fig: Path to the output figure. If None will display to screen. - :param verbose: set to True to print details on screen during coregistration. + :param verbose: Set to True to print details on screen during coregistration. - :returns: a tuple containing 1) coregistered DEM as an xdem.DEM instance and 2) DataFrame of coregistration \ -statistics (count of obs, median and NMAD over stable terrain) before and after coreg. + :returns: A tuple containing 1) coregistered DEM as an xdem.DEM instance 2) the coregistration method \ +3) DataFrame of coregistration statistics (count of obs, median and NMAD over stable terrain) before and after \ +coregistration and 4) the inlier_mask used. """ - # Check input arguments - if (coreg_method is not None) and ((hmode is not None) or (vmode is not None)): - warnings.warn("Both `coreg_method` and `hmode/vmode` are set. Using coreg_method.") + # Check inputs + if not isinstance(coreg_method, xdem.coreg.Coreg): + raise ValueError("`coreg_method` must be an xdem.coreg instance (e.g. xdem.coreg.NuthKaab())") - if hmode not in list(hmodes_dict.keys()): - raise ValueError(f"vhmode must be in {list(hmodes_dict.keys())}") + if isinstance(ref_dem_path, str): + if not isinstance(src_dem_path, str): + raise ValueError( + f"`ref_dem_path` is string but `src_dem_path` has type {type(src_dem_path)}." + "Both must have same type." + ) + elif isinstance(ref_dem_path, gu.Raster): + if not isinstance(src_dem_path, gu.Raster): + raise ValueError( + f"`ref_dem_path` is of Raster type but `src_dem_path` has type {type(src_dem_path)}." + "Both must have same type." + ) + else: + raise ValueError("`ref_dem_path` must be either a string or a Raster") - if vmode not in list(vmodes_dict.keys()): - raise ValueError(f"vmode must be in {list(vmodes_dict.keys())}") + if grid not in ["ref", "src"]: + raise ValueError(f"`grid` must be either 'ref' or 'src' - currently set to {grid}") # Load both DEMs if verbose: print("Loading and reprojecting input data") - if grid == "ref": - ref_dem, src_dem = gu.spatial_tools.load_multiple_rasters([ref_dem_path, src_dem_path], ref_grid=0) - elif grid == "src": - ref_dem, src_dem = gu.spatial_tools.load_multiple_rasters([ref_dem_path, src_dem_path], ref_grid=1) + + if isinstance(ref_dem_path, str): + if grid == "ref": + ref_dem, src_dem = gu.spatial_tools.load_multiple_rasters([ref_dem_path, src_dem_path], ref_grid=0) + elif grid == "src": + ref_dem, src_dem = gu.spatial_tools.load_multiple_rasters([ref_dem_path, src_dem_path], ref_grid=1) else: - raise ValueError(f"`grid` must be either 'ref' or 'src' - currently set to {grid}") + ref_dem = ref_dem_path + src_dem = src_dem_path + if grid == "ref": + src_dem = src_dem.reproject(ref_dem, silent=True) + elif grid == "src": + ref_dem = ref_dem.reproject(src_dem, silent=True) # Convert to DEM instance with Float32 dtype + # TODO: Could only convert types int into float, but any other float dtype should yield very similar results ref_dem = xdem.DEM(ref_dem.astype(np.float32)) src_dem = xdem.DEM(src_dem.astype(np.float32)) # Create raster mask - if shpfile is not None: - outlines = gu.Vector(shpfile) - stable_mask = ~outlines.create_mask(src_dem) - else: - stable_mask = np.ones(src_dem.data.shape, dtype="bool") + if verbose: + print("Creating mask of inlier pixels") + + inlier_mask = create_inlier_mask( + src_dem, + ref_dem, + shp_list=shp_list, + inout=inout, + filtering=filtering, + dh_max=dh_max, + nmad_factor=nmad_factor, + slope_lim=slope_lim, + ) # Calculate dDEM ddem = src_dem - ref_dem - # Filter gross outliers in stable terrain - if filtering: - # Remove gross blunders where dh differ by 5 NMAD from the median - inlier_mask = stable_mask & (np.abs(ddem.data - np.median(ddem)) < 5 * xdem.spatialstats.nmad(ddem)).filled( - False - ) - - # Exclude steep slopes for coreg - slope = xdem.terrain.slope(ref_dem) - inlier_mask[slope.data < slope_lim[0]] = False - inlier_mask[slope.data > slope_lim[1]] = False - - else: - inlier_mask = stable_mask - # Calculate dDEM statistics on pixels used for coreg inlier_data = ddem.data[inlier_mask].compressed() nstable_orig, mean_orig = len(inlier_data), np.mean(inlier_data) med_orig, nmad_orig = np.median(inlier_data), xdem.spatialstats.nmad(inlier_data) # Coregister to reference - Note: this will spread NaN - # Better strategy: calculate shift, update transform, resample - if isinstance(coreg_method, xdem.coreg.Coreg): - coreg_method.fit(ref_dem, src_dem, inlier_mask, verbose=verbose) - dem_coreg = coreg_method.apply(src_dem, dilate_mask=False) - elif coreg_method is None: - # Horizontal coregistration - hcoreg_method = hmodes_dict[hmode] - hcoreg_method.fit(ref_dem, src_dem, inlier_mask, verbose=verbose) - dem_hcoreg = hcoreg_method.apply(src_dem, dilate_mask=False) - - # Vertical coregistration - vcoreg_method = vmodes_dict[vmode] - if vmode == "deramp": - vcoreg_method.degree = deramp_degree - vcoreg_method.fit(ref_dem, dem_hcoreg, inlier_mask, verbose=verbose) - dem_coreg = vcoreg_method.apply(dem_hcoreg, dilate_mask=False) - - ddem_coreg = dem_coreg - ref_dem + coreg_method.fit(ref_dem, src_dem, inlier_mask, verbose=verbose) + dem_coreg = coreg_method.apply(src_dem, resample=resample, resampling=resampling) + + # Calculate coregistered ddem (might need resampling if resample set to False), needed for stats and plot only + ddem_coreg = dem_coreg.reproject(ref_dem, silent=True) - ref_dem # Calculate new stats inlier_data = ddem_coreg.data[inlier_mask].compressed() @@ -2327,4 +2461,4 @@ def dem_coregistration( columns=("nstable_orig", "med_orig", "nmad_orig", "nstable_coreg", "med_coreg", "nmad_coreg"), ) - return dem_coreg, out_stats + return dem_coreg, coreg_method, out_stats, inlier_mask diff --git a/xdem/ddem.py b/xdem/ddem.py index eedf9893..f787792b 100644 --- a/xdem/ddem.py +++ b/xdem/ddem.py @@ -169,6 +169,7 @@ def interpolate( try: with warnings.catch_warnings(): warnings.filterwarnings("ignore", "Not enough valid bins") + warnings.filterwarnings("ignore", "invalid value encountered in divide") interpolated_ddem = np.asarray( xdem.volume.hypsometric_interpolation( interpolated_ddem, reference_elevation.data, mask=feature_mask diff --git a/xdem/examples.py b/xdem/examples.py index bbd5d94d..1bf165a7 100644 --- a/xdem/examples.py +++ b/xdem/examples.py @@ -103,7 +103,7 @@ def process_coregistered_examples(name: str, overwrite: bool = False) -> None: nuth_kaab = xdem.coreg.NuthKaab() nuth_kaab.fit(reference_raster, to_be_aligned_raster, inlier_mask=inlier_mask) - aligned_raster = nuth_kaab.apply(to_be_aligned_raster) + aligned_raster = nuth_kaab.apply(to_be_aligned_raster, resample=True) diff = reference_raster - aligned_raster