diff --git a/bin/improver b/bin/improver
index 7e0cd5a6aa..f0f902b02e 100755
--- a/bin/improver
+++ b/bin/improver
@@ -52,9 +52,9 @@ set -eu
 export IMPROVER_DIR="$(cd $(dirname $0)/../ && pwd -P)"
 
 # Apply site-specific setup if necessary.
-if [[ -f "${IMPROVER_SITE_INIT:=$IMPROVER_DIR/etc/site-init}" ]]; then
-    . "$IMPROVER_SITE_INIT"
-fi
+# if [[ -f "${IMPROVER_SITE_INIT:=$IMPROVER_DIR/etc/site-init}" ]]; then
+#     . "$IMPROVER_SITE_INIT"
+# fi
 
 # Put our library and scripts in the paths.
 export PATH="$IMPROVER_DIR/bin/:$PATH"
diff --git a/improver/calibration/__init__.py b/improver/calibration/__init__.py
index 2fc015870b..d35dcb1abd 100644
--- a/improver/calibration/__init__.py
+++ b/improver/calibration/__init__.py
@@ -30,10 +30,11 @@
 #     POSSIBILITY OF SUCH DAMAGE.
 """init for calibration"""
 
-from collections import OrderedDict
 from typing import List, Optional, Tuple
 
-from iris.cube import Cube
+import numpy as np
+import scipy.stats
+from iris.cube import Cube, CubeList
 
 from improver.metadata.probabilistic import (
     get_diagnostic_cube_name_from_probability_name,
@@ -42,8 +43,8 @@
 
 
 def split_forecasts_and_truth(
-    cubes: List[Cube], truth_attribute: str
-) -> Tuple[Cube, Cube, Optional[Cube]]:
+    cubes: List[Cube], truth_attribute: str, land_sea_mask_name: Optional[str]
+) -> Tuple[Cube, Cube, Optional[CubeList], Optional[Cube]]:
     """
     A common utility for splitting the various inputs cubes required for
     calibration CLIs. These are generally the forecast cubes, historic truths,
@@ -72,47 +73,167 @@
         IOError: Missing truth or historical forecast in input cubes.
     """
-    grouped_cubes = {}
+    cubes_dict = {
+        "truth": {},
+        "land_sea_mask": None,
+        "other": {},
+        "historic_forecasts": {},
+        "additional_fields": {},
+    }
+    # Split the cubes into truth, land-sea mask and other (forecast) cubes.
+    truth_key, truth_value = truth_attribute.split("=")
     for cube in cubes:
         try:
             cube_name = get_diagnostic_cube_name_from_probability_name(cube.name())
         except ValueError:
             cube_name = cube.name()
-        grouped_cubes.setdefault(cube_name, []).append(cube)
-    if len(grouped_cubes) == 1:
-        # Only one group - all forecast/truth cubes
-        land_sea_mask = None
-        diag_name = list(grouped_cubes.keys())[0]
-    elif len(grouped_cubes) == 2:
-        # Two groups - the one with exactly one cube matching a name should
-        # be the land_sea_mask, since we require more than 2 cubes in
-        # the forecast/truth group
-        grouped_cubes = OrderedDict(
-            sorted(grouped_cubes.items(), key=lambda kv: len(kv[1]))
-        )
-        # landsea name should be the key with the lowest number of cubes (1)
-        landsea_name, diag_name = list(grouped_cubes.keys())
-        land_sea_mask = grouped_cubes[landsea_name][0]
-        if len(grouped_cubes[landsea_name]) != 1:
-            raise IOError("Expected one cube for land-sea mask.")
-    else:
-        raise ValueError("Must have cubes with 1 or 2 distinct names.")
-
-    # split non-land_sea_mask cubes on forecast vs truth
-    truth_key, truth_value = truth_attribute.split("=")
-    input_cubes = grouped_cubes[diag_name]
-    grouped_cubes = {"truth": [], "historical forecast": []}
-    for cube in input_cubes:
         if cube.attributes.get(truth_key) == truth_value:
-            grouped_cubes["truth"].append(cube)
+            cubes_dict["truth"].setdefault(cube_name, []).append(cube)
+        elif cube_name == land_sea_mask_name:
+            cubes_dict["land_sea_mask"] = cube
         else:
-            grouped_cubes["historical forecast"].append(cube)
+            if cube.coords("blend_time"):
+                cube.remove_coord("blend_time")
+            if cube.coords("forecast_period"):
+                cube.coord("forecast_period").attributes = {}
+            if cube.coords("forecast_reference_time"):
+                cube.coord("forecast_reference_time").attributes = {}
+            cubes_dict["other"].setdefault(cube_name, []).append(cube)
+
+    if len(cubes_dict["truth"]) > 1:
+        msg = (
+            f"Truth supplied for multiple diagnostics {list(cubes_dict['truth'].keys())}. "
+            "The truth should only exist for one diagnostic."
+        )
+        raise ValueError(msg)
+
+    if land_sea_mask_name and not cubes_dict["land_sea_mask"]:
+        raise IOError(
+            "Expected one cube for land-sea mask with "
+            f"the name {land_sea_mask_name}."
+        )
 
-    missing_inputs = " and ".join(k for k, v in grouped_cubes.items() if not v)
+    if not cubes_dict["truth"]:
+        raise IOError("Missing truth input.")
+
+    diag_name = list(cubes_dict["truth"].keys())[0]
+    cubes_dict["historic_forecasts"] = cubes_dict["other"].get(diag_name, [])
+    for k, v in cubes_dict["other"].items():
+        if k != diag_name:
+            cubes_dict["additional_fields"].setdefault(k, []).extend(v)
+
+    missing_inputs = " and ".join(
+        k for k in ["truth", "historic_forecasts"] if not cubes_dict[k]
+    )
     if missing_inputs:
         raise IOError(f"Missing {missing_inputs} input.")
 
-    truth = MergeCubes()(grouped_cubes["truth"])
-    forecast = MergeCubes()(grouped_cubes["historical forecast"])
+    truth = MergeCubes()(filter_obs(cubes_dict["truth"][diag_name]))
+    forecast = MergeCubes()(cubes_dict["historic_forecasts"])
+    additional_fields = CubeList(
+        [MergeCubes()(v) for v in cubes_dict["additional_fields"].values()]
+    )
+    return forecast, truth, additional_fields, cubes_dict["land_sea_mask"]
+
+
+def filter_obs(spot_truths_cubelist: CubeList) -> CubeList:
+    """Deal with observation sites that failed to provide an altitude,
+    latitude and longitude. As we've ensured that each observation site has
+    an entry for each timestep, even if no observation was available at that
+    timestep, some observations will not have an altitude, latitude or
+    longitude. There is also the potential for some sites to report different
+    altitudes, latitudes and longitudes at different times. For simplicity,
+    the modal altitude, latitude and longitude across the timesteps is used
+    throughout the input CubeList.
+
+    Args:
+        spot_truths_cubelist:
+            Cubelist of spot truths.
+
+    Returns:
+        CubeList of spot truths with consistent values for the altitude,
+        latitude and longitude at each timestep.
+    """
+    if not all(c.coords("spot_index") for c in spot_truths_cubelist):
+        return spot_truths_cubelist
+    altitudes = np.squeeze(
+        scipy.stats.mode(
+            np.stack([c.coord("altitude").points for c in spot_truths_cubelist]), axis=0
+        )[0]
+    )
+    latitudes = np.squeeze(
+        scipy.stats.mode(
+            np.stack([c.coord(axis="y").points for c in spot_truths_cubelist]), axis=0
+        )[0]
+    )
+    longitudes = np.squeeze(
+        scipy.stats.mode(
+            np.stack([c.coord(axis="x").points for c in spot_truths_cubelist]), axis=0
+        )[0]
+    )
+
+    altitudes = np.nan_to_num(altitudes)
+    latitudes = np.nan_to_num(latitudes)
+    longitudes = np.nan_to_num(longitudes)
+
+    for index, _ in enumerate(spot_truths_cubelist):
+        spot_truths_cubelist[index].coord("altitude").points = altitudes
+        spot_truths_cubelist[index].coord(axis="y").points = latitudes
+        spot_truths_cubelist[index].coord(axis="x").points = longitudes
+
+    return spot_truths_cubelist
+
+
+def split_forecasts_and_coeffs(
+    cubes: List[CubeList], land_sea_mask_name: Optional[str]
+) -> Tuple[Cube, CubeList, CubeList, Optional[Cube]]:
+    """
+    A common utility for splitting the various input cubes required when
+    applying calibration. These are generally the current forecast, the
+    EMOS coefficients and, in some instances, additional fields and a
+    land-sea mask.
+
+    Args:
+        cubes:
+            A list of input cubelists which will be split into relevant
+            groups: the current forecast, the coefficients, any additional
+            fields and, optionally, a land-sea mask.
+        land_sea_mask_name:
+            Name of the land-sea mask cube, if one is expected within the
+            input cubes.
+
+    Returns:
+        - A cube containing the current forecast.
+        - A cubelist containing any additional fields.
+        - A cubelist containing the coefficients.
+        - If found within the input cubes, a land-sea mask will be
+          returned, else None is returned.
+
+    Raises:
+        AttributeError:
+            The coefficients cubes do not share a single consistent
+            diagnostic_standard_name attribute.
+        IOError:
+            A land-sea mask was expected but not found in the input cubes.
+    """
+    cubes_dict = {
+        "current_forecast": {},
+        "coefficients": {},
+        "land_sea_mask": None,
+        "additional_fields": {},
+        "other": {},
+    }
+    for cubelist in cubes:
+        for cube in cubelist:
+            try:
+                cube_name = get_diagnostic_cube_name_from_probability_name(
+                    cube.name()
+                )
+            except ValueError:
+                cube_name = cube.name()
+            if (
+                "emos_coefficient" in cube_name
+                or cube_name == "shape_parameters"
+                or cube_name in ["fbar", "fsig", "ybar", "ysig"]
+            ):
+                cubes_dict["coefficients"].setdefault(cube_name, []).append(cube)
+            elif cube_name == land_sea_mask_name:
+                cubes_dict["land_sea_mask"] = cube
+            else:
+                cubes_dict["other"].setdefault(cube_name, []).append(cube)
+
+    if land_sea_mask_name and not cubes_dict["land_sea_mask"]:
+        raise IOError(
+            "Expected one cube for land-sea mask with "
+            f"the name {land_sea_mask_name}."
+        )
+
+    diagnostic_standard_name = list(
+        set(
+            v[0].attributes["diagnostic_standard_name"]
+            for v in cubes_dict["coefficients"].values()
+            if v[0].attributes.get("diagnostic_standard_name")
+        )
+    )
+    if len(diagnostic_standard_name) == 1:
+        (diagnostic_standard_name,) = diagnostic_standard_name
+    else:
+        msg = (
+            "The coefficients cubes are expected to have one consistent "
+            "diagnostic_standard_name attribute, rather than "
+            f"{diagnostic_standard_name}"
+        )
+        raise AttributeError(msg)
+
+    (cubes_dict["current_forecast"],) = cubes_dict["other"][diagnostic_standard_name]
+    for k, v in cubes_dict["other"].items():
+        if k != diagnostic_standard_name:
+            cubes_dict["additional_fields"].setdefault(k, []).extend(v)
+
+    additional_fields = CubeList(
+        [v[0] for v in cubes_dict["additional_fields"].values()]
+    )
+    coefficients = CubeList([v[0] for v in cubes_dict["coefficients"].values()])
 
-    return forecast, truth, land_sea_mask
+    return (
+        cubes_dict["current_forecast"],
+        additional_fields,
+        coefficients,
+        cubes_dict["land_sea_mask"],
+    )
\ No newline at end of file
diff --git a/improver/calibration/ensemble_calibration.py b/improver/calibration/ensemble_calibration.py
index 2379d66fbe..677728cad5 100644
--- a/improver/calibration/ensemble_calibration.py
+++ b/improver/calibration/ensemble_calibration.py
@@ -49,7 +49,7 @@
 from numpy import ndarray
 from scipy import stats
 from scipy.optimize import minimize
-from scipy.stats import norm
+from scipy.stats import genlogistic, norm
 
 from improver import BasePlugin, PostProcessingPlugin
 from improver.calibration.utilities import (
@@ -61,6 +61,9 @@
     flatten_ignoring_masked_data,
     forecast_coords_match,
     merge_land_and_sea,
+    standardise_forecast_and_truths,
+    standardise_forecasts,
+    standardise_truths,
     statsmodels_available,
 )
 from improver.ensemble_copula_coupling.ensemble_copula_coupling import (
@@ -143,6 +146,7 @@ def __init__(
         # depending upon the distribution requested. The names of these
         # distributions match the names of distributions in scipy.stats.
         self.minimisation_dict = {
+            "genlogistic": self.calculate_genlogistic_ls,
             "norm": self.calculate_normal_crps,
             "truncnorm": self.calculate_truncated_normal_crps,
         }
@@ -262,7 +266,7 @@ def _process_points_independently(
         self,
         minimisation_function: Callable,
         initial_guess: Union[List[float], ndarray],
-        forecast_predictor: Cube,
+        forecast_predictors: CubeList,
         truth: Cube,
         forecast_var: Cube,
         predictor: str,
@@ -286,41 +290,64 @@ def _process_points_independently(
             coefficients array is (number of coefficients, length of spatial
             dimensions). Order of coefficients is [alpha, beta, gamma, delta].
         """
-        (
-            initial_guess,
-            forecast_predictor.data,
-            forecast_var.data,
-            truth.data,
-            sqrt_pi,
-        ) = self._set_float64_precision(
-            initial_guess, forecast_predictor.data, forecast_var.data, truth.data
-        )
+        initial_guess = np.array(initial_guess, dtype=np.float64)
+        truth.data = truth.data.astype(np.float64)
+        sqrt_pi = np.sqrt(np.pi).astype(np.float64)
+        for forecast_predictor in forecast_predictors:
+            forecast_predictor.data = forecast_predictor.data.astype(np.float64)
+        forecast_var.data = forecast_var.data.astype(np.float64)
 
         sindex = [
-            forecast_predictor.coord(axis="y"),
-            forecast_predictor.coord(axis="x"),
+            forecast_predictors[0].coord(axis="y"),
+            forecast_predictors[0].coord(axis="x"),
         ]
+        y_name = truth.coord(axis="y").name()
+        x_name = truth.coord(axis="x").name()
+
         optimised_coeffs = []
-        for index, (fp_slice, truth_slice, fv_slice) in enumerate(
-            zip(
-                forecast_predictor.slices_over(sindex),
-                truth.slices_over(sindex),
-                forecast_var.slices_over(sindex),
-            )
-        ):
-            optimised_coeffs.append(
-                self._minimise_caller(
-                    minimisation_function,
-                    initial_guess[index],
-                    fp_slice.data.T,
-                    truth_slice.data,
-                    fv_slice.data,
-                    sqrt_pi,
-                    predictor,
-                ).x.astype(np.float32)
-            )
+        for index, (truth_slice, fv_slice) in enumerate(
+            zip(truth.slices_over(sindex), forecast_var.slices_over(sindex))
+        ):
+            if truth_slice.coords("wmo_id"):
+                constr = iris.Constraint(wmo_id=truth_slice.coord("wmo_id").points)
+            else:
+                constr = iris.Constraint(
+                    coord_values={
+                        y_name: lambda cell: any(
+                            np.isclose(cell.point, truth_slice.coord(axis="y").points)
+                        ),
+                        x_name: lambda cell: any(
+                            np.isclose(cell.point, truth_slice.coord(axis="x").points)
+                        ),
+                    }
+                )
+            new_fp_data = []
+            for fp_cube in forecast_predictors:
+                extracted_cube = fp_cube.extract(constr)
+                if not fp_cube.coords("time"):
+                    # Broadcast static predictors to the required shape.
+ multitime_fp_data = np.broadcast_to(extracted_cube.data, (len(truth_slice.coord("time").points),)+extracted_cube.shape) + new_fp_data.append(multitime_fp_data) + else: + new_fp_data.append(extracted_cube.data) + new_fp_data = np.stack(new_fp_data) + if all(np.isnan(truth_slice.data)): + optimised_coeffs.append(np.array(initial_guess[index], dtype=np.float32)) + else: + optimised_coeffs.append( + self._minimise_caller( + minimisation_function, + initial_guess[index], + new_fp_data.T, + truth_slice.data, + fv_slice.data, + sqrt_pi, + predictor, + ).x.astype(np.float32) + ) y_coord = forecast_predictor.coord(axis="y") x_coord = forecast_predictor.coord(axis="x") @@ -343,7 +370,7 @@ def _process_points_together( self, minimisation_function: Callable, initial_guess: Union[List[float], ndarray], - forecast_predictor: Cube, + forecast_predictors: CubeList, truth: Cube, forecast_var: Cube, predictor: str, @@ -367,15 +394,21 @@ def _process_points_together( truth_data = flatten_ignoring_masked_data(truth.data) forecast_var_data = flatten_ignoring_masked_data(forecast_var.data) if predictor.lower() == "mean": - forecast_predictor_data = flatten_ignoring_masked_data( - forecast_predictor.data - ) + fp_data_list = [] + for fp_cube in forecast_predictors: + fp_data = fp_cube.data + if not fp_cube.coords("time"): + # Broadcast static predictors to the required shape. + fp_data = np.broadcast_to(fp_cube.data, (len(truth.coord("time").points),)+fp_cube.shape) + fp_data_list.append(flatten_ignoring_masked_data(fp_data)) + forecast_predictor_data = np.vstack(fp_data_list).T elif predictor.lower() == "realizations": # Need to transpose this array so there are columns for each # ensemble member rather than rows. - forecast_predictor_data = flatten_ignoring_masked_data( - forecast_predictor.data, preserve_leading_dimension=True - ).T + forecast_predictor_data = np.stack([flatten_ignoring_masked_data(fp_cube.data, preserve_leading_dimension=True).T for fp_cube in forecast_predictors]) + # forecast_predictor_data = flatten_ignoring_masked_data( + # forecast_predictor.data, preserve_leading_dimension=True + # ).T ( initial_guess, forecast_predictor_data, @@ -395,7 +428,6 @@ def _process_points_together( sqrt_pi, predictor, ) - if not optimised_coeffs.success: msg = ( "Minimisation did not result in convergence after " @@ -410,7 +442,7 @@ def _process_points_together( def process( self, initial_guess: List[float], - forecast_predictor: Cube, + forecast_predictors: CubeList, truth: Cube, forecast_var: Cube, predictor: str, @@ -472,13 +504,14 @@ def process( check_predictor(predictor) if predictor.lower() == "realizations": - enforce_coordinate_ordering(forecast_predictor, "realization") + for forecast_predictor in forecast_predictors: + enforce_coordinate_ordering(forecast_predictor, "realization") if self.point_by_point: optimised_coeffs = self._process_points_independently( minimisation_function, initial_guess, - forecast_predictor, + forecast_predictors, truth, forecast_var, predictor, @@ -487,11 +520,12 @@ def process( optimised_coeffs = self._process_points_together( minimisation_function, initial_guess, - forecast_predictor, + forecast_predictors, truth, forecast_var, predictor, ) + return optimised_coeffs def calculate_normal_crps( @@ -535,17 +569,18 @@ def calculate_normal_crps( CRPS for the current set of coefficients. This CRPS is a mean value across all points. 
""" - if predictor.lower() == "mean": - a, b, gamma, delta = initial_guess - a_b = np.array([a, b], dtype=np.float64) - elif predictor.lower() == "realizations": - a, b, gamma, delta = ( + aa, bb, gamma, delta = ( initial_guess[0], - initial_guess[1:-2] * initial_guess[1:-2], + initial_guess[1:-2], initial_guess[-2], initial_guess[-1], ) - a_b = np.array([a] + b.tolist(), dtype=np.float64) + + if predictor.lower() == "mean": + a_b = np.array([aa, *np.atleast_1d(bb)], dtype=np.float64) + elif predictor.lower() == "realizations": + bb = bb * bb + a_b = np.array([aa] + bb.tolist(), dtype=np.float64) new_col = np.ones(truth.shape, dtype=np.float32) all_data = np.column_stack((new_col, forecast_predictor)) @@ -643,6 +678,69 @@ def calculate_truncated_normal_crps( result = self.BAD_VALUE return result + def calculate_genlogistic_ls( + self, initial_guess, forecast_predictor, truth, forecast_var, sqrt_pi, + predictor): + """ + Calculate the log score for a generalised Logistic distribution. + + Args: + initial_guess (list): + List of optimised coefficients. + Order of coefficients is [alpha, beta, gamma, delta, shape]. + forecast_predictor (numpy.ndarray): + Data to be used as the predictor, + either the ensemble mean or the ensemble realizations. + truth (numpy.ndarray): + Data to be used as truth. + forecast_var (numpy.ndarray): + Ensemble variance data. + predictor_of_mean_flag (str): + String to specify the input to calculate the calibrated mean. + Currently the ensemble mean ("mean") and the ensemble + realizations ("realizations") are supported as the predictors. + + Returns: + result (float): + Log score for the current set of coefficients. + + """ + if predictor.lower() == "mean": + aa, bb, gamma, delta, shape = ( + initial_guess[0], + initial_guess[1:-3], + initial_guess[-3], + initial_guess[-2], + initial_guess[-1], + ) + a_b = np.array([aa, *np.atleast_1d(bb)], dtype=np.float64) + new_col = np.ones(truth.shape, dtype=np.float32) + all_data = np.column_stack((new_col, forecast_predictor)) + mu = np.dot(all_data, a_b) + + elif predictor.lower() == "realizations": + a, b, gamma, delta, shape = ( + initial_guess[0], + initial_guess[1:-3] ** 2, + initial_guess[-3], + initial_guess[-2], + initial_guess[-1], + ) + a_b = np.array([a] + b.tolist(), dtype=np.float64) + + new_col = np.ones(truth.shape, dtype=np.float32) + all_data = np.column_stack((new_col, forecast_predictor)) + mu = np.dot(all_data, a_b) + + sigma = np.sqrt( + gamma**2 + delta**2 * forecast_var) + #print("mu = ", mu, "sigma = ", sigma, "shape = ", shape) + result = np.nanmean(-np.log(genlogistic.pdf(truth, c=shape, loc=mu, scale=sigma))) + + if not np.isfinite(np.min(mu/sigma)): + result = self.BAD_VALUE + return result + class EstimateCoefficientsForEnsembleCalibration(BasePlugin): """ @@ -655,6 +753,9 @@ def __init__( distribution: str, point_by_point: bool = False, use_default_initial_guess: bool = False, + local_standardise: bool = False, + local_standardise_using_forecasts: bool = False, + global_standardise: bool = False, desired_units: Optional[Union[str, Unit]] = None, predictor: str = "mean", tolerance: float = 0.02, @@ -686,6 +787,9 @@ def __init__( This means coefficients of 1 for the multiplicative coefficients and 0 for the additive coefficients. If False, the initial guess is computed. + local_standardise: + local_standardise_using_forecasts: + global_standardise: desired_units: The unit that you would like the calibration to be undertaken in. 
The current forecast, historical forecast and truth will be @@ -716,6 +820,9 @@ def __init__( self.distribution = distribution self.point_by_point = point_by_point self.use_default_initial_guess = use_default_initial_guess + self.local_standardise = local_standardise + self.local_standardise_using_forecasts = local_standardise_using_forecasts + self.global_standardise = global_standardise self._validate_distribution() self.desired_units = desired_units # Ensure predictor is valid. @@ -731,6 +838,8 @@ def __init__( # Setting default values for coeff_names. self.coeff_names = ["alpha", "beta", "gamma", "delta"] + if self.distribution == "genlogistic": + self.coeff_names.append("shape") def _validate_distribution(self) -> None: """Validate that the distribution supplied has a corresponding method @@ -833,7 +942,7 @@ def _get_spatial_associated_coordinates( return template_dims, spatial_associated_coords def _create_cubelist( - self, optimised_coeffs: ndarray, historic_forecasts: Cube + self, optimised_coeffs: ndarray, historic_forecasts: Cube, forecast_predictors:CubeList ) -> CubeList: """Create a cubelist by combining the optimised coefficients and the appropriate metadata. The units of the alpha and gamma coefficients @@ -869,11 +978,29 @@ def _create_cubelist( if self.predictor.lower() == "realizations" and "beta" == coeff_name: used_dims = ["realization"] + used_dims replacements += ["realization"] + template_cube = next(historic_forecasts.slices(used_dims)) + if "beta" == coeff_name: + template_cubes = iris.cube.CubeList() + fp_names = [] + for index, fp in enumerate(forecast_predictors): + template_cube_copy = template_cube.copy() + predictor_index = iris.coords.DimCoord(np.array(index, dtype=np.int32), long_name="predictor_index", units="1") + template_cube_copy.add_aux_coord(predictor_index) + template_cube_copy = iris.util.new_axis(template_cube_copy, predictor_index) + template_cubes.append(template_cube_copy) + fp_names.append(fp.name()) + template_cube = template_cubes.concatenate_cube() + predictor_name = iris.coords.AuxCoord(fp_names, long_name="predictor_name", units="no_unit") + template_cube.add_aux_coord(predictor_name, data_dims=template_cube.coord_dims("predictor_index")) + replacements += ["predictor_index", "predictor_name"] + for coord in coords_to_replace: template_cube.replace_coord(coord) + # Remove coordinates in the template cube that have not been replaced + # and therefore updated. for coord in set([c.name() for c in template_cube.coords()]) - set( replacements ): @@ -883,19 +1010,21 @@ def _create_cubelist( if coeff_name in ["alpha", "gamma"]: coeff_units = historic_forecasts.units + long_name = "shape_parameters" if coeff_name == "shape" else f"emos_coefficient_{coeff_name}" cube = create_new_diagnostic_cube( - f"emos_coefficient_{coeff_name}", + long_name, coeff_units, template_cube, generate_mandatory_attributes([historic_forecasts]), optional_attributes=self._set_attributes(historic_forecasts), - data=np.array(optimised_coeff), + data=np.reshape(optimised_coeff, template_cube.shape) if "beta" == coeff_name else np.array(optimised_coeff), ) cubelist.append(cube) return cubelist def create_coefficients_cubelist( - self, optimised_coeffs: Union[List[float], ndarray], historic_forecasts: Cube + self, optimised_coeffs: Union[List[float], ndarray], historic_forecasts: Cube, + forecast_predictors: CubeList ) -> CubeList: """Create a cubelist for storing the coefficients computed using EMOS. 
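Note on the next hunk: the minimiser returns one flat vector ordered [alpha, beta_1 .. beta_n, gamma, delta], with a trailing shape term appended when the distribution is genlogistic; create_coefficients_cubelist splits this vector before the coefficient cubes are built. A minimal standalone sketch of that layout (the function name and numbers are illustrative, not part of the change):

import numpy as np

def unpack_coefficients(optimised: np.ndarray, has_shape: bool = False) -> dict:
    # Layout used throughout this diff:
    # [alpha, beta_1 .. beta_n, gamma, delta] (+ trailing shape for genlogistic).
    if has_shape:
        return {
            "alpha": optimised[0],
            "beta": np.atleast_1d(optimised[1:-3]),
            "gamma": optimised[-3],
            "delta": optimised[-2],
            "shape": optimised[-1],
        }
    return {
        "alpha": optimised[0],
        "beta": np.atleast_1d(optimised[1:-2]),
        "gamma": optimised[-2],
        "delta": optimised[-1],
    }

# Three predictors (ensemble mean plus two additional fields), genlogistic:
coeffs = unpack_coefficients(
    np.array([0.1, 0.8, 0.1, 0.05, 0.0, 1.0, 0.9]), has_shape=True
)
assert coeffs["beta"].shape == (3,)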
@@ -920,13 +1049,22 @@ def create_coefficients_cubelist( ValueError: If the number of coefficients in the optimised_coeffs does not match the expected number. """ - if self.predictor.lower() == "realizations": - optimised_coeffs = [ - optimised_coeffs[0], - optimised_coeffs[1:-2], - optimised_coeffs[-2], - optimised_coeffs[-1], - ] + if self.predictor.lower() == "realizations" or len(forecast_predictors) > 1: + if self.distribution == "genlogistic": + optimised_coeffs = [ + optimised_coeffs[0], + optimised_coeffs[1:-3], + optimised_coeffs[-3], + optimised_coeffs[-2], + optimised_coeffs[-1], + ] + else: + optimised_coeffs = [ + optimised_coeffs[0], + optimised_coeffs[1:-2], + optimised_coeffs[-2], + optimised_coeffs[-1], + ] if len(optimised_coeffs) != len(self.coeff_names): msg = ( @@ -937,14 +1075,14 @@ def create_coefficients_cubelist( ) raise ValueError(msg) - return self._create_cubelist(optimised_coeffs, historic_forecasts) + return self._create_cubelist(optimised_coeffs, historic_forecasts, forecast_predictors) def compute_initial_guess( self, truths: ndarray, forecast_predictor: ndarray, predictor: str, - number_of_realizations: Optional[int], + number_of_realizations: Optional[int] ) -> List[float]: """ Function to compute initial guess of the alpha, beta, gamma @@ -1001,7 +1139,8 @@ def compute_initial_guess( ) if predictor.lower() == "mean" and default_initial_guess: - initial_guess = [0, 1, 0, 1] + initial_beta = np.repeat(1.0/forecast_predictor.shape[0], forecast_predictor.shape[0]).tolist() + initial_guess = [0] + initial_beta + [0, 1] elif predictor.lower() == "realizations" and ( default_initial_guess or not statsmodels_available() ): @@ -1012,18 +1151,15 @@ def compute_initial_guess( elif not self.use_default_initial_guess: truths_flattened = flatten_ignoring_masked_data(truths) if predictor.lower() == "mean": + import statsmodels.api as sm forecast_predictor_flattened = flatten_ignoring_masked_data( - forecast_predictor + forecast_predictor, preserve_leading_dimension=True ) - if (truths_flattened.size == 0) or ( - forecast_predictor_flattened.size == 0 - ): - gradient, intercept = [np.nan, np.nan] - else: - gradient, intercept, _, _, _ = stats.linregress( - forecast_predictor_flattened, truths_flattened - ) - initial_guess = [intercept, gradient, 0, 1] + val = sm.add_constant(forecast_predictor_flattened.T, has_constant="add") + est = sm.OLS(truths_flattened, val).fit() + intercept = est.params[0] + gradient = est.params[1:] + initial_guess = [intercept] + gradient.tolist() + [0, 1] elif predictor.lower() == "realizations": import statsmodels.api as sm @@ -1036,6 +1172,8 @@ def compute_initial_guess( gradient = est.params[1:] initial_guess = [intercept] + gradient.tolist() + [0, 1] + if "shape" in self.coeff_names: + initial_guess.append(1) return np.array(initial_guess, dtype=np.float32) @staticmethod @@ -1069,7 +1207,7 @@ def guess_and_minimise( self, truths: Cube, historic_forecasts: Cube, - forecast_predictor: Cube, + forecast_predictors: CubeList, forecast_var: Cube, number_of_realizations: Optional[int], ) -> CubeList: @@ -1099,26 +1237,50 @@ def guess_and_minimise( """ if self.point_by_point and not self.use_default_initial_guess: index = [ - forecast_predictor.coord(axis="y"), - forecast_predictor.coord(axis="x"), + truths.coord(axis="y"), + truths.coord(axis="x"), ] + y_name = truths.coord(axis="y").name() + x_name = truths.coord(axis="x").name() + initial_guess = [] - for (truths_slice, fp_slice) in zip( - truths.slices_over(index), 
forecast_predictor.slices_over(index) - ): + for truths_slice in truths.slices_over(index): + constr = iris.Constraint( + coord_values={y_name: lambda cell: any(np.isclose(cell.point, truths_slice.coord(axis="y").points)), + x_name: lambda cell: any(np.isclose(cell.point, truths_slice.coord(axis="x").points))}) + new_fp_data = [] + for fp_cube in forecast_predictors: + extracted_cube = fp_cube.extract(constr) + if not fp_cube.coords("time"): + # Broadcast static predictors to the required shape. + multitime_fp_data = np.broadcast_to(extracted_cube.data, (len(truths.coord("time").points),)+extracted_cube.shape) + new_fp_data.append(multitime_fp_data) + else: + new_fp_data.append(extracted_cube.data) + new_fp_data = np.stack(new_fp_data) initial_guess.append( self.compute_initial_guess( truths_slice.data, - fp_slice.data, + new_fp_data, self.predictor, number_of_realizations, ) ) else: # Computing initial guess for EMOS coefficients + new_fp_data = [] + for fp_cube in forecast_predictors: + if not fp_cube.coords("time"): + # Broadcast static predictors to the required shape. + multitime_fp_data = np.broadcast_to(fp_cube.data, (len(truths.coord("time").points),)+fp_cube.shape) + new_fp_data.append(multitime_fp_data) + else: + new_fp_data.append(fp_cube.data) + new_fp_data = np.stack(new_fp_data) + initial_guess = self.compute_initial_guess( truths.data, - forecast_predictor.data, + new_fp_data, self.predictor, number_of_realizations, ) @@ -1131,18 +1293,17 @@ def guess_and_minimise( len(initial_guess), ), ) - # Calculate coefficients if there are no nans in the initial guess. optimised_coeffs = self.minimiser( initial_guess, - forecast_predictor, + forecast_predictors, truths, forecast_var, self.predictor, self.distribution.lower(), ) coefficients_cubelist = self.create_coefficients_cubelist( - optimised_coeffs, historic_forecasts + optimised_coeffs, historic_forecasts, forecast_predictors ) return coefficients_cubelist @@ -1151,6 +1312,7 @@ def process( self, historic_forecasts: Cube, truths: Cube, + additional_fields: Optional[CubeList] = None, landsea_mask: Optional[Cube] = None, ) -> CubeList: """ @@ -1180,6 +1342,8 @@ def process( Historic forecasts from the training dataset. truths: Truths from the training dataset. + additional_fields: + Additional fields to use as supplementary predictors. landsea_mask: The optional cube containing a land-sea mask. If provided, only land points are used to calculate the coefficients. Within the @@ -1211,10 +1375,14 @@ def process( # Ensure predictor is valid. check_predictor(self.predictor) - historic_forecasts, truths = filter_non_matching_cubes( - historic_forecasts, truths + historic_forecasts, truths, additional_fields = filter_non_matching_cubes( + historic_forecasts, truths, additional_fields ) check_forecast_consistency(historic_forecasts) + if additional_fields: + for af_cube in additional_fields: + if af_cube.coords("forecast_period") and af_cube.coords("forecast_reference_time"): + check_forecast_consistency(historic_forecasts) # Make sure inputs have the same units. if self.desired_units: historic_forecasts.convert_units(self.desired_units) @@ -1228,15 +1396,39 @@ def process( ) raise ValueError(msg) + coefficients_cubelist = iris.cube.CubeList() + if any([self.local_standardise, self.local_standardise_using_forecasts, self.global_standardise]): + # TODO: Extend local standardisation to handle additional fields. 
+ # historic_forecasts, hf_mean, hf_sd = standardise_forecasts( + # historic_forecasts, hf_coords=historic_forecasts.coords(dim_coords=True)) + # truths, tr_mean, tr_sd = standardise_truths( + # truths, truths.coords(dim_coords=True)) + historic_forecasts, truths, hf_mean, hf_sd, tr_mean, tr_sd = standardise_forecast_and_truths( + historic_forecasts, truths, global_standardise=self.global_standardise, + using_forecasts=self.local_standardise_using_forecasts) + if not all([tr_mean, tr_sd]): + coefficients_cubelist.extend(iris.cube.CubeList([hf_mean, hf_sd])) + else: + # Note that these are not coefficients. + coefficients_cubelist.extend(iris.cube.CubeList([hf_mean, hf_sd, tr_mean, tr_sd])) + number_of_realizations = None if self.predictor.lower() == "mean": - forecast_predictor = collapsed( + forecast_predictors = iris.cube.CubeList() + forecast_predictors.append(collapsed( historic_forecasts, "realization", iris.analysis.MEAN - ) + )) + if additional_fields: + for af_cube in additional_fields: + if af_cube.coords("realization"): + af_cube = collapsed( + af_cube, "realization", iris.analysis.MEAN + ) + forecast_predictors.append(af_cube) elif self.predictor.lower() == "realizations": - forecast_predictor = historic_forecasts - number_of_realizations = len(forecast_predictor.coord("realization").points) - enforce_coordinate_ordering(forecast_predictor, "realization") + number_of_realizations = len(historic_forecasts.coord("realization").points) + enforce_coordinate_ordering(historic_forecasts, "realization") + forecast_predictors = iris.cube.CubeList([historic_forecasts]) forecast_var = collapsed( historic_forecasts, "realization", iris.analysis.VARIANCE @@ -1244,18 +1436,17 @@ def process( # If a landsea_mask is provided mask out the sea points if landsea_mask: - self.mask_cube(forecast_predictor, landsea_mask) + [self.mask_cube(fp, landsea_mask) for fp in forecast_predictors] self.mask_cube(forecast_var, landsea_mask) self.mask_cube(truths, landsea_mask) - coefficients_cubelist = self.guess_and_minimise( + coefficients_cubelist.extend(self.guess_and_minimise( truths, historic_forecasts, - forecast_predictor, + forecast_predictors, forecast_var, number_of_realizations, - ) - + )) return coefficients_cubelist @@ -1265,7 +1456,7 @@ class CalibratedForecastDistributionParameters(BasePlugin): uncalibrated input forecast and EMOS coefficients. """ - def __init__(self, predictor: str = "mean") -> None: + def __init__(self, predictor: str = "mean", unstandardise_using_forecasts=False) -> None: """ Create a plugin that uses the coefficients created using EMOS from historical forecasts and corresponding truths and applies these @@ -1281,9 +1472,11 @@ def __init__(self, predictor: str = "mean") -> None: """ check_predictor(predictor) self.predictor = predictor + self.unstandardise_using_forecasts = unstandardise_using_forecasts self.coefficients_cubelist = None self.current_forecast = None + self.standardise_cubelist = None def __repr__(self) -> str: """Represent the configured plugin instance as a string.""" @@ -1354,17 +1547,58 @@ def _calculate_location_parameter_from_mean(self) -> ndarray: Location parameter calculated using the ensemble mean as the predictor. 
""" - forecast_predictor = collapsed( + forecast_predictors = iris.cube.CubeList() + forecast_predictors.append(collapsed( self.current_forecast, "realization", iris.analysis.MEAN - ) + )) + for af in self.additional_fields: + if af.coords("realization"): + forecast_predictors.append(collapsed(af, "realization", iris.analysis.MEAN)) + else: + forecast_predictors.append(af) + + fp_names = [fp.name() for fp in forecast_predictors] + if len(forecast_predictors) != len(self.coefficients_cubelist.extract_strict("emos_coefficient_beta").coord("predictor_index").points): + msg = ("The number of forecast predictors must equal the number of " + "beta coefficients in order to create a calibrated forecast. " + f"Number of predictor cubes = {len(forecast_predictors)}: {fp_names}, " + f"Number of predictor coords = {len(self.coefficients_cubelist.extract_strict('emos_coefficient_beta').coord('predictor_index').points)}: {self.coefficients_cubelist.extract_strict('emos_coefficient_beta').coord('predictor_name').points}") + raise ValueError(msg) + else: + fp_names = [fp.name() for fp in forecast_predictors] + print("predictor_cube_names = ", fp_names) + print("predictor_coord_names", self.coefficients_cubelist.extract_strict("emos_coefficient_beta").coord("predictor_name").points) + + if self.standardise_cubelist: + constr = iris.Constraint(self.coefficients_cubelist[0].attributes["diagnostic_standard_name"]) + new_forecast_predictors = iris.cube.CubeList() + for fp in forecast_predictors: + forecast_predictor = fp.extract(constr) + if not forecast_predictor: + new_forecast_predictors.append(fp) + continue + forecast_predictor_orig = forecast_predictor.copy() + unstandardised_predictor = (forecast_predictor - self.standardise_cubelist.extract_strict("fbar"))/self.standardise_cubelist.extract_strict("fsig") + unstandardised_predictor.rename(forecast_predictor.name()) + new_forecast_predictors.append(unstandardised_predictor) + forecast_predictors = new_forecast_predictors # Calculate location parameter = a + b*X, where X is the # raw ensemble mean. In this case, b = beta. 
- location_parameter = ( - self.coefficients_cubelist.extract_strict("emos_coefficient_alpha").data - + self.coefficients_cubelist.extract_strict("emos_coefficient_beta").data - * forecast_predictor.data - ).astype(np.float32) + location_parameter = np.zeros(forecast_predictors[0].shape) + for fp in forecast_predictors: + constr = iris.Constraint(predictor_name=fp.name()) + location_parameter += self.coefficients_cubelist.extract_strict("emos_coefficient_beta").extract(constr).data * fp.data + location_parameter += self.coefficients_cubelist.extract_strict("emos_coefficient_alpha").data + location_parameter = location_parameter.astype(np.float32) + + if self.standardise_cubelist: + if self.unstandardise_using_forecasts: + location_parameter = (location_parameter*self.standardise_cubelist.extract_strict("fsig").data) + self.standardise_cubelist.extract_strict("fbar").data + else: + location_parameter = (location_parameter*self.standardise_cubelist.extract_strict("ysig").data) + self.standardise_cubelist.extract_strict("ybar").data + index = np.isnan(location_parameter) + location_parameter[index] = forecast_predictor_orig.data[index] return location_parameter @@ -1427,16 +1661,35 @@ def _calculate_scale_parameter(self) -> ndarray: forecast_var = self.current_forecast.collapsed( "realization", iris.analysis.VARIANCE ) + # Calculating the scale parameter, based on the raw variance S^2, # where predicted variance = c + dS^2, where c = (gamma)^2 and # d = (delta)^2 - scale_parameter = ( - self.coefficients_cubelist.extract_strict("emos_coefficient_gamma").data - * self.coefficients_cubelist.extract_strict("emos_coefficient_gamma").data - + self.coefficients_cubelist.extract_strict("emos_coefficient_delta").data - * self.coefficients_cubelist.extract_strict("emos_coefficient_delta").data - * forecast_var.data - ).astype(np.float32) + if self.standardise_cubelist: + if self.unstandardise_using_forecasts: + scale_parameter = ( + (self.coefficients_cubelist.extract_strict("emos_coefficient_gamma").data**2 + + self.coefficients_cubelist.extract_strict("emos_coefficient_delta").data**2 + * forecast_var.data/self.standardise_cubelist.extract_strict("fsig").data**2) * + self.standardise_cubelist.extract_strict("fsig").data**2 + ).astype(np.float32) + else: + scale_parameter = ( + (self.coefficients_cubelist.extract_strict("emos_coefficient_gamma").data**2 + + self.coefficients_cubelist.extract_strict("emos_coefficient_delta").data**2 + * forecast_var.data/self.standardise_cubelist.extract_strict("fsig").data**2) * + self.standardise_cubelist.extract_strict("ysig").data**2 + ).astype(np.float32) + index = scale_parameter.mask + scale_parameter[index] = forecast_var.data[index] + else: + scale_parameter = ( + self.coefficients_cubelist.extract_strict("emos_coefficient_gamma").data + ** 2 + + self.coefficients_cubelist.extract_strict("emos_coefficient_delta").data + ** 2 + * forecast_var.data + ).astype(np.float32) return scale_parameter def _create_output_cubes( @@ -1479,7 +1732,9 @@ def _create_output_cubes( def process( self, current_forecast: Cube, + additional_fields: CubeList, coefficients_cubelist: CubeList, + standardise_cubelist: Optional[CubeList] = None, landsea_mask: Optional[Cube] = None, ) -> Tuple[Cube, Cube]: """ @@ -1490,6 +1745,8 @@ def process( Args: current_forecast: The cube containing the current forecast. + additional_fields: + coefficients_cubelist: CubeList of EMOS coefficients where each cube within the cubelist is for a separate EMOS coefficient e.g. 
alpha, beta, @@ -1513,7 +1770,9 @@ def process( larger scale parameter will result in a broader PDF. """ self.current_forecast = current_forecast + self.additional_fields = additional_fields self.coefficients_cubelist = coefficients_cubelist + self.standardise_cubelist = standardise_cubelist # Check coefficients_cube and forecast cube are compatible. self._diagnostic_match() @@ -1725,6 +1984,7 @@ def _calibrate_forecast( def process( self, forecast: Cube, + additional_fields: CubeList, coefficients: CubeList, land_sea_mask: Optional[Cube] = None, realizations_count: Optional[int] = None, @@ -1732,6 +1992,9 @@ def process( predictor: str = "mean", randomise: bool = False, random_seed: Optional[int] = None, + local_standardise: bool = False, + local_standardise_using_forecasts: bool = False, + global_standardise: bool = False ) -> Cube: """Calibrate input forecast using pre-calculated coefficients @@ -1739,6 +2002,8 @@ def process( forecast: Uncalibrated forecast as probabilities, percentiles or realizations + additional_fields: + coefficients: EMOS coefficients land_sea_mask: @@ -1765,28 +2030,55 @@ def process( Calibrated forecast in the form of the input (ie probabilities percentiles or realizations) """ + allowed_coeff_names = ["emos_coefficient_alpha", "emos_coefficient_beta", + "emos_coefficient_gamma", "emos_coefficient_delta", "shape_parameters"] + standardisers = iris.cube.CubeList([c for c in coefficients if c.name() not in allowed_coeff_names]) + coefficients = iris.cube.CubeList([c for c in coefficients if c.name() in allowed_coeff_names]) + + if (any([local_standardise, local_standardise_using_forecasts, global_standardise]) + and not standardisers): + msg = ("Unstandardisation is requested but the mean and standard " + "deviation are not available.") + ValueError(msg) + elif (not any([local_standardise, local_standardise_using_forecasts, global_standardise]) and standardisers): + msg = ("No unstandardisation is requested but the mean and " + "standard deviation to unstandardise is available.") + self.forecast_type = self._get_forecast_type(forecast) forecast_as_realizations = forecast.copy() + additional_fields_as_realizations = additional_fields.copy() if self.forecast_type != "realizations": forecast_as_realizations = self._convert_to_realizations( forecast.copy(), realizations_count, ignore_ecc_bounds ) + additional_fields_as_realizations = iris.cube.CubeList() + for af in additional_fields: + if af.coords("time"): + additional_fields_as_realizations.append(self._convert_to_realizations( + af.copy(), realizations_count, ignore_ecc_bounds + )) + else: + additional_fields_as_realizations.append(af) calibration_plugin = CalibratedForecastDistributionParameters( - predictor=predictor + predictor=predictor, unstandardise_using_forecasts=local_standardise_using_forecasts ) location_parameter, scale_parameter = calibration_plugin( - forecast_as_realizations, coefficients, landsea_mask=land_sea_mask + forecast_as_realizations, additional_fields_as_realizations, + coefficients, standardise_cubelist=standardisers, landsea_mask=land_sea_mask ) + if [c for c in coefficients if c.name() == "shape_parameters"]: + shape = coefficients.extract_strict("shape_parameters") + else: + shape = self._get_attribute(coefficients, "shape_parameters", optional=True) + self.distribution = { "name": self._get_attribute(coefficients, "distribution"), "location": location_parameter, "scale": scale_parameter, - "shape": self._get_attribute( - coefficients, "shape_parameters", optional=True - ), + "shape": 
shape, } result = self._calibrate_forecast(forecast, randomise, random_seed) diff --git a/improver/calibration/utilities.py b/improver/calibration/utilities.py index 2ef8258f7c..b222aa6041 100644 --- a/improver/calibration/utilities.py +++ b/improver/calibration/utilities.py @@ -34,12 +34,12 @@ """ import importlib -from typing import Set, Tuple, Union +from typing import Optional, Set, Tuple, Union import iris import numpy as np from iris.coords import DimCoord -from iris.cube import Cube +from iris.cube import Cube, CubeList from numpy import ndarray from numpy.ma.core import MaskedArray @@ -162,8 +162,8 @@ def check_predictor(predictor: str) -> None: def filter_non_matching_cubes( - historic_forecast: Cube, truth: Cube -) -> Tuple[Cube, Cube]: + historic_forecast: Cube, truth: Cube, additional_fields: Optional[CubeList] = None +) -> Tuple[Cube, Cube, Optional[CubeList]]: """ Provide filtering for the historic forecast and truth to make sure that these contain matching validity times. This ensures that any @@ -176,6 +176,8 @@ def filter_non_matching_cubes( truth: Cube of truth that potentially contains a mismatch compared to the historic forecasts. + additional_fields: + Returns: - Cube of historic forecasts where any mismatches with @@ -189,6 +191,8 @@ def filter_non_matching_cubes( """ matching_historic_forecasts = iris.cube.CubeList([]) matching_truths = iris.cube.CubeList([]) + matching_additional_fields = iris.cube.CubeList([]) + afs_with_time = [af_cube for af_cube in additional_fields if af_cube.coords("time")] for hf_slice in historic_forecast.slices_over("time"): if hf_slice.coord("time").has_bounds(): point = iris_time_to_datetime( @@ -212,16 +216,51 @@ def filter_non_matching_cubes( constr = iris.Constraint(coord_values=coord_values) truth_slice = truth.extract(constr) - if truth_slice: + if truth_slice and not afs_with_time: matching_historic_forecasts.append(hf_slice) matching_truths.append(truth_slice) + + if afs_with_time: + # Only consider the point of the time coordinate for additional + # fields. + coord_values = { + "time": iris_time_to_datetime( + hf_slice.coord("time"), point_or_bound="point" + ) + } + constr = iris.Constraint(coord_values=coord_values) + af_slices = [ + af_cube.extract(constr) + for af_cube in afs_with_time + if af_cube.extract(constr) is not None + ] + if truth_slice and af_slices: + matching_historic_forecasts.append(hf_slice) + matching_truths.append(truth_slice) + matching_additional_fields.extend(af_slices) + + if additional_fields: + # Add additional fields without a time coordinate. + matching_additional_fields.extend( + [af_cube for af_cube in additional_fields if not af_cube.coords("time")] + ) + if not matching_historic_forecasts and not matching_truths: msg = ( "The filtering has found no matches in validity time " "between the historic forecasts and the truths." ) raise ValueError(msg) - return (matching_historic_forecasts.merge_cube(), matching_truths.merge_cube()) + + matching_additional_fields = ( + matching_additional_fields.merge() if matching_additional_fields else None + ) + + return ( + matching_historic_forecasts.merge_cube(), + matching_truths.merge_cube(), + matching_additional_fields, + ) def create_unified_frt_coord(forecast_reference_time: DimCoord) -> DimCoord: @@ -299,7 +338,10 @@ def forecast_coords_match(first_cube: Cube, second_cube: Cube) -> None: ValueError: The two cubes are not equivalent. 
""" mismatches = [] - if first_cube.coord("forecast_period") != second_cube.coord("forecast_period"): + if ( + first_cube.coord("forecast_period").points + != second_cube.coord("forecast_period").points + ): mismatches.append("forecast_period") if get_frt_hours(first_cube.coord("forecast_reference_time")) != get_frt_hours( @@ -364,3 +406,147 @@ def statsmodels_available() -> bool: if importlib.util.find_spec("statsmodels"): return True return False + + +def standardise_forecast_and_truths( + historic_forecasts, truths, global_standardise=False, using_forecasts=False +): + """Standardise the forecast and truths by subtracting the mean and dividing + by the standard deviation. + + Args: + forecast (iris.cube.Cube) + truth (iris.cube.Cube) + global_standardise + using_forecasts + Standardise the truths using the forecast mean and + standard deviation + + Returns: + Tuple: + """ + if global_standardise: + hf_coords = historic_forecasts.coords(dim_coords=True) + truth_coords = truths.coords(dim_coords=True) + else: + hf_coords = ["realization", "time"] + truth_coords = "time" + + # standardise ensemble members using the mean and standard deviation of the ensemble mean + forecast_mean = historic_forecasts.collapsed(hf_coords, iris.analysis.MEAN) + forecast_mean.rename("fbar") + forecast_sd = historic_forecasts.collapsed(hf_coords, iris.analysis.STD_DEV) + forecast_sd.rename("fsig") + + std_forecast = (historic_forecasts - forecast_mean) / forecast_sd + std_forecast.rename(historic_forecasts.name()) + + if using_forecasts: + std_truth = (truths - forecast_mean) / forecast_sd + std_truth.rename(truths.name()) + # Replace masked values created by dividing existing NaN truth values + # by a number (truth_sd). Otherwise, the "flatten_ignoring_masked_data" + # call in compute_initial_guess ignores these masked truths and ends up + # with a different number of points between the truth and the forecast. + std_truth.data = std_truth.data.filled(np.nan) + return std_forecast, std_truth, forecast_mean, forecast_sd, None, None + + # If using the truth to standardise + if not truths.coords("time", dim_coords=True): + # Handle a training dataset with a single timestep. + msg = ("The truths cube provided does not have a time dimension. " + "Multiple times are required to collapse over the time dimension " + "to calculate the climatological mean or standard deviation.") + raise ValueError(msg) + #truth_mean = truths.copy(np.full(truths.shape, np.nanmean(truths.data))) + #truth_sd = truths.copy(np.full(truths.shape, np.nanstd(truths.data))) + + # Use nanmean and nanstd as observations can sometimes be missing i.e. nan. + from iris.analysis import WeightedAggregator + + nanmean = WeightedAggregator("mean", np.nanmean) + nanstd = WeightedAggregator("standard_deviation", np.nanstd) + + truth_mean = truths.collapsed(truth_coords, nanmean) + truth_mean.rename("ybar") + + truth_sd = truths.collapsed(truth_coords, nanstd) + truth_sd.rename("ysig") + + std_truth = (truths - truth_mean) / truth_sd + std_truth.rename(truths.name()) + # Replace masked values created by dividing existing NaN truth values + # by a number (truth_sd). Otherwise, the "flatten_ignoring_masked_data" + # call in compute_initial_guess ignores these masked truths and ends up + # with a different number of points between the truth and the forecast. + std_truth.data = std_truth.data.filled(np.nan) + + # This check has been commented out because there are instances where a + # particular site has only one observation from a 30 day training dataset. 
+ # This leads to a standard deviation of 0 for that site. A standard + # deviation of 0 will result in std_truth being NaN for the site anyway. + # if np.any(np.isclose(truth_sd.data, 0)): + # msg = ("Standardised truths cannot be calculated if the truth " + # "standard deviation is zero. Increasing the training dataset " + # "length may solve this issue.") + # raise ValueError(msg) + + # # Ensure that masked values in the truth created by the standardisation + # # are also masked in the forecast. + # (rdim,) = std_forecast.coord_dims("realization") + # expanded_truth = np.repeat( + # np.expand_dims(std_truth.data.mask, axis=rdim), + # len(std_forecast.coord("realization").points), + # axis=rdim, + # ) + # std_forecast.data = np.ma.masked_where(expanded_truth, std_forecast.data) + return std_forecast, std_truth, forecast_mean, forecast_sd, truth_mean, truth_sd + + +def standardise_forecasts(historic_forecasts, hf_coords=["realization", "time"]): + """Standardise the forecast by subtracting the mean and dividing + by the standard deviation. + + Args: + forecast (iris.cube.Cube) + truth (iris.cube.Cube) + + Returns: + Tuple: + """ + # standardise ensemble members using the mean and standard deviation of the ensemble mean + forecast_mean = historic_forecasts.collapsed(hf_coords, iris.analysis.MEAN) + forecast_mean.rename("fbar") + forecast_sd = historic_forecasts.collapsed(hf_coords, iris.analysis.STD_DEV) + forecast_sd.rename("fsig") + std_forecast = (historic_forecasts - forecast_mean) / forecast_sd + std_forecast.rename(historic_forecasts.name()) + return std_forecast, forecast_mean, forecast_sd + + +def standardise_truths(truths, truth_coords=["time"]): + """Standardise the truths by subtracting the mean and dividing by the + standard deviation. + + Args: + truths (iris.cube.Cube) + + Returns: + Tuple: + """ + # Use nanmean and nanstd as observations can sometimes be missing i.e. nan. + from iris.analysis import WeightedAggregator + + nanmean = WeightedAggregator("mean", np.nanmean) + nanstd = WeightedAggregator("standard_deviation", np.nanstd) + + truth_mean = truths.collapsed(truth_coords, nanmean) + truth_mean.rename("ybar") + + truth_sd = truths.collapsed(truth_coords, nanstd) + truth_sd.rename("ysig") + + std_truth = (truths - truth_mean) / truth_sd + std_truth.rename(truths.name()) + std_truth.data = std_truth.data.filled(np.nan) + return std_truth, truth_mean, truth_sd diff --git a/improver/cli/apply_emos_coefficients.py b/improver/cli/apply_emos_coefficients.py index d2f9df9c8f..6f82a407d0 100644 --- a/improver/cli/apply_emos_coefficients.py +++ b/improver/cli/apply_emos_coefficients.py @@ -47,15 +47,20 @@ @cli.clizefy @cli.with_output def process( - cube: cli.inputcube, - coefficients: inputcoeffs = None, - land_sea_mask: cli.inputcube = None, - *, + *cubes: cli.inputcubelist, + # cube: cli.inputcube, + # coefficients: inputcoeffs = None, + # land_sea_mask: cli.inputcube = None, + # *, realizations_count: int = None, randomise=False, random_seed: int = None, ignore_ecc_bounds=False, predictor="mean", + land_sea_mask_name: str = None, + local_standardise=False, + global_standardise=False, + local_standardise_using_forecasts=False, ): """Applying coefficients for Ensemble Model Output Statistics. @@ -108,6 +113,10 @@ def process( the location parameter when estimating the EMOS coefficients. Currently the ensemble mean ("mean") and the ensemble realizations ("realizations") are supported as the predictors. + land_sea_mask_name (str): + Name of the land-sea mask cube. 
If supplied, a land-sea mask cube + is expected within the list of input cubes and this land-sea mask + will be used to calibrate land points only. Returns: iris.cube.Cube: @@ -126,6 +135,9 @@ def process( import warnings from improver.calibration.ensemble_calibration import ApplyEMOS + from improver.calibration import split_forecasts_and_coeffs + + forecast, additional_fields, coefficients, land_sea_mask = split_forecasts_and_coeffs(cubes, land_sea_mask_name) if coefficients is None: msg = ( @@ -133,15 +145,12 @@ def process( "uncalibrated forecast will be returned." ) warnings.warn(msg) - return cube - - if land_sea_mask and land_sea_mask.name() != "land_binary_mask": - msg = "The land_sea_mask cube does not have the name 'land_binary_mask'" - raise ValueError(msg) + return forecast calibration_plugin = ApplyEMOS() result = calibration_plugin( - cube, + forecast, + additional_fields, coefficients, land_sea_mask=land_sea_mask, realizations_count=realizations_count, @@ -149,6 +158,9 @@ def process( predictor=predictor, randomise=randomise, random_seed=random_seed, + local_standardise=local_standardise, + local_standardise_using_forecasts=local_standardise_using_forecasts, + global_standardise=global_standardise, ) return result diff --git a/improver/cli/estimate_emos_coefficients.py b/improver/cli/estimate_emos_coefficients.py index 5e299caa00..4abb75ccab 100755 --- a/improver/cli/estimate_emos_coefficients.py +++ b/improver/cli/estimate_emos_coefficients.py @@ -44,6 +44,10 @@ def process( truth_attribute, point_by_point=False, use_default_initial_guess=False, + local_standardise=False, + local_standardise_using_forecasts=False, + global_standardise=False, + land_sea_mask_name: str = None, units=None, predictor="mean", tolerance: float = 0.02, @@ -86,6 +90,16 @@ def process( predictor to generate the calibrated distribution. This means coefficients of 1 for the multiplicative coefficients and 0 for the additive coefficients. If False, the initial guess is computed. + local_standardise (bool): + + local_standardise_using_forecasts (bool): + + global_standardise (bool): + + land_sea_mask_name (str): + Name of the land-sea mask cube. If supplied, a land-sea mask cube + is expected within the list of input cubes and this land-sea mask + will be used to calibrate land points only. units (str): The units that calibration should be undertaken in. The historical forecast and truth will be converted as required. 
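With the new options in place, a typical training invocation passes the historic forecasts, truths and the named mask together. An illustrative command follows (the file paths and the truth attribute value are made up, and the flag spellings assume the usual underscore-to-hyphen convention of the IMPROVER CLI):

improver estimate-emos-coefficients history/*.nc truths/*.nc land_mask.nc \
    --distribution=norm \
    --truth-attribute='mosg__model_configuration=uk_det' \
    --land-sea-mask-name=land_binary_mask \
    --local-standardise \
    --output=coefficients.nc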
@@ -118,16 +132,18 @@ def process( from improver.calibration.ensemble_calibration import ( EstimateCoefficientsForEnsembleCalibration, ) - - forecast, truth, land_sea_mask = split_forecasts_and_truth(cubes, truth_attribute) + forecast, truth, additional_fields, land_sea_mask = split_forecasts_and_truth(cubes, truth_attribute, land_sea_mask_name) plugin = EstimateCoefficientsForEnsembleCalibration( distribution, point_by_point=point_by_point, use_default_initial_guess=use_default_initial_guess, + local_standardise=local_standardise, + local_standardise_using_forecasts=local_standardise_using_forecasts, + global_standardise=global_standardise, desired_units=units, predictor=predictor, tolerance=tolerance, max_iterations=max_iterations, ) - return plugin(forecast, truth, landsea_mask=land_sea_mask) + return plugin(forecast, truth, additional_fields=additional_fields, landsea_mask=land_sea_mask) diff --git a/improver/ensemble_copula_coupling/ensemble_copula_coupling.py b/improver/ensemble_copula_coupling/ensemble_copula_coupling.py index 1df5735480..2f71912cad 100644 --- a/improver/ensemble_copula_coupling/ensemble_copula_coupling.py +++ b/improver/ensemble_copula_coupling/ensemble_copula_coupling.py @@ -742,13 +742,15 @@ def __init__( ) raise AttributeError(msg) - if shape_parameters is None: - if self.distribution.name == "truncnorm": + if isinstance(shape_parameters, iris.cube.Cube): + shape_parameters = shape_parameters.data.flatten() + elif shape_parameters is None: + if self.distribution.name in ["genlogistic", "truncnorm"]: raise ValueError( - "For the truncated normal distribution, " + f"For the {self.distribution.name} distribution, " "shape parameters must be specified." ) - shape_parameters = [] + shape_parameters = {} self.shape_parameters = shape_parameters def __repr__(self) -> str: @@ -794,6 +796,13 @@ def _rescale_shape_parameters( rescaled_values.append((value - location_parameter) / scale_parameter) self.shape_parameters = rescaled_values + def _shape_parameters_for_args(self): + """Prepare arguments for defining the distribution.""" + if self.distribution.name == "truncnorm": + self.shape_parameters = dict(zip(["a", "b"], self.shape_parameters)) + elif self.distribution.name == "genlogistic": + self.shape_parameters = {"c": self.shape_parameters} + class ConvertLocationAndScaleParametersToPercentiles( BasePlugin, ConvertLocationAndScaleParameters @@ -858,9 +867,10 @@ def _location_and_scale_parameters_to_percentiles( self._rescale_shape_parameters(location_data, np.sqrt(scale_data)) - percentile_method = self.distribution( - *self.shape_parameters, loc=location_data, scale=np.sqrt(scale_data) - ) + self._shape_parameters_for_args() + parameters = self.shape_parameters.copy() + parameters.update({"loc": location_data, "scale": np.sqrt(scale_data)}) + percentile_method = self.distribution(**parameters) # Loop over percentiles, and use the distribution as the # "percentile_method" with the location and scale parameter to
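For reference, the kwargs mapping introduced in _shape_parameters_for_args follows scipy's own calling convention: truncnorm takes its (standardised) bounds as "a"/"b", while genlogistic takes its shape as "c". A self-contained sketch of the two cases (the numeric values are illustrative):

import numpy as np
from scipy.stats import genlogistic, truncnorm

location, scale = 280.0, 1.5

# genlogistic: the shape parameter is passed as "c".
dist = genlogistic(c=1.0, loc=location, scale=scale)
print(dist.ppf([0.25, 0.5, 0.75]))  # percentiles from location/scale parameters

# truncnorm: bounds are passed as "a"/"b" in standardised units, which is
# what _rescale_shape_parameters arranges before the distribution is built.
dist = truncnorm(a=(0.0 - location) / scale, b=np.inf, loc=location, scale=scale)
print(dist.ppf([0.25, 0.5, 0.75]))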