Skip to content

Commit

Permalink
Modifications to support the use of fastparquet at version 0.5.0.
Browse files Browse the repository at this point in the history
  • Loading branch information
gavinevans committed Oct 21, 2021
1 parent 79d6680 commit 5ccce69
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 37 deletions.
2 changes: 1 addition & 1 deletion envs/environment_py37_iris30.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies:
- sigtools
- sphinx
# Optional
- fastparquet
- fastparquet=0.5.0
- numba
- pysteps=1.4.1
- statsmodels
Expand Down
19 changes: 3 additions & 16 deletions improver/cli/estimate_emos_coefficients_from_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,30 +138,17 @@ def process(
"""

import iris
import pandas as pd
from iris.cube import CubeList

from improver.calibration import forecast_and_truth_dataframes_to_cubes
from improver.calibration.ensemble_calibration import (
EstimateCoefficientsForEnsembleCalibration,
)
from improver.utilities.load import load_parquet

filters = [("diagnostic", "==", diagnostic)]
forecast_df = pd.read_parquet(forecast, filters=filters)
if forecast_df.empty:
msg = (
f"The requested filepath {forecast} does not contain the "
f"requested contents: {filters}"
)
raise IOError(msg)

truth_df = pd.read_parquet(truth, filters=filters)
if truth_df.empty:
msg = (
f"The requested filepath {truth} does not contain the "
f"requested contents: {filters}"
)
raise IOError(msg)
forecast_df = load_parquet(forecast, filters)
truth_df = load_parquet(truth, filters)

forecast_cube, truth_cube = forecast_and_truth_dataframes_to_cubes(
forecast_df,
Expand Down
61 changes: 60 additions & 1 deletion improver/utilities/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@
# POSSIBILITY OF SUCH DAMAGE.
"""Module for loading cubes."""

from typing import List, Optional, Union
import os
from pathlib import Path
from typing import List, Optional, Tuple, Union

import iris
import pandas as pd
from iris import Constraint
from iris.cube import Cube, CubeList
from pandas.core.frame import DataFrame

from improver.utilities.cube_manipulation import (
MergeCubes,
Expand Down Expand Up @@ -145,3 +149,58 @@ def load_cube(
else:
cube = MergeCubes()(cubes)
return cube


def load_parquet(
    path: Path, filters: Optional[List[Tuple[str, str, str]]] = None
) -> DataFrame:
    """Load a parquet file or a partitioned parquet dataset from the
    path provided into a pandas DataFrame.

    For a partitioned parquet dataset, the path provided is expected to
    be either a directory containing a _metadata file, or a directory of
    subdirectories that each contain a _metadata file.

    Args:
        path:
            Path to a parquet file, or to a partitioned parquet dataset,
            that will be loaded into a single pandas DataFrame.
        filters:
            Filters to restrict the contents of the parquet file loaded.
            For example: [('diagnostic', '==', 'wind_speed_at_10m')].

    Returns:
        Pandas DataFrame that has been loaded from the input filepath given
        the filters provided.

    Raises:
        IOError: If no parquet files are found within the directory path.
        IOError: If the filtered pandas DataFrame is empty.
    """
    if filters is None:
        filters = []

    if path.is_file() or (path / "_metadata").exists():
        sources = [path]
    else:
        # Path.iterdir replaces a bare os.scandir iterator, which is never
        # closed and can leak a directory file descriptor until GC. Non-dataset
        # entries (plain files, dirs without _metadata) are filtered out.
        sources = [d for d in path.iterdir() if (d / "_metadata").exists()]

    if not sources:
        msg = (
            "The path provided is not a file and can not be "
            "interpreted as a partitioned parquet dataset "
            "as no _metadata subdirectory is present. "
            f"The path provided was {path}"
        )
        raise IOError(msg)

    # Concatenate the per-dataset DataFrames; index values from each part are
    # preserved, matching the prior behaviour.
    df = pd.concat(pd.read_parquet(d, filters=filters) for d in sources)

    if df.empty:
        msg = (
            f"The requested directory {path} does not contain the "
            f"requested contents: {filters}"
        )
        raise IOError(msg)
    return df
35 changes: 17 additions & 18 deletions improver_tests/acceptance/SHA256SUMS
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,23 @@ d3efbc6014743793fafed512a8017a7b75e4f0ffa7fd202cd4f1b4a1086b2583 ./create-grid-
575151096829ed458ab13702129efc88f48a8ec459f9b3f39e11e4f5b2e296b2 ./create-grid-with-halo/basic/source_grid.nc
bf7e42be7897606682c3ecdaeb27bf3d3b6ab13a9a88b46c88ae6e92801c6245 ./create-grid-with-halo/halo_size/kgo.nc
9b48f3eabeed93a90654a5e6a2f06fcb7297cdba40f650551d99a5e310cf27dd ./estimate-emos-coefficients-from-table/altitude.nc
60aa2c755d6b2767b82d8e2876e6842f2e40a772aa74189f18af362425fae8da ./estimate-emos-coefficients-from-table/forecast_table/_common_metadata
8c2481c9eb772b93b19dfd3d931812eae08a9feab95a98b77343e2cca97d5dac ./estimate-emos-coefficients-from-table/forecast_table/_metadata
5d15436d9e864f2bf975f9434481920aa03594d98688b69e75cd090ff60a3f4f ./estimate-emos-coefficients-from-table/forecast_table/diagnostic=temperature_at_screen_level/part.0.parquet
ad1c3fdafc20fc72cc495a0a19597873b9aafe6b18e16b61413071e2ca943a11 ./estimate-emos-coefficients-from-table/forecast_table/diagnostic=temperature_at_screen_level_max-daytime/part.0.parquet
bd5d5917e131401801cec8ee6af4e3852b8bccf7ddb78bc2f2ab251ffd8fe1e8 ./estimate-emos-coefficients-from-table/forecast_table/diagnostic=wind_speed_at_10m/part.0.parquet
da550e187acfea39375bc9f2bd6a00859315331686098ba963762e6d8f18fe93 ./estimate-emos-coefficients-from-table/forecast_table_quantiles/_common_metadata
c471e7cf4c685203f7ca9e80f77d4fd1d5ebf21132a4ba572223668b109aa4b4 ./estimate-emos-coefficients-from-table/forecast_table_quantiles/_metadata
6285b4f36378732e5893ed2134236fff0e0cd8822cefe0bef3e35824b4583fb1 ./estimate-emos-coefficients-from-table/forecast_table_quantiles/diagnostic=temperature_at_screen_level/part.0.parquet
e802b6d8042973d9bdfea04d0970683f590c4190ac98cf053dff584a79932e46 ./estimate-emos-coefficients-from-table/forecast_table_quantiles/diagnostic=temperature_at_screen_level_max-daytime/part.0.parquet
f88ba8b7ca77e5906a6e2bd761ac8c02bd02797954c0adab949b9c10b98fb542 ./estimate-emos-coefficients-from-table/forecast_table_quantiles/diagnostic=wind_speed_at_10m/part.0.parquet
dd6c5197addd880783cff8fa52cf9fb7a494ee16e2447344fb242a958233b97d ./estimate-emos-coefficients-from-table/screen_temperature_additional_predictor_kgo.nc
8701aa4f9e94ff73d8cd983ea985d30ba2374b7c62469d7570b778b6d56700a3 ./estimate-emos-coefficients-from-table/screen_temperature_input_quantiles_kgo.nc
8701aa4f9e94ff73d8cd983ea985d30ba2374b7c62469d7570b778b6d56700a3 ./estimate-emos-coefficients-from-table/screen_temperature_kgo.nc
29d09a7244231279161cf79a76917b94941ca1e69544e01b4937db5e791db9ef ./estimate-emos-coefficients-from-table/truth_table/_common_metadata
4b28fae73ff92595b47f6381f3c772e9cc93bbf64cb6bed28c3b3dabb5dceb6c ./estimate-emos-coefficients-from-table/truth_table/_metadata
ac2e86b3e1a2dfb0d1275abb50bf8c280b2851c679f5a359dc273b6455517297 ./estimate-emos-coefficients-from-table/truth_table/diagnostic=temperature_at_screen_level/part.0.parquet
7e41450af449534eee0fb1c8cb45c1758d93099401841f48adcfd7ac0968efe4 ./estimate-emos-coefficients-from-table/truth_table/diagnostic=wind_speed_at_10m/part.0.parquet
05131c12a8f0c9616be81460fbf3086c441b502ffbaf0e3ef7f437a45278119b ./estimate-emos-coefficients-from-table/wind_speed_kgo.nc
93e0c05333bc93ca252027ee5e2da12ee93fac5bbff4bab00b9c334ad83141e2 ./estimate-emos-coefficients-from-table/forecast_table/_common_metadata
037e1cfc55c703a30cdbc1dee78da47020a598683247dc29a8ef88e65b53dfb1 ./estimate-emos-coefficients-from-table/forecast_table/_metadata
9290e994c78b44dcd334baa00b35eb168202ac2e6de998345f6a8cad77661837 ./estimate-emos-coefficients-from-table/forecast_table/diagnostic=temperature_at_screen_level/part.0.parquet
c9d91761e6a58a7b477b027527492fcc87fa6edb4249ee8ae85666b650a80df4 ./estimate-emos-coefficients-from-table/forecast_table/diagnostic=temperature_at_screen_level_max-daytime/part.0.parquet
830980bee6c9d1f69c9789ee6f16b80220ebcc2094e018abcad467aaa0172c96 ./estimate-emos-coefficients-from-table/forecast_table/diagnostic=wind_speed_at_10m/part.0.parquet
b3d8e86fcd40c596edf0173ca4281cd200770ba641e72ec0ccca0298b3c40e0e ./estimate-emos-coefficients-from-table/forecast_table_quantiles/_common_metadata
0425d143be811deb7487c7665e091d5dc31006e0a055f9f87cd5af4c746e034d ./estimate-emos-coefficients-from-table/forecast_table_quantiles/_metadata
4eca1d4d550cd1d934c73c5bcb146076342292ffe7d9d0a9d545eacaf8eb3090 ./estimate-emos-coefficients-from-table/forecast_table_quantiles/diagnostic=temperature_at_screen_level/part.0.parquet
7d296be97fdb2dd5261a7deb6487d022842ac86b53f44875f4267f7790857e1c ./estimate-emos-coefficients-from-table/forecast_table_quantiles/diagnostic=wind_speed_at_10m/part.0.parquet
84fc5fbdc782ecff9262f26232a971b327e3adbf3f0b122d8c3d4b147fec172b ./estimate-emos-coefficients-from-table/screen_temperature_additional_predictor_kgo.nc
542d054bd382eb1bf85fdc6a68c9a93de1c070b82bb1caef0d0917885c292e9c ./estimate-emos-coefficients-from-table/screen_temperature_input_quantiles_kgo.nc
542d054bd382eb1bf85fdc6a68c9a93de1c070b82bb1caef0d0917885c292e9c ./estimate-emos-coefficients-from-table/screen_temperature_kgo.nc
b49d0171b1a52f5cdf38ed86f50ab82719b3902905657c41ae038e78cd11d23c ./estimate-emos-coefficients-from-table/truth_table/_common_metadata
fe0722513847f999ac495d1e0ec2e4a501298e46cd102f55e44809209f807c5e ./estimate-emos-coefficients-from-table/truth_table/_metadata
20b85bcc2b1882f54b11b264024501a3db7e7a8aeaafaa2cb43241f47c5a1b2e ./estimate-emos-coefficients-from-table/truth_table/diagnostic=temperature_at_screen_level/part.0.parquet
9437152c25d6004a93baaaa4776ba15c3c6eb75e2e7f692fcc77bb615392b746 ./estimate-emos-coefficients-from-table/truth_table/diagnostic=wind_speed_at_10m/part.0.parquet
37650efdd39a45296faa190231bb3e7bc817feacf052a1e0edf6631d3b52f955 ./estimate-emos-coefficients-from-table/wind_speed_kgo.nc
d84aaed836c80f55e11207c8618d556b8aaca0c9dfd4320a3ea88387de71670c ./estimate-emos-coefficients/combined_input/historic_forecast.json
eeda3b2a092f3bc78bf7dc8cc22efd5e885f94b8b8e04ccc4740978dfc79e04d ./estimate-emos-coefficients/combined_input/truth.json
ff3a00a16fa94697d6e529a6f98384e4602f70a7d6ce26669c3947c8ac2e6f7e ./estimate-emos-coefficients/landmask.nc
Expand Down
103 changes: 102 additions & 1 deletion improver_tests/utilities/test_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@
"""Unit tests for loading functionality."""

import os
import shutil
import unittest
from datetime import datetime
from pathlib import Path
from tempfile import mkdtemp

import iris
import numpy as np
import pandas as pd
import pytest
from iris.tests import IrisTest

from improver.metadata.probabilistic import find_threshold_coordinate
Expand All @@ -46,7 +50,7 @@
set_up_probability_cube,
set_up_variable_cube,
)
from improver.utilities.load import load_cube, load_cubelist
from improver.utilities.load import load_cube, load_cubelist, load_parquet
from improver.utilities.save import save_netcdf


Expand Down Expand Up @@ -439,5 +443,102 @@ def test_lazy_load(self):
self.assertArrayEqual([True, True], [_.has_lazy_data() for _ in result])


class Test_load_parquet(unittest.TestCase):

    """Test the load_parquet function."""

    def setUp(self):
        """Set up a DataFrame and write it out as a single parquet file,
        a partitioned parquet dataset, and a partitioned parquet dataset
        nested within a subdirectory, for use within the tests."""
        pytest.importorskip("fastparquet")
        data = np.array([6.8, 2.7, 21.2], dtype=np.float32)
        self.truth_data = np.tile(data, 3)

        self.time1 = np.datetime64("2017-07-20T18:00:00")
        self.time2 = np.datetime64("2017-07-21T18:00:00")
        self.time3 = np.datetime64("2017-07-22T18:00:00")

        self.wmo_ids = ["03002", "03003", "03004"]
        diags = ["air_temperature"]
        self.latitudes = [50, 60, 70]
        self.longitudes = [-10, 0, 10]
        self.altitudes = [10, 20, 30]

        df_dict = {
            "ob_value": self.truth_data,
            "time": np.repeat([self.time1, self.time2, self.time3], 3),
            "diagnostic": diags * 9,
            "latitude": self.latitudes * 3,
            "longitude": self.longitudes * 3,
            "altitude": self.altitudes * 3,
            "wmo_id": self.wmo_ids * 3,
        }

        self.df = pd.DataFrame(df_dict)
        self.df = self.df.sort_values(by=["wmo_id"]).reset_index(drop=True)

        # Keep a handle on each mkdtemp parent directory so that tearDown
        # can remove the temporary directories themselves, not just their
        # contents (previously the mkdtemp parents were leaked).
        self.single_file_dir = Path(mkdtemp())
        self.one_parquet_filepath = self.single_file_dir / "temp.parquet"
        self.df.to_parquet(self.one_parquet_filepath, compression=None)

        self.partitioned_directory = Path(mkdtemp())
        self.df.to_parquet(
            self.partitioned_directory, partition_cols="wmo_id", compression=None
        )

        self.sub_parent_directory = Path(mkdtemp())
        self.partitioned_directory_with_sub = self.sub_parent_directory / "extra_dir"
        self.df.to_parquet(
            self.partitioned_directory_with_sub,
            partition_cols="wmo_id",
            compression=None,
        )

        self.empty_directory = Path(mkdtemp())

    def tearDown(self):
        """Remove temporary files and directories created for testing."""
        shutil.rmtree(self.single_file_dir)
        shutil.rmtree(self.partitioned_directory)
        shutil.rmtree(self.sub_parent_directory)
        shutil.rmtree(self.empty_directory)

    def test_basic(self):
        """Test loading a parquet file."""
        result = load_parquet(self.one_parquet_filepath)
        pd.testing.assert_frame_equal(result, self.df)

    def test_partitioned_dataset(self):
        """Test loading a parquet dataset."""
        result = load_parquet(self.partitioned_directory)
        # The partition column is read back as categorical; cast to object
        # to allow comparison with the original DataFrame.
        result["wmo_id"] = result["wmo_id"].astype(object)
        pd.testing.assert_frame_equal(result, self.df)

    def test_partitioned_dataset_with_subdirectory(self):
        """Test loading a parquet dataset with an additional subdirectory."""
        result = load_parquet(self.partitioned_directory_with_sub)
        result["wmo_id"] = result["wmo_id"].astype(object)
        pd.testing.assert_frame_equal(result, self.df)

    def test_filter(self):
        """Test loading a parquet file with a filter."""
        filters = [("wmo_id", "==", "03002")]
        expected_df = self.df.loc[self.df["wmo_id"] == self.wmo_ids[0]]
        result = load_parquet(self.partitioned_directory, filters=filters)
        result["wmo_id"] = result["wmo_id"].astype(object)
        pd.testing.assert_frame_equal(result, expected_df)

    def test_empty_filter(self):
        """Test loading a parquet file with a filter when only an empty
        dataframe is loaded."""
        filters = [("wmo_id", "==", "03005")]
        msg = "does not contain the requested contents"
        # assertRaisesRegex replaces the deprecated assertRaisesRegexp
        # alias, which was removed in Python 3.12.
        with self.assertRaisesRegex(IOError, msg):
            load_parquet(self.partitioned_directory, filters)

    def test_no_dirs(self):
        """Test for when the expected subdirectories do not exist."""
        msg = "The path provided is not"
        with self.assertRaisesRegex(IOError, msg):
            load_parquet(self.empty_directory)


# Allow the test module to be executed directly, e.g. ``python test_load.py``.
if __name__ == "__main__":
    unittest.main()

0 comments on commit 5ccce69

Please sign in to comment.