From dc777a86939b5a37bc0762bb97a025b90300dbe5 Mon Sep 17 00:00:00 2001 From: Andrey Pavlenko Date: Wed, 28 Jun 2023 16:47:02 +0200 Subject: [PATCH] FIX-#1851: Squash multiple LogicalProject nodes In case of multiple sequential LogicalProject nodes, Calcite squashes them, but gets the column names from the first one. To fix the issue, squash on the Modin side and keep the names from the last one. Signed-off-by: Andrey Pavlenko --- .../implementations/hdk_on_native/calcite_builder.py | 12 ++++++++++++ .../hdk_on_native/dataframe/dataframe.py | 4 +--- .../implementations/hdk_on_native/dataframe/utils.py | 8 ++++---- .../hdk_on_native/partitioning/partition_manager.py | 8 +------- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py index edeac113655..bd56ac7bab0 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py @@ -738,6 +738,18 @@ def _push(self, node): node : CalciteBaseNode A node to add. """ + if ( + len(self.res) != 0 + and isinstance(node, CalciteProjectionNode) + and isinstance(self.res[-1], CalciteProjectionNode) + and all(isinstance(expr, CalciteInputRefExpr) for expr in node.exprs) + ): + # Replace the last CalciteProjectionNode with this one and + # translate the input refs. + exprs = self.res.pop().exprs + node = CalciteProjectionNode( + node.fields, [exprs[expr.input] for expr in node.exprs] + ) self.res.append(node) def _last(self): diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py index 317eeb45549..89cd0bc5468 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py @@ -2041,9 +2041,7 @@ def _materialize(self): new_table, unsupported_cols=[], encode_col_names=False )[0] else: - partitions = self._partition_mgr_cls.run_exec_plan( - self._op, self._table_cols - ) + partitions = self._partition_mgr_cls.run_exec_plan(self._op) self._partitions = partitions self._op = FrameNode(self) diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py index 14d48f379ce..5fb2d01dfe8 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py @@ -429,12 +429,12 @@ def to_empty_pandas_df(df): index_cols = None else: index_cols = ColNameCodec.mangle_index_names(merged.index.names) - for orig_name, mangled_name in zip(merged.index.names, index_cols): + for name in index_cols: # Using _dtypes here since it contains all column names, # including the index. - df = left if mangled_name in left._dtypes else right - exprs[orig_name] = df.ref(mangled_name) - new_dtypes.append(df._dtypes[mangled_name]) + df = left if name in left._dtypes else right + exprs[name] = df.ref(name) + new_dtypes.append(df._dtypes[name]) left_col_names = set(left.columns) right_col_names = set(right.columns) diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py index b90a9161b32..dd242bee9d3 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py @@ -227,7 +227,7 @@ def is_supported_dtype(dtype): ) @classmethod - def run_exec_plan(cls, plan, columns): + def run_exec_plan(cls, plan): """ Run execution plan in HDK storage format to materialize frame. @@ -235,8 +235,6 @@ def run_exec_plan(cls, plan, columns): ---------- plan : DFAlgNode A root of an execution plan tree. - columns : list of str - A frame column names. Returns ------- @@ -256,10 +254,6 @@ def run_exec_plan(cls, plan, columns): calcite_json = "execute calcite " + calcite_json table = worker.executeRA(calcite_json) - # workaround for https://github.com/modin-project/modin/issues/1851 - if DoUseCalcite.get(): - table._column_names = [ColNameCodec.encode(c) for c in columns] - res = np.empty((1, 1), dtype=np.dtype(object)) res[0][0] = cls._partition_class(table)