From 9c4da5aeb3c65b3f2bbbcf13444ccfbfb943911a Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Fri, 13 Dec 2024 14:05:41 +0100 Subject: [PATCH 01/14] simpler --- src/chunk_item.rs | 2 +- src/tests.rs | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/chunk_item.rs b/src/chunk_item.rs index aea98d8..0af9db4 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -92,7 +92,7 @@ impl ChunksItem for WithSubset { } } -impl<'a> IntoItem for Raw<'a> { +impl IntoItem for Raw<'_> { fn store_config(&self) -> &StoreConfig { &self.0 } diff --git a/src/tests.rs b/src/tests.rs index 355e8ec..0e11c3c 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,3 +1,5 @@ +use pyo3::ffi::c_str; + use numpy::PyUntypedArray; use pyo3::{ types::{PyAnyMethods, PyModule}, @@ -10,13 +12,13 @@ use crate::CodecPipelineImpl; fn test_nparray_to_unsafe_cell_slice_empty() -> PyResult<()> { pyo3::prepare_freethreaded_python(); Python::with_gil(|py| { - let arr: Bound<'_, PyUntypedArray> = PyModule::from_code_bound( + let arr: Bound<'_, PyUntypedArray> = PyModule::from_code( py, - "def empty_array(): + c_str!("def empty_array(): import numpy as np - return np.empty(0, dtype=np.uint8)", - "", - "", + return np.empty(0, dtype=np.uint8)"), + c_str!(""), + c_str!(""), )? .getattr("empty_array")? .call0()? From 67fd07891edcbaf6fb1d57492a14d8846c31255b Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Fri, 13 Dec 2024 14:18:38 +0100 Subject: [PATCH 02/14] no lifetime for raw --- src/chunk_item.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/chunk_item.rs b/src/chunk_item.rs index 0af9db4..9aa1fa4 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -14,7 +14,7 @@ use zarrs::{ use crate::{utils::PyErrExt, StoreConfig}; -pub(crate) type Raw<'a> = ( +pub(crate) type Raw = ( // store StoreConfig, // path @@ -28,7 +28,7 @@ pub(crate) type Raw<'a> = ( ); pub(crate) type RawWithIndices<'a> = ( - Raw<'a>, + Raw, // out selection Vec>, // chunk selection @@ -92,7 +92,7 @@ impl ChunksItem for WithSubset { } } -impl IntoItem for Raw<'_> { +impl IntoItem for Raw { fn store_config(&self) -> &StoreConfig { &self.0 } From fcde7ba74a74873bb34172012d5f501f882073c8 Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Fri, 13 Dec 2024 14:45:00 +0100 Subject: [PATCH 03/14] Make Raw into class --- python/zarrs/_internal.pyi | 32 +++++++++----------- python/zarrs/utils.py | 19 ++++++------ src/chunk_item.rs | 61 +++++++++++++++++++++----------------- src/lib.rs | 1 + src/store.rs | 1 + 5 files changed, 60 insertions(+), 54 deletions(-) diff --git a/python/zarrs/_internal.pyi b/python/zarrs/_internal.pyi index 7e28264..44bec59 100644 --- a/python/zarrs/_internal.pyi +++ b/python/zarrs/_internal.pyi @@ -21,32 +21,17 @@ class CodecPipelineImpl: def retrieve_chunks_and_apply_index( self, chunk_descriptions: typing.Sequence[ - tuple[ - tuple[ - StoreConfig, str, typing.Sequence[int], str, typing.Sequence[int] - ], - typing.Sequence[slice], - typing.Sequence[slice], - ] + tuple[Raw, typing.Sequence[slice], typing.Sequence[slice]] ], value: numpy.NDArray[typing.Any], ) -> None: ... def retrieve_chunks( - self, - chunk_descriptions: typing.Sequence[ - tuple[StoreConfig, str, typing.Sequence[int], str, typing.Sequence[int]] - ], + self, chunk_descriptions: typing.Sequence[Raw] ) -> list[numpy.typing.NDArray[numpy.uint8]]: ... def store_chunks_with_indices( self, chunk_descriptions: typing.Sequence[ - tuple[ - tuple[ - StoreConfig, str, typing.Sequence[int], str, typing.Sequence[int] - ], - typing.Sequence[slice], - typing.Sequence[slice], - ] + tuple[Raw, typing.Sequence[slice], typing.Sequence[slice]] ], value: numpy.NDArray[typing.Any], ) -> None: ... @@ -57,6 +42,17 @@ class FilesystemStoreConfig: class HttpStoreConfig: endpoint: str +class Raw: + def __new__( + cls, + store: StoreConfig, + path: str, + chunk_shape: typing.Sequence[int], + dtype: str, + fill_value: typing.Sequence[int], + ): ... + ... + class StoreConfig(Enum): Filesystem = auto() Http = auto() diff --git a/python/zarrs/utils.py b/python/zarrs/utils.py index 9e2a7b6..c32a9a9 100644 --- a/python/zarrs/utils.py +++ b/python/zarrs/utils.py @@ -3,18 +3,19 @@ import operator import os from functools import reduce -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import numpy as np from zarr.core.indexing import SelectorTuple, is_integer +from zarrs._internal import Raw + if TYPE_CHECKING: from collections.abc import Iterable from types import EllipsisType - from zarr.abc.store import ByteGetter, ByteSetter, Store + from zarr.abc.store import ByteGetter, ByteSetter from zarr.core.array_spec import ArraySpec - from zarr.core.common import ChunkCoords # adapted from https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor @@ -64,8 +65,8 @@ def selector_tuple_to_slice_selection(selector_tuple: SelectorTuple) -> list[sli def convert_chunk_to_primitive( byte_interface: ByteGetter | ByteSetter, chunk_spec: ArraySpec -) -> tuple[Store, str, ChunkCoords, str, Any]: - return ( +) -> Raw: + return Raw( byte_interface.store, byte_interface.path, chunk_spec.shape, @@ -150,7 +151,7 @@ def make_chunk_info_for_rust_with_indices( tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], drop_axes: tuple[int, ...], -) -> list[tuple[tuple[Store, str, ChunkCoords, str, Any], list[slice], list[slice]]]: +) -> list[tuple[Raw, list[slice], list[slice]]]: chunk_info_with_indices = [] for byte_getter, chunk_spec, chunk_selection, out_selection in batch_info: chunk_info = convert_chunk_to_primitive(byte_getter, chunk_spec) @@ -179,8 +180,8 @@ def make_chunk_info_for_rust( batch_info: Iterable[ tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], -) -> list[tuple[Store, str, ChunkCoords, str, Any]]: - return list( +) -> list[Raw]: + return [ convert_chunk_to_primitive(byte_getter, chunk_spec) for (byte_getter, chunk_spec, _, _) in batch_info - ) + ] diff --git a/src/chunk_item.rs b/src/chunk_item.rs index 9aa1fa4..0528abc 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -1,10 +1,9 @@ use std::num::NonZeroU64; use pyo3::{ - exceptions::{PyRuntimeError, PyValueError}, - types::{PySlice, PySliceMethods}, - Bound, PyErr, PyResult, + exceptions::{PyRuntimeError, PyValueError}, pyclass, pymethods, types::{PySlice, PySliceMethods}, Bound, PyErr, PyResult }; +use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use zarrs::{ array::{ChunkRepresentation, DataType, FillValue}, array_subset::ArraySubset, @@ -14,18 +13,31 @@ use zarrs::{ use crate::{utils::PyErrExt, StoreConfig}; -pub(crate) type Raw = ( - // store - StoreConfig, - // path - String, - // shape - Vec, - // data type - String, - // fill value bytes - Vec, -); +#[derive(Debug, Clone)] +#[gen_stub_pyclass] +#[pyclass] +pub(crate) struct Raw { + pub store: StoreConfig, + pub path: String, + pub chunk_shape: Vec, + pub dtype: String, + pub fill_value: Vec, +} + +#[gen_stub_pymethods] +#[pymethods] +impl Raw { + #[new] + pub fn new( + store: StoreConfig, + path: String, + chunk_shape: Vec, + dtype: String, + fill_value: Vec, + ) -> Self { + Self { store, path, chunk_shape, dtype, fill_value } + } +} pub(crate) type RawWithIndices<'a> = ( Raw, @@ -94,11 +106,11 @@ impl ChunksItem for WithSubset { impl IntoItem for Raw { fn store_config(&self) -> &StoreConfig { - &self.0 + &self.store } fn path(&self) -> &str { - &self.1 + &self.path } fn into_item( @@ -107,23 +119,18 @@ impl IntoItem for Raw { key: StoreKey, (): (), ) -> PyResult { - let (_, _, chunk_shape, dtype, fill_value) = self; - let representation = get_chunk_representation(chunk_shape, &dtype, fill_value)?; - Ok(Basic { - store, - key, - representation, - }) + let representation = get_chunk_representation(self.chunk_shape, &self.dtype, self.fill_value)?; + Ok(Basic { store, key, representation }) } } impl IntoItem for RawWithIndices<'_> { fn store_config(&self) -> &StoreConfig { - &self.0 .0 + &self.0.store } fn path(&self) -> &str { - &self.0 .1 + &self.0.path } fn into_item( @@ -133,7 +140,7 @@ impl IntoItem for RawWithIndices<'_> { shape: &[u64], ) -> PyResult { let (raw, selection, chunk_selection) = self; - let chunk_shape = raw.2.clone(); + let chunk_shape = raw.chunk_shape.clone(); let item = raw.into_item(store.clone(), key, ())?; Ok(WithSubset { item, diff --git a/src/lib.rs b/src/lib.rs index 123f8d2..21c1edb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -526,6 +526,7 @@ impl CodecPipelineImpl { fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/store.rs b/src/store.rs index a44f77a..2ccaab2 100644 --- a/src/store.rs +++ b/src/store.rs @@ -19,6 +19,7 @@ use crate::{runtime::tokio_block_on, utils::PyErrExt}; mod filesystem; mod http; +#[derive(Debug, Clone)] #[gen_stub_pyclass_enum] pub enum StoreConfig { Filesystem(FilesystemStoreConfig), From d244fdb5e74c6f28beabc663ecbba899bae695e9 Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Fri, 13 Dec 2024 15:51:09 +0100 Subject: [PATCH 04/14] Almost --- python/zarrs/_internal.pyi | 25 ++++--- python/zarrs/pipeline.py | 26 +++---- python/zarrs/utils.py | 34 ++++----- src/chunk_item.rs | 143 +++++++++++++------------------------ src/lib.rs | 35 ++------- 5 files changed, 93 insertions(+), 170 deletions(-) diff --git a/python/zarrs/_internal.pyi b/python/zarrs/_internal.pyi index 44bec59..4223e3d 100644 --- a/python/zarrs/_internal.pyi +++ b/python/zarrs/_internal.pyi @@ -7,6 +7,10 @@ from enum import Enum, auto import numpy import numpy.typing +class Basic: + def __new__(cls, byte_interface: typing.Any, chunk_spec: typing.Any): ... + ... + class CodecPipelineImpl: def __new__( cls, @@ -20,19 +24,15 @@ class CodecPipelineImpl: ): ... def retrieve_chunks_and_apply_index( self, - chunk_descriptions: typing.Sequence[ - tuple[Raw, typing.Sequence[slice], typing.Sequence[slice]] - ], + chunk_descriptions: typing.Sequence[WithSubset], value: numpy.NDArray[typing.Any], ) -> None: ... def retrieve_chunks( - self, chunk_descriptions: typing.Sequence[Raw] + self, chunk_descriptions: typing.Sequence[Basic] ) -> list[numpy.typing.NDArray[numpy.uint8]]: ... def store_chunks_with_indices( self, - chunk_descriptions: typing.Sequence[ - tuple[Raw, typing.Sequence[slice], typing.Sequence[slice]] - ], + chunk_descriptions: typing.Sequence[WithSubset], value: numpy.NDArray[typing.Any], ) -> None: ... @@ -42,14 +42,13 @@ class FilesystemStoreConfig: class HttpStoreConfig: endpoint: str -class Raw: +class WithSubset: def __new__( cls, - store: StoreConfig, - path: str, - chunk_shape: typing.Sequence[int], - dtype: str, - fill_value: typing.Sequence[int], + item: Basic, + chunk_subset: typing.Sequence[slice], + subset: typing.Sequence[slice], + shape: typing.Sequence[int], ): ... ... diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index f6552ba..749fa3d 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -6,10 +6,7 @@ from typing import TYPE_CHECKING, TypedDict import numpy as np -from zarr.abc.codec import ( - Codec, - CodecPipeline, -) +from zarr.abc.codec import Codec, CodecPipeline from zarr.core.config import config if TYPE_CHECKING: @@ -18,7 +15,7 @@ from zarr.abc.store import ByteGetter, ByteSetter from zarr.core.array_spec import ArraySpec - from zarr.core.buffer import Buffer, NDBuffer + from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer from zarr.core.chunk_grids import ChunkGrid from zarr.core.common import ChunkCoords from zarr.core.indexing import SelectorTuple @@ -127,12 +124,12 @@ async def read( if not out.dtype.isnative: raise RuntimeError("Non-native byte order not supported") try: - chunks_desc = make_chunk_info_for_rust_with_indices(batch_info, drop_axes) - index_in_rust = True + chunks_desc = make_chunk_info_for_rust_with_indices( + batch_info, drop_axes, out.shape + ) except (DiscontiguousArrayError, CollapsedDimensionError): chunks_desc = make_chunk_info_for_rust(batch_info) - index_in_rust = False - if index_in_rust: + else: await asyncio.to_thread( self.impl.retrieve_chunks_and_apply_index, chunks_desc, @@ -158,15 +155,14 @@ async def write( value: NDBuffer, drop_axes: tuple[int, ...] = (), ) -> None: - value = value.as_ndarray_like() # FIXME: Error if array is not in host memory + # FIXME: Error if array is not in host memory + value: NDArrayLike | np.ndarray = value.as_ndarray_like() if not value.dtype.isnative: value = np.ascontiguousarray(value, dtype=value.dtype.newbyteorder("=")) elif not value.flags.c_contiguous: value = np.ascontiguousarray(value) - chunks_desc = make_chunk_info_for_rust_with_indices(batch_info, drop_axes) - await asyncio.to_thread( - self.impl.store_chunks_with_indices, - chunks_desc, - value, + chunks_desc = make_chunk_info_for_rust_with_indices( + batch_info, drop_axes, value.shape ) + await asyncio.to_thread(self.impl.store_chunks_with_indices, chunks_desc, value) return None diff --git a/python/zarrs/utils.py b/python/zarrs/utils.py index c32a9a9..1ac7756 100644 --- a/python/zarrs/utils.py +++ b/python/zarrs/utils.py @@ -8,7 +8,7 @@ import numpy as np from zarr.core.indexing import SelectorTuple, is_integer -from zarrs._internal import Raw +from zarrs._internal import Basic, WithSubset if TYPE_CHECKING: from collections.abc import Iterable @@ -63,18 +63,6 @@ def selector_tuple_to_slice_selection(selector_tuple: SelectorTuple) -> list[sli return make_slice_selection(selector_tuple) -def convert_chunk_to_primitive( - byte_interface: ByteGetter | ByteSetter, chunk_spec: ArraySpec -) -> Raw: - return Raw( - byte_interface.store, - byte_interface.path, - chunk_spec.shape, - str(chunk_spec.dtype), - chunk_spec.fill_value.tobytes(), - ) - - def resulting_shape_from_index( array_shape: tuple[int, ...], index_tuple: tuple[int | slice | EllipsisType | np.ndarray], @@ -151,10 +139,11 @@ def make_chunk_info_for_rust_with_indices( tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], drop_axes: tuple[int, ...], -) -> list[tuple[Raw, list[slice], list[slice]]]: - chunk_info_with_indices = [] + shape: tuple[int, ...], +) -> list[WithSubset]: + chunk_info_with_indices: list[WithSubset] = [] for byte_getter, chunk_spec, chunk_selection, out_selection in batch_info: - chunk_info = convert_chunk_to_primitive(byte_getter, chunk_spec) + chunk_info = Basic(byte_getter, chunk_spec) out_selection_as_slices = selector_tuple_to_slice_selection(out_selection) chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection) shape_chunk_selection_slices = get_shape_for_selector( @@ -171,7 +160,12 @@ def make_chunk_info_for_rust_with_indices( f"{shape_chunk_selection} != {shape_chunk_selection_slices}" ) chunk_info_with_indices.append( - (chunk_info, out_selection_as_slices, chunk_selection_as_slices) + WithSubset( + chunk_info, + out_selection_as_slices, + chunk_selection_as_slices, + shape, + ) ) return chunk_info_with_indices @@ -180,8 +174,8 @@ def make_chunk_info_for_rust( batch_info: Iterable[ tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], -) -> list[Raw]: +) -> list[Basic]: return [ - convert_chunk_to_primitive(byte_getter, chunk_spec) - for (byte_getter, chunk_spec, _, _) in batch_info + Basic(byte_interface, chunk_spec) + for (byte_interface, chunk_spec, _, _) in batch_info ] diff --git a/src/chunk_item.rs b/src/chunk_item.rs index 0528abc..90a238b 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -1,7 +1,10 @@ use std::num::NonZeroU64; use pyo3::{ - exceptions::{PyRuntimeError, PyValueError}, pyclass, pymethods, types::{PySlice, PySliceMethods}, Bound, PyErr, PyResult + exceptions::{PyRuntimeError, PyValueError}, + pyclass, pymethods, + types::{PyAnyMethods as _, PySlice, PySliceMethods as _}, + Bound, PyAny, PyErr, PyResult, }; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use zarrs::{ @@ -11,52 +14,7 @@ use zarrs::{ storage::{MaybeBytes, ReadableWritableListableStorage, StorageError, StoreKey}, }; -use crate::{utils::PyErrExt, StoreConfig}; - -#[derive(Debug, Clone)] -#[gen_stub_pyclass] -#[pyclass] -pub(crate) struct Raw { - pub store: StoreConfig, - pub path: String, - pub chunk_shape: Vec, - pub dtype: String, - pub fill_value: Vec, -} - -#[gen_stub_pymethods] -#[pymethods] -impl Raw { - #[new] - pub fn new( - store: StoreConfig, - path: String, - chunk_shape: Vec, - dtype: String, - fill_value: Vec, - ) -> Self { - Self { store, path, chunk_shape, dtype, fill_value } - } -} - -pub(crate) type RawWithIndices<'a> = ( - Raw, - // out selection - Vec>, - // chunk selection - Vec>, -); - -pub(crate) trait IntoItem: std::marker::Sized { - fn store_config(&self) -> &StoreConfig; - fn path(&self) -> &str; - fn into_item( - self, - store: ReadableWritableListableStorage, - key: StoreKey, - shape: S, - ) -> PyResult; -} +use crate::{store::StoreConfig, utils::PyErrExt}; pub(crate) trait ChunksItem { fn store(&self) -> ReadableWritableListableStorage; @@ -68,18 +26,63 @@ pub(crate) trait ChunksItem { } } +#[derive(Clone)] +#[gen_stub_pyclass] +#[pyclass] pub(crate) struct Basic { store: ReadableWritableListableStorage, key: StoreKey, representation: ChunkRepresentation, } +#[gen_stub_pymethods] +#[pymethods] +impl Basic { + #[new] + fn new(byte_interface: &Bound<'_, PyAny>, chunk_spec: &Bound<'_, PyAny>) -> PyResult { + let store: StoreConfig = byte_interface.getattr("store")?.extract()?; + let path: String = byte_interface.getattr("path")?.extract()?; + + let chunk_shape = chunk_spec.getattr("shape")?.extract()?; + let dtype: String = chunk_spec.getattr("dtype")?.call_method0("__str__")?.extract()?; + let fill_value = chunk_spec.getattr("fill_value")?.call_method0("tobytes")?.extract()?; + Ok(Self { + store: (&store).try_into()?, + key: StoreKey::new(path).map_py_err::()?, + representation: get_chunk_representation(chunk_shape, &dtype, fill_value)?, + }) + } +} + +#[derive(Clone)] +#[gen_stub_pyclass] +#[pyclass] pub(crate) struct WithSubset { pub item: Basic, pub chunk_subset: ArraySubset, pub subset: ArraySubset, } +#[gen_stub_pymethods] +#[pymethods] +impl WithSubset { + #[new] + fn new( + item: Basic, + chunk_subset: Vec>, + subset: Vec>, + shape: Vec, + ) -> PyResult { + let chunk_subset = selection_to_array_subset(&chunk_subset, &shape)?; + let subset = selection_to_array_subset(&subset, &shape)?; + Ok(Self { + item, + chunk_subset, + subset, + }) + } +} + impl ChunksItem for Basic { fn store(&self) -> ReadableWritableListableStorage { self.store.clone() @@ -104,52 +107,6 @@ impl ChunksItem for WithSubset { } } -impl IntoItem for Raw { - fn store_config(&self) -> &StoreConfig { - &self.store - } - - fn path(&self) -> &str { - &self.path - } - - fn into_item( - self, - store: ReadableWritableListableStorage, - key: StoreKey, - (): (), - ) -> PyResult { - let representation = get_chunk_representation(self.chunk_shape, &self.dtype, self.fill_value)?; - Ok(Basic { store, key, representation }) - } -} - -impl IntoItem for RawWithIndices<'_> { - fn store_config(&self) -> &StoreConfig { - &self.0.store - } - - fn path(&self) -> &str { - &self.0.path - } - - fn into_item( - self, - store: ReadableWritableListableStorage, - key: StoreKey, - shape: &[u64], - ) -> PyResult { - let (raw, selection, chunk_selection) = self; - let chunk_shape = raw.chunk_shape.clone(); - let item = raw.into_item(store.clone(), key, ())?; - Ok(WithSubset { - item, - chunk_subset: selection_to_array_subset(&chunk_selection, &chunk_shape)?, - subset: selection_to_array_subset(&selection, shape)?, - }) - } -} - fn get_chunk_representation( chunk_shape: Vec, dtype: &str, diff --git a/src/lib.rs b/src/lib.rs index 21c1edb..b56f644 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ #![warn(clippy::pedantic)] #![allow(clippy::module_name_repetitions)] -use chunk_item::{ChunksItem, IntoItem}; +use chunk_item::ChunksItem; use concurrency::ChunkConcurrentLimitAndCodecOptions; use numpy::npyffi::PyArrayObject; use numpy::{IntoPyArray, PyArray1, PyUntypedArray, PyUntypedArrayMethods}; @@ -66,23 +66,6 @@ impl CodecPipelineImpl { } } - fn collect_chunk_descriptions, I, S: Copy>( - &self, - chunk_descriptions: Vec, - shape: S, - ) -> PyResult> { - chunk_descriptions - .into_iter() - .map(|raw| { - // TODO: Prefer to get the store once, and assume it is the same for all chunks - let store = self.get_store_from_config(raw.store_config())?; - let path = raw.path(); - let key = StoreKey::new(path).map_py_err::()?; - raw.into_item(store, key, shape) - }) - .collect() - } - fn retrieve_chunk_bytes<'a, I: ChunksItem>( item: &I, codec_chain: &CodecChain, @@ -282,7 +265,7 @@ impl CodecPipelineImpl { fn retrieve_chunks_and_apply_index( &self, py: Python, - chunk_descriptions: Vec, // FIXME: Ref / iterable? + chunk_descriptions: Vec, // FIXME: Ref / iterable? value: &Bound<'_, PyUntypedArray>, ) -> PyResult<()> { // Get input array @@ -293,8 +276,6 @@ impl CodecPipelineImpl { } let output = Self::nparray_to_unsafe_cell_slice(value); let output_shape: Vec = value.shape_zarr()?; - let chunk_descriptions = - self.collect_chunk_descriptions(chunk_descriptions, &output_shape)?; // Adjust the concurrency based on the codec chain and the first chunk description let Some((chunk_concurrent_limit, codec_options)) = @@ -388,10 +369,8 @@ impl CodecPipelineImpl { fn retrieve_chunks<'py>( &self, py: Python<'py>, - chunk_descriptions: Vec, // FIXME: Ref / iterable? + chunk_descriptions: Vec, // FIXME: Ref / iterable? ) -> PyResult>>> { - let chunk_descriptions = self.collect_chunk_descriptions(chunk_descriptions, ())?; - // Adjust the concurrency based on the codec chain and the first chunk description let Some((chunk_concurrent_limit, codec_options)) = chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? @@ -439,7 +418,7 @@ impl CodecPipelineImpl { fn store_chunks_with_indices( &self, py: Python, - chunk_descriptions: Vec, + chunk_descriptions: Vec, value: &Bound<'_, PyUntypedArray>, ) -> PyResult<()> { enum InputValue<'a> { @@ -460,10 +439,7 @@ impl CodecPipelineImpl { } else { InputValue::Constant(FillValue::new(input_slice.to_vec())) }; - let input_shape: Vec = value.shape_zarr()?; - let chunk_descriptions = - self.collect_chunk_descriptions(chunk_descriptions, &input_shape)?; // Adjust the concurrency based on the codec chain and the first chunk description let Some((chunk_concurrent_limit, codec_options)) = @@ -526,7 +502,8 @@ impl CodecPipelineImpl { fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add_class::()?; - m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } From 62ec2873ff3e0d066562c2716888eac0cbaf7328 Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Fri, 13 Dec 2024 16:04:14 +0100 Subject: [PATCH 05/14] remove global store --- src/lib.rs | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b56f644..965e9e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,8 +12,7 @@ use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon_iter_concurrent_limit::iter_concurrent_limit; use std::borrow::Cow; -use std::sync::{Arc, Mutex}; -use store::StoreConfig; +use std::sync::Arc; use unsafe_cell_slice::UnsafeCellSlice; use zarrs::array::codec::{ ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder, StoragePartialDecoder, @@ -23,7 +22,7 @@ use zarrs::array::{ }; use zarrs::array_subset::ArraySubset; use zarrs::metadata::v3::MetadataV3; -use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey}; +use zarrs::storage::StorageHandle; mod chunk_item; mod concurrency; @@ -40,7 +39,6 @@ use utils::{PyErrExt, PyUntypedArrayExt}; #[pyclass] pub struct CodecPipelineImpl { pub(crate) codec_chain: Arc, - pub(crate) store: Mutex>, pub(crate) codec_options: CodecOptions, pub(crate) chunk_concurrent_minimum: usize, pub(crate) chunk_concurrent_maximum: usize, @@ -48,24 +46,6 @@ pub struct CodecPipelineImpl { } impl CodecPipelineImpl { - fn get_store_from_config( - &self, - config: &StoreConfig, - ) -> PyResult { - let mut gstore = self.store.lock().map_err(|_| { - PyErr::new::("failed to lock the store mutex".to_string()) - })?; - - // TODO: Request upstream change to get store on codec pipeline initialisation, do not want to do all of this here - if let Some(gstore) = gstore.as_ref() { - Ok(gstore.clone()) - } else { - let store: ReadableWritableListableStorage = config.try_into()?; - *gstore = Some(store.clone()); - Ok(store) - } - } - fn retrieve_chunk_bytes<'a, I: ChunksItem>( item: &I, codec_chain: &CodecChain, @@ -254,7 +234,6 @@ impl CodecPipelineImpl { Ok(Self { codec_chain, - store: Mutex::new(None), codec_options, chunk_concurrent_minimum, chunk_concurrent_maximum, From 41558c3c5af488285abcfb1ed4c25212b4384aeb Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Fri, 13 Dec 2024 16:10:19 +0100 Subject: [PATCH 06/14] prettier --- python/zarrs/pipeline.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index 749fa3d..d0f0c28 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -137,10 +137,7 @@ async def read( ) return None chunks = await asyncio.to_thread(self.impl.retrieve_chunks, chunks_desc) - for chunk, chunk_info in zip(chunks, batch_info): - out_selection = chunk_info[3] - selection = chunk_info[2] - spec = chunk_info[1] + for chunk, (_, spec, selection, out_selection) in zip(chunks, batch_info): chunk_reshaped = chunk.view(spec.dtype).reshape(spec.shape) chunk_selected = chunk_reshaped[selection] if drop_axes: From 4f749c9f51074fadf3c55e5ddb92ac12e51a5b72 Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Fri, 13 Dec 2024 16:15:46 +0100 Subject: [PATCH 07/14] unify --- python/zarrs/pipeline.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index d0f0c28..86846d2 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -117,10 +117,11 @@ async def read( batch_info: Iterable[ tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple] ], - out: NDBuffer, + out: NDBuffer, # type: ignore drop_axes: tuple[int, ...] = (), # FIXME: unused ) -> None: - out = out.as_ndarray_like() # FIXME: Error if array is not in host memory + # FIXME: Error if array is not in host memory + out: NDArrayLike = out.as_ndarray_like() if not out.dtype.isnative: raise RuntimeError("Non-native byte order not supported") try: @@ -149,7 +150,7 @@ async def write( batch_info: Iterable[ tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], - value: NDBuffer, + value: NDBuffer, # type: ignore drop_axes: tuple[int, ...] = (), ) -> None: # FIXME: Error if array is not in host memory From 14b27595697603ebe8164b4a5dc5e2d1aa506748 Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Fri, 13 Dec 2024 17:13:12 +0100 Subject: [PATCH 08/14] reuse stores --- Cargo.toml | 1 + src/chunk_item.rs | 16 ++++++-------- src/lib.rs | 47 +++++++++++++++++++++++++++++------------ src/store.rs | 2 +- src/store/filesystem.rs | 2 +- src/store/http.rs | 2 +- 6 files changed, 43 insertions(+), 27 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2524bf7..7fd53d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ pyo3-stub-gen = "0.6.1" opendal = { version = "0.50.2", features = ["services-http"] } tokio = { version = "1.41.1", features = ["rt-multi-thread"] } zarrs_opendal = "0.4.0" +arc_map = "0.1.3" [profile.release] lto = true diff --git a/src/chunk_item.rs b/src/chunk_item.rs index 90a238b..1ce6d43 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -11,26 +11,22 @@ use zarrs::{ array::{ChunkRepresentation, DataType, FillValue}, array_subset::ArraySubset, metadata::v3::{array::data_type::DataTypeMetadataV3, MetadataV3}, - storage::{MaybeBytes, ReadableWritableListableStorage, StorageError, StoreKey}, + storage::StoreKey, }; use crate::{store::StoreConfig, utils::PyErrExt}; pub(crate) trait ChunksItem { - fn store(&self) -> ReadableWritableListableStorage; + fn store_config(&self) -> StoreConfig; fn key(&self) -> &StoreKey; fn representation(&self) -> &ChunkRepresentation; - - fn get(&self) -> Result { - self.store().get(self.key()) - } } #[derive(Clone)] #[gen_stub_pyclass] #[pyclass] pub(crate) struct Basic { - store: ReadableWritableListableStorage, + store: StoreConfig, key: StoreKey, representation: ChunkRepresentation, } @@ -47,7 +43,7 @@ impl Basic { let dtype: String = chunk_spec.getattr("dtype")?.call_method0("__str__")?.extract()?; let fill_value = chunk_spec.getattr("fill_value")?.call_method0("tobytes")?.extract()?; Ok(Self { - store: (&store).try_into()?, + store, key: StoreKey::new(path).map_py_err::()?, representation: get_chunk_representation(chunk_shape, &dtype, fill_value)?, }) @@ -84,7 +80,7 @@ impl WithSubset { } impl ChunksItem for Basic { - fn store(&self) -> ReadableWritableListableStorage { + fn store_config(&self) -> StoreConfig { self.store.clone() } fn key(&self) -> &StoreKey { @@ -96,7 +92,7 @@ impl ChunksItem for Basic { } impl ChunksItem for WithSubset { - fn store(&self) -> ReadableWritableListableStorage { + fn store_config(&self) -> StoreConfig { self.item.store.clone() } fn key(&self) -> &StoreKey { diff --git a/src/lib.rs b/src/lib.rs index 965e9e3..aab0a03 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,14 +5,16 @@ use chunk_item::ChunksItem; use concurrency::ChunkConcurrentLimitAndCodecOptions; use numpy::npyffi::PyArrayObject; use numpy::{IntoPyArray, PyArray1, PyUntypedArray, PyUntypedArrayMethods}; -use pyo3::exceptions::{PyRuntimeError, PyTypeError, PyValueError}; +use pyo3::exceptions::{PyKeyError, PyRuntimeError, PyTypeError, PyValueError}; use pyo3::prelude::*; use pyo3_stub_gen::define_stub_info_gatherer; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon_iter_concurrent_limit::iter_concurrent_limit; +use store::StoreConfig; use std::borrow::Cow; -use std::sync::Arc; +use std::collections::BTreeMap; +use std::sync::{Arc, Mutex}; use unsafe_cell_slice::UnsafeCellSlice; use zarrs::array::codec::{ ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder, StoragePartialDecoder, @@ -22,7 +24,7 @@ use zarrs::array::{ }; use zarrs::array_subset::ArraySubset; use zarrs::metadata::v3::MetadataV3; -use zarrs::storage::StorageHandle; +use zarrs::storage::{MaybeBytes, ReadableWritableListableStorage, StorageHandle}; mod chunk_item; mod concurrency; @@ -38,6 +40,7 @@ use utils::{PyErrExt, PyUntypedArrayExt}; #[gen_stub_pyclass] #[pyclass] pub struct CodecPipelineImpl { + pub(crate) stores: Mutex>, pub(crate) codec_chain: Arc, pub(crate) codec_options: CodecOptions, pub(crate) chunk_concurrent_minimum: usize, @@ -46,12 +49,25 @@ pub struct CodecPipelineImpl { } impl CodecPipelineImpl { + fn store(&self, item: &I) -> PyResult { + use std::collections::btree_map::Entry::{Occupied, Vacant}; + Ok(match self.stores.lock().map_py_err::()?.entry(item.store_config()) { + Occupied(e) => e.get().clone(), + Vacant(e) => e.insert((&item.store_config()).try_into()?).clone(), + }) + } + + fn get(&self, item: &I) -> PyResult { + self.store(item)?.get(item.key()).map_py_err::() + } + fn retrieve_chunk_bytes<'a, I: ChunksItem>( + &self, item: &I, codec_chain: &CodecChain, codec_options: &CodecOptions, ) -> PyResult> { - let value_encoded = item.get().map_py_err::()?; + let value_encoded = self.get(item).map_py_err::()?; let value_decoded = if let Some(value_encoded) = value_encoded { let value_encoded: Vec = value_encoded.into(); // zero-copy in this case codec_chain @@ -68,6 +84,7 @@ impl CodecPipelineImpl { } fn store_chunk_bytes( + &self, item: &I, codec_chain: &CodecChain, value_decoded: ArrayBytes, @@ -81,7 +98,7 @@ impl CodecPipelineImpl { .map_py_err::()?; if value_decoded.is_fill_value(item.representation().fill_value()) { - item.store().erase(item.key()) + self.store(item)?.erase(item.key()) } else { let value_encoded = codec_chain .encode(value_decoded, item.representation(), codec_options) @@ -89,12 +106,13 @@ impl CodecPipelineImpl { .map_py_err::()?; // Store the encoded chunk - item.store().set(item.key(), value_encoded.into()) + self.store(item)?.set(item.key(), value_encoded.into()) } .map_py_err::() } fn store_chunk_subset_bytes( + &self, item: &I, codec_chain: &CodecChain, chunk_subset_bytes: ArrayBytes, @@ -111,7 +129,7 @@ impl CodecPipelineImpl { && chunk_subset.shape() == item.representation().shape_u64() { // Fast path if the chunk subset spans the entire chunk, no read required - Self::store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) + self.store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) } else { // Validate the chunk subset bytes chunk_subset_bytes @@ -122,7 +140,7 @@ impl CodecPipelineImpl { .map_py_err::()?; // Retrieve the chunk - let chunk_bytes_old = Self::retrieve_chunk_bytes(item, codec_chain, codec_options)?; + let chunk_bytes_old = self.retrieve_chunk_bytes(item, codec_chain, codec_options)?; // Update the chunk let chunk_bytes_new = unsafe { @@ -141,7 +159,7 @@ impl CodecPipelineImpl { }; // Store the updated chunk - Self::store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options) + self.store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options) } } @@ -233,6 +251,7 @@ impl CodecPipelineImpl { let num_threads = num_threads.unwrap_or(rayon::current_num_threads()); Ok(Self { + stores: Mutex::default(), codec_chain, codec_options, chunk_concurrent_minimum, @@ -270,7 +289,7 @@ impl CodecPipelineImpl { && item.chunk_subset.shape() == item.representation().shape_u64() { // See zarrs::array::Array::retrieve_chunk_into - let chunk_encoded = item.get().map_py_err::()?; + let chunk_encoded = self.get(&item)?; if let Some(chunk_encoded) = chunk_encoded { // Decode the encoded data into the output buffer let chunk_encoded: Vec = chunk_encoded.into(); @@ -305,7 +324,7 @@ impl CodecPipelineImpl { } } else { // Partially decode the chunk into the output buffer - let storage_handle = Arc::new(StorageHandle::new(item.store().clone())); + let storage_handle = Arc::new(StorageHandle::new(self.store(&item)?)); // NOTE: Normally a storage transformer would exist between the storage handle and the input handle // but zarr-python does not support them nor forward them to the codec pipeline let input_handle = Arc::new(StoragePartialDecoder::new( @@ -359,7 +378,7 @@ impl CodecPipelineImpl { let chunk_bytes = py.allow_threads(move || { let get_chunk_subset = |item: chunk_item::Basic| { - let chunk_encoded = item.get().map_py_err::()?; + let chunk_encoded = self.get(&item).map_py_err::()?; Ok(if let Some(chunk_encoded) = chunk_encoded { let chunk_encoded: Vec = chunk_encoded.into(); self.codec_chain @@ -437,7 +456,7 @@ impl CodecPipelineImpl { item.item.representation().data_type(), ) .map_py_err::()?; - Self::store_chunk_subset_bytes( + self.store_chunk_subset_bytes( &item, &self.codec_chain, chunk_subset_bytes, @@ -454,7 +473,7 @@ impl CodecPipelineImpl { constant_value, ); - Self::store_chunk_subset_bytes( + self.store_chunk_subset_bytes( &item, &self.codec_chain, chunk_subset_bytes, diff --git a/src/store.rs b/src/store.rs index 2ccaab2..3eeb0b8 100644 --- a/src/store.rs +++ b/src/store.rs @@ -19,7 +19,7 @@ use crate::{runtime::tokio_block_on, utils::PyErrExt}; mod filesystem; mod http; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] #[gen_stub_pyclass_enum] pub enum StoreConfig { Filesystem(FilesystemStoreConfig), diff --git a/src/store/filesystem.rs b/src/store/filesystem.rs index 9bf5b16..8ee865b 100644 --- a/src/store/filesystem.rs +++ b/src/store/filesystem.rs @@ -6,7 +6,7 @@ use zarrs::{filesystem::FilesystemStore, storage::ReadableWritableListableStorag use crate::utils::PyErrExt; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] #[gen_stub_pyclass] #[pyclass] pub struct FilesystemStoreConfig { diff --git a/src/store/http.rs b/src/store/http.rs index 725d5a4..0c7820b 100644 --- a/src/store/http.rs +++ b/src/store/http.rs @@ -6,7 +6,7 @@ use zarrs::storage::ReadableWritableListableStorage; use super::opendal_builder_to_sync_store; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] #[gen_stub_pyclass] #[pyclass] pub struct HttpStoreConfig { From 0eb504733628528f343ae81a65ed57c467dd0dd6 Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Fri, 13 Dec 2024 17:19:19 +0100 Subject: [PATCH 09/14] informative error message --- src/lib.rs | 42 ++++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index aab0a03..4c1d91c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,10 +11,10 @@ use pyo3_stub_gen::define_stub_info_gatherer; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon_iter_concurrent_limit::iter_concurrent_limit; -use store::StoreConfig; use std::borrow::Cow; use std::collections::BTreeMap; use std::sync::{Arc, Mutex}; +use store::StoreConfig; use unsafe_cell_slice::UnsafeCellSlice; use zarrs::array::codec::{ ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder, StoragePartialDecoder, @@ -51,16 +51,21 @@ pub struct CodecPipelineImpl { impl CodecPipelineImpl { fn store(&self, item: &I) -> PyResult { use std::collections::btree_map::Entry::{Occupied, Vacant}; - Ok(match self.stores.lock().map_py_err::()?.entry(item.store_config()) { - Occupied(e) => e.get().clone(), - Vacant(e) => e.insert((&item.store_config()).try_into()?).clone(), - }) + match self + .stores + .lock() + .map_py_err::()? + .entry(item.store_config()) + { + Occupied(e) => Ok(e.get().clone()), + Vacant(e) => Ok(e.insert((&item.store_config()).try_into()?).clone()), + } } - + fn get(&self, item: &I) -> PyResult { self.store(item)?.get(item.key()).map_py_err::() } - + fn retrieve_chunk_bytes<'a, I: ChunksItem>( &self, item: &I, @@ -119,24 +124,21 @@ impl CodecPipelineImpl { chunk_subset: &ArraySubset, codec_options: &CodecOptions, ) -> PyResult<()> { - if !chunk_subset.inbounds(&item.representation().shape_u64()) { - return Err(PyErr::new::( - "chunk subset is out of bounds".to_string(), - )); + let array_shape = item.representation().shape_u64(); + if !chunk_subset.inbounds(&array_shape) { + return Err(PyErr::new::(format!( + "chunk subset ({chunk_subset}) is out of bounds for array shape ({array_shape:?})" + ))); } + let data_type_size = item.representation().data_type().size(); - if chunk_subset.start().iter().all(|&o| o == 0) - && chunk_subset.shape() == item.representation().shape_u64() - { + if chunk_subset.start().iter().all(|&o| o == 0) && chunk_subset.shape() == array_shape { // Fast path if the chunk subset spans the entire chunk, no read required self.store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) } else { // Validate the chunk subset bytes chunk_subset_bytes - .validate( - chunk_subset.num_elements(), - item.representation().data_type().size(), - ) + .validate(chunk_subset.num_elements(), data_type_size) .map_py_err::()?; // Retrieve the chunk @@ -151,10 +153,10 @@ impl CodecPipelineImpl { // - output bytes and output subset bytes are compatible (same data type) update_array_bytes( chunk_bytes_old, - &item.representation().shape_u64(), + &array_shape, chunk_subset, &chunk_subset_bytes, - item.representation().data_type().size(), + data_type_size, ) }; From 867af5c3fab46f6479852f70bad4c9fc4f9c3075 Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Fri, 13 Dec 2024 17:20:03 +0100 Subject: [PATCH 10/14] undo arc_map --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 7fd53d3..2524bf7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,6 @@ pyo3-stub-gen = "0.6.1" opendal = { version = "0.50.2", features = ["services-http"] } tokio = { version = "1.41.1", features = ["rt-multi-thread"] } zarrs_opendal = "0.4.0" -arc_map = "0.1.3" [profile.release] lto = true From e8384de3cad56cc54a0f82eb94834fb8e0c13cac Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Fri, 13 Dec 2024 17:40:46 +0100 Subject: [PATCH 11/14] actually use correct selections --- python/zarrs/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/zarrs/utils.py b/python/zarrs/utils.py index 1ac7756..d553687 100644 --- a/python/zarrs/utils.py +++ b/python/zarrs/utils.py @@ -162,9 +162,9 @@ def make_chunk_info_for_rust_with_indices( chunk_info_with_indices.append( WithSubset( chunk_info, - out_selection_as_slices, - chunk_selection_as_slices, - shape, + chunk_subset=chunk_selection_as_slices, + subset=out_selection_as_slices, + shape=shape, ) ) return chunk_info_with_indices From 77d20e3b98319cfc580d5c149fc6dc9d8d901310 Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Sat, 14 Dec 2024 07:01:36 +1100 Subject: [PATCH 12/14] fmt --- src/chunk_item.rs | 10 ++++++++-- src/tests.rs | 10 ++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/chunk_item.rs b/src/chunk_item.rs index 1ce6d43..3b6c10c 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -40,8 +40,14 @@ impl Basic { let path: String = byte_interface.getattr("path")?.extract()?; let chunk_shape = chunk_spec.getattr("shape")?.extract()?; - let dtype: String = chunk_spec.getattr("dtype")?.call_method0("__str__")?.extract()?; - let fill_value = chunk_spec.getattr("fill_value")?.call_method0("tobytes")?.extract()?; + let dtype: String = chunk_spec + .getattr("dtype")? + .call_method0("__str__")? + .extract()?; + let fill_value = chunk_spec + .getattr("fill_value")? + .call_method0("tobytes")? + .extract()?; Ok(Self { store, key: StoreKey::new(path).map_py_err::()?, diff --git a/src/tests.rs b/src/tests.rs index 0e11c3c..2cf4570 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -14,11 +14,13 @@ fn test_nparray_to_unsafe_cell_slice_empty() -> PyResult<()> { Python::with_gil(|py| { let arr: Bound<'_, PyUntypedArray> = PyModule::from_code( py, - c_str!("def empty_array(): + c_str!( + "def empty_array(): import numpy as np - return np.empty(0, dtype=np.uint8)"), - c_str!(""), - c_str!(""), + return np.empty(0, dtype=np.uint8)" + ), + c_str!(""), + c_str!(""), )? .getattr("empty_array")? .call0()? From 459580c419a4bc9a219934a10cede331128170b7 Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Sat, 14 Dec 2024 07:01:47 +1100 Subject: [PATCH 13/14] fix: chunk subset calculation --- src/chunk_item.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/chunk_item.rs b/src/chunk_item.rs index 3b6c10c..6aab47a 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -75,7 +75,8 @@ impl WithSubset { subset: Vec>, shape: Vec, ) -> PyResult { - let chunk_subset = selection_to_array_subset(&chunk_subset, &shape)?; + let chunk_subset = + selection_to_array_subset(&chunk_subset, &item.representation.shape_u64())?; let subset = selection_to_array_subset(&subset, &shape)?; Ok(Self { item, From c421065e43181cdc59f178d62aa1f94009c55bf4 Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Sat, 14 Dec 2024 07:01:59 +1100 Subject: [PATCH 14/14] fix: constant array shape handling --- python/zarrs/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/zarrs/utils.py b/python/zarrs/utils.py index d553687..0aa0d0d 100644 --- a/python/zarrs/utils.py +++ b/python/zarrs/utils.py @@ -141,6 +141,7 @@ def make_chunk_info_for_rust_with_indices( drop_axes: tuple[int, ...], shape: tuple[int, ...], ) -> list[WithSubset]: + shape = shape if shape else (1,) # constant array chunk_info_with_indices: list[WithSubset] = [] for byte_getter, chunk_spec, chunk_selection, out_selection in batch_info: chunk_info = Basic(byte_getter, chunk_spec)