diff --git a/.gitignore b/.gitignore
index fbbbe0d5..fb8db887 100644
--- a/.gitignore
+++ b/.gitignore
@@ -122,6 +122,11 @@ venv.bak/
 # PyCharm project setting
 .idea
 
+# VS Code settings
+.vscode/
+!.vscode/settings.json
+!.vscode/launch.json
+
 # Rope project settings
 .ropeproject
 
diff --git a/dev-environment.yml b/dev-environment.yml
index 291d6251..3c6ae16d 100644
--- a/dev-environment.yml
+++ b/dev-environment.yml
@@ -41,5 +41,6 @@ dependencies:
   # - richdem
   - pip:
-    - -e ./
+    - noisyopt
+    - -e ./
 
 # - git+https://github.com/GlacioHack/GeoUtils.git
diff --git a/environment.yml b/environment.yml
index 7f239c4c..cca13771 100644
--- a/environment.yml
+++ b/environment.yml
@@ -22,5 +22,9 @@ dependencies:
   - geoutils==0.0.11
   - pip
+  - pip:
+    - noisyopt
+
+
 # - pip:
 #   - git+https://github.com/GlacioHack/GeoUtils.git
diff --git a/tests/test_coreg.py b/tests/test_coreg.py
index 79e25133..124b9c62 100644
--- a/tests/test_coreg.py
+++ b/tests/test_coreg.py
@@ -193,7 +193,16 @@ def test_error_method(self) -> None:
         dem3 = dem1.copy() + np.random.random(size=dem1.size).reshape(dem1.shape)
         assert abs(biascorr.error(dem1, dem3, transform=affine, crs=crs, error_type="std") - np.std(dem3)) < 1e-6
 
-    def test_coreg_example(self) -> None:
+    def test_ij_xy(self, i: int = 10, j: int = 20) -> None:
+        """
+        Test the reversibility of ij2xy and xy2ij, which is important for point co-registration.
+        """
+        x, y = self.ref.ij2xy(i, j, offset="ul")
+        i, j = self.ref.xy2ij(x, y, shift_area_or_point=False)
+        assert i == pytest.approx(10)
+        assert j == pytest.approx(20)
+
+    def test_coreg_example(self, verbose: bool = False) -> None:
         """
         Test the co-registration outputs performed on the example are always the same. This overlaps with the test in
         test_examples.py, but helps identify from where differences arise.
@@ -201,13 +210,72 @@ def test_coreg_example(self) -> None:
 
         # Run co-registration
         nuth_kaab = xdem.coreg.NuthKaab()
-        nuth_kaab.fit(self.ref, self.tba, inlier_mask=self.inlier_mask)
+        nuth_kaab.fit(self.ref, self.tba, inlier_mask=self.inlier_mask, verbose=verbose)
 
         # Check the output metadata is always the same
         assert nuth_kaab._meta["offset_east_px"] == pytest.approx(-0.46255704521968716)
         assert nuth_kaab._meta["offset_north_px"] == pytest.approx(-0.13618536563846081)
         assert nuth_kaab._meta["bias"] == pytest.approx(-1.9815309753424906)
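Aside on the tolerance settings used in these tests: `pytest.approx` accepts a value when it lies within the larger of the relative and absolute tolerances, which is why the looser point-based checks below combine `rel` and `abs`. A standalone illustration:

```python
import pytest

# approx passes when |actual - expected| <= max(rel * |expected|, abs).
tol = dict(rel=1e-1, abs=0.1)  # rel * 0.496 is about 0.05, so abs=0.1 dominates here
assert -0.45 == pytest.approx(-0.496, **tol)
assert -0.55 == pytest.approx(-0.496, **tol)
assert not (-0.65 == pytest.approx(-0.496, **tol))
```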
 
+    def test_coreg_example_gradient_descending(
+        self, downsampling: int = 10000, samples: int = 20000, inlier_mask: bool = True, verbose: bool = False
+    ) -> None:
+        """
+        Test that the GradientDescending co-registration outputs on the example are always the same. This overlaps
+        with the test in test_examples.py, but helps identify from where differences arise.
+        """
+        if inlier_mask:
+            inlier_mask = self.inlier_mask
+
+        # Run co-registration
+        gds = xdem.coreg.GradientDescending(downsampling=downsampling)
+        gds.fit_pts(self.ref, self.tba, inlier_mask=inlier_mask, verbose=verbose, samples=samples)
+        assert gds._meta["offset_east_px"] == pytest.approx(-0.496000, rel=1e-1, abs=0.1)
+        assert gds._meta["offset_north_px"] == pytest.approx(-0.1875, rel=1e-1, abs=0.1)
+        assert gds._meta["bias"] == pytest.approx(-1.8730, rel=1e-1)
+
+    def test_coreg_example_shift_test(
+        self,
+        shift_px: tuple[float, float] = (1, 1),
+        verbose: bool = False,
+        coregs: tuple[str, ...] = ("NuthKaab", "GradientDescending", "NuthKaab_pts"),
+        downsampling: int = 10000,
+    ) -> None:
+        """
+        For comparison of coreg algorithms: shift a reference DEM on purpose, e.g. shift_px = (1, 1),
+        then apply each coreg to shift it back and check that the offset is recovered.
+        """
+        warnings.simplefilter("error")
+
+        res = self.ref.res[0]
+
+        # Shift the reference DEM by shift_px
+        shifted_ref = self.ref.copy()
+        shifted_ref.shift(shift_px[0] * res, shift_px[1] * res)
+
+        for cor in coregs:
+            # Run each coreg on the shifted DEM
+            if cor == "NuthKaab":
+                print("\n(1) NuthKaab")
+                nuth_kaab = xdem.coreg.NuthKaab()
+                nuth_kaab.fit(shifted_ref, self.ref, inlier_mask=self.inlier_mask, verbose=verbose)
+                assert nuth_kaab._meta["offset_east_px"] == pytest.approx(-shift_px[0], rel=1e-2)
+                assert nuth_kaab._meta["offset_north_px"] == pytest.approx(-shift_px[1], rel=1e-2)
+
+            if cor == "GradientDescending":
+                print("\n(2) GradientDescending")
+                gds = xdem.coreg.GradientDescending(downsampling=downsampling)
+                gds.fit_pts(shifted_ref, self.ref, inlier_mask=self.inlier_mask, verbose=verbose)
+                assert gds._meta["offset_east_px"] == pytest.approx(-shift_px[0], rel=1e-2)
+                assert gds._meta["offset_north_px"] == pytest.approx(-shift_px[1], rel=1e-2)
+
+            if cor == "NuthKaab_pts":
+                print("\n(3) NuthKaab running on fit_pts")
+                nuth_kaab = xdem.coreg.NuthKaab()
+                nuth_kaab.fit_pts(shifted_ref, self.ref, inlier_mask=self.inlier_mask, verbose=verbose)
+                assert nuth_kaab._meta["offset_east_px"] == pytest.approx(-shift_px[0], rel=1e-2)
+                assert nuth_kaab._meta["offset_north_px"] == pytest.approx(-shift_px[1], rel=1e-2)
+
     def test_nuth_kaab(self) -> None:
         warnings.simplefilter("error")
 
diff --git a/xdem/coreg.py b/xdem/coreg.py
index 8af6811f..b729cc28 100644
--- a/xdem/coreg.py
+++ b/xdem/coreg.py
@@ -34,6 +34,7 @@
     subdivide_array,
     subsample_array,
 )
+from noisyopt import minimizeCompass
 from rasterio import Affine
 from tqdm import tqdm, trange
 
@@ -74,6 +75,119 @@ def _calculate_slope_and_aspect_nuthkaab(dem: NDArrayf) -> tuple[NDArrayf, NDArr
     return slope_tan, aspect
 
 
+def apply_xyz_shift_df(df: pd.DataFrame, dx: float, dy: float, dz: float, z_name: str) -> pd.DataFrame:
+    """
+    Apply a (dx, dy, dz) shift to a dataframe of points.
+
+    :param df: DataFrame with columns 'E', 'N' and z_name (height).
+    :param dx: Easting shift.
+    :param dy: Northing shift.
+    :param dz: Vertical shift.
+    :param z_name: The column used as height.
+    """
+
+    new_df = df.copy()
+    new_df["E"] += dx
+    new_df["N"] += dy
+    new_df[z_name] -= dz
+
+    return new_df
+
+
+def residuals_df(
+    dem: NDArrayf,
+    df: pd.DataFrame,
+    shift_px: tuple[float, float],
+    dz: float,
+    z_name: str,
+    weight: str | None = None,
+    **kwargs: Any,
+) -> NDArrayf:
+    """
+    Calculate the difference between the DEM and the points (a dataframe with 'E', 'N', z_name) after applying a shift.
+
+    :param dem: DEM.
+    :param df: A dataframe with 'E' and 'N' columns, already subsetted to the DEM bounds and masks.
+    :param shift_px: The shift in pixels (e_px, n_px).
+    :param dz: The bias.
+    :param z_name: The column used to compare with the DEM heights.
+    :param weight: The column used as weights.
+
+    :returns: An array of residuals.
+    """
+
+    # Convert the pixel shift to georeferenced units
+    ee, nn = (i * dem.res[0] for i in shift_px)
+    df_shifted = apply_xyz_shift_df(df, ee, nn, dz, z_name=z_name)
+
+    # Prepare the DEM
+    arr_ = dem.data.astype(np.float32)
+
+    # Get the DEM heights at the (shifted) point positions
+    i, j = dem.xy2ij(df_shifted["E"].values, df_shifted["N"].values, op=np.float32)
+    dem_h = scipy.ndimage.map_coordinates(arr_, [i, j], order=1, mode="nearest", **kwargs)
+    weight_ = df[weight] if weight else 1
+
+    return (df_shifted[z_name].values - dem_h) * weight_
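`residuals_df` above samples the DEM at fractional pixel indices through `scipy.ndimage.map_coordinates`. A minimal standalone sketch of that call on a toy array (illustrative values only):

```python
import numpy as np
import scipy.ndimage

# Toy "DEM" where the value at (row, col) is 10*row + col, so bilinear results are easy to verify.
dem = np.add.outer(10.0 * np.arange(4), 1.0 * np.arange(4))

# Fractional (row, col) positions, like those returned by xy2ij with op=np.float32; order=1 is bilinear.
i = np.array([0.5, 1.25])
j = np.array([0.5, 2.0])
heights = scipy.ndimage.map_coordinates(dem, [i, j], order=1, mode="nearest")
print(heights)  # [ 5.5  14.5], i.e. 10*i + j on this linear ramp
```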
 
 
+def _df_sampling_from_dem(
+    dem: RasterType, tba_dem: RasterType, samples: int = 10000, order: int = 1, offset: str | None = None
+) -> pd.DataFrame:
+    """
+    Generate a dataframe from a DEM by random sampling.
+
+    :param offset: The pixel's center is returned by default, but a corner can be returned
+        by setting offset to one of ul, ur, ll, lr.
+
+    :returns dataframe: N, E coordinates and z of the DEM at the sampling points.
+    """
+    if offset is None:
+        try:
+            offset = "center" if dem.tags["AREA_OR_POINT"] in ["Point", "point"] else "ul"
+        except KeyError:
+            offset = "ul"
+
+    # Avoid the edges, and masked-out areas, when sampling
+    width, length = dem.shape
+    i, j = np.random.randint(10, width - 10, samples), np.random.randint(10, length - 10, samples)
+    mask = dem.data.mask
+
+    # Get the values
+    x, y = dem.ij2xy(i[~mask[i, j]], j[~mask[i, j]], offset=offset)
+    z = scipy.ndimage.map_coordinates(
+        dem.data.astype(np.float32), [i[~mask[i, j]], j[~mask[i, j]]], order=order, mode="nearest"
+    )
+    df = pd.DataFrame({"z": z, "N": y, "E": x})
+
+    # Mask out from tba_dem
+    if tba_dem is not None:
+        df, _ = _mask_dataframe_by_dem(df, tba_dem)
+
+    return df
+
+
+def _mask_dataframe_by_dem(
+    df: pd.DataFrame | NDArrayf, dem: RasterType
+) -> tuple[pd.DataFrame | NDArrayf, NDArrayf]:
+    """
+    Mask out a dataframe (with 'E' and 'N' columns), or an np.ndarray of [E, N] coordinates, by the DEM's mask.
+
+    :returns: The masked dataframe (or array) and the boolean inlier mask.
+    """
+
+    final_mask = ~dem.data.mask
+    mask_raster = dem.copy(new_array=final_mask.astype(np.float32))
+
+    if isinstance(df, pd.DataFrame):
+        pts = np.array((df["E"].values, df["N"].values)).T
+    elif isinstance(df, np.ndarray):
+        pts = df
+
+    ref_inlier = mask_raster.interp_points(pts, input_latlon=False, order=0)
+    new_df = df[ref_inlier.astype(bool)].copy()
+
+    return new_df, ref_inlier.astype(bool)
+
+
 def get_horizontal_shift(
     elevation_difference: NDArrayf, slope: NDArrayf, aspect: NDArrayf, min_count: int = 20
 ) -> tuple[float, float, float]:
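`_mask_dataframe_by_dem` above tests point validity by order-0 (nearest) interpolation of a 0/1 mask raster. A numpy-only sketch of the same principle (this is not the geoutils `interp_points` API, just the idea behind it):

```python
import numpy as np

# 0/1 validity grid; order-0 interpolation reduces to reading the nearest cell.
valid = np.array([[1, 1, 0],
                  [1, 0, 0]], dtype=np.float32)

def point_is_inlier(rows: np.ndarray, cols: np.ndarray) -> np.ndarray:
    # Round to the nearest cell, then read the validity flag.
    r = np.clip(np.rint(rows).astype(int), 0, valid.shape[0] - 1)
    c = np.clip(np.rint(cols).astype(int), 0, valid.shape[1] - 1)
    return valid[r, c].astype(bool)

print(point_is_inlier(np.array([0.2, 1.4]), np.array([0.3, 2.0])))  # [ True False]
```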
@@ -407,6 +521,8 @@ class CoregDict(TypedDict, total=False):
     coefficients: NDArrayf
     coreg_meta: list[Any]
     resolution: float
+    nmad: np.floating[Any]
+
     # The pipeline metadata can have any value of the above
     pipeline: list[Any]
 
@@ -538,6 +654,146 @@ def fit(
 
         return self
 
+    def fit_pts(
+        self: CoregType,
+        reference_dem: NDArrayf | MArrayf | RasterType | pd.DataFrame,
+        dem_to_be_aligned: RasterType,
+        inlier_mask: NDArrayf | Mask | None = None,
+        transform: rio.transform.Affine | None = None,
+        samples: int = 10000,
+        subsample: float | int = 1.0,
+        verbose: bool = False,
+        mask_high_curv: bool = False,
+        order: int = 1,
+        z_name: str = "z",
+        weights: str | None = None,
+    ) -> CoregType:
+        """
+        Estimate the coregistration transform between a DEM and reference point elevation data.
+
+        :param reference_dem: Point elevation data acting as reference.
+        :param dem_to_be_aligned: 2D array of elevation values to be aligned.
+        :param inlier_mask: Optional. 2D boolean array of areas to include in the analysis (inliers=True).
+        :param transform: Optional. Transform of the reference_dem. Mandatory in some cases.
+        :param samples: The number of points sampled from the reference DEM if it is given as a raster.
+        :param subsample: Subsample the input to increase performance. <1 is parsed as a fraction. >1 is a pixel count.
+        :param verbose: Print progress messages to stdout.
+        :param mask_high_curv: If True, mask out high-curvature areas before fitting.
+        :param order: Interpolation order: 0=nearest, 1=linear, 2=cubic.
+        :param z_name: The column name of the dataframe used for elevation differencing.
+        :param weights: The column name of the dataframe used for weights; should have the same length as the z_name column.
+        """
+
+        # Validate that at least one input is a valid array-like (or Raster) type.
+        if not isinstance(dem_to_be_aligned, (np.ndarray, gu.Raster)):
+            raise ValueError(
+                "The dem_to_be_aligned needs to be array-like (implement a numpy array interface). "
+                f"'dem_to_be_aligned': {dem_to_be_aligned}"
+            )
+
+        # Sample the reference DEM to a dataframe if it is a raster.
+        # How to make sure the sampled points are located on stable terrain?
+        if isinstance(reference_dem, (np.ndarray, gu.Raster)):
+            reference_dem = _df_sampling_from_dem(
+                reference_dem, dem_to_be_aligned, samples=samples, order=1, offset=None
+            )
+
+        # Validate that at least one input is a valid point data type.
+        if not isinstance(reference_dem, pd.DataFrame):
+            raise ValueError(
+                "The reference_dem needs to be point data format (pd.DataFrame). "
+                f"'reference_dem': {reference_dem}"
+            )
+
+        # If any input is a Raster, use its transform if 'transform is None'.
+        # If 'transform' was given and any input is a Raster, trigger a warning.
+        # Finally, extract only the data of the raster.
+        for name, dem in [("dem_to_be_aligned", dem_to_be_aligned)]:
+            if hasattr(dem, "transform"):
+                if transform is None:
+                    transform = dem.transform
+                elif transform is not None:
+                    warnings.warn(f"'{name}' of type {type(dem)} overrides the given 'transform'")
+
+        if transform is None:
+            raise ValueError("'transform' must be given if the dem_to_be_aligned DEM is array-like.")
+
+        _, tba_mask = get_array_and_mask(dem_to_be_aligned)
+
+        if np.all(tba_mask):
+            raise ValueError("'dem_to_be_aligned' had only NaNs")
+
+        tba_dem = dem_to_be_aligned.copy()
+        ref_valid = np.isfinite(reference_dem["z"].values)
+
+        if np.all(~ref_valid):
+            raise ValueError("'reference_dem' point data only contains NaNs")
+
+        ref_dem = reference_dem[ref_valid]
+
+        if mask_high_curv:
+            planc, profc = xdem.terrain.get_terrain_attribute(
+                tba_dem, attribute=["planform_curvature", "profile_curvature"]
+            )
+            maxc = np.maximum(np.abs(planc), np.abs(profc))
+            # Mask very high curvatures to avoid resolution biases
+            mask_hc = maxc.data > 5.0
+        else:
+            mask_hc = np.zeros(tba_dem.data.mask.shape, dtype=bool)
+            if "planc" in ref_dem.columns and "profc" in ref_dem.columns:
+                ref_dem = ref_dem.query("planc < 5 and profc < 5")
+            else:
+                print("Warning: There is no curvature in the dataframe. Set mask_high_curv=True for more robust results")
+
+        points = np.array((ref_dem["E"].values, ref_dem["N"].values)).T
+
+        # Make sure that the mask has an expected format.
+        if inlier_mask is not None:
+            if isinstance(inlier_mask, Mask):
+                inlier_mask = inlier_mask.data.filled(False).squeeze()
+            else:
+                inlier_mask = np.asarray(inlier_mask).squeeze()
+                assert inlier_mask.dtype == bool, f"Invalid mask dtype: '{inlier_mask.dtype}'. Expected 'bool'"
+
+            if np.all(~inlier_mask):
+                raise ValueError("'inlier_mask' had no inliers.")
+
+            final_mask = np.logical_and.reduce((~tba_dem.data.mask, inlier_mask, ~mask_hc))
+        else:
+            final_mask = np.logical_and(~tba_dem.data.mask, ~mask_hc)
+
+        mask_raster = tba_dem.copy(new_array=final_mask.astype(np.float32))
+
+        ref_inlier = mask_raster.interp_points(points, order=0)
+        ref_inlier = ref_inlier.astype(bool)
+
+        if np.all(~ref_inlier):
+            raise ValueError("Intersection of 'reference_dem' and 'dem_to_be_aligned' had only NaNs")
+
+        ref_dem = ref_dem[ref_inlier]
+
+        # If subsample is not equal to one, subsampling should be performed.
+        if subsample != 1.0:
+
+            # Randomly pick N inliers in the full_mask where N=subsample
+            random_valids = subsample_array(ref_dem["z"].values, subsample=subsample, return_indices=True)
+
+            # Subset to the N random inliers
+            ref_dem = ref_dem.iloc[random_valids]
+
+        # Run the associated fitting function
+        self._fit_pts_func(
+            ref_dem=ref_dem,
+            tba_dem=tba_dem,
+            transform=transform,
+            weights=weights,
+            verbose=verbose,
+            order=order,
+            z_name=z_name,
+        )
+
+        # Flag that the fitting function has been called.
+        self._fit_called = True
+
+        return self
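The `subsample` handling near the end of `fit_pts` follows the docstring convention: values below 1 are fractions, values above 1 are counts. A sketch of those semantics with a hypothetical helper (not the actual `subsample_array` implementation):

```python
import numpy as np

def n_subsample(n_points: int, subsample: float) -> int:
    # <1 is parsed as a fraction, >1 as an absolute count (as in the fit_pts docstring).
    if subsample <= 1.0:
        return int(n_points * subsample)
    return int(min(subsample, n_points))

print(n_subsample(20000, 0.25))   # 5000
print(n_subsample(20000, 10000))  # 10000
```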
+
     @overload
     def apply(
         self,
@@ -1328,6 +1584,127 @@ def __add__(self, other: list[Coreg] | Coreg | CoregPipeline) -> CoregPipeline:
         return CoregPipeline(pipelines)
 
 
+class GradientDescending(Coreg):
+    """
+    Gradient descending coregistration by Zhihao.
+    """
+
+    def __init__(
+        self,
+        downsampling: int = 6000,
+        x0: tuple[float, float] = (0, 0),
+        bounds: tuple[float, float] = (-3, 3),
+        deltainit: int = 2,
+        deltatol: float = 0.004,
+        feps: float = 0.0001,
+    ) -> None:
+        """
+        Instantiate a gradient descending coregistration object.
+
+        :param downsampling: The number of points the dataframe is downsampled to before running the coreg.
+            Set None to disable downsampling.
+        :param x0: The initial point of the gradient descending iteration.
+        :param bounds: The bounds of the maximum shift.
+        :param deltainit: Initial pattern size.
+        :param deltatol: Target pattern size, i.e. the precision to achieve.
+        :param feps: Smallest difference in function value to resolve.
+
+        The algorithm terminates when the iteration is locally optimal at the target pattern size 'deltatol',
+        or when the function value differs by less than the tolerance 'feps' along all directions.
+        """
+        self._meta: CoregDict
+        self.downsampling = downsampling
+        self.bounds = bounds
+        self.x0 = x0
+        self.deltainit = deltainit
+        self.deltatol = deltatol
+        self.feps = feps
+
+        super().__init__()
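`GradientDescending._fit_pts_func` below drives `noisyopt.minimizeCompass` with the parameters stored in `__init__`. A self-contained sketch on a synthetic noisy cost surface, using the same keyword arguments as the class defaults (the target minimum here is made up):

```python
import numpy as np
from noisyopt import minimizeCompass

rng = np.random.default_rng(0)

def cost(x):
    # Noisy bowl with its minimum near (1.5, -0.5), standing in for the NMAD cost.
    return (x[0] - 1.5) ** 2 + (x[1] + 0.5) ** 2 + 1e-3 * rng.normal()

res = minimizeCompass(
    cost,
    x0=(0, 0),
    deltainit=2,         # initial pattern size
    deltatol=0.004,      # target pattern size, i.e. the precision to achieve
    feps=0.0001,         # smallest resolvable difference in cost
    bounds=((-3, 3), (-3, 3)),
    errorcontrol=False,  # one evaluation per point, as in the coreg below
)
print(res.x)  # approximately [1.5, -0.5]
```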
+
+    def _fit_pts_func(
+        self,
+        ref_dem: pd.DataFrame,
+        tba_dem: NDArrayf,
+        transform: rio.transform.Affine | None,
+        verbose: bool = False,
+        order: int | None = 1,
+        z_name: str = "z",
+        weights: str | None = None,
+    ) -> None:
+        """
+        Estimate the x/y/z offset between two DEMs.
+
+        :param ref_dem: The dataframe used as reference.
+        :param tba_dem: The DEM to be aligned.
+        :param z_name: The column name of the dataframe used for elevation differencing.
+        :param weights: The column name of the dataframe used for weights; should have the same length as the z_name column.
+        :param order: Not needed here, but kept temporarily for consistency (as is 'transform').
+        """
+
+        # Downsample the dataframe if downsampling is not None
+        if self.downsampling and len(ref_dem) > self.downsampling:
+            ref_dem = ref_dem.sample(frac=self.downsampling / len(ref_dem), random_state=42).copy()
+
+        resolution = tba_dem.res[0]
+
+        if verbose:
+            print("Running Gradient Descending Coreg - Zhihao (in preparation)")
+            if self.downsampling:
+                print("Running on downsampled data. The length of the gdf:", len(ref_dem))
+
+            elevation_difference = residuals_df(tba_dem, ref_dem, (0, 0), 0, z_name=z_name)
+            nmad_old = xdem.spatialstats.nmad(elevation_difference)
+            bias = np.nanmedian(elevation_difference)
+            print("   Statistics on initial dh:")
+            print(f"      Median = {bias:.4f} - NMAD = {nmad_old:.4f}")
+
+        # Start the iteration to find the best shift in pixels
+        def func_cost(x: tuple[float, float]) -> np.floating[Any]:
+            return xdem.spatialstats.nmad(residuals_df(tba_dem, ref_dem, x, 0, z_name=z_name, weight=weights))
+
+        res = minimizeCompass(
+            func_cost,
+            x0=self.x0,
+            deltainit=self.deltainit,
+            deltatol=self.deltatol,
+            feps=self.feps,
+            bounds=(self.bounds, self.bounds),
+            disp=verbose,
+            errorcontrol=False,
+        )
+
+        # Send the best solution to find all results
+        elevation_difference = residuals_df(tba_dem, ref_dem, (res.x[0], res.x[1]), 0, z_name=z_name)
+
+        # Statistics of the results
+        bias = np.nanmedian(elevation_difference)
+        nmad_new = xdem.spatialstats.nmad(elevation_difference)
+
+        # Print final results
+        if verbose:
+            print(f"\n   Final offset in pixels (east, north) : ({res.x[0]:f}, {res.x[1]:f})")
+            print("   Statistics on coregistered dh:")
+            print(f"      Median = {bias:.4f} - NMAD = {nmad_new:.4f}")
+
+        self._meta["offset_east_px"] = res.x[0]
+        self._meta["offset_north_px"] = res.x[1]
+        self._meta["bias"] = bias
+        self._meta["resolution"] = resolution
+
+    def _to_matrix_func(self) -> NDArrayf:
+        """Return a transformation matrix from the estimated offsets."""
+        offset_east = self._meta["offset_east_px"] * self._meta["resolution"]
+        offset_north = self._meta["offset_north_px"] * self._meta["resolution"]
+
+        matrix = np.diag(np.ones(4, dtype=float))
+        matrix[0, 3] += offset_east
+        matrix[1, 3] += offset_north
+        matrix[2, 3] += self._meta["bias"]
+
+        return matrix
+
+
 class NuthKaab(Coreg):
     """
     Nuth and Kääb (2011) DEM coregistration.
 
@@ -1466,15 +1843,149 @@ def _fit_func(
         if verbose:
             print(f"\n   Final offset in pixels (east, north) : ({offset_east:f}, {offset_north:f})")
             print("   Statistics on coregistered dh:")
-            print(f"      Median = {bias:.2f} - NMAD = {nmad_new:.2f}")
+            print(f"      Median = {bias:.4f} - NMAD = {nmad_new:.4f}")
 
         self._meta["offset_east_px"] = offset_east
         self._meta["offset_north_px"] = offset_north
         self._meta["bias"] = bias
         self._meta["resolution"] = resolution
+        self._meta["nmad"] = nmad_new
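For context on `get_horizontal_shift`, used by the point-based fit below: Nuth & Kääb (2011) estimate the horizontal shift by fitting dh / tan(slope) = a * cos(b - aspect) + c to the elevation differences as a function of aspect; the fitted amplitude and phase then give the east/north components. A synthetic sketch of that fit (scipy-based; the sign conventions of the in-repo implementation may differ):

```python
import numpy as np
from scipy.optimize import curve_fit

def nk_model(aspect, a, b, c):
    # Nuth & Kaab (2011): dh / tan(slope) = a * cos(b - aspect) + c
    return a * np.cos(b - aspect) + c

aspect = np.linspace(0, 2 * np.pi, 100)
y = nk_model(aspect, 2.0, 1.0, 0.5)  # noise-free synthetic data with a=2, b=1, c=0.5

(a, b, c), _ = curve_fit(nk_model, aspect, y, p0=(1.0, 0.0, 0.0))
east, north = a * np.sin(b), a * np.cos(b)  # shift components from amplitude and phase
print(round(a, 3), round(b, 3), round(c, 3))  # 2.0 1.0 0.5
```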
+
+    def _fit_pts_func(
+        self,
+        ref_dem: pd.DataFrame,
+        tba_dem: RasterType,
+        transform: rio.transform.Affine | None,
+        weights: NDArrayf | None,
+        verbose: bool = False,
+        order: int = 1,
+        z_name: str = "z",
+    ) -> None:
+        """
+        Estimate the x/y/z offset between a DEM and a point cloud.
+
+        Compared to the raster version:
+        1. The elevation_function and nodata_function are removed; the dataframe (points) is shifted instead of the DEM.
+        2. Latitude and longitude are not supported as inputs.
+
+        :param z_name: The column name of the dataframe used for elevation differencing.
+        """
+
+        if verbose:
+            print("Running Nuth and Kääb (2011) coregistration. Shifting the points instead of the DEM.")
+
+        tba_arr, _ = get_array_and_mask(tba_dem)
+
+        resolution = tba_dem.res[0]
+
+        # Make a new DEM which will be modified inplace
+        aligned_dem = tba_dem.copy()
+
+        x_coords, y_coords = (ref_dem["E"].values, ref_dem["N"].values)
+        pts = np.array((x_coords, y_coords)).T
+
+        # Calculate slope and aspect maps from the reference DEM
+        if verbose:
+            print("   Calculate slope and aspect")
+        slope, aspect = _calculate_slope_and_aspect_nuthkaab(tba_arr)
+
+        slope_r = tba_dem.copy(new_array=np.ma.masked_array(slope[None, :, :], mask=~np.isfinite(slope[None, :, :])))
+        aspect_r = tba_dem.copy(new_array=np.ma.masked_array(aspect[None, :, :], mask=~np.isfinite(aspect[None, :, :])))
+
+        # Initialise east and north pixel offset variables (these will be incremented up and down)
+        offset_east, offset_north, bias = 0.0, 0.0, 0.0
+
+        # Calculate the initial statistics
+        slope_pts = slope_r.interp_points(pts, mode="nearest")
+        aspect_pts = aspect_r.interp_points(pts, mode="nearest")
+        tba_pts = aligned_dem.interp_points(pts, mode="nearest")
+
+        # Treat new_pts as a moving window: at every iteration it is shifted a little to fit the correct view
+        new_pts = pts.copy()
+
+        elevation_difference = ref_dem[z_name].values - tba_pts
+        bias = float(np.nanmedian(elevation_difference))
+        nmad_old = xdem.spatialstats.nmad(elevation_difference)
+
+        if verbose:
+            print("   Statistics on initial dh:")
+            print(f"      Median = {bias:.3f} - NMAD = {nmad_old:.3f}")
+
+        # Iteratively run the analysis until the maximum iterations or until the error gets low enough
+        if verbose:
+            print("   Iteratively estimating horizontal shift:")
+
+        # If verbose is True, will use progressbar and print additional statements
+        pbar = trange(self.max_iterations, disable=not verbose, desc="   Progress")
+        for i in pbar:
+
+            # Estimate the horizontal shift from the implementation by Nuth and Kääb (2011)
+            east_diff, north_diff, _ = get_horizontal_shift(  # type: ignore
+                elevation_difference=elevation_difference, slope=slope_pts, aspect=aspect_pts
+            )
+            if verbose:
+                pbar.write(f"      #{i + 1:d} - Offset in pixels : ({east_diff:.3f}, {north_diff:.3f})")
+
+            # Increment the offsets with the overall offset
+            offset_east += east_diff
+            offset_north += north_diff
+
+            # Assign the offset to the coordinates of the pts
+            # Treat new_pts as a moving window: at every iteration it is shifted a little to fit the correct view
+            new_pts += [east_diff * resolution, north_diff * resolution]
+
+            # Get new values
+            tba_pts = aligned_dem.interp_points(new_pts, mode="nearest")
+            elevation_difference = ref_dem[z_name].values - tba_pts
+
+            # Mask out no-data by the DEM's mask
+            pts_, mask_ = _mask_dataframe_by_dem(new_pts, tba_dem)
+
+            # Update the values related to the shifted pts
+            elevation_difference = elevation_difference[mask_]
+            slope_pts = slope_r.interp_points(pts_, mode="nearest")
+            aspect_pts = aspect_r.interp_points(pts_, mode="nearest")
+            bias = float(np.nanmedian(elevation_difference))
+
+            # Update statistics
+            elevation_difference -= bias
+            nmad_new = xdem.spatialstats.nmad(elevation_difference)
+            nmad_gain = (nmad_new - nmad_old) / nmad_old * 100
+
+            if verbose:
+                pbar.write(f"      Median = {bias:.3f} - NMAD = {nmad_new:.3f}  ==>  Gain = {nmad_gain:.3f}%")
+
+            # Stop if the NMAD is low and a few iterations have been made
+            assert ~np.isnan(nmad_new), (offset_east, offset_north)
+
+            offset = np.sqrt(east_diff**2 + north_diff**2)
+            if i > 1 and offset < self.offset_threshold:
+                if verbose:
+                    pbar.write(
+                        f"      Last offset was below the residual offset threshold of {self.offset_threshold} -> stopping"
+                    )
+                break
+
+            nmad_old = nmad_new
+
+        # Print final results
+        if verbose:
+            print(
+                "\n   Final offset in pixels (east, north, bias) : ({:f}, {:f}, {:f})".format(
+                    offset_east, offset_north, bias
+                )
+            )
+            print("   Statistics on coregistered dh:")
+            print(f"      Median = {bias:.3f} - NMAD = {nmad_new:.3f}")
+
+        self._meta["offset_east_px"] = offset_east
+        self._meta["offset_north_px"] = offset_north
+        self._meta["bias"] = bias
+        self._meta["resolution"] = resolution
+        self._meta["nmad"] = nmad_new
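All convergence statistics in the point-based fit above rely on the NMAD as a robust dispersion measure. For reference, the standard formula behind `xdem.spatialstats.nmad`:

```python
import numpy as np

def nmad(x: np.ndarray) -> float:
    # Normalized median absolute deviation: the factor 1.4826 scales the MAD
    # to the standard deviation of a normal distribution.
    return float(1.4826 * np.nanmedian(np.abs(x - np.nanmedian(x))))

print(nmad(np.random.default_rng(0).normal(0.0, 2.0, 100_000)))  # close to 2.0
```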
 
     def _to_matrix_func(self) -> NDArrayf:
         """Return a transformation matrix from the estimated offsets."""
+
         offset_east = self._meta["offset_east_px"] * self._meta["resolution"]
         offset_north = self._meta["offset_north_px"] * self._meta["resolution"]
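Finally, a hypothetical usage sketch of the entry points this diff adds, on the xdem example data (assumes the branch is installed; exact offsets will vary around the values asserted in the tests):

```python
import xdem

ref = xdem.DEM(xdem.examples.get_path("longyearbyen_ref_dem"))
tba = xdem.DEM(xdem.examples.get_path("longyearbyen_tba_dem"))

# Point-based gradient descending coreg: the reference DEM is sampled to points internally.
gds = xdem.coreg.GradientDescending(downsampling=10000)
gds.fit_pts(ref, tba, verbose=True)
aligned = gds.apply(tba)

print(gds._meta["offset_east_px"], gds._meta["offset_north_px"], gds._meta["bias"])
```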