Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Support lead/lag window function with negative input offset #5026

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -524,4 +524,18 @@ class VeloxFunctionsValidateSuite extends VeloxWholeStageTransformerSuite {
}
}

test("lag/lead window function with negative input offset") {
  // One query per window function, each using a negative input offset (-2).
  val negativeOffsetQueries = Seq(
    "select lag(l_orderkey, -2) over" +
      " (partition by l_suppkey order by l_orderkey) from lineitem ",
    "select lead(l_orderkey, -2) over" +
      " (partition by l_suppkey order by l_orderkey) from lineitem "
  )

  // Every query goes through runQueryAndCompare and its plan is checked for a
  // WindowExecTransformer operator.
  negativeOffsetQueries.foreach {
    sql =>
      runQueryAndCompare(sql) {
        checkOperatorMatch[WindowExecTransformer]
      }
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -536,26 +536,11 @@ trait SparkPlanExecApi {
attributeSeq = originalInputAttributes)
.doTransform(args))
// Spark only accepts foldable offset. Converts it to LongType literal.
var offset = offsetWf.offset.eval(EmptyRow).asInstanceOf[Int]
if (wf.isInstanceOf[Lead]) {
if (offset < 0) {
// Velox always expects non-negative offset.
throw new UnsupportedOperationException(
s"${wf.nodeName} does not support negative offset: $offset")
}
} else {
// For Lag
// Spark would use `-inputOffset` as offset, so here we forbid positive offset.
// Which means the inputOffset is negative.
if (offset > 0) {
// Velox always expects non-negative offset.
throw new UnsupportedOperationException(
s"${wf.nodeName} does not support negative offset: $offset")
}
// Revert the Spark change and use the original input offset
offset = -offset
}
val offsetNode = ExpressionBuilder.makeLiteral(offset.toLong, LongType, false)
val offset = offsetWf.offset.eval(EmptyRow).asInstanceOf[Int]
// Velox only allows non-negative offsets. WindowFunctionsBuilder#create converts
// lag/lead with a negative offset to the opposite function with a positive offset,
// so offsetNode just stores the absolute value.
val offsetNode = ExpressionBuilder.makeLiteral(Math.abs(offset.toLong), LongType, false)
childrenNodeList.add(offsetNode)
// NullType means Null is the default value. Don't pass it to native.
if (offsetWf.default.dataType != NullType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ case class WindowExecTransformer(
val childCtx = child.asInstanceOf[TransformSupport].doTransform(context)
val operatorId = context.nextOperatorId(this.nodeName)
if (windowExpression == null || windowExpression.isEmpty) {
// The computing for this project is not needed.
// The computing for this operator is not needed.
context.registerEmptyRelToOperator(operatorId)
return childCtx
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,27 @@ package io.glutenproject.expression

import io.glutenproject.exception.GlutenNotSupportException
import io.glutenproject.expression.ConverterUtils.FunctionConfig
import io.glutenproject.expression.ExpressionNames.{LAG, LEAD}
import io.glutenproject.substrait.expression.ExpressionBuilder

import org.apache.spark.sql.catalyst.expressions.{Expression, WindowExpression, WindowFunction}
import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, Lag, Lead, WindowExpression, WindowFunction}

import scala.util.control.Breaks.{break, breakable}

object WindowFunctionsBuilder {
def create(args: java.lang.Object, windowFunc: WindowFunction): Long = {
val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]]
val substraitFunc = ExpressionMappings.expressionsMap.get(windowFunc.getClass)
val substraitFunc = windowFunc match {
// Handle lag with negative inputOffset, e.g., converts lag(c1, -1) to lead(c1, 1).
// Spark uses `-inputOffset` as `offset` for Lag function.
case lag: Lag if lag.offset.eval(EmptyRow).asInstanceOf[Int] > 0 =>
Some(LEAD)
// Handle lead with negative offset, e.g., converts lead(c1, -1) to lag(c1, 1).
case lead: Lead if lead.offset.eval(EmptyRow).asInstanceOf[Int] < 0 =>
Some(LAG)
case _ =>
ExpressionMappings.expressionsMap.get(windowFunc.getClass)
}
if (substraitFunc.isEmpty) {
throw new GlutenNotSupportException(
s"not currently supported: ${windowFunc.getClass.getName}.")
Expand Down
Loading