From d7ce6de433bb862aac78f7e502752607a28f6d8c Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Thu, 25 Sep 2025 10:13:50 +0200 Subject: [PATCH 1/3] WIP map py err --- src/chunk_item.rs | 8 ++++---- src/concurrency.rs | 4 ++-- src/lib.rs | 39 ++++++++++++++++++++------------------- src/map_py_err.rs | 0 src/store.rs | 4 ++-- src/store/filesystem.rs | 4 ++-- src/utils.rs | 16 ++++++++-------- 7 files changed, 38 insertions(+), 37 deletions(-) create mode 100644 src/map_py_err.rs diff --git a/src/chunk_item.rs b/src/chunk_item.rs index 2eb6398..5de922b 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -14,7 +14,7 @@ use zarrs::{ storage::StoreKey, }; -use crate::utils::PyErrExt; +use crate::utils::PyErrStrExt; pub(crate) trait ChunksItem { fn key(&self) -> &StoreKey; @@ -76,7 +76,7 @@ impl Basic { let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?; let fill_value_bytes = fill_value_to_bytes(&dtype, &fill_value)?; Ok(Self { - key: StoreKey::new(path).map_py_err::()?, + key: StoreKey::new(path).map_py_err_from_str::()?, representation: get_chunk_representation(chunk_shape, &dtype, fill_value_bytes)?, }) } @@ -148,14 +148,14 @@ fn get_chunk_representation( &MetadataV3::new(dtype), zarrs::config::global_config().data_type_aliases_v3(), ) - .map_py_err::()?; + .map_py_err_from_str::()?; let chunk_shape = chunk_shape .into_iter() .map(|x| NonZeroU64::new(x).expect("chunk shapes should always be non-zero")) .collect(); let chunk_representation = ChunkRepresentation::new(chunk_shape, data_type, FillValue::new(fill_value)) - .map_py_err::()?; + .map_py_err_from_str::()?; Ok(chunk_representation) } diff --git a/src/concurrency.rs b/src/concurrency.rs index 6eba7e3..191b295 100644 --- a/src/concurrency.rs +++ b/src/concurrency.rs @@ -4,7 +4,7 @@ use zarrs::array::{ RecommendedConcurrency, }; -use crate::{chunk_item::ChunksItem, utils::PyCodecErrExt as _, CodecPipelineImpl}; +use crate::{chunk_item::ChunksItem, utils::PyErrExt as _, CodecPipelineImpl}; 
pub trait ChunkConcurrentLimitAndCodecOptions { fn get_chunk_concurrent_limit_and_codec_options( @@ -30,7 +30,7 @@ where let codec_concurrency = codec_pipeline_impl .codec_chain .recommended_concurrency(chunk_representation) - .map_codec_err()?; + .map_py_err()?; let min_concurrent_chunks = std::cmp::min(codec_pipeline_impl.chunk_concurrent_minimum, num_chunks); diff --git a/src/lib.rs b/src/lib.rs index 54e850f..53e3964 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,12 +41,13 @@ mod runtime; mod store; #[cfg(test)] mod tests; +mod map_py_err; mod utils; use crate::chunk_item::ChunksItem; use crate::concurrency::ChunkConcurrentLimitAndCodecOptions; use crate::store::StoreConfig; -use crate::utils::{PyCodecErrExt, PyErrExt as _, PyUntypedArrayExt as _}; +use crate::utils::{PyErrExt, PyErrStrExt as _, PyUntypedArrayExt as _}; // TODO: Use a OnceLock for store with get_or_try_init when stabilised? #[gen_stub_pyclass] @@ -67,12 +68,12 @@ impl CodecPipelineImpl { codec_chain: &CodecChain, codec_options: &CodecOptions, ) -> PyResult> { - let value_encoded = self.store.get(item.key()).map_py_err::()?; + let value_encoded = self.store.get(item.key()).map_py_err_from_str::()?; let value_decoded = if let Some(value_encoded) = value_encoded { let value_encoded: Vec = value_encoded.into(); // zero-copy in this case codec_chain .decode(value_encoded.into(), item.representation(), codec_options) - .map_codec_err()? + .map_py_err()? 
} else { let array_size = ArraySize::new( item.representation().data_type().size(), @@ -95,20 +96,20 @@ impl CodecPipelineImpl { item.representation().num_elements(), item.representation().data_type().size(), ) - .map_codec_err()?; + .map_py_err()?; if value_decoded.is_fill_value(item.representation().fill_value()) { - self.store.erase(item.key()).map_py_err::() + self.store.erase(item.key()).map_py_err_from_str::() } else { let value_encoded = codec_chain .encode(value_decoded, item.representation(), codec_options) .map(Cow::into_owned) - .map_codec_err()?; + .map_py_err()?; // Store the encoded chunk self.store .set(item.key(), value_encoded.into()) - .map_py_err::() + .map_py_err_from_str::() } } @@ -135,7 +136,7 @@ impl CodecPipelineImpl { // Validate the chunk subset bytes chunk_subset_bytes .validate(chunk_subset.num_elements(), data_type_size) - .map_codec_err()?; + .map_py_err()?; // Retrieve the chunk let chunk_bytes_old = self.retrieve_chunk_bytes(item, codec_chain, codec_options)?; @@ -148,7 +149,7 @@ impl CodecPipelineImpl { &chunk_subset_bytes, data_type_size, ) - .map_codec_err()?; + .map_py_err()?; // Store the updated chunk self.store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options) @@ -257,11 +258,11 @@ impl CodecPipelineImpl { num_threads: Option, ) -> PyResult { let metadata: ArrayMetadata = - serde_json::from_str(array_metadata).map_py_err::()?; + serde_json::from_str(array_metadata).map_py_err_from_str::()?; let codec_metadata = - array_metadata_to_codec_metadata_v3(metadata).map_py_err::()?; + array_metadata_to_codec_metadata_v3(metadata).map_py_err_from_str::()?; let codec_chain = - Arc::new(CodecChain::from_metadata(&codec_metadata).map_py_err::()?); + Arc::new(CodecChain::from_metadata(&codec_metadata).map_py_err_from_str::()?); let mut codec_options = CodecOptionsBuilder::new(); if let Some(validate_checksums) = validate_checksums { @@ -276,7 +277,7 @@ impl CodecPipelineImpl { let num_threads = 
num_threads.unwrap_or(rayon::current_num_threads()); let store: ReadableWritableListableStorage = - (&store_config).try_into().map_py_err::()?; + (&store_config).try_into().map_py_err_from_str::()?; Ok(Self { store, @@ -330,7 +331,7 @@ impl CodecPipelineImpl { item.representation(), &codec_options, ) - .map_codec_err()?; + .map_py_err()?; Ok((item.key().clone(), partial_decoder)) } ) @@ -359,11 +360,11 @@ impl CodecPipelineImpl { .data_type() .fixed_size() .ok_or("variable length data type not supported") - .map_py_err::()?, + .map_py_err_from_str::()?, &output_shape, subset, ) - .map_py_err::()? + .map_py_err_from_str::()? }; // See zarrs::array::Array::retrieve_chunk_subset_into @@ -372,7 +373,7 @@ impl CodecPipelineImpl { { // See zarrs::array::Array::retrieve_chunk_into if let Some(chunk_encoded) = - self.store.get(item.key()).map_py_err::()? + self.store.get(item.key()).map_py_err_from_str::()? { // Decode the encoded data into the output buffer let chunk_encoded: Vec = chunk_encoded.into(); @@ -401,7 +402,7 @@ impl CodecPipelineImpl { &codec_options, ) } - .map_codec_err() + .map_py_err() }; iter_concurrent_limit!( @@ -454,7 +455,7 @@ impl CodecPipelineImpl { &input_shape, item.item.representation().data_type(), ) - .map_codec_err()?; + .map_py_err()?; self.store_chunk_subset_bytes( &item, &self.codec_chain, diff --git a/src/map_py_err.rs b/src/map_py_err.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/store.rs b/src/store.rs index 9fa1c92..561b9c8 100644 --- a/src/store.rs +++ b/src/store.rs @@ -10,7 +10,7 @@ use zarrs::storage::{ storage_adapter::async_to_sync::AsyncToSyncStorageAdapter, ReadableWritableListableStorage, }; -use crate::{runtime::tokio_block_on, utils::PyErrExt}; +use crate::{runtime::tokio_block_on, utils::PyErrStrExt}; mod filesystem; mod http; @@ -79,7 +79,7 @@ fn opendal_builder_to_sync_store( builder: B, ) -> PyResult { let operator = opendal::Operator::new(builder) - .map_py_err::()? + .map_py_err_from_str::()? 
.finish(); let store = Arc::new(zarrs_opendal::AsyncOpendalStore::new(operator)); let store = Arc::new(AsyncToSyncStorageAdapter::new(store, tokio_block_on())); diff --git a/src/store/filesystem.rs b/src/store/filesystem.rs index e5e8091..de973f2 100644 --- a/src/store/filesystem.rs +++ b/src/store/filesystem.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use pyo3::{exceptions::PyRuntimeError, PyErr}; use zarrs::{filesystem::FilesystemStore, storage::ReadableWritableListableStorage}; -use crate::utils::PyErrExt; +use crate::utils::PyErrStrExt; #[derive(Debug, Clone)] pub struct FilesystemStoreConfig { @@ -21,7 +21,7 @@ impl TryInto for &FilesystemStoreConfig { fn try_into(self) -> Result { let store = - Arc::new(FilesystemStore::new(self.root.clone()).map_py_err::()?); + Arc::new(FilesystemStore::new(self.root.clone()).map_py_err_from_str::()?); Ok(store) } } diff --git a/src/utils.rs b/src/utils.rs index eda2aa0..e072792 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -6,22 +6,22 @@ use zarrs::array::codec::CodecError; use crate::{ChunksItem, WithSubset}; -pub(crate) trait PyErrExt { - fn map_py_err(self) -> PyResult; +pub(crate) trait PyErrStrExt { + fn map_py_err_from_str(self) -> PyResult; } -impl PyErrExt for Result { - fn map_py_err(self) -> PyResult { +impl PyErrStrExt for Result { + fn map_py_err_from_str(self) -> PyResult { self.map_err(|e| PyErr::new::(format!("{e}"))) } } -pub(crate) trait PyCodecErrExt { - fn map_codec_err(self) -> PyResult; +pub(crate) trait PyErrExt { + fn map_py_err(self) -> PyResult; } -impl PyCodecErrExt for Result { - fn map_codec_err(self) -> PyResult { +impl PyErrExt for Result { + fn map_py_err(self) -> PyResult { // see https://docs.python.org/3/library/exceptions.html#exception-hierarchy self.map_err(|e| match e { // requested indexing operation doesn’t match shape From 7ac125e6e0f65f6d25cd0efb18a0d0ff7f8a7788 Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Thu, 25 Sep 2025 10:33:18 +0200 Subject: [PATCH 2/3] move out --- 
src/chunk_item.rs | 2 +- src/concurrency.rs | 3 +- src/lib.rs | 113 +++++++++++++++++++++++++--------------- src/map_py_err.rs | 36 +++++++++++++ src/store.rs | 3 +- src/store/filesystem.rs | 7 +-- src/utils.rs | 37 +------------ 7 files changed, 117 insertions(+), 84 deletions(-) diff --git a/src/chunk_item.rs b/src/chunk_item.rs index 5de922b..1a69310 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -14,7 +14,7 @@ use zarrs::{ storage::StoreKey, }; -use crate::utils::PyErrStrExt; +use crate::map_py_err::PyErrStrExt as _; pub(crate) trait ChunksItem { fn key(&self) -> &StoreKey; diff --git a/src/concurrency.rs b/src/concurrency.rs index 191b295..a1e6a46 100644 --- a/src/concurrency.rs +++ b/src/concurrency.rs @@ -4,7 +4,8 @@ use zarrs::array::{ RecommendedConcurrency, }; -use crate::{chunk_item::ChunksItem, utils::PyErrExt as _, CodecPipelineImpl}; +use crate::map_py_err::PyErrExt as _; +use crate::{chunk_item::ChunksItem, CodecPipelineImpl}; pub trait ChunkConcurrentLimitAndCodecOptions { fn get_chunk_concurrent_limit_and_codec_options( diff --git a/src/lib.rs b/src/lib.rs index 53e3964..6e3a0ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,17 +37,18 @@ use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey}; mod chunk_item; mod concurrency; +mod map_py_err; mod runtime; mod store; #[cfg(test)] mod tests; -mod map_py_err; mod utils; use crate::chunk_item::ChunksItem; use crate::concurrency::ChunkConcurrentLimitAndCodecOptions; +use crate::map_py_err::{PyErrExt as _, PyErrStrExt as _}; use crate::store::StoreConfig; -use crate::utils::{PyErrExt, PyErrStrExt as _, PyUntypedArrayExt as _}; +use crate::utils::PyUntypedArrayExt as _; // TODO: Use a OnceLock for store with get_or_try_init when stabilised? 
#[gen_stub_pyclass] @@ -68,7 +69,10 @@ impl CodecPipelineImpl { codec_chain: &CodecChain, codec_options: &CodecOptions, ) -> PyResult> { - let value_encoded = self.store.get(item.key()).map_py_err_from_str::()?; + let value_encoded = self + .store + .get(item.key()) + .map_py_err_from_str::()?; let value_decoded = if let Some(value_encoded) = value_encoded { let value_encoded: Vec = value_encoded.into(); // zero-copy in this case codec_chain @@ -99,7 +103,9 @@ impl CodecPipelineImpl { .map_py_err()?; if value_decoded.is_fill_value(item.representation().fill_value()) { - self.store.erase(item.key()).map_py_err_from_str::() + self.store + .erase(item.key()) + .map_py_err_from_str::() } else { let value_encoded = codec_chain .encode(value_decoded, item.representation(), codec_options) @@ -239,6 +245,7 @@ fn array_metadata_to_codec_metadata_v3( #[gen_stub_pymethods] #[pymethods] impl CodecPipelineImpl { + #[allow(clippy::needless_pass_by_value)] #[pyo3(signature = ( array_metadata, store_config, @@ -261,8 +268,9 @@ impl CodecPipelineImpl { serde_json::from_str(array_metadata).map_py_err_from_str::()?; let codec_metadata = array_metadata_to_codec_metadata_v3(metadata).map_py_err_from_str::()?; - let codec_chain = - Arc::new(CodecChain::from_metadata(&codec_metadata).map_py_err_from_str::()?); + let codec_chain = Arc::new( + CodecChain::from_metadata(&codec_metadata).map_py_err_from_str::()?, + ); let mut codec_options = CodecOptionsBuilder::new(); if let Some(validate_checksums) = validate_checksums { @@ -276,8 +284,9 @@ impl CodecPipelineImpl { chunk_concurrent_maximum.unwrap_or(rayon::current_num_threads()); let num_threads = num_threads.unwrap_or(rayon::current_num_threads()); - let store: ReadableWritableListableStorage = - (&store_config).try_into().map_py_err_from_str::()?; + let store: ReadableWritableListableStorage = (&store_config) + .try_into() + .map_py_err_from_str::()?; Ok(Self { store, @@ -306,38 +315,12 @@ impl CodecPipelineImpl { return Ok(()); }; - // 
Assemble partial decoders ahead of time and in parallel - let partial_chunk_descriptions = chunk_descriptions - .iter() - .filter(|item| !(is_whole_chunk(item))) - .unique_by(|item| item.key()) - .collect::>(); - let mut partial_decoder_cache: HashMap> = - HashMap::new(); - if !partial_chunk_descriptions.is_empty() { - let key_decoder_pairs = iter_concurrent_limit!( - chunk_concurrent_limit, - partial_chunk_descriptions, - map, - |item| { - let storage_handle = Arc::new(StorageHandle::new(self.store.clone())); - let input_handle = - StoragePartialDecoder::new(storage_handle, item.key().clone()); - let partial_decoder = self - .codec_chain - .clone() - .partial_decoder( - Arc::new(input_handle), - item.representation(), - &codec_options, - ) - .map_py_err()?; - Ok((item.key().clone(), partial_decoder)) - } - ) - .collect::>>()?; - partial_decoder_cache.extend(key_decoder_pairs); - } + // Assemble partial decoders ahead of time + let partial_decoder_cache = self.assemble_partial_decoders( + chunk_descriptions.as_ref(), + chunk_concurrent_limit, + &codec_options, + )?; py.allow_threads(move || { // FIXME: the `decode_into` methods only support fixed length data types. @@ -372,8 +355,10 @@ impl CodecPipelineImpl { && chunk_subset.shape() == item.representation().shape_u64() { // See zarrs::array::Array::retrieve_chunk_into - if let Some(chunk_encoded) = - self.store.get(item.key()).map_py_err_from_str::()? + if let Some(chunk_encoded) = self + .store + .get(item.key()) + .map_py_err_from_str::()? 
{ // Decode the encoded data into the output buffer let chunk_encoded: Vec = chunk_encoded.into(); @@ -495,6 +480,50 @@ impl CodecPipelineImpl { } } +impl CodecPipelineImpl { + /// Assemble partial decoders in parallel + fn assemble_partial_decoders( + &self, + chunk_descriptions: &[chunk_item::WithSubset], + chunk_concurrent_limit: usize, + codec_options: &CodecOptions, + ) -> PyResult>> { + let partial_chunk_descriptions = chunk_descriptions + .iter() + .filter(|item| !(is_whole_chunk(item))) + .unique_by(|item| item.key()) + .collect::>(); + let mut partial_decoder_cache: HashMap> = + HashMap::new(); + if !partial_chunk_descriptions.is_empty() { + let key_decoder_pairs = iter_concurrent_limit!( + chunk_concurrent_limit, + partial_chunk_descriptions, + map, + |item| { + let storage_handle = Arc::new(StorageHandle::new(self.store.clone())); + let input_handle = + StoragePartialDecoder::new(storage_handle, item.key().clone()); + let partial_decoder = self + .codec_chain + .clone() + .partial_decoder( + Arc::new(input_handle), + item.representation(), + codec_options, + ) + .map_py_err()?; + Ok((item.key().clone(), partial_decoder)) + } + ) + .collect::>>()?; + partial_decoder_cache.extend(key_decoder_pairs); + } + + Ok(partial_decoder_cache) + } +} + /// A Python module implemented in Rust. 
#[pymodule] fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { diff --git a/src/map_py_err.rs b/src/map_py_err.rs index e69de29..c397d8e 100644 --- a/src/map_py_err.rs +++ b/src/map_py_err.rs @@ -0,0 +1,36 @@ +use std::fmt::Display; + +use pyo3::{PyErr, PyResult, PyTypeInfo}; +use zarrs::array::codec::CodecError; + +pub(crate) trait PyErrStrExt { + fn map_py_err_from_str(self) -> PyResult; +} + +impl PyErrStrExt for Result { + fn map_py_err_from_str(self) -> PyResult { + self.map_err(|e| PyErr::new::(format!("{e}"))) + } +} + +pub(crate) trait PyErrExt { + fn map_py_err(self) -> PyResult; +} + +impl PyErrExt for Result { + fn map_py_err(self) -> PyResult { + // see https://docs.python.org/3/library/exceptions.html#exception-hierarchy + self.map_err(|e| match e { + // requested indexing operation doesn’t match shape + CodecError::IncompatibleIndexer(_) + | CodecError::IncompatibleDimensionalityError(_) + | CodecError::InvalidByteRangeError(_) => { + PyErr::new::(format!("{e}")) + } + // some pipe, file, or subprocess failed + CodecError::IOError(_) => PyErr::new::(format!("{e}")), + // all the rest: some unknown runtime problem + e => PyErr::new::(format!("{e}")), + }) + } +} diff --git a/src/store.rs b/src/store.rs index 561b9c8..1adf90d 100644 --- a/src/store.rs +++ b/src/store.rs @@ -10,7 +10,8 @@ use zarrs::storage::{ storage_adapter::async_to_sync::AsyncToSyncStorageAdapter, ReadableWritableListableStorage, }; -use crate::{runtime::tokio_block_on, utils::PyErrStrExt}; +use crate::map_py_err::PyErrStrExt as _; +use crate::runtime::tokio_block_on; mod filesystem; mod http; diff --git a/src/store/filesystem.rs b/src/store/filesystem.rs index de973f2..21927ea 100644 --- a/src/store/filesystem.rs +++ b/src/store/filesystem.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use pyo3::{exceptions::PyRuntimeError, PyErr}; use zarrs::{filesystem::FilesystemStore, storage::ReadableWritableListableStorage}; -use crate::utils::PyErrStrExt; +use crate::map_py_err::PyErrStrExt as 
_; #[derive(Debug, Clone)] pub struct FilesystemStoreConfig { @@ -20,8 +20,9 @@ impl TryInto for &FilesystemStoreConfig { type Error = PyErr; fn try_into(self) -> Result { - let store = - Arc::new(FilesystemStore::new(self.root.clone()).map_py_err_from_str::()?); + let store = Arc::new( + FilesystemStore::new(self.root.clone()).map_py_err_from_str::()?, + ); Ok(store) } } diff --git a/src/utils.rs b/src/utils.rs index e072792..e8d052f 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,43 +1,8 @@ -use std::fmt::Display; - use numpy::{PyUntypedArray, PyUntypedArrayMethods}; -use pyo3::{Bound, PyErr, PyResult, PyTypeInfo}; -use zarrs::array::codec::CodecError; +use pyo3::{Bound, PyResult}; use crate::{ChunksItem, WithSubset}; -pub(crate) trait PyErrStrExt { - fn map_py_err_from_str(self) -> PyResult; -} - -impl PyErrStrExt for Result { - fn map_py_err_from_str(self) -> PyResult { - self.map_err(|e| PyErr::new::(format!("{e}"))) - } -} - -pub(crate) trait PyErrExt { - fn map_py_err(self) -> PyResult; -} - -impl PyErrExt for Result { - fn map_py_err(self) -> PyResult { - // see https://docs.python.org/3/library/exceptions.html#exception-hierarchy - self.map_err(|e| match e { - // requested indexing operation doesn’t match shape - CodecError::IncompatibleIndexer(_) - | CodecError::IncompatibleDimensionalityError(_) - | CodecError::InvalidByteRangeError(_) => { - PyErr::new::(format!("{e}")) - } - // some pipe, file, or subprocess failed - CodecError::IOError(_) => PyErr::new::(format!("{e}")), - // all the rest: some unknown runtime problem - e => PyErr::new::(format!("{e}")), - }) - } -} - pub(crate) trait PyUntypedArrayExt { fn shape_zarr(&self) -> PyResult>; } From 2cc0d7052cd929e8191b142727a749ae218e7f5d Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Thu, 25 Sep 2025 10:37:57 +0200 Subject: [PATCH 3/3] move --- src/lib.rs | 90 ++++++++++++++++++++++++++---------------------------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/src/lib.rs b/src/lib.rs 
index 6e3a0ce..e40b3fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -176,7 +176,7 @@ impl CodecPipelineImpl { array_object } - fn nparray_to_slice<'a>(value: &'a Bound<'_, PyUntypedArray>) -> Result<&'a [u8], PyErr> { + fn nparray_to_slice<'a>(value: &'a Bound<'_, PyUntypedArray>) -> PyResult<&'a [u8]> { if !value.is_c_contiguous() { return Err(PyErr::new::( "input array must be a C contiguous array".to_string(), @@ -195,7 +195,7 @@ impl CodecPipelineImpl { fn nparray_to_unsafe_cell_slice<'a>( value: &'a Bound<'_, PyUntypedArray>, - ) -> Result, PyErr> { + ) -> PyResult> { if !value.is_c_contiguous() { return Err(PyErr::new::( "input array must be a C contiguous array".to_string(), @@ -211,6 +211,48 @@ impl CodecPipelineImpl { }; Ok(UnsafeCellSlice::new(output)) } + + /// Assemble partial decoders in parallel + fn assemble_partial_decoders( + &self, + chunk_descriptions: &[chunk_item::WithSubset], + chunk_concurrent_limit: usize, + codec_options: &CodecOptions, + ) -> PyResult>> { + let partial_chunk_descriptions = chunk_descriptions + .iter() + .filter(|item| !(is_whole_chunk(item))) + .unique_by(|item| item.key()) + .collect::>(); + let mut partial_decoder_cache: HashMap> = + HashMap::new(); + if !partial_chunk_descriptions.is_empty() { + let key_decoder_pairs = iter_concurrent_limit!( + chunk_concurrent_limit, + partial_chunk_descriptions, + map, + |item| { + let storage_handle = Arc::new(StorageHandle::new(self.store.clone())); + let input_handle = + StoragePartialDecoder::new(storage_handle, item.key().clone()); + let partial_decoder = self + .codec_chain + .clone() + .partial_decoder( + Arc::new(input_handle), + item.representation(), + codec_options, + ) + .map_py_err()?; + Ok((item.key().clone(), partial_decoder)) + } + ) + .collect::>>()?; + partial_decoder_cache.extend(key_decoder_pairs); + } + + Ok(partial_decoder_cache) + } } fn array_metadata_to_codec_metadata_v3( @@ -480,50 +522,6 @@ impl CodecPipelineImpl { } } -impl CodecPipelineImpl { - /// Assemble 
partial decoders in parallel - fn assemble_partial_decoders( - &self, - chunk_descriptions: &[chunk_item::WithSubset], - chunk_concurrent_limit: usize, - codec_options: &CodecOptions, - ) -> PyResult>> { - let partial_chunk_descriptions = chunk_descriptions - .iter() - .filter(|item| !(is_whole_chunk(item))) - .unique_by(|item| item.key()) - .collect::>(); - let mut partial_decoder_cache: HashMap> = - HashMap::new(); - if !partial_chunk_descriptions.is_empty() { - let key_decoder_pairs = iter_concurrent_limit!( - chunk_concurrent_limit, - partial_chunk_descriptions, - map, - |item| { - let storage_handle = Arc::new(StorageHandle::new(self.store.clone())); - let input_handle = - StoragePartialDecoder::new(storage_handle, item.key().clone()); - let partial_decoder = self - .codec_chain - .clone() - .partial_decoder( - Arc::new(input_handle), - item.representation(), - codec_options, - ) - .map_py_err()?; - Ok((item.key().clone(), partial_decoder)) - } - ) - .collect::>>()?; - partial_decoder_cache.extend(key_decoder_pairs); - } - - Ok(partial_decoder_cache) - } -} - /// A Python module implemented in Rust. #[pymodule] fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> {