Skip to content

Commit

Permalink
[SYSTEMDS-3744-48] Python API cumulative aggregate
Browse files Browse the repository at this point in the history
This commit adds the cumulative column aggregates to
the python API:

cumsum, cumprod, cumsumprod, cummin, cummax

Closes #2093
  • Loading branch information
e-strauss authored and Baunsgaard committed Sep 4, 2024
1 parent f190880 commit d3bdb8a
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 1 deletion.
40 changes: 39 additions & 1 deletion src/main/python/systemds/operator/nodes/matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,11 +688,49 @@ def replace(
)

def inv(self) -> "Matrix":
""" Computes the inverse of a squared matrix.
"""Computes the inverse of a squared matrix.
:return: The Matrix representing the result of this operation
"""
return Matrix(self.sds_context, "inv", [self])

def cumsum(self) -> "Matrix":
"""Column prefix-sum. (For row-prefix sum, use X.t().cumsum().t())
:return: The Matrix representing the result of this operation
"""
return Matrix(self.sds_context, "cumsum", [self])

def cumprod(self) -> "Matrix":
"""Column prefix-product. (For row-prefix prod, use X.t().cumprod().t())
:return: The Matrix representing the result of this operation
"""
return Matrix(self.sds_context, "cumprod", [self])

def cumsumprod(self) -> "Matrix":
"""Column prefix-sumprod of an 2-column matrix:
Y = X.comsumprod(),
where Y[i,1] = X[i,1] + X[i,2]*Y[i-1,1] for i in [1,2, .., nrow(X)]
The aggregator is initialized with 0 (Y[0,1] = 0)
:return: The Matrix representing the result of this operation
"""
return Matrix(self.sds_context, "cumsumprod", [self])

def cummin(self) -> "Matrix":
"""Column prefix-min. (For row-prefix min, use X.t().cummin().t())
:return: The Matrix representing the result of this operation
"""
return Matrix(self.sds_context, "cummin", [self])

def cummax(self) -> "Matrix":
"""Column prefix-max. (For row-prefix max, use X.t().cummax().t())
:return: The Matrix representing the result of this operation
"""
return Matrix(self.sds_context, "cummax", [self])

def __str__(self):
return "MatrixNode"
130 changes: 130 additions & 0 deletions src/main/python/tests/matrix/test_cumulative.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# -------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# -------------------------------------------------------------

import unittest
import numpy as np
from systemds.context import SystemDSContext

m1 = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]])
m2 = np.random.randint(10, size=(10, 10))
m3 = np.random.random((10, 10))


def comsumprod(m):
s = 0
out = []
for i in m:
s = i[0] + i[1] * s
out.append(s)
return np.array(out).reshape(-1, 1)


class TestCUMBASE(unittest.TestCase):
def setUp(self):
self.sds = SystemDSContext()

def tearDown(self):
self.sds.close()

def test_cumsum_basic(self):
sds_input = self.sds.from_numpy(m1)
sds_result = sds_input.cumsum().compute()
np_result = np.cumsum(m1, 0)
assert np.allclose(sds_result, np_result, 1e-9)

def test_cumsum_random1(self):
sds_input = self.sds.from_numpy(m2)
sds_result = sds_input.cumsum().compute()
np_result = np.cumsum(m2, 0)
assert np.allclose(sds_result, np_result, 1e-9)

def test_cumsum_random2(self):
sds_input = self.sds.from_numpy(m3)
sds_result = sds_input.cumsum().compute()
np_result = np.cumsum(m3, 0)
assert np.allclose(sds_result, np_result, 1e-9)

def test_cumprod_basic(self):
sds_input = self.sds.from_numpy(m1)
sds_result = sds_input.cumprod().compute()
np_result = np.cumprod(m1, 0)
assert np.allclose(sds_result, np_result, 1e-9)

def test_cumprod_random1(self):
sds_input = self.sds.from_numpy(m2)
sds_result = sds_input.cumprod().compute()
np_result = np.cumprod(m2, 0)
assert np.allclose(sds_result, np_result, 1e-9)

def test_cumprod_random2(self):
sds_input = self.sds.from_numpy(m3)
sds_result = sds_input.cumprod().compute()
np_result = np.cumprod(m3, 0)
assert np.allclose(sds_result, np_result, 1e-9)

def test_cumsumprod_basic(self):
m = m1[:, :2] # 2-col matrix
sds_input = self.sds.from_numpy(m)
sds_result = sds_input.cumsumprod().compute()
exp_result = comsumprod(m)
self.assertTrue(np.allclose(sds_result, exp_result, 1e-9))

def test_cumsumprod_random1(self):
m = m2[:, :2]
sds_input = self.sds.from_numpy(m)
sds_result = sds_input.cumsumprod().compute()
exp_result = comsumprod(m)
self.assertTrue(np.allclose(sds_result, exp_result, 1e-9))

def test_cumsumprod_random2(self):
m = m3[:, :2]
sds_input = self.sds.from_numpy(m)
sds_result = sds_input.cumsumprod().compute()
exp_result = comsumprod(m)
self.assertTrue(np.allclose(sds_result, exp_result, 1e-9))

def test_cummin_random1(self):
sds_input = self.sds.from_numpy(m2)
sds_result = sds_input.cummin().compute()
np_result = np.minimum.accumulate(m2, 0)
assert np.allclose(sds_result, np_result, 1e-9)

def test_cummin_random2(self):
sds_input = self.sds.from_numpy(m3)
sds_result = sds_input.cummin().compute()
np_result = np.minimum.accumulate(m3, 0)
assert np.allclose(sds_result, np_result, 1e-9)

def test_cummax_random1(self):
sds_input = self.sds.from_numpy(m2)
sds_result = sds_input.cummax().compute()
np_result = np.maximum.accumulate(m2, 0)
assert np.allclose(sds_result, np_result, 1e-9)

def test_cummax_random2(self):
sds_input = self.sds.from_numpy(m3)
sds_result = sds_input.cummax().compute()
np_result = np.maximum.accumulate(m3, 0)
assert np.allclose(sds_result, np_result, 1e-9)


if __name__ == "__main__":
unittest.main()

0 comments on commit d3bdb8a

Please sign in to comment.