Commit
(chore): great progress! need to figure out linearized indexing!
ilan-gold committed Aug 22, 2024
1 parent 558afd7 commit 3008894
Showing 4 changed files with 160 additions and 5 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -15,4 +15,5 @@ paste = "1.0"
dlpark = { version = "0.4.1", features = ["pyo3"] }
image = "0.25.0"
rayon_iter_concurrent_limit = "0.2.0"
rayon = "1.10.0"
rayon = "1.10.0"
numpy = "0.21"
96 changes: 93 additions & 3 deletions src/array.rs
@@ -1,3 +1,5 @@
use numpy::ndarray::{Array, ArrayBase, ArrayViewD};
use numpy::{PyArray, PyArray2, PyArrayDyn, PyArrayMethods};
use pyo3::exceptions::{PyIndexError, PyTypeError, PyValueError};
use pyo3::prelude::*;
use rayon_iter_concurrent_limit::iter_concurrent_limit;
@@ -12,14 +12,20 @@ use dlpark::prelude::*;
use std::ffi::c_void;
use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
use rayon::prelude::*;
use crate::utils::update_bytes_flen;
use crate::utils::{cartesian_product, update_bytes_flen, update_bytes_flen_with_indexer};

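// A chunk to read using array-subset (slice-style) selections.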
struct Chunk<'a> {
index: &'a Vec<u64>,
selection: &'a ArraySubset,
out_selection: &'a ArraySubset,
}

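// A chunk to read using per-dimension integer index arrays (numpy-style fancy indexing).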
struct NdArrayChunk<'a> {
index: &'a Vec<u64>,
selection: &'a Vec<Vec<i64>>,
out_selection: &'a ArraySubset,
}

#[pyclass]
pub struct ZarrsPythonArray {
pub arr: RustArray<dyn ReadableStorageTraits + 'static>
@@ -60,8 +68,11 @@ impl ZarrsPythonArray {
if let Ok(coord_downcast) = coord.downcast::<PyTuple>() {
coord_extracted = coord_downcast.extract()?;
return Ok(coord_extracted);
} else if let Ok(nd_array) = coord.downcast::<PyArray2<u64>>() {
let nd_array_extracted: Vec<u64> = nd_array.to_vec()?;
return Ok(nd_array_extracted);
} else {
return Err(PyValueError::new_err(format!("Cannot take {0}, must be int or slice", coord.to_string())));
return Err(PyValueError::new_err(format!("Cannot take {0}, must be int, ndarray, or slice", coord.to_string())));
}
}
return Err(PyTypeError::new_err(format!("Unsupported type: {0}", chunk_coord_and_selection)));
Expand Down Expand Up @@ -93,6 +104,53 @@ impl ZarrsPythonArray {
return Err(PyTypeError::new_err(format!("Unsupported type: {0}", chunk_coord_and_selection)));
}).collect::<PyResult<Vec<ArraySubset>>>()
}

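// For each (chunk_coord, selection, out_selection) tuple, extracts the element at `index` as a tuple of per-dimension i64 numpy index arrays, converted to Vec<i64>.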
fn extract_selection_to_vec_indices(&self, chunk_coords_and_selections: &Bound<'_, PyList>, index: usize) -> PyResult<Vec<Vec<Vec<i64>>>> {
chunk_coords_and_selections.into_iter().map(|chunk_coord_and_selection| {
if let Ok(chunk_coord_and_selection_tuple) = chunk_coord_and_selection.downcast::<PyTuple>() {
let selection = chunk_coord_and_selection_tuple.get_item(index)?;
if let Ok(tuple) = selection.downcast::<PyTuple>(){
let res = tuple.into_iter().map(|val| {
if let Ok(nd_array) = val.downcast::<PyArrayDyn<i64>>() {
let res = nd_array.to_vec()?;
Ok(res)
} else {
Err(PyTypeError::new_err(format!("Unsupported type: {0}", tuple)))
}
}).collect::<PyResult<Vec<Vec<i64>>>>()?;
return Ok(res);
} else {
return Err(PyTypeError::new_err(format!("Unsupported type: {0}", selection)));
}
}
return Err(PyTypeError::new_err(format!("Unsupported type: {0}", chunk_coord_and_selection)));
}).collect::<PyResult<Vec<Vec<Vec<i64>>>>>()
}

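// Returns true if any selection component in any chunk tuple is an i64 numpy array (i.e. fancy indexing is in use).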
fn is_selection_numpy_array(&self, chunk_coords_and_selections: &Bound<'_, PyList>, index: usize) -> bool {
let results = chunk_coords_and_selections.into_iter().map(|chunk_coord_and_selection| {
if let Ok(chunk_coord_and_selection_tuple) = chunk_coord_and_selection.downcast::<PyTuple>() {
let selection = chunk_coord_and_selection_tuple.get_item(index);
if let Ok(selection_unwrapped) = selection {
if let Ok(tuple) = selection_unwrapped.downcast::<PyTuple>(){
let res: Vec<bool> = tuple.into_iter().map(|val| val.downcast::<PyArrayDyn<i64>>().is_ok()).collect();
return res;
}
return vec![false];
}
return vec![false]
}
return vec![false];
}).flatten().collect::<Vec<bool>>();
results.iter().any(|x: &bool| *x )
}
}

#[pymethods]
@@ -105,7 +163,6 @@
let data_type_size = chunk_representation.data_type().size();
let out_shape_extracted = out_shape.into_iter().map(|x| x.extract::<u64>()).collect::<PyResult<Vec<u64>>>()?;
let coords_extracted = &self.extract_coords(chunk_coords_and_selection_list)?;
let selections_extracted = self.extract_selection_to_array_subset(chunk_coords_and_selections, 1)?;
let out_selections_extracted = &self.extract_selection_to_array_subset(chunk_coords_and_selections, 2)?;
let chunks = ArraySubset::new_with_shape(self.arr.chunk_grid_shape().unwrap());
let concurrent_target = std::thread::available_parallelism().unwrap().get();
@@ -126,6 +183,39 @@
let codec_options = CodecOptionsBuilder::new().concurrent_target(codec_concurrent_target).build();
let size_output = out_shape_extracted.iter().product::<u64>() as usize;
let mut output = Vec::with_capacity(size_output * data_type_size);

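// Fancy-indexing path: the chunk selections are numpy index arrays rather than slices.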
if self.is_selection_numpy_array(chunk_coords_and_selections, 1) {
let selections_extracted = self.extract_selection_to_vec_indices(chunk_coords_and_selections, 1)?;
let borrowed_selections = &selections_extracted;
println!("hereeees");
{
let output =
UnsafeCellSlice::new_from_vec_with_spare_capacity(&mut output);
let retrieve_chunk = |chunk: NdArrayChunk| {
println!("{:?} {:?}", cartesian_product(chunk.selection), out_shape_extracted);
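// TODO (per the commit message): this linearization scales every coordinate by the same stride, the product of out_shape_extracted[1..]; per-dimension strides still need to be worked out.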
let indices: Vec<u64> = cartesian_product(chunk.selection).iter().map(|x| x.iter().enumerate().fold(0, |acc, (ind, x)| {acc + (*x as u64) * out_shape_extracted[1..].iter().product::<u64>()})).collect();
let chunk_subset_bytes = self.arr.retrieve_chunk(&chunk.index).map_err(|x| PyErr::new::<PyTypeError, _>(x.to_string()))?;
update_bytes_flen_with_indexer(
unsafe { output.get() },
&out_shape_extracted,
&chunk_subset_bytes,
&chunk.out_selection,
&indices,
data_type_size,
);
Ok::<_, PyErr>(())
};
let zipped_iterator = coords_extracted.into_iter().zip(borrowed_selections.into_iter()).zip(out_selections_extracted.into_iter()).map(|((index, selection), out_selection)| NdArrayChunk { index, selection, out_selection });
iter_concurrent_limit!(
chunks_concurrent_limit,
zipped_iterator.collect::<Vec<NdArrayChunk>>(),
try_for_each,
retrieve_chunk
)?;
}
}
let selections_extracted = self.extract_selection_to_array_subset(chunk_coords_and_selections, 1)?;
let out_selections_extracted = &self.extract_selection_to_array_subset(chunk_coords_and_selections, 2)?;
let borrowed_selections = &selections_extracted;
{
let output =
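
The commit message flags linearized indexing as unfinished: in the fancy-indexing branch above, every coordinate is scaled by the same stride. For reference, a minimal sketch of row-major (C-order) linearization, written here for illustration and not taken from this commit:

// Illustrative only (not from this commit): map a multi-dimensional
// coordinate to a flat offset in row-major (C-order) layout.
fn linearize(coord: &[u64], shape: &[u64]) -> u64 {
    debug_assert_eq!(coord.len(), shape.len());
    // Horner-style accumulation: each dimension's stride is the
    // product of the extents of all later dimensions.
    coord.iter().zip(shape).fold(0, |acc, (&c, &dim)| acc * dim + c)
}

fn main() {
    // For shape [4, 5], coordinate [2, 3] maps to 2 * 5 + 3 = 13.
    assert_eq!(linearize(&[2, 3], &[4, 5]), 13);
}

The fold over cartesian_product above approximates this with a single stride for all dimensions, which only coincides with the row-major formula in special cases.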
44 changes: 43 additions & 1 deletion src/utils.rs
@@ -3,6 +3,34 @@ use zarrs::array_subset::ArraySubset;

pub fn err<T>(msg: String) -> Result<T, PyErr> { Err(PyErr::new::<PyTypeError, _>(msg)) }

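// Like update_bytes_flen below, but each element is read from subset_bytes at the position given by `indexer` rather than sequentially.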
pub fn update_bytes_flen_with_indexer(
output_bytes: &mut [u8],
output_shape: &[u64],
subset_bytes: &[u8],
subset: &ArraySubset,
indexer: &Vec<u64>,
data_type_size: usize,
) {

let contiguous_indices =
unsafe { subset.contiguous_linearised_indices_unchecked(output_shape) };
// TODO: Par iteration?
let mut indexer_index = 0;
println!("{:?}, {:?}", subset_bytes.len(), indexer);
for (array_subset_element_index, _num_elements) in contiguous_indices.iter() {
let mut output_offset = usize::try_from(array_subset_element_index).unwrap() * data_type_size;
for _num_elem in 0.._num_elements {
let decoded_offset = (indexer[indexer_index] as usize) * data_type_size;
debug_assert!((output_offset + data_type_size) <= output_bytes.len());
debug_assert!((decoded_offset + data_type_size) <= subset_bytes.len());
output_bytes[output_offset..output_offset + data_type_size]
.copy_from_slice(&subset_bytes[decoded_offset..decoded_offset + data_type_size]);
indexer_index += 1;
output_offset += data_type_size;
}
}
}

pub fn update_bytes_flen(
output_bytes: &mut [u8],
output_shape: &[u64],
@@ -18,7 +46,7 @@ pub fn update_bytes_flen(
debug_assert_eq!(
subset_bytes.len(),
subset.num_elements_usize() * data_type_size,
"Failed subset check: subset_bytes.len(): {:?}, subset.num_elements_usize(): {:?}, data_type_size: {:?}", output_bytes.len(), output_shape, data_type_size,
"Failed subset check: subset_bytes.len(): {:?}, subset.num_elements_usize(): {:?}, data_type_size: {:?}", subset, subset.num_elements_usize(), data_type_size,
);

let contiguous_indices =
@@ -34,4 +62,18 @@
.copy_from_slice(&subset_bytes[decoded_offset..decoded_offset + length]);
decoded_offset += length;
}
}

pub fn cartesian_product<T: Clone>(vecs: &Vec<Vec<T>>) -> Vec<Vec<T>> {
vecs.into_iter().fold(vec![vec![]], |acc, vec| {
acc.into_iter()
.flat_map(|prefix| {
vec.iter().map(move |elem| {
let mut new_prefix = prefix.clone();
new_prefix.push(elem.clone());
new_prefix
}).collect::<Vec<_>>()
})
.collect()
})
}
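
For reference, what cartesian_product produces (example values chosen here, not from the commit):

// Given the function above: one vector per dimension in, every
// combination (one element per dimension) out, in row-major order.
let combos = cartesian_product(&vec![vec![0i64, 1], vec![2, 3]]);
assert_eq!(combos, vec![vec![0, 2], vec![0, 3], vec![1, 2], vec![1, 3]]);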
