@@ -270,6 +270,12 @@ package object config {
.longConf
.createWithDefault(4 * 1024 * 1024)

private[spark] val IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.files.ignoreEmptySplits")
Review comment (Contributor): This config should be made internal, and the name should be improved because it's not about spark files.

Reply (Contributor): I'll send a follow-up PR to fix this.

.doc("If true, methods that use HadoopRDD and NewHadoopRDD such as " +
"SparkContext.textFiles will not create a partition for input splits that are empty.")
.booleanConf
.createWithDefault(false)
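
As a rough sketch of the follow-up suggested in the review comments above (illustrative only, not part of this diff: the spark.hadoopRDD.ignoreEmptySplits key and the HADOOP_RDD_IGNORE_EMPTY_SPLITS name are assumptions, while ConfigBuilder's internal() builder does exist), the entry could be marked internal and moved out of the spark.files namespace:

// Hypothetical follow-up, not part of this PR: hide the entry from user-facing
// docs with internal() and rename it out of the spark.files.* namespace.
private[spark] val HADOOP_RDD_IGNORE_EMPTY_SPLITS =
  ConfigBuilder("spark.hadoopRDD.ignoreEmptySplits")
    .internal()
    .doc("When true, HadoopRDD and NewHadoopRDD will not create partitions for " +
      "input splits whose length is zero.")
    .booleanConf
    .createWithDefault(false)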

private[spark] val SECRET_REDACTION_PATTERN =
ConfigBuilder("spark.redaction.regex")
.doc("Regex to decide which Spark configuration properties and environment variables in " +
core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala (12 changes: 9 additions & 3 deletions)
@@ -35,7 +35,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
import org.apache.spark.storage.StorageLevel
@@ -134,6 +134,8 @@ class HadoopRDD[K, V](

private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)

private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
@@ -195,8 +197,12 @@ class HadoopRDD[K, V](
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = getInputFormat(jobConf)
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
allInputSplits.filter(_.getLength > 0)
} else {
allInputSplits
}
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala (13 changes: 10 additions & 3 deletions)
@@ -21,6 +21,7 @@ import java.io.IOException
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import scala.collection.JavaConverters.asScalaBufferConverter
import scala.reflect.ClassTag

import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -34,7 +35,7 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
@@ -89,6 +90,8 @@ class NewHadoopRDD[K, V](

private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)

private val ignoreEmptySplits = sparkContext.getConf.get(IGNORE_EMPTY_SPLITS)

def getConf: Configuration = {
val conf: Configuration = confBroadcast.value.value
if (shouldCloneJobConf) {
@@ -121,8 +124,12 @@ class NewHadoopRDD[K, V](
configurable.setConf(_conf)
case _ =>
}
val jobContext = new JobContextImpl(_conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
val rawSplits = if (ignoreEmptySplits) {
allRowSplits.filter(_.getLength > 0)
} else {
allRowSplits
}
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
core/src/test/scala/org/apache/spark/FileSuite.scala (97 changes: 89 additions & 8 deletions)
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_EMPTY_SPLITS}
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -347,10 +347,10 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}
}

test ("allow user to disable the output directory existence checking (old Hadoop API") {
val sf = new SparkConf()
sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
sc = new SparkContext(sf)
test ("allow user to disable the output directory existence checking (old Hadoop API)") {
val conf = new SparkConf()
conf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
sc = new SparkContext(conf)
val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
randomRDD.saveAsTextFile(tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
@@ -380,9 +380,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}

test ("allow user to disable the output directory existence checking (new Hadoop API") {
val sf = new SparkConf()
sf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
sc = new SparkContext(sf)
val conf = new SparkConf()
conf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
sc = new SparkContext(conf)
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
@@ -510,4 +510,85 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("spark.files.ignoreEmptySplits work correctly (old Hadoop API)") {
val conf = new SparkConf()
conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
sc = new SparkContext(conf)

def testIgnoreEmptySplits(
data: Array[Tuple2[String, String]],
actualPartitionNum: Int,
expectedPartitionNum: Int): Unit = {
val output = new File(tempDir, "output")
sc.parallelize(data, actualPartitionNum)
.saveAsHadoopFile[TextOutputFormat[String, String]](output.getPath)
for (i <- 0 until actualPartitionNum) {
assert(new File(output, s"part-0000$i").exists() === true)
}
val hadoopRDD = sc.textFile(new File(output, "part-*").getPath)
assert(hadoopRDD.partitions.length === expectedPartitionNum)
Utils.deleteRecursively(output)
}

// Ensure that if all of the splits are empty, we remove the splits correctly
testIgnoreEmptySplits(
data = Array.empty[Tuple2[String, String]],
actualPartitionNum = 1,
expectedPartitionNum = 0)

// Ensure that if no split is empty, we don't lose any splits
testIgnoreEmptySplits(
data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")),
actualPartitionNum = 2,
expectedPartitionNum = 2)

// Ensure that if part of the splits are empty, we remove the splits correctly
testIgnoreEmptySplits(
data = Array(("key1", "a"), ("key2", "a")),
actualPartitionNum = 5,
expectedPartitionNum = 2)
}

test("spark.files.ignoreEmptySplits work correctly (new Hadoop API)") {
val conf = new SparkConf()
conf.setAppName("test").setMaster("local").set(IGNORE_EMPTY_SPLITS, true)
sc = new SparkContext(conf)

def testIgnoreEmptySplits(
data: Array[Tuple2[String, String]],
actualPartitionNum: Int,
expectedPartitionNum: Int): Unit = {
val output = new File(tempDir, "output")
sc.parallelize(data, actualPartitionNum)
.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](output.getPath)
for (i <- 0 until actualPartitionNum) {
assert(new File(output, s"part-r-0000$i").exists() === true)
}
val hadoopRDD = sc.newAPIHadoopFile(
new File(output, "part-r-*").getPath,
classOf[NewTextInputFormat],
classOf[LongWritable],
classOf[Text]).asInstanceOf[NewHadoopRDD[_, _]]
assert(hadoopRDD.partitions.length === expectedPartitionNum)
Utils.deleteRecursively(output)
}

// Ensure that if all of the splits are empty, we remove the splits correctly
testIgnoreEmptySplits(
data = Array.empty[Tuple2[String, String]],
actualPartitionNum = 1,
expectedPartitionNum = 0)

// Ensure that if no split is empty, we don't lose any splits
testIgnoreEmptySplits(
data = Array(("1", "a"), ("2", "a"), ("3", "b")),
actualPartitionNum = 2,
expectedPartitionNum = 2)

// Ensure that if part of the splits are empty, we remove the splits correctly
testIgnoreEmptySplits(
data = Array(("1", "a"), ("2", "b")),
actualPartitionNum = 5,
expectedPartitionNum = 2)
}
}
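
For reference, a user application could enable the flag exercised by the tests above as follows. This is a minimal sketch: the input path is hypothetical, and it uses the spark.files.ignoreEmptySplits key exactly as introduced in this diff, which the review comments suggest may be renamed in a follow-up.

import org.apache.spark.{SparkConf, SparkContext}

object IgnoreEmptySplitsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ignore-empty-splits-demo")
      .setMaster("local[2]")
      // Key as introduced in this PR; the behavior is off by default.
      .set("spark.files.ignoreEmptySplits", "true")
    val sc = new SparkContext(conf)
    // Zero-length input splits (e.g. empty part files) no longer become partitions.
    val lines = sc.textFile("/path/to/input")  // hypothetical input path
    println(s"partitions: ${lines.getNumPartitions}")
    sc.stop()
  }
}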