
Commit 6cd6122

Authored by TomNicholas, with co-authors pre-commit-ci[bot], dcherian, Illviljan, and keewis
Generalize handling of chunked array types (#7019)
* generalise chunk methods to allow cubed
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* fix typing typo
* fixed circular import
* fix some mypy errors
* added cubed to mypy ignore list
* simplify __array_ufunc__ check
* Revert "simplify __array_ufunc__ check" as I pushed to wrong branch (reverts commit cdcb3fb)
* update cubed array type
* fix missed conflict
* sketch for ChunkManager adapter class
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* Remove erroneous docstring about usage of map_blocks (co-authored by Deepak Cherian)
* apply_ufunc -> apply_gufunc (co-authored by Deepak Cherian)
* chunk -> from_array (co-authored by Deepak Cherian)
* remove staticmethods
* attempt to type methods of ABC
* from_array
* attempt to specify types
* method for checking array type
* Update pyproject.toml
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* fixed import errors
* generalize .chunk method kwargs
* used dask functions in dask chunkmanager
* define signatures for apply_gufunc, blockwise, map_blocks
* prototype function to detect which parallel backend to use
* add cubed.apply_gufunc
* ruffify
* add rechunk and compute methods for cubed
* xr.apply_ufunc now dispatches to chunkmanager.apply_gufunc
* CubedManager.chunks
* attempt to keep dask and cubed imports lazy
* generalize idxmax
* move unify_chunks import to ChunkManager
* generalize Dataset.load()
* check explicitly for chunks attribute instead of hard-coding cubed
* better function names
* add cubed version of unify_chunks
* recognize wrapped duck dask arrays (e.g. pint wrapping dask)
* add some tests for fetching ChunkManagers
* add from_array_kwargs to open_dataset
* add from_array_kwargs to open_zarr
* pipe constructors through chunkmanager
* generalize map_blocks inside coding
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* fixed full_like
* add from_array_kwargs to open_zarr
* don't import dask.tokenize
* fix bugs with passing from_array_kwargs down
* generalise reductions by adding to chunkmanager
* moved nanfirst/nanlast to duck_array_ops from dask_array_ops
* generalize interp
* generalized chunk_hint function inside indexing
* DaskIndexingAdapter -> ChunkedIndexingAdapter
* Revert "DaskIndexingAdapter -> ChunkedIndexingAdapter" (reverts commit 4ca044b)
* pass cubed-related kwargs down through to_zarr by adding .store to ChunkManager
* fix typing_extensions on py3.9
* fix ImportError with cubed array type
* give up trying to import TypeAlias in CI
* fix import of T_Chunks
* fix no_implicit_optional warnings
* don't define CubedManager if cubed can't be imported
* fix local mypy errors
* don't explicitly pass enforce_ndim into dask.array.map_blocks
* fix drop_axis default
* use indexing adapter on cubed arrays too
* use array API-compatible version of astype function
* whatsnew
* document new kwargs
* add chunkmanager entrypoint
* move CubedManager to a separate package
* guess chunkmanager based on what's available
* fix bug with tokenizing
* adapt tests to emulate existence of entrypoint
* use fixture to setup/teardown dummy entrypoint
* refactor to make DaskManager unavailable if dask not installed
* typing
* move whatsnew to latest xarray version
* remove superfluous lines from whatsnew
* fix bug where zarr backend attempted to use dask when not installed
* Remove rogue print statement (co-authored by Deepak Cherian)
* Clarify what's new (co-authored by Deepak Cherian)
* use monkeypatch to mock registering of dummy chunkmanager
* more tests for guessing chunkmanager correctly
* raise TypeError if no chunkmanager found for array types
* Correct is_chunked_array check (co-authored by Deepak Cherian)
* vendor dask.array.core.normalize_chunks
* add default implementation of rechunk in ABC
* remove cubed-specific type check in daskmanager
* nanfirst -> chunked_nanfirst
* revert adding cubed to NON_NUMPY_SUPPORTED_ARRAY_TYPES
* licensing to vendor functions from dask
* fix bug
* ignore mypy error
* separate chunk_manager kwarg from from_array_kwargs dict
* rename kwarg to chunked_array_type
* refactor from_array_kwargs in .chunk ready for deprecation
* print statements in test so I can comment on them
* remove print statements now I've commented on them in PR
* should fix dask naming tests
* make dask-specific kwargs explicit in from_array
* debugging print statements
* Revert "debugging print statements" (reverts commit 7dc6581)
* fix gnarly bug with auto-determining chunksizes caused by not referring to dask.config
* hopefully fix broken docstring
* Revert "make dask-specific kwargs explicit in from_array" (reverts commit 53d6094)
* show chunksize limit used in failing tests
* move lazy indexing adapter up out of chunkmanager code
* try upgrading minimum version of dask
* Revert "try upgrading minimum version of dask" (reverts commit 796a577)
* un-vendor dask.array.core.normalize_chunks
* refactored to allow passing ChunkManagerEntrypoint objects directly
* Remove redundant Nones from types (co-authored by Illviljan)
* From future import annotations (co-authored by Illviljan)
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* From functools import annotations (co-authored by Illviljan)
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* From future import annotations (co-authored by Illviljan)
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* defined type for NormalizedChunks
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* standardized capitalization of ChunkManagerEntrypoint
* ensure ruff doesn't remove import
* ignore remaining typing errors stemming from unclear dask typing for chunks arguments
* rename store_kwargs -> chunkmanager_store_kwargs
* missed return value
* array API fixes for astype
* Revert "array API fixes for astype" (reverts commit 9cd9078)
* Apply suggestions from code review
* Update xarray/tests/test_parallelcompat.py
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* overridden -> subclassed (co-authored by Justus Magin)
* from_array_kwargs is optional (co-authored by Justus Magin)
* ensured all compute calls go through chunkmanager
* Raise if multiple chunkmanagers recognize array type (co-authored by Justus Magin)
* from_array_kwargs is optional (co-authored by Justus Magin)
* from_array_kwargs is optional (co-authored by Justus Magin)
* from_array_kwargs is optional (co-authored by Justus Magin)
* from_array_kwargs is optional (co-authored by Justus Magin)
* from_array_kwargs is optional (co-authored by Justus Magin)
* fixes for chunk methods
* correct readme to reflect the fact we aren't vendoring dask in this PR any more
* update whatsnew
* more docstring corrections
* remove comment
* Raise NotImplementedErrors in all abstract methods (co-authored by Illviljan)
* type hints for every arg in ChunkManagerEntrypoint methods
* more explicit typing + fixes for mypy errors revealed
* Keyword-only arguments in full_like etc. (co-authored by Illviljan)
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* None as default instead of {}
* fix bug apparently introduced by changing default type of drop_axis kwarg to map_blocks
* Removed hopefully-unnecessary mypy ignore (co-authored by Illviljan)
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* removed unnecessary mypy ignores
* change default value of drop_axis kwarg in map_blocks and catch when dask version < 2022.9.1
* fix checking of dask version in map_blocks

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Deepak Cherian <dcherian@users.noreply.github.com>
Co-authored-by: Illviljan <14371165+Illviljan@users.noreply.github.com>
Co-authored-by: Justus Magin <keewis@users.noreply.github.com>
1 parent 0b6c6c9 commit 6cd6122

29 files changed: +1406 −209 lines
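
At its core, the change routes all chunked-array operations through a registry of ChunkManagerEntrypoint objects. Two helpers named throughout the commit message do the dispatch: guess_chunkmanager resolves a manager from a user-supplied name (preferring dask when available), while get_chunked_array_type picks the manager that recognizes a given set of array objects. The following is a hedged sketch of that dispatch logic, not the verbatim implementation; it assumes a list_chunkmanagers helper that returns the managers registered under the entry-point group added in setup.cfg below.

# Hedged sketch of the dispatch in xarray.core.parallelcompat; illustrative only.
from xarray.core.parallelcompat import list_chunkmanagers  # assumed helper

def guess_chunkmanager_sketch(manager_name=None):
    chunkmanagers = list_chunkmanagers()  # {name: ChunkManagerEntrypoint instance}
    if manager_name is None:
        # "guess chunkmanager based on what's available": prefer dask if installed
        manager_name = "dask" if "dask" in chunkmanagers else next(iter(chunkmanagers))
    if manager_name not in chunkmanagers:
        raise ValueError(f"unrecognized chunk manager {manager_name!r}")
    return chunkmanagers[manager_name]

def get_chunked_array_type_sketch(*arrays):
    # pick the unique manager whose is_chunked_array() recognizes these arrays
    matches = [
        cm
        for cm in list_chunkmanagers().values()
        if any(cm.is_chunked_array(arr) for arr in arrays)
    ]
    if not matches:
        # per the commit message: "raise TypeError if no chunkmanager found for array types"
        raise TypeError("no chunkmanager found for these array types")
    if len(matches) > 1:
        # per the commit message: "Raise if multiple chunkmanagers recognize array type"
        raise TypeError("multiple chunkmanagers recognize this array type")
    return matches[0]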

Diff for: doc/whats-new.rst (+5)

@@ -58,6 +58,11 @@ Documentation
 Internal Changes
 ~~~~~~~~~~~~~~~~
 
+- Experimental support for wrapping chunked array libraries other than dask.
+  A new ABC is defined - :py:class:`xr.core.parallelcompat.ChunkManagerEntrypoint` - which can be subclassed and then
+  registered by alternative chunked array implementations. (:issue:`6807`, :pull:`7019`)
+  By `Tom Nicholas <https://github.com/TomNicholas>`_.
+
 
 .. _whats-new.2023.04.2:
 
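The ABC named in this whats-new entry is the extension point. A third-party chunked array library subclasses it and implements the methods the commit message walks through (is_chunked_array, chunks, from_array, rechunk, compute, apply_gufunc). The sketch below is illustrative only: it imagines the CubedManager that this PR prototyped and then moved to a separate package, and assumes cubed exposes the dask-like functions the commit message names; the exact set and signatures of the abstract methods may differ.

# Illustrative subclass; method names follow the commit message, signatures are assumptions.
import cubed  # the chunked array library this PR was prototyped against

from xarray.core.parallelcompat import ChunkManagerEntrypoint

class CubedManager(ChunkManagerEntrypoint):
    array_cls = cubed.Array  # the duck array type this manager recognizes

    def is_chunked_array(self, data):
        return isinstance(data, cubed.Array)

    def chunks(self, data):
        return data.chunks

    def from_array(self, data, chunks, **kwargs):
        # counterpart of dask.array.from_array
        return cubed.from_array(data, chunks=chunks, **kwargs)

    def rechunk(self, data, chunks, **kwargs):
        return data.rechunk(chunks, **kwargs)

    def compute(self, *data, **kwargs):
        return cubed.compute(*data, **kwargs)

    def apply_gufunc(self, func, signature, *args, **kwargs):
        # xr.apply_ufunc now dispatches here (see commit message)
        return cubed.apply_gufunc(func, signature, *args, **kwargs)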

Diff for: pyproject.toml (+1)

@@ -39,6 +39,7 @@ module = [
     "cf_units.*",
     "cfgrib.*",
     "cftime.*",
+    "cubed.*",
     "cupy.*",
     "fsspec.*",
     "h5netcdf.*",

Diff for: setup.cfg (+4)

@@ -132,6 +132,10 @@ xarray =
     static/css/*
    static/html/*
 
+[options.entry_points]
+xarray.chunkmanagers =
+    dask = xarray.core.daskmanager:DaskManager
+
 [tool:pytest]
 python_files = test_*.py
 testpaths = xarray/tests properties
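
The xarray.chunkmanagers entry-point group added above is also what external packages hook into. A hypothetical distribution shipping the CubedManager sketched earlier would register it the same way in its own setup.cfg; the package and module paths here are made up for illustration.

[options.entry_points]
xarray.chunkmanagers =
    cubed = cubed_xarray.cubedmanager:CubedManager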

Diff for: xarray/backends/api.py (+73, −12)

@@ -6,7 +6,16 @@
 from glob import glob
 from io import BytesIO
 from numbers import Number
-from typing import TYPE_CHECKING, Any, Callable, Final, Literal, Union, cast, overload
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    Final,
+    Literal,
+    Union,
+    cast,
+    overload,
+)
 
 import numpy as np
 
@@ -20,9 +29,11 @@
     _nested_combine,
     combine_by_coords,
 )
+from xarray.core.daskmanager import DaskManager
 from xarray.core.dataarray import DataArray
 from xarray.core.dataset import Dataset, _get_chunk, _maybe_chunk
 from xarray.core.indexes import Index
+from xarray.core.parallelcompat import guess_chunkmanager
 from xarray.core.utils import is_remote_uri
 
 if TYPE_CHECKING:
@@ -38,6 +49,7 @@
         CompatOptions,
         JoinOptions,
         NestedSequence,
+        T_Chunks,
     )
 
 T_NetcdfEngine = Literal["netcdf4", "scipy", "h5netcdf"]
@@ -48,7 +60,6 @@
     str,  # no nice typing support for custom backends
     None,
 ]
-T_Chunks = Union[int, dict[Any, Any], Literal["auto"], None]
 T_NetcdfTypes = Literal[
     "NETCDF4", "NETCDF4_CLASSIC", "NETCDF3_64BIT", "NETCDF3_CLASSIC"
 ]
@@ -297,17 +308,27 @@ def _chunk_ds(
     chunks,
     overwrite_encoded_chunks,
     inline_array,
+    chunked_array_type,
+    from_array_kwargs,
     **extra_tokens,
 ):
-    from dask.base import tokenize
+    chunkmanager = guess_chunkmanager(chunked_array_type)
+
+    # TODO refactor to move this dask-specific logic inside the DaskManager class
+    if isinstance(chunkmanager, DaskManager):
+        from dask.base import tokenize
 
-    mtime = _get_mtime(filename_or_obj)
-    token = tokenize(filename_or_obj, mtime, engine, chunks, **extra_tokens)
-    name_prefix = f"open_dataset-{token}"
+        mtime = _get_mtime(filename_or_obj)
+        token = tokenize(filename_or_obj, mtime, engine, chunks, **extra_tokens)
+        name_prefix = "open_dataset-"
+    else:
+        # not used
+        token = (None,)
+        name_prefix = None
 
     variables = {}
     for name, var in backend_ds.variables.items():
-        var_chunks = _get_chunk(var, chunks)
+        var_chunks = _get_chunk(var, chunks, chunkmanager)
         variables[name] = _maybe_chunk(
             name,
             var,
@@ -316,6 +337,8 @@
             name_prefix=name_prefix,
             token=token,
             inline_array=inline_array,
+            chunked_array_type=chunkmanager,
+            from_array_kwargs=from_array_kwargs.copy(),
         )
     return backend_ds._replace(variables)
 
@@ -328,6 +351,8 @@ def _dataset_from_backend_dataset(
     cache,
     overwrite_encoded_chunks,
     inline_array,
+    chunked_array_type,
+    from_array_kwargs,
    **extra_tokens,
 ):
     if not isinstance(chunks, (int, dict)) and chunks not in {None, "auto"}:
@@ -346,6 +371,8 @@
         chunks,
         overwrite_encoded_chunks,
         inline_array,
+        chunked_array_type,
+        from_array_kwargs,
         **extra_tokens,
     )
 
@@ -373,6 +400,8 @@ def open_dataset(
     decode_coords: Literal["coordinates", "all"] | bool | None = None,
     drop_variables: str | Iterable[str] | None = None,
     inline_array: bool = False,
+    chunked_array_type: str | None = None,
+    from_array_kwargs: dict[str, Any] | None = None,
     backend_kwargs: dict[str, Any] | None = None,
     **kwargs,
 ) -> Dataset:
@@ -465,6 +494,15 @@
         itself, and each chunk refers to that task by its key. With
         ``inline_array=True``, Dask will instead inline the array directly
         in the values of the task graph. See :py:func:`dask.array.from_array`.
+    chunked_array_type: str, optional
+        Which chunked array type to coerce this dataset's arrays to.
+        Defaults to 'dask' if installed, else whatever is registered via the `ChunkManagerEntrypoint` system.
+        Experimental API that should not be relied upon.
+    from_array_kwargs: dict
+        Additional keyword arguments passed on to the `ChunkManagerEntrypoint.from_array` method used to create
+        chunked arrays, via whichever chunk manager is specified through the `chunked_array_type` kwarg.
+        For example if :py:class:`dask.array.Array` objects are used for chunking, additional kwargs will be passed
+        to :py:func:`dask.array.from_array`. Experimental API that should not be relied upon.
     backend_kwargs: dict
         Additional keyword arguments passed on to the engine open function,
         equivalent to `**kwargs`.
@@ -508,6 +546,9 @@
     if engine is None:
         engine = plugins.guess_engine(filename_or_obj)
 
+    if from_array_kwargs is None:
+        from_array_kwargs = {}
+
     backend = plugins.get_backend(engine)
 
     decoders = _resolve_decoders_kwargs(
@@ -536,6 +577,8 @@
         cache,
         overwrite_encoded_chunks,
         inline_array,
+        chunked_array_type,
+        from_array_kwargs,
         drop_variables=drop_variables,
         **decoders,
         **kwargs,
@@ -546,8 +589,8 @@
 def open_dataarray(
     filename_or_obj: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore,
     *,
-    engine: T_Engine = None,
-    chunks: T_Chunks = None,
+    engine: T_Engine | None = None,
+    chunks: T_Chunks | None = None,
     cache: bool | None = None,
     decode_cf: bool | None = None,
     mask_and_scale: bool | None = None,
@@ -558,6 +601,8 @@ def open_dataarray(
     decode_coords: Literal["coordinates", "all"] | bool | None = None,
     drop_variables: str | Iterable[str] | None = None,
     inline_array: bool = False,
+    chunked_array_type: str | None = None,
+    from_array_kwargs: dict[str, Any] | None = None,
     backend_kwargs: dict[str, Any] | None = None,
     **kwargs,
 ) -> DataArray:
@@ -652,6 +697,15 @@
         itself, and each chunk refers to that task by its key. With
         ``inline_array=True``, Dask will instead inline the array directly
         in the values of the task graph. See :py:func:`dask.array.from_array`.
+    chunked_array_type: str, optional
+        Which chunked array type to coerce the underlying data array to.
+        Defaults to 'dask' if installed, else whatever is registered via the `ChunkManagerEntrypoint` system.
+        Experimental API that should not be relied upon.
+    from_array_kwargs: dict
+        Additional keyword arguments passed on to the `ChunkManagerEntrypoint.from_array` method used to create
+        chunked arrays, via whichever chunk manager is specified through the `chunked_array_type` kwarg.
+        For example if :py:class:`dask.array.Array` objects are used for chunking, additional kwargs will be passed
+        to :py:func:`dask.array.from_array`. Experimental API that should not be relied upon.
     backend_kwargs: dict
         Additional keyword arguments passed on to the engine open function,
         equivalent to `**kwargs`.
@@ -695,6 +749,8 @@
         cache=cache,
         drop_variables=drop_variables,
         inline_array=inline_array,
+        chunked_array_type=chunked_array_type,
+        from_array_kwargs=from_array_kwargs,
         backend_kwargs=backend_kwargs,
         use_cftime=use_cftime,
         decode_timedelta=decode_timedelta,
@@ -726,7 +782,7 @@
 
 def open_mfdataset(
     paths: str | NestedSequence[str | os.PathLike],
-    chunks: T_Chunks = None,
+    chunks: T_Chunks | None = None,
     concat_dim: str
     | DataArray
     | Index
@@ -736,7 +792,7 @@
     | None = None,
     compat: CompatOptions = "no_conflicts",
     preprocess: Callable[[Dataset], Dataset] | None = None,
-    engine: T_Engine = None,
+    engine: T_Engine | None = None,
     data_vars: Literal["all", "minimal", "different"] | list[str] = "all",
     coords="different",
     combine: Literal["by_coords", "nested"] = "by_coords",
@@ -1490,6 +1546,7 @@ def to_zarr(
     safe_chunks: bool = True,
     storage_options: dict[str, str] | None = None,
     zarr_version: int | None = None,
+    chunkmanager_store_kwargs: dict[str, Any] | None = None,
 ) -> backends.ZarrStore:
     ...
 
@@ -1512,6 +1569,7 @@
     safe_chunks: bool = True,
     storage_options: dict[str, str] | None = None,
     zarr_version: int | None = None,
+    chunkmanager_store_kwargs: dict[str, Any] | None = None,
 ) -> Delayed:
     ...
 
@@ -1531,6 +1589,7 @@
     safe_chunks: bool = True,
     storage_options: dict[str, str] | None = None,
     zarr_version: int | None = None,
+    chunkmanager_store_kwargs: dict[str, Any] | None = None,
 ) -> backends.ZarrStore | Delayed:
     """This function creates an appropriate datastore for writing a dataset to
     a zarr store
@@ -1652,7 +1711,9 @@
     writer = ArrayWriter()
     # TODO: figure out how to properly handle unlimited_dims
     dump_to_store(dataset, zstore, writer, encoding=encoding)
-    writes = writer.sync(compute=compute)
+    writes = writer.sync(
+        compute=compute, chunkmanager_store_kwargs=chunkmanager_store_kwargs
+    )
 
     if compute:
         _finalize_store(writes, zstore)
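
Taken together, the api.py changes surface three experimental keyword arguments. A hedged usage sketch follows; the file paths are placeholders, and the extra kwargs shown are merely examples of what dask's from_array and store/compute machinery accept when the default DaskManager is in use.

import xarray as xr

# Request a specific registered chunkmanager and forward kwargs to its
# from_array method (for the default DaskManager, dask.array.from_array).
ds = xr.open_dataset(
    "data.nc",  # placeholder path
    chunks={"time": 10},
    chunked_array_type="dask",
    from_array_kwargs={"asarray": True},  # illustrative dask.array.from_array kwarg
)

# Forward kwargs to the chunkmanager's store method when writing;
# for dask these are passed through dask.array.store.
ds.to_zarr("out.zarr", chunkmanager_store_kwargs={"scheduler": "threads"})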

Diff for: xarray/backends/common.py (+10, −5)

@@ -11,7 +11,8 @@
 
 from xarray.conventions import cf_encoder
 from xarray.core import indexing
-from xarray.core.pycompat import is_duck_dask_array
+from xarray.core.parallelcompat import get_chunked_array_type
+from xarray.core.pycompat import is_chunked_array
 from xarray.core.utils import FrozenDict, NdimSizeLenMixin, is_remote_uri
 
 if TYPE_CHECKING:
@@ -153,7 +154,7 @@ def __init__(self, lock=None):
         self.lock = lock
 
     def add(self, source, target, region=None):
-        if is_duck_dask_array(source):
+        if is_chunked_array(source):
             self.sources.append(source)
             self.targets.append(target)
             self.regions.append(region)
@@ -163,21 +164,25 @@ def add(self, source, target, region=None):
         else:
             target[...] = source
 
-    def sync(self, compute=True):
+    def sync(self, compute=True, chunkmanager_store_kwargs=None):
         if self.sources:
-            import dask.array as da
+            chunkmanager = get_chunked_array_type(*self.sources)
 
             # TODO: consider wrapping targets with dask.delayed, if this makes
             # for any discernible difference in performance, e.g.,
             # targets = [dask.delayed(t) for t in self.targets]
 
-            delayed_store = da.store(
+            if chunkmanager_store_kwargs is None:
+                chunkmanager_store_kwargs = {}
+
+            delayed_store = chunkmanager.store(
                 self.sources,
                 self.targets,
                 lock=self.lock,
                 compute=compute,
                 flush=True,
                 regions=self.regions,
+                **chunkmanager_store_kwargs,
             )
             self.sources = []
             self.targets = []
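
The writer now infers the manager from the source arrays themselves rather than assuming dask, so any registered chunked array type can be written. A small illustration of the resolution step (assuming dask is installed, so the lookup resolves to the DaskManager registered in setup.cfg above):

import dask.array as da

from xarray.core.parallelcompat import get_chunked_array_type

source = da.zeros((100, 100), chunks=(10, 10))
chunkmanager = get_chunked_array_type(source)  # resolves to the DaskManager
# ArrayWriter.sync then calls chunkmanager.store(sources, targets, ...),
# appending any chunkmanager_store_kwargs supplied by the caller.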

Diff for: xarray/backends/plugins.py (+1, −1)

@@ -146,7 +146,7 @@ def refresh_engines() -> None:
 
 def guess_engine(
     store_spec: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore,
-):
+) -> str | type[BackendEntrypoint]:
     engines = list_engines()
 
     for engine, backend in engines.items():

Diff for: xarray/backends/zarr.py (+21, −2)

@@ -19,6 +19,7 @@
 )
 from xarray.backends.store import StoreBackendEntrypoint
 from xarray.core import indexing
+from xarray.core.parallelcompat import guess_chunkmanager
 from xarray.core.pycompat import integer_types
 from xarray.core.utils import (
     FrozenDict,
@@ -716,6 +717,8 @@ def open_zarr(
     decode_timedelta=None,
     use_cftime=None,
     zarr_version=None,
+    chunked_array_type: str | None = None,
+    from_array_kwargs: dict[str, Any] | None = None,
     **kwargs,
 ):
     """Load and decode a dataset from a Zarr store.
@@ -800,6 +803,15 @@
         The desired zarr spec version to target (currently 2 or 3). The default
         of None will attempt to determine the zarr version from ``store`` when
         possible, otherwise defaulting to 2.
+    chunked_array_type: str, optional
+        Which chunked array type to coerce this dataset's arrays to.
+        Defaults to 'dask' if installed, else whatever is registered via the `ChunkManagerEntrypoint` system.
+        Experimental API that should not be relied upon.
+    from_array_kwargs: dict, optional
+        Additional keyword arguments passed on to the `ChunkManagerEntrypoint.from_array` method used to create
+        chunked arrays, via whichever chunk manager is specified through the `chunked_array_type` kwarg.
+        Defaults to {'manager': 'dask'}, meaning additional kwargs will be passed eventually to
+        :py:func:`dask.array.from_array`. Experimental API that should not be relied upon.
 
     Returns
     -------
@@ -817,12 +829,17 @@
     """
     from xarray.backends.api import open_dataset
 
+    if from_array_kwargs is None:
+        from_array_kwargs = {}
+
     if chunks == "auto":
         try:
-            import dask.array  # noqa
+            guess_chunkmanager(
+                chunked_array_type
+            )  # attempt to import that parallel backend
 
             chunks = {}
-        except ImportError:
+        except ValueError:
             chunks = None
 
     if kwargs:
@@ -851,6 +868,8 @@
         engine="zarr",
         chunks=chunks,
         drop_variables=drop_variables,
+        chunked_array_type=chunked_array_type,
+        from_array_kwargs=from_array_kwargs,
         backend_kwargs=backend_kwargs,
         decode_timedelta=decode_timedelta,
         use_cftime=use_cftime,
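
With chunks="auto", open_zarr now consults guess_chunkmanager instead of importing dask directly, and falls back to eager loading (chunks=None) if no manager is registered. A hedged usage sketch follows; the store path is a placeholder, and a manager registered under the name "cubed" (with a spec kwarg on its from_array) is hypothetical here.

import xarray as xr

# Default behaviour: use whichever chunkmanager is registered, preferring dask.
ds = xr.open_zarr("store.zarr", chunks="auto")

# Explicitly select a hypothetical "cubed" manager and forward kwargs
# to its from_array method.
ds = xr.open_zarr(
    "store.zarr",
    chunked_array_type="cubed",
    from_array_kwargs={"spec": None},  # hypothetical kwarg for cubed.from_array
)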
