Merge pull request #132 from josephnowak/feature/improve-the-reindex-with-pad

Adding pyproject.toml and using ruff to format the code
josephnowak authored Oct 22, 2024
2 parents b6aec77 + c70babd commit 7065b4b
Showing 3 changed files with 65 additions and 73 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -2,7 +2,7 @@

 [project]
 name = "tensordb"
-version = "0.32.1"
+version = "0.32.2"
 description = "Database based in a file system storage combined with Xarray and Zarr"
 keywords = ["Database Files Xarray Handler Zarr Store Read Write Append Update Upsert Backup Delete S3"]
 readme = "README.md"
121 changes: 52 additions & 69 deletions src/tensordb/algorithms.py
@@ -827,6 +827,7 @@ def reindex_with_pad(
         coords,
         preferred_chunks: dict[str, int],
         fill_value: Any,
+        method: str = None,
         apply_chunk: bool = True,
     ) -> xr.Dataset | xr.DataArray:
         """
@@ -856,6 +857,8 @@
             to be preserved unless the apply_chunk is set to True (default)
         :param fill_value:
             Used in the reindex method
+        :param method
+            Used in the reindex method
         :param apply_chunk, default True
             If true then the chunk method is going to be applied after calling the reindex
             method of Xarray (which is called after padding the data).
@@ -865,80 +868,60 @@
         :return:
             A chunked reindexed data
         """
-        data = data.copy()
-        coords = {k: np.array(v) for k, v in coords.items()}
-
-        pad_width = {
-            dim: (0, preferred_chunks[dim] - size)
-            for dim, size in data.sizes.items()
-            # Only add artificial data if the chunk is bigger than the size of the data
-            if preferred_chunks.get(dim, 0) > size
-        }
-        # If it is not necessary to pad additional data then just use
-        # reindex directly
-        if (
-            len(pad_width) == 0
-            or any(len(coord) == 0 for coord in coords)
-            or any(v == 0 for v in data.sizes.values())
-        ):
-            data = data.reindex(coords, fill_value=fill_value)
+        if method == "bfill":
+            raise NotImplementedError(
+                "The bfill method is not implemented for the moment"
+            )
+
+        reindex_data = data.copy()
+        reindex_mapped_pad_coords = {}
+        reindex_mapped_coords = {}
+        inv_autoincrement_map = {}
+
+        for dim, coord in coords.items():
+            coord = np.array(coord)
+            pad_size = preferred_chunks.get(dim, 0) - data.sizes[dim]
+            reindex_mapped_coords[dim] = coord
+            if pad_size <= 0:
+                continue
+            data_coord = data.coords[dim].to_numpy()
+            total_coord = np.union1d(np.array(coord), data_coord)
+
+            auto_map = {c: i for i, c in enumerate(total_coord)}
+            mapped_data_coord = np.array([auto_map[c] for c in data_coord])
+            reindex_mapped_coords[dim] = np.array([auto_map[c] for c in coord])
+            reindex_mapped_pad_coords[dim] = np.concatenate(
+                (
+                    mapped_data_coord,
+                    list(range(len(total_coord), len(total_coord) + pad_size)),
+                ),
+            )
+
+            reindex_data.coords[dim] = mapped_data_coord
+            inv_autoincrement_map[dim] = {v: k for k, v in auto_map.items()}
+
+        if not reindex_mapped_pad_coords:
+            data = data.reindex(coords, fill_value=fill_value, method=method)
             if apply_chunk:
                 data = data.chunk(preferred_chunks)
             return data
 
-        # Create an autoincrement index per dim using ints to be able to map the coords
-        autoincrement_map = {}
-        for dim, coord in coords.items():
-            autoincrement_map[dim] = {}
-            extra_coord = np.array(data.coords[dim][~data.coords[dim].isin(coord)])
-            for v in np.concatenate((coord, extra_coord)):
-                if v not in autoincrement_map[dim]:
-                    autoincrement_map[dim][v] = len(autoincrement_map[dim])
-
-        inv_autoincrement_map = {
-            dim: {v: k for k, v in map_coord.items()}
-            for dim, map_coord in autoincrement_map.items()
-        }
-
-        # Add artificial data at the end of every dim if necessary to complete
-        # the chunk size.
-        padded_data = data.pad(
-            pad_width,
-            mode="edge",
+        reindex_data = reindex_data.reindex(reindex_mapped_pad_coords, method="ffill")
+        reindex_data = reindex_data.chunk(
+            {dim: -1 for dim in reindex_mapped_pad_coords.keys()}
         )
-        # Create a single chunk on the dims that has artificial data
-        padded_data = padded_data.chunk({dim: -1 for dim in pad_width.keys()})
-
-        # Using the autoincrement index we can assign a unique coord value
-        # to the artificial data created by the pad method
-        for dim in padded_data.dims:
-            mapped_coord = [autoincrement_map[dim][v] for v in data.coords[dim].values]
-            # Unique elements for the artificial data
-            mapped_coord += [
-                len(autoincrement_map[dim]) + i
-                for i, v in enumerate(
-                    padded_data.coords[dim][len(mapped_coord) :].values
-                )
-            ]
-            padded_data.coords[dim] = mapped_coord
-
-        # Reindex the padded data using the mapped version of the coords
-        # this is going to drop all the artificial data automatically
-        # and should also generate a better chunking
-        padded_reindex = padded_data.reindex(
-            {
-                dim: [autoincrement_map[dim][v] for v in coord]
-                for dim, coord in coords.items()
-            },
-            fill_value=fill_value,
+        reindex_data = reindex_data.reindex(
+            reindex_mapped_coords, method=method, fill_value=fill_value
        )
 
-        if apply_chunk:
-            padded_reindex = padded_reindex.chunk(preferred_chunks)
+        for dim in reindex_mapped_pad_coords.keys():
+            reindex_data.coords[dim] = np.array(
+                [
+                    inv_autoincrement_map[dim][c]
+                    for c in reindex_data.coords[dim].to_numpy()
+                ]
+            )
 
-        # Revert the mapping
-        for dim in padded_reindex.dims:
-            padded_reindex.coords[dim] = [
-                inv_autoincrement_map[dim][v] for v in padded_reindex.coords[dim].values
-            ]
-        return padded_reindex
+        if apply_chunk:
+            reindex_data = reindex_data.chunk(preferred_chunks)
+        return reindex_data
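The core of the change is visible in the hunk above: instead of padding with `mode="edge"` and re-labelling afterwards, the new code maps every coordinate to its position in the union of old and new labels, appends collision-free integer labels to fill out the chunk, forward-fills them, and lets the final reindex drop them. Below is a minimal sketch of that trick in isolation using plain xarray, not the tensordb code itself; the dimension name `x`, the labels, and the target chunk size of 5 are invented for illustration:

```python
import numpy as np
import xarray as xr

# Invented toy data: 3 labels along "x", target chunk size of 5.
arr = xr.DataArray(
    [10.0, 20.0, 30.0], dims=["x"], coords={"x": ["a", "c", "e"]}
).chunk({"x": 3})
new_coord = np.array(["a", "b", "c", "d", "e", "f"])

# Map every label to its position in the union of old and new labels.
total_coord = np.union1d(new_coord, arr.coords["x"].to_numpy())
auto_map = {c: i for i, c in enumerate(total_coord)}
mapped_data_coord = [auto_map[c] for c in arr.coords["x"].to_numpy()]

# Pad with integer labels that cannot collide with any real label, and
# forward-fill them with copies of existing data (cheap artificial rows).
pad_size = 5 - arr.sizes["x"]
pad_coord = np.concatenate(
    (mapped_data_coord, list(range(len(total_coord), len(total_coord) + pad_size)))
)
mapped = arr.assign_coords(x=mapped_data_coord)
mapped = mapped.reindex(x=pad_coord, method="ffill").chunk({"x": -1})

# Reindexing on the mapped target labels drops the artificial rows
# automatically; finally revert the integer mapping.
result = mapped.reindex(x=[auto_map[c] for c in new_coord], fill_value=-1.0)
result = result.assign_coords(x=new_coord)
print(result.values)  # [10. -1. 20. -1. 30. -1.]
```

The artificial rows never survive the final reindex, so unlike the removed `pad`-based version there is no separate cleanup pass over the padded block.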
15 changes: 12 additions & 3 deletions tests/test_algorithms.py
@@ -524,6 +524,10 @@ def test_rolling_overlap(window, apply_ffill):
     assert expected.equals(rolling_arr)
 
 
+@pytest.mark.parametrize(
+    "method",
+    ["ffill", None],
+)
 @pytest.mark.parametrize(
     "slices",
     [{"a": [0, 3, 4], "b": [1, 3]}, {}, {"a": [0, 1], "b": [0]}],
@@ -547,9 +551,9 @@ def test_rolling_overlap(window, apply_ffill):
 )
 @pytest.mark.parametrize(
     "chunks",
-    [{"a": 3, "b": 2}, {"a": 2, "b": 3}, {"a": 1, "b": 1}, {"a": 2}],
+    [{"a": 3, "b": 2}, {"a": 2, "b": 3}, {"a": 1, "b": 1}, {"a": 2}, {}],
 )
-def test_reindex_with_pad(slices, coords, chunks):
+def test_reindex_with_pad(method, slices, coords, chunks):
     arr = xr.DataArray(
         np.arange(5 * 7).reshape((5, 7)).astype(float),
         dims=["a", "b"],
@@ -558,15 +562,20 @@ def test_rolling_overlap(window, apply_ffill):
             "b": list(range(7)),
         },
     ).chunk(chunks)
+    if coords == {}:
+        coords = arr.coords
+        arr = arr.isel(a=slice(0, 0), b=slice(0, 0))
     arr = arr.isel(**slices)
-    expected = arr.reindex(coords, fill_value=-1.0)
+    expected = arr.reindex(coords, fill_value=-1.0, method=method)
 
     result = Algorithms.reindex_with_pad(
         data=arr,
         coords=coords,
         fill_value=-1.0,
         preferred_chunks=chunks,
+        method=method,
     )
 
+    assert result.chunksizes == expected.chunk(chunks).chunksizes
     assert result.equals(expected)
 
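As an end-to-end check outside the test suite, the new `method` keyword can be exercised as sketched below. This mirrors one parametrized case of the test above (slices `{"a": [0, 3, 4], "b": [1, 3]}` with chunks `{"a": 2, "b": 3}`, which forces the padding path on dim `b`); it assumes `Algorithms` is importable from `tensordb.algorithms`, as the file path suggests, and the array values are taken from the test setup:

```python
import numpy as np
import xarray as xr

from tensordb.algorithms import Algorithms  # assumed import path

arr = xr.DataArray(
    np.arange(5 * 7).reshape((5, 7)).astype(float),
    dims=["a", "b"],
    coords={"a": list(range(5)), "b": list(range(7))},
).chunk({"a": 2, "b": 3})
# Slice so that dim "b" (size 2) is smaller than its preferred chunk (3),
# which exercises the padding path instead of the early-return branch.
arr = arr.isel(a=[0, 3, 4], b=[1, 3])

coords = {"a": list(range(5)), "b": list(range(7))}
result = Algorithms.reindex_with_pad(
    data=arr,
    coords=coords,
    preferred_chunks={"a": 2, "b": 3},
    fill_value=-1.0,
    method="ffill",
)

# Same values as a plain xarray reindex, with the preferred chunking applied.
expected = arr.reindex(coords, fill_value=-1.0, method="ffill")
assert result.equals(expected)
assert result.chunksizes == expected.chunk({"a": 2, "b": 3}).chunksizes
```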
