diff --git a/src/main/python/systemds/operator/nodes/frame.py b/src/main/python/systemds/operator/nodes/frame.py index 137a0b21558..27eeb6497a0 100644 --- a/src/main/python/systemds/operator/nodes/frame.py +++ b/src/main/python/systemds/operator/nodes/frame.py @@ -22,8 +22,7 @@ __all__ = ["Frame"] import os -from typing import (TYPE_CHECKING, Dict, Iterable, Optional, Sequence, Tuple, - Union) +from typing import TYPE_CHECKING, Dict, Iterable, Optional, Sequence, Tuple, Union import numpy as np import pandas as pd @@ -34,9 +33,12 @@ from systemds.operator.nodes.matrix import Matrix from systemds.script_building.dag import DAGNode, OutputType from systemds.utils.consts import VALID_INPUT_TYPES -from systemds.utils.converters import (frame_block_to_pandas, - pandas_to_frame_block) -from systemds.utils.helpers import check_is_empty_slice, check_no_less_than_zero, get_slice_string +from systemds.utils.converters import frame_block_to_pandas, pandas_to_frame_block +from systemds.utils.helpers import ( + check_is_empty_slice, + check_no_less_than_zero, + get_slice_string, +) if TYPE_CHECKING: # to avoid cyclic dependencies during runtime @@ -47,11 +49,15 @@ class Frame(OperationNode): _pd_dataframe: pd.DataFrame - def __init__(self, sds_context: "SystemDSContext", operation: str, - unnamed_input_nodes: Union[str, - Iterable[VALID_INPUT_TYPES]] = None, - named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None, - local_data: pd.DataFrame = None, brackets: bool = False) -> "Frame": + def __init__( + self, + sds_context: "SystemDSContext", + operation: str, + unnamed_input_nodes: Union[str, Iterable[VALID_INPUT_TYPES]] = None, + named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None, + local_data: pd.DataFrame = None, + brackets: bool = False, + ) -> "Frame": is_python_local_data = False if local_data is not None: self._pd_dataframe = local_data @@ -59,18 +65,33 @@ def __init__(self, sds_context: "SystemDSContext", operation: str, else: self._pd_dataframe = None - super().__init__(sds_context, operation, unnamed_input_nodes, - named_input_nodes, OutputType.FRAME, is_python_local_data, brackets) + super().__init__( + sds_context, + operation, + unnamed_input_nodes, + named_input_nodes, + OutputType.FRAME, + is_python_local_data, + brackets, + ) - def pass_python_data_to_prepared_script(self, sds, var_name: str, prepared_script: JavaObject) -> None: + def pass_python_data_to_prepared_script( + self, sds, var_name: str, prepared_script: JavaObject + ) -> None: assert ( - self.is_python_local_data), "Can only pass data to prepared script if it is python local!" + self.is_python_local_data + ), "Can only pass data to prepared script if it is python local!" if self._is_pandas(): prepared_script.setFrame( var_name, pandas_to_frame_block(sds, self._pd_dataframe), True ) # True for reuse - def code_line(self, var_name: str, unnamed_input_vars: Sequence[str], named_input_vars: Dict[str, str]) -> str: + def code_line( + self, + var_name: str, + unnamed_input_vars: Sequence[str], + named_input_vars: Dict[str, str], + ) -> str: code_line = super().code_line(var_name, unnamed_input_vars, named_input_vars) if self._is_pandas(): code_line = code_line.format(file_name=var_name) @@ -84,7 +105,10 @@ def compute(self, verbose: bool = False, lineage: bool = False) -> pd.DataFrame: return super().compute(verbose, lineage) def _parse_output_result_variables(self, result_variables): - return frame_block_to_pandas(self.sds_context, result_variables.getFrameBlock(self._script.out_var_name[0])) + return frame_block_to_pandas( + self.sds_context, + result_variables.getFrameBlock(self._script.out_var_name[0]), + ) def _is_pandas(self) -> bool: return self._pd_dataframe is not None @@ -112,68 +136,94 @@ def transform_apply(self, spec: "Scalar", meta: "Frame"): params_dict = {"target": self, "spec": spec, "meta": meta} return Matrix(self.sds_context, "transformapply", named_input_nodes=params_dict) - def rbind(self, other) -> 'Frame': + def rbind(self, other) -> "Frame": """ - Row-wise frame concatenation, by concatenating the second frame as additional rows to the first frame. + Row-wise frame concatenation, by concatenating the second frame as additional rows to the first frame. :param: The other frame to bind to the right hand side :return: The OperationNode containing the concatenated frames. """ return Frame(self.sds_context, "rbind", [self, other]) - def cbind(self, other) -> 'Frame': + def cbind(self, other) -> "Frame": """ - Column-wise frame concatenation, by concatenating the second frame as additional columns to the first frame. + Column-wise frame concatenation, by concatenating the second frame as additional columns to the first frame. :param: The other frame to bind to the right hand side. :return: The Frame containing the concatenated frames. """ return Frame(self.sds_context, "cbind", [self, other]) - def replace(self, pattern: str, replacement: str) -> 'Frame': + def replace(self, pattern: str, replacement: str) -> "Frame": """ Replace all instances of string with replacement string :param: pattern the string to replace :param: replacement the string to replace with - :return: The Frame containing the replaced values + :return: The Frame containing the replaced values """ - return Frame(self.sds_context, "replace", named_input_nodes={"target": self, "pattern": f"'{pattern}'", "replacement": f"'{replacement}'"}) + return Frame( + self.sds_context, + "replace", + named_input_nodes={ + "target": self, + "pattern": f"'{pattern}'", + "replacement": f"'{replacement}'", + }, + ) - def to_string(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'Scalar': - """ Converts the input to a string representation. + def to_string(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> "Scalar": + """Converts the input to a string representation. :return: `Scalar` containing the string. """ - return Scalar(self.sds_context, 'toString', [self], kwargs, output_type=OutputType.STRING) + return Scalar( + self.sds_context, "toString", [self], kwargs, output_type=OutputType.STRING + ) + + def to_matrix(self): + return Matrix(self.sds_context, "as.matrix", [self]) + + def to_scalar(self): + return Scalar(self.sds_context, "as.scalar", [self]) def __str__(self): return "FrameNode" - def nRow(self) -> 'Scalar': - return Scalar(self.sds_context, 'nrow', [self]) + def nRow(self) -> "Scalar": + return Scalar(self.sds_context, "nrow", [self]) - def nCol(self) -> 'Scalar': - return Scalar(self.sds_context, 'ncol', [self]) + def nCol(self) -> "Scalar": + return Scalar(self.sds_context, "ncol", [self]) - def __getitem__(self, i) -> 'Frame': + def __getitem__(self, i) -> "Frame": if isinstance(i, tuple) and len(i) > 2: raise ValueError("Maximum of two dimensions are allowed") elif isinstance(i, list): check_no_less_than_zero(i) slice = self.sds_context.from_numpy(np.array(i)) + 1 - select = Matrix(self.sds_context, "table", - [slice, 1, self.nRow(), 1]) - ret = Frame(self.sds_context, "removeEmpty", [], { - 'target': self, 'margin': '"rows"', 'select': select}) + select = Matrix(self.sds_context, "table", [slice, 1, self.nRow(), 1]) + ret = Frame( + self.sds_context, + "removeEmpty", + [], + {"target": self, "margin": '"rows"', "select": select}, + ) return ret elif isinstance(i, tuple) and isinstance(i[0], list) and isinstance(i[1], list): raise NotImplementedError("double slicing is not supported yet") - elif isinstance(i, tuple) and check_is_empty_slice(i[0]) and isinstance(i[1], list): + elif ( + isinstance(i, tuple) + and check_is_empty_slice(i[0]) + and isinstance(i[1], list) + ): check_no_less_than_zero(i[1]) slice = self.sds_context.from_numpy(np.array(i[1])) + 1 - select = Matrix(self.sds_context, "table", - [slice, 1, self.nCol(), 1]) - ret = Frame(self.sds_context, "removeEmpty", [], { - 'target': self, 'margin': '"cols"', 'select': select}) + select = Matrix(self.sds_context, "table", [slice, 1, self.nCol(), 1]) + ret = Frame( + self.sds_context, + "removeEmpty", + [], + {"target": self, "margin": '"cols"', "select": select}, + ) return ret else: sliceIns = get_slice_string(i) - return Frame(self.sds_context, '', [self, sliceIns], brackets=True) + return Frame(self.sds_context, "", [self, sliceIns], brackets=True) diff --git a/src/main/python/systemds/operator/nodes/matrix.py b/src/main/python/systemds/operator/nodes/matrix.py index 41bb481da56..d5eb8812875 100644 --- a/src/main/python/systemds/operator/nodes/matrix.py +++ b/src/main/python/systemds/operator/nodes/matrix.py @@ -634,6 +634,14 @@ def to_string(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> "Scalar": self.sds_context, "toString", [self], kwargs, output_type=OutputType.STRING ) + def to_scalar(self): + return Scalar(self.sds_context, "as.scalar", [self]) + + def to_frame(self): + from systemds.operator import Frame + + return Frame(self.sds_context, "as.frame", [self]) + def isNA(self) -> "Matrix": """Computes a boolean indicator matrix of the same shape as the input, indicating where NA (not available) values are located. Currently, NA is only capturing NaN values. @@ -810,5 +818,89 @@ def quantile(self, p, weights: "Matrix" = None) -> "OperationNode": else: raise ValueError("P has to be a Scalar or Matrix") + def triu(self, include_diagonal=True, return_values=True) -> "Matrix": + """Selects the upper triangular part of a matrix, configurable to include the diagonal and return values or ones + + :param include_diagonal: boolean, default True + :param return_values: boolean, default True, if set to False returns ones + :return: `Matrix` + """ + named_input_nodes = { + "target": self, + "diag": self.sds_context.scalar(include_diagonal), + "values": self.sds_context.scalar(return_values), + } + return Matrix( + self.sds_context, "upper.tri", named_input_nodes=named_input_nodes + ) + + def tril(self, include_diagonal=True, return_values=True) -> "Matrix": + """Selects the lower triangular part of a matrix, configurable to include the diagonal and return values or ones + + :param include_diagonal: boolean, default True + :param return_values: boolean, default True, if set to False returns ones + :return: `Matrix` + """ + named_input_nodes = { + "target": self, + "diag": self.sds_context.scalar(include_diagonal), + "values": self.sds_context.scalar(return_values), + } + return Matrix( + self.sds_context, "lower.tri", named_input_nodes=named_input_nodes + ) + + def argmin(self, axis: int = None) -> "OperationNode": + """Return the index of the minimum if axis is None or a column vector for row-wise / column-wise minima + computation. + + :param axis: can be 0 or 1 to do either row or column sums + :return: `Matrix` representing operation for row / columns or 'Scalar' representing operation for complete + """ + if axis == 0: + return Matrix(self.sds_context, "rowIndexMin", [self.t()]) + elif axis == 1: + return Matrix(self.sds_context, "rowIndexMin", [self]) + elif axis is None: + return Matrix( + self.sds_context, + "rowIndexMin", + [self.reshape(1, self.nCol() * self.nRow())], + ).to_scalar() + else: + raise ValueError( + f"Axis has to be either 0, 1 or None, for column, row or complete {self.operation}" + ) + + def argmax(self, axis: int = None) -> "OperationNode": + """Return the index of the maximum if axis is None or a column vector for row-wise / column-wise maxima + computation. + + :param axis: can be 0 or 1 to do either row or column sums + :return: `Matrix` representing operation for row / columns or 'Scalar' representing operation for complete + """ + if axis == 0: + return Matrix(self.sds_context, "rowIndexMax", [self.t()]) + elif axis == 1: + return Matrix(self.sds_context, "rowIndexMax", [self]) + elif axis is None: + return Matrix( + self.sds_context, + "rowIndexMax", + [self.reshape(1, self.nCol() * self.nRow())], + ).to_scalar() + else: + raise ValueError( + f"Axis has to be either 0, 1 or None, for column, row or complete {self.operation}" + ) + + def reshape(self, rows, cols=1): + """Gives a new shape to a matrix without changing its data. + + :param rows: number of rows + :param cols: number of columns, defaults to 1 + :return: `Matrix` representing operation""" + return Matrix(self.sds_context, "matrix", [self, rows, cols]) + def __str__(self): return "MatrixNode" diff --git a/src/main/python/systemds/operator/nodes/scalar.py b/src/main/python/systemds/operator/nodes/scalar.py index 7a369b2625b..d746aeacef3 100644 --- a/src/main/python/systemds/operator/nodes/scalar.py +++ b/src/main/python/systemds/operator/nodes/scalar.py @@ -22,38 +22,56 @@ __all__ = ["Scalar"] import os -from typing import (TYPE_CHECKING, Dict, Iterable, Optional, Sequence, Tuple, - Union) +from typing import TYPE_CHECKING, Dict, Iterable, Optional, Sequence, Tuple, Union import numpy as np from py4j.java_gateway import JavaObject, JVMView from systemds.operator.operation_node import OperationNode from systemds.script_building.dag import OutputType -from systemds.utils.consts import (BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES, - VALID_INPUT_TYPES) +from systemds.utils.consts import ( + BINARY_OPERATIONS, + VALID_ARITHMETIC_TYPES, + VALID_INPUT_TYPES, +) from systemds.utils.converters import numpy_to_matrix_block class Scalar(OperationNode): __assign: bool - def __init__(self, sds_context, operation: str, - unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None, - named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None, - output_type: OutputType = OutputType.DOUBLE, - assign: bool = False) -> 'Scalar': + def __init__( + self, + sds_context, + operation: str, + unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None, + named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None, + output_type: OutputType = OutputType.DOUBLE, + assign: bool = False, + ) -> "Scalar": self.__assign = assign - super().__init__(sds_context, operation, unnamed_input_nodes=unnamed_input_nodes, - named_input_nodes=named_input_nodes, output_type=output_type) - - def pass_python_data_to_prepared_script(self, sds, var_name: str, prepared_script: JavaObject) -> None: - raise RuntimeError( - 'Scalar Operation Nodes, should not have python data input') - - def code_line(self, var_name: str, unnamed_input_vars: Sequence[str], - named_input_vars: Dict[str, str]) -> str: + super().__init__( + sds_context, + operation, + unnamed_input_nodes=unnamed_input_nodes, + named_input_nodes=named_input_nodes, + output_type=output_type, + ) + + def pass_python_data_to_prepared_script( + self, sds, var_name: str, prepared_script: JavaObject + ) -> None: + raise RuntimeError("Scalar Operation Nodes, should not have python data input") + + def code_line( + self, + var_name: str, + unnamed_input_vars: Sequence[str], + named_input_vars: Dict[str, str], + ) -> str: if self.__assign: - return f'{var_name}={self.operation};' + if type(self.operation) is bool: + self.operation = "TRUE" if self.operation else "FALSE" + return f"{var_name}={self.operation};" else: return super().code_line(var_name, unnamed_input_vars, named_input_vars) @@ -65,133 +83,138 @@ def _parse_output_result_variables(self, result_variables): return result_variables.getDouble(self._script.out_var_name[0]) elif self.output_type == OutputType.STRING: return result_variables.getString(self._script.out_var_name[0]) + elif self.output_type == OutputType.BOOLEAN: + return result_variables.getBoolean(self._script.out_var_name[0]) + elif self.output_type == OutputType.INT: + return int(result_variables.getString(self._script.out_var_name[0])) else: raise NotImplemented( - "Not currently support scalar type: " + self.output_type) + "Not currently support scalar type: " + self.output_type + ) - def __add__(self, other: VALID_ARITHMETIC_TYPES) -> 'Scalar': - return Scalar(self.sds_context, '+', [self, other]) + def __add__(self, other: VALID_ARITHMETIC_TYPES) -> "Scalar": + return Scalar(self.sds_context, "+", [self, other]) # Left hand side - def __radd__(self, other: VALID_ARITHMETIC_TYPES) -> 'Scalar': - return Scalar(self.sds_context, '+', [other, self]) + def __radd__(self, other: VALID_ARITHMETIC_TYPES) -> "Scalar": + return Scalar(self.sds_context, "+", [other, self]) - def __sub__(self, other: VALID_ARITHMETIC_TYPES) -> 'Scalar': - return Scalar(self.sds_context, '-', [self, other]) + def __sub__(self, other: VALID_ARITHMETIC_TYPES) -> "Scalar": + return Scalar(self.sds_context, "-", [self, other]) # Left hand side - def __rsub__(self, other: VALID_ARITHMETIC_TYPES) -> 'Scalar': - return Scalar(self.sds_context, '-', [other, self]) + def __rsub__(self, other: VALID_ARITHMETIC_TYPES) -> "Scalar": + return Scalar(self.sds_context, "-", [other, self]) - def __mul__(self, other: VALID_ARITHMETIC_TYPES) -> 'Scalar': - return Scalar(self.sds_context, '*', [self, other]) + def __mul__(self, other: VALID_ARITHMETIC_TYPES) -> "Scalar": + return Scalar(self.sds_context, "*", [self, other]) - def __rmul__(self, other: VALID_ARITHMETIC_TYPES) -> 'Scalar': - return Scalar(self.sds_context, '*', [other, self]) + def __rmul__(self, other: VALID_ARITHMETIC_TYPES) -> "Scalar": + return Scalar(self.sds_context, "*", [other, self]) - def __truediv__(self, other: VALID_ARITHMETIC_TYPES) -> 'Scalar': - return Scalar(self.sds_context, '/', [self, other]) + def __truediv__(self, other: VALID_ARITHMETIC_TYPES) -> "Scalar": + return Scalar(self.sds_context, "/", [self, other]) - def __rtruediv__(self, other: VALID_ARITHMETIC_TYPES) -> 'Scalar': - return Scalar(self.sds_context, '/', [other, self]) + def __rtruediv__(self, other: VALID_ARITHMETIC_TYPES) -> "Scalar": + return Scalar(self.sds_context, "/", [other, self]) - def __floordiv__(self, other: VALID_ARITHMETIC_TYPES) -> 'Scalar': - return Scalar(self.sds_context, '//', [self, other]) + def __floordiv__(self, other: VALID_ARITHMETIC_TYPES) -> "Scalar": + return Scalar(self.sds_context, "//", [self, other]) - def __rfloordiv__(self, other: VALID_ARITHMETIC_TYPES) -> 'Scalar': - return Scalar(self.sds_context, '//', [other, self]) + def __rfloordiv__(self, other: VALID_ARITHMETIC_TYPES) -> "Scalar": + return Scalar(self.sds_context, "//", [other, self]) - def __lt__(self, other) -> 'Scalar': - return Scalar(self.sds_context, '<', [self, other]) + def __lt__(self, other) -> "Scalar": + return Scalar(self.sds_context, "<", [self, other]) - def __rlt__(self, other) -> 'Scalar': - return Scalar(self.sds_context, '<', [other, self]) + def __rlt__(self, other) -> "Scalar": + return Scalar(self.sds_context, "<", [other, self]) - def __le__(self, other) -> 'Scalar': - return Scalar(self.sds_context, '<=', [self, other]) + def __le__(self, other) -> "Scalar": + return Scalar(self.sds_context, "<=", [self, other]) - def __rle__(self, other) -> 'Scalar': - return Scalar(self.sds_context, '<=', [other, self]) + def __rle__(self, other) -> "Scalar": + return Scalar(self.sds_context, "<=", [other, self]) - def __gt__(self, other) -> 'Scalar': - return Scalar(self.sds_context, '>', [self, other]) + def __gt__(self, other) -> "Scalar": + return Scalar(self.sds_context, ">", [self, other]) - def __rgt__(self, other) -> 'Scalar': - return Scalar(self.sds_context, '>', [other, self]) + def __rgt__(self, other) -> "Scalar": + return Scalar(self.sds_context, ">", [other, self]) - def __ge__(self, other) -> 'Scalar': - return Scalar(self.sds_context, '>=', [self, other]) + def __ge__(self, other) -> "Scalar": + return Scalar(self.sds_context, ">=", [self, other]) - def __rge__(self, other) -> 'Scalar': - return Scalar(self.sds_context, '>=', [other, self]) + def __rge__(self, other) -> "Scalar": + return Scalar(self.sds_context, ">=", [other, self]) - def __eq__(self, other) -> 'Scalar': - return Scalar(self.sds_context, '==', [self, other]) + def __eq__(self, other) -> "Scalar": + return Scalar(self.sds_context, "==", [self, other]) - def __req__(self, other) -> 'Scalar': - return Scalar(self.sds_context, '==', [other, self]) + def __req__(self, other) -> "Scalar": + return Scalar(self.sds_context, "==", [other, self]) - def __ne__(self, other) -> 'Scalar': - return Scalar(self.sds_context, '!=', [self, other]) + def __ne__(self, other) -> "Scalar": + return Scalar(self.sds_context, "!=", [self, other]) - def __rne__(self, other) -> 'Scalar': - return Scalar(self.sds_context, '!=', [other, self]) + def __rne__(self, other) -> "Scalar": + return Scalar(self.sds_context, "!=", [other, self]) - def __matmul__(self, other: 'Scalar') -> 'Scalar': - return Scalar(self.sds_context, '%*%', [self, other]) + def __matmul__(self, other: "Scalar") -> "Scalar": + return Scalar(self.sds_context, "%*%", [self, other]) - def sum(self) -> 'Scalar': - return Scalar(self.sds_context, 'sum', [self], output_type=OutputType.DOUBLE) + def sum(self) -> "Scalar": + return Scalar(self.sds_context, "sum", [self], output_type=OutputType.DOUBLE) - def mean(self) -> 'Scalar': - return Scalar(self.sds_context, 'mean', [self], output_type=OutputType.DOUBLE) + def mean(self) -> "Scalar": + return Scalar(self.sds_context, "mean", [self], output_type=OutputType.DOUBLE) - def var(self, axis: int = None) -> 'Scalar': - return Scalar(self.sds_context, 'var', [self], output_type=OutputType.DOUBLE) + def var(self, axis: int = None) -> "Scalar": + return Scalar(self.sds_context, "var", [self], output_type=OutputType.DOUBLE) - def abs(self) -> 'Scalar': + def abs(self) -> "Scalar": """Calculate absolute. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'abs', [self]) + return Scalar(self.sds_context, "abs", [self]) - def sqrt(self) -> 'Scalar': + def sqrt(self) -> "Scalar": """Calculate square root. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'sqrt', [self]) + return Scalar(self.sds_context, "sqrt", [self]) - def floor(self) -> 'Scalar': + def floor(self) -> "Scalar": """Return the floor of the input, element-wise. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'floor', [self]) + return Scalar(self.sds_context, "floor", [self]) - def ceil(self) -> 'Scalar': + def ceil(self) -> "Scalar": """Return the ceiling of the input, element-wise. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'ceil', [self]) + return Scalar(self.sds_context, "ceil", [self]) - def log(self) -> 'Scalar': + def log(self) -> "Scalar": """Calculate logarithm. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'log', [self]) + return Scalar(self.sds_context, "log", [self]) - def sin(self) -> 'Scalar': + def sin(self) -> "Scalar": """Calculate sin. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'sin', [self]) + return Scalar(self.sds_context, "sin", [self]) - def exp(self) -> 'Scalar': + def exp(self) -> "Scalar": """Calculate exponential. :return: `Scalar` representing operation @@ -206,90 +229,116 @@ def sign(self) -> "Scalar": """ return Scalar(self.sds_context, "sign", [self]) - def cos(self) -> 'Scalar': + def cos(self) -> "Scalar": """Calculate cos. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'cos', [self]) + return Scalar(self.sds_context, "cos", [self]) - def tan(self) -> 'Scalar': + def tan(self) -> "Scalar": """Calculate tan. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'tan', [self]) + return Scalar(self.sds_context, "tan", [self]) - def asin(self) -> 'Scalar': + def asin(self) -> "Scalar": """Calculate arcsin. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'asin', [self]) + return Scalar(self.sds_context, "asin", [self]) - def acos(self) -> 'Scalar': + def acos(self) -> "Scalar": """Calculate arccos. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'acos', [self]) + return Scalar(self.sds_context, "acos", [self]) - def atan(self) -> 'Scalar': + def atan(self) -> "Scalar": """Calculate arctan. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'atan', [self]) + return Scalar(self.sds_context, "atan", [self]) - def sinh(self) -> 'Scalar': + def sinh(self) -> "Scalar": """Calculate sin. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'sinh', [self]) + return Scalar(self.sds_context, "sinh", [self]) - def cosh(self) -> 'Scalar': + def cosh(self) -> "Scalar": """Calculate cos. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'cosh', [self]) + return Scalar(self.sds_context, "cosh", [self]) - def tanh(self) -> 'Scalar': + def tanh(self) -> "Scalar": """Calculate tan. :return: `Scalar` representing operation """ - return Scalar(self.sds_context, 'tanh', [self]) + return Scalar(self.sds_context, "tanh", [self]) - def to_string(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'Scalar': - """ Converts the input to a string representation. + def to_string(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> "Scalar": + """Converts the input to a string representation. :return: `Scalar` containing the string. """ - return Scalar(self.sds_context, 'toString', [self], named_input_nodes=kwargs, output_type=OutputType.STRING) + return Scalar( + self.sds_context, + "toString", + [self], + named_input_nodes=kwargs, + output_type=OutputType.STRING, + ) + + def to_frame(self): + from systemds.operator import Frame + + return Frame(self.sds_context, "as.frame", [self]) + + def to_matrix(self): + from systemds.operator import Matrix + + return Matrix(self.sds_context, "as.matrix", [self]) + + def to_int(self) -> "Scalar": + return Scalar( + self.sds_context, "as.integer", [self], output_type=OutputType.INT + ) + + def to_boolean(self) -> "Scalar": + return Scalar( + self.sds_context, "as.logical", [self], output_type=OutputType.BOOLEAN + ) - def isNA(self) -> 'Scalar': - """ Computes a boolean indicator matrix of the same shape as the input, indicating where NA (not available) + def isNA(self) -> "Scalar": + """Computes a boolean indicator matrix of the same shape as the input, indicating where NA (not available) values are located. Currently, NA is only capturing NaN values. :return: the OperationNode representing this operation """ - return Scalar(self.sds_context, 'isNA', [self]) + return Scalar(self.sds_context, "isNA", [self]) - def isNaN(self) -> 'Scalar': - """ Computes a boolean indicator matrix of the same shape as the input, indicating where NaN (not a number) + def isNaN(self) -> "Scalar": + """Computes a boolean indicator matrix of the same shape as the input, indicating where NaN (not a number) values are located. :return: the OperationNode representing this operation """ - return Scalar(self.sds_context, 'isNaN', [self]) + return Scalar(self.sds_context, "isNaN", [self]) - def isInf(self) -> 'Scalar': - """ Computes a boolean indicator matrix of the same shape as the input, indicating where Inf (positive or + def isInf(self) -> "Scalar": + """Computes a boolean indicator matrix of the same shape as the input, indicating where Inf (positive or negative infinity) values are located. :return: the OperationNode representing this operation """ - return Scalar(self.sds_context, 'isInf', [self]) + return Scalar(self.sds_context, "isInf", [self]) def __str__(self): return "ScalarNode" diff --git a/src/main/python/systemds/script_building/dag.py b/src/main/python/systemds/script_building/dag.py index fcb707e2228..69830925f10 100644 --- a/src/main/python/systemds/script_building/dag.py +++ b/src/main/python/systemds/script_building/dag.py @@ -43,6 +43,8 @@ class OutputType(Enum): NONE = auto() SCALAR = auto() STRING = auto() + INT = auto() + BOOLEAN = auto() IMPORT = auto() UNKNOWN = auto() @@ -52,17 +54,17 @@ def from_str(label: Union[str, VALID_INPUT_TYPES]): if label is not None: if isinstance(label, str): lc = label.lower() - if lc in ['matrix', 'matrixblock']: + if lc in ["matrix", "matrixblock"]: return OutputType.MATRIX - elif lc in ['frame', 'frameblock']: + elif lc in ["frame", "frameblock"]: return OutputType.FRAME - elif lc in ['scalar']: + elif lc in ["scalar"]: return OutputType.SCALAR - elif lc in ['double']: + elif lc in ["double"]: return OutputType.DOUBLE - elif lc in ['string', 'str']: + elif lc in ["string", "str"]: return OutputType.STRING - elif lc in ['list']: + elif lc in ["list"]: return OutputType.LIST else: if isinstance(label, DAGNode): @@ -93,10 +95,11 @@ def from_type(obj): class DAGNode(ABC): """A Node in the directed-acyclic-graph (DAG) defining all operations.""" - sds_context: 'SystemDSContext' - _unnamed_input_nodes: Sequence[Union['DAGNode', str, int, float, bool]] - _named_input_nodes: Dict[str, Union['DAGNode', str, int, float, bool]] - _named_output_nodes: Dict[str, Union['DAGNode', str, int, float, bool]] + + sds_context: "SystemDSContext" + _unnamed_input_nodes: Sequence[Union["DAGNode", str, int, float, bool]] + _named_input_nodes: Dict[str, Union["DAGNode", str, int, float, bool]] + _named_output_nodes: Dict[str, Union["DAGNode", str, int, float, bool]] _source_node: Optional["DAGNode"] _output_type: OutputType _script: Optional["DMLScript"] @@ -120,7 +123,12 @@ def get_lineage_trace(self) -> str: # therefore we could cache the result. raise NotImplementedError - def code_line(self, var_name: str, unnamed_input_vars: Sequence[str], named_input_vars: Dict[str, str]) -> str: + def code_line( + self, + var_name: str, + unnamed_input_vars: Sequence[str], + named_input_vars: Dict[str, str], + ) -> str: """Generates the DML code line equal to the intended action of this node. :param var_name: Name of DML-variable this nodes result should be saved in @@ -130,7 +138,9 @@ def code_line(self, var_name: str, unnamed_input_vars: Sequence[str], named_inpu """ raise NotImplementedError - def pass_python_data_to_prepared_script(self, jvm: JVMView, var_name: str, prepared_script: JavaObject) -> None: + def pass_python_data_to_prepared_script( + self, jvm: JVMView, var_name: str, prepared_script: JavaObject + ) -> None: """Passes data from python to the prepared script object. :param jvm: the java virtual machine object diff --git a/src/main/python/tests/matrix/test_arg_min_max.py b/src/main/python/tests/matrix/test_arg_min_max.py new file mode 100644 index 00000000000..602a9dfee27 --- /dev/null +++ b/src/main/python/tests/matrix/test_arg_min_max.py @@ -0,0 +1,95 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + +import unittest +import numpy as np +from systemds.context import SystemDSContext + +np.random.seed(7) +m = np.array([[1, 2, 3], [6, 5, 4], [8, 7, 9]]) +M = np.random.random_integers(9, size=300).reshape(100, 3) +p = np.array([0.25, 0.5, 0.75]) +m2 = np.array([1, 2, 3, 4, 5]) +w2 = np.array([1, 1, 1, 1, 5]) + + +def weighted_quantiles(values, weights, quantiles=0.5): + i = np.argsort(values) + c = np.cumsum(weights[i]) + return values[i[np.searchsorted(c, np.array(quantiles) * c[-1])]] + + +class TestARGMINMAX(unittest.TestCase): + def setUp(self): + self.sds = SystemDSContext() + + def tearDown(self): + self.sds.close() + + def test_argmin_basic1(self): + sds_input = self.sds.from_numpy(m) + sds_result = sds_input.argmin(0).compute() + np_result = np.argmin(m, axis=0).reshape(-1, 1) + assert np.allclose(sds_result - 1, np_result, 1e-9) + + def test_argmin_basic2(self): + sds_input = self.sds.from_numpy(m) + sds_result = sds_input.argmin(1).compute() + np_result = np.argmin(m, axis=1).reshape(-1, 1) + assert np.allclose(sds_result - 1, np_result, 1e-9) + + def test_argmin_basic3(self): + sds_input = self.sds.from_numpy(m) + sds_result = sds_input.argmin().compute(verbose=True) + np_result = np.argmin(m) + assert np.allclose(sds_result - 1, np_result, 1e-9) + + def test_argmin_basic4(self): + sds_input = self.sds.from_numpy(m) + with self.assertRaises(ValueError): + sds_input.argmin(3) + + def test_argmax_basic1(self): + sds_input = self.sds.from_numpy(m) + sds_result = sds_input.argmax(0).compute() + np_result = np.argmax(m, axis=0).reshape(-1, 1) + assert np.allclose(sds_result - 1, np_result, 1e-9) + + def test_argmax_basic2(self): + sds_input = self.sds.from_numpy(m) + sds_result = sds_input.argmax(1).compute() + np_result = np.argmax(m, axis=1).reshape(-1, 1) + assert np.allclose(sds_result - 1, np_result, 1e-9) + + def test_argmax_basic3(self): + sds_input = self.sds.from_numpy(m) + sds_result = sds_input.argmax().compute() + np_result = np.argmax(m) + assert np.allclose(sds_result - 1, np_result, 1e-9) + + def test_argmax_basic4(self): + sds_input = self.sds.from_numpy(m) + with self.assertRaises(ValueError): + sds_input.argmax(3) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/main/python/tests/matrix/test_casting.py b/src/main/python/tests/matrix/test_casting.py new file mode 100644 index 00000000000..e845477b615 --- /dev/null +++ b/src/main/python/tests/matrix/test_casting.py @@ -0,0 +1,76 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + +import unittest +import numpy as np +from systemds.context import SystemDSContext +from pandas import DataFrame +from numpy import ndarray + + +class TestDIAG(unittest.TestCase): + def setUp(self): + self.sds = SystemDSContext() + + def tearDown(self): + self.sds.close() + + def test_casting_basic1(self): + sds_input = self.sds.from_numpy(np.array([[1]])) + sds_result = sds_input.to_scalar().compute() + self.assertTrue(type(sds_result) == float) + + def test_casting_basic2(self): + sds_input = self.sds.from_numpy(np.array([[1]])) + sds_result = sds_input.to_frame().compute() + self.assertTrue(type(sds_result) == DataFrame) + + def test_casting_basic3(self): + sds_result = self.sds.scalar(1.0).to_frame().compute() + self.assertTrue(type(sds_result) == DataFrame) + + def test_casting_basic4(self): + sds_result = self.sds.scalar(1.0).to_matrix().compute() + self.assertTrue(type(sds_result) == ndarray) + + def test_casting_basic5(self): + ar = ndarray((2, 2)) + df = DataFrame(ar) + sds_result = self.sds.from_pandas(df).to_matrix().compute() + self.assertTrue(type(sds_result) == ndarray and np.allclose(ar, sds_result)) + + def test_casting_basic6(self): + ar = ndarray((1, 1)) + df = DataFrame(ar) + sds_result = self.sds.from_pandas(df).to_scalar().compute() + self.assertTrue(type(sds_result) == float) + + def test_casting_basic7(self): + sds_result = self.sds.scalar(1.0).to_int().compute() + self.assertTrue(type(sds_result) == int and sds_result) + + def test_casting_basic8(self): + sds_result = self.sds.scalar(1.0).to_boolean().compute(verbose=True) + self.assertTrue(type(sds_result) == bool) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/main/python/tests/matrix/test_triangular.py b/src/main/python/tests/matrix/test_triangular.py new file mode 100644 index 00000000000..f7ea2d840be --- /dev/null +++ b/src/main/python/tests/matrix/test_triangular.py @@ -0,0 +1,104 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + +import unittest +import numpy as np +from systemds.context import SystemDSContext + +m1 = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]) + +m2 = np.random.random((10, 10)) + + +class TestTRIANGULAR(unittest.TestCase): + def setUp(self): + self.sds = SystemDSContext() + + def tearDown(self): + self.sds.close() + + def test_triu_basic1(self): + sds_input = self.sds.from_numpy(m1) + sds_result = sds_input.triu().compute() + np_result = np.triu(m1) + assert np.allclose(sds_result, np_result, 1e-9) + + def test_triu_basic2(self): + sds_input = self.sds.from_numpy(m1) + sds_result = sds_input.triu(include_diagonal=False).compute() + np_result = np.triu(m1, 1) + assert np.allclose(sds_result, np_result, 1e-9) + + def test_triu_basic3(self): + sds_input = self.sds.from_numpy(m1) + sds_result = sds_input.triu(return_values=False).compute() + np_result = np.triu(m1) > 0 + assert np.allclose(sds_result, np_result, 1e-9) + + def test_triu_basic4(self): + sds_input = self.sds.from_numpy(m1) + sds_result = sds_input.triu( + return_values=False, include_diagonal=False + ).compute() + np_result = np.triu(m1, 1) > 0 + assert np.allclose(sds_result, np_result, 1e-9) + + def test_triu_random(self): + sds_input = self.sds.from_numpy(m2) + sds_result = sds_input.triu().compute() + np_result = np.triu(m2) + assert np.allclose(sds_result, np_result, 1e-9) + + def test_tril_basic1(self): + sds_input = self.sds.from_numpy(m1) + sds_result = sds_input.tril().compute() + np_result = np.tril(m1) + assert np.allclose(sds_result, np_result, 1e-9) + + def test_tril_basic2(self): + sds_input = self.sds.from_numpy(m1) + sds_result = sds_input.tril(include_diagonal=False).compute() + np_result = np.tril(m1, -1) + assert np.allclose(sds_result, np_result, 1e-9) + + def test_tril_basic3(self): + sds_input = self.sds.from_numpy(m1) + sds_result = sds_input.tril(return_values=False).compute() + np_result = np.tril(m1) > 0 + assert np.allclose(sds_result, np_result, 1e-9) + + def test_tril_basic4(self): + sds_input = self.sds.from_numpy(m1) + sds_result = sds_input.tril( + return_values=False, include_diagonal=False + ).compute() + np_result = np.tril(m1, -1) > 0 + assert np.allclose(sds_result, np_result, 1e-9) + + def test_tril_random(self): + sds_input = self.sds.from_numpy(m2) + sds_result = sds_input.tril().compute() + np_result = np.tril(m2) + assert np.allclose(sds_result, np_result, 1e-9) + + +if __name__ == "__main__": + unittest.main()