
calling to_zarr inside map_blocks function results in missing values #8703

Closed
5 tasks done
martinspetlik opened this issue Feb 4, 2024 · 8 comments · May be fixed by #8746
Labels: plan to close (May be closeable, needs more eyeballs)

Comments

@martinspetlik

What happened?

I want to work with a huge dataset stored in HDF5 and loaded in chunks. Each chunk contains a part of my data that should be saved to a specific region of a Zarr store, and I need to preserve the original order of the chunks. Using map_blocks seemed a convenient way to do this, but the final Zarr store has missing values: some chunks, or parts of chunks, are never written.

The code below uses a simplified scenario to document this behavior: a Zarr store initialized with zeros is filled with ones, yet some parts always remain zero.

What did you expect to happen?

No response

Minimal Complete Verifiable Example

import os
import shutil
import xarray as xr
import numpy as np
import dask.array as da

xr.show_versions()

zarr_file = "file.zarr"
if os.path.exists(zarr_file):
    shutil.rmtree(zarr_file)

chunk_size = 5
shape = (50, 32, 1000)
ones_dataset = xr.Dataset({"data": xr.ones_like(xr.DataArray(np.empty(shape)))})
ones_dataset = ones_dataset.chunk({'dim_0': chunk_size})
chunk_indices = np.arange(len(ones_dataset.chunks['dim_0']))
chunk_ids = np.repeat(np.arange(ones_dataset.sizes["dim_0"] // chunk_size), chunk_size)
chunk_ids_dask_array = da.from_array(chunk_ids, chunks=(chunk_size,))
# Append the chunk IDs Dask array as a new variable to the existing dataset
ones_dataset['chunk_id'] = (('dim_0',), chunk_ids_dask_array)


# Create a new dataset filled with zeros
zeros_dataset = xr.Dataset({"data": xr.zeros_like(xr.DataArray(np.empty(shape)))})
zeros_dataset.to_zarr(zarr_file, compute=False)


def process_chunk(chunk_dataset):
    chunk_id = int(chunk_dataset["chunk_id"][0])
    chunk_dataset_to_store = chunk_dataset.drop_vars("chunk_id")

    start_index = chunk_id * chunk_size
    end_index = chunk_id * chunk_size + chunk_size

    chunk_dataset_to_store.to_zarr(zarr_file, region={'dim_0': slice(start_index, end_index)})
    return chunk_dataset


ones_dataset.map_blocks(process_chunk, template=ones_dataset).compute()

# Load data stored in zarr
zarr_data = xr.open_zarr(zarr_file, chunks={'dim_0': chunk_size})

# Find differences
for var_name in zarr_data.variables:
    try:
        xr.testing.assert_equal(zarr_data[var_name], ones_dataset[var_name])
    except AssertionError:
        print(f"Differences in {var_name}:")
        print(zarr_data[var_name].values)
        print(ones_dataset[var_name].values)

MVCE confirmation

  • Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • Complete example — the example is self-contained, including all data and the text of any traceback.
  • Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • New issue — a search of GitHub Issues suggests this is not a duplicate.
  • Recent environment — the issue occurs with the latest version of xarray and its dependencies.

Relevant log output

No response

Anything else we need to know?

No response

Environment

INSTALLED VERSIONS
------------------
commit: None
python: 3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]
python-bits: 64
OS: Linux
OS-release: 6.5.0-15-generic
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.12.2
libnetcdf: 4.9.3-development
xarray: 2024.1.1
pandas: 2.1.4
numpy: 1.26.3
scipy: 1.11.4
netCDF4: 1.6.5
pydap: None
h5netcdf: 1.3.0
h5py: 3.10.0
Nio: None
zarr: 2.16.1
cftime: 1.6.3
nc_time_axis: 1.4.1
iris: None
bottleneck: 1.3.7
dask: 2024.1.1
distributed: 2024.1.0
matplotlib: 3.8.2
cartopy: 0.22.0
seaborn: 0.13.1
numbagg: 0.6.8
fsspec: 2023.12.2
cupy: None
pint: None
sparse: None
flox: 0.8.9
numpy_groupies: 0.10.2
setuptools: 69.0.2
pip: 23.3.1
conda: None
pytest: 7.4.4
mypy: None
IPython: None
sphinx: None

@martinspetlik added the bug and needs triage labels on Feb 4, 2024

welcome bot commented Feb 4, 2024

Thanks for opening your first issue here at xarray! Be sure to follow the issue template!
If you have an idea for a solution, we would really welcome a Pull Request with proposed changes.
See the Contributing Guide for more.
It may take us a while to respond here, but we really value your contribution. Contributors like you help make xarray better.
Thank you!

@etienneschalk
Contributor

Hello,

I have the gut feeling that calling a method that persists data inside the function passed to map_blocks might lead to unexpected results.

Is there a way to try something else, e.g. removing

- chunk_dataset_to_store.to_zarr(zarr_file, region={'dim_0': slice(start_index, end_index)})

and instead calling to_zarr on the whole dataset once

- ones_dataset.map_blocks(process_chunk, template=ones_dataset)
+ ones_dataset.map_blocks(process_chunk, template=ones_dataset).to_zarr(zarr_file)

I understand the issue regarding the preservation of chunk IDs. I would assume that the original order of chunks is preserved if the Dask chunks are identical to the Zarr chunks?

Also, I tested repeated runs, and ultimately all ones are written. It really seems that some are skipped randomly with the current code.
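
For illustration, a minimal sketch of that alternative, reusing the names from the MVCE above (process_chunk_no_write is a hypothetical helper; the per-chunk to_zarr call is dropped so the mapped function has no side effects):

def process_chunk_no_write(chunk_dataset):
    # Pure transformation: no I/O inside the function passed to map_blocks
    return chunk_dataset.drop_vars("chunk_id")

template = ones_dataset.drop_vars("chunk_id")
result = ones_dataset.map_blocks(process_chunk_no_write, template=template)
# Write the whole mapped result once, from the top-level task graph
result.to_zarr(zarr_file, mode="w")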

@max-sixty
Collaborator

Could this be #8371?

@martinspetlik
Author

and instead calling to_zarr on the whole dataset once

Unfortunately, I need to save slices from chunks into different zarr files for my real data.

Also, I tested repeated runs, and ultimately all ones are written.

Yes, that works, but it can be time-consuming. For 10 chunks I needed between 3 and 14 map_blocks runs. With shape = (500, 32, 1000) I have 100 chunks and needed between 17 and 33 map_blocks runs to get complete data, based on 10 observations.

@martinspetlik
Author

Could this be #8371 ?

If I understand correctly, issue #8371 is solved in PR #8459, right?

I tried using the xarray version from PR #8459, but it did not help.

@dcherian
Contributor

This is #8371; you need to specify the correct chunk sizes when creating or writing zeros_dataset. Otherwise you end up with partially updated chunks.

Using

zeros_dataset = xr.Dataset({"data": xr.zeros_like(xr.DataArray(np.empty(shape))).chunk(ones_dataset.chunksizes)})

makes this succeed for me.
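
In the MVCE above, that corresponds to initializing the target store with chunks matching the Dask chunks that will later be written into it (a sketch; the only change from the original reproducer is the explicit .chunk(...) call):

zeros_dataset = xr.Dataset(
    {"data": xr.zeros_like(xr.DataArray(np.empty(shape))).chunk(ones_dataset.chunksizes)}
)
# The Zarr chunks on disk now line up with the dim_0 regions written by process_chunk
zeros_dataset.to_zarr(zarr_file, compute=False)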

@dcherian added the plan to close label and removed the bug and needs triage labels on Feb 13, 2024
@martinspetlik
Author

Thank you. It works.

I may have missed it somewhere, but it would be great if this were mentioned, for example, in https://docs.xarray.dev/en/stable/user-guide/io.html?appending-to-existing-zarr-stores=#appending-to-existing-zarr-stores
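
For reference, one way to compare the Zarr chunks on disk with the Dask chunks about to be written region by region (a sketch, assuming the zarr-python 2.x API from the environment above):

import zarr

store = zarr.open(zarr_file, mode="r")
print(store["data"].chunks)              # chunk shape of the Zarr array on disk
print(ones_dataset["data"].data.chunks)  # Dask chunk sizes per dimension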

@dcherian
Contributor

We'd happily take a PR for that
