
Add open_virtual_mfdataset #349

Draft: wants to merge 16 commits into base: main (changes shown from 13 commits)
2 changes: 1 addition & 1 deletion docs/api.rst
@@ -19,7 +19,7 @@ Reading
   :toctree: generated/

   open_virtual_dataset

   open_virtual_mfdataset
Comment (Member Author):

Note to self: the docs and especially the readme should be rewritten to put this function front and center.
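As a sketch of what such a front-and-center example might look like (the glob pattern below is invented for illustration; string paths are expanded via ``_find_absolute_paths``, the same mechanism as in ``xarray.open_mfdataset``):

```python
from virtualizarr import open_virtual_mfdataset

# Combine the virtual references from many files into one virtual dataset,
# ordered by coordinate values (the default combine="by_coords").
vds = open_virtual_mfdataset("data/air_temperature_*.nc")
```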


Serialization
-------------
1 change: 1 addition & 0 deletions pyproject.toml
@@ -59,6 +59,7 @@ test = [
"ruff",
"s3fs",
"scipy",
"lithops",
"virtualizarr[hdf_reader]"
]

2 changes: 1 addition & 1 deletion virtualizarr/__init__.py
@@ -1,6 +1,6 @@
from virtualizarr.manifests import ChunkManifest, ManifestArray # type: ignore # noqa
from virtualizarr.accessor import VirtualiZarrDatasetAccessor # type: ignore # noqa
from virtualizarr.backend import open_virtual_dataset # noqa: F401
from virtualizarr.backend import open_virtual_dataset, open_virtual_mfdataset # noqa: F401

from importlib.metadata import version as _version

231 changes: 230 additions & 1 deletion virtualizarr/backend.py
@@ -1,13 +1,21 @@
import os
import warnings
from collections.abc import Iterable, Mapping
from enum import Enum, auto
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Literal,
    Optional,
    Sequence,
    cast,
)

from xarray import Dataset, Index
from xarray import DataArray, Dataset, Index, combine_by_coords
from xarray.backends.common import _find_absolute_paths
from xarray.core.combine import _infer_concat_order_from_positions, _nested_combine

from virtualizarr.manifests import ManifestArray
from virtualizarr.readers import (
@@ -22,6 +30,15 @@
from virtualizarr.readers.common import VirtualBackend
from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions

if TYPE_CHECKING:
    from xarray.core.types import (
        CombineAttrsOptions,
        CompatOptions,
        JoinOptions,
        NestedSequence,
    )


# TODO add entrypoint to allow external libraries to add to this mapping
VIRTUAL_BACKENDS = {
    "kerchunk": KerchunkVirtualBackend,
@@ -209,3 +226,215 @@ def open_virtual_dataset(
    )

    return vds


def open_virtual_mfdataset(
    paths: str
    | os.PathLike
    | Sequence[str | os.PathLike]
    | "NestedSequence[str | os.PathLike]",
    concat_dim: (
        str
        | DataArray
        | Index
        | Sequence[str]
        | Sequence[DataArray]
        | Sequence[Index]
        | None
    ) = None,
    compat: "CompatOptions" = "no_conflicts",
    preprocess: Callable[[Dataset], Dataset] | None = None,
    data_vars: Literal["all", "minimal", "different"] | list[str] = "all",
    coords="different",
    combine: Literal["by_coords", "nested"] = "by_coords",
    parallel: Literal["lithops", "dask", False] = False,
    join: "JoinOptions" = "outer",
    attrs_file: str | os.PathLike | None = None,
    combine_attrs: "CombineAttrsOptions" = "override",
    **kwargs,
) -> Dataset:
"""Open multiple files as a single virtual dataset.

If combine='by_coords' then the function ``combine_by_coords`` is used to combine
the datasets into one before returning the result, and if combine='nested' then
``combine_nested`` is used. The filepaths must be structured according to which
combining function is used, the details of which are given in the documentation for
``combine_by_coords`` and ``combine_nested``. By default ``combine='by_coords'``
will be used. Global attributes from the ``attrs_file`` are used
for the combined dataset.

Parameters
----------
paths
Same as in xarray.open_mfdataset
concat_dim
Same as in xarray.open_mfdataset
compat
Same as in xarray.open_mfdataset
preprocess
Same as in xarray.open_mfdataset
data_vars
Same as in xarray.open_mfdataset
coords
Same as in xarray.open_mfdataset
combine
Same as in xarray.open_mfdataset
parallel : 'dask', 'lithops', or False
Specify whether the open and preprocess steps of this function will be
performed in parallel using ``dask.delayed``, in parallel using ``lithops.map``, or in serial.
Default is False.
join
Same as in xarray.open_mfdataset
attrs_file
Same as in xarray.open_mfdataset
combine_attrs
Same as in xarray.open_mfdataset
**kwargs : optional
Additional arguments passed on to :py:func:`virtualizarr.open_virtual_dataset`. For an
overview of some of the possible options, see the documentation of
:py:func:`virtualizarr.open_virtual_dataset`.

Returns
-------
xarray.Dataset

Notes
-----
The results of opening each virtual dataset in parallel are sent back to the client process, so must not be too large.
"""

    # TODO this is practically all just copied from xarray.open_mfdataset - an argument for writing a virtualizarr engine for xarray?

    # TODO list kwargs passed to open_virtual_dataset explicitly?

    paths = _find_absolute_paths(paths)

    if not paths:
        raise OSError("no files to open")

    paths1d: list[str]
    if combine == "nested":
        if isinstance(concat_dim, str | DataArray) or concat_dim is None:
            concat_dim = [concat_dim]  # type: ignore[assignment]

        # This creates a flat list which is easier to iterate over, whilst
        # encoding the originally-supplied structure as "ids".
        # The "ids" are not used at all if combine='by_coords'.
        combined_ids_paths = _infer_concat_order_from_positions(paths)
        ids, paths1d = (
            list(combined_ids_paths.keys()),
            list(combined_ids_paths.values()),
        )
    elif concat_dim is not None:
        raise ValueError(
            "When combine='by_coords', passing a value for `concat_dim` has no "
            "effect. To manually combine along a specific dimension you should "
            "instead specify combine='nested' along with a value for `concat_dim`.",
        )
    else:
        paths1d = paths  # type: ignore[assignment]

    if parallel == "dask":
        import dask

        # wrap the open_dataset, getattr, and preprocess with delayed
        open_ = dask.delayed(open_virtual_dataset)
        getattr_ = dask.delayed(getattr)
        if preprocess is not None:
            preprocess = dask.delayed(preprocess)
    elif parallel == "lithops":
        import lithops
Comment on lines +345 to +346 (Member Author):

I believe all of this could also be useful upstream in xr.open_mfdataset

        # TODO use RetryingFunctionExecutor instead?
        # TODO what's the easiest way to pass the lithops config in?
        fn_exec = lithops.FunctionExecutor()

        # lithops doesn't have a delayed primitive
        open_ = open_virtual_dataset
Comment on lines +352 to +353 (@TomNicholas, Dec 16, 2024):

I think the code would be more straightforward if the parallel primitive we used for lithops was the same as the one we used for dask.
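A minimal sketch of what such a shared primitive could look like, mimicking ``dask.delayed``; the ``_Delayed``, ``delayed``, and ``compute`` names are invented for illustration, and only ``FunctionExecutor.map`` and ``FunctionExecutor.get_result`` are assumed from the lithops API:

```python
import lithops


class _Delayed:
    """Record a function call without executing it (illustrative helper)."""

    def __init__(self, func, args, kwargs):
        self.func, self.args, self.kwargs = func, args, kwargs


def delayed(func):
    """Mimic dask.delayed: calling the wrapped function just records a task."""

    def wrapper(*args, **kwargs):
        return _Delayed(func, args, kwargs)

    return wrapper


def compute(tasks, fn_exec: lithops.FunctionExecutor):
    """Execute recorded tasks with lithops.map and gather results to the client."""
    futures = fn_exec.map(lambda t: t.func(*t.args, **t.kwargs), tasks)
    return fn_exec.get_result(fs=futures)
```

With something like this, the dask and lithops branches could build the same list of deferred opens and differ only in the final compute call.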
        # TODO I don't know how best to chain this with the getattr, or if that closing stuff is even necessary for virtual datasets
        # getattr_ = getattr
    elif parallel is not False:
        raise ValueError(
            f"{parallel} is an invalid option for the keyword argument ``parallel``"
        )
    else:
        open_ = open_virtual_dataset
        getattr_ = getattr

    if parallel == "dask":
        virtual_datasets = [open_(p, **kwargs) for p in paths1d]
        closers = [getattr_(ds, "_close") for ds in virtual_datasets]
        if preprocess is not None:
            virtual_datasets = [preprocess(ds) for ds in virtual_datasets]

        # calling compute here will return the datasets/file_objs lists,
        # the underlying datasets will still be stored as dask arrays
        virtual_datasets, closers = dask.compute(virtual_datasets, closers)
    elif parallel == "lithops":

        def generate_refs(path):
Comment (Member Author):

This is the equivalent of @thodson-usgs's map_references function

            # allows passing the open_virtual_dataset function to lithops without evaluating it
            vds = open_(path, **kwargs)
            # TODO perhaps we should just load the loadable_vars here and close before returning?
            return vds

        futures = fn_exec.map(generate_refs, paths1d)

        # wait for all the serverless workers to finish, and send their resulting virtual datasets back to the client
        # TODO do we need download_results?
        completed_futures, _ = fn_exec.wait(futures, download_results=True)
        virtual_datasets = completed_futures.get_result()
    elif parallel is False:
        virtual_datasets = [open_(p, **kwargs) for p in paths1d]
        closers = [getattr_(ds, "_close") for ds in virtual_datasets]
        if preprocess is not None:
            virtual_datasets = [preprocess(ds) for ds in virtual_datasets]

    # Combine all datasets, closing them in case of a ValueError
    try:
        if combine == "nested":
            # Combined nested list by successive concat and merge operations
            # along each dimension, using structure given by "ids"
            combined_vds = _nested_combine(
                virtual_datasets,
                concat_dims=concat_dim,
                compat=compat,
                data_vars=data_vars,
                coords=coords,
                ids=ids,
                join=join,
                combine_attrs=combine_attrs,
            )
        elif combine == "by_coords":
            # Redo ordering from coordinates, ignoring how they were ordered
            # previously
            combined_vds = combine_by_coords(
                virtual_datasets,
                compat=compat,
                data_vars=data_vars,
                coords=coords,
                join=join,
                combine_attrs=combine_attrs,
            )
        else:
            raise ValueError(
                f"{combine} is an invalid option for the keyword argument"
                " ``combine``"
            )
    except ValueError:
        for vds in virtual_datasets:
            vds.close()
        raise

    # combined_vds.set_close(partial(_multi_file_closer, closers))

    # read global attributes from the attrs_file or from the first dataset
    if attrs_file is not None:
        if isinstance(attrs_file, os.PathLike):
            attrs_file = cast(str, os.fspath(attrs_file))
        combined_vds.attrs = virtual_datasets[paths1d.index(attrs_file)].attrs

    # TODO should we just immediately close everything?
    # TODO We should have already read everything we're ever going to read into memory at this point

    return combined_vds
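To make the parallel code path above concrete, a hedged usage sketch follows; the S3 paths are invented, and ``loadable_variables`` is assumed to be a valid ``open_virtual_dataset`` option forwarded via ``**kwargs``:

```python
from virtualizarr import open_virtual_mfdataset

# Each file is opened in its own serverless worker via lithops.map, and the
# resulting virtual datasets are concatenated in list order along "time".
vds = open_virtual_mfdataset(
    ["s3://some-bucket/2024-01.nc", "s3://some-bucket/2024-02.nc"],  # illustrative
    combine="nested",
    concat_dim="time",
    parallel="lithops",
    loadable_variables=["time"],  # forwarded to open_virtual_dataset
)
```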
1 change: 1 addition & 0 deletions virtualizarr/tests/__init__.py
@@ -35,6 +35,7 @@ def _importorskip(
has_astropy, requires_astropy = _importorskip("astropy")
has_kerchunk, requires_kerchunk = _importorskip("kerchunk")
has_s3fs, requires_s3fs = _importorskip("s3fs")
has_lithops, requires_lithops = _importorskip("lithops")
has_scipy, requires_scipy = _importorskip("scipy")
has_tifffile, requires_tifffile = _importorskip("tifffile")
has_imagecodecs, requires_imagecodecs = _importorskip("imagecodecs")