From 45c02b52b8a40b103413c36288c10312c9d333d7 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 28 Feb 2017 04:35:04 +0900
Subject: [PATCH 01/15] Retry execution by calling eval() if an exception is
 caught

---
 .../execution/basicPhysicalOperators.scala    | 29 +++++++++++++++----
 .../org/apache/spark/sql/DataFrameSuite.scala | 11 +++++++
 2 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 44278e37c527..46fa98217f21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -213,12 +213,29 @@ case class FilterExec(condition: Expression, child: SparkPlan)
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
-      val predicate = newPredicate(condition, child.output)
-      predicate.initialize(0)
-      iter.filter { row =>
-        val r = predicate.eval(row)
-        if (r) numOutputRows += 1
-        r
+      try {
+        val predicate = newPredicate(condition, child.output)
+        predicate.initialize(0)
+        iter.filter { row =>
+          val r = predicate.eval(row)
+          if (r) numOutputRows += 1
+          r
+        }
+      } catch {
+        case e: Exception =>
+          iter.filter { row =>
+            val str = condition.toString
+            val logMessage = if (str.length > 256) {
+              str.substring(0, 256 - 3) + "..."
+            } else {
+              str
+            }
+            logWarning(s"Codegen disabled for this expression:\n $logMessage")
+            val r = BindReferences.bindReference(condition, child.output)
+              .eval(row).isInstanceOf[Predicate]
+            if (r) numOutputRows += 1
+            r
+          }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index b4893b56a8a8..8e0aafd583d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1731,5 +1731,16 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       .join(df2, df1("x") === df2("x1"), "left_outer")
       .filter($"x1".isNotNull || !$"y".isin("a!"))
       .count
+  }
+
+  test("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
+    val N = 400
+    val rows = Seq(Row.fromSeq(Seq.fill(N)("string")))
+    val schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType)))
+    val df = spark.createDataFrame(spark.sparkContext.makeRDD(rows), schema)
+
+    val filter = (0 until N)
+      .foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string"))
+    df.filter(filter).count
   }
 }

From 6a94c2fd43bbb76c28f6b20b5f027bb4a2b8bc04 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 7 Mar 2017 00:59:17 +0900
Subject: [PATCH 02/15] catch only JaninoRuntimeException

---
 .../spark/sql/execution/basicPhysicalOperators.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 46fa98217f21..df82bee310ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.execution
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 
+import org.apache.commons.lang3.exception.ExceptionUtils
+import org.codehaus.janino.JaninoRuntimeException
+
 import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext}
 import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -32,6 +35,7 @@ import org.apache.spark.sql.types.LongType
 import org.apache.spark.util.ThreadUtils
 import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
 
+
 /** Physical plan for Project. */
 case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
   extends UnaryExecNode with CodegenSupport {
@@ -222,7 +226,8 @@ case class FilterExec(condition: Expression, child: SparkPlan)
           r
         }
       } catch {
-        case e: Exception =>
+        // JaninoRuntimeException is nested inside another exception if a Java compilation error occurs
+        case e: Exception if ExceptionUtils.getRootCause(e).isInstanceOf[JaninoRuntimeException] =>
          iter.filter { row =>
            val str = condition.toString
            val logMessage = if (str.length > 256) {

From 1bb2211dff82c4186c4f479c3e5e4a60328facf1 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 9 Mar 2017 19:01:56 +0900
Subject: [PATCH 03/15] throw JaninoRuntimeException when a compilation error
 occurs

---
 .../expressions/codegen/CodeGenerator.scala   | 22 +++++++++++++++----
 .../sql/execution/WholeStageCodegenExec.scala |  4 +++-
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 760ead42c762..28171874d330 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -27,7 +27,9 @@ import scala.language.existentials
 import scala.util.control.NonFatal
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
-import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler}
+import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
+import org.apache.commons.lang3.exception.ExceptionUtils
+import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, JaninoRuntimeException, SimpleCompiler}
 import org.codehaus.janino.util.ClassFile
 
 import org.apache.spark.{SparkEnv, TaskContext, TaskKilledException}
@@ -899,8 +901,20 @@ object CodeGenerator extends Logging {
   /**
    * Compile the Java source code into a Java class, using Janino.
    */
-  def compile(code: CodeAndComment): GeneratedClass = {
+  def compile(code: CodeAndComment): GeneratedClass = try {
     cache.get(code)
+  } catch {
+    // Cache.get() may wrap the original exception. See the following URL
+    // http://google.github.io/guava/releases/14.0/api/docs/com/google/common/cache/
+    // Cache.html#get(K,%20java.util.concurrent.Callable)
+    case e : UncheckedExecutionException =>
+      val excChains = ExceptionUtils.getThrowables(e)
+      val exc = if (excChains.length == 1) excChains(0) else excChains(excChains.length - 2)
+      throw exc
+    case e : ExecutionError =>
+      val excChains = ExceptionUtils.getThrowables(e)
+      val exc = if (excChains.length == 1) excChains(0) else excChains(excChains.length - 2)
+      throw exc
  }
 
  /**
@@ -951,10 +965,10 @@ object CodeGenerator extends Logging {
       evaluator.cook("generated.java", code.body)
       recordCompilationStats(evaluator)
     } catch {
-      case e: Exception =>
+      case e: JaninoRuntimeException =>
         val msg = s"failed to compile: $e\n$formatted"
         logError(msg, e)
-        throw new Exception(msg, e)
+        throw new JaninoRuntimeException(msg, e)
     }
     evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index c1e1a631c677..c2da3388b15f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution
 
 import java.util.Locale
 
+import org.codehaus.janino.JaninoRuntimeException
+
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -374,7 +376,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
     try {
       CodeGenerator.compile(cleanedSource)
     } catch {
-      case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+      case e: JaninoRuntimeException if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
         // We should already saw the error message
         logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
         return child.execute()

From 96c10330842fe1153f2605b74a423c7aa54dccb3 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 9 Mar 2017 19:02:39 +0900
Subject: [PATCH 04/15] refactor newPredicate

---
 .../spark/sql/execution/SparkPlan.scala       | 18 ++++++++++-
 .../execution/basicPhysicalOperators.scala    | 30 ++++++-------------
 2 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index cadab37a449a..9e9457f2ddc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -22,6 +22,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext
 
+import org.codehaus.janino.JaninoRuntimeException
+
 import org.apache.spark.{broadcast, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
@@ -355,7 +357,21 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   protected def newPredicate(
       expression: Expression, inputSchema: Seq[Attribute]): GenPredicate = {
-    GeneratePredicate.generate(expression, inputSchema)
+    try {
+      GeneratePredicate.generate(expression, inputSchema)
+    } catch {
+      case e: JaninoRuntimeException =>
+        val str = expression.toString
+        val logMessage = if (str.length > 256) {
+          str.substring(0, 256 - 3) + "..."
+        } else {
+          str
+        }
+        logWarning(s"Codegen disabled for this expression:\n $logMessage")
+        null
+      case e: Exception =>
+        throw e
+    }
   }
 
   protected def newOrdering(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index df82bee310ac..c12afee4943d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -20,9 +20,6 @@ package org.apache.spark.sql.execution
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 
-import org.apache.commons.lang3.exception.ExceptionUtils
-import org.codehaus.janino.JaninoRuntimeException
-
 import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext}
 import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -217,30 +214,21 @@ case class FilterExec(condition: Expression, child: SparkPlan)
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
-      try {
-        val predicate = newPredicate(condition, child.output)
+      val predicate = newPredicate(condition, child.output)
+      if (predicate != null) {
         predicate.initialize(0)
         iter.filter { row =>
           val r = predicate.eval(row)
           if (r) numOutputRows += 1
           r
         }
-      } catch {
-        // JaninoRuntimeException is nested inside another exception if a Java compilation error occurs
-        case e: Exception if ExceptionUtils.getRootCause(e).isInstanceOf[JaninoRuntimeException] =>
-          iter.filter { row =>
-            val str = condition.toString
-            val logMessage = if (str.length > 256) {
-              str.substring(0, 256 - 3) + "..."
-            } else {
-              str
-            }
-            logWarning(s"Codegen disabled for this expression:\n $logMessage")
-            val r = BindReferences.bindReference(condition, child.output)
-              .eval(row).isInstanceOf[Predicate]
-            if (r) numOutputRows += 1
-            r
-          }
+      } else {
+        val predicate = BindReferences.bindReference(condition, child.output)
+        iter.filter { row =>
+          val r = predicate.eval(row).isInstanceOf[Predicate]
+          if (r) numOutputRows += 1
+          r
+        }
       }
     }
   }

From e870e3d2d7ec5d9f970bef6e7e583b47ac28de85 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Fri, 10 Mar 2017 17:36:27 +0900
Subject: [PATCH 05/15] newPredicate always returns a Predicate regardless of
 Java compilation errors

---
 .../catalog/ExternalCatalogUtils.scala        |  2 +-
 .../sql/catalyst/expressions/predicates.scala | 10 ++++++----
 .../spark/sql/execution/SparkPlan.scala       |  2 +-
 .../execution/basicPhysicalOperators.scala    | 19 +++++--------------
 .../PartitioningAwareFileIndex.scala          |  2 +-
 .../sql/sources/SimpleTextRelation.scala      |  2 +-
 6 files changed, 15 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 3ca9e6a8da5b..1fc3a654cfeb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -155,7 +155,7 @@ object ExternalCatalogUtils {
     })
 
     inputPartitions.filter { p =>
-      boundPredicate(p.toRow(partitionSchema, defaultTimeZoneId))
+      boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId))
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 5034566132f7..b2785f054bc3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -20,20 +20,22 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.{Predicate => GenPredicate}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.types._
 
 
 object InterpretedPredicate {
-  def create(expression: Expression, inputSchema: Seq[Attribute]): (InternalRow => Boolean) =
+  def create(expression: Expression, inputSchema: Seq[Attribute]): InterpretedPredicate =
     create(BindReferences.bindReference(expression, inputSchema))
 
-  def create(expression: Expression): (InternalRow => Boolean) = {
-    (r: InternalRow) => expression.eval(r).asInstanceOf[Boolean]
-  }
+  def create(expression: Expression): InterpretedPredicate = new InterpretedPredicate(expression)
 }
 
+class InterpretedPredicate(expression: Expression) extends GenPredicate {
+  def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]
+}
 
 /**
  * An [[Expression]] that returns a boolean value.
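The predicates.scala hunk above is the pivot of the series: the interpreted fallback now implements the same Predicate contract as generated code, so call sites need neither a null check nor a second code path. Below is a minimal, self-contained sketch of that pattern in plain Scala; Row, TooLargeToCompile, the clause threshold, and every other name here are illustrative stand-ins, not Spark's actual API.

object FallbackSketch {
  type Row = Map[String, Any]

  // The shared contract: both the "generated" and the interpreted predicate
  // expose the same eval() entry point, mirroring codegen.Predicate.
  trait BasePredicate {
    def eval(r: Row): Boolean
  }

  case class InterpretedPredicate(f: Row => Boolean) extends BasePredicate {
    override def eval(r: Row): Boolean = f(r)
  }

  class TooLargeToCompile extends RuntimeException

  // Stand-in for GeneratePredicate.generate: pretend compilation fails once
  // the predicate grows past a bytecode-size-like limit.
  private def generate(numClauses: Int, f: Row => Boolean): BasePredicate = {
    if (numClauses > 100) throw new TooLargeToCompile
    new BasePredicate { override def eval(r: Row): Boolean = f(r) } // "compiled" path
  }

  // Mirrors the reworked newPredicate: fall back instead of failing, and
  // return the same interface either way.
  def newPredicate(numClauses: Int, f: Row => Boolean): BasePredicate =
    try generate(numClauses, f) catch {
      case _: TooLargeToCompile =>
        println("Codegen disabled for this expression")
        InterpretedPredicate(f)
    }

  def main(args: Array[String]): Unit = {
    val p = newPredicate(400, r => r("_c0") != "string") // takes the fallback path
    println(p.eval(Map("_c0" -> "other")))               // true
  }
}

Returning the common interface instead of null (the intermediate state in PATCH 04) is the design point: the caller in FilterExec stays a single code path, and the fallback decision is owned entirely by newPredicate.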
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 9e9457f2ddc7..0a5cf684afc2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -368,7 +368,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
           str
         }
         logWarning(s"Codegen disabled for this expression:\n $logMessage")
-        null
+        InterpretedPredicate.create(expression, inputSchema)
       case e: Exception =>
         throw e
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index c12afee4943d..f3b2d90d6052 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -215,20 +215,11 @@ case class FilterExec(condition: Expression, child: SparkPlan)
     val numOutputRows = longMetric("numOutputRows")
     child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
       val predicate = newPredicate(condition, child.output)
-      if (predicate != null) {
-        predicate.initialize(0)
-        iter.filter { row =>
-          val r = predicate.eval(row)
-          if (r) numOutputRows += 1
-          r
-        }
-      } else {
-        val predicate = BindReferences.bindReference(condition, child.output)
-        iter.filter { row =>
-          val r = predicate.eval(row).isInstanceOf[Predicate]
-          if (r) numOutputRows += 1
-          r
-        }
+      predicate.initialize(0)
+      iter.filter { row =>
+        val r = predicate.eval(row)
+        if (r) numOutputRows += 1
+        r
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index ffd7f6c750f8..6b6f6388d54e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -177,7 +177,7 @@ abstract class PartitioningAwareFileIndex(
     })
 
     val selected = partitions.filter {
-      case PartitionPath(values, _) => boundPredicate(values)
+      case PartitionPath(values, _) => boundPredicate.eval(values)
     }
     logInfo {
       val total = partitions.length
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 9f4009bfe402..60a4638f610b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -103,7 +103,7 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
         // `Cast`ed values are always of internal types (e.g. UTF8String instead of String)
         Cast(Literal(value), dataType).eval()
       })
-    }.filter(predicate).map(projection)
+    }.filter(predicate.eval).map(projection)
 
     // Appends partition values
     val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes

From cb291cdcaf31c8c688f1b16137c99938fd15635d Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 13 Mar 2017 15:44:18 +0900
Subject: [PATCH 06/15] address review comments

---
 .../expressions/codegen/CodeGenerator.scala   |  5 ++++
 .../spark/sql/execution/SparkPlan.scala       | 30 +++++++++++--------
 .../sql/execution/WholeStageCodegenExec.scala |  2 +-
 .../execution/basicPhysicalOperators.scala    |  1 -
 4 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 28171874d330..9591025471bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
 import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
 import org.apache.commons.lang3.exception.ExceptionUtils
+import org.codehaus.commons.compiler.CompileException
 import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, JaninoRuntimeException, SimpleCompiler}
 import org.codehaus.janino.util.ClassFile
 
@@ -969,6 +970,10 @@ object CodeGenerator extends Logging {
         val msg = s"failed to compile: $e\n$formatted"
         logError(msg, e)
         throw new JaninoRuntimeException(msg, e)
+      case e: CompileException =>
+        val msg = s"failed to compile: $e\n$formatted"
+        logError(msg, e)
+        throw new CompileException(msg, e.asInstanceOf[CompileException].getLocation)
     }
     evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 0a5cf684afc2..8a5231b35ce2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -22,6 +22,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext
 
+import org.codehaus.commons.compiler.CompileException
 import org.codehaus.janino.JaninoRuntimeException
 
 import org.apache.spark.{broadcast, SparkEnv}
@@ -36,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * The base class for physical operators.
@@ -355,22 +356,27 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
   }
 
+  private def genInterpretedPredicate(
+      expression: Expression, inputSchema: Seq[Attribute]): InterpretedPredicate = {
+    val str = expression.toString
+    val logMessage = if (str.length > 256) {
+      str.substring(0, 256 - 3) + "..."
+    } else {
+      str
+    }
+    logWarning(s"Codegen disabled for this expression:\n $logMessage")
+    InterpretedPredicate.create(expression, inputSchema)
+  }
+
   protected def newPredicate(
       expression: Expression, inputSchema: Seq[Attribute]): GenPredicate = {
     try {
       GeneratePredicate.generate(expression, inputSchema)
     } catch {
-      case e: JaninoRuntimeException =>
-        val str = expression.toString
-        val logMessage = if (str.length > 256) {
-          str.substring(0, 256 - 3) + "..."
-        } else {
-          str
-        }
-        logWarning(s"Codegen disabled for this expression:\n $logMessage")
-        InterpretedPredicate.create(expression, inputSchema)
-      case e: Exception =>
-        throw e
+      case e: JaninoRuntimeException if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+        genInterpretedPredicate(expression, inputSchema)
+      case e: CompileException if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+        genInterpretedPredicate(expression, inputSchema)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index c2da3388b15f..aaef4edaa07f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -376,7 +376,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
     try {
       CodeGenerator.compile(cleanedSource)
     } catch {
-      case e: JaninoRuntimeException if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+      case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
         // We should already saw the error message
         logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
         return child.execute()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index f3b2d90d6052..44278e37c527 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.types.LongType
 import org.apache.spark.util.ThreadUtils
 import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
 
-
 /** Physical plan for Project. */
 case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
   extends UnaryExecNode with CodegenSupport {

From e653d50bc7660b01bbc6b4edbaab939376a862e5 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 13 Mar 2017 17:51:19 +0900
Subject: [PATCH 07/15] address review comment

---
 .../apache/spark/sql/catalyst/expressions/predicates.scala   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index b2785f054bc3..ba1e9084d0b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.expressions.codegen.{Predicate => GenPredicate}
+import org.apache.spark.sql.catalyst.expressions.codegen.{Predicate => BasePredicate}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.types._
@@ -33,7 +33,7 @@ object InterpretedPredicate {
   def create(expression: Expression): InterpretedPredicate = new InterpretedPredicate(expression)
 }
 
-class InterpretedPredicate(expression: Expression) extends GenPredicate {
+class InterpretedPredicate(expression: Expression) extends BasePredicate {
   def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]
 }
 

From 8548b0e93ed036bc1fcd1c9a259cf82cc80dca48 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 14 Mar 2017 04:20:20 +0900
Subject: [PATCH 08/15] fix test failure

---
 .../main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 8a5231b35ce2..b77bdfd39fa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -373,9 +373,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     try {
       GeneratePredicate.generate(expression, inputSchema)
     } catch {
-      case e: JaninoRuntimeException if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+      case e: JaninoRuntimeException if sqlContext == null || sqlContext.conf.wholeStageFallback =>
         genInterpretedPredicate(expression, inputSchema)
-      case e: CompileException if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+      case e: CompileException if sqlContext == null || sqlContext.conf.wholeStageFallback =>
         genInterpretedPredicate(expression, inputSchema)
     }
   }

From 51a71a6fc0a611cf04e9a77fe13a3f3606548ec1 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 20 Apr 2017 03:27:49 +0900
Subject: [PATCH 09/15] fix scala style error

---
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 8e0aafd583d9..b239b370f523 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1744,3 +1744,4 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     df.filter(filter).count
   }
 }
+

From ea67c8a423f760f30fae76c4fe85c275a3800564 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 20 Apr 2017 03:42:52 +0900
Subject: [PATCH 10/15] fix scala style error

---
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index b239b370f523..835d65193423 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1731,7 +1731,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       .join(df2, df1("x") === df2("x1"), "left_outer")
       .filter($"x1".isNotNull || !$"y".isin("a!"))
       .count
-  } 
+  }
 
   test("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
     val N = 400

From c2e6b8cabc820d24e882d3441d1738ab0368cecb Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 20 Apr 2017 10:50:27 +0900
Subject: [PATCH 11/15] fix code styles

---
 .../org/apache/spark/sql/execution/WholeStageCodegenExec.scala | 2 --
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala   | 1 -
 2 files changed, 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index aaef4edaa07f..c1e1a631c677 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.util.Locale
 
-import org.codehaus.janino.JaninoRuntimeException
-
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 835d65193423..e12e3969732e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1744,4 +1744,3 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     df.filter(filter).count
   }
 }
-

From 8b6ba755f75fc678b732b408e17c4a4670da1d92 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Fri, 21 Apr 2017 02:36:56 +0900
Subject: [PATCH 12/15] remove unused import

---
 .../main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index b77bdfd39fa0..64a44696bcc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.ThreadUtils
 
 /**
  * The base class for physical operators.

From 1f19c80391879cdd9a449faa214303b6ac2729da Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Fri, 12 May 2017 17:53:45 +0900
Subject: [PATCH 13/15] addressed review comments

---
 .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 8 ++------
 .../spark/sql/catalyst/expressions/predicates.scala      | 4 ++--
 2 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 9591025471bd..db0aa8963de4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -908,11 +908,7 @@ object CodeGenerator extends Logging {
     // Cache.get() may wrap the original exception. See the following URL
     // http://google.github.io/guava/releases/14.0/api/docs/com/google/common/cache/
     // Cache.html#get(K,%20java.util.concurrent.Callable)
-    case e : UncheckedExecutionException =>
-      val excChains = ExceptionUtils.getThrowables(e)
-      val exc = if (excChains.length == 1) excChains(0) else excChains(excChains.length - 2)
-      throw exc
-    case e : ExecutionError =>
+    case e @ (_: UncheckedExecutionException | _: ExecutionError) =>
       val excChains = ExceptionUtils.getThrowables(e)
       val exc = if (excChains.length == 1) excChains(0) else excChains(excChains.length - 2)
       throw exc
@@ -969,7 +965,7 @@ object CodeGenerator extends Logging {
       case e: CompileException =>
         val msg = s"failed to compile: $e\n$formatted"
         logError(msg, e)
-        throw new CompileException(msg, e.asInstanceOf[CompileException].getLocation)
+        throw new CompileException(msg, e.getLocation)
     }
     evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass]
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index ba1e9084d0b8..c15ee2ab270b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -33,8 +33,8 @@ object InterpretedPredicate {
   def create(expression: Expression): InterpretedPredicate = new InterpretedPredicate(expression)
 }
 
-class InterpretedPredicate(expression: Expression) extends BasePredicate {
-  def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]
+case class InterpretedPredicate(expression: Expression) extends BasePredicate {
+  override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]
 }
 
 /**

From 3868bf5591207f5a1973a17194caa3e7823ef399 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 16 May 2017 07:21:58 +0900
Subject: [PATCH 14/15] address review comment

---
 .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index db0aa8963de4..f8da78b5f5e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -909,9 +909,7 @@ object CodeGenerator extends Logging {
     // http://google.github.io/guava/releases/14.0/api/docs/com/google/common/cache/
     // Cache.html#get(K,%20java.util.concurrent.Callable)
     case e @ (_: UncheckedExecutionException | _: ExecutionError) =>
-      val excChains = ExceptionUtils.getThrowables(e)
-      val exc = if (excChains.length == 1) excChains(0) else excChains(excChains.length - 2)
-      throw exc
+      throw e.getCause
   }
 
  /**

From a5fd4653af29b6549221e6897957cc18c3f3e7df Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 16 May 2017 22:40:46 +0900
Subject: [PATCH 15/15] address review comments

---
 .../scala/org/apache/spark/sql/execution/SparkPlan.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 64a44696bcc7..c4ed96640eb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -373,9 +373,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     try {
       GeneratePredicate.generate(expression, inputSchema)
     } catch {
-      case e: JaninoRuntimeException if sqlContext == null || sqlContext.conf.wholeStageFallback =>
-        genInterpretedPredicate(expression, inputSchema)
-      case e: CompileException if sqlContext == null || sqlContext.conf.wholeStageFallback =>
+      case e @ (_: JaninoRuntimeException | _: CompileException)
+          if sqlContext == null || sqlContext.conf.wholeStageFallback =>
        genInterpretedPredicate(expression, inputSchema)
     }
  }
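The last two commits settle on two idioms worth noting: peel the Guava cache wrapper off once with getCause rather than walking the whole exception chain, and collapse the two compilation-failure types into a single guarded catch case. The sketch below illustrates both under the same caveat as before: every class in it (JaninoLikeException, CompileLikeException, WrappedCacheException) is a simplified stand-in, not Janino's or Guava's real type.

object FinalShapeSketch {
  class JaninoLikeException(msg: String) extends RuntimeException(msg)
  class CompileLikeException(msg: String) extends RuntimeException(msg)
  // Stand-in for Guava's UncheckedExecutionException: the cache wraps any
  // unchecked exception thrown by the value loader.
  class WrappedCacheException(cause: Throwable) extends RuntimeException(cause)

  // Simulates a cache lookup whose loader failure comes back wrapped.
  def cachedCompile(body: => Unit): Unit =
    try body catch { case t: RuntimeException => throw new WrappedCacheException(t) }

  // PATCH 14's shape: peel off exactly one wrapper layer with getCause.
  def compile(body: => Unit): Unit =
    try cachedCompile(body) catch { case e: WrappedCacheException => throw e.getCause }

  // PATCH 15's shape: one catch case matching either failure type, guarded
  // by the fallback setting, with interpreted evaluation as the fallback.
  def newPredicate(fallbackEnabled: Boolean)(codegen: => Unit)(interpreted: => Unit): Unit =
    try compile(codegen) catch {
      case e @ (_: JaninoLikeException | _: CompileLikeException) if fallbackEnabled =>
        println(s"Codegen disabled: ${e.getMessage}")
        interpreted
    }

  def main(args: Array[String]): Unit =
    newPredicate(fallbackEnabled = true) {
      throw new JaninoLikeException("Code of method grows beyond 64 KB")
    } {
      println("falling back to interpreted evaluation")
    }
}

Using getCause instead of ExceptionUtils.getRootCause matters here because a compilation failure can itself carry nested causes; the code wants the exception one level beneath the cache wrapper, which is exactly what the earlier getThrowables(...)(length - 2) indexing approximated.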