Add open_virtual_mfdataset #349
@@ -1,13 +1,23 @@
import os
import warnings
from collections.abc import Iterable, Mapping
from enum import Enum, auto
from functools import partial
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Literal,
    Optional,
    Sequence,
    cast,
)

from xarray import Dataset, Index
from xarray import DataArray, Dataset, Index, combine_by_coords
from xarray.backends.api import _multi_file_closer
from xarray.backends.common import _find_absolute_paths
from xarray.core.combine import _infer_concat_order_from_positions, _nested_combine

from virtualizarr.manifests import ManifestArray
from virtualizarr.readers import (
@@ -22,6 +32,15 @@
from virtualizarr.readers.common import VirtualBackend
from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions

if TYPE_CHECKING:
    from xarray.core.types import (
        CombineAttrsOptions,
        CompatOptions,
        JoinOptions,
        NestedSequence,
    )


# TODO add entrypoint to allow external libraries to add to this mapping
VIRTUAL_BACKENDS = {
    "kerchunk": KerchunkVirtualBackend,
@@ -209,3 +228,211 @@ def open_virtual_dataset(
    )

    return vds


def open_virtual_mfdataset(
    paths: str | Sequence[str | os.PathLike] | NestedSequence[str | os.PathLike],
    concat_dim: (
        str
        | DataArray
        | Index
        | Sequence[str]
        | Sequence[DataArray]
        | Sequence[Index]
        | None
    ) = None,
    compat: CompatOptions = "no_conflicts",
    preprocess: Callable[[Dataset], Dataset] | None = None,
    data_vars: Literal["all", "minimal", "different"] | list[str] = "all",
    coords="different",
    combine: Literal["by_coords", "nested"] = "by_coords",
    parallel: Literal["lithops", "dask", False] = False,
    join: JoinOptions = "outer",
    attrs_file: str | os.PathLike | None = None,
    combine_attrs: CombineAttrsOptions = "override",
    **kwargs,
) -> Dataset:
    """Open multiple files as a single virtual dataset.

    If combine='by_coords' then the function ``combine_by_coords`` is used to combine
    the datasets into one before returning the result, and if combine='nested' then
    ``combine_nested`` is used. The filepaths must be structured according to which
    combining function is used, the details of which are given in the documentation for
    ``combine_by_coords`` and ``combine_nested``. By default ``combine='by_coords'``
    will be used. Global attributes from the ``attrs_file`` are used for the combined
    dataset.

    Parameters
    ----------
    paths
        Same as in xarray.open_mfdataset
    concat_dim
        Same as in xarray.open_mfdataset
    compat
        Same as in xarray.open_mfdataset
    preprocess
        Same as in xarray.open_mfdataset
    data_vars
        Same as in xarray.open_mfdataset
    coords
        Same as in xarray.open_mfdataset
    combine
        Same as in xarray.open_mfdataset
    parallel : 'dask', 'lithops', or False
        Specify whether the open and preprocess steps of this function will be
        performed in parallel using ``dask.delayed``, in parallel using
        ``lithops.map``, or in serial. Default is False.
    join
        Same as in xarray.open_mfdataset
    attrs_file
        Same as in xarray.open_mfdataset
    combine_attrs
        Same as in xarray.open_mfdataset
    **kwargs : optional
        Additional arguments passed on to :py:func:`virtualizarr.open_virtual_dataset`.
        For an overview of some of the possible options, see the documentation of
        :py:func:`virtualizarr.open_virtual_dataset`.

    Returns
    -------
    xarray.Dataset

    Notes
    -----
    The results of opening each virtual dataset in parallel are sent back to the
    client process, so they must not be too large.
    """

    # TODO this is practically all just copied from xarray.open_mfdataset - an argument
    # for writing a virtualizarr engine for xarray?

    # TODO add options passed to open_virtual_dataset explicitly?

    paths = _find_absolute_paths(paths)

    if not paths:
        raise OSError("no files to open")

    paths1d: list[str]
    if combine == "nested":
        if isinstance(concat_dim, str | DataArray) or concat_dim is None:
            concat_dim = [concat_dim]  # type: ignore[assignment]

        # This creates a flat list which is easier to iterate over, whilst
        # encoding the originally-supplied structure as "ids".
        # The "ids" are not used at all if combine='by_coords'.
        combined_ids_paths = _infer_concat_order_from_positions(paths)
        ids, paths1d = (
            list(combined_ids_paths.keys()),
            list(combined_ids_paths.values()),
        )
    elif concat_dim is not None:
        raise ValueError(
            "When combine='by_coords', passing a value for `concat_dim` has no "
            "effect. To manually combine along a specific dimension you should "
            "instead specify combine='nested' along with a value for `concat_dim`.",
        )
    else:
        paths1d = paths  # type: ignore[assignment]

    if parallel == "dask":
        import dask

        # wrap the open_virtual_dataset, getattr, and preprocess calls with delayed
        open_ = dask.delayed(open_virtual_dataset)
        getattr_ = dask.delayed(getattr)
        if preprocess is not None:
            preprocess = dask.delayed(preprocess)
    elif parallel == "lithops":
        import lithops
Comment on lines +345 to +346: I believe all of this could also be useful upstream in …
        # TODO use RetryingFunctionExecutor instead?
        # TODO what's the easiest way to pass the lithops config in?
        fn_exec = lithops.FunctionExecutor()

        # lithops doesn't have a delayed primitive
        open_ = open_virtual_dataset
Comment on lines +352 to +353: I think the code would be more straightforward if the parallel primitive we used for lithops was the same as the one we used for dask.
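As a rough illustration of that suggestion (a sketch only, not part of this diff; the helper names _delayed and _compute are hypothetical), the lithops branch could defer each open call the same way the dask branch does and then evaluate the whole batch in one map:

# Hypothetical sketch, not part of the PR: a delayed-style wrapper so the
# lithops branch mirrors the dask.delayed / dask.compute structure.
from functools import partial

import lithops


def _delayed(func, *args, **kwargs):
    # Defer the call; lithops pickles the partial and runs it in a worker.
    return partial(func, *args, **kwargs)


def _compute(deferred_calls, fn_exec: lithops.FunctionExecutor):
    # Rough equivalent of dask.compute: execute every deferred call remotely
    # and download the (small) results back to the client process.
    futures = fn_exec.map(lambda call: call(), deferred_calls)
    return fn_exec.get_result(futures)

With something like this, both parallel branches could share the same shape: build a list of deferred open_virtual_dataset calls, then run a single compute step.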
        # TODO I don't know how best to chain this with the getattr, or if that
        # closing stuff is even necessary for virtual datasets
        # getattr_ = getattr
    elif parallel is not False:
        raise ValueError(
            f"{parallel} is an invalid option for the keyword argument ``parallel``"
        )
    else:
        open_ = open_virtual_dataset
        getattr_ = getattr

    if parallel == "dask":
        datasets = [open_(p, **kwargs) for p in paths1d]
        closers = [getattr_(ds, "_close") for ds in datasets]
        if preprocess is not None:
            datasets = [preprocess(ds) for ds in datasets]

        # calling compute here will return the datasets/file_objs lists,
        # the underlying datasets will still be stored as dask arrays
        datasets, closers = dask.compute(datasets, closers)
    elif parallel == "lithops":

        def generate_refs(path):
Comment: This is the equivalent of @thodson-usgs's …
            # allows passing the open_virtual_dataset function to lithops without evaluating it
            vds = open_(path, **kwargs)
            # TODO perhaps we should just load the loadable_vars here and close before returning?
            return vds

        futures = fn_exec.map(generate_refs, paths1d)

        # wait for all the serverless workers to finish, and send their resulting virtual datasets back to the client
        completed_futures, _ = fn_exec.wait(futures, download_results=True)
        virtual_datasets = [future.get_result() for future in completed_futures]
Comment: IIUC this will cause every serverless worker to send a small virtual dataset back to the client process over the internet somehow.
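An illustrative aside, not part of the diff: the payload each worker returns should be cheap to serialise because a virtual dataset's variables wrap ManifestArray chunk references (paths, offsets, lengths plus zarr metadata) rather than the underlying array bytes. A minimal sketch, with a made-up file name:

# Sketch only; "air_2021.nc" is a hypothetical file name.
from virtualizarr import open_virtual_dataset

vds = open_virtual_dataset("air_2021.nc")
print({name: type(var.data) for name, var in vds.variables.items()})
# virtual variables show up as ManifestArray: chunk references, not data bytes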
    elif parallel is False:
        datasets = [open_(p, **kwargs) for p in paths1d]
        closers = [getattr_(ds, "_close") for ds in datasets]
        if preprocess is not None:
            datasets = [preprocess(ds) for ds in datasets]

    # Combine all datasets, closing them in case of a ValueError
    try:
        if combine == "nested":
            # Combined nested list by successive concat and merge operations
            # along each dimension, using structure given by "ids"
            combined = _nested_combine(
                virtual_datasets,
                concat_dims=concat_dim,
                compat=compat,
                data_vars=data_vars,
                coords=coords,
                ids=ids,
                join=join,
                combine_attrs=combine_attrs,
            )
        elif combine == "by_coords":
            # Redo ordering from coordinates, ignoring how they were ordered
            # previously
            combined = combine_by_coords(
Comment: This is only going to work if we have used …
                virtual_datasets,
                compat=compat,
                data_vars=data_vars,
                coords=coords,
                join=join,
                combine_attrs=combine_attrs,
            )
        else:
            raise ValueError(
                f"{combine} is an invalid option for the keyword argument"
                " ``combine``"
            )
    except ValueError:
        for ds in virtual_datasets:
            ds.close()
        raise

    combined.set_close(partial(_multi_file_closer, closers))

    # read global attributes from the attrs_file or from the first dataset
    if attrs_file is not None:
        if isinstance(attrs_file, os.PathLike):
            attrs_file = cast(str, os.fspath(attrs_file))
        combined.attrs = virtual_datasets[paths1d.index(attrs_file)].attrs

    # TODO should we just immediately close everything?
    # TODO We should have already read everything we're ever going to read into memory at this point

    return combined
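For orientation, a hedged usage sketch of the new function as it appears in this diff. The file names are invented, the top-level import assumes open_virtual_mfdataset is exported from the virtualizarr package alongside open_virtual_dataset, and loadable_variables is only an example of a kwarg forwarded through **kwargs to open_virtual_dataset.

# Usage sketch (illustrative paths and kwargs, not taken from the PR).
from virtualizarr import open_virtual_mfdataset

# combine='by_coords': concatenation order is inferred from coordinate values.
vds = open_virtual_mfdataset(
    ["air_2021.nc", "air_2022.nc"],
    combine="by_coords",
    parallel=False,
    loadable_variables=["time"],  # assumed kwarg, forwarded to open_virtual_dataset
)

# combine='nested': order is taken from the (possibly nested) list structure,
# with one entry in concat_dim per nesting level.
vds = open_virtual_mfdataset(
    [["t0_x0.nc", "t0_x1.nc"], ["t1_x0.nc", "t1_x1.nc"]],
    combine="nested",
    concat_dim=["time", "x"],
)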
Comment: I don't like importing these deep xarray internals like this (though _infer_concat_order_from_positions and _nested_combine haven't changed since I wrote them 6 years ago), but the only alternative would be to make a general virtualizarr backend engine for xarray (see #35).
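To make that concrete, a small sketch of what those two internals do as used in this diff (behaviour based on xarray's current private implementation, which could change):

# Sketch of the xarray internals used above; these are private APIs.
from xarray.core.combine import _infer_concat_order_from_positions

nested_paths = [["t0_x0.nc", "t0_x1.nc"], ["t1_x0.nc", "t1_x1.nc"]]
combined_ids_paths = _infer_concat_order_from_positions(nested_paths)
print(combined_ids_paths)
# {(0, 0): 't0_x0.nc', (0, 1): 't0_x1.nc', (1, 0): 't1_x0.nc', (1, 1): 't1_x1.nc'}
# The tuple keys are the "ids": _nested_combine later uses them to concat/merge
# the flat list of opened virtual datasets back into the original nested layout.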