diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 87deaf20e2b25..d1f0e1a45aa18 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag

-import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
@@ -472,9 +471,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * partition the output RDD.
    */
   def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
-  : JavaPairRDD[K, (V, Optional[W])] = {
-    val joinResult = rdd.leftOuterJoin(other, partitioner)
-    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+  : JavaPairRDD[K, (V, Option[W])] = {
+    fromRDD(rdd.leftOuterJoin(other, partitioner))
   }

   /**
@@ -484,9 +482,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * partition the output RDD.
    */
   def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
-  : JavaPairRDD[K, (Optional[V], W)] = {
-    val joinResult = rdd.rightOuterJoin(other, partitioner)
-    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+  : JavaPairRDD[K, (Option[V], W)] = {
+    fromRDD(rdd.rightOuterJoin(other, partitioner))
   }

   /**
@@ -498,11 +495,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * in `this` have key k. Uses the given Partitioner to partition the output RDD.
    */
   def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
-  : JavaPairRDD[K, (Optional[V], Optional[W])] = {
-    val joinResult = rdd.fullOuterJoin(other, partitioner)
-    fromRDD(joinResult.mapValues{ case (v, w) =>
-      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
-    })
+  : JavaPairRDD[K, (Option[V], Option[W])] = {
+    fromRDD(rdd.fullOuterJoin(other, partitioner))
   }

   /**
@@ -559,9 +553,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
    * using the existing partitioner/parallelism level.
    */
-  def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = {
-    val joinResult = rdd.leftOuterJoin(other)
-    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+  def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Option[W])] = {
+    fromRDD(rdd.leftOuterJoin(other))
   }

   /**
@@ -571,9 +564,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * into `numPartitions` partitions.
    */
   def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
-  : JavaPairRDD[K, (V, Optional[W])] = {
-    val joinResult = rdd.leftOuterJoin(other, numPartitions)
-    fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+  : JavaPairRDD[K, (V, Option[W])] = {
+    fromRDD(rdd.leftOuterJoin(other, numPartitions))
   }

   /**
@@ -582,9 +574,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
    * RDD using the existing partitioner/parallelism level.
    */
-  def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = {
-    val joinResult = rdd.rightOuterJoin(other)
-    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+  def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Option[V], W)] = {
+    fromRDD(rdd.rightOuterJoin(other))
   }

   /**
@@ -594,9 +585,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * RDD into the given number of partitions.
    */
   def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
-  : JavaPairRDD[K, (Optional[V], W)] = {
-    val joinResult = rdd.rightOuterJoin(other, numPartitions)
-    fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+  : JavaPairRDD[K, (Option[V], W)] = {
+    fromRDD(rdd.rightOuterJoin(other, numPartitions))
   }

   /**
@@ -608,11 +598,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
    * parallelism level.
    */
-  def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
-    val joinResult = rdd.fullOuterJoin(other)
-    fromRDD(joinResult.mapValues{ case (v, w) =>
-      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
-    })
+  def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Option[V], Option[W])] = {
+    fromRDD(rdd.fullOuterJoin(other))
   }

   /**
@@ -624,11 +611,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
    */
   def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
-  : JavaPairRDD[K, (Optional[V], Optional[W])] = {
-    val joinResult = rdd.fullOuterJoin(other, numPartitions)
-    fromRDD(joinResult.mapValues{ case (v, w) =>
-      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
-    })
+  : JavaPairRDD[K, (Option[V], Option[W])] = {
+    fromRDD(rdd.fullOuterJoin(other, numPartitions))
   }

   /**
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index ed312770ee131..8928695aa998f 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -17,8 +17,6 @@

 package org.apache.spark.api.java

-import java.util.Comparator
-
 import scala.language.implicitConversions
 import scala.reflect.ClassTag

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 0e4d7dce0f2f5..b4b2462064b57 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -24,7 +24,6 @@ import java.util.{Comparator, List => JList, Iterator => JIterator}
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag

-import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec

 import org.apache.spark._
@@ -68,7 +67,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def getNumPartitions: Int = rdd.getNumPartitions

   /** The partitioner of this RDD. */
-  def partitioner: Optional[Partitioner] = JavaUtils.optionToOptional(rdd.partitioner)
+  def partitioner: Option[Partitioner] = rdd.partitioner

   /** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
   def context: SparkContext = rdd.context
@@ -550,9 +549,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Gets the name of the file to which this RDD was checkpointed
    */
-  def getCheckpointFile(): Optional[String] = {
-    JavaUtils.optionToOptional(rdd.getCheckpointFile)
-  }
+  def getCheckpointFile(): Option[String] = rdd.getCheckpointFile

   /** A description of this RDD and its recursive dependencies for debugging. */
   def toDebugString(): String = {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 4f54cd69e2175..aea4722ae38e7 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag

-import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.input.PortableDataStream
 import org.apache.hadoop.mapred.{InputFormat, JobConf}
@@ -650,7 +649,7 @@ class JavaSparkContext(val sc: SparkContext)
    * or the spark.home Java property, or the SPARK_HOME environment variable
    * (in that order of preference). If neither of these is set, return None.
    */
-  def getSparkHome(): Optional[String] = JavaUtils.optionToOptional(sc.getSparkHome())
+  def getSparkHome(): Option[String] = sc.getSparkHome()

   /**
    * Add a file to be downloaded with this Spark job on every node.
@@ -707,7 +706,7 @@ class JavaSparkContext(val sc: SparkContext)
     sc.setCheckpointDir(dir)
   }

-  def getCheckpointDir: Optional[String] = JavaUtils.optionToOptional(sc.getCheckpointDir)
+  def getCheckpointDir: Option[String] = sc.getCheckpointDir

   protected def checkpointFile[T](path: String): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
index 8f9647eea9e25..e6defecba67b0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -18,18 +18,11 @@
 package org.apache.spark.api.java

 import java.util.Map.Entry
-
-import com.google.common.base.Optional
-
 import java.{util => ju}
+
 import scala.collection.mutable

 private[spark] object JavaUtils {
-  def optionToOptional[T](option: Option[T]): Optional[T] =
-    option match {
-      case Some(value) => Optional.of(value)
-      case None => Optional.absent()
-    }

   // Workaround for SPARK-3926 / SI-8911
   def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]): SerializableMapWrapper[A, B]
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 11f1248c24d38..84c1e91ef59a7 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -24,6 +24,7 @@
 import java.util.*;
 import java.util.concurrent.*;

+import scala.Option;
 import scala.Tuple2;
 import scala.Tuple3;
 import scala.Tuple4;
@@ -35,7 +36,6 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.base.Throwables;
-import com.google.common.base.Optional;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import org.apache.hadoop.io.IntWritable;
@@ -221,7 +221,7 @@ public int getPartition(Object key) {
     JavaPairRDD<Integer, Integer> repartitioned =
         rdd.repartitionAndSortWithinPartitions(partitioner);
-    Assert.assertTrue(repartitioned.partitioner().isPresent());
+    Assert.assertTrue(repartitioned.partitioner().isDefined());
     Assert.assertEquals(repartitioned.partitioner().get(), partitioner);
     List<List<Tuple2<Integer, Integer>>> partitions = repartitioned.glom().collect();
     Assert.assertEquals(partitions.get(0),
@@ -491,15 +491,15 @@ public void leftOuterJoin() {
       new Tuple2<>(2, 'z'),
       new Tuple2<>(4, 'w')
     ));
-    List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
+    List<Tuple2<Integer, Tuple2<Integer, Option<Character>>>> joined =
       rdd1.leftOuterJoin(rdd2).collect();
     Assert.assertEquals(5, joined.size());
-    Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
+    Tuple2<Integer, Tuple2<Integer, Option<Character>>> firstUnmatched =
       rdd1.leftOuterJoin(rdd2).filter(
-        new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
+        new Function<Tuple2<Integer, Tuple2<Integer, Option<Character>>>, Boolean>() {
           @Override
-          public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup) {
-            return !tup._2()._2().isPresent();
+          public Boolean call(Tuple2<Integer, Tuple2<Integer, Option<Character>>> tup) {
+            return !tup._2()._2().isDefined();
           }
         }).first();
     Assert.assertEquals(3, firstUnmatched._1().intValue());
@@ -1456,7 +1456,7 @@ public void checkpointAndRestore() {
     rdd.count(); // Forces the DAG to cause a checkpoint
     Assert.assertTrue(rdd.isCheckpointed());
-    Assert.assertTrue(rdd.getCheckpointFile().isPresent());
+    Assert.assertTrue(rdd.getCheckpointFile().isDefined());
     JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
     Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
   }
@@ -1792,32 +1792,6 @@ public void testAsyncActionErrorWrapping() throws Exception {
     Assert.assertTrue(future.isDone());
   }

-  /**
-   * Test for SPARK-3647. This test needs to use the maven-built assembly to trigger the issue,
-   * since that's the only artifact where Guava classes have been relocated.
-   */
-  @Test
-  public void testGuavaOptional() {
-    // Stop the context created in setUp() and start a local-cluster one, to force usage of the
-    // assembly.
-    sc.stop();
-    JavaSparkContext localCluster = new JavaSparkContext("local-cluster[1,1,1024]", "JavaAPISuite");
-    try {
-      JavaRDD<Integer> rdd1 = localCluster.parallelize(Arrays.asList(1, 2, null), 3);
-      JavaRDD<Optional<Integer>> rdd2 = rdd1.map(
-        new Function<Integer, Optional<Integer>>() {
-          @Override
-          public Optional<Integer> call(Integer i) {
-            return Optional.fromNullable(i);
-          }
-        });
-      rdd2.collect();
-    } finally {
-      localCluster.stop();
-    }
-  }
-
   static class Class1 {}
   static class Class2 {}
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index 14975265ab2ce..db65e56f56a1e 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -21,10 +21,10 @@
 import java.io.Serializable;
 import java.util.*;

+import scala.Option;
 import scala.Tuple2;

 import com.google.common.collect.Iterables;
-import com.google.common.base.Optional;
 import com.google.common.io.Files;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -110,11 +110,11 @@ public void leftOuterJoin() {
       new Tuple2<>(2, 'z'),
       new Tuple2<>(4, 'w')
     ));
-    List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
+    List<Tuple2<Integer, Tuple2<Integer, Option<Character>>>> joined =
      rdd1.leftOuterJoin(rdd2).collect();
     Assert.assertEquals(5, joined.size());
-    Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
-      rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
+    Tuple2<Integer, Tuple2<Integer, Option<Character>>> firstUnmatched =
+      rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isDefined()).first();
     Assert.assertEquals(3, firstUnmatched._1().intValue());
   }
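
Not part of the patch, but for reference: a minimal sketch of what a Java caller sees after this change, mirroring the updated assertions in JavaAPISuite. The class name, the "local" master, and the sample data are illustrative assumptions; the point is that values that previously came back as Guava's Optional (from leftOuterJoin, partitioner(), getCheckpointFile(), and so on) now come back as scala.Option, queried from Java with isDefined()/isEmpty() and unwrapped with get().

import java.util.Arrays;

import scala.Option;
import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class OptionJoinExample {
  public static void main(String[] args) {
    // Hypothetical driver; any master URL and app name would do.
    JavaSparkContext sc = new JavaSparkContext("local", "OptionJoinExample");
    try {
      JavaPairRDD<Integer, Integer> left = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>(1, 10), new Tuple2<>(2, 20), new Tuple2<>(3, 30)));
      JavaPairRDD<Integer, Character> right = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>(1, 'x'), new Tuple2<>(2, 'y')));

      // With this patch the join value is scala.Option, not com.google.common.base.Optional.
      for (Tuple2<Integer, Tuple2<Integer, Option<Character>>> row :
          left.leftOuterJoin(right).collect()) {
        Option<Character> maybe = row._2()._2();
        // isDefined()/get() replace Guava's isPresent()/get(); key 3 prints "none".
        System.out.println(row._1() + " -> "
          + (maybe.isDefined() ? String.valueOf(maybe.get()) : "none"));
      }
    } finally {
      sc.stop();
    }
  }
}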