From 2d61d1aa2382c08f746e61cd2fe7c4c5cd5c769f Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Wed, 15 May 2024 21:35:39 -0600 Subject: [PATCH] Inline loaded variables into kerchunk references (#73) * add test vs kerchunk inlining * encode inlined data correctly (at least for standard numpy arrays) * don't test time variable for now * store fill_value as np.NaN, and always coerce it * test passing for lat and lon variables * formatting * encode numpy types * tidy internal import * parametrize test to test inlining different variables * raise when encoding encountered during serialization of numpy arrays * see if closing the netcdf files in the fixtures fixes the kerchunk error * update docs * ensure inline_threshold is an int * formatting * specified NetCDF4 for netcdf4_file fixture & added netcdf4 to pyproject.toml [test] (#116) * organize tests * dont unnecessarily slice dataset --------- Co-authored-by: Raphael Hagen --- docs/usage.md | 4 +- pyproject.toml | 7 +- virtualizarr/kerchunk.py | 83 ++++++++--- virtualizarr/tests/conftest.py | 5 +- virtualizarr/tests/test_integration.py | 132 +++++++++--------- virtualizarr/tests/test_kerchunk.py | 10 +- .../tests/test_manifests/test_array.py | 2 +- virtualizarr/tests/test_zarr.py | 2 +- virtualizarr/zarr.py | 21 ++- 9 files changed, 171 insertions(+), 95 deletions(-) diff --git a/docs/usage.md b/docs/usage.md index 425ee49d..691e69eb 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -333,8 +333,10 @@ These references can now be interpreted like they were a Zarr store by [fsspec]( combined_ds = xr.open_dataset('combined.json', engine="kerchunk") ``` +In-memory ("loadable") variables backed by numpy arrays can also be written out to kerchunk reference files, with the values serialized as bytes. This is equivalent to kerchunk's concept of "inlining", but done on a per-array basis using the `loadable_variables` kwarg rather than a per-chunk basis using kerchunk's `inline_threshold` kwarg. + ```{note} -Currently you can only serialize virtual variables backed by `ManifestArray` objects to kerchunk reference files, not real in-memory numpy-backed variables. +Currently you can only serialize in-memory variables to kerchunk references if they do not have any encoding. ``` When you have many chunks, the reference file can get large enough to be unwieldy as json. In that case the references can be instead stored as parquet. Again this uses kerchunk internally. diff --git a/pyproject.toml b/pyproject.toml index 8338279c..3bf03529 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,15 +35,16 @@ dependencies = [ test = [ "codecov", "pre-commit", + "ruff", "pytest-mypy", "pytest-cov", "pytest", - "fsspec", "pooch", "scipy", - "ruff", + "netcdf4", + "fsspec", + "s3fs", "fastparquet", - "s3fs" ] diff --git a/virtualizarr/kerchunk.py b/virtualizarr/kerchunk.py index f24d5f3e..3ba39457 100644 --- a/virtualizarr/kerchunk.py +++ b/virtualizarr/kerchunk.py @@ -1,9 +1,14 @@ +import base64 +import json +from enum import Enum, auto from pathlib import Path from typing import NewType, Optional, cast +import numpy as np import ujson # type: ignore import xarray as xr +from virtualizarr.manifests.manifest import join from virtualizarr.utils import _fsspec_openfile_from_filepath from virtualizarr.zarr import ZArray, ZAttrs @@ -19,9 +24,6 @@ ) # lower-level dict containing just the information for one zarr array -from enum import Enum, auto - - class AutoName(Enum): # Recommended by official Python docs for auto naming: # https://docs.python.org/3/library/enum.html#using-automatic-values @@ -38,6 +40,16 @@ class FileType(AutoName): zarr = auto() +class NumpyEncoder(json.JSONEncoder): + # TODO I don't understand how kerchunk gets around this problem of encoding numpy types (in the zattrs) whilst only using ujson + def default(self, obj): + if isinstance(obj, np.ndarray): + return obj.tolist() # Convert NumPy array to Python list + elif isinstance(obj, np.generic): + return obj.item() # Convert NumPy scalar to Python scalar + return json.JSONEncoder.default(self, obj) + + def read_kerchunk_references_from_file( filepath: str, filetype: FileType | None, @@ -194,7 +206,7 @@ def dataset_to_kerchunk_refs(ds: xr.Dataset) -> KerchunkStoreRefs: all_arr_refs = {} for var_name, var in ds.variables.items(): - arr_refs = variable_to_kerchunk_arr_refs(var) + arr_refs = variable_to_kerchunk_arr_refs(var, var_name) prepended_with_var_name = { f"{var_name}/{key}": val for key, val in arr_refs.items() @@ -206,6 +218,7 @@ def dataset_to_kerchunk_refs(ds: xr.Dataset) -> KerchunkStoreRefs: "version": 1, "refs": { ".zgroup": '{"zarr_format":2}', + ".zattrs": ujson.dumps(ds.attrs), **all_arr_refs, }, } @@ -213,31 +226,67 @@ def dataset_to_kerchunk_refs(ds: xr.Dataset) -> KerchunkStoreRefs: return cast(KerchunkStoreRefs, ds_refs) -def variable_to_kerchunk_arr_refs(var: xr.Variable) -> KerchunkArrRefs: +def variable_to_kerchunk_arr_refs(var: xr.Variable, var_name: str) -> KerchunkArrRefs: """ - Create a dictionary containing kerchunk-style array references from a single xarray.Variable (which wraps a ManifestArray). + Create a dictionary containing kerchunk-style array references from a single xarray.Variable (which wraps either a ManifestArray or a numpy array). Partially encodes the inner dicts to json to match kerchunk behaviour (see https://github.com/fsspec/kerchunk/issues/415). """ from virtualizarr.manifests import ManifestArray - marr = var.data + if isinstance(var.data, ManifestArray): + marr = var.data - if not isinstance(marr, ManifestArray): - raise TypeError( - f"Can only serialize wrapped arrays of type ManifestArray, but got type {type(marr)}" - ) + arr_refs: dict[str, str | list[str | int]] = { + str(chunk_key): chunk_entry.to_kerchunk() + for chunk_key, chunk_entry in marr.manifest.entries.items() + } - arr_refs: dict[str, str | list[str | int]] = { - str(chunk_key): chunk_entry.to_kerchunk() - for chunk_key, chunk_entry in marr.manifest.entries.items() - } + zarray = marr.zarray + + else: + try: + np_arr = var.to_numpy() + except AttributeError as e: + raise TypeError( + f"Can only serialize wrapped arrays of type ManifestArray or numpy.ndarray, but got type {type(var.data)}" + ) from e + + if var.encoding: + if "scale_factor" in var.encoding: + raise NotImplementedError( + f"Cannot serialize loaded variable {var_name}, as it is encoded with a scale_factor" + ) + if "offset" in var.encoding: + raise NotImplementedError( + f"Cannot serialize loaded variable {var_name}, as it is encoded with an offset" + ) + if "calendar" in var.encoding: + raise NotImplementedError( + f"Cannot serialize loaded variable {var_name}, as it is encoded with a calendar" + ) + + # This encoding is what kerchunk does when it "inlines" data, see https://github.com/fsspec/kerchunk/blob/a0c4f3b828d37f6d07995925b324595af68c4a19/kerchunk/hdf.py#L472 + byte_data = np_arr.tobytes() + # TODO do I really need to encode then decode like this? + inlined_data = (b"base64:" + base64.b64encode(byte_data)).decode("utf-8") + + # TODO can this be generalized to save individual chunks of a dask array? + # TODO will this fail for a scalar? + arr_refs = {join(0 for _ in np_arr.shape): inlined_data} + + zarray = ZArray( + chunks=np_arr.shape, + shape=np_arr.shape, + dtype=np_arr.dtype, + order="C", + ) - zarray_dict = marr.zarray.to_kerchunk_json() + zarray_dict = zarray.to_kerchunk_json() arr_refs[".zarray"] = zarray_dict zattrs = var.attrs zattrs["_ARRAY_DIMENSIONS"] = list(var.dims) - arr_refs[".zattrs"] = ujson.dumps(zattrs) + arr_refs[".zattrs"] = json.dumps(zattrs, separators=(",", ":"), cls=NumpyEncoder) return cast(KerchunkArrRefs, arr_refs) diff --git a/virtualizarr/tests/conftest.py b/virtualizarr/tests/conftest.py index 51d672d7..08651353 100644 --- a/virtualizarr/tests/conftest.py +++ b/virtualizarr/tests/conftest.py @@ -9,7 +9,8 @@ def netcdf4_file(tmpdir): # Save it to disk as netCDF (in temporary directory) filepath = f"{tmpdir}/air.nc" - ds.to_netcdf(filepath) + ds.to_netcdf(filepath, format="NETCDF4") + ds.close() return filepath @@ -28,5 +29,7 @@ def netcdf4_files(tmpdir): filepath2 = f"{tmpdir}/air2.nc" ds1.to_netcdf(filepath1) ds2.to_netcdf(filepath2) + ds1.close() + ds2.close() return filepath1, filepath2 diff --git a/virtualizarr/tests/test_integration.py b/virtualizarr/tests/test_integration.py index 3d199b73..064968b3 100644 --- a/virtualizarr/tests/test_integration.py +++ b/virtualizarr/tests/test_integration.py @@ -1,4 +1,3 @@ -import fsspec import pytest import xarray as xr import xarray.testing as xrt @@ -6,63 +5,91 @@ from virtualizarr import open_virtual_dataset -def test_kerchunk_roundtrip_no_concat(tmpdir): - # set up example xarray dataset - ds = xr.tutorial.open_dataset("air_temperature", decode_times=False) - - # save it to disk as netCDF (in temporary directory) - ds.to_netcdf(f"{tmpdir}/air.nc") +@pytest.mark.parametrize( + "inline_threshold, vars_to_inline", + [ + (5e2, ["lat", "lon"]), + pytest.param( + 5e4, ["lat", "lon", "time"], marks=pytest.mark.xfail(reason="time encoding") + ), + pytest.param( + 5e7, + ["lat", "lon", "time", "air"], + marks=pytest.mark.xfail(reason="scale factor encoding"), + ), + ], +) +def test_numpy_arrays_to_inlined_kerchunk_refs( + netcdf4_file, inline_threshold, vars_to_inline +): + from kerchunk.hdf import SingleHdf5ToZarr + + # inline_threshold is chosen to test inlining only the variables listed in vars_to_inline + expected = SingleHdf5ToZarr( + netcdf4_file, inline_threshold=int(inline_threshold) + ).translate() + + # loading the variables should produce same result as inlining them using kerchunk + vds = open_virtual_dataset( + netcdf4_file, loadable_variables=vars_to_inline, indexes={} + ) + refs = vds.virtualize.to_kerchunk(format="dict") - # use open_dataset_via_kerchunk to read it as references - vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={}) + # TODO I would just compare the entire dicts but kerchunk returns inconsistent results - see https://github.com/TomNicholas/VirtualiZarr/pull/73#issuecomment-2040931202 + # assert refs == expected + assert refs["refs"]["air/0.0.0"] == expected["refs"]["air/0.0.0"] + assert refs["refs"]["lon/0"] == expected["refs"]["lon/0"] + assert refs["refs"]["lat/0"] == expected["refs"]["lat/0"] + assert refs["refs"]["time/0"] == expected["refs"]["time/0"] - # write those references to disk as kerchunk json - vds.virtualize.to_kerchunk(f"{tmpdir}/refs.json", format="json") - # use fsspec to read the dataset from disk via the zarr store - fs = fsspec.filesystem("reference", fo=f"{tmpdir}/refs.json") - m = fs.get_mapper("") +@pytest.mark.parametrize("format", ["json", "parquet"]) +class TestKerchunkRoundtrip: + def test_kerchunk_roundtrip_no_concat(self, tmpdir, format): + # set up example xarray dataset + ds = xr.tutorial.open_dataset("air_temperature", decode_times=False) - roundtrip = xr.open_dataset(m, engine="kerchunk") + # save it to disk as netCDF (in temporary directory) + ds.to_netcdf(f"{tmpdir}/air.nc") - # assert equal to original dataset - xrt.assert_equal(roundtrip, ds) + # use open_dataset_via_kerchunk to read it as references + vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={}) + # write those references to disk as kerchunk json + vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format) -def test_kerchunk_roundtrip_concat(tmpdir): - # set up example xarray dataset - ds = xr.tutorial.open_dataset("air_temperature", decode_times=False).isel( - time=slice(None, 2000) - ) + # use fsspec to read the dataset from disk via the zarr store + roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk") - # split into two datasets - ds1, ds2 = ds.isel(time=slice(None, 1000)), ds.isel(time=slice(1000, None)) + # assert equal to original dataset + xrt.assert_equal(roundtrip, ds) - # save it to disk as netCDF (in temporary directory) - ds1.to_netcdf(f"{tmpdir}/air1.nc") - ds2.to_netcdf(f"{tmpdir}/air2.nc") + def test_kerchunk_roundtrip_concat(self, tmpdir, format): + # set up example xarray dataset + ds = xr.tutorial.open_dataset("air_temperature", decode_times=False) - # use open_dataset_via_kerchunk to read it as references - vds1 = open_virtual_dataset(f"{tmpdir}/air1.nc", indexes={}) - vds2 = open_virtual_dataset(f"{tmpdir}/air2.nc", indexes={}) + # split into two datasets + ds1, ds2 = ds.isel(time=slice(None, 1460)), ds.isel(time=slice(1460, None)) - # concatenate virtually along time - vds = xr.concat([vds1, vds2], dim="time", coords="minimal", compat="override") - print(vds["air"].variable._data) + # save it to disk as netCDF (in temporary directory) + ds1.to_netcdf(f"{tmpdir}/air1.nc") + ds2.to_netcdf(f"{tmpdir}/air2.nc") - # write those references to disk as kerchunk json - vds.virtualize.to_kerchunk(f"{tmpdir}/refs.json", format="json") + # use open_dataset_via_kerchunk to read it as references + vds1 = open_virtual_dataset(f"{tmpdir}/air1.nc", indexes={}) + vds2 = open_virtual_dataset(f"{tmpdir}/air2.nc", indexes={}) - # use fsspec to read the dataset from disk via the zarr store - fs = fsspec.filesystem("reference", fo=f"{tmpdir}/refs.json") - m = fs.get_mapper("") + # concatenate virtually along time + vds = xr.concat([vds1, vds2], dim="time", coords="minimal", compat="override") - roundtrip = xr.open_dataset(m, engine="kerchunk") + # write those references to disk as kerchunk json + vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format) - # user does analysis here + # use fsspec to read the dataset from disk via the zarr store + roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk") - # assert equal to original dataset - xrt.assert_equal(roundtrip, ds) + # assert equal to original dataset + xrt.assert_equal(roundtrip, ds) def test_open_scalar_variable(tmpdir): @@ -73,24 +100,3 @@ def test_open_scalar_variable(tmpdir): vds = open_virtual_dataset(f"{tmpdir}/scalar.nc") assert vds["a"].shape == () - - -@pytest.mark.parametrize("format", ["json", "parquet"]) -def test_kerchunk_roundtrip(tmpdir, format): - # set up example xarray dataset - ds = xr.tutorial.open_dataset("air_temperature", decode_times=False) - - # save it to disk as netCDF (in temporary directory) - ds.to_netcdf(f"{tmpdir}/air.nc") - - # use open_virtual_dataset to read it as references - vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={}) - - # write those references to disk as kerchunk json - vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format) - - # read the dataset from disk via the zarr store - roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk") - - # assert equal to original dataset - xrt.assert_equal(roundtrip, ds) diff --git a/virtualizarr/tests/test_kerchunk.py b/virtualizarr/tests/test_kerchunk.py index 001aeca7..57e7493f 100644 --- a/virtualizarr/tests/test_kerchunk.py +++ b/virtualizarr/tests/test_kerchunk.py @@ -40,7 +40,7 @@ def test_dataset_from_df_refs(): assert da.data.zarray.compressor is None assert da.data.zarray.filters is None - assert da.data.zarray.fill_value is None + assert da.data.zarray.fill_value is np.NaN assert da.data.zarray.order == "C" assert da.data.manifest.dict() == { @@ -76,7 +76,7 @@ def test_accessor_to_kerchunk_dict(self): chunks=(2, 3), compressor=None, filters=None, - fill_value=None, + fill_value=np.NaN, order="C", ), ) @@ -86,6 +86,7 @@ def test_accessor_to_kerchunk_dict(self): "version": 1, "refs": { ".zgroup": '{"zarr_format":2}', + ".zattrs": "{}", "a/.zarray": '{"chunks":[2,3],"compressor":null,"dtype":" str: @classmethod def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray": - # TODO should we be doing some type coercion on the 'fill_value' here? + # coerce type of fill_value as kerchunk can be inconsistent with this + fill_value = decoded_arr_refs_zarray["fill_value"] + if fill_value is None or fill_value == "NaN": + fill_value = np.NaN return ZArray( chunks=tuple(decoded_arr_refs_zarray["chunks"]), compressor=decoded_arr_refs_zarray["compressor"], dtype=np.dtype(decoded_arr_refs_zarray["dtype"]), - fill_value=decoded_arr_refs_zarray["fill_value"], + fill_value=fill_value, filters=decoded_arr_refs_zarray["filters"], order=decoded_arr_refs_zarray["order"], shape=tuple(decoded_arr_refs_zarray["shape"]), @@ -88,7 +91,12 @@ def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray": def dict(self) -> dict[str, Any]: zarray_dict = dict(self) + zarray_dict["dtype"] = encode_dtype(zarray_dict["dtype"]) + + if zarray_dict["fill_value"] is np.NaN: + zarray_dict["fill_value"] = None + return zarray_dict def to_kerchunk_json(self) -> str: @@ -238,11 +246,16 @@ def metadata_from_zarr_json(filepath: Path) -> tuple[ZArray, list[str], dict]: chunk_shape = metadata["chunk_grid"]["configuration"]["chunk_shape"] + if metadata["fill_value"] is None: + fill_value = np.NaN + else: + fill_value = metadata["fill_value"] + zarray = ZArray( chunks=metadata["chunk_grid"]["configuration"]["chunk_shape"], compressor=metadata["codecs"], dtype=np.dtype(metadata["data_type"]), - fill_value=metadata["fill_value"], + fill_value=fill_value, filters=metadata.get("filters", None), order="C", shape=chunk_shape,