Skip to content

Commit

Permalink
Modifications to tidy up dataframe preparation.
Browse files Browse the repository at this point in the history
  • Loading branch information
gavinevans committed Oct 15, 2021
1 parent 2ccebbb commit e1a762c
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 62 deletions.
95 changes: 60 additions & 35 deletions improver/calibration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,11 @@
from collections import OrderedDict
from typing import List, Optional, Sequence, Tuple

import iris
import numpy as np
import pandas as pd
from iris.coords import AuxCoord, DimCoord
from iris.cube import Cube, CubeList
from numpy import timedelta64
from pandas.core.frame import DataFrame
from pandas.core.indexes.base import Index
from pandas.core.indexes.datetimes import DatetimeIndex

from improver.ensemble_copula_coupling.ensemble_copula_coupling import (
Expand Down Expand Up @@ -316,6 +313,63 @@ def _training_dates_for_calibration(
)


def _prepare_dataframes(
    forecast_df: DataFrame, truth_df: DataFrame
) -> Tuple[DataFrame, DataFrame]:
    """Sanitise the forecast and truth dataframes prior to cube conversion.

    Three steps are applied: 1) validate that the expected columns are
    present, 2) restrict both dataframes to the sites common to the
    forecasts and the truths and 3) replace and supplement the truth
    dataframe with site metadata taken from the forecast dataframe. The
    third step also ensures that a row containing a NaN ob_value is
    inserted for any missing observations.

    Args:
        forecast_df:
            DataFrame expected to contain the following columns: forecast,
            blend_time, forecast_period, forecast_reference_time, time,
            wmo_id, percentile, diagnostic, latitude, longitude, period,
            height, cf_name, units. Any other columns are ignored.
        truth_df:
            DataFrame expected to contain the following columns: ob_value,
            time, wmo_id, diagnostic, latitude, longitude and altitude.
            Any other columns are ignored.

    Returns:
        A sanitised version of the forecasts and truth dataframes that
        are ready for conversion to cubes.
    """
    _dataframe_column_check(forecast_df, FORECAST_DATAFRAME_COLUMNS)
    _dataframe_column_check(truth_df, TRUTH_DATAFRAME_COLUMNS)

    # Keep only the sites (WMO IDs) that appear in both dataframes.
    shared_sites = set(forecast_df["wmo_id"]) & set(truth_df["wmo_id"])
    forecast_df = forecast_df[forecast_df["wmo_id"].isin(shared_sites)]
    truth_df = truth_df[truth_df["wmo_id"].isin(shared_sites)]

    # Discard the site coordinates recorded with the truths in favour of
    # those recorded with the forecasts.
    truth_df = truth_df.drop(columns=["altitude", "latitude", "longitude"])

    # Metadata columns to copy onto the truth_df from the forecast_df.
    metadata_columns = [
        "wmo_id",
        "latitude",
        "longitude",
        "altitude",
        "period",
        "height",
        "cf_name",
        "units",
        "time",
        "diagnostic",
    ]
    site_metadata = forecast_df[metadata_columns].drop_duplicates()
    # An "outer" merge inserts a NaN ob_value row for any (site, time)
    # pair present in the forecasts but missing from the truths.
    truth_df = truth_df.merge(
        site_metadata, on=["wmo_id", "time", "diagnostic"], how="outer"
    )
    return forecast_df, truth_df


def forecast_dataframe_to_cube(
df: DataFrame, training_dates: DatetimeIndex, forecast_period: int
) -> Cube:
Expand Down Expand Up @@ -445,19 +499,6 @@ def truth_dataframe_to_cube(df: DataFrame, training_dates: DatetimeIndex,) -> Cu
# per column.
_unique_check(time_df, "diagnostic")

# Ensure that every WMO ID has an entry for a particular time.
new_index = Index(df["wmo_id"].unique(), name="wmo_id")
time_df = time_df.set_index("wmo_id").reindex(new_index)

# Fill the alt/lat/lon with the mode to ensure consistent coordinates
# to support merging. Also fill other columns known to contain one
# unique value.
for col in ["altitude", "latitude", "longitude", "diagnostic"]:
time_df[col] = df.groupby(by="wmo_id", sort=False)[col].agg(
lambda x: pd.Series.mode(x, dropna=not df[col].isna().all())
)
time_df = time_df.reset_index()

if time_df["period"].isna().all():
time_bounds = None
else:
Expand Down Expand Up @@ -491,7 +532,8 @@ def forecast_and_truth_dataframes_to_cubes(
forecast_period: int,
training_length: int,
) -> Tuple[Cube, Cube]:
"""Convert a truth DataFrame into an iris Cube.
"""Convert a forecast DataFrame into an iris Cube and a
truth DataFrame into an iris Cube.
Args:
forecast_df:
Expand All @@ -517,24 +559,7 @@ def forecast_and_truth_dataframes_to_cubes(
cycletime, forecast_period, training_length
)

_dataframe_column_check(forecast_df, FORECAST_DATAFRAME_COLUMNS)
_dataframe_column_check(truth_df, TRUTH_DATAFRAME_COLUMNS)

truth_df = truth_df.drop(columns=["altitude", "latitude", "longitude"])
# Identify columns to copy onto the truth_df from the forecast_df
forecast_subset = forecast_df[
[
"wmo_id",
"latitude",
"longitude",
"altitude",
"period",
"height",
"cf_name",
"units",
]
].drop_duplicates()
truth_df = truth_df.merge(forecast_subset, on=["wmo_id"])
forecast_df, truth_df = _prepare_dataframes(forecast_df, truth_df)

forecast_cube = forecast_dataframe_to_cube(
forecast_df, training_dates, forecast_period
Expand Down
64 changes: 37 additions & 27 deletions improver_tests/calibration/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import unittest
from datetime import datetime
from unittest.case import expectedFailure

import iris
import numpy as np
Expand Down Expand Up @@ -536,28 +535,6 @@ def test_two_day_training(self):
result = truth_dataframe_to_cube(self.truth_df, self.date_range_two_days,)
self.assertCubeEqual(result, self.expected_period_truth[1:, :])

def test_missing_observation(self):
    """Test an input DataFrame is converted correctly into an Iris Cube
    if an observation is missing at a particular time."""
    # Drop the final row so one (site, time) observation is absent.
    df = self.truth_df.head(-1)
    # The gap is expected to surface as NaN at the matching cube position.
    self.expected_period_truth.data[-1, -1] = np.nan
    result = truth_dataframe_to_cube(df, self.date_range)
    np.testing.assert_array_equal(result.data, self.expected_period_truth.data)
    # Site coordinates must remain intact despite the dropped row.
    for coord in ["altitude", "latitude", "longitude"]:
        self.assertEqual(
            result.coord(coord), self.expected_period_truth.coord(coord)
        )

def test_moving_sites(self):
    """Test an input DataFrame is converted correctly into an Iris Cube
    if the position of a particular site varies during the training period."""
    df = self.truth_df.copy()
    # Perturb the first row's position so the site appears to move
    # during the training period.
    df.at[0, "altitude"] = 45
    df.at[0, "latitude"] = 52
    df.at[0, "longitude"] = -12
    # NOTE(review): the expected cube is unmodified, so conversion is
    # presumed to reconcile the moved site to a single consistent
    # position — confirm against truth_dataframe_to_cube.
    result = truth_dataframe_to_cube(df, self.date_range)
    self.assertCubeEqual(result, self.expected_period_truth)

def test_empty_dataframe(self):
"""Test if none of the required data is available in the dataframe."""
validity_time = np.datetime64("2017-07-22T19:00:00")
Expand Down Expand Up @@ -637,8 +614,25 @@ def test_multiday_forecast_period(self):
self.assertCubeEqual(result[0], self.expected_period_forecast)
self.assertCubeEqual(result[1], self.expected_period_truth)

def test_site_mismatch(self):
"""Test for a mismatch in the sites available as truths and forecasts."""
def test_site_absent_from_forecast(self):
    """Test for when a site is absent from the forecast dataframe."""
    # Remove the final site from the forecasts only.
    retained_ids = self.wmo_ids[:-1]
    forecast_subset = self.forecast_df.copy()
    forecast_subset = forecast_subset.loc[
        forecast_subset["wmo_id"].isin(retained_ids)
    ]
    result = forecast_and_truth_dataframes_to_cubes(
        forecast_subset,
        self.truth_subset_df,
        self.cycletime,
        self.forecast_period,
        self.training_length,
    )
    # Both returned cubes should exclude the site missing from the forecasts.
    self.assertEqual(len(result), 2)
    self.assertCubeEqual(result[0], self.expected_period_forecast[:, :, :-1])
    self.assertCubeEqual(result[1], self.expected_period_truth[:, :-1])

def test_site_absent_from_truth(self):
"""Test for when a site is absent from the truth dataframe."""
df = self.truth_subset_df.copy()
df = df.loc[df["wmo_id"].isin(self.wmo_ids[:-1])]
expected_forecast = self.expected_period_forecast[:, :, :-1]
Expand Down Expand Up @@ -673,6 +667,22 @@ def test_site_coord_mismatch(self):
self.assertCubeEqual(result[0], self.expected_period_forecast)
self.assertCubeEqual(result[1], self.expected_period_truth)

def test_missing_observation(self):
    """Test a truth DataFrame with a missing observation at
    a particular time is converted correctly into an iris Cube."""
    # Drop the final row so one (site, time) observation is absent.
    df = self.truth_subset_df.head(-1)
    # The missing observation is expected to appear as NaN in the truth cube.
    self.expected_period_truth.data[-1, -1] = np.nan
    result = forecast_and_truth_dataframes_to_cubes(
        self.forecast_df,
        df,
        self.cycletime,
        self.forecast_period,
        self.training_length,
    )
    # The forecast cube is unaffected; only the truth cube gains the NaN.
    self.assertEqual(len(result), 2)
    self.assertCubeEqual(result[0], self.expected_period_forecast)
    self.assertCubeEqual(result[1], self.expected_period_truth)

def test_forecast_missing_compulsory_columns(self):
"""Test if there are missing compulsory columns in the forecast
dataframe."""
Expand Down Expand Up @@ -705,7 +715,7 @@ def test_truth_missing_compulsory_columns(self):

def test_forecast_additional_columns_present(self):
"""Test that if there are additional columns present
in forecast dataframe, these have no impact."""
in the forecast dataframe, these have no impact."""
df = self.forecast_df.copy()
df["station_id"] = "11111"
result = forecast_and_truth_dataframes_to_cubes(
Expand All @@ -719,7 +729,7 @@ def test_forecast_additional_columns_present(self):

def test_truth_additional_columns_present(self):
"""Test that if there are additional columns present
in truth dataframe, these have no impact."""
in the truth dataframe, these have no impact."""
df = self.truth_subset_df.copy()
df["station_id"] = "11111"
result = forecast_and_truth_dataframes_to_cubes(
Expand Down

0 comments on commit e1a762c

Please sign in to comment.