-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-37907][SQL] InvokeLike support ConstantFolding #35207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
ping @cloud-fan |
|
shall we fix |
Done, need to add UT for another two case? |
| comparePlans(optimized, correctAnswer) | ||
| } | ||
|
|
||
| test("SPARK-37907: StaticInvoke support ConstantFolding") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we test both invokes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and also update the PR title please
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we test both invokes?
Added
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
Outdated
Show resolved
Hide resolved
viirya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What the real use case for this change? Does it make sense to call Invoke with constant inputs? If the inputs are constant, users should do the "invoke" in Java/Scala code and take the result as literals.
As @cloud-fan suggested, some new added function expression will use |
|
Note that, invoke has been used in more places like DS v2 functions, built-in SQL functions ( |
|
since #35243 merged, we can start review this one. Any more suggestion? |
| def propagateNull: Boolean | ||
|
|
||
| override def foldable: Boolean = | ||
| children.forall(_.foldable) && deterministic && !dataType.isInstanceOf[ObjectType] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan here add condition deterministic
|
|
||
| def propagateNull: Boolean | ||
|
|
||
| override def foldable: Boolean = children.forall(_.foldable) && deterministic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we remove deterministic? I don't see other expressions require it when defining foldable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we remove
deterministic? I don't see other expressions require it when defining foldable.
If it's not deterministic, such as Invoke return a rand result, if we fold this, we may return incorrect answer
|
ping @cloud-fan GA passed |
|
thanks, merging to master! |
…rializable literal embedded in the plan ### What changes were proposed in this pull request? Block `InvokeLike` expressions with `ObjectType` result from constant-folding, to ensure constant-folded results are trusted to be serializable. This is a conservative fix for ease of backport to Spark 3.3. A separate future change can relax the restriction and support constant-folding to serializable `ObjectType` as well. ### Why are the changes needed? This fixes a regression introduced by #35207 . It enabled the constant-folding logic to aggressively fold `InvokeLike` expressions (e.g. `Invoke`, `StaticInvoke`), when all arguments are foldable and the expression itself is deterministic. But it could go overly aggressive and constant-fold to non-serializable results, which would be problematic when that result needs to be serialized and sent over the wire. In the wild, users of sparksql-scalapb have hit this issue. The constant folding logic would fold a chain of `Invoke` / `StaticInvoke` expressions from only holding onto a serializable literal to holding onto a non-serializable literal: ``` Literal(com.example.protos.demo.Person$...).scalaDescriptor.findFieldByNumber.get ``` this expression works fine before constant-folding because the literal that gets sent to the executors is serializable, but when aggressive constant-folding kicks in it ends up as a `Literal(scalapb.descriptors.FieldDescriptor...)` which isn't serializable. The following minimal repro demonstrates this issue: ``` import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} import org.apache.spark.sql.types.{LongType, ObjectType} class NotSerializableBoxedLong(longVal: Long) { def add(other: Long): Long = longVal + other } case class SerializableBoxedLong(longVal: Long) { def toNotSerializable(): NotSerializableBoxedLong = new NotSerializableBoxedLong(longVal) } val litExpr = Literal.fromObject(SerializableBoxedLong(42L), ObjectType(classOf[SerializableBoxedLong])) val toNotSerializableExpr = Invoke(litExpr, "toNotSerializable", ObjectType(classOf[NotSerializableBoxedLong])) val addExpr = Invoke(toNotSerializableExpr, "add", LongType, Seq(UnresolvedAttribute.quotedString("id"))) val df = spark.range(2).select(new Column(addExpr)) df.collect ``` would result in an error if aggressive constant-folding kicked in: ``` ... Caused by: java.io.NotSerializableException: NotSerializableBoxedLong Serialization stack: - object not serializable (class: NotSerializableBoxedLong, value: NotSerializableBoxedLong71231636) - element of array (index: 1) - array (class [Ljava.lang.Object;, size 2) - element of array (index: 1) - array (class [Ljava.lang.Object;, size 3) - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;) - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3]) - writeReplace data (class: java.lang.invoke.SerializedLambda) - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389185db22c) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441) ``` ### Does this PR introduce _any_ user-facing change? Yes, a regression in ObjectType expression starting from Spark 3.3.0 is fixed. ### How was this patch tested? The existing test cases in `ConstantFoldingSuite` continues to pass; added a new test case to demonstrate the regression issue. Closes #37823 from rednaxelafx/fix-invokelike-constantfold. Authored-by: Kris Mok <kris.mok@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…rializable literal embedded in the plan ### What changes were proposed in this pull request? Block `InvokeLike` expressions with `ObjectType` result from constant-folding, to ensure constant-folded results are trusted to be serializable. This is a conservative fix for ease of backport to Spark 3.3. A separate future change can relax the restriction and support constant-folding to serializable `ObjectType` as well. ### Why are the changes needed? This fixes a regression introduced by #35207 . It enabled the constant-folding logic to aggressively fold `InvokeLike` expressions (e.g. `Invoke`, `StaticInvoke`), when all arguments are foldable and the expression itself is deterministic. But it could go overly aggressive and constant-fold to non-serializable results, which would be problematic when that result needs to be serialized and sent over the wire. In the wild, users of sparksql-scalapb have hit this issue. The constant folding logic would fold a chain of `Invoke` / `StaticInvoke` expressions from only holding onto a serializable literal to holding onto a non-serializable literal: ``` Literal(com.example.protos.demo.Person$...).scalaDescriptor.findFieldByNumber.get ``` this expression works fine before constant-folding because the literal that gets sent to the executors is serializable, but when aggressive constant-folding kicks in it ends up as a `Literal(scalapb.descriptors.FieldDescriptor...)` which isn't serializable. The following minimal repro demonstrates this issue: ``` import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} import org.apache.spark.sql.types.{LongType, ObjectType} class NotSerializableBoxedLong(longVal: Long) { def add(other: Long): Long = longVal + other } case class SerializableBoxedLong(longVal: Long) { def toNotSerializable(): NotSerializableBoxedLong = new NotSerializableBoxedLong(longVal) } val litExpr = Literal.fromObject(SerializableBoxedLong(42L), ObjectType(classOf[SerializableBoxedLong])) val toNotSerializableExpr = Invoke(litExpr, "toNotSerializable", ObjectType(classOf[NotSerializableBoxedLong])) val addExpr = Invoke(toNotSerializableExpr, "add", LongType, Seq(UnresolvedAttribute.quotedString("id"))) val df = spark.range(2).select(new Column(addExpr)) df.collect ``` would result in an error if aggressive constant-folding kicked in: ``` ... Caused by: java.io.NotSerializableException: NotSerializableBoxedLong Serialization stack: - object not serializable (class: NotSerializableBoxedLong, value: NotSerializableBoxedLong71231636) - element of array (index: 1) - array (class [Ljava.lang.Object;, size 2) - element of array (index: 1) - array (class [Ljava.lang.Object;, size 3) - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;) - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3]) - writeReplace data (class: java.lang.invoke.SerializedLambda) - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389185db22c) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441) ``` ### Does this PR introduce _any_ user-facing change? Yes, a regression in ObjectType expression starting from Spark 3.3.0 is fixed. ### How was this patch tested? The existing test cases in `ConstantFoldingSuite` continues to pass; added a new test case to demonstrate the regression issue. Closes #37823 from rednaxelafx/fix-invokelike-constantfold. Authored-by: Kris Mok <kris.mok@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 5b96e82) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Currently,
InvokeLikenot implementfoldable, can't be optimized by ConstantFolding, this pr support thisWhy are the changes needed?
Make StaticInvoke support ConstantFolding
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added UT