HadoopFsRelationTest.scala
@@ -38,6 +38,10 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes

   val dataSourceName: String
 
+  protected val parquetDataSourceName: String = "parquet"
+
+  private def isParquetDataSource: Boolean = dataSourceName == parquetDataSourceName
+
   protected def supportsDataType(dataType: DataType): Boolean = true
 
   val dataSchema =
@@ -114,10 +118,21 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     new UDT.MyDenseVectorUDT()
   ).filter(supportsDataType)
 
-  for (dataType <- supportedDataTypes) {
-    for (parquetDictionaryEncodingEnabled <- Seq(true, false)) {
-      test(s"test all data types - $dataType with parquet.enable.dictionary = " +
-        s"$parquetDictionaryEncodingEnabled") {
+  test(s"test all data types") {
+    val parquetDictionaryEncodingEnabledConfs = if (isParquetDataSource) {
+      // Run with/without Parquet dictionary encoding enabled for Parquet data source.
+      Seq(true, false)
+    } else {
+      Seq(false)
+    }
+    for (dataType <- supportedDataTypes) {
+      for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) {
+        val extraMessage = if (isParquetDataSource) {
+          s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled"
+        } else {
+          ""
+        }
+        logInfo(s"Testing $dataType data type$extraMessage")
 
         val extraOptions = Map[String, String](
           "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString
@@ -754,33 +769,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  // NOTE: This test suite is not super deterministic. On nodes with only relatively few cores
-  // (4 or even 1), it's hard to reproduce the data loss issue. But on nodes with for example 8 or
-  // more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this
-  // requirement. We probably want to move this test case to spark-integration-tests or spark-perf
-  // later.
-  test("SPARK-8406: Avoids name collision while writing files") {
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-      spark
-        .range(10000)
-        .repartition(250)
-        .write
-        .mode(SaveMode.Overwrite)
-        .format(dataSourceName)
-        .save(path)
-
-      assertResult(10000) {
-        spark
-          .read
-          .format(dataSourceName)
-          .option("dataSchema", StructType(StructField("id", LongType) :: Nil).json)
-          .load(path)
-          .count()
-      }
-    }
-  }
-
   test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") {
     val df = Seq(
       (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")),

ParquetHadoopFsRelationSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types._
 class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
   import testImplicits._
 
-  override val dataSourceName: String = "parquet"
+  override val dataSourceName: String = parquetDataSourceName
 
   // Parquet does not play well with NullType.
   override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
@@ -232,4 +232,33 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       }
     }
   }
+
+  // NOTE: This test suite is not super deterministic. On nodes with only relatively few cores
+  // (4 or even 1), it's hard to reproduce the data loss issue. But on nodes with for example 8 or
+  // more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this
+  // requirement. We probably want to move this test case to spark-integration-tests or spark-perf
+  // later.
+  // Also, this test is slow. As all the file format data sources now use common code
+  // for creating result files, we can test Parquet only to reduce test time.
+  test("SPARK-8406: Avoids name collision while writing files") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark
+        .range(10000)
+        .repartition(250)
+        .write
+        .mode(SaveMode.Overwrite)
+        .format(dataSourceName)
+        .save(path)
+
+      assertResult(10000) {
+        spark
+          .read
+          .format(dataSourceName)
+          .option("dataSchema", StructType(StructField("id", LongType) :: Nil).json)
+          .load(path)
+          .count()
+      }
+    }
+  }
 }