fix zarr datetime64 chunks #8253

Closed
wants to merge 18 commits
7 changes: 7 additions & 0 deletions doc/whats-new.rst
@@ -70,6 +70,13 @@ v2023.10.1 (19 Oct, 2023)
This release updates our minimum numpy version in ``pyproject.toml`` to 1.22,
consistent with our documentation below.

Bug fixes
~~~~~~~~~

- Fix bug where :py:meth:`Dataset.to_zarr` would modify chunks of datetime-like variables (:issue:`8230`, :pull:`8253`).
By `Mattia Almansi <https://github.com/malmans2>`_.


.. _whats-new.2023.10.0:

v2023.10.0 (19 Oct, 2023)
40 changes: 24 additions & 16 deletions xarray/backends/zarr.py
@@ -106,6 +106,24 @@ def __getitem__(self, key):
# could possibly have a work-around for 0d data here


def _squeeze_var_chunks(var_chunks, name=None):
Contributor: Can we get a quick comment to explain what this function does?

if any(len(set(chunks[:-1])) > 1 for chunks in var_chunks):
raise ValueError(
"Zarr requires uniform chunk sizes except for final chunk. "
f"Variable named {name!r} has incompatible dask chunks: {var_chunks!r}. "
"Consider rechunking using `chunk()`."
)
if any((chunks[0] < chunks[-1]) for chunks in var_chunks):
raise ValueError(
"Final chunk of Zarr array must be the same size or smaller "
f"than the first. Variable named {name!r} has incompatible Dask chunks {var_chunks!r}."
"Consider either rechunking using `chunk()` or instead deleting "
"or modifying `encoding['chunks']`."
)
# return the first chunk for each dimension
return tuple(chunk[0] for chunk in var_chunks)
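The helper's two checks mirror zarr's chunking rules: along each dimension every chunk must have the same size, except the final chunk, which may be smaller. A minimal standalone sketch of the same logic (a reimplementation for illustration, not xarray's actual helper):

```python
def squeeze_var_chunks(var_chunks, name=None):
    """Collapse per-dimension dask chunk tuples into one zarr chunk size each.

    Zarr requires all chunks along a dimension to share a size, except the
    final chunk, which may be smaller.
    """
    for chunks in var_chunks:
        # every chunk except the last must be identical
        if len(set(chunks[:-1])) > 1:
            raise ValueError(f"non-uniform chunks for {name!r}: {chunks!r}")
        # the final chunk may not exceed the first
        if chunks[0] < chunks[-1]:
            raise ValueError(f"oversized final chunk for {name!r}: {chunks!r}")
    # the first chunk per dimension becomes the zarr chunk size
    return tuple(chunks[0] for chunks in var_chunks)

print(squeeze_var_chunks(((4, 4, 4, 2), (3, 3))))  # → (4, 3)
```

A dask layout like `((4, 4, 4, 2), (3, 3))` is valid and squeezes to `(4, 3)`, while `((4, 2, 4),)` is rejected because a short chunk appears before the end.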


def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks):
"""
Given encoding chunks (possibly None or []) and variable chunks
@@ -128,21 +146,7 @@ def _determine_zarr_chunks(enc_chunks, var_chunks, ndim, name, safe_chunks):
# while dask chunks can be variable sized
# http://dask.pydata.org/en/latest/array-design.html#chunks
if var_chunks and not enc_chunks:
if any(len(set(chunks[:-1])) > 1 for chunks in var_chunks):
raise ValueError(
"Zarr requires uniform chunk sizes except for final chunk. "
f"Variable named {name!r} has incompatible dask chunks: {var_chunks!r}. "
"Consider rechunking using `chunk()`."
)
if any((chunks[0] < chunks[-1]) for chunks in var_chunks):
raise ValueError(
"Final chunk of Zarr array must be the same size or smaller "
f"than the first. Variable named {name!r} has incompatible Dask chunks {var_chunks!r}."
"Consider either rechunking using `chunk()` or instead deleting "
"or modifying `encoding['chunks']`."
)
# return the first chunk for each dimension
return tuple(chunk[0] for chunk in var_chunks)
return _squeeze_var_chunks(var_chunks, name=name)

# from here on, we are dealing with user-specified chunks in encoding
# zarr allows chunks to be an integer, in which case it uses the same chunk
@@ -286,7 +290,8 @@ def extract_zarr_variable_encoding(


# Function below is copied from conventions.encode_cf_variable.
# The only change is to raise an error for object dtypes.
# The only change is to raise an error for object dtypes, and
# add chunks to the encoding when dask arrays are converted to np.
def encode_zarr_variable(var, needs_copy=True, name=None):
"""
Converts a Variable into a Variable which follows some
@@ -307,6 +312,7 @@ out : Variable
out : Variable
A variable which has been encoded as described above.
"""
original_chunks = var.chunks

Contributor:
This is kind of the crux. I cannot actually understand how or where the .chunks attribute is defined for Variables. We want the decoding pipeline to preserve chunks unmodified.

.chunks is not defined in the Variable class itself, so it must be inherited from one of its base classes:

class Variable(NamedArray, AbstractArray, VariableArithmetic):

The word chunks does not even appear in https://github.com/pydata/xarray/blob/main/xarray/coding/times.py.

I'm tempted to loop in @TomNicholas into this conversation, who recently refactored everything about how we handle chunked arrays, to help us sort through this.


Contributor Author:
chunks is inherited from NamedArray:

def chunks(self) -> _Chunks | None:

Yes, the problem is that the encoder does not distinguish between dask and numpy arrays and always returns numpy arrays. I originally thought that was a mistake, but I was less sure after I tried to change it.


Contributor:
DateTimes are always cast to numpy using np.asarray AFAICT. And this has been there since that part of the coding was implemented.


Contributor:
So are you saying that all datetime arrays are eagerly computed by the coding pipelines, even if they are Dask arrays?

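The eager-computation concern can be illustrated without dask at all. A toy chunked container (hypothetical, standing in for a dask-backed Variable) loses its chunk metadata the moment an encoder materializes it, just as an np.asarray-based coder does to lazy datetime arrays:

```python
class LazyChunkedArray:
    """Toy stand-in for a dask-backed array: data plus chunk metadata."""

    def __init__(self, data, chunks):
        self.data = list(data)
        self.chunks = chunks  # e.g. ((2, 2),) for a length-4 1-D array

    def compute(self):
        # materializing yields a plain container with no .chunks attribute
        return list(self.data)


def eager_encode(values):
    """Toy encoder that always materializes its input before encoding."""
    data = values.compute() if hasattr(values, "compute") else values
    return [v * 2 for v in data]  # stand-in for a unit conversion


lazy = LazyChunkedArray([1, 2, 3, 4], chunks=((2, 2),))
encoded = eager_encode(lazy)
print(getattr(encoded, "chunks", None))  # chunk metadata is gone: None
```

The encoded result is correct but unchunked, which is exactly why `encode_zarr_variable` has to record `var.chunks` before calling the coders.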

var = conventions.encode_cf_variable(var, name=name)

@@ -317,6 +323,8 @@ var = coder.encode(var, name=name)
var = coder.encode(var, name=name)
var = coding.strings.ensure_fixed_length_bytes(var)

if original_chunks and not var.chunks and "chunks" not in var.encoding:
var.encoding["chunks"] = _squeeze_var_chunks(original_chunks, name=name)

Contributor:
I worry that fixing the issue this way reveals that our internal interfaces are leaky. It seems like a bandaid for a deeper problem.

Why does encode_cf_variable work for some dask-based variables but not for certain datetimes? Why does CFDatetimeCoder behave this way? Is it possible that the encoder is eagerly computing the dask array by mistake?
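Stripped of xarray specifics, the shape of the fix is: record chunks before the eager encode step and, if the encoder returned an unchunked result, stash the squeezed layout in the encoding dict. A sketch with hypothetical names (not xarray's API):

```python
def encode_preserving_chunks(values, var_chunks, encoding, encode):
    """Run an eager encoder but carry the pre-encode chunk layout forward.

    `encode` is any function that materializes its input, discarding
    chunk metadata along the way.
    """
    encoded = encode(values)
    if var_chunks and "chunks" not in encoding:
        # same squeeze as _squeeze_var_chunks: first chunk per dimension
        encoding = dict(encoding, chunks=tuple(c[0] for c in var_chunks))
    return encoded, encoding


encoded, enc = encode_preserving_chunks(
    [1, 2, 3, 4], ((2, 2),), {}, lambda v: [x * 2 for x in v]
)
print(enc["chunks"])  # → (2,)
```

Note that a user-supplied `encoding["chunks"]` takes precedence, matching the `"chunks" not in var.encoding` guard in the diff.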

return var


8 changes: 8 additions & 0 deletions xarray/tests/test_backends.py
@@ -2709,6 +2709,14 @@ def test_attributes(self, obj) -> None:
with pytest.raises(TypeError, match=r"Invalid attribute in Dataset.attrs."):
ds.to_zarr(store_target, **self.version_kwargs)

@requires_dask
def test_chunked_datetime64(self) -> None:
original = create_test_data().astype("datetime64[ns]").chunk(1)
with self.roundtrip(original, open_kwargs={"chunks": {}}) as actual:
for name, actual_var in actual.variables.items():
assert original[name].chunks == actual_var.chunks
assert original.chunks == actual.chunks

def test_vectorized_indexing_negative_step(self) -> None:
if not has_dask:
pytest.xfail(