diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 984979ac5e9b..616de86aa682 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -53,6 +54,8 @@ class JacksonParser(
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
+  @transient private lazy val timestampParser = new TimestampParser(options.timestampFormat)
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema. This is a wrapper for the method
@@ -213,15 +216,12 @@ class JacksonParser(
       (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
         case VALUE_STRING =>
           val stringValue = parser.getText
-          // This one will lose microseconds parts.
-          // See https://issues.apache.org/jira/browse/SPARK-10681.
           Long.box {
-            Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
-              .getOrElse {
-                // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-                // compatibility.
-                DateTimeUtils.stringToTime(stringValue).getTime * 1000L
-              }
+            Try(timestampParser.parse(stringValue)).getOrElse {
+              // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+              // compatibility.
+              DateTimeUtils.stringToTime(stringValue).getTime * 1000L
+            }
           }
 
         case VALUE_NUMBER_INT =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 5deb83ef5624..d2bb5954914e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -18,15 +18,18 @@
 package org.apache.spark.sql.catalyst.util
 
 import java.sql.{Date, Timestamp}
-import java.text.{DateFormat, SimpleDateFormat}
+import java.text.{DateFormat, ParsePosition, SimpleDateFormat}
 import java.time.Instant
-import java.util.{Calendar, Locale, TimeZone}
+import java.util.{Calendar, GregorianCalendar, Locale, TimeZone}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.{Function => JFunction}
 import javax.xml.bind.DatatypeConverter
 
 import scala.annotation.tailrec
 
+import org.apache.commons.lang3.time.FastDateFormat
+
+import org.apache.spark.sql.types.Decimal
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -1164,4 +1167,43 @@ object DateTimeUtils {
     threadLocalTimestampFormat.remove()
     threadLocalDateFormat.remove()
   }
+
+  /**
+   * This custom subclass of `GregorianCalendar` is needed to get access to the
+   * protected `fields` immediately after parsing. We cannot use
+   * the `get()` method because it performs normalization of the fraction
+   * part. Accordingly, the `MILLISECOND` field doesn't contain the original value.
+   */
+  private class MicrosCalendar(tz: TimeZone) extends GregorianCalendar(tz, Locale.US) {
+    // Converts the parsed `MILLISECOND` field to the seconds fraction in microsecond precision.
+    // For example, if the fraction pattern is `SSSS` then `digitsInFraction` = 4, and
+    // the `MILLISECOND` field may have been parsed to `1234`.
+    def getMicros(digitsInFraction: Int): SQLTimestamp = {
+      // Append 6 zeros to the field: 1234 -> 1234000000
+      val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND
+      // Take the first 6 digits from `d`: 1234000000 -> 123400
+      // The rest has exactly `digitsInFraction` digits, so divide by 10 ^ digitsInFraction:
+      // the result is `(1234 * 1000000) / (10 ^ digitsInFraction)` = 123400
+      d / Decimal.POW_10(digitsInFraction)
+    }
+  }
+
+  /**
+   * An instance of this class is intended to be created once and reused many times. It holds
+   * the helper objects `cal` and `digitsInFraction` that are reused across `parse()` invocations.
+   */
+  class TimestampParser(format: FastDateFormat) {
+    private val digitsInFraction = format.getPattern.count(_ == 'S')
+    private val cal = new MicrosCalendar(format.getTimeZone)
+
+    def parse(s: String): SQLTimestamp = {
+      cal.clear() // Clear the calendar because it is reused many times
+      if (!format.parse(s, new ParsePosition(0), cal)) {
+        throw new IllegalArgumentException(s"'$s' is an invalid timestamp")
+      }
+      val micros = cal.getMicros(digitsInFraction)
+      cal.set(Calendar.MILLISECOND, 0)
+      cal.getTimeInMillis * MICROS_PER_MILLIS + micros
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 121823249a7f..0da28c403816 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -432,7 +432,7 @@ object Decimal {
   /** Maximum number of decimal digits a Long can represent */
   val MAX_LONG_DIGITS = 18
 
-  private val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong)
+  val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong)
 
   private val BIG_DEC_ZERO = BigDecimal(0)
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 2d055c7dddac..ced003c6ef2e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -21,6 +21,8 @@ import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.{Calendar, Locale, TimeZone}
 
+import org.apache.commons.lang3.time.FastDateFormat
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.unsafe.types.UTF8String
@@ -692,4 +694,41 @@ class DateTimeUtilsSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("parsing timestamp strings up to microsecond precision") {
+    DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+      def check(pattern: String, input: String, reference: String): Unit = {
+        val parser = new TimestampParser(FastDateFormat.getInstance(pattern, timeZone, Locale.US))
+        val expected = DateTimeUtils.stringToTimestamp(
+          UTF8String.fromString(reference), timeZone).get
+        val actual = parser.parse(input)
+        assert(actual === expected)
+      }
+
+      check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSXXX",
+        "2019-10-14T09:39:07.3220000Z", "2019-10-14T09:39:07.322Z")
+      check("yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+        "2019-10-14T09:39:07.322000", "2019-10-14T09:39:07.322")
+      check("yyyy-MM-dd'T'HH:mm:ss.SSSX",
+        "2019-10-14T09:39:07.322Z", "2019-10-14T09:39:07.322Z")
+      check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSX",
+        "2019-10-14T09:39:07.123456Z", "2019-10-14T09:39:07.123456Z")
+      check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSX",
+        "2019-10-14T09:39:07.000010Z", "2019-10-14T09:39:07.00001Z")
+      check("yyyy-MM-dd'T'HH:mm:ss.S",
+        "2019-10-14T09:39:07.1", "2019-10-14T09:39:07.1")
+      check("yyyy-MM-dd'T'HH:mm:ss.SS",
+        "2019-10-14T09:39:07.10", "2019-10-14T09:39:07.1")
+
+      try {
+        new TimestampParser(
+          FastDateFormat.getInstance("yyyy/MM/dd HH_mm_ss.SSSSSS", timeZone, Locale.US))
+          .parse("2019/11/14 20#25#30.123456")
+        fail("Expected to throw an exception for the invalid input")
+      } catch {
+        case e: IllegalArgumentException =>
+          assert(e.getMessage.contains("is an invalid timestamp"))
+      }
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 69bd11f0ae3b..e847e408c7f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser
 import org.apache.spark.sql.execution.datasources.FailureSafeParser
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -77,6 +78,8 @@ class UnivocityParser(
 
   private val row = new GenericInternalRow(requiredSchema.length)
 
+  @transient private lazy val timestampParser = new TimestampParser(options.timestampFormat)
+
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
     UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
@@ -156,10 +159,7 @@ class UnivocityParser(
 
     case _: TimestampType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        Try(options.timestampFormat.parse(datum).getTime * 1000L)
-          .getOrElse {
+        Try(timestampParser.parse(datum)).getOrElse {
           // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
           // compatibility.
           DateTimeUtils.stringToTime(datum).getTime * 1000L
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 53ae1e0249e6..35087ce5ec5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -518,4 +518,14 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       jsonDF.select(to_json(from_json($"a", schema))),
       Seq(Row(json)))
   }
+
+  test("from_json - timestamp in micros") {
+    val df = Seq("""{"time": "1970-01-01T00:00:00.123456"}""").toDS()
+    val schema = new StructType().add("time", TimestampType)
+    val options = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
+
+    checkAnswer(
+      df.select(from_json($"value", schema, options)),
+      Row(Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.123456"))))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index d714cb2433ad..95c9dc5b7467 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1875,4 +1875,16 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       }
     }
   }
+
+  test("parse timestamp in microsecond precision") {
+    withTempPath { path =>
+      val t = "2019-11-14 20:35:30.123456"
+      Seq(t).toDF("t").write.text(path.getAbsolutePath)
+      val readback = spark.read
+        .schema("t timestamp")
+        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
+        .csv(path.getAbsolutePath)
+      checkAnswer(readback, Row(Timestamp.valueOf(t)))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
index 458edb253fb3..96011cf1ee42 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParserSuite.scala
@@ -26,9 +26,9 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 class UnivocityParserSuite extends SparkFunSuite {
-  private val parser = new UnivocityParser(
-    StructType(Seq.empty),
-    new CSVOptions(Map.empty[String, String], false, "GMT"))
+  private def getParser(options: CSVOptions) = {
+    new UnivocityParser(StructType(Seq.empty), options)
+  }
 
   private def assertNull(v: Any) = assert(v == null)
 
@@ -40,8 +40,10 @@ class UnivocityParserSuite extends SparkFunSuite {
     stringValues.zip(decimalValues).foreach { case (strVal, decimalVal) =>
       val decimalValue = new BigDecimal(decimalVal.toString)
       val options = new CSVOptions(Map.empty[String, String], false, "GMT")
-      assert(parser.makeConverter("_1", decimalType, options = options).apply(strVal) ===
-        Decimal(decimalValue, decimalType.precision, decimalType.scale))
+      assert(
+        getParser(options)
+          .makeConverter("_1", decimalType, options = options)
+          .apply(strVal) === Decimal(decimalValue, decimalType.precision, decimalType.scale))
     }
   }
 
@@ -53,13 +55,14 @@ class UnivocityParserSuite extends SparkFunSuite {
     types.foreach { t =>
       // Tests that a custom nullValue.
       val nullValueOptions = new CSVOptions(Map("nullValue" -> "-"), false, "GMT")
-      val converter =
-        parser.makeConverter("_1", t, nullable = true, options = nullValueOptions)
+      val converter = getParser(nullValueOptions)
+        .makeConverter("_1", t, nullable = true, options = nullValueOptions)
       assertNull(converter.apply("-"))
       assertNull(converter.apply(null))
 
       // Tests that the default nullValue is empty string.
       val options = new CSVOptions(Map.empty[String, String], false, "GMT")
+      val parser = getParser(options)
       assertNull(parser.makeConverter("_1", t, nullable = true, options = options).apply(""))
     }
 
@@ -67,8 +70,8 @@ class UnivocityParserSuite extends SparkFunSuite {
     types.foreach { t =>
       // Casts a null to not nullable field should throw an exception.
      val options = new CSVOptions(Map("nullValue" -> "-"), false, "GMT")
-      val converter =
-        parser.makeConverter("_1", t, nullable = false, options = options)
+      val converter = getParser(options)
+        .makeConverter("_1", t, nullable = false, options = options)
       var message = intercept[RuntimeException] {
         converter.apply("-")
       }.getMessage
@@ -83,22 +86,25 @@ class UnivocityParserSuite extends SparkFunSuite {
     // null.
     Seq(true, false).foreach { b =>
       val options = new CSVOptions(Map("nullValue" -> "null"), false, "GMT")
-      val converter =
-        parser.makeConverter("_1", StringType, nullable = b, options = options)
+      val converter = getParser(options)
+        .makeConverter("_1", StringType, nullable = b, options = options)
       assert(converter.apply("") == UTF8String.fromString(""))
     }
   }
 
   test("Throws exception for empty string with non null type") {
-    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
+    val options = new CSVOptions(Map.empty[String, String], false, "GMT")
     val exception = intercept[RuntimeException]{
-      parser.makeConverter("_1", IntegerType, nullable = false, options = options).apply("")
+      getParser(options)
+        .makeConverter("_1", IntegerType, nullable = false, options = options)
+        .apply("")
     }
     assert(exception.getMessage.contains("null value found but field _1 is not nullable."))
   }
 
   test("Types are cast correctly") {
     val options = new CSVOptions(Map.empty[String, String], false, "GMT")
+    val parser = getParser(options)
     assert(parser.makeConverter("_1", ByteType, options = options).apply("10") == 10)
     assert(parser.makeConverter("_1", ShortType, options = options).apply("10") == 10)
     assert(parser.makeConverter("_1", IntegerType, options = options).apply("10") == 10)
@@ -111,17 +117,17 @@ class UnivocityParserSuite extends SparkFunSuite {
       new CSVOptions(Map("timestampFormat" -> "dd/MM/yyyy hh:mm"), false, "GMT")
     val customTimestamp = "31/01/2015 00:00"
     val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
-    val castedTimestamp =
-      parser.makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
-        .apply(customTimestamp)
+    val castedTimestamp = getParser(timestampsOptions)
+      .makeConverter("_1", TimestampType, nullable = true, options = timestampsOptions)
+      .apply(customTimestamp)
     assert(castedTimestamp == expectedTime * 1000L)
 
     val customDate = "31/01/2015"
     val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), false, "GMT")
     val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
-    val castedDate =
-      parser.makeConverter("_1", DateType, nullable = true, options = dateOptions)
-        .apply(customTimestamp)
+    val castedDate = getParser(dateOptions)
+      .makeConverter("_1", DateType, nullable = true, options = dateOptions)
+      .apply(customTimestamp)
     assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
 
     val timestamp = "2015-01-01 00:00:00"
@@ -138,7 +144,7 @@ class UnivocityParserSuite extends SparkFunSuite {
     types.foreach { dt =>
       input.foreach { v =>
         val message = intercept[NumberFormatException] {
-          parser.makeConverter("_1", dt, options = options).apply(v)
+          getParser(options).makeConverter("_1", dt, options = options).apply(v)
         }.getMessage
         assert(message.contains(v))
       }
@@ -147,7 +153,7 @@ class UnivocityParserSuite extends SparkFunSuite {
 
   test("Float NaN values are parsed correctly") {
     val options = new CSVOptions(Map("nanValue" -> "nn"), false, "GMT")
-    val floatVal: Float = parser.makeConverter(
+    val floatVal: Float = getParser(options).makeConverter(
       "_1", FloatType, nullable = true, options = options
     ).apply("nn").asInstanceOf[Float]
 
@@ -158,7 +164,7 @@ class UnivocityParserSuite extends SparkFunSuite {
 
   test("Double NaN values are parsed correctly") {
     val options = new CSVOptions(Map("nanValue" -> "-"), false, "GMT")
-    val doubleVal: Double = parser.makeConverter(
+    val doubleVal: Double = getParser(options).makeConverter(
       "_1", DoubleType, nullable = true, options = options
     ).apply("-").asInstanceOf[Double]
 
@@ -167,14 +173,14 @@ class UnivocityParserSuite extends SparkFunSuite {
 
   test("Float infinite values can be parsed") {
     val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT")
-    val floatVal1 = parser.makeConverter(
+    val floatVal1 = getParser(negativeInfOptions).makeConverter(
       "_1", FloatType, nullable = true, options = negativeInfOptions
     ).apply("max").asInstanceOf[Float]
 
     assert(floatVal1 == Float.NegativeInfinity)
 
     val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), false, "GMT")
-    val floatVal2 = parser.makeConverter(
+    val floatVal2 = getParser(positiveInfOptions).makeConverter(
       "_1", FloatType, nullable = true, options = positiveInfOptions
     ).apply("max").asInstanceOf[Float]
 
@@ -183,18 +189,17 @@ class UnivocityParserSuite extends SparkFunSuite {
 
   test("Double infinite values can be parsed") {
     val negativeInfOptions = new CSVOptions(Map("negativeInf" -> "max"), false, "GMT")
-    val doubleVal1 = parser.makeConverter(
+    val doubleVal1 = getParser(negativeInfOptions).makeConverter(
       "_1", DoubleType, nullable = true, options = negativeInfOptions
     ).apply("max").asInstanceOf[Double]
 
     assert(doubleVal1 == Double.NegativeInfinity)
 
     val positiveInfOptions = new CSVOptions(Map("positiveInf" -> "max"), false, "GMT")
-    val doubleVal2 = parser.makeConverter(
+    val doubleVal2 = getParser(positiveInfOptions).makeConverter(
       "_1", DoubleType, nullable = true, options = positiveInfOptions
     ).apply("max").asInstanceOf[Double]
 
     assert(doubleVal2 == Double.PositiveInfinity)
   }
-
 }
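
Note (not part of the patch): a minimal sketch of how the new `DateTimeUtils.TimestampParser` introduced above can be exercised directly, mirroring the pattern used in the `DateTimeUtilsSuite` test. The object name, the UTC time zone, the format pattern, and the input value are illustrative assumptions; the `TimestampParser` class, `FastDateFormat.getInstance`, and the microsecond return value come from the change itself.

    import java.util.{Locale, TimeZone}

    import org.apache.commons.lang3.time.FastDateFormat

    import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser

    object TimestampParserExample {
      def main(args: Array[String]): Unit = {
        // Six 'S' letters in the pattern => digitsInFraction = 6, so the full
        // microsecond fraction of the input is preserved.
        val format = FastDateFormat.getInstance(
          "yyyy-MM-dd'T'HH:mm:ss.SSSSSS", TimeZone.getTimeZone("UTC"), Locale.US)
        val parser = new TimestampParser(format)
        // parse() returns microseconds since the epoch (SQLTimestamp is an alias for Long),
        // so the ".123456" part survives instead of being truncated to milliseconds.
        val micros = parser.parse("2019-10-14T09:39:07.123456")
        println(micros) // 1571045947123456 for the UTC time zone
      }
    }

The JSON and CSV datasources get the same behavior through the `timestampFormat` option, as the `JsonFunctionsSuite` and `CSVSuite` tests in this patch demonstrate.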