From f7abf711f6b6e65be30796e85b1dd7d4d64a4aae Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Tue, 19 Mar 2024 22:42:48 +0800 Subject: [PATCH 1/2] Initial commit --- .../backendsapi/SparkPlanExecApi.scala | 25 ++++--------------- .../execution/WindowExecTransformer.scala | 2 +- .../expression/WindowFunctionsBuilder.scala | 15 +++++++++-- 3 files changed, 19 insertions(+), 23 deletions(-) diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala index 1fa18634d66e..4f40a8a238b4 100644 --- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala +++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala @@ -536,26 +536,11 @@ trait SparkPlanExecApi { attributeSeq = originalInputAttributes) .doTransform(args)) // Spark only accepts foldable offset. Converts it to LongType literal. - var offset = offsetWf.offset.eval(EmptyRow).asInstanceOf[Int] - if (wf.isInstanceOf[Lead]) { - if (offset < 0) { - // Velox always expects non-negative offset. - throw new UnsupportedOperationException( - s"${wf.nodeName} does not support negative offset: $offset") - } - } else { - // For Lag - // Spark would use `-inputOffset` as offset, so here we forbid positive offset. - // Which means the inputOffset is negative. - if (offset > 0) { - // Velox always expects non-negative offset. - throw new UnsupportedOperationException( - s"${wf.nodeName} does not support negative offset: $offset") - } - // Revert the Spark change and use the original input offset - offset = -offset - } - val offsetNode = ExpressionBuilder.makeLiteral(offset.toLong, LongType, false) + val offset = offsetWf.offset.eval(EmptyRow).asInstanceOf[Int] + // Velox only allows negative offset. WindowFunctionsBuilder#create converts + // lag/lead with negative offset to the function with positive offset. So just + // makes offsetNode store positive value. + val offsetNode = ExpressionBuilder.makeLiteral(Math.abs(offset.toLong), LongType, false) childrenNodeList.add(offsetNode) // NullType means Null is the default value. Don't pass it to native. if (offsetWf.default.dataType != NullType) { diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala index 2b81fbec926c..e4053b7a5194 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WindowExecTransformer.scala @@ -183,7 +183,7 @@ case class WindowExecTransformer( val childCtx = child.asInstanceOf[TransformSupport].doTransform(context) val operatorId = context.nextOperatorId(this.nodeName) if (windowExpression == null || windowExpression.isEmpty) { - // The computing for this project is not needed. + // The computing for this operator is not needed. context.registerEmptyRelToOperator(operatorId) return childCtx } diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/WindowFunctionsBuilder.scala b/gluten-core/src/main/scala/io/glutenproject/expression/WindowFunctionsBuilder.scala index 07ad3ae236ad..c610d9705bbb 100644 --- a/gluten-core/src/main/scala/io/glutenproject/expression/WindowFunctionsBuilder.scala +++ b/gluten-core/src/main/scala/io/glutenproject/expression/WindowFunctionsBuilder.scala @@ -18,16 +18,27 @@ package io.glutenproject.expression import io.glutenproject.exception.GlutenNotSupportException import io.glutenproject.expression.ConverterUtils.FunctionConfig +import io.glutenproject.expression.ExpressionNames.{LAG, LEAD} import io.glutenproject.substrait.expression.ExpressionBuilder -import org.apache.spark.sql.catalyst.expressions.{Expression, WindowExpression, WindowFunction} +import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, Lag, Lead, WindowExpression, WindowFunction} import scala.util.control.Breaks.{break, breakable} object WindowFunctionsBuilder { def create(args: java.lang.Object, windowFunc: WindowFunction): Long = { val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]] - val substraitFunc = ExpressionMappings.expressionsMap.get(windowFunc.getClass) + val substraitFunc = windowFunc match { + // Handle lag with negative inputOffset, e.g., converts lag(c1, -1) to lead(c1, 1). + // Spark uses `-inputOffset` as `offset` for Lag function. + case lag: Lag if lag.offset.eval(EmptyRow).asInstanceOf[Int] > 0 => + Some(LEAD) + // Handle lead with negative offset, e.g., converts lead(c1, -1) to lag(c1, 1). + case lead: Lead if lead.offset.eval(EmptyRow).asInstanceOf[Int] < 0 => + Some(LAG) + case _ => + ExpressionMappings.expressionsMap.get(windowFunc.getClass) + } if (substraitFunc.isEmpty) { throw new GlutenNotSupportException( s"not currently supported: ${windowFunc.getClass.getName}.") From 84f9d7d145b49da552db1069ed4f91ef3155b61a Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Wed, 20 Mar 2024 10:20:25 +0800 Subject: [PATCH 2/2] Add a test --- .../execution/VeloxFunctionsValidateSuite.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxFunctionsValidateSuite.scala b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxFunctionsValidateSuite.scala index 4cffa121dae3..e5bee97611a2 100644 --- a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxFunctionsValidateSuite.scala +++ b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxFunctionsValidateSuite.scala @@ -524,4 +524,18 @@ class VeloxFunctionsValidateSuite extends VeloxWholeStageTransformerSuite { } } + test("lag/lead window function with negative input offset") { + runQueryAndCompare( + "select lag(l_orderkey, -2) over" + + " (partition by l_suppkey order by l_orderkey) from lineitem ") { + checkOperatorMatch[WindowExecTransformer] + } + + runQueryAndCompare( + "select lead(l_orderkey, -2) over" + + " (partition by l_suppkey order by l_orderkey) from lineitem ") { + checkOperatorMatch[WindowExecTransformer] + } + } + }