Skip to content

Commit

Permalink
Add optional rows parameter to unpack_ragged and apply_ragged (C…
Browse files Browse the repository at this point in the history
…loud-Drift#272)

* Add optional rows parameter to unpack_ragged and apply_ragged

* Fix conflict resolve from rowsize_to_index
  • Loading branch information
milancurcic authored and Philippe Miron committed Nov 16, 2023
1 parent 465f26c commit 5fda844
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 10 deletions.
43 changes: 40 additions & 3 deletions clouddrift/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def apply_ragged(
arrays: list[np.ndarray],
rowsize: list[int],
*args: tuple,
rows: Union[int, Iterable[int]] = None,
executor: futures.Executor = futures.ThreadPoolExecutor(max_workers=None),
**kwargs: dict,
) -> Union[tuple[np.ndarray], np.ndarray]:
Expand Down Expand Up @@ -45,6 +46,9 @@ def apply_ragged(
List of integers specifying the number of data points in each row.
*args : tuple
Additional arguments to pass to ``func``.
rows : int or Iterable[int], optional
The row(s) of the ragged array to apply ``func`` to. If ``rows`` is
``None`` (default), then ``func`` will be applied to all rows.
executor : concurrent.futures.Executor, optional
Executor to use for concurrent execution. Default is ``ThreadPoolExecutor``
with the default number of ``max_workers``.
Expand All @@ -60,6 +64,27 @@ def apply_ragged(
Examples
--------
Using ``velocity_from_position`` with ``apply_ragged``, calculate the velocities of
multiple particles, the coordinates of which are found in the ragged arrays x, y, and t
that share row sizes 2, 3, and 4:
>>> rowsize = [2, 3, 4]
>>> x = np.array([1, 2, 10, 12, 14, 30, 33, 36, 39])
>>> y = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8])
>>> t = np.array([1, 2, 1, 2, 3, 1, 2, 3, 4])
>>> u1, v1 = apply_ragged(velocity_from_position, [x, y, t], rowsize, coord_system="cartesian")
array([1., 1., 2., 2., 2., 3., 3., 3., 3.]),
array([1., 1., 1., 1., 1., 1., 1., 1., 1.]))
To apply ``func`` to only a subset of rows, use the ``rows`` argument:
>>> u1, v1 = apply_ragged(velocity_from_position, [x, y, t], rowsize, rows=0, coord_system="cartesian")
array([1., 1.]),
array([1., 1.]))
>>> u1, v1 = apply_ragged(velocity_from_position, [x, y, t], rowsize, rows=[0, 1], coord_system="cartesian")
array([1., 1., 2., 2., 2.]),
array([1., 1., 1., 1., 1.]))
Raises
------
ValueError
Expand All @@ -76,7 +101,7 @@ def apply_ragged(
raise ValueError("The sum of rowsize must equal the length of arr.")

# split the array(s) into trajectories
arrays = [unpack_ragged(arr, rowsize) for arr in arrays]
arrays = [unpack_ragged(arr, rowsize, rows) for arr in arrays]
iter = [[arrays[i][j] for i in range(len(arrays))] for j in range(len(arrays[0]))]

# parallel execution
Expand Down Expand Up @@ -1076,7 +1101,9 @@ def subset(


def unpack_ragged(
ragged_array: np.ndarray, rowsize: np.ndarray[int]
ragged_array: np.ndarray,
rowsize: np.ndarray[int],
rows: Union[int, Iterable[int]] = None,
) -> list[np.ndarray]:
"""Unpack a ragged array into a list of regular arrays.
Expand All @@ -1091,6 +1118,8 @@ def unpack_ragged(
rowsize : array-like
An array of integers whose values is the size of each row in the ragged
array
rows : int or Iterable[int], optional
A row or list of rows to unpack. Default is None, which unpacks all rows.
Returns
-------
Expand All @@ -1107,6 +1136,8 @@ def unpack_ragged(
lon = unpack_ragged(ds.lon, ds["rowsize"]) # return a list[xr.DataArray] (slower)
lon = unpack_ragged(ds.lon.values, ds["rowsize"]) # return a list[np.ndarray] (faster)
first_lon = unpack_ragged(ds.lon.values, ds["rowsize"], rows=0) # return only the first row
first_two_lons = unpack_ragged(ds.lon.values, ds["rowsize"], rows=[0, 1]) # return first two rows
Looping over trajectories in a ragged Xarray Dataset to compute velocities
for each:
Expand All @@ -1121,4 +1152,10 @@ def unpack_ragged(
u, v = velocity_from_position(lon, lat, time)
"""
indices = rowsize_to_index(rowsize)
return [ragged_array[indices[n] : indices[n + 1]] for n in range(indices.size - 1)]

if rows is None:
rows = range(indices.size - 1)
if isinstance(rows, int):
rows = [rows]

return [ragged_array[indices[n] : indices[n + 1]] for n in rows]
50 changes: 43 additions & 7 deletions tests/analysis_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,23 @@ def test_velocity_ndarray(self):
)
)

def test_with_rows(self):
y = apply_ragged(
lambda x: x**2,
np.array([1, 2, 3, 4]),
[2, 2],
rows=0,
)
self.assertTrue(np.all(y == np.array([1, 4])))

y = apply_ragged(
lambda x: x**2,
np.array([1, 2, 3, 4]),
[2, 2],
rows=[0, 1],
)
self.assertTrue(np.all(y == np.array([1, 4, 9, 16])))

def test_velocity_dataarray(self):
for executor in [futures.ThreadPoolExecutor(), futures.ProcessPoolExecutor()]:
u, v = apply_ragged(
Expand Down Expand Up @@ -891,13 +908,32 @@ def test_unpack_ragged(self):
np.all([lon[n].size == ds["rowsize"][n] for n in range(len(lon))])
)

def test_unpack_ragged_rows(self):
ds = sample_ragged_array().to_xarray()
x = ds.lon.values
rowsize = ds.rowsize.values

class rowsize_to_index_tests(unittest.TestCase):
def test_rowsize_to_index(self):
rowsize = [2, 3, 4]
expected = np.array([0, 2, 5, 9])
self.assertTrue(np.all(rowsize_to_index(rowsize) == expected))
self.assertTrue(np.all(rowsize_to_index(np.array(rowsize)) == expected))
self.assertTrue(
np.all(rowsize_to_index(xr.DataArray(data=rowsize)) == expected)
all(
np.array_equal(a, b)
for a, b in zip(
unpack_ragged(x, rowsize, None), unpack_ragged(x, rowsize)
)
)
)
self.assertTrue(
all(
np.array_equal(a, b)
for a, b in zip(
unpack_ragged(x, rowsize, 0), unpack_ragged(x, rowsize)[:1]
)
)
)
self.assertTrue(
all(
np.array_equal(a, b)
for a, b in zip(
unpack_ragged(x, rowsize, [0, 1]), unpack_ragged(x, rowsize)[:2]
)
)
)

0 comments on commit 5fda844

Please sign in to comment.