From beb1e97bf17f7b947291e5a7612f19804e96daa8 Mon Sep 17 00:00:00 2001 From: milancurcic Date: Wed, 20 Sep 2023 13:42:13 -0400 Subject: [PATCH 1/2] Add optional rows parameter to unpack_ragged and apply_ragged --- clouddrift/analysis.py | 31 ++++++++++++++++++++++++--- tests/analysis_tests.py | 47 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/clouddrift/analysis.py b/clouddrift/analysis.py index c0e56302..6d5a0623 100644 --- a/clouddrift/analysis.py +++ b/clouddrift/analysis.py @@ -18,6 +18,7 @@ def apply_ragged( arrays: list[np.ndarray], rowsize: list[int], *args: tuple, + rows: Union[int, Iterable[int]] = None, executor: futures.Executor = futures.ThreadPoolExecutor(max_workers=None), **kwargs: dict, ) -> Union[tuple[np.ndarray], np.ndarray]: @@ -45,6 +46,9 @@ def apply_ragged( List of integers specifying the number of data points in each row. *args : tuple Additional arguments to pass to ``func``. + rows : int or Iterable[int], optional + The row(s) of the ragged array to apply ``func`` to. If ``rows`` is + ``None`` (default), then ``func`` will be applied to all rows. executor : concurrent.futures.Executor, optional Executor to use for concurrent execution. Default is ``ThreadPoolExecutor`` with the default number of ``max_workers``. @@ -72,6 +76,15 @@ def apply_ragged( array([1., 1., 2., 2., 2., 3., 3., 3., 3.]), array([1., 1., 1., 1., 1., 1., 1., 1., 1.])) + To apply ``func`` to only a subset of rows, use the ``rows`` argument: + + >>> u1, v1 = apply_ragged(velocity_from_position, [x, y, t], rowsize, rows=0, coord_system="cartesian") + array([1., 1.]), + array([1., 1.])) + >>> u1, v1 = apply_ragged(velocity_from_position, [x, y, t], rowsize, rows=[0, 1], coord_system="cartesian") + array([1., 1., 2., 2., 2.]), + array([1., 1., 1., 1., 1.])) + Raises ------ ValueError @@ -88,7 +101,7 @@ def apply_ragged( raise ValueError("The sum of rowsize must equal the length of arr.") # split the array(s) into trajectories - arrays = [unpack_ragged(arr, rowsize) for arr in arrays] + arrays = [unpack_ragged(arr, rowsize, rows) for arr in arrays] iter = [[arrays[i][j] for i in range(len(arrays))] for j in range(len(arrays[0]))] # parallel execution @@ -1061,7 +1074,9 @@ def subset( def unpack_ragged( - ragged_array: np.ndarray, rowsize: np.ndarray[int] + ragged_array: np.ndarray, + rowsize: np.ndarray[int], + rows: Union[int, Iterable[int]] = None, ) -> list[np.ndarray]: """Unpack a ragged array into a list of regular arrays. @@ -1076,6 +1091,8 @@ def unpack_ragged( rowsize : array-like An array of integers whose values is the size of each row in the ragged array + rows : int or Iterable[int], optional + A row or list of rows to unpack. Default is None, which unpacks all rows. Returns ------- @@ -1092,6 +1109,8 @@ def unpack_ragged( lon = unpack_ragged(ds.lon, ds["rowsize"]) # return a list[xr.DataArray] (slower) lon = unpack_ragged(ds.lon.values, ds["rowsize"]) # return a list[np.ndarray] (faster) + first_lon = unpack_ragged(ds.lon.values, ds["rowsize"], rows=0) # return only the first row + first_two_lons = unpack_ragged(ds.lon.values, ds["rowsize"], rows=[0, 1]) # return first two rows Looping over trajectories in a ragged Xarray Dataset to compute velocities for each: @@ -1106,4 +1125,10 @@ def unpack_ragged( u, v = velocity_from_position(lon, lat, time) """ indices = np.insert(np.cumsum(np.array(rowsize)), 0, 0) - return [ragged_array[indices[n] : indices[n + 1]] for n in range(indices.size - 1)] + + if rows is None: + rows = range(indices.size - 1) + if isinstance(rows, int): + rows = [rows] + + return [ragged_array[indices[n] : indices[n + 1]] for n in rows] diff --git a/tests/analysis_tests.py b/tests/analysis_tests.py index b9f180e8..d42c3070 100644 --- a/tests/analysis_tests.py +++ b/tests/analysis_tests.py @@ -738,6 +738,23 @@ def test_velocity_ndarray(self): ) ) + def test_with_rows(self): + y = apply_ragged( + lambda x: x**2, + np.array([1, 2, 3, 4]), + [2, 2], + rows=0, + ) + self.assertTrue(np.all(y == np.array([1, 4]))) + + y = apply_ragged( + lambda x: x**2, + np.array([1, 2, 3, 4]), + [2, 2], + rows=[0, 1], + ) + self.assertTrue(np.all(y == np.array([1, 4, 9, 16]))) + def test_velocity_dataarray(self): for executor in [futures.ThreadPoolExecutor(), futures.ProcessPoolExecutor()]: u, v = apply_ragged( @@ -889,3 +906,33 @@ def test_unpack_ragged(self): self.assertTrue( np.all([lon[n].size == ds["rowsize"][n] for n in range(len(lon))]) ) + + def test_unpack_ragged_rows(self): + ds = sample_ragged_array().to_xarray() + x = ds.lon.values + rowsize = ds.rowsize.values + + self.assertTrue( + all( + np.array_equal(a, b) + for a, b in zip( + unpack_ragged(x, rowsize, None), unpack_ragged(x, rowsize) + ) + ) + ) + self.assertTrue( + all( + np.array_equal(a, b) + for a, b in zip( + unpack_ragged(x, rowsize, 0), unpack_ragged(x, rowsize)[:1] + ) + ) + ) + self.assertTrue( + all( + np.array_equal(a, b) + for a, b in zip( + unpack_ragged(x, rowsize, [0, 1]), unpack_ragged(x, rowsize)[:2] + ) + ) + ) From 7ea17f66bbe5f2afa295f8cf1208a9f8eec5a939 Mon Sep 17 00:00:00 2001 From: milancurcic Date: Thu, 21 Sep 2023 12:09:52 -0400 Subject: [PATCH 2/2] Fix conflict resolve from rowsize_to_index --- clouddrift/analysis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clouddrift/analysis.py b/clouddrift/analysis.py index 3425d950..5579bb0e 100644 --- a/clouddrift/analysis.py +++ b/clouddrift/analysis.py @@ -1151,7 +1151,7 @@ def unpack_ragged( )): u, v = velocity_from_position(lon, lat, time) """ - indices = np.insert(np.cumsum(np.array(rowsize)), 0, 0) + indices = rowsize_to_index(rowsize) if rows is None: rows = range(indices.size - 1)