From 7e2788e941bd17007cda711930a8a7aa0b6c0173 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Mon, 21 Mar 2022 23:26:26 +0800 Subject: [PATCH] fallback Sort after SortHashAgg Signed-off-by: Yuan Zhou --- .../com/intel/oap/extension/ColumnarOverrides.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index 71c2fbda4..8346500cd 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.ShufflePartitionSpec import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{ShuffleStageInfo, _} -import org.apache.spark.sql.execution.aggregate.HashAggregateExec +import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange._ @@ -168,6 +168,17 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { case _ => ColumnarSortExec(plan.sortOrder, plan.global, child, plan.testSpillFrequency) } + case plan: SortAggregateExec => + //FIXME: fallback sortagg and sort to improve per + val sortPlan = plan.child + if (sortPlan.isInstanceOf[SortExec]) { + val sortChild = sortPlan.asInstanceOf[SortExec].child + val child = replaceWithColumnarPlan(sortChild) + sortPlan.withNewChildren(Seq(child)) + plan.withNewChildren(Seq(sortPlan.withNewChildren(Seq(child)))) + } else { + plan + } case plan: ShuffleExchangeExec => val child = replaceWithColumnarPlan(plan.child) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")