(feat): balance chunk/codec concurrency internally and add configuration/concurrency docs (#42)

* (feat): balance chunk/codec concurrency internally

* (docs): add configuration and concurrency sections

* (fix): address config/concurrency comments

* (fix): move `concurrency_chunks_and_codec` into trait

* (fix): change use of "async.concurrency" to `chunk_concurrent_maximum`

* (fix): typo
LDeakin authored Nov 18, 2024
1 parent a463fdf commit 3db6d02
Showing 4 changed files with 173 additions and 39 deletions.
51 changes: 50 additions & 1 deletion README.md
@@ -23,7 +23,56 @@ You can then use your `zarr` as normal (with some caveats)!

We export a `ZarrsCodecPipeline` class so that `zarr-python` can use it, but it is not meant to be instantiated directly, and we do not guarantee the stability of its API beyond what `zarr-python` requires; it is therefore not documented here. We also export two errors, `DiscontiguousArrayError` and `CollapsedDimensionError`, which can be raised in the process of converting selections to indexers that `zarrs` can understand (see below for more details).

There are two ways to control the concurrency of the i/o **TODO: Need to clarify this**
### Configuration

`ZarrsCodecPipeline` options are exposed through `zarr.config`.

Standard `zarr.config` options control some functionality (see the defaults in the [config.py](https://github.com/zarr-developers/zarr-python/blob/main/src/zarr/core/config.py) of `zarr-python`):
- `threading.num_workers`: the maximum number of threads used internally by the `ZarrsCodecPipeline` on the Rust side.
  - Defaults to the number of threads in the global `rayon` thread pool if set to `None`, which is [typically the number of logical CPUs](https://docs.rs/rayon/latest/rayon/struct.ThreadPoolBuilder.html#method.num_threads).
- `array.write_empty_chunks`: whether or not to store empty chunks.
  - Defaults to false if `None`. Note that checking for emptiness has some overhead; see [here](https://docs.rs/zarrs/latest/zarrs/config/struct.Config.html#store-empty-chunks) for more info.
  - This option name is proposed in [zarr-python #2429](https://github.com/zarr-developers/zarr-python/pull/2429).

The `ZarrsCodecPipeline`-specific options are:
- `codec_pipeline.chunk_concurrent_maximum`: the maximum number of chunks stored/retrieved concurrently.
  - Defaults to the number of logical CPUs if `None`. It is additionally constrained by `threading.num_workers`.
- `codec_pipeline.chunk_concurrent_minimum`: the minimum number of chunks retrieved/stored concurrently when balancing chunk/codec concurrency.
  - Defaults to 4 if `None`. See [here](https://docs.rs/zarrs/latest/zarrs/config/struct.Config.html#chunk-concurrent-minimum) for more info.
- `codec_pipeline.validate_checksums`: enable checksum validation (e.g. with the CRC32C codec).
  - Defaults to true if `None`. See [here](https://docs.rs/zarrs/latest/zarrs/config/struct.Config.html#validate-checksums) for more info.

For example:
```python
zarr.config.set({
    "threading.num_workers": None,
    "array.write_empty_chunks": False,
    "codec_pipeline": {
        "path": "zarrs.ZarrsCodecPipeline",
        "validate_checksums": True,
        "store_empty_chunks": False,
        "chunk_concurrent_maximum": None,
        "chunk_concurrent_minimum": 4,
    }
})
```
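
`zarr.config` is a [donfig](https://donfig.readthedocs.io/en/latest/) configuration object, so, assuming standard donfig behaviour, settings can also be read back with `zarr.config.get` or applied temporarily with a context manager:

```python
import zarr

# Read a current value; None means "fall back to the default described above".
print(zarr.config.get("codec_pipeline.chunk_concurrent_minimum", None))

# Apply a setting only within a scope, e.g. to throttle chunk concurrency
# for a single large write against a bandwidth-limited store.
with zarr.config.set({"codec_pipeline.chunk_concurrent_maximum": 2}):
    ...  # operations here use at most 2 concurrent chunks
```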

## Concurrency

Concurrency can be classified into two types:
- chunk (outer) concurrency: the number of chunks retrieved/stored concurrently.
  - This is chosen automatically based on various factors, such as the chunk size and codecs.
  - It is constrained between `codec_pipeline.chunk_concurrent_minimum` and `codec_pipeline.chunk_concurrent_maximum` for operations involving multiple chunks.
- codec (inner) concurrency: the number of threads encoding/decoding a chunk.
  - This is chosen automatically in combination with the chunk concurrency.

The product of the chunk and codec concurrency will approximately match `threading.num_workers`.
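
As a rough illustration of this balancing (a simplified sketch of the behaviour described above, not the `calc_concurrency_outer_inner` algorithm that `zarrs` actually uses):

```python
def balance_concurrency(
    num_threads: int, num_chunks: int, chunk_min: int, chunk_max: int
) -> tuple[int, int]:
    """Split num_threads between chunk (outer) and codec (inner) concurrency.

    A simplified sketch of the balancing described above, not zarrs' algorithm.
    """
    # Chunk concurrency is bounded by the thread budget, the configured
    # maximum, and the number of chunks actually being accessed.
    chunk_concurrency = max(
        min(num_threads, chunk_max, num_chunks), min(chunk_min, num_chunks)
    )
    # The remaining thread budget goes to encoding/decoding within each chunk.
    codec_concurrency = max(num_threads // chunk_concurrency, 1)
    return chunk_concurrency, codec_concurrency

# Many small chunks with 16 threads: chunk concurrency dominates.
print(balance_concurrency(16, num_chunks=1000, chunk_min=4, chunk_max=16))  # (16, 1)
# A single (e.g. sharded) chunk: the threads go to codec concurrency instead.
print(balance_concurrency(16, num_chunks=1, chunk_min=4, chunk_max=16))     # (1, 16)
```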

Chunk concurrency is typically favored because:
- parallel encoding/decoding can have a high overhead with some codecs, especially with small chunks, and
- it is advantageous to retrieve/store multiple chunks concurrently, especially with high-latency stores.

`zarrs-python` will often favor codec concurrency for sharded arrays, since the inner chunks of a shard are well suited to parallel encoding/decoding.
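
For example, a workload dominated by sharded arrays might leave more of the thread budget to codec concurrency. A hypothetical tuning sketch, not a recommendation:

```python
import zarr

zarr.config.set({
    "threading.num_workers": None,  # use the full rayon thread pool
    "codec_pipeline": {
        "path": "zarrs.ZarrsCodecPipeline",
        # Allow the balancer to drop chunk concurrency to 1 so that most
        # threads can decode the inner chunks within each shard.
        "chunk_concurrent_minimum": 1,
    },
})
```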

## Supported Indexing Methods

32 changes: 17 additions & 15 deletions python/zarrs/pipeline.py
@@ -27,7 +27,6 @@
from .utils import (
    CollapsedDimensionError,
    DiscontiguousArrayError,
    get_max_threads,
    make_chunk_info_for_rust,
    make_chunk_info_for_rust_with_indices,
)
@@ -45,13 +44,26 @@ def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
    def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
        codec_metadata = [codec.to_dict() for codec in codecs]
        codec_metadata_json = json.dumps(codec_metadata)
        # TODO: upstream zarr-python has not settled on how to deal with configs yet
        # Should they be checked when an array is created, or when an operation is performed?
        # https://github.com/zarr-developers/zarr-python/issues/2409
        # https://github.com/zarr-developers/zarr-python/pull/2429
        return cls(
            codecs=tuple(codecs),
            impl=CodecPipelineImpl(
                codec_metadata_json,
                config.get("codec_pipeline.validate_checksums", None),
                config.get("codec_pipeline.store_empty_chunks", None),
                config.get("codec_pipeline.concurrent_target", None),
                validate_checksums=config.get(
                    "codec_pipeline.validate_checksums", None
                ),
                # TODO: upstream zarr-python array.write_empty_chunks is not merged yet #2429
                store_empty_chunks=config.get("array.write_empty_chunks", None),
                chunk_concurrent_minimum=config.get(
                    "codec_pipeline.chunk_concurrent_minimum", None
                ),
                chunk_concurrent_maximum=config.get(
                    "codec_pipeline.chunk_concurrent_maximum", None
                ),
                num_threads=config.get("threading.max_workers", None),
            ),
        )

@@ -94,9 +106,6 @@ async def read(
        out: NDBuffer,
        drop_axes: tuple[int, ...] = (),  # FIXME: unused
    ) -> None:
        chunk_concurrent_limit = (
            config.get("threading.max_workers") or get_max_threads()
        )
        out = out.as_ndarray_like()  # FIXME: Error if array is not in host memory
        if not out.dtype.isnative:
            raise RuntimeError("Non-native byte order not supported")
@@ -111,12 +120,9 @@
                self.impl.retrieve_chunks_and_apply_index,
                chunks_desc,
                out,
                chunk_concurrent_limit,
            )
            return None
        chunks = await asyncio.to_thread(
            self.impl.retrieve_chunks, chunks_desc, chunk_concurrent_limit
        )
        chunks = await asyncio.to_thread(self.impl.retrieve_chunks, chunks_desc)
        for chunk, chunk_info in zip(chunks, batch_info):
            out_selection = chunk_info[3]
            selection = chunk_info[2]
@@ -135,9 +141,6 @@ async def write(
        value: NDBuffer,
        drop_axes: tuple[int, ...] = (),
    ) -> None:
        chunk_concurrent_limit = (
            config.get("threading.max_workers") or get_max_threads()
        )
        value = value.as_ndarray_like()  # FIXME: Error if array is not in host memory
        if not value.dtype.isnative:
            value = np.ascontiguousarray(value, dtype=value.dtype.newbyteorder("="))
@@ -148,6 +151,5 @@
            self.impl.store_chunks_with_indices,
            chunks_desc,
            value,
            chunk_concurrent_limit,
        )
        return None
51 changes: 51 additions & 0 deletions src/concurrency.rs
@@ -0,0 +1,51 @@
use pyo3::{exceptions::PyRuntimeError, PyErr, PyResult};
use zarrs::array::{
    codec::CodecOptions, concurrency::calc_concurrency_outer_inner, ArrayCodecTraits,
    RecommendedConcurrency,
};

use crate::{chunk_item::ChunksItem, CodecPipelineImpl};

pub trait ChunkConcurrentLimitAndCodecOptions {
    fn get_chunk_concurrent_limit_and_codec_options(
        &self,
        codec_pipeline_impl: &CodecPipelineImpl,
    ) -> PyResult<Option<(usize, CodecOptions)>>;
}

impl<T> ChunkConcurrentLimitAndCodecOptions for Vec<T>
where
    T: ChunksItem,
{
    fn get_chunk_concurrent_limit_and_codec_options(
        &self,
        codec_pipeline_impl: &CodecPipelineImpl,
    ) -> PyResult<Option<(usize, CodecOptions)>> {
        let num_chunks = self.len();
        let Some(chunk_descriptions0) = self.first() else {
            return Ok(None);
        };
        let chunk_representation = chunk_descriptions0.representation();

        let codec_concurrency = codec_pipeline_impl
            .codec_chain
            .recommended_concurrency(chunk_representation)
            .map_err(|err| PyErr::new::<PyRuntimeError, _>(err.to_string()))?;

        let min_concurrent_chunks =
            std::cmp::min(codec_pipeline_impl.chunk_concurrent_minimum, num_chunks);
        let max_concurrent_chunks =
            std::cmp::max(codec_pipeline_impl.chunk_concurrent_maximum, num_chunks);
        let (chunk_concurrent_limit, codec_concurrent_limit) = calc_concurrency_outer_inner(
            codec_pipeline_impl.num_threads,
            &RecommendedConcurrency::new(min_concurrent_chunks..max_concurrent_chunks),
            &codec_concurrency,
        );
        let codec_options = codec_pipeline_impl
            .codec_options
            .into_builder()
            .concurrent_target(codec_concurrent_limit)
            .build();
        Ok(Some((chunk_concurrent_limit, codec_options)))
    }
}
78 changes: 55 additions & 23 deletions src/lib.rs
@@ -1,6 +1,7 @@
#![warn(clippy::pedantic)]

use chunk_item::{ChunksItem, IntoItem};
use concurrency::ChunkConcurrentLimitAndCodecOptions;
use numpy::npyffi::PyArrayObject;
use numpy::{IntoPyArray, PyArray1, PyUntypedArray, PyUntypedArrayMethods};
use pyo3::exceptions::{PyRuntimeError, PyTypeError, PyValueError};
@@ -22,6 +23,7 @@ use zarrs::storage::{ReadableWritableListableStorageTraits, StorageHandle, Store

mod chunk_item;
mod codec_pipeline_store_filesystem;
mod concurrency;
#[cfg(test)]
mod tests;
mod utils;
@@ -37,9 +39,12 @@ trait CodecPipelineStore: Send + Sync {
// TODO: Use a OnceLock for store with get_or_try_init when stabilised?
#[pyclass]
pub struct CodecPipelineImpl {
    codec_chain: Arc<CodecChain>,
    store: Mutex<Option<Arc<dyn CodecPipelineStore>>>,
    codec_options: CodecOptions,
    pub(crate) codec_chain: Arc<CodecChain>,
    pub(crate) store: Mutex<Option<Arc<dyn CodecPipelineStore>>>,
    pub(crate) codec_options: CodecOptions,
    pub(crate) chunk_concurrent_minimum: usize,
    pub(crate) chunk_concurrent_maximum: usize,
    pub(crate) num_threads: usize,
}

impl CodecPipelineImpl {
@@ -231,13 +236,22 @@

#[pymethods]
impl CodecPipelineImpl {
    #[pyo3(signature = (metadata, validate_checksums=None, store_empty_chunks=None, concurrent_target=None))]
    #[pyo3(signature = (
        metadata,
        validate_checksums=None,
        store_empty_chunks=None,
        chunk_concurrent_minimum=None,
        chunk_concurrent_maximum=None,
        num_threads=None,
    ))]
    #[new]
    fn new(
        metadata: &str,
        validate_checksums: Option<bool>,
        store_empty_chunks: Option<bool>,
        concurrent_target: Option<usize>,
        chunk_concurrent_minimum: Option<usize>,
        chunk_concurrent_maximum: Option<usize>,
        num_threads: Option<usize>,
    ) -> PyResult<Self> {
        let metadata: Vec<MetadataV3> =
            serde_json::from_str(metadata).map_py_err::<PyTypeError>()?;
@@ -250,15 +264,21 @@ impl CodecPipelineImpl {
        if let Some(store_empty_chunks) = store_empty_chunks {
            codec_options = codec_options.store_empty_chunks(store_empty_chunks);
        }
        if let Some(concurrent_target) = concurrent_target {
            codec_options = codec_options.concurrent_target(concurrent_target);
        }
        let codec_options = codec_options.build();

        let chunk_concurrent_minimum = chunk_concurrent_minimum
            .unwrap_or(zarrs::config::global_config().chunk_concurrent_minimum());
        let chunk_concurrent_maximum =
            chunk_concurrent_maximum.unwrap_or(rayon::current_num_threads());
        let num_threads = num_threads.unwrap_or(rayon::current_num_threads());

        Ok(Self {
            codec_chain,
            store: Mutex::new(None),
            codec_options,
            chunk_concurrent_minimum,
            chunk_concurrent_maximum,
            num_threads,
        })
    }

@@ -267,7 +287,6 @@
        py: Python,
        chunk_descriptions: Vec<chunk_item::RawWithIndices>, // FIXME: Ref / iterable?
        value: &Bound<'_, PyUntypedArray>,
        chunk_concurrent_limit: usize,
    ) -> PyResult<()> {
        // Get input array
        if !value.is_c_contiguous() {
@@ -280,9 +299,14 @@
        let chunk_descriptions =
            self.collect_chunk_descriptions(chunk_descriptions, &output_shape)?;

        py.allow_threads(move || {
            let codec_options = &self.codec_options;
        // Adjust the concurrency based on the codec chain and the first chunk description
        let Some((chunk_concurrent_limit, codec_options)) =
            chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)?
        else {
            return Ok(());
        };

        py.allow_threads(move || {
            let update_chunk_subset = |item: chunk_item::WithSubset| {
                // See zarrs::array::Array::retrieve_chunk_subset_into
                if item.chunk_subset.start().iter().all(|&o| o == 0)
Expand All @@ -303,7 +327,7 @@ impl CodecPipelineImpl {
                            &output,
                            &output_shape,
                            &item.subset,
                            codec_options,
                            &codec_options,
                        )
                    }
                } else {
@@ -334,7 +358,7 @@
                    let partial_decoder = self
                        .codec_chain
                        .clone()
                        .partial_decoder(input_handle, item.representation(), codec_options)
                        .partial_decoder(input_handle, item.representation(), &codec_options)
                        .map_py_err::<PyValueError>()?;
                    unsafe {
                        // SAFETY:
@@ -346,7 +370,7 @@
                            &output,
                            &output_shape,
                            &item.subset,
                            codec_options,
                            &codec_options,
                        )
                    }
                }
@@ -368,13 +392,17 @@
        &self,
        py: Python<'py>,
        chunk_descriptions: Vec<chunk_item::Raw>, // FIXME: Ref / iterable?
        chunk_concurrent_limit: usize,
    ) -> PyResult<Vec<Bound<'py, PyArray1<u8>>>> {
        let chunk_descriptions = self.collect_chunk_descriptions(chunk_descriptions, ())?;

        let chunk_bytes = py.allow_threads(move || {
            let codec_options = &self.codec_options;
        // Adjust the concurrency based on the codec chain and the first chunk description
        let Some((chunk_concurrent_limit, codec_options)) =
            chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)?
        else {
            return Ok(vec![]);
        };

        let chunk_bytes = py.allow_threads(move || {
            let get_chunk_subset = |item: chunk_item::Basic| {
                let chunk_encoded = item.get().map_py_err::<PyRuntimeError>()?;
                Ok(if let Some(chunk_encoded) = chunk_encoded {
@@ -383,7 +411,7 @@
                        .decode(
                            Cow::Owned(chunk_encoded),
                            item.representation(),
                            codec_options,
                            &codec_options,
                        )
                        .map_py_err::<PyRuntimeError>()?
                } else {
@@ -416,7 +444,6 @@
        py: Python,
        chunk_descriptions: Vec<chunk_item::RawWithIndices>,
        value: &Bound<'_, PyUntypedArray>,
        chunk_concurrent_limit: usize,
    ) -> PyResult<()> {
        enum InputValue<'a> {
            Array(ArrayBytes<'a>),
@@ -441,9 +468,14 @@
        let chunk_descriptions =
            self.collect_chunk_descriptions(chunk_descriptions, &input_shape)?;

        py.allow_threads(move || {
            let codec_options = &self.codec_options;
        // Adjust the concurrency based on the codec chain and the first chunk description
        let Some((chunk_concurrent_limit, codec_options)) =
            chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)?
        else {
            return Ok(());
        };

        py.allow_threads(move || {
            let store_chunk = |item: chunk_item::WithSubset| match &input {
                InputValue::Array(input) => {
                    let chunk_subset_bytes = input
@@ -458,7 +490,7 @@
                        &self.codec_chain,
                        chunk_subset_bytes,
                        &item.chunk_subset,
                        codec_options,
                        &codec_options,
                    )
                }
                InputValue::Constant(constant_value) => {
@@ -475,7 +507,7 @@
                        &self.codec_chain,
                        chunk_subset_bytes,
                        &item.chunk_subset,
                        codec_options,
                        &codec_options,
                    )
                }
            };