From ed2ccf05be7613611d80e2e4b95fd3aec79134b8 Mon Sep 17 00:00:00 2001 From: William Benton Date: Wed, 12 Mar 2014 21:56:32 -0500 Subject: [PATCH 1/8] Test cases for SPARK-897. Tests to make sure that passing an unserializable closure to a transformation fails fast. --- .../ProactiveClosureSerializationSuite.scala | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala diff --git a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala new file mode 100644 index 0000000000000..84a9824359f1b --- /dev/null +++ b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer; + +import java.io.NotSerializableException + +import org.scalatest.FunSuite + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkException +import org.apache.spark.SharedSparkContext + +/* A trivial (but unserializable) container for trivial functions */ +class UnserializableClass { + def op[T](x: T) = x.toString + + def pred[T](x: T) = x.toString.length % 2 == 0 +} + +class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContext { + + def fixture = (sc.parallelize(0 until 1000).map(_.toString), new UnserializableClass) + + test("throws expected serialization exceptions on actions") { + val (data, uc) = fixture + + val ex = intercept[SparkException] { + data.map(uc.op(_)).count + } + + assert(ex.getMessage.contains("Task not serializable")) + } + + // There is probably a cleaner way to eliminate boilerplate here, but we're + // iterating over a map from transformation names to functions that perform that + // transformation on a given RDD, creating one test case for each + + for (transformation <- + Map("map" -> xmap _, "flatMap" -> xflatMap _, "filter" -> xfilter _, "mapWith" -> xmapWith _, + "mapPartitions" -> xmapPartitions _, "mapPartitionsWithIndex" -> xmapPartitionsWithIndex _, + "mapPartitionsWithContext" -> xmapPartitionsWithContext _, "filterWith" -> xfilterWith _)) { + val (name, xf) = transformation + + test(s"$name transformations throw proactive serialization exceptions") { + val (data, uc) = fixture + + val ex = intercept[SparkException] { + xf(data, uc) + } + + assert(ex.getMessage.contains("Task not serializable"), s"RDD.$name doesn't proactively throw NotSerializableException") + } + } + + private def xmap(x: RDD[String], uc: UnserializableClass): RDD[String] = x.map(y=>uc.op(y)) + private def xmapWith(x: RDD[String], uc: UnserializableClass): RDD[String] = 
x.mapWith(x => x.toString)((x,y)=>x + uc.op(y)) + private def xflatMap(x: RDD[String], uc: UnserializableClass): RDD[String] = x.flatMap(y=>Seq(uc.op(y))) + private def xfilter(x: RDD[String], uc: UnserializableClass): RDD[String] = x.filter(y=>uc.pred(y)) + private def xfilterWith(x: RDD[String], uc: UnserializableClass): RDD[String] = x.filterWith(x => x.toString)((x,y)=>uc.pred(y)) + private def xmapPartitions(x: RDD[String], uc: UnserializableClass): RDD[String] = x.mapPartitions(_.map(y=>uc.op(y))) + private def xmapPartitionsWithIndex(x: RDD[String], uc: UnserializableClass): RDD[String] = x.mapPartitionsWithIndex((_, it) => it.map(y=>uc.op(y))) + private def xmapPartitionsWithContext(x: RDD[String], uc: UnserializableClass): RDD[String] = x.mapPartitionsWithContext((_, it) => it.map(y=>uc.op(y))) + +} From 5bfff2473dcbab605d00d586399dd500ea8c4d40 Mon Sep 17 00:00:00 2001 From: William Benton Date: Thu, 13 Mar 2014 14:40:42 -0500 Subject: [PATCH 2/8] Adds proactive closure-serializablilty checking ClosureCleaner.clean now checks to ensure that its closure argument is serializable by default and throws a SparkException with the underlying NotSerializableException in the detail message otherwise. As a result, transformation invocations with unserializable closures will fail at their call sites rather than when they actually execute. ClosureCleaner.clean now takes a second boolean argument; pass false to disable serializability-checking behavior at call sites where this behavior isn't desired. --- .../org/apache/spark/util/ClosureCleaner.scala | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 4916d9b86cca5..61c535e725d46 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -25,7 +25,7 @@ import scala.collection.mutable.Set import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type} import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ -import org.apache.spark.{Logging, SparkException} +import org.apache.spark.{Logging, SparkEnv, SparkException} private[spark] object ClosureCleaner extends Logging { // Get an ASM class reader for a given class from the JAR that loaded it @@ -101,7 +101,7 @@ private[spark] object ClosureCleaner extends Logging { } } - def clean(func: AnyRef) { + def clean(func: AnyRef, checkSerializable: Boolean = true) { // TODO: cache outerClasses / innerClasses / accessedFields val outerClasses = getOuterClasses(func) val innerClasses = getInnerClasses(func) @@ -153,6 +153,18 @@ private[spark] object ClosureCleaner extends Logging { field.setAccessible(true) field.set(func, outer) } + + if (checkSerializable) { + ensureSerializable(func) + } + } + + private def ensureSerializable(func: AnyRef) { + try { + SparkEnv.get.closureSerializer.newInstance().serialize(func) + } catch { + case ex: Exception => throw new SparkException("Task not serializable: " + ex.toString) + } } private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = { From abe816b726c8647caa5c8ebcc5047189eb2dd32d Mon Sep 17 00:00:00 2001 From: William Benton Date: Fri, 14 Mar 2014 12:33:33 -0500 Subject: [PATCH 3/8] Make proactive serializability checking optional. 
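(An illustrative aside on the check that the previous patch introduces and that this one makes optional: the mechanism amounts to serializing the closure eagerly and rethrowing the failure at the call site. The sketch below uses plain java.io serialization rather than Spark's pluggable closureSerializer, and the names CheckSketch, ensureSerializable, and Unserializable are stand-ins for illustration, not code from the patches.)

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object CheckSketch {
  // Serialize the closure immediately; rethrow so the failure points at the call site.
  def ensureSerializable(closure: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(closure)
    } catch {
      case ex: NotSerializableException =>
        throw new RuntimeException("Task not serializable: " + ex, ex)
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    class Unserializable { def op(x: Int): String = x.toString }   // does not extend Serializable
    val uc = new Unserializable
    val closure = (x: Int) => uc.op(x)   // the closure captures uc
    ensureSerializable(closure)          // throws here, before the closure ever runs
  }
}
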
SparkContext.clean uses ClosureCleaner's proactive serializability checking by default. This commit adds a parameter to the clean method of SparkContext that allows clients to specify that serializability checking should not occur as part of closure cleaning. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0678bdd02110e..89327e823024e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1205,8 +1205,8 @@ class SparkContext(config: SparkConf) extends Logging { * Clean a closure to make it ready to serialized and send to tasks * (removes unreferenced variables in $outer's, updates REPL variables) */ - private[spark] def clean[F <: AnyRef](f: F): F = { - ClosureCleaner.clean(f) + private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { + ClosureCleaner.clean(f, checkSerializable) f } From be1ecd693189a61329efb540e6dce2243ea209eb Mon Sep 17 00:00:00 2001 From: William Benton Date: Fri, 14 Mar 2014 12:34:42 -0500 Subject: [PATCH 4/8] Don't check serializability of DStream transforms. Since the DStream is reachable from within these closures, they aren't checkable by the straightforward technique of passing them to the closure serializer. --- .../org/apache/spark/streaming/dstream/DStream.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 4709a62381647..c93846b364ea9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -532,7 +532,7 @@ abstract class DStream[T: ClassTag] ( * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { - new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() + new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register() } /** @@ -540,7 +540,7 @@ abstract class DStream[T: ClassTag] ( * on each RDD of 'this' DStream. */ def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { - transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) + transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false)) } /** @@ -548,7 +548,7 @@ abstract class DStream[T: ClassTag] ( * on each RDD of 'this' DStream. 
*/ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { - val cleanedF = context.sparkContext.clean(transformFunc) + val cleanedF = context.sparkContext.clean(transformFunc, false) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 1) cleanedF(rdds.head.asInstanceOf[RDD[T]], time) @@ -563,7 +563,7 @@ abstract class DStream[T: ClassTag] ( def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc) + val cleanedF = ssc.sparkContext.clean(transformFunc, false) transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2)) } @@ -574,7 +574,7 @@ abstract class DStream[T: ClassTag] ( def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc) + val cleanedF = ssc.sparkContext.clean(transformFunc, false) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 2) val rdd1 = rdds(0).asInstanceOf[RDD[T]] From b215dea0c17fad9fb793b22ea45345d97f673dbe Mon Sep 17 00:00:00 2001 From: William Benton Date: Thu, 1 May 2014 09:31:44 -0500 Subject: [PATCH 5/8] Fixed spurious failures in ImplicitOrderingSuite --- .../apache/spark/ImplicitOrderingSuite.scala | 75 +++++++++++++------ 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala index 4bd889135631b..8e4a9e2c9f56c 100644 --- a/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala +++ b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala @@ -19,9 +19,29 @@ package org.apache.spark import org.scalatest.FunSuite +import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ class ImplicitOrderingSuite extends FunSuite with LocalSparkContext { + // Tests that PairRDDFunctions grabs an implicit Ordering in various cases where it should. + test("basic inference of Orderings"){ + sc = new SparkContext("local", "test") + val rdd = sc.parallelize(1 to 10) + + // These RDD methods are in the companion object so that the unserializable ScalaTest Engine + // won't be reachable from the closure object + + // Infer orderings after basic maps to particular types + val basicMapExpectations = ImplicitOrderingSuite.basicMapExpectations(rdd) + basicMapExpectations.map({case (met, explain) => assert(met, explain)}) + + // Infer orderings for other RDD methods + val otherRDDMethodExpectations = ImplicitOrderingSuite.otherRDDMethodExpectations(rdd) + otherRDDMethodExpectations.map({case (met, explain) => assert(met, explain)}) + } +} + +private object ImplicitOrderingSuite { class NonOrderedClass {} class ComparableClass extends Comparable[ComparableClass] { @@ -31,27 +51,36 @@ class ImplicitOrderingSuite extends FunSuite with LocalSparkContext { class OrderedClass extends Ordered[OrderedClass] { override def compare(o: OrderedClass): Int = ??? } - - // Tests that PairRDDFunctions grabs an implicit Ordering in various cases where it should. 
- test("basic inference of Orderings"){ - sc = new SparkContext("local", "test") - val rdd = sc.parallelize(1 to 10) - - // Infer orderings after basic maps to particular types - assert(rdd.map(x => (x, x)).keyOrdering.isDefined) - assert(rdd.map(x => (1, x)).keyOrdering.isDefined) - assert(rdd.map(x => (x.toString, x)).keyOrdering.isDefined) - assert(rdd.map(x => (null, x)).keyOrdering.isDefined) - assert(rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty) - assert(rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined) - assert(rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined) - - // Infer orderings for other RDD methods - assert(rdd.groupBy(x => x).keyOrdering.isDefined) - assert(rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty) - assert(rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined) - assert(rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined) - assert(rdd.groupBy((x: Int) => x, 5).keyOrdering.isDefined) - assert(rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined) + + def basicMapExpectations(rdd: RDD[Int]) = { + List((rdd.map(x => (x, x)).keyOrdering.isDefined, + "rdd.map(x => (x, x)).keyOrdering.isDefined"), + (rdd.map(x => (1, x)).keyOrdering.isDefined, + "rdd.map(x => (1, x)).keyOrdering.isDefined"), + (rdd.map(x => (x.toString, x)).keyOrdering.isDefined, + "rdd.map(x => (x.toString, x)).keyOrdering.isDefined"), + (rdd.map(x => (null, x)).keyOrdering.isDefined, + "rdd.map(x => (null, x)).keyOrdering.isDefined"), + (rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty, + "rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty"), + (rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined, + "rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined"), + (rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined, + "rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined")) } -} + + def otherRDDMethodExpectations(rdd: RDD[Int]) = { + List((rdd.groupBy(x => x).keyOrdering.isDefined, + "rdd.groupBy(x => x).keyOrdering.isDefined"), + (rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty, + "rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty"), + (rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined, + "rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined"), + (rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined, + "rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined"), + (rdd.groupBy((x: Int) => x, 5).keyOrdering.isDefined, + "rdd.groupBy((x: Int) => x, 5).keyOrdering.isDefined"), + (rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined, + "rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined")) + } +} \ No newline at end of file From 3b3f74a4ae4834ca20be7f4f2ea3ebdefe8576ef Mon Sep 17 00:00:00 2001 From: William Benton Date: Sun, 29 Jun 2014 10:49:18 -0500 Subject: [PATCH 6/8] Stylistic and doc cleanups. 
--- .../scala/org/apache/spark/SparkContext.scala | 8 +++++ .../apache/spark/util/ClosureCleaner.scala | 2 +- .../ProactiveClosureSerializationSuite.scala | 35 ++++++++++++------- 3 files changed, 32 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 89327e823024e..64a90e18d2b29 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1204,6 +1204,14 @@ class SparkContext(config: SparkConf) extends Logging { /** * Clean a closure to make it ready to serialized and send to tasks * (removes unreferenced variables in $outer's, updates REPL variables) + * If checkSerializable is set, clean will also proactively + * check to see if f is serializable and throw a SparkException + * if not. + * + * @param f the closure to clean + * @param checkSerializable whether or not to immediately check f for serializability + * @throws SparkException if checkSerializable is set but f is not + * serializable */ private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { ClosureCleaner.clean(f, checkSerializable) diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 61c535e725d46..e3f52f6ff1e63 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -163,7 +163,7 @@ private[spark] object ClosureCleaner extends Logging { try { SparkEnv.get.closureSerializer.newInstance().serialize(func) } catch { - case ex: Exception => throw new SparkException("Task not serializable: " + ex.toString) + case ex: Exception => throw new SparkException("Task not serializable", ex) } } diff --git a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala index 84a9824359f1b..5d15a68ac7e4f 100644 --- a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala @@ -51,9 +51,11 @@ class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContex // transformation on a given RDD, creating one test case for each for (transformation <- - Map("map" -> xmap _, "flatMap" -> xflatMap _, "filter" -> xfilter _, "mapWith" -> xmapWith _, - "mapPartitions" -> xmapPartitions _, "mapPartitionsWithIndex" -> xmapPartitionsWithIndex _, - "mapPartitionsWithContext" -> xmapPartitionsWithContext _, "filterWith" -> xfilterWith _)) { + Map("map" -> xmap _, "flatMap" -> xflatMap _, "filter" -> xfilter _, + "mapWith" -> xmapWith _, "mapPartitions" -> xmapPartitions _, + "mapPartitionsWithIndex" -> xmapPartitionsWithIndex _, + "mapPartitionsWithContext" -> xmapPartitionsWithContext _, + "filterWith" -> xfilterWith _)) { val (name, xf) = transformation test(s"$name transformations throw proactive serialization exceptions") { @@ -63,17 +65,26 @@ class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContex xf(data, uc) } - assert(ex.getMessage.contains("Task not serializable"), s"RDD.$name doesn't proactively throw NotSerializableException") + assert(ex.getMessage.contains("Task not serializable"), + s"RDD.$name doesn't proactively throw NotSerializableException") } } - private def xmap(x: 
RDD[String], uc: UnserializableClass): RDD[String] = x.map(y=>uc.op(y)) - private def xmapWith(x: RDD[String], uc: UnserializableClass): RDD[String] = x.mapWith(x => x.toString)((x,y)=>x + uc.op(y)) - private def xflatMap(x: RDD[String], uc: UnserializableClass): RDD[String] = x.flatMap(y=>Seq(uc.op(y))) - private def xfilter(x: RDD[String], uc: UnserializableClass): RDD[String] = x.filter(y=>uc.pred(y)) - private def xfilterWith(x: RDD[String], uc: UnserializableClass): RDD[String] = x.filterWith(x => x.toString)((x,y)=>uc.pred(y)) - private def xmapPartitions(x: RDD[String], uc: UnserializableClass): RDD[String] = x.mapPartitions(_.map(y=>uc.op(y))) - private def xmapPartitionsWithIndex(x: RDD[String], uc: UnserializableClass): RDD[String] = x.mapPartitionsWithIndex((_, it) => it.map(y=>uc.op(y))) - private def xmapPartitionsWithContext(x: RDD[String], uc: UnserializableClass): RDD[String] = x.mapPartitionsWithContext((_, it) => it.map(y=>uc.op(y))) + private def xmap(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.map(y=>uc.op(y)) + private def xmapWith(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.mapWith(x => x.toString)((x,y)=>x + uc.op(y)) + private def xflatMap(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.flatMap(y=>Seq(uc.op(y))) + private def xfilter(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.filter(y=>uc.pred(y)) + private def xfilterWith(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.filterWith(x => x.toString)((x,y)=>uc.pred(y)) + private def xmapPartitions(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.mapPartitions(_.map(y=>uc.op(y))) + private def xmapPartitionsWithIndex(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.mapPartitionsWithIndex((_, it) => it.map(y=>uc.op(y))) + private def xmapPartitionsWithContext(x: RDD[String], uc: UnserializableClass): RDD[String] = + x.mapPartitionsWithContext((_, it) => it.map(y=>uc.op(y))) } From 64d04d28d57bcca53a164a59800fc63d9e263299 Mon Sep 17 00:00:00 2001 From: William Benton Date: Sun, 29 Jun 2014 11:41:40 -0500 Subject: [PATCH 7/8] FailureSuite now checks both messages and causes. --- .../test/scala/org/apache/spark/FailureSuite.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala index 12dbebcb28644..e755d2e309398 100644 --- a/core/src/test/scala/org/apache/spark/FailureSuite.scala +++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala @@ -22,6 +22,8 @@ import org.scalatest.FunSuite import org.apache.spark.SparkContext._ import org.apache.spark.util.NonSerializable +import java.io.NotSerializableException + // Common state shared by FailureSuite-launched tasks. We use a global object // for this because any local variables used in the task closures will rightfully // be copied for each task, so there's no other way for them to share state. 
@@ -102,7 +104,8 @@ class FailureSuite extends FunSuite with LocalSparkContext { results.collect() } assert(thrown.getClass === classOf[SparkException]) - assert(thrown.getMessage.contains("NotSerializableException")) + assert(thrown.getMessage.contains("NotSerializableException") || + thrown.getCause.getClass === classOf[NotSerializableException]) FailureSuiteState.clear() } @@ -116,21 +119,24 @@ class FailureSuite extends FunSuite with LocalSparkContext { sc.parallelize(1 to 10, 2).map(x => a).count() } assert(thrown.getClass === classOf[SparkException]) - assert(thrown.getMessage.contains("NotSerializableException")) + assert(thrown.getMessage.contains("NotSerializableException") || + thrown.getCause.getClass === classOf[NotSerializableException]) // Non-serializable closure in an earlier stage val thrown1 = intercept[SparkException] { sc.parallelize(1 to 10, 2).map(x => (x, a)).partitionBy(new HashPartitioner(3)).count() } assert(thrown1.getClass === classOf[SparkException]) - assert(thrown1.getMessage.contains("NotSerializableException")) + assert(thrown1.getMessage.contains("NotSerializableException") || + thrown1.getCause.getClass === classOf[NotSerializableException]) // Non-serializable closure in foreach function val thrown2 = intercept[SparkException] { sc.parallelize(1 to 10, 2).foreach(x => println(a)) } assert(thrown2.getClass === classOf[SparkException]) - assert(thrown2.getMessage.contains("NotSerializableException")) + assert(thrown2.getMessage.contains("NotSerializableException") || + thrown2.getCause.getClass === classOf[NotSerializableException]) FailureSuiteState.clear() } From bceab8a20a203d8e004ab4eaa4a3a971b3013498 Mon Sep 17 00:00:00 2001 From: William Benton Date: Sun, 29 Jun 2014 22:29:46 -0500 Subject: [PATCH 8/8] Commented DStream corner cases for serializability checking. --- .../apache/spark/streaming/dstream/DStream.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index c93846b364ea9..e05db236addca 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -532,6 +532,9 @@ abstract class DStream[T: ClassTag] ( * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register() } @@ -540,6 +543,9 @@ abstract class DStream[T: ClassTag] ( * on each RDD of 'this' DStream. */ def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false)) } @@ -548,6 +554,9 @@ abstract class DStream[T: ClassTag] ( * on each RDD of 'this' DStream. 
*/ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean val cleanedF = context.sparkContext.clean(transformFunc, false) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 1) @@ -563,6 +572,9 @@ abstract class DStream[T: ClassTag] ( def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] ): DStream[V] = { + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean val cleanedF = ssc.sparkContext.clean(transformFunc, false) transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2)) } @@ -574,6 +586,9 @@ abstract class DStream[T: ClassTag] ( def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] ): DStream[V] = { + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean val cleanedF = ssc.sparkContext.clean(transformFunc, false) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 2)
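(Taken together, the series makes an unserializable closure fail at the transformation call itself, with a SparkException whose message contains "Task not serializable", rather than when a job actually runs; DStream transforms opt out of the check for the reason given in the comments above. A rough usage sketch against the Spark 1.x API of this era follows; FailFastExample, Unserializable, and the master and app-name strings are arbitrary illustration.)

import org.apache.spark.{SparkContext, SparkException}

object FailFastExample {
  class Unserializable { def op(x: Int): String = x.toString }   // not Serializable

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "fail-fast-example")
    val uc = new Unserializable
    try {
      // With proactive checking, the map call below throws before any job is run;
      // previously the failure surfaced only once an action such as count() executed.
      val strings = sc.parallelize(1 to 10).map(x => uc.op(x))
      println(strings.count())   // not reached
    } catch {
      case ex: SparkException => println("caught at the call site: " + ex.getMessage)
    } finally {
      sc.stop()
    }
  }
}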