diff --git a/clouddrift/analysis.py b/clouddrift/analysis.py index 9eec9312..5579bb0e 100644 --- a/clouddrift/analysis.py +++ b/clouddrift/analysis.py @@ -18,6 +18,7 @@ def apply_ragged( arrays: list[np.ndarray], rowsize: list[int], *args: tuple, + rows: Union[int, Iterable[int]] = None, executor: futures.Executor = futures.ThreadPoolExecutor(max_workers=None), **kwargs: dict, ) -> Union[tuple[np.ndarray], np.ndarray]: @@ -45,6 +46,9 @@ def apply_ragged( List of integers specifying the number of data points in each row. *args : tuple Additional arguments to pass to ``func``. + rows : int or Iterable[int], optional + The row(s) of the ragged array to apply ``func`` to. If ``rows`` is + ``None`` (default), then ``func`` will be applied to all rows. executor : concurrent.futures.Executor, optional Executor to use for concurrent execution. Default is ``ThreadPoolExecutor`` with the default number of ``max_workers``. @@ -72,6 +76,15 @@ def apply_ragged( array([1., 1., 2., 2., 2., 3., 3., 3., 3.]), array([1., 1., 1., 1., 1., 1., 1., 1., 1.])) + To apply ``func`` to only a subset of rows, use the ``rows`` argument: + + >>> u1, v1 = apply_ragged(velocity_from_position, [x, y, t], rowsize, rows=0, coord_system="cartesian") + array([1., 1.]), + array([1., 1.])) + >>> u1, v1 = apply_ragged(velocity_from_position, [x, y, t], rowsize, rows=[0, 1], coord_system="cartesian") + array([1., 1., 2., 2., 2.]), + array([1., 1., 1., 1., 1.])) + Raises ------ ValueError @@ -88,7 +101,7 @@ def apply_ragged( raise ValueError("The sum of rowsize must equal the length of arr.") # split the array(s) into trajectories - arrays = [unpack_ragged(arr, rowsize) for arr in arrays] + arrays = [unpack_ragged(arr, rowsize, rows) for arr in arrays] iter = [[arrays[i][j] for i in range(len(arrays))] for j in range(len(arrays[0]))] # parallel execution @@ -1088,7 +1101,9 @@ def subset( def unpack_ragged( - ragged_array: np.ndarray, rowsize: np.ndarray[int] + ragged_array: np.ndarray, + rowsize: np.ndarray[int], + rows: Union[int, Iterable[int]] = None, ) -> list[np.ndarray]: """Unpack a ragged array into a list of regular arrays. @@ -1103,6 +1118,8 @@ def unpack_ragged( rowsize : array-like An array of integers whose values is the size of each row in the ragged array + rows : int or Iterable[int], optional + A row or list of rows to unpack. Default is None, which unpacks all rows. Returns ------- @@ -1119,6 +1136,8 @@ def unpack_ragged( lon = unpack_ragged(ds.lon, ds["rowsize"]) # return a list[xr.DataArray] (slower) lon = unpack_ragged(ds.lon.values, ds["rowsize"]) # return a list[np.ndarray] (faster) + first_lon = unpack_ragged(ds.lon.values, ds["rowsize"], rows=0) # return only the first row + first_two_lons = unpack_ragged(ds.lon.values, ds["rowsize"], rows=[0, 1]) # return first two rows Looping over trajectories in a ragged Xarray Dataset to compute velocities for each: @@ -1133,4 +1152,10 @@ def unpack_ragged( u, v = velocity_from_position(lon, lat, time) """ indices = rowsize_to_index(rowsize) - return [ragged_array[indices[n] : indices[n + 1]] for n in range(indices.size - 1)] + + if rows is None: + rows = range(indices.size - 1) + if isinstance(rows, int): + rows = [rows] + + return [ragged_array[indices[n] : indices[n + 1]] for n in rows] diff --git a/tests/analysis_tests.py b/tests/analysis_tests.py index 2c8af5a2..89244359 100644 --- a/tests/analysis_tests.py +++ b/tests/analysis_tests.py @@ -739,6 +739,23 @@ def test_velocity_ndarray(self): ) ) + def test_with_rows(self): + y = apply_ragged( + lambda x: x**2, + np.array([1, 2, 3, 4]), + [2, 2], + rows=0, + ) + self.assertTrue(np.all(y == np.array([1, 4]))) + + y = apply_ragged( + lambda x: x**2, + np.array([1, 2, 3, 4]), + [2, 2], + rows=[0, 1], + ) + self.assertTrue(np.all(y == np.array([1, 4, 9, 16]))) + def test_velocity_dataarray(self): for executor in [futures.ThreadPoolExecutor(), futures.ProcessPoolExecutor()]: u, v = apply_ragged( @@ -891,13 +908,32 @@ def test_unpack_ragged(self): np.all([lon[n].size == ds["rowsize"][n] for n in range(len(lon))]) ) + def test_unpack_ragged_rows(self): + ds = sample_ragged_array().to_xarray() + x = ds.lon.values + rowsize = ds.rowsize.values -class rowsize_to_index_tests(unittest.TestCase): - def test_rowsize_to_index(self): - rowsize = [2, 3, 4] - expected = np.array([0, 2, 5, 9]) - self.assertTrue(np.all(rowsize_to_index(rowsize) == expected)) - self.assertTrue(np.all(rowsize_to_index(np.array(rowsize)) == expected)) self.assertTrue( - np.all(rowsize_to_index(xr.DataArray(data=rowsize)) == expected) + all( + np.array_equal(a, b) + for a, b in zip( + unpack_ragged(x, rowsize, None), unpack_ragged(x, rowsize) + ) + ) + ) + self.assertTrue( + all( + np.array_equal(a, b) + for a, b in zip( + unpack_ragged(x, rowsize, 0), unpack_ragged(x, rowsize)[:1] + ) + ) + ) + self.assertTrue( + all( + np.array_equal(a, b) + for a, b in zip( + unpack_ragged(x, rowsize, [0, 1]), unpack_ragged(x, rowsize)[:2] + ) + ) )