Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ private[csv] object CSVInferSchema {
// DecimalTypes have different precisions and scales, so we try to find the common type.
findTightestCommonType(typeSoFar, tryParseDecimal(field, options)).getOrElse(StringType)
case DoubleType => tryParseDouble(field, options)
case DateType => tryParseDate(field, options)
case TimestampType => tryParseTimestamp(field, options)
case BooleanType => tryParseBoolean(field, options)
case StringType => StringType
Expand Down Expand Up @@ -140,14 +141,23 @@ private[csv] object CSVInferSchema {
private def tryParseDouble(field: String, options: CSVOptions): DataType = {
  // A field that parses as a Double, or matches the configured Inf/NaN
  // representations, stays DoubleType; anything else falls through to the
  // date parser (the next step in the inference widening chain).
  val isDouble = (allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)
  if (isDouble) DoubleType else tryParseDate(field, options)
}

private def tryParseDate(field: String, options: CSVOptions): DataType = {
  // Infer DateType when the field matches the user-supplied `dateFormat`
  // pattern; otherwise escalate to the (wider) timestamp parser.
  val parsesAsDate = (allCatch opt options.dateFormatter.parse(field)).isDefined
  if (parsesAsDate) DateType else tryParseTimestamp(field, options)
}

private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
// This case infers a custom `dataFormat` is set.
if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
// This case infers a custom `timestampFormat` is set.
if ((allCatch opt options.timestampFormatter.parse(field)).isDefined) {
Copy link
Member

@HyukjinKwon HyukjinKwon Apr 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we replace it with `timestampFormatter` in the CSV parsing logic too, and document it in the migration guide? (e.g., date format is now inferred correctly, and also the things you mentioned in #20140 (comment))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably, adding a configuration to control this behaviour looks preferred in this case.

TimestampType
} else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
// We keep this for backwards compatibility.
Expand Down Expand Up @@ -216,6 +226,8 @@ private[csv] object CSVInferSchema {
} else {
Some(DecimalType(range + scale, scale))
}
// By design 'TimestampType' (8 bytes) is larger than 'DateType' (4 bytes).
case (t1: DateType, t2: TimestampType) => Some(TimestampType)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should do the opposite case too

case (t1: TimestampType, t2: DateType) => Some(TimestampType)


case _ => None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.csv

import java.nio.charset.StandardCharsets
import java.time.format.{DateTimeFormatter, ResolverStyle}
import java.util.{Locale, TimeZone}

import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
Expand Down Expand Up @@ -150,6 +151,16 @@ class CSVOptions(

val isCommentSet = this.comment != '\u0000'

lazy val dateFormatter: DateTimeFormatter = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@transient lazy val

DateTimeFormatter.ofPattern(dateFormat.getPattern)
.withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
}

// java.time formatter built lazily from the user-supplied `timestampFormat`
// pattern, used for timestamp-type inference. Fixed to Locale.US; zoned to
// `timeZone` (presumably the session/option time zone — confirm against the
// constructor, which is outside this view). SMART resolver style resolves
// borderline field values leniently but sanely.
lazy val timestampFormatter: DateTimeFormatter = {
DateTimeFormatter.ofPattern(timestampFormat.getPattern)
.withLocale(Locale.US).withZone(timeZone.toZoneId).withResolverStyle(ResolverStyle.SMART)
}

def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
timestamp,date
26/08/2015 22:31:46.913,27/09/2015
27/10/2014 22:33:31.601,26/12/2016
28/01/2016 22:33:52.888,28/01/2017
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,21 @@ class CSVInferSchemaSuite extends SparkFunSuite {
assert(CSVInferSchema.inferField(IntegerType, textValueOne, options) == expectedTypeOne)
}

test("Timestamp field types are inferred correctly via custom data format") {
test("Timestamp field types are inferred correctly via custom date format") {
  // Each (pattern, value) pair must be recognized as a timestamp under the
  // corresponding custom `timestampFormat`.
  Seq("yyyy-mm" -> "2015-08", "yyyy" -> "2015").foreach { case (pattern, value) =>
    val options = new CSVOptions(Map("timestampFormat" -> pattern), "GMT")
    assert(CSVInferSchema.inferField(TimestampType, value, options) == TimestampType)
  }
}

test("Date field types are inferred correctly via custom date and timestamp format") {
  // With both custom formats configured, a timestamp-shaped field keeps
  // TimestampType and a date-shaped field keeps DateType.
  val options = new CSVOptions(
    Map("dateFormat" -> "dd/MM/yyyy", "timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS"),
    "GMT")
  assert(
    CSVInferSchema.inferField(TimestampType, "28/01/2017 22:31:46.913", options) == TimestampType)
  assert(CSVInferSchema.inferField(DateType, "16/12/2012", options) == DateType)
}

test("Timestamp field types are inferred correctly from other types") {
val options = new CSVOptions(Map.empty[String, String], "GMT")
assert(CSVInferSchema.inferField(IntegerType, "2015-08-20 14", options) == StringType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
private val simpleSparseFile = "test-data/simple_sparse.csv"
private val numbersFile = "test-data/numbers.csv"
private val datesFile = "test-data/dates.csv"
private val datesAndTimestampsFile = "test-data/dates-and-timestamps.csv"
private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
private val valueMalformedFile = "test-data/value-malformed.csv"

Expand Down Expand Up @@ -566,6 +567,44 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
assert(results.toSeq.map(_.toSeq) === expected)
}

test("inferring timestamp types and date types via custom formats") {
  // Fixes: (1) `results.schema{0}` used a block argument where an index call
  // `schema(0)` is intended/idiomatic; (2) missing spaces around `===`;
  // (3) the same CSV file was loaded three times — load it once and reuse.
  val options = Map(
    "header" -> "true",
    "inferSchema" -> "true",
    "timestampFormat" -> "dd/MM/yyyy HH:mm:ss.SSS",
    "dateFormat" -> "dd/MM/yyyy")
  val results = spark.read
    .format("csv")
    .options(options)
    .load(testFile(datesAndTimestampsFile))

  // Schema inference should pick the custom formats for each column.
  assert(results.schema(0).dataType === TimestampType)
  assert(results.schema(1).dataType === DateType)

  // Parsed values must round-trip through java.text with the same patterns.
  val timestampFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SSS", Locale.US)
  val timestampExpected = Seq(
    "26/08/2015 22:31:46.913",
    "27/10/2014 22:33:31.601",
    "28/01/2016 22:33:52.888").map { s =>
    Seq(new Timestamp(timestampFormat.parse(s).getTime))
  }
  assert(results.select("timestamp").collect().toSeq.map(_.toSeq) === timestampExpected)

  val dateFormat = new SimpleDateFormat("dd/MM/yyyy", Locale.US)
  val dateExpected = Seq("27/09/2015", "26/12/2016", "28/01/2017").map { s =>
    Seq(new Date(dateFormat.parse(s).getTime))
  }
  assert(results.select("date").collect().toSeq.map(_.toSeq) === dateExpected)
}

test("load date types via custom date format") {
val customSchema = new StructType(Array(StructField("date", DateType, true)))
val options = Map(
Expand Down