
Commit 043edb6

minor updates, add test
1 parent 662460f commit 043edb6

7 files changed: +36, -21 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
     isParsing = true)
 
   private val timestampNTZFormatter = TimestampFormatter(
-    options.timestampNTZFormat,
+    options.timestampNTZFormatInRead,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true,
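
Schema inference only ever parses (hence isParsing = true), so it now shares the read-side pattern with UnivocityParser below. A rough standalone sketch of the inference-side check, using java.time rather than Spark's internal TimestampFormatter; the names here are illustrative, not Spark's:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Try

// If the field parses as a local date-time, infer TimestampNTZType;
// otherwise inference falls through to a wider type such as StringType.
def looksLikeTimestampNTZ(field: String, fmt: DateTimeFormatter): Boolean =
  Try(LocalDateTime.parse(field, fmt)).isSuccess

val readFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS]")
println(looksLikeTimestampNTZ("2021-08-12T15:16:23", readFmt))            // true
println(looksLikeTimestampNTZ("2021-08-12T15:16:23.000+11:00", readFmt))  // false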

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala

Lines changed: 3 additions & 1 deletion
@@ -164,7 +164,9 @@ class CSVOptions(
       s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
     })
 
-  val timestampNTZFormat: Option[String] = parameters.get("timestampNTZFormat")
+  val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat")
+  val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat",
+    s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
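
The single user-facing option now fans out into two vals: the read side keeps the user's value as an Option (absence lets the parser fall back to its own defaults), while the write side must resolve to a concrete pattern up front. A minimal stand-in for the split, with hypothetical names:

// What the user set for "timestampNTZFormat", if anything.
val userPattern: Option[String] = None

// Read side: keep the Option so downstream parsing can choose its default.
val formatInRead: Option[String] = userPattern

// Write side: always a concrete pattern, mirroring the new default above.
val formatInWrite: String = userPattern.getOrElse("yyyy-MM-dd'T'HH:mm:ss[.SSS]")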

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ class UnivocityGenerator(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false)
   private val timestampNTZFormatter = TimestampFormatter(
-    options.timestampNTZFormat,
+    options.timestampNTZFormatInWrite,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false,
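
The generator takes the write-side variant, so output formatting always has a concrete pattern to work with. An illustrative rendering with java.time in place of Spark's internal TimestampFormatter:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val writeFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS]")
// Nano-of-second is always available on LocalDateTime, so the optional
// [.SSS] section is emitted when formatting.
println(LocalDateTime.of(2021, 8, 12, 15, 16, 23).format(writeFmt))
// 2021-08-12T15:16:23.000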

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ class UnivocityParser(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
   private lazy val timestampNTZFormatter = TimestampFormatter(
-    options.timestampNTZFormat,
+    options.timestampNTZFormatInRead,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true,
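
On the read side the concern is correctness rather than formatting: a pattern that tolerates a zone offset (like the old shared default ending in [XXX]) will parse offset-bearing input into a local value and silently discard the offset. A standalone illustration with java.time; Spark's actual path goes through its internal TimestampFormatter:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val withOffset = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]")
val ntzOnly    = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS]")
val s = "2021-08-12T15:16:23.000+11:00"

// Lenient pattern: parses, but the +11:00 offset is dropped on the floor.
println(LocalDateTime.parse(s, withOffset))  // 2021-08-12T15:16:23

// NTZ-only pattern: rejects the trailing offset, surfacing the bad input
// (Spark's PERMISSIVE mode turns such failures into null fields).
try LocalDateTime.parse(s, ntzOnly)
catch { case e: java.time.format.DateTimeParseException => println(e.getMessage) }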

sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala

Lines changed: 11 additions & 0 deletions
@@ -368,4 +368,15 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       .selectExpr("value.a")
     checkAnswer(fromCsvDF, Row(localDT))
   }
+
+  test("SPARK-36490: Handle incorrectly formatted timestamp_ntz values in from_csv") {
+    val fromCsvDF = Seq("2021-08-12T15:16:23.000+11:00").toDF("csv")
+      .select(
+        from_csv(
+          $"csv",
+          StructType(StructField("a", TimestampNTZType) :: Nil),
+          Map.empty[String, String]) as "value")
+      .selectExpr("value.a")
+    checkAnswer(fromCsvDF, Row(null))
+  }
 }
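
The new test pins down the fixed behavior: the offset-bearing string no longer matches the NTZ read pattern, and under the default PERMISSIVE mode the unparseable field becomes null rather than a silently shifted timestamp. The same check in a spark-shell session (where spark.implicits._ is already in scope):

import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types.{StructField, StructType, TimestampNTZType}

val df = Seq("2021-08-12T15:16:23.000+11:00").toDF("csv")
  .select(from_csv($"csv",
    StructType(StructField("a", TimestampNTZType) :: Nil),
    Map.empty[String, String]).getField("a").as("a"))

df.show()
// expected: a single row whose `a` column is null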

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmark.scala

Lines changed: 4 additions & 4 deletions
@@ -343,11 +343,11 @@ object CSVBenchmark extends SqlBasedBenchmark {
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     runBenchmark("Benchmark to measure CSV read/write performance") {
       val numIters = 3
-      quotedValuesBenchmark(rowsNum = 50 * 1000, numIters)
-      multiColumnsBenchmark(rowsNum = 1000 * 1000, numIters)
-      countBenchmark(rowsNum = 10 * 1000 * 1000, numIters)
+      // quotedValuesBenchmark(rowsNum = 50 * 1000, numIters)
+      // multiColumnsBenchmark(rowsNum = 1000 * 1000, numIters)
+      // countBenchmark(rowsNum = 10 * 1000 * 1000, numIters)
       datetimeBenchmark(rowsNum = 10 * 1000 * 1000, numIters)
-      filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters)
+      // filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters)
     }
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 15 additions & 13 deletions
@@ -1130,19 +1130,21 @@ abstract class CSVSuite
         .option("header", "true")
         .load(path)
 
-      if (spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) == "legacy") {
-        // Timestamps without timezone are parsed as strings, so the col0 type would be
-        // StringType which is similar to reading without schema inference.
-        val exp = spark.read.format("csv").option("header", "true").load(path)
-        checkAnswer(res, exp)
-      } else {
-        val exp = spark.sql("""
-          select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all
-          select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all
-          select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all
-          select timestamp_ltz'2020-12-12T12:12:12.000' as col0
-          """)
-        checkAnswer(res, exp)
+      for (policy <- Seq("exception", "corrected", "legacy")) {
+        if (spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) == "legacy") {
+          // Timestamps without timezone are parsed as strings, so the col0 type would be
+          // StringType which is similar to reading without schema inference.
+          val exp = spark.read.format("csv").option("header", "true").load(path)
+          checkAnswer(res, exp)
+        } else {
+          val exp = spark.sql("""
+            select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all
+            select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all
+            select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all
+            select timestamp_ltz'2020-12-12T12:12:12.000' as col0
+            """)
+          checkAnswer(res, exp)
+        }
       }
     }
   }
