27 commits
0394030
SPARK-40474 Infer columns with mixed date and timestamp as String in …
xiaonanyang-db Sep 19, 2022
f4fadf7
[SPARK-40474] remove unused imports
xiaonanyang-db Sep 19, 2022
813ac74
[SPARK-40474] Resolve test failures
xiaonanyang-db Sep 19, 2022
5c2dde8
[SPARK-40474] fix test failures
xiaonanyang-db Sep 20, 2022
0d2be1d
[SPARK-40474] handle edge cases
xiaonanyang-db Sep 20, 2022
df56946
[SPARK-40474] handle edge cases
xiaonanyang-db Sep 20, 2022
4bc480d
SPARK-40474 revert part of CSVOptions changes
xiaonanyang-db Sep 20, 2022
f6ed29f
SPARK-40474 revert part of CSVOptions changes
xiaonanyang-db Sep 20, 2022
93b6422
[SPARK-40474] fix test failures
xiaonanyang-db Sep 20, 2022
6942f2b
[SPARK-40474] handle columns with mixing dates and timestamps inferen…
xiaonanyang-db Sep 21, 2022
b4a6f1d
[SPARK-40474] remove unnecessary changes
xiaonanyang-db Sep 21, 2022
1502618
[SPARK-40474] small changes
xiaonanyang-db Sep 21, 2022
4767ae7
Merge remote-tracking branch 'origin' into SPARK-40474
xiaonanyang-db Sep 21, 2022
1f57098
[SPARK-40474] remove new line added by mistake
xiaonanyang-db Sep 21, 2022
e9150ec
[SPARK-40474] address comments
xiaonanyang-db Sep 21, 2022
a07e432
[SPARK-40474] small changes
xiaonanyang-db Sep 21, 2022
255aea3
[SPARK-40474] fix test failures
xiaonanyang-db Sep 21, 2022
533c487
[SPARK-40474] address review comments
xiaonanyang-db Sep 22, 2022
be4c86f
[SPARK-40474] fix test failures
xiaonanyang-db Sep 22, 2022
c7225b1
[SPARK-40474] update doc
xiaonanyang-db Sep 22, 2022
9e87d6e
[SPARK-40474] disable prefersDate when leagcyTimeParser is enabled
xiaonanyang-db Sep 22, 2022
af66b83
[SPARK-40474] fix test failures
xiaonanyang-db Sep 22, 2022
812fa65
[SPARK-40474] fix test failures
xiaonanyang-db Sep 22, 2022
5288eb0
[SPARK-40474] fix tests
xiaonanyang-db Sep 22, 2022
a2f0b80
[SPARK-40474] revert code causing behavior change
xiaonanyang-db Sep 23, 2022
00a8661
[SPARK-40474] revert changes
xiaonanyang-db Sep 23, 2022
16e187c
SPARK-40474 reduce diff
xiaonanyang-db Sep 23, 2022
4 changes: 2 additions & 2 deletions docs/sql-data-sources-csv.md
@@ -110,8 +110,8 @@ Data source options of CSV can be set via:
</tr>
<tr>
<td><code>prefersDate</code></td>
-  <td>false</td>
-  <td>During schema inference (<code>inferSchema</code>), attempts to infer string columns that contain dates or timestamps as <code>Date</code> if the values satisfy the <code>dateFormat</code> option and failed to be parsed by the respective formatter. With a user-provided schema, attempts to parse timestamp columns as dates using <code>dateFormat</code> if they fail to conform to <code>timestampFormat</code>, in this case the parsed values will be cast to timestamp type afterwards.</td>
+  <td>true</td>
+  <td>During schema inference (<code>inferSchema</code>), attempts to infer string columns that contain dates as <code>Date</code> if the values satisfy the <code>dateFormat</code> option or default date format. For columns that contain a mixture of dates and timestamps, try inferring them as <code>TimestampType</code> if timestamp format not specified, otherwise infer them as <code>StringType</code>.</td>
<td>read</td>
</tr>
<tr>
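To make the documented behavior concrete, here is a hedged sketch of reading such a column (file path, column name, and sample values are hypothetical; spark is an active SparkSession):

    // events.csv (hypothetical):
    //   event
    //   2022-09-22
    //   2022-09-22 10:15:00
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/tmp/events.csv")
    df.printSchema()
    // With no timestampFormat set, the lenient default parser also accepts the
    // date rows, so the mixed column is inferred as a timestamp type; with an
    // explicit timestampFormat, the same column is inferred as StringType.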
CSVInferSchema.scala
@@ -59,6 +59,13 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
ExprUtils.getDecimalParser(options.locale)
}

+  // Date formats that could be parsed in DefaultTimestampFormatter
+  // Reference: DateTimeUtils.parseTimestampString
+  // Used to determine inferring a column with mixture of dates and timestamps as TimestampType or
+  // StringType when no timestamp format is specified (the lenient timestamp formatter will be used)
+  private val LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS = Set(
Contributor:
Sorry, I don't quite get this part, would you be able to elaborate on why we need to keep a set of such formats?

More of a thought experiment, I don't think this actually happens in practice: cast allows years longer than 4 digits; is that something that needs to be supported here? For more context, see https://issues.apache.org/jira/browse/SPARK-39731.

Also, will this work? My understanding is that it will not; only yyyy-MM-dd is supported.

dateFormat = "yyyy/MM/dd"
timestampFormat = "yyyy/MM/dd HH:mm:ss" 

Contributor Author:
We need this set to determine whether to infer a column with a mixture of dates and timestamps as TimestampType or StringType when no timestamp format is specified (the lenient timestamp formatter will be used).

Contributor Author (@xiaonanyang-db, Sep 21, 2022):

> dateFormat = "yyyy/MM/dd"
> timestampFormat = "yyyy/MM/dd HH:mm:ss"

I don't quite understand your question about this case. But in the context of this PR: because timestampFormat is specified, a column with a mix of dates and timestamps will be inferred as StringType.

Contributor Author:
> More of a thought experiment, I don't think this actually happens in practice: cast allows years longer than 4 digits; is that something that needs to be supported here? For more context, see https://issues.apache.org/jira/browse/SPARK-39731.

Those are some interesting formats; I am not sure we need to handle them here.

Contributor:
Can you elaborate on why we need LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS? I understand how it is used. Also, I am not a supporter of hardcoding date/time formats here.

Contributor Author (@xiaonanyang-db, Sep 21, 2022):
When the timestamp format is not specified, the desired behavior is that a column with a mix of dates and timestamps can be inferred as timestamp type if the lenient timestamp formatter can parse the date strings in the column as well.

To achieve that without introducing performance concerns, we simply check whether the date format is supported by the lenient timestamp formatter. Does that make sense?

"yyyy-MM-dd", "yyyy-M-d", "yyyy-M-dd", "yyyy-MM-d", "yyyy-MM", "yyyy-M", "yyyy")

/**
* Similar to the JSON schema inference
* 1. Infer type of each row
@@ -123,10 +130,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
case LongType => tryParseLong(field)
case _: DecimalType => tryParseDecimal(field)
case DoubleType => tryParseDouble(field)
-      case DateType => tryParseDateTime(field)
-      case TimestampNTZType if options.prefersDate => tryParseDateTime(field)
+      case DateType => tryParseDate(field)
case TimestampNTZType => tryParseTimestampNTZ(field)
-      case TimestampType if options.prefersDate => tryParseDateTime(field)
case TimestampType => tryParseTimestamp(field)
case BooleanType => tryParseBoolean(field)
case StringType => StringType
@@ -179,13 +184,13 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) {
DoubleType
} else if (options.prefersDate) {
-      tryParseDateTime(field)
+      tryParseDate(field)
} else {
tryParseTimestampNTZ(field)
}
}

-  private def tryParseDateTime(field: String): DataType = {
+  private def tryParseDate(field: String): DataType = {
if ((allCatch opt dateFormatter.parse(field)).isDefined) {
DateType
} else {
@@ -233,7 +238,40 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
* is compatible with both input data types.
*/
private def compatibleType(t1: DataType, t2: DataType): Option[DataType] = {
-    TypeCoercion.findTightestCommonType(t1, t2).orElse(findCompatibleTypeForCSV(t1, t2))
+    (t1, t2) match {
Contributor:
Should this match be in findCompatibleTypeForCSV? Or does findTightestCommonType merge DateType and TimestampType in a way that is not applicable here?

Contributor Author (@xiaonanyang-db, Sep 21, 2022):
findTightestCommonType merges DateType and TimestampType in a way that is not applicable here.

Contributor:
What result does findTightestCommonType return for DateType and TimestampType?

Contributor Author:
(d1, d2) match {
      case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
        TimestampType

      case (_: TimestampType, _: TimestampNTZType) | (_: TimestampNTZType, _: TimestampType) =>
        TimestampType

      case (_: TimestampNTZType, _: DateType) | (_: DateType, _: TimestampNTZType) =>
        TimestampNTZType
    }

Contributor:
Never mind. I checked the code; the resulting type will be TimestampType or TimestampNTZ.

+      case (DateType, TimestampType) | (DateType, TimestampNTZType) |
+          (TimestampNTZType, DateType) | (TimestampType, DateType) =>
+        // For a column containing a mixture of dates and timestamps, infer it as timestamp type
+        // if its dates can be inferred as timestamp type, otherwise infer it as StringType.
+        // This only happens when the timestamp pattern is not specified, as the default timestamp
+        // parser is very lenient and can parse date string as well.
+        val dateFormat = options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)
+        t1 match {
+          case DateType if canParseDateAsTimestamp(dateFormat, t2) =>
+            Some(t2)
+          case TimestampType | TimestampNTZType if canParseDateAsTimestamp(dateFormat, t1) =>
+            Some(t1)
+          case _ => Some(StringType)
+        }
+      case _ => TypeCoercion.findTightestCommonType(t1, t2).orElse(findCompatibleTypeForCSV(t1, t2))
+    }
}

+  /**
+   * Return true if strings of given date format can be parsed as timestamps
+   * 1. If user provides timestamp format, we will parse strings as timestamps using
+   *    Iso8601TimestampFormatter (with strict timestamp parsing). Any date string can not be parsed
+   *    as timestamp type in this case
+   * 2. Otherwise, we will use DefaultTimestampFormatter to parse strings as timestamps, which
+   *    is more lenient and can parse strings of some date formats as timestamps.
+   */
+  private def canParseDateAsTimestamp(dateFormat: String, tsType: DataType): Boolean = {
+    if ((tsType.isInstanceOf[TimestampType] && options.timestampFormatInRead.isEmpty) ||
+      (tsType.isInstanceOf[TimestampNTZType] && options.timestampNTZFormatInRead.isEmpty)) {
+      LENIENT_TS_FORMATTER_SUPPORTED_DATE_FORMATS.contains(dateFormat)
Contributor (@cloud-fan, Sep 22, 2022):
Do we really need to cover these corner cases? We can just say that we can only parse date as timestamp if neither timestamp pattern nor date pattern is specified.

Contributor Author (@xiaonanyang-db, Sep 22, 2022):
This would be a behavior change relative to the Spark 3.3 branch, where a column with mixed dates and timestamps can be inferred as timestamp type, if possible, when no timestamp pattern is specified.

+    } else {
+      false
+    }
+  }

/**
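A minimal sketch exercising the rule above through mergeRowTypes, mirroring the updated test suite below (CSVInferSchema is an internal catalyst class, not a public API):

    import org.apache.spark.sql.catalyst.csv.{CSVInferSchema, CSVOptions}
    import org.apache.spark.sql.types._

    val options = new CSVOptions(Map.empty[String, String], false, "UTC")
    val inferSchema = new CSVInferSchema(options)
    // No timestampFormat configured: the default date pattern "yyyy-MM-dd" is in
    // the lenient set, so a mixed column merges to TimestampType, not StringType.
    inferSchema.mergeRowTypes(Array(DateType), Array(TimestampType))  // Array(TimestampType)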
CSVOptions.scala
@@ -149,32 +149,30 @@ class CSVOptions(
val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

/**
-   * Infer columns with all valid date entries as date type (otherwise inferred as timestamp type)
-   * if schema inference is enabled. When being used with user-provided schema, tries to parse
-   * timestamp values as dates if the values do not conform to the timestamp formatter before
-   * falling back to the backward compatible parsing - the parsed values will be cast to timestamp
-   * afterwards.
+   * Infer columns with all valid date entries as date type (otherwise inferred as string or
+   * timestamp type) if schema inference is enabled.
   *
-   * Disabled by default for backwards compatibility and performance.
+   * Enabled by default.
   *
   * Not compatible with legacyTimeParserPolicy == LEGACY since legacy date parser will accept
-   * extra trailing characters.
+   * extra trailing characters. Thus, disabled when legacyTimeParserPolicy == LEGACY
*/
val prefersDate = {
-    val inferDateFlag = getBool("prefersDate")
-    if (inferDateFlag && SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
-      throw QueryExecutionErrors.inferDateWithLegacyTimeParserError()
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      false
+    } else {
+      getBool("prefersDate", true)
    }
-    inferDateFlag
}

+  val dateFormatOption: Option[String] = parameters.get("dateFormat")
// Provide a default value for dateFormatInRead when prefersDate. This ensures that the
// Iso8601DateFormatter (with strict date parsing) is used for date inference
val dateFormatInRead: Option[String] =
if (prefersDate) {
-      Option(parameters.getOrElse("dateFormat", DateFormatter.defaultPattern))
+      Option(dateFormatOption.getOrElse(DateFormatter.defaultPattern))
} else {
parameters.get("dateFormat")
dateFormatOption
}
val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)

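A hedged sketch of the legacy-policy interaction described in the scaladoc above (conf and option names as in the diff; assumes it runs on a thread with an active SQLConf, as in the test suites):

    import org.apache.spark.sql.catalyst.csv.CSVOptions
    import org.apache.spark.sql.internal.SQLConf

    // Under the legacy time parser policy, prefersDate now silently resolves to
    // false instead of throwing as the removed code did.
    SQLConf.get.setConfString(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "LEGACY")
    val opts = new CSVOptions(Map("prefersDate" -> "true"), false, "UTC")
    assert(!opts.prefersDate)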
UnivocityParser.scala
@@ -28,7 +28,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal}
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{daysToMicros, TimeZoneUTC}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -135,7 +134,7 @@ class UnivocityParser(
.orElse(SQLConf.get.csvEnableDateTimeParsingFallback)
.getOrElse {
SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
-        options.dateFormatInRead.isEmpty
+        options.dateFormatOption.isEmpty
}

// Retrieve the raw record string.
@@ -238,29 +237,19 @@
timestampFormatter.parse(datum)
} catch {
case NonFatal(e) =>
-          // There may be date type entries in timestamp column due to schema inference
-          if (options.prefersDate) {
-            daysToMicros(dateFormatter.parse(datum), options.zoneId)
-          } else {
-            // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-            // compatibility if enabled.
-            if (!enableParsingFallbackForTimestampType) {
-              throw e
-            }
-            val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw(e))
-          }
+          // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
+          // compatibility if enabled.
+          if (!enableParsingFallbackForTimestampType) {
+            throw e
+          }
+          val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
+          DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw(e))
}
}

case _: TimestampNTZType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
-          try {
-            timestampNTZFormatter.parseWithoutTimeZone(datum, false)
-          } catch {
-            case NonFatal(e) if options.prefersDate =>
-              daysToMicros(dateFormatter.parse(datum), TimeZoneUTC.toZoneId)
-          }
+          timestampNTZFormatter.parseWithoutTimeZone(datum, false)
}

case _: StringType => (d: String) =>
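A hedged usage sketch of the fallback logic above (the enableDateTimeParsingFallback option name is assumed from the references in the diff; schema string and path are hypothetical):

    // After this change, a date entry in a timestamp column is a plain parse
    // error: the prefersDate rescue path is gone, and the only remaining rescue
    // is the legacy 2.x/1.x parsing fallback, re-enabled here explicitly.
    val df = spark.read
      .schema("ts TIMESTAMP")
      .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
      .option("enableDateTimeParsingFallback", "true")
      .csv("/tmp/data.csv")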
CSVInferSchemaSuite.scala
@@ -97,8 +97,8 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
}

test("Type arrays are merged to highest common type") {
-    val options = new CSVOptions(Map.empty[String, String], false, "UTC")
-    val inferSchema = new CSVInferSchema(options)
+    var options = new CSVOptions(Map.empty[String, String], false, "UTC")
+    var inferSchema = new CSVInferSchema(options)

assert(
inferSchema.mergeRowTypes(Array(StringType),
@@ -109,12 +109,28 @@
assert(
inferSchema.mergeRowTypes(Array(DoubleType),
Array(LongType)).sameElements(Array(DoubleType)))

+    // Can merge DateType and TimestampType into TimestampType when no timestamp format specified
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampNTZType)).sameElements(Array(TimestampNTZType)))
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampType)).sameElements(Array(TimestampType)))
+
+    // Merge DateType and TimestampType into StringType when there are timestamp formats specified
+    options = new CSVOptions(
+      Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss",
+        "timestampNTZFormat" -> "yyyy/MM/dd HH:mm:ss"),
+      false,
+      "UTC")
+    inferSchema = new CSVInferSchema(options)
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampNTZType)).sameElements(Array(StringType)))
+    assert(
+      inferSchema.mergeRowTypes(Array(DateType),
+        Array(TimestampType)).sameElements(Array(StringType)))
}

test("Null fields are handled properly when a nullValue is specified") {
@@ -201,43 +217,43 @@

test("SPARK-39469: inferring date type") {
// "yyyy/MM/dd" format
var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd", "prefersDate" -> "true"),
var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd"),
false, "UTC")
var inferSchema = new CSVInferSchema(options)
assert(inferSchema.inferField(NullType, "2018/12/02") == DateType)
// "MMM yyyy" format
options = new CSVOptions(Map("dateFormat" -> "MMM yyyy", "prefersDate" -> "true"),
options = new CSVOptions(Map("dateFormat" -> "MMM yyyy"),
false, "GMT")
inferSchema = new CSVInferSchema(options)
assert(inferSchema.inferField(NullType, "Dec 2018") == DateType)
// Field should strictly match date format to infer as date
options = new CSVOptions(
Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
"prefersDate" -> "true"),
Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
columnPruning = false,
defaultTimeZoneId = "GMT")
inferSchema = new CSVInferSchema(options)
assert(inferSchema.inferField(NullType, "2018-12-03T11:00:00") == TimestampType)
assert(inferSchema.inferField(NullType, "2018-12-03") == DateType)
}

test("SPARK-39469: inferring date and timestamp types in a mixed column with prefersDate=true") {
test("SPARK-39469: inferring the schema of columns with mixing dates and timestamps properly") {
var options = new CSVOptions(
Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy|MM|dd",
"timestampNTZFormat" -> "yyyy/MM/dd", "prefersDate" -> "true"),
"timestampNTZFormat" -> "yyyy/MM/dd"),
columnPruning = false,
defaultTimeZoneId = "UTC")
var inferSchema = new CSVInferSchema(options)

assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
assert(inferSchema.inferField(DateType, "2003|01|01") == TimestampType)

// inferField should infer a column as string type if it contains mixing dates and timestamps
assert(inferSchema.inferField(DateType, "2003|01|01") == StringType)
// SQL configuration must be set to default to TimestampNTZ
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
assert(inferSchema.inferField(DateType, "2003/02/05") == TimestampNTZType)
assert(inferSchema.inferField(DateType, "2003/02/05") == StringType)
}

// inferField should upgrade a date field to timestamp if the typeSoFar is a timestamp
-    assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == TimestampNTZType)
-    assert(inferSchema.inferField(TimestampType, "2018_12_03") == TimestampType)
+    assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == StringType)
+    assert(inferSchema.inferField(TimestampType, "2018_12_03") == StringType)

// No errors when Date and Timestamp have the same format. Inference defaults to date
options = new CSVOptions(
UnivocityParserSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.csv

import java.math.BigDecimal
import java.text.{DecimalFormat, DecimalFormatSymbols}
-import java.time.{ZoneOffset}
import java.util.{Locale, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat
@@ -372,26 +371,4 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
}
assert(err.getMessage.contains("Illegal pattern character: n"))
}

test("SPARK-39469: dates should be parsed correctly in timestamp column when prefersDate=true") {
def checkDate(dataType: DataType): Unit = {
val timestampsOptions =
new CSVOptions(Map("prefersDate" -> "true", "timestampFormat" -> "dd/MM/yyyy HH:mm",
"timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"),
false, DateTimeUtils.getZoneId("-08:00").toString)
// Use CSVOption ZoneId="-08:00" (PST) to test that Dates in TimestampNTZ column are always
// converted to their equivalent UTC timestamp
val dateString = "08_09_2001"
val expected = dataType match {
case TimestampType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.of("-08:00"))
case TimestampNTZType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC)
case DateType => days(2001, 9, 8)
}
val parser = new UnivocityParser(new StructType(), timestampsOptions)
assert(parser.makeConverter("d", dataType).apply(dateString) == expected)
}
checkDate(TimestampType)
checkDate(TimestampNTZType)
checkDate(DateType)
}
}