Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zarr chunking fixes #5065

Merged
merged 16 commits into from
Apr 26, 2021
2 changes: 1 addition & 1 deletion doc/internals/duck-arrays-integration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ argument:
...

def _repr_inline_(self, max_width):
""" format to a single line with at most max_width characters """
"""format to a single line with at most max_width characters"""
...

...
Expand Down
5 changes: 0 additions & 5 deletions doc/user-guide/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -837,11 +837,6 @@ Xarray's Zarr backend allows xarray to leverage these capabilities, including
the ability to store and analyze datasets far too large to fit onto disk
(particularly :ref:`in combination with dask <dask>`).

.. warning::

Zarr support is still an experimental feature. Please report any bugs or
unexpected behavior via github issues.

Xarray can't open just any zarr dataset, because xarray requires special
metadata (attributes) describing the dataset dimensions and coordinates.
At this time, xarray can only open zarr datasets that have been written by
Expand Down
3 changes: 3 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ v0.17.1 (unreleased)
New Features
~~~~~~~~~~~~

- Add ``safe_chunks`` option to :py:meth:`Dataset.to_zarr` which allows overriding
checks made to ensure Dask and Zarr chunk compatibility (:issue:`5056`).
By `Ryan Abernathey <https://github.com/rabernat>`_.
- Add :py:meth:`Dataset.query` and :py:meth:`DataArray.query` which enable indexing
of datasets and data arrays by evaluating query expressions against the values of the
data variables (:pull:`4984`). By `Alistair Miles <https://github.com/alimanfoo>`_.
Expand Down
3 changes: 3 additions & 0 deletions xarray/backends/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,7 @@ def to_zarr(
consolidated: bool = False,
append_dim: Hashable = None,
region: Mapping[str, slice] = None,
safe_chunks: bool = True,
):
"""This function creates an appropriate datastore for writing a dataset to
a zarr store
Expand Down Expand Up @@ -1419,6 +1420,7 @@ def to_zarr(
consolidated=consolidated,
region=region,
encoding=encoding,
# do we need to pass safe_chunks through here?
)

zstore = backends.ZarrStore.open_group(
Expand All @@ -1430,6 +1432,7 @@ def to_zarr(
chunk_store=chunk_store,
append_dim=append_dim,
write_region=region,
safe_chunks=safe_chunks,
)
writer = ArrayWriter()
# TODO: figure out how to properly handle unlimited_dims
Expand Down
50 changes: 35 additions & 15 deletions xarray/backends/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __getitem__(self, key):
# could possibly have a work-around for 0d data here


def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name):
def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks):
"""
Given encoding chunks (possibly None) and variable chunks (possibly None)
"""
Expand Down Expand Up @@ -133,7 +133,7 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name):

if len(enc_chunks_tuple) != ndim:
# throw away encoding chunks, start over
return _determine_zarr_chunks(None, var_chunks, ndim, name)
return _determine_zarr_chunks(None, var_chunks, ndim, name, safe_chunks)

for x in enc_chunks_tuple:
if not isinstance(x, int):
Expand Down Expand Up @@ -164,24 +164,32 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name):
continue
for dchunk in dchunks[:-1]:
if dchunk % zchunk:
raise NotImplementedError(
base_error = (
f"Specified zarr chunks encoding['chunks']={enc_chunks_tuple!r} for "
f"variable named {name!r} would overlap multiple dask chunks {var_chunks!r}. "
"This is not implemented in xarray yet. "
"Consider either rechunking using `chunk()` or instead deleting "
"or modifying `encoding['chunks']`."
f"Writing this array in parallel with dask could lead to corrupted data."
)
if safe_chunks:
raise NotImplementedError(
base_error
+ " Consider either rechunking using `chunk()`, deleting "
"or modifying `encoding['chunks']`, or specify `safe_chunks=False`."
)
if dchunks[-1] > zchunk:
raise ValueError(
base_error = (
"Final chunk of Zarr array must be the same size or "
"smaller than the first. "
f"Specified Zarr chunk encoding['chunks']={enc_chunks_tuple}, "
f"for variable named {name!r} "
f"but {dchunks} in the variable's Dask chunks {var_chunks} is "
f"but {dchunks} in the variable's Dask chunks {var_chunks} are "
"incompatible with this encoding. "
rabernat marked this conversation as resolved.
Show resolved Hide resolved
"Consider either rechunking using `chunk()` or instead deleting "
"or modifying `encoding['chunks']`."
)
if safe_chunks:
raise NotImplementedError(
base_error
+ " Consider either rechunking using `chunk()`, deleting "
"or modifying `encoding['chunks']`, or specify `safe_chunks=False`."
)
return enc_chunks_tuple

raise AssertionError("We should never get here. Function logic must be wrong.")
Expand All @@ -203,7 +211,9 @@ def _get_zarr_dims_and_attrs(zarr_obj, dimension_key):
return dimensions, attributes


def extract_zarr_variable_encoding(variable, raise_on_invalid=False, name=None):
def extract_zarr_variable_encoding(
variable, raise_on_invalid=False, name=None, safe_chunks=True
):
"""
Extract zarr encoding dictionary from xarray Variable

Expand Down Expand Up @@ -233,7 +243,7 @@ def extract_zarr_variable_encoding(variable, raise_on_invalid=False, name=None):
del encoding[k]

chunks = _determine_zarr_chunks(
encoding.get("chunks"), variable.chunks, variable.ndim, name
encoding.get("chunks"), variable.chunks, variable.ndim, name, safe_chunks
)
encoding["chunks"] = chunks
return encoding
Expand Down Expand Up @@ -285,6 +295,7 @@ class ZarrStore(AbstractWritableDataStore):
"_read_only",
"_synchronizer",
"_write_region",
"_safe_chunks",
)

@classmethod
Expand All @@ -300,6 +311,7 @@ def open_group(
storage_options=None,
append_dim=None,
write_region=None,
safe_chunks=True,
):

# zarr doesn't support pathlib.Path objects yet. zarr-python#601
Expand All @@ -323,10 +335,17 @@ def open_group(
zarr_group = zarr.open_consolidated(store, **open_kwargs)
else:
zarr_group = zarr.open_group(store, **open_kwargs)
return cls(zarr_group, consolidate_on_close, append_dim, write_region)
return cls(
zarr_group, consolidate_on_close, append_dim, write_region, safe_chunks
)

def __init__(
self, zarr_group, consolidate_on_close=False, append_dim=None, write_region=None
self,
zarr_group,
consolidate_on_close=False,
append_dim=None,
write_region=None,
safe_chunks=True,
):
self.ds = zarr_group
self._read_only = self.ds.read_only
Expand All @@ -335,6 +354,7 @@ def __init__(
self._consolidate_on_close = consolidate_on_close
self._append_dim = append_dim
self._write_region = write_region
self._safe_chunks = safe_chunks

def open_store_variable(self, name, zarr_array):
data = indexing.LazilyIndexedArray(ZarrArrayWrapper(name, self))
Expand Down Expand Up @@ -497,7 +517,7 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=No
else:
# new variable
encoding = extract_zarr_variable_encoding(
v, raise_on_invalid=check, name=vn
v, raise_on_invalid=check, name=vn, safe_chunks=self._safe_chunks
)
encoded_attrs = {}
# the magic for storing the hidden dimension data
Expand Down
24 changes: 21 additions & 3 deletions xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1776,12 +1776,22 @@ def to_zarr(
consolidated: bool = False,
append_dim: Hashable = None,
region: Mapping[str, slice] = None,
safe_chunks: bool = True,
) -> "ZarrStore":
"""Write dataset contents to a zarr group.

.. note:: Experimental
The Zarr backend is new and experimental. Please report any
unexpected behavior via github issues.
rabernat marked this conversation as resolved.
Show resolved Hide resolved
Zarr chunks are determined in the following way:

- From the ``chunks`` attribute in each variable's ``encoding``
- If the variable is a Dask array, from the dask chunks
- If neither Dask chunks nor encoding chunks are present, chunks will
be determined automatically by Zarr
- If both Dask chunks and encoding chunks are present, encoding chunks
will be used, provided that there is a many-to-one relationship between
encoding chunks and dask chunks (i.e. Dask chunks are bigger than and
evenly divide encoding chunks); otherwise raise a ``ValueError``.
This restriction ensures that no synchronization / locks are required
when writing. To disable this restriction, use ``safe_chunks=False``.

Parameters
----------
Expand Down Expand Up @@ -1833,6 +1843,13 @@ def to_zarr(
in with ``region``, use a separate call to ``to_zarr()`` with
``compute=False``. See "Appending to existing Zarr stores" in
the reference documentation for full details.
safe_chunks : bool, optional
If True, only allow writes when there is a many-to-one relationship
between Zarr chunks (specified in encoding) and Dask chunks.
Set False to override this restriction; however, data may become corrupted
if Zarr arrays are written in parallel. This option may be useful in combination
with ``compute=False`` to initialize a Zarr from an existing
Dataset with arbitrary chunk structure.

References
----------
Expand Down Expand Up @@ -1869,6 +1886,7 @@ def to_zarr(
consolidated=consolidated,
append_dim=append_dim,
region=region,
safe_chunks=safe_chunks,
)

def __repr__(self) -> str:
Expand Down
17 changes: 14 additions & 3 deletions xarray/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -1871,14 +1871,21 @@ def test_chunk_encoding_with_dask(self):
with self.roundtrip(badenc) as actual:
pass

# unless...
with self.roundtrip(badenc, save_kwargs={"safe_chunks": False}) as actual:
# don't actually check equality because the data could be corrupted
pass

badenc.var1.encoding["chunks"] = (2,)
with pytest.raises(ValueError, match=r"Specified Zarr chunk encoding"):
with pytest.raises(NotImplementedError, match=r"Specified Zarr chunk encoding"):
with self.roundtrip(badenc) as actual:
pass

badenc = badenc.chunk({"x": (3, 3, 6)})
badenc.var1.encoding["chunks"] = (3,)
with pytest.raises(ValueError, match=r"incompatible with this encoding"):
with pytest.raises(
NotImplementedError, match=r"incompatible with this encoding"
):
with self.roundtrip(badenc) as actual:
pass

Expand All @@ -1901,9 +1908,13 @@ def test_chunk_encoding_with_dask(self):
# TODO: remove this failure once syncronized overlapping writes are
# supported by xarray
ds_chunk4["var1"].encoding.update({"chunks": 5})
with pytest.raises(NotImplementedError):
with pytest.raises(NotImplementedError, match=r"named 'var1' would overlap"):
with self.roundtrip(ds_chunk4) as actual:
pass
# override option
with self.roundtrip(ds_chunk4, save_kwargs={"safe_chunks": False}) as actual:
# don't actually check equality because the data could be corrupted
pass

def test_hidden_zarr_keys(self):
expected = create_test_data()
Expand Down