diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index 9b86d865622d..a379f8642b00 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser
 import org.apache.spark.sql.types._
 
 /**
@@ -74,6 +75,8 @@ private[sql] class JacksonGenerator(
 
   private val lineSeparator: String = options.lineSeparatorInWrite
 
+  @transient private lazy val timestampParser = new TimestampParser(options.timestampFormat)
+
   private def makeWriter(dataType: DataType): ValueWriter = dataType match {
     case NullType =>
       (row: SpecializedGetters, ordinal: Int) =>
@@ -113,8 +116,7 @@ private[sql] class JacksonGenerator(
 
     case TimestampType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        val timestampString =
-          options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+        val timestampString = timestampParser.format(row.getLong(ordinal))
         gen.writeString(timestampString)
 
     case DateType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index d2bb5954914e..f6993ae23644 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -1173,12 +1173,16 @@ object DateTimeUtils {
    * protected `fields` immediately after parsing. We cannot use
    * the `get()` method because it performs normalization of the fraction
    * part. Accordingly, the `MILLISECOND` field doesn't contain original value.
+   *
+   * This class also allows setting a raw value in the `MILLISECOND` field
+   * directly before formatting.
    */
-  private class MicrosCalendar(tz: TimeZone) extends GregorianCalendar(tz, Locale.US) {
+  private class MicrosCalendar(tz: TimeZone, digitsInFraction: Int)
+    extends GregorianCalendar(tz, Locale.US) {
     // Converts parsed `MILLISECOND` field to seconds fraction in microsecond precision.
     // For example if the fraction pattern is `SSSS` then `digitsInFraction` = 4, and
     // if the `MILLISECOND` field was parsed to `1234`.
-    def getMicros(digitsInFraction: Int): SQLTimestamp = {
+    def getMicros(): SQLTimestamp = {
       // Append 6 zeros to the field: 1234 -> 1234000000
       val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND
       // Take the first 6 digits from `d`: 1234000000 -> 123400
@@ -1186,24 +1190,39 @@ object DateTimeUtils {
       // So, the result is `(1234 * 1000000) / (10 ^ digitsInFraction)
       d / Decimal.POW_10(digitsInFraction)
     }
+
+    // Converts the seconds fraction in microsecond precision to a value
+    // that can be correctly formatted according to the specified fraction pattern.
+    // The method performs the inverse of the operations in `getMicros()`.
+    def setMicros(micros: Long): Unit = {
+      val d = micros * Decimal.POW_10(digitsInFraction)
+      fields(Calendar.MILLISECOND) = (d / MICROS_PER_SECOND).toInt
+    }
   }
 
   /**
    * An instance of the class is aimed to re-use many times. It contains helper objects
-   * `cal` and `digitsInFraction` that are reused between `parse()` invokes.
+   * `cal`, which is reused between `parse()` and `format()` invocations.
    */
-  class TimestampParser(format: FastDateFormat) {
-    private val digitsInFraction = format.getPattern.count(_ == 'S')
-    private val cal = new MicrosCalendar(format.getTimeZone)
+  class TimestampParser(fastDateFormat: FastDateFormat) {
+    private val cal = new MicrosCalendar(
+      fastDateFormat.getTimeZone,
+      fastDateFormat.getPattern.count(_ == 'S'))
 
     def parse(s: String): SQLTimestamp = {
       cal.clear() // Clear the calendar because it can be re-used many times
-      if (!format.parse(s, new ParsePosition(0), cal)) {
+      if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) {
         throw new IllegalArgumentException(s"'$s' is an invalid timestamp")
       }
-      val micros = cal.getMicros(digitsInFraction)
+      val micros = cal.getMicros()
       cal.set(Calendar.MILLISECOND, 0)
       cal.getTimeInMillis * MICROS_PER_MILLIS + micros
     }
+
+    def format(timestamp: SQLTimestamp): String = {
+      cal.setTimeInMillis(Math.floorDiv(timestamp, MICROS_PER_SECOND) * MILLIS_PER_SECOND)
+      cal.setMicros(Math.floorMod(timestamp, MICROS_PER_SECOND))
+      fastDateFormat.format(cal)
+    }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index ced003c6ef2e..7eb3d2b3a885 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -731,4 +731,44 @@ class DateTimeUtilsSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("formatting timestamp strings up to microsecond precision") {
+    DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+      def check(pattern: String, input: String, expected: String): Unit = {
+        val parser = new TimestampParser(FastDateFormat.getInstance(pattern, timeZone, Locale.US))
+        val timestamp = DateTimeUtils.stringToTimestamp(
+          UTF8String.fromString(input), timeZone).get
+        val actual = parser.format(timestamp)
+        assert(actual === expected)
+      }
+
+      check(
+        "yyyy-MM-dd HH:mm:ss.SSSSSSS", "2019-10-14T09:39:07.123456",
+        "2019-10-14 09:39:07.1234560")
+      check(
+        "yyyy-MM-dd HH:mm:ss.SSSSSS", "1960-01-01T09:39:07.123456",
+        "1960-01-01 09:39:07.123456")
+      check(
+        "yyyy-MM-dd HH:mm:ss.SSSSS", "0001-10-14T09:39:07.1",
+        "0001-10-14 09:39:07.10000")
+      check(
+        "yyyy-MM-dd HH:mm:ss.SSSS", "9999-12-31T23:59:59.999",
+        "9999-12-31 23:59:59.9990")
+      check(
+        "yyyy-MM-dd HH:mm:ss.SSS", "1970-01-01T00:00:00.0101",
+        "1970-01-01 00:00:00.010")
+      check(
+        "yyyy-MM-dd HH:mm:ss.SS", "2019-10-14T09:39:07.09",
+        "2019-10-14 09:39:07.09")
+      check(
+        "yyyy-MM-dd HH:mm:ss.S", "2019-10-14T09:39:07.2",
+        "2019-10-14 09:39:07.2")
+      check(
+        "yyyy-MM-dd HH:mm:ss.S", "2019-10-14T09:39:07",
+        "2019-10-14 09:39:07.0")
+      check(
+        "yyyy-MM-dd HH:mm:ss", "2019-10-14T09:39:07.123456",
+        "2019-10-14 09:39:07")
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
index 4082a0df8ba7..3118091111a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
@@ -23,6 +23,7 @@ import com.univocity.parsers.csv.CsvWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser
 import org.apache.spark.sql.types._
 
 private[csv] class UnivocityGenerator(
@@ -42,14 +43,15 @@
   private val valueConverters: Array[ValueConverter] =
     schema.map(_.dataType).map(makeConverter).toArray
 
+  @transient private lazy val timestampParser = new TimestampParser(options.timestampFormat)
+
   private def makeConverter(dataType: DataType): ValueConverter = dataType match {
     case DateType =>
       (row: InternalRow, ordinal: Int) =>
         options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
 
     case TimestampType =>
-      (row: InternalRow, ordinal: Int) =>
-        options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+      (row: InternalRow, ordinal: Int) => timestampParser.format(row.getLong(ordinal))
 
     case udt: UserDefinedType[_] =>
       makeConverter(udt.sqlType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 35087ce5ec5c..b1f74464c4db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -528,4 +528,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       df.select(from_json($"value", schema, options)),
       Row(Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.123456"))))
   }
+
+  test("to_json - timestamp in micros") {
+    val s = "2019-11-18 11:56:00.123456"
+    val df = Seq(java.sql.Timestamp.valueOf(s)).toDF("t").select(
+      to_json(struct($"t"), Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSSSS")))
+    checkAnswer(df, Row(s"""{"t":"$s"}"""))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 95c9dc5b7467..2ea8f4fa414a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1887,4 +1887,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       checkAnswer(readback, Row(Timestamp.valueOf(t)))
     }
   }
+
+  test("Roundtrip in reading and writing timestamps in microsecond precision") {
+    withTempPath { path =>
+      val timestamp = Timestamp.valueOf("2019-11-18 11:56:00.123456")
+      Seq(timestamp).toDF("t")
+        .write
+        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
+        .csv(path.getAbsolutePath)
+      val readback = spark.read
+        .schema("t timestamp")
+        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
+        .csv(path.getAbsolutePath)
+      checkAnswer(readback, Row(timestamp))
+    }
+  }
 }
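
Reviewer note: the core of this patch is the scaling arithmetic in `getMicros()`/`setMicros()`, which converts between the raw digits that `FastDateFormat` parses into (or prints from) the `MILLISECOND` field and a fraction of a second in microseconds. Below is a minimal, dependency-free sketch of that arithmetic; `FractionScalingDemo` and `pow10` are illustrative names of mine (with `pow10` standing in for `Decimal.POW_10`), not part of the patch.

```scala
object FractionScalingDemo {
  val MICROS_PER_SECOND = 1000000L

  // Stand-in for Decimal.POW_10 used in the patch.
  def pow10(n: Int): Long = (0 until n).foldLeft(1L)((acc, _) => acc * 10)

  // Mirrors getMicros(): raw digits parsed into the MILLISECOND field ->
  // seconds fraction in microseconds. With pattern "SSSS", FastDateFormat
  // parses ".1234" into the integer 1234.
  def getMicros(rawField: Int, digitsInFraction: Int): Long =
    rawField * MICROS_PER_SECOND / pow10(digitsInFraction)

  // Mirrors setMicros(): seconds fraction in microseconds -> raw digits to
  // write back into the MILLISECOND field, so that a pattern with
  // `digitsInFraction` 'S' letters prints the fraction verbatim.
  def setMicros(micros: Long, digitsInFraction: Int): Int =
    (micros * pow10(digitsInFraction) / MICROS_PER_SECOND).toInt

  def main(args: Array[String]): Unit = {
    assert(getMicros(1234, 4) == 123400L) // ".1234" is 123400 microseconds
    assert(setMicros(123400L, 4) == 1234) // and back again
    // The pair round-trips whenever the fraction fits into the pattern width.
    assert(setMicros(getMicros(1234, 4), 4) == 1234)
  }
}
```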
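
And the formatting path end to end, as a self-contained sketch outside Spark (it assumes Apache Commons Lang 3 on the classpath; `DemoMicrosCalendar`, `MicrosFormatDemo`, and the hard-coded epoch value are illustrative only). Whole seconds go through the regular `Calendar` machinery via `setTimeInMillis`, while the pre-scaled fraction is written straight into the protected `fields` array, the same trick `MicrosCalendar.setMicros` uses in the patch:

```scala
import java.util.{Calendar, GregorianCalendar, Locale, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat

// Same idea as the patch's MicrosCalendar: write a pre-scaled fraction
// directly into the protected `fields` array, bypassing the normalization
// that set()/get() would apply to the MILLISECOND field.
class DemoMicrosCalendar(tz: TimeZone, digitsInFraction: Int)
  extends GregorianCalendar(tz, Locale.US) {
  def setMicros(micros: Long): Unit = {
    val d = micros * (0 until digitsInFraction).foldLeft(1L)((acc, _) => acc * 10)
    fields(Calendar.MILLISECOND) = (d / 1000000L).toInt
  }
}

object MicrosFormatDemo {
  def main(args: Array[String]): Unit = {
    val pattern = "yyyy-MM-dd HH:mm:ss.SSSSSS"
    val tz = TimeZone.getTimeZone("UTC")
    val fdf = FastDateFormat.getInstance(pattern, tz, Locale.US)
    val cal = new DemoMicrosCalendar(tz, pattern.count(_ == 'S'))

    // 2019-11-18 11:56:00.123456 UTC as microseconds since the epoch.
    val timestamp = 1574078160123456L
    // Whole seconds take the normal route (this also computes the other fields)...
    cal.setTimeInMillis(Math.floorDiv(timestamp, 1000000L) * 1000L)
    // ...and the microsecond fraction is stuffed into MILLISECOND afterwards.
    cal.setMicros(Math.floorMod(timestamp, 1000000L))

    println(fdf.format(cal)) // 2019-11-18 11:56:00.123456
  }
}
```

The ordering matters: `setTimeInMillis` recomputes all calendar fields and marks them as set, which is what keeps the later `get(Calendar.MILLISECOND)` inside `FastDateFormat` from normalizing the raw value written by `setMicros`.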