24 commits
db24636
make hadoop configuration available to user for all hadoop input form…
koertkuipers May 26, 2015
333d943
add JobConf to all RDD saveAs... methods
koertkuipers Jun 3, 2015
1f82a33
actually use conf in saveAsSequenceFile
koertkuipers Jun 16, 2015
135b96e
merge from master
koertkuipers Jun 16, 2015
425a578
expose hadoop Configuration or JobConf for all methods that use hadoo…
koertkuipers Jun 19, 2015
9230543
address issues raised by andrewor14
koertkuipers Jun 19, 2015
2bfa320
merge from master
koertkuipers Jun 19, 2015
2122160
really simple tests that the Configuration provided gets used in (New…
koertkuipers Jun 19, 2015
df2c2ae
fix scalastyle errors
koertkuipers Jun 19, 2015
e2f7023
dont break binary compatibility (make MiMa happy)
koertkuipers Jun 20, 2015
1cbd95a
silence mima for JavaRDDLike.saveAsTextFile and JavaRDDLike.saveAsObj…
koertkuipers Jun 24, 2015
c5cf6b0
merge from master
koertkuipers Jul 2, 2015
96ccee0
merge from master
koertkuipers Jul 2, 2015
3097312
merge from master
koertkuipers Aug 16, 2015
cfe3f0c
merge from master
koertkuipers Sep 28, 2015
7ca662c
move mima excludes to section for right version
koertkuipers Oct 2, 2015
5483148
merge from master
koertkuipers Oct 13, 2015
470b3d9
Merge branch 'master' into feat-hadoop-input-format-advanced-control
koertkuipers Oct 18, 2015
208c019
merge from master
koertkuipers Nov 11, 2015
5e0b89c
merge from master
koertkuipers Apr 20, 2016
0ea4e5c
fix scalastyle errors
koertkuipers Apr 20, 2016
60c34e1
move mima exclusions for SPARK-8398 to version 2.0
koertkuipers Apr 20, 2016
c06548d
merge from master
koertkuipers Apr 20, 2016
34f97d4
merge from master
koertkuipers Apr 21, 2016
204 changes: 182 additions & 22 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -792,12 +792,22 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
minPartitions: Int,
conf: Configuration): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
minPartitions, conf).map(pair => pair._2.toString).setName(path)
}

/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] =
textFile(path, minPartitions, hadoopConfiguration)
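
A minimal usage sketch for the new conf-taking overload, assuming an existing SparkContext `sc`; the split-size key and path are illustrative only, not part of this patch:
{{{
import org.apache.hadoop.conf.Configuration

val conf = new Configuration(sc.hadoopConfiguration)
// Illustrative per-read override: ask Hadoop's FileInputFormat for larger splits.
conf.setLong("mapreduce.input.fileinputformat.split.minsize", 128L * 1024 * 1024)
val lines = sc.textFile("hdfs://a-hdfs-path/data", sc.defaultMinPartitions, conf)
}}}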

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
@@ -831,9 +841,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def wholeTextFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
minPartitions: Int,
conf: Configuration): RDD[(String, String)] = withScope {
assertNotStopped()
val job = NewHadoopJob.getInstance(hadoopConfiguration)
val job = NewHadoopJob.getInstance(conf)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
@@ -848,6 +859,40 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each file.
*
* <p> For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do `val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred; large files are also allowed, but may cause bad performance.
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
def wholeTextFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] =
wholeTextFiles(path, minPartitions, hadoopConfiguration)
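
A sketch of the same pattern for wholeTextFiles, again assuming `sc`; the recursive-listing key is a standard Hadoop setting assumed here, not something this patch defines:
{{{
import org.apache.hadoop.conf.Configuration

val conf = new Configuration(sc.hadoopConfiguration)
// Illustrative: let the new-API FileInputFormat descend into nested directories.
conf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
val files = sc.wholeTextFiles("hdfs://a-hdfs-path", sc.defaultMinPartitions, conf)
files.keys.take(5).foreach(println)  // prints file paths
}}}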

/**
* :: Experimental ::
*
* Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
* (useful for binary data)
*
@@ -880,9 +925,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def binaryFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
minPartitions: Int,
conf: Configuration): RDD[(String, PortableDataStream)] = withScope {
assertNotStopped()
val job = NewHadoopJob.getInstance(hadoopConfiguration)
val job = NewHadoopJob.getInstance(conf)
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
@@ -897,6 +943,42 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/**
* :: Experimental ::
*
* Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
* (useful for binary data)
*
* For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do
* `val rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
*
* then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*
* @note Small files are preferred; very large files may cause bad performance.
*/
def binaryFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] =
binaryFiles(path, minPartitions, hadoopConfiguration)
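
A sketch of the conf-taking binaryFiles variant, assuming `sc`; the buffer-size key is only an illustrative Hadoop setting:
{{{
import org.apache.hadoop.conf.Configuration

val conf = new Configuration(sc.hadoopConfiguration)
conf.setInt("io.file.buffer.size", 1024 * 1024)  // illustrative per-read tweak
val streams = sc.binaryFiles("hdfs://a-hdfs-path", sc.defaultMinPartitions, conf)
// Each value is a PortableDataStream; materialize it only when needed.
streams.mapValues(_.toArray().length).take(3).foreach {
  case (path, size) => println(s"$path -> $size bytes")
}
}}}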

/**
* :: Experimental ::
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* '''Note:''' We ensure that the byte array for each record in the resulting RDD
@@ -973,10 +1055,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
minPartitions: Int,
conf: Configuration): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val confBroadcast = broadcast(new SerializableConfiguration(conf))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
@@ -988,6 +1071,22 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
minPartitions).setName(path)
}

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] =
hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions, hadoopConfiguration)
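
For illustration, a sketch that combines the conf parameter with an arbitrary old-API InputFormat, here Hadoop's KeyValueTextInputFormat; the separator key is assumed Hadoop behavior, not something added by this patch, and `sc` is an existing SparkContext:
{{{
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.KeyValueTextInputFormat

val conf = new Configuration(sc.hadoopConfiguration)
// Illustrative: split each line on ',' instead of the default tab.
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",")
val pairs = sc.hadoopFile(
  "hdfs://a-hdfs-path/pairs",
  classOf[KeyValueTextInputFormat],
  classOf[Text],
  classOf[Text],
  sc.defaultMinPartitions,
  conf)
// Copy out of Hadoop's reused Writables before caching, as the scaladoc advises.
val strings = pairs.map { case (k, v) => (k.toString, v.toString) }
}}}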

/**
* Smarter version of hadoopFile() that uses class tags to figure out the classes of keys,
* values and the InputFormat so that users don't need to pass them directly. Instead, callers
@@ -1108,14 +1207,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def sequenceFile[K, V](path: String,
def sequenceFile[K, V](
path: String,
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int
minPartitions: Int,
conf: Configuration
): RDD[(K, V)] = withScope {
assertNotStopped()
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions, conf)
}

/**
@@ -1130,10 +1231,26 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def sequenceFile[K, V](
path: String,
keyClass: Class[K],
valueClass: Class[V]): RDD[(K, V)] = withScope {
assertNotStopped()
sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
}
valueClass: Class[V],
minPartitions: Int
): RDD[(K, V)] =
sequenceFile(path, keyClass, valueClass, minPartitions, hadoopConfiguration)

/**
* Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def sequenceFile[K, V](
path: String,
keyClass: Class[K],
valueClass: Class[V]
): RDD[(K, V)] =
sequenceFile(path, keyClass, valueClass, defaultMinPartitions, hadoopConfiguration)

/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -1157,22 +1274,51 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
def sequenceFile[K, V](
path: String,
minPartitions: Int,
conf: Configuration)(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
withScope {
assertNotStopped()
val kc = clean(kcf)()
val vc = clean(vcf)()
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions, conf)
writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
}
}

/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
* WritableConverter. For example, to access a SequenceFile where the keys are Text and the
* values are IntWritable, you could simply write
* {{{
* sparkContext.sequenceFile[String, Int](path, ...)
* }}}
*
* WritableConverters are provided in a somewhat strange way (by an implicit function) to support
* both subclasses of Writable and types for which we define a converter (e.g. Int to
* IntWritable). The most natural thing would've been to have implicit objects for the
* converters, but then we couldn't have an object for every subclass of Writable (you can't
* have a parameterized singleton object). We use functions instead to create a new converter
* for the appropriate type. In addition, we pass the converter a ClassTag of its type to
* allow it to figure out the Writable class to use in the subclass case.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def sequenceFile[K, V](
path: String,
minPartitions: Int = defaultMinPartitions)(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] =
sequenceFile(path, minPartitions, hadoopConfiguration)(km, vm, kcf, vcf)
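
A sketch of the converter-based variant with an explicit Configuration, assuming `sc` and a SequenceFile of Text keys and IntWritable values at an illustrative path:
{{{
import org.apache.hadoop.conf.Configuration

val conf = new Configuration(sc.hadoopConfiguration)
// The implicit WritableConverters map Text -> String and IntWritable -> Int.
val counts = sc.sequenceFile[String, Int]("hdfs://a-hdfs-path/counts", sc.defaultMinPartitions, conf)
counts.take(3).foreach { case (word, n) => println(s"$word: $n") }
}}}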

/**
* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
* BytesWritable values that contain a serialized partition. This is still an experimental
@@ -1183,12 +1329,26 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def objectFile[T: ClassTag](
path: String,
minPartitions: Int = defaultMinPartitions): RDD[T] = withScope {
minPartitions: Int,
conf: Configuration): RDD[T] = withScope {
assertNotStopped()
sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions, conf)
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
}

/**
* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
* BytesWritable values that contain a serialized partition. This is still an experimental
* storage format and may not be supported exactly as is in future Spark releases. It will also
* be pretty slow if you use the default serializer (Java serialization),
* though the nice thing about it is that there's very little effort required to save arbitrary
* objects.
*/
def objectFile[T: ClassTag](
path: String,
minPartitions: Int = defaultMinPartitions): RDD[T] =
objectFile(path, minPartitions, hadoopConfiguration)
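
A save/load round-trip sketch, assuming `sc`; the JobConf-taking saveAsObjectFile overload is the RDD-side addition from this PR, and the path is illustrative:
{{{
import org.apache.hadoop.mapred.JobConf

val jobConf = new JobConf(sc.hadoopConfiguration)
sc.parallelize(1 to 1000).saveAsObjectFile("hdfs://a-hdfs-path/ints", jobConf)
// Read back with a caller-supplied Configuration (here simply the context default).
val restored = sc.objectFile[Int]("hdfs://a-hdfs-path/ints", sc.defaultMinPartitions, sc.hadoopConfiguration)
assert(restored.count() == 1000)
}}}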

protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withScope {
new ReliableCheckpointRDD[T](this, path)
}
16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.JobConf

import org.apache.spark._
import org.apache.spark.annotation.Since
@@ -492,21 +493,34 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def isEmpty(): Boolean = rdd.isEmpty()

/**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String, conf: JobConf): Unit = {
rdd.saveAsTextFile(path, conf)
}
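
A sketch of what the JobConf parameter enables, for example per-job output compression through Hadoop's old-API FileOutputFormat helpers; it relies on the RDD.saveAsTextFile(path, conf) overload this wrapper delegates to, and assumes an existing SparkContext `sc`:
{{{
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

val jobConf = new JobConf(sc.hadoopConfiguration)
FileOutputFormat.setCompressOutput(jobConf, true)
FileOutputFormat.setOutputCompressorClass(jobConf, classOf[GzipCodec])
sc.parallelize(Seq("a", "b", "c")).saveAsTextFile("hdfs://a-hdfs-path/out", jobConf)
}}}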

/**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String): Unit = {
rdd.saveAsTextFile(path)
}


/**
* Save this RDD as a compressed text file, using string representations of elements.
*/
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = {
rdd.saveAsTextFile(path, codec)
}

/**
* Save this RDD as a SequenceFile of serialized objects.
*/
def saveAsObjectFile(path: String, conf: JobConf): Unit = {
rdd.saveAsObjectFile(path, conf)
}

/**
* Save this RDD as a SequenceFile of serialized objects.
*/