From 9f3a1cd0c2e95bd6239facf3584d590a88aafce4 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 26 Aug 2017 15:16:02 -0700
Subject: [PATCH 1/3] fix.

---
 .../org/apache/spark/sql/internal/SQLConf.scala      |  6 +++---
 .../org/apache/spark/sql/execution/SparkPlan.scala   |  6 ++++--
 .../spark/sql/execution/WholeStageCodegenExec.scala  |  2 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala   |  2 +-
 .../scala/org/apache/spark/sql/DataFrameSuite.scala  | 12 +++++++++++-
 .../org/apache/spark/sql/test/SharedSQLContext.scala |  2 ++
 .../org/apache/spark/sql/hive/test/TestHive.scala    |  1 +
 7 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a685099505ee..24f51ef16310 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -551,9 +551,9 @@ object SQLConf {
     .intConf
     .createWithDefault(100)
 
-  val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback")
+  val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback")
     .internal()
-    .doc("When true, whole stage codegen could be temporary disabled for the part of query that" +
+    .doc("When true, (whole stage) codegen could be temporarily disabled for the parts of the query that" +
       " fail to compile generated code")
     .booleanConf
     .createWithDefault(true)
@@ -1041,7 +1041,7 @@ class SQLConf extends Serializable with Logging {
 
   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
 
-  def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
+  def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK)
 
   def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
 
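Only the Scala identifier is renamed above; the user-facing key string
"spark.sql.codegen.fallback" is unchanged. As a minimal usage sketch (assuming
a running SparkSession named `spark`; the conf is .internal() but remains
settable by its key):

    // Sketch, not part of the patch: toggle codegen fallback at runtime.
    spark.conf.set("spark.sql.codegen.fallback", "false") // fail fast on codegen errors
    spark.conf.set("spark.sql.codegen.fallback", "true")  // silently fall back (default)
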
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index c7277c21cebb..04e8a407424a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -54,6 +54,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   @transient final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull
 
+  // whether we should fallback when hitting compilation errors caused by codegen
+  private val codeGenFallBack = sqlContext == null || sqlContext.conf.codegenFallback
+
   protected def sparkContext = sqlContext.sparkContext
 
   // sqlContext will be null when we are being deserialized on the slaves. In this instance
@@ -370,8 +373,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     try {
       GeneratePredicate.generate(expression, inputSchema)
     } catch {
-      case e @ (_: JaninoRuntimeException | _: CompileException)
-          if sqlContext == null || sqlContext.conf.wholeStageFallback =>
+      case _ @ (_: JaninoRuntimeException | _: CompileException) if codeGenFallBack =>
         genInterpretedPredicate(expression, inputSchema)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index bacb7090a70a..a41a7ca56a0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -382,7 +382,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
     try {
       CodeGenerator.compile(cleanedSource)
     } catch {
-      case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+      case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
         // We should already saw the error message
         logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
         return child.execute()
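Both call sites above now share one compile-or-fall-back shape. Reduced to an
illustrative sketch (the helper and its parameters are placeholders, not Spark
APIs; the predicate path catches JaninoRuntimeException and CompileException
specifically):

    // Sketch only: try the generated path; if compilation fails and fallback
    // is enabled, run the interpreted path instead.
    def compileOrFallback[T](fallbackEnabled: Boolean)(compiled: => T)(interpreted: => T): T =
      try {
        compiled
      } catch {
        case _: Exception if fallbackEnabled => interpreted
      }

Note that WholeStageCodegenExec additionally requires !Utils.isTesting, so test
runs surface compilation failures instead of masking them.
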
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 0681b9cbeb1d..50e475984f45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -422,7 +422,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
       v
     }
     withSQLConf(
-      (SQLConf.WHOLESTAGE_FALLBACK.key, codegenFallback.toString),
+      (SQLConf.CODEGEN_FALLBACK.key, codegenFallback.toString),
       (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString)) {
       val df = spark.range(0, 4, 1, 4).withColumn("c", c)
       val rows = df.collect()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 5eb34e587e95..13341645e8ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2011,7 +2011,17 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 
     val filter = (0 until N)
       .foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string"))
-    df.filter(filter).count
+
+    withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true") {
+      df.filter(filter).count()
+    }
+
+    withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") {
+      val e = intercept[SparkException] {
+        df.filter(filter).count()
+      }.getMessage
+      assert(e.contains("grows beyond 64 KB"))
+    }
   }
 
   test("SPARK-20897: cached self-join should not fail") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 1f073d5f64c6..cd8d0708d8a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -24,6 +24,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -34,6 +35,7 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua
     new SparkConf()
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 10c9a2de6540..0f6a81b6f813 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -51,6 +51,7 @@ object TestHive
     "TestSQLContext",
     new SparkConf()
       .set("spark.sql.test", "")
+      .set(SQLConf.CODEGEN_FALLBACK.key, "false")
      .set("spark.sql.hive.metastore.barrierPrefixes",
         "org.apache.spark.sql.hive.execution.PairSerDe")
       .set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)
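The tests above lean on SQLTestUtils.withSQLConf, which pins a conf value for
the duration of a closure and then restores whatever was there before. A
simplified sketch of that contract (the real helper takes varargs of key/value
pairs; `spark` is the test session):

    // Sketch of the withSQLConf contract, not the actual SQLTestUtils code.
    def withSQLConf[T](key: String, value: String)(f: => T): T = {
      val previous = spark.conf.getOption(key) // remember the current setting
      spark.conf.set(key, value)
      try f finally {
        previous match {
          case Some(v) => spark.conf.set(key, v) // put the old value back
          case None => spark.conf.unset(key)     // the key was not set before
        }
      }
    }

Setting CODEGEN_FALLBACK to "false" in SharedSQLContext and TestHive means
every suite built on them now fails loudly on codegen compilation errors
rather than silently falling back to the interpreted path.
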
From 26dcbd6310507ad96d5cf5b124cd93e6b788a6ea Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 29 Aug 2017 12:48:07 -0700
Subject: [PATCH 2/3] fix

---
 .../org/apache/spark/sql/execution/SparkPlan.scala | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 04e8a407424a..4535e81eea4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -54,19 +54,13 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   @transient final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull
 
-  // whether we should fallback when hitting compilation errors caused by codegen
-  private val codeGenFallBack = sqlContext == null || sqlContext.conf.codegenFallback
-
   protected def sparkContext = sqlContext.sparkContext
 
-  // sqlContext will be null when we are being deserialized on the slaves. In this instance
-  // the value of subexpressionEliminationEnabled will be set by the deserializer after the
-  // constructor has run.
-  val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
+  // whether we should fallback when hitting compilation errors caused by codegen
+  private lazy val codeGenFallBack = sqlContext.conf.codegenFallback
+
+  protected lazy val subexpressionEliminationEnabled =
     sqlContext.conf.subexpressionEliminationEnabled
-  } else {
-    false
-  }
 
   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {

From 4f39a5220f6b3d318ad0c07eeec54aed5dad20e8 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 29 Aug 2017 12:56:37 -0700
Subject: [PATCH 3/3] fix

---
 .../scala/org/apache/spark/sql/execution/SparkPlan.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 4535e81eea4e..b1db9dd9dd8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -57,10 +57,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   protected def sparkContext = sqlContext.sparkContext
 
   // whether we should fallback when hitting compilation errors caused by codegen
-  private lazy val codeGenFallBack = sqlContext.conf.codegenFallback
+  private val codeGenFallBack = sqlContext.conf.codegenFallback
 
-  protected lazy val subexpressionEliminationEnabled =
-    sqlContext.conf.subexpressionEliminationEnabled
+  protected val subexpressionEliminationEnabled = sqlContext.conf.subexpressionEliminationEnabled
 
   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
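Read together, patch 2 first turned both fields into lazy vals without a null
guard, and patch 3 settles on strict vals. The apparent rationale: a strict
val is evaluated once in the constructor, on the driver where sqlContext is
non-null, and the resulting Boolean field is what gets serialized with the
plan, so executors never dereference sqlContext at all. A toy sketch of that
property (plain Scala, not Spark code):

    import java.io._

    // A strict val is computed at construction and shipped as a plain field.
    class Plan(ctx: AnyRef) extends Serializable {
      val flag: Boolean = ctx != null // evaluated eagerly, on the "driver"
    }

    def roundTrip[T](obj: T): T = {
      val buf = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buf)
      out.writeObject(obj)
      out.close()
      new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
        .readObject().asInstanceOf[T]
    }

    assert(roundTrip(new Plan(new Object)).flag) // still true after "shipping"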