diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 72344df4658..624f161d773 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -29,6 +29,8 @@ Deprecations Bug fixes ~~~~~~~~~ +- Fix the ``align_chunks`` parameter on the :py:meth:`~xarray.Dataset.to_zarr` method; it was not being + passed to the underlying :py:func:`~xarray.backends.api.to_zarr` function (:issue:`10501`, :pull:`10516`). Documentation ~~~~~~~~~~~~~ diff --git a/xarray/backends/chunks.py b/xarray/backends/chunks.py index f17f5375976..c255c7db591 100644 --- a/xarray/backends/chunks.py +++ b/xarray/backends/chunks.py @@ -4,20 +4,18 @@ def align_nd_chunks( - nd_var_chunks: tuple[tuple[int, ...], ...], + nd_v_chunks: tuple[tuple[int, ...], ...], nd_backend_chunks: tuple[tuple[int, ...], ...], ) -> tuple[tuple[int, ...], ...]: - if len(nd_backend_chunks) != len(nd_var_chunks): + if len(nd_backend_chunks) != len(nd_v_chunks): raise ValueError( "The number of dimensions on the backend and the variable must be the same." ) nd_aligned_chunks: list[tuple[int, ...]] = [] - for backend_chunks, var_chunks in zip( - nd_backend_chunks, nd_var_chunks, strict=True - ): + for backend_chunks, v_chunks in zip(nd_backend_chunks, nd_v_chunks, strict=True): # Validate that they have the same number of elements - if sum(backend_chunks) != sum(var_chunks): + if sum(backend_chunks) != sum(v_chunks): raise ValueError( "The number of elements in the backend does not " "match the number of elements in the variable. 
" @@ -42,8 +40,8 @@ def align_nd_chunks( nd_aligned_chunks.append(backend_chunks) continue - if len(var_chunks) == 1: - nd_aligned_chunks.append(var_chunks) + if len(v_chunks) == 1: + nd_aligned_chunks.append(v_chunks) continue # Size of the chunk on the backend @@ -51,7 +49,7 @@ def align_nd_chunks( # The ideal size of the chunks is the maximum of the two; this would avoid # that we use more memory than expected - max_chunk = max(fixed_chunk, *var_chunks) + max_chunk = max(fixed_chunk, *v_chunks) # The algorithm assumes that the chunks on this array are aligned except the last one # because it can be considered a partial one @@ -59,22 +57,22 @@ def align_nd_chunks( # For simplicity of the algorithm, let's transform the Array chunks in such a way that # we remove the partial chunks. To achieve this, we add artificial data to the borders - t_var_chunks = list(var_chunks) - t_var_chunks[0] += fixed_chunk - backend_chunks[0] - t_var_chunks[-1] += fixed_chunk - backend_chunks[-1] + t_v_chunks = list(v_chunks) + t_v_chunks[0] += fixed_chunk - backend_chunks[0] + t_v_chunks[-1] += fixed_chunk - backend_chunks[-1] # The unfilled_size is the amount of space that has not been filled on the last # processed chunk; this is equivalent to the amount of data that would need to be # added to a partial Zarr chunk to fill it up to the fixed_chunk size unfilled_size = 0 - for var_chunk in t_var_chunks: + for v_chunk in t_v_chunks: # Ideally, we should try to preserve the original Dask chunks, but this is only # possible if the last processed chunk was aligned (unfilled_size == 0) - ideal_chunk = var_chunk + ideal_chunk = v_chunk if unfilled_size: # If that scenario is not possible, the best option is to merge the chunks - ideal_chunk = var_chunk + aligned_chunks[-1] + ideal_chunk = v_chunk + aligned_chunks[-1] while ideal_chunk: if not unfilled_size: @@ -105,27 +103,27 @@ def align_nd_chunks( border_size = fixed_chunk - backend_chunks[::order][0] aligned_chunks = 
aligned_chunks[::order] aligned_chunks[0] -= border_size - t_var_chunks = t_var_chunks[::order] - t_var_chunks[0] -= border_size + t_v_chunks = t_v_chunks[::order] + t_v_chunks[0] -= border_size if ( len(aligned_chunks) >= 2 and aligned_chunks[0] + aligned_chunks[1] <= max_chunk - and aligned_chunks[0] != t_var_chunks[0] + and aligned_chunks[0] != t_v_chunks[0] ): # The artificial data added to the border can introduce inefficient chunks # on the borders, for that reason, we will check if we can merge them or not # Example: # backend_chunks = [6, 6, 1] - # var_chunks = [6, 7] - # t_var_chunks = [6, 12] - # The ideal output should preserve the same var_chunks, but the previous loop + # v_chunks = [6, 7] + # t_v_chunks = [6, 12] + # The ideal output should preserve the same v_chunks, but the previous loop # is going to produce aligned_chunks = [6, 6, 6] # And after removing the artificial data, we will end up with aligned_chunks = [6, 6, 1] # which is not ideal and can be merged into a single chunk aligned_chunks[1] += aligned_chunks[0] aligned_chunks = aligned_chunks[1:] - t_var_chunks = t_var_chunks[::order] + t_v_chunks = t_v_chunks[::order] aligned_chunks = aligned_chunks[::order] nd_aligned_chunks.append(tuple(aligned_chunks)) @@ -144,6 +142,11 @@ def build_grid_chunks( region_start = region.start or 0 # Generate the zarr chunks inside the region of this dim chunks_on_region = [chunk_size - (region_start % chunk_size)] + if chunks_on_region[0] >= size: + # This is useful for the scenarios where the chunk_size is bigger + # than the variable chunks, which can happen when the user specifies + # the enc_chunks manually. 
+ return (size,) chunks_on_region.extend([chunk_size] * ((size - chunks_on_region[0]) // chunk_size)) if (size - chunks_on_region[0]) % chunk_size != 0: chunks_on_region.append((size - chunks_on_region[0]) % chunk_size) @@ -155,23 +158,23 @@ def grid_rechunk( enc_chunks: tuple[int, ...], region: tuple[slice, ...], ) -> Variable: - nd_var_chunks = v.chunks - if not nd_var_chunks: + nd_v_chunks = v.chunks + if not nd_v_chunks: return v nd_grid_chunks = tuple( build_grid_chunks( - sum(var_chunks), + v_size, region=interval, chunk_size=chunk_size, ) - for var_chunks, chunk_size, interval in zip( - nd_var_chunks, enc_chunks, region, strict=True + for v_size, chunk_size, interval in zip( + v.shape, enc_chunks, region, strict=True ) ) nd_aligned_chunks = align_nd_chunks( - nd_var_chunks=nd_var_chunks, + nd_v_chunks=nd_v_chunks, nd_backend_chunks=nd_grid_chunks, ) v = v.chunk(dict(zip(v.dims, nd_aligned_chunks, strict=True))) @@ -179,21 +182,21 @@ def grid_rechunk( def validate_grid_chunks_alignment( - nd_var_chunks: tuple[tuple[int, ...], ...] | None, + nd_v_chunks: tuple[tuple[int, ...], ...] | None, enc_chunks: tuple[int, ...], backend_shape: tuple[int, ...], region: tuple[slice, ...], allow_partial_chunks: bool, name: str, ): - if nd_var_chunks is None: + if nd_v_chunks is None: return base_error = ( "Specified Zarr chunks encoding['chunks']={enc_chunks!r} for " "variable named {name!r} would overlap multiple Dask chunks. " - "Check the chunk at position {var_chunk_pos}, which has a size of " - "{var_chunk_size} on dimension {dim_i}. It is unaligned with " - "backend chunks of size {chunk_size} in region {region}. " + "Please check the Dask chunks at position {v_chunk_pos} and " + "{v_chunk_pos_next}, on axis {axis}, they are overlapped " + "on the same Zarr chunk in the region {region}. " "Writing this array in parallel with Dask could lead to corrupted data. " "To resolve this issue, consider one of the following options: " "- Rechunk the array using `chunk()`. 
" @@ -202,22 +205,23 @@ def validate_grid_chunks_alignment( "- Enable automatic chunks alignment with `align_chunks=True`." ) - for dim_i, chunk_size, var_chunks, interval, size in zip( + for axis, chunk_size, v_chunks, interval, size in zip( range(len(enc_chunks)), enc_chunks, - nd_var_chunks, + nd_v_chunks, region, backend_shape, strict=True, ): - for i, chunk in enumerate(var_chunks[1:-1]): + for i, chunk in enumerate(v_chunks[1:-1]): if chunk % chunk_size: raise ValueError( base_error.format( - var_chunk_pos=i + 1, - var_chunk_size=chunk, + v_chunk_pos=i + 1, + v_chunk_pos_next=i + 2, + v_chunk_size=chunk, + axis=axis, name=name, - dim_i=dim_i, chunk_size=chunk_size, region=interval, enc_chunks=enc_chunks, @@ -226,20 +230,21 @@ def validate_grid_chunks_alignment( interval_start = interval.start or 0 - if len(var_chunks) > 1: + if len(v_chunks) > 1: # The first border size is the amount of data that needs to be updated on the # first chunk taking into account the region slice. first_border_size = chunk_size if allow_partial_chunks: first_border_size = chunk_size - interval_start % chunk_size - if (var_chunks[0] - first_border_size) % chunk_size: + if (v_chunks[0] - first_border_size) % chunk_size: raise ValueError( base_error.format( - var_chunk_pos=0, - var_chunk_size=var_chunks[0], + v_chunk_pos=0, + v_chunk_pos_next=0, + v_chunk_size=v_chunks[0], + axis=axis, name=name, - dim_i=dim_i, chunk_size=chunk_size, region=interval, enc_chunks=enc_chunks, @@ -250,10 +255,11 @@ def validate_grid_chunks_alignment( region_stop = interval.stop or size error_on_last_chunk = base_error.format( - var_chunk_pos=len(var_chunks) - 1, - var_chunk_size=var_chunks[-1], + v_chunk_pos=len(v_chunks) - 1, + v_chunk_pos_next=len(v_chunks) - 1, + v_chunk_size=v_chunks[-1], + axis=axis, name=name, - dim_i=dim_i, chunk_size=chunk_size, region=interval, enc_chunks=enc_chunks, @@ -267,7 +273,7 @@ def validate_grid_chunks_alignment( # If the region is covering the last chunk then check # if 
the reminder with the default chunk size # is equal to the size of the last chunk - if var_chunks[-1] % chunk_size != size % chunk_size: + if v_chunks[-1] % chunk_size != size % chunk_size: raise ValueError(error_on_last_chunk) - elif var_chunks[-1] % chunk_size: + elif v_chunks[-1] % chunk_size: raise ValueError(error_on_last_chunk) diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index 452a5751228..f0578ca9352 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -1249,7 +1249,7 @@ def set_variables( # threads shape = zarr_shape or v.shape validate_grid_chunks_alignment( - nd_var_chunks=v.chunks, + nd_v_chunks=v.chunks, enc_chunks=encoding["chunks"], region=region, allow_partial_chunks=self._mode != "r+", diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index a4cf899e687..11f56d3ad44 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -2375,6 +2375,7 @@ def to_zarr( append_dim=append_dim, region=region, safe_chunks=safe_chunks, + align_chunks=align_chunks, zarr_version=zarr_version, zarr_format=zarr_format, write_empty_chunks=write_empty_chunks, diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index 59c83ea0480..0f4debada29 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -7720,6 +7720,54 @@ def test_zarr_safe_chunk_region(self, mode: Literal["r+", "a"]): chunk = chunk.chunk() self.save(store, chunk.chunk(), region=region) + @requires_dask + def test_dataset_to_zarr_align_chunks_true(self, tmp_store) -> None: + # This test is a replica of the one in `test_dataarray_to_zarr_align_chunks_true` + # but for datasets + with self.create_zarr_target() as store: + ds = ( + DataArray( + np.arange(4).reshape((2, 2)), + dims=["a", "b"], + coords={ + "a": np.arange(2), + "b": np.arange(2), + }, + ) + .chunk(a=(1, 1), b=(1, 1)) + .to_dataset(name="foo") + ) + + self.save( + store, + ds, + align_chunks=True, + encoding={"foo": {"chunks": (3, 3)}}, + mode="w", + 
) + assert_identical(ds, xr.open_zarr(store)) + + ds = ( + DataArray( + np.arange(4, 8).reshape((2, 2)), + dims=["a", "b"], + coords={ + "a": np.arange(2), + "b": np.arange(2), + }, + ) + .chunk(a=(1, 1), b=(1, 1)) + .to_dataset(name="foo") + ) + + self.save( + store, + ds, + align_chunks=True, + region="auto", + ) + assert_identical(ds, xr.open_zarr(store)) + @requires_h5netcdf @requires_fsspec diff --git a/xarray/tests/test_backends_chunks.py b/xarray/tests/test_backends_chunks.py index 61b844d84be..bb1297d0db3 100644 --- a/xarray/tests/test_backends_chunks.py +++ b/xarray/tests/test_backends_chunks.py @@ -14,6 +14,8 @@ (10, 3, None, (3, 3, 3, 1)), (10, 3, slice(None, 10), (3, 3, 3, 1)), (10, 3, slice(0, None), (3, 3, 3, 1)), + (2, 10, slice(0, 3), (2,)), + (4, 10, slice(7, 10), (3, 1)), ], ) def test_build_grid_chunks(size, chunk_size, region, expected_chunks): @@ -26,16 +28,16 @@ def test_build_grid_chunks(size, chunk_size, region, expected_chunks): @pytest.mark.parametrize( - "nd_var_chunks, nd_backend_chunks, expected_chunks", + "nd_v_chunks, nd_backend_chunks, expected_chunks", [ (((2, 2, 2, 2),), ((3, 3, 2),), ((3, 3, 2),)), # ND cases (((2, 4), (2, 3)), ((2, 2, 2), (3, 2)), ((2, 4), (3, 2))), ], ) -def test_align_nd_chunks(nd_var_chunks, nd_backend_chunks, expected_chunks): +def test_align_nd_chunks(nd_v_chunks, nd_backend_chunks, expected_chunks): aligned_nd_chunks = align_nd_chunks( - nd_var_chunks=nd_var_chunks, + nd_v_chunks=nd_v_chunks, nd_backend_chunks=nd_backend_chunks, ) assert aligned_nd_chunks == expected_chunks @@ -43,7 +45,7 @@ def test_align_nd_chunks(nd_var_chunks, nd_backend_chunks, expected_chunks): @requires_dask @pytest.mark.parametrize( - "enc_chunks, region, nd_var_chunks, expected_chunks", + "enc_chunks, region, nd_v_chunks, expected_chunks", [ ( (3,), @@ -93,7 +95,7 @@ def test_align_nd_chunks(nd_var_chunks, nd_backend_chunks, expected_chunks): ), ], ) -def test_grid_rechunk(enc_chunks, region, nd_var_chunks, expected_chunks): +def 
test_grid_rechunk(enc_chunks, region, nd_v_chunks, expected_chunks): dims = [f"dim_{i}" for i in range(len(region))] coords = { dim: list(range(r.start, r.stop)) for dim, r in zip(dims, region, strict=False) @@ -104,7 +106,7 @@ def test_grid_rechunk(enc_chunks, region, nd_var_chunks, expected_chunks): dims=dims, coords=coords, ) - arr = arr.chunk(dict(zip(dims, nd_var_chunks, strict=False))) + arr = arr.chunk(dict(zip(dims, nd_v_chunks, strict=False))) result = grid_rechunk( arr.variable,