Skip to content

Conversation

@rednaxelafx
Copy link
Contributor

What changes were proposed in this pull request?

Block InvokeLike expressions with ObjectType result from constant-folding, to ensure constant-folded results are trusted to be serializable.
This is a conservative fix for ease of backport to Spark 3.3. A separate future change can relax the restriction and support constant-folding to serializable ObjectType as well.

Why are the changes needed?

This fixes a regression introduced by #35207 . It enabled the constant-folding logic to aggressively fold InvokeLike expressions (e.g. Invoke, StaticInvoke), when all arguments are foldable and the expression itself is deterministic. But it could go overly aggressive and constant-fold to non-serializable results, which would be problematic when that result needs to be serialized and sent over the wire.

In the wild, users of sparksql-scalapb have hit this issue. The constant folding logic would fold a chain of Invoke / StaticInvoke expressions from only holding onto a serializable literal to holding onto a non-serializable literal:

Literal(com.example.protos.demo.Person$@...).scalaDescriptor.findFieldByNumber.get

this expression works fine before constant-folding because the literal that gets sent to the executors is serializable, but when aggressive constant-folding kicks in it ends up as a Literal(scalapb.descriptors.FieldDescriptor@...) which isn't serializable.

The following minimal repro demonstrates this issue:

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.types.{LongType, ObjectType}
class NotSerializableBoxedLong(longVal: Long) { def add(other: Long): Long = longVal + other }
case class SerializableBoxedLong(longVal: Long) { def toNotSerializable(): NotSerializableBoxedLong = new NotSerializableBoxedLong(longVal) }
val litExpr = Literal.fromObject(SerializableBoxedLong(42L), ObjectType(classOf[SerializableBoxedLong]))
val toNotSerializableExpr = Invoke(litExpr, "toNotSerializable", ObjectType(classOf[NotSerializableBoxedLong]))
val addExpr = Invoke(toNotSerializableExpr, "add", LongType, Seq(UnresolvedAttribute.quotedString("id")))
val df = spark.range(2).select(new Column(addExpr))
df.collect

would result in an error if aggressive constant-folding kicked in:

...
Caused by: java.io.NotSerializableException: NotSerializableBoxedLong
Serialization stack:
	- object not serializable (class: NotSerializableBoxedLong, value: NotSerializableBoxedLong@71231636)
	- element of array (index: 1)
	- array (class [Ljava.lang.Object;, size 2)
	- element of array (index: 1)
	- array (class [Ljava.lang.Object;, size 3)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389@185db22c)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)

Does this PR introduce any user-facing change?

Yes, a regression in ObjectType expression starting from Spark 3.3.0 is fixed.

How was this patch tested?

The existing test cases in ConstantFoldingSuite continues to pass; added a new test case to demonstrate the regression issue.

@github-actions github-actions bot added the SQL label Sep 7, 2022
@rednaxelafx
Copy link
Contributor Author

cc @cloud-fan @AngersZhuuuu

// serializability, because the type-level info with java.io.Serializable and
// java.io.Externalizable marker interfaces are not strong guarantees.
// This restriction can be relaxed in the future to expose more optimizations.
!dt.isInstanceOf[ObjectType]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be safe, how about !dt.existsRecursively(_.isInstanceOf[ObjectType])?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be safe, how about !dt.existsRecursively(_.isInstanceOf[ObjectType])?

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you both very much! This is a great suggestion, addressed in latest commit.

Copy link
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

@cloud-fan
Copy link
Contributor

thanks, merging to master/3.3!

@cloud-fan cloud-fan closed this in 5b96e82 Sep 8, 2022
cloud-fan pushed a commit that referenced this pull request Sep 8, 2022
…rializable literal embedded in the plan

### What changes were proposed in this pull request?

Block `InvokeLike` expressions with `ObjectType` result from constant-folding, to ensure constant-folded results are trusted to be serializable.
This is a conservative fix for ease of backport to Spark 3.3. A separate future change can relax the restriction and support constant-folding to serializable `ObjectType` as well.

### Why are the changes needed?

This fixes a regression introduced by #35207 . It enabled the constant-folding logic to aggressively fold `InvokeLike` expressions (e.g. `Invoke`, `StaticInvoke`), when all arguments are foldable and the expression itself is deterministic. But it could go overly aggressive and constant-fold to non-serializable results, which would be problematic when that result needs to be serialized and sent over the wire.

In the wild, users of sparksql-scalapb have hit this issue. The constant folding logic would fold a chain of `Invoke` / `StaticInvoke` expressions from only holding onto a serializable literal to holding onto a non-serializable literal:
```
Literal(com.example.protos.demo.Person$...).scalaDescriptor.findFieldByNumber.get
```
this expression works fine before constant-folding because the literal that gets sent to the executors is serializable, but when aggressive constant-folding kicks in it ends up as a `Literal(scalapb.descriptors.FieldDescriptor...)` which isn't serializable.

The following minimal repro demonstrates this issue:
```
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.types.{LongType, ObjectType}
class NotSerializableBoxedLong(longVal: Long) { def add(other: Long): Long = longVal + other }
case class SerializableBoxedLong(longVal: Long) { def toNotSerializable(): NotSerializableBoxedLong = new NotSerializableBoxedLong(longVal) }
val litExpr = Literal.fromObject(SerializableBoxedLong(42L), ObjectType(classOf[SerializableBoxedLong]))
val toNotSerializableExpr = Invoke(litExpr, "toNotSerializable", ObjectType(classOf[NotSerializableBoxedLong]))
val addExpr = Invoke(toNotSerializableExpr, "add", LongType, Seq(UnresolvedAttribute.quotedString("id")))
val df = spark.range(2).select(new Column(addExpr))
df.collect
```
would result in an error if aggressive constant-folding kicked in:
```
...
Caused by: java.io.NotSerializableException: NotSerializableBoxedLong
Serialization stack:
	- object not serializable (class: NotSerializableBoxedLong, value: NotSerializableBoxedLong71231636)
	- element of array (index: 1)
	- array (class [Ljava.lang.Object;, size 2)
	- element of array (index: 1)
	- array (class [Ljava.lang.Object;, size 3)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389185db22c)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)
```

### Does this PR introduce _any_ user-facing change?

Yes, a regression in ObjectType expression starting from Spark 3.3.0 is fixed.

### How was this patch tested?

The existing test cases in `ConstantFoldingSuite` continues to pass; added a new test case to demonstrate the regression issue.

Closes #37823 from rednaxelafx/fix-invokelike-constantfold.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 5b96e82)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants