Skip to content

Commit

Permalink
Load selected variables instead of making them virtual (#69)
Browse files Browse the repository at this point in the history
* load selected variables instead of making them virtual

* test

* fix test by explicitly handling encouters of IndexVariable objects

* nits

* load_variables->loadable_variables

* documentation
  • Loading branch information
TomNicholas authored Apr 5, 2024
1 parent 817f63b commit 6fef91c
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 27 deletions.
34 changes: 34 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,36 @@ TODO: Reinstate this part of the docs once [GH issue #18](https://github.com/Tom

TODO: Use preprocess to create a new index from the metadata

## Loading variables

Whilst the values of virtual variables (i.e. those backed by `ManifestArray` objects) cannot be loaded into memory, you do have the option of opening specific variables from the file as loadable lazy numpy/dask arrays, just like `xr.open_dataset` normally returns. These variables are specified using the `loadable_variables` argument:

```python
vds = open_virtual_dataset('air.nc', loadable_variables=['air', 'time'])
```
```python
<xarray.Dataset> Size: 31MB
Dimensions: (time: 2920, lat: 25, lon: 53)
Coordinates:
lat (lat) float32 100B ManifestArray<shape=(25,), dtype=float32, chu...
lon (lon) float32 212B ManifestArray<shape=(53,), dtype=float32, chu...
* time (time) datetime64[ns] 23kB 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
air (time, lat, lon) float64 31MB ...
Attributes:
Conventions: COARDS
description: Data is from NMC initialized reanalysis\n(4x/day). These a...
platform: Model
references: http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
title: 4x daily NMC reanalysis (1948)
```
You can see that the dataset contains a mixture of virtual variables backed by `ManifestArray` objects, and loadable variables backed by (lazy) numpy arrays.

Loading variables can be useful in a few scenarios:
1. You need to look at the actual values of a muilti-dimensional variable in order to decide what to do next,
2. Storing a variable on-disk as a set of references would be inefficient, e.g. because it's a very small array (saving the values like this is similar to kerchunk's concept of "inlining" data),
3. The variable has encoding, and the simplest way to decode it correctly is to let xarray's standard decoding machinery load it into memory and apply the decoding.

## Writing virtual stores to disk

Once we've combined references to all the chunks of all our legacy files into one virtual xarray dataset, we still need to write these references out to disk so that they can be read by our analysis code later.
Expand All @@ -302,6 +332,10 @@ mapper = fs.get_mapper("")
combined_ds = xr.open_dataset(mapper, engine="kerchunk")
```

```{note}
Currently you can only serialize virtual variables backed by `ManifestArray` objects to kerchunk reference files, not real in-memory numpy-backed variables.
```

### Writing as Zarr

TODO: Write out references as a Zarr v3 store following the [Chunk Manifest ZEP](https://github.com/zarr-developers/zarr-specs/issues/287), see [PR #45](https://github.com/TomNicholas/VirtualiZarr/pull/45)
Expand Down
18 changes: 18 additions & 0 deletions virtualizarr/tests/test_xarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import numpy as np
import xarray as xr
import xarray.testing as xrt
from xarray.core.indexes import Index

from virtualizarr import open_virtual_dataset
Expand Down Expand Up @@ -268,3 +269,20 @@ def test_combine_by_coords(self, netcdf4_files):
)

assert combined_vds.xindexes["time"].to_pandas_index().is_monotonic_increasing


class TestLoadVirtualDataset:
def test_loadable_variables(self, netcdf4_file):
vars_to_load = ['air', 'time']
vds = open_virtual_dataset(netcdf4_file, loadable_variables=vars_to_load)

for name in vds.variables:
if name in vars_to_load:
assert isinstance(vds[name].data, np.ndarray), name
else:
assert isinstance(vds[name].data, ManifestArray), name

full_ds = xr.open_dataset(netcdf4_file)
for name in full_ds.variables:
if name in vars_to_load:
xrt.assert_identical(vds.variables[name], full_ds.variables[name])
135 changes: 108 additions & 27 deletions virtualizarr/xarray.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from typing import List, Literal, Mapping, Optional, Union, overload
from typing import List, Literal, Mapping, Optional, Union, overload, MutableMapping, Iterable

import ujson # type: ignore
import xarray as xr
from xarray import register_dataset_accessor
from xarray.backends import BackendArray
from xarray.core.indexes import Index
from xarray.core.indexes import Index, PandasIndex
from xarray.core.variable import IndexVariable

import virtualizarr.kerchunk as kerchunk
from virtualizarr.kerchunk import KerchunkStoreRefs, FileType
Expand All @@ -20,7 +21,8 @@ class ManifestBackendArray(ManifestArray, BackendArray):
def open_virtual_dataset(
filepath: str,
filetype: Optional[FileType] = None,
drop_variables: Optional[List[str]] = None,
drop_variables: Optional[Iterable[str]] = None,
loadable_variables: Optional[Iterable[str]] = None,
indexes: Optional[Mapping[str, Index]] = None,
virtual_array_class=ManifestArray,
) -> xr.Dataset:
Expand All @@ -41,49 +43,99 @@ def open_virtual_dataset(
If not provided will attempt to automatically infer the correct filetype from the the filepath's extension.
drop_variables: list[str], default is None
Variables in the file to drop before returning.
loadable_variables: list[str], default is None
Variables in the file to open as lazy numpy/dask arrays instead of instances of virtual_array_class.
Default is to open all variables as virtual arrays (i.e. ManifestArray).
indexes : Mapping[str, Index], default is None
Indexes to use on the returned xarray Dataset.
Default is None, which will read any 1D coordinate data to create in-memory Pandas indexes.
To avoid creating any indexes, pass indexes={}.
virtual_array_class
Virtual array class to use to represent the references to the chunks in each on-disk array.
Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that.
Returns
-------
vds
An xarray Dataset containing instances of virtual_array_cls for each variable, or normal lazily indexed arrays for each variable in loadable_variables.
"""

if drop_variables is None:
drop_variables = []
elif isinstance(drop_variables, str):
drop_variables = [drop_variables]
else:
drop_variables = list(drop_variables)
if loadable_variables is None:
loadable_variables = []
elif isinstance(loadable_variables, str):
loadable_variables = [loadable_variables]
else:
loadable_variables = list(loadable_variables)
common = set(drop_variables).intersection(set(loadable_variables))
if common:
raise ValueError(f"Cannot both load and drop variables {common}")

# this is the only place we actually always need to use kerchunk directly
# TODO avoid even reading byte ranges for variables that will be dropped later anyway?
vds_refs = kerchunk.read_kerchunk_references_from_file(
filepath=filepath,
filetype=filetype,
)
virtual_vars = virtual_vars_from_kerchunk_refs(
vds_refs,
drop_variables=drop_variables + loadable_variables,
virtual_array_class=virtual_array_class,
)
ds_attrs = kerchunk.fully_decode_arr_refs(vds_refs["refs"]).get(".zattrs", {})

if indexes is None:
# add default indexes by reading data from file
if indexes is None or len(loadable_variables) > 0:
# TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables...
# TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references
ds = xr.open_dataset(filepath)
indexes = ds.xindexes
ds.close()
# TODO really we probably want a dedicated xarray backend that iterates over all variables only once
ds = xr.open_dataset(filepath, drop_variables=drop_variables)

if indexes is None:
# add default indexes by reading data from file
indexes = {name: index for name, index in ds.xindexes.items()}
elif indexes != {}:
# TODO allow manual specification of index objects
raise NotImplementedError()
else:
indexes = dict(**indexes) # for type hinting: to allow mutation

vds = dataset_from_kerchunk_refs(
vds_refs,
drop_variables=drop_variables,
virtual_array_class=virtual_array_class,
indexes=indexes,
loadable_vars = {name: var for name, var in ds.variables.items() if name in loadable_variables}

# if we only read the indexes we can just close the file right away as nothing is lazy
if loadable_vars == {}:
ds.close()
else:
loadable_vars = {}
indexes = {}

vars = {**virtual_vars, **loadable_vars}

data_vars, coords = separate_coords(vars, indexes)

vds = xr.Dataset(
data_vars,
coords=coords,
# indexes={}, # TODO should be added in a later version of xarray
attrs=ds_attrs,
)

# TODO we should probably also use vds.set_close() to tell xarray how to close the file we opened

return vds


def dataset_from_kerchunk_refs(
def virtual_vars_from_kerchunk_refs(
refs: KerchunkStoreRefs,
drop_variables: Optional[List[str]] = None,
virtual_array_class=ManifestArray,
indexes={},
) -> xr.Dataset:
) -> Mapping[str, xr.Variable]:
"""
Translate a store-level kerchunk reference dict into an xarray Dataset containing virtualized arrays.
Translate a store-level kerchunk reference dict into aa set of xarray Variables containing virtualized arrays.
drop_variables: list[str], default is None
Variables in the file to drop before returning.
Expand All @@ -99,24 +151,44 @@ def dataset_from_kerchunk_refs(
var_name for var_name in var_names if var_name not in drop_variables
]

vars = {}
for var_name in var_names_to_keep:
vars[var_name] = variable_from_kerchunk_refs(
vars = {var_name: variable_from_kerchunk_refs(
refs, var_name, virtual_array_class
)
) for var_name in var_names_to_keep}

return vars



def dataset_from_kerchunk_refs(
refs: KerchunkStoreRefs,
drop_variables: Optional[List[str]] = None,
virtual_array_class=ManifestArray,
indexes={},
) -> xr.Dataset:
"""
Translate a store-level kerchunk reference dict into an xarray Dataset containing virtualized arrays.
drop_variables: list[str], default is None
Variables in the file to drop before returning.
virtual_array_class
Virtual array class to use to represent the references to the chunks in each on-disk array.
Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that.
"""

vars = virtual_vars_from_kerchunk_refs(refs, drop_variables, virtual_array_class)

data_vars, coords = separate_coords(vars, indexes)

ds_attrs = kerchunk.fully_decode_arr_refs(refs["refs"]).get(".zattrs", {})

ds = xr.Dataset(
vds = xr.Dataset(
data_vars,
coords=coords,
# indexes={}, # TODO should be added in a later version of xarray
attrs=ds_attrs,
)

return ds
return vds


def variable_from_kerchunk_refs(
Expand All @@ -134,13 +206,15 @@ def variable_from_kerchunk_refs(


def separate_coords(
vars: dict[str, xr.Variable],
indexes={},
) -> tuple[dict[str, xr.Variable], xr.Coordinates]:
vars: Mapping[str, xr.Variable],
indexes: MutableMapping[str, Index],
) -> tuple[Mapping[str, xr.Variable], xr.Coordinates]:
"""
Try to generate a set of coordinates that won't cause xarray to automatically build a pandas.Index for the 1D coordinates.
I thought this should be easy but it was actually really hard - in the end I had to checkout xarray v2023.08.0, the last one before #8107 was merged.
Currently requires a workaround unless xarray 8107 is merged.
Will also preserve any loaded variables and indexes it is passed.
"""

# this would normally come from CF decoding, let's hope the fact we're skipping that doesn't cause any problems...
Expand All @@ -155,6 +229,13 @@ def separate_coords(
if len(var.dims) == 1:
dim1d, *_ = var.dims
coord_vars[name] = (dim1d, var.data)

if isinstance(var, IndexVariable):
# unless variable actually already is a loaded IndexVariable,
# in which case we need to keep it and add the corresponding indexes explicitly
coord_vars[name] = var
# TODO this seems suspect - will it handle datetimes?
indexes[name] = PandasIndex(var, dim1d)
else:
coord_vars[name] = var
else:
Expand Down

0 comments on commit 6fef91c

Please sign in to comment.