Add generic HDF5 dataset
Add dataset and tests for HDF5 files which may not be supported by
pandas. Resolves #1026.

Signed-off-by: Eric Oliveira Gomes <ericog123456@gmail.com>
Signed-off-by: tomaz-suller <tomaz.suller@usp.br>
tomaz-suller committed Oct 24, 2022
1 parent cbe0522 commit 719694b
Showing 6 changed files with 529 additions and 0 deletions.
5 changes: 5 additions & 0 deletions RELEASE.md
@@ -13,6 +13,11 @@
## Major features and improvements
* The config loader objects now implement `UserDict` and the configuration is accessed through `conf_loader['catalog']`
* You can configure config file patterns through `settings.py` without creating a custom config loader
* Added the following new datasets:

| Type | Description | Location |
| ------------------------- | --------------------------------------------------------------------------------------- | ------------------------------- |
| `hdf5.H5pyDataSet` | Work with HDF5 files through h5py.File objects, which support more formats than pandas | `kedro.extras.datasets.hdf5` |

## Bug fixes and other changes
* Fixed `kedro micropkg pull` for packages on PyPI.
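
For orientation only (not part of this diff), a minimal Python sketch of how such a catalog entry could be exercised programmatically; the dataset name, the file path, and the presence of an existing HDF5 file are assumptions for illustration:

from kedro.io import DataCatalog

# Hypothetical in-code equivalent of a catalog.yml entry; "hdf_dataset" and the
# filepath are placeholders, and the file is assumed to already exist.
catalog = DataCatalog.from_config(
    catalog={
        "hdf_dataset": {
            "type": "hdf5.H5pyDataSet",
            "filepath": "data/01_raw/sensor_reading.h5",
        }
    }
)

h5f = catalog.load("hdf_dataset")  # an h5py.File, not a pandas DataFrame
print(list(h5f.keys()))
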
1 change: 1 addition & 0 deletions docs/source/api_docs/kedro.extras.datasets.rst
@@ -16,6 +16,7 @@ kedro.extras.datasets
   kedro.extras.datasets.dask.ParquetDataSet
   kedro.extras.datasets.email.EmailMessageDataSet
   kedro.extras.datasets.geopandas.GeoJSONDataSet
   kedro.extras.datasets.hdf5.H5pyDataSet
   kedro.extras.datasets.holoviews.HoloviewsWriter
   kedro.extras.datasets.json.JSONDataSet
   kedro.extras.datasets.matplotlib.MatplotlibWriter
8 changes: 8 additions & 0 deletions kedro/extras/datasets/hdf5/__init__.py
@@ -0,0 +1,8 @@
"""``AbstractDataSet`` implementation that produces arbitrary objects from HDF5 files."""

__all__ = ["H5pyDataSet"]

from contextlib import suppress

with suppress(ImportError):
    from .h5py_dataset import H5pyDataSet
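
A small sketch (not part of the commit) of what the `suppress(ImportError)` guard achieves: the `hdf5` subpackage still imports when h5py is not installed, and `H5pyDataSet` is simply absent from it:

import importlib

# The subpackage import itself never fails; only the optional class is skipped
# when h5py is missing, so availability can be detected with a hasattr check.
hdf5_datasets = importlib.import_module("kedro.extras.datasets.hdf5")
if hasattr(hdf5_datasets, "H5pyDataSet"):
    print("h5py installed: H5pyDataSet is available")
else:
    print("h5py missing: H5pyDataSet was not imported")
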
217 changes: 217 additions & 0 deletions kedro/extras/datasets/hdf5/h5py_dataset.py
@@ -0,0 +1,217 @@
"""``H5pyDataSet`` loads/saves data from/to a hdf file using an underlying
filesystem (e.g.: local, S3, GCS). It uses h5py.File to handle the hdf file.
"""

import tempfile
from copy import deepcopy
from io import BytesIO
from pathlib import PurePosixPath
from threading import Lock
from typing import Any, Dict

import fsspec
import h5py

from kedro.io.core import (
    AbstractVersionedDataSet,
    DataSetError,
    Version,
    get_filepath_str,
    get_protocol_and_path,
)


class H5pyDataSet(AbstractVersionedDataSet):
    """``H5pyDataSet`` loads/saves data from/to an HDF5 file using an underlying
    filesystem (e.g. local, S3, GCS). It uses h5py.File to handle the HDF5 file.

    Example adding a catalog entry with the
    `YAML API <https://kedro.readthedocs.io/en/stable/05_data/\
    01_data_catalog.html#using-the-data-catalog-with-the-yaml-api>`_:

    .. code-block:: yaml

        >>> hdf_dataset:
        >>>   type: hdf5.H5pyDataSet
        >>>   filepath: s3://my_bucket/raw/sensor_reading.h5
        >>>   credentials: aws_s3_creds

    Example using the Python API:
    ::

        >>> from kedro.extras.datasets.hdf5 import H5pyDataSet
        >>> from tempfile import TemporaryFile
        >>> import numpy as np
        >>> import h5py
        >>>
        >>> file = TemporaryFile()
        >>> h5f = h5py.File(file, "w")
        >>> dataset = h5f.create_dataset('foo', data=[1, 2])
        >>>
        >>> # data_set = H5pyDataSet(filepath="gcs://bucket/test.hdf")
        >>> data_set = H5pyDataSet(filepath="test.h5")
        >>> # Saved and loaded objects are both h5py.File instances
        >>> data_set.save(h5f)
        >>> reloaded = data_set.load()
        >>> assert 'foo' in reloaded
        >>> np.testing.assert_equal(reloaded['foo'][()], [1, 2])
        >>>
        >>> h5f.close()
        >>> file.close()
    """

    # _lock is a class attribute that will be shared across all the instances.
    # It is used to make the dataset thread-safe.
    _lock = Lock()
    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {}  # type: Dict[str, Any]

    # pylint: disable=protected-access
    @staticmethod
    def __h5py_from_binary(binary, load_args):
        file_access_property_list = h5py.h5p.create(h5py.h5p.FILE_ACCESS)
        file_access_property_list.set_fapl_core(backing_store=False)
        file_access_property_list.set_file_image(binary)

        file_id_args = {
            "fapl": file_access_property_list,
            "flags": h5py.h5f.ACC_RDONLY,
            "name": next(tempfile._get_candidate_names()).encode(),
        }
        h5_file_args = {"backing_store": False, "driver": "core", "mode": "r"}

        file_id = h5py.h5f.open(**file_id_args)
        return h5py.File(file_id, **h5_file_args, **load_args)

    @staticmethod
    def __h5py_to_binary(h5f: h5py.File, save_args):
        bio = BytesIO()
        with h5py.File(bio, "w", **save_args) as biof:
            for _, value in h5f.items():
                h5f.copy(
                    value,
                    biof,
                    expand_soft=True,
                    expand_external=True,
                    expand_refs=True,
                )
            biof.close()
        return bio.getvalue()

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        filepath: str,
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
        version: Version = None,
        credentials: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
    ) -> None:
        """Creates a new instance of ``H5pyDataSet`` pointing to a concrete HDF5 file
        on a specific filesystem.

        Args:
            filepath: Filepath in POSIX format to an HDF5 file prefixed with a protocol like `s3://`.
                If prefix is not provided, `file` protocol (local filesystem) will be used.
                The prefix should be any protocol supported by ``fsspec``.
                Note: `http(s)` doesn't support versioning.
            load_args: h5py options for loading HDF5 files.
                You can find all available arguments at:
                https://docs.h5py.org/en/stable/high/file.html#h5py.File
                All defaults are preserved.
            save_args: h5py options for saving HDF5 files.
                You can find all available arguments at:
                https://docs.h5py.org/en/stable/high/file.html#h5py.File
                All defaults are preserved.
            version: If specified, should be an instance of
                ``kedro.io.core.Version``. If its ``load`` attribute is
                None, the latest version will be loaded. If its ``save``
                attribute is None, save version will be autogenerated.
            credentials: Credentials required to get access to the underlying filesystem.
                E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
            fs_args: Extra arguments to pass into underlying filesystem class constructor
                (e.g. `{"project": "my-project"}` for ``GCSFileSystem``), as well as
                to pass to the filesystem's `open` method through nested keys
                `open_args_load` and `open_args_save`.
                Here you can find all available arguments for `open`:
                https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open
                All defaults are preserved, except `mode`, which is set to `wb` when saving.
        """
        _fs_args = deepcopy(fs_args) or {}
        _fs_open_args_load = _fs_args.pop("open_args_load", {})
        _fs_open_args_save = _fs_args.pop("open_args_save", {})
        _credentials = deepcopy(credentials) or {}

        protocol, path = get_protocol_and_path(filepath, version)
        if protocol == "file":
            _fs_args.setdefault("auto_mkdir", True)

        self._protocol = protocol
        self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args)

        super().__init__(
            filepath=PurePosixPath(path),
            version=version,
            exists_function=self._fs.exists,
            glob_function=self._fs.glob,
        )

        # Handle default load and save arguments
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

        _fs_open_args_save.setdefault("mode", "wb")
        self._fs_open_args_load = _fs_open_args_load
        self._fs_open_args_save = _fs_open_args_save

    def _describe(self) -> Dict[str, Any]:
        return dict(
            filepath=self._filepath,
            protocol=self._protocol,
            load_args=self._load_args,
            save_args=self._save_args,
            version=self._version,
        )

    def _load(self) -> h5py.File:
        load_path = get_filepath_str(self._get_load_path(), self._protocol)

        with self._fs.open(load_path, **self._fs_open_args_load) as fs_file:
            binary_data = fs_file.read()

        with H5pyDataSet._lock:
            return H5pyDataSet.__h5py_from_binary(binary_data, self._load_args)

    def _save(self, data: h5py.File) -> None:
        save_path = get_filepath_str(self._get_save_path(), self._protocol)

        with H5pyDataSet._lock:
            binary_data = H5pyDataSet.__h5py_to_binary(data, self._save_args)

        with self._fs.open(save_path, **self._fs_open_args_save) as fs_file:
            fs_file.write(binary_data)

        self._invalidate_cache()

    def _exists(self) -> bool:
        try:
            load_path = get_filepath_str(self._get_load_path(), self._protocol)
        except DataSetError:
            return False

        return self._fs.exists(load_path)

    def _release(self) -> None:
        super()._release()
        self._invalidate_cache()

    def _invalidate_cache(self) -> None:
        """Invalidate underlying filesystem caches."""
        filepath = get_filepath_str(self._filepath, self._protocol)
        self._fs.invalidate_cache(filepath)
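
The private `__h5py_to_binary`/`__h5py_from_binary` helpers turn an open h5py.File into bytes and bytes back into a read-only in-memory file, so `_save` and `_load` can pass whole HDF5 files through fsspec without writing to local disk. A rough sketch of the same round trip using only h5py's public file-like-object support (rather than the low-level `h5py.h5f`/`h5py.h5p` calls above); names here are illustrative:

import io

import h5py
import numpy as np

# Serialise an HDF5 file into an in-memory buffer and capture its bytes.
buffer = io.BytesIO()
with h5py.File(buffer, "w") as h5f:
    h5f.create_dataset("foo", data=np.array([1, 2]))
binary_data = buffer.getvalue()

# Reopen the same bytes as a read-only HDF5 file; no disk I/O is involved.
reloaded = h5py.File(io.BytesIO(binary_data), "r")
np.testing.assert_equal(reloaded["foo"][()], [1, 2])
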
39 changes: 39 additions & 0 deletions static/jsonschema/kedro-catalog-0.18.json
@@ -393,6 +393,45 @@
                }
            }
        },
        {
            "if": {
                "properties": {
                    "type": {
                        "const": "hdf5.H5pyDataSet"
                    }
                }
            },
            "then": {
                "required": [
                    "filepath"
                ],
                "properties": {
                    "filepath": {
                        "type": "string",
                        "description": "Filepath in POSIX format to an HDF5 file prefixed with a protocol like `s3://`.\nIf prefix is not provided, `file` protocol (local filesystem) will be used.\nThe prefix should be any protocol supported by ``fsspec``.\nNote: `http(s)` doesn't support versioning."
                    },
                    "load_args": {
                        "type": "object",
                        "description": "h5py options for loading HDF5 files.\nYou can find all available arguments at:\nhttps://docs.h5py.org/en/stable/high/file.html#h5py.File\nAll defaults are preserved."
                    },
                    "save_args": {
                        "type": "object",
                        "description": "h5py options for saving HDF5 files.\nYou can find all available arguments at:\nhttps://docs.h5py.org/en/stable/high/file.html#h5py.File\nAll defaults are preserved."
                    },
                    "credentials": {
                        "type": [
                            "object",
                            "string"
                        ],
                        "description": "Credentials required to get access to the underlying filesystem.\nE.g. for ``GCSFileSystem`` it should look like `{\"token\": None}`."
                    },
                    "fs_args": {
                        "type": "object",
                        "description": "Extra arguments to pass into underlying filesystem class constructor\n(e.g. `{\"project\": \"my-project\"}` for ``GCSFileSystem``), as well as\nto pass to the filesystem's `open` method through nested keys\n`open_args_load` and `open_args_save`.\nHere you can find all available arguments for `open`:\nhttps://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.open\nAll defaults are preserved, except `mode`, which is set to `wb` when saving."
                    }
                }
            }
        },
        {
            "if": {
                "properties": {