From db24636978d3679d7b43d73763ef6a540d3cf739 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Tue, 26 May 2015 16:25:29 -0400 Subject: [PATCH 01/12] make hadoop configuration available to user for all hadoop input formats in spark context --- .../scala/org/apache/spark/SparkContext.scala | 50 ++++++++----------- 1 file changed, 20 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ea6c0dea08e4..e5ce2d11c9e1 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -794,10 +794,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def textFile( path: String, - minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { + minPartitions: Int = defaultMinPartitions, + conf: Configuration = hadoopConfiguration): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], - minPartitions).map(pair => pair._2.toString) + minPartitions, conf).map(pair => pair._2.toString) } /** @@ -829,9 +830,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def wholeTextFiles( path: String, - minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope { + minPartitions: Int = defaultMinPartitions, + conf: Configuration = hadoopConfiguration): RDD[(String, String)] = withScope { assertNotStopped() - val job = new NewHadoopJob(hadoopConfiguration) + val job = new NewHadoopJob(conf) // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking // comma separated files as input. (see SPARK-7155) NewFileInputFormat.setInputPaths(job, path) @@ -878,9 +880,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli @Experimental def binaryFiles( path: String, - minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope { + minPartitions: Int = defaultMinPartitions, + conf: Configuration = hadoopConfiguration): RDD[(String, PortableDataStream)] = withScope { assertNotStopped() - val job = new NewHadoopJob(hadoopConfiguration) + val job = new NewHadoopJob(conf) // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking // comma separated files as input. (see SPARK-7155) NewFileInputFormat.setInputPaths(job, path) @@ -971,10 +974,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { + minPartitions: Int = defaultMinPartitions, + conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope { assertNotStopped() // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. 
- val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration)) + val confBroadcast = broadcast(new SerializableWritable(conf)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( this, @@ -1108,27 +1112,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], - minPartitions: Int + minPartitions: Int = defaultMinPartitions, + conf: Configuration = hadoopConfiguration ): RDD[(K, V)] = withScope { assertNotStopped() val inputFormatClass = classOf[SequenceFileInputFormat[K, V]] - hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions) - } - - /** Get an RDD for a Hadoop SequenceFile with given key and value types. - * - * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle - * operation will create many references to the same object. - * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first - * copy them using a `map` function. - * */ - def sequenceFile[K, V]( - path: String, - keyClass: Class[K], - valueClass: Class[V]): RDD[(K, V)] = withScope { - assertNotStopped() - sequenceFile(path, keyClass, valueClass, defaultMinPartitions) + hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions, conf) } /** @@ -1154,7 +1143,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * copy them using a `map` function. */ def sequenceFile[K, V] - (path: String, minPartitions: Int = defaultMinPartitions) + (path: String, minPartitions: Int = defaultMinPartitions, conf: Configuration = hadoopConfiguration) (implicit km: ClassTag[K], vm: ClassTag[V], kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = { withScope { @@ -1164,7 +1153,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val format = classOf[SequenceFileInputFormat[Writable, Writable]] val writables = hadoopFile(path, format, kc.writableClass(km).asInstanceOf[Class[Writable]], - vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions) + vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions, conf) writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) } } } @@ -1179,9 +1168,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def objectFile[T: ClassTag]( path: String, - minPartitions: Int = defaultMinPartitions): RDD[T] = withScope { + minPartitions: Int = defaultMinPartitions, + conf: Configuration = hadoopConfiguration): RDD[T] = withScope { assertNotStopped() - sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions) + sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions, conf) .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader)) } From 333d943b77533fa162a51460820fdac7423a54fb Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Wed, 3 Jun 2015 17:25:25 -0400 Subject: [PATCH 02/12] add JobConf to all RDD saveAs... 
methods --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 10 +++++----- .../apache/spark/rdd/SequenceFileRDDFunctions.scala | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index d772f03f7665..c5876cd860ea 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -27,7 +27,7 @@ import scala.reflect.{classTag, ClassTag} import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.io.{BytesWritable, NullWritable, Text} import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.hadoop.mapred.TextOutputFormat +import org.apache.hadoop.mapred.{ JobConf, TextOutputFormat } import org.apache.spark._ import org.apache.spark.Partitioner._ @@ -1376,7 +1376,7 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a text file, using string representations of elements. */ - def saveAsTextFile(path: String): Unit = withScope { + def saveAsTextFile(path: String, conf: JobConf = new JobConf(sc.hadoopConfiguration)): Unit = withScope { // https://issues.apache.org/jira/browse/SPARK-2075 // // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit @@ -1397,7 +1397,7 @@ abstract class RDD[T: ClassTag]( } } RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) - .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) + .saveAsHadoopFile(path, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], conf) } /** @@ -1421,10 +1421,10 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a SequenceFile of serialized objects. */ - def saveAsObjectFile(path: String): Unit = withScope { + def saveAsObjectFile(path: String, conf: JobConf = new JobConf(sc.hadoopConfiguration)): Unit = withScope { this.mapPartitions(iter => iter.grouped(10).map(_.toArray)) .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) - .saveAsSequenceFile(path) + .saveAsSequenceFile(path, None, conf) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala index 3dfcf67f0eb6..3cf4371e2643 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala @@ -87,7 +87,8 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag */ def saveAsSequenceFile( path: String, - codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope { + codec: Option[Class[_ <: CompressionCodec]] = None, + conf: JobConf = new JobConf(self.context.hadoopConfiguration)): Unit = self.withScope { def anyToWritable[U <% Writable](u: U): Writable = u // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and From 1f82a33b5a7c1077ffdb6c515ac209043a0366d0 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Tue, 16 Jun 2015 13:26:13 -0400 Subject: [PATCH 03/12] actually use conf in saveAsSequenceFile --- .../org/apache/spark/rdd/SequenceFileRDDFunctions.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala index 3cf4371e2643..dc024c7e8c71 100644 --- 
a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala @@ -101,18 +101,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," + valueWritableClass.getSimpleName + ")" ) val format = classOf[SequenceFileOutputFormat[Writable, Writable]] - val jobConf = new JobConf(self.context.hadoopConfiguration) if (!convertKey && !convertValue) { - self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec) + self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, conf, codec) } else if (!convertKey && convertValue) { self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile( - path, keyWritableClass, valueWritableClass, format, jobConf, codec) + path, keyWritableClass, valueWritableClass, format, conf, codec) } else if (convertKey && !convertValue) { self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile( - path, keyWritableClass, valueWritableClass, format, jobConf, codec) + path, keyWritableClass, valueWritableClass, format, conf, codec) } else if (convertKey && convertValue) { self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile( - path, keyWritableClass, valueWritableClass, format, jobConf, codec) + path, keyWritableClass, valueWritableClass, format, conf, codec) } } } From 425a57844075cfd87c316db3a28deb5e542390c3 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Fri, 19 Jun 2015 11:20:34 -0400 Subject: [PATCH 04/12] expose hadoop Configuration or JobConf for all methods that use hadoop input/output formats in java api --- .../apache/spark/api/java/JavaRDDLike.scala | 16 ++- .../spark/api/java/JavaSparkContext.scala | 136 ++++++++++++++++++ 2 files changed, 151 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index c95615a5a930..3959b5dd7206 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -26,6 +26,7 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import com.google.common.base.Optional +import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.io.compress.CompressionCodec import org.apache.spark._ @@ -515,6 +516,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def isEmpty(): Boolean = rdd.isEmpty() + /** + * Save this RDD as a text file, using string representations of elements. + */ + def saveAsTextFile(path: String, conf: JobConf): Unit = { + rdd.saveAsTextFile(path, conf) + } + /** * Save this RDD as a text file, using string representations of elements. */ @@ -522,7 +530,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { rdd.saveAsTextFile(path) } - /** * Save this RDD as a compressed text file, using string representations of elements. */ @@ -530,6 +537,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { rdd.saveAsTextFile(path, codec) } + /** + * Save this RDD as a SequenceFile of serialized objects. + */ + def saveAsObjectFile(path: String, conf: JobConf): Unit = { + rdd.saveAsObjectFile(path, conf) + } + /** * Save this RDD as a SequenceFile of serialized objects. 
*/ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 02e49a853c5f..7bcbe472f56c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -190,8 +190,48 @@ class JavaSparkContext(val sc: SparkContext) def textFile(path: String, minPartitions: Int): JavaRDD[String] = sc.textFile(path, minPartitions) + /** + * Read a text file from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI, and return it as an RDD of Strings. + */ + def textFile(path: String, minPartitions: Int, conf: Configuration): JavaRDD[String] = + sc.textFile(path, minPartitions, conf) + + /** + * Read a directory of text files from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI. Each file is read as a single record and returned in a + * key-value pair, where the key is the path of each file, the value is the content of each file. + * + *
<p>
For example, if you have the following files: + * {{{ + * hdfs://a-hdfs-path/part-00000 + * hdfs://a-hdfs-path/part-00001 + * ... + * hdfs://a-hdfs-path/part-nnnnn + * }}} + * + * Do + * {{{ + * JavaPairRDD rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path") + * }}} + * + *
<p>
then `rdd` contains + * {{{ + * (a-hdfs-path/part-00000, its content) + * (a-hdfs-path/part-00001, its content) + * ... + * (a-hdfs-path/part-nnnnn, its content) + * }}} + * + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + */ + def wholeTextFiles(path: String, minPartitions: Int, conf: Configuration): JavaPairRDD[String, String] = + new JavaPairRDD(sc.wholeTextFiles(path, minPartitions, conf)) + /** * Read a directory of text files from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI. Each file is read as a single record and returned in a @@ -235,6 +275,38 @@ class JavaSparkContext(val sc: SparkContext) def wholeTextFiles(path: String): JavaPairRDD[String, String] = new JavaPairRDD(sc.wholeTextFiles(path)) + /** + * Read a directory of binary files from HDFS, a local file system (available on all nodes), + * or any Hadoop-supported file system URI as a byte array. Each file is read as a single + * record and returned in a key-value pair, where the key is the path of each file, + * the value is the content of each file. + * + * For example, if you have the following files: + * {{{ + * hdfs://a-hdfs-path/part-00000 + * hdfs://a-hdfs-path/part-00001 + * ... + * hdfs://a-hdfs-path/part-nnnnn + * }}} + * + * Do + * `JavaPairRDD rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`, + * + * then `rdd` contains + * {{{ + * (a-hdfs-path/part-00000, its content) + * (a-hdfs-path/part-00001, its content) + * ... + * (a-hdfs-path/part-nnnnn, its content) + * }}} + * + * @note Small files are preferred; very large files but may cause bad performance. + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + */ + def binaryFiles(path: String, minPartitions: Int, conf: Configuration): JavaPairRDD[String, PortableDataStream] = + new JavaPairRDD(sc.binaryFiles(path, minPartitions, conf)) + /** * Read a directory of binary files from HDFS, a local file system (available on all nodes), * or any Hadoop-supported file system URI as a byte array. Each file is read as a single @@ -300,6 +372,19 @@ class JavaSparkContext(val sc: SparkContext) def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions)) + /** + * :: Experimental :: + * + * Load data from a flat binary file, assuming the length of each record is constant. + * + * @param path Directory to the input data files + * @return An RDD of data with values, represented as byte arrays + */ + @Experimental + def binaryRecords(path: String, recordLength: Int, conf: Configuration): JavaRDD[Array[Byte]] = { + new JavaRDD(sc.binaryRecords(path, recordLength, conf)) + } + /** * :: Experimental :: * @@ -313,6 +398,24 @@ class JavaSparkContext(val sc: SparkContext) new JavaRDD(sc.binaryRecords(path, recordLength)) } + /** Get an RDD for a Hadoop SequenceFile with given key and value types. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD will create many references to the same object. + * If you plan to directly cache Hadoop writable objects, you should first copy them using + * a `map` function. 
+ * */ + def sequenceFile[K, V](path: String, + keyClass: Class[K], + valueClass: Class[V], + minPartitions: Int, + conf: Configuration + ): JavaPairRDD[K, V] = { + implicit val ctagK: ClassTag[K] = ClassTag(keyClass) + implicit val ctagV: ClassTag[V] = ClassTag(valueClass) + new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minPartitions, conf)) + } + /** Get an RDD for a Hadoop SequenceFile with given key and value types. * * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each @@ -344,6 +447,18 @@ class JavaSparkContext(val sc: SparkContext) new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass)) } + /** + * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and + * BytesWritable values that contain a serialized partition. This is still an experimental storage + * format and may not be supported exactly as is in future Spark releases. It will also be pretty + * slow if you use the default serializer (Java serialization), though the nice thing about it is + * that there's very little effort required to save arbitrary objects. + */ + def objectFile[T](path: String, minPartitions: Int, conf: Configuration): JavaRDD[T] = { + implicit val ctag: ClassTag[T] = fakeClassTag + sc.objectFile(path, minPartitions, conf)(ctag) + } + /** * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and * BytesWritable values that contain a serialized partition. This is still an experimental storage @@ -429,6 +544,27 @@ class JavaSparkContext(val sc: SparkContext) new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]]) } + /** Get an RDD for a Hadoop file with an arbitrary InputFormat. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD will create many references to the same object. + * If you plan to directly cache Hadoop writable objects, you should first copy them using + * a `map` function. + */ + def hadoopFile[K, V, F <: InputFormat[K, V]]( + path: String, + inputFormatClass: Class[F], + keyClass: Class[K], + valueClass: Class[V], + minPartitions: Int, + conf: Configuration + ): JavaPairRDD[K, V] = { + implicit val ctagK: ClassTag[K] = ClassTag(keyClass) + implicit val ctagV: ClassTag[V] = ClassTag(valueClass) + val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions, conf) + new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]]) + } + /** Get an RDD for a Hadoop file with an arbitrary InputFormat. 
* * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each From 923054376a9ce79545583966d7249e52a443f3ba Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Fri, 19 Jun 2015 12:14:18 -0400 Subject: [PATCH 05/12] address issues raised by andrewor14 --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index eefc720cb67d..d7b175f278e6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -27,7 +27,7 @@ import scala.reflect.{classTag, ClassTag} import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.io.{BytesWritable, NullWritable, Text} import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.hadoop.mapred.{ JobConf, TextOutputFormat } +import org.apache.hadoop.mapred.{JobConf, TextOutputFormat} import org.apache.spark._ import org.apache.spark.Partitioner._ @@ -1397,7 +1397,9 @@ abstract class RDD[T: ClassTag]( } } RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) - .saveAsHadoopFile(path, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], conf) + .saveAsHadoopFile( + path, classOf[NullWritable],classOf[Text], + classOf[TextOutputFormat[NullWritable, Text]], conf) } /** @@ -1424,7 +1426,7 @@ abstract class RDD[T: ClassTag]( def saveAsObjectFile(path: String, conf: JobConf = new JobConf(sc.hadoopConfiguration)): Unit = withScope { this.mapPartitions(iter => iter.grouped(10).map(_.toArray)) .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) - .saveAsSequenceFile(path, None, conf) + .saveAsSequenceFile(path, conf = conf) } /** From 21221602ff0b0e5c4cead917330eb8aadd7c1fad Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Fri, 19 Jun 2015 17:08:15 -0400 Subject: [PATCH 06/12] really simple tests that the Configuration provided gets used in (New)HadoopRDD --- .../org/apache/spark/SparkContextSuite.scala | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 6838b35ab4cc..746a995063ca 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -24,9 +24,11 @@ import com.google.common.base.Charsets._ import com.google.common.io.Files import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat} import org.apache.spark.util.Utils +import org.apache.spark.rdd.{RDD, HadoopRDD, NewHadoopRDD} import scala.concurrent.Await import scala.concurrent.duration.Duration @@ -272,4 +274,29 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } } + + test("Passing configuration into methods that create (New)HadoopRDD (SPARK-8398)") { + try { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + val conf = new Configuration(sc.hadoopConfiguration) + val k = "test" + val v = "dummyForTest" + conf.set(k, v) + def sourceRDD(rdd: RDD[_]): RDD[_] = + if (!rdd.dependencies.isEmpty) rdd.dependencies.head.rdd else rdd + + 
assert(sourceRDD(sc.textFile("nonexistent", 1, conf)).asInstanceOf[HadoopRDD[_, _]] + .getConf.get(k) == v) + assert(sourceRDD(sc.wholeTextFiles("nonexistent", 1, conf)).asInstanceOf[NewHadoopRDD[_, _]] + .getConf.get(k) == v) + assert(sourceRDD(sc.binaryFiles("nonexistent", 1, conf)).asInstanceOf[NewHadoopRDD[_, _]] + .getConf.get(k) == v) + assert(sourceRDD(sc.sequenceFile[Int, Int]("nonexistent", 1, conf)).asInstanceOf[HadoopRDD[_, _]] + .getConf.get(k) == v) + assert(sourceRDD(sc.objectFile[Int]("nonexistent", 1, conf)).asInstanceOf[HadoopRDD[_, _]] + .getConf.get(k) == v) + } finally { + sc.stop() + } + } } From df2c2ae2fe88c4532dd680290d7d91e43a8b4f9b Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Fri, 19 Jun 2015 18:05:46 -0400 Subject: [PATCH 07/12] fix scalastyle errors --- .../scala/org/apache/spark/SparkContext.scala | 7 +++---- .../spark/api/java/JavaSparkContext.scala | 10 ++++++++-- .../main/scala/org/apache/spark/rdd/RDD.scala | 8 +++++--- .../org/apache/spark/SparkContextSuite.scala | 20 +++++++++---------- 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ba016a98bf57..78444ba1f27a 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1142,10 +1142,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - def sequenceFile[K, V] - (path: String, minPartitions: Int = defaultMinPartitions, conf: Configuration = hadoopConfiguration) - (implicit km: ClassTag[K], vm: ClassTag[V], - kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = { + def sequenceFile[K, V](path: String, minPartitions: Int = defaultMinPartitions, + conf: Configuration = hadoopConfiguration)(implicit km: ClassTag[K], vm: ClassTag[V], + kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = { withScope { assertNotStopped() val kc = clean(kcf)() diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 7bcbe472f56c..1615fa9569b8 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -229,7 +229,10 @@ class JavaSparkContext(val sc: SparkContext) * * @param minPartitions A suggestion value of the minimal splitting number for input data. */ - def wholeTextFiles(path: String, minPartitions: Int, conf: Configuration): JavaPairRDD[String, String] = + def wholeTextFiles( + path: String, + minPartitions: Int, + conf: Configuration): JavaPairRDD[String, String] = new JavaPairRDD(sc.wholeTextFiles(path, minPartitions, conf)) /** @@ -304,7 +307,10 @@ class JavaSparkContext(val sc: SparkContext) * * @param minPartitions A suggestion value of the minimal splitting number for input data. 
*/ - def binaryFiles(path: String, minPartitions: Int, conf: Configuration): JavaPairRDD[String, PortableDataStream] = + def binaryFiles( + path: String, + minPartitions: Int, + conf: Configuration): JavaPairRDD[String, PortableDataStream] = new JavaPairRDD(sc.binaryFiles(path, minPartitions, conf)) /** diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index d7b175f278e6..37196cd68a2a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1376,7 +1376,8 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a text file, using string representations of elements. */ - def saveAsTextFile(path: String, conf: JobConf = new JobConf(sc.hadoopConfiguration)): Unit = withScope { + def saveAsTextFile(path: String, conf: JobConf = new JobConf(sc.hadoopConfiguration)) + : Unit = withScope { // https://issues.apache.org/jira/browse/SPARK-2075 // // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit @@ -1398,7 +1399,7 @@ abstract class RDD[T: ClassTag]( } RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile( - path, classOf[NullWritable],classOf[Text], + path, classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], conf) } @@ -1423,7 +1424,8 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a SequenceFile of serialized objects. */ - def saveAsObjectFile(path: String, conf: JobConf = new JobConf(sc.hadoopConfiguration)): Unit = withScope { + def saveAsObjectFile(path: String, conf: JobConf = new JobConf(sc.hadoopConfiguration)) + : Unit = withScope { this.mapPartitions(iter => iter.grouped(10).map(_.toArray)) .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) .saveAsSequenceFile(path, conf = conf) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 746a995063ca..329c9adecdc6 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -285,16 +285,16 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { def sourceRDD(rdd: RDD[_]): RDD[_] = if (!rdd.dependencies.isEmpty) rdd.dependencies.head.rdd else rdd - assert(sourceRDD(sc.textFile("nonexistent", 1, conf)).asInstanceOf[HadoopRDD[_, _]] - .getConf.get(k) == v) - assert(sourceRDD(sc.wholeTextFiles("nonexistent", 1, conf)).asInstanceOf[NewHadoopRDD[_, _]] - .getConf.get(k) == v) - assert(sourceRDD(sc.binaryFiles("nonexistent", 1, conf)).asInstanceOf[NewHadoopRDD[_, _]] - .getConf.get(k) == v) - assert(sourceRDD(sc.sequenceFile[Int, Int]("nonexistent", 1, conf)).asInstanceOf[HadoopRDD[_, _]] - .getConf.get(k) == v) - assert(sourceRDD(sc.objectFile[Int]("nonexistent", 1, conf)).asInstanceOf[HadoopRDD[_, _]] - .getConf.get(k) == v) + assert(sourceRDD(sc.textFile("nonexistent", 1, conf)) + .asInstanceOf[HadoopRDD[_, _]].getConf.get(k) == v) + assert(sourceRDD(sc.wholeTextFiles("nonexistent", 1, conf)) + .asInstanceOf[NewHadoopRDD[_, _]].getConf.get(k) == v) + assert(sourceRDD(sc.binaryFiles("nonexistent", 1, conf)) + .asInstanceOf[NewHadoopRDD[_, _]].getConf.get(k) == v) + assert(sourceRDD(sc.sequenceFile[Int, Int]("nonexistent", 1, conf)) + .asInstanceOf[HadoopRDD[_, _]].getConf.get(k) == v) + assert(sourceRDD(sc.objectFile[Int]("nonexistent", 1, conf)) + .asInstanceOf[HadoopRDD[_, _]].getConf.get(k) 
== v) } finally { sc.stop() } From e2f702314099aa9c5b72099e3a7a7ac54b2baf47 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Sat, 20 Jun 2015 15:33:09 -0400 Subject: [PATCH 08/12] dont break binary compatibility (make MiMa happy) --- .../scala/org/apache/spark/SparkContext.scala | 196 ++++++++++++++++-- .../main/scala/org/apache/spark/rdd/RDD.scala | 20 +- .../spark/rdd/SequenceFileRDDFunctions.scala | 16 +- 3 files changed, 210 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 78444ba1f27a..7da5f0b67e69 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -794,13 +794,22 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def textFile( path: String, - minPartitions: Int = defaultMinPartitions, - conf: Configuration = hadoopConfiguration): RDD[String] = withScope { + minPartitions: Int, + conf: Configuration): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions, conf).map(pair => pair._2.toString) } + /** + * Read a text file from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI, and return it as an RDD of Strings. + */ + def textFile( + path: String, + minPartitions: Int = defaultMinPartitions): RDD[String] = + textFile(path, minPartitions, hadoopConfiguration) + /** * Read a directory of text files from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI. Each file is read as a single record and returned in a @@ -830,8 +839,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def wholeTextFiles( path: String, - minPartitions: Int = defaultMinPartitions, - conf: Configuration = hadoopConfiguration): RDD[(String, String)] = withScope { + minPartitions: Int, + conf: Configuration): RDD[(String, String)] = withScope { assertNotStopped() val job = new NewHadoopJob(conf) // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking @@ -847,6 +856,37 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli minPartitions).setName(path) } + /** + * Read a directory of text files from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI. Each file is read as a single record and returned in a + * key-value pair, where the key is the path of each file, the value is the content of each file. + * + *
<p>
For example, if you have the following files: + * {{{ + * hdfs://a-hdfs-path/part-00000 + * hdfs://a-hdfs-path/part-00001 + * ... + * hdfs://a-hdfs-path/part-nnnnn + * }}} + * + * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`, + * + *
<p>
then `rdd` contains + * {{{ + * (a-hdfs-path/part-00000, its content) + * (a-hdfs-path/part-00001, its content) + * ... + * (a-hdfs-path/part-nnnnn, its content) + * }}} + * + * @note Small files are preferred, large file is also allowable, but may cause bad performance. + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + */ + def wholeTextFiles( + path: String, + minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = + wholeTextFiles(path, minPartitions, hadoopConfiguration) /** * :: Experimental :: @@ -880,8 +920,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli @Experimental def binaryFiles( path: String, - minPartitions: Int = defaultMinPartitions, - conf: Configuration = hadoopConfiguration): RDD[(String, PortableDataStream)] = withScope { + minPartitions: Int, + conf: Configuration): RDD[(String, PortableDataStream)] = withScope { assertNotStopped() val job = new NewHadoopJob(conf) // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking @@ -897,6 +937,41 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli minPartitions).setName(path) } + /** + * :: Experimental :: + * + * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file + * (useful for binary data) + * + * For example, if you have the following files: + * {{{ + * hdfs://a-hdfs-path/part-00000 + * hdfs://a-hdfs-path/part-00001 + * ... + * hdfs://a-hdfs-path/part-nnnnn + * }}} + * + * Do + * `val rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`, + * + * then `rdd` contains + * {{{ + * (a-hdfs-path/part-00000, its content) + * (a-hdfs-path/part-00001, its content) + * ... + * (a-hdfs-path/part-nnnnn, its content) + * }}} + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. + * + * @note Small files are preferred; very large files may cause bad performance. + */ + @Experimental + def binaryFiles( + path: String, + minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = + binaryFiles(path, minPartitions, hadoopConfiguration) + /** * :: Experimental :: * @@ -974,8 +1049,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minPartitions: Int = defaultMinPartitions, - conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope { + minPartitions: Int, + conf: Configuration): RDD[(K, V)] = withScope { assertNotStopped() // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableConfiguration(conf)) @@ -990,6 +1065,22 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli minPartitions).setName(path) } + /** Get an RDD for a Hadoop file with an arbitrary InputFormat + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. 
+ */ + def hadoopFile[K, V]( + path: String, + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = + hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions, hadoopConfiguration) + /** * Smarter version of hadoopFile() that uses class tags to figure out the classes of keys, * values and the InputFormat so that users don't need to pass them directly. Instead, callers @@ -1109,17 +1200,49 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - def sequenceFile[K, V](path: String, + def sequenceFile[K, V]( + path: String, keyClass: Class[K], valueClass: Class[V], - minPartitions: Int = defaultMinPartitions, - conf: Configuration = hadoopConfiguration + minPartitions: Int, + conf: Configuration ): RDD[(K, V)] = withScope { assertNotStopped() val inputFormatClass = classOf[SequenceFileInputFormat[K, V]] hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions, conf) } + /** Get an RDD for a Hadoop SequenceFile with given key and value types. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. + */ + def sequenceFile[K, V]( + path: String, + keyClass: Class[K], + valueClass: Class[V], + minPartitions: Int + ): RDD[(K, V)] = + sequenceFile(path, keyClass, valueClass, minPartitions, hadoopConfiguration) + + /** Get an RDD for a Hadoop SequenceFile with given key and value types. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. + */ + def sequenceFile[K, V]( + path: String, + keyClass: Class[K], + valueClass: Class[V] + ): RDD[(K, V)] = + sequenceFile(path, keyClass, valueClass, defaultMinPartitions, hadoopConfiguration) + /** * Version of sequenceFile() for types implicitly convertible to Writables through a * WritableConverter. For example, to access a SequenceFile where the keys are Text and the @@ -1142,8 +1265,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. 
*/ - def sequenceFile[K, V](path: String, minPartitions: Int = defaultMinPartitions, - conf: Configuration = hadoopConfiguration)(implicit km: ClassTag[K], vm: ClassTag[V], + def sequenceFile[K, V]( + path: String, + minPartitions: Int, + conf: Configuration)(implicit km: ClassTag[K], vm: ClassTag[V], kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = { withScope { assertNotStopped() @@ -1157,6 +1282,34 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } + /** + * Version of sequenceFile() for types implicitly convertible to Writables through a + * WritableConverter. For example, to access a SequenceFile where the keys are Text and the + * values are IntWritable, you could simply write + * {{{ + * sparkContext.sequenceFile[String, Int](path, ...) + * }}} + * + * WritableConverters are provided in a somewhat strange way (by an implicit function) to support + * both subclasses of Writable and types for which we define a converter (e.g. Int to + * IntWritable). The most natural thing would've been to have implicit objects for the + * converters, but then we couldn't have an object for every subclass of Writable (you can't + * have a parameterized singleton object). We use functions instead to create a new converter + * for the appropriate type. In addition, we pass the converter a ClassTag of its type to + * allow it to figure out the Writable class to use in the subclass case. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle + * operation will create many references to the same object. + * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first + * copy them using a `map` function. + */ + def sequenceFile[K, V]( + path: String, + minPartitions: Int = defaultMinPartitions)(implicit km: ClassTag[K], vm: ClassTag[V], + kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = + sequenceFile(path, minPartitions, hadoopConfiguration)(km, vm, kcf, vcf) + /** * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and * BytesWritable values that contain a serialized partition. This is still an experimental @@ -1167,13 +1320,26 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def objectFile[T: ClassTag]( path: String, - minPartitions: Int = defaultMinPartitions, - conf: Configuration = hadoopConfiguration): RDD[T] = withScope { + minPartitions: Int, + conf: Configuration): RDD[T] = withScope { assertNotStopped() sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions, conf) .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader)) } + /** + * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and + * BytesWritable values that contain a serialized partition. This is still an experimental + * storage format and may not be supported exactly as is in future Spark releases. It will also + * be pretty slow if you use the default serializer (Java serialization), + * though the nice thing about it is that there's very little effort required to save arbitrary + * objects. 
+ */ + def objectFile[T: ClassTag]( + path: String, + minPartitions: Int = defaultMinPartitions): RDD[T] = + objectFile(path, minPartitions, hadoopConfiguration) + protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withScope { new CheckpointRDD[T](this, path) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 37196cd68a2a..25099a799bc4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1376,8 +1376,7 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a text file, using string representations of elements. */ - def saveAsTextFile(path: String, conf: JobConf = new JobConf(sc.hadoopConfiguration)) - : Unit = withScope { + def saveAsTextFile(path: String, conf: JobConf): Unit = withScope { // https://issues.apache.org/jira/browse/SPARK-2075 // // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit @@ -1403,6 +1402,12 @@ abstract class RDD[T: ClassTag]( classOf[TextOutputFormat[NullWritable, Text]], conf) } + /** + * Save this RDD as a text file, using string representations of elements. + */ + def saveAsTextFile(path: String): Unit = + saveAsTextFile(path, new JobConf(sc.hadoopConfiguration)) + /** * Save this RDD as a compressed text file, using string representations of elements. */ @@ -1424,13 +1429,18 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a SequenceFile of serialized objects. */ - def saveAsObjectFile(path: String, conf: JobConf = new JobConf(sc.hadoopConfiguration)) - : Unit = withScope { + def saveAsObjectFile(path: String, conf: JobConf): Unit = withScope { this.mapPartitions(iter => iter.grouped(10).map(_.toArray)) .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) - .saveAsSequenceFile(path, conf = conf) + .saveAsSequenceFile(path, None, conf) } + /** + * Save this RDD as a SequenceFile of serialized objects. + */ + def saveAsObjectFile(path: String): Unit = + saveAsObjectFile(path, new JobConf(sc.hadoopConfiguration)) + /** * Creates tuples of the elements in this RDD by applying `f`. */ diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala index b8b69b68cdf1..eac653ae817f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala @@ -87,8 +87,8 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag */ def saveAsSequenceFile( path: String, - codec: Option[Class[_ <: CompressionCodec]] = None, - conf: JobConf = new JobConf(self.context.hadoopConfiguration)): Unit = self.withScope { + codec: Option[Class[_ <: CompressionCodec]], + conf: JobConf): Unit = self.withScope { def anyToWritable[U <% Writable](u: U): Writable = u // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and @@ -114,4 +114,16 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag path, keyWritableClass, valueWritableClass, format, conf, codec) } } + + /** + * Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key + * and value types. 
If the key or value are Writable, then we use their classes directly; + * otherwise we map primitive types such as Int and Double to IntWritable, DoubleWritable, etc, + * byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported + * file system. + */ + def saveAsSequenceFile( + path: String, + codec: Option[Class[_ <: CompressionCodec]] = None): Unit = + saveAsSequenceFile(path, codec, new JobConf(self.context.hadoopConfiguration)) } From 1cbd95a9a0592d8dbab21d0cc6a403bc1fff1b9d Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Tue, 23 Jun 2015 23:52:28 -0400 Subject: [PATCH 09/12] silence mima for JavaRDDLike.saveAsTextFile and JavaRDDLike.saveAsObjectFile --- project/MimaExcludes.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 8a93ca299951..dac1ba724d5f 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -44,6 +44,11 @@ object MimaExcludes { // JavaRDDLike is not meant to be extended by user programs ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.api.java.JavaRDDLike.partitioner"), + // New methods introduced in SPARK-8398 that expose hadoop Configuration + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.saveAsTextFile"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.saveAsObjectFile"), // Mima false positive (was a private[spark] class) ProblemFilters.exclude[MissingClassProblem]( "org.apache.spark.util.collection.PairIterator"), From 7ca662c909de4ba1984bbc4f6b5453fe7b4352e9 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Thu, 1 Oct 2015 22:51:53 -0400 Subject: [PATCH 10/12] move mima excludes to section for right version --- project/MimaExcludes.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 7db3cf335795..b11cec6e0fd9 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -78,6 +78,12 @@ object MimaExcludes { "org.apache.spark.ml.regression.LeastSquaresAggregator.add"), ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.ml.regression.LeastSquaresCostFun.this") + ) ++ Seq( + // New methods introduced in SPARK-8398 that expose hadoop Configuration + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.saveAsTextFile"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.saveAsObjectFile") ) case v if v.startsWith("1.5") => Seq( @@ -90,11 +96,6 @@ object MimaExcludes { // JavaRDDLike is not meant to be extended by user programs ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.api.java.JavaRDDLike.partitioner"), - // New methods introduced in SPARK-8398 that expose hadoop Configuration - ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.api.java.JavaRDDLike.saveAsTextFile"), - ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.api.java.JavaRDDLike.saveAsObjectFile"), // Modification of private static method ProblemFilters.exclude[IncompatibleMethTypeProblem]( "org.apache.spark.streaming.kafka.KafkaUtils.org$apache$spark$streaming$kafka$KafkaUtils$$leadersForRanges"), From 0ea4e5ccc6524b1aab363740df9f3cf4d2c9fe35 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Wed, 20 Apr 2016 11:03:22 -0400 Subject: [PATCH 11/12] fix scalastyle errors --- .../apache/spark/api/java/JavaRDDLike.scala | 2 +- 
.../spark/api/java/JavaSparkContext.scala | 30 ++++++++++--------- .../org/apache/spark/SparkContextSuite.scala | 5 ++-- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index bafa73e88537..33e444844644 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -24,8 +24,8 @@ import java.util.{Comparator, Iterator => JIterator, List => JList} import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.io.compress.CompressionCodec +import org.apache.hadoop.mapred.JobConf import org.apache.spark._ import org.apache.spark.annotation.Since diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index a355340c6e2c..09b85196e7f4 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -404,13 +404,14 @@ class JavaSparkContext(val sc: SparkContext) new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minPartitions, conf)) } - /** Get an RDD for a Hadoop SequenceFile with given key and value types. - * - * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. - * */ + /** + * Get an RDD for a Hadoop SequenceFile with given key and value types. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD will create many references to the same object. + * If you plan to directly cache Hadoop writable objects, you should first copy them using + * a `map` function. + */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], @@ -555,13 +556,14 @@ class JavaSparkContext(val sc: SparkContext) new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]]) } - /** Get an RDD for a Hadoop file with an arbitrary InputFormat. - * - * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each - * record, directly caching the returned RDD will create many references to the same object. - * If you plan to directly cache Hadoop writable objects, you should first copy them using - * a `map` function. - */ + /** + * Get an RDD for a Hadoop file with an arbitrary InputFormat. + * + * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each + * record, directly caching the returned RDD will create many references to the same object. + * If you plan to directly cache Hadoop writable objects, you should first copy them using + * a `map` function. 
+ */ def hadoopFile[K, V, F <: InputFormat[K, V]]( path: String, inputFormatClass: Class[F], diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 9ae5455912f7..8c47b23026b2 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -25,15 +25,14 @@ import scala.concurrent.Await import scala.concurrent.duration.Duration import com.google.common.io.Files -import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat} - import org.scalatest.Matchers._ +import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD, RDD} import org.apache.spark.util.Utils -import org.apache.spark.rdd.{RDD, HadoopRDD, NewHadoopRDD} class SparkContextSuite extends SparkFunSuite with LocalSparkContext { From 60c34e1baa8da2db1fc29b1dd57bb903beed39e2 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Wed, 20 Apr 2016 11:34:02 -0400 Subject: [PATCH 12/12] move mima exclusions for SPARK-8398 to version 2.0 --- project/MimaExcludes.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index e1d49b283ec0..dbc43e549953 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -652,6 +652,12 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.TaskMetricDistributions.shuffleWriteMetrics"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.TaskMetricDistributions.shuffleReadMetrics"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.TaskMetricDistributions.this") + ) ++ Seq( + // New methods introduced in SPARK-8398 that expose hadoop Configuration + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.saveAsTextFile"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.java.JavaRDDLike.saveAsObjectFile") ) case v if v.startsWith("1.6") => Seq( @@ -705,12 +711,6 @@ object MimaExcludes { "org.apache.spark.ml.regression.LeastSquaresAggregator.add"), ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.ml.regression.LeastSquaresCostFun.this") - ) ++ Seq( - // New methods introduced in SPARK-8398 that expose hadoop Configuration - ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.api.java.JavaRDDLike.saveAsTextFile"), - ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.api.java.JavaRDDLike.saveAsObjectFile") ) ++ Seq( ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.sql.SQLContext.clearLastInstantiatedContext"),
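For reference, the snippet below is a minimal sketch of how the per-call Configuration and JobConf parameters added by this series could be used from Scala. The object name, the paths, and the specific Hadoop keys and values are illustrative placeholders, not part of the patches.

{{{
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

import org.apache.spark.{SparkConf, SparkContext}

object PerCallHadoopConfExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("per-call-hadoop-conf").setMaster("local[*]"))

    // Copy the shared configuration and override settings for this read only,
    // rather than mutating sc.hadoopConfiguration, which every job on the context sees.
    val readConf = new Configuration(sc.hadoopConfiguration)
    readConf.set("mapreduce.input.fileinputformat.split.minsize", "134217728")
    val lines = sc.textFile("hdfs://a-hdfs-path/input", 4, readConf)

    // The saveAs... methods take a per-call JobConf in the same way.
    val writeConf = new JobConf(sc.hadoopConfiguration)
    writeConf.setBoolean("mapreduce.output.fileoutputformat.compress", false)
    lines.saveAsTextFile("hdfs://a-hdfs-path/output", writeConf)

    sc.stop()
  }
}
}}}

Because the configuration is copied and passed per call, read- or write-specific overrides stay out of sc.hadoopConfiguration, which is shared by every job running on the same SparkContext.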