@@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.example.ExampleParquetWriter
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, TestUtils}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -1206,43 +1206,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
errorMessage.contains("is not a valid DFS filename"))
}

test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
// Use an output committer that always fails when committing a task, so that both
// `commitTask()` and `abortTask()` are invoked.
val extraOptions = Map[String, String](
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
)

// Before fixing SPARK-7837, the following code results in an NPE because both
// `commitTask()` and `abortTask()` try to close output writers.

withTempPath { dir =>
val m1 = intercept[SparkException] {
spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
}
assert(m1.getErrorClass == "TASK_WRITE_FAILED")
assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
}

withTempPath { dir =>
val m2 = intercept[SparkException] {
val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
.coalesce(1)
df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
}
if (m2.getErrorClass != null) {
assert(m2.getErrorClass == "TASK_WRITE_FAILED")
assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
} else {
assert(m2.getMessage.contains("TASK_WRITE_FAILED"))
}
}
}
}

test("SPARK-11044 Parquet writer version fixed as version1 ") {
withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
@@ -1587,6 +1550,56 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

// Parquet IO test suite with output commit coordination disabled.
// This test suite is separated from ParquetIOSuite to avoid a race condition between failure
// events from `OutputCommitCoordination` and `TaskSetManager`.
class ParquetIOWithoutOutputCommitCoordinationSuite
extends QueryTest with ParquetTest with SharedSparkSession {
import testImplicits._

override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.hadoop.outputCommitCoordination.enabled", "false")
}

test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
// Use an output committer that always fails when committing a task, so that both
// `commitTask()` and `abortTask()` are invoked (a sketch of such a committer follows this suite).
val extraOptions = Map[String, String](
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
)

// Before fixing SPARK-7837, the following code results in an NPE because both
// `commitTask()` and `abortTask()` try to close output writers.

withTempPath { dir =>
val m1 = intercept[SparkException] {
spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
}
assert(m1.getErrorClass == "TASK_WRITE_FAILED")
assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
}

withTempPath { dir =>
val m2 = intercept[SparkException] {
val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
.coalesce(1)
df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
}
if (m2.getErrorClass != null) {
assert(m2.getErrorClass == "TASK_WRITE_FAILED")
assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
} else {
assert(m2.getMessage.contains("TASK_WRITE_FAILED"))
}
}
}
}
}
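
// For reference, a minimal sketch of the failing committer that the tests above configure via
// PARQUET_OUTPUT_COMMITTER_CLASS. The real TaskCommitFailureParquetOutputCommitter is referenced
// through classOf above but its definition is not shown in this diff; the class below is an
// illustrative assumption based on the messages the tests assert on, not the actual definition.
class TaskCommitFailureParquetOutputCommitterSketch(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {

  // Fail every task commit so the write path is forced to invoke abortTask() as well.
  override def commitTask(context: TaskAttemptContext): Unit = {
    sys.error("Intentional exception for testing purposes")
  }
}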

class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {
