Add CLI for ingesting tabular forecasts and observations into EMOS #1592

Merged
67 changes: 56 additions & 11 deletions improver/calibration/dataframe_utilities.py
@@ -37,7 +37,7 @@


"""
from typing import Optional, Sequence, Tuple
from typing import List, Optional, Sequence, Tuple

import numpy as np
import pandas as pd
@@ -49,6 +49,7 @@
from improver.ensemble_copula_coupling.ensemble_copula_coupling import (
RebadgePercentilesAsRealizations,
)
from improver.ensemble_copula_coupling.utilities import choose_set_of_percentiles
from improver.metadata.constants.time_types import TIME_COORDS
from improver.spotdata.build_spotdata_cube import build_spotdata_cube

@@ -121,8 +122,8 @@ def _preprocess_temporal_columns(df: DataFrame) -> DataFrame:
content of the columns with temporal dtypes are
accessible as pandas datetime objects.
"""
for col in df.select_dtypes(include="datetime64[ns]"):
df[col] = df[col].dt.tz_localize("UTC").astype("O")
for col in df.select_dtypes(include=["datetime64[ns, UTC]"]):
df[col] = df[col].astype("O")
for col in df.select_dtypes(include="timedelta64[ns]"):
df[col] = df[col].astype("O")
return df
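
The change above assumes the incoming time columns are already timezone-aware ("datetime64[ns, UTC]") rather than naive datetimes that need localising. A minimal sketch of the behaviour this relies on, using a hypothetical single-row frame (not part of the diff):

import pandas as pd

# Hypothetical frame mirroring the dtypes the function now expects.
df = pd.DataFrame(
    {
        "time": pd.to_datetime(["2021-01-01 00:00"], utc=True),
        "forecast_period": pd.to_timedelta([6], unit="hours"),
    }
)
# Timezone-aware datetime columns and timedelta columns are converted to
# object dtype, so each entry is an individual pandas Timestamp/Timedelta.
for col in df.select_dtypes(include=["datetime64[ns, UTC]"]):
    df[col] = df[col].astype("O")
for col in df.select_dtypes(include="timedelta64[ns]"):
    df[col] = df[col].astype("O")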
@@ -150,6 +151,28 @@ def _unique_check(df: DataFrame, column: str) -> None:
raise ValueError(msg)


def _quantile_check(df: DataFrame) -> None:
"""Check that the percentiles provided can be considered to be
quantiles with equal spacing spanning the percentile range.

Args:
df: DataFrame with a percentile column.

Raises:
ValueError: Percentiles are not equally spaced.
"""
expected_percentiles = choose_set_of_percentiles(df["percentile"].nunique())

if not np.allclose(expected_percentiles, df["percentile"].unique()):
msg = (
"The forecast percentiles can not be considered as quantiles. "
f"The forecast percentiles are {df['percentile'].unique()}."
"Based on the number of percentiles provided, the expected "
f"percentiles would be {expected_percentiles}."
)
raise ValueError(msg)


def _define_time_coord(
adate: pd.Timestamp, time_bounds: Optional[Sequence[pd.Timestamp]] = None,
) -> DimCoord:
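
A small illustration of what _quantile_check enforces, assuming choose_set_of_percentiles returns equally spaced percentiles for the requested count (for example 25, 50 and 75 when three percentiles are present):

import numpy as np
import pandas as pd

from improver.ensemble_copula_coupling.utilities import choose_set_of_percentiles

# Hypothetical forecast frame with three equally spaced percentiles per site.
df = pd.DataFrame({"percentile": [25.0, 50.0, 75.0] * 2})
expected = choose_set_of_percentiles(df["percentile"].nunique())
# Expected to be True when the percentiles are equally spaced quantiles;
# _quantile_check raises a ValueError when this comparison fails.
print(np.allclose(expected, df["percentile"].unique()))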
@@ -238,7 +261,9 @@ def _training_dates_for_calibration(


def _prepare_dataframes(
forecast_df: DataFrame, truth_df: DataFrame
forecast_df: DataFrame,
truth_df: DataFrame,
percentiles: Optional[List[float]] = None,
) -> Tuple[DataFrame, DataFrame]:
"""Prepare dataframes for conversion to cubes by: 1) checking
that the expected columns are present, 2) finding the sites
@@ -258,6 +283,8 @@ def _prepare_dataframes(
DataFrame expected to contain the following columns: ob_value,
time, wmo_id, diagnostic, latitude, longitude and altitude.
Any other columns are ignored.
percentiles:
The set of percentiles to be used for estimating EMOS coefficients.

Returns:
A sanitised version of the forecasts and truth dataframes that
@@ -266,11 +293,24 @@ def _prepare_dataframes(
_dataframe_column_check(forecast_df, FORECAST_DATAFRAME_COLUMNS)
_dataframe_column_check(truth_df, TRUTH_DATAFRAME_COLUMNS)

# Extract the required percentiles.
if percentiles:
indices = [np.isclose(forecast_df["percentile"], float(p)) for p in percentiles]
forecast_df = forecast_df[np.logical_or.reduce(indices)]

# Check the percentiles can be considered to be equally spaced quantiles.
_quantile_check(forecast_df)

# Find the common set of WMO IDs.
common_wmo_ids = set(forecast_df["wmo_id"]).intersection(truth_df["wmo_id"])
common_wmo_ids = sorted(
set(forecast_df["wmo_id"].unique()).intersection(truth_df["wmo_id"].unique())
)
forecast_df = forecast_df[forecast_df["wmo_id"].isin(common_wmo_ids)]
truth_df = truth_df[truth_df["wmo_id"].isin(common_wmo_ids)]

# Ensure time in forecasts is present in truths.
forecast_df = forecast_df[forecast_df["time"].isin(truth_df["time"].unique())]

truth_df = truth_df.drop(columns=["altitude", "latitude", "longitude"])
# Identify columns to copy onto the truth_df from the forecast_df
forecast_subset = forecast_df[
@@ -287,6 +327,7 @@ def _prepare_dataframes(
"diagnostic",
]
].drop_duplicates()

# Use "outer" to fill in any missing observations in the truth dataframe.
truth_df = truth_df.merge(
forecast_subset, on=["wmo_id", "time", "diagnostic"], how="outer"
@@ -314,15 +355,14 @@ def forecast_dataframe_to_cube(
Returns:
Cube containing the forecasts from the training period.
"""
df = _preprocess_temporal_columns(df)

fp_point = pd.Timedelta(int(forecast_period), unit="hours")

cubelist = CubeList()

for adate in training_dates:
time_df = df.loc[(df["time"] == adate) & (df["forecast_period"] == fp_point)]

time_df = _preprocess_temporal_columns(time_df)
if time_df.empty:
continue

@@ -409,12 +449,11 @@ def truth_dataframe_to_cube(df: DataFrame, training_dates: DatetimeIndex,) -> Cu
Returns:
Cube containing the truths from the training period.
"""
df = _preprocess_temporal_columns(df)

cubelist = CubeList()
for adate in training_dates:
time_df = df.loc[(df["time"] == adate)]

time_df = df.loc[df["time"] == adate]
time_df = _preprocess_temporal_columns(time_df)

if time_df.empty:
continue
@@ -455,6 +494,7 @@ def forecast_and_truth_dataframes_to_cubes(
cycletime: str,
forecast_period: int,
training_length: int,
percentiles: Optional[List[float]] = None,
) -> Tuple[Cube, Cube]:
"""Convert a forecast DataFrame into an iris Cube and a
truth DataFrame into an iris Cube.
@@ -475,6 +515,9 @@ def forecast_and_truth_dataframes_to_cubes(
Forecast period in hours as an integer.
training_length:
Training length in days as an integer.
percentiles:
The set of percentiles to be used for estimating EMOS coefficients.
These should be a set of equally spaced quantiles.

Returns:
Forecasts and truths for the training period in Cube format.
@@ -483,7 +526,9 @@ def forecast_and_truth_dataframes_to_cubes(
cycletime, forecast_period, training_length
)

forecast_df, truth_df = _prepare_dataframes(forecast_df, truth_df)
forecast_df, truth_df = _prepare_dataframes(
forecast_df, truth_df, percentiles=percentiles
)

forecast_cube = forecast_dataframe_to_cube(
forecast_df, training_dates, forecast_period
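
The percentile handling added to _prepare_dataframes subsets the forecast rows to the requested percentiles before the equal-spacing check and the site/time matching run. A self-contained sketch of that filtering, using hypothetical data; the retained subset (25, 50, 75) is itself equally spaced, so it would also pass the quantile check:

import numpy as np
import pandas as pd

# Hypothetical forecast frame: two sites, percentiles every 5% from 5 to 95.
percentile_values = np.arange(5.0, 100.0, 5.0)
forecast_df = pd.DataFrame(
    {
        "wmo_id": np.repeat(["03002", "03005"], len(percentile_values)),
        "percentile": np.tile(percentile_values, 2),
        "forecast": np.random.default_rng(0).normal(280.0, 1.0, 2 * len(percentile_values)),
    }
)
percentiles = ["25", "50", "75"]  # e.g. parsed from a comma-separated CLI option
indices = [np.isclose(forecast_df["percentile"], float(p)) for p in percentiles]
forecast_df = forecast_df[np.logical_or.reduce(indices)]
print(sorted(forecast_df["percentile"].unique()))  # [25.0, 50.0, 75.0]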
3 changes: 2 additions & 1 deletion improver/cli/__init__.py
@@ -328,7 +328,8 @@ def with_output(
from improver.utilities.save import save_netcdf

result = wrapped(*args, **kwargs)
if output:

if output and result:
save_netcdf(result, output, compression_level, least_significant_digit)
return
return result
197 changes: 197 additions & 0 deletions improver/cli/estimate_emos_coefficients_from_table.py
@@ -0,0 +1,197 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# (C) British Crown Copyright 2017-2021 Met Office.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""CLI to estimate coefficients for Ensemble Model Output
Statistics (EMOS), otherwise known as Non-homogeneous Gaussian
Regression (NGR)."""

from improver import cli


@cli.clizefy
@cli.with_output
def process(
forecast: cli.inputpath,
truth: cli.inputpath,
additional_predictors: cli.inputcubelist = None,
*,
diagnostic,
cycletime,
forecast_period,
training_length,
distribution,
point_by_point=False,
use_default_initial_guess=False,
units=None,
predictor="mean",
tolerance: float = 0.02,
max_iterations: int = 1000,
percentiles: cli.comma_separated_list = None,
):
"""Estimate coefficients for Ensemble Model Output Statistics.

Loads in arguments for estimating coefficients for Ensemble Model
Output Statistics (EMOS), otherwise known as Non-homogeneous Gaussian
Regression (NGR). Two sources of input data must be provided: historical
forecasts and historical truth data (to use in calibration).
The estimated coefficients are output as a cube.

Args:
forecast (pathlib.Path):
The path to a Parquet file containing the historical forecasts
to be used for calibration. The expected columns within the
Parquet file are: forecast, blend_time, forecast_period,
forecast_reference_time, time, wmo_id, percentile, diagnostic,
latitude, longitude, period, height, cf_name, units.
truth (pathlib.Path):
The path to a Parquet file containing the truths to be used
for calibration. The expected columns within the
Parquet file are: ob_value, time, wmo_id, diagnostic, latitude,
longitude and altitude.
additional_predictors (iris.cube.Cube):
A cube for a static additional predictor to be used, in addition
to the forecast, when estimating the EMOS coefficients.
diagnostic (str):
The name of the diagnostic to be calibrated within the forecast
and truth tables. This name is used to filter the Parquet file
when reading from disk.
cycletime (str):
Cycletime of a format similar to 20170109T0000Z.
forecast_period (int):
Forecast period to be calibrated in hours.
training_length (int):
Number of days within the training period.
distribution (str):
The distribution that will be used for minimising the
Continuous Ranked Probability Score when estimating the EMOS
coefficients. This will be dependent upon the input phenomenon.
point_by_point (bool):
If True, coefficients are calculated independently for each point
within the input cube by creating an initial guess and minimising
each grid point independently. If False, a single set of
coefficients is calculated using all points.
Warning: This option is memory intensive and is unsuitable for
gridded input. Using a default initial guess may reduce the memory
overhead of this option.
use_default_initial_guess (bool):
If True, use the default initial guess. The default initial guess
assumes no adjustments are required to the initial choice of
predictor to generate the calibrated distribution. This means
coefficients of 1 for the multiplicative coefficients and 0 for
the additive coefficients. If False, the initial guess is computed.
units (str):
The units that calibration should be undertaken in. The historical
forecast and truth will be converted as required.
predictor (str):
String to specify the form of the predictor used to calculate the
location parameter when estimating the EMOS coefficients.
Currently the ensemble mean ("mean") and the ensemble realizations
("realizations") are supported as options.
tolerance (float):
The tolerance for the Continuous Ranked Probability Score (CRPS)
calculated by the minimisation. Once multiple iterations result in
a CRPS equal to the same value within the specified tolerance, the
minimisation will terminate.
max_iterations (int):
The maximum number of iterations allowed until the minimisation has
converged to a stable solution. If the maximum number of iterations
is reached but the minimisation has not yet converged to a stable
solution, then the available solution is used anyway, and a warning
is raised. If the predictor is "realizations", then the number of
iterations may need to be increased, as there will be more
coefficients to solve for.
percentiles (List[float]):
The set of percentiles to be used for estimating EMOS coefficients.
These should be a set of equally spaced quantiles.

Returns:
iris.cube.CubeList:
CubeList containing the coefficients estimated using EMOS. Each
coefficient is stored in a separate cube.
"""

import iris
import pandas as pd
from iris.cube import CubeList

from improver.calibration.dataframe_utilities import (
forecast_and_truth_dataframes_to_cubes,
)
from improver.calibration.ensemble_calibration import (
EstimateCoefficientsForEnsembleCalibration,
)

filters = [("diagnostic", "==", diagnostic)]
forecast_df = pd.read_parquet(forecast, filters=filters)
if forecast_df.empty:
msg = (
f"The requested filepath {forecast} does not contain the "
f"requested contents: {filters}"
)
raise IOError(msg)

truth_df = pd.read_parquet(truth, filters=filters)
if truth_df.empty:
msg = (
f"The requested filepath {truth} does not contain the "
f"requested contents: {filters}"
)
raise IOError(msg)

forecast_cube, truth_cube = forecast_and_truth_dataframes_to_cubes(
forecast_df,
truth_df,
cycletime,
forecast_period,
training_length,
percentiles=percentiles,
)

if not forecast_cube or not truth_cube:
return

# Extract WMO IDs from the additional predictors.
if additional_predictors:
constr = iris.Constraint(wmo_id=truth_cube.coord("wmo_id").points)
additional_predictors = CubeList(
[ap.extract(constr) for ap in additional_predictors]
)

plugin = EstimateCoefficientsForEnsembleCalibration(
distribution,
point_by_point=point_by_point,
use_default_initial_guess=use_default_initial_guess,
desired_units=units,
predictor=predictor,
tolerance=tolerance,
max_iterations=max_iterations,
)
return plugin(forecast_cube, truth_cube, additional_fields=additional_predictors)
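
Both tables are read with a predicate filter on the diagnostic name, so only the relevant rows are loaded from disk, and an empty result is treated as an error. A minimal sketch of that pattern, with a placeholder path and diagnostic name (not values taken from this pull request):

import pandas as pd

filters = [("diagnostic", "==", "temperature_at_screen_level")]  # placeholder diagnostic
forecast_df = pd.read_parquet("forecast_table.parquet", filters=filters)  # placeholder path
if forecast_df.empty:
    raise IOError(
        "The requested filepath forecast_table.parquet does not contain "
        f"the requested contents: {filters}"
    )

At the command line, the forecast and truth Parquet paths are positional arguments and the remaining keyword-only arguments become flags via the clize wrapper; the exact flag spellings are determined by clize rather than by this diff.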