diff --git a/src/chunk_item.rs b/src/chunk_item.rs
index 2eb6398..1a69310 100644
--- a/src/chunk_item.rs
+++ b/src/chunk_item.rs
@@ -14,7 +14,7 @@ use zarrs::{
     storage::StoreKey,
 };
 
-use crate::utils::PyErrExt;
+use crate::map_py_err::PyErrStrExt as _;
 
 pub(crate) trait ChunksItem {
     fn key(&self) -> &StoreKey;
@@ -76,7 +76,7 @@ impl Basic {
         let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?;
         let fill_value_bytes = fill_value_to_bytes(&dtype, &fill_value)?;
         Ok(Self {
-            key: StoreKey::new(path).map_py_err::<PyValueError>()?,
+            key: StoreKey::new(path).map_py_err_from_str::<PyValueError>()?,
             representation: get_chunk_representation(chunk_shape, &dtype, fill_value_bytes)?,
         })
     }
@@ -148,14 +148,14 @@ fn get_chunk_representation(
         &MetadataV3::new(dtype),
         zarrs::config::global_config().data_type_aliases_v3(),
     )
-    .map_py_err::<PyRuntimeError>()?;
+    .map_py_err_from_str::<PyRuntimeError>()?;
     let chunk_shape = chunk_shape
         .into_iter()
         .map(|x| NonZeroU64::new(x).expect("chunk shapes should always be non-zero"))
         .collect();
     let chunk_representation =
         ChunkRepresentation::new(chunk_shape, data_type, FillValue::new(fill_value))
-            .map_py_err::<PyValueError>()?;
+            .map_py_err_from_str::<PyValueError>()?;
     Ok(chunk_representation)
 }
diff --git a/src/concurrency.rs b/src/concurrency.rs
index 6eba7e3..a1e6a46 100644
--- a/src/concurrency.rs
+++ b/src/concurrency.rs
@@ -4,7 +4,8 @@ use zarrs::array::{
     RecommendedConcurrency,
 };
 
-use crate::{chunk_item::ChunksItem, utils::PyCodecErrExt as _, CodecPipelineImpl};
+use crate::map_py_err::PyErrExt as _;
+use crate::{chunk_item::ChunksItem, CodecPipelineImpl};
 
 pub trait ChunkConcurrentLimitAndCodecOptions {
     fn get_chunk_concurrent_limit_and_codec_options(
@@ -30,7 +31,7 @@ where
     let codec_concurrency = codec_pipeline_impl
         .codec_chain
         .recommended_concurrency(chunk_representation)
-        .map_codec_err()?;
+        .map_py_err()?;
 
     let min_concurrent_chunks =
         std::cmp::min(codec_pipeline_impl.chunk_concurrent_minimum, num_chunks);
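Reviewer note: the split here is that `PyErrStrExt::map_py_err_from_str` (added in src/map_py_err.rs below) stringifies any `Display` error into a caller-chosen Python exception type, while `PyErrExt::map_py_err` is reserved for `CodecError` and picks the exception itself. A minimal sketch of the stringifying half in isolation; `to_py_value_err` is a hypothetical stand-in, not part of this patch:

    use pyo3::{exceptions::PyValueError, PyErr, PyResult};

    // Stand-in for `map_py_err_from_str::<PyValueError>()`: render the error
    // via `Display`, then wrap it in the chosen Python exception type.
    fn to_py_value_err<T, E: std::fmt::Display>(r: Result<T, E>) -> PyResult<T> {
        r.map_err(|e| PyErr::new::<PyValueError, _>(format!("{e}")))
    }

    fn main() {
        let bad: Result<u32, String> = Err("store key contains '//'".into());
        assert!(to_py_value_err(bad).is_err());
    }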
diff --git a/src/lib.rs b/src/lib.rs
index 54e850f..e40b3fc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -37,6 +37,7 @@ use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey};
 
 mod chunk_item;
 mod concurrency;
+mod map_py_err;
 mod runtime;
 mod store;
 #[cfg(test)]
@@ -45,8 +46,9 @@ mod utils;
 
 use crate::chunk_item::ChunksItem;
 use crate::concurrency::ChunkConcurrentLimitAndCodecOptions;
+use crate::map_py_err::{PyErrExt as _, PyErrStrExt as _};
 use crate::store::StoreConfig;
-use crate::utils::{PyCodecErrExt, PyErrExt as _, PyUntypedArrayExt as _};
+use crate::utils::PyUntypedArrayExt as _;
 
 // TODO: Use a OnceLock for store with get_or_try_init when stabilised?
 #[gen_stub_pyclass]
@@ -67,12 +69,15 @@ impl CodecPipelineImpl {
         codec_chain: &CodecChain,
         codec_options: &CodecOptions,
     ) -> PyResult<ArrayBytes<'a>> {
-        let value_encoded = self.store.get(item.key()).map_py_err::<PyRuntimeError>()?;
+        let value_encoded = self
+            .store
+            .get(item.key())
+            .map_py_err_from_str::<PyRuntimeError>()?;
         let value_decoded = if let Some(value_encoded) = value_encoded {
             let value_encoded: Vec<u8> = value_encoded.into(); // zero-copy in this case
             codec_chain
                 .decode(value_encoded.into(), item.representation(), codec_options)
-                .map_codec_err()?
+                .map_py_err()?
         } else {
             let array_size = ArraySize::new(
                 item.representation().data_type().size(),
@@ -95,20 +100,22 @@ impl CodecPipelineImpl {
             item.representation().num_elements(),
             item.representation().data_type().size(),
         )
-        .map_codec_err()?;
+        .map_py_err()?;
 
         if value_decoded.is_fill_value(item.representation().fill_value()) {
-            self.store.erase(item.key()).map_py_err::<PyRuntimeError>()
+            self.store
+                .erase(item.key())
+                .map_py_err_from_str::<PyRuntimeError>()
         } else {
             let value_encoded = codec_chain
                 .encode(value_decoded, item.representation(), codec_options)
                 .map(Cow::into_owned)
-                .map_codec_err()?;
+                .map_py_err()?;
 
             // Store the encoded chunk
             self.store
                 .set(item.key(), value_encoded.into())
-                .map_py_err::<PyRuntimeError>()
+                .map_py_err_from_str::<PyRuntimeError>()
         }
     }
 
@@ -135,7 +142,7 @@ impl CodecPipelineImpl {
         // Validate the chunk subset bytes
         chunk_subset_bytes
             .validate(chunk_subset.num_elements(), data_type_size)
-            .map_codec_err()?;
+            .map_py_err()?;
 
         // Retrieve the chunk
         let chunk_bytes_old = self.retrieve_chunk_bytes(item, codec_chain, codec_options)?;
@@ -148,7 +155,7 @@ impl CodecPipelineImpl {
             &chunk_subset_bytes,
             data_type_size,
         )
-        .map_codec_err()?;
+        .map_py_err()?;
 
         // Store the updated chunk
         self.store_chunk_bytes(item, codec_chain, chunk_bytes_new, codec_options)
@@ -169,7 +176,7 @@ impl CodecPipelineImpl {
         array_object
     }
 
-    fn nparray_to_slice<'a>(value: &'a Bound<'_, PyUntypedArray>) -> Result<&'a [u8], PyErr> {
+    fn nparray_to_slice<'a>(value: &'a Bound<'_, PyUntypedArray>) -> PyResult<&'a [u8]> {
         if !value.is_c_contiguous() {
             return Err(PyErr::new::<PyValueError, _>(
                 "input array must be a C contiguous array".to_string(),
@@ -188,7 +195,7 @@ impl CodecPipelineImpl {
 
     fn nparray_to_unsafe_cell_slice<'a>(
         value: &'a Bound<'_, PyUntypedArray>,
-    ) -> Result<UnsafeCellSlice<'a, u8>, PyErr> {
+    ) -> PyResult<UnsafeCellSlice<'a, u8>> {
         if !value.is_c_contiguous() {
             return Err(PyErr::new::<PyValueError, _>(
                 "input array must be a C contiguous array".to_string(),
@@ -204,6 +211,48 @@ impl CodecPipelineImpl {
         };
         Ok(UnsafeCellSlice::new(output))
     }
+
+    /// Assemble partial decoders in parallel
+    fn assemble_partial_decoders(
+        &self,
+        chunk_descriptions: &[chunk_item::WithSubset],
+        chunk_concurrent_limit: usize,
+        codec_options: &CodecOptions,
+    ) -> PyResult<HashMap<StoreKey, Arc<dyn ArrayPartialDecoderTraits>>> {
+        let partial_chunk_descriptions = chunk_descriptions
+            .iter()
+            .filter(|item| !(is_whole_chunk(item)))
+            .unique_by(|item| item.key())
+            .collect::<Vec<_>>();
+        let mut partial_decoder_cache: HashMap<StoreKey, Arc<dyn ArrayPartialDecoderTraits>> =
+            HashMap::new();
+        if !partial_chunk_descriptions.is_empty() {
+            let key_decoder_pairs = iter_concurrent_limit!(
+                chunk_concurrent_limit,
+                partial_chunk_descriptions,
+                map,
+                |item| {
+                    let storage_handle = Arc::new(StorageHandle::new(self.store.clone()));
+                    let input_handle =
+                        StoragePartialDecoder::new(storage_handle, item.key().clone());
+                    let partial_decoder = self
+                        .codec_chain
+                        .clone()
+                        .partial_decoder(
+                            Arc::new(input_handle),
+                            item.representation(),
+                            codec_options,
+                        )
+                        .map_py_err()?;
+                    Ok((item.key().clone(), partial_decoder))
+                }
+            )
+            .collect::<PyResult<Vec<_>>>()?;
+            partial_decoder_cache.extend(key_decoder_pairs);
+        }
+
+        Ok(partial_decoder_cache)
+    }
 }
 
 fn array_metadata_to_codec_metadata_v3(
@@ -238,6 +287,7 @@ fn array_metadata_to_codec_metadata_v3(
 #[gen_stub_pymethods]
 #[pymethods]
 impl CodecPipelineImpl {
+    #[allow(clippy::needless_pass_by_value)]
     #[pyo3(signature = (
         array_metadata,
         store_config,
@@ -257,11 +307,12 @@ impl CodecPipelineImpl {
         num_threads: Option<usize>,
     ) -> PyResult<Self> {
         let metadata: ArrayMetadata =
-            serde_json::from_str(array_metadata).map_py_err::<PyTypeError>()?;
+            serde_json::from_str(array_metadata).map_py_err_from_str::<PyTypeError>()?;
         let codec_metadata =
-            array_metadata_to_codec_metadata_v3(metadata).map_py_err::<PyTypeError>()?;
-        let codec_chain =
-            Arc::new(CodecChain::from_metadata(&codec_metadata).map_py_err::<PyTypeError>()?);
+            array_metadata_to_codec_metadata_v3(metadata).map_py_err_from_str::<PyTypeError>()?;
+        let codec_chain = Arc::new(
+            CodecChain::from_metadata(&codec_metadata).map_py_err_from_str::<PyTypeError>()?,
+        );
 
         let mut codec_options = CodecOptionsBuilder::new();
         if let Some(validate_checksums) = validate_checksums {
@@ -275,8 +326,9 @@ impl CodecPipelineImpl {
             chunk_concurrent_maximum.unwrap_or(rayon::current_num_threads());
         let num_threads = num_threads.unwrap_or(rayon::current_num_threads());
 
-        let store: ReadableWritableListableStorage =
-            (&store_config).try_into().map_py_err::<PyTypeError>()?;
+        let store: ReadableWritableListableStorage = (&store_config)
+            .try_into()
+            .map_py_err_from_str::<PyTypeError>()?;
 
         Ok(Self {
             store,
@@ -305,38 +357,12 @@ impl CodecPipelineImpl {
             return Ok(());
         };
 
-        // Assemble partial decoders ahead of time and in parallel
-        let partial_chunk_descriptions = chunk_descriptions
-            .iter()
-            .filter(|item| !(is_whole_chunk(item)))
-            .unique_by(|item| item.key())
-            .collect::<Vec<_>>();
-        let mut partial_decoder_cache: HashMap<StoreKey, Arc<dyn ArrayPartialDecoderTraits>> =
-            HashMap::new();
-        if !partial_chunk_descriptions.is_empty() {
-            let key_decoder_pairs = iter_concurrent_limit!(
-                chunk_concurrent_limit,
-                partial_chunk_descriptions,
-                map,
-                |item| {
-                    let storage_handle = Arc::new(StorageHandle::new(self.store.clone()));
-                    let input_handle =
-                        StoragePartialDecoder::new(storage_handle, item.key().clone());
-                    let partial_decoder = self
-                        .codec_chain
-                        .clone()
-                        .partial_decoder(
-                            Arc::new(input_handle),
-                            item.representation(),
-                            &codec_options,
-                        )
-                        .map_codec_err()?;
-                    Ok((item.key().clone(), partial_decoder))
-                }
-            )
-            .collect::<PyResult<Vec<_>>>()?;
-            partial_decoder_cache.extend(key_decoder_pairs);
-        }
+        // Assemble partial decoders ahead of time
+        let partial_decoder_cache = self.assemble_partial_decoders(
+            chunk_descriptions.as_ref(),
+            chunk_concurrent_limit,
+            &codec_options,
+        )?;
 
         py.allow_threads(move || {
             // FIXME: the `decode_into` methods only support fixed length data types.
@@ -359,11 +385,11 @@
                         .data_type()
                         .fixed_size()
                         .ok_or("variable length data type not supported")
-                        .map_py_err::<PyRuntimeError>()?,
+                        .map_py_err_from_str::<PyRuntimeError>()?,
                     &output_shape,
                     subset,
                 )
-                .map_py_err::<PyRuntimeError>()?
+                .map_py_err_from_str::<PyRuntimeError>()?
             };
 
             // See zarrs::array::Array::retrieve_chunk_subset_into
@@ -371,8 +397,10 @@
                 && chunk_subset.shape() == item.representation().shape_u64()
             {
                 // See zarrs::array::Array::retrieve_chunk_into
-                if let Some(chunk_encoded) =
-                    self.store.get(item.key()).map_py_err::<PyRuntimeError>()?
+                if let Some(chunk_encoded) = self
+                    .store
+                    .get(item.key())
+                    .map_py_err_from_str::<PyRuntimeError>()?
                 {
                     // Decode the encoded data into the output buffer
                     let chunk_encoded: Vec<u8> = chunk_encoded.into();
@@ -401,7 +429,7 @@ impl CodecPipelineImpl {
                             &codec_options,
                         )
                     }
-                    .map_codec_err()
+                    .map_py_err()
                 };
 
                 iter_concurrent_limit!(
@@ -454,7 +482,7 @@ impl CodecPipelineImpl {
                         &input_shape,
                         item.item.representation().data_type(),
                     )
-                    .map_codec_err()?;
+                    .map_py_err()?;
                     self.store_chunk_subset_bytes(
                         &item,
                         &self.codec_chain,
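Reviewer note: hoisting this block into `assemble_partial_decoders` keeps the retrieval path readable and still builds at most one partial decoder per distinct store key, concurrently, before the GIL is released via `py.allow_threads`. The dedupe-then-cache shape in isolation; the `build` closure is a hypothetical stand-in for `CodecChain::partial_decoder`:

    use std::collections::HashMap;
    use std::hash::Hash;

    // Build one value per distinct key; repeated keys reuse the cached value,
    // mirroring `unique_by(|item| item.key())` in the patch.
    fn build_cache<K: Eq + Hash + Clone, V>(
        keys: &[K],
        build: impl Fn(&K) -> V,
    ) -> HashMap<K, V> {
        let mut cache = HashMap::new();
        for key in keys {
            cache.entry(key.clone()).or_insert_with(|| build(key));
        }
        cache
    }

    fn main() {
        let cache = build_cache(&["c/0/0", "c/0/1", "c/0/0"], |key| key.len());
        assert_eq!(cache.len(), 2); // "c/0/0" is built only once
    }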
diff --git a/src/map_py_err.rs b/src/map_py_err.rs
new file mode 100644
index 0000000..c397d8e
--- /dev/null
+++ b/src/map_py_err.rs
@@ -0,0 +1,36 @@
+use std::fmt::Display;
+
+use pyo3::{PyErr, PyResult, PyTypeInfo};
+use zarrs::array::codec::CodecError;
+
+pub(crate) trait PyErrStrExt<T> {
+    fn map_py_err_from_str<PE: PyTypeInfo>(self) -> PyResult<T>;
+}
+
+impl<T, E: Display> PyErrStrExt<T> for Result<T, E> {
+    fn map_py_err_from_str<PE: PyTypeInfo>(self) -> PyResult<T> {
+        self.map_err(|e| PyErr::new::<PE, _>(format!("{e}")))
+    }
+}
+
+pub(crate) trait PyErrExt<T> {
+    fn map_py_err(self) -> PyResult<T>;
+}
+
+impl<T> PyErrExt<T> for Result<T, CodecError> {
+    fn map_py_err(self) -> PyResult<T> {
+        // see https://docs.python.org/3/library/exceptions.html#exception-hierarchy
+        self.map_err(|e| match e {
+            // requested indexing operation doesn’t match shape
+            CodecError::IncompatibleIndexer(_)
+            | CodecError::IncompatibleDimensionalityError(_)
+            | CodecError::InvalidByteRangeError(_) => {
+                PyErr::new::<pyo3::exceptions::PyIndexError, _>(format!("{e}"))
+            }
+            // some pipe, file, or subprocess failed
+            CodecError::IOError(_) => PyErr::new::<pyo3::exceptions::PyOSError, _>(format!("{e}")),
+            // all the rest: some unknown runtime problem
+            e => PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")),
+        })
+    }
+}
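Reviewer note: the match arms follow the linked CPython exception hierarchy — indexing/shape mismatches surface as IndexError, I/O failures as OSError, and everything else as RuntimeError. A quick behavioral check of the fallback arm; this sketch assumes pyo3's `auto-initialize` feature and `CodecError`'s catch-all `From<&str>` conversion, neither of which this patch touches:

    use pyo3::{exceptions::PyRuntimeError, PyErr, Python};
    use zarrs::array::codec::CodecError;

    fn main() {
        // An unclassified CodecError takes the final match arm...
        let e = CodecError::from("unsupported codec configuration");
        let py_err = PyErr::new::<PyRuntimeError, _>(format!("{e}"));
        // ...and is visible to Python as a RuntimeError.
        Python::with_gil(|py| assert!(py_err.is_instance_of::<PyRuntimeError>(py)));
    }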
diff --git a/src/store.rs b/src/store.rs
index 9fa1c92..1adf90d 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -10,7 +10,8 @@ use zarrs::storage::{
     storage_adapter::async_to_sync::AsyncToSyncStorageAdapter, ReadableWritableListableStorage,
 };
 
-use crate::{runtime::tokio_block_on, utils::PyErrExt};
+use crate::map_py_err::PyErrStrExt as _;
+use crate::runtime::tokio_block_on;
 
 mod filesystem;
 mod http;
@@ -79,7 +80,7 @@ fn opendal_builder_to_sync_store<B: opendal::Builder>(
     builder: B,
 ) -> PyResult<ReadableWritableListableStorage> {
     let operator = opendal::Operator::new(builder)
-        .map_py_err::<PyValueError>()?
+        .map_py_err_from_str::<PyValueError>()?
         .finish();
     let store = Arc::new(zarrs_opendal::AsyncOpendalStore::new(operator));
     let store = Arc::new(AsyncToSyncStorageAdapter::new(store, tokio_block_on()));
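Reviewer note: every opendal-backed store is async, so it is bridged into the sync storage API with `AsyncToSyncStorageAdapter` plus the shared tokio runtime behind `tokio_block_on`. The runtime-sharing idea in isolation — a minimal sketch, not the crate's actual `runtime` module (which hands a runtime handle to the adapter rather than blocking directly):

    use std::sync::OnceLock;
    use tokio::runtime::Runtime;

    // One lazily initialized runtime shared by all sync-to-async bridge calls.
    static RUNTIME: OnceLock<Runtime> = OnceLock::new();

    fn block_on<F: std::future::Future>(fut: F) -> F::Output {
        RUNTIME
            .get_or_init(|| Runtime::new().expect("failed to build tokio runtime"))
            .block_on(fut)
    }

    fn main() {
        assert_eq!(block_on(async { 21 * 2 }), 42);
    }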
diff --git a/src/store/filesystem.rs b/src/store/filesystem.rs
index e5e8091..21927ea 100644
--- a/src/store/filesystem.rs
+++ b/src/store/filesystem.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
 use pyo3::{exceptions::PyRuntimeError, PyErr};
 use zarrs::{filesystem::FilesystemStore, storage::ReadableWritableListableStorage};
 
-use crate::utils::PyErrExt;
+use crate::map_py_err::PyErrStrExt as _;
 
 #[derive(Debug, Clone)]
 pub struct FilesystemStoreConfig {
@@ -20,8 +20,9 @@ impl TryInto<ReadableWritableListableStorage> for &FilesystemStoreConfig {
     type Error = PyErr;
 
     fn try_into(self) -> Result<ReadableWritableListableStorage, Self::Error> {
-        let store =
-            Arc::new(FilesystemStore::new(self.root.clone()).map_py_err::<PyRuntimeError>()?);
+        let store = Arc::new(
+            FilesystemStore::new(self.root.clone()).map_py_err_from_str::<PyRuntimeError>()?,
+        );
         Ok(store)
     }
 }
diff --git a/src/utils.rs b/src/utils.rs
index eda2aa0..e8d052f 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -1,43 +1,8 @@
-use std::fmt::Display;
-
 use numpy::{PyUntypedArray, PyUntypedArrayMethods};
-use pyo3::{Bound, PyErr, PyResult, PyTypeInfo};
-use zarrs::array::codec::CodecError;
+use pyo3::{Bound, PyResult};
 
 use crate::{ChunksItem, WithSubset};
 
-pub(crate) trait PyErrExt<T> {
-    fn map_py_err<PE: PyTypeInfo>(self) -> PyResult<T>;
-}
-
-impl<T, E: Display> PyErrExt<T> for Result<T, E> {
-    fn map_py_err<PE: PyTypeInfo>(self) -> PyResult<T> {
-        self.map_err(|e| PyErr::new::<PE, _>(format!("{e}")))
-    }
-}
-
-pub(crate) trait PyCodecErrExt<T> {
-    fn map_codec_err(self) -> PyResult<T>;
-}
-
-impl<T> PyCodecErrExt<T> for Result<T, CodecError> {
-    fn map_codec_err(self) -> PyResult<T> {
-        // see https://docs.python.org/3/library/exceptions.html#exception-hierarchy
-        self.map_err(|e| match e {
-            // requested indexing operation doesn’t match shape
-            CodecError::IncompatibleIndexer(_)
-            | CodecError::IncompatibleDimensionalityError(_)
-            | CodecError::InvalidByteRangeError(_) => {
-                PyErr::new::<pyo3::exceptions::PyIndexError, _>(format!("{e}"))
-            }
-            // some pipe, file, or subprocess failed
-            CodecError::IOError(_) => PyErr::new::<pyo3::exceptions::PyOSError, _>(format!("{e}")),
-            // all the rest: some unknown runtime problem
-            e => PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e}")),
-        })
-    }
-}
-
 pub(crate) trait PyUntypedArrayExt {
     fn shape_zarr(&self) -> PyResult<Vec<u64>>;
 }
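Reviewer note: after this patch `utils.rs` keeps only the numpy-facing helpers, and every Python exception is constructed in `map_py_err.rs`. For reference, the fallible conversion shape the store configs use — a standalone sketch with hypothetical `Config`/`Store` types (the real impl returns `PyErr`, not `String`):

    use std::{path::PathBuf, sync::Arc};

    struct Config { root: PathBuf }
    struct Store { root: PathBuf }

    // Mirrors `impl TryInto<ReadableWritableListableStorage> for &FilesystemStoreConfig`.
    impl TryInto<Arc<Store>> for &Config {
        type Error = String;

        fn try_into(self) -> Result<Arc<Store>, Self::Error> {
            if self.root.as_os_str().is_empty() {
                return Err("store root must not be empty".into());
            }
            Ok(Arc::new(Store { root: self.root.clone() }))
        }
    }

    fn main() {
        let config = Config { root: PathBuf::from("/tmp/zarr-data") };
        let store: Arc<Store> = (&config).try_into().expect("valid config");
        assert_eq!(store.root, PathBuf::from("/tmp/zarr-data"));
    }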