Skip to content

Commit

Permalink
[InMemoryDataset redesign] Add features to IndexChunkMapper
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Sep 22, 2024
1 parent efecb3e commit 82cc8fd
Show file tree
Hide file tree
Showing 3 changed files with 551 additions and 1 deletion.
16 changes: 16 additions & 0 deletions versioned_hdf5/subchunk_map.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,27 @@ cdef class IndexChunkMapper:
bint shift,
)

cpdef Py_ssize_t[:] chunk_indices_in_range(self, Py_ssize_t a, Py_ssize_t b)
cpdef object chunks_indexer(self)
cpdef object whole_chunks_indexer(self)

# Private methods. Cython complains if we don't export _everything_.
cdef tuple[Py_ssize_t, Py_ssize_t] _chunk_start_stop(
self, Py_ssize_t a, Py_ssize_t b
)
cdef tuple[object, object, object] chunk_submap_compat(self, Py_ssize_t chunk_idx)

cpdef tuple[object, object] zip_chunk_submap(
list[IndexChunkMapper] mappers,
Py_ssize_t[:] chunk_idx,
)

cpdef tuple[object, object] zip_slab_submap(
list[IndexChunkMapper] mappers,
Py_ssize_t[:] fromc_idx,
Py_ssize_t[:] toc_idx,
)

