Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SYSTEMDS-3758] Python API Builtin triu, tril, argmin, argmax and casting Scalar <-> Matrix <-> Frame #2113

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 91 additions & 41 deletions src/main/python/systemds/operator/nodes/frame.py
Baunsgaard marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
__all__ = ["Frame"]

import os
from typing import (TYPE_CHECKING, Dict, Iterable, Optional, Sequence, Tuple,
Union)
from typing import TYPE_CHECKING, Dict, Iterable, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd
Expand All @@ -34,9 +33,12 @@
from systemds.operator.nodes.matrix import Matrix
from systemds.script_building.dag import DAGNode, OutputType
from systemds.utils.consts import VALID_INPUT_TYPES
from systemds.utils.converters import (frame_block_to_pandas,
pandas_to_frame_block)
from systemds.utils.helpers import check_is_empty_slice, check_no_less_than_zero, get_slice_string
from systemds.utils.converters import frame_block_to_pandas, pandas_to_frame_block
from systemds.utils.helpers import (
check_is_empty_slice,
check_no_less_than_zero,
get_slice_string,
)

if TYPE_CHECKING:
# to avoid cyclic dependencies during runtime
Expand All @@ -47,30 +49,49 @@ class Frame(OperationNode):

_pd_dataframe: pd.DataFrame

def __init__(self, sds_context: "SystemDSContext", operation: str,
unnamed_input_nodes: Union[str,
Iterable[VALID_INPUT_TYPES]] = None,
named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
local_data: pd.DataFrame = None, brackets: bool = False) -> "Frame":
def __init__(
self,
sds_context: "SystemDSContext",
operation: str,
unnamed_input_nodes: Union[str, Iterable[VALID_INPUT_TYPES]] = None,
named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
local_data: pd.DataFrame = None,
brackets: bool = False,
) -> "Frame":
is_python_local_data = False
if local_data is not None:
self._pd_dataframe = local_data
is_python_local_data = True
else:
self._pd_dataframe = None

super().__init__(sds_context, operation, unnamed_input_nodes,
named_input_nodes, OutputType.FRAME, is_python_local_data, brackets)
super().__init__(
sds_context,
operation,
unnamed_input_nodes,
named_input_nodes,
OutputType.FRAME,
is_python_local_data,
brackets,
)

def pass_python_data_to_prepared_script(self, sds, var_name: str, prepared_script: JavaObject) -> None:
def pass_python_data_to_prepared_script(
self, sds, var_name: str, prepared_script: JavaObject
) -> None:
assert (
self.is_python_local_data), "Can only pass data to prepared script if it is python local!"
self.is_python_local_data
), "Can only pass data to prepared script if it is python local!"
if self._is_pandas():
prepared_script.setFrame(
var_name, pandas_to_frame_block(sds, self._pd_dataframe), True
) # True for reuse

def code_line(self, var_name: str, unnamed_input_vars: Sequence[str], named_input_vars: Dict[str, str]) -> str:
def code_line(
self,
var_name: str,
unnamed_input_vars: Sequence[str],
named_input_vars: Dict[str, str],
) -> str:
code_line = super().code_line(var_name, unnamed_input_vars, named_input_vars)
if self._is_pandas():
code_line = code_line.format(file_name=var_name)
Expand All @@ -84,7 +105,10 @@ def compute(self, verbose: bool = False, lineage: bool = False) -> pd.DataFrame:
return super().compute(verbose, lineage)

def _parse_output_result_variables(self, result_variables):
return frame_block_to_pandas(self.sds_context, result_variables.getFrameBlock(self._script.out_var_name[0]))
return frame_block_to_pandas(
self.sds_context,
result_variables.getFrameBlock(self._script.out_var_name[0]),
)

def _is_pandas(self) -> bool:
return self._pd_dataframe is not None
Expand Down Expand Up @@ -112,68 +136,94 @@ def transform_apply(self, spec: "Scalar", meta: "Frame"):
params_dict = {"target": self, "spec": spec, "meta": meta}
return Matrix(self.sds_context, "transformapply", named_input_nodes=params_dict)

def rbind(self, other) -> 'Frame':
def rbind(self, other) -> "Frame":
"""
Row-wise frame concatenation, by concatenating the second frame as additional rows to the first frame.
Row-wise frame concatenation, by concatenating the second frame as additional rows to the first frame.
:param: The other frame to bind to the right hand side
:return: The OperationNode containing the concatenated frames.
"""

return Frame(self.sds_context, "rbind", [self, other])

def cbind(self, other) -> 'Frame':
def cbind(self, other) -> "Frame":
"""
Column-wise frame concatenation, by concatenating the second frame as additional columns to the first frame.
Column-wise frame concatenation, by concatenating the second frame as additional columns to the first frame.
:param: The other frame to bind to the right hand side.
:return: The Frame containing the concatenated frames.
"""
return Frame(self.sds_context, "cbind", [self, other])

def replace(self, pattern: str, replacement: str) -> 'Frame':
def replace(self, pattern: str, replacement: str) -> "Frame":
"""
Replace all instances of string with replacement string
:param: pattern the string to replace
:param: replacement the string to replace with
:return: The Frame containing the replaced values
:return: The Frame containing the replaced values
"""
return Frame(self.sds_context, "replace", named_input_nodes={"target": self, "pattern": f"'{pattern}'", "replacement": f"'{replacement}'"})
return Frame(
self.sds_context,
"replace",
named_input_nodes={
"target": self,
"pattern": f"'{pattern}'",
"replacement": f"'{replacement}'",
},
)

def to_string(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'Scalar':
""" Converts the input to a string representation.
def to_string(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> "Scalar":
"""Converts the input to a string representation.
:return: `Scalar` containing the string.
"""
return Scalar(self.sds_context, 'toString', [self], kwargs, output_type=OutputType.STRING)
return Scalar(
self.sds_context, "toString", [self], kwargs, output_type=OutputType.STRING
)

def to_matrix(self):
return Matrix(self.sds_context, "as.matrix", [self])

def to_scalar(self):
return Scalar(self.sds_context, "as.scalar", [self])

def __str__(self):
return "FrameNode"

def nRow(self) -> 'Scalar':
return Scalar(self.sds_context, 'nrow', [self])
def nRow(self) -> "Scalar":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i am fine with changing all of the return types to use " instead of ', but does it really make a difference?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the formatting comes from black, as you recommended

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, maybe we should add a verification in Github actions that analyze if the code base is all formated according to black, i have seen this in other repositories such as Modyn

The check would be simple to add in a new action using:
https://black.readthedocs.io/en/stable/usage_and_configuration/the_basics.html#check

return Scalar(self.sds_context, "nrow", [self])

def nCol(self) -> 'Scalar':
return Scalar(self.sds_context, 'ncol', [self])
def nCol(self) -> "Scalar":
return Scalar(self.sds_context, "ncol", [self])

def __getitem__(self, i) -> 'Frame':
def __getitem__(self, i) -> "Frame":
if isinstance(i, tuple) and len(i) > 2:
raise ValueError("Maximum of two dimensions are allowed")
elif isinstance(i, list):
check_no_less_than_zero(i)
slice = self.sds_context.from_numpy(np.array(i)) + 1
select = Matrix(self.sds_context, "table",
[slice, 1, self.nRow(), 1])
ret = Frame(self.sds_context, "removeEmpty", [], {
'target': self, 'margin': '"rows"', 'select': select})
select = Matrix(self.sds_context, "table", [slice, 1, self.nRow(), 1])
ret = Frame(
self.sds_context,
"removeEmpty",
[],
{"target": self, "margin": '"rows"', "select": select},
)
return ret
elif isinstance(i, tuple) and isinstance(i[0], list) and isinstance(i[1], list):
raise NotImplementedError("double slicing is not supported yet")
elif isinstance(i, tuple) and check_is_empty_slice(i[0]) and isinstance(i[1], list):
elif (
isinstance(i, tuple)
and check_is_empty_slice(i[0])
and isinstance(i[1], list)
):
check_no_less_than_zero(i[1])
slice = self.sds_context.from_numpy(np.array(i[1])) + 1
select = Matrix(self.sds_context, "table",
[slice, 1, self.nCol(), 1])
ret = Frame(self.sds_context, "removeEmpty", [], {
'target': self, 'margin': '"cols"', 'select': select})
select = Matrix(self.sds_context, "table", [slice, 1, self.nCol(), 1])
ret = Frame(
self.sds_context,
"removeEmpty",
[],
{"target": self, "margin": '"cols"', "select": select},
)
return ret
else:
sliceIns = get_slice_string(i)
return Frame(self.sds_context, '', [self, sliceIns], brackets=True)
return Frame(self.sds_context, "", [self, sliceIns], brackets=True)
92 changes: 92 additions & 0 deletions src/main/python/systemds/operator/nodes/matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,14 @@ def to_string(self, **kwargs: Dict[str, VALID_INPUT_TYPES]) -> "Scalar":
self.sds_context, "toString", [self], kwargs, output_type=OutputType.STRING
)

def to_scalar(self):
return Scalar(self.sds_context, "as.scalar", [self])

def to_frame(self):
from systemds.operator import Frame
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cyclic dependency, i understand it is how i thought about it as well, however, maybe we need to add it afterwards dynamically instead.

See:

https://stackoverflow.com/questions/972/adding-a-method-to-an-existing-object-instance-in-python

This would allow us to add the to_scalar, to_frame and to_matrix on the operation_node directly inside frame, matrix and scalar themselves to overwrite the operation node.

This would also allow us to have documentation on the method, since you add an empty method with docs that would throw an exception on the operation_node, and then on the import of Matrix etc it overwrite the Class method such that new instances use the overwritten method in Matrix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, currently the cyclic dependency error is avoided since the import happens inside the function, which won't be executed while importing afaik.


return Frame(self.sds_context, "as.frame", [self])

def isNA(self) -> "Matrix":
"""Computes a boolean indicator matrix of the same shape as the input, indicating where NA (not available)
values are located. Currently, NA is only capturing NaN values.
Expand Down Expand Up @@ -810,5 +818,89 @@ def quantile(self, p, weights: "Matrix" = None) -> "OperationNode":
else:
raise ValueError("P has to be a Scalar or Matrix")

def triu(self, include_diagonal=True, return_values=True) -> "Matrix":
"""Selects the upper triangular part of a matrix, configurable to include the diagonal and return values or ones

:param include_diagonal: boolean, default True
:param return_values: boolean, default True, if set to False returns ones
:return: `Matrix`
"""
named_input_nodes = {
"target": self,
"diag": self.sds_context.scalar(include_diagonal),
"values": self.sds_context.scalar(return_values),
}
return Matrix(
self.sds_context, "upper.tri", named_input_nodes=named_input_nodes
)

def tril(self, include_diagonal=True, return_values=True) -> "Matrix":
"""Selects the lower triangular part of a matrix, configurable to include the diagonal and return values or ones

:param include_diagonal: boolean, default True
:param return_values: boolean, default True, if set to False returns ones
:return: `Matrix`
"""
named_input_nodes = {
"target": self,
"diag": self.sds_context.scalar(include_diagonal),
"values": self.sds_context.scalar(return_values),
}
return Matrix(
self.sds_context, "lower.tri", named_input_nodes=named_input_nodes
)

def argmin(self, axis: int = None) -> "OperationNode":
"""Return the index of the minimum if axis is None or a column vector for row-wise / column-wise minima
computation.

:param axis: can be 0 or 1 to do either row or column sums
:return: `Matrix` representing operation for row / columns or 'Scalar' representing operation for complete
"""
if axis == 0:
return Matrix(self.sds_context, "rowIndexMin", [self.t()])
elif axis == 1:
return Matrix(self.sds_context, "rowIndexMin", [self])
elif axis is None:
return Matrix(
self.sds_context,
"rowIndexMin",
[self.reshape(1, self.nCol() * self.nRow())],
).to_scalar()
else:
raise ValueError(
f"Axis has to be either 0, 1 or None, for column, row or complete {self.operation}"
)

def argmax(self, axis: int = None) -> "OperationNode":
"""Return the index of the maximum if axis is None or a column vector for row-wise / column-wise maxima
computation.

:param axis: can be 0 or 1 to do either row or column sums
:return: `Matrix` representing operation for row / columns or 'Scalar' representing operation for complete
"""
if axis == 0:
return Matrix(self.sds_context, "rowIndexMax", [self.t()])
elif axis == 1:
return Matrix(self.sds_context, "rowIndexMax", [self])
elif axis is None:
return Matrix(
self.sds_context,
"rowIndexMax",
[self.reshape(1, self.nCol() * self.nRow())],
).to_scalar()
else:
raise ValueError(
f"Axis has to be either 0, 1 or None, for column, row or complete {self.operation}"
)

def reshape(self, rows, cols=1):
"""Gives a new shape to a matrix without changing its data.

:param rows: number of rows
:param cols: number of columns, defaults to 1
:return: `Matrix` representing operation"""
return Matrix(self.sds_context, "matrix", [self, rows, cols])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the rest of the functions looks good.


def __str__(self):
return "MatrixNode"
Loading
Loading