From 719694bb8ef5eefe49e54e83230fa4c184d7a940 Mon Sep 17 00:00:00 2001 From: tomaz-suller Date: Mon, 24 Oct 2022 06:51:57 -0300 Subject: [PATCH] Add generic HDF5 dataset Add dataset and tests for HDF5 files which may not be supported by pandas. Resolves #1026. Signed-off-by: Eric Oliveira Gomes Signed-off-by: tomaz-suller --- RELEASE.md | 5 + .../source/api_docs/kedro.extras.datasets.rst | 1 + kedro/extras/datasets/hdf5/__init__.py | 8 + kedro/extras/datasets/hdf5/h5py_dataset.py | 217 +++++++++++++++ static/jsonschema/kedro-catalog-0.18.json | 39 +++ .../extras/datasets/hdf5/test_h5py_dataset.py | 259 ++++++++++++++++++ 6 files changed, 529 insertions(+) create mode 100644 kedro/extras/datasets/hdf5/__init__.py create mode 100644 kedro/extras/datasets/hdf5/h5py_dataset.py create mode 100644 tests/extras/datasets/hdf5/test_h5py_dataset.py diff --git a/RELEASE.md b/RELEASE.md index 13ac55056f..d920061336 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -13,6 +13,11 @@ ## Major features and improvements * The config loader objects now implement `UserDict` and the configuration is accessed through `conf_loader['catalog']` * You can configure config file patterns through `settings.py` without creating a custom config loader +* Added the following new datasets: + +| Type | Description | Location | | ------------------------- | --------------------------------------------------------------------------------------- | ------------------------------- | | `hdf5.H5pyDataSet` | Work with HDF5 files through h5py.File objects, which support more formats than pandas | `kedro.extras.datasets.hdf5` | ## Bug fixes and other changes * Fixed `kedro micropkg pull` for packages on PyPI. 
diff --git a/docs/source/api_docs/kedro.extras.datasets.rst b/docs/source/api_docs/kedro.extras.datasets.rst index 2a9a5acd83..72939dcdf0 100644 --- a/docs/source/api_docs/kedro.extras.datasets.rst +++ b/docs/source/api_docs/kedro.extras.datasets.rst @@ -16,6 +16,7 @@ kedro.extras.datasets kedro.extras.datasets.dask.ParquetDataSet kedro.extras.datasets.email.EmailMessageDataSet kedro.extras.datasets.geopandas.GeoJSONDataSet + kedro.extras.datasets.hdf5.H5pyDataSet kedro.extras.datasets.holoviews.HoloviewsWriter kedro.extras.datasets.json.JSONDataSet kedro.extras.datasets.matplotlib.MatplotlibWriter diff --git a/kedro/extras/datasets/hdf5/__init__.py b/kedro/extras/datasets/hdf5/__init__.py new file mode 100644 index 0000000000..fea71a01e1 --- /dev/null +++ b/kedro/extras/datasets/hdf5/__init__.py @@ -0,0 +1,8 @@ +"""``AbstractDataSet`` implementation that produces arbitrary objects from HDF5 files.""" + +__all__ = ["H5pyDataSet"] + +from contextlib import suppress + +with suppress(ImportError): + from .h5py_dataset import H5pyDataSet diff --git a/kedro/extras/datasets/hdf5/h5py_dataset.py b/kedro/extras/datasets/hdf5/h5py_dataset.py new file mode 100644 index 0000000000..afd2d0f8e6 --- /dev/null +++ b/kedro/extras/datasets/hdf5/h5py_dataset.py @@ -0,0 +1,217 @@ +"""``H5pyDataSet`` loads/saves data from/to a hdf file using an underlying +filesystem (e.g.: local, S3, GCS). It uses h5py.File to handle the hdf file. +""" + +import tempfile +from copy import deepcopy +from io import BytesIO +from pathlib import PurePosixPath +from threading import Lock +from typing import Any, Dict + +import fsspec +import h5py + +from kedro.io.core import ( + AbstractVersionedDataSet, + DataSetError, + Version, + get_filepath_str, + get_protocol_and_path, +) + + +class H5pyDataSet(AbstractVersionedDataSet): + """``H5pyDataSet`` loads/saves data from/to a hdf file using an underlying + filesystem (e.g. local, S3, GCS). It uses h5py.File to handle the hdf file. 
+ + Example adding a catalog entry with + `YAML API <https://kedro.readthedocs.io/en/stable/data/data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_: + + .. code-block:: yaml + + >>> hdf_dataset: + >>> type: hdf5.H5pyDataSet + >>> filepath: s3://my_bucket/raw/sensor_reading.h5 + >>> credentials: aws_s3_creds + + Example using Python API: + :: + + >>> from kedro.extras.datasets.hdf5 import H5pyDataSet + >>> from tempfile import TemporaryFile + >>> import numpy as np + >>> import h5py + >>> + >>> file = TemporaryFile() + >>> h5f = h5py.File(file) + >>> dataset = h5f.create_dataset('foo', data=[1,2]) + >>> + >>> # data_set = H5pyDataSet(filepath="gcs://bucket/test.hdf") + >>> data_set = H5pyDataSet(filepath="test.h5") + >>> # Saved and loaded objects are both h5py.File instances + >>> data_set.save(h5f) + >>> reloaded = data_set.load() + >>> assert 'foo' in reloaded + >>> np.testing.assert_equal(reloaded['foo'][()], [1, 2]) + >>> + >>> h5f.close() + >>> file.close() + + """ + + # _lock is a class attribute that will be shared across all the instances. + # It is used to make dataset safe for threads. 
+ _lock = Lock() + DEFAULT_LOAD_ARGS = {} # type: Dict[str, Any] + DEFAULT_SAVE_ARGS = {} # type: Dict[str, Any] + + # pylint: disable=protected-access + @staticmethod + def __h5py_from_binary(binary, load_args): + file_access_property_list = h5py.h5p.create(h5py.h5p.FILE_ACCESS) + file_access_property_list.set_fapl_core(backing_store=False) + file_access_property_list.set_file_image(binary) + + file_id_args = { + "fapl": file_access_property_list, + "flags": h5py.h5f.ACC_RDONLY, + "name": next(tempfile._get_candidate_names()).encode(), + } + h5_file_args = {"backing_store": False, "driver": "core", "mode": "r"} + + file_id = h5py.h5f.open(**file_id_args) + return h5py.File(file_id, **h5_file_args, **load_args) + + @staticmethod + def __h5py_to_binary(h5f: h5py.File, save_args): + bio = BytesIO() + with h5py.File(bio, "w", **save_args) as biof: + for _, value in h5f.items(): + h5f.copy( + value, + biof, + expand_soft=True, + expand_external=True, + expand_refs=True, + ) + biof.close() + return bio.getvalue() + + # pylint: disable=too-many-arguments + def __init__( + self, + filepath: str, + load_args: Dict[str, Any] = None, + save_args: Dict[str, Any] = None, + version: Version = None, + credentials: Dict[str, Any] = None, + fs_args: Dict[str, Any] = None, + ) -> None: + """Creates a new instance of ``H5pyDataSet`` pointing to a concrete hdf file + on a specific filesystem. + + Args: + filepath: Filepath in POSIX format to a hdf file prefixed with a protocol like `s3://`. + If prefix is not provided, `file` protocol (local filesystem) will be used. + The prefix should be any protocol supported by ``fsspec``. + Note: `http(s)` doesn't support versioning. + load_args: h5py options for loading hdf files. + You can find all available arguments at: + https://docs.h5py.org/en/stable/high/file.html#h5py.File + All defaults are preserved. + save_args: h5py options for saving hdf files. 
+ You can find all available arguments at: + https://docs.h5py.org/en/stable/high/file.html#h5py.File + All defaults are preserved. + version: If specified, should be an instance of + ``kedro.io.core.Version``. If its ``load`` attribute is + None, the latest version will be loaded. If its ``save`` + attribute is None, save version will be autogenerated. + credentials: Credentials required to get access to the underlying filesystem. + E.g. for ``GCSFileSystem`` it should look like `{"token": None}`. + fs_args: Extra arguments to pass into underlying filesystem class constructor + (e.g. `{"project": "my-project"}` for ``GCSFileSystem``), as well as + to pass to the filesystem's `open` method through nested keys + `open_args_load` and `open_args_save`. + Here you can find all available arguments for `open`: + https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open + All defaults are preserved, except `mode`, which is set `wb` when saving. + """ + _fs_args = deepcopy(fs_args) or {} + _fs_open_args_load = _fs_args.pop("open_args_load", {}) + _fs_open_args_save = _fs_args.pop("open_args_save", {}) + _credentials = deepcopy(credentials) or {} + + protocol, path = get_protocol_and_path(filepath, version) + if protocol == "file": + _fs_args.setdefault("auto_mkdir", True) + + self._protocol = protocol + self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args) + + super().__init__( + filepath=PurePosixPath(path), + version=version, + exists_function=self._fs.exists, + glob_function=self._fs.glob, + ) + + # Handle default load and save arguments + self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS) + if load_args is not None: + self._load_args.update(load_args) + self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS) + if save_args is not None: + self._save_args.update(save_args) + + _fs_open_args_save.setdefault("mode", "wb") + self._fs_open_args_load = _fs_open_args_load + self._fs_open_args_save = _fs_open_args_save + + def 
_describe(self) -> Dict[str, Any]: + return dict( + filepath=self._filepath, + protocol=self._protocol, + load_args=self._load_args, + save_args=self._save_args, + version=self._version, + ) + + def _load(self) -> h5py.File: + load_path = get_filepath_str(self._get_load_path(), self._protocol) + + with self._fs.open(load_path, **self._fs_open_args_load) as fs_file: + binary_data = fs_file.read() + + with H5pyDataSet._lock: + return H5pyDataSet.__h5py_from_binary(binary_data, self._load_args) + + def _save(self, data: h5py.File) -> None: + save_path = get_filepath_str(self._get_save_path(), self._protocol) + + with H5pyDataSet._lock: + binary_data = H5pyDataSet.__h5py_to_binary(data, self._save_args) + + with self._fs.open(save_path, **self._fs_open_args_save) as fs_file: + fs_file.write(binary_data) + + self._invalidate_cache() + + def _exists(self) -> bool: + try: + load_path = get_filepath_str(self._get_load_path(), self._protocol) + except DataSetError: + return False + + return self._fs.exists(load_path) + + def _release(self) -> None: + super()._release() + self._invalidate_cache() + + def _invalidate_cache(self) -> None: + """Invalidate underlying filesystem caches.""" + filepath = get_filepath_str(self._filepath, self._protocol) + self._fs.invalidate_cache(filepath) diff --git a/static/jsonschema/kedro-catalog-0.18.json b/static/jsonschema/kedro-catalog-0.18.json index f8f53c62e2..12fb4c7cea 100644 --- a/static/jsonschema/kedro-catalog-0.18.json +++ b/static/jsonschema/kedro-catalog-0.18.json @@ -393,6 +393,45 @@ } } }, + { + "if": { + "properties": { + "type": { + "const": "hdf5.H5pyDataSet" + } + } + }, + "then": { + "required": [ + "filepath" + ], + "properties": { + "filepath": { + "type": "string", + "description": "Filepath in POSIX format to a hdf file prefixed with a protocol like `s3://`.\nIf prefix is not provided, `file` protocol (local filesystem) will be used.\nThe prefix should be any protocol supported by ``fsspec``.\nNote: `http(s)` doesn't 
support versioning." + }, + "load_args": { + "type": "object", + "description": "h5py options for loading hdf files.\nYou can find all available arguments at:\nhttps://docs.h5py.org/en/stable/high/file.html#h5py.File\nAll defaults are preserved." + }, + "save_args": { + "type": "object", + "description": "h5py options for saving hdf files.\nYou can find all available arguments at:\nhttps://docs.h5py.org/en/stable/high/file.html#h5py.File\nAll defaults are preserved." + }, + "credentials": { + "type": [ + "object", + "string" + ], + "description": "Credentials required to get access to the underlying filesystem.\nE.g. for ``GCSFileSystem`` it should look like `{\"token\": None}`." + }, + "fs_args": { + "type": "object", + "description": "Extra arguments to pass into underlying filesystem class constructor\n(e.g. `{\"project\": \"my-project\"}` for ``GCSFileSystem``), as well as\nto pass to the filesystem's `open` method through nested keys\n`open_args_load` and `open_args_save`.\nHere you can find all available arguments for `open`:\nhttps://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open\nAll defaults are preserved, except `mode`, which is set `wb` when saving." 
+ } + } + } + }, { "if": { "properties": { diff --git a/tests/extras/datasets/hdf5/test_h5py_dataset.py b/tests/extras/datasets/hdf5/test_h5py_dataset.py new file mode 100644 index 0000000000..8edefb40e6 --- /dev/null +++ b/tests/extras/datasets/hdf5/test_h5py_dataset.py @@ -0,0 +1,259 @@ +from pathlib import Path, PurePosixPath +from typing import Union + +import h5py +import numpy as np +import pytest +from fsspec.implementations.http import HTTPFileSystem +from fsspec.implementations.local import LocalFileSystem +from gcsfs import GCSFileSystem +from s3fs.core import S3FileSystem + +from kedro.extras.datasets.hdf5.h5py_dataset import H5pyDataSet +from kedro.io import DataSetError +from kedro.io.core import PROTOCOL_DELIMITER, Version + + +@pytest.fixture +def filepath_hdf(tmp_path): + return (tmp_path / "test.h5").as_posix() + + +@pytest.fixture +def hdf_data_set(filepath_hdf, load_args, save_args, mocker, fs_args): + H5pyDataSet._lock = mocker.MagicMock() + return H5pyDataSet( + filepath=filepath_hdf, + load_args=load_args, + save_args=save_args, + fs_args=fs_args, + ) + + +@pytest.fixture +def versioned_hdf_data_set(filepath_hdf, load_version, save_version): + return H5pyDataSet( + filepath=filepath_hdf, version=Version(load_version, save_version) + ) + + +@pytest.fixture +def dummy_filepath_hdf(tmp_path): + return (tmp_path / "dummy.h5").as_posix() + + +@pytest.fixture +def dummy_h5f(dummy_filepath_hdf): + h5f = h5py.File(dummy_filepath_hdf, "w") + group = h5f.create_group("foo") + group.create_dataset("a", data=np.arange(4).reshape((2, 2))) + h5f.create_dataset("b", data=np.array([1, 2])) + return h5f + + +def assert_h5py_equal( + expected: Union[h5py.File, h5py.Dataset], received: Union[h5py.File, h5py.Dataset] +) -> bool: + + try: + if isinstance(expected, h5py.Dataset) or isinstance(received, h5py.Dataset): + assert isinstance(expected, h5py.Dataset) and isinstance( + received, h5py.Dataset + ) + # Indexing a Dataset with () returns all its values + # 
which in this case are assumed to be numpy arrays + np.testing.assert_equal(expected[()], received[()]) + else: + for key, expected_value in expected.items(): + received_value = received[key] + assert_h5py_equal(expected_value, received_value) + except (AssertionError, KeyError): + return False + + return True + + +class TestH5pyDataSet: + def test_save_and_load(self, hdf_data_set, dummy_h5f): + """Test saving and reloading the data set.""" + hdf_data_set.save(dummy_h5f) + reloaded = hdf_data_set.load() + assert_h5py_equal(dummy_h5f, reloaded) + assert hdf_data_set._fs_open_args_load == {} + assert hdf_data_set._fs_open_args_save == {"mode": "wb"} + + def test_exists(self, hdf_data_set, dummy_h5f): + """Test `exists` method invocation for both existing and + nonexistent data set.""" + assert not hdf_data_set.exists() + hdf_data_set.save(dummy_h5f) + assert hdf_data_set.exists() + + @pytest.mark.parametrize( + "load_args", [{"k1": "v1", "index": "value"}], indirect=True + ) + def test_load_extra_params(self, hdf_data_set, load_args): + """Test overriding the default load arguments.""" + for key, value in load_args.items(): + assert hdf_data_set._load_args[key] == value + + @pytest.mark.parametrize( + "save_args", [{"k1": "v1", "index": "value"}], indirect=True + ) + def test_save_extra_params(self, hdf_data_set, save_args): + """Test overriding the default save arguments.""" + for key, value in save_args.items(): + assert hdf_data_set._save_args[key] == value + + @pytest.mark.parametrize( + "fs_args", + [{"open_args_load": {"mode": "rb", "compression": "gzip"}}], + indirect=True, + ) + def test_open_extra_args(self, hdf_data_set, fs_args): + assert hdf_data_set._fs_open_args_load == fs_args["open_args_load"] + assert hdf_data_set._fs_open_args_save == {"mode": "wb"} # default unchanged + + def test_load_missing_file(self, hdf_data_set): + """Check the error when trying to load missing file.""" + pattern = r"Failed while loading data from data set H5pyDataSet\(.*\)" 
+ with pytest.raises(DataSetError, match=pattern): + hdf_data_set.load() + + @pytest.mark.parametrize( + "filepath,instance_type", + [ + ("s3://bucket/file.h5", S3FileSystem), + ("file:///tmp/test.h5", LocalFileSystem), + ("/tmp/test.h5", LocalFileSystem), + ("gcs://bucket/file.h5", GCSFileSystem), + ("https://example.com/file.h5", HTTPFileSystem), + ], + ) + def test_protocol_usage(self, filepath, instance_type): + data_set = H5pyDataSet(filepath=filepath) + assert isinstance(data_set._fs, instance_type) + + path = filepath.split(PROTOCOL_DELIMITER, 1)[-1] + + assert str(data_set._filepath) == path + assert isinstance(data_set._filepath, PurePosixPath) + + def test_catalog_release(self, mocker): + fs_mock = mocker.patch("fsspec.filesystem").return_value + filepath = "test.h5" + data_set = H5pyDataSet(filepath=filepath) + data_set.release() + fs_mock.invalidate_cache.assert_called_once_with(filepath) + + def test_thread_lock_usage(self, hdf_data_set, dummy_h5f, mocker): + """Test thread lock usage.""" + mocked_lock = H5pyDataSet._lock + mocked_lock.assert_not_called() + + hdf_data_set.save(dummy_h5f) + # pylint: disable=unnecessary-dunder-call + calls = [mocker.call.__enter__(), mocker.call.__exit__(None, None, None)] + mocked_lock.assert_has_calls(calls) + + mocked_lock.reset_mock() + hdf_data_set.load() + mocked_lock.assert_has_calls(calls) + + +class TestH5pyDataSetVersioned: + def test_version_str_repr(self, load_version, save_version): + """Test that version is in string representation of the class instance + when applicable.""" + filepath = "test.h5" + ds = H5pyDataSet(filepath=filepath) + ds_versioned = H5pyDataSet( + filepath=filepath, version=Version(load_version, save_version) + ) + assert filepath in str(ds) + assert "version" not in str(ds) + + assert filepath in str(ds_versioned) + ver_str = f"version=Version(load={load_version}, save='{save_version}')" + assert ver_str in str(ds_versioned) + assert "H5pyDataSet" in str(ds_versioned) + assert 
"H5pyDataSet" in str(ds) + assert "protocol" in str(ds_versioned) + assert "protocol" in str(ds) + + def test_save_and_load(self, versioned_hdf_data_set, dummy_h5f): + """Test that saved and reloaded data matches the original one for + the versioned data set.""" + versioned_hdf_data_set.save(dummy_h5f) + reloaded_df = versioned_hdf_data_set.load() + assert_h5py_equal(dummy_h5f, reloaded_df) + + def test_no_versions(self, versioned_hdf_data_set): + """Check the error if no versions are available for load.""" + pattern = r"Did not find any versions for H5pyDataSet\(.+\)" + with pytest.raises(DataSetError, match=pattern): + versioned_hdf_data_set.load() + + def test_exists(self, versioned_hdf_data_set, dummy_h5f): + """Test `exists` method invocation for versioned data set.""" + assert not versioned_hdf_data_set.exists() + versioned_hdf_data_set.save(dummy_h5f) + assert versioned_hdf_data_set.exists() + + def test_prevent_overwrite(self, versioned_hdf_data_set, dummy_h5f): + """Check the error when attempting to override the data set if the + corresponding hdf file for a given save version already exists.""" + versioned_hdf_data_set.save(dummy_h5f) + pattern = ( + r"Save path \'.+\' for H5pyDataSet\(.+\) must " + r"not exist if versioning is enabled\." 
+ ) + with pytest.raises(DataSetError, match=pattern): + versioned_hdf_data_set.save(dummy_h5f) + + @pytest.mark.parametrize( + "load_version", ["2019-01-01T23.59.59.999Z"], indirect=True + ) + @pytest.mark.parametrize( + "save_version", ["2019-01-02T00.00.00.000Z"], indirect=True + ) + def test_save_version_warning( + self, versioned_hdf_data_set, load_version, save_version, dummy_h5f + ): + """Check the warning when saving to the path that differs from + the subsequent load path.""" + pattern = ( + rf"Save version '{save_version}' did not match load version " + rf"'{load_version}' for H5pyDataSet\(.+\)" + ) + with pytest.warns(UserWarning, match=pattern): + versioned_hdf_data_set.save(dummy_h5f) + + def test_http_filesystem_no_versioning(self): + pattern = r"HTTP\(s\) DataSet doesn't support versioning\." + + with pytest.raises(DataSetError, match=pattern): + H5pyDataSet( + filepath="https://example.com/file.h5", + version=Version(None, None), + ) + + def test_versioning_existing_dataset( + self, hdf_data_set, versioned_hdf_data_set, dummy_h5f + ): + """Check the error when attempting to save a versioned dataset on top of an + already existing (non-versioned) dataset.""" + hdf_data_set.save(dummy_h5f) + assert hdf_data_set.exists() + assert hdf_data_set._filepath == versioned_hdf_data_set._filepath + pattern = ( + f"(?=.*file with the same name already exists in the directory)" + f"(?=.*{versioned_hdf_data_set._filepath.parent.as_posix()})" + ) + with pytest.raises(DataSetError, match=pattern): + versioned_hdf_data_set.save(dummy_h5f) + + # Remove non-versioned dataset and try again + Path(hdf_data_set._filepath.as_posix()).unlink() + versioned_hdf_data_set.save(dummy_h5f) + assert versioned_hdf_data_set.exists()