@@ -217,6 +217,8 @@ private[sql] class DefaultWriterContainer(
     val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
     writer.initConverter(dataSchema)
 
+    var writerClosed = false
+
     // If anything below fails, we should abort the task.
     try {
       while (iterator.hasNext) {
@@ -235,7 +237,10 @@ private[sql] class DefaultWriterContainer(
     def commitTask(): Unit = {
       try {
         assert(writer != null, "OutputWriter instance should have been initialized")
-        writer.close()
+        if (!writerClosed) {
+          writer.close()
+          writerClosed = true
+        }
         super.commitTask()
       } catch {
         case cause: Throwable =>
@@ -247,7 +252,10 @@ private[sql] class DefaultWriterContainer(
 
     def abortTask(): Unit = {
       try {
-        writer.close()
+        if (!writerClosed) {
+          writer.close()
+          writerClosed = true
+        }
       } finally {
         super.abortTask()
       }
@@ -275,6 +283,8 @@ private[sql] class DynamicPartitionWriterContainer(
     val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
     executorSideSetup(taskContext)
 
+    var outputWritersCleared = false
+
     // Returns the partition key given an input row
     val getPartitionKey = UnsafeProjection.create(partitionColumns, inputSchema)
     // Returns the data columns to be written given an input row
@@ -379,8 +389,11 @@ private[sql] class DynamicPartitionWriterContainer(
     }
 
     def clearOutputWriters(): Unit = {
-      outputWriters.asScala.values.foreach(_.close())
-      outputWriters.clear()
+      if (!outputWritersCleared) {
+        outputWriters.asScala.values.foreach(_.close())
+        outputWriters.clear()
+        outputWritersCleared = true
+      }
     }
 
     def commitTask(): Unit = {
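
The fix in both containers is the same close-once guard: `commitTask()` closes the writer before committing, and if the commit then fails, `abortTask()` can run its own cleanup without closing the already-closed writer a second time (that double close is what raised the NPE in SPARK-7837). Below is a minimal standalone sketch of the idiom, with a hypothetical `Writer` trait standing in for Spark's `OutputWriter`, not the actual Spark classes:

// Sketch only: a boolean guard makes close() idempotent across the
// commit and abort paths, so a failed commit followed by an abort
// never calls close() twice on the same writer.
trait Writer {
  def close(): Unit
}

class GuardedContainer(writer: Writer) {
  private var writerClosed = false

  private def closeWriterOnce(): Unit = {
    if (!writerClosed) {
      writer.close()
      writerClosed = true
    }
  }

  def commitTask(): Unit = {
    closeWriterOnce() // close before committing; the commit itself may still throw
    // ... commit the task attempt; on failure the caller invokes abortTask()
  }

  def abortTask(): Unit = {
    try closeWriterOnce() // no-op when commitTask() already closed the writer
    finally {
      // ... discard the task attempt's output
    }
  }
}
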
@@ -424,7 +424,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
     configuration.set(
       "spark.sql.parquet.output.committer.class",
-      classOf[BogusParquetOutputCommitter].getCanonicalName)
+      classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName)
 
     try {
       val message = intercept[SparkException] {
@@ -450,12 +450,54 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }.toString
     assert(errorMessage.contains("UnknownHostException"))
   }
+
+  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
+    val clonedConf = new Configuration(configuration)
+
+    // Use an output committer that always fails when committing a task, so that both
+    // `commitTask()` and `abortTask()` are invoked.
+    configuration.set(
+      "spark.sql.parquet.output.committer.class",
+      classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName)
+
+    try {
+      // Before fixing SPARK-7837, the following code resulted in an NPE because both
+      // `commitTask()` and `abortTask()` tried to close output writers.
+
+      withTempPath { dir =>
+        val m1 = intercept[SparkException] {
+          sqlContext.range(1).coalesce(1).write.parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(m1.contains("Intentional exception for testing purposes"))
+      }
+
+      withTempPath { dir =>
+        val m2 = intercept[SparkException] {
+          val df = sqlContext.range(1).select('id as 'a, 'id as 'b).coalesce(1)
+          df.write.partitionBy("a").parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(m2.contains("Intentional exception for testing purposes"))
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`, so restore the cloned entries wholesale.
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+    }
+  }
 }
 
-class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
 
   override def commitJob(jobContext: JobContext): Unit = {
     sys.error("Intentional exception for testing purposes")
   }
 }
+
+class TaskCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+
+  override def commitTask(context: TaskAttemptContext): Unit = {
+    sys.error("Intentional exception for testing purposes")
+  }
+}
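
A side note on the test's cleanup: Hadoop 1's `Configuration` has no `unset`, so the test snapshots the entire configuration up front and replays it in `finally`. The same idiom could be factored into a loan-pattern helper; the sketch below is illustrative only (the `ConfigFixture` object and `withCommitterClass` name are invented, and only `org.apache.hadoop.conf.Configuration` is assumed):

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration

object ConfigFixture {
  // Hypothetical helper, not part of the patch. Configuration is an
  // Iterable[java.util.Map.Entry[String, String]], so a copy taken via the
  // copy constructor can be replayed to undo any keys set inside `body`.
  def withCommitterClass[T](conf: Configuration, committerClass: String)(body: => T): T = {
    val snapshot = new Configuration(conf) // copies all current entries
    conf.set("spark.sql.parquet.output.committer.class", committerClass)
    try {
      body
    } finally {
      conf.clear() // Hadoop 1 has no unset(), so wipe everything...
      snapshot.asScala.foreach(entry => conf.set(entry.getKey, entry.getValue)) // ...and restore
    }
  }
}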