4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -560,6 +560,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {


/**
* :: Experimental ::
*
* Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
* (useful for binary data)
*
@@ -602,6 +604,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
}

/**
* :: Experimental ::
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory to the input data files
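The binaryRecords method above reads a flat binary file whose records all have the same byte length. A minimal PySpark sketch of that contract, using the Python API this patch adds (the path, record width, and file contents are assumptions for illustration, not part of the patch):

# Write 100 fixed-width records of exactly 4 ASCII bytes each, then read them
# back with binaryRecords; recordLength must match the width used when writing.
import os
from pyspark import SparkContext

sc = SparkContext(appName="binary-records-sketch")
path = "/tmp/binaryrecords"            # hypothetical local directory
os.mkdir(path)
with open(os.path.join(path, "part-0000"), "w") as f:
    for i in range(100):
        f.write("%04d" % i)            # each record is exactly 4 bytes

records = sc.binaryRecords(path, 4)    # one RDD element per 4-byte record
print(records.map(int).take(5))        # [0, 1, 2, 3, 4]
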
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -21,18 +21,14 @@ import java.io.Closeable
import java.util
import java.util.{Map => JMap}

import java.io.DataInputStream

import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.input.{PortableDataStream, FixedLengthBinaryInputFormat}

import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.ClassTag

import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.spark.input.PortableDataStream
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

@@ -286,6 +282,8 @@ class JavaSparkContext(val sc: SparkContext)
new JavaPairRDD(sc.binaryFiles(path, minPartitions))

/**
* :: Experimental ::
*
* Read a directory of binary files from HDFS, a local file system (available on all nodes),
* or any Hadoop-supported file system URI as a byte array. Each file is read as a single
* record and returned in a key-value pair, where the key is the path of each file,
@@ -312,15 +310,19 @@ class JavaSparkContext(val sc: SparkContext)
*
* @note Small files are preferred; very large files may cause bad performance.
*/
@Experimental
def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))

/**
* :: Experimental ::
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory to the input data files
* @param recordLength The length at which to split the records
* @return An RDD of data with values, represented as byte arrays
*/
@Experimental
def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = {
new JavaRDD(sc.binaryRecords(path, recordLength))
}
45 changes: 29 additions & 16 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -21,6 +21,8 @@ import java.io._
import java.net._
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}

import org.apache.spark.input.PortableDataStream

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.existentials
@@ -395,22 +397,33 @@ private[spark] object PythonRDD extends Logging {
newIter.asInstanceOf[Iterator[String]].foreach { str =>
writeUTF(str, dataOut)
}
case pair: Tuple2[_, _] =>
pair._1 match {
case bytePair: Array[Byte] =>
newIter.asInstanceOf[Iterator[Tuple2[Array[Byte], Array[Byte]]]].foreach { pair =>
dataOut.writeInt(pair._1.length)
dataOut.write(pair._1)
dataOut.writeInt(pair._2.length)
dataOut.write(pair._2)
}
case stringPair: String =>
newIter.asInstanceOf[Iterator[Tuple2[String, String]]].foreach { pair =>
writeUTF(pair._1, dataOut)
writeUTF(pair._2, dataOut)
}
case other =>
throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
case stream: PortableDataStream =>
newIter.asInstanceOf[Iterator[PortableDataStream]].foreach { stream =>
val bytes = stream.toArray()
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
case (key: String, stream: PortableDataStream) =>
newIter.asInstanceOf[Iterator[(String, PortableDataStream)]].foreach {
case (key, stream) =>
writeUTF(key, dataOut)
val bytes = stream.toArray()
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
case (key: String, value: String) =>
newIter.asInstanceOf[Iterator[(String, String)]].foreach {
case (key, value) =>
writeUTF(key, dataOut)
writeUTF(value, dataOut)
}
case (key: Array[Byte], value: Array[Byte]) =>
newIter.asInstanceOf[Iterator[(Array[Byte], Array[Byte])]].foreach {
case (key, value) =>
dataOut.writeInt(key.length)
dataOut.write(key)
dataOut.writeInt(value.length)
dataOut.write(value)
}
case other =>
throw new SparkException("Unexpected element type " + first.getClass)
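The new cases above serialize each PortableDataStream (and each element of a (String, PortableDataStream) pair) as a 4-byte big-endian length written with dataOut.writeInt, followed by the raw bytes; writeUTF in this file uses the same length-prefixed layout for strings. A rough Python-side sketch of consuming one such frame, assuming a file-like object named stream (illustrative only; PySpark's own serializers handle this in practice):

import struct

def read_frame(stream):
    # Read the 4-byte big-endian length written by Java's DataOutputStream.writeInt,
    # then the payload written by dataOut.write(bytes).
    header = stream.read(4)
    if len(header) < 4:
        raise EOFError("stream exhausted")
    (length,) = struct.unpack("!i", header)
    return stream.read(length)

# A (String, PortableDataStream) element arrives as two consecutive frames:
#   key = read_frame(stream); value = read_frame(stream)
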
32 changes: 31 additions & 1 deletion python/pyspark/context.py
@@ -29,7 +29,7 @@
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer, CompressedSerializer, AutoBatchedSerializer
PairDeserializer, CompressedSerializer, AutoBatchedSerializer, NoOpSerializer
from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from pyspark.traceback_utils import CallSite, first_spark_call
@@ -388,6 +388,36 @@ def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))

def binaryFiles(self, path, minPartitions=None):
"""
:: Experimental ::

Read a directory of binary files from HDFS, a local file system
(available on all nodes), or any Hadoop-supported file system URI
as a byte array. Each file is read as a single record and returned
in a key-value pair, where the key is the path of each file, the
value is the content of each file.

Note: Small files are preferred; large files are also allowable, but
may cause bad performance.
"""
minPartitions = minPartitions or self.defaultMinPartitions
return RDD(self._jsc.binaryFiles(path, minPartitions), self,
PairDeserializer(UTF8Deserializer(), NoOpSerializer()))

def binaryRecords(self, path, recordLength):
"""
:: Experimental ::

Load data from a flat binary file, assuming each record is a set of numbers
with the specified numerical format (see ByteBuffer), and the number of
bytes per record is constant.

:param path: Directory to the input data files
:param recordLength: The length at which to split the records
"""
return RDD(self._jsc.binaryRecords(path, recordLength), self, NoOpSerializer())

def _dictToJavaMap(self, d):
jm = self._jvm.java.util.HashMap()
if not d:
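A short usage sketch of binaryFiles from the Python side: each element is a (path, bytes) pair, with the whole file content passed through unmodified by NoOpSerializer. It assumes an existing SparkContext named sc (for example, the pyspark shell); the directory and sizes shown are illustrative assumptions:

# Hypothetical directory of small binary files; one (path, content) pair per file.
files = sc.binaryFiles("hdfs:///data/blobs")
sizes = files.mapValues(len)
print(sizes.take(3))    # e.g. [(u'hdfs://.../blob-0001', 1024), ...]
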
19 changes: 19 additions & 0 deletions python/pyspark/tests.py
@@ -1110,6 +1110,25 @@ def test_converters(self):
(u'\x03', [2.0])]
self.assertEqual(maps, em)

def test_binary_files(self):
path = os.path.join(self.tempdir.name, "binaryfiles")
os.mkdir(path)
data = "short binary data"
with open(os.path.join(path, "part-0000"), 'w') as f:
f.write(data)
[(p, d)] = self.sc.binaryFiles(path).collect()
self.assertTrue(p.endswith("part-0000"))
self.assertEqual(d, data)

def test_binary_records(self):
path = os.path.join(self.tempdir.name, "binaryrecords")
os.mkdir(path)
with open(os.path.join(path, "part-0000"), 'w') as f:
for i in range(100):
f.write('%04d' % i)
result = self.sc.binaryRecords(path, 4).map(int).collect()
self.assertEqual(range(100), result)


class OutputFormatTests(ReusedPySparkTestCase):
