[SYSTEMDS-3548] Optimize IO path Python interface for SystemDS #2154

Closed
wants to merge 8 commits
14 changes: 7 additions & 7 deletions scripts/perftest/python/io/load_native.py
@@ -21,30 +21,30 @@

import argparse
import timeit
from systemds.context import SystemDSContext


setup = "\n".join(
[
"from systemds.context import SystemDSContext",
"from systemds.script_building.script import DMLScript",
]
)


run = "\n".join(
[
"with SystemDSContext(logging_level=10, py4j_logging_level=50) as ctx:",
" node = ctx.read(src)",
" script = DMLScript(ctx)",
" script.build_code(node)",
" script.execute()",
"node = ctx.read(src)",
"script = DMLScript(ctx)",
"script.build_code(node)",
"script.execute()",
]
)


def main(args):
gvars = {"src": args.src}
gvars = {"src": args.src, "ctx": SystemDSContext(logging_level=10, py4j_logging_level=50)}
print(timeit.timeit(run, setup, globals=gvars, number=args.number))
gvars["ctx"].close()


if __name__ == "__main__":
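For reference, a minimal standalone sketch of the benchmarking pattern the perftest scripts now share: the SystemDSContext is created once, handed to timeit through globals, and closed after the measurement, instead of being re-created inside every timed iteration. The CSV path and repetition count below are placeholders.

import timeit
from systemds.context import SystemDSContext

setup = "from systemds.script_building.script import DMLScript"
run = "\n".join(
    [
        "node = ctx.read(src)",
        "script = DMLScript(ctx)",
        "script.build_code(node)",
        "script.execute()",
    ]
)

# Context creation is paid once, outside the timed region.
ctx = SystemDSContext(logging_level=10, py4j_logging_level=50)
gvars = {"src": "data/example.csv", "ctx": ctx}
print(timeit.timeit(run, setup, globals=gvars, number=10))
ctx.close()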
18 changes: 11 additions & 7 deletions scripts/perftest/python/io/load_numpy.py
@@ -22,10 +22,10 @@

import argparse
import timeit
from systemds.context import SystemDSContext

setup = "\n".join(
[
"from systemds.context import SystemDSContext",
"from systemds.script_building.script import DMLScript",
"import numpy as np",
"array = np.loadtxt(src, delimiter=',')",
@@ -37,11 +37,10 @@

run = "\n".join(
[
"with SystemDSContext(logging_level=10, py4j_logging_level=50) as ctx:",
" matrix_from_np = ctx.from_numpy(array)",
" script = DMLScript(ctx)",
" script.add_input_from_python('test', matrix_from_np)",
" script.execute()",
"matrix_from_np = ctx.from_numpy(array)",
"script = DMLScript(ctx)",
"script.add_input_from_python('test', matrix_from_np)",
"script.execute()",
]
)

@@ -66,8 +65,9 @@


def main(args):
gvars = {"src": args.src, "dtype": args.dtype}
gvars = {"src": args.src, "dtype": args.dtype, "ctx": SystemDSContext(logging_level=10, py4j_logging_level=50)}
print(timeit.timeit(run, setup, globals=gvars, number=args.number))
gvars["ctx"].close()


if __name__ == "__main__":
@@ -86,4 +86,8 @@ def main(args):
help=help_force_dtype,
)
args = parser.parse_args()

if args.dtype == "string": # numpy has no "string" dtype, convert to "str"
args.dtype = "str"

main(args)
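The dtype normalization added at the end of load_numpy.py exists because numpy spells its string dtype "str"; a small illustrative sketch of the same mapping in isolation (the sample array and variable names are made up for the example):

import numpy as np

cli_dtype = "string"  # as it might arrive from the command line
# numpy has no dtype named "string", so map it to "str" before use
if cli_dtype == "string":
    cli_dtype = "str"

array = np.array([[1.0, 2.0], [3.0, 4.0]]).astype(cli_dtype)
print(array.dtype)  # e.g. <U32, a numpy unicode string dtype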
14 changes: 7 additions & 7 deletions scripts/perftest/python/io/load_pandas.py
@@ -21,10 +21,10 @@

import argparse
import timeit
from systemds.context import SystemDSContext

setup = "\n".join(
[
"from systemds.context import SystemDSContext",
"from systemds.script_building.script import DMLScript",
"import pandas as pd",
"df = pd.read_csv(src, header=None)",
@@ -36,11 +36,10 @@

run = "\n".join(
[
"with SystemDSContext(logging_level=10, py4j_logging_level=50) as ctx:",
" frame_from_pandas = ctx.from_pandas(df)",
" script = DMLScript(ctx)",
" script.add_input_from_python('test', frame_from_pandas)",
" script.execute()",
"frame_from_pandas = ctx.from_pandas(df)",
"script = DMLScript(ctx)",
"script.add_input_from_python('test', frame_from_pandas)",
"script.execute()",
]
)

@@ -64,8 +63,9 @@


def main(args):
gvars = {"src": args.src, "dtype": args.dtype}
gvars = {"src": args.src, "dtype": args.dtype, "ctx": SystemDSContext(logging_level=10, py4j_logging_level=50)}
print(timeit.timeit(run, setup, globals=gvars, number=args.number))
gvars["ctx"].close()


if __name__ == "__main__":
@@ -27,6 +27,7 @@
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.columns.Array;
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
import org.apache.sysds.runtime.frame.data.columns.BitSetArray;
import org.apache.sysds.runtime.frame.data.columns.BooleanArray;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;

@@ -157,7 +158,13 @@ public static Array<?> convert(byte[] data, int numElements, Types.ValueType val
break;
case BOOLEAN:
for(int i = 0; i < numElements; i++) {
((BooleanArray) array).set(i, buffer.get() != 0);
if (array instanceof BooleanArray) {
((BooleanArray) array).set(i, buffer.get() != 0);
} else if (array instanceof BitSetArray) {
((BitSetArray) array).set(i, buffer.get() != 0);
} else {
throw new DMLRuntimeException("Array factory returned invalid array type for boolean values.");
}
}
break;
case STRING:
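The Java change above (apparently in Py4jConverterUtils.convert, which the Python converter calls) makes the boolean branch accept both backings the frame layer can produce: a plain BooleanArray or a bit-packed BitSetArray. From Python, this path is exercised whenever a pandas DataFrame with a boolean column is transferred; a minimal sketch, with an arbitrary column name and size chosen only for illustration:

import numpy as np
import pandas as pd
from systemds.context import SystemDSContext

# A DataFrame with a boolean column; larger frames are where a
# bit-packed backing is more likely to be chosen on the Java side.
df = pd.DataFrame({"flag": np.random.rand(10_000) > 0.5})

with SystemDSContext() as sds:
    frame = sds.from_pandas(df)
    # Computing the node triggers the actual transfer (pandas_to_frame_block on the
    # Python side, Py4jConverterUtils.convert on the JVM) and round-trips the frame back.
    result = frame.compute()
    print(result.dtypes)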
70 changes: 43 additions & 27 deletions src/main/python/systemds/utils/converters.py
@@ -23,6 +23,7 @@

import numpy as np
import pandas as pd
import concurrent.futures
from py4j.java_gateway import JavaClass, JavaGateway, JavaObject, JVMView


@@ -81,6 +82,33 @@ def matrix_block_to_numpy(jvm: JVMView, mb: JavaObject):
)


def convert_column(jvm, rows, j, col_type, pd_col, fb, col_name):
"""Converts a given pandas column to a FrameBlock representation.

:param jvm: The JVMView of the current SystemDS context.
:param rows: The number of rows in the pandas DataFrame.
:param j: The current column index.
:param col_type: The ValueType of the column.
:param pd_col: The pandas column to convert.
"""
if col_type == jvm.org.apache.sysds.common.Types.ValueType.STRING:
byte_data = bytearray()
for value in pd_col.astype(str):
encoded_value = value.encode("utf-8")
byte_data.extend(struct.pack(">I", len(encoded_value)))
byte_data.extend(encoded_value)
else:
col_data = pd_col.fillna("").to_numpy()
byte_data = bytearray(col_data.tobytes())

converted_array = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert(
byte_data, rows, col_type
)

fb.setColumnName(j, str(col_name))
fb.setColumn(j, converted_array)


def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
"""Converts a given pandas DataFrame to an internal FrameBlock representation.

Expand Down Expand Up @@ -120,49 +148,37 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
jc_String = jvm.java.lang.String
jc_FrameBlock = jvm.org.apache.sysds.runtime.frame.data.FrameBlock
j_valueTypeArray = java_gate.new_array(jc_ValueType, len(schema))
j_colNameArray = java_gate.new_array(jc_String, len(col_names))

# execution speed increases with optimized code when the number of rows exceeds 4
if rows > 4:
for i in range(len(schema)):
j_valueTypeArray[i] = schema[i]
for i in range(len(col_names)):
j_colNameArray[i] = str(col_names[i])

fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, rows)
fb = jc_FrameBlock(j_valueTypeArray, rows)

# convert and set data for each column
for j, col_name in enumerate(col_names):
col_type = schema[j]
if col_type == jvm.org.apache.sysds.common.Types.ValueType.STRING:
byte_data = bytearray()
for value in pd_df[col_name].astype(str):
encoded_value = value.encode("utf-8")
byte_data.extend(struct.pack(">I", len(encoded_value)))
byte_data.extend(encoded_value)
else:
col_data = pd_df[col_name].fillna("").to_numpy()
byte_data = bytearray(col_data.tobytes())

converted_array = (
jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert(
byte_data, rows, col_type
)
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(
lambda j, col_name: convert_column(
jvm, rows, j, schema[j], pd_df[col_name], fb, col_name
),
range(len(col_names)),
col_names,
)
fb.setColumn(j, converted_array)

return fb
else:
j_dataArray = java_gate.new_array(jc_String, rows, cols)
for i in range(len(schema)):
j_valueTypeArray[i] = schema[i]
for i in range(len(col_names)):
j_colNameArray[i] = str(col_names[i])
j = 0
j_colNameArray = java_gate.new_array(jc_String, len(col_names))

for j, col_name in enumerate(col_names):
j_valueTypeArray[j] = schema[j]
j_colNameArray[j] = str(col_names[j])
col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)

for i in range(col_data.shape[0]):
if col_data[i]:
j_dataArray[i][j] = col_data[i]

fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, j_dataArray)
return fb

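For string columns, convert_column ships each value as a 4-byte big-endian length prefix followed by the UTF-8 payload. A small self-contained sketch of that framing and of how the receiving side can walk it; the decoder here is only illustrative, since the real parsing happens in Py4jConverterUtils on the JVM:

import struct

values = ["a", "bc", "日本"]

# Encode: ">I" length prefix (4 bytes, big-endian) + UTF-8 bytes per value,
# mirroring the string branch of convert_column above.
buf = bytearray()
for v in values:
    encoded = v.encode("utf-8")
    buf.extend(struct.pack(">I", len(encoded)))
    buf.extend(encoded)

# Decode: walk the buffer the same way to recover the original strings.
decoded, offset = [], 0
while offset < len(buf):
    (length,) = struct.unpack_from(">I", buf, offset)
    offset += 4
    decoded.append(buf[offset:offset + length].decode("utf-8"))
    offset += length

assert decoded == values
print(decoded)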