From a48e8a4684fc8ee93d73847890aca1739e1f510a Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Sun, 15 Dec 2024 17:54:26 -0500 Subject: [PATCH 01/12] copy implementation from xarray --- virtualizarr/backend.py | 204 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 203 insertions(+), 1 deletion(-) diff --git a/virtualizarr/backend.py b/virtualizarr/backend.py index a8e3b66a..b743f202 100644 --- a/virtualizarr/backend.py +++ b/virtualizarr/backend.py @@ -1,13 +1,23 @@ +import os import warnings from collections.abc import Iterable, Mapping from enum import Enum, auto +from functools import partial from pathlib import Path from typing import ( + TYPE_CHECKING, Any, + Callable, + Literal, Optional, + Sequence, + cast, ) -from xarray import Dataset, Index +from xarray import DataArray, Dataset, Index, combine_by_coords +from xarray.backends.api import _multi_file_closer +from xarray.backends.common import _find_absolute_paths +from xarray.core.combine import _infer_concat_order_from_positions, _nested_combine from virtualizarr.manifests import ManifestArray from virtualizarr.readers import ( @@ -22,6 +32,15 @@ from virtualizarr.readers.common import VirtualBackend from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions +if TYPE_CHECKING: + from xarray.core.types import ( + CombineAttrsOptions, + CompatOptions, + JoinOptions, + NestedSequence, + ) + + # TODO add entrypoint to allow external libraries to add to this mapping VIRTUAL_BACKENDS = { "kerchunk": KerchunkVirtualBackend, @@ -209,3 +228,186 @@ def open_virtual_dataset( ) return vds + + +def open_virtual_mfdataset( + paths: str | Sequence[str | os.PathLike] | NestedSequence[str | os.PathLike], + concat_dim: ( + str + | DataArray + | Index + | Sequence[str] + | Sequence[DataArray] + | Sequence[Index] + | None + ) = None, + compat: CompatOptions = "no_conflicts", + preprocess: Callable[[Dataset], Dataset] | None = None, + data_vars: Literal["all", "minimal", "different"] | list[str] = "all", + coords="different", + combine: Literal["by_coords", "nested"] = "by_coords", + parallel: Literal["lithops", "dask", False] = False, + join: JoinOptions = "outer", + attrs_file: str | os.PathLike | None = None, + combine_attrs: CombineAttrsOptions = "override", + **kwargs, +) -> Dataset: + """Open multiple files as a single virtual dataset + + If combine='by_coords' then the function ``combine_by_coords`` is used to combine + the datasets into one before returning the result, and if combine='nested' then + ``combine_nested`` is used. The filepaths must be structured according to which + combining function is used, the details of which are given in the documentation for + ``combine_by_coords`` and ``combine_nested``. By default ``combine='by_coords'`` + will be used. Global attributes from the ``attrs_file`` are used + for the combined dataset. + + Parameters + ---------- + paths + Same as in xarray.open_mfdataset + concat_dim + Same as in xarray.open_mfdataset + compat + Same as in xarray.open_mfdataset + preprocess + Same as in xarray.open_mfdataset + data_vars + Same as in xarray.open_mfdataset + coords + Same as in xarray.open_mfdataset + combine + Same as in xarray.open_mfdataset + parallel : 'dask', 'lithops', or False + Specify whether the open and preprocess steps of this function will be + performed in parallel using ``dask.delayed``, in parallel using ``lithops.map``, or in serial. + Default is False. + join + Same as in xarray.open_mfdataset + attrs_file + Same as in xarray.open_mfdataset + combine_attrs + Same as in xarray.open_mfdataset + **kwargs : optional + Additional arguments passed on to :py:func:`virtualizarr.open_virtual_dataset`. For an + overview of some of the possible options, see the documentation of + :py:func:`virtualizarr.open_virtual_dataset`. + + Returns + ------- + xarray.Dataset + + Notes + ----- + The results of opening each virtual dataset in parallel are sent back to the client process, so must not be too large. + """ + + # TODO this is practically all just copied from xarray.open_mfdataset - an argument for writing a virtualizarr engine for xarray? + + # TODO add options passed to open_virtual_dataset explicitly? + + paths = _find_absolute_paths(paths) + + if not paths: + raise OSError("no files to open") + + paths1d: list[str] + if combine == "nested": + if isinstance(concat_dim, str | DataArray) or concat_dim is None: + concat_dim = [concat_dim] # type: ignore[assignment] + + # This creates a flat list which is easier to iterate over, whilst + # encoding the originally-supplied structure as "ids". + # The "ids" are not used at all if combine='by_coords`. + combined_ids_paths = _infer_concat_order_from_positions(paths) + ids, paths1d = ( + list(combined_ids_paths.keys()), + list(combined_ids_paths.values()), + ) + elif concat_dim is not None: + raise ValueError( + "When combine='by_coords', passing a value for `concat_dim` has no " + "effect. To manually combine along a specific dimension you should " + "instead specify combine='nested' along with a value for `concat_dim`.", + ) + else: + paths1d = paths # type: ignore[assignment] + + if parallel == "dask": + import dask + + # wrap the open_dataset, getattr, and preprocess with delayed + open_ = dask.delayed(open_virtual_dataset) + getattr_ = dask.delayed(getattr) + if preprocess is not None: + preprocess = dask.delayed(preprocess) + elif parallel == "lithops": + raise NotImplementedError() + elif parallel is not False: + raise ValueError( + f"{parallel} is an invalid option for the keyword argument ``parallel``" + ) + else: + open_ = open_virtual_dataset + getattr_ = getattr + + datasets = [open_(p, **kwargs) for p in paths1d] + closers = [getattr_(ds, "_close") for ds in datasets] + if preprocess is not None: + datasets = [preprocess(ds) for ds in datasets] + + if parallel == "dask": + # calling compute here will return the datasets/file_objs lists, + # the underlying datasets will still be stored as dask arrays + datasets, closers = dask.compute(datasets, closers) + elif parallel == "lithops": + raise NotImplementedError() + + # Combine all datasets, closing them in case of a ValueError + try: + if combine == "nested": + # Combined nested list by successive concat and merge operations + # along each dimension, using structure given by "ids" + combined = _nested_combine( + datasets, + concat_dims=concat_dim, + compat=compat, + data_vars=data_vars, + coords=coords, + ids=ids, + join=join, + combine_attrs=combine_attrs, + ) + elif combine == "by_coords": + # Redo ordering from coordinates, ignoring how they were ordered + # previously + combined = combine_by_coords( + datasets, + compat=compat, + data_vars=data_vars, + coords=coords, + join=join, + combine_attrs=combine_attrs, + ) + else: + raise ValueError( + f"{combine} is an invalid option for the keyword argument" + " ``combine``" + ) + except ValueError: + for ds in datasets: + ds.close() + raise + + combined.set_close(partial(_multi_file_closer, closers)) + + # read global attributes from the attrs_file or from the first dataset + if attrs_file is not None: + if isinstance(attrs_file, os.PathLike): + attrs_file = cast(str, os.fspath(attrs_file)) + combined.attrs = datasets[paths1d.index(attrs_file)].attrs + + # TODO should we just immediately close everything? + # TODO We should have already read everything we're ever going to read into memory at this point + + return combined From 75c7da3c36d4e91c0516d95a1f8f31f48c9ad48f Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Sun, 15 Dec 2024 18:52:22 -0500 Subject: [PATCH 02/12] sketch idea for lithops parallelization --- virtualizarr/backend.py | 47 +++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/virtualizarr/backend.py b/virtualizarr/backend.py index b743f202..e86c3c16 100644 --- a/virtualizarr/backend.py +++ b/virtualizarr/backend.py @@ -342,7 +342,16 @@ def open_virtual_mfdataset( if preprocess is not None: preprocess = dask.delayed(preprocess) elif parallel == "lithops": - raise NotImplementedError() + import lithops + + # TODO use RetryingFunctionExecutor instead? + # TODO what's the easiest way to pass the lithops config in? + fn_exec = lithops.FunctionExecutor() + + # lithops doesn't have a delayed primitive + open_ = open_virtual_dataset + # TODO I don't know how best to chain this with the getattr, or if that closing stuff is even necessary for virtual datasets + # getattr_ = getattr elif parallel is not False: raise ValueError( f"{parallel} is an invalid option for the keyword argument ``parallel``" @@ -351,17 +360,33 @@ def open_virtual_mfdataset( open_ = open_virtual_dataset getattr_ = getattr - datasets = [open_(p, **kwargs) for p in paths1d] - closers = [getattr_(ds, "_close") for ds in datasets] - if preprocess is not None: - datasets = [preprocess(ds) for ds in datasets] - if parallel == "dask": + datasets = [open_(p, **kwargs) for p in paths1d] + closers = [getattr_(ds, "_close") for ds in datasets] + if preprocess is not None: + datasets = [preprocess(ds) for ds in datasets] + # calling compute here will return the datasets/file_objs lists, # the underlying datasets will still be stored as dask arrays datasets, closers = dask.compute(datasets, closers) elif parallel == "lithops": - raise NotImplementedError() + + def generate_refs(path): + # allows passing the open_virtual_dataset function to lithops without evaluating it + vds = open_(path, **kwargs) + # TODO perhaps we should just load the loadable_vars here and close before returning? + return vds + + futures = fn_exec.map(generate_refs, paths1d) + + # wait for all the serverless workers to finish, and send their resulting virtual datasets back to the client + completed_futures, _ = fn_exec.wait(futures, download_results=True) + virtual_datasets = [future.get_result() for future in completed_futures] + elif parallel is False: + datasets = [open_(p, **kwargs) for p in paths1d] + closers = [getattr_(ds, "_close") for ds in datasets] + if preprocess is not None: + datasets = [preprocess(ds) for ds in datasets] # Combine all datasets, closing them in case of a ValueError try: @@ -369,7 +394,7 @@ def open_virtual_mfdataset( # Combined nested list by successive concat and merge operations # along each dimension, using structure given by "ids" combined = _nested_combine( - datasets, + virtual_datasets, concat_dims=concat_dim, compat=compat, data_vars=data_vars, @@ -382,7 +407,7 @@ def open_virtual_mfdataset( # Redo ordering from coordinates, ignoring how they were ordered # previously combined = combine_by_coords( - datasets, + virtual_datasets, compat=compat, data_vars=data_vars, coords=coords, @@ -395,7 +420,7 @@ def open_virtual_mfdataset( " ``combine``" ) except ValueError: - for ds in datasets: + for ds in virtual_datasets: ds.close() raise @@ -405,7 +430,7 @@ def open_virtual_mfdataset( if attrs_file is not None: if isinstance(attrs_file, os.PathLike): attrs_file = cast(str, os.fspath(attrs_file)) - combined.attrs = datasets[paths1d.index(attrs_file)].attrs + combined.attrs = virtual_datasets[paths1d.index(attrs_file)].attrs # TODO should we just immediately close everything? # TODO We should have already read everything we're ever going to read into memory at this point From ce5a0969d330647d85b9529dd57c1dc4cb9368ef Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 16 Dec 2024 00:03:04 -0500 Subject: [PATCH 03/12] standardize naming of variables --- virtualizarr/backend.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/virtualizarr/backend.py b/virtualizarr/backend.py index e86c3c16..68c9aeb8 100644 --- a/virtualizarr/backend.py +++ b/virtualizarr/backend.py @@ -361,14 +361,14 @@ def open_virtual_mfdataset( getattr_ = getattr if parallel == "dask": - datasets = [open_(p, **kwargs) for p in paths1d] - closers = [getattr_(ds, "_close") for ds in datasets] + virtual_datasets = [open_(p, **kwargs) for p in paths1d] + closers = [getattr_(ds, "_close") for ds in virtual_datasets] if preprocess is not None: - datasets = [preprocess(ds) for ds in datasets] + virtual_datasets = [preprocess(ds) for ds in virtual_datasets] # calling compute here will return the datasets/file_objs lists, # the underlying datasets will still be stored as dask arrays - datasets, closers = dask.compute(datasets, closers) + virtual_datasets, closers = dask.compute(virtual_datasets, closers) elif parallel == "lithops": def generate_refs(path): @@ -383,17 +383,17 @@ def generate_refs(path): completed_futures, _ = fn_exec.wait(futures, download_results=True) virtual_datasets = [future.get_result() for future in completed_futures] elif parallel is False: - datasets = [open_(p, **kwargs) for p in paths1d] - closers = [getattr_(ds, "_close") for ds in datasets] + virtual_datasets = [open_(p, **kwargs) for p in paths1d] + closers = [getattr_(ds, "_close") for ds in virtual_datasets] if preprocess is not None: - datasets = [preprocess(ds) for ds in datasets] + virtual_datasets = [preprocess(ds) for ds in virtual_datasets] # Combine all datasets, closing them in case of a ValueError try: if combine == "nested": # Combined nested list by successive concat and merge operations # along each dimension, using structure given by "ids" - combined = _nested_combine( + combined_vds = _nested_combine( virtual_datasets, concat_dims=concat_dim, compat=compat, @@ -406,7 +406,7 @@ def generate_refs(path): elif combine == "by_coords": # Redo ordering from coordinates, ignoring how they were ordered # previously - combined = combine_by_coords( + combined_vds = combine_by_coords( virtual_datasets, compat=compat, data_vars=data_vars, @@ -420,19 +420,19 @@ def generate_refs(path): " ``combine``" ) except ValueError: - for ds in virtual_datasets: - ds.close() + for vds in virtual_datasets: + vds.close() raise - combined.set_close(partial(_multi_file_closer, closers)) + combined_vds.set_close(partial(_multi_file_closer, closers)) # read global attributes from the attrs_file or from the first dataset if attrs_file is not None: if isinstance(attrs_file, os.PathLike): attrs_file = cast(str, os.fspath(attrs_file)) - combined.attrs = virtual_datasets[paths1d.index(attrs_file)].attrs + combined_vds.attrs = virtual_datasets[paths1d.index(attrs_file)].attrs # TODO should we just immediately close everything? # TODO We should have already read everything we're ever going to read into memory at this point - return combined + return combined_vds From bcf1b70a34e55cfda76ffdc075a075dcb1fafa7e Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 16 Dec 2024 00:06:48 -0500 Subject: [PATCH 04/12] add to public API --- docs/api.rst | 2 +- virtualizarr/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index fef8f2f0..a8bee2c3 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -19,7 +19,7 @@ Reading :toctree: generated/ open_virtual_dataset - + open_virtual_mfdataset Serialization ------------- diff --git a/virtualizarr/__init__.py b/virtualizarr/__init__.py index bd70f834..9d7ef92e 100644 --- a/virtualizarr/__init__.py +++ b/virtualizarr/__init__.py @@ -1,6 +1,6 @@ from virtualizarr.manifests import ChunkManifest, ManifestArray # type: ignore # noqa from virtualizarr.accessor import VirtualiZarrDatasetAccessor # type: ignore # noqa -from virtualizarr.backend import open_virtual_dataset # noqa: F401 +from virtualizarr.backend import open_virtual_dataset, open_virtual_mfdataset # noqa: F401 from importlib.metadata import version as _version From 61f0f3236d63c8bd3023422d4d508a34d688574d Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 16 Dec 2024 00:09:28 -0500 Subject: [PATCH 05/12] fix errors caused by trying to import xarray types --- virtualizarr/backend.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/virtualizarr/backend.py b/virtualizarr/backend.py index 68c9aeb8..cefa21ea 100644 --- a/virtualizarr/backend.py +++ b/virtualizarr/backend.py @@ -231,7 +231,10 @@ def open_virtual_dataset( def open_virtual_mfdataset( - paths: str | Sequence[str | os.PathLike] | NestedSequence[str | os.PathLike], + paths: str + | os.PathLike + | Sequence[str | os.PathLike] + | "NestedSequence[str | os.PathLike]", concat_dim: ( str | DataArray @@ -241,15 +244,15 @@ def open_virtual_mfdataset( | Sequence[Index] | None ) = None, - compat: CompatOptions = "no_conflicts", + compat: "CompatOptions" = "no_conflicts", preprocess: Callable[[Dataset], Dataset] | None = None, data_vars: Literal["all", "minimal", "different"] | list[str] = "all", coords="different", combine: Literal["by_coords", "nested"] = "by_coords", parallel: Literal["lithops", "dask", False] = False, - join: JoinOptions = "outer", + join: "JoinOptions" = "outer", attrs_file: str | os.PathLike | None = None, - combine_attrs: CombineAttrsOptions = "override", + combine_attrs: "CombineAttrsOptions" = "override", **kwargs, ) -> Dataset: """Open multiple files as a single virtual dataset From 5317207d417adec3201efb7ed5e2153743229e81 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 16 Dec 2024 00:40:56 -0500 Subject: [PATCH 06/12] start writing tests --- conftest.py | 13 +++++++++++ virtualizarr/backend.py | 4 ++-- virtualizarr/tests/test_backend.py | 37 +++++++++++++++++++++++++++++- 3 files changed, 51 insertions(+), 3 deletions(-) diff --git a/conftest.py b/conftest.py index 0be0a89c..bc1e6d47 100644 --- a/conftest.py +++ b/conftest.py @@ -48,6 +48,19 @@ def netcdf4_file(tmpdir): return filepath +@pytest.fixture +def chunked_netcdf4_file(tmpdir): + # Set up example xarray dataset + ds = xr.tutorial.open_dataset("air_temperature") + + # Save it to disk as netCDF (in temporary directory) + filepath = f"{tmpdir}/air.nc" + ds.chunk(time=1460).to_netcdf(filepath, format="NETCDF4") + ds.close() + + return filepath + + @pytest.fixture def netcdf4_file_with_data_in_multiple_groups(tmpdir): filepath = str(tmpdir / "test.nc") diff --git a/virtualizarr/backend.py b/virtualizarr/backend.py index cefa21ea..d6fc337b 100644 --- a/virtualizarr/backend.py +++ b/virtualizarr/backend.py @@ -255,7 +255,7 @@ def open_virtual_mfdataset( combine_attrs: "CombineAttrsOptions" = "override", **kwargs, ) -> Dataset: - """Open multiple files as a single virtual dataset + """Open multiple files as a single virtual dataset. If combine='by_coords' then the function ``combine_by_coords`` is used to combine the datasets into one before returning the result, and if combine='nested' then @@ -307,7 +307,7 @@ def open_virtual_mfdataset( # TODO this is practically all just copied from xarray.open_mfdataset - an argument for writing a virtualizarr engine for xarray? - # TODO add options passed to open_virtual_dataset explicitly? + # TODO list kwargs passed to open_virtual_dataset explicitly? paths = _find_absolute_paths(paths) diff --git a/virtualizarr/tests/test_backend.py b/virtualizarr/tests/test_backend.py index 0c054a3b..ee1832ce 100644 --- a/virtualizarr/tests/test_backend.py +++ b/virtualizarr/tests/test_backend.py @@ -8,7 +8,7 @@ from xarray import Dataset, open_dataset from xarray.core.indexes import Index -from virtualizarr import open_virtual_dataset +from virtualizarr import open_virtual_dataset, open_virtual_mfdataset from virtualizarr.backend import FileType, automatically_determine_filetype from virtualizarr.manifests import ManifestArray from virtualizarr.readers import HDF5VirtualBackend @@ -440,6 +440,41 @@ def test_open_dataset_with_scalar(self, hdf5_scalar, tmpdir, hdf_backend): assert vds.scalar.attrs == {"scalar": "true"} +class TestOpenVirtualMFDataset: + def test_serial(self, netcdf4_files_factory, chunked_netcdf4_file): + filepath1, filepath2 = netcdf4_files_factory() + + combined_vds = open_virtual_mfdataset( + [filepath1, filepath2], + combine="nested", + concat_dim="time", + coords="minimal", + compat="override", + indexes={}, + ) + expected_vds = open_virtual_dataset(chunked_netcdf4_file, indexes={}) + print(combined_vds["air"].data) + print(expected_vds["air"].data) + xrt.assert_identical(combined_vds, expected_vds) + + combined_vds = open_virtual_mfdataset( + [filepath1, filepath2], combine="by_coords" + ) + expected_vds = open_virtual_dataset(chunked_netcdf4_file) + xrt.assert_identical(combined_vds, expected_vds) + + file_glob = filepath1.parent.with_suffix("air*.nc") + combined_vds = open_virtual_mfdataset(file_glob, combine="by_coords") + expected_vds = open_virtual_dataset(chunked_netcdf4_file) + xrt.assert_identical(combined_vds, expected_vds) + + # @requires_dask + def test_dask(self, netcdf4_files_factory): ... + + # @requires_lithops + def test_lithops(self, netcdf4_files_factory): ... + + @requires_kerchunk @pytest.mark.parametrize( "reference_format", From cd5432842aa5c805ea637a55e975f0945ee1c24c Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Tue, 17 Dec 2024 10:32:48 -0500 Subject: [PATCH 07/12] passing test for combining in serial --- virtualizarr/tests/test_backend.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/virtualizarr/tests/test_backend.py b/virtualizarr/tests/test_backend.py index ee1832ce..bef3e638 100644 --- a/virtualizarr/tests/test_backend.py +++ b/virtualizarr/tests/test_backend.py @@ -1,4 +1,5 @@ from collections.abc import Mapping +from pathlib import Path from unittest.mock import patch import numpy as np @@ -441,9 +442,10 @@ def test_open_dataset_with_scalar(self, hdf5_scalar, tmpdir, hdf_backend): class TestOpenVirtualMFDataset: - def test_serial(self, netcdf4_files_factory, chunked_netcdf4_file): + def test_serial(self, netcdf4_files_factory): filepath1, filepath2 = netcdf4_files_factory() + # test combine nested without in-memory indexes combined_vds = open_virtual_mfdataset( [filepath1, filepath2], combine="nested", @@ -452,20 +454,29 @@ def test_serial(self, netcdf4_files_factory, chunked_netcdf4_file): compat="override", indexes={}, ) - expected_vds = open_virtual_dataset(chunked_netcdf4_file, indexes={}) - print(combined_vds["air"].data) - print(expected_vds["air"].data) + vds1 = open_virtual_dataset(filepath1, indexes={}) + vds2 = open_virtual_dataset(filepath2, indexes={}) + expected_vds = xr.concat( + [vds1, vds2], dim="time", coords="minimal", compat="override" + ) xrt.assert_identical(combined_vds, expected_vds) + # test combine by coords using in-memory indexes combined_vds = open_virtual_mfdataset( - [filepath1, filepath2], combine="by_coords" + [filepath1, filepath2], combine="by_coords", loadable_variables=["time"] + ) + vds1 = open_virtual_dataset(filepath1, loadable_variables=["time"]) + vds2 = open_virtual_dataset(filepath2, loadable_variables=["time"]) + expected_vds = xr.concat( + [vds1, vds2], dim="time", coords="minimal", compat="override" ) - expected_vds = open_virtual_dataset(chunked_netcdf4_file) xrt.assert_identical(combined_vds, expected_vds) - file_glob = filepath1.parent.with_suffix("air*.nc") - combined_vds = open_virtual_mfdataset(file_glob, combine="by_coords") - expected_vds = open_virtual_dataset(chunked_netcdf4_file) + # test combine by coords again using in-memory indexes but for a glob + file_glob = Path(filepath1).parent.glob("air*.nc") + combined_vds = open_virtual_mfdataset( + file_glob, combine="by_coords", loadable_variables=["time"] + ) xrt.assert_identical(combined_vds, expected_vds) # @requires_dask From c229c061902c16e0667ea1d26c82068ee3c618ac Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Tue, 17 Dec 2024 11:06:04 -0500 Subject: [PATCH 08/12] requires_kerchunk --- virtualizarr/tests/test_backend.py | 1 + 1 file changed, 1 insertion(+) diff --git a/virtualizarr/tests/test_backend.py b/virtualizarr/tests/test_backend.py index f17b84db..aff5cc87 100644 --- a/virtualizarr/tests/test_backend.py +++ b/virtualizarr/tests/test_backend.py @@ -442,6 +442,7 @@ def test_open_dataset_with_scalar(self, hdf5_scalar, hdf_backend): class TestOpenVirtualMFDataset: + @requires_kerchunk def test_serial(self, netcdf4_files_factory): filepath1, filepath2 = netcdf4_files_factory() From f296ef988a39c696a04493c0dfa39396cb2311e4 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Tue, 17 Dec 2024 11:23:51 -0500 Subject: [PATCH 09/12] test for lithops with default LocalHost executor --- virtualizarr/backend.py | 7 ++--- virtualizarr/tests/test_backend.py | 44 +++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/virtualizarr/backend.py b/virtualizarr/backend.py index d6fc337b..09640b24 100644 --- a/virtualizarr/backend.py +++ b/virtualizarr/backend.py @@ -2,7 +2,6 @@ import warnings from collections.abc import Iterable, Mapping from enum import Enum, auto -from functools import partial from pathlib import Path from typing import ( TYPE_CHECKING, @@ -15,7 +14,6 @@ ) from xarray import DataArray, Dataset, Index, combine_by_coords -from xarray.backends.api import _multi_file_closer from xarray.backends.common import _find_absolute_paths from xarray.core.combine import _infer_concat_order_from_positions, _nested_combine @@ -383,8 +381,9 @@ def generate_refs(path): futures = fn_exec.map(generate_refs, paths1d) # wait for all the serverless workers to finish, and send their resulting virtual datasets back to the client + # TODO do we need download_results? completed_futures, _ = fn_exec.wait(futures, download_results=True) - virtual_datasets = [future.get_result() for future in completed_futures] + virtual_datasets = completed_futures.get_result() elif parallel is False: virtual_datasets = [open_(p, **kwargs) for p in paths1d] closers = [getattr_(ds, "_close") for ds in virtual_datasets] @@ -427,7 +426,7 @@ def generate_refs(path): vds.close() raise - combined_vds.set_close(partial(_multi_file_closer, closers)) + # combined_vds.set_close(partial(_multi_file_closer, closers)) # read global attributes from the attrs_file or from the first dataset if attrs_file is not None: diff --git a/virtualizarr/tests/test_backend.py b/virtualizarr/tests/test_backend.py index aff5cc87..9c0b3f3e 100644 --- a/virtualizarr/tests/test_backend.py +++ b/virtualizarr/tests/test_backend.py @@ -484,4 +484,46 @@ def test_serial(self, netcdf4_files_factory): def test_dask(self, netcdf4_files_factory): ... # @requires_lithops - def test_lithops(self, netcdf4_files_factory): ... + def test_lithops(self, netcdf4_files_factory): + filepath1, filepath2 = netcdf4_files_factory() + + # test combine nested without in-memory indexes + combined_vds = open_virtual_mfdataset( + [filepath1, filepath2], + combine="nested", + concat_dim="time", + coords="minimal", + compat="override", + indexes={}, + parallel="lithops", + ) + vds1 = open_virtual_dataset(filepath1, indexes={}) + vds2 = open_virtual_dataset(filepath2, indexes={}) + expected_vds = xr.concat( + [vds1, vds2], dim="time", coords="minimal", compat="override" + ) + # xrt.assert_identical(combined_vds, expected_vds) + + # test combine by coords using in-memory indexes + combined_vds = open_virtual_mfdataset( + [filepath1, filepath2], + combine="by_coords", + loadable_variables=["time"], + parallel="lithops", + ) + vds1 = open_virtual_dataset(filepath1, loadable_variables=["time"]) + vds2 = open_virtual_dataset(filepath2, loadable_variables=["time"]) + expected_vds = xr.concat( + [vds1, vds2], dim="time", coords="minimal", compat="override" + ) + xrt.assert_identical(combined_vds, expected_vds) + + # test combine by coords again using in-memory indexes but for a glob + file_glob = Path(filepath1).parent.glob("air*.nc") + combined_vds = open_virtual_mfdataset( + file_glob, + combine="by_coords", + loadable_variables=["time"], + parallel="lithops", + ) + xrt.assert_identical(combined_vds, expected_vds) From 542f063aa50c6e2f1087a0aeb743d9fda20393a2 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Tue, 17 Dec 2024 11:47:17 -0500 Subject: [PATCH 10/12] notes on confusing AssertionError --- virtualizarr/tests/test_backend.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/virtualizarr/tests/test_backend.py b/virtualizarr/tests/test_backend.py index 9c0b3f3e..6dd0e828 100644 --- a/virtualizarr/tests/test_backend.py +++ b/virtualizarr/tests/test_backend.py @@ -485,6 +485,8 @@ def test_dask(self, netcdf4_files_factory): ... # @requires_lithops def test_lithops(self, netcdf4_files_factory): + # by default this will use the lithops LocalHost executor + filepath1, filepath2 = netcdf4_files_factory() # test combine nested without in-memory indexes @@ -502,6 +504,26 @@ def test_lithops(self, netcdf4_files_factory): expected_vds = xr.concat( [vds1, vds2], dim="time", coords="minimal", compat="override" ) + + print(combined_vds) + print(expected_vds) + print(combined_vds.indexes) + print(combined_vds.indexes) + print(combined_vds["lat"].attrs) + print(expected_vds["lat"].attrs) + print(combined_vds["lat"].encoding) + print(expected_vds["lat"].encoding) + print(combined_vds["lat"].data) + print(expected_vds["lat"].data) + print(combined_vds["lat"].data.zarray) + print(expected_vds["lat"].data.zarray) + print(combined_vds["lat"].data.manifest.dict()) + print(expected_vds["lat"].data.manifest.dict()) + + # TODO this assertion unintentially triggers loading, see issue #354 + # xrt.assert_identical(combined_vds.coords.variables['lat'], expected_vds.coords.variables['lat']) + + # TODO I have no idea why this assertion fails for all the coords - everything about the coords looks identical # xrt.assert_identical(combined_vds, expected_vds) # test combine by coords using in-memory indexes From a013b2cef35114ae7ee15b91da9f3e6043a00652 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Tue, 17 Dec 2024 11:56:45 -0500 Subject: [PATCH 11/12] ensure lithops is installed --- pyproject.toml | 1 + virtualizarr/tests/__init__.py | 1 + virtualizarr/tests/test_backend.py | 6 ++++-- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ecda9a16..38d622d6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,7 @@ test = [ "ruff", "s3fs", "scipy", + "lithops", "virtualizarr[hdf_reader]" ] diff --git a/virtualizarr/tests/__init__.py b/virtualizarr/tests/__init__.py index 658cf640..ecc0ab43 100644 --- a/virtualizarr/tests/__init__.py +++ b/virtualizarr/tests/__init__.py @@ -35,6 +35,7 @@ def _importorskip( has_astropy, requires_astropy = _importorskip("astropy") has_kerchunk, requires_kerchunk = _importorskip("kerchunk") has_s3fs, requires_s3fs = _importorskip("s3fs") +has_lithops, requires_lithops = _importorskip("lithops") has_scipy, requires_scipy = _importorskip("scipy") has_tifffile, requires_tifffile = _importorskip("tifffile") has_imagecodecs, requires_imagecodecs = _importorskip("imagecodecs") diff --git a/virtualizarr/tests/test_backend.py b/virtualizarr/tests/test_backend.py index 6dd0e828..3ffddff1 100644 --- a/virtualizarr/tests/test_backend.py +++ b/virtualizarr/tests/test_backend.py @@ -18,6 +18,7 @@ has_astropy, network, requires_kerchunk, + requires_lithops, requires_s3fs, requires_scipy, ) @@ -441,8 +442,9 @@ def test_open_dataset_with_scalar(self, hdf5_scalar, hdf_backend): assert vds.scalar.attrs == {"scalar": "true"} +# TODO consolidate these by parameterizing over parallel kwarg once they all work +@requires_kerchunk class TestOpenVirtualMFDataset: - @requires_kerchunk def test_serial(self, netcdf4_files_factory): filepath1, filepath2 = netcdf4_files_factory() @@ -483,7 +485,7 @@ def test_serial(self, netcdf4_files_factory): # @requires_dask def test_dask(self, netcdf4_files_factory): ... - # @requires_lithops + @requires_lithops def test_lithops(self, netcdf4_files_factory): # by default this will use the lithops LocalHost executor From f5123cf11e88e5b361382fecc031d1aeb656587e Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Tue, 17 Dec 2024 11:58:25 -0500 Subject: [PATCH 12/12] remove uneeded fixture --- conftest.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/conftest.py b/conftest.py index bc1e6d47..0be0a89c 100644 --- a/conftest.py +++ b/conftest.py @@ -48,19 +48,6 @@ def netcdf4_file(tmpdir): return filepath -@pytest.fixture -def chunked_netcdf4_file(tmpdir): - # Set up example xarray dataset - ds = xr.tutorial.open_dataset("air_temperature") - - # Save it to disk as netCDF (in temporary directory) - filepath = f"{tmpdir}/air.nc" - ds.chunk(time=1460).to_netcdf(filepath, format="NETCDF4") - ds.close() - - return filepath - - @pytest.fixture def netcdf4_file_with_data_in_multiple_groups(tmpdir): filepath = str(tmpdir / "test.nc")