From 22c79867b41cbf6a80ebb52700a4607f30f05671 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 19 Jul 2018 15:42:03 +0800
Subject: [PATCH 1/4] Make FileSourceScanExec canonicalizable in executor side

---
 .../sql/execution/DataSourceScanExec.scala    |  8 ++---
 .../spark/sql/execution/SparkPlan.scala       | 10 +++---
 .../execution/FileSourceScanExecSuite.scala   | 36 +++++++++++++++++++
 3 files changed, 45 insertions(+), 9 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index d7f2654be045..4d7db0a8bbf5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -166,10 +166,10 @@ case class FileSourceScanExec(
     override val tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with ColumnarBatchScan {
 
-  override val supportsBatch: Boolean = relation.fileFormat.supportBatch(
+  override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
     relation.sparkSession, StructType.fromAttributes(output))
 
-  override val needsUnsafeRowConversion: Boolean = {
+  override lazy val needsUnsafeRowConversion: Boolean = {
     if (relation.fileFormat.isInstanceOf[ParquetSource]) {
       SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
     } else {
@@ -199,7 +199,7 @@ case class FileSourceScanExec(
     ret
   }
 
-  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
       relation.bucketSpec
     } else {
@@ -270,7 +270,7 @@ case class FileSourceScanExec(
   private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
-  override val metadata: Map[String, String] = {
+  override lazy val metadata: Map[String, String] = {
     def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
     val location = relation.location
     val locationDesc =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 398758a3331b..1f97993e2045 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -47,17 +47,15 @@ import org.apache.spark.util.ThreadUtils
 abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
 
   /**
-   * A handle to the SQL Context that was used to create this plan.  Since many operators need
+   * A handle to the SQL Context that was used to create this plan. Since many operators need
    * access to the sqlContext for RDD operations or configuration this field is automatically
    * populated by the query planning infrastructure.
    */
-  @transient
-  final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull
+  @transient final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull
 
   protected def sparkContext = sqlContext.sparkContext
 
   // sqlContext will be null when SparkPlan nodes are created without the active sessions.
-  // So far, this only happens in the test cases.
   val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
     sqlContext.conf.subexpressionEliminationEnabled
   } else {
@@ -69,7 +67,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
-    SparkSession.setActiveSession(sqlContext.sparkSession)
+    if (sqlContext != null) {
+      SparkSession.setActiveSession(sqlContext.sparkSession)
+    }
     super.makeCopy(newArgs)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
new file mode 100644
index 000000000000..1164d35c02bb
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileSourceScanExecSuite extends SharedSQLContext {
+  test("FileSourceScanExec should be canonicalizable in executor side") {
+    withTempPath { path =>
+      spark.range(1).toDF().write.parquet(path.getAbsolutePath)
+      val df = spark.read.parquet(path.getAbsolutePath)
+      val fileSourceScanExec =
+        df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+      try {
+        spark.range(1).foreach(_ => fileSourceScanExec.canonicalized)
+      } catch {
+        case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e)
+      }
+    }
+  }
+}

From b4888367fe22e62be84a55929c4213fb40cdb498 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 19 Jul 2018 18:57:25 +0800
Subject: [PATCH 2/4] Address comments

---
 .../org/apache/spark/sql/execution/DataSourceScanExec.scala | 2 ++
 .../spark/sql/execution/FileSourceScanExecSuite.scala       | 6 +++---
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 4d7db0a8bbf5..36ed016773b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -166,6 +166,8 @@ case class FileSourceScanExec(
     override val tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with ColumnarBatchScan {
 
+  // Note that some vals referring the file-based relation are lazy intentionally
+  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
   override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
     relation.sparkSession, StructType.fromAttributes(output))
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
index 1164d35c02bb..05783bdf6294 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
@@ -20,12 +20,12 @@ package org.apache.spark.sql.execution
 import org.apache.spark.sql.test.SharedSQLContext
 
 class FileSourceScanExecSuite extends SharedSQLContext {
-  test("FileSourceScanExec should be canonicalizable in executor side") {
+  test("FileSourceScanExec should be canonicalizable on executor side") {
     withTempPath { path =>
-      spark.range(1).toDF().write.parquet(path.getAbsolutePath)
+      spark.range(1).write.parquet(path.getAbsolutePath)
       val df = spark.read.parquet(path.getAbsolutePath)
       val fileSourceScanExec =
-        df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+        df.queryExecution.sparkPlan.collectFirst { case p: FileSourceScanExec => p }.get
       try {
         spark.range(1).foreach(_ => fileSourceScanExec.canonicalized)
       } catch {

From 7f531bd3962685ff2bd271af8721653319f618bf Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 20 Jul 2018 17:09:09 +0800
Subject: [PATCH 3/4] Address comments

---
 .../spark/sql/execution/FileSourceScanExecSuite.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
index 05783bdf6294..48e66e86a436 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
@@ -17,17 +17,21 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.sql.test.SharedSQLContext
 
 class FileSourceScanExecSuite extends SharedSQLContext {
-  test("FileSourceScanExec should be canonicalizable on executor side") {
+  test("SPARK-23731: FileSourceScanExec should be canonicalizable after being (de)serialized") {
     withTempPath { path =>
       spark.range(1).write.parquet(path.getAbsolutePath)
       val df = spark.read.parquet(path.getAbsolutePath)
       val fileSourceScanExec =
         df.queryExecution.sparkPlan.collectFirst { case p: FileSourceScanExec => p }.get
+      val serializer = SparkEnv.get.serializer.newInstance()
+      val readback =
+        serializer.deserialize[FileSourceScanExec](serializer.serialize(fileSourceScanExec))
       try {
-        spark.range(1).foreach(_ => fileSourceScanExec.canonicalized)
+        readback.canonicalized
       } catch {
         case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e)
       }

From be6e5941991ca045100456e11a59a9b2eb77a1ea Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 20 Jul 2018 17:14:56 +0800
Subject: [PATCH 4/4] Move test case to SparkPlanSuite

---
 .../execution/FileSourceScanExecSuite.scala   | 40 -------------------
 .../spark/sql/execution/SparkPlanSuite.scala  | 17 ++++++++
 2 files changed, 17 insertions(+), 40 deletions(-)
 delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
deleted file mode 100644
index 48e66e86a436..000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.SparkEnv
-import org.apache.spark.sql.test.SharedSQLContext
-
-class FileSourceScanExecSuite extends SharedSQLContext {
-  test("SPARK-23731: FileSourceScanExec should be canonicalizable after being (de)serialized") {
-    withTempPath { path =>
-      spark.range(1).write.parquet(path.getAbsolutePath)
-      val df = spark.read.parquet(path.getAbsolutePath)
-      val fileSourceScanExec =
-        df.queryExecution.sparkPlan.collectFirst { case p: FileSourceScanExec => p }.get
-      val serializer = SparkEnv.get.serializer.newInstance()
-      val readback =
-        serializer.deserialize[FileSourceScanExec](serializer.serialize(fileSourceScanExec))
-      try {
-        readback.canonicalized
-      } catch {
-        case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e)
-      }
-    }
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index 750d9e4adf8b..34dc6f37c0e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -33,4 +34,20 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext {
     intercept[IllegalStateException] { plan.executeTake(1) }
   }
 
+  test("SPARK-23731 plans should be canonicalizable after being (de)serialized") {
+    withTempPath { path =>
+      spark.range(1).write.parquet(path.getAbsolutePath)
+      val df = spark.read.parquet(path.getAbsolutePath)
+      val fileSourceScanExec =
+        df.queryExecution.sparkPlan.collectFirst { case p: FileSourceScanExec => p }.get
+      val serializer = SparkEnv.get.serializer.newInstance()
+      val readback =
+        serializer.deserialize[FileSourceScanExec](serializer.serialize(fileSourceScanExec))
+      try {
+        readback.canonicalized
+      } catch {
+        case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e)
+      }
+    }
+  }
 }
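
A rough sketch (not part of the patches themselves) of the failure mode this series addresses, assuming a running SparkSession bound to the name spark and an existing Parquet file whose location is held in the string path:

    // Sketch only: mirrors the test added in PATCH 1/4 and is not taken from the patches.
    // canonicalized rebuilds the node via makeCopy, which previously called
    // SparkSession.setActiveSession(sqlContext.sparkSession) unconditionally, and rebuilding
    // the node evaluated eager vals that reach through the transient file-based relation's
    // session. On an executor there is no active session and the transient references are
    // null, so canonicalizing the plan there used to throw.
    import org.apache.spark.sql.execution.FileSourceScanExec

    val df = spark.read.parquet(path)
    val scan = df.queryExecution.sparkPlan
      .collectFirst { case p: FileSourceScanExec => p }.get
    // Canonicalize the plan inside a task running on an executor.
    spark.range(1).foreach(_ => scan.canonicalized)

The later revisions of the test (PATCH 3/4 and 4/4) drop the executor round trip and instead serialize and deserialize the plan with SparkEnv.get.serializer before calling canonicalized, which exercises essentially the same condition, a plan whose transient session state is gone, without needing to run anything on an executor.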