Appending to zarr store #2706
Changes from 41 commits
File: doc/whats-new.rst

```diff
@@ -224,6 +224,11 @@ Other enhancements
   report showing what exactly differs between the two objects (dimensions /
   coordinates / variables / attributes) (:issue:`1507`).
   By `Benoit Bovy <https://github.com/benbovy>`_.
+- Added append capability to the zarr store.
+  By `Jendrik Jördening <https://github.com/jendrikjoe>`_,
+  `David Brochart <https://github.com/davidbrochart>`_,
+  `Ryan Abernathey <https://github.com/rabernat>`_ and
+  `Shikhar Goenka <https://github.com/shikharsg>`_.
 - Resampling of standard and non-standard calendars indexed by
   :py:class:`~xarray.CFTimeIndex` is now possible. (:issue:`2191`).
   By `Jwen Fai Low <https://github.com/jwenfai>`_ and
```

> **Review comment** (on the new entry): Need to credit all the contributors to this PR.
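For readers landing on the changelog entry, a minimal sketch of the workflow it describes. The store path `"example.zarr"` and the `time` dimension are illustrative, but `mode='a'` and `append_dim` are exactly the API this PR adds:

```python
import numpy as np
import pandas as pd
import xarray as xr

ds = xr.Dataset(
    {"temperature": (("time",), np.random.rand(3))},
    coords={"time": pd.date_range("2019-01-01", periods=3)},
)
ds.to_zarr("example.zarr", mode="w")  # initial write

# Later: append three more time steps along the existing dimension.
extra = xr.Dataset(
    {"temperature": (("time",), np.random.rand(3))},
    coords={"time": pd.date_range("2019-01-04", periods=3)},
)
extra.to_zarr("example.zarr", mode="a", append_dim="time")

assert xr.open_zarr("example.zarr").dims["time"] == 6
```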
File: xarray/backends/api.py
```diff
@@ -4,17 +4,21 @@
 from io import BytesIO
 from numbers import Number
 from pathlib import Path
+import re

 import numpy as np
 import pandas as pd

-from .. import Dataset, DataArray, backends, conventions
+from .. import Dataset, DataArray, backends, conventions, coding
 from ..core import indexing
 from .. import auto_combine
 from ..core.combine import (combine_by_coords, _nested_combine,
                             _infer_concat_order_from_positions)
 from ..core.utils import close_on_error, is_grib_path, is_remote_uri
 from ..core.variable import Variable
 from .common import ArrayWriter
 from .locks import _get_scheduler
+from ..coding.variables import safe_setitem, unpack_for_encoding

 DATAARRAY_NAME = '__xarray_dataarray_name__'
 DATAARRAY_VARIABLE = '__xarray_dataarray_variable__'
```
```diff
@@ -1024,8 +1028,42 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,
                     for w, s in zip(writes, stores)])


+def _validate_datatypes_for_zarr_append(dataset):
+    """Data variables must have a numeric, fixed-size string, unicode or
+    object dtype in order to be appended to an existing zarr store."""
+    def check_dtype(var):
+        if (not np.issubdtype(var.dtype, np.number)
+                and not coding.strings.is_unicode_dtype(var.dtype)
+                and not var.dtype == object):
+            # and not re.match('^bytes[1-9]+$', var.dtype.name)):
+            raise ValueError('Invalid dtype for data variable: {} '
+                             'dtype must be a subtype of number, '
+                             'a fixed sized string, a fixed size '
+                             'unicode string or an object'.format(var))
+    for k in dataset.data_vars.values():
+        check_dtype(k)
```

> **Reviewer** (on the dtype check): Suggested change
>
> **Author:** this seems to work:
>
> ```python
> if (not np.issubdtype(var.dtype, np.number)
>         and not coding.strings.is_unicode_dtype(var.dtype)
>         and not var.dtype == object):
> ```
>
> while this does not:
>
> ```python
> if (not np.issubdtype(var.dtype, np.number)
>         and not np.issubdtype(var.dtype, np.string_)
>         and not np.issubdtype(var.dtype, np.unicode_)
>         and not coding.strings.is_unicode_dtype(var.dtype)):
> ```
>
> **Author:** or were you suggesting this?
>
> ```python
> if (not np.issubdtype(var.dtype, np.number)
>         and not coding.strings.is_unicode_dtype(var.dtype)):
> ```
>
> this does not work.
>
> **Reviewer:** OK, I understand why this doesn't work: we aren't using xarray's standard encoding pipeline when appending variables. This probably is fine for strings -- in the worst case, users will just get an error downstream from Zarr instead.
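As an aside, which dtypes pass the check can be traced with plain NumPy. This is a sketch: the `kind == 'U'` test below is a stand-in for `coding.strings.is_unicode_dtype`, assumed here to reduce to exactly that comparison:

```python
import numpy as np

def appendable(dtype):
    # Mirror of check_dtype above: numbers, fixed-width unicode and
    # object dtypes pass; fixed-width bytes (e.g. 'S4') do not.
    dtype = np.dtype(dtype)
    is_unicode = dtype.kind == 'U'  # stand-in for coding.strings.is_unicode_dtype
    return bool(np.issubdtype(dtype, np.number) or is_unicode or dtype == object)

assert appendable('f8')       # float64
assert appendable('U5')       # fixed-size unicode
assert appendable(object)     # variable-length strings stored as object
assert not appendable('S4')   # fixed-width bytes are rejected
```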
```diff
+def _validate_append_dim(ds_to_append, store, append_dim, **open_kwargs):
+    try:
+        ds = backends.zarr.open_zarr(store, **open_kwargs)
+    except ValueError:  # store empty
+        return
+    if append_dim:
+        if append_dim not in ds.dims:
+            raise ValueError(
+                "{} not a valid dimension in the Dataset".format(append_dim)
+            )
+    for data_var in ds_to_append:
+        if data_var in ds:
+            if append_dim is None:
+                raise ValueError(
+                    "variable '{}' already exists, but append_dim "
+                    "was not set".format(data_var)
+                )
```
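Concretely, the two `ValueError`s above fire in situations like the following (store path illustrative):

```python
import numpy as np
import xarray as xr

ds = xr.Dataset({"x": ("t", np.arange(3))})
ds.to_zarr("example.zarr", mode="w")

# "x" already exists in the store, but no append_dim was given:
try:
    ds.to_zarr("example.zarr", mode="a")
except ValueError as err:
    print(err)  # variable 'x' already exists, but append_dim was not set

# append_dim names a dimension the stored dataset does not have:
try:
    ds.to_zarr("example.zarr", mode="a", append_dim="y")
except ValueError as err:
    print(err)  # y not a valid dimension in the Dataset
```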
```diff
 def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
-            encoding=None, compute=True, consolidated=False):
+            encoding=None, compute=True, consolidated=False, append_dim=None):
     """This function creates an appropriate datastore for writing a dataset to
     a zarr store
```
```diff
@@ -1040,11 +1078,16 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
     _validate_dataset_names(dataset)
     _validate_attrs(dataset)

+    if mode == "a":
+        _validate_datatypes_for_zarr_append(dataset)
+        _validate_append_dim(dataset, store, append_dim,
+                             consolidated=consolidated, group=group)

     zstore = backends.ZarrStore.open_group(store=store, mode=mode,
                                            synchronizer=synchronizer,
                                            group=group,
                                            consolidate_on_close=consolidated)
+    zstore.append_dim = append_dim
     writer = ArrayWriter()
     # TODO: figure out how to properly handle unlimited_dims
     dump_to_store(dataset, zstore, writer, encoding=encoding)
```

> **Reviewer** (on `if mode == "a":`): Can we raise an error if …
File: xarray/backends/common.py
```diff
@@ -158,26 +158,33 @@ class ArrayWriter:
     def __init__(self, lock=None):
         self.sources = []
         self.targets = []
+        self.regions = []
         self.lock = lock

-    def add(self, source, target):
+    def add(self, source, target, region=None):
         if isinstance(source, dask_array_type):
             self.sources.append(source)
             self.targets.append(target)
+            self.regions.append(region)
         else:
-            target[...] = source
+            if region:
+                target[region] = source
+            else:
+                target[...] = source
```

> **Reviewer:** 👍 This seems like a perfect (and rather general) way to handle appends at the dask level.
```diff
     def sync(self, compute=True):
         if self.sources:
             import dask.array as da
             # TODO: consider wrapping targets with dask.delayed, if this makes
             # for any discernible difference in performance, e.g.,
             # targets = [dask.delayed(t) for t in self.targets]

             delayed_store = da.store(self.sources, self.targets,
                                      lock=self.lock, compute=compute,
-                                     flush=True)
+                                     flush=True, regions=self.regions)
             self.sources = []
             self.targets = []
+            self.regions = []
             return delayed_store
```

> **Reviewer** (on the state reset): can we reset …
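To see what `regions` buys here, a standalone sketch of the same resize-then-write pattern directly against zarr and dask (store path and sizes illustrative):

```python
import dask.array as da
import zarr

# An on-disk array holding four values.
target = zarr.open("example_regions.zarr", mode="w",
                   shape=(4,), chunks=(2,), dtype="f8")
target[:] = [0.0, 1.0, 2.0, 3.0]

# Append four more: grow the array, then write only into the newly
# exposed region -- the same pattern ZarrStore.set_variables uses below.
new_values = da.arange(4, 8, chunks=2, dtype="f8")
target.resize(8)
da.store([new_values], [target], regions=[(slice(4, None),)], lock=False)

print(zarr.open("example_regions.zarr", mode="r")[:])
# [0. 1. 2. 3. 4. 5. 6. 7.]
```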
File: xarray/backends/zarr.py
```diff
@@ -8,7 +8,8 @@
 from ..core import indexing
 from ..core.pycompat import integer_types
 from ..core.utils import FrozenOrderedDict, HiddenKeyDict
-from .common import AbstractWritableDataStore, BackendArray
+from .common import AbstractWritableDataStore, BackendArray, \
+    _encode_variable_name

 # need some special secret attributes to tell us the dimensions
 _DIMENSION_KEY = '_ARRAY_DIMENSIONS'
```
```diff
@@ -212,7 +213,7 @@ def encode_zarr_variable(var, needs_copy=True, name=None):
     # zarr allows unicode, but not variable-length strings, so it's both
     # simpler and more compact to always encode as UTF-8 explicitly.
     # TODO: allow toggling this explicitly via dtype in encoding.
-    coder = coding.strings.EncodedStringCoder(allows_unicode=False)
+    coder = coding.strings.EncodedStringCoder(allows_unicode=True)
     var = coder.encode(var, name=name)
     var = coding.strings.ensure_fixed_length_bytes(var)
```
```diff
@@ -257,6 +258,7 @@ def __init__(self, zarr_group, consolidate_on_close=False):
         self._synchronizer = self.ds.synchronizer
         self._group = self.ds.path
         self._consolidate_on_close = consolidate_on_close
+        self.append_dim = None

     def open_store_variable(self, name, zarr_array):
         data = indexing.LazilyOuterIndexedArray(ZarrArrayWrapper(name, self))
```
```diff
@@ -313,40 +315,126 @@ def encode_variable(self, variable):
     def encode_attribute(self, a):
         return _encode_zarr_attr_value(a)

     def prepare_variable(self, name, variable, check_encoding=False,
                          unlimited_dims=None):

         attrs = variable.attrs.copy()
         dims = variable.dims
         dtype = variable.dtype
         shape = variable.shape

         fill_value = attrs.pop('_FillValue', None)
         if variable.encoding == {'_FillValue': None} and fill_value is None:
             variable.encoding = {}

         encoding = _extract_zarr_variable_encoding(
             variable, raise_on_invalid=check_encoding)

         encoded_attrs = OrderedDict()
         # the magic for storing the hidden dimension data
         encoded_attrs[_DIMENSION_KEY] = dims
         for k, v in attrs.items():
             encoded_attrs[k] = self.encode_attribute(v)

         zarr_array = self.ds.create(name, shape=shape, dtype=dtype,
                                     fill_value=fill_value, **encoding)
         zarr_array.attrs.put(encoded_attrs)

         return zarr_array, variable.data
```
```diff
-    def store(self, variables, attributes, *args, **kwargs):
-        AbstractWritableDataStore.store(self, variables, attributes,
-                                        *args, **kwargs)
+    def store(self, variables, attributes, check_encoding_set=frozenset(),
+              writer=None, unlimited_dims=None):
+        """
+        Top level method for putting data on this store, this method:
+         - encodes variables/attributes
+         - sets dimensions
+         - sets variables
+
+        Parameters
+        ----------
+        variables : dict-like
+            Dictionary of key/value (variable name / xr.Variable) pairs
+        attributes : dict-like
+            Dictionary of key/value (attribute name / attribute) pairs
+        check_encoding_set : list-like
+            List of variables that should be checked for invalid encoding
+            values
+        writer : ArrayWriter
+        unlimited_dims : list-like
+            List of dimension names that should be treated as unlimited
+            dimensions.
+        """
+
+        existing_variables = set([vn for vn in variables
+                                  if _encode_variable_name(vn) in self.ds])
+        new_variables = set(variables) - existing_variables
+        variables_without_encoding = OrderedDict([(vn, variables[vn])
+                                                  for vn in new_variables])
+        variables_encoded, attributes = self.encode(
+            variables_without_encoding, attributes)
+
+        if len(existing_variables) > 0:
+            # there are variables to append
+            # their encoding must be the same as in the store
+            ds = open_zarr(self.ds.store, chunks=None)
+            variables_with_encoding = OrderedDict()
+            for vn in existing_variables:
+                variables_with_encoding[vn] = variables[vn]
+                variables_with_encoding[vn].encoding = ds[vn].encoding
+
+            variables_with_encoding, _ = self.encode(variables_with_encoding,
+                                                     {})
+            variables_encoded.update(variables_with_encoding)
+
+        self.set_attributes(attributes)
+        self.set_dimensions(variables_encoded, unlimited_dims=unlimited_dims)
+        self.set_variables(variables_encoded, check_encoding_set, writer,
+                           unlimited_dims=unlimited_dims)
+
+    def sync(self):
+        pass
```

> **Reviewer** (on the encoding-consistency block): Are there any unit tests that verify that encoding is kept consistent? This would be nice to add, if not. Probably a good example would be a dataset saved with scale/offset encoding, where the new dataset to be appended does not have any encoding provided. We could verify that properly scaled values are read back from disk.
>
> **Reviewer** (on `variables_with_encoding[vn].encoding = ds[vn].encoding`): This modifies an argument that was passed into the function in-place, which in general should be avoided due to unexpected side effects in other parts of the code. It would be better to shallow copy the …
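A sketch of the unit test the first comment asks for, under the reviewer's scale/offset scenario (the test name is made up and `tmp_path` is pytest's temporary-directory fixture):

```python
import numpy as np
import xarray as xr

def test_append_preserves_on_disk_encoding(tmp_path):
    store = str(tmp_path / "scaled.zarr")

    ds = xr.Dataset({"v": ("t", np.array([0.1, 0.2, 0.3]))})
    # First write carries explicit scale/offset-style encoding.
    ds.to_zarr(store, mode="w",
               encoding={"v": {"scale_factor": 0.01, "dtype": "int16"}})

    # The appended dataset carries no encoding of its own; store() above
    # copies the on-disk encoding so new values are packed identically.
    extra = xr.Dataset({"v": ("t", np.array([0.4, 0.5]))})
    extra.to_zarr(store, mode="a", append_dim="t")

    actual = xr.open_zarr(store)
    expected = xr.concat([ds, extra], dim="t")
    xr.testing.assert_allclose(expected, actual)
```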
```diff
+    def set_variables(self, variables, check_encoding_set, writer,
+                      unlimited_dims=None):
+        """
+        This provides a centralized method to set the variables on the data
+        store.
+
+        Parameters
+        ----------
+        variables : dict-like
+            Dictionary of key/value (variable name / xr.Variable) pairs
+        check_encoding_set : list-like
+            List of variables that should be checked for invalid encoding
+            values
+        writer : ArrayWriter
+        unlimited_dims : list-like
+            List of dimension names that should be treated as unlimited
+            dimensions.
+        """
+
+        for vn, v in variables.items():
+            name = _encode_variable_name(vn)
+            check = vn in check_encoding_set
+            attrs = v.attrs.copy()
+            dims = v.dims
+            dtype = v.dtype
+            shape = v.shape
+
+            fill_value = attrs.pop('_FillValue', None)
+            if v.encoding == {'_FillValue': None} and fill_value is None:
+                v.encoding = {}
+            if name in self.ds:
+                zarr_array = self.ds[name]
+                """
+                If variable is a dimension of an existing array
+                append_dim does not need to be specified
+                """
+                if self.append_dim in dims:
+                    # this is the DataArray that has append_dim as a
+                    # dimension
+                    append_axis = dims.index(self.append_dim)
+                    new_shape = list(zarr_array.shape)
+                    new_shape[append_axis] += v.shape[append_axis]
+                    new_region = [slice(None)] * len(new_shape)
+                    new_region[append_axis] = slice(
+                        zarr_array.shape[append_axis],
+                        None
+                    )
+                    zarr_array.resize(new_shape)
+                    writer.add(v.data, zarr_array,
+                               region=tuple(new_region))
+            else:
+                # new variable
+                encoding = _extract_zarr_variable_encoding(
+                    v, raise_on_invalid=check)
+                encoded_attrs = OrderedDict()
+                # the magic for storing the hidden dimension data
+                encoded_attrs[_DIMENSION_KEY] = dims
+                for k2, v2 in attrs.items():
+                    encoded_attrs[k2] = self.encode_attribute(v2)
+
+                if coding.strings.check_vlen_dtype(dtype) == str:
+                    dtype = str
+                zarr_array = self.ds.create(name, shape=shape, dtype=dtype,
+                                            fill_value=fill_value, **encoding)
+                zarr_array.attrs.put(encoded_attrs)
+                writer.add(v.data, zarr_array)

     def close(self):
         if self._consolidate_on_close:
             import zarr
```

> **Reviewer** (on the string literal used as a comment): Please use …
>
> **Author:** sorry, yes. That comment is not actually needed (it's from code I already removed). So I'll remove that.
>
> **Reviewer** (on the attribute-encoding loop): What if we pulled this attribute encoding out before the …
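The shape and region arithmetic in the append branch, traced by hand for a 2-D variable appended along its first axis (shapes illustrative):

```python
old_shape = (10, 4)   # zarr_array.shape on disk
incoming = (3, 4)     # v.shape of the dataset being appended

append_axis = 0                                   # dims.index(self.append_dim)
new_shape = list(old_shape)
new_shape[append_axis] += incoming[append_axis]   # grow only the append axis

new_region = [slice(None)] * len(new_shape)
new_region[append_axis] = slice(old_shape[append_axis], None)

print(new_shape)          # [13, 4]
print(tuple(new_region))  # (slice(10, None, None), slice(None, None, None))
```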
> **Reviewer:** Could we possibly get an example of using append here?
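In that spirit, a sketch of an append driven by dask-backed data, which would exercise the region-aware `da.store` path added in `ArrayWriter.sync`. The store path is illustrative, and `compute=False` is the pre-existing delayed-write option rather than anything this PR adds:

```python
import dask.array as da
import xarray as xr

ds = xr.Dataset({"v": ("t", da.arange(6, chunks=3))})
ds.to_zarr("example.zarr", mode="w")

extra = xr.Dataset({"v": ("t", da.arange(6, 12, chunks=3))})
delayed = extra.to_zarr("example.zarr", mode="a", append_dim="t",
                        compute=False)
delayed.compute()  # ArrayWriter.sync issues da.store(..., regions=...) here

print(xr.open_zarr("example.zarr")["v"].shape)  # (12,)
```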