diff --git a/python/zarrs/_internal.pyi b/python/zarrs/_internal.pyi index 7e28264..4223e3d 100644 --- a/python/zarrs/_internal.pyi +++ b/python/zarrs/_internal.pyi @@ -7,6 +7,10 @@ from enum import Enum, auto import numpy import numpy.typing +class Basic: + def __new__(cls, byte_interface: typing.Any, chunk_spec: typing.Any): ... + ... + class CodecPipelineImpl: def __new__( cls, @@ -20,34 +24,15 @@ class CodecPipelineImpl: ): ... def retrieve_chunks_and_apply_index( self, - chunk_descriptions: typing.Sequence[ - tuple[ - tuple[ - StoreConfig, str, typing.Sequence[int], str, typing.Sequence[int] - ], - typing.Sequence[slice], - typing.Sequence[slice], - ] - ], + chunk_descriptions: typing.Sequence[WithSubset], value: numpy.NDArray[typing.Any], ) -> None: ... def retrieve_chunks( - self, - chunk_descriptions: typing.Sequence[ - tuple[StoreConfig, str, typing.Sequence[int], str, typing.Sequence[int]] - ], + self, chunk_descriptions: typing.Sequence[Basic] ) -> list[numpy.typing.NDArray[numpy.uint8]]: ... def store_chunks_with_indices( self, - chunk_descriptions: typing.Sequence[ - tuple[ - tuple[ - StoreConfig, str, typing.Sequence[int], str, typing.Sequence[int] - ], - typing.Sequence[slice], - typing.Sequence[slice], - ] - ], + chunk_descriptions: typing.Sequence[WithSubset], value: numpy.NDArray[typing.Any], ) -> None: ... @@ -57,6 +42,16 @@ class FilesystemStoreConfig: class HttpStoreConfig: endpoint: str +class WithSubset: + def __new__( + cls, + item: Basic, + chunk_subset: typing.Sequence[slice], + subset: typing.Sequence[slice], + shape: typing.Sequence[int], + ): ... + ... + class StoreConfig(Enum): Filesystem = auto() Http = auto() diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index f6552ba..86846d2 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -6,10 +6,7 @@ from typing import TYPE_CHECKING, TypedDict import numpy as np -from zarr.abc.codec import ( - Codec, - CodecPipeline, -) +from zarr.abc.codec import Codec, CodecPipeline from zarr.core.config import config if TYPE_CHECKING: @@ -18,7 +15,7 @@ from zarr.abc.store import ByteGetter, ByteSetter from zarr.core.array_spec import ArraySpec - from zarr.core.buffer import Buffer, NDBuffer + from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer from zarr.core.chunk_grids import ChunkGrid from zarr.core.common import ChunkCoords from zarr.core.indexing import SelectorTuple @@ -120,19 +117,20 @@ async def read( batch_info: Iterable[ tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple] ], - out: NDBuffer, + out: NDBuffer, # type: ignore drop_axes: tuple[int, ...] = (), # FIXME: unused ) -> None: - out = out.as_ndarray_like() # FIXME: Error if array is not in host memory + # FIXME: Error if array is not in host memory + out: NDArrayLike = out.as_ndarray_like() if not out.dtype.isnative: raise RuntimeError("Non-native byte order not supported") try: - chunks_desc = make_chunk_info_for_rust_with_indices(batch_info, drop_axes) - index_in_rust = True + chunks_desc = make_chunk_info_for_rust_with_indices( + batch_info, drop_axes, out.shape + ) except (DiscontiguousArrayError, CollapsedDimensionError): chunks_desc = make_chunk_info_for_rust(batch_info) - index_in_rust = False - if index_in_rust: + else: await asyncio.to_thread( self.impl.retrieve_chunks_and_apply_index, chunks_desc, @@ -140,10 +138,7 @@ async def read( ) return None chunks = await asyncio.to_thread(self.impl.retrieve_chunks, chunks_desc) - for chunk, chunk_info in zip(chunks, batch_info): - out_selection = chunk_info[3] - selection = chunk_info[2] - spec = chunk_info[1] + for chunk, (_, spec, selection, out_selection) in zip(chunks, batch_info): chunk_reshaped = chunk.view(spec.dtype).reshape(spec.shape) chunk_selected = chunk_reshaped[selection] if drop_axes: @@ -155,18 +150,17 @@ async def write( batch_info: Iterable[ tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], - value: NDBuffer, + value: NDBuffer, # type: ignore drop_axes: tuple[int, ...] = (), ) -> None: - value = value.as_ndarray_like() # FIXME: Error if array is not in host memory + # FIXME: Error if array is not in host memory + value: NDArrayLike | np.ndarray = value.as_ndarray_like() if not value.dtype.isnative: value = np.ascontiguousarray(value, dtype=value.dtype.newbyteorder("=")) elif not value.flags.c_contiguous: value = np.ascontiguousarray(value) - chunks_desc = make_chunk_info_for_rust_with_indices(batch_info, drop_axes) - await asyncio.to_thread( - self.impl.store_chunks_with_indices, - chunks_desc, - value, + chunks_desc = make_chunk_info_for_rust_with_indices( + batch_info, drop_axes, value.shape ) + await asyncio.to_thread(self.impl.store_chunks_with_indices, chunks_desc, value) return None diff --git a/python/zarrs/utils.py b/python/zarrs/utils.py index 9e2a7b6..0aa0d0d 100644 --- a/python/zarrs/utils.py +++ b/python/zarrs/utils.py @@ -3,18 +3,19 @@ import operator import os from functools import reduce -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import numpy as np from zarr.core.indexing import SelectorTuple, is_integer +from zarrs._internal import Basic, WithSubset + if TYPE_CHECKING: from collections.abc import Iterable from types import EllipsisType - from zarr.abc.store import ByteGetter, ByteSetter, Store + from zarr.abc.store import ByteGetter, ByteSetter from zarr.core.array_spec import ArraySpec - from zarr.core.common import ChunkCoords # adapted from https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor @@ -62,18 +63,6 @@ def selector_tuple_to_slice_selection(selector_tuple: SelectorTuple) -> list[sli return make_slice_selection(selector_tuple) -def convert_chunk_to_primitive( - byte_interface: ByteGetter | ByteSetter, chunk_spec: ArraySpec -) -> tuple[Store, str, ChunkCoords, str, Any]: - return ( - byte_interface.store, - byte_interface.path, - chunk_spec.shape, - str(chunk_spec.dtype), - chunk_spec.fill_value.tobytes(), - ) - - def resulting_shape_from_index( array_shape: tuple[int, ...], index_tuple: tuple[int | slice | EllipsisType | np.ndarray], @@ -150,10 +139,12 @@ def make_chunk_info_for_rust_with_indices( tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], drop_axes: tuple[int, ...], -) -> list[tuple[tuple[Store, str, ChunkCoords, str, Any], list[slice], list[slice]]]: - chunk_info_with_indices = [] + shape: tuple[int, ...], +) -> list[WithSubset]: + shape = shape if shape else (1,) # constant array + chunk_info_with_indices: list[WithSubset] = [] for byte_getter, chunk_spec, chunk_selection, out_selection in batch_info: - chunk_info = convert_chunk_to_primitive(byte_getter, chunk_spec) + chunk_info = Basic(byte_getter, chunk_spec) out_selection_as_slices = selector_tuple_to_slice_selection(out_selection) chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection) shape_chunk_selection_slices = get_shape_for_selector( @@ -170,7 +161,12 @@ def make_chunk_info_for_rust_with_indices( f"{shape_chunk_selection} != {shape_chunk_selection_slices}" ) chunk_info_with_indices.append( - (chunk_info, out_selection_as_slices, chunk_selection_as_slices) + WithSubset( + chunk_info, + chunk_subset=chunk_selection_as_slices, + subset=out_selection_as_slices, + shape=shape, + ) ) return chunk_info_with_indices @@ -179,8 +175,8 @@ def make_chunk_info_for_rust( batch_info: Iterable[ tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple] ], -) -> list[tuple[Store, str, ChunkCoords, str, Any]]: - return list( - convert_chunk_to_primitive(byte_getter, chunk_spec) - for (byte_getter, chunk_spec, _, _) in batch_info - ) +) -> list[Basic]: + return [ + Basic(byte_interface, chunk_spec) + for (byte_interface, chunk_spec, _, _) in batch_info + ] diff --git a/src/chunk_item.rs b/src/chunk_item.rs index aea98d8..6aab47a 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -2,74 +2,92 @@ use std::num::NonZeroU64; use pyo3::{ exceptions::{PyRuntimeError, PyValueError}, - types::{PySlice, PySliceMethods}, - Bound, PyErr, PyResult, + pyclass, pymethods, + types::{PyAnyMethods as _, PySlice, PySliceMethods as _}, + Bound, PyAny, PyErr, PyResult, }; +use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use zarrs::{ array::{ChunkRepresentation, DataType, FillValue}, array_subset::ArraySubset, metadata::v3::{array::data_type::DataTypeMetadataV3, MetadataV3}, - storage::{MaybeBytes, ReadableWritableListableStorage, StorageError, StoreKey}, + storage::StoreKey, }; -use crate::{utils::PyErrExt, StoreConfig}; - -pub(crate) type Raw<'a> = ( - // store - StoreConfig, - // path - String, - // shape - Vec, - // data type - String, - // fill value bytes - Vec, -); - -pub(crate) type RawWithIndices<'a> = ( - Raw<'a>, - // out selection - Vec>, - // chunk selection - Vec>, -); - -pub(crate) trait IntoItem: std::marker::Sized { - fn store_config(&self) -> &StoreConfig; - fn path(&self) -> &str; - fn into_item( - self, - store: ReadableWritableListableStorage, - key: StoreKey, - shape: S, - ) -> PyResult; -} +use crate::{store::StoreConfig, utils::PyErrExt}; pub(crate) trait ChunksItem { - fn store(&self) -> ReadableWritableListableStorage; + fn store_config(&self) -> StoreConfig; fn key(&self) -> &StoreKey; fn representation(&self) -> &ChunkRepresentation; - - fn get(&self) -> Result { - self.store().get(self.key()) - } } +#[derive(Clone)] +#[gen_stub_pyclass] +#[pyclass] pub(crate) struct Basic { - store: ReadableWritableListableStorage, + store: StoreConfig, key: StoreKey, representation: ChunkRepresentation, } +#[gen_stub_pymethods] +#[pymethods] +impl Basic { + #[new] + fn new(byte_interface: &Bound<'_, PyAny>, chunk_spec: &Bound<'_, PyAny>) -> PyResult { + let store: StoreConfig = byte_interface.getattr("store")?.extract()?; + let path: String = byte_interface.getattr("path")?.extract()?; + + let chunk_shape = chunk_spec.getattr("shape")?.extract()?; + let dtype: String = chunk_spec + .getattr("dtype")? + .call_method0("__str__")? + .extract()?; + let fill_value = chunk_spec + .getattr("fill_value")? + .call_method0("tobytes")? + .extract()?; + Ok(Self { + store, + key: StoreKey::new(path).map_py_err::()?, + representation: get_chunk_representation(chunk_shape, &dtype, fill_value)?, + }) + } +} + +#[derive(Clone)] +#[gen_stub_pyclass] +#[pyclass] pub(crate) struct WithSubset { pub item: Basic, pub chunk_subset: ArraySubset, pub subset: ArraySubset, } +#[gen_stub_pymethods] +#[pymethods] +impl WithSubset { + #[new] + fn new( + item: Basic, + chunk_subset: Vec>, + subset: Vec>, + shape: Vec, + ) -> PyResult { + let chunk_subset = + selection_to_array_subset(&chunk_subset, &item.representation.shape_u64())?; + let subset = selection_to_array_subset(&subset, &shape)?; + Ok(Self { + item, + chunk_subset, + subset, + }) + } +} + impl ChunksItem for Basic { - fn store(&self) -> ReadableWritableListableStorage { + fn store_config(&self) -> StoreConfig { self.store.clone() } fn key(&self) -> &StoreKey { @@ -81,7 +99,7 @@ impl ChunksItem for Basic { } impl ChunksItem for WithSubset { - fn store(&self) -> ReadableWritableListableStorage { + fn store_config(&self) -> StoreConfig { self.item.store.clone() } fn key(&self) -> &StoreKey { @@ -92,57 +110,6 @@ impl ChunksItem for WithSubset { } } -impl<'a> IntoItem for Raw<'a> { - fn store_config(&self) -> &StoreConfig { - &self.0 - } - - fn path(&self) -> &str { - &self.1 - } - - fn into_item( - self, - store: ReadableWritableListableStorage, - key: StoreKey, - (): (), - ) -> PyResult { - let (_, _, chunk_shape, dtype, fill_value) = self; - let representation = get_chunk_representation(chunk_shape, &dtype, fill_value)?; - Ok(Basic { - store, - key, - representation, - }) - } -} - -impl IntoItem for RawWithIndices<'_> { - fn store_config(&self) -> &StoreConfig { - &self.0 .0 - } - - fn path(&self) -> &str { - &self.0 .1 - } - - fn into_item( - self, - store: ReadableWritableListableStorage, - key: StoreKey, - shape: &[u64], - ) -> PyResult { - let (raw, selection, chunk_selection) = self; - let chunk_shape = raw.2.clone(); - let item = raw.into_item(store.clone(), key, ())?; - Ok(WithSubset { - item, - chunk_subset: selection_to_array_subset(&chunk_selection, &chunk_shape)?, - subset: selection_to_array_subset(&selection, shape)?, - }) - } -} - fn get_chunk_representation( chunk_shape: Vec, dtype: &str, diff --git a/src/lib.rs b/src/lib.rs index 123f8d2..4c1d91c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,17 +1,18 @@ #![warn(clippy::pedantic)] #![allow(clippy::module_name_repetitions)] -use chunk_item::{ChunksItem, IntoItem}; +use chunk_item::ChunksItem; use concurrency::ChunkConcurrentLimitAndCodecOptions; use numpy::npyffi::PyArrayObject; use numpy::{IntoPyArray, PyArray1, PyUntypedArray, PyUntypedArrayMethods}; -use pyo3::exceptions::{PyRuntimeError, PyTypeError, PyValueError}; +use pyo3::exceptions::{PyKeyError, PyRuntimeError, PyTypeError, PyValueError}; use pyo3::prelude::*; use pyo3_stub_gen::define_stub_info_gatherer; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon_iter_concurrent_limit::iter_concurrent_limit; use std::borrow::Cow; +use std::collections::BTreeMap; use std::sync::{Arc, Mutex}; use store::StoreConfig; use unsafe_cell_slice::UnsafeCellSlice; @@ -23,7 +24,7 @@ use zarrs::array::{ }; use zarrs::array_subset::ArraySubset; use zarrs::metadata::v3::MetadataV3; -use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey}; +use zarrs::storage::{MaybeBytes, ReadableWritableListableStorage, StorageHandle}; mod chunk_item; mod concurrency; @@ -39,8 +40,8 @@ use utils::{PyErrExt, PyUntypedArrayExt}; #[gen_stub_pyclass] #[pyclass] pub struct CodecPipelineImpl { + pub(crate) stores: Mutex>, pub(crate) codec_chain: Arc, - pub(crate) store: Mutex>, pub(crate) codec_options: CodecOptions, pub(crate) chunk_concurrent_minimum: usize, pub(crate) chunk_concurrent_maximum: usize, @@ -48,47 +49,30 @@ pub struct CodecPipelineImpl { } impl CodecPipelineImpl { - fn get_store_from_config( - &self, - config: &StoreConfig, - ) -> PyResult { - let mut gstore = self.store.lock().map_err(|_| { - PyErr::new::("failed to lock the store mutex".to_string()) - })?; - - // TODO: Request upstream change to get store on codec pipeline initialisation, do not want to do all of this here - if let Some(gstore) = gstore.as_ref() { - Ok(gstore.clone()) - } else { - let store: ReadableWritableListableStorage = config.try_into()?; - *gstore = Some(store.clone()); - Ok(store) + fn store(&self, item: &I) -> PyResult { + use std::collections::btree_map::Entry::{Occupied, Vacant}; + match self + .stores + .lock() + .map_py_err::()? + .entry(item.store_config()) + { + Occupied(e) => Ok(e.get().clone()), + Vacant(e) => Ok(e.insert((&item.store_config()).try_into()?).clone()), } } - fn collect_chunk_descriptions, I, S: Copy>( - &self, - chunk_descriptions: Vec, - shape: S, - ) -> PyResult> { - chunk_descriptions - .into_iter() - .map(|raw| { - // TODO: Prefer to get the store once, and assume it is the same for all chunks - let store = self.get_store_from_config(raw.store_config())?; - let path = raw.path(); - let key = StoreKey::new(path).map_py_err::()?; - raw.into_item(store, key, shape) - }) - .collect() + fn get(&self, item: &I) -> PyResult { + self.store(item)?.get(item.key()).map_py_err::() } fn retrieve_chunk_bytes<'a, I: ChunksItem>( + &self, item: &I, codec_chain: &CodecChain, codec_options: &CodecOptions, ) -> PyResult> { - let value_encoded = item.get().map_py_err::()?; + let value_encoded = self.get(item).map_py_err::()?; let value_decoded = if let Some(value_encoded) = value_encoded { let value_encoded: Vec = value_encoded.into(); // zero-copy in this case codec_chain @@ -105,6 +89,7 @@ impl CodecPipelineImpl { } fn store_chunk_bytes( + &self, item: &I, codec_chain: &CodecChain, value_decoded: ArrayBytes, @@ -118,7 +103,7 @@ impl CodecPipelineImpl { .map_py_err::()?; if value_decoded.is_fill_value(item.representation().fill_value()) { - item.store().erase(item.key()) + self.store(item)?.erase(item.key()) } else { let value_encoded = codec_chain .encode(value_decoded, item.representation(), codec_options) @@ -126,40 +111,38 @@ impl CodecPipelineImpl { .map_py_err::()?; // Store the encoded chunk - item.store().set(item.key(), value_encoded.into()) + self.store(item)?.set(item.key(), value_encoded.into()) } .map_py_err::() } fn store_chunk_subset_bytes( + &self, item: &I, codec_chain: &CodecChain, chunk_subset_bytes: ArrayBytes, chunk_subset: &ArraySubset, codec_options: &CodecOptions, ) -> PyResult<()> { - if !chunk_subset.inbounds(&item.representation().shape_u64()) { - return Err(PyErr::new::( - "chunk subset is out of bounds".to_string(), - )); + let array_shape = item.representation().shape_u64(); + if !chunk_subset.inbounds(&array_shape) { + return Err(PyErr::new::(format!( + "chunk subset ({chunk_subset}) is out of bounds for array shape ({array_shape:?})" + ))); } + let data_type_size = item.representation().data_type().size(); - if chunk_subset.start().iter().all(|&o| o == 0) - && chunk_subset.shape() == item.representation().shape_u64() - { + if chunk_subset.start().iter().all(|&o| o == 0) && chunk_subset.shape() == array_shape { // Fast path if the chunk subset spans the entire chunk, no read required - Self::store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) + self.store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) } else { // Validate the chunk subset bytes chunk_subset_bytes - .validate( - chunk_subset.num_elements(), - item.representation().data_type().size(), - ) + .validate(chunk_subset.num_elements(), data_type_size) .map_py_err::()?; // Retrieve the chunk - let chunk_bytes_old = Self::retrieve_chunk_bytes(item, codec_chain, codec_options)?; + let chunk_bytes_old = self.retrieve_chunk_bytes(item, codec_chain, codec_options)?; // Update the chunk let chunk_bytes_new = unsafe { @@ -170,15 +153,15 @@ impl CodecPipelineImpl { // - output bytes and output subset bytes are compatible (same data type) update_array_bytes( chunk_bytes_old, - &item.representation().shape_u64(), + &array_shape, chunk_subset, &chunk_subset_bytes, - item.representation().data_type().size(), + data_type_size, ) }; // Store the updated chunk - Self::store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options) + self.store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options) } } @@ -270,8 +253,8 @@ impl CodecPipelineImpl { let num_threads = num_threads.unwrap_or(rayon::current_num_threads()); Ok(Self { + stores: Mutex::default(), codec_chain, - store: Mutex::new(None), codec_options, chunk_concurrent_minimum, chunk_concurrent_maximum, @@ -282,7 +265,7 @@ impl CodecPipelineImpl { fn retrieve_chunks_and_apply_index( &self, py: Python, - chunk_descriptions: Vec, // FIXME: Ref / iterable? + chunk_descriptions: Vec, // FIXME: Ref / iterable? value: &Bound<'_, PyUntypedArray>, ) -> PyResult<()> { // Get input array @@ -293,8 +276,6 @@ impl CodecPipelineImpl { } let output = Self::nparray_to_unsafe_cell_slice(value); let output_shape: Vec = value.shape_zarr()?; - let chunk_descriptions = - self.collect_chunk_descriptions(chunk_descriptions, &output_shape)?; // Adjust the concurrency based on the codec chain and the first chunk description let Some((chunk_concurrent_limit, codec_options)) = @@ -310,7 +291,7 @@ impl CodecPipelineImpl { && item.chunk_subset.shape() == item.representation().shape_u64() { // See zarrs::array::Array::retrieve_chunk_into - let chunk_encoded = item.get().map_py_err::()?; + let chunk_encoded = self.get(&item)?; if let Some(chunk_encoded) = chunk_encoded { // Decode the encoded data into the output buffer let chunk_encoded: Vec = chunk_encoded.into(); @@ -345,7 +326,7 @@ impl CodecPipelineImpl { } } else { // Partially decode the chunk into the output buffer - let storage_handle = Arc::new(StorageHandle::new(item.store().clone())); + let storage_handle = Arc::new(StorageHandle::new(self.store(&item)?)); // NOTE: Normally a storage transformer would exist between the storage handle and the input handle // but zarr-python does not support them nor forward them to the codec pipeline let input_handle = Arc::new(StoragePartialDecoder::new( @@ -388,10 +369,8 @@ impl CodecPipelineImpl { fn retrieve_chunks<'py>( &self, py: Python<'py>, - chunk_descriptions: Vec, // FIXME: Ref / iterable? + chunk_descriptions: Vec, // FIXME: Ref / iterable? ) -> PyResult>>> { - let chunk_descriptions = self.collect_chunk_descriptions(chunk_descriptions, ())?; - // Adjust the concurrency based on the codec chain and the first chunk description let Some((chunk_concurrent_limit, codec_options)) = chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)? @@ -401,7 +380,7 @@ impl CodecPipelineImpl { let chunk_bytes = py.allow_threads(move || { let get_chunk_subset = |item: chunk_item::Basic| { - let chunk_encoded = item.get().map_py_err::()?; + let chunk_encoded = self.get(&item).map_py_err::()?; Ok(if let Some(chunk_encoded) = chunk_encoded { let chunk_encoded: Vec = chunk_encoded.into(); self.codec_chain @@ -439,7 +418,7 @@ impl CodecPipelineImpl { fn store_chunks_with_indices( &self, py: Python, - chunk_descriptions: Vec, + chunk_descriptions: Vec, value: &Bound<'_, PyUntypedArray>, ) -> PyResult<()> { enum InputValue<'a> { @@ -460,10 +439,7 @@ impl CodecPipelineImpl { } else { InputValue::Constant(FillValue::new(input_slice.to_vec())) }; - let input_shape: Vec = value.shape_zarr()?; - let chunk_descriptions = - self.collect_chunk_descriptions(chunk_descriptions, &input_shape)?; // Adjust the concurrency based on the codec chain and the first chunk description let Some((chunk_concurrent_limit, codec_options)) = @@ -482,7 +458,7 @@ impl CodecPipelineImpl { item.item.representation().data_type(), ) .map_py_err::()?; - Self::store_chunk_subset_bytes( + self.store_chunk_subset_bytes( &item, &self.codec_chain, chunk_subset_bytes, @@ -499,7 +475,7 @@ impl CodecPipelineImpl { constant_value, ); - Self::store_chunk_subset_bytes( + self.store_chunk_subset_bytes( &item, &self.codec_chain, chunk_subset_bytes, @@ -526,6 +502,8 @@ impl CodecPipelineImpl { fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/store.rs b/src/store.rs index a44f77a..3eeb0b8 100644 --- a/src/store.rs +++ b/src/store.rs @@ -19,6 +19,7 @@ use crate::{runtime::tokio_block_on, utils::PyErrExt}; mod filesystem; mod http; +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] #[gen_stub_pyclass_enum] pub enum StoreConfig { Filesystem(FilesystemStoreConfig), diff --git a/src/store/filesystem.rs b/src/store/filesystem.rs index 9bf5b16..8ee865b 100644 --- a/src/store/filesystem.rs +++ b/src/store/filesystem.rs @@ -6,7 +6,7 @@ use zarrs::{filesystem::FilesystemStore, storage::ReadableWritableListableStorag use crate::utils::PyErrExt; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] #[gen_stub_pyclass] #[pyclass] pub struct FilesystemStoreConfig { diff --git a/src/store/http.rs b/src/store/http.rs index 725d5a4..0c7820b 100644 --- a/src/store/http.rs +++ b/src/store/http.rs @@ -6,7 +6,7 @@ use zarrs::storage::ReadableWritableListableStorage; use super::opendal_builder_to_sync_store; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] #[gen_stub_pyclass] #[pyclass] pub struct HttpStoreConfig { diff --git a/src/tests.rs b/src/tests.rs index 355e8ec..2cf4570 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,3 +1,5 @@ +use pyo3::ffi::c_str; + use numpy::PyUntypedArray; use pyo3::{ types::{PyAnyMethods, PyModule}, @@ -10,13 +12,15 @@ use crate::CodecPipelineImpl; fn test_nparray_to_unsafe_cell_slice_empty() -> PyResult<()> { pyo3::prepare_freethreaded_python(); Python::with_gil(|py| { - let arr: Bound<'_, PyUntypedArray> = PyModule::from_code_bound( + let arr: Bound<'_, PyUntypedArray> = PyModule::from_code( py, - "def empty_array(): + c_str!( + "def empty_array(): import numpy as np - return np.empty(0, dtype=np.uint8)", - "", - "", + return np.empty(0, dtype=np.uint8)" + ), + c_str!(""), + c_str!(""), )? .getattr("empty_array")? .call0()?