From afd0a37d58b07feb2db4127b65e5080073a348f3 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Thu, 7 Nov 2024 10:48:18 +0100 Subject: [PATCH 01/24] (chore): file structure --- python/zarrs_python/__init__.py | 174 +------------------------------- python/zarrs_python/pipeline.py | 124 +++++++++++++++++++++++ python/zarrs_python/utils.py | 63 ++++++++++++ 3 files changed, 191 insertions(+), 170 deletions(-) create mode 100644 python/zarrs_python/pipeline.py create mode 100644 python/zarrs_python/utils.py diff --git a/python/zarrs_python/__init__.py b/python/zarrs_python/__init__.py index 6f76c8c..707046d 100644 --- a/python/zarrs_python/__init__.py +++ b/python/zarrs_python/__init__.py @@ -1,177 +1,11 @@ -from __future__ import annotations - -import asyncio -import json -import os -from dataclasses import dataclass -from typing import TYPE_CHECKING, Any - -import numpy as np -from zarr.abc.codec import ( - Codec, - CodecPipeline, -) -from zarr.core.config import config -from zarr.core.indexing import ArrayIndexError, SelectorTuple, is_integer from zarr.registry import register_pipeline -if TYPE_CHECKING: - from collections.abc import Iterable, Iterator - from typing import Self - - from zarr.abc.store import ByteGetter, ByteSetter - from zarr.core.array_spec import ArraySpec - from zarr.core.buffer import Buffer, NDBuffer - from zarr.core.chunk_grids import ChunkGrid - from zarr.core.common import ChunkCoords - -from ._internal import CodecPipelineImpl - - -# adapted from https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor -def get_max_threads() -> int: - return (os.cpu_count() or 1) + 4 - - -# This is a copy of the function from zarr.core.indexing that fixes: -# DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated -# TODO: Upstream this fix -def make_slice_selection(selection: Any) -> list[slice]: - ls: list[slice] = [] - for dim_selection in selection: - if is_integer(dim_selection): - ls.append(slice(int(dim_selection), int(dim_selection) + 1, 1)) - elif isinstance(dim_selection, np.ndarray): - if len(dim_selection) == 1: - ls.append( - slice(int(dim_selection.item()), int(dim_selection.item()) + 1, 1) - ) - else: - raise ArrayIndexError - else: - ls.append(dim_selection) - return ls - - -def selector_tuple_to_slice_selection(selector_tuple: SelectorTuple) -> list[slice]: - return ( - [selector_tuple] - if isinstance(selector_tuple, slice) - else make_slice_selection(selector_tuple) - ) - - -def make_chunk_info_for_rust( - batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]], -) -> list[tuple[str, ChunkCoords, str, Any, list[slice], list[slice]]]: - return list( - ( - str(byte_getter), - chunk_spec.shape, - str(chunk_spec.dtype), - chunk_spec.fill_value.tobytes(), - selector_tuple_to_slice_selection(out_selection), - selector_tuple_to_slice_selection(chunk_selection), - ) - for (byte_getter, chunk_spec, chunk_selection, out_selection) in batch_info - ) - - -@dataclass(frozen=True) -class ZarrsCodecPipeline(CodecPipeline): - codecs: tuple[Codec, ...] 
- impl: CodecPipelineImpl - - def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: - raise NotImplementedError("evolve_from_array_spec") - - @classmethod - def from_codecs(cls, codecs: Iterable[Codec]) -> Self: - codec_metadata = [codec.to_dict() for codec in codecs] - codec_metadata_json = json.dumps(codec_metadata) - return cls( - codecs=tuple(codecs), - impl=CodecPipelineImpl( - codec_metadata_json, - config.get("codec_pipeline.validate_checksums", None), - config.get("codec_pipeline.store_empty_chunks", None), - config.get("codec_pipeline.concurrent_target", None), - ), - ) - - @property - def supports_partial_decode(self) -> bool: - return False - - @property - def supports_partial_encode(self) -> bool: - return False - - def __iter__(self) -> Iterator[Codec]: - yield from self.codecs - - def validate( - self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid - ) -> None: - raise NotImplementedError("validate") - - def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: - raise NotImplementedError("compute_encoded_size") - - async def decode( - self, - chunk_bytes_and_specs: Iterable[tuple[Buffer | None, ArraySpec]], - ) -> Iterable[NDBuffer | None]: - raise NotImplementedError("decode") - - async def encode( - self, - chunk_arrays_and_specs: Iterable[tuple[NDBuffer | None, ArraySpec]], - ) -> Iterable[Buffer | None]: - raise NotImplementedError("encode") - - async def read( - self, - batch_info: Iterable[ - tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple] - ], - out: NDBuffer, - drop_axes: tuple[int, ...] = (), # FIXME: unused - ) -> None: - chunk_concurrent_limit = ( - config.get("threading.max_workers") or get_max_threads() - ) - out = out.as_ndarray_like() # FIXME: Error if array is not in host memory - if not out.dtype.isnative: - raise RuntimeError("Non-native byte order not supported") +from .pipeline import ZarrsCodecPipeline as _ZarrsCodecPipeline - chunks_desc = make_chunk_info_for_rust(batch_info) - await asyncio.to_thread( - self.impl.retrieve_chunks, chunks_desc, out, chunk_concurrent_limit - ) - return None - async def write( - self, - batch_info: Iterable[ - tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] - ], - value: NDBuffer, - drop_axes: tuple[int, ...] 
= (), - ) -> None: - chunk_concurrent_limit = ( - config.get("threading.max_workers") or get_max_threads() - ) - value = value.as_ndarray_like() # FIXME: Error if array is not in host memory - if not value.dtype.isnative: - value = np.ascontiguousarray(value, dtype=value.dtype.newbyteorder("=")) - elif not value.flags.c_contiguous: - value = np.ascontiguousarray(value) - chunks_desc = make_chunk_info_for_rust(batch_info) - await asyncio.to_thread( - self.impl.store_chunks, chunks_desc, value, chunk_concurrent_limit - ) - return None +# Need to do this redirection so people can access the pipeline as `zarrs_python.ZarrsCodecPipeline` instead of `zarrs_python.pipeline.ZarrsCodecPipeline` +class ZarrsCodecPipeline(_ZarrsCodecPipeline): + pass register_pipeline(ZarrsCodecPipeline) diff --git a/python/zarrs_python/pipeline.py b/python/zarrs_python/pipeline.py new file mode 100644 index 0000000..1175413 --- /dev/null +++ b/python/zarrs_python/pipeline.py @@ -0,0 +1,124 @@ +from __future__ import annotations + +import asyncio +import json +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +import numpy as np +from zarr.abc.codec import ( + Codec, + CodecPipeline, +) +from zarr.core.config import config + +if TYPE_CHECKING: + from collections.abc import Iterable, Iterator + from typing import Self + + from zarr.abc.store import ByteGetter, ByteSetter + from zarr.core.array_spec import ArraySpec + from zarr.core.buffer import Buffer, NDBuffer + from zarr.core.chunk_grids import ChunkGrid + from zarr.core.common import ChunkCoords + from zarr.core.indexing import SelectorTuple + +from ._internal import CodecPipelineImpl +from .utils import get_max_threads, make_chunk_info_for_rust + + +@dataclass(frozen=True) +class ZarrsCodecPipeline(CodecPipeline): + codecs: tuple[Codec, ...] + impl: CodecPipelineImpl + + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: + raise NotImplementedError("evolve_from_array_spec") + + @classmethod + def from_codecs(cls, codecs: Iterable[Codec]) -> Self: + codec_metadata = [codec.to_dict() for codec in codecs] + codec_metadata_json = json.dumps(codec_metadata) + return cls( + codecs=tuple(codecs), + impl=CodecPipelineImpl( + codec_metadata_json, + config.get("codec_pipeline.validate_checksums", None), + config.get("codec_pipeline.store_empty_chunks", None), + config.get("codec_pipeline.concurrent_target", None), + ), + ) + + @property + def supports_partial_decode(self) -> bool: + return False + + @property + def supports_partial_encode(self) -> bool: + return False + + def __iter__(self) -> Iterator[Codec]: + yield from self.codecs + + def validate( + self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid + ) -> None: + raise NotImplementedError("validate") + + def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: + raise NotImplementedError("compute_encoded_size") + + async def decode( + self, + chunk_bytes_and_specs: Iterable[tuple[Buffer | None, ArraySpec]], + ) -> Iterable[NDBuffer | None]: + raise NotImplementedError("decode") + + async def encode( + self, + chunk_arrays_and_specs: Iterable[tuple[NDBuffer | None, ArraySpec]], + ) -> Iterable[Buffer | None]: + raise NotImplementedError("encode") + + async def read( + self, + batch_info: Iterable[ + tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple] + ], + out: NDBuffer, + drop_axes: tuple[int, ...] 
= (), # FIXME: unused + ) -> None: + chunk_concurrent_limit = ( + config.get("threading.max_workers") or get_max_threads() + ) + out = out.as_ndarray_like() # FIXME: Error if array is not in host memory + if not out.dtype.isnative: + raise RuntimeError("Non-native byte order not supported") + + chunks_desc = make_chunk_info_for_rust(batch_info) + await asyncio.to_thread( + self.impl.retrieve_chunks, chunks_desc, out, chunk_concurrent_limit + ) + return None + + async def write( + self, + batch_info: Iterable[ + tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] + ], + value: NDBuffer, + drop_axes: tuple[int, ...] = (), + ) -> None: + chunk_concurrent_limit = ( + config.get("threading.max_workers") or get_max_threads() + ) + value = value.as_ndarray_like() # FIXME: Error if array is not in host memory + if not value.dtype.isnative: + value = np.ascontiguousarray(value, dtype=value.dtype.newbyteorder("=")) + elif not value.flags.c_contiguous: + value = np.ascontiguousarray(value) + chunks_desc = make_chunk_info_for_rust(batch_info) + await asyncio.to_thread( + self.impl.store_chunks, chunks_desc, value, chunk_concurrent_limit + ) + return None diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py new file mode 100644 index 0000000..8d267a2 --- /dev/null +++ b/python/zarrs_python/utils.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +import os +from typing import TYPE_CHECKING, Any + +import numpy as np +from zarr.core.indexing import ArrayIndexError, SelectorTuple, is_integer + +if TYPE_CHECKING: + from collections.abc import Iterable + + from zarr.abc.store import ByteSetter + from zarr.core.array_spec import ArraySpec + from zarr.core.common import ChunkCoords + + +# adapted from https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor +def get_max_threads() -> int: + return (os.cpu_count() or 1) + 4 + + +# This is a copy of the function from zarr.core.indexing that fixes: +# DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated +# TODO: Upstream this fix +def make_slice_selection(selection: Any) -> list[slice]: + ls: list[slice] = [] + for dim_selection in selection: + if is_integer(dim_selection): + ls.append(slice(int(dim_selection), int(dim_selection) + 1, 1)) + elif isinstance(dim_selection, np.ndarray): + if len(dim_selection) == 1: + ls.append( + slice(int(dim_selection.item()), int(dim_selection.item()) + 1, 1) + ) + else: + raise ArrayIndexError + else: + ls.append(dim_selection) + return ls + + +def selector_tuple_to_slice_selection(selector_tuple: SelectorTuple) -> list[slice]: + return ( + [selector_tuple] + if isinstance(selector_tuple, slice) + else make_slice_selection(selector_tuple) + ) + + +def make_chunk_info_for_rust( + batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]], +) -> list[tuple[str, ChunkCoords, str, Any, list[slice], list[slice]]]: + return list( + ( + str(byte_getter), + chunk_spec.shape, + str(chunk_spec.dtype), + chunk_spec.fill_value.tobytes(), + selector_tuple_to_slice_selection(out_selection), + selector_tuple_to_slice_selection(chunk_selection), + ) + for (byte_getter, chunk_spec, chunk_selection, out_selection) in batch_info + ) From 2af41ece60eacc76f71fab9ac827ef28c56728b7 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Thu, 7 Nov 2024 13:40:27 +0100 Subject: [PATCH 02/24] (chore): parametrize tests to get full scope of possibilities --- tests/test_pipeline.py | 146 +++++++++++++++++++++++++---------------- 1 file 
changed, 90 insertions(+), 56 deletions(-) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 8efc5db..455ac2e 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,6 +1,10 @@ #!/usr/bin/env python3 +import operator import tempfile +from functools import reduce +from types import EllipsisType +from typing import Callable import numpy as np import pytest @@ -17,21 +21,88 @@ def fill_value() -> int: @pytest.fixture def chunks() -> tuple[int, ...]: - return (2, 2) + return (5, 5) -@pytest.fixture(params=[np.array([1, 2]), slice(1, 3)], ids=["array", "slice"]) -def indexer(request) -> slice | np.ndarray: +@pytest.fixture +def shape() -> tuple[int, ...]: + return (10, 10) + + +@pytest.fixture( + params=[ + np.array([1, 2]), + np.array([0, 2]), + slice(1, 3), + slice(1, 7), + np.array([0, 6]), + slice(None), + 2, + Ellipsis, + ], + ids=[ + "contiguous_in_chunk_array", + "discontinuous_in_chunk_array", + "slice_in_chunk", + "slice_across_chunks", + "across_chunks_indices_array", + "fill_slice", + "int", + "ellipsis", + ], +) +def indexer_1d(request) -> slice | np.ndarray | int | EllipsisType: return request.param -indexer_2 = indexer +@pytest.fixture +def full_array(shape) -> np.ndarray: + return np.arange(reduce(operator.mul, shape, 1)).reshape(shape) + + +indexer_1d_2 = indexer_1d @pytest.fixture -def arr(fill_value, chunks) -> zarr.Array: - shape = (4, 4) +def index(indexer_1d, indexer_1d_2): + if isinstance(indexer_1d, EllipsisType) and isinstance(indexer_1d_2, EllipsisType): + pytest.skip("Double ellipsis indexing is valid") + return indexer_1d, indexer_1d_2 + + +@pytest.fixture( + params=[lambda x: getattr(x, "oindex"), lambda x: x], ids=["oindex", "vindex"] +) +def indexing_method(request) -> Callable: + return request.param + +@pytest.fixture +def store_values( + indexing_method: Callable, + index: tuple[int | slice | np.ndarray | EllipsisType, ...], + full_array: np.ndarray, + shape: tuple[int, ...], +) -> np.ndarray: + class smoke: + oindex = None + + if not isinstance(index, EllipsisType) and indexing_method(smoke) == "oindex": + index: tuple[int | np.ndarray, ...] 
= tuple( + i + if (not isinstance(i, slice)) + else np.arange( + i.start if hasattr(i, "start") else 0, + i.stop if hasattr(i, "end") else shape[axis], + ) + for axis, i in enumerate(index) + ) + return full_array[np.ix_(index)] + return full_array[index] + + +@pytest.fixture +def arr(fill_value, chunks, shape) -> zarr.Array: tmp = tempfile.TemporaryDirectory() return zarr.create( shape, @@ -58,72 +129,35 @@ def test_roundtrip_singleton(arr: zarr.Array): assert arr[0, 0] != 42 -def test_roundtrip_full_array(arr: zarr.Array): - stored_values = np.arange(16).reshape(4, 4) +def test_roundtrip_full_array(arr: zarr.Array, shape: tuple[int, ...]): + stored_values = np.arange(reduce(operator.mul, shape, 1)).reshape(shape) arr[:] = stored_values assert np.all(arr[:] == stored_values) -def test_roundtrip_partial( +def test_roundtrip( arr: zarr.Array, - indexer: slice | np.ndarray, - indexer_2: slice | np.ndarray, + store_values: np.ndarray, + index: tuple[int | slice | np.ndarray | EllipsisType, ...], + indexing_method: Callable, ): - if isinstance(indexer, np.ndarray) and isinstance(indexer_2, np.ndarray): + if not isinstance(index, EllipsisType) and all( + isinstance(i, np.ndarray) for i in index + ): pytest.skip( "indexing across two axes with arrays seems to have strange behavior even in normal zarr" ) - stored_value = np.array([[-1, -2], [-3, -4]]) - arr[indexer, indexer_2] = stored_value - res = arr[indexer, indexer_2] + indexing_method(arr)[index] = store_values + res = indexing_method(arr)[index] assert np.all( - res == stored_value, + res == store_values, ), res -def test_roundtrip_1d_axis(arr: zarr.Array, indexer: slice | np.ndarray): - stored_value = np.array([-3, -4]) - arr[2, indexer] = stored_value - res = arr[2, indexer] - assert np.all(res == stored_value), res - - -def test_roundtrip_orthogonal_indexing( - arr: zarr.Array, indexer: slice | np.ndarray, indexer_2: np.ndarray | slice -): - stored_value = np.array([[-1, -2], [-3, -4]]) - arr.oindex[indexer, indexer_2] = stored_value - res = arr.oindex[indexer, indexer_2] - assert np.all(res == stored_value), res - - -def test_roundtrip_orthogonal_indexing_1d_axis( - arr: zarr.Array, indexer: slice | np.ndarray -): - stored_value = np.array([-3, -4]) - arr.oindex[2, indexer] = stored_value - res = arr.oindex[2, indexer] - assert np.all(res == stored_value), res - - -def test_roundtrip_ellipsis_indexing_2d(arr: zarr.Array): - stored_value = np.arange(arr.size).reshape(arr.shape) - arr[...] = stored_value - res = arr[...] - assert np.all(res == stored_value), res - - -def test_roundtrip_ellipsis_indexing_1d(arr: zarr.Array): - stored_value = np.array([1, 2, 3, 4]) - arr[2, ...] = stored_value - res = arr[2, ...] 
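The `indexing_method` fixture introduced above toggles between zarr's orthogonal (`oindex`) and vectorized (`vindex`) indexing, which disagree exactly when two index arrays are combined. A minimal sketch of that difference, assuming an in-memory zarr array (illustrative, not part of this patch):

    import numpy as np
    import zarr

    z = zarr.array(np.arange(100).reshape(10, 10), chunks=(5, 5))
    rows, cols = np.array([0, 6]), np.array([0, 6])
    # Orthogonal indexing selects the outer product of the index arrays: a 2x2 block.
    assert z.oindex[rows, cols].shape == (2, 2)
    # Vectorized indexing pairs the arrays elementwise: points (0, 0) and (6, 6).
    assert z.vindex[rows, cols].shape == (2,)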
- assert np.all(res == stored_value), res - - def test_roundtrip_ellipsis_indexing_1d_invalid(arr: zarr.Array): stored_value = np.array([1, 2, 3]) with pytest.raises( - BaseException # TODO: ValueError, but this raises pyo3_runtime.PanicException + BaseException # TODO: ValueError, but this raises pyo3_runtime.PanicException # noqa: PT011 ): # zarrs-python error: ValueError: operands could not be broadcast together with shapes (4,) (3,) # numpy error: ValueError: could not broadcast input array from shape (3,) into shape (4,) From c945e88cdef8b2600c16ffe211b1f6baf6e6489a Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Fri, 8 Nov 2024 11:51:55 +0100 Subject: [PATCH 03/24] (chore): xfail tests that fail on zarr-python default pipeline --- tests/conftest.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 24e016c..f8e247d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,6 +14,7 @@ import zarrs_python # noqa: F401 if TYPE_CHECKING: + from collections.abc import Iterable from typing import Any, Literal from zarr.abc.store import Store @@ -60,3 +61,28 @@ def array_fixture(request: pytest.FixtureRequest) -> npt.NDArray[Any]: .reshape(array_request.shape, order=array_request.order) .astype(array_request.dtype) ) + + +# tests that also fail with zarr-python's default codec pipeline +zarr_python_default_codec_pipeline_failures = [ + "test_roundtrip[oindex-contiguous_in_chunk_array-ellipsis]", + "test_roundtrip[oindex-discontinuous_in_chunk_array-ellipsis]", + "test_roundtrip[vindex-contiguous_in_chunk_array-ellipsis]", + "test_roundtrip[vindex-discontinuous_in_chunk_array-ellipsis]", + "test_roundtrip[oindex-across_chunks_indices_array-ellipsis]", + "test_roundtrip[vindex-ellipsis-across_chunks_indices_array]", + "test_roundtrip[vindex-across_chunks_indices_array-ellipsis]", + "test_roundtrip[vindex-ellipsis-contiguous_in_chunk_array]", + "test_roundtrip[vindex-ellipsis-discontinuous_in_chunk_array]", +] + + +def pytest_collection_modifyitems( + config: pytest.Config, items: Iterable[pytest.Item] +) -> None: + xfail_marker = pytest.mark.xfail( + reason="These tests fail with the zarr-python default codec pipeline." + ) + for item in items: + if item.name in zarr_python_default_codec_pipeline_failures: + item.add_marker(xfail_marker) From 0bb8dbcaca998248ade6d334d609d78def19b7f6 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Fri, 8 Nov 2024 11:53:14 +0100 Subject: [PATCH 04/24] (fix) singular --- tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index f8e247d..032b6f3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -81,7 +81,7 @@ def pytest_collection_modifyitems( config: pytest.Config, items: Iterable[pytest.Item] ) -> None: xfail_marker = pytest.mark.xfail( - reason="These tests fail with the zarr-python default codec pipeline." + reason="This test fails with the zarr-python default codec pipeline." 
) for item in items: if item.name in zarr_python_default_codec_pipeline_failures: From 3f176bfa3c67d65f46a40b447a9d287ed99fb266 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Fri, 8 Nov 2024 12:13:59 +0100 Subject: [PATCH 05/24] (fix): check for contiguous index arrays --- python/zarrs_python/utils.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index 8d267a2..1999fce 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -22,29 +22,32 @@ def get_max_threads() -> int: # This is a copy of the function from zarr.core.indexing that fixes: # DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated # TODO: Upstream this fix -def make_slice_selection(selection: Any) -> list[slice]: +def make_slice_selection(selection: tuple[np.ndarray | float]) -> list[slice]: ls: list[slice] = [] for dim_selection in selection: if is_integer(dim_selection): ls.append(slice(int(dim_selection), int(dim_selection) + 1, 1)) elif isinstance(dim_selection, np.ndarray): + dim_selection = dim_selection.ravel() if len(dim_selection) == 1: ls.append( slice(int(dim_selection.item()), int(dim_selection.item()) + 1, 1) ) else: - raise ArrayIndexError + if not (np.diff(dim_selection) == 1).all(): + raise ArrayIndexError + ls.append(slice(dim_selection[0], dim_selection[-1] + 1, 1)) else: ls.append(dim_selection) return ls def selector_tuple_to_slice_selection(selector_tuple: SelectorTuple) -> list[slice]: - return ( - [selector_tuple] - if isinstance(selector_tuple, slice) - else make_slice_selection(selector_tuple) - ) + if isinstance(selector_tuple, slice): + return [selector_tuple] + if all(isinstance(s, slice) for s in selector_tuple): + return list(selector_tuple) + return make_slice_selection(selector_tuple) def make_chunk_info_for_rust( From ff44774d2e7f30314ba50fca2acc9ee8dee88dbb Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Fri, 8 Nov 2024 12:17:28 +0100 Subject: [PATCH 06/24] (fix): contiguous numpy arrays converted to slices --- python/zarrs_python/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index 1999fce..41af9b8 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -34,7 +34,8 @@ def make_slice_selection(selection: tuple[np.ndarray | float]) -> list[slice]: slice(int(dim_selection.item()), int(dim_selection.item()) + 1, 1) ) else: - if not (np.diff(dim_selection) == 1).all(): + diff = np.diff(dim_selection) + if not ((diff != 1).all() or (diff != 0).all()): raise ArrayIndexError ls.append(slice(dim_selection[0], dim_selection[-1] + 1, 1)) else: From f81a1c8119ec072f376aeb0b529cfda2e5d3e25a Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Fri, 8 Nov 2024 16:15:07 +0100 Subject: [PATCH 07/24] (feat): add reading for non-contiguous buffers --- python/zarrs_python/pipeline.py | 44 ++++++++++--- python/zarrs_python/utils.py | 43 +++++++++--- src/lib.rs | 112 ++++++++++++++++++++++++++++---- tests/conftest.py | 35 +++++++++- tests/test_pipeline.py | 41 ++++++++++-- 5 files changed, 237 insertions(+), 38 deletions(-) diff --git a/python/zarrs_python/pipeline.py b/python/zarrs_python/pipeline.py index 1175413..b89ca16 100644 --- a/python/zarrs_python/pipeline.py +++ b/python/zarrs_python/pipeline.py @@ -24,7 +24,12 @@ from zarr.core.indexing import SelectorTuple from ._internal import CodecPipelineImpl -from .utils import get_max_threads, 
make_chunk_info_for_rust +from .utils import ( + DiscontiguousArrayError, + get_max_threads, + make_chunk_info_for_rust, + make_chunk_info_for_rust_with_indices, +) @dataclass(frozen=True) @@ -94,12 +99,32 @@ async def read( out = out.as_ndarray_like() # FIXME: Error if array is not in host memory if not out.dtype.isnative: raise RuntimeError("Non-native byte order not supported") - - chunks_desc = make_chunk_info_for_rust(batch_info) - await asyncio.to_thread( - self.impl.retrieve_chunks, chunks_desc, out, chunk_concurrent_limit + try: + chunks_desc = make_chunk_info_for_rust_with_indices(batch_info) + index_in_rust = True + except DiscontiguousArrayError: + chunks_desc = make_chunk_info_for_rust(batch_info) + index_in_rust = False + if index_in_rust: + await asyncio.to_thread( + self.impl.retrieve_chunks_and_apply_index, + chunks_desc, + out, + chunk_concurrent_limit, + ) + return None + chunks = await asyncio.to_thread( + self.impl.retrieve_chunks, chunks_desc, chunk_concurrent_limit ) - return None + for chunk, chunk_info in zip(chunks, batch_info): + out_selection = chunk_info[3] + selection = chunk_info[2] + spec = chunk_info[1] + chunk_reshaped = chunk.view(spec.dtype).reshape(spec.shape) + chunk_selected = chunk_reshaped[selection] + if drop_axes: + chunk_selected = np.squeeze(chunk_selected, axis=drop_axes) + out[out_selection] = chunk_selected async def write( self, @@ -117,8 +142,11 @@ async def write( value = np.ascontiguousarray(value, dtype=value.dtype.newbyteorder("=")) elif not value.flags.c_contiguous: value = np.ascontiguousarray(value) - chunks_desc = make_chunk_info_for_rust(batch_info) + chunks_desc = make_chunk_info_for_rust_with_indices(batch_info) await asyncio.to_thread( - self.impl.store_chunks, chunks_desc, value, chunk_concurrent_limit + self.impl.store_chunks_with_indices, + chunks_desc, + value, + chunk_concurrent_limit, ) return None diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index 41af9b8..803b344 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -4,12 +4,12 @@ from typing import TYPE_CHECKING, Any import numpy as np -from zarr.core.indexing import ArrayIndexError, SelectorTuple, is_integer +from zarr.core.indexing import SelectorTuple, is_integer if TYPE_CHECKING: from collections.abc import Iterable - from zarr.abc.store import ByteSetter + from zarr.abc.store import ByteGetter, ByteSetter from zarr.core.array_spec import ArraySpec from zarr.core.common import ChunkCoords @@ -19,6 +19,10 @@ def get_max_threads() -> int: return (os.cpu_count() or 1) + 4 +class DiscontiguousArrayError(BaseException): + pass + + # This is a copy of the function from zarr.core.indexing that fixes: # DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated # TODO: Upstream this fix @@ -35,8 +39,8 @@ def make_slice_selection(selection: tuple[np.ndarray | float]) -> list[slice]: ) else: diff = np.diff(dim_selection) - if not ((diff != 1).all() or (diff != 0).all()): - raise ArrayIndexError + if (diff != 1).any() and (diff != 0).any(): + raise DiscontiguousArrayError(diff) ls.append(slice(dim_selection[0], dim_selection[-1] + 1, 1)) else: ls.append(dim_selection) @@ -51,17 +55,36 @@ def selector_tuple_to_slice_selection(selector_tuple: SelectorTuple) -> list[sli return make_slice_selection(selector_tuple) -def make_chunk_info_for_rust( - batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]], +def convert_chunk_to_primitive( + byte_getter: ByteGetter | ByteSetter, 
chunk_spec: ArraySpec +) -> tuple[str, ChunkCoords, str, Any]: + return ( + str(byte_getter), + chunk_spec.shape, + str(chunk_spec.dtype), + chunk_spec.fill_value.tobytes(), + ) + + +def make_chunk_info_for_rust_with_indices( + batch_info: Iterable[ + tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] + ], ) -> list[tuple[str, ChunkCoords, str, Any, list[slice], list[slice]]]: return list( ( - str(byte_getter), - chunk_spec.shape, - str(chunk_spec.dtype), - chunk_spec.fill_value.tobytes(), + *convert_chunk_to_primitive(byte_getter, chunk_spec), selector_tuple_to_slice_selection(out_selection), selector_tuple_to_slice_selection(chunk_selection), ) for (byte_getter, chunk_spec, chunk_selection, out_selection) in batch_info ) + + +def make_chunk_info_for_rust( + batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]], +) -> list[tuple[str, ChunkCoords, str, Any]]: + return list( + convert_chunk_to_primitive(byte_getter, chunk_spec) + for (byte_getter, chunk_spec, chunk_selection, out_selection) in batch_info + ) diff --git a/src/lib.rs b/src/lib.rs index 004758b..b860c16 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,13 +1,15 @@ #![warn(clippy::pedantic)] use numpy::npyffi::PyArrayObject; -use numpy::{PyUntypedArray, PyUntypedArrayMethods}; +use numpy::{IntoPyArray, PyArray, PyUntypedArray, PyUntypedArrayMethods}; use pyo3::exceptions::{PyRuntimeError, PyTypeError, PyValueError}; use pyo3::prelude::*; use pyo3::types::PySlice; use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use rayon::vec; use rayon_iter_concurrent_limit::iter_concurrent_limit; use std::borrow::Cow; +use std::io::{Bytes, Read}; use std::num::NonZeroU64; use std::sync::{Arc, Mutex}; use unsafe_cell_slice::UnsafeCellSlice; @@ -73,15 +75,36 @@ impl CodecPipelineImpl { fn collect_chunk_descriptions( &self, chunk_descriptions: Vec, - shape: &[u64], ) -> PyResult> { + chunk_descriptions + .into_iter() + .map(|(store_path, chunk_shape, dtype, fill_value)| { + let (store, path) = self.get_store_and_path(&store_path)?; + let key = StoreKey::new(path).map_py_err::()?; + Ok(ChunksItem { + store, + key, + representation: Self::get_chunk_representation( + chunk_shape, + &dtype, + fill_value, + )?, + }) + }) + .collect() + } + fn collect_chunk_descriptions_with_index( + &self, + chunk_descriptions: Vec, + shape: &[u64], + ) -> PyResult> { chunk_descriptions .into_iter() .map( |(store_path, chunk_shape, dtype, fill_value, selection, chunk_selection)| { let (store, path) = self.get_store_and_path(&store_path)?; let key = StoreKey::new(path).map_py_err::()?; - Ok(ChunksItem { + Ok(ChunksItemWithSubset { store, key, chunk_subset: Self::selection_to_array_subset( @@ -304,7 +327,7 @@ impl CodecPipelineImpl { } } -type ChunksItemRaw<'a> = ( +type ChunksItemRawWithIndices<'a> = ( // store path String, // shape @@ -319,13 +342,29 @@ type ChunksItemRaw<'a> = ( Vec>, ); -struct ChunksItem { +type ChunksItemRaw<'a> = ( + // store path + String, + // shape + Vec, + // data type + String, + // fill value bytes + Vec, +); + +struct ChunksItemWithSubset { store: Arc, key: StoreKey, chunk_subset: ArraySubset, subset: ArraySubset, representation: ChunkRepresentation, } +struct ChunksItem { + store: Arc, + key: StoreKey, + representation: ChunkRepresentation, +} #[pymethods] impl CodecPipelineImpl { @@ -360,10 +399,10 @@ impl CodecPipelineImpl { }) } - fn retrieve_chunks( + fn retrieve_chunks_and_apply_index( &self, py: Python, - chunk_descriptions: Vec, // FIXME: Ref / iterable? 
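The contiguity rule the preceding patches converge on: an index array can be lowered to a slice for the Rust side only when its consecutive differences are all 1 (a unit run) or all 0 (a repeated index); anything else raises `DiscontiguousArrayError` and routes the operation through the whole-chunk fallback. A standalone sketch of that rule (the function name is illustrative, not the module's API):

    import numpy as np

    def index_array_to_slice(dim_selection: np.ndarray) -> slice:
        # Mirrors make_slice_selection: a unit run or a constant repeat becomes
        # a slice; everything else must take the fallback path.
        dim_selection = dim_selection.ravel()
        if len(dim_selection) == 1:
            return slice(int(dim_selection[0]), int(dim_selection[0]) + 1, 1)
        diff = np.diff(dim_selection)
        if (diff != 1).any() and (diff != 0).any():
            raise IndexError("discontiguous")  # stand-in for DiscontiguousArrayError
        return slice(int(dim_selection[0]), int(dim_selection[-1]) + 1, 1)

    assert index_array_to_slice(np.array([1, 2, 3])) == slice(1, 4, 1)
    assert index_array_to_slice(np.array([4])) == slice(4, 5, 1)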
+ chunk_descriptions: Vec, // FIXME: Ref / iterable? value: &Bound<'_, PyUntypedArray>, chunk_concurrent_limit: usize, ) -> PyResult<()> { @@ -387,12 +426,12 @@ impl CodecPipelineImpl { }; let chunk_descriptions = - self.collect_chunk_descriptions(chunk_descriptions, &output_shape)?; + self.collect_chunk_descriptions_with_index(chunk_descriptions, &output_shape)?; py.allow_threads(move || { let codec_options = &self.codec_options; - let update_chunk_subset = |item: ChunksItem| { + let update_chunk_subset = |item: ChunksItemWithSubset| { // See zarrs::array::Array::retrieve_chunk_subset_into if item.chunk_subset.start().iter().all(|&o| o == 0) && item.chunk_subset.shape() == item.representation.shape_u64() @@ -471,10 +510,57 @@ impl CodecPipelineImpl { }) } - fn store_chunks( + fn retrieve_chunks<'py>( + &self, + py: Python<'py>, + chunk_descriptions: Vec, // FIXME: Ref / iterable? + chunk_concurrent_limit: usize, + ) -> PyResult>>>> { + let chunk_descriptions = self.collect_chunk_descriptions(chunk_descriptions)?; + + let chunk_bytes = py.allow_threads(move || { + let codec_options = &self.codec_options; + + let get_chunk_subset = |item: ChunksItem| { + // See zarrs::array::Array::retrieve_chunk_into + let chunk_encoded = item.store.get(&item.key).map_py_err::(); + if let Some(chunk_encoded) = chunk_encoded.unwrap() { + let chunk_encoded: Vec = chunk_encoded.into(); + self.codec_chain.decode( + Cow::Owned(chunk_encoded), + &item.representation, + codec_options, + ) + } else { + // The chunk is missing so we need to create one. + let num_elements = item.representation.num_elements(); + let data_type_size = item.representation.data_type().size(); + let chunk_shape = ArraySize::new(data_type_size, num_elements); + let array_bytes = + ArrayBytes::new_fill_value(chunk_shape, item.representation.fill_value()); + Ok(array_bytes) + } + }; + let chunk_bytes: Vec> = iter_concurrent_limit!( + chunk_concurrent_limit, + chunk_descriptions, + map, + get_chunk_subset + ) + .map(|x| x.unwrap().into_fixed().unwrap().to_owned().into_owned()) + .collect::>>(); + chunk_bytes + }); + Ok(chunk_bytes + .into_iter() + .map(|x| x.into_pyarray_bound(py)) + .collect::>>>>()) + } + + fn store_chunks_with_indices( &self, py: Python, - chunk_descriptions: Vec, + chunk_descriptions: Vec, value: &Bound<'_, PyUntypedArray>, chunk_concurrent_limit: usize, ) -> PyResult<()> { @@ -509,12 +595,12 @@ impl CodecPipelineImpl { }; let chunk_descriptions = - self.collect_chunk_descriptions(chunk_descriptions, &input_shape)?; + self.collect_chunk_descriptions_with_index(chunk_descriptions, &input_shape)?; py.allow_threads(move || { let codec_options = &self.codec_options; - let store_chunk = |item: ChunksItem| match &input { + let store_chunk = |item: ChunksItemWithSubset| match &input { InputValue::Array(input) => { let chunk_subset_bytes = input .extract_array_subset( diff --git a/tests/conftest.py b/tests/conftest.py index 032b6f3..d48674c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -74,6 +74,35 @@ def array_fixture(request: pytest.FixtureRequest) -> npt.NDArray[Any]: "test_roundtrip[vindex-across_chunks_indices_array-ellipsis]", "test_roundtrip[vindex-ellipsis-contiguous_in_chunk_array]", "test_roundtrip[vindex-ellipsis-discontinuous_in_chunk_array]", + "test_roundtrip_read_only_zarrs[oindex-contiguous_in_chunk_array-ellipsis]", + "test_roundtrip_read_only_zarrs[oindex-discontinuous_in_chunk_array-ellipsis]", + "test_roundtrip_read_only_zarrs[vindex-contiguous_in_chunk_array-ellipsis]", + 
"test_roundtrip_read_only_zarrs[vindex-discontinuous_in_chunk_array-ellipsis]", + "test_roundtrip_read_only_zarrs[oindex-across_chunks_indices_array-ellipsis]", + "test_roundtrip_read_only_zarrs[vindex-ellipsis-across_chunks_indices_array]", + "test_roundtrip_read_only_zarrs[vindex-across_chunks_indices_array-ellipsis]", + "test_roundtrip_read_only_zarrs[vindex-ellipsis-contiguous_in_chunk_array]", + "test_roundtrip_read_only_zarrs[vindex-ellipsis-discontinuous_in_chunk_array]", +] + +zarrs_python_no_discontinuous_writes = [ + "test_roundtrip[oindex-discontinuous_in_chunk_array-slice_in_chunk]", + "test_roundtrip[oindex-discontinuous_in_chunk_array-slice_across_chunks]", + "test_roundtrip[oindex-discontinuous_in_chunk_array-fill_slice]", + "test_roundtrip[oindex-discontinuous_in_chunk_array-int]", + "test_roundtrip[oindex-slice_in_chunk-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-slice_across_chunks-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-fill_slice-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-int-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-ellipsis-discontinuous_in_chunk_array]", + "test_roundtrip[vindex-discontinuous_in_chunk_array-slice_in_chunk]", + "test_roundtrip[vindex-discontinuous_in_chunk_array-slice_across_chunks]", + "test_roundtrip[vindex-discontinuous_in_chunk_array-fill_slice]", + "test_roundtrip[vindex-discontinuous_in_chunk_array-int]", + "test_roundtrip[vindex-slice_in_chunk-discontinuous_in_chunk_array]", + "test_roundtrip[vindex-slice_across_chunks-discontinuous_in_chunk_array]", + "test_roundtrip[vindex-fill_slice-discontinuous_in_chunk_array]", + "test_roundtrip[vindex-int-discontinuous_in_chunk_array]", ] @@ -84,5 +113,9 @@ def pytest_collection_modifyitems( reason="This test fails with the zarr-python default codec pipeline." 
) for item in items: - if item.name in zarr_python_default_codec_pipeline_failures: + if ( + item.name + in zarr_python_default_codec_pipeline_failures + + zarrs_python_no_discontinuous_writes + ): item.add_marker(xfail_marker) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 455ac2e..fb4258f 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,10 +1,10 @@ #!/usr/bin/env python3 import operator -import tempfile +from collections.abc import Callable +from contextlib import contextmanager from functools import reduce from types import EllipsisType -from typing import Callable import numpy as np import pytest @@ -32,7 +32,7 @@ def shape() -> tuple[int, ...]: @pytest.fixture( params=[ np.array([1, 2]), - np.array([0, 2]), + np.array([0, 3]), slice(1, 3), slice(1, 7), np.array([0, 6]), @@ -102,11 +102,10 @@ class smoke: @pytest.fixture -def arr(fill_value, chunks, shape) -> zarr.Array: - tmp = tempfile.TemporaryDirectory() +def arr(fill_value, chunks, shape, tmp_path) -> zarr.Array: return zarr.create( shape, - store=LocalStore(root=tmp.name, mode="w"), + store=LocalStore(root=tmp_path / ".zarr", mode="w"), chunks=chunks, dtype=np.int16, fill_value=fill_value, @@ -154,6 +153,36 @@ def test_roundtrip( ), res +@contextmanager +def use_zarr_default_codec_reader(): + zarr.config.set( + {"codec_pipeline.path": "zarr.codecs.pipeline.BatchedCodecPipeline"} + ) + yield + zarr.config.set({"codec_pipeline.path": "zarrs_python.ZarrsCodecPipeline"}) + + +def test_roundtrip_read_only_zarrs( + arr, + store_values: np.ndarray, + index: tuple[int | slice | np.ndarray | EllipsisType, ...], + indexing_method: Callable, +): + if not isinstance(index, EllipsisType) and all( + isinstance(i, np.ndarray) for i in index + ): + pytest.skip( + "indexing across two axes with arrays seems to have strange behavior even in normal zarr" + ) + with use_zarr_default_codec_reader(): + arr_default = zarr.open(arr.store, mode="r+") + indexing_method(arr_default)[index] = store_values + res = indexing_method(arr)[index] + assert np.all( + res == store_values, + ), res + + def test_roundtrip_ellipsis_indexing_1d_invalid(arr: zarr.Array): stored_value = np.array([1, 2, 3]) with pytest.raises( From 42de375d29179183a7eea1889026fcc0adf0cbf8 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Fri, 8 Nov 2024 16:19:41 +0100 Subject: [PATCH 08/24] (chore): remove unused imports --- src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b860c16..9980c21 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,10 +6,8 @@ use pyo3::exceptions::{PyRuntimeError, PyTypeError, PyValueError}; use pyo3::prelude::*; use pyo3::types::PySlice; use rayon::iter::{IntoParallelIterator, ParallelIterator}; -use rayon::vec; use rayon_iter_concurrent_limit::iter_concurrent_limit; use std::borrow::Cow; -use std::io::{Bytes, Read}; use std::num::NonZeroU64; use std::sync::{Arc, Mutex}; use unsafe_cell_slice::UnsafeCellSlice; From 8b60bed4a02bf94c5732602b4e1005eaa840db16 Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Sat, 9 Nov 2024 06:54:44 +1100 Subject: [PATCH 09/24] (fix): cleanup unwraps in `retrieve_chunks` --- src/lib.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9980c21..8c38238 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -520,35 +520,35 @@ impl CodecPipelineImpl { let codec_options = &self.codec_options; let get_chunk_subset = |item: ChunksItem| { - // See zarrs::array::Array::retrieve_chunk_into - let 
chunk_encoded = item.store.get(&item.key).map_py_err::(); - if let Some(chunk_encoded) = chunk_encoded.unwrap() { + let chunk_encoded = item.store.get(&item.key).map_py_err::()?; + Ok(if let Some(chunk_encoded) = chunk_encoded { let chunk_encoded: Vec = chunk_encoded.into(); - self.codec_chain.decode( - Cow::Owned(chunk_encoded), - &item.representation, - codec_options, - ) + self.codec_chain + .decode( + Cow::Owned(chunk_encoded), + &item.representation, + codec_options, + ) + .map_py_err::()? } else { // The chunk is missing so we need to create one. let num_elements = item.representation.num_elements(); let data_type_size = item.representation.data_type().size(); let chunk_shape = ArraySize::new(data_type_size, num_elements); - let array_bytes = - ArrayBytes::new_fill_value(chunk_shape, item.representation.fill_value()); - Ok(array_bytes) + ArrayBytes::new_fill_value(chunk_shape, item.representation.fill_value()) } + .into_fixed() + .map_py_err::()? + .into_owned()) }; - let chunk_bytes: Vec> = iter_concurrent_limit!( + iter_concurrent_limit!( chunk_concurrent_limit, chunk_descriptions, map, get_chunk_subset ) - .map(|x| x.unwrap().into_fixed().unwrap().to_owned().into_owned()) - .collect::>>(); - chunk_bytes - }); + .collect::>>>() + })?; Ok(chunk_bytes .into_iter() .map(|x| x.into_pyarray_bound(py)) From 16262fef9a34284c4a43b96f801f55b770a94fa9 Mon Sep 17 00:00:00 2001 From: "Philipp A." Date: Mon, 11 Nov 2024 12:07:31 +0100 Subject: [PATCH 10/24] Refactor full indexing (#34) --- python/zarrs_python/utils.py | 4 +- src/chunk_item.rs | 185 +++++++++++++++++++++++ src/lib.rs | 278 ++++++++--------------------------- zarrs_python | 1 + 4 files changed, 251 insertions(+), 217 deletions(-) create mode 100644 src/chunk_item.rs create mode 160000 zarrs_python diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index 803b344..03b183b 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -70,10 +70,10 @@ def make_chunk_info_for_rust_with_indices( batch_info: Iterable[ tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], -) -> list[tuple[str, ChunkCoords, str, Any, list[slice], list[slice]]]: +) -> list[tuple[tuple[str, ChunkCoords, str, Any], list[slice], list[slice]]]: return list( ( - *convert_chunk_to_primitive(byte_getter, chunk_spec), + convert_chunk_to_primitive(byte_getter, chunk_spec), selector_tuple_to_slice_selection(out_selection), selector_tuple_to_slice_selection(chunk_selection), ) diff --git a/src/chunk_item.rs b/src/chunk_item.rs new file mode 100644 index 0000000..c381f9b --- /dev/null +++ b/src/chunk_item.rs @@ -0,0 +1,185 @@ +use std::{num::NonZeroU64, sync::Arc}; + +use pyo3::{ + exceptions::{PyRuntimeError, PyValueError}, + types::{PySlice, PySliceMethods}, + Bound, PyErr, PyResult, +}; +use zarrs::{ + array::{ChunkRepresentation, DataType, FillValue}, + array_subset::ArraySubset, + metadata::v3::{array::data_type::DataTypeMetadataV3, MetadataV3}, + storage::{MaybeBytes, ReadableWritableListableStorageTraits, StorageError, StoreKey}, +}; + +use crate::utils::PyErrExt; + +pub(crate) type Raw<'a> = ( + // store path + String, + // shape + Vec, + // data type + String, + // fill value bytes + Vec, +); + +pub(crate) type RawWithIndices<'a> = ( + Raw<'a>, + // out selection + Vec>, + // chunk selection + Vec>, +); + +pub(crate) trait IntoItem: std::marker::Sized { + fn store_path(&self) -> &str; + fn into_item( + self, + store: Arc, + key: StoreKey, + shape: S, + ) -> PyResult; +} + +pub(crate) trait 
ChunksItem { + fn store(&self) -> Arc; + fn key(&self) -> &StoreKey; + fn representation(&self) -> &ChunkRepresentation; + + fn get(&self) -> Result { + self.store().get(self.key()) + } +} + +pub(crate) struct Basic { + store: Arc, + key: StoreKey, + representation: ChunkRepresentation, +} + +pub(crate) struct WithSubset { + pub item: Basic, + pub chunk_subset: ArraySubset, + pub subset: ArraySubset, +} + +impl ChunksItem for Basic { + fn store(&self) -> Arc { + self.store.clone() + } + fn key(&self) -> &StoreKey { + &self.key + } + fn representation(&self) -> &ChunkRepresentation { + &self.representation + } +} + +impl ChunksItem for WithSubset { + fn store(&self) -> Arc { + self.item.store.clone() + } + fn key(&self) -> &StoreKey { + &self.item.key + } + fn representation(&self) -> &ChunkRepresentation { + &self.item.representation + } +} + +impl<'a> IntoItem for Raw<'a> { + fn store_path(&self) -> &str { + &self.0 + } + fn into_item( + self, + store: Arc, + key: StoreKey, + (): (), + ) -> PyResult { + let (_, chunk_shape, dtype, fill_value) = self; + let representation = get_chunk_representation(chunk_shape, &dtype, fill_value)?; + Ok(Basic { + store, + key, + representation, + }) + } +} + +impl IntoItem for RawWithIndices<'_> { + fn store_path(&self) -> &str { + &self.0 .0 + } + fn into_item( + self, + store: Arc, + key: StoreKey, + shape: &[u64], + ) -> PyResult { + let (raw, selection, chunk_selection) = self; + let chunk_shape = raw.1.clone(); + let item = raw.into_item(store.clone(), key, ())?; + Ok(WithSubset { + item, + chunk_subset: selection_to_array_subset(&chunk_selection, &chunk_shape)?, + subset: selection_to_array_subset(&selection, shape)?, + }) + } +} + +fn get_chunk_representation( + chunk_shape: Vec, + dtype: &str, + fill_value: Vec, +) -> PyResult { + // Get the chunk representation + let data_type = + DataType::from_metadata(&DataTypeMetadataV3::from_metadata(&MetadataV3::new(dtype))) + .map_py_err::()?; + let chunk_shape = chunk_shape + .into_iter() + .map(|x| NonZeroU64::new(x).expect("chunk shapes should always be non-zero")) + .collect(); + let chunk_representation = + ChunkRepresentation::new(chunk_shape, data_type, FillValue::new(fill_value)) + .map_py_err::()?; + Ok(chunk_representation) +} + +fn slice_to_range(slice: &Bound<'_, PySlice>, length: isize) -> PyResult> { + let indices = slice.indices(length)?; + if indices.start < 0 { + Err(PyErr::new::( + "slice start must be greater than or equal to 0".to_string(), + )) + } else if indices.stop < 0 { + Err(PyErr::new::( + "slice stop must be greater than or equal to 0".to_string(), + )) + } else if indices.step != 1 { + Err(PyErr::new::( + "slice step must be equal to 1".to_string(), + )) + } else { + Ok(u64::try_from(indices.start)?..u64::try_from(indices.stop)?) 
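+        // Note: for step-1 slices, PySlice::indices appears to clamp start and
+        // stop into [0, length], so the negative-bound errors above are defensive.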
+ } +} + +fn selection_to_array_subset( + selection: &[Bound<'_, PySlice>], + shape: &[u64], +) -> PyResult { + if selection.is_empty() { + Ok(ArraySubset::new_with_shape(vec![1; shape.len()])) + } else { + let chunk_ranges = selection + .iter() + .zip(shape) + .map(|(selection, &shape)| slice_to_range(selection, isize::try_from(shape)?)) + .collect::>>()?; + Ok(ArraySubset::new_with_ranges(&chunk_ranges)) + } +} diff --git a/src/lib.rs b/src/lib.rs index 24bf1bb..7d8f001 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,28 +1,26 @@ #![warn(clippy::pedantic)] +use chunk_item::{ChunksItem, IntoItem}; use numpy::npyffi::PyArrayObject; -use numpy::{IntoPyArray, PyArray, PyUntypedArray, PyUntypedArrayMethods}; +use numpy::{IntoPyArray, PyArray1, PyUntypedArray, PyUntypedArrayMethods}; use pyo3::exceptions::{PyRuntimeError, PyTypeError, PyValueError}; use pyo3::prelude::*; -use pyo3::types::PySlice; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon_iter_concurrent_limit::iter_concurrent_limit; use std::borrow::Cow; -use std::num::NonZeroU64; use std::sync::{Arc, Mutex}; use unsafe_cell_slice::UnsafeCellSlice; use zarrs::array::codec::{ ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder, StoragePartialDecoder, }; use zarrs::array::{ - copy_fill_value_into, update_array_bytes, ArrayBytes, ArraySize, ChunkRepresentation, - CodecChain, DataType, FillValue, + copy_fill_value_into, update_array_bytes, ArrayBytes, ArraySize, CodecChain, FillValue, }; use zarrs::array_subset::ArraySubset; -use zarrs::metadata::v3::array::data_type::DataTypeMetadataV3; use zarrs::metadata::v3::MetadataV3; use zarrs::storage::{ReadableWritableListableStorageTraits, StorageHandle, StoreKey}; +mod chunk_item; mod codec_pipeline_store_filesystem; #[cfg(test)] mod tests; @@ -70,126 +68,65 @@ impl CodecPipelineImpl { } } - fn collect_chunk_descriptions( + fn collect_chunk_descriptions, I, S: Copy>( &self, - chunk_descriptions: Vec, - ) -> PyResult> { + chunk_descriptions: Vec, + shape: S, + ) -> PyResult> { chunk_descriptions .into_iter() - .map(|(store_path, chunk_shape, dtype, fill_value)| { - let (store, path) = self.get_store_and_path(&store_path)?; + .map(|raw| { + let (store, path) = self.get_store_and_path(raw.store_path())?; let key = StoreKey::new(path).map_py_err::()?; - Ok(ChunksItem { - store, - key, - representation: Self::get_chunk_representation( - chunk_shape, - &dtype, - fill_value, - )?, - }) + raw.into_item(store, key, shape) }) .collect() } - fn collect_chunk_descriptions_with_index( - &self, - chunk_descriptions: Vec, - shape: &[u64], - ) -> PyResult> { - chunk_descriptions - .into_iter() - .map( - |(store_path, chunk_shape, dtype, fill_value, selection, chunk_selection)| { - let (store, path) = self.get_store_and_path(&store_path)?; - let key = StoreKey::new(path).map_py_err::()?; - Ok(ChunksItemWithSubset { - store, - key, - chunk_subset: Self::selection_to_array_subset( - &chunk_selection, - &chunk_shape, - )?, - subset: Self::selection_to_array_subset(&selection, shape)?, - representation: Self::get_chunk_representation( - chunk_shape, - &dtype, - fill_value, - )?, - }) - }, - ) - .collect() - } - fn get_chunk_representation( - chunk_shape: Vec, - dtype: &str, - fill_value: Vec, - ) -> PyResult { - // Get the chunk representation - let data_type = - DataType::from_metadata(&DataTypeMetadataV3::from_metadata(&MetadataV3::new(dtype))) - .map_py_err::()?; - let chunk_shape = chunk_shape - .into_iter() - .map(|x| NonZeroU64::new(x).expect("chunk shapes should always be non-zero")) 
- .collect(); - let chunk_representation = - ChunkRepresentation::new(chunk_shape, data_type, FillValue::new(fill_value)) - .map_py_err::()?; - Ok(chunk_representation) - } - - fn retrieve_chunk_bytes<'a>( - store: &dyn ReadableWritableListableStorageTraits, - key: &StoreKey, + fn retrieve_chunk_bytes<'a, I: ChunksItem>( + item: &I, codec_chain: &CodecChain, - chunk_representation: &ChunkRepresentation, codec_options: &CodecOptions, ) -> PyResult> { - let value_encoded = store.get(key).map_py_err::()?; + let value_encoded = item.get().map_py_err::()?; let value_decoded = if let Some(value_encoded) = value_encoded { let value_encoded: Vec = value_encoded.into(); // zero-copy in this case codec_chain - .decode(value_encoded.into(), chunk_representation, codec_options) + .decode(value_encoded.into(), item.representation(), codec_options) .map_py_err::()? } else { let array_size = ArraySize::new( - chunk_representation.data_type().size(), - chunk_representation.num_elements(), + item.representation().data_type().size(), + item.representation().num_elements(), ); - ArrayBytes::new_fill_value(array_size, chunk_representation.fill_value()) + ArrayBytes::new_fill_value(array_size, item.representation().fill_value()) }; Ok(value_decoded) } - fn store_chunk_bytes( - store: &dyn ReadableWritableListableStorageTraits, - key: &StoreKey, + fn store_chunk_bytes( + item: &I, codec_chain: &CodecChain, - chunk_representation: &ChunkRepresentation, value_decoded: ArrayBytes, codec_options: &CodecOptions, ) -> PyResult<()> { - if value_decoded.is_fill_value(chunk_representation.fill_value()) { - store.erase(key) + if value_decoded.is_fill_value(item.representation().fill_value()) { + item.store().erase(item.key()) } else { let value_encoded = codec_chain - .encode(value_decoded, chunk_representation, codec_options) + .encode(value_decoded, item.representation(), codec_options) .map(Cow::into_owned) .map_py_err::()?; // Store the encoded chunk - store.set(key, value_encoded.into()) + item.store().set(item.key(), value_encoded.into()) } .map_py_err::() } - fn store_chunk_subset_bytes( - store: &dyn ReadableWritableListableStorageTraits, - key: &StoreKey, + fn store_chunk_subset_bytes( + item: &I, codec_chain: &CodecChain, - chunk_representation: &ChunkRepresentation, chunk_subset_bytes: &ArrayBytes, chunk_subset: &ArraySubset, codec_options: &CodecOptions, @@ -198,23 +135,17 @@ impl CodecPipelineImpl { chunk_subset_bytes .validate( chunk_subset.num_elements(), - chunk_representation.data_type().size(), + item.representation().data_type().size(), ) .map_py_err::()?; - if !chunk_subset.inbounds(&chunk_representation.shape_u64()) { + if !chunk_subset.inbounds(&item.representation().shape_u64()) { return Err(PyErr::new::( "chunk subset is out of bounds".to_string(), )); } // Retrieve the chunk - let chunk_bytes_old = Self::retrieve_chunk_bytes( - store, - key, - codec_chain, - chunk_representation, - codec_options, - )?; + let chunk_bytes_old = Self::retrieve_chunk_bytes(item, codec_chain, codec_options)?; // Update the chunk let chunk_bytes_new = unsafe { @@ -225,57 +156,15 @@ impl CodecPipelineImpl { // - output bytes and output subset bytes are compatible (same data type) update_array_bytes( chunk_bytes_old, - &chunk_representation.shape_u64(), + &item.representation().shape_u64(), chunk_subset, chunk_subset_bytes, - chunk_representation.data_type().size(), + item.representation().data_type().size(), ) }; // Store the updated chunk - Self::store_chunk_bytes( - store, - key, - codec_chain, - chunk_representation, 
- chunk_bytes_new, - codec_options, - ) - } - - fn slice_to_range(slice: &Bound<'_, PySlice>, length: isize) -> PyResult> { - let indices = slice.indices(length)?; - if indices.start < 0 { - Err(PyErr::new::( - "slice start must be greater than or equal to 0".to_string(), - )) - } else if indices.stop < 0 { - Err(PyErr::new::( - "slice stop must be greater than or equal to 0".to_string(), - )) - } else if indices.step != 1 { - Err(PyErr::new::( - "slice step must be equal to 1".to_string(), - )) - } else { - Ok(u64::try_from(indices.start)?..u64::try_from(indices.stop)?) - } - } - - fn selection_to_array_subset( - selection: &[Bound<'_, PySlice>], - shape: &[u64], - ) -> PyResult { - if selection.is_empty() { - Ok(ArraySubset::new_with_shape(vec![1; shape.len()])) - } else { - let chunk_ranges = selection - .iter() - .zip(shape) - .map(|(selection, &shape)| Self::slice_to_range(selection, isize::try_from(shape)?)) - .collect::>>()?; - Ok(ArraySubset::new_with_ranges(&chunk_ranges)) - } + Self::store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options) } fn pyarray_itemsize(value: &Bound<'_, PyUntypedArray>) -> usize { @@ -325,45 +214,6 @@ impl CodecPipelineImpl { } } -type ChunksItemRawWithIndices<'a> = ( - // store path - String, - // shape - Vec, - // data type - String, - // fill value bytes - Vec, - // out selection - Vec>, - // chunk selection - Vec>, -); - -type ChunksItemRaw<'a> = ( - // store path - String, - // shape - Vec, - // data type - String, - // fill value bytes - Vec, -); - -struct ChunksItemWithSubset { - store: Arc, - key: StoreKey, - chunk_subset: ArraySubset, - subset: ArraySubset, - representation: ChunkRepresentation, -} -struct ChunksItem { - store: Arc, - key: StoreKey, - representation: ChunkRepresentation, -} - #[pymethods] impl CodecPipelineImpl { #[pyo3(signature = (metadata, validate_checksums=None, store_empty_chunks=None, concurrent_target=None))] @@ -400,7 +250,7 @@ impl CodecPipelineImpl { fn retrieve_chunks_and_apply_index( &self, py: Python, - chunk_descriptions: Vec, // FIXME: Ref / iterable? + chunk_descriptions: Vec, // FIXME: Ref / iterable? value: &Bound<'_, PyUntypedArray>, chunk_concurrent_limit: usize, ) -> PyResult<()> { @@ -413,18 +263,18 @@ impl CodecPipelineImpl { let output = Self::nparray_to_unsafe_cell_slice(value); let output_shape: Vec = value.shape_zarr()?; let chunk_descriptions = - self.collect_chunk_descriptions_with_index(chunk_descriptions, &output_shape)?; + self.collect_chunk_descriptions(chunk_descriptions, &output_shape)?; py.allow_threads(move || { let codec_options = &self.codec_options; - let update_chunk_subset = |item: ChunksItemWithSubset| { + let update_chunk_subset = |item: chunk_item::WithSubset| { // See zarrs::array::Array::retrieve_chunk_subset_into if item.chunk_subset.start().iter().all(|&o| o == 0) - && item.chunk_subset.shape() == item.representation.shape_u64() + && item.chunk_subset.shape() == item.representation().shape_u64() { // See zarrs::array::Array::retrieve_chunk_into - let chunk_encoded = item.store.get(&item.key).map_py_err::()?; + let chunk_encoded = item.get().map_py_err::()?; if let Some(chunk_encoded) = chunk_encoded { // Decode the encoded data into the output buffer let chunk_encoded: Vec = chunk_encoded.into(); @@ -434,7 +284,7 @@ impl CodecPipelineImpl { // - item.subset is within the bounds of output_shape. 
self.codec_chain.decode_into( Cow::Owned(chunk_encoded), - &item.representation, + item.representation(), &output, &output_shape, &item.subset, @@ -449,8 +299,8 @@ impl CodecPipelineImpl { // - output is an array with output_shape elements of the item.representation data type, // - item.subset is within the bounds of output_shape. copy_fill_value_into( - item.representation.data_type(), - item.representation.fill_value(), + item.representation().data_type(), + item.representation().fill_value(), &output, &output_shape, &item.subset, @@ -459,15 +309,17 @@ impl CodecPipelineImpl { } } else { // Partially decode the chunk into the output buffer - let storage_handle = Arc::new(StorageHandle::new(item.store.clone())); + let storage_handle = Arc::new(StorageHandle::new(item.store().clone())); // NOTE: Normally a storage transformer would exist between the storage handle and the input handle // but zarr-python does not support them nor forward them to the codec pipeline - let input_handle = - Arc::new(StoragePartialDecoder::new(storage_handle, item.key)); + let input_handle = Arc::new(StoragePartialDecoder::new( + storage_handle, + item.key().clone(), + )); let partial_decoder = self .codec_chain .clone() - .partial_decoder(input_handle, &item.representation, codec_options) + .partial_decoder(input_handle, item.representation(), codec_options) .map_py_err::()?; unsafe { // SAFETY: @@ -500,31 +352,31 @@ impl CodecPipelineImpl { fn retrieve_chunks<'py>( &self, py: Python<'py>, - chunk_descriptions: Vec, // FIXME: Ref / iterable? + chunk_descriptions: Vec, // FIXME: Ref / iterable? chunk_concurrent_limit: usize, - ) -> PyResult>>>> { - let chunk_descriptions = self.collect_chunk_descriptions(chunk_descriptions)?; + ) -> PyResult>>> { + let chunk_descriptions = self.collect_chunk_descriptions(chunk_descriptions, ())?; let chunk_bytes = py.allow_threads(move || { let codec_options = &self.codec_options; - let get_chunk_subset = |item: ChunksItem| { - let chunk_encoded = item.store.get(&item.key).map_py_err::()?; + let get_chunk_subset = |item: chunk_item::Basic| { + let chunk_encoded = item.get().map_py_err::()?; Ok(if let Some(chunk_encoded) = chunk_encoded { let chunk_encoded: Vec = chunk_encoded.into(); self.codec_chain .decode( Cow::Owned(chunk_encoded), - &item.representation, + item.representation(), codec_options, ) .map_py_err::()? } else { // The chunk is missing so we need to create one. - let num_elements = item.representation.num_elements(); - let data_type_size = item.representation.data_type().size(); + let num_elements = item.representation().num_elements(); + let data_type_size = item.representation().data_type().size(); let chunk_shape = ArraySize::new(data_type_size, num_elements); - ArrayBytes::new_fill_value(chunk_shape, item.representation.fill_value()) + ArrayBytes::new_fill_value(chunk_shape, item.representation().fill_value()) } .into_fixed() .map_py_err::()? 
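A sketch of the consuming side of this contract: `retrieve_chunks` returns one flat decoded buffer per chunk, and the Python fallback (patch 07's `pipeline.py` above) views it as the chunk's dtype, reshapes it, then applies the discontiguous selection. Roughly (names as in `ZarrsCodecPipeline.read`; `drop_axes` handling omitted):

    for chunk, (_, spec, selection, out_selection) in zip(chunks, batch_info):
        # Each buffer arrives flat; recover the chunk's dtype and shape first.
        chunk_reshaped = chunk.view(spec.dtype).reshape(spec.shape)
        out[out_selection] = chunk_reshaped[selection]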
@@ -541,13 +393,13 @@ impl CodecPipelineImpl {
         Ok(chunk_bytes
             .into_iter()
             .map(|x| x.into_pyarray_bound(py))
-            .collect::<Vec<Bound<'py, PyArray<u8, Dim<[usize; 1]>>>>>())
+            .collect())
     }
 
     fn store_chunks_with_indices(
         &self,
         py: Python,
-        chunk_descriptions: Vec<ChunksItemRawWithIndices>,
+        chunk_descriptions: Vec<chunk_item::RawWithIndices>,
         value: &Bound<'_, PyUntypedArray>,
         chunk_concurrent_limit: usize,
     ) -> PyResult<()> {
@@ -572,25 +424,23 @@ impl CodecPipelineImpl {
 
         let input_shape: Vec<u64> = value.shape_zarr()?;
         let chunk_descriptions =
-            self.collect_chunk_descriptions_with_index(chunk_descriptions, &input_shape)?;
+            self.collect_chunk_descriptions(chunk_descriptions, &input_shape)?;
 
         py.allow_threads(move || {
             let codec_options = &self.codec_options;
 
-            let store_chunk = |item: ChunksItemWithSubset| match &input {
+            let store_chunk = |item: chunk_item::WithSubset| match &input {
                 InputValue::Array(input) => {
                     let chunk_subset_bytes = input
                         .extract_array_subset(
                             &item.subset,
                             &input_shape,
-                            item.representation.data_type(),
+                            item.item.representation().data_type(),
                         )
                         .map_py_err::<PyRuntimeError>()?;
 
                     Self::store_chunk_subset_bytes(
-                        item.store.as_ref(),
-                        &item.key,
+                        &item,
                         &self.codec_chain,
-                        &item.representation,
                         &chunk_subset_bytes,
                         &item.chunk_subset,
                         codec_options,
@@ -599,17 +449,15 @@ impl CodecPipelineImpl {
                 InputValue::Constant(constant_value) => {
                     let chunk_subset_bytes = ArrayBytes::new_fill_value(
                         ArraySize::new(
-                            item.representation.data_type().size(),
+                            item.representation().data_type().size(),
                             item.chunk_subset.num_elements(),
                         ),
                         constant_value,
                     );
 
                     Self::store_chunk_subset_bytes(
-                        item.store.as_ref(),
-                        &item.key,
+                        &item,
                         &self.codec_chain,
-                        &item.representation,
                         &chunk_subset_bytes,
                         &item.chunk_subset,
                         codec_options,
diff --git a/zarrs_python b/zarrs_python
new file mode 160000
index 0000000..66bca18
--- /dev/null
+++ b/zarrs_python
@@ -0,0 +1 @@
+Subproject commit 66bca1812b78ea630a24124fbca044318722487e

From 96d8a2ef3786e14a8e0b50fd891033f7b84bf447 Mon Sep 17 00:00:00 2001
From: ilan-gold
Date: Mon, 11 Nov 2024 13:25:05 +0100
Subject: [PATCH 11/24] (chore): `make_chunk_info_for_rust` cleanup

---
 python/zarrs_python/utils.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py
index 03b183b..b549a68 100644
--- a/python/zarrs_python/utils.py
+++ b/python/zarrs_python/utils.py
@@ -82,9 +82,11 @@ def make_chunk_info_for_rust_with_indices(
 
 
 def make_chunk_info_for_rust(
-    batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
+    batch_info: Iterable[
+        tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
+    ],
 ) -> list[tuple[str, ChunkCoords, str, Any]]:
     return list(
         convert_chunk_to_primitive(byte_getter, chunk_spec)
-        for (byte_getter, chunk_spec, chunk_selection, out_selection) in batch_info
+        for (byte_getter, chunk_spec, _, _) in batch_info
     )

From fbfa958c25fd0305a37ac531bb5accc8d310fd8e Mon Sep 17 00:00:00 2001
From: ilan-gold
Date: Mon, 11 Nov 2024 14:35:50 +0100
Subject: [PATCH 12/24] (fix): all tests working except
 "tests/test_pipeline.py::test_roundtrip[vindex-contiguous_in_chunk_array-contiguous_in_chunk_array]"

---
 tests/conftest.py      | 11 ++++++++
 tests/test_pipeline.py | 58 +++++++++++++++++++++++-------------------
 2 files changed, 43 insertions(+), 26 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index d48674c..9d2c7f7 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -65,6 +65,7 @@ def array_fixture(request: pytest.FixtureRequest) -> npt.NDArray[Any]:
 
 # tests that also fail with zarr-python's default codec pipeline
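 # (each entry below is a pytest test ID: the indexing method followed by the
 # indexer used along each axis, as parametrized in test_pipeline.py)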
zarr_python_default_codec_pipeline_failures = [ + # ellipsis weirdness, need to report "test_roundtrip[oindex-contiguous_in_chunk_array-ellipsis]", "test_roundtrip[oindex-discontinuous_in_chunk_array-ellipsis]", "test_roundtrip[vindex-contiguous_in_chunk_array-ellipsis]", @@ -83,6 +84,8 @@ def array_fixture(request: pytest.FixtureRequest) -> npt.NDArray[Any]: "test_roundtrip_read_only_zarrs[vindex-across_chunks_indices_array-ellipsis]", "test_roundtrip_read_only_zarrs[vindex-ellipsis-contiguous_in_chunk_array]", "test_roundtrip_read_only_zarrs[vindex-ellipsis-discontinuous_in_chunk_array]", + # need to investigate this one - it seems to fail with the default pipeline + "test_roundtrip_read_only_zarrs[vindex-contiguous_in_chunk_array-contiguous_in_chunk_array]", ] zarrs_python_no_discontinuous_writes = [ @@ -103,6 +106,14 @@ def array_fixture(request: pytest.FixtureRequest) -> npt.NDArray[Any]: "test_roundtrip[vindex-slice_across_chunks-discontinuous_in_chunk_array]", "test_roundtrip[vindex-fill_slice-discontinuous_in_chunk_array]", "test_roundtrip[vindex-int-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-discontinuous_in_chunk_array-contiguous_in_chunk_array]", + "test_roundtrip[oindex-contiguous_in_chunk_array-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-across_chunks_indices_array-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-discontinuous_in_chunk_array-discontinuous_in_chunk_array]", + "test_roundtrip[vindex-contiguous_in_chunk_array-discontinuous_in_chunk_array]", + "test_roundtrip[vindex-discontinuous_in_chunk_array-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-discontinuous_in_chunk_array-across_chunks_indices_array]", + "test_roundtrip[vindex-discontinuous_in_chunk_array-contiguous_in_chunk_array]", ] diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index fb4258f..8d48324 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -66,7 +66,7 @@ def full_array(shape) -> np.ndarray: @pytest.fixture def index(indexer_1d, indexer_1d_2): if isinstance(indexer_1d, EllipsisType) and isinstance(indexer_1d_2, EllipsisType): - pytest.skip("Double ellipsis indexing is valid") + pytest.skip("Double ellipsis indexing is not valid") return indexer_1d, indexer_1d_2 @@ -85,19 +85,37 @@ def store_values( shape: tuple[int, ...], ) -> np.ndarray: class smoke: - oindex = None - - if not isinstance(index, EllipsisType) and indexing_method(smoke) == "oindex": - index: tuple[int | np.ndarray, ...] = tuple( - i - if (not isinstance(i, slice)) - else np.arange( - i.start if hasattr(i, "start") else 0, - i.stop if hasattr(i, "end") else shape[axis], + oindex = "oindex" + + def maybe_convert( + i: int | np.ndarray | slice | EllipsisType, axis: int + ) -> np.ndarray: + if isinstance(i, np.ndarray): + return i + if isinstance(i, slice): + return np.arange( + i.start if i.start is not None else 0, + i.stop if i.stop is not None else shape[axis], ) - for axis, i in enumerate(index) + if isinstance(i, int): + return np.array([i]) + if isinstance(i, EllipsisType): + return np.arange(shape[axis]) + raise ValueError(f"Invalid index {i}") + + if not isinstance(index, EllipsisType) and indexing_method(smoke()) == "oindex": + index: tuple[np.ndarray, ...] 
= tuple(
+            maybe_convert(i, axis) for axis, i in enumerate(index)
         )
-    return full_array[np.ix_(index)]
+        res = full_array[np.ix_(*index)]
+        # squeeze out extra dims from integer indexers
+        if all(i.shape == (1,) for i in index):
+            res = res.squeeze()
+            return res
+        for axis, i in enumerate(index):
+            if i.shape == (1,):
+                res = res.squeeze(axis=axis)
+        return res
     return full_array[index]
 
 
@@ -140,12 +158,6 @@ def test_roundtrip(
     index: tuple[int | slice | np.ndarray | EllipsisType, ...],
     indexing_method: Callable,
 ):
-    if not isinstance(index, EllipsisType) and all(
-        isinstance(i, np.ndarray) for i in index
-    ):
-        pytest.skip(
-            "indexing across two axes with arrays seems to have strange behavior even in normal zarr"
-        )
     indexing_method(arr)[index] = store_values
     res = indexing_method(arr)[index]
     assert np.all(
@@ -163,21 +175,15 @@ def use_zarr_default_codec_reader():
 
 
 def test_roundtrip_read_only_zarrs(
-    arr,
+    arr: zarr.Array,
     store_values: np.ndarray,
     index: tuple[int | slice | np.ndarray | EllipsisType, ...],
     indexing_method: Callable,
 ):
-    if not isinstance(index, EllipsisType) and all(
-        isinstance(i, np.ndarray) for i in index
-    ):
-        pytest.skip(
-            "indexing across two axes with arrays seems to have strange behavior even in normal zarr"
-        )
     with use_zarr_default_codec_reader():
         arr_default = zarr.open(arr.store, mode="r+")
         indexing_method(arr_default)[index] = store_values
-    res = indexing_method(arr)[index]
+    res = indexing_method(zarr.open(arr.store))[index]
     assert np.all(
         res == store_values,
     ), res

From 9f90b5ee034cfc882a39a2820025fe580e4cdf5b Mon Sep 17 00:00:00 2001
From: Lachlan Deakin
Date: Tue, 12 Nov 2024 08:04:54 +1100
Subject: [PATCH 13/24] (fix): skip read in `store_chunk_subset_bytes` for
 full chunks

---
 src/lib.rs | 75 ++++++++++++++++++++++++++++++++----------------------
 1 file changed, 45 insertions(+), 30 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 7d8f001..3b988ce 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -110,6 +110,13 @@ impl CodecPipelineImpl {
         value_decoded: ArrayBytes,
         codec_options: &CodecOptions,
     ) -> PyResult<()> {
+        value_decoded
+            .validate(
+                item.representation().num_elements(),
+                item.representation().data_type().size(),
+            )
+            .map_py_err::<PyValueError>()?;
+
         if value_decoded.is_fill_value(item.representation().fill_value()) {
             item.store().erase(item.key())
         } else {
@@ -127,44 +134,52 @@ impl CodecPipelineImpl {
     fn store_chunk_subset_bytes<I: ChunksItem>(
         item: &I,
         codec_chain: &CodecChain,
-        chunk_subset_bytes: &ArrayBytes,
+        chunk_subset_bytes: ArrayBytes,
         chunk_subset: &ArraySubset,
         codec_options: &CodecOptions,
     ) -> PyResult<()> {
-        // Validate the inputs
-        chunk_subset_bytes
-            .validate(
-                chunk_subset.num_elements(),
-                item.representation().data_type().size(),
-            )
-            .map_py_err::<PyValueError>()?;
         if !chunk_subset.inbounds(&item.representation().shape_u64()) {
             return Err(PyErr::new::<PyValueError, _>(
                 "chunk subset is out of bounds".to_string(),
             ));
         }
 
-        // Retrieve the chunk
-        let chunk_bytes_old = Self::retrieve_chunk_bytes(item, codec_chain, codec_options)?;
-
-        // Update the chunk
-        let chunk_bytes_new = unsafe {
-            // SAFETY:
-            // - chunk_bytes_old is compatible with the chunk shape and data type size (validated on decoding)
-            // - chunk_subset is compatible with chunk_subset_bytes and the data type size (validated above)
-            // - chunk_subset is within the bounds of the chunk shape (validated above)
-            // - output bytes and output subset bytes are compatible (same data type)
-            update_array_bytes(
-                chunk_bytes_old,
-                &item.representation().shape_u64(),
-                chunk_subset,
-                chunk_subset_bytes,
-                item.representation().data_type().size(),
-            )
-        };
+        if chunk_subset.start().iter().all(|&o| o == 0)
+            && chunk_subset.shape() == item.representation().shape_u64()
+        {
+            // Fast path if the chunk subset spans the entire chunk, no read required
+            Self::store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options)
+        } else {
+            // Validate the chunk subset bytes
+            chunk_subset_bytes
+                .validate(
+                    chunk_subset.num_elements(),
+                    item.representation().data_type().size(),
+                )
+                .map_py_err::<PyValueError>()?;
+
+            // Retrieve the chunk
+            let chunk_bytes_old = Self::retrieve_chunk_bytes(item, codec_chain, codec_options)?;
+
+            // Update the chunk
+            let chunk_bytes_new = unsafe {
+                // SAFETY:
+                // - chunk_bytes_old is compatible with the chunk shape and data type size (validated on decoding)
+                // - chunk_subset is compatible with chunk_subset_bytes and the data type size (validated above)
+                // - chunk_subset is within the bounds of the chunk shape (validated above)
+                // - output bytes and output subset bytes are compatible (same data type)
+                update_array_bytes(
+                    chunk_bytes_old,
+                    &item.representation().shape_u64(),
+                    chunk_subset,
+                    &chunk_subset_bytes,
+                    item.representation().data_type().size(),
+                )
+            };
 
-        // Store the updated chunk
-        Self::store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options)
+            // Store the updated chunk
+            Self::store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options)
+        }
     }
 
     fn pyarray_itemsize(value: &Bound<'_, PyUntypedArray>) -> usize {
@@ -441,7 +456,7 @@ impl CodecPipelineImpl {
                         Self::store_chunk_subset_bytes(
                             &item,
                             &self.codec_chain,
-                            &chunk_subset_bytes,
+                            chunk_subset_bytes,
                             &item.chunk_subset,
                             codec_options,
                         )
@@ -458,7 +473,7 @@ impl CodecPipelineImpl {
                         Self::store_chunk_subset_bytes(
                             &item,
                             &self.codec_chain,
-                            &chunk_subset_bytes,
+                            chunk_subset_bytes,
                             &item.chunk_subset,
                             codec_options,
                         )

From deba69ec266f8c183f55fc3b7383235998c6d243 Mon Sep 17 00:00:00 2001
From: ilan-gold
Date: Tue, 12 Nov 2024 16:44:34 +0100
Subject: [PATCH 14/24] (fix): improve dropped index detection + disallow
 integer write case

---
 python/zarrs_python/pipeline.py |  7 +--
 python/zarrs_python/utils.py    | 93 ++++++++++++++++++++++++++++++++-
 tests/conftest.py               |  9 ++++
 3 files changed, 105 insertions(+), 4 deletions(-)

diff --git a/python/zarrs_python/pipeline.py b/python/zarrs_python/pipeline.py
index b89ca16..bc81ddd 100644
--- a/python/zarrs_python/pipeline.py
+++ b/python/zarrs_python/pipeline.py
@@ -25,6 +25,7 @@
 from ._internal import CodecPipelineImpl
 from .utils import (
+    CollapsedDimensionError,
     DiscontiguousArrayError,
     get_max_threads,
     make_chunk_info_for_rust,
@@ -100,9 +101,9 @@ async def read(
         if not out.dtype.isnative:
             raise RuntimeError("Non-native byte order not supported")
         try:
-            chunks_desc = make_chunk_info_for_rust_with_indices(batch_info)
+            chunks_desc = make_chunk_info_for_rust_with_indices(batch_info, drop_axes)
             index_in_rust = True
-        except DiscontiguousArrayError:
+        except (DiscontiguousArrayError, CollapsedDimensionError):
             chunks_desc = make_chunk_info_for_rust(batch_info)
             index_in_rust = False
         if index_in_rust:
@@ -142,7 +143,7 @@ async def write(
             value = np.ascontiguousarray(value, dtype=value.dtype.newbyteorder("="))
         elif not value.flags.c_contiguous:
             value = np.ascontiguousarray(value)
-        chunks_desc = make_chunk_info_for_rust_with_indices(batch_info)
+        chunks_desc = make_chunk_info_for_rust_with_indices(batch_info, drop_axes)
         await asyncio.to_thread(
             self.impl.store_chunks_with_indices,
             chunks_desc,
diff --git
a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index b549a68..f66cb20 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -8,6 +8,7 @@ if TYPE_CHECKING: from collections.abc import Iterable + from types import EllipsisType from zarr.abc.store import ByteGetter, ByteSetter from zarr.core.array_spec import ArraySpec @@ -23,7 +24,11 @@ class DiscontiguousArrayError(BaseException): pass -# This is a copy of the function from zarr.core.indexing that fixes: +class CollapsedDimensionError(BaseException): + pass + + +# This is a (mostly) copy of the function from zarr.core.indexing that fixes: # DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated # TODO: Upstream this fix def make_slice_selection(selection: tuple[np.ndarray | float]) -> list[slice]: @@ -44,6 +49,14 @@ def make_slice_selection(selection: tuple[np.ndarray | float]) -> list[slice]: ls.append(slice(dim_selection[0], dim_selection[-1] + 1, 1)) else: ls.append(dim_selection) + if ( + sum(isinstance(dim_selection, np.ndarray) for dim_selection in selection) + == sum(isinstance(dim_selection, slice) for dim_selection in selection) + == len(selection) + ): + raise DiscontiguousArrayError( + "Vindexing with multiple contiguous numpy arrays is not supported" + ) return ls @@ -66,11 +79,89 @@ def convert_chunk_to_primitive( ) +def resulting_shape_from_index( + array_shape: tuple[int, ...], + index_tuple: tuple[int | slice | EllipsisType | np.ndarray], + drop_axes: tuple[int, ...], + *, + pad: bool, +) -> tuple[int, ...]: + result_shape = [] + advanced_index_shapes = [ + idx.shape for idx in index_tuple if isinstance(idx, np.ndarray) + ] + basic_shape_index = 0 + + # Broadcast all advanced indices, if any + if advanced_index_shapes: + broadcasted_shape = np.broadcast_shapes(*advanced_index_shapes) + result_shape.extend(broadcasted_shape) + basic_shape_index += len( + advanced_index_shapes + ) # Consume dimensions from array_shape + + # Process each remaining index in index_tuple + for idx in index_tuple: + if isinstance(idx, int): + # Integer index reduces dimension, so skip this dimension in array_shape + basic_shape_index += 1 + elif isinstance(idx, slice): + if idx.step is not None and idx.step > 1: + raise DiscontiguousArrayError( + "Step size greater than 1 is not supported" + ) + # Slice keeps dimension, adjust size accordingly + start, stop, _ = idx.indices(array_shape[basic_shape_index]) + result_shape.append(stop - start) + basic_shape_index += 1 + elif idx is Ellipsis: + # Calculate number of dimensions that Ellipsis should fill + num_to_fill = len(array_shape) - len(index_tuple) + 1 + result_shape.extend( + array_shape[basic_shape_index : basic_shape_index + num_to_fill] + ) + basic_shape_index += num_to_fill + + # Step 4: Append remaining dimensions from array_shape if fewer indices were used + if basic_shape_index < len(array_shape) and pad: + result_shape.extend(array_shape[basic_shape_index:]) + + return tuple(size for idx, size in enumerate(result_shape) if idx not in drop_axes) + + +def get_shape_for_selector( + selector_tuple: SelectorTuple, + shape: tuple[int, ...], + drop_axes: tuple[int, ...], + *, + pad: bool, +) -> tuple[int, ...]: + if isinstance(selector_tuple, slice | np.ndarray): + return resulting_shape_from_index( + shape, + (selector_tuple,), + drop_axes, + pad=pad, + ) + return resulting_shape_from_index(shape, selector_tuple, drop_axes, pad=pad) + + def make_chunk_info_for_rust_with_indices( batch_info: Iterable[ tuple[ByteGetter | 
ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], + drop_axes: tuple, ) -> list[tuple[tuple[str, ChunkCoords, str, Any], list[slice], list[slice]]]: + # all? + for _, chunk_spec, chunk_selection, out_selection in batch_info: + shape_out_selection = get_shape_for_selector( + out_selection, chunk_spec.shape, (), pad=False + ) + shape_chunk_selection = get_shape_for_selector( + chunk_selection, chunk_spec.shape, drop_axes, pad=True + ) + if len(shape_chunk_selection) != len(shape_out_selection): + raise CollapsedDimensionError() return list( ( convert_chunk_to_primitive(byte_getter, chunk_spec), diff --git a/tests/conftest.py b/tests/conftest.py index 9d2c7f7..9bc6baf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -116,6 +116,14 @@ def array_fixture(request: pytest.FixtureRequest) -> npt.NDArray[Any]: "test_roundtrip[vindex-discontinuous_in_chunk_array-contiguous_in_chunk_array]", ] +# vindexing with two contiguous arrays would be converted to two slices but +# in numpy indexing actually requires dropping a dimension, which in turn boils +# down to integer indexing, which we can't do i.e., [np.array(1, 2), np.array(1, 2)] -> [slice(1, 2), slice(1, 2)] +# is not a correct conversion, and thus we don't support the write operation +zarrs_python_no_collapsed_dim = [ + "test_roundtrip[vindex-contiguous_in_chunk_array-contiguous_in_chunk_array]" +] + def pytest_collection_modifyitems( config: pytest.Config, items: Iterable[pytest.Item] @@ -128,5 +136,6 @@ def pytest_collection_modifyitems( item.name in zarr_python_default_codec_pipeline_failures + zarrs_python_no_discontinuous_writes + + zarrs_python_no_collapsed_dim ): item.add_marker(xfail_marker) From 0de267f75aeb3ddd0adb2addb022fa233ede8500 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Tue, 12 Nov 2024 16:45:28 +0100 Subject: [PATCH 15/24] (chore): message more specific --- python/zarrs_python/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index f66cb20..4971429 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -55,7 +55,7 @@ def make_slice_selection(selection: tuple[np.ndarray | float]) -> list[slice]: == len(selection) ): raise DiscontiguousArrayError( - "Vindexing with multiple contiguous numpy arrays is not supported" + "Vindexing with only contiguous numpy arrays is not supported" ) return ls From e53d4a7e4207f9321b87eb5c191287c15ce52a1c Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Thu, 14 Nov 2024 13:56:53 +0100 Subject: [PATCH 16/24] (fix): use `Exception` --- python/zarrs_python/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index 4971429..fcd6614 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -20,11 +20,11 @@ def get_max_threads() -> int: return (os.cpu_count() or 1) + 4 -class DiscontiguousArrayError(BaseException): +class DiscontiguousArrayError(Exception): pass -class CollapsedDimensionError(BaseException): +class CollapsedDimensionError(Exception): pass From 45decd53b4ec0f619b9e85275be31a5bc28359b3 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Thu, 14 Nov 2024 13:58:30 +0100 Subject: [PATCH 17/24] (chore): erroneous comment --- python/zarrs_python/utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index fcd6614..b123308 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -152,7 
+152,6 @@ def make_chunk_info_for_rust_with_indices( ], drop_axes: tuple, ) -> list[tuple[tuple[str, ChunkCoords, str, Any], list[slice], list[slice]]]: - # all? for _, chunk_spec, chunk_selection, out_selection in batch_info: shape_out_selection = get_shape_for_selector( out_selection, chunk_spec.shape, (), pad=False From 68b80e588b1d01c525b39958805dc239db2dafb1 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Thu, 14 Nov 2024 13:59:22 +0100 Subject: [PATCH 18/24] (chore): `drop_axes` default --- python/zarrs_python/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index b123308..47b0590 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -132,9 +132,9 @@ def resulting_shape_from_index( def get_shape_for_selector( selector_tuple: SelectorTuple, shape: tuple[int, ...], - drop_axes: tuple[int, ...], *, pad: bool, + drop_axes: tuple[int, ...] = (), ) -> tuple[int, ...]: if isinstance(selector_tuple, slice | np.ndarray): return resulting_shape_from_index( @@ -154,10 +154,10 @@ def make_chunk_info_for_rust_with_indices( ) -> list[tuple[tuple[str, ChunkCoords, str, Any], list[slice], list[slice]]]: for _, chunk_spec, chunk_selection, out_selection in batch_info: shape_out_selection = get_shape_for_selector( - out_selection, chunk_spec.shape, (), pad=False + out_selection, chunk_spec.shape, pad=False ) shape_chunk_selection = get_shape_for_selector( - chunk_selection, chunk_spec.shape, drop_axes, pad=True + chunk_selection, chunk_spec.shape, pad=True, drop_axes=drop_axes ) if len(shape_chunk_selection) != len(shape_out_selection): raise CollapsedDimensionError() From 147ac5620ca4ea4b8f0f159340706d441d067336 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Thu, 14 Nov 2024 13:59:43 +0100 Subject: [PATCH 19/24] (chore): `drop_axes` param --- python/zarrs_python/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index 47b0590..333d6f4 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -150,7 +150,7 @@ def make_chunk_info_for_rust_with_indices( batch_info: Iterable[ tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], - drop_axes: tuple, + drop_axes: tuple[int, ...], ) -> list[tuple[tuple[str, ChunkCoords, str, Any], list[slice], list[slice]]]: for _, chunk_spec, chunk_selection, out_selection in batch_info: shape_out_selection = get_shape_for_selector( From f844224fdc45b9631b1556995294eb77eb7fa7c4 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Thu, 14 Nov 2024 14:03:12 +0100 Subject: [PATCH 20/24] (chore): apply review --- python/zarrs_python/utils.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index 333d6f4..576fc1b 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -94,11 +94,9 @@ def resulting_shape_from_index( # Broadcast all advanced indices, if any if advanced_index_shapes: - broadcasted_shape = np.broadcast_shapes(*advanced_index_shapes) - result_shape.extend(broadcasted_shape) - basic_shape_index += len( - advanced_index_shapes - ) # Consume dimensions from array_shape + result_shape += np.broadcast_shapes(*advanced_index_shapes) + # Consume dimensions from array_shape + basic_shape_index += len(advanced_index_shapes) # Process each remaining index in index_tuple for idx in index_tuple: @@ -117,14 +115,14 @@ def 
resulting_shape_from_index( elif idx is Ellipsis: # Calculate number of dimensions that Ellipsis should fill num_to_fill = len(array_shape) - len(index_tuple) + 1 - result_shape.extend( - array_shape[basic_shape_index : basic_shape_index + num_to_fill] - ) + result_shape += array_shape[ + basic_shape_index : basic_shape_index + num_to_fill + ] basic_shape_index += num_to_fill # Step 4: Append remaining dimensions from array_shape if fewer indices were used if basic_shape_index < len(array_shape) and pad: - result_shape.extend(array_shape[basic_shape_index:]) + result_shape += array_shape[basic_shape_index:] return tuple(size for idx, size in enumerate(result_shape) if idx not in drop_axes) From 7aeff55a2b969b643d0bcf0c132c012f8d7fa0c3 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Thu, 14 Nov 2024 14:05:29 +0100 Subject: [PATCH 21/24] (chore): `else` branch --- python/zarrs_python/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index 576fc1b..c68c88f 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -119,6 +119,8 @@ def resulting_shape_from_index( basic_shape_index : basic_shape_index + num_to_fill ] basic_shape_index += num_to_fill + elif not isinstance(idx, np.ndarray): + raise ValueError(f"Invalid index type: {type(idx)}") # Step 4: Append remaining dimensions from array_shape if fewer indices were used if basic_shape_index < len(array_shape) and pad: From 639f77b0bfa356866c2ee218160d7ad11a5571fc Mon Sep 17 00:00:00 2001 From: Ilan Gold Date: Thu, 14 Nov 2024 14:06:17 +0100 Subject: [PATCH 22/24] (chore): add basic nd tests (#35) * (chore): add basic 3d tests * (refactor): use `pytest_generate_tests` --- tests/conftest.py | 115 ++++++++++++------------ tests/test_pipeline.py | 194 ++++++++++++++++++++++++----------------- 2 files changed, 174 insertions(+), 135 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 9bc6baf..9530fe4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -66,54 +66,56 @@ def array_fixture(request: pytest.FixtureRequest) -> npt.NDArray[Any]: # tests that also fail with zarr-python's default codec pipeline zarr_python_default_codec_pipeline_failures = [ # ellipsis weirdness, need to report - "test_roundtrip[oindex-contiguous_in_chunk_array-ellipsis]", - "test_roundtrip[oindex-discontinuous_in_chunk_array-ellipsis]", - "test_roundtrip[vindex-contiguous_in_chunk_array-ellipsis]", - "test_roundtrip[vindex-discontinuous_in_chunk_array-ellipsis]", - "test_roundtrip[oindex-across_chunks_indices_array-ellipsis]", - "test_roundtrip[vindex-ellipsis-across_chunks_indices_array]", - "test_roundtrip[vindex-across_chunks_indices_array-ellipsis]", - "test_roundtrip[vindex-ellipsis-contiguous_in_chunk_array]", - "test_roundtrip[vindex-ellipsis-discontinuous_in_chunk_array]", - "test_roundtrip_read_only_zarrs[oindex-contiguous_in_chunk_array-ellipsis]", - "test_roundtrip_read_only_zarrs[oindex-discontinuous_in_chunk_array-ellipsis]", - "test_roundtrip_read_only_zarrs[vindex-contiguous_in_chunk_array-ellipsis]", - "test_roundtrip_read_only_zarrs[vindex-discontinuous_in_chunk_array-ellipsis]", - "test_roundtrip_read_only_zarrs[oindex-across_chunks_indices_array-ellipsis]", - "test_roundtrip_read_only_zarrs[vindex-ellipsis-across_chunks_indices_array]", - "test_roundtrip_read_only_zarrs[vindex-across_chunks_indices_array-ellipsis]", - "test_roundtrip_read_only_zarrs[vindex-ellipsis-contiguous_in_chunk_array]", - 
"test_roundtrip_read_only_zarrs[vindex-ellipsis-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-2d-contiguous_in_chunk_array-ellipsis]", + "test_roundtrip[oindex-2d-discontinuous_in_chunk_array-ellipsis]", + "test_roundtrip[vindex-2d-contiguous_in_chunk_array-ellipsis]", + "test_roundtrip[vindex-2d-discontinuous_in_chunk_array-ellipsis]", + "test_roundtrip[oindex-2d-across_chunks_indices_array-ellipsis]", + "test_roundtrip[vindex-2d-ellipsis-across_chunks_indices_array]", + "test_roundtrip[vindex-2d-across_chunks_indices_array-ellipsis]", + "test_roundtrip[vindex-2d-ellipsis-contiguous_in_chunk_array]", + "test_roundtrip[vindex-2d-ellipsis-discontinuous_in_chunk_array]", + "test_roundtrip_read_only_zarrs[oindex-2d-contiguous_in_chunk_array-ellipsis]", + "test_roundtrip_read_only_zarrs[oindex-2d-discontinuous_in_chunk_array-ellipsis]", + "test_roundtrip_read_only_zarrs[vindex-2d-contiguous_in_chunk_array-ellipsis]", + "test_roundtrip_read_only_zarrs[vindex-2d-discontinuous_in_chunk_array-ellipsis]", + "test_roundtrip_read_only_zarrs[oindex-2d-across_chunks_indices_array-ellipsis]", + "test_roundtrip_read_only_zarrs[vindex-2d-ellipsis-across_chunks_indices_array]", + "test_roundtrip_read_only_zarrs[vindex-2d-across_chunks_indices_array-ellipsis]", + "test_roundtrip_read_only_zarrs[vindex-2d-ellipsis-contiguous_in_chunk_array]", + "test_roundtrip_read_only_zarrs[vindex-2d-ellipsis-discontinuous_in_chunk_array]", # need to investigate this one - it seems to fail with the default pipeline - "test_roundtrip_read_only_zarrs[vindex-contiguous_in_chunk_array-contiguous_in_chunk_array]", + "test_roundtrip_read_only_zarrs[vindex-2d-contiguous_in_chunk_array-contiguous_in_chunk_array]", ] zarrs_python_no_discontinuous_writes = [ - "test_roundtrip[oindex-discontinuous_in_chunk_array-slice_in_chunk]", - "test_roundtrip[oindex-discontinuous_in_chunk_array-slice_across_chunks]", - "test_roundtrip[oindex-discontinuous_in_chunk_array-fill_slice]", - "test_roundtrip[oindex-discontinuous_in_chunk_array-int]", - "test_roundtrip[oindex-slice_in_chunk-discontinuous_in_chunk_array]", - "test_roundtrip[oindex-slice_across_chunks-discontinuous_in_chunk_array]", - "test_roundtrip[oindex-fill_slice-discontinuous_in_chunk_array]", - "test_roundtrip[oindex-int-discontinuous_in_chunk_array]", - "test_roundtrip[oindex-ellipsis-discontinuous_in_chunk_array]", - "test_roundtrip[vindex-discontinuous_in_chunk_array-slice_in_chunk]", - "test_roundtrip[vindex-discontinuous_in_chunk_array-slice_across_chunks]", - "test_roundtrip[vindex-discontinuous_in_chunk_array-fill_slice]", - "test_roundtrip[vindex-discontinuous_in_chunk_array-int]", - "test_roundtrip[vindex-slice_in_chunk-discontinuous_in_chunk_array]", - "test_roundtrip[vindex-slice_across_chunks-discontinuous_in_chunk_array]", - "test_roundtrip[vindex-fill_slice-discontinuous_in_chunk_array]", - "test_roundtrip[vindex-int-discontinuous_in_chunk_array]", - "test_roundtrip[oindex-discontinuous_in_chunk_array-contiguous_in_chunk_array]", - "test_roundtrip[oindex-contiguous_in_chunk_array-discontinuous_in_chunk_array]", - "test_roundtrip[oindex-across_chunks_indices_array-discontinuous_in_chunk_array]", - "test_roundtrip[oindex-discontinuous_in_chunk_array-discontinuous_in_chunk_array]", - "test_roundtrip[vindex-contiguous_in_chunk_array-discontinuous_in_chunk_array]", - "test_roundtrip[vindex-discontinuous_in_chunk_array-discontinuous_in_chunk_array]", - "test_roundtrip[oindex-discontinuous_in_chunk_array-across_chunks_indices_array]", - 
"test_roundtrip[vindex-discontinuous_in_chunk_array-contiguous_in_chunk_array]", + "test_roundtrip[oindex-2d-discontinuous_in_chunk_array-slice_in_chunk]", + "test_roundtrip[oindex-2d-discontinuous_in_chunk_array-slice_across_chunks]", + "test_roundtrip[oindex-2d-discontinuous_in_chunk_array-full_slice]", + "test_roundtrip[oindex-2d-discontinuous_in_chunk_array-int]", + "test_roundtrip[oindex-2d-slice_in_chunk-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-2d-slice_across_chunks-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-2d-full_slice-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-2d-int-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-2d-ellipsis-discontinuous_in_chunk_array]", + "test_roundtrip[vindex-2d-discontinuous_in_chunk_array-slice_in_chunk]", + "test_roundtrip[vindex-2d-discontinuous_in_chunk_array-slice_across_chunks]", + "test_roundtrip[vindex-2d-discontinuous_in_chunk_array-full_slice]", + "test_roundtrip[vindex-2d-discontinuous_in_chunk_array-int]", + "test_roundtrip[vindex-2d-slice_in_chunk-discontinuous_in_chunk_array]", + "test_roundtrip[vindex-2d-slice_across_chunks-discontinuous_in_chunk_array]", + "test_roundtrip[vindex-2d-full_slice-discontinuous_in_chunk_array]", + "test_roundtrip[vindex-2d-int-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-2d-discontinuous_in_chunk_array-contiguous_in_chunk_array]", + "test_roundtrip[oindex-2d-contiguous_in_chunk_array-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-2d-across_chunks_indices_array-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-2d-discontinuous_in_chunk_array-discontinuous_in_chunk_array]", + "test_roundtrip[vindex-2d-contiguous_in_chunk_array-discontinuous_in_chunk_array]", + "test_roundtrip[vindex-2d-discontinuous_in_chunk_array-discontinuous_in_chunk_array]", + "test_roundtrip[oindex-2d-discontinuous_in_chunk_array-across_chunks_indices_array]", + "test_roundtrip[vindex-2d-discontinuous_in_chunk_array-contiguous_in_chunk_array]", + "test_roundtrip[oindex-1d-discontinuous_in_chunk_array]", + "test_roundtrip[vindex-1d-discontinuous_in_chunk_array]", ] # vindexing with two contiguous arrays would be converted to two slices but @@ -121,21 +123,26 @@ def array_fixture(request: pytest.FixtureRequest) -> npt.NDArray[Any]: # down to integer indexing, which we can't do i.e., [np.array(1, 2), np.array(1, 2)] -> [slice(1, 2), slice(1, 2)] # is not a correct conversion, and thus we don't support the write operation zarrs_python_no_collapsed_dim = [ - "test_roundtrip[vindex-contiguous_in_chunk_array-contiguous_in_chunk_array]" + "test_roundtrip[vindex-2d-contiguous_in_chunk_array-contiguous_in_chunk_array]" ] def pytest_collection_modifyitems( config: pytest.Config, items: Iterable[pytest.Item] ) -> None: - xfail_marker = pytest.mark.xfail( - reason="This test fails with the zarr-python default codec pipeline." - ) for item in items: - if ( - item.name - in zarr_python_default_codec_pipeline_failures - + zarrs_python_no_discontinuous_writes - + zarrs_python_no_collapsed_dim - ): + if item.name in zarr_python_default_codec_pipeline_failures: + xfail_marker = pytest.mark.xfail( + reason="This test fails with the zarr-python default codec pipeline." + ) + item.add_marker(xfail_marker) + if item.name in zarrs_python_no_discontinuous_writes: + xfail_marker = pytest.mark.xfail( + reason="zarrs discontinuous writes are not supported." 
+ ) + item.add_marker(xfail_marker) + if item.name in zarrs_python_no_collapsed_dim: + xfail_marker = pytest.mark.xfail( + reason="zarrs vindexing with multiple contiguous arrays is not supported." + ) item.add_marker(xfail_marker) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 8d48324..07b2016 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,9 +1,12 @@ #!/usr/bin/env python3 import operator +import tempfile from collections.abc import Callable from contextlib import contextmanager from functools import reduce +from itertools import product +from pathlib import Path from types import EllipsisType import numpy as np @@ -13,76 +16,92 @@ import zarrs_python # noqa: F401 +axis_size_ = 10 +chunk_size_ = axis_size_ // 2 +fill_value_ = 32767 +dimensionalities_ = list(range(1, 5)) + @pytest.fixture def fill_value() -> int: - return 32767 + return fill_value_ -@pytest.fixture -def chunks() -> tuple[int, ...]: - return (5, 5) +non_numpy_indices = [ + pytest.param(slice(1, 3), id="slice_in_chunk"), + pytest.param(slice(1, 7), id="slice_across_chunks"), + pytest.param(2, id="int"), + pytest.param(slice(None), id="full_slice"), + pytest.param(Ellipsis, id="ellipsis"), +] +numpy_indices = [ + pytest.param(np.array([1, 2]), id="contiguous_in_chunk_array"), + pytest.param(np.array([0, 3]), id="discontinuous_in_chunk_array"), + pytest.param(np.array([0, 6]), id="across_chunks_indices_array"), +] -@pytest.fixture -def shape() -> tuple[int, ...]: - return (10, 10) - - -@pytest.fixture( - params=[ - np.array([1, 2]), - np.array([0, 3]), - slice(1, 3), - slice(1, 7), - np.array([0, 6]), - slice(None), - 2, - Ellipsis, - ], - ids=[ - "contiguous_in_chunk_array", - "discontinuous_in_chunk_array", - "slice_in_chunk", - "slice_across_chunks", - "across_chunks_indices_array", - "fill_slice", - "int", - "ellipsis", - ], -) -def indexer_1d(request) -> slice | np.ndarray | int | EllipsisType: - return request.param +all_indices = numpy_indices + non_numpy_indices +indexing_method_params = [ + pytest.param(lambda x: getattr(x, "oindex"), id="oindex"), + pytest.param(lambda x: x, id="vindex"), +] -@pytest.fixture -def full_array(shape) -> np.ndarray: - return np.arange(reduce(operator.mul, shape, 1)).reshape(shape) - -indexer_1d_2 = indexer_1d - - -@pytest.fixture -def index(indexer_1d, indexer_1d_2): - if isinstance(indexer_1d, EllipsisType) and isinstance(indexer_1d_2, EllipsisType): - pytest.skip("Double ellipsis indexing is not valid") - return indexer_1d, indexer_1d_2 +def pytest_generate_tests(metafunc): + old_pipeline_path = zarr.config.get("codec_pipeline.path") + # need to set the codec pipeline to the zarrs pipeline because the autouse fixture doesn't apply here + zarr.config.set({"codec_pipeline.path": "zarrs_python.ZarrsCodecPipeline"}) + if "test_roundtrip" in metafunc.function.__name__: + arrs = [] + indices = [] + store_values = [] + indexing_methods = [] + ids = [] + for dimensionality in dimensionalities_: + indexers = non_numpy_indices if dimensionality > 2 else all_indices + for index_param_prod in product(indexers, repeat=dimensionality): + index = tuple(index_param.values[0] for index_param in index_param_prod) + # multi-ellipsis indexing is not supported + if sum(isinstance(i, EllipsisType) for i in index) > 1: + continue + for indexing_method_param in indexing_method_params: + arr = gen_arr(fill_value_, Path(tempfile.mktemp()), dimensionality) + indexing_method = indexing_method_param.values[0] + dimensionality_id = f"{dimensionality}d" + id = "-".join( + 
[indexing_method_param.id, dimensionality_id] + + [index_param.id for index_param in index_param_prod] + ) + ids.append(id) + store_values.append( + gen_store_values( + indexing_method, + index, + full_array((axis_size_,) * dimensionality), + ) + ) + indexing_methods.append(indexing_method) + indices.append(index) + arrs.append(arr) + # array is used as param name to prevent collision with arr fixture + metafunc.parametrize( + ["array", "index", "store_values", "indexing_method"], + zip(arrs, indices, store_values, indexing_methods), + ids=ids, + ) + zarr.config.set({"codec_pipeline.path": old_pipeline_path}) -@pytest.fixture( - params=[lambda x: getattr(x, "oindex"), lambda x: x], ids=["oindex", "vindex"] -) -def indexing_method(request) -> Callable: - return request.param +def full_array(shape) -> np.ndarray: + return np.arange(reduce(operator.mul, shape, 1)).reshape(shape) -@pytest.fixture -def store_values( +def gen_store_values( indexing_method: Callable, index: tuple[int | slice | np.ndarray | EllipsisType, ...], full_array: np.ndarray, - shape: tuple[int, ...], ) -> np.ndarray: class smoke: oindex = "oindex" @@ -95,12 +114,12 @@ def maybe_convert( if isinstance(i, slice): return np.arange( i.start if i.start is not None else 0, - i.stop if i.stop is not None else shape[axis], + i.stop if i.stop is not None else full_array.shape[axis], ) if isinstance(i, int): return np.array([i]) if isinstance(i, EllipsisType): - return np.arange(shape[axis]) + return np.arange(full_array.shape[axis]) raise ValueError(f"Invalid index {i}") if not isinstance(index, EllipsisType) and indexing_method(smoke()) == "oindex": @@ -112,54 +131,65 @@ def maybe_convert( if all(i.shape == (1,) for i in index): res = res.squeeze() return res - for axis, i in enumerate(index): - if i.shape == (1,): - res = res.squeeze(axis=axis) + res = res.squeeze( + axis=tuple(axis for axis, i in enumerate(index) if i.shape == (1,)) + ) return res return full_array[index] -@pytest.fixture -def arr(fill_value, chunks, shape, tmp_path) -> zarr.Array: +def gen_arr(fill_value, tmp_path, dimensionality) -> zarr.Array: return zarr.create( - shape, + (axis_size_,) * dimensionality, store=LocalStore(root=tmp_path / ".zarr", mode="w"), - chunks=chunks, + chunks=(chunk_size_,) * dimensionality, dtype=np.int16, fill_value=fill_value, codecs=[zarr.codecs.BytesCodec(), zarr.codecs.BloscCodec()], ) -def test_fill_value(arr: zarr.Array, fill_value: int): - assert np.all(arr[:] == fill_value) +@pytest.fixture(params=dimensionalities_) +def dimensionality(request): + return request.param -def test_roundtrip_constant(arr: zarr.Array): +@pytest.fixture +def arr(dimensionality, tmp_path) -> zarr.Array: + return gen_arr(fill_value_, tmp_path, dimensionality) + + +def test_fill_value(arr: zarr.Array): + assert np.all(arr[:] == fill_value_) + + +def test_constant(arr: zarr.Array): arr[:] = 42 assert np.all(arr[:] == 42) -def test_roundtrip_singleton(arr: zarr.Array): - arr[1, 1] = 42 - assert arr[1, 1] == 42 - assert arr[0, 0] != 42 +def test_singleton(arr: zarr.Array): + singleton_index = (1,) * len(arr.shape) + non_singleton_index = (0,) * len(arr.shape) + arr[singleton_index] = 42 + assert arr[singleton_index] == 42 + assert arr[non_singleton_index] != 42 -def test_roundtrip_full_array(arr: zarr.Array, shape: tuple[int, ...]): - stored_values = np.arange(reduce(operator.mul, shape, 1)).reshape(shape) +def test_full_array(arr: zarr.Array): + stored_values = full_array(arr.shape) arr[:] = stored_values assert np.all(arr[:] == stored_values) def 
test_roundtrip( - arr: zarr.Array, + array: zarr.Array, store_values: np.ndarray, index: tuple[int | slice | np.ndarray | EllipsisType, ...], indexing_method: Callable, ): - indexing_method(arr)[index] = store_values - res = indexing_method(arr)[index] + indexing_method(array)[index] = store_values + res = indexing_method(array)[index] assert np.all( res == store_values, ), res @@ -175,25 +205,27 @@ def use_zarr_default_codec_reader(): def test_roundtrip_read_only_zarrs( - arr: zarr.Array, + array: zarr.Array, store_values: np.ndarray, index: tuple[int | slice | np.ndarray | EllipsisType, ...], indexing_method: Callable, ): with use_zarr_default_codec_reader(): - arr_default = zarr.open(arr.store, mode="r+") + arr_default = zarr.open(array.store, mode="r+") indexing_method(arr_default)[index] = store_values - res = indexing_method(zarr.open(arr.store))[index] + res = indexing_method(zarr.open(array.store))[index] assert np.all( res == store_values, ), res -def test_roundtrip_ellipsis_indexing_1d_invalid(arr: zarr.Array): +def test_ellipsis_indexing_invalid(arr: zarr.Array): + if len(arr.shape) <= 2: + pytest.skip( + "Ellipsis indexing works for 1D and 2D arrays in zarr-python despite a shape mismatch" + ) stored_value = np.array([1, 2, 3]) - with pytest.raises( - BaseException # TODO: ValueError, but this raises pyo3_runtime.PanicException # noqa: PT011 - ): + with pytest.raises(ValueError): # noqa: PT011 # zarrs-python error: ValueError: operands could not be broadcast together with shapes (4,) (3,) # numpy error: ValueError: could not broadcast input array from shape (3,) into shape (4,) arr[2, ...] = stored_value From 766cf4126efbb85841e8108266b10d02b2fbae80 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Thu, 14 Nov 2024 16:49:37 +0100 Subject: [PATCH 23/24] (fix): clarify collapsed dimension behavior --- python/zarrs_python/utils.py | 34 +++++++++++++++------------------- tests/conftest.py | 17 ++++++++++++----- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index c68c88f..7f1dfc2 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -1,6 +1,8 @@ from __future__ import annotations +import operator import os +from functools import reduce from typing import TYPE_CHECKING, Any import numpy as np @@ -49,14 +51,6 @@ def make_slice_selection(selection: tuple[np.ndarray | float]) -> list[slice]: ls.append(slice(dim_selection[0], dim_selection[-1] + 1, 1)) else: ls.append(dim_selection) - if ( - sum(isinstance(dim_selection, np.ndarray) for dim_selection in selection) - == sum(isinstance(dim_selection, slice) for dim_selection in selection) - == len(selection) - ): - raise DiscontiguousArrayError( - "Vindexing with only contiguous numpy arrays is not supported" - ) return ls @@ -152,23 +146,25 @@ def make_chunk_info_for_rust_with_indices( ], drop_axes: tuple[int, ...], ) -> list[tuple[tuple[str, ChunkCoords, str, Any], list[slice], list[slice]]]: - for _, chunk_spec, chunk_selection, out_selection in batch_info: - shape_out_selection = get_shape_for_selector( - out_selection, chunk_spec.shape, pad=False + chunk_info_with_indices = [] + for byte_getter, chunk_spec, chunk_selection, out_selection in batch_info: + chunk_info = convert_chunk_to_primitive(byte_getter, chunk_spec) + out_selection_as_slices = selector_tuple_to_slice_selection(out_selection) + chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection) + shape_chunk_selection_slices = get_shape_for_selector( + 
chunk_selection_as_slices, chunk_spec.shape, pad=True, drop_axes=drop_axes ) shape_chunk_selection = get_shape_for_selector( chunk_selection, chunk_spec.shape, pad=True, drop_axes=drop_axes ) - if len(shape_chunk_selection) != len(shape_out_selection): + if reduce(operator.mul, shape_chunk_selection, 1) != reduce( + operator.mul, shape_chunk_selection_slices, 1 + ): raise CollapsedDimensionError() - return list( - ( - convert_chunk_to_primitive(byte_getter, chunk_spec), - selector_tuple_to_slice_selection(out_selection), - selector_tuple_to_slice_selection(chunk_selection), + chunk_info_with_indices.append( + (chunk_info, out_selection_as_slices, chunk_selection_as_slices) ) - for (byte_getter, chunk_spec, chunk_selection, out_selection) in batch_info - ) + return chunk_info_with_indices def make_chunk_info_for_rust( diff --git a/tests/conftest.py b/tests/conftest.py index 9530fe4..1ac9d7f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,7 +11,10 @@ from zarr.storage import LocalStore, MemoryStore, ZipStore from zarr.storage.remote import RemoteStore -import zarrs_python # noqa: F401 +from zarrs_python.utils import ( # noqa: F401 + CollapsedDimensionError, + DiscontiguousArrayError, +) if TYPE_CHECKING: from collections.abc import Iterable @@ -85,7 +88,9 @@ def array_fixture(request: pytest.FixtureRequest) -> npt.NDArray[Any]: "test_roundtrip_read_only_zarrs[vindex-2d-ellipsis-contiguous_in_chunk_array]", "test_roundtrip_read_only_zarrs[vindex-2d-ellipsis-discontinuous_in_chunk_array]", # need to investigate this one - it seems to fail with the default pipeline - "test_roundtrip_read_only_zarrs[vindex-2d-contiguous_in_chunk_array-contiguous_in_chunk_array]", + # but it makes some sense that it succeeds with ours since we fall-back to numpy indexing + # in the case of a collapsed dimension + # "test_roundtrip_read_only_zarrs[vindex-2d-contiguous_in_chunk_array-contiguous_in_chunk_array]", ] zarrs_python_no_discontinuous_writes = [ @@ -120,7 +125,7 @@ def array_fixture(request: pytest.FixtureRequest) -> npt.NDArray[Any]: # vindexing with two contiguous arrays would be converted to two slices but # in numpy indexing actually requires dropping a dimension, which in turn boils -# down to integer indexing, which we can't do i.e., [np.array(1, 2), np.array(1, 2)] -> [slice(1, 2), slice(1, 2)] +# down to integer indexing, which we can't do i.e., [np.array(1, 2), np.array(1, 2)] -> [slice(1, 3), slice(1, 3)] # is not a correct conversion, and thus we don't support the write operation zarrs_python_no_collapsed_dim = [ "test_roundtrip[vindex-2d-contiguous_in_chunk_array-contiguous_in_chunk_array]" @@ -138,11 +143,13 @@ def pytest_collection_modifyitems( item.add_marker(xfail_marker) if item.name in zarrs_python_no_discontinuous_writes: xfail_marker = pytest.mark.xfail( - reason="zarrs discontinuous writes are not supported." + raises=DiscontiguousArrayError, + reason="zarrs discontinuous writes are not supported.", ) item.add_marker(xfail_marker) if item.name in zarrs_python_no_collapsed_dim: xfail_marker = pytest.mark.xfail( - reason="zarrs vindexing with multiple contiguous arrays is not supported." 
+ raises=CollapsedDimensionError, + reason="zarrs vindexing with multiple contiguous arrays is not supported.", ) item.add_marker(xfail_marker) From 61a4d4be21dab0e729744726ba8698a3015cef31 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Thu, 14 Nov 2024 17:16:35 +0100 Subject: [PATCH 24/24] (chore): clean ups --- python/zarrs_python/utils.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/python/zarrs_python/utils.py b/python/zarrs_python/utils.py index 7f1dfc2..43e8b73 100644 --- a/python/zarrs_python/utils.py +++ b/python/zarrs_python/utils.py @@ -123,6 +123,10 @@ def resulting_shape_from_index( return tuple(size for idx, size in enumerate(result_shape) if idx not in drop_axes) +def prod_op(x: Iterable[int]) -> int: + return reduce(operator.mul, x, 1) + + def get_shape_for_selector( selector_tuple: SelectorTuple, shape: tuple[int, ...], @@ -152,15 +156,18 @@ def make_chunk_info_for_rust_with_indices( out_selection_as_slices = selector_tuple_to_slice_selection(out_selection) chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection) shape_chunk_selection_slices = get_shape_for_selector( - chunk_selection_as_slices, chunk_spec.shape, pad=True, drop_axes=drop_axes + tuple(chunk_selection_as_slices), + chunk_spec.shape, + pad=True, + drop_axes=drop_axes, ) shape_chunk_selection = get_shape_for_selector( chunk_selection, chunk_spec.shape, pad=True, drop_axes=drop_axes ) - if reduce(operator.mul, shape_chunk_selection, 1) != reduce( - operator.mul, shape_chunk_selection_slices, 1 - ): - raise CollapsedDimensionError() + if prod_op(shape_chunk_selection) != prod_op(shape_chunk_selection_slices): + raise CollapsedDimensionError( + f"{shape_chunk_selection} != {shape_chunk_selection_slices}" + ) chunk_info_with_indices.append( (chunk_info, out_selection_as_slices, chunk_selection_as_slices) )
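A note on the collapsed-dimension guard that patches 14 and 23/24 converge on: the element-count comparison in `make_chunk_info_for_rust_with_indices` exists because pointwise (vindex-style) integer-array selections cannot, in general, be rewritten as the slice selections the Rust side consumes. The numpy-only sketch below illustrates the mismatch the guard detects; the array and variable names here are illustrative and not part of the patch series:

import numpy as np

arr = np.arange(100).reshape(10, 10)

# Vectorized (vindex-style) indexing with two integer arrays selects matched
# coordinate pairs, here (1, 1) and (2, 2), giving a 1-D result of 2 elements.
vindexed = arr[np.array([1, 2]), np.array([1, 2])]
assert vindexed.shape == (2,)

# Converting each contiguous array to the "equivalent" slice instead selects
# the full 2x2 cross product, a 2-D result of 4 elements.
sliced = arr[slice(1, 3), slice(1, 3)]
assert sliced.shape == (2, 2)

# This element-count mismatch (2 != 4) is what the prod_op comparison in
# make_chunk_info_for_rust_with_indices detects before raising
# CollapsedDimensionError.
assert vindexed.size != sliced.size

When the counts differ, `CollapsedDimensionError` is raised; the read path in pipeline.py then falls back to `make_chunk_info_for_rust` and zarr-python-side indexing, while the corresponding write case remains marked xfail in conftest.py.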