diff --git a/src/main/python/systemds/operator/nodes/matrix.py b/src/main/python/systemds/operator/nodes/matrix.py
index 672c40c7969..a843f4ac78a 100644
--- a/src/main/python/systemds/operator/nodes/matrix.py
+++ b/src/main/python/systemds/operator/nodes/matrix.py
@@ -688,11 +688,49 @@ def replace(
         )
 
     def inv(self) -> "Matrix":
-        """ Computes the inverse of a squared matrix.
+        """Computes the inverse of a square matrix.
 
         :return: The Matrix representing the result of this operation
         """
         return Matrix(self.sds_context, "inv", [self])
 
+    def cumsum(self) -> "Matrix":
+        """Column prefix-sum. (For row-prefix sum, use X.t().cumsum().t())
+
+        :return: The Matrix representing the result of this operation
+        """
+        return Matrix(self.sds_context, "cumsum", [self])
+
+    def cumprod(self) -> "Matrix":
+        """Column prefix-product. (For row-prefix prod, use X.t().cumprod().t())
+
+        :return: The Matrix representing the result of this operation
+        """
+        return Matrix(self.sds_context, "cumprod", [self])
+
+    def cumsumprod(self) -> "Matrix":
+        """Column prefix-sumprod of a 2-column matrix:
+        Y = X.cumsumprod(),
+        where Y[i,1] = X[i,1] + X[i,2]*Y[i-1,1] for i in [1,2, .., nrow(X)]
+        The aggregator is initialized with 0 (Y[0,1] = 0)
+
+        :return: The Matrix representing the result of this operation
+        """
+        return Matrix(self.sds_context, "cumsumprod", [self])
+
+    def cummin(self) -> "Matrix":
+        """Column prefix-min. (For row-prefix min, use X.t().cummin().t())
+
+        :return: The Matrix representing the result of this operation
+        """
+        return Matrix(self.sds_context, "cummin", [self])
+
+    def cummax(self) -> "Matrix":
+        """Column prefix-max. (For row-prefix max, use X.t().cummax().t())
+
+        :return: The Matrix representing the result of this operation
+        """
+        return Matrix(self.sds_context, "cummax", [self])
+
     def __str__(self):
         return "MatrixNode"
diff --git a/src/main/python/tests/matrix/test_cumulative.py b/src/main/python/tests/matrix/test_cumulative.py
new file mode 100644
index 00000000000..963f1014170
--- /dev/null
+++ b/src/main/python/tests/matrix/test_cumulative.py
@@ -0,0 +1,123 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import unittest
+import numpy as np
+from systemds.context import SystemDSContext
+
+# Seeded generator: the "random" inputs are identical on every run, so a
+# failing comparison can be reproduced without touching NumPy's global RNG.
+rng = np.random.RandomState(7)
+
+m1 = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]])
+m2 = rng.randint(10, size=(10, 10))
+m3 = rng.random_sample((10, 10))
+
+
+def cumsumprod_reference(m):
+    """Reference implementation of cumsumprod for a 2-column matrix.
+
+    Computes y[i] = m[i, 0] + m[i, 1] * y[i - 1], with the aggregator
+    starting at 0.
+
+    :param m: 2-column array, iterated row by row
+    :return: the running values as an (n x 1) column vector
+    """
+    acc = 0
+    out = []
+    for row in m:
+        acc = row[0] + row[1] * acc
+        out.append(acc)
+    return np.array(out).reshape(-1, 1)
+
+
+class TestCUMBASE(unittest.TestCase):
+    """Compares the cumulative Matrix operations with NumPy references."""
+
+    def setUp(self):
+        self.sds = SystemDSContext()
+
+    def tearDown(self):
+        self.sds.close()
+
+    def _assert_close(self, actual, expected):
+        """Assert np.allclose(actual, expected) with rtol=1e-9."""
+        # unittest assertion instead of a bare assert: not stripped by python -O
+        self.assertTrue(np.allclose(actual, expected, rtol=1e-9))
+
+    def test_cumsum_basic(self):
+        sds_result = self.sds.from_numpy(m1).cumsum().compute()
+        self._assert_close(sds_result, np.cumsum(m1, 0))
+
+    def test_cumsum_random1(self):
+        sds_result = self.sds.from_numpy(m2).cumsum().compute()
+        self._assert_close(sds_result, np.cumsum(m2, 0))
+
+    def test_cumsum_random2(self):
+        sds_result = self.sds.from_numpy(m3).cumsum().compute()
+        self._assert_close(sds_result, np.cumsum(m3, 0))
+
+    def test_cumprod_basic(self):
+        sds_result = self.sds.from_numpy(m1).cumprod().compute()
+        self._assert_close(sds_result, np.cumprod(m1, 0))
+
+    def test_cumprod_random1(self):
+        sds_result = self.sds.from_numpy(m2).cumprod().compute()
+        self._assert_close(sds_result, np.cumprod(m2, 0))
+
+    def test_cumprod_random2(self):
+        sds_result = self.sds.from_numpy(m3).cumprod().compute()
+        self._assert_close(sds_result, np.cumprod(m3, 0))
+
+    def test_cumsumprod_basic(self):
+        m = m1[:, :2]  # cumsumprod expects a 2-column matrix
+        sds_result = self.sds.from_numpy(m).cumsumprod().compute()
+        self._assert_close(sds_result, cumsumprod_reference(m))
+
+    def test_cumsumprod_random1(self):
+        m = m2[:, :2]  # cumsumprod expects a 2-column matrix
+        sds_result = self.sds.from_numpy(m).cumsumprod().compute()
+        self._assert_close(sds_result, cumsumprod_reference(m))
+
+    def test_cumsumprod_random2(self):
+        m = m3[:, :2]  # cumsumprod expects a 2-column matrix
+        sds_result = self.sds.from_numpy(m).cumsumprod().compute()
+        self._assert_close(sds_result, cumsumprod_reference(m))
+
+    def test_cummin_random1(self):
+        sds_result = self.sds.from_numpy(m2).cummin().compute()
+        self._assert_close(sds_result, np.minimum.accumulate(m2, 0))
+
+    def test_cummin_random2(self):
+        sds_result = self.sds.from_numpy(m3).cummin().compute()
+        self._assert_close(sds_result, np.minimum.accumulate(m3, 0))
+
+    def test_cummax_random1(self):
+        sds_result = self.sds.from_numpy(m2).cummax().compute()
+        self._assert_close(sds_result, np.maximum.accumulate(m2, 0))
+
+    def test_cummax_random2(self):
+        sds_result = self.sds.from_numpy(m3).cummax().compute()
+        self._assert_close(sds_result, np.maximum.accumulate(m3, 0))
+
+
+if __name__ == "__main__":
+    unittest.main()