Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat): fall back to pure python indexing in case of unhandled rust indexing for read #30

Merged
merged 27 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
afd0a37
(chore): file structure
ilan-gold Nov 7, 2024
4ceba6c
Merge branch 'ld/codec_pipeline' into ig/refactoring
ilan-gold Nov 7, 2024
2af41ec
(chore): parametrize tests to get full scope of possibilities
ilan-gold Nov 7, 2024
db97bd4
Merge branch 'ld/codec_pipeline' into ig/test_full_indexing
ilan-gold Nov 8, 2024
c945e88
(chore): xfail tests that fail on zarr-python default pipeline
ilan-gold Nov 8, 2024
0bb8dbc
(fix) singular
ilan-gold Nov 8, 2024
3f176bf
(fix): check for contiguous index arrays
ilan-gold Nov 8, 2024
ff44774
(fix): contiguous numpy arrays converted to slices
ilan-gold Nov 8, 2024
f81a1c8
(feat): add reading for non-contiguous buffers
ilan-gold Nov 8, 2024
42de375
(chore): remove unused imports
ilan-gold Nov 8, 2024
8b60bed
(fix): cleanup unwraps in `retrieve_chunks`
LDeakin Nov 8, 2024
c35766c
Merge branch 'ld/codec_pipeline' into ig/test_full_indexing
flying-sheep Nov 9, 2024
16262fe
Refactor full indexing (#34)
flying-sheep Nov 11, 2024
96d8a2e
(chore): `make_chunk_info_for_rust` cleanup
ilan-gold Nov 11, 2024
fbfa958
(fix): all tests working except "tests/test_pipeline.py::test_roundtr…
ilan-gold Nov 11, 2024
9f90b5e
(fix): skip read in `store_chunk_subset_bytes` for full chunks
LDeakin Nov 11, 2024
deba69e
(fix): improve dropped index detection + disallow integer write case
ilan-gold Nov 12, 2024
0de267f
(chore): message more specific
ilan-gold Nov 12, 2024
e53d4a7
(fix): use `Exception`
ilan-gold Nov 14, 2024
45decd5
(chore): erroneous comment
ilan-gold Nov 14, 2024
68b80e5
(chore): `drop_axes` default
ilan-gold Nov 14, 2024
147ac56
(chore): `drop_axes` param
ilan-gold Nov 14, 2024
f844224
(chore): apply review
ilan-gold Nov 14, 2024
7aeff55
(chore): `else` branch
ilan-gold Nov 14, 2024
639f77b
(chore): add basic nd tests (#35)
ilan-gold Nov 14, 2024
766cf41
(fix): clarify collapsed dimension behavior
ilan-gold Nov 14, 2024
61a4d4b
(chore): clean ups
ilan-gold Nov 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions python/zarrs_python/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@
from zarr.core.indexing import SelectorTuple

from ._internal import CodecPipelineImpl
from .utils import get_max_threads, make_chunk_info_for_rust
from .utils import (
CollapsedDimensionError,
DiscontiguousArrayError,
get_max_threads,
make_chunk_info_for_rust,
make_chunk_info_for_rust_with_indices,
)


@dataclass(frozen=True)
Expand Down Expand Up @@ -94,12 +100,32 @@ async def read(
out = out.as_ndarray_like() # FIXME: Error if array is not in host memory
if not out.dtype.isnative:
raise RuntimeError("Non-native byte order not supported")

chunks_desc = make_chunk_info_for_rust(batch_info)
await asyncio.to_thread(
self.impl.retrieve_chunks, chunks_desc, out, chunk_concurrent_limit
try:
chunks_desc = make_chunk_info_for_rust_with_indices(batch_info, drop_axes)
index_in_rust = True
except (DiscontiguousArrayError, CollapsedDimensionError):
chunks_desc = make_chunk_info_for_rust(batch_info)
index_in_rust = False
if index_in_rust:
await asyncio.to_thread(
self.impl.retrieve_chunks_and_apply_index,
chunks_desc,
out,
chunk_concurrent_limit,
)
return None
chunks = await asyncio.to_thread(
self.impl.retrieve_chunks, chunks_desc, chunk_concurrent_limit
)
return None
for chunk, chunk_info in zip(chunks, batch_info):
out_selection = chunk_info[3]
selection = chunk_info[2]
spec = chunk_info[1]
chunk_reshaped = chunk.view(spec.dtype).reshape(spec.shape)
chunk_selected = chunk_reshaped[selection]
if drop_axes:
chunk_selected = np.squeeze(chunk_selected, axis=drop_axes)
out[out_selection] = chunk_selected

async def write(
self,
Expand All @@ -117,8 +143,11 @@ async def write(
value = np.ascontiguousarray(value, dtype=value.dtype.newbyteorder("="))
elif not value.flags.c_contiguous:
value = np.ascontiguousarray(value)
chunks_desc = make_chunk_info_for_rust(batch_info)
chunks_desc = make_chunk_info_for_rust_with_indices(batch_info, drop_axes)
await asyncio.to_thread(
self.impl.store_chunks, chunks_desc, value, chunk_concurrent_limit
self.impl.store_chunks_with_indices,
chunks_desc,
value,
chunk_concurrent_limit,
)
return None
160 changes: 141 additions & 19 deletions python/zarrs_python/utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from __future__ import annotations

import operator
import os
from functools import reduce
from typing import TYPE_CHECKING, Any

import numpy as np
from zarr.core.indexing import ArrayIndexError, SelectorTuple, is_integer
from zarr.core.indexing import SelectorTuple, is_integer

if TYPE_CHECKING:
from collections.abc import Iterable
from types import EllipsisType

from zarr.abc.store import ByteSetter
from zarr.abc.store import ByteGetter, ByteSetter
from zarr.core.array_spec import ArraySpec
from zarr.core.common import ChunkCoords

Expand All @@ -19,45 +22,164 @@ def get_max_threads() -> int:
return (os.cpu_count() or 1) + 4


# This is a copy of the function from zarr.core.indexing that fixes:
class DiscontiguousArrayError(Exception):
pass


class CollapsedDimensionError(Exception):
pass


# This is a (mostly) copy of the function from zarr.core.indexing that fixes:
# DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated
# TODO: Upstream this fix
def make_slice_selection(selection: Any) -> list[slice]:
def make_slice_selection(selection: tuple[np.ndarray | float]) -> list[slice]:
ls: list[slice] = []
for dim_selection in selection:
if is_integer(dim_selection):
ls.append(slice(int(dim_selection), int(dim_selection) + 1, 1))
elif isinstance(dim_selection, np.ndarray):
dim_selection = dim_selection.ravel()
if len(dim_selection) == 1:
ls.append(
slice(int(dim_selection.item()), int(dim_selection.item()) + 1, 1)
)
else:
raise ArrayIndexError
diff = np.diff(dim_selection)
if (diff != 1).any() and (diff != 0).any():
raise DiscontiguousArrayError(diff)
ls.append(slice(dim_selection[0], dim_selection[-1] + 1, 1))
else:
ls.append(dim_selection)
return ls


def selector_tuple_to_slice_selection(selector_tuple: SelectorTuple) -> list[slice]:
if isinstance(selector_tuple, slice):
return [selector_tuple]
if all(isinstance(s, slice) for s in selector_tuple):
return list(selector_tuple)
return make_slice_selection(selector_tuple)


def convert_chunk_to_primitive(
byte_getter: ByteGetter | ByteSetter, chunk_spec: ArraySpec
) -> tuple[str, ChunkCoords, str, Any]:
return (
[selector_tuple]
if isinstance(selector_tuple, slice)
else make_slice_selection(selector_tuple)
str(byte_getter),
chunk_spec.shape,
str(chunk_spec.dtype),
chunk_spec.fill_value.tobytes(),
)


def make_chunk_info_for_rust(
batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]],
) -> list[tuple[str, ChunkCoords, str, Any, list[slice], list[slice]]]:
return list(
(
str(byte_getter),
def resulting_shape_from_index(
array_shape: tuple[int, ...],
index_tuple: tuple[int | slice | EllipsisType | np.ndarray],
drop_axes: tuple[int, ...],
*,
pad: bool,
) -> tuple[int, ...]:
result_shape = []
advanced_index_shapes = [
idx.shape for idx in index_tuple if isinstance(idx, np.ndarray)
]
basic_shape_index = 0

# Broadcast all advanced indices, if any
if advanced_index_shapes:
result_shape += np.broadcast_shapes(*advanced_index_shapes)
# Consume dimensions from array_shape
basic_shape_index += len(advanced_index_shapes)

# Process each remaining index in index_tuple
for idx in index_tuple:
if isinstance(idx, int):
# Integer index reduces dimension, so skip this dimension in array_shape
basic_shape_index += 1
elif isinstance(idx, slice):
if idx.step is not None and idx.step > 1:
raise DiscontiguousArrayError(
"Step size greater than 1 is not supported"
)
# Slice keeps dimension, adjust size accordingly
start, stop, _ = idx.indices(array_shape[basic_shape_index])
result_shape.append(stop - start)
basic_shape_index += 1
elif idx is Ellipsis:
# Calculate number of dimensions that Ellipsis should fill
num_to_fill = len(array_shape) - len(index_tuple) + 1
result_shape += array_shape[
basic_shape_index : basic_shape_index + num_to_fill
]
basic_shape_index += num_to_fill
flying-sheep marked this conversation as resolved.
Show resolved Hide resolved
elif not isinstance(idx, np.ndarray):
raise ValueError(f"Invalid index type: {type(idx)}")

# Step 4: Append remaining dimensions from array_shape if fewer indices were used
if basic_shape_index < len(array_shape) and pad:
result_shape += array_shape[basic_shape_index:]

return tuple(size for idx, size in enumerate(result_shape) if idx not in drop_axes)


def prod_op(x: Iterable[int]) -> int:
return reduce(operator.mul, x, 1)


def get_shape_for_selector(
selector_tuple: SelectorTuple,
shape: tuple[int, ...],
*,
pad: bool,
drop_axes: tuple[int, ...] = (),
) -> tuple[int, ...]:
if isinstance(selector_tuple, slice | np.ndarray):
return resulting_shape_from_index(
shape,
(selector_tuple,),
drop_axes,
pad=pad,
)
return resulting_shape_from_index(shape, selector_tuple, drop_axes, pad=pad)


def make_chunk_info_for_rust_with_indices(
batch_info: Iterable[
tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
],
drop_axes: tuple[int, ...],
) -> list[tuple[tuple[str, ChunkCoords, str, Any], list[slice], list[slice]]]:
chunk_info_with_indices = []
for byte_getter, chunk_spec, chunk_selection, out_selection in batch_info:
chunk_info = convert_chunk_to_primitive(byte_getter, chunk_spec)
out_selection_as_slices = selector_tuple_to_slice_selection(out_selection)
chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection)
shape_chunk_selection_slices = get_shape_for_selector(
tuple(chunk_selection_as_slices),
chunk_spec.shape,
str(chunk_spec.dtype),
chunk_spec.fill_value.tobytes(),
selector_tuple_to_slice_selection(out_selection),
selector_tuple_to_slice_selection(chunk_selection),
pad=True,
drop_axes=drop_axes,
)
for (byte_getter, chunk_spec, chunk_selection, out_selection) in batch_info
shape_chunk_selection = get_shape_for_selector(
chunk_selection, chunk_spec.shape, pad=True, drop_axes=drop_axes
)
if prod_op(shape_chunk_selection) != prod_op(shape_chunk_selection_slices):
raise CollapsedDimensionError(
f"{shape_chunk_selection} != {shape_chunk_selection_slices}"
)
chunk_info_with_indices.append(
(chunk_info, out_selection_as_slices, chunk_selection_as_slices)
)
return chunk_info_with_indices


def make_chunk_info_for_rust(
batch_info: Iterable[
tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
],
) -> list[tuple[str, ChunkCoords, str, Any]]:
return list(
convert_chunk_to_primitive(byte_getter, chunk_spec)
for (byte_getter, chunk_spec, _, _) in batch_info
)
Loading
Loading