[SPARK-37779][SQL] Make ColumnarToRowExec plan canonicalizable after (de)serialization

This PR proposes to make the `supportsColumnar` sanity check in `ColumnarToRowExec` a driver-side-only check.

SPARK-23731 made the plans serializable by leveraging lazy vals, but SPARK-28213 ended up referring to one of those lazy vals (`supportsColumnar`) eagerly in the `ColumnarToRowExec` constructor: https://github.com/apache/spark/blob/77b164aac9764049a4820064421ef82ec0bc14fb/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L68

This can fail during canonicalization on the executor side, for example while eliminating common subexpressions:

```
java.lang.NullPointerException
    at org.apache.spark.sql.execution.FileSourceScanExec.supportsColumnar$lzycompute(DataSourceScanExec.scala:280)
    at org.apache.spark.sql.execution.FileSourceScanExec.supportsColumnar(DataSourceScanExec.scala:279)
    at org.apache.spark.sql.execution.InputAdapter.supportsColumnar(WholeStageCodegenExec.scala:509)
    at org.apache.spark.sql.execution.ColumnarToRowExec.<init>(Columnar.scala:67)
    ...
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:581)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:580)
    at org.apache.spark.sql.execution.ScalarSubquery.canonicalized$lzycompute(subquery.scala:110)
    ...
    at org.apache.spark.sql.catalyst.expressions.ExpressionEquals.hashCode(EquivalentExpressions.scala:275)
    ...
    at scala.collection.mutable.HashTable.findEntry$(HashTable.scala:135)
    at scala.collection.mutable.HashMap.findEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.get(HashMap.scala:74)
    at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:46)
    at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTreeHelper$1(EquivalentExpressions.scala:147)
    at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:170)
    at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.$anonfun$proxyExpressions$1(SubExprEvaluationRuntime.scala:89)
    at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.$anonfun$proxyExpressions$1$adapted(SubExprEvaluationRuntime.scala:89)
    at scala.collection.immutable.List.foreach(List.scala:392)
```
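For illustration, the failure mode can be reproduced outside of Spark with a minimal, self-contained sketch (the class and field names below are made up): a lazy val whose initializer dereferences a `@transient` field is safe where the field is populated, but forcing it on a (de)serialized copy hits a null.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in, not Spark code: `relation` represents driver-only state.
class ScanLike(@transient val relation: Object) extends Serializable {
  // Lazy so that serialization itself never touches `relation` (the SPARK-23731 idea),
  // but forcing it on a deserialized copy dereferences a null field.
  lazy val supportsColumnar: Boolean = relation.toString.nonEmpty
}

object LazyValAfterSerde {
  private def roundTrip[T](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val scan = new ScanLike(new Object)
    val shipped = roundTrip(scan)      // simulates shipping the plan to an executor
    println(shipped.supportsColumnar)  // NullPointerException: `relation` came back null
  }
}
```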

This is still a band-aid fix, but it at least addresses the issue with a minimal change - it should ideally be backported too.

It is pretty unlikely to be hit - it requires `ColumnarToRowExec` to be canonicalized on the executor side (see the stack trace above) - but yes, it fixes a bug.

A unit test was added.

Closes #35058 from HyukjinKwon/SPARK-37779.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 195f1aa)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon committed Dec 30, 2021
1 parent 5e0f0da commit e17ab6e
Showing 2 changed files with 21 additions and 1 deletion.
`sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala`:

```diff
@@ -64,7 +64,8 @@ trait ColumnarToRowTransition extends UnaryExecNode
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
 case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport {
-  assert(child.supportsColumnar)
+  // supportsColumnar requires to be only called on driver side, see also SPARK-37779.
+  assert(TaskContext.get != null || child.supportsColumnar)
 
   override def output: Seq[Attribute] = child.output
 
```
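For context, `TaskContext.get` returns null when called outside of a running task (i.e. on the driver) and a non-null context inside an executor task, so the guard above only evaluates the eager `supportsColumnar` check on the driver. A rough sketch of the same pattern as a standalone helper (illustrative only, not part of the commit):

```scala
import org.apache.spark.TaskContext

object DriverOnlyChecks {
  // Run a sanity check only on the driver; inside an executor task,
  // TaskContext.get returns the current context, so the check is skipped.
  def assertOnDriverOnly(condition: => Boolean, message: => Any): Unit = {
    if (TaskContext.get == null) {
      assert(condition, message)
    }
  }
}
```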
`SparkPlanSuite.scala`:

```diff
@@ -88,4 +88,23 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession
   test("SPARK-30780 empty LocalTableScan should use RDD without partitions") {
     assert(LocalTableScanExec(Nil, Nil).execute().getNumPartitions == 0)
   }
+
+  test("SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+      withTempPath { path =>
+        spark.range(1).write.parquet(path.getAbsolutePath)
+        val df = spark.read.parquet(path.getAbsolutePath)
+        val columnarToRowExec =
+          df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
+        try {
+          spark.range(1).foreach { _ =>
+            columnarToRowExec.canonicalized
+            ()
+          }
+        } catch {
+          case e: Throwable => fail("ColumnarToRowExec was not canonicalizable", e)
+        }
+      }
+    }
+  }
 }
```
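Note that the new test captures `columnarToRowExec` in the `foreach` closure, so the plan node is serialized with the task, deserialized on an executor, and canonicalized there - exactly the path that produced the NullPointerException shown above.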
