Introduce HDF file cache #83

Open · wants to merge 13 commits into main
202 changes: 169 additions & 33 deletions h5io_browser/base.py
@@ -1,3 +1,4 @@
+import contextlib
 import os
 import posixpath
 import sys
@@ -36,7 +37,7 @@ def delete_item(file_name: str, h5_path: str) -> None:
     """
     try:
         if os.path.exists(file_name):
-            with _open_hdf(file_name, mode="a") as store:
+            with open_hdf(file_name, mode="a") as store:
                 del store[h5_path]
     except (AttributeError, KeyError):
         pass
@@ -58,7 +59,7 @@ def list_hdf(
         (list, list): list of HDF5 nodes and list of HDF5 groups
     """
     if os.path.exists(file_name):
-        with h5py.File(file_name, "r") as hdf:
+        with open_hdf(file_name, "r") as hdf:
             try:
                 return _get_hdf_content(hdf=hdf[h5_path], recursive=recursive)
             except KeyError:
@@ -93,7 +94,7 @@ def read_dict_from_hdf(
     """
     if h5_path[0] != "/":
         h5_path = "/" + h5_path
-    with h5py.File(file_name, "r") as hdf:
+    with open_hdf(file_name, "r", swmr=True) as hdf:
         group_attrs_dict = hdf[h5_path].attrs
         if (
             "TITLE" in group_attrs_dict.keys()
@@ -154,7 +155,7 @@ def write_dict_to_hdf(
         keys in data. This does not apply to the top level name (title). If 'error', '/' is not allowed
         in any lower-level keys.
     """
-    with _open_hdf(file_name, mode="a") as store:
+    with open_hdf(file_name, mode="a") as store:
         for k, v in data_dict.items():
             _write_hdf5_with_json_support(
                 hdf_filehandle=store,
return main_dict


_FILE_HANDLE_CACHE = {}


def _fetch_handle(filename, mode):
"""Look up handle with compatible mode."""
# lists which modes (of already cached open handles) are suitable for each of the requested modes
acceptable_modes = {
# we can also read from r+/a modes, but e.g. an a mode handle won't do for a r+ request, because r+ wants to
# create the file
"r": ["r", "r+", "a"],
"w": ["w", "r+", "a"],
"a": ["a", "r+"],
}.get(mode, [mode])
for mode in acceptable_modes:
handle = _FILE_HANDLE_CACHE.get((filename, mode), None)
if handle is not None:
return handle
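A hypothetical illustration of the mode-compatibility lookup (uses the private `_FILE_HANDLE_CACHE` and `_fetch_handle` introduced above; the cached object is a stand-in, not a real h5py.File, and the path is made up):

```python
from h5io_browser.base import _FILE_HANDLE_CACHE, _fetch_handle

sentinel = object()  # stands in for an h5py.File opened in append mode
_FILE_HANDLE_CACHE[("/tmp/demo.h5", "a")] = sentinel

# An "r" request is served by the cached "a" handle ("a" is in r's acceptable list) ...
assert _fetch_handle("/tmp/demo.h5", "r") is sentinel
# ... but an "r+" request falls back to [mode] and matches nothing here.
assert _fetch_handle("/tmp/demo.h5", "r+") is None
_FILE_HANDLE_CACHE.clear()
```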


+class HdfCacheWarning(UserWarning):
+    pass
+
+
+@contextlib.contextmanager
+def CachedHDF(filename: str, mode: str = "r"):
+    """
+    Cache an HDF5 file handle.
+
+    After entering a with statement with this context manager, all calls to :func:`.open_hdf` are rerouted to
+    check a (global) file handle cache first. File handles are cached separately for different modes, but as
+    long as modes are compatible, open handles are preferred over opening the file again (e.g. a cached append
+    mode handle would be returned from :func:`.open_hdf` when a read mode one is requested).
+
+    Yields the file handle. Neither the handle yielded here nor ones obtained from :func:`.open_hdf` within the
+    context may be closed explicitly. If such a close is detected, a warning is printed and the stale handle is
+    dropped from the cache.
+
+    Single Writer Multiple Reader (SWMR) mode is implied in this context.
+
+    Args:
+        filename (str): Name of the file on disk
+        mode (str): r   Readonly, file must exist (default)
+                    r+  Read/write, file must exist
+                    w   Create file, truncate if exists
+                    w- or x  Create file, fail if exists
+                    a   Read/write if exists, create otherwise
+
+    Yields:
+        h5py.File: cached file handle
+    """
+    filename = os.path.realpath(os.path.abspath(filename))
+    handle = _fetch_handle(filename, mode)
+    # File already cached: someone above us opened it and will clean it up, we just pass it down.
+    if handle is not None:
+        yield handle
+    else:
+        with _open_hdf(filename, mode, swmr=True) as handle:
+            _FILE_HANDLE_CACHE[filename, handle.mode] = handle
+            try:
+                yield handle
+            finally:
+                # h5py File objects are truthy while open and falsy once closed.
+                if not handle or handle not in _FILE_HANDLE_CACHE.values():
+                    warnings.warn(
+                        "Cached file handle closed or no longer in cache. "
+                        "This indicates a programming bug in a caller of open_hdf. "
+                        "You should not manually close file handles obtained from this function.",
+                        category=HdfCacheWarning,
+                        stacklevel=2,
+                    )
+                _FILE_HANDLE_CACHE.pop((filename, handle.mode), None)
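A minimal usage sketch of the cached context (hypothetical file name; assumes this PR's `CachedHDF` and `open_hdf`):

```python
from h5io_browser.base import CachedHDF, open_hdf

with CachedHDF("results.h5", "a") as handle:
    # Inside the context, open_hdf() finds the cached append-mode handle,
    # which is compatible with this read request, and returns it wrapped in
    # a nullcontext instead of opening the file a second time.
    with open_hdf("results.h5", "r") as reader:
        assert reader is handle  # same underlying h5py.File
# On exit, CachedHDF closes the handle and removes it from the cache.
```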


 def _open_hdf(filename: str, mode: str = "r", swmr: bool = False) -> h5py.File:
     """
     Open HDF5 file
@@ -248,13 +321,60 @@ def _open_hdf(filename: str, mode: str = "r", swmr: bool = False) -> h5py.File:
         h5py.File: open HDF5 file object
     """
     if swmr and mode != "r":
-        store = h5py.File(name=filename, mode=mode, libver="latest")
-        store.swmr_mode = True
+        store = h5py.File(name=filename, mode=mode, libver="latest", swmr=False)
+        if not store.swmr_mode:
+            store.swmr_mode = True
[Review comment from a Member on lines +324 to +326]
Is this change necessary? The previous code was based on the example from the h5py documentation:
https://docs.h5py.org/en/stable/swmr.html#using-the-swmr-feature-from-h5py
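For reference, the SWMR writer setup from the linked h5py documentation looks roughly like this (abridged sketch with a hypothetical file name; see the URL above for the full example):

```python
import h5py
import numpy as np

# Writer side: create the file with the latest file format, set up all
# groups/datasets, then switch on SWMR mode before readers attach.
f = h5py.File("swmr.h5", "w", libver="latest")
dset = f.create_dataset("data", chunks=(2,), maxshape=(None,), data=np.array([1, 2, 3, 4]))
f.swmr_mode = True
# ... the append/flush loop would go here ...
f.close()
```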

         return store
     else:
         return h5py.File(name=filename, mode=mode, libver="latest", swmr=swmr)


+def open_hdf(
+    filename: str, mode: str = "r", swmr: bool = False, cache: bool = False
+) -> contextlib.AbstractContextManager[h5py.File]:
+    """
+    Open HDF5 file with optional caching.
+
+    When a matching file handle exists in the cache, it is returned regardless of the value of `cache`.
+
+    This function may return a bare file handle or a context manager yielding a file handle. As a consequence,
+    calling code must not call normal HDF functions or methods on the return value, but always unwrap it first
+    by entering a with statement.
+
+    Args:
+        filename (str): Name of the file on disk, or file-like object. Note: for files created with the 'core'
+            driver, HDF5 still requires this be non-empty.
+        mode (str): r   Readonly, file must exist (default)
+                    r+  Read/write, file must exist
+                    w   Create file, truncate if exists
+                    w- or x  Create file, fail if exists
+                    a   Read/write if exists, create otherwise
+        swmr (bool): Open the file in SWMR read mode. Only used when mode = 'r'.
+        cache (bool): Keep the file handle in the cache, or return it from there
+
+    Returns:
+        context manager around h5py.File: the opened or cached HDF5 file object
+    """
+    filename = os.path.realpath(os.path.abspath(filename))
+    handle = _fetch_handle(filename, mode)
+    if handle is not None:
+        if not handle:
+            warnings.warn(
+                "File handle found in cache, but in closed state. "
+                "This indicates a programming bug in a previous caller of open_hdf. "
+                "You should not manually close file handles obtained from this function.",
+                category=HdfCacheWarning,
+                stacklevel=2,
+            )
+            del _FILE_HANDLE_CACHE[filename, handle.mode]
+            return CachedHDF(filename, mode)
+        return contextlib.nullcontext(handle)
+    if cache:
+        return CachedHDF(filename, mode)
+    else:
+        return _open_hdf(filename, mode, swmr)
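Because the return type varies (bare `h5py.File`, `nullcontext`, or `CachedHDF`), callers always unwrap it with a with statement; a short sketch with a hypothetical existing file:

```python
from h5io_browser.base import open_hdf

with open_hdf("data.h5", mode="r", swmr=True) as hdf:
    print(list(hdf.keys()))
# Never do: hdf = open_hdf("data.h5"); hdf.keys()
# The return value may be a context manager, not a file handle.
```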


 def _read_hdf(
     hdf_filehandle: Union[str, h5py.File], h5_path: str, slash: str = "ignore"
 ) -> Any:

@@ -271,17 +391,22 @@ def _read_hdf(
         object: The loaded data. Can be of any type supported by ``write_hdf5``.
     """
     file_name = _get_filename_from_filehandle(hdf_filehandle=hdf_filehandle)
-    return _retry(
-        lambda: h5io.read_hdf5(
-            fname=hdf_filehandle,
-            title=h5_path,
-            slash=slash,
-        ),
-        error=BlockingIOError,
-        msg=f"Two or more processes tried to access the file {file_name}.",
-        at_most=10,
-        delay=1,
-    )
+    if isinstance(hdf_filehandle, str):
+        hdf_context = open_hdf(hdf_filehandle, mode="r", swmr=True)
+    else:
+        hdf_context = contextlib.nullcontext(hdf_filehandle)
+    with hdf_context as hdf_filehandle:
+        return _retry(
+            lambda: h5io.read_hdf5(
+                fname=hdf_filehandle,
+                title=h5_path,
+                slash=slash,
+            ),
+            error=BlockingIOError,
+            msg=f"Two or more processes tried to access the file {file_name}.",
+            at_most=10,
+            delay=1,
+        )
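The path-vs-handle dispatch idiom above, in isolation: a string path gets a real, owning context manager while an already-open handle gets a no-op wrapper, so the body can treat both uniformly (self-contained sketch, independent of h5py):

```python
import contextlib
import io

def as_context(file_or_handle):
    """Wrap a path in an owning context, an open handle in a no-op one."""
    if isinstance(file_or_handle, str):
        return open(file_or_handle)  # we opened it, so the with-block closes it
    return contextlib.nullcontext(file_or_handle)  # caller keeps ownership

buf = io.StringIO("already open")
with as_context(buf) as fh:
    print(fh.read())
assert not buf.closed  # the caller-owned handle was left open
```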


 def _read_dict_from_open_hdf(hdf_filehandle, h5_path, recursive=False, slash="ignore"):

@@ -347,22 +472,33 @@ def _write_hdf(
         the HDF5 file. (requires python >=3.11)
     """
     file_name = _get_filename_from_filehandle(hdf_filehandle=hdf_filehandle)
-    _retry(
-        lambda: h5io.write_hdf5(
-            fname=hdf_filehandle,
-            data=data,
-            overwrite=overwrite,
-            compression=compression,
-            title=h5_path,
-            slash=slash,
-            use_json=use_json,
-            use_state=use_state,
-        ),
-        error=BlockingIOError,
-        msg=f"Two or more processes tried to access the file {file_name}.",
-        at_most=10,
-        delay=1,
-    )
+    if isinstance(hdf_filehandle, str):
+        if overwrite == "update":
+            mode = "a"
+        elif overwrite:
+            mode = "w"
+        else:
+            mode = "w-"
+        hdf_context = open_hdf(hdf_filehandle, mode=mode)
+    else:
+        hdf_context = contextlib.nullcontext(hdf_filehandle)
+    with hdf_context as hdf_filehandle:
+        _retry(
+            lambda: h5io.write_hdf5(
+                fname=hdf_filehandle,
+                data=data,
+                overwrite=overwrite,
+                compression=compression,
+                title=h5_path,
+                slash=slash,
+                use_json=use_json,
+                use_state=use_state,
+            ),
+            error=BlockingIOError,
+            msg=f"Two or more processes tried to access the file {file_name}.",
+            at_most=10,
+            delay=1,
+        )
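How `_write_hdf` translates h5io's `overwrite` argument into an HDF5 file mode when given a path, restated as a standalone helper (hypothetical name, mirrors the dispatch above):

```python
def mode_for_overwrite(overwrite):
    # "update" is itself truthy, so it must be checked before the bool test:
    # "update" appends ("a"), True truncates ("w"), False refuses to clobber ("w-").
    if overwrite == "update":
        return "a"
    return "w" if overwrite else "w-"

assert mode_for_overwrite("update") == "a"
assert mode_for_overwrite(True) == "w"
assert mode_for_overwrite(False) == "w-"
```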


 def _write_hdf5_with_json_support(
42 changes: 41 additions & 1 deletion tests/test_base.py
@@ -1,4 +1,5 @@
 import os
+import sys
 import numpy as np
 import h5py
 from unittest import TestCase
@@ -12,6 +13,8 @@
     write_dict_to_hdf,
 )
 from h5io_browser.base import (
+    CachedHDF,
+    open_hdf,
     _get_hdf_content,
     _is_ragged_in_1st_dim_only,
     _read_dict_from_open_hdf,
@@ -439,7 +442,7 @@ def setUp(self):
             "b": 42,
         }
         self.h5_path = "h5io"
-        h5io.write_hdf5("testcomp.h5", self.data)
+        h5io.write_hdf5(self.file_name, self.data)
 
     def test_h5io(self):
         dataread = h5io.read_hdf5(self.file_name, self.h5_path)
@@ -553,3 +556,40 @@ def test_hdf5_structure(self):
                 {"data_hierarchical/c/e": {"TITLE": "ndarray"}},
             ],
         )
+
+
+class CachedTestMixin:
+    """
+    Run the mixed-in test cases, but open the HDF file cache before each method.
+
+    WARNING: Assumes that cls.file_name is defined and is the only HDF file
+    accessed during all test methods.
+    """
+
+    def setUp(self):
+        super().setUp()
+        # Test with an append-mode file cache; if a test requests read-mode
+        # files it will still receive the cached handle.
+        cache = CachedHDF(self.file_name, "a")
+        # Poor man's enterContext: if enterContext is used, the cleanup
+        # action only runs after tearDown, so tearDown and the context
+        # manager contend for the underlying HDF5 files, which causes
+        # errors on Windows.
+        self._context = cache
+        cache.__enter__()
+
+    def tearDown(self):
+        self._context.__exit__(None, None, None)
+        super().tearDown()
+
+
+class TestBaseHierachicalCached(CachedTestMixin, TestBaseHierachical):
+    pass
+
+
+class TestBaseJSONCached(CachedTestMixin, TestBaseJSON):
+    pass
+
+
+class TestBasePartialRead(CachedTestMixin, TestBasePartialRead):
+    pass