diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md index fee0e6df7177..3d585864eefe 100644 --- a/docs/sql-migration-guide-upgrade.md +++ b/docs/sql-migration-guide-upgrade.md @@ -33,7 +33,7 @@ displayTitle: Spark SQL Upgrading Guide - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0. - - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`. + - Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and generating CSV/JSON content. In Spark version 2.4 and earlier, java.text.SimpleDateFormat is used for the same purpuse with fallbacks to the parsing mechanisms of Spark 2.0 and 1.x. For example, `2018-12-08 10:39:21.123` with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSS` cannot be parsed since Spark 3.0 because the timestamp does not match to the pattern but it can be parsed by earlier Spark versions due to a fallback to `Timestamp.valueOf`. To parse the same timestamp since Spark 3.0, the pattern should be `yyyy-MM-dd HH:mm:ss.SSS`. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`. ## Upgrading From Spark SQL 2.3 to 2.4 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 345dc4d41993..35ade136cc60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -22,13 +22,13 @@ import scala.util.control.Exception.allCatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.expressions.ExprUtils -import org.apache.spark.sql.catalyst.util.DateTimeFormatter +import org.apache.spark.sql.catalyst.util.TimestampFormatter import org.apache.spark.sql.types._ class CSVInferSchema(val options: CSVOptions) extends Serializable { @transient - private lazy val timeParser = DateTimeFormatter( + private lazy val timestampParser = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) @@ -160,7 +160,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { private def tryParseTimestamp(field: String): DataType = { // This case infers a custom `dataFormat` is set. - if ((allCatch opt timeParser.parse(field)).isDefined) { + if ((allCatch opt timestampParser.parse(field)).isDefined) { TimestampType } else { tryParseBoolean(field) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index af09cd6c8449..f012d96138f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -22,7 +22,7 @@ import java.io.Writer import com.univocity.parsers.csv.CsvWriter import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter} +import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.types._ class UnivocityGenerator( @@ -41,18 +41,18 @@ class UnivocityGenerator( private val valueConverters: Array[ValueConverter] = schema.map(_.dataType).map(makeConverter).toArray - private val timeFormatter = DateTimeFormatter( + private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) - private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale) + private val dateFormatter = DateFormatter(options.dateFormat, options.locale) private def makeConverter(dataType: DataType): ValueConverter = dataType match { case DateType => (row: InternalRow, ordinal: Int) => dateFormatter.format(row.getInt(ordinal)) case TimestampType => - (row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal)) + (row: InternalRow, ordinal: Int) => timestampFormatter.format(row.getLong(ordinal)) case udt: UserDefinedType[_] => makeConverter(udt.sqlType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 0f375e036029..ed089120055e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -74,11 +74,11 @@ class UnivocityParser( private val row = new GenericInternalRow(requiredSchema.length) - private val timeFormatter = DateTimeFormatter( + private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) - private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale) + private val dateFormatter = DateFormatter(options.dateFormat, options.locale) // Retrieve the raw record string. private def getCurrentInput: UTF8String = { @@ -158,7 +158,7 @@ class UnivocityParser( } case _: TimestampType => (d: String) => - nullSafeDatum(d, name, nullable, options)(timeFormatter.parse) + nullSafeDatum(d, name, nullable, options)(timestampFormatter.parse) case _: DateType => (d: String) => nullSafeDatum(d, name, nullable, options)(dateFormatter.parse) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index e10b8a327c01..eaff3fa7bec2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -21,7 +21,6 @@ import java.nio.charset.{Charset, StandardCharsets} import java.util.{Locale, TimeZone} import com.fasterxml.jackson.core.{JsonFactory, JsonParser} -import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util._ @@ -82,13 +81,10 @@ private[sql] class JSONOptions( val timeZone: TimeZone = DateTimeUtils.getTimeZone( parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)) - // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. - val dateFormat: FastDateFormat = - FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale) + val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd") - val timestampFormat: FastDateFormat = - FastDateFormat.getInstance( - parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale) + val timestampFormat: String = + parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX") val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index d02a2be8ddad..951f5190cd50 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -23,7 +23,7 @@ import com.fasterxml.jackson.core._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters -import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} +import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.types._ /** @@ -77,6 +77,12 @@ private[sql] class JacksonGenerator( private val lineSeparator: String = options.lineSeparatorInWrite + private val timestampFormatter = TimestampFormatter( + options.timestampFormat, + options.timeZone, + options.locale) + private val dateFormatter = DateFormatter(options.dateFormat, options.locale) + private def makeWriter(dataType: DataType): ValueWriter = dataType match { case NullType => (row: SpecializedGetters, ordinal: Int) => @@ -116,14 +122,12 @@ private[sql] class JacksonGenerator( case TimestampType => (row: SpecializedGetters, ordinal: Int) => - val timestampString = - options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) + val timestampString = timestampFormatter.format(row.getLong(ordinal)) gen.writeString(timestampString) case DateType => (row: SpecializedGetters, ordinal: Int) => - val dateString = - options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) + val dateString = dateFormatter.format(row.getInt(ordinal)) gen.writeString(dateString) case BinaryType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 2357595906b1..4862fa289702 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -55,6 +55,12 @@ class JacksonParser( private val factory = new JsonFactory() options.setJacksonOptions(factory) + private val timestampFormatter = TimestampFormatter( + options.timestampFormat, + options.timeZone, + options.locale) + private val dateFormatter = DateFormatter(options.dateFormat, options.locale) + /** * Create a converter which converts the JSON documents held by the `JsonParser` * to a value according to a desired schema. This is a wrapper for the method @@ -218,17 +224,7 @@ class JacksonParser( case TimestampType => (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) { case VALUE_STRING if parser.getTextLength >= 1 => - val stringValue = parser.getText - // This one will lose microseconds parts. - // See https://issues.apache.org/jira/browse/SPARK-10681. - Long.box { - Try(options.timestampFormat.parse(stringValue).getTime * 1000L) - .getOrElse { - // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility. - DateTimeUtils.stringToTime(stringValue).getTime * 1000L - } - } + timestampFormatter.parse(parser.getText) case VALUE_NUMBER_INT => parser.getLongValue * 1000000L @@ -237,22 +233,7 @@ class JacksonParser( case DateType => (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) { case VALUE_STRING if parser.getTextLength >= 1 => - val stringValue = parser.getText - // This one will lose microseconds parts. - // See https://issues.apache.org/jira/browse/SPARK-10681.x - Int.box { - Try(DateTimeUtils.millisToDays(options.dateFormat.parse(stringValue).getTime)) - .orElse { - // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility. - Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(stringValue).getTime)) - } - .getOrElse { - // In Spark 1.5.0, we store the data as number of days since epoch in string. - // So, we just convert it to Int. - stringValue.toInt - } - } + dateFormatter.parse(parser.getText) } case BinaryType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala similarity index 63% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index ad1f4131de2f..2b8d22dde926 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.util import java.time._ import java.time.format.DateTimeFormatterBuilder -import java.time.temporal.{ChronoField, TemporalQueries} +import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries} import java.util.{Locale, TimeZone} import scala.util.Try @@ -28,31 +28,44 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.internal.SQLConf -sealed trait DateTimeFormatter { +sealed trait TimestampFormatter { def parse(s: String): Long // returns microseconds since epoch def format(us: Long): String } -class Iso8601DateTimeFormatter( +trait FormatterUtils { + protected def zoneId: ZoneId + protected def buildFormatter( + pattern: String, + locale: Locale): java.time.format.DateTimeFormatter = { + new DateTimeFormatterBuilder() + .appendPattern(pattern) + .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970) + .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1) + .parseDefaulting(ChronoField.DAY_OF_MONTH, 1) + .parseDefaulting(ChronoField.HOUR_OF_DAY, 0) + .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) + .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) + .toFormatter(locale) + } + protected def toInstantWithZoneId(temporalAccessor: TemporalAccessor): java.time.Instant = { + val localDateTime = LocalDateTime.from(temporalAccessor) + val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId) + Instant.from(zonedDateTime) + } +} + +class Iso8601TimestampFormatter( pattern: String, timeZone: TimeZone, - locale: Locale) extends DateTimeFormatter { - val formatter = new DateTimeFormatterBuilder() - .appendPattern(pattern) - .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970) - .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1) - .parseDefaulting(ChronoField.DAY_OF_MONTH, 1) - .parseDefaulting(ChronoField.HOUR_OF_DAY, 0) - .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0) - .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0) - .toFormatter(locale) + locale: Locale) extends TimestampFormatter with FormatterUtils { + val zoneId = timeZone.toZoneId + val formatter = buildFormatter(pattern, locale) def toInstant(s: String): Instant = { val temporalAccessor = formatter.parse(s) if (temporalAccessor.query(TemporalQueries.offset()) == null) { - val localDateTime = LocalDateTime.from(temporalAccessor) - val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId) - Instant.from(zonedDateTime) + toInstantWithZoneId(temporalAccessor) } else { Instant.from(temporalAccessor) } @@ -75,10 +88,10 @@ class Iso8601DateTimeFormatter( } } -class LegacyDateTimeFormatter( +class LegacyTimestampFormatter( pattern: String, timeZone: TimeZone, - locale: Locale) extends DateTimeFormatter { + locale: Locale) extends TimestampFormatter { val format = FastDateFormat.getInstance(pattern, timeZone, locale) protected def toMillis(s: String): Long = format.parse(s).getTime @@ -90,21 +103,21 @@ class LegacyDateTimeFormatter( } } -class LegacyFallbackDateTimeFormatter( +class LegacyFallbackTimestampFormatter( pattern: String, timeZone: TimeZone, - locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) { + locale: Locale) extends LegacyTimestampFormatter(pattern, timeZone, locale) { override def toMillis(s: String): Long = { Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime) } } -object DateTimeFormatter { - def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = { +object TimestampFormatter { + def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = { if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyFallbackDateTimeFormatter(format, timeZone, locale) + new LegacyFallbackTimestampFormatter(format, timeZone, locale) } else { - new Iso8601DateTimeFormatter(format, timeZone, locale) + new Iso8601TimestampFormatter(format, timeZone, locale) } } } @@ -116,13 +129,19 @@ sealed trait DateFormatter { class Iso8601DateFormatter( pattern: String, - timeZone: TimeZone, - locale: Locale) extends DateFormatter { + locale: Locale) extends DateFormatter with FormatterUtils { + + val zoneId = ZoneId.of("UTC") + + val formatter = buildFormatter(pattern, locale) - val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale) + def toInstant(s: String): Instant = { + val temporalAccessor = formatter.parse(s) + toInstantWithZoneId(temporalAccessor) + } override def parse(s: String): Int = { - val seconds = dateTimeFormatter.toInstant(s).getEpochSecond + val seconds = toInstant(s).getEpochSecond val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY) days.toInt @@ -130,15 +149,12 @@ class Iso8601DateFormatter( override def format(days: Int): String = { val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY) - dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant) + formatter.withZone(zoneId).format(instant) } } -class LegacyDateFormatter( - pattern: String, - timeZone: TimeZone, - locale: Locale) extends DateFormatter { - val format = FastDateFormat.getInstance(pattern, timeZone, locale) +class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter { + val format = FastDateFormat.getInstance(pattern, locale) def parse(s: String): Int = { val milliseconds = format.parse(s).getTime @@ -153,8 +169,7 @@ class LegacyDateFormatter( class LegacyFallbackDateFormatter( pattern: String, - timeZone: TimeZone, - locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) { + locale: Locale) extends LegacyDateFormatter(pattern, locale) { override def parse(s: String): Int = { Try(super.parse(s)).orElse { // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards @@ -169,11 +184,11 @@ class LegacyFallbackDateFormatter( } object DateFormatter { - def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = { + def apply(format: String, locale: Locale): DateFormatter = { if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyFallbackDateFormatter(format, timeZone, locale) + new LegacyFallbackDateFormatter(format, locale) } else { - new Iso8601DateFormatter(format, timeZone, locale) + new Iso8601DateFormatter(format, locale) } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala deleted file mode 100644 index 02d4ee049060..000000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.util - -import java.util.{Locale, TimeZone} - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils} - -class DateTimeFormatterSuite extends SparkFunSuite { - test("parsing dates using time zones") { - val localDate = "2018-12-02" - val expectedDays = Map( - "UTC" -> 17867, - "PST" -> 17867, - "CET" -> 17866, - "Africa/Dakar" -> 17867, - "America/Los_Angeles" -> 17867, - "Antarctica/Vostok" -> 17866, - "Asia/Hong_Kong" -> 17866, - "Europe/Amsterdam" -> 17866) - DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => - val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US) - val daysSinceEpoch = formatter.parse(localDate) - assert(daysSinceEpoch === expectedDays(timeZone)) - } - } - - test("parsing timestamps using time zones") { - val localDate = "2018-12-02T10:11:12.001234" - val expectedMicros = Map( - "UTC" -> 1543745472001234L, - "PST" -> 1543774272001234L, - "CET" -> 1543741872001234L, - "Africa/Dakar" -> 1543745472001234L, - "America/Los_Angeles" -> 1543774272001234L, - "Antarctica/Vostok" -> 1543723872001234L, - "Asia/Hong_Kong" -> 1543716672001234L, - "Europe/Amsterdam" -> 1543741872001234L) - DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => - val formatter = DateTimeFormatter( - "yyyy-MM-dd'T'HH:mm:ss.SSSSSS", - TimeZone.getTimeZone(timeZone), - Locale.US) - val microsSinceEpoch = formatter.parse(localDate) - assert(microsSinceEpoch === expectedMicros(timeZone)) - } - } - - test("format dates using time zones") { - val daysSinceEpoch = 17867 - val expectedDate = Map( - "UTC" -> "2018-12-02", - "PST" -> "2018-12-01", - "CET" -> "2018-12-02", - "Africa/Dakar" -> "2018-12-02", - "America/Los_Angeles" -> "2018-12-01", - "Antarctica/Vostok" -> "2018-12-02", - "Asia/Hong_Kong" -> "2018-12-02", - "Europe/Amsterdam" -> "2018-12-02") - DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => - val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US) - val date = formatter.format(daysSinceEpoch) - assert(date === expectedDate(timeZone)) - } - } - - test("format timestamps using time zones") { - val microsSinceEpoch = 1543745472001234L - val expectedTimestamp = Map( - "UTC" -> "2018-12-02T10:11:12.001234", - "PST" -> "2018-12-02T02:11:12.001234", - "CET" -> "2018-12-02T11:11:12.001234", - "Africa/Dakar" -> "2018-12-02T10:11:12.001234", - "America/Los_Angeles" -> "2018-12-02T02:11:12.001234", - "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234", - "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234", - "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234") - DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => - val formatter = DateTimeFormatter( - "yyyy-MM-dd'T'HH:mm:ss.SSSSSS", - TimeZone.getTimeZone(timeZone), - Locale.US) - val timestamp = formatter.format(microsSinceEpoch) - assert(timestamp === expectedTimestamp(timeZone)) - } - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala new file mode 100644 index 000000000000..43e348c7eebf --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import java.util.{Locale, TimeZone} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.plans.SQLHelper +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.internal.SQLConf + +class DateTimestampFormatterSuite extends SparkFunSuite with SQLHelper { + test("parsing dates") { + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + val formatter = DateFormatter("yyyy-MM-dd", Locale.US) + val daysSinceEpoch = formatter.parse("2018-12-02") + assert(daysSinceEpoch === 17867) + } + } + } + + test("parsing timestamps using time zones") { + val localDate = "2018-12-02T10:11:12.001234" + val expectedMicros = Map( + "UTC" -> 1543745472001234L, + "PST" -> 1543774272001234L, + "CET" -> 1543741872001234L, + "Africa/Dakar" -> 1543745472001234L, + "America/Los_Angeles" -> 1543774272001234L, + "Antarctica/Vostok" -> 1543723872001234L, + "Asia/Hong_Kong" -> 1543716672001234L, + "Europe/Amsterdam" -> 1543741872001234L) + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + val formatter = TimestampFormatter( + "yyyy-MM-dd'T'HH:mm:ss.SSSSSS", + TimeZone.getTimeZone(timeZone), + Locale.US) + val microsSinceEpoch = formatter.parse(localDate) + assert(microsSinceEpoch === expectedMicros(timeZone)) + } + } + + test("format dates") { + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + val formatter = DateFormatter("yyyy-MM-dd", Locale.US) + val date = formatter.format(17867) + assert(date === "2018-12-02") + } + } + } + + test("format timestamps using time zones") { + val microsSinceEpoch = 1543745472001234L + val expectedTimestamp = Map( + "UTC" -> "2018-12-02T10:11:12.001234", + "PST" -> "2018-12-02T02:11:12.001234", + "CET" -> "2018-12-02T11:11:12.001234", + "Africa/Dakar" -> "2018-12-02T10:11:12.001234", + "America/Los_Angeles" -> "2018-12-02T02:11:12.001234", + "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234", + "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234", + "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234") + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + val formatter = TimestampFormatter( + "yyyy-MM-dd'T'HH:mm:ss.SSSSSS", + TimeZone.getTimeZone(timeZone), + Locale.US) + val timestamp = formatter.format(microsSinceEpoch) + assert(timestamp === expectedTimestamp(timeZone)) + } + } + + test("roundtrip timestamp -> micros -> timestamp using timezones") { + Seq( + -58710115316212000L, + -18926315945345679L, + -9463427405253013L, + -244000001L, + 0L, + 99628200102030L, + 1543749753123456L, + 2177456523456789L, + 11858049903010203L).foreach { micros => + DateTimeTestUtils.outstandingTimezones.foreach { timeZone => + val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US) + val timestamp = formatter.format(micros) + val parsed = formatter.parse(timestamp) + assert(micros === parsed) + } + } + } + + test("roundtrip micros -> timestamp -> micros using timezones") { + Seq( + "0109-07-20T18:38:03.788000", + "1370-04-01T10:00:54.654321", + "1670-02-11T14:09:54.746987", + "1969-12-31T23:55:55.999999", + "1970-01-01T00:00:00.000000", + "1973-02-27T02:30:00.102030", + "2018-12-02T11:22:33.123456", + "2039-01-01T01:02:03.456789", + "2345-10-07T22:45:03.010203").foreach { timestamp => + DateTimeTestUtils.outstandingTimezones.foreach { timeZone => + val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US) + val micros = formatter.parse(timestamp) + val formatted = formatter.format(micros) + assert(timestamp === formatted) + } + } + } + + test("roundtrip date -> days -> date") { + Seq( + "0050-01-01", + "0953-02-02", + "1423-03-08", + "1969-12-31", + "1972-08-25", + "1975-09-26", + "2018-12-12", + "2038-01-01", + "5010-11-17").foreach { date => + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + val formatter = DateFormatter("yyyy-MM-dd", Locale.US) + val days = formatter.parse(date) + val formatted = formatter.format(days) + assert(date === formatted) + } + } + } + } + + test("roundtrip days -> date -> days") { + Seq( + -701265, + -371419, + -199722, + -1, + 0, + 967, + 2094, + 17877, + 24837, + 1110657).foreach { days => + DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone => + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + val formatter = DateFormatter("yyyy-MM-dd", Locale.US) + val date = formatter.format(days) + val parsed = formatter.parse(date) + assert(days === parsed) + } + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index dff37ca2d40f..96f7d24b3635 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -57,14 +57,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } val factory = new JsonFactory() - def enforceCorrectType(value: Any, dataType: DataType): Any = { + def enforceCorrectType( + value: Any, + dataType: DataType, + options: Map[String, String] = Map.empty): Any = { val writer = new StringWriter() Utils.tryWithResource(factory.createGenerator(writer)) { generator => generator.writeObject(value) generator.flush() } - val dummyOption = new JSONOptions(Map.empty[String, String], "GMT") + val dummyOption = new JSONOptions(options, SQLConf.get.sessionLocalTimeZone) val dummySchema = StructType(Seq.empty) val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true) @@ -96,19 +99,27 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong * 1000L)), enforceCorrectType(intNumber.toLong, TimestampType)) val strTime = "2014-09-30 12:34:56" - checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)), - enforceCorrectType(strTime, TimestampType)) + checkTypePromotion( + expected = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)), + enforceCorrectType(strTime, TimestampType, + Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss"))) val strDate = "2014-10-15" checkTypePromotion( DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType)) val ISO8601Time1 = "1970-01-01T01:00:01.0Z" - val ISO8601Time2 = "1970-01-01T02:00:01-01:00" checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)), - enforceCorrectType(ISO8601Time1, TimestampType)) + enforceCorrectType( + ISO8601Time1, + TimestampType, + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SX"))) + val ISO8601Time2 = "1970-01-01T02:00:01-01:00" checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)), - enforceCorrectType(ISO8601Time2, TimestampType)) + enforceCorrectType( + ISO8601Time2, + TimestampType, + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ssXXX"))) val ISO8601Date = "1970-01-01" checkTypePromotion(DateTimeUtils.millisToDays(32400000), @@ -1440,103 +1451,105 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("backward compatibility") { - // This test we make sure our JSON support can read JSON data generated by previous version - // of Spark generated through toJSON method and JSON data source. - // The data is generated by the following program. - // Here are a few notes: - // - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13) - // in the JSON object. - // - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to - // JSON objects generated by those Spark versions (col17). - // - If the type is NullType, we do not write data out. - - // Create the schema. - val struct = - StructType( - StructField("f1", FloatType, true) :: - StructField("f2", ArrayType(BooleanType), true) :: Nil) + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") { + // This test we make sure our JSON support can read JSON data generated by previous version + // of Spark generated through toJSON method and JSON data source. + // The data is generated by the following program. + // Here are a few notes: + // - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13) + // in the JSON object. + // - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to + // JSON objects generated by those Spark versions (col17). + // - If the type is NullType, we do not write data out. + + // Create the schema. + val struct = + StructType( + StructField("f1", FloatType, true) :: + StructField("f2", ArrayType(BooleanType), true) :: Nil) - val dataTypes = - Seq( - StringType, BinaryType, NullType, BooleanType, - ByteType, ShortType, IntegerType, LongType, - FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), - DateType, TimestampType, - ArrayType(IntegerType), MapType(StringType, LongType), struct, - new TestUDT.MyDenseVectorUDT()) - val fields = dataTypes.zipWithIndex.map { case (dataType, index) => - StructField(s"col$index", dataType, nullable = true) - } - val schema = StructType(fields) + val dataTypes = + Seq( + StringType, BinaryType, NullType, BooleanType, + ByteType, ShortType, IntegerType, LongType, + FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), + DateType, TimestampType, + ArrayType(IntegerType), MapType(StringType, LongType), struct, + new TestUDT.MyDenseVectorUDT()) + val fields = dataTypes.zipWithIndex.map { case (dataType, index) => + StructField(s"col$index", dataType, nullable = true) + } + val schema = StructType(fields) - val constantValues = - Seq( - "a string in binary".getBytes(StandardCharsets.UTF_8), - null, - true, - 1.toByte, - 2.toShort, - 3, - Long.MaxValue, - 0.25.toFloat, - 0.75, - new java.math.BigDecimal(s"1234.23456"), - new java.math.BigDecimal(s"1.23456"), - java.sql.Date.valueOf("2015-01-01"), - java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"), - Seq(2, 3, 4), - Map("a string" -> 2000L), - Row(4.75.toFloat, Seq(false, true)), - new TestUDT.MyDenseVector(Array(0.25, 2.25, 4.25))) - val data = - Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil + val constantValues = + Seq( + "a string in binary".getBytes(StandardCharsets.UTF_8), + null, + true, + 1.toByte, + 2.toShort, + 3, + Long.MaxValue, + 0.25.toFloat, + 0.75, + new java.math.BigDecimal(s"1234.23456"), + new java.math.BigDecimal(s"1.23456"), + java.sql.Date.valueOf("2015-01-01"), + java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"), + Seq(2, 3, 4), + Map("a string" -> 2000L), + Row(4.75.toFloat, Seq(false, true)), + new TestUDT.MyDenseVector(Array(0.25, 2.25, 4.25))) + val data = + Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil - // Data generated by previous versions. - // scalastyle:off - val existingJSONData = + // Data generated by previous versions. + // scalastyle:off + val existingJSONData = """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: - """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil - // scalastyle:on - - // Generate data for the current version. - val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema) - withTempPath { path => - df.write.format("json").mode("overwrite").save(path.getCanonicalPath) + """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: + """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: + """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: + """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: + """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: + """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil + // scalastyle:on + + // Generate data for the current version. + val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema) + withTempPath { path => + df.write.format("json").mode("overwrite").save(path.getCanonicalPath) - // df.toJSON will convert internal rows to external rows first and then generate - // JSON objects. While, df.write.format("json") will write internal rows directly. - val allJSON = + // df.toJSON will convert internal rows to external rows first and then generate + // JSON objects. While, df.write.format("json") will write internal rows directly. + val allJSON = existingJSONData ++ df.toJSON.collect() ++ sparkContext.textFile(path.getCanonicalPath).collect() - Utils.deleteRecursively(path) - sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath) - - // Read data back with the schema specified. - val col0Values = - Seq( - "Spark 1.2.2", - "Spark 1.3.1", - "Spark 1.3.1", - "Spark 1.4.1", - "Spark 1.4.1", - "Spark 1.5.0", - "Spark 1.5.0", - "Spark " + spark.sparkContext.version, - "Spark " + spark.sparkContext.version) - val expectedResult = col0Values.map { v => - Row.fromSeq(Seq(v) ++ constantValues) + Utils.deleteRecursively(path) + sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath) + + // Read data back with the schema specified. + val col0Values = + Seq( + "Spark 1.2.2", + "Spark 1.3.1", + "Spark 1.3.1", + "Spark 1.4.1", + "Spark 1.4.1", + "Spark 1.5.0", + "Spark 1.5.0", + "Spark " + spark.sparkContext.version, + "Spark " + spark.sparkContext.version) + val expectedResult = col0Values.map { v => + Row.fromSeq(Seq(v) ++ constantValues) + } + checkAnswer( + spark.read.format("json").schema(schema).load(path.getCanonicalPath), + expectedResult + ) } - checkAnswer( - spark.read.format("json").schema(schema).load(path.getCanonicalPath), - expectedResult - ) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 6075f2c8877d..f0f62b608785 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.sources import java.io.File +import java.util.TimeZone import scala.util.Random @@ -125,56 +126,62 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } else { Seq(false) } - for (dataType <- supportedDataTypes) { - for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) { - val extraMessage = if (isParquetDataSource) { - s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled" - } else { - "" - } - logInfo(s"Testing $dataType data type$extraMessage") - - val extraOptions = Map[String, String]( - "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString - ) - - withTempPath { file => - val path = file.getCanonicalPath - - val dataGenerator = RandomDataGenerator.forType( - dataType = dataType, - nullable = true, - new Random(System.nanoTime()) - ).getOrElse { - fail(s"Failed to create data generator for schema $dataType") + // TODO: Support new parser too, see SPARK-26374. + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") { + for (dataType <- supportedDataTypes) { + for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) { + val extraMessage = if (isParquetDataSource) { + s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled" + } else { + "" + } + logInfo(s"Testing $dataType data type$extraMessage") + + val extraOptions = Map[String, String]( + "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString + ) + + withTempPath { file => + val path = file.getCanonicalPath + + val seed = System.nanoTime() + withClue(s"Random data generated with the seed: ${seed}") { + val dataGenerator = RandomDataGenerator.forType( + dataType = dataType, + nullable = true, + new Random(seed) + ).getOrElse { + fail(s"Failed to create data generator for schema $dataType") + } + + // Create a DF for the schema with random data. The index field is used to sort the + // DataFrame. This is a workaround for SPARK-10591. + val schema = new StructType() + .add("index", IntegerType, nullable = false) + .add("col", dataType, nullable = true) + val rdd = + spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator()))) + val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) + + df.write + .mode("overwrite") + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .options(extraOptions) + .save(path) + + val loadedDF = spark + .read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .schema(df.schema) + .options(extraOptions) + .load(path) + .orderBy("index") + + checkAnswer(loadedDF, df) + } } - - // Create a DF for the schema with random data. The index field is used to sort the - // DataFrame. This is a workaround for SPARK-10591. - val schema = new StructType() - .add("index", IntegerType, nullable = false) - .add("col", dataType, nullable = true) - val rdd = - spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator()))) - val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) - - df.write - .mode("overwrite") - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .options(extraOptions) - .save(path) - - val loadedDF = spark - .read - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .schema(df.schema) - .options(extraOptions) - .load(path) - .orderBy("index") - - checkAnswer(loadedDF, df) } } }