From 1ddfc8e0e7646393a25f4122cf7fef58df864a81 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 23 Jul 2024 21:38:26 -0700 Subject: [PATCH] fix: Unsupported window expression should fall back to Spark --- .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 6 +++++- .../scala/org/apache/comet/exec/CometExecSuite.scala | 9 +++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 41a69f7a3..d91ae5e4d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2535,8 +2535,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } }.toArray - val windowExprProto = winExprs.map(windowExprToProto(_, output)) + if (winExprs.length != windowExpression.length) { + withInfo(op, "Unsupported window expression(s)") + return None + } + val windowExprProto = winExprs.map(windowExprToProto(_, output)) val partitionExprs = partitionSpec.map(exprToProto(_, child.output)) val sortOrders = orderSpec.map(exprToProto(_, child.output)) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index e657af9b9..b8c0d5668 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -63,6 +63,15 @@ class CometExecSuite extends CometTestBase { } } + test("Unsupported window expression should fall back to Spark") { + checkAnswer( + spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"), + Row(6.0) :: Row(6.0) :: Row(6.0) :: Nil) + checkAnswer( + spark.sql("select avg(a) over () from values 1.0, 2.0, 3.0 T(a)"), + Row(2.0) :: Row(2.0) :: Row(2.0) :: Nil) + } + test("fix CometNativeExec.doCanonicalize for ReusedExchangeExec") { assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+") withSQLConf(