From 1eda9e4921617bc71acf2bb502cf3a22ee43c41f Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 14 Nov 2014 15:35:02 +0800 Subject: [PATCH 01/11] Reorganize 'implicit's to improve the API convenience --- .../scala/org/apache/spark/Accumulators.scala | 23 ++++++ .../scala/org/apache/spark/SparkContext.scala | 30 +++++--- .../scala/org/apache/spark/rdd/package.scala | 29 +++++++- .../org/apache/spark/ImplicitSuite.scala | 73 +++++++++++++++++++ 4 files changed, 144 insertions(+), 11 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/ImplicitSuite.scala diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 2301caafb07f..182735629aaf 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -244,6 +244,29 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] { } } +object AccumulatorParam { + + implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { + def addInPlace(t1: Double, t2: Double): Double = t1 + t2 + def zero(initialValue: Double) = 0.0 + } + + implicit object IntAccumulatorParam extends AccumulatorParam[Int] { + def addInPlace(t1: Int, t2: Int): Int = t1 + t2 + def zero(initialValue: Int) = 0 + } + + implicit object LongAccumulatorParam extends AccumulatorParam[Long] { + def addInPlace(t1: Long, t2: Long) = t1 + t2 + def zero(initialValue: Long) = 0L + } + + implicit object FloatAccumulatorParam extends AccumulatorParam[Float] { + def addInPlace(t1: Float, t2: Float) = t1 + t2 + def zero(initialValue: Float) = 0f + } +} + // TODO: The multi-thread support in accumulators is kind of lame; check // if there's a more intuitive way of doing it right private object Accumulators { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 03ea672c813d..ddd71082079f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1427,46 +1427,56 @@ object SparkContext extends Logging { private[spark] val DRIVER_IDENTIFIER = "" - implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { + @deprecated("An API for backforward compatibility", "1.2.0") + object DoubleAccumulatorParam extends AccumulatorParam[Double] { def addInPlace(t1: Double, t2: Double): Double = t1 + t2 def zero(initialValue: Double) = 0.0 } - implicit object IntAccumulatorParam extends AccumulatorParam[Int] { + @deprecated("An API for backforward compatibility", "1.2.0") + object IntAccumulatorParam extends AccumulatorParam[Int] { def addInPlace(t1: Int, t2: Int): Int = t1 + t2 def zero(initialValue: Int) = 0 } - implicit object LongAccumulatorParam extends AccumulatorParam[Long] { + @deprecated("An API for backforward compatibility", "1.2.0") + object LongAccumulatorParam extends AccumulatorParam[Long] { def addInPlace(t1: Long, t2: Long) = t1 + t2 def zero(initialValue: Long) = 0L } - implicit object FloatAccumulatorParam extends AccumulatorParam[Float] { + @deprecated("An API for backforward compatibility", "1.2.0") + object FloatAccumulatorParam extends AccumulatorParam[Float] { def addInPlace(t1: Float, t2: Float) = t1 + t2 def zero(initialValue: Float) = 0f } // TODO: Add AccumulatorParams for other types, e.g. 
lists and strings - implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) + @deprecated("An API for backforward compatibility", "1.2.0") + def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = { new PairRDDFunctions(rdd) } - implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd) + @deprecated("An API for backforward compatibility", "1.2.0") + def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd) - implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( + @deprecated("An API for backforward compatibility", "1.2.0") + def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( rdd: RDD[(K, V)]) = new SequenceFileRDDFunctions(rdd) - implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag]( + @deprecated("An API for backforward compatibility", "1.2.0") + def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag]( rdd: RDD[(K, V)]) = new OrderedRDDFunctions[K, V, (K, V)](rdd) - implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd) + @deprecated("An API for backforward compatibility", "1.2.0") + def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd) - implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) = + @deprecated("An API for backforward compatibility", "1.2.0") + def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) = new DoubleRDDFunctions(rdd.map(x => num.toDouble(x))) // Implicit conversions to common Writable types, for saveAsSequenceFile diff --git a/core/src/main/scala/org/apache/spark/rdd/package.scala b/core/src/main/scala/org/apache/spark/rdd/package.scala index 55fc6e4d2b4d..74498b984ad9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/package.scala +++ b/core/src/main/scala/org/apache/spark/rdd/package.scala @@ -17,7 +17,34 @@ package org.apache.spark +import scala.language.implicitConversions +import scala.reflect.ClassTag + +import org.apache.hadoop.io.Writable + /** * Provides several RDD implementations. See [[org.apache.spark.rdd.RDD]]. */ -package object rdd +package object rdd { + + implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) + (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = { + new PairRDDFunctions(rdd) + } + + implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd) + + implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( + rdd: RDD[(K, V)]) = + new SequenceFileRDDFunctions(rdd) + + implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag]( + rdd: RDD[(K, V)]) = + new OrderedRDDFunctions[K, V, (K, V)](rdd) + + implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd) + + implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) = + new DoubleRDDFunctions(rdd.map(x => num.toDouble(x))) + +} diff --git a/core/src/test/scala/org/apache/spark/ImplicitSuite.scala b/core/src/test/scala/org/apache/spark/ImplicitSuite.scala new file mode 100644 index 000000000000..db0f63cd3506 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/ImplicitSuite.scala @@ -0,0 +1,73 @@ +package org.apache.spark + +/** + * A test suite to make sure all `implicit` functions work correctly. + * Please don't `import org.apache.spark.SparkContext._` in this class. 
+ * + * As `implicit` is a compiler feature, we don't need to run this class. + * What we need to do is making the compiler happy. + */ +class ImplicitSuite { + + + // We only want to test if `implict` works well with the compiler, so we don't need a real + // SparkContext. + def mockSparkContext[T]: org.apache.spark.SparkContext = null + + // We only want to test if `implict` works well with the compiler, so we don't need a real RDD. + def mockRDD[T]: org.apache.spark.rdd.RDD[T] = null + + def testRddToPairRDDFunctions(): Unit = { + val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD + rdd.groupByKey + } + + def testRddToAsyncRDDActions(): Unit = { + val rdd: org.apache.spark.rdd.RDD[Int] = mockRDD + rdd.countAsync() + } + + def testRddToSequenceFileRDDFunctions(): Unit = { + // TODO eliminating `import intToIntWritable` needs refactoring SequenceFileRDDFunctions. + // That will be a breaking change. + import org.apache.spark.SparkContext.intToIntWritable + val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD + rdd.saveAsSequenceFile("/a/test/path") + } + + def testRddToOrderedRDDFunctions(): Unit = { + val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD + rdd.sortByKey() + } + + def testDoubleRDDToDoubleRDDFunctions(): Unit = { + val rdd: org.apache.spark.rdd.RDD[Double] = mockRDD + rdd.stats() + } + + + def testNumericRDDToDoubleRDDFunctions(): Unit = { + val rdd: org.apache.spark.rdd.RDD[Int] = mockRDD + rdd.stats() + } + + def testDoubleAccumulatorParam(): Unit = { + val sc = mockSparkContext + sc.accumulator(123.4) + } + + def testIntAccumulatorParam(): Unit = { + val sc = mockSparkContext + sc.accumulator(123) + } + + def testLongAccumulatorParam(): Unit = { + val sc = mockSparkContext + sc.accumulator(123L) + } + + def testFloatAccumulatorParam(): Unit = { + val sc = mockSparkContext + sc.accumulator(123F) + } +} From 3ac4f07cb23c25f3670658567ca54be44ee04ee2 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 14 Nov 2014 15:58:58 +0800 Subject: [PATCH 02/11] Add license header --- .../scala/org/apache/spark/ImplicitSuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/ImplicitSuite.scala b/core/src/test/scala/org/apache/spark/ImplicitSuite.scala index db0f63cd3506..4d00bc3929df 100644 --- a/core/src/test/scala/org/apache/spark/ImplicitSuite.scala +++ b/core/src/test/scala/org/apache/spark/ImplicitSuite.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark /** From 9b73188430ab00cc312a105c105c58e6d015ea39 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 14 Nov 2014 16:13:13 +0800 Subject: [PATCH 03/11] Fix the code style issue --- core/src/test/scala/org/apache/spark/ImplicitSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ImplicitSuite.scala b/core/src/test/scala/org/apache/spark/ImplicitSuite.scala index 4d00bc3929df..58ec6ed4b120 100644 --- a/core/src/test/scala/org/apache/spark/ImplicitSuite.scala +++ b/core/src/test/scala/org/apache/spark/ImplicitSuite.scala @@ -26,7 +26,6 @@ package org.apache.spark */ class ImplicitSuite { - // We only want to test if `implict` works well with the compiler, so we don't need a real // SparkContext. def mockSparkContext[T]: org.apache.spark.SparkContext = null @@ -36,7 +35,7 @@ class ImplicitSuite { def testRddToPairRDDFunctions(): Unit = { val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD - rdd.groupByKey + rdd.groupByKey() } def testRddToAsyncRDDActions(): Unit = { @@ -62,7 +61,6 @@ class ImplicitSuite { rdd.stats() } - def testNumericRDDToDoubleRDDFunctions(): Unit = { val rdd: org.apache.spark.rdd.RDD[Int] = mockRDD rdd.stats() From 3bdcae2a36bf14c338f6eb5e050a0bff97970132 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 14 Nov 2014 19:37:19 +0800 Subject: [PATCH 04/11] Move WritableConverter implicits to object WritableConverter --- .../scala/org/apache/spark/SparkContext.scala | 62 ++++++++++++++++--- .../apache/spark/api/java/JavaPairRDD.scala | 3 +- .../spark/api/java/JavaSparkContext.scala | 2 +- .../org/apache/spark/ImplicitSuite.scala | 40 ++++++++++++ 4 files changed, 96 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ddd71082079f..32b0d81ce242 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1510,32 +1510,40 @@ object SparkContext extends Logging { new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W])) } - implicit def intWritableConverter(): WritableConverter[Int] = + @deprecated("An API for backforward compatibility", "1.2.0") + def intWritableConverter(): WritableConverter[Int] = simpleWritableConverter[Int, IntWritable](_.get) - implicit def longWritableConverter(): WritableConverter[Long] = + @deprecated("An API for backforward compatibility", "1.2.0") + def longWritableConverter(): WritableConverter[Long] = simpleWritableConverter[Long, LongWritable](_.get) - implicit def doubleWritableConverter(): WritableConverter[Double] = + @deprecated("An API for backforward compatibility", "1.2.0") + def doubleWritableConverter(): WritableConverter[Double] = simpleWritableConverter[Double, DoubleWritable](_.get) - implicit def floatWritableConverter(): WritableConverter[Float] = + @deprecated("An API for backforward compatibility", "1.2.0") + def floatWritableConverter(): WritableConverter[Float] = simpleWritableConverter[Float, FloatWritable](_.get) - implicit def booleanWritableConverter(): WritableConverter[Boolean] = + @deprecated("An API for backforward compatibility", "1.2.0") + def booleanWritableConverter(): WritableConverter[Boolean] = simpleWritableConverter[Boolean, BooleanWritable](_.get) - implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = { + @deprecated("An API for backforward compatibility", "1.2.0") + def bytesWritableConverter(): 
WritableConverter[Array[Byte]] = { simpleWritableConverter[Array[Byte], BytesWritable](bw => // getBytes method returns array which is longer then data to be returned Arrays.copyOfRange(bw.getBytes, 0, bw.getLength) ) } - implicit def stringWritableConverter(): WritableConverter[String] = + @deprecated("An API for backforward compatibility", "1.2.0") + def stringWritableConverter(): WritableConverter[String] = simpleWritableConverter[String, Text](_.toString) - implicit def writableWritableConverter[T <: Writable]() = + @deprecated("An API for backforward compatibility", "1.2.0") + def writableWritableConverter[T <: Writable]() = new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T]) /** @@ -1760,3 +1768,41 @@ private[spark] class WritableConverter[T]( val writableClass: ClassTag[T] => Class[_ <: Writable], val convert: Writable => T) extends Serializable + +object WritableConverter { + + // Helper objects for converting common types to Writable + private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) + : WritableConverter[T] = { + val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]] + new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W])) + } + + implicit def intWritableConverter(): WritableConverter[Int] = + simpleWritableConverter[Int, IntWritable](_.get) + + implicit def longWritableConverter(): WritableConverter[Long] = + simpleWritableConverter[Long, LongWritable](_.get) + + implicit def doubleWritableConverter(): WritableConverter[Double] = + simpleWritableConverter[Double, DoubleWritable](_.get) + + implicit def floatWritableConverter(): WritableConverter[Float] = + simpleWritableConverter[Float, FloatWritable](_.get) + + implicit def booleanWritableConverter(): WritableConverter[Boolean] = + simpleWritableConverter[Boolean, BooleanWritable](_.get) + + implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = { + simpleWritableConverter[Array[Byte], BytesWritable](bw => + // getBytes method returns array which is longer then data to be returned + Arrays.copyOfRange(bw.getBytes, 0, bw.getLength) + ) + } + + implicit def stringWritableConverter(): WritableConverter[String] = + simpleWritableConverter[String, Text](_.toString) + + implicit def writableWritableConverter[T <: Writable]() = + new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T]) +} diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index e37f3acaf6e3..25ec18bbe676 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -32,13 +32,12 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.Partitioner._ -import org.apache.spark.SparkContext.rddToPairRDDFunctions import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction} import org.apache.spark.partial.{BoundedDouble, PartialResult} -import org.apache.spark.rdd.{OrderedRDDFunctions, RDD} +import org.apache.spark.rdd.{OrderedRDDFunctions, RDD, rddToPairRDDFunctions} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils diff --git 
a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 5c6e8d32c5c8..92f261d9edb9 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.{InputFormat, JobConf} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark._ -import org.apache.spark.SparkContext._ +import org.apache.spark.AccumulatorParam._ import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.broadcast.Broadcast diff --git a/core/src/test/scala/org/apache/spark/ImplicitSuite.scala b/core/src/test/scala/org/apache/spark/ImplicitSuite.scala index 58ec6ed4b120..df86961df926 100644 --- a/core/src/test/scala/org/apache/spark/ImplicitSuite.scala +++ b/core/src/test/scala/org/apache/spark/ImplicitSuite.scala @@ -85,4 +85,44 @@ class ImplicitSuite { val sc = mockSparkContext sc.accumulator(123F) } + + def testIntWritableConverter(): Unit = { + val sc = mockSparkContext + sc.sequenceFile[Int, Int]("/a/test/path") + } + + def testLongWritableConverter(): Unit = { + val sc = mockSparkContext + sc.sequenceFile[Long, Long]("/a/test/path") + } + + def testDoubleWritableConverter(): Unit = { + val sc = mockSparkContext + sc.sequenceFile[Double, Double]("/a/test/path") + } + + def testFloatWritableConverter(): Unit = { + val sc = mockSparkContext + sc.sequenceFile[Float, Float]("/a/test/path") + } + + def testBooleanWritableConverter(): Unit = { + val sc = mockSparkContext + sc.sequenceFile[Boolean, Boolean]("/a/test/path") + } + + def testBytesWritableConverter(): Unit = { + val sc = mockSparkContext + sc.sequenceFile[Array[Byte], Array[Byte]]("/a/test/path") + } + + def testStringWritableConverter(): Unit = { + val sc = mockSparkContext + sc.sequenceFile[String, String]("/a/test/path") + } + + def testWritableWritableConverter(): Unit = { + val sc = mockSparkContext + sc.sequenceFile[org.apache.hadoop.io.Text, org.apache.hadoop.io.Text]("/a/test/path") + } } From 185c12f1bd372d6a0d164e373c958b826ec0d22f Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 16 Nov 2014 00:03:23 +0800 Subject: [PATCH 05/11] Remove simpleWritableConverter from SparkContext --- .../main/scala/org/apache/spark/SparkContext.scala | 12 +++--------- .../scala/org/apache/spark/SparkContextSuite.scala | 2 +- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 32b0d81ce242..106e4c76415f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -52,6 +52,7 @@ import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.JobProgressListener import org.apache.spark.util._ +import org.apache.spark.WritableConverter.simpleWritableConverter /** * Main entry point for Spark functionality. 
A SparkContext represents the connection to a Spark @@ -1503,13 +1504,6 @@ object SparkContext extends Logging { arr.map(x => anyToWritable(x)).toArray) } - // Helper objects for converting common types to Writable - private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) - : WritableConverter[T] = { - val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]] - new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W])) - } - @deprecated("An API for backforward compatibility", "1.2.0") def intWritableConverter(): WritableConverter[Int] = simpleWritableConverter[Int, IntWritable](_.get) @@ -1769,10 +1763,10 @@ private[spark] class WritableConverter[T]( val convert: Writable => T) extends Serializable -object WritableConverter { +private[spark] object WritableConverter { // Helper objects for converting common types to Writable - private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) + private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) : WritableConverter[T] = { val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]] new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W])) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 31edad1c56c7..bd60d5f97d1a 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -29,7 +29,7 @@ class SparkContextSuite extends FunSuite { bytesWritable.set(inputArray, 0, 10) bytesWritable.set(inputArray, 0, 5) - val converter = SparkContext.bytesWritableConverter() + val converter = WritableConverter.bytesWritableConverter() val byteArray = converter.convert(bytesWritable) assert(byteArray.length === 5) From 726621846ff726c4a80081e880def4cb045e7c5d Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 16 Nov 2014 00:38:38 +0800 Subject: [PATCH 06/11] Add comments to warn the duplicate codes in SparkContext --- .../scala/org/apache/spark/Accumulators.scala | 5 ++ .../scala/org/apache/spark/SparkContext.scala | 59 +++++++++++++------ .../scala/org/apache/spark/rdd/package.scala | 5 ++ 3 files changed, 51 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 182735629aaf..9b7d9e82b778 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -246,6 +246,11 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] { object AccumulatorParam { + // The following implicit objects were in SparkContext before 1.2 and users had to + // `import SparkContext._` to enable them. Now we move them here to make the compiler find + // them automatically. However, as there are duplicate codes in SparkContext for backward + // compatibility, please update them accordingly if you modify the following implicit objects. 
+ implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { def addInPlace(t1: Double, t2: Double): Double = t1 + t2 def zero(initialValue: Double) = 0.0 diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 106e4c76415f..9d2312c13976 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1428,25 +1428,29 @@ object SparkContext extends Logging { private[spark] val DRIVER_IDENTIFIER = "" - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " + + "backward compatibility.", "1.2.0") object DoubleAccumulatorParam extends AccumulatorParam[Double] { def addInPlace(t1: Double, t2: Double): Double = t1 + t2 def zero(initialValue: Double) = 0.0 } - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " + + "backward compatibility.", "1.2.0") object IntAccumulatorParam extends AccumulatorParam[Int] { def addInPlace(t1: Int, t2: Int): Int = t1 + t2 def zero(initialValue: Int) = 0 } - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " + + "backward compatibility.", "1.2.0") object LongAccumulatorParam extends AccumulatorParam[Long] { def addInPlace(t1: Long, t2: Long) = t1 + t2 def zero(initialValue: Long) = 0L } - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " + + "backward compatibility.", "1.2.0") object FloatAccumulatorParam extends AccumulatorParam[Float] { def addInPlace(t1: Float, t2: Float) = t1 + t2 def zero(initialValue: Float) = 0f @@ -1454,29 +1458,35 @@ object SparkContext extends Logging { // TODO: Add AccumulatorParams for other types, e.g. lists and strings - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + + "kept here only for backward compatibility.", "1.2.0") def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = { new PairRDDFunctions(rdd) } - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + + "kept here only for backward compatibility.", "1.2.0") def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd) - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + + "kept here only for backward compatibility.", "1.2.0") def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( rdd: RDD[(K, V)]) = new SequenceFileRDDFunctions(rdd) - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. 
This is " + + "kept here only for backward compatibility.", "1.2.0") def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag]( rdd: RDD[(K, V)]) = new OrderedRDDFunctions[K, V, (K, V)](rdd) - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + + "kept here only for backward compatibility.", "1.2.0") def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd) - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + + "kept here only for backward compatibility.", "1.2.0") def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) = new DoubleRDDFunctions(rdd.map(x => num.toDouble(x))) @@ -1504,27 +1514,33 @@ object SparkContext extends Logging { arr.map(x => anyToWritable(x)).toArray) } - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + + "backward compatibility.", "1.2.0") def intWritableConverter(): WritableConverter[Int] = simpleWritableConverter[Int, IntWritable](_.get) - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + + "backward compatibility.", "1.2.0") def longWritableConverter(): WritableConverter[Long] = simpleWritableConverter[Long, LongWritable](_.get) - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + + "backward compatibility.", "1.2.0") def doubleWritableConverter(): WritableConverter[Double] = simpleWritableConverter[Double, DoubleWritable](_.get) - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + + "backward compatibility.", "1.2.0") def floatWritableConverter(): WritableConverter[Float] = simpleWritableConverter[Float, FloatWritable](_.get) - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + + "backward compatibility.", "1.2.0") def booleanWritableConverter(): WritableConverter[Boolean] = simpleWritableConverter[Boolean, BooleanWritable](_.get) - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + + "backward compatibility.", "1.2.0") def bytesWritableConverter(): WritableConverter[Array[Byte]] = { simpleWritableConverter[Array[Byte], BytesWritable](bw => // getBytes method returns array which is longer then data to be returned @@ -1532,11 +1548,13 @@ object SparkContext extends Logging { ) } - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + + "backward compatibility.", "1.2.0") def stringWritableConverter(): WritableConverter[String] = simpleWritableConverter[String, Text](_.toString) - @deprecated("An API for backforward compatibility", "1.2.0") + @deprecated("Replaced by implicit functions in WritableConverter. 
This is kept here only for " + + "backward compatibility.", "1.2.0") def writableWritableConverter[T <: Writable]() = new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T]) @@ -1772,6 +1790,11 @@ private[spark] object WritableConverter { new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W])) } + // The following implicit functions were in SparkContext before 1.2 and users had to + // `import SparkContext._` to enable them. Now we move them here to make the compiler find + // them automatically. However, as there are duplicate codes in SparkContext for backward + // compatibility, please update them accordingly if you modify the following implicit functions. + implicit def intWritableConverter(): WritableConverter[Int] = simpleWritableConverter[Int, IntWritable](_.get) diff --git a/core/src/main/scala/org/apache/spark/rdd/package.scala b/core/src/main/scala/org/apache/spark/rdd/package.scala index 74498b984ad9..3c22bb354e2a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/package.scala +++ b/core/src/main/scala/org/apache/spark/rdd/package.scala @@ -27,6 +27,11 @@ import org.apache.hadoop.io.Writable */ package object rdd { + // The following implicit functions were in SparkContext before 1.2 and users had to + // `import SparkContext._` to enable them. Now we move them here to make the compiler find + // them automatically. However, as there are duplicate codes in SparkContext for backward + // compatibility, please update them accordingly if you modify the following implicit functions. + implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = { new PairRDDFunctions(rdd) From 34641d413ea2e31128caa3e179750c39f373fe23 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 16 Nov 2014 15:28:24 +0800 Subject: [PATCH 07/11] Move ImplicitSuite to org.apache.sparktest --- .../scala/org/apache/{spark => sparktest}/ImplicitSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename core/src/test/scala/org/apache/{spark => sparktest}/ImplicitSuite.scala (99%) diff --git a/core/src/test/scala/org/apache/spark/ImplicitSuite.scala b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala similarity index 99% rename from core/src/test/scala/org/apache/spark/ImplicitSuite.scala rename to core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala index df86961df926..4918e2d92beb 100644 --- a/core/src/test/scala/org/apache/spark/ImplicitSuite.scala +++ b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark +package org.apache.sparktest /** * A test suite to make sure all `implicit` functions work correctly. 
From 52353deb3612daae91fe32ed3a774f90620c29f0 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 17 Nov 2014 11:59:27 +0800 Subject: [PATCH 08/11] Remove private[spark] from object WritableConverter --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 9d2312c13976..432a7a6c0375 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1781,7 +1781,7 @@ private[spark] class WritableConverter[T]( val convert: Writable => T) extends Serializable -private[spark] object WritableConverter { +object WritableConverter { // Helper objects for converting common types to Writable private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) From 2b5f5a44015a73b86b482862ac489044f04653bc Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 17 Nov 2014 14:15:06 +0800 Subject: [PATCH 09/11] Comments for the deprecated functions --- .../scala/org/apache/spark/Accumulators.scala | 2 ++ .../scala/org/apache/spark/SparkContext.scala | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 9b7d9e82b778..dc1e8f6c21b6 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -270,6 +270,8 @@ object AccumulatorParam { def addInPlace(t1: Float, t2: Float) = t1 + t2 def zero(initialValue: Float) = 0f } + + // TODO: Add AccumulatorParams for other types, e.g. lists and strings } // TODO: The multi-thread support in accumulators is kind of lame; check diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 432a7a6c0375..d94720fbf2f4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1428,6 +1428,11 @@ object SparkContext extends Logging { private[spark] val DRIVER_IDENTIFIER = "" + // The following deprecated objects have already been copied to `object AccumulatorParam` to + // make the compiler find them automatically. They are duplicate codes only for backward + // compatibility, please update `object AccumulatorParam` accordingly if you plan to modify the + // following ones. + @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " + "backward compatibility.", "1.2.0") object DoubleAccumulatorParam extends AccumulatorParam[Double] { @@ -1456,7 +1461,10 @@ object SparkContext extends Logging { def zero(initialValue: Float) = 0f } - // TODO: Add AccumulatorParams for other types, e.g. lists and strings + // The following deprecated functions have already been copied to `org.apache.spark.rdd` package + // object to make the compiler find them automatically. They are duplicate codes only for backward + // compatibility, please update `org.apache.spark.rdd` package object accordingly if you plan to + // modify the following ones. @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. 
This is " + "kept here only for backward compatibility.", "1.2.0") @@ -1514,6 +1522,11 @@ object SparkContext extends Logging { arr.map(x => anyToWritable(x)).toArray) } + // The following deprecated functions have already been copied to `object WritableConverter` to + // make the compiler find them automatically. They are duplicate codes only for backward + // compatibility, please update `object WritableConverter` accordingly if you plan to modify the + // following ones. + @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.2.0") def intWritableConverter(): WritableConverter[Int] = From 9c27aff5e3c954c08fd24678a9abb2ca1292714f Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 17 Nov 2014 19:53:29 +0800 Subject: [PATCH 10/11] Move implicit functions to object RDD and forward old functions to new implicit ones directly --- .../scala/org/apache/spark/SparkContext.scala | 53 ++++++++----------- .../apache/spark/api/java/JavaPairRDD.scala | 3 +- .../main/scala/org/apache/spark/rdd/RDD.scala | 30 +++++++++++ .../scala/org/apache/spark/rdd/package.scala | 34 +----------- 4 files changed, 56 insertions(+), 64 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d94720fbf2f4..0b33c0cf42b8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -52,7 +52,6 @@ import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.JobProgressListener import org.apache.spark.util._ -import org.apache.spark.WritableConverter.simpleWritableConverter /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -1461,42 +1460,41 @@ object SparkContext extends Logging { def zero(initialValue: Float) = 0f } - // The following deprecated functions have already been copied to `org.apache.spark.rdd` package - // object to make the compiler find them automatically. They are duplicate codes only for backward - // compatibility, please update `org.apache.spark.rdd` package object accordingly if you plan to - // modify the following ones. + // The following deprecated functions have already been moved to `object RDD` to + // make the compiler find them automatically. They are still kept here for backward compatibility + // and just call the corresponding functions in `object RDD`. @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + "kept here only for backward compatibility.", "1.2.0") def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = { - new PairRDDFunctions(rdd) + RDD.rddToPairRDDFunctions(rdd) } @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + "kept here only for backward compatibility.", "1.2.0") - def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd) + def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = RDD.rddToAsyncRDDActions(rdd) @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. 
This is " + "kept here only for backward compatibility.", "1.2.0") def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( rdd: RDD[(K, V)]) = - new SequenceFileRDDFunctions(rdd) + RDD.rddToSequenceFileRDDFunctions(rdd) @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + "kept here only for backward compatibility.", "1.2.0") def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag]( rdd: RDD[(K, V)]) = - new OrderedRDDFunctions[K, V, (K, V)](rdd) + RDD.rddToOrderedRDDFunctions(rdd) @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + "kept here only for backward compatibility.", "1.2.0") - def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd) + def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = RDD.doubleRDDToDoubleRDDFunctions(rdd) @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + "kept here only for backward compatibility.", "1.2.0") def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) = - new DoubleRDDFunctions(rdd.map(x => num.toDouble(x))) + RDD.numericRDDToDoubleRDDFunctions(rdd) // Implicit conversions to common Writable types, for saveAsSequenceFile @@ -1522,54 +1520,49 @@ object SparkContext extends Logging { arr.map(x => anyToWritable(x)).toArray) } - // The following deprecated functions have already been copied to `object WritableConverter` to - // make the compiler find them automatically. They are duplicate codes only for backward - // compatibility, please update `object WritableConverter` accordingly if you plan to modify the - // following ones. + // The following deprecated functions have already been moved to `object WritableConverter` to + // make the compiler find them automatically. They are still kept here for backward compatibility + // and just call the corresponding functions in `object WritableConverter`. @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.2.0") def intWritableConverter(): WritableConverter[Int] = - simpleWritableConverter[Int, IntWritable](_.get) + WritableConverter.intWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.2.0") def longWritableConverter(): WritableConverter[Long] = - simpleWritableConverter[Long, LongWritable](_.get) + WritableConverter.longWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.2.0") def doubleWritableConverter(): WritableConverter[Double] = - simpleWritableConverter[Double, DoubleWritable](_.get) + WritableConverter.doubleWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.2.0") def floatWritableConverter(): WritableConverter[Float] = - simpleWritableConverter[Float, FloatWritable](_.get) + WritableConverter.floatWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.2.0") def booleanWritableConverter(): WritableConverter[Boolean] = - simpleWritableConverter[Boolean, BooleanWritable](_.get) + WritableConverter.booleanWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. 
This is kept here only for " + "backward compatibility.", "1.2.0") - def bytesWritableConverter(): WritableConverter[Array[Byte]] = { - simpleWritableConverter[Array[Byte], BytesWritable](bw => - // getBytes method returns array which is longer then data to be returned - Arrays.copyOfRange(bw.getBytes, 0, bw.getLength) - ) - } + def bytesWritableConverter(): WritableConverter[Array[Byte]] = + WritableConverter.bytesWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.2.0") def stringWritableConverter(): WritableConverter[String] = - simpleWritableConverter[String, Text](_.toString) + WritableConverter.stringWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.2.0") def writableWritableConverter[T <: Writable]() = - new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T]) + WritableConverter.writableWritableConverter() /** * Find the JAR from which a given class was loaded, to make it easy for users to pass @@ -1805,8 +1798,8 @@ object WritableConverter { // The following implicit functions were in SparkContext before 1.2 and users had to // `import SparkContext._` to enable them. Now we move them here to make the compiler find - // them automatically. However, as there are duplicate codes in SparkContext for backward - // compatibility, please update them accordingly if you modify the following implicit functions. + // them automatically. However, we still keep the old functions in SparkContext for backward + // compatibility and forward to the following functions directly. implicit def intWritableConverter(): WritableConverter[Int] = simpleWritableConverter[Int, IntWritable](_.get) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 25ec18bbe676..7af3538262fd 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -37,7 +37,8 @@ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction} import org.apache.spark.partial.{BoundedDouble, PartialResult} -import org.apache.spark.rdd.{OrderedRDDFunctions, RDD, rddToPairRDDFunctions} +import org.apache.spark.rdd.{OrderedRDDFunctions, RDD} +import org.apache.spark.rdd.RDD.rddToPairRDDFunctions import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 716f2dd17733..337002ffaed6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -21,6 +21,7 @@ import java.util.{Properties, Random} import scala.collection.{mutable, Map} import scala.collection.mutable.ArrayBuffer +import scala.language.implicitConversions import scala.reflect.{classTag, ClassTag} import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus @@ -28,6 +29,7 @@ import org.apache.hadoop.io.BytesWritable import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.io.NullWritable import org.apache.hadoop.io.Text +import org.apache.hadoop.io.Writable import 
org.apache.hadoop.mapred.TextOutputFormat import org.apache.spark._ @@ -1383,3 +1385,31 @@ abstract class RDD[T: ClassTag]( new JavaRDD(this)(elementClassTag) } } + +object RDD { + + // The following implicit functions were in SparkContext before 1.2 and users had to + // `import SparkContext._` to enable them. Now we move them here to make the compiler find + // them automatically. However, we still keep the old functions in SparkContext for backward + // compatibility and forward to the following functions directly. + + implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) + (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = { + new PairRDDFunctions(rdd) + } + + implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd) + + implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( + rdd: RDD[(K, V)]) = + new SequenceFileRDDFunctions(rdd) + + implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag]( + rdd: RDD[(K, V)]) = + new OrderedRDDFunctions[K, V, (K, V)](rdd) + + implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd) + + implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) = + new DoubleRDDFunctions(rdd.map(x => num.toDouble(x))) +} diff --git a/core/src/main/scala/org/apache/spark/rdd/package.scala b/core/src/main/scala/org/apache/spark/rdd/package.scala index 3c22bb354e2a..55fc6e4d2b4d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/package.scala +++ b/core/src/main/scala/org/apache/spark/rdd/package.scala @@ -17,39 +17,7 @@ package org.apache.spark -import scala.language.implicitConversions -import scala.reflect.ClassTag - -import org.apache.hadoop.io.Writable - /** * Provides several RDD implementations. See [[org.apache.spark.rdd.RDD]]. */ -package object rdd { - - // The following implicit functions were in SparkContext before 1.2 and users had to - // `import SparkContext._` to enable them. Now we move them here to make the compiler find - // them automatically. However, as there are duplicate codes in SparkContext for backward - // compatibility, please update them accordingly if you modify the following implicit functions. 
- - implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) - (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = { - new PairRDDFunctions(rdd) - } - - implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd) - - implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( - rdd: RDD[(K, V)]) = - new SequenceFileRDDFunctions(rdd) - - implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag]( - rdd: RDD[(K, V)]) = - new OrderedRDDFunctions[K, V, (K, V)](rdd) - - implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd) - - implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) = - new DoubleRDDFunctions(rdd.map(x => num.toDouble(x))) - -} +package object rdd From fc303144d5f884bec5f6cbaf3ad89a9f97c31ee9 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 21 Nov 2014 16:25:33 +0800 Subject: [PATCH 11/11] Update the comments --- .../main/scala/org/apache/spark/SparkContext.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0b33c0cf42b8..3aabe8bb1664 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1464,34 +1464,34 @@ object SparkContext extends Logging { // make the compiler find them automatically. They are still kept here for backward compatibility // and just call the corresponding functions in `object RDD`. - @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + + @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.2.0") def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = { RDD.rddToPairRDDFunctions(rdd) } - @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + + @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.2.0") def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = RDD.rddToAsyncRDDActions(rdd) - @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + + @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.2.0") def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( rdd: RDD[(K, V)]) = RDD.rddToSequenceFileRDDFunctions(rdd) - @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + + @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.2.0") def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag]( rdd: RDD[(K, V)]) = RDD.rddToOrderedRDDFunctions(rdd) - @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. This is " + + @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.2.0") def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = RDD.doubleRDDToDoubleRDDFunctions(rdd) - @deprecated("Replaced by implicit functions in org.apache.spark.rdd package object. 
This is " + + @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.2.0") def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) = RDD.numericRDDToDoubleRDDFunctions(rdd)