@@ -61,50 +61,21 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
extends OutputWriter {

private val recordWriter: RecordWriter[Void, Row] = {
val conf = context.getConfiguration
val outputFormat = {
// When appending new Parquet files to an existing Parquet file directory, to avoid
// overwriting existing data files, we need to find out the max task ID encoded in these data
// file names.
// TODO Make this snippet a utility function for other data source developers
val maxExistingTaskId = {
// Note that `path` may point to a temporary location. Here we retrieve the real
// destination path from the configuration
val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
val fs = outputPath.getFileSystem(conf)

if (fs.exists(outputPath)) {
// Pattern used to match task ID in part file names, e.g.:
//
// part-r-00001.gz.parquet
// ^~~~~
val partFilePattern = """part-.-(\d{1,}).*""".r

fs.listStatus(outputPath).map(_.getPath.getName).map {
case partFilePattern(id) => id.toInt
case name if name.startsWith("_") => 0
case name if name.startsWith(".") => 0
case name => throw new AnalysisException(
s"Trying to write Parquet files to directory $outputPath, " +
s"but found items with illegal name '$name'.")
}.reduceOption(_ max _).getOrElse(0)
} else {
0
}
}

new ParquetOutputFormat[Row]() {
// Here we override `getDefaultWorkFile` for two reasons:
//
// 1. To allow appending. We need to generate output file name based on the max available
// task ID computed above.
// 1. To allow appending. We need to generate unique output file names to avoid
// overwriting existing files (either those that existed before the write job, or those just
// written by other tasks within the same write job).
//
// 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
new Path(path, f"part-r-$split%05d$extension")
val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
val split = context.getTaskAttemptID.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
}
}
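Instead of scanning the destination directory for the maximum existing task ID, the rewritten `getDefaultWorkFile` above derives output file names from the task ID plus a per-job UUID. A minimal sketch of that naming scheme, not part of the diff (the object and helper names are invented for illustration):

```scala
import java.util.UUID

object PartFileNaming {
  // `taskId` stands for the ID extracted from the task attempt, and `jobUUID` for the
  // per-job UUID that is shipped to executors as `spark.sql.sources.writeJobUUID`.
  def partFileName(taskId: Int, jobUUID: UUID, extension: String): String =
    f"part-r-$taskId%05d-$jobUUID$extension"

  // Two append jobs writing to the same directory can both run a task with ID 0 without
  // colliding, because their job UUIDs differ:
  //   partFileName(0, UUID.randomUUID(), ".gz.parquet")  // part-r-00000-<uuid-of-job-1>.gz.parquet
  //   partFileName(0, UUID.randomUUID(), ".gz.parquet")  // part-r-00000-<uuid-of-job-2>.gz.parquet
}
```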
59 changes: 49 additions & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.sources

import java.util.Date
import java.util.{UUID, Date}

import scala.collection.mutable

@@ -59,6 +59,28 @@ private[sql] case class InsertIntoDataSource(
}
}

/**
* A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
* Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
* single write job, and owns a UUID that identifies this job. Each concrete implementation of
* [[HadoopFsRelation]] should use this UUID together with the task ID to generate a unique file
* path for each task output file. This UUID is passed to the executor side via a property named
* `spark.sql.sources.writeJobUUID`.
*
* Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]],
* are used to write to normal tables and tables with dynamic partitions, respectively.
*
* The basic workflow of this command is:
*
* 1. Driver-side setup, including output committer initialization and data source specific
* preparation work for the write job to be issued.
* 2. Issues a write job consisting of one or more executor-side tasks, each of which writes all
* rows within an RDD partition.
* 3. If no exception is thrown in a task, commits that task; otherwise, aborts that task. If any
* exception is thrown during task commitment, also aborts that task.
* 4. If all tasks are committed, commits the job; otherwise, aborts the job. If any exception is
* thrown during job commitment, also aborts the job.
*/
private[sql] case class InsertIntoHadoopFsRelation(
@transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
@@ -271,6 +293,13 @@ private[sql] abstract class BaseWriterContainer(

protected val serializableConf = new SerializableWritable(ContextUtil.getConfiguration(job))

// This UUID is used to avoid output file name collisions between different appending write jobs.
// These jobs may belong to different SparkContext instances. Concrete data source implementations
// may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
// The reason why this ID identifies a job rather than a single task output file is that
// speculative tasks must generate the same output file name as the original task.
private val uniqueWriteJobId = UUID.randomUUID()

// This is only used on driver side.
@transient private val jobContext: JobContext = job

@@ -298,6 +327,11 @@ private[sql] abstract class BaseWriterContainer(
setupIDs(0, 0, 0)
setupConf()

// This UUID is sent to the executor side together with the serialized `Configuration` object
// within the `Job` instance. `OutputWriter`s on the executor side should use this UUID to
// generate unique task output files.
job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)

// Order of the following two lines is important. For Hadoop 1, the TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
// configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
@@ -425,15 +459,16 @@ private[sql] class DefaultWriterContainer(
assert(writer != null, "OutputWriter instance should have been initialized")
writer.close()
super.commitTask()
} catch {
case cause: Throwable =>
super.abortTask()
throw new RuntimeException("Failed to commit task", cause)
} catch { case cause: Throwable =>
// This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
// cause `abortTask()` to be invoked.
throw new RuntimeException("Failed to commit task", cause)
}
}

override def abortTask(): Unit = {
try {
// It's possible that the task fails before `writer` gets initialized
if (writer != null) {
writer.close()
}
@@ -477,21 +512,25 @@ private[sql] class DynamicPartitionWriterContainer(
})
}

override def commitTask(): Unit = {
try {
private def clearOutputWriters(): Unit = {
if (outputWriters.nonEmpty) {
outputWriters.values.foreach(_.close())
outputWriters.clear()
}
}

override def commitTask(): Unit = {
try {
clearOutputWriters()
super.commitTask()
} catch { case cause: Throwable =>
super.abortTask()
throw new RuntimeException("Failed to commit task", cause)
}
}

override def abortTask(): Unit = {
try {
outputWriters.values.foreach(_.close())
outputWriters.clear()
clearOutputWriters()
} finally {
super.abortTask()
}
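To make the UUID plumbing in `commands.scala` concrete: the driver generates one UUID per write job and publishes it through the Hadoop job configuration, and each task reads it back from its `TaskAttemptContext`. A minimal sketch of that round trip, not part of the diff; the object and method names are invented, and only the property key and the Hadoop calls mirror the code above:

```scala
import java.util.UUID

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

object WriteJobUuidSketch {
  // Driver side: generate one UUID per write job and ship it inside the job configuration,
  // which is serialized and sent to executors together with the write tasks.
  def prepareWriteJob(job: Job): Unit = {
    val uniqueWriteJobId = UUID.randomUUID()
    job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
  }

  // Executor side: every task attempt reads the same UUID back and combines it with its
  // task ID, so file names are unique across jobs but stable across speculative attempts
  // of the same task.
  def outputFileName(context: TaskAttemptContext, extension: String): String = {
    val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
    val split = context.getTaskAttemptID.getTaskID.getId
    f"part-r-$split%05d-$uniqueWriteJobId$extension"
  }
}
```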
@@ -33,7 +33,7 @@ private[orc] object OrcFileOperator extends Logging{
val fspath = new Path(pathStr)
val fs = fspath.getFileSystem(conf)
val orcFiles = listOrcFiles(pathStr, conf)

logDebug(s"Creating ORC Reader from ${orcFiles.head}")
// TODO Need to consider all files when schema evolution is taken into account.
OrcFile.createReader(fs, orcFiles.head)
}
@@ -42,6 +42,7 @@ private[orc] object OrcFileOperator extends Logging{
val reader = getFileReader(path, conf)
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
}

@@ -52,14 +53,14 @@ private[orc] object OrcFileOperator extends Logging{
def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
val path = origPath.makeQualified(fs)
val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
.filterNot(_.isDir)
.map(_.getPath)
.filterNot(_.getName.startsWith("_"))
.filterNot(_.getName.startsWith("."))

if (paths == null || paths.size == 0) {
if (paths == null || paths.isEmpty) {
throw new IllegalArgumentException(
s"orcFileOperator: path $path does not have valid orc files matching the pattern")
}
@@ -104,8 +104,9 @@ private[orc] class OrcOutputWriter(
recordWriterInstantiated = true

val conf = context.getConfiguration
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
val partition = context.getTaskAttemptID.getTaskID.getId
val filename = f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc"
val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"

new OrcOutputFormat().getRecordWriter(
new Path(path, filename).getFileSystem(conf),
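The ORC writer switches from a `System.currentTimeMillis` suffix to the job UUID for the same reason as the Parquet writer: a speculative attempt must produce exactly the same file name as the original attempt of a task, which a timestamp taken at write time cannot guarantee. A small illustrative contrast, not part of the diff (the object and method names are invented for the example):

```scala
import java.util.UUID

object OrcNamingSketch {
  // Old scheme: two attempts of the same task almost certainly disagree on the name,
  // because each attempt reads the clock independently.
  def timestampName(taskId: Int): String =
    f"part-r-$taskId%05d-${System.currentTimeMillis}%015d.orc"

  // New scheme: the per-job UUID is fixed before tasks launch, so every attempt of the
  // same task in the same job produces the same name.
  def uuidName(taskId: Int, jobUUID: UUID): String =
    f"part-r-$taskId%05d-$jobUUID.orc"
}
```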
@@ -49,7 +49,7 @@ import scala.collection.JavaConversions._
object TestHive
extends TestHiveContext(
new SparkContext(
"local[2]",
"local[32]",
"TestSQLContext",
new SparkConf()
.set("spark.sql.test", "")
@@ -44,7 +44,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
import org.apache.spark.sql.hive.test.TestHive.implicits._

sparkContext
.makeRDD(1 to 10)
.makeRDD(1 to 100)
.map(i => OrcData(i, s"part-$i"))
.toDF()
.registerTempTable(s"orc_temp_table")
@@ -70,43 +70,43 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
}

test("create temporary orc table") {
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 10).map(i => Row(i, s"part-$i")))
(1 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT * FROM normal_orc_source where intField > 5"),
(6 to 10).map(i => Row(i, s"part-$i")))
(6 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
(1 to 10).map(i => Row(1, s"part-$i")))
(1 to 100).map(i => Row(1, s"part-$i")))
}

test("create temporary orc table as") {
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 10).map(i => Row(i, s"part-$i")))
(1 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
(6 to 10).map(i => Row(i, s"part-$i")))
(6 to 100).map(i => Row(i, s"part-$i")))

checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
(1 to 10).map(i => Row(1, s"part-$i")))
(1 to 100).map(i => Row(1, s"part-$i")))
}

test("appending insert") {
sql("INSERT INTO TABLE normal_orc_source SELECT * FROM orc_temp_table WHERE intField > 5")

checkAnswer(
sql("SELECT * FROM normal_orc_source"),
(1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
(1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
Seq.fill(2)(Row(i, s"part-$i"))
})
}
@@ -119,7 +119,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {

checkAnswer(
sql("SELECT * FROM normal_orc_as_source"),
(6 to 10).map(i => Row(i, s"part-$i")))
(6 to 100).map(i => Row(i, s"part-$i")))
}
}

@@ -53,9 +53,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
numberFormat.setGroupingUsed(false)

override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
val split = context.getTaskAttemptID.getTaskID.getId
val name = FileOutputFormat.getOutputName(context)
new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}")
new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
}
}

@@ -156,6 +157,7 @@ class CommitFailureTestRelation(
context: TaskAttemptContext): OutputWriter = {
new SimpleTextOutputWriter(path, context) {
override def close(): Unit = {
super.close()
sys.error("Intentional task commitment failure for testing purpose.")
}
}
@@ -22,20 +22,20 @@ import java.io.File
import com.google.common.io.Files
import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.{SparkException, SparkFunSuite}

abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
override val sqlContext: SQLContext = TestHive

import sqlContext._
import sqlContext.implicits._

val dataSourceName = classOf[SimpleTextSource].getCanonicalName
val dataSourceName: String

val dataSchema =
StructType(
@@ -470,6 +470,33 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
checkAnswer(table("t"), df.select('b, 'c, 'a).collect())
}
}

// NOTE: This test is not fully deterministic. On nodes with relatively few cores (4 or even 1),
// it's hard to reproduce the data loss issue, but on nodes with, for example, 8 or more cores,
// the issue can be reproduced reliably. Fortunately, our Jenkins builder meets this requirement.
// We probably want to move this test case to spark-integration-tests or spark-perf later.
test("SPARK-8406: Avoids name collision while writing Parquet files") {
withTempPath { dir =>
val path = dir.getCanonicalPath
sqlContext
.range(10000)
.repartition(250)
.write
.mode(SaveMode.Overwrite)
.format(dataSourceName)
.save(path)

assertResult(10000) {
sqlContext
.read
.format(dataSourceName)
.option("dataSchema", StructType(StructField("id", LongType) :: Nil).json)
.load(path)
.count()
}
}
}
}

class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -502,15 +529,17 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
}

class CommitFailureTestRelationSuite extends SparkFunSuite with SQLTestUtils {
import TestHive.implicits._

override val sqlContext = TestHive

// When committing a task, `CommitFailureTestSource` throws an exception for testing purpose.
val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName

test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
withTempPath { file =>
val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b")
// Here we coalesce the partition number to 1 to ensure that only a single task is issued. This
// prevents the race condition that happens when FileOutputCommitter tries to remove the
// `_temporary` directory while committing/aborting the job. See SPARK-8513 for more details.
val df = sqlContext.range(0, 10).coalesce(1)
intercept[SparkException] {
df.write.format(dataSourceName).save(file.getCanonicalPath)
}