Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for aligned chunks when writing to existing variables #8459

Merged
merged 14 commits into from
Mar 29, 2024
5 changes: 4 additions & 1 deletion doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ v2024.03.0 (unreleased)

New Features
~~~~~~~~~~~~

- Partial writes to existing chunks with ``region`` or ``append_dim`` will now raise an error
(unless ``safe_chunks=False``); previously an error would only be raised on
new variables. (:pull:`8459`, :issue:`8371`, :issue:`8882`)
By `Maximilian Roos <https://github.com/max-sixty>`_.
- Grouped and resampling quantile calculations now use the vectorized algorithm in ``flox>=0.9.4`` if present.
By `Deepak Cherian <https://github.com/dcherian>`_.
- Do not broadcast in arithmetic operations when global option ``arithmetic_broadcast=False``
Expand Down
16 changes: 12 additions & 4 deletions xarray/backends/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks):
f"Writing this array in parallel with dask could lead to corrupted data."
)
if safe_chunks:
raise NotImplementedError(
raise ValueError(
base_error
+ " Consider either rechunking using `chunk()`, deleting "
"or modifying `encoding['chunks']`, or specify `safe_chunks=False`."
Expand Down Expand Up @@ -702,6 +702,17 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=No
if v.encoding == {"_FillValue": None} and fill_value is None:
v.encoding = {}

# We need to do this for both new and existing variables to ensure we're not
# writing to a partial chunk, even though we don't use the `encoding` value
# when writing to an existing variable. See
# https://github.com/pydata/xarray/issues/8371 for details.
encoding = extract_zarr_variable_encoding(
dcherian marked this conversation as resolved.
Show resolved Hide resolved
v,
raise_on_invalid=check,
name=vn,
safe_chunks=self._safe_chunks,
)

if name in existing_keys:
# existing variable
# TODO: if mode="a", consider overriding the existing variable
Expand Down Expand Up @@ -732,9 +743,6 @@ def set_variables(self, variables, check_encoding_set, writer, unlimited_dims=No
zarr_array = self.zarr_group[name]
else:
# new variable
encoding = extract_zarr_variable_encoding(
v, raise_on_invalid=check, name=vn, safe_chunks=self._safe_chunks
)
encoded_attrs = {}
# the magic for storing the hidden dimension data
encoded_attrs[DIMENSION_KEY] = dims
Expand Down
12 changes: 9 additions & 3 deletions xarray/core/dataarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -4120,7 +4120,7 @@ def to_zarr(
compute: Literal[True] = True,
consolidated: bool | None = None,
append_dim: Hashable | None = None,
region: Mapping[str, slice] | None = None,
region: Mapping[str, slice | Literal["auto"]] | Literal["auto"] | None = None,
safe_chunks: bool = True,
storage_options: dict[str, str] | None = None,
zarr_version: int | None = None,
Expand All @@ -4140,7 +4140,7 @@ def to_zarr(
compute: Literal[False],
consolidated: bool | None = None,
append_dim: Hashable | None = None,
region: Mapping[str, slice] | None = None,
region: Mapping[str, slice | Literal["auto"]] | Literal["auto"] | None = None,
safe_chunks: bool = True,
storage_options: dict[str, str] | None = None,
zarr_version: int | None = None,
Expand All @@ -4158,7 +4158,7 @@ def to_zarr(
compute: bool = True,
consolidated: bool | None = None,
append_dim: Hashable | None = None,
region: Mapping[str, slice] | None = None,
region: Mapping[str, slice | Literal["auto"]] | Literal["auto"] | None = None,
safe_chunks: bool = True,
storage_options: dict[str, str] | None = None,
zarr_version: int | None = None,
Expand Down Expand Up @@ -4237,6 +4237,12 @@ def to_zarr(
in with ``region``, use a separate call to ``to_zarr()`` with
``compute=False``. See "Appending to existing Zarr stores" in
the reference documentation for full details.

Users are expected to ensure that the specified region aligns with
Zarr chunk boundaries, and that dask chunks are also aligned.
Xarray makes limited checks that these multiple chunk boundaries line up.
It is possible to write incomplete chunks and corrupt the data with this
option if you are not careful.
safe_chunks : bool, default: True
If True, only allow writes to when there is a many-to-one relationship
between Zarr chunks (specified in encoding) and Dask chunks.
Expand Down
6 changes: 6 additions & 0 deletions xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2452,6 +2452,12 @@ def to_zarr(
in with ``region``, use a separate call to ``to_zarr()`` with
``compute=False``. See "Appending to existing Zarr stores" in
the reference documentation for full details.

Users are expected to ensure that the specified region aligns with
Zarr chunk boundaries, and that dask chunks are also aligned.
Xarray makes limited checks that these multiple chunk boundaries line up.
It is possible to write incomplete chunks and corrupt the data with this
option if you are not careful.
safe_chunks : bool, default: True
If True, only allow writes to when there is a many-to-one relationship
between Zarr chunks (specified in encoding) and Dask chunks.
Expand Down
85 changes: 80 additions & 5 deletions xarray/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -2305,7 +2305,7 @@ def test_chunk_encoding_with_dask(self) -> None:
# should fail if encoding["chunks"] clashes with dask_chunks
badenc = ds.chunk({"x": 4})
badenc.var1.encoding["chunks"] = (6,)
with pytest.raises(NotImplementedError, match=r"named 'var1' would overlap"):
with pytest.raises(ValueError, match=r"named 'var1' would overlap"):
with self.roundtrip(badenc) as actual:
pass

Expand Down Expand Up @@ -2343,9 +2343,7 @@ def test_chunk_encoding_with_dask(self) -> None:
# but itermediate unaligned chunks are bad
badenc = ds.chunk({"x": (3, 5, 3, 1)})
badenc.var1.encoding["chunks"] = (3,)
with pytest.raises(
NotImplementedError, match=r"would overlap multiple dask chunks"
):
with pytest.raises(ValueError, match=r"would overlap multiple dask chunks"):
with self.roundtrip(badenc) as actual:
pass

Expand All @@ -2359,7 +2357,7 @@ def test_chunk_encoding_with_dask(self) -> None:
# TODO: remove this failure once synchronized overlapping writes are
# supported by xarray
ds_chunk4["var1"].encoding.update({"chunks": 5})
with pytest.raises(NotImplementedError, match=r"named 'var1' would overlap"):
with pytest.raises(ValueError, match=r"named 'var1' would overlap"):
with self.roundtrip(ds_chunk4) as actual:
pass
# override option
Expand Down Expand Up @@ -5733,3 +5731,80 @@ def test_zarr_region(tmp_path):

# Write without region
ds_transposed.to_zarr(tmp_path / "test.zarr", mode="r+")


@requires_zarr
@requires_dask
def test_zarr_region_chunk_partial(tmp_path):
"""
Check that writing to partial chunks with `region` fails, assuming `safe_chunks=False`.
"""
ds = (
xr.DataArray(np.arange(120).reshape(4, 3, -1), dims=list("abc"))
.rename("var1")
.to_dataset()
)

ds.chunk(5).to_zarr(tmp_path / "foo.zarr", compute=False, mode="w")
with pytest.raises(ValueError):
for r in range(ds.sizes["a"]):
ds.chunk(3).isel(a=[r]).to_zarr(
tmp_path / "foo.zarr", region=dict(a=slice(r, r + 1))
)


@requires_zarr
@requires_dask
def test_zarr_append_chunk_partial(tmp_path):
t_coords = np.array([np.datetime64("2020-01-01").astype("datetime64[ns]")])
data = np.ones((10, 10))

da = xr.DataArray(
data.reshape((-1, 10, 10)),
dims=["time", "x", "y"],
coords={"time": t_coords},
name="foo",
)
da.to_zarr(tmp_path / "foo.zarr", mode="w", encoding={"foo": {"chunks": (5, 5, 1)}})

new_time = np.array([np.datetime64("2021-01-01").astype("datetime64[ns]")])

da2 = xr.DataArray(
data.reshape((-1, 10, 10)),
dims=["time", "x", "y"],
coords={"time": new_time},
name="foo",
)
with pytest.raises(ValueError, match="encoding was provided"):
da2.to_zarr(
tmp_path / "foo.zarr",
append_dim="time",
mode="a",
encoding={"foo": {"chunks": (1, 1, 1)}},
)

# chunking with dask sidesteps the encoding check, so we need a different check
with pytest.raises(ValueError, match="Specified zarr chunks"):
da2.chunk({"x": 1, "y": 1, "time": 1}).to_zarr(
tmp_path / "foo.zarr", append_dim="time", mode="a"
)


@requires_zarr
@requires_dask
def test_zarr_region_chunk_partial_offset(tmp_path):
# https://github.com/pydata/xarray/pull/8459#issuecomment-1819417545
store = tmp_path / "foo.zarr"
data = np.ones((30,))
da = xr.DataArray(data, dims=["x"], coords={"x": range(30)}, name="foo").chunk(x=10)
da.to_zarr(store, compute=False)

da.isel(x=slice(10)).chunk(x=(10,)).to_zarr(store, region="auto")

da.isel(x=slice(5, 25)).chunk(x=(10, 10)).to_zarr(
store, safe_chunks=False, region="auto"
)

# This write is unsafe, and should raise an error, but does not.
# with pytest.raises(ValueError):
# da.isel(x=slice(5, 25)).chunk(x=(10, 10)).to_zarr(store, region="auto")
Loading