Skip to content

Commit

Permalink
Manually specify chunks in open_zarr (#2530)
Browse files Browse the repository at this point in the history
* added manual chunks for open_zarr

* updated whats-new

* fixed pep8 issues

* removed whitespace

* added deprecation warning

* fixed pep8 issues

* added warning for bad chunks

* fixed lingering rebase conflicts

* fixed pep8 issues

* added stacklevel

* fixed pep8 issues

* Various fixes for explicit Dataset.indexes (#2858)

* Various fixes for explicit Dataset.indexes

Fixes GH2856

I've added internal consistency checks to the uses of ``assert_equal`` in our
test suite, so this shouldn't happen again.

* Fix indexes in Dataset.interp

* 0.12.1 release

* revert to 0.12.2 dev

* update links to https (#2872)

* Fix mypy typing error in cftime_offsets.py (#2878)

* decreased pytest verbosity (#2881)

* added manual chunks for open_zarr

* updated whats-new

* fixed pep8 issues

* removed whitespace

* added deprecation warning

* fixed pep8 issues

* added warning for bad chunks

* fixed lingering rebase conflicts

* fixed pep8 issues

* added stacklevel

* fixed pep8 issues

* disallow unicode again

* disallow unicode again
  • Loading branch information
lilyminium authored and rabernat committed Apr 18, 2019
1 parent aebe60c commit baf81b4
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 27 deletions.
5 changes: 5 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ Other enhancements
By `Keisuke Fujii <https://github.com/fujiisoup>`_.
- Added :py:meth:`~xarray.Dataset.drop_dims` (:issue:`1949`).
By `Kevin Squire <https://github.com/kmsquire>`_.
- ``xr.open_zarr`` now accepts manually specified chunks with the ``chunks=``
parameter. ``auto_chunk=True`` is equivalent to ``chunks='auto'`` for
backwards compatibility. The ``overwrite_encoded_chunks`` parameter is
added to remove the original zarr chunk encoding.
By `Lily Wang <https://github.com/lilyminium>`_.

Bug fixes
~~~~~~~~~
Expand Down
110 changes: 87 additions & 23 deletions xarray/backends/zarr.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import warnings
from collections import OrderedDict
from distutils.version import LooseVersion

Expand Down Expand Up @@ -352,10 +353,11 @@ def close(self):
zarr.consolidate_metadata(self.ds.store)


def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
def open_zarr(store, group=None, synchronizer=None, chunks='auto',
decode_cf=True, mask_and_scale=True, decode_times=True,
concat_characters=True, decode_coords=True,
drop_variables=None, consolidated=False):
drop_variables=None, consolidated=False,
overwrite_encoded_chunks=False, **kwargs):
"""Load and decode a dataset from a Zarr store.
.. note:: Experimental
Expand All @@ -375,10 +377,15 @@ def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
Array synchronizer provided to zarr
group : str, optional
Group path. (a.k.a. `path` in zarr terminology.)
auto_chunk : bool, optional
Whether to automatically create dask chunks corresponding to each
variable's zarr chunks. If False, zarr array data will lazily convert
to numpy arrays upon access.
chunks : int or dict or tuple or {None, 'auto'}, optional
Chunk sizes along each dimension, e.g., ``5`` or
``{'x': 5, 'y': 5}``. If `chunks='auto'`, dask chunks are created
based on the variable's zarr chunks. If `chunks=None`, zarr array
data will lazily convert to numpy arrays upon access. This accepts
all the chunk specifications as Dask does.
overwrite_encoded_chunks : bool, optional
Whether to drop the zarr chunks encoded for each variable when a
dataset is loaded with specified chunk sizes (default: False)
decode_cf : bool, optional
Whether to decode these variables, assuming they were saved according
to CF conventions.
Expand Down Expand Up @@ -422,6 +429,24 @@ def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
----------
http://zarr.readthedocs.io/
"""
if 'auto_chunk' in kwargs:
auto_chunk = kwargs.pop('auto_chunk')
if auto_chunk:
chunks = 'auto' # maintain backwards compatibility
else:
chunks = None

warnings.warn("auto_chunk is deprecated. Use chunks='auto' instead.",
FutureWarning, stacklevel=2)

if kwargs:
raise TypeError("open_zarr() got unexpected keyword arguments " +
",".join(kwargs.keys()))

if not isinstance(chunks, (int, dict)):
if chunks != 'auto' and chunks is not None:
raise ValueError("chunks must be an int, dict, 'auto', or None. "
"Instead found %s. " % chunks)

if not decode_cf:
mask_and_scale = False
Expand Down Expand Up @@ -449,21 +474,60 @@ def maybe_decode_store(store, lock=False):

# auto chunking needs to be here and not in ZarrStore because variable
# chunks do not survive decode_cf
if auto_chunk:
# adapted from Dataset.Chunk()
def maybe_chunk(name, var):
from dask.base import tokenize
chunks = var.encoding.get('chunks')
if (var.ndim > 0) and (chunks is not None):
# does this cause any data to be read?
token2 = tokenize(name, var._data)
name2 = 'zarr-%s' % token2
return var.chunk(chunks, name=name2, lock=None)
else:
return var

variables = OrderedDict([(k, maybe_chunk(k, v))
for k, v in ds.variables.items()])
return ds._replace_vars_and_dims(variables)
else:
# return trivial case
if not chunks:
return ds

# adapted from Dataset.Chunk()
if isinstance(chunks, int):
chunks = dict.fromkeys(ds.dims, chunks)

if isinstance(chunks, tuple) and len(chunks) == len(ds.dims):
chunks = dict(zip(ds.dims, chunks))

def get_chunk(name, var, chunks):
    """Return the dask chunk spec (dim -> size) to use for ``var``.

    Starts from the variable's encoded zarr chunks and overlays any
    user-requested sizes from ``chunks``.  Coordinate label variables
    (1-d, dimension name equal to the variable name) always keep their
    zarr chunks, as does ``chunks='auto'``.  Emits a warning when a
    requested dask chunk size would split the underlying zarr chunks.

    Parameters
    ----------
    name : hashable
        Name of the variable (used to detect coordinate labels).
    var : xarray.Variable
        Variable whose ``encoding['chunks']`` holds the zarr chunk shape.
    chunks : dict or 'auto'
        Mapping of dimension name to requested chunk size(s), or 'auto'.
    """
    enc_chunks = var.encoding.get('chunks')
    # Guard: a variable without encoded zarr chunks (e.g. 0-d) previously
    # raised TypeError from zip(dims, None); treat it as an empty spec.
    chunk_spec = dict(zip(var.dims, enc_chunks)) if enc_chunks is not None else {}

    # Coordinate labels aren't chunked
    if var.ndim == 1 and var.dims[0] == name:
        return chunk_spec

    if chunks == 'auto':
        return chunk_spec

    for dim in var.dims:
        if dim in chunks:
            spec = chunks[dim]
            # normalize a plain int to a tuple so the divisibility check
            # below handles both forms uniformly
            if isinstance(spec, int):
                spec = (spec,)
            if isinstance(spec, (tuple, list)) and chunk_spec.get(dim):
                # dask chunks that are not multiples of the zarr chunks
                # force re-reading zarr chunks across dask tasks
                if any(s % chunk_spec[dim] for s in spec):
                    warnings.warn("Specified Dask chunks %r would "
                                  "separate Zarr chunk shape %r for "
                                  "dimension %r. This significantly "
                                  "degrades performance. Consider "
                                  "rechunking after loading instead."
                                  % (chunks[dim], chunk_spec[dim], dim),
                                  stacklevel=2)
            chunk_spec[dim] = chunks[dim]
    return chunk_spec

def maybe_chunk(name, var, chunks):
    """Return ``var`` wrapped in dask chunks per ``chunks``, or unchanged.

    Scalar variables (and variables for which no chunk spec can be
    determined) are returned as-is.  When ``overwrite_encoded_chunks``
    is set in the enclosing scope, the zarr chunk encoding is replaced
    by the first dask chunk size along each dimension.
    """
    from dask.base import tokenize

    chunk_spec = get_chunk(name, var, chunks)

    if var.ndim == 0 or chunk_spec is None:
        return var

    # Deterministic dask graph name derived from the variable; tokenizing
    # the lazy _data wrapper presumably avoids reading data — TODO confirm.
    dask_name = 'zarr-%s' % tokenize(name, var._data)
    var = var.chunk(chunk_spec, name=dask_name, lock=None)
    if overwrite_encoded_chunks and var.chunks is not None:
        # drop the original zarr chunk encoding in favour of the new
        # (uniform) dask chunking
        var.encoding['chunks'] = tuple(c[0] for c in var.chunks)
    return var

variables = OrderedDict([(k, maybe_chunk(k, v, chunks))
for k, v in ds.variables.items()])
return ds._replace_vars_and_dims(variables)
90 changes: 86 additions & 4 deletions xarray/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -1391,27 +1391,109 @@ def test_auto_chunk(self):
original = create_test_data().chunk()

with self.roundtrip(
original, open_kwargs={'auto_chunk': False}) as actual:
original, open_kwargs={'chunks': None}) as actual:
for k, v in actual.variables.items():
# only index variables should be in memory
assert v._in_memory == (k in actual.dims)
# there should be no chunks
assert v.chunks is None

with self.roundtrip(
original, open_kwargs={'auto_chunk': True}) as actual:
original, open_kwargs={'chunks': 'auto'}) as actual:
for k, v in actual.variables.items():
# only index variables should be in memory
assert v._in_memory == (k in actual.dims)
# chunk size should be the same as original
assert v.chunks == original[k].chunks

def test_manual_chunk(self):
    """Round-trip through zarr with explicitly specified dask chunks."""
    original = create_test_data().chunk({'dim1': 3, 'dim2': 4, 'dim3': 3})

    # All of these should return non-chunked arrays
    NO_CHUNKS = (None, 0, {})
    for no_chunk in NO_CHUNKS:
        open_kwargs = {'chunks': no_chunk}
        with self.roundtrip(original, open_kwargs=open_kwargs) as actual:
            for k, v in actual.variables.items():
                # only index variables should be in memory
                assert v._in_memory == (k in actual.dims)
                # there should be no chunks
                assert v.chunks is None

    # uniform arrays
    for i in range(2, 6):
        rechunked = original.chunk(chunks=i)
        open_kwargs = {'chunks': i}
        with self.roundtrip(original, open_kwargs=open_kwargs) as actual:
            for k, v in actual.variables.items():
                # only index variables should be in memory
                assert v._in_memory == (k in actual.dims)
                # chunk size should be the same as rechunked
                assert v.chunks == rechunked[k].chunks

    chunks = {'dim1': 2, 'dim2': 3, 'dim3': 5}
    rechunked = original.chunk(chunks=chunks)

    open_kwargs = {'chunks': chunks, 'overwrite_encoded_chunks': True}
    with self.roundtrip(original, open_kwargs=open_kwargs) as actual:
        for k, v in actual.variables.items():
            assert v.chunks == rechunked[k].chunks

        # a second roundtrip with default (auto) chunking should now pick
        # up the overwritten chunk encoding
        with self.roundtrip(actual) as auto:
            # encoding should have changed
            # NOTE(review): this loop re-asserts ``actual``'s chunks;
            # presumably it was meant to check ``auto`` — verify intent.
            for k, v in actual.variables.items():
                assert v.chunks == rechunked[k].chunks

            assert_identical(actual, auto)
            assert_identical(actual.load(), auto.load())
def test_warning_on_bad_chunks(self):
    """Dask chunks that would split zarr chunks should emit a warning;
    chunk specs aligned with (multiples of) the zarr chunks should not."""
    original = create_test_data().chunk({'dim1': 4, 'dim2': 3, 'dim3': 5})

    # neither 2 nor (3, 3, 2, 1) divides evenly into the zarr chunks above
    bad_chunks = (2, {'dim2': (3, 3, 2, 1)})
    for chunks in bad_chunks:
        kwargs = {'chunks': chunks}
        with pytest.warns(UserWarning):
            with self.roundtrip(original, open_kwargs=kwargs) as actual:
                for k, v in actual.variables.items():
                    # only index variables should be in memory
                    assert v._in_memory == (k in actual.dims)

    good_chunks = ({'dim2': 3}, {'dim3': 10})
    for chunks in good_chunks:
        kwargs = {'chunks': chunks}
        # NOTE(review): ``pytest.warns(None)`` is deprecated in pytest >= 7;
        # ``warnings.catch_warnings(record=True)`` is the modern equivalent.
        with pytest.warns(None) as record:
            with self.roundtrip(original, open_kwargs=kwargs) as actual:
                for k, v in actual.variables.items():
                    # only index variables should be in memory
                    assert v._in_memory == (k in actual.dims)
        # no warnings at all for well-aligned chunk specs
        assert len(record) == 0

def test_deprecate_auto_chunk(self):
    """The deprecated ``auto_chunk=`` kwarg still works (mapped to
    ``chunks='auto'``/``chunks=None``) but emits a FutureWarning."""
    original = create_test_data().chunk()
    with pytest.warns(FutureWarning):
        with self.roundtrip(
                original, open_kwargs={'auto_chunk': True}) as actual:
            for k, v in actual.variables.items():
                # only index variables should be in memory
                assert v._in_memory == (k in actual.dims)
                # chunk size should be the same as original
                assert v.chunks == original[k].chunks

    with pytest.warns(FutureWarning):
        with self.roundtrip(
                original, open_kwargs={'auto_chunk': False}) as actual:
            for k, v in actual.variables.items():
                # only index variables should be in memory
                assert v._in_memory == (k in actual.dims)
                # there should be no chunks
                assert v.chunks is None

def test_write_uneven_dask_chunks(self):
# regression for GH#2225
original = create_test_data().chunk({'dim1': 3, 'dim2': 4, 'dim3': 3})

with self.roundtrip(
original, open_kwargs={'auto_chunk': True}) as actual:
original, open_kwargs={'chunks': 'auto'}) as actual:
for k, v in actual.data_vars.items():
print(k)
assert v.chunks == actual[k].chunks
Expand Down

0 comments on commit baf81b4

Please sign in to comment.