Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.fasterxml.jackson.core._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser
import org.apache.spark.sql.types._

/**
Expand Down Expand Up @@ -74,6 +75,8 @@ private[sql] class JacksonGenerator(

private val lineSeparator: String = options.lineSeparatorInWrite

@transient private lazy val timestampParser = new TimestampParser(options.timestampFormat)

private def makeWriter(dataType: DataType): ValueWriter = dataType match {
case NullType =>
(row: SpecializedGetters, ordinal: Int) =>
Expand Down Expand Up @@ -113,8 +116,7 @@ private[sql] class JacksonGenerator(

case TimestampType =>
(row: SpecializedGetters, ordinal: Int) =>
val timestampString =
options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
val timestampString = timestampParser.format(row.getLong(ordinal))
gen.writeString(timestampString)

case DateType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1173,37 +1173,56 @@ object DateTimeUtils {
* protected `fields` immediately after parsing. We cannot use
* the `get()` method because it performs normalization of the fraction
* part. Accordingly, the `MILLISECOND` field doesn't contain original value.
*
* This class also allows setting a raw value on the `MILLISECOND` field
* directly before formatting.
*/
private class MicrosCalendar(tz: TimeZone) extends GregorianCalendar(tz, Locale.US) {
private class MicrosCalendar(tz: TimeZone, digitsInFraction: Int)
extends GregorianCalendar(tz, Locale.US) {
// Converts the parsed `MILLISECOND` field to the seconds fraction in microsecond precision.
// For example, suppose the fraction pattern is `SSSS`, so `digitsInFraction` = 4, and
// the `MILLISECOND` field was parsed as `1234`; the steps below then yield `123400`.
def getMicros(digitsInFraction: Int): SQLTimestamp = {
def getMicros(): SQLTimestamp = {
// Append 6 zeros to the field: 1234 -> 1234000000
val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND
// Take the first 6 digits from `d`: 1234000000 -> 123400
// The dropped tail has exactly `digitsInFraction` digits (`0000`), i.e. 10 ^ digitsInFraction
// So, the result is `(1234 * 1000000) / (10 ^ digitsInFraction)`
d / Decimal.POW_10(digitsInFraction)
}

// Converts the seconds fraction in microsecond precision to a value
// that can be correctly formatted according to the specified fraction pattern.
// The method performs operations opposite to `getMicros()`.
def setMicros(micros: Long): Unit = {
val d = micros * Decimal.POW_10(digitsInFraction)
fields(Calendar.MILLISECOND) = (d / MICROS_PER_SECOND).toInt
}
}

/**
* An instance of this class is intended to be re-used many times. It contains helper objects
* `cal` and `digitsInFraction` that are reused between `parse()` invocations.
* `cal` which is reused between `parse()` and `format()` invocations.
*/
class TimestampParser(format: FastDateFormat) {
private val digitsInFraction = format.getPattern.count(_ == 'S')
private val cal = new MicrosCalendar(format.getTimeZone)
class TimestampParser(fastDateFormat: FastDateFormat) {
private val cal = new MicrosCalendar(
fastDateFormat.getTimeZone,
fastDateFormat.getPattern.count(_ == 'S'))

def parse(s: String): SQLTimestamp = {
cal.clear() // Clear the calendar because it can be re-used many times
if (!format.parse(s, new ParsePosition(0), cal)) {
if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) {
throw new IllegalArgumentException(s"'$s' is an invalid timestamp")
}
val micros = cal.getMicros(digitsInFraction)
val micros = cal.getMicros()
cal.set(Calendar.MILLISECOND, 0)
cal.getTimeInMillis * MICROS_PER_MILLIS + micros
}

def format(timestamp: SQLTimestamp): String = {
cal.setTimeInMillis(Math.floorDiv(timestamp, MICROS_PER_SECOND) * MILLIS_PER_SECOND)
cal.setMicros(Math.floorMod(timestamp, MICROS_PER_SECOND))
fastDateFormat.format(cal)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -731,4 +731,44 @@ class DateTimeUtilsSuite extends SparkFunSuite {
}
}
}

// Checks `TimestampParser.format` for fraction patterns of 0 to 7 `S` letters, in every
// time zone listed in `DateTimeTestUtils.outstandingTimezones`. The expected strings show
// the fraction rendered with exactly as many digits as the pattern has `S` letters:
// zero-padded on the right when the input has fewer digits, cut off when it has more.
test("formatting timestamp strings up to microsecond precision") {
DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
// Converts `input` to a timestamp in `timeZone` via `DateTimeUtils.stringToTimestamp`,
// formats it with a `TimestampParser` built from `pattern`, and asserts the result
// equals `expected`.
def check(pattern: String, input: String, expected: String): Unit = {
val parser = new TimestampParser(FastDateFormat.getInstance(pattern, timeZone, Locale.US))
val timestamp = DateTimeUtils.stringToTimestamp(
UTF8String.fromString(input), timeZone).get
val actual = parser.format(timestamp)
assert(actual === expected)
}

// 7 fraction digits requested, 6 available: a trailing zero is appended.
check(
"yyyy-MM-dd HH:mm:ss.SSSSSSS", "2019-10-14T09:39:07.123456",
"2019-10-14 09:39:07.1234560")
// A timestamp before the 1970 epoch, at full microsecond precision.
check(
"yyyy-MM-dd HH:mm:ss.SSSSSS", "1960-01-01T09:39:07.123456",
"1960-01-01 09:39:07.123456")
// Year 0001 with a single input fraction digit padded to five.
check(
"yyyy-MM-dd HH:mm:ss.SSSSS", "0001-10-14T09:39:07.1",
"0001-10-14 09:39:07.10000")
// Last second of year 9999 with millisecond input padded to four digits.
check(
"yyyy-MM-dd HH:mm:ss.SSSS", "9999-12-31T23:59:59.999",
"9999-12-31 23:59:59.9990")
// Four input fraction digits cut to three.
check(
"yyyy-MM-dd HH:mm:ss.SSS", "1970-01-01T00:00:00.0101",
"1970-01-01 00:00:00.010")
// The leading zero of the fraction is kept.
check(
"yyyy-MM-dd HH:mm:ss.SS", "2019-10-14T09:39:07.09",
"2019-10-14 09:39:07.09")
check(
"yyyy-MM-dd HH:mm:ss.S", "2019-10-14T09:39:07.2",
"2019-10-14 09:39:07.2")
// No fraction in the input: a single `S` renders as `0`.
check(
"yyyy-MM-dd HH:mm:ss.S", "2019-10-14T09:39:07",
"2019-10-14 09:39:07.0")
// No `S` in the pattern: the fraction is omitted from the output.
check(
"yyyy-MM-dd HH:mm:ss", "2019-10-14T09:39:07.123456",
"2019-10-14 09:39:07")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.univocity.parsers.csv.CsvWriter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser
import org.apache.spark.sql.types._

private[csv] class UnivocityGenerator(
Expand All @@ -42,14 +43,15 @@ private[csv] class UnivocityGenerator(
private val valueConverters: Array[ValueConverter] =
schema.map(_.dataType).map(makeConverter).toArray

@transient private lazy val timestampParser = new TimestampParser(options.timestampFormat)

private def makeConverter(dataType: DataType): ValueConverter = dataType match {
case DateType =>
(row: InternalRow, ordinal: Int) =>
options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))

case TimestampType =>
(row: InternalRow, ordinal: Int) =>
options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
(row: InternalRow, ordinal: Int) => timestampParser.format(row.getLong(ordinal))

case udt: UserDefinedType[_] => makeConverter(udt.sqlType)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,4 +528,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
df.select(from_json($"value", schema, options)),
Row(Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.123456"))))
}

// Checks that `to_json` with a six-`S` `timestampFormat` option writes a timestamp column
// with all six fraction digits, so the JSON string contains the original text `s`.
test("to_json - timestamp in micros") {
val s = "2019-11-18 11:56:00.123456"
// Single-row DataFrame with one timestamp column `t`, wrapped in a struct for `to_json`.
val df = Seq(java.sql.Timestamp.valueOf(s)).toDF("t").select(
to_json(struct($"t"), Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSSSS")))
checkAnswer(df, Row(s"""{"t":"$s"}"""))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1887,4 +1887,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
checkAnswer(readback, Row(Timestamp.valueOf(t)))
}
}

// Writes a timestamp with six fraction digits to CSV and reads it back using the same
// six-`S` `timestampFormat` on both sides; the value read must equal the value written.
test("Roundtrip in reading and writing timestamps in microsecond precision") {
withTempPath { path =>
val timestamp = Timestamp.valueOf("2019-11-18 11:56:00.123456")
// Write side: one-row DataFrame with column `t`.
Seq(timestamp).toDF("t")
.write
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
.csv(path.getAbsolutePath)
// Read side: explicit schema so `t` is parsed as a timestamp rather than a string.
val readback = spark.read
.schema("t timestamp")
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
.csv(path.getAbsolutePath)
checkAnswer(readback, Row(timestamp))
}
}
}