Appending with to_zarr raises ValueError if append_dim length of existing data is not an integer multiple of chunk size #9767

Closed
5 tasks done
RKuttruff opened this issue Nov 11, 2024 · 15 comments
Labels
plan to close (May be closeable, needs more eyeballs) · topic-zarr (Related to zarr storage library)

Comments

@RKuttruff
Contributor

What happened?

I have code that produces zarr data as output with configurable chunking. Recent builds have been raising unexpected ValueErrors about misaligned chunks, despite a) the chunk shape being the same for both the new and existing data, and b) calling chunk() and ensuring encoding['chunks'] is unset on append, as suggested in the error message.

The error:

ValueError: Specified zarr chunks encoding['chunks']=(14, 500, 500) for variable named 'foo' would overlap multiple dask chunks ((14, 14), (180,), (360,)) on the region (slice(29, None, None), slice(None, None, None), slice(None, None, None)). Writing this array in parallel with dask could lead to corrupted data. Consider either rechunking using `chunk()`, deleting or modifying `encoding['chunks']`, or specify `safe_chunks=False`.

The error can be observed by running the MCVE below as-is. If DAYS_PER_APPEND and the first value of the CHUNKING tuple are edited so that one is an integer multiple of the other, the error is not raised. If you then offset the count in the create() call for the first dataset so that its length is no longer an integer multiple of the chunk size (i.e., create(DAYS_PER_APPEND + 1, start_dt, LATITUDE_RES) with CHUNKING = (14, 50, 50)), the error appears again, but NOT if the same change is made to the second dataset. This leads me to conclude that the error is raised because the existing dataset is out of alignment with the chunk shape.

What did you expect to happen?

I expect appending with to_zarr to complete without error regardless of the length of the append dimension in the existing data, provided the chunking of the new and existing data is the same.

Minimal Complete Verifiable Example

from datetime import datetime, timezone, timedelta

import numpy as np
import xarray as xr
import zarr


LATITUDE_RES = 180
DAYS_PER_APPEND = 31
CHUNKING = (14, 50, 50)


def create(count: int, start_date: datetime, lat_res: int):
    times = []

    for _ in range(count):
        times.append(start_date.timestamp())
        start_date += timedelta(days=1)

    times = np.array(times)
    lats = np.linspace(-90, 90, lat_res)
    lons = np.linspace(-180, 180, lat_res * 2)

    coords = {
        'longitude': ('longitude', lons),
        'latitude': ('latitude', lats),
        'time': ('time', times)
    }

    ds = xr.Dataset(
        data_vars={
            'foo': (('time', 'latitude', 'longitude'), np.random.random((count, lat_res, lat_res * 2))),
            'bar': (('time', 'latitude', 'longitude'), np.random.random((count, lat_res, lat_res * 2))),
            'baz': (('time', 'latitude', 'longitude'), np.random.random((count, lat_res, lat_res * 2))),
        },
        coords=coords,
    )

    return ds, start_date


start_dt = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc)

first_ds, next_dt = create(DAYS_PER_APPEND, start_dt, LATITUDE_RES)
print('Created first dataset')
print(first_ds)
print()

for var in first_ds.data_vars:
    first_ds[var] = first_ds[var].chunk(CHUNKING)

encoding = {vname: {'compressor': zarr.Blosc(cname='blosclz', clevel=9), 'chunks': CHUNKING} for vname in first_ds.data_vars}

print('Prepared first dataset')
print(first_ds)
print(f'Encodings: {encoding}')
print('Data variable attributes:')
for var in first_ds.data_vars:
    print(f'\t - {var}: {first_ds[var].attrs}')

first_ds.to_zarr(
    '/tmp/test.zarr',
    mode=None,
    append_dim=None,
    encoding=encoding,
    write_empty_chunks=False,
    consolidated=True
)

second_ds, _ = create(DAYS_PER_APPEND, next_dt, LATITUDE_RES)
print('Created second dataset')
print(second_ds)
print()

for var in second_ds.data_vars:
    second_ds[var] = second_ds[var].chunk(CHUNKING)

encoding = None

print('Prepared second dataset')
print(second_ds)
print(f'Encodings: {encoding}')
print('Data variable attributes:')
for var in second_ds.data_vars:
    print(f'\t - {var}: {second_ds[var].attrs}')

second_ds.to_zarr(
    '/tmp/test.zarr',
    mode=None,
    append_dim='time',
    encoding=encoding,
    write_empty_chunks=False,
    consolidated=True
)

MVCE confirmation

  • Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • Complete example — the example is self-contained, including all data and the text of any traceback.
  • Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • New issue — a search of GitHub Issues suggests this is not a duplicate.
  • Recent environment — the issue occurs with the latest version of xarray and its dependencies.

Relevant log output

(base) root@926b103bd78a:/# python mcve.py
Created first dataset
<xarray.Dataset> Size: 48MB
Dimensions:    (time: 31, latitude: 180, longitude: 360)
Coordinates:
  * longitude  (longitude) float64 3kB -180.0 -179.0 -178.0 ... 179.0 180.0
  * latitude   (latitude) float64 1kB -90.0 -88.99 -87.99 ... 87.99 88.99 90.0
  * time       (time) float64 248B 1.731e+09 1.731e+09 ... 1.734e+09 1.734e+09
Data variables:
    foo        (time, latitude, longitude) float64 16MB 0.06853 ... 0.1764
    bar        (time, latitude, longitude) float64 16MB 0.7759 ... 0.08998
    baz        (time, latitude, longitude) float64 16MB 0.7744 0.4205 ... 0.5165

//mcve.py:53: DeprecationWarning: Supplying chunks as dimension-order tuples is deprecated. It will raise an error in the future. Instead use a dict with dimension names as keys.
  first_ds[var] = first_ds[var].chunk(CHUNKING)
//mcve.py:53: DeprecationWarning: Supplying chunks as dimension-order tuples is deprecated. It will raise an error in the future. Instead use a dict with dimension names as keys.
  first_ds[var] = first_ds[var].chunk(CHUNKING)
Prepared first dataset
<xarray.Dataset> Size: 48MB
Dimensions:    (time: 31, latitude: 180, longitude: 360)
Coordinates:
  * longitude  (longitude) float64 3kB -180.0 -179.0 -178.0 ... 179.0 180.0
  * latitude   (latitude) float64 1kB -90.0 -88.99 -87.99 ... 87.99 88.99 90.0
  * time       (time) float64 248B 1.731e+09 1.731e+09 ... 1.734e+09 1.734e+09
Data variables:
    foo        (time, latitude, longitude) float64 16MB dask.array<chunksize=(14, 50, 50), meta=np.ndarray>
    bar        (time, latitude, longitude) float64 16MB dask.array<chunksize=(14, 50, 50), meta=np.ndarray>
    baz        (time, latitude, longitude) float64 16MB dask.array<chunksize=(14, 50, 50), meta=np.ndarray>
Encodings: {'foo': {'compressor': Blosc(cname='blosclz', clevel=9, shuffle=SHUFFLE, blocksize=0), 'chunks': (14, 50, 50)}, 'bar': {'compressor': Blosc(cname='blosclz', clevel=9, shuffle=SHUFFLE, blocksize=0), 'chunks': (14, 50, 50)}, 'baz': {'compressor': Blosc(cname='blosclz', clevel=9, shuffle=SHUFFLE, blocksize=0), 'chunks': (14, 50, 50)}}
Data variable attributes:
	 - foo: {}
	 - bar: {}
	 - baz: {}
Created second dataset
<xarray.Dataset> Size: 48MB
Dimensions:    (time: 31, latitude: 180, longitude: 360)
Coordinates:
  * longitude  (longitude) float64 3kB -180.0 -179.0 -178.0 ... 179.0 180.0
  * latitude   (latitude) float64 1kB -90.0 -88.99 -87.99 ... 87.99 88.99 90.0
  * time       (time) float64 248B 1.734e+09 1.734e+09 ... 1.736e+09 1.737e+09
Data variables:
    foo        (time, latitude, longitude) float64 16MB 0.3227 0.4895 ... 0.7738
    bar        (time, latitude, longitude) float64 16MB 0.7567 0.2322 ... 0.9079
    baz        (time, latitude, longitude) float64 16MB 0.4169 0.9223 ... 0.3972

//mcve.py:79: DeprecationWarning: Supplying chunks as dimension-order tuples is deprecated. It will raise an error in the future. Instead use a dict with dimension names as keys.
  second_ds[var] = second_ds[var].chunk(CHUNKING)
Prepared second dataset
<xarray.Dataset> Size: 48MB
Dimensions:    (time: 31, latitude: 180, longitude: 360)
Coordinates:
  * longitude  (longitude) float64 3kB -180.0 -179.0 -178.0 ... 179.0 180.0
  * latitude   (latitude) float64 1kB -90.0 -88.99 -87.99 ... 87.99 88.99 90.0
  * time       (time) float64 248B 1.734e+09 1.734e+09 ... 1.736e+09 1.737e+09
Data variables:
    foo        (time, latitude, longitude) float64 16MB dask.array<chunksize=(14, 50, 50), meta=np.ndarray>
    bar        (time, latitude, longitude) float64 16MB dask.array<chunksize=(14, 50, 50), meta=np.ndarray>
    baz        (time, latitude, longitude) float64 16MB dask.array<chunksize=(14, 50, 50), meta=np.ndarray>
Encodings: None
Data variable attributes:
	 - foo: {}
	 - bar: {}
	 - baz: {}
Traceback (most recent call last):
  File "//mcve.py", line 90, in <module>
    second_ds.to_zarr(
  File "/opt/conda/lib/python3.11/site-packages/xarray/core/dataset.py", line 2595, in to_zarr
    return to_zarr(  # type: ignore[call-overload,misc]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/xarray/backends/api.py", line 2239, in to_zarr
    dump_to_store(dataset, zstore, writer, encoding=encoding)
  File "/opt/conda/lib/python3.11/site-packages/xarray/backends/api.py", line 1919, in dump_to_store
    store.store(variables, attrs, check_encoding, writer, unlimited_dims=unlimited_dims)
  File "/opt/conda/lib/python3.11/site-packages/xarray/backends/zarr.py", line 900, in store
    self.set_variables(
  File "/opt/conda/lib/python3.11/site-packages/xarray/backends/zarr.py", line 1024, in set_variables
    encoding = extract_zarr_variable_encoding(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/xarray/backends/zarr.py", line 412, in extract_zarr_variable_encoding
    chunks = _determine_zarr_chunks(
             ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/xarray/backends/zarr.py", line 288, in _determine_zarr_chunks
    raise ValueError(base_error)
ValueError: Specified zarr chunks encoding['chunks']=(14, 50, 50) for variable named 'bar' would overlap multiple dask chunks ((14, 14, 3), (50, 50, 50, 30), (50, 50, 50, 50, 50, 50, 50, 10)) on the region (slice(31, None, None), slice(None, None, None), slice(None, None, None)). Writing this array in parallel with dask could lead to corrupted data. Consider either rechunking using `chunk()`, deleting or modifying `encoding['chunks']`, or specify `safe_chunks=False`.
(base) root@926b103bd78a:/#

Anything else we need to know?

No response

Environment

/opt/conda/lib/python3.11/site-packages/_distutils_hack/init.py:33: UserWarning: Setuptools is replacing distutils.
warnings.warn("Setuptools is replacing distutils.")

INSTALLED VERSIONS

commit: None
python: 3.11.10 | packaged by conda-forge | (main, Oct 16 2024, 01:27:36) [GCC 13.3.0]
python-bits: 64
OS: Linux
OS-release: 6.10.11-linuxkit
machine: x86_64
processor:
byteorder: little
LC_ALL: C.UTF-8
LANG: C.UTF-8
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.14.3
libnetcdf: 4.9.2

xarray: 2024.10.0
pandas: 2.2.3
numpy: 1.26.4
scipy: 1.11.4
netCDF4: 1.7.1
pydap: None
h5netcdf: 1.4.0
h5py: 3.12.1
zarr: 2.18.3
cftime: 1.6.4
nc_time_axis: None
iris: None
bottleneck: 1.4.2
dask: 2024.10.0
distributed: 2024.10.0
matplotlib: None
cartopy: None
seaborn: None
numbagg: None
fsspec: 2022.5.0
cupy: None
pint: None
sparse: None
flox: None
numpy_groupies: None
setuptools: 68.0.0
pip: 23.3
conda: 23.11.0
pytest: None
mypy: None
IPython: None
sphinx: None

@RKuttruff added the bug and needs triage (Issue that has not been reviewed by xarray team member) labels on Nov 11, 2024
RKuttruff added a commit to EarthDigitalTwin/OCO3-data-transformer that referenced this issue Nov 11, 2024
@dcherian
Contributor

If you're absolutely confident it's safe, you can set safe_chunks=False to skip this check, as suggested in the error.

We know that the code raises false positives and would welcome changes that make it better. I believe your offset observation was noted in the PR too, but the change was a strict improvement over the status quo, and so was merged.

@RKuttruff
Contributor Author

@dcherian

I believe your offset comment was noted in the PR too but it was a strict improvement over the status quo, and so was merged.

Which PR was this?

@dcherian
Contributor

git blame points to #9527, which was reverted and then added back in #9559.

@josephnowak
Contributor

josephnowak commented Nov 12, 2024

Hi, I think in this case it is not a false positive.

You can visualize the zarr chunk regions of your dataset along the time dimension as follows:

chunk 1: from 1 to 14
chunk 2: from 15 to 28
chunk 3: from 29 to 42
chunk 4: from 43 to 56
chunk 5: from 57 to None

Your first_ds occupies the region from position 1 to 31, which means it writes to chunks 1, 2 and 3 (with only three elements landing in chunk 3). When you append your second_ds, whose first dask chunk has size 14, xarray starts writing at chunk 3 (which currently holds only 3 elements); those 14 contiguous elements cover positions 32 to 45, which spans chunks 3 and 4. Since the array has further dask chunks after that, two dask chunks end up writing to zarr chunk 4 at the same time, and this can corrupt your data.

My recommendation is to rechunk your second_ds so that its chunks line up with the chunk boundaries established by your first_ds. You can achieve this with the following code (it can generate more tasks than desired, so you might also consider creating your datasets in a different way):

import xarray as xr

second_ds = xr.concat([first_ds, second_ds], dim="time").chunk({"time": 14}).sel(
      time=slice(second_ds.coords["time"][0], None)
)
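
For this specific MCVE, a lighter-weight alternative (a sketch only, assuming you know the length of the append dimension already in the store and the target chunk size) is to pass an explicit chunks tuple so that the first dask chunk of second_ds exactly fills the partial trailing zarr chunk:

# Sketch only: the numbers assume the MCVE values
# (31 time steps already written, chunk size 14 along "time").
n_existing = 31                      # length of "time" already in the store
chunk_size = CHUNKING[0]             # 14
remainder = n_existing % chunk_size  # 3 elements sit in the partial trailing zarr chunk
first = chunk_size - remainder if remainder else chunk_size  # 11
n_new = second_ds.sizes["time"]      # 31
rest = n_new - first
chunks = (first,) + (chunk_size,) * (rest // chunk_size)
if rest % chunk_size:
    chunks += (rest % chunk_size,)
second_ds = second_ds.chunk({"time": chunks})  # (11, 14, 6) for the MCVE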

@dcherian added the topic-zarr (Related to zarr storage library) label and removed the needs triage (Issue that has not been reviewed by xarray team member) label on Nov 12, 2024
@RKuttruff
Contributor Author

Thanks @josephnowak, that workaround seems to have helped and the task increase isn't too bad.

Would there be any way to sequence or mutex the chunk writes to avoid this issue?

@josephnowak
Contributor

It's nice to hear that it was useful.

There is a parameter called synchronizer on the to_zarr method that should help you, but I think it cannot be used in a distributed environment (someone correct me if I'm wrong). For that case, I think you can create a class that implements the Zarr synchronizer interface using Dask locks instead of thread or process locks.
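
A minimal sketch of that idea (nothing xarray or zarr ship; the class name and the use of dask.distributed.Lock are assumptions): zarr only requires the synchronizer to implement __getitem__ returning an object usable as a context manager for the given key, so a cluster-wide variant could look like this:

from dask.distributed import Lock


class DaskDistributedSynchronizer:
    """Illustrative synchronizer backed by dask.distributed locks."""

    def __init__(self, prefix="zarr-sync"):
        self.prefix = prefix

    def __getitem__(self, item):
        # One cluster-wide lock per zarr key; Lock is a context manager.
        return Lock(f"{self.prefix}-{item}")


# e.g. ds.to_zarr(store, append_dim="time",
#                 synchronizer=DaskDistributedSynchronizer())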

@max-sixty added the plan to close (May be closeable, needs more eyeballs) label and removed the bug label on Nov 12, 2024
@max-sixty
Collaborator

Great analysis @josephnowak !

@RKuttruff
Contributor Author

Thanks for all the help.

Do you think it may be worthwhile to document that method of aligning the new data's chunks somewhere?

@max-sixty
Collaborator

What would we add to the documentation? (genuine question, very likely there are things to add!)

In the meantime I'll close this, as I don't think there's a bug here, even though the current behavior isn't ideal.

@RKuttruff
Contributor Author

@max-sixty Sorry I forgot to reply

I think it would be worthwhile to add Joseph's code snippet somewhere in the documentation for to_zarr, as I'm certain I won't be the only one who wants to repeatedly append new data that may or may not align with the existing Zarr store's chunking in the append dimension.

@max-sixty
Collaborator

max-sixty commented Nov 26, 2024

Yes, very open to adding something to the docs. Likely that code snippet needs some generalization before we paste it in...

(Again, the current state is not great; I'm not dismissing this as "everything is perfect", but the binding constraint is making easy-to-understand interfaces & docs...)

@josephnowak
Contributor

@max-sixty, do you think it would be useful to add a parameter to the to_zarr method that enables automatic chunk alignment between Dask and Zarr? It looks like this is a very common problem.

As an additional idea, I think we could go beyond a chunk-alignment parameter and add coordinate alignment as well. I think that would make the to_zarr method more user-friendly in some scenarios.

@max-sixty
Collaborator

Yes, I definitely think we could have something to align the chunks. That could be a param in to_zarr, or could be a different function.

To @RKuttruff 's point — to the extent we want to make incremental progress, adding the code of a function in the zarr docs page would be valuable, I think...
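
As a possible starting point for such a docs snippet, here is a rough generalization of the idea above (a sketch only; the function name is made up, and it assumes every variable shares the same zarr chunk size along the append dimension):

import xarray as xr


def rechunk_for_append(new_ds, store, append_dim):
    # Sketch: align new_ds's dask chunks with the zarr chunk grid of an
    # existing store along append_dim, so the safe_chunks check stays satisfied.
    existing = xr.open_zarr(store)
    n_existing = existing.sizes[append_dim]

    # Assume every variable uses the same zarr chunk size along append_dim.
    var = next(iter(existing.data_vars))
    axis = existing[var].dims.index(append_dim)
    chunk_size = existing[var].encoding["chunks"][axis]

    # The first new chunk fills the partial trailing zarr chunk, if any.
    remainder = n_existing % chunk_size
    first = chunk_size - remainder if remainder else chunk_size
    n_new = new_ds.sizes[append_dim]
    first = min(first, n_new)
    rest = n_new - first

    chunks = (first,) + (chunk_size,) * (rest // chunk_size)
    if rest % chunk_size:
        chunks += (rest % chunk_size,)
    return new_ds.chunk({append_dim: chunks})


# second_ds = rechunk_for_append(second_ds, "/tmp/test.zarr", "time")
# second_ds.to_zarr("/tmp/test.zarr", append_dim="time", consolidated=True)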

@lbesnard

lbesnard commented Dec 11, 2024

Hi, I was about to lodge an issue on what I believe is a similar bug I've encountered since xarray 2024.10. I've had to revert to 2024.9 for my use case. I have a unit test that I'm happy to share which demonstrates that this works with 2024.9 and fails with 2024.10 and above on a LocalCluster: https://github.com/aodn/aodn_cloud_optimised/blob/v0.1.32/test_aodn_cloud_optimised/test_generic_zarr_handler.py#L141

The issue happens after appending to or overwriting an existing zarr dataset, and can completely corrupt it.
I have to keep safe_chunks set to True since I'm doing the computation in parallel on a remote Coiled cluster (I've tried False, and the outcome is random, but mostly the dataset ends up corrupted, so that's not an option). Using a synchronizer could be a solution, however I haven't found proper documentation for using one on a cluster; it's not clear to me whether the synchronizer works per worker or is handled by the scheduler.

Finally, I thought I could get rid of the problem by calling compute() before to_zarr(). However, in my case that led to another bug in s3fs, which serialized data back to the local machine launching the cluster and computation (linked to fsspec/filesystem_spec#1747) and completely blew up the memory of the scheduler or my local machine... basically a cascade of bugs for which it is quite hard to provide a reproducible example.

ValueError: Specified zarr chunks encoding['chunks']=(4, 60, 59) for variable named 'UCUR_quality_control' would overlap multiple dask chunks ((1,), (60,), (59,)) on the region (slice(0, 1, None), slice(None, None, None), slice(None, None, None)). Writing this array in parallel with dask could lead to corrupted data. Consider either rechunking using `chunk()`, deleting or modifying `encoding['chunks']`, or specify `safe_chunks=False`.

Note that none of the variables had an encoding dictionary, so the ValueError message is basically useless and only adds confusion.

Happy to lodge another issue, but I feel like this one should be reopened.

@max-sixty
Collaborator

Can we get an MCVE? Feel free to open a new issue.

Please keep in mind @josephnowak's analysis of this MCVE, to avoid making an example that has the same issue...
