From d9b439ab9e5b48fe240bb9f2cd6925351a290392 Mon Sep 17 00:00:00 2001 From: Nakroma Date: Tue, 3 Dec 2024 13:03:10 +0100 Subject: [PATCH 1/7] [SYSTEMDS-3548] Fix performance test for load_numpy string case This commit fixes the load_numpy string performance test case. It keeps the CLI usage consistent with the other test cases, but converts the dtype to the correct one internally. --- scripts/perftest/python/io/load_numpy.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scripts/perftest/python/io/load_numpy.py b/scripts/perftest/python/io/load_numpy.py index 8bd489b064a..171aa601913 100644 --- a/scripts/perftest/python/io/load_numpy.py +++ b/scripts/perftest/python/io/load_numpy.py @@ -86,4 +86,8 @@ def main(args): help=help_force_dtype, ) args = parser.parse_args() + + if args.dtype == "string": # numpy has no "string" dtype, convert to "str" + args.dtype = "str" + main(args) From ae15fe8a741fa14f20d68317d3341d24aa5d14cd Mon Sep 17 00:00:00 2001 From: Nakroma Date: Tue, 3 Dec 2024 13:33:29 +0100 Subject: [PATCH 2/7] [SYSTEMDS-3548] Fix Py4j boolean array convert This commit fixes the array boolean convert breaking for row numbers above 64. It also adds a bit more error handling to prevent cases like this in the future. --- .../apache/sysds/runtime/util/Py4jConverterUtils.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java index be6a749fb75..75c03b31a1c 100644 --- a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java +++ b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java @@ -27,6 +27,7 @@ import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.frame.data.columns.Array; import org.apache.sysds.runtime.frame.data.columns.ArrayFactory; +import org.apache.sysds.runtime.frame.data.columns.BitSetArray; import org.apache.sysds.runtime.frame.data.columns.BooleanArray; import org.apache.sysds.runtime.matrix.data.MatrixBlock; @@ -157,7 +158,13 @@ public static Array convert(byte[] data, int numElements, Types.ValueType val break; case BOOLEAN: for(int i = 0; i < numElements; i++) { - ((BooleanArray) array).set(i, buffer.get() != 0); + if (array instanceof BooleanArray) { + ((BooleanArray) array).set(i, buffer.get() != 0); + } else if (array instanceof BitSetArray) { + ((BitSetArray) array).set(i, buffer.get() != 0); + } else { + throw new DMLRuntimeException("Array factory returned invalid array type for boolean values."); + } } break; case STRING: From 0b54a158dcf3c80ee46ade72c5c9497a594d57e3 Mon Sep 17 00:00:00 2001 From: Nakroma Date: Wed, 4 Dec 2024 13:56:15 +0100 Subject: [PATCH 3/7] [SYSTEMDS-3548] Parallelize pandas_to_frame_block This commit parallelizes the column processing in the pandas DataFrame to FrameBlock conversion. --- src/main/python/systemds/utils/converters.py | 56 +++++++++++++------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py index 8551b8ce6a7..70a5aa116b1 100644 --- a/src/main/python/systemds/utils/converters.py +++ b/src/main/python/systemds/utils/converters.py @@ -23,6 +23,7 @@ import numpy as np import pandas as pd +import concurrent.futures from py4j.java_gateway import JavaClass, JavaGateway, JavaObject, JVMView @@ -81,6 +82,33 @@ def matrix_block_to_numpy(jvm: JVMView, mb: JavaObject): ) +def convert_column(jvm, rows, j, col_type, pd_col): + """Converts a given pandas column to a FrameBlock representation. + + :param jvm: The JVMView of the current SystemDS context. + :param rows: The number of rows in the pandas DataFrame. + :param j: The current column index. + :param col_type: The ValueType of the column. + :param pd_col: The pandas column to convert. + """ + if col_type == jvm.org.apache.sysds.common.Types.ValueType.STRING: + byte_data = bytearray() + for value in pd_col.astype(str): + encoded_value = value.encode("utf-8") + byte_data.extend(struct.pack(">I", len(encoded_value))) + byte_data.extend(encoded_value) + else: + col_data = pd_col.fillna("").to_numpy() + byte_data = bytearray(col_data.tobytes()) + + converted_array = ( + jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert( + byte_data, rows, col_type + ) + ) + return j, converted_array + + def pandas_to_frame_block(sds, pd_df: pd.DataFrame): """Converts a given pandas DataFrame to an internal FrameBlock representation. @@ -131,25 +159,15 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame): fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, rows) - # convert and set data for each column - for j, col_name in enumerate(col_names): - col_type = schema[j] - if col_type == jvm.org.apache.sysds.common.Types.ValueType.STRING: - byte_data = bytearray() - for value in pd_df[col_name].astype(str): - encoded_value = value.encode("utf-8") - byte_data.extend(struct.pack(">I", len(encoded_value))) - byte_data.extend(encoded_value) - else: - col_data = pd_df[col_name].fillna("").to_numpy() - byte_data = bytearray(col_data.tobytes()) - - converted_array = ( - jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert( - byte_data, rows, col_type - ) - ) - fb.setColumn(j, converted_array) + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [ + executor.submit(convert_column, jvm, rows, j, schema[j], pd_df[col_name]) + for j, col_name in enumerate(col_names) + ] + for future in concurrent.futures.as_completed(futures): + j, converted_array = future.result() + fb.setColumn(j, converted_array) + return fb else: j_dataArray = java_gate.new_array(jc_String, rows, cols) From a41cfe6b007b38e0496c1bafd5dfd93ba8e18973 Mon Sep 17 00:00:00 2001 From: Nakroma Date: Wed, 18 Dec 2024 15:04:43 +0100 Subject: [PATCH 4/7] [SYSTEMDS-3548] Update converters.py to be black compliant --- src/main/python/systemds/utils/converters.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py index 70a5aa116b1..8fc22962309 100644 --- a/src/main/python/systemds/utils/converters.py +++ b/src/main/python/systemds/utils/converters.py @@ -101,10 +101,8 @@ def convert_column(jvm, rows, j, col_type, pd_col): col_data = pd_col.fillna("").to_numpy() byte_data = bytearray(col_data.tobytes()) - converted_array = ( - jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert( - byte_data, rows, col_type - ) + converted_array = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert( + byte_data, rows, col_type ) return j, converted_array @@ -161,7 +159,9 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame): with concurrent.futures.ThreadPoolExecutor() as executor: futures = [ - executor.submit(convert_column, jvm, rows, j, schema[j], pd_df[col_name]) + executor.submit( + convert_column, jvm, rows, j, schema[j], pd_df[col_name] + ) for j, col_name in enumerate(col_names) ] for future in concurrent.futures.as_completed(futures): From b3e512656196328770f89b2bd84e211db4b9896c Mon Sep 17 00:00:00 2001 From: Nakroma Date: Thu, 19 Dec 2024 13:21:02 +0100 Subject: [PATCH 5/7] [SYSTEMDS-3548] Update I/O tests to not include systemds context start during run statement --- scripts/perftest/python/io/load_native.py | 15 ++++++++------- scripts/perftest/python/io/load_numpy.py | 14 +++++++------- scripts/perftest/python/io/load_pandas.py | 15 ++++++++------- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/scripts/perftest/python/io/load_native.py b/scripts/perftest/python/io/load_native.py index aa1d89156bc..310303453c4 100644 --- a/scripts/perftest/python/io/load_native.py +++ b/scripts/perftest/python/io/load_native.py @@ -21,11 +21,11 @@ import argparse import timeit +from systemds.context import SystemDSContext setup = "\n".join( [ - "from systemds.context import SystemDSContext", "from systemds.script_building.script import DMLScript", ] ) @@ -33,18 +33,19 @@ run = "\n".join( [ - "with SystemDSContext(logging_level=10, py4j_logging_level=50) as ctx:", - " node = ctx.read(src)", - " script = DMLScript(ctx)", - " script.build_code(node)", - " script.execute()", + "node = ctx.read(src)", + "script = DMLScript(ctx)", + "script.build_code(node)", + "script.execute()", + "ctx.close()", ] ) def main(args): - gvars = {"src": args.src} + gvars = {"src": args.src, "ctx": SystemDSContext(logging_level=10, py4j_logging_level=50)} print(timeit.timeit(run, setup, globals=gvars, number=args.number)) + gvars["ctx"].close() if __name__ == "__main__": diff --git a/scripts/perftest/python/io/load_numpy.py b/scripts/perftest/python/io/load_numpy.py index 171aa601913..11622b37226 100644 --- a/scripts/perftest/python/io/load_numpy.py +++ b/scripts/perftest/python/io/load_numpy.py @@ -22,10 +22,10 @@ import argparse import timeit +from systemds.context import SystemDSContext setup = "\n".join( [ - "from systemds.context import SystemDSContext", "from systemds.script_building.script import DMLScript", "import numpy as np", "array = np.loadtxt(src, delimiter=',')", @@ -37,11 +37,10 @@ run = "\n".join( [ - "with SystemDSContext(logging_level=10, py4j_logging_level=50) as ctx:", - " matrix_from_np = ctx.from_numpy(array)", - " script = DMLScript(ctx)", - " script.add_input_from_python('test', matrix_from_np)", - " script.execute()", + "matrix_from_np = ctx.from_numpy(array)", + "script = DMLScript(ctx)", + "script.add_input_from_python('test', matrix_from_np)", + "script.execute()", ] ) @@ -66,8 +65,9 @@ def main(args): - gvars = {"src": args.src, "dtype": args.dtype} + gvars = {"src": args.src, "dtype": args.dtype, "ctx": SystemDSContext(logging_level=10, py4j_logging_level=50)} print(timeit.timeit(run, setup, globals=gvars, number=args.number)) + gvars["ctx"].close() if __name__ == "__main__": diff --git a/scripts/perftest/python/io/load_pandas.py b/scripts/perftest/python/io/load_pandas.py index 30714ca9079..8c8a975ab47 100644 --- a/scripts/perftest/python/io/load_pandas.py +++ b/scripts/perftest/python/io/load_pandas.py @@ -21,10 +21,10 @@ import argparse import timeit +from systemds.context import SystemDSContext setup = "\n".join( [ - "from systemds.context import SystemDSContext", "from systemds.script_building.script import DMLScript", "import pandas as pd", "df = pd.read_csv(src, header=None)", @@ -36,11 +36,11 @@ run = "\n".join( [ - "with SystemDSContext(logging_level=10, py4j_logging_level=50) as ctx:", - " frame_from_pandas = ctx.from_pandas(df)", - " script = DMLScript(ctx)", - " script.add_input_from_python('test', frame_from_pandas)", - " script.execute()", + "frame_from_pandas = ctx.from_pandas(df)", + "script = DMLScript(ctx)", + "script.add_input_from_python('test', frame_from_pandas)", + "script.execute()", + "ctx.close()", ] ) @@ -64,8 +64,9 @@ def main(args): - gvars = {"src": args.src, "dtype": args.dtype} + gvars = {"src": args.src, "dtype": args.dtype, "ctx": SystemDSContext(logging_level=10, py4j_logging_level=50)} print(timeit.timeit(run, setup, globals=gvars, number=args.number)) + gvars["ctx"].close() if __name__ == "__main__": From 89202c1bc2efcc41160816988c8dba27c208fcd8 Mon Sep 17 00:00:00 2001 From: Nakroma Date: Thu, 19 Dec 2024 13:24:10 +0100 Subject: [PATCH 6/7] [SYSTEMDS-3548] Remove redundant close statements --- scripts/perftest/python/io/load_native.py | 1 - scripts/perftest/python/io/load_pandas.py | 1 - 2 files changed, 2 deletions(-) diff --git a/scripts/perftest/python/io/load_native.py b/scripts/perftest/python/io/load_native.py index 310303453c4..b6bf77c3c35 100644 --- a/scripts/perftest/python/io/load_native.py +++ b/scripts/perftest/python/io/load_native.py @@ -37,7 +37,6 @@ "script = DMLScript(ctx)", "script.build_code(node)", "script.execute()", - "ctx.close()", ] ) diff --git a/scripts/perftest/python/io/load_pandas.py b/scripts/perftest/python/io/load_pandas.py index 8c8a975ab47..60cb46f0cce 100644 --- a/scripts/perftest/python/io/load_pandas.py +++ b/scripts/perftest/python/io/load_pandas.py @@ -40,7 +40,6 @@ "script = DMLScript(ctx)", "script.add_input_from_python('test', frame_from_pandas)", "script.execute()", - "ctx.close()", ] ) From 6b1f68cf350c80c59a8d7cf41753af9858d67214 Mon Sep 17 00:00:00 2001 From: Nakroma Date: Sun, 29 Dec 2024 20:13:24 +0100 Subject: [PATCH 7/7] [SYSTEMDS-3548] Parallelize FrameBlock column allocation This commit moves the assignment of column data to the FrameBlock to the parallel column processing. --- src/main/python/systemds/utils/converters.py | 38 ++++++++++---------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py index 8fc22962309..38fdab8ca70 100644 --- a/src/main/python/systemds/utils/converters.py +++ b/src/main/python/systemds/utils/converters.py @@ -82,7 +82,7 @@ def matrix_block_to_numpy(jvm: JVMView, mb: JavaObject): ) -def convert_column(jvm, rows, j, col_type, pd_col): +def convert_column(jvm, rows, j, col_type, pd_col, fb, col_name): """Converts a given pandas column to a FrameBlock representation. :param jvm: The JVMView of the current SystemDS context. @@ -104,7 +104,9 @@ def convert_column(jvm, rows, j, col_type, pd_col): converted_array = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert( byte_data, rows, col_type ) - return j, converted_array + + fb.setColumnName(j, str(col_name)) + fb.setColumn(j, converted_array) def pandas_to_frame_block(sds, pd_df: pd.DataFrame): @@ -146,41 +148,37 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame): jc_String = jvm.java.lang.String jc_FrameBlock = jvm.org.apache.sysds.runtime.frame.data.FrameBlock j_valueTypeArray = java_gate.new_array(jc_ValueType, len(schema)) - j_colNameArray = java_gate.new_array(jc_String, len(col_names)) # execution speed increases with optimized code when the number of rows exceeds 4 if rows > 4: for i in range(len(schema)): j_valueTypeArray[i] = schema[i] - for i in range(len(col_names)): - j_colNameArray[i] = str(col_names[i]) - fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, rows) + fb = jc_FrameBlock(j_valueTypeArray, rows) with concurrent.futures.ThreadPoolExecutor() as executor: - futures = [ - executor.submit( - convert_column, jvm, rows, j, schema[j], pd_df[col_name] - ) - for j, col_name in enumerate(col_names) - ] - for future in concurrent.futures.as_completed(futures): - j, converted_array = future.result() - fb.setColumn(j, converted_array) + executor.map( + lambda j, col_name: convert_column( + jvm, rows, j, schema[j], pd_df[col_name], fb, col_name + ), + range(len(col_names)), + col_names, + ) return fb else: j_dataArray = java_gate.new_array(jc_String, rows, cols) - for i in range(len(schema)): - j_valueTypeArray[i] = schema[i] - for i in range(len(col_names)): - j_colNameArray[i] = str(col_names[i]) - j = 0 + j_colNameArray = java_gate.new_array(jc_String, len(col_names)) + for j, col_name in enumerate(col_names): + j_valueTypeArray[j] = schema[j] + j_colNameArray[j] = str(col_names[j]) col_data = pd_df[col_name].fillna("").to_numpy(dtype=str) + for i in range(col_data.shape[0]): if col_data[i]: j_dataArray[i][j] = col_data[i] + fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, j_dataArray) return fb