# Utility functions
cpdef Py_ssize_t ceil_a_over_b(Py_ssize_t a, Py_ssize_t b)
cpdef Py_ssize_t[:, :] cartesian_product(list[Py_ssize_t[:]] views)
325 changes: 325 additions & 0 deletions versioned_hdf5/subchunk_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,43 @@ def chunk_submap_compat(
out, sub = self.chunk_submap(chunk_idx, chunk_idx + 1, True)
return Slice(start, stop, 1), out, sub

@cython.ccall
def chunk_indices_in_range(self, a: Py_ssize_t, b: Py_ssize_t) -> Py_ssize_t[:]:
"""Return the subset of chunk_indices in [a, b[
Raises
------
ChunkMapIndexError
If there is no intersection
"""
start_idx: Py_ssize_t
stop_idx: Py_ssize_t
start_idx, stop_idx = np.searchsorted(self.chunk_indices, [a, b], side="left")
if stop_idx <= start_idx:
raise ChunkMapIndexError()
return self.chunk_indices[start_idx:stop_idx]

@cython.ccall
def chunks_indexer(self): # -> slice | NDArray[np.intp]:
"""Return a numpy basic or advanced index, to be applied along the matching axis
to an array with one point per chunk, that returns all chunks involved in the
selection without altering the shape of the array.
"""
return np.asarray(self.chunk_indices)

@cython.ccall
@abc.abstractmethod
def whole_chunks_indexer(self): # -> slice | NDArray[np.intp]:
"""Return a subset of chunks_indexer that selects only the chunks where all
points of the chunk are included in the selection.
e.g. if the index of this mapper is [True, True, False, False, True, False]
and self.chunk_size=2, then:
- self.chunks_indexer -> [0, 2]
- self.whole_chunks_indexer -> [0]
"""


@cython.cclass
class SliceMapper(IndexChunkMapper):
Expand Down Expand Up @@ -306,6 +343,59 @@ def _subindex_slice_chunk(

return slice(start, stop, step)

@cython.ccall
def chunks_indexer(self): # -> slice | NDArray[np.intp]:
indices = self.chunk_indices
n_chunks = len(indices)

if n_chunks == 0:
return slice(0, 0, 1)
elif self.step > self.chunk_size:
return np.asarray(indices)
else:
chunk_start = indices[0]
chunk_stop = indices[n_chunks - 1] + 1
return slice(int(chunk_start), int(chunk_stop), 1)

@cython.ccall
def whole_chunks_indexer(self): # -> slice | NDArray[np.intp]:
if self.chunk_size == 1:
# All chunks are wholly selected
return self.chunks_indexer()

indices = self.chunk_indices
n_chunks = len(indices)
if n_chunks == 0:
return slice(0, 0, 1)
last_idx = indices[n_chunks - 1]

if self.step > 1:
# All chunks up to and excluding the last one are partially selected
# The last chunk in chunk_indices is wholly selected only if it's the last
# chunk in the dataset and has size 1
if last_idx < self.dset_size // self.chunk_size:
# Not the last chunk, or last chunk but exactly divisible by chunk_size
return slice(0, 0, 1)
if self.dset_size % self.chunk_size > 1:
# Last chunk, but it contains more than one point
return slice(0, 0, 1)
# Last chunk, and it contains exactly one point
return slice(last_idx, last_idx + 1, 1)

# step==1. The first and last chunk may be partially selected;
# the rest are always wholly selected.
chunk_start = indices[0]
if self.start % self.chunk_size != 0:
# First chunk is partially selected
chunk_start += 1

chunk_stop = last_idx # excluded
if self.stop == self.dset_size or self.stop % self.chunk_size == 0:
# Last chunk is wholly selected
chunk_stop += 1

return slice(int(chunk_start), int(chunk_stop), 1)


@cython.cclass
class IntegerArrayMapper(IndexChunkMapper):
Expand Down Expand Up @@ -341,6 +431,32 @@ def chunk_submap(

return mask, sub

@cython.cfunc
def _chunk_sizes_in_chunk_indices(self): # -> NDArray[np.intp] | int:
"""Return the number of points taken from each chunk within self.chunk_indices.
All but the last chunk always contain self.chunk_size points.
"""
indices = self.chunk_indices
n_chunks = len(indices)
if n_chunks == 0:
return self.chunk_size

if indices[n_chunks - 1] < self.dset_size // self.chunk_size:
# Not the last chunk, or last chunk but exactly divisible by chunk_size
return self.chunk_size

out = np.full_like(indices, fill_value=self.chunk_size)
out[n_chunks - 1] = self.dset_size % self.chunk_size
return out

@cython.ccall
def whole_chunks_indexer(self):
# Don't double count when the same index is picked twice
idx_unique = np.unique(self.idx)
_, counts = np.unique(idx_unique // self.chunk_size, return_counts=True)
indices = np.asarray(self.chunk_indices)
return indices[counts == self._chunk_sizes_in_chunk_indices()]


@cython.cclass
class BooleanArrayMapper(IndexChunkMapper):
Expand Down Expand Up @@ -424,6 +540,24 @@ def chunk_submap(
sub,
)

@cython.cfunc
def _chunk_sizes_in_dset(self): # -> NDArray[np.intp] | int:
"""Return the number of points for each chunk within the whole dataset.
All but the last chunk always contain self.chunk_size points.
"""
partial = self.dset_size % self.chunk_size
if not partial:
return self.chunk_size
n_dset_chunks = ceil_a_over_b(self.dset_size, self.chunk_size)
out = np.full(n_dset_chunks, fill_value=self.chunk_size, dtype=np.intp)
out[n_dset_chunks - 1] = partial
return out

@cython.ccall
def whole_chunks_indexer(self):
counts = np.asarray(self._chunk_selected_counts)
return np.flatnonzero(counts == self._chunk_sizes_in_dset())


@cython.cclass
class IntegerMapper(IndexChunkMapper):
Expand Down Expand Up @@ -451,6 +585,28 @@ def chunk_submap(
start, _ = self._chunk_start_stop(chunk_start_idx, chunk_stop_idx)
return DROP_AXIS, self.idx - start if shift else self.idx

@cython.ccall
def chunks_indexer(self):
# a[i] would change the shape
# a[[i]] (the default without overriding this method) would return a copy
# a[i:i+1] returns a view, which is faster
i = self.chunk_indices[0]
return slice(i, i + 1, 1)

@cython.ccall
def whole_chunks_indexer(self):
if self.chunk_size == 1:
return self.chunks_indexer()

# If the index is in the last chunk and the last chunk is size 1, then it's
# wholly selected. In any other case, it's partially selected.
partial = self.dset_size % self.chunk_size
if partial != 1:
return slice(0, 0, 1)
if self.chunk_indices[0] != self.dset_size // self.chunk_size:
return slice(0, 0, 1)
return self.chunks_indexer()


@cython.cclass
class EverythingMapper(IndexChunkMapper):
Expand All @@ -472,6 +628,14 @@ def chunk_submap(
sub = slice(0, stop - start, 1) if shift else out
return out, sub

@cython.ccall
def chunks_indexer(self):
return slice(None)

@cython.ccall
def whole_chunks_indexer(self):
return slice(None)


def index_chunk_mappers(
idx: Any,
Expand Down Expand Up @@ -609,3 +773,164 @@ def as_subchunk_map(
chunk_subidxs = chunk_subidxs[:idx_len]

yield Tuple(*chunk_slices), arr_subidxs, chunk_subidxs


@cython.ccall
def zip_chunk_submap(
mappers: list[IndexChunkMapper], chunk_idx: Py_ssize_t[:]
) -> tuple[AnySlicerND, AnySlicerND]:
"""Call IndexChunkMapper.chunk_submap() along every axis
and return the resulting indices to slice a single chunk
Parameters
----------
mappers:
Output of index_chunk_mappers()
chunk_idx:
Chunk index
Returns
-------
out
The n-dimensional numpy index selecting the points within the return value of
__getitem__ or within the value parameter of __setitem__
sub
The n-dimensional numpy index selecting the points within the chunk,
with all addresses shifted to the start of the chunk.
In other words::
chunk = arr[tuple(slice(i*c, (i+1)*c) for i, c in zip(chunk_idx, chunk_size)]
return_value[out] = chunk[sub] # __getitem__
chunk[sub] = value_param[out] # __setitem__
Raises
------
ChunkMapIndexError
If the chunk index is not among those selected by IndexChunkMapper.chunk_indices
See Also
--------
zip_slab_submap
IndexChunkMapper.chunk_submap
Examples
--------
>>> _, mappers = index_chunk_mappers(
... (slice(15, 35, 2), 19), chunk_size=(10, 10), shape=(100, 100)
... )
>>> zip_chunk_submap(mappers, np.asarray((3, 1), dtype=np.intp))
((slice(8, 10, 1),), (slice(1, 4, 2), 9))
"""
out_idx = []
sub_idx = []

for i in range(len(chunk_idx)):
mapper: IndexChunkMapper = mappers[i]
a = chunk_idx[i]
out, sub = mapper.chunk_submap(a, a + 1, shift=True)

# skip axes which were sliced by a scalar
if out is not DROP_AXIS:
out_idx.append(out)
sub_idx.append(sub)

return tuple(out_idx), tuple(sub_idx)


@cython.ccall
def zip_slab_submap(
mappers: list[IndexChunkMapper],
fromc_idx: Py_ssize_t[:],
toc_idx: Py_ssize_t[:],
) -> tuple[AnySlicerND, AnySlicerND]:
"""Call IndexChunkMapper.chunk_submap() along every axis
and return the resulting indices to slice the whole array
within the limits of a hyperrectangular selection of chunks.
Parameters
----------
mappers:
Output of index_chunk_mappers()
fromc_idx:
Top-left corner, included, of the hyperrectangle of chunks
toc_idx:
Bottom-right corner, excluded, of the hyperrectangle of chunks to process.
Returns
-------
out
The n-dimensional numpy index selecting the points within the return value of
__getitem__ or within the value parameter of __setitem__
sub
The n-dimensional numpy index selecting the input points within the whole array,
relative to the beginning of the entire array itself (not the selection).
In other words::
return_value[sub] = arr[out] # __getitem__
arr[out] = value_param[sub] # __setitem__
Raises
------
ChunkMapIndexError
If there is no intersection between the index stored in the mappers and the
chunks in the range [fromc_idx, toc_idx[
See Also
--------
zip_chunk_submap
IndexChunkMapper.chunk_submap
Examples
--------
>>> _, mappers = index_chunk_mappers(
... (slice(15, 35, 2), 19), chunk_size=(10, 10), shape=(100, 100)
... )
>>> zip_slab_submap(mappers, np.asarray((2, 1), np.intp), np.asarray((4, 4), np.intp))
((slice(3, 10, 1),), (slice(21, 34, 2), 19))
Notes
-----
This is a separate function from zip_chunk_submap, despite the two having very
similar code, because Cython does not optimize well input parameters that can have
multiple types (such as toc_idx: Py_ssize_t[:] | None).
"""
out_idx = []
sub_idx = []

for i in range(len(fromc_idx)):
mapper: IndexChunkMapper = mappers[i]
a = fromc_idx[i]
b = toc_idx[i]
# Raises ChunkMapIndexError if there is no intersection
out, sub = mapper.chunk_submap(a, b, shift=False)

# skip axes which were sliced by a scalar
if out is not DROP_AXIS:
out_idx.append(out)
sub_idx.append(sub)

return tuple(out_idx), tuple(sub_idx)


@cython.ccall
def cartesian_product(views: list[Py_ssize_t[:]]) -> Py_ssize_t[:, :]:
"""Cartesian product of 1D views of indices
Same as np.array(list(itertools.product(*arrays)))
Adapted from https://stackoverflow.com/questions/11144513/cartesian-product-of-x-and-y-array-points-into-single-array-of-2d-points
>>> np.asarray(cartesian_product([np.array([1, 2]), np.array([3, 4])]))
array([[1, 3],
[1, 4],
[2, 3],
[2, 4]])
"""
arrays = [np.asarray(v) for v in views]
la = len(arrays)
if not la:
return np.empty((0, 0), dtype=np.intp)
arr = np.empty([len(a) for a in arrays] + [la], dtype=np.intp)
for i, a in enumerate(np.ix_(*arrays)):
arr[..., i] = a
return arr.reshape(-1, la)
Loading

0 comments on commit 82cc8fd

Please sign in to comment.