From e17ab6e995a213e442e91df168e87fb724672613 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Thu, 30 Dec 2021 12:38:37 +0900 Subject: [PATCH] [SPARK-37779][SQL] Make ColumnarToRowExec plan canonicalizable after (de)serialization This PR proposes to add a driver-side check on `supportsColumnar` sanity check at `ColumnarToRowExec`. SPARK-23731 fixed the plans to be serializable by leveraging lazy but SPARK-28213 happened to refer to the lazy variable at: https://github.com/apache/spark/blob/77b164aac9764049a4820064421ef82ec0bc14fb/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L68 This can fail during canonicalization during, for example, eliminating sub common expressions (on executor side): ``` java.lang.NullPointerException at org.apache.spark.sql.execution.FileSourceScanExec.supportsColumnar$lzycompute(DataSourceScanExec.scala:280) at org.apache.spark.sql.execution.FileSourceScanExec.supportsColumnar(DataSourceScanExec.scala:279) at org.apache.spark.sql.execution.InputAdapter.supportsColumnar(WholeStageCodegenExec.scala:509) at org.apache.spark.sql.execution.ColumnarToRowExec.(Columnar.scala:67) ... at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:581) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:580) at org.apache.spark.sql.execution.ScalarSubquery.canonicalized$lzycompute(subquery.scala:110) ... at org.apache.spark.sql.catalyst.expressions.ExpressionEquals.hashCode(EquivalentExpressions.scala:275) ... at scala.collection.mutable.HashTable.findEntry$(HashTable.scala:135) at scala.collection.mutable.HashMap.findEntry(HashMap.scala:44) at scala.collection.mutable.HashMap.get(HashMap.scala:74) at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:46) at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTreeHelper$1(EquivalentExpressions.scala:147) at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:170) at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.$anonfun$proxyExpressions$1(SubExprEvaluationRuntime.scala:89) at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.$anonfun$proxyExpressions$1$adapted(SubExprEvaluationRuntime.scala:89) at scala.collection.immutable.List.foreach(List.scala:392) ``` This fix is still a bandaid fix but at least addresses the issue with minimized change - this fix should ideally be backported too. Pretty unlikely - when `ColumnarToRowExec` has to be canonicalized on the executor side (see the stacktrace), but yes. it would fix a bug. Unittest was added. Closes #35058 from HyukjinKwon/SPARK-37779. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon (cherry picked from commit 195f1aaf4361fb8f5f31ef7f5c63464767ad88bd) Signed-off-by: Hyukjin Kwon --- .../apache/spark/sql/execution/Columnar.scala | 3 ++- .../spark/sql/execution/SparkPlanSuite.scala | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index ccb525d2e192e..e2bdf4e2d955b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -64,7 +64,8 @@ trait ColumnarToRowTransition extends UnaryExecNode * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations. */ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport { - assert(child.supportsColumnar) + // supportsColumnar requires to be only called on driver side, see also SPARK-37779. + assert(TaskContext.get != null || child.supportsColumnar) override def output: Seq[Attribute] = child.output diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index 56fff1107ae39..dacf8fecbeeb6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -88,4 +88,23 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession { test("SPARK-30780 empty LocalTableScan should use RDD without partitions") { assert(LocalTableScanExec(Nil, Nil).execute().getNumPartitions == 0) } + + test("SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { path => + spark.range(1).write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + val columnarToRowExec = + df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get + try { + spark.range(1).foreach { _ => + columnarToRowExec.canonicalized + () + } + } catch { + case e: Throwable => fail("ColumnarToRowExec was not canonicalizable", e) + } + } + } + } }