From 0394030e01b35a87cd294ad3c78619684d564aa1 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Mon, 19 Sep 2022 10:19:28 -0700 Subject: [PATCH 01/26] SPARK-40474 Infer columns with mixed date and timestamp as String in CSV schema inference --- .../sql/catalyst/csv/CSVInferSchema.scala | 15 ++++++----- .../spark/sql/catalyst/csv/CSVOptions.scala | 9 +++---- .../sql/catalyst/csv/UnivocityParser.scala | 27 ++++++------------- .../catalyst/csv/CSVInferSchemaSuite.scala | 8 +++--- .../catalyst/csv/UnivocityParserSuite.scala | 22 --------------- .../execution/datasources/csv/CSVSuite.scala | 19 ++++++------- 6 files changed, 33 insertions(+), 67 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 53d748989204..df3d5cc58318 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -123,10 +123,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { case LongType => tryParseLong(field) case _: DecimalType => tryParseDecimal(field) case DoubleType => tryParseDouble(field) - case DateType => tryParseDateTime(field) - case TimestampNTZType if options.prefersDate => tryParseDateTime(field) + case DateType => tryParseDate(field) case TimestampNTZType => tryParseTimestampNTZ(field) - case TimestampType if options.prefersDate => tryParseDateTime(field) case TimestampType => tryParseTimestamp(field) case BooleanType => tryParseBoolean(field) case StringType => StringType @@ -179,13 +177,13 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) { DoubleType } else if (options.prefersDate) { - tryParseDateTime(field) + tryParseDate(field) } else { tryParseTimestampNTZ(field) } } - private def tryParseDateTime(field: String): DataType = { + private def tryParseDate(field: String): DataType = { if ((allCatch opt dateFormatter.parse(field)).isDefined) { DateType } else { @@ -233,7 +231,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { * is compatible with both input data types. */ private def compatibleType(t1: DataType, t2: DataType): Option[DataType] = { - TypeCoercion.findTightestCommonType(t1, t2).orElse(findCompatibleTypeForCSV(t1, t2)) + (t1, t2) match { + // For fields with mixing dates and timestamps, relax it as string type + case (DateType, TimestampType) | (TimestampType, DateType) | + (DateType, TimestampNTZType) | (TimestampNTZType, DateType) => Some(StringType) + case _ => TypeCoercion.findTightestCommonType(t1, t2).orElse(findCompatibleTypeForCSV(t1, t2)) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 1162c2882dd7..866f01e7b4a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -149,13 +149,10 @@ class CSVOptions( val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US) /** - * Infer columns with all valid date entries as date type (otherwise inferred as timestamp type) - * if schema inference is enabled. When being used with user-provided schema, tries to parse - * timestamp values as dates if the values do not conform to the timestamp formatter before - * falling back to the backward compatible parsing - the parsed values will be cast to timestamp - * afterwards. + * Infer columns with all valid date entries as date type (otherwise inferred as string type) + * if schema inference is enabled. * - * Disabled by default for backwards compatibility and performance. + * Disabled by default for backwards compatibility. * * Not compatible with legacyTimeParserPolicy == LEGACY since legacy date parser will accept * extra trailing characters. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 9d855d1a93d6..b090aece8347 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -28,7 +28,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters} import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal} import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils.{daysToMicros, TimeZoneUTC} import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.errors.QueryExecutionErrors @@ -224,7 +223,7 @@ class UnivocityParser( case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards // compatibility if enabled. - if (!enableParsingFallbackForTimestampType) { + if (!enableParsingFallbackForDateType) { throw e } val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) @@ -238,29 +237,19 @@ class UnivocityParser( timestampFormatter.parse(datum) } catch { case NonFatal(e) => - // There may be date type entries in timestamp column due to schema inference - if (options.prefersDate) { - daysToMicros(dateFormatter.parse(datum), options.zoneId) - } else { - // If fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility if enabled. - if (!enableParsingFallbackForDateType) { - throw e - } - val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) - DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw(e)) + // If fails to parse, then tries the way used in 2.0 and 1.x for backwards + // compatibility if enabled. + if (!enableParsingFallbackForTimestampType) { + throw e } + val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum)) + DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw(e)) } } case _: TimestampNTZType => (d: String) => nullSafeDatum(d, name, nullable, options) { datum => - try { - timestampNTZFormatter.parseWithoutTimeZone(datum, false) - } catch { - case NonFatal(e) if options.prefersDate => - daysToMicros(dateFormatter.parse(datum), TimeZoneUTC.toZoneId) - } + timestampNTZFormatter.parseWithoutTimeZone(datum, false) } case _: StringType => (d: String) => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala index 7066a5614ee9..bfb87bdf4297 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala @@ -235,9 +235,11 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { assert(inferSchema.inferField(DateType, "2003/02/05") == TimestampNTZType) } - // inferField should upgrade a date field to timestamp if the typeSoFar is a timestamp - assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == TimestampNTZType) - assert(inferSchema.inferField(TimestampType, "2018_12_03") == TimestampType) + // inferField should infer a field as string type if it contains mixing dates and timestamps + assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == StringType) + assert(inferSchema.inferField(TimestampType, "2018_12_03") == StringType) + assert(inferSchema.inferField(DateType, "2012|12|12") == StringType) + assert(inferSchema.inferField(DateType, "2018/12/03") == StringType) // No errors when Date and Timestamp have the same format. Inference defaults to date options = new CSVOptions( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index 42bc122dfdcb..ebf69082a3fb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -372,26 +372,4 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper { } assert(err.getMessage.contains("Illegal pattern character: n")) } - - test("SPARK-39469: dates should be parsed correctly in timestamp column when prefersDate=true") { - def checkDate(dataType: DataType): Unit = { - val timestampsOptions = - new CSVOptions(Map("prefersDate" -> "true", "timestampFormat" -> "dd/MM/yyyy HH:mm", - "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"), - false, DateTimeUtils.getZoneId("-08:00").toString) - // Use CSVOption ZoneId="-08:00" (PST) to test that Dates in TimestampNTZ column are always - // converted to their equivalent UTC timestamp - val dateString = "08_09_2001" - val expected = dataType match { - case TimestampType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.of("-08:00")) - case TimestampNTZType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC) - case DateType => days(2001, 9, 8) - } - val parser = new UnivocityParser(new StructType(), timestampsOptions) - assert(parser.makeConverter("d", dataType).apply(dateString) == expected) - } - checkDate(TimestampType) - checkDate(TimestampNTZType) - checkDate(DateType) - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index f74f7a00c133..21f332fd04a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2848,18 +2848,15 @@ abstract class CSVSuite .load(testFile(dateInferSchemaFile)) val expectedSchema = StructType(List(StructField("date", DateType), - StructField("timestamp-date", TimestampType), - StructField("date-timestamp", TimestampType))) + StructField("timestamp-date", StringType), + StructField("date-timestamp", StringType))) assert(results.schema == expectedSchema) val expected = Seq( - Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"), - Timestamp.valueOf("1765-03-28 00:00:0.0")), - Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"), - Timestamp.valueOf("1423-11-12 23:41:0.0")), - Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"), - Timestamp.valueOf("2016-01-28 20:00:00.0")) + Seq(Date.valueOf("2001-9-8"), "2014-10-27T18:30:00", "1765-03-28"), + Seq(Date.valueOf("1941-1-2"), "2000-09-14T01:01:00", "1423-11-12T23:41:00"), + Seq(Date.valueOf("0293-11-7"), "1995-06-25", "2016-01-28T20:00:00") ) assert(results.collect().toSeq.map(_.toSeq) == expected) } @@ -2894,9 +2891,9 @@ abstract class CSVSuite checkAnswer( output, Seq( - Row(Timestamp.valueOf("2020-02-01 12:34:56")), - Row(Timestamp.valueOf("2020-02-02 00:00:00")), - Row(null) + Row("2020-02-01 12:34:56"), + Row("2020-02-02"), + Row("invalid") ) ) } From f4fadf7e83a97652403400b6241a4657f4a32f15 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Mon, 19 Sep 2022 12:48:37 -0700 Subject: [PATCH 02/26] [SPARK-40474] remove unused imports --- .../org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index ebf69082a3fb..37605e14b926 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.csv import java.math.BigDecimal import java.text.{DecimalFormat, DecimalFormatSymbols} -import java.time.{ZoneOffset} import java.util.{Locale, TimeZone} import org.apache.commons.lang3.time.FastDateFormat From 813ac7495a6e3ce62badf983a5f645a956cbdc8a Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Mon, 19 Sep 2022 14:27:38 -0700 Subject: [PATCH 03/26] [SPARK-40474] Resolve test failures --- docs/sql-data-sources-csv.md | 2 +- .../sql/catalyst/csv/CSVInferSchemaSuite.scala | 17 ++++++++--------- .../execution/datasources/csv/CSVSuite.scala | 6 +++--- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md index 98d31a59ac7a..4171be7596dc 100644 --- a/docs/sql-data-sources-csv.md +++ b/docs/sql-data-sources-csv.md @@ -111,7 +111,7 @@ Data source options of CSV can be set via: prefersDate false - During schema inference (inferSchema), attempts to infer string columns that contain dates or timestamps as Date if the values satisfy the dateFormat option and failed to be parsed by the respective formatter. With a user-provided schema, attempts to parse timestamp columns as dates using dateFormat if they fail to conform to timestampFormat, in this case the parsed values will be cast to timestamp type afterwards. + During schema inference (inferSchema), attempts to infer string columns that contain dates as Date if the values satisfy the dateFormat option or default date format. For columns that contain mixing dates and timestamps, infer them as StringType. read diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala index bfb87bdf4297..52d7d8cfaa5f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala @@ -111,10 +111,10 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { Array(LongType)).sameElements(Array(DoubleType))) assert( inferSchema.mergeRowTypes(Array(DateType), - Array(TimestampNTZType)).sameElements(Array(TimestampNTZType))) + Array(TimestampNTZType)).sameElements(Array(StringType))) assert( inferSchema.mergeRowTypes(Array(DateType), - Array(TimestampType)).sameElements(Array(TimestampType))) + Array(TimestampType)).sameElements(Array(StringType))) } test("Null fields are handled properly when a nullValue is specified") { @@ -221,25 +221,24 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { assert(inferSchema.inferField(NullType, "2018-12-03") == DateType) } - test("SPARK-39469: inferring date and timestamp types in a mixed column with prefersDate=true") { + test("SPARK-39469: inferring the schema of columns with mixing dates and timestamps properly") { var options = new CSVOptions( Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy|MM|dd", "timestampNTZFormat" -> "yyyy/MM/dd", "prefersDate" -> "true"), columnPruning = false, defaultTimeZoneId = "UTC") var inferSchema = new CSVInferSchema(options) + assert(inferSchema.inferField(DateType, "2012_12_12") == DateType) - assert(inferSchema.inferField(DateType, "2003|01|01") == TimestampType) + + // inferField should infer a column as string type if it contains mixing dates and timestamps + assert(inferSchema.inferField(DateType, "2003|01|01") == StringType) // SQL configuration must be set to default to TimestampNTZ withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") { - assert(inferSchema.inferField(DateType, "2003/02/05") == TimestampNTZType) + assert(inferSchema.inferField(DateType, "2003/02/05") == StringType) } - - // inferField should infer a field as string type if it contains mixing dates and timestamps assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == StringType) assert(inferSchema.inferField(TimestampType, "2018_12_03") == StringType) - assert(inferSchema.inferField(DateType, "2012|12|12") == StringType) - assert(inferSchema.inferField(DateType, "2018/12/03") == StringType) // No errors when Date and Timestamp have the same format. Inference defaults to date options = new CSVOptions( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 21f332fd04a6..a1933405f47d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2891,9 +2891,9 @@ abstract class CSVSuite checkAnswer( output, Seq( - Row("2020-02-01 12:34:56"), - Row("2020-02-02"), - Row("invalid") + Row(Timestamp.valueOf("2020-02-01 12:34:56")), + Row(Timestamp.valueOf("2020-02-02 00:00:00")), + Row(null) ) ) } From 5c2dde85e44bcee986afc9a7144b9fbb6de0e8f2 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Mon, 19 Sep 2022 22:41:52 -0700 Subject: [PATCH 04/26] [SPARK-40474] fix test failures --- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index a1933405f47d..11cb526cb5f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2829,6 +2829,7 @@ abstract class CSVSuite val options2 = Map( "header" -> "true", "inferSchema" -> "true", + "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "prefersDate" -> "true") // Error should be thrown when attempting to prefersDate with Legacy parser From 0d2be1dced412b8c782f9f47598c039ecad5484a Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Mon, 19 Sep 2022 23:15:58 -0700 Subject: [PATCH 05/26] [SPARK-40474] handle edge cases --- .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 12 +++++++++++- .../sql/execution/datasources/csv/CSVSuite.scala | 1 - 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 866f01e7b4a0..db933ef19476 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -179,6 +179,10 @@ class CSVOptions( if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { Some(parameters.getOrElse("timestampFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX")) + } else if (prefersDate) { + // If prefersDate, use Iso8601TimestampFormatter (with strict timestamp parsing) to + // avoid parsing dates in timestamp columns as timestamp type + Some(parameters.getOrElse("timestampFormat", TimestampFormatter.defaultPattern())) } else { parameters.get("timestampFormat") } @@ -189,7 +193,13 @@ class CSVOptions( s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" }) - val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat") + val timestampNTZFormatInRead: Option[String] = if (prefersDate) { + // If prefersDate, use Iso8601TimestampFormatter (with strict timestamp parsing) to + // avoid parsing dates in timestamp columns as timestamp type + Some(parameters.getOrElse("timestampFormat", TimestampFormatter.defaultPattern())) + } else { + parameters.get("timestampNTZFormat") + } val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 11cb526cb5f1..a1933405f47d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2829,7 +2829,6 @@ abstract class CSVSuite val options2 = Map( "header" -> "true", "inferSchema" -> "true", - "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "prefersDate" -> "true") // Error should be thrown when attempting to prefersDate with Legacy parser From df569460e262213c82e4b2a0b7019d213c544c2a Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Mon, 19 Sep 2022 23:31:59 -0700 Subject: [PATCH 06/26] [SPARK-40474] handle edge cases --- .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index db933ef19476..c8d3eeff975b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -179,12 +179,10 @@ class CSVOptions( if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { Some(parameters.getOrElse("timestampFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX")) - } else if (prefersDate) { - // If prefersDate, use Iso8601TimestampFormatter (with strict timestamp parsing) to + } else { + // Use Iso8601TimestampFormatter (with strict timestamp parsing) to // avoid parsing dates in timestamp columns as timestamp type Some(parameters.getOrElse("timestampFormat", TimestampFormatter.defaultPattern())) - } else { - parameters.get("timestampFormat") } val timestampFormatInWrite: String = parameters.getOrElse("timestampFormat", if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { @@ -193,12 +191,10 @@ class CSVOptions( s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" }) - val timestampNTZFormatInRead: Option[String] = if (prefersDate) { - // If prefersDate, use Iso8601TimestampFormatter (with strict timestamp parsing) to + val timestampNTZFormatInRead: Option[String] = { + // Use Iso8601TimestampFormatter (with strict timestamp parsing) to // avoid parsing dates in timestamp columns as timestamp type Some(parameters.getOrElse("timestampFormat", TimestampFormatter.defaultPattern())) - } else { - parameters.get("timestampNTZFormat") } val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]") From 4bc480d5372f7863a71d16f48f1e9f6cdd1be29d Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Tue, 20 Sep 2022 10:01:14 -0700 Subject: [PATCH 07/26] SPARK-40474 revert part of CSVOptions changes --- .../org/apache/spark/sql/catalyst/csv/CSVOptions.scala | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index c8d3eeff975b..344d36396e70 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -180,9 +180,7 @@ class CSVOptions( Some(parameters.getOrElse("timestampFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX")) } else { - // Use Iso8601TimestampFormatter (with strict timestamp parsing) to - // avoid parsing dates in timestamp columns as timestamp type - Some(parameters.getOrElse("timestampFormat", TimestampFormatter.defaultPattern())) + parameters.get("timestampFormat") } val timestampFormatInWrite: String = parameters.getOrElse("timestampFormat", if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { @@ -191,11 +189,7 @@ class CSVOptions( s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" }) - val timestampNTZFormatInRead: Option[String] = { - // Use Iso8601TimestampFormatter (with strict timestamp parsing) to - // avoid parsing dates in timestamp columns as timestamp type - Some(parameters.getOrElse("timestampFormat", TimestampFormatter.defaultPattern())) - } + val timestampNTZFormatInRead: Option[String] = parameters.get("timestampFormat") val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]") From f6ed29f799bc4441354f359088074f8355f0ba6b Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Tue, 20 Sep 2022 10:03:30 -0700 Subject: [PATCH 08/26] SPARK-40474 revert part of CSVOptions changes --- .../scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 344d36396e70..866f01e7b4a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -189,7 +189,7 @@ class CSVOptions( s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" }) - val timestampNTZFormatInRead: Option[String] = parameters.get("timestampFormat") + val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat") val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]") From 93b6422d0b3bbfd3021485b12cc87b43a105b023 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Tue, 20 Sep 2022 15:02:11 -0700 Subject: [PATCH 09/26] [SPARK-40474] fix test failures --- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index a1933405f47d..a88bd9a206d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2848,7 +2848,7 @@ abstract class CSVSuite .load(testFile(dateInferSchemaFile)) val expectedSchema = StructType(List(StructField("date", DateType), - StructField("timestamp-date", StringType), + StructField("timestamp-date", TimestampType), StructField("date-timestamp", StringType))) assert(results.schema == expectedSchema) From 6942f2b5c9933c2a17b9cc48bd62faa23baeaafe Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Tue, 20 Sep 2022 18:11:03 -0700 Subject: [PATCH 10/26] [SPARK-40474] handle columns with mixing dates and timestamps inference properly when lenient timestamp formatter is used --- .../sql/catalyst/csv/CSVInferSchema.scala | 36 +++++++- .../execution/datasources/csv/CSVSuite.scala | 88 ++++++++++++------- 2 files changed, 89 insertions(+), 35 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index df3d5cc58318..98839fbbd99c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -59,6 +59,11 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { ExprUtils.getDecimalParser(options.locale) } + // Date formats that could be parsed in DefaultTimestampFormatter + // Reference: DateTimeUtils.parseTimestampString + private val LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS = Set( + "yyyy-MM-dd", "yyyy-M-d", "yyyy-M-dd", "yyyy-MM-d", "yyyy-MM", "yyyy-M", "yyyy") + /** * Similar to the JSON schema inference * 1. Infer type of each row @@ -131,7 +136,36 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { case other: DataType => throw QueryExecutionErrors.dataTypeUnexpectedError(other) } - compatibleType(typeSoFar, typeElemInfer).getOrElse(StringType) + + (typeSoFar, typeElemInfer) match { + case (DateType, TimestampType) | (DateType, TimestampNTZType) => + // For a column containing mixing dates and timestamps, infer it as timestamp type + // if its dates can be inferred as timestamp type + val dateFormat = options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) + if (canParseDateAsTimestamp(dateFormat, typeElemInfer)) { + typeElemInfer + } else { + StringType + } + case _ => compatibleType(typeSoFar, typeElemInfer).getOrElse(StringType) + } + } + } + + /** + * Return if strings of given date format can be parsed as timestamps + * 1. If user provides timestamp format, we will parse strings as timestamps using + * Iso8601TimestampFormatter (with strict timestamp parsing). Any date string can not be parsed + * as timestamp type in this case + * 2. Otherwise, we will use DefaultTimestampFormatter to parse strings as timestamps, which + * is more lenient and can parse strings of some date formats as timestamps.] + */ + private def canParseDateAsTimestamp(dateFormat: String, tsType: DataType): Boolean = { + if ((tsType.isInstanceOf[TimestampType] && options.timestampFormatInRead.isEmpty) || + (tsType.isInstanceOf[TimestampNTZType] && options.timestampNTZFormatInRead.isEmpty)) { + LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS.contains(dateFormat) + } else { + false } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index a88bd9a206d7..afca7bde0b47 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2819,48 +2819,68 @@ abstract class CSVSuite } } - test("SPARK-39469: Infer schema for date type") { - val options1 = Map( - "header" -> "true", - "inferSchema" -> "true", - "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", - "dateFormat" -> "yyyy-MM-dd", - "prefersDate" -> "true") - val options2 = Map( - "header" -> "true", - "inferSchema" -> "true", - "prefersDate" -> "true") - - // Error should be thrown when attempting to prefersDate with Legacy parser - if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { - checkError( - exception = intercept[SparkIllegalArgumentException] { - spark.read.format("csv").options(options1).load(testFile(dateInferSchemaFile)) - }, - errorClass = "CANNOT_INFER_DATE") - } else { - // 1. Specify date format and timestamp format - // 2. Date inference should work with default date format when dateFormat is not provided - Seq(options1, options2).foreach {options => + test("SPARK-39469: Infer schema for columns with only dates " + + "and columns with mixing date and timestamps correctly") { + def checkCSVReadDatetime( + options: Map[String, String], + expectedSchema: StructType, + expectedData: Seq[Seq[Any]]): Unit = { + + // Error should be thrown when attempting to prefersDate with Legacy parser + if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { + checkError( + exception = intercept[SparkIllegalArgumentException] { + spark.read.format("csv").options(options).load(testFile(dateInferSchemaFile)) + }, + errorClass = "CANNOT_INFER_DATE") + } else { val results = spark.read .format("csv") .options(options) .load(testFile(dateInferSchemaFile)) - val expectedSchema = StructType(List(StructField("date", DateType), - StructField("timestamp-date", TimestampType), - StructField("date-timestamp", StringType))) assert(results.schema == expectedSchema) - - val expected = - Seq( - Seq(Date.valueOf("2001-9-8"), "2014-10-27T18:30:00", "1765-03-28"), - Seq(Date.valueOf("1941-1-2"), "2000-09-14T01:01:00", "1423-11-12T23:41:00"), - Seq(Date.valueOf("0293-11-7"), "1995-06-25", "2016-01-28T20:00:00") - ) - assert(results.collect().toSeq.map(_.toSeq) == expected) + assert(results.collect().toSeq.map(_.toSeq) == expectedData) } } + + // When timestamp format is given, infer columns with mixing dates and timestamps as string type + var options = Map( + "header" -> "true", + "inferSchema" -> "true", + "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", + "dateFormat" -> "yyyy-MM-dd", + "prefersDate" -> "true") + var expectedSchema = StructType(List(StructField("date", DateType), + StructField("timestamp-date", StringType), + StructField("date-timestamp", StringType))) + var expectedData = + Seq( + Seq(Date.valueOf("2001-9-8"), "2014-10-27T18:30:00", "1765-03-28"), + Seq(Date.valueOf("1941-1-2"), "2000-09-14T01:01:00", "1423-11-12T23:41:00"), + Seq(Date.valueOf("0293-11-7"), "1995-06-25", "2016-01-28T20:00:00") + ) + checkCSVReadDatetime(options, expectedSchema, expectedData) + + // When timestamp format is not given, infer columns with mixing dates and timestamps as + // timestamp type + options = Map( + "header" -> "true", + "inferSchema" -> "true", + "prefersDate" -> "true") + expectedSchema = StructType(List(StructField("date", DateType), + StructField("timestamp-date", TimestampType), + StructField("date-timestamp", TimestampType))) + expectedData = + Seq( + Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"), + Timestamp.valueOf("1765-03-28 00:00:0.0")), + Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"), + Timestamp.valueOf("1423-11-12 23:41:0.0")), + Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"), + Timestamp.valueOf("2016-01-28 20:00:00.0")) + ) + checkCSVReadDatetime(options, expectedSchema, expectedData) } test("SPARK-39904: Parse incorrect timestamp values with prefersDate=true") { From b4a6f1dc72304b3b1c47a329e63bbabec663c7f3 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Tue, 20 Sep 2022 20:52:01 -0700 Subject: [PATCH 11/26] [SPARK-40474] remove unnecessary changes --- .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 98839fbbd99c..23c65cafd40e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -265,12 +265,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { * is compatible with both input data types. */ private def compatibleType(t1: DataType, t2: DataType): Option[DataType] = { - (t1, t2) match { - // For fields with mixing dates and timestamps, relax it as string type - case (DateType, TimestampType) | (TimestampType, DateType) | - (DateType, TimestampNTZType) | (TimestampNTZType, DateType) => Some(StringType) - case _ => TypeCoercion.findTightestCommonType(t1, t2).orElse(findCompatibleTypeForCSV(t1, t2)) - } + TypeCoercion.findTightestCommonType(t1, t2).orElse(findCompatibleTypeForCSV(t1, t2)) } /** From 15026186a60733083c6fac72273d1966dd6a783c Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Tue, 20 Sep 2022 21:33:48 -0700 Subject: [PATCH 12/26] [SPARK-40474] small changes --- .../sql/catalyst/csv/CSVInferSchema.scala | 64 ++++++++++--------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 23c65cafd40e..4fe30f586ff6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -137,35 +137,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { throw QueryExecutionErrors.dataTypeUnexpectedError(other) } - (typeSoFar, typeElemInfer) match { - case (DateType, TimestampType) | (DateType, TimestampNTZType) => - // For a column containing mixing dates and timestamps, infer it as timestamp type - // if its dates can be inferred as timestamp type - val dateFormat = options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) - if (canParseDateAsTimestamp(dateFormat, typeElemInfer)) { - typeElemInfer - } else { - StringType - } - case _ => compatibleType(typeSoFar, typeElemInfer).getOrElse(StringType) - } - } - } - - /** - * Return if strings of given date format can be parsed as timestamps - * 1. If user provides timestamp format, we will parse strings as timestamps using - * Iso8601TimestampFormatter (with strict timestamp parsing). Any date string can not be parsed - * as timestamp type in this case - * 2. Otherwise, we will use DefaultTimestampFormatter to parse strings as timestamps, which - * is more lenient and can parse strings of some date formats as timestamps.] - */ - private def canParseDateAsTimestamp(dateFormat: String, tsType: DataType): Boolean = { - if ((tsType.isInstanceOf[TimestampType] && options.timestampFormatInRead.isEmpty) || - (tsType.isInstanceOf[TimestampNTZType] && options.timestampNTZFormatInRead.isEmpty)) { - LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS.contains(dateFormat) - } else { - false + compatibleType(typeSoFar, typeElemInfer).getOrElse(StringType) } } @@ -265,7 +237,39 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { * is compatible with both input data types. */ private def compatibleType(t1: DataType, t2: DataType): Option[DataType] = { - TypeCoercion.findTightestCommonType(t1, t2).orElse(findCompatibleTypeForCSV(t1, t2)) + (t1, t2) match { + case (DateType, TimestampType) | (DateType, TimestampNTZType) | + (TimestampNTZType, DateType) | (TimestampType, DateType) => + // For a column containing mixing dates and timestamps + // infer it as timestamp type if its dates can be inferred as timestamp type + // otherwise infer it as StringType + val dateFormat = options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) + t1 match { + case DateType if canParseDateAsTimestamp(dateFormat, t2) => + Some(t2) + case TimestampType | TimestampNTZType if canParseDateAsTimestamp(dateFormat, t1) => + Some(t1) + case _ => Some(StringType) + } + case _ => TypeCoercion.findTightestCommonType(t1, t2).orElse(findCompatibleTypeForCSV(t1, t2)) + } + } + + /** + * Return if strings of given date format can be parsed as timestamps + * 1. If user provides timestamp format, we will parse strings as timestamps using + * Iso8601TimestampFormatter (with strict timestamp parsing). Any date string can not be parsed + * as timestamp type in this case + * 2. Otherwise, we will use DefaultTimestampFormatter to parse strings as timestamps, which + * is more lenient and can parse strings of some date formats as timestamps.] + */ + private def canParseDateAsTimestamp(dateFormat: String, tsType: DataType): Boolean = { + if ((tsType.isInstanceOf[TimestampType] && options.timestampFormatInRead.isEmpty) || + (tsType.isInstanceOf[TimestampNTZType] && options.timestampNTZFormatInRead.isEmpty)) { + LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS.contains(dateFormat) + } else { + false + } } /** From 1f570987e575f0ad48cc0e02a61f169f3e1da095 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Tue, 20 Sep 2022 22:35:00 -0700 Subject: [PATCH 13/26] [SPARK-40474] remove new line added by mistake --- .../scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 4fe30f586ff6..ec35008614bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -136,7 +136,6 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { case other: DataType => throw QueryExecutionErrors.dataTypeUnexpectedError(other) } - compatibleType(typeSoFar, typeElemInfer).getOrElse(StringType) } } From e9150ec7e110d9d6a7fe215d8f92d3424affb58e Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Tue, 20 Sep 2022 23:59:04 -0700 Subject: [PATCH 14/26] [SPARK-40474] address comments --- docs/sql-data-sources-csv.md | 2 +- .../apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 8 +++++--- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 8 ++++---- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md index 4171be7596dc..a43e6d441b7d 100644 --- a/docs/sql-data-sources-csv.md +++ b/docs/sql-data-sources-csv.md @@ -111,7 +111,7 @@ Data source options of CSV can be set via: prefersDate false - During schema inference (inferSchema), attempts to infer string columns that contain dates as Date if the values satisfy the dateFormat option or default date format. For columns that contain mixing dates and timestamps, infer them as StringType. + During schema inference (inferSchema), attempts to infer string columns that contain dates as Date if the values satisfy the dateFormat option or default date format. For columns that contain a mixture of dates and timestamps, try inferring them as TimestampType if timestamp format not specified, otherwise infer them as StringType. read diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index ec35008614bc..0a3696d1d52e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -61,6 +61,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { // Date formats that could be parsed in DefaultTimestampFormatter // Reference: DateTimeUtils.parseTimestampString + // Used to determine inferring a column with mixture of dates and timestamps as TimestampType or + // StringType when no timestamp format is specified (the lenient timestamp formatter will be used) private val LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS = Set( "yyyy-MM-dd", "yyyy-M-d", "yyyy-M-dd", "yyyy-MM-d", "yyyy-MM", "yyyy-M", "yyyy") @@ -239,7 +241,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { (t1, t2) match { case (DateType, TimestampType) | (DateType, TimestampNTZType) | (TimestampNTZType, DateType) | (TimestampType, DateType) => - // For a column containing mixing dates and timestamps + // For a column containing a mixture of dates and timestamps // infer it as timestamp type if its dates can be inferred as timestamp type // otherwise infer it as StringType val dateFormat = options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) @@ -255,12 +257,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { } /** - * Return if strings of given date format can be parsed as timestamps + * Return true if strings of given date format can be parsed as timestamps * 1. If user provides timestamp format, we will parse strings as timestamps using * Iso8601TimestampFormatter (with strict timestamp parsing). Any date string can not be parsed * as timestamp type in this case * 2. Otherwise, we will use DefaultTimestampFormatter to parse strings as timestamps, which - * is more lenient and can parse strings of some date formats as timestamps.] + * is more lenient and can parse strings of some date formats as timestamps. */ private def canParseDateAsTimestamp(dateFormat: String, tsType: DataType): Boolean = { if ((tsType.isInstanceOf[TimestampType] && options.timestampFormatInRead.isEmpty) || diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index b31233f27352..55f66a587486 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2822,11 +2822,11 @@ abstract class CSVSuite test("SPARK-39469: Infer schema for columns with only dates " + "and columns with mixing date and timestamps correctly") { def checkCSVReadDatetime( - options: Map[String, String], - expectedSchema: StructType, - expectedData: Seq[Seq[Any]]): Unit = { + options: Map[String, String], + expectedSchema: StructType, + expectedData: Seq[Seq[Any]]): Unit = { - // Error should be thrown when attempting to prefersDate with Legacy parser + // Error should be thrown when attempting to use prefersDate with Legacy parser if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { checkError( exception = intercept[SparkIllegalArgumentException] { From a07e4329275c63e94ba638c6a53fe99f70439810 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Wed, 21 Sep 2022 00:02:38 -0700 Subject: [PATCH 15/26] [SPARK-40474] small changes --- docs/sql-data-sources-csv.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md index a43e6d441b7d..cf1c2f4e1642 100644 --- a/docs/sql-data-sources-csv.md +++ b/docs/sql-data-sources-csv.md @@ -111,7 +111,7 @@ Data source options of CSV can be set via: prefersDate false - During schema inference (inferSchema), attempts to infer string columns that contain dates as Date if the values satisfy the dateFormat option or default date format. For columns that contain a mixture of dates and timestamps, try inferring them as TimestampType if timestamp format not specified, otherwise infer them as StringType. + During schema inference (inferSchema), attempts to infer string columns that contain dates as Date if the values satisfy the dateFormat option or default date format. For columns that contain a mixture of dates and timestamps, try inferring them as TimestampType if timestamp format not specified, otherwise infer them as StringType. read From 255aea35688e9413082a2e13f714a97bde32aad4 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Wed, 21 Sep 2022 08:50:49 -0700 Subject: [PATCH 16/26] [SPARK-40474] fix test failures --- .../catalyst/csv/CSVInferSchemaSuite.scala | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala index 52d7d8cfaa5f..707ab87792f6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala @@ -97,8 +97,8 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { } test("Type arrays are merged to highest common type") { - val options = new CSVOptions(Map.empty[String, String], false, "UTC") - val inferSchema = new CSVInferSchema(options) + var options = new CSVOptions(Map.empty[String, String], false, "UTC") + var inferSchema = new CSVInferSchema(options) assert( inferSchema.mergeRowTypes(Array(StringType), @@ -109,6 +109,22 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { assert( inferSchema.mergeRowTypes(Array(DoubleType), Array(LongType)).sameElements(Array(DoubleType))) + + // Can merge DateType and TimestampType into TimestampType when no timestamp format specified + assert( + inferSchema.mergeRowTypes(Array(DateType), + Array(TimestampNTZType)).sameElements(Array(TimestampNTZType))) + assert( + inferSchema.mergeRowTypes(Array(DateType), + Array(TimestampType)).sameElements(Array(TimestampType))) + + // Merge DateType and TimestampType into StringType when there are timestamp formats specified + options = new CSVOptions( + Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss", + "timestampNTZFormat" -> "yyyy/MM/dd HH:mm:ss"), + false, + "UTC") + inferSchema = new CSVInferSchema(options) assert( inferSchema.mergeRowTypes(Array(DateType), Array(TimestampNTZType)).sameElements(Array(StringType))) From 533c4874f8f218f727ce03e5720745172e7f3a86 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Wed, 21 Sep 2022 20:29:23 -0700 Subject: [PATCH 17/26] [SPARK-40474] address review comments --- .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala | 7 ++++--- .../org/apache/spark/sql/catalyst/csv/CSVOptions.scala | 6 +++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 0a3696d1d52e..bdfa4ac3f0f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -241,9 +241,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { (t1, t2) match { case (DateType, TimestampType) | (DateType, TimestampNTZType) | (TimestampNTZType, DateType) | (TimestampType, DateType) => - // For a column containing a mixture of dates and timestamps - // infer it as timestamp type if its dates can be inferred as timestamp type - // otherwise infer it as StringType + // For a column containing a mixture of dates and timestamps, infer it as timestamp type + // if its dates can be inferred as timestamp type, otherwise infer it as StringType. + // This only happens when the timestamp pattern is not specified, as the default timestamp + // parser is very lenient and can parse date string as well. val dateFormat = options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern) t1 match { case DateType if canParseDateAsTimestamp(dateFormat, t2) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 866f01e7b4a0..6edb1b43a73a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -149,8 +149,8 @@ class CSVOptions( val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US) /** - * Infer columns with all valid date entries as date type (otherwise inferred as string type) - * if schema inference is enabled. + * Infer columns with all valid date entries as date type (otherwise inferred as string or + * timestamp type) if schema inference is enabled. * * Disabled by default for backwards compatibility. * @@ -158,7 +158,7 @@ class CSVOptions( * extra trailing characters. */ val prefersDate = { - val inferDateFlag = getBool("prefersDate") + val inferDateFlag = getBool("prefersDate", true) if (inferDateFlag && SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { throw QueryExecutionErrors.inferDateWithLegacyTimeParserError() } From be4c86f070037c46d9781bb1ef4fa0e3fff0a931 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Wed, 21 Sep 2022 22:27:37 -0700 Subject: [PATCH 18/26] [SPARK-40474] fix test failures --- sql/core/src/test/resources/sql-tests/inputs/date.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/date.sql b/sql/core/src/test/resources/sql-tests/inputs/date.sql index ab57c7c754c6..1adb15d3657e 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/date.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/date.sql @@ -139,7 +139,7 @@ select date '2012-01-01' - interval '2-2' year to month, -- Unsupported narrow text style select to_date('26/October/2015', 'dd/MMMMM/yyyy'); select from_json('{"d":"26/October/2015"}', 'd Date', map('dateFormat', 'dd/MMMMM/yyyy')); -select from_csv('26/October/2015', 'd Date', map('dateFormat', 'dd/MMMMM/yyyy')); +select from_csv('26/October/2015', 'd Date', map('dateFormat', 'dd/MMMMM/yyyy', 'prefersDate', 'false')); -- Add a number of units to a timestamp or a date select dateadd(MICROSECOND, 1001, timestamp'2022-02-25 01:02:03.123'); From c7225b1b7c3e1ebad69415688cc898b291668039 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Wed, 21 Sep 2022 22:38:41 -0700 Subject: [PATCH 19/26] [SPARK-40474] update doc --- docs/sql-data-sources-csv.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md index cf1c2f4e1642..42b117bea464 100644 --- a/docs/sql-data-sources-csv.md +++ b/docs/sql-data-sources-csv.md @@ -110,7 +110,7 @@ Data source options of CSV can be set via: prefersDate - false + true During schema inference (inferSchema), attempts to infer string columns that contain dates as Date if the values satisfy the dateFormat option or default date format. For columns that contain a mixture of dates and timestamps, try inferring them as TimestampType if timestamp format not specified, otherwise infer them as StringType. read From 9e87d6e498aa7d15490ae02bf8c8650754baca00 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Wed, 21 Sep 2022 23:13:23 -0700 Subject: [PATCH 20/26] [SPARK-40474] disable prefersDate when leagcyTimeParser is enabled --- .../org/apache/spark/sql/catalyst/csv/CSVOptions.scala | 10 +++++----- sql/core/src/test/resources/sql-tests/inputs/date.sql | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 6edb1b43a73a..afa063239b1f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -152,17 +152,17 @@ class CSVOptions( * Infer columns with all valid date entries as date type (otherwise inferred as string or * timestamp type) if schema inference is enabled. * - * Disabled by default for backwards compatibility. + * Enabled by default. * * Not compatible with legacyTimeParserPolicy == LEGACY since legacy date parser will accept * extra trailing characters. */ val prefersDate = { - val inferDateFlag = getBool("prefersDate", true) - if (inferDateFlag && SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { - throw QueryExecutionErrors.inferDateWithLegacyTimeParserError() + if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { + false + } else { + getBool("prefersDate", true) } - inferDateFlag } // Provide a default value for dateFormatInRead when prefersDate. This ensures that the diff --git a/sql/core/src/test/resources/sql-tests/inputs/date.sql b/sql/core/src/test/resources/sql-tests/inputs/date.sql index 1adb15d3657e..ab57c7c754c6 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/date.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/date.sql @@ -139,7 +139,7 @@ select date '2012-01-01' - interval '2-2' year to month, -- Unsupported narrow text style select to_date('26/October/2015', 'dd/MMMMM/yyyy'); select from_json('{"d":"26/October/2015"}', 'd Date', map('dateFormat', 'dd/MMMMM/yyyy')); -select from_csv('26/October/2015', 'd Date', map('dateFormat', 'dd/MMMMM/yyyy', 'prefersDate', 'false')); +select from_csv('26/October/2015', 'd Date', map('dateFormat', 'dd/MMMMM/yyyy')); -- Add a number of units to a timestamp or a date select dateadd(MICROSECOND, 1001, timestamp'2022-02-25 01:02:03.123'); From af66b83e54980f08ffb943c71dda2ba6715c9eea Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Thu, 22 Sep 2022 00:03:25 -0700 Subject: [PATCH 21/26] [SPARK-40474] fix test failures --- .../org/apache/spark/sql/catalyst/csv/CSVOptions.scala | 7 ++++--- .../apache/spark/sql/catalyst/csv/UnivocityParser.scala | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index afa063239b1f..bcff750425af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -155,7 +155,7 @@ class CSVOptions( * Enabled by default. * * Not compatible with legacyTimeParserPolicy == LEGACY since legacy date parser will accept - * extra trailing characters. + * extra trailing characters. Thus, disabled when legacyTimeParserPolicy == LEGACY */ val prefersDate = { if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { @@ -165,13 +165,14 @@ class CSVOptions( } } + val dateFormatParamOpt: Option[String] = parameters.get("dateFormat") // Provide a default value for dateFormatInRead when prefersDate. This ensures that the // Iso8601DateFormatter (with strict date parsing) is used for date inference val dateFormatInRead: Option[String] = if (prefersDate) { - Option(parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)) + Option(dateFormatParamOpt.getOrElse(DateFormatter.defaultPattern)) } else { - parameters.get("dateFormat") + dateFormatParamOpt } val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index b090aece8347..39808ec85022 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -134,7 +134,7 @@ class UnivocityParser( .orElse(SQLConf.get.csvEnableDateTimeParsingFallback) .getOrElse { SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || - options.dateFormatInRead.isEmpty + options.dateFormatParamOpt.isEmpty } // Retrieve the raw record string. From 812fa656691921bb4021e962e04f11ffcce6fe46 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Thu, 22 Sep 2022 00:42:56 -0700 Subject: [PATCH 22/26] [SPARK-40474] fix test failures --- .../spark/sql/catalyst/csv/CSVOptions.scala | 5 +-- .../sql/catalyst/csv/UnivocityParser.scala | 5 +-- .../execution/datasources/csv/CSVSuite.scala | 42 ++++++++----------- 3 files changed, 20 insertions(+), 32 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index bcff750425af..242b0439e7bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -165,14 +165,13 @@ class CSVOptions( } } - val dateFormatParamOpt: Option[String] = parameters.get("dateFormat") // Provide a default value for dateFormatInRead when prefersDate. This ensures that the // Iso8601DateFormatter (with strict date parsing) is used for date inference val dateFormatInRead: Option[String] = if (prefersDate) { - Option(dateFormatParamOpt.getOrElse(DateFormatter.defaultPattern)) + Option(parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)) } else { - dateFormatParamOpt + parameters.get("dateFormat") } val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 39808ec85022..4196bb8815f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -132,10 +132,7 @@ class UnivocityParser( private val enableParsingFallbackForDateType = options.enableDateTimeParsingFallback .orElse(SQLConf.get.csvEnableDateTimeParsingFallback) - .getOrElse { - SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || - options.dateFormatParamOpt.isEmpty - } + .getOrElse(!options.prefersDate) // Retrieve the raw record string. private def getCurrentInput: UTF8String = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 55f66a587486..1a003a68fa68 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2612,7 +2612,8 @@ abstract class CSVSuite test("SPARK-30960: parse date/timestamp string with legacy format") { val ds = Seq("2020-1-12 3:23:34.12, 2020-1-12 T").toDS() - val csv = spark.read.option("header", false).schema("t timestamp, d date").csv(ds) + val csv = spark.read.option("header", false).option("prefersDate", false) + .schema("t timestamp, d date").csv(ds) checkAnswer(csv, Row(Timestamp.valueOf("2020-1-12 3:23:34.12"), Date.valueOf("2020-1-12"))) } @@ -2777,10 +2778,10 @@ abstract class CSVSuite withTempPath { path => Seq( """d,ts_ltz,ts_ntz""", - """2021,2021,2021""", - """2021-01,2021-01 ,2021-01""", - """ 2021-2-1,2021-3-02,2021-10-1""", - """2021-8-18 00:00:00,2021-8-18 21:44:30Z,2021-8-18T21:44:30.123""" + """2021-01-01,2021,2021""", + """2021-01-02,2021-01 ,2021-01""", + """2021-02-01,2021-3-02,2021-10-1""", + """2021-08-18,2021-8-18 21:44:30Z,2021-8-18T21:44:30.123""" ).toDF().repartition(1).write.text(path.getCanonicalPath) val readback = spark.read.schema("d date, ts_ltz timestamp_ltz, ts_ntz timestamp_ntz") .option("header", true) @@ -2790,7 +2791,7 @@ abstract class CSVSuite Seq( Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"), LocalDateTime.of(2021, 1, 1, 0, 0, 0)), - Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"), + Row(LocalDate.of(2021, 1, 2), Instant.parse("2021-01-01T00:00:00Z"), LocalDateTime.of(2021, 1, 1, 0, 0, 0)), Row(LocalDate.of(2021, 2, 1), Instant.parse("2021-03-02T00:00:00Z"), LocalDateTime.of(2021, 10, 1, 0, 0, 0)), @@ -2826,22 +2827,13 @@ abstract class CSVSuite expectedSchema: StructType, expectedData: Seq[Seq[Any]]): Unit = { - // Error should be thrown when attempting to use prefersDate with Legacy parser - if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { - checkError( - exception = intercept[SparkIllegalArgumentException] { - spark.read.format("csv").options(options).load(testFile(dateInferSchemaFile)) - }, - errorClass = "CANNOT_INFER_DATE") - } else { - val results = spark.read - .format("csv") - .options(options) - .load(testFile(dateInferSchemaFile)) + val results = spark.read + .format("csv") + .options(options) + .load(testFile(dateInferSchemaFile)) - assert(results.schema == expectedSchema) - assert(results.collect().toSeq.map(_.toSeq) == expectedData) - } + assert(results.schema == expectedSchema) + assert(results.collect().toSeq.map(_.toSeq) == expectedData) } // When timestamp format is given, infer columns with mixing dates and timestamps as string type @@ -2944,17 +2936,17 @@ abstract class CSVSuite } check( - "legacy", + "corrected", Seq( - Row(1, Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")), + Row(1, null, null), Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00")) ) ) check( - "corrected", + "legacy", Seq( - Row(1, null, null), + Row(1, Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")), Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00")) ) ) From 5288eb0cf913d1ac0a424082bd609939e76e5dbf Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Thu, 22 Sep 2022 11:12:41 -0700 Subject: [PATCH 23/26] [SPARK-40474] fix tests --- .../catalyst/csv/CSVInferSchemaSuite.scala | 9 +- .../catalyst/csv/UnivocityParserSuite.scala | 3 +- .../execution/datasources/csv/CSVSuite.scala | 149 +++++++++++------- 3 files changed, 100 insertions(+), 61 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala index 707ab87792f6..8cae2400e0ce 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala @@ -217,19 +217,18 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { test("SPARK-39469: inferring date type") { // "yyyy/MM/dd" format - var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd", "prefersDate" -> "true"), + var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd"), false, "UTC") var inferSchema = new CSVInferSchema(options) assert(inferSchema.inferField(NullType, "2018/12/02") == DateType) // "MMM yyyy" format - options = new CSVOptions(Map("dateFormat" -> "MMM yyyy", "prefersDate" -> "true"), + options = new CSVOptions(Map("dateFormat" -> "MMM yyyy"), false, "GMT") inferSchema = new CSVInferSchema(options) assert(inferSchema.inferField(NullType, "Dec 2018") == DateType) // Field should strictly match date format to infer as date options = new CSVOptions( - Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", - "prefersDate" -> "true"), + Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"), columnPruning = false, defaultTimeZoneId = "GMT") inferSchema = new CSVInferSchema(options) @@ -240,7 +239,7 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper { test("SPARK-39469: inferring the schema of columns with mixing dates and timestamps properly") { var options = new CSVOptions( Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy|MM|dd", - "timestampNTZFormat" -> "yyyy/MM/dd", "prefersDate" -> "true"), + "timestampNTZFormat" -> "yyyy/MM/dd"), columnPruning = false, defaultTimeZoneId = "UTC") var inferSchema = new CSVInferSchema(options) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index 37605e14b926..87e189adb42e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -351,7 +351,8 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper { days(2020, 1, 12)) } - val options = new CSVOptions(Map.empty[String, String], false, "UTC") + // To use legacy date parser in UnivocityParser, `prefersDate` need to be disabled. + val options = new CSVOptions(Map("prefersDate" -> "false"), false, "UTC") check(new UnivocityParser(StructType(Seq.empty), options)) def optionsWithPattern(enableFallback: Boolean): CSVOptions = new CSVOptions( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 1a003a68fa68..a8e9c65b565e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -36,7 +36,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec import org.apache.logging.log4j.Level -import org.apache.spark.{SparkConf, SparkException, SparkIllegalArgumentException, SparkUpgradeException, TestUtils} +import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils} import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row} import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils} import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite @@ -2820,62 +2820,107 @@ abstract class CSVSuite } } - test("SPARK-39469: Infer schema for columns with only dates " + - "and columns with mixing date and timestamps correctly") { - def checkCSVReadDatetime( - options: Map[String, String], - expectedSchema: StructType, - expectedData: Seq[Seq[Any]]): Unit = { + test("SPARK-39469: Infer schema for columns with all dates") { + withTempPath { path => + Seq( + "2001-09-08", + "1941-01-02", + "0293-11-07" + ).toDF() + .repartition(1) + .write.text(path.getAbsolutePath) - val results = spark.read + val options = Map( + "header" -> "false", + "inferSchema" -> "true", + "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss") + + val df = spark.read .format("csv") .options(options) - .load(testFile(dateInferSchemaFile)) + .load(path.getAbsolutePath) - assert(results.schema == expectedSchema) - assert(results.collect().toSeq.map(_.toSeq) == expectedData) + val expected = if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { + // When legacy parser is enabled, `prefersDate` will be disabled + Seq( + Row("2001-09-08"), + Row("1941-01-02"), + Row("0293-11-07") + ) + } else { + Seq( + Row(Date.valueOf("2001-9-8")), + Row(Date.valueOf("1941-1-2")), + Row(Date.valueOf("0293-11-7")) + ) + } + + checkAnswer(df, expected) } + } - // When timestamp format is given, infer columns with mixing dates and timestamps as string type - var options = Map( - "header" -> "true", - "inferSchema" -> "true", - "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", - "dateFormat" -> "yyyy-MM-dd", - "prefersDate" -> "true") - var expectedSchema = StructType(List(StructField("date", DateType), - StructField("timestamp-date", StringType), - StructField("date-timestamp", StringType))) - var expectedData = + test("SPARK-40474: Infer schema for columns with a mix of dates and timestamp") { + withTempPath { path => Seq( - Seq(Date.valueOf("2001-9-8"), "2014-10-27T18:30:00", "1765-03-28"), - Seq(Date.valueOf("1941-1-2"), "2000-09-14T01:01:00", "1423-11-12T23:41:00"), - Seq(Date.valueOf("0293-11-7"), "1995-06-25", "2016-01-28T20:00:00") - ) - checkCSVReadDatetime(options, expectedSchema, expectedData) + "1765-03-28", + "1423-11-12T23:41:00", + "2016-01-28T20:00:00" + ).toDF() + .repartition(1) + .write.text(path.getAbsolutePath) - // When timestamp format is not given, infer columns with mixing dates and timestamps as - // timestamp type - options = Map( - "header" -> "true", - "inferSchema" -> "true", - "prefersDate" -> "true") - expectedSchema = StructType(List(StructField("date", DateType), - StructField("timestamp-date", TimestampType), - StructField("date-timestamp", TimestampType))) - expectedData = - Seq( - Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"), - Timestamp.valueOf("1765-03-28 00:00:0.0")), - Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"), - Timestamp.valueOf("1423-11-12 23:41:0.0")), - Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"), - Timestamp.valueOf("2016-01-28 20:00:00.0")) - ) - checkCSVReadDatetime(options, expectedSchema, expectedData) + if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { + val options = Map( + "header" -> "false", + "inferSchema" -> "true", + "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss") + val df = spark.read + .format("csv") + .options(options) + .load(path.getAbsolutePath) + val expected = Seq( + Row(Timestamp.valueOf("1765-03-28 00:00:00.0")), + Row(Timestamp.valueOf("1423-11-12 23:41:00.0")), + Row(Timestamp.valueOf("2016-01-28 20:00:00.0")) + ) + checkAnswer(df, expected) + } else { + // When timestampFormat is specified, infer and parse the column as strings + val options1 = Map( + "header" -> "false", + "inferSchema" -> "true", + "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss") + val df1 = spark.read + .format("csv") + .options(options1) + .load(path.getAbsolutePath) + val expected1 = Seq( + Row("1765-03-28"), + Row("1423-11-12T23:41:00"), + Row("2016-01-28T20:00:00") + ) + checkAnswer(df1, expected1) + + // When timestampFormat is not specified, infer and parse the column as + // timestamp type if possible + val options2 = Map( + "header" -> "false", + "inferSchema" -> "true") + val df2 = spark.read + .format("csv") + .options(options2) + .load(path.getAbsolutePath) + val expected2 = Seq( + Row(Timestamp.valueOf("1765-03-28 00:00:00.0")), + Row(Timestamp.valueOf("1423-11-12 23:41:00.0")), + Row(Timestamp.valueOf("2016-01-28 20:00:00.0")) + ) + checkAnswer(df2, expected2) + } + } } - test("SPARK-39904: Parse incorrect timestamp values with prefersDate=true") { + test("SPARK-39904: Parse incorrect timestamp values") { withTempPath { path => Seq( "2020-02-01 12:34:56", @@ -2890,16 +2935,10 @@ abstract class CSVSuite val output = spark.read .schema(schema) - .option("prefersDate", "true") .csv(path.getAbsolutePath) - if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) { - checkError( - exception = intercept[SparkIllegalArgumentException] { - output.collect() - }, - errorClass = "CANNOT_INFER_DATE") - } else { + if (SQLConf.get.legacyTimeParserPolicy != LegacyBehaviorPolicy.LEGACY) { + // When legacy parser is enabled, `prefersDate` will be disabled checkAnswer( output, Seq( From a2f0b8076d6f808df53cb22634cf562d509432a2 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Thu, 22 Sep 2022 21:58:49 -0700 Subject: [PATCH 24/26] [SPARK-40474] revert code causing behavior change --- .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 5 +++-- .../spark/sql/catalyst/csv/UnivocityParser.scala | 5 ++++- .../sql/execution/datasources/csv/CSVSuite.scala | 13 ++++++------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 242b0439e7bb..88396c65cc07 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -165,13 +165,14 @@ class CSVOptions( } } + val dateFormatOption: Option[String] = parameters.get("dateFormat") // Provide a default value for dateFormatInRead when prefersDate. This ensures that the // Iso8601DateFormatter (with strict date parsing) is used for date inference val dateFormatInRead: Option[String] = if (prefersDate) { - Option(parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)) + Option(dateFormatOption.getOrElse(DateFormatter.defaultPattern)) } else { - parameters.get("dateFormat") + dateFormatOption } val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 4196bb8815f1..8464e394ab5a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -132,7 +132,10 @@ class UnivocityParser( private val enableParsingFallbackForDateType = options.enableDateTimeParsingFallback .orElse(SQLConf.get.csvEnableDateTimeParsingFallback) - .getOrElse(!options.prefersDate) + .getOrElse { + SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY || + options.dateFormatOption.isEmpty + } // Retrieve the raw record string. private def getCurrentInput: UTF8String = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index a8e9c65b565e..fc870190a1fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2612,8 +2612,7 @@ abstract class CSVSuite test("SPARK-30960: parse date/timestamp string with legacy format") { val ds = Seq("2020-1-12 3:23:34.12, 2020-1-12 T").toDS() - val csv = spark.read.option("header", false).option("prefersDate", false) - .schema("t timestamp, d date").csv(ds) + val csv = spark.read.option("header", false).schema("t timestamp, d date").csv(ds) checkAnswer(csv, Row(Timestamp.valueOf("2020-1-12 3:23:34.12"), Date.valueOf("2020-1-12"))) } @@ -2778,10 +2777,10 @@ abstract class CSVSuite withTempPath { path => Seq( """d,ts_ltz,ts_ntz""", - """2021-01-01,2021,2021""", - """2021-01-02,2021-01 ,2021-01""", - """2021-02-01,2021-3-02,2021-10-1""", - """2021-08-18,2021-8-18 21:44:30Z,2021-8-18T21:44:30.123""" + """2021,2021,2021""", + """2021-01,2021-01 ,2021-01""", + """ 2021-2-1,2021-3-02,2021-10-1""", + """2021-8-18 00:00:00,2021-8-18 21:44:30Z,2021-8-18T21:44:30.123""" ).toDF().repartition(1).write.text(path.getCanonicalPath) val readback = spark.read.schema("d date, ts_ltz timestamp_ltz, ts_ntz timestamp_ntz") .option("header", true) @@ -2791,7 +2790,7 @@ abstract class CSVSuite Seq( Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"), LocalDateTime.of(2021, 1, 1, 0, 0, 0)), - Row(LocalDate.of(2021, 1, 2), Instant.parse("2021-01-01T00:00:00Z"), + Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"), LocalDateTime.of(2021, 1, 1, 0, 0, 0)), Row(LocalDate.of(2021, 2, 1), Instant.parse("2021-03-02T00:00:00Z"), LocalDateTime.of(2021, 10, 1, 0, 0, 0)), From 00a8661378ce666af513dc83204f77d3facfb1f5 Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Thu, 22 Sep 2022 22:04:41 -0700 Subject: [PATCH 25/26] [SPARK-40474] revert changes --- .../apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala index 87e189adb42e..37605e14b926 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala @@ -351,8 +351,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper { days(2020, 1, 12)) } - // To use legacy date parser in UnivocityParser, `prefersDate` need to be disabled. - val options = new CSVOptions(Map("prefersDate" -> "false"), false, "UTC") + val options = new CSVOptions(Map.empty[String, String], false, "UTC") check(new UnivocityParser(StructType(Seq.empty), options)) def optionsWithPattern(enableFallback: Boolean): CSVOptions = new CSVOptions( From 16e187c2a3444e4e81ec7bcbb3d723f85278e60f Mon Sep 17 00:00:00 2001 From: xiaonanyang-db Date: Thu, 22 Sep 2022 22:07:40 -0700 Subject: [PATCH 26/26] SPARK-40474 reduce diff --- .../spark/sql/execution/datasources/csv/CSVSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index fc870190a1fb..4091609f3008 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2974,17 +2974,17 @@ abstract class CSVSuite } check( - "corrected", + "legacy", Seq( - Row(1, null, null), + Row(1, Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")), Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00")) ) ) check( - "legacy", + "corrected", Seq( - Row(1, Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")), + Row(1, null, null), Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00")) ) )