From 8c03d5180c60281ab77ee4eaa955bd5a15c3e78f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 18 Nov 2019 20:11:10 +0300 Subject: [PATCH 1/6] Format timestamps up to micros precision --- .../sql/catalyst/json/JacksonGenerator.scala | 7 ++-- .../sql/catalyst/util/DateTimeUtils.scala | 19 +++++++-- .../catalyst/util/DateTimeUtilsSuite.scala | 40 +++++++++++++++++++ 3 files changed, 59 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 9b86d865622d..84bc72276f73 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -20,9 +20,9 @@ package org.apache.spark.sql.catalyst.json import java.io.Writer import com.fasterxml.jackson.core._ - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters +import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} import org.apache.spark.sql.types._ @@ -74,6 +74,8 @@ private[sql] class JacksonGenerator( private val lineSeparator: String = options.lineSeparatorInWrite + @transient private lazy val timestampParser = new TimestampParser(options.timestampFormat) + private def makeWriter(dataType: DataType): ValueWriter = dataType match { case NullType => (row: SpecializedGetters, ordinal: Int) => @@ -113,8 +115,7 @@ private[sql] class JacksonGenerator( case TimestampType => (row: SpecializedGetters, ordinal: Int) => - val timestampString = - options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) + val timestampString = timestampParser.format(row.getLong(ordinal)) gen.writeString(timestampString) case DateType => diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index d2bb5954914e..357ab55f1be6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -1186,24 +1186,35 @@ object DateTimeUtils { // So, the result is `(1234 * 1000000) / (10 ^ digitsInFraction) d / Decimal.POW_10(digitsInFraction) } + + def setMicros(digitsInFraction: Int, micros: Long): Unit = { + val d = micros * Decimal.POW_10(digitsInFraction) + fields(Calendar.MILLISECOND) = (d / MICROS_PER_SECOND).toInt + } } /** * An instance of the class is aimed to re-use many times. It contains helper objects * `cal` and `digitsInFraction` that are reused between `parse()` invokes. */ - class TimestampParser(format: FastDateFormat) { - private val digitsInFraction = format.getPattern.count(_ == 'S') - private val cal = new MicrosCalendar(format.getTimeZone) + class TimestampParser(fastDateFormat: FastDateFormat) { + private val digitsInFraction = fastDateFormat.getPattern.count(_ == 'S') + private val cal = new MicrosCalendar(fastDateFormat.getTimeZone) def parse(s: String): SQLTimestamp = { cal.clear() // Clear the calendar because it can be re-used many times - if (!format.parse(s, new ParsePosition(0), cal)) { + if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) { throw new IllegalArgumentException(s"'$s' is an invalid timestamp") } val micros = cal.getMicros(digitsInFraction) cal.set(Calendar.MILLISECOND, 0) cal.getTimeInMillis * MICROS_PER_MILLIS + micros } + + def format(timestamp: SQLTimestamp): String = { + cal.setTimeInMillis(Math.floorDiv(timestamp, MICROS_PER_SECOND) * MILLIS_PER_SECOND) + cal.setMicros(digitsInFraction, Math.floorMod(timestamp, MICROS_PER_SECOND)) + fastDateFormat.format(cal) + } } } diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index ced003c6ef2e..7eb3d2b3a885 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -731,4 +731,44 @@ class DateTimeUtilsSuite extends SparkFunSuite { } } } + + test("formatting timestamp strings up to microsecond precision") { + DateTimeTestUtils.outstandingTimezones.foreach { timeZone => + def check(pattern: String, input: String, expected: String): Unit = { + val parser = new TimestampParser(FastDateFormat.getInstance(pattern, timeZone, Locale.US)) + val timestamp = DateTimeUtils.stringToTimestamp( + UTF8String.fromString(input), timeZone).get + val actual = parser.format(timestamp) + assert(actual === expected) + } + + check( + "yyyy-MM-dd HH:mm:ss.SSSSSSS", "2019-10-14T09:39:07.123456", + "2019-10-14 09:39:07.1234560") + check( + "yyyy-MM-dd HH:mm:ss.SSSSSS", "1960-01-01T09:39:07.123456", + "1960-01-01 09:39:07.123456") + check( + "yyyy-MM-dd HH:mm:ss.SSSSS", "0001-10-14T09:39:07.1", + "0001-10-14 09:39:07.10000") + check( + "yyyy-MM-dd HH:mm:ss.SSSS", "9999-12-31T23:59:59.999", + "9999-12-31 23:59:59.9990") + check( + "yyyy-MM-dd HH:mm:ss.SSS", "1970-01-01T00:00:00.0101", + "1970-01-01 00:00:00.010") + check( + "yyyy-MM-dd HH:mm:ss.SS", "2019-10-14T09:39:07.09", + "2019-10-14 09:39:07.09") + check( + "yyyy-MM-dd HH:mm:ss.S", "2019-10-14T09:39:07.2", + "2019-10-14 09:39:07.2") + check( + "yyyy-MM-dd HH:mm:ss.S", "2019-10-14T09:39:07", + "2019-10-14 09:39:07.0") + check( + "yyyy-MM-dd HH:mm:ss", "2019-10-14T09:39:07.123456", + "2019-10-14 09:39:07") + } + } } From 374748fb76eedea385604e266b24aceec673f630 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 18 Nov 2019 20:11:28 +0300 Subject: [PATCH 2/6] A test for to_json --- 
.../scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 35087ce5ec5c..b1f74464c4db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -528,4 +528,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { df.select(from_json($"value", schema, options)), Row(Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.123456")))) } + + test("to_json - timestamp in micros") { + val s = "2019-11-18 11:56:00.123456" + val df = Seq(java.sql.Timestamp.valueOf(s)).toDF("t").select( + to_json(struct($"t"), Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSSSS"))) + checkAnswer(df, Row(s"""{"t":"$s"}""")) + } } From e6ec78bee96b929dcf22fe73bc93b4670cd5462c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 18 Nov 2019 20:28:40 +0300 Subject: [PATCH 3/6] Pass digitsInFraction to the constructor of MicrosCalendar --- .../sql/catalyst/util/DateTimeUtils.scala | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 357ab55f1be6..f6993ae23644 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -1173,12 +1173,16 @@ object DateTimeUtils { * protected `fields` immediately after parsing. We cannot use * the `get()` method because it performs normalization of the fraction * part. Accordingly, the `MILLISECOND` field doesn't contain original value. 
+ * This class also allows setting a raw value to the `MILLISECOND` field + * directly before formatting. */ - private class MicrosCalendar(tz: TimeZone) extends GregorianCalendar(tz, Locale.US) { + private class MicrosCalendar(tz: TimeZone, digitsInFraction: Int) + extends GregorianCalendar(tz, Locale.US) { // Converts parsed `MILLISECOND` field to seconds fraction in microsecond precision. // For example if the fraction pattern is `SSSS` then `digitsInFraction` = 4, and // if the `MILLISECOND` field was parsed to `1234`. - def getMicros(digitsInFraction: Int): SQLTimestamp = { + def getMicros(): SQLTimestamp = { // Append 6 zeros to the field: 1234 -> 1234000000 val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND // Take the first 6 digits from `d`: 1234000000 -> 123400 @@ -1187,7 +1191,10 @@ object DateTimeUtils { d / Decimal.POW_10(digitsInFraction) } - def setMicros(digitsInFraction: Int, micros: Long): Unit = { + // Converts the seconds fraction in microsecond precision to a value + // that can be correctly formatted according to the specified fraction pattern. + // The method performs operations opposite to `getMicros()`. + def setMicros(micros: Long): Unit = { val d = micros * Decimal.POW_10(digitsInFraction) fields(Calendar.MILLISECOND) = (d / MICROS_PER_SECOND).toInt } @@ -1195,25 +1202,26 @@ object DateTimeUtils { /** * An instance of the class is aimed to re-use many times. It contains helper objects - * `cal` and `digitsInFraction` that are reused between `parse()` invokes. + * `cal`, which is reused between `parse()` and `format()` invocations. 
*/ class TimestampParser(fastDateFormat: FastDateFormat) { - private val digitsInFraction = fastDateFormat.getPattern.count(_ == 'S') - private val cal = new MicrosCalendar(fastDateFormat.getTimeZone) + private val cal = new MicrosCalendar( + fastDateFormat.getTimeZone, + fastDateFormat.getPattern.count(_ == 'S')) def parse(s: String): SQLTimestamp = { cal.clear() // Clear the calendar because it can be re-used many times if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) { throw new IllegalArgumentException(s"'$s' is an invalid timestamp") } - val micros = cal.getMicros(digitsInFraction) + val micros = cal.getMicros() cal.set(Calendar.MILLISECOND, 0) cal.getTimeInMillis * MICROS_PER_MILLIS + micros } def format(timestamp: SQLTimestamp): String = { cal.setTimeInMillis(Math.floorDiv(timestamp, MICROS_PER_SECOND) * MILLIS_PER_SECOND) - cal.setMicros(digitsInFraction, Math.floorMod(timestamp, MICROS_PER_SECOND)) + cal.setMicros(Math.floorMod(timestamp, MICROS_PER_SECOND)) fastDateFormat.format(cal) } } From 84902b89edf85b5e5cb8511e02ce0203b082e4c3 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 18 Nov 2019 20:45:39 +0300 Subject: [PATCH 4/6] Fix CSV --- .../datasources/csv/UnivocityGenerator.scala | 6 +- .../execution/datasources/csv/CSVSuite.scala | 100 +++++++++++------- 2 files changed, 64 insertions(+), 42 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala index 4082a0df8ba7..3118091111a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala @@ -23,6 +23,7 @@ import com.univocity.parsers.csv.CsvWriter import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils +import 
org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser import org.apache.spark.sql.types._ private[csv] class UnivocityGenerator( @@ -42,14 +43,15 @@ private[csv] class UnivocityGenerator( private val valueConverters: Array[ValueConverter] = schema.map(_.dataType).map(makeConverter).toArray + @transient private lazy val timestampParser = new TimestampParser(options.timestampFormat) + private def makeConverter(dataType: DataType): ValueConverter = dataType match { case DateType => (row: InternalRow, ordinal: Int) => options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) case TimestampType => - (row: InternalRow, ordinal: Int) => - options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) + (row: InternalRow, ordinal: Int) => timestampParser.format(row.getLong(ordinal)) case udt: UserDefinedType[_] => makeConverter(udt.sqlType) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 95c9dc5b7467..1f3522e2f973 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} import org.apache.spark.sql.types._ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with TestCsvData { + import testImplicits._ private val carsFile = "test-data/cars.csv" @@ -66,13 +67,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te /** Verifies data and schema. 
*/ private def verifyCars( - df: DataFrame, - withHeader: Boolean, - numCars: Int = 3, - numFields: Int = 5, - checkHeader: Boolean = true, - checkValues: Boolean = true, - checkTypes: Boolean = false): Unit = { + df: DataFrame, + withHeader: Boolean, + numCars: Int = 3, + numFields: Int = 5, + checkHeader: Boolean = true, + checkValues: Boolean = true, + checkTypes: Boolean = false): Unit = { val numColumns = numFields val numRows = if (withHeader) numCars else numCars + 1 @@ -212,9 +213,9 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te // scalastyle:off spark.sql( s""" - |CREATE TEMPORARY VIEW carsTable USING csv - |OPTIONS (path "${testFile(carsFile8859)}", header "true", - |charset "iso-8859-1", delimiter "þ") + |CREATE TEMPORARY VIEW carsTable USING csv + |OPTIONS (path "${testFile(carsFile8859)}", header "true", + |charset "iso-8859-1", delimiter "þ") """.stripMargin.replaceAll("\n", " ")) // scalastyle:on verifyCars(spark.table("carsTable"), withHeader = true) @@ -239,8 +240,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te withView("carsTable") { spark.sql( s""" - |CREATE TEMPORARY VIEW carsTable USING csv - |OPTIONS (path "${testFile(carsTsvFile)}", header "true", delimiter "\t") + |CREATE TEMPORARY VIEW carsTable USING csv + |OPTIONS (path "${testFile(carsTsvFile)}", header "true", delimiter "\t") """.stripMargin.replaceAll("\n", " ")) verifyCars(spark.table("carsTable"), numFields = 6, withHeader = true, checkHeader = false) @@ -251,11 +252,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te withView("carsTable") { spark.sql( s""" - |CREATE TEMPORARY VIEW carsTable - |(yearMade double, makeName string, modelName string, priceTag decimal, - | comments string, grp string) - |USING csv - |OPTIONS (path "${testFile(carsTsvFile)}", header "true", delimiter "\t") + |CREATE TEMPORARY VIEW carsTable + |(yearMade double, makeName string, modelName string, 
priceTag decimal, + | comments string, grp string) + |USING csv + |OPTIONS (path "${testFile(carsTsvFile)}", header "true", delimiter "\t") """.stripMargin.replaceAll("\n", " ")) assert( @@ -338,10 +339,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te withView("carsTable") { spark.sql( s""" - |CREATE TEMPORARY VIEW carsTable - |(yearMade double, makeName string, modelName string, comments string, grp string) - |USING csv - |OPTIONS (path "${testFile(emptyFile)}", header "false") + |CREATE TEMPORARY VIEW carsTable + |(yearMade double, makeName string, modelName string, comments string, grp string) + |USING csv + |OPTIONS (path "${testFile(emptyFile)}", header "false") """.stripMargin.replaceAll("\n", " ")) assert(spark.sql("SELECT count(*) FROM carsTable").collect().head(0) === 0) @@ -352,10 +353,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te withView("carsTable") { spark.sql( s""" - |CREATE TEMPORARY VIEW carsTable - |(yearMade double, makeName string, modelName string, comments string, blank string) - |USING csv - |OPTIONS (path "${testFile(carsFile)}", header "true") + |CREATE TEMPORARY VIEW carsTable + |(yearMade double, makeName string, modelName string, comments string, blank string) + |USING csv + |OPTIONS (path "${testFile(carsFile)}", header "true") """.stripMargin.replaceAll("\n", " ")) val cars = spark.table("carsTable") @@ -580,8 +581,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te val expected = Seq(Seq(1, 2, 3, 4, 5.01D, Timestamp.valueOf("2015-08-20 15:57:00")), - Seq(6, 7, 8, 9, 0, Timestamp.valueOf("2015-08-21 16:58:01")), - Seq(1, 2, 3, 4, 5, Timestamp.valueOf("2015-08-23 18:00:42"))) + Seq(6, 7, 8, 9, 0, Timestamp.valueOf("2015-08-21 16:58:01")), + Seq(1, 2, 3, 4, 5, Timestamp.valueOf("2015-08-23 18:00:42"))) assert(results.toSeq.map(_.toSeq) === expected) } @@ -804,7 +805,7 @@ class CSVSuite extends QueryTest with SharedSQLContext 
with SQLTestUtils with Te assert( df.schema.fields.map(field => field.dataType).deep == - Array(IntegerType, IntegerType, IntegerType, IntegerType).deep) + Array(IntegerType, IntegerType, IntegerType, IntegerType).deep) } test("old csv data source name works") { @@ -1084,7 +1085,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te Seq("1").toDF().write.text(path.getAbsolutePath) val schema = StructType( StructField("a", IntegerType, true) :: - StructField("b", IntegerType, true) :: Nil) + StructField("b", IntegerType, true) :: Nil) val df = spark.read .schema(schema) .option("header", "false") @@ -1106,8 +1107,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te .csv(testFile(valueMalformedFile)) checkAnswer(df1, Row(null, null) :: - Row(1, java.sql.Date.valueOf("1983-08-04")) :: - Nil) + Row(1, java.sql.Date.valueOf("1983-08-04")) :: + Nil) // If `schema` has `columnNameOfCorruptRecord`, it should handle corrupt records val columnNameOfCorruptRecord = "_unparsed" @@ -1121,8 +1122,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te .csv(testFile(valueMalformedFile)) checkAnswer(df2, Row(null, null, "0,2013-111-11 12:13:14") :: - Row(1, java.sql.Date.valueOf("1983-08-04"), null) :: - Nil) + Row(1, java.sql.Date.valueOf("1983-08-04"), null) :: + Nil) // We put a `columnNameOfCorruptRecord` field in the middle of a schema val schemaWithCorrField2 = new StructType() @@ -1138,8 +1139,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te .csv(testFile(valueMalformedFile)) checkAnswer(df3, Row(null, "0,2013-111-11 12:13:14", null) :: - Row(1, null, java.sql.Date.valueOf("1983-08-04")) :: - Nil) + Row(1, null, java.sql.Date.valueOf("1983-08-04")) :: + Nil) val errMsg = intercept[AnalysisException] { spark @@ -1195,7 +1196,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te "92233720368547758070", 
"\n\n1.7976931348623157E308", "true", - null) + null) checkAnswer(df, expected) } } @@ -1605,10 +1606,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te odf.write.option("header", false).csv(path.getCanonicalPath) val ischema = new StructType().add("f2", DoubleType).add("f1", DoubleType) val idf = spark.read - .schema(ischema) - .option("header", false) - .option("enforceSchema", false) - .csv(path.getCanonicalPath) + .schema(ischema) + .option("header", false) + .option("enforceSchema", false) + .csv(path.getCanonicalPath) checkAnswer(idf, odf) } @@ -1675,8 +1676,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te test("SPARK-23786: warning should be printed if CSV header doesn't conform to schema") { class TestAppender extends AppenderSkeleton { var events = new java.util.ArrayList[LoggingEvent] + override def close(): Unit = {} + override def requiresLayout: Boolean = false + protected def append(event: LoggingEvent): Unit = events.add(event) } @@ -1795,6 +1799,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(df.count() == expected) } + def checkCount(expected: Long): Unit = { val validRec = "1" val inputs = Seq( @@ -1887,4 +1892,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te checkAnswer(readback, Row(Timestamp.valueOf(t))) } } + + test("Roundtrip in reading and writing timestamps in microsecond precision") { + withTempPath { path => + val timestamp = Timestamp.valueOf("2019-11-18 11:56:00.123456") + Seq(timestamp).toDF("t") + .write + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") + .csv(path.getAbsolutePath) + val readback = spark.read + .schema("t timestamp") + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") + .csv(path.getAbsolutePath) + checkAnswer(readback, Row(timestamp)) + } + } } From 5aa576a2081fc9dee34634f8ff803f5da4431688 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 18 
Nov 2019 20:50:47 +0300 Subject: [PATCH 5/6] Fix imports --- .../org/apache/spark/sql/catalyst/json/JacksonGenerator.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 84bc72276f73..a379f8642b00 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -20,10 +20,11 @@ package org.apache.spark.sql.catalyst.json import java.io.Writer import com.fasterxml.jackson.core._ + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters -import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} +import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser import org.apache.spark.sql.types._ /** From d71a2f9a6a2f871a91093dd7704f08c9b589a3c0 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 18 Nov 2019 21:08:59 +0300 Subject: [PATCH 6/6] Revert unrelated changes in CSVSuite --- .../execution/datasources/csv/CSVSuite.scala | 85 +++++++++---------- 1 file changed, 40 insertions(+), 45 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 1f3522e2f973..2ea8f4fa414a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -41,7 +41,6 @@ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} import org.apache.spark.sql.types._ class CSVSuite extends QueryTest with SharedSQLContext with 
SQLTestUtils with TestCsvData { - import testImplicits._ private val carsFile = "test-data/cars.csv" @@ -67,13 +66,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te /** Verifies data and schema. */ private def verifyCars( - df: DataFrame, - withHeader: Boolean, - numCars: Int = 3, - numFields: Int = 5, - checkHeader: Boolean = true, - checkValues: Boolean = true, - checkTypes: Boolean = false): Unit = { + df: DataFrame, + withHeader: Boolean, + numCars: Int = 3, + numFields: Int = 5, + checkHeader: Boolean = true, + checkValues: Boolean = true, + checkTypes: Boolean = false): Unit = { val numColumns = numFields val numRows = if (withHeader) numCars else numCars + 1 @@ -213,9 +212,9 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te // scalastyle:off spark.sql( s""" - |CREATE TEMPORARY VIEW carsTable USING csv - |OPTIONS (path "${testFile(carsFile8859)}", header "true", - |charset "iso-8859-1", delimiter "þ") + |CREATE TEMPORARY VIEW carsTable USING csv + |OPTIONS (path "${testFile(carsFile8859)}", header "true", + |charset "iso-8859-1", delimiter "þ") """.stripMargin.replaceAll("\n", " ")) // scalastyle:on verifyCars(spark.table("carsTable"), withHeader = true) @@ -240,8 +239,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te withView("carsTable") { spark.sql( s""" - |CREATE TEMPORARY VIEW carsTable USING csv - |OPTIONS (path "${testFile(carsTsvFile)}", header "true", delimiter "\t") + |CREATE TEMPORARY VIEW carsTable USING csv + |OPTIONS (path "${testFile(carsTsvFile)}", header "true", delimiter "\t") """.stripMargin.replaceAll("\n", " ")) verifyCars(spark.table("carsTable"), numFields = 6, withHeader = true, checkHeader = false) @@ -252,11 +251,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te withView("carsTable") { spark.sql( s""" - |CREATE TEMPORARY VIEW carsTable - |(yearMade double, makeName string, modelName string, 
priceTag decimal, - | comments string, grp string) - |USING csv - |OPTIONS (path "${testFile(carsTsvFile)}", header "true", delimiter "\t") + |CREATE TEMPORARY VIEW carsTable + |(yearMade double, makeName string, modelName string, priceTag decimal, + | comments string, grp string) + |USING csv + |OPTIONS (path "${testFile(carsTsvFile)}", header "true", delimiter "\t") """.stripMargin.replaceAll("\n", " ")) assert( @@ -339,10 +338,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te withView("carsTable") { spark.sql( s""" - |CREATE TEMPORARY VIEW carsTable - |(yearMade double, makeName string, modelName string, comments string, grp string) - |USING csv - |OPTIONS (path "${testFile(emptyFile)}", header "false") + |CREATE TEMPORARY VIEW carsTable + |(yearMade double, makeName string, modelName string, comments string, grp string) + |USING csv + |OPTIONS (path "${testFile(emptyFile)}", header "false") """.stripMargin.replaceAll("\n", " ")) assert(spark.sql("SELECT count(*) FROM carsTable").collect().head(0) === 0) @@ -353,10 +352,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te withView("carsTable") { spark.sql( s""" - |CREATE TEMPORARY VIEW carsTable - |(yearMade double, makeName string, modelName string, comments string, blank string) - |USING csv - |OPTIONS (path "${testFile(carsFile)}", header "true") + |CREATE TEMPORARY VIEW carsTable + |(yearMade double, makeName string, modelName string, comments string, blank string) + |USING csv + |OPTIONS (path "${testFile(carsFile)}", header "true") """.stripMargin.replaceAll("\n", " ")) val cars = spark.table("carsTable") @@ -581,8 +580,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te val expected = Seq(Seq(1, 2, 3, 4, 5.01D, Timestamp.valueOf("2015-08-20 15:57:00")), - Seq(6, 7, 8, 9, 0, Timestamp.valueOf("2015-08-21 16:58:01")), - Seq(1, 2, 3, 4, 5, Timestamp.valueOf("2015-08-23 18:00:42"))) + Seq(6, 7, 8, 9, 0, 
Timestamp.valueOf("2015-08-21 16:58:01")), + Seq(1, 2, 3, 4, 5, Timestamp.valueOf("2015-08-23 18:00:42"))) assert(results.toSeq.map(_.toSeq) === expected) } @@ -805,7 +804,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert( df.schema.fields.map(field => field.dataType).deep == - Array(IntegerType, IntegerType, IntegerType, IntegerType).deep) + Array(IntegerType, IntegerType, IntegerType, IntegerType).deep) } test("old csv data source name works") { @@ -1085,7 +1084,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te Seq("1").toDF().write.text(path.getAbsolutePath) val schema = StructType( StructField("a", IntegerType, true) :: - StructField("b", IntegerType, true) :: Nil) + StructField("b", IntegerType, true) :: Nil) val df = spark.read .schema(schema) .option("header", "false") @@ -1107,8 +1106,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te .csv(testFile(valueMalformedFile)) checkAnswer(df1, Row(null, null) :: - Row(1, java.sql.Date.valueOf("1983-08-04")) :: - Nil) + Row(1, java.sql.Date.valueOf("1983-08-04")) :: + Nil) // If `schema` has `columnNameOfCorruptRecord`, it should handle corrupt records val columnNameOfCorruptRecord = "_unparsed" @@ -1122,8 +1121,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te .csv(testFile(valueMalformedFile)) checkAnswer(df2, Row(null, null, "0,2013-111-11 12:13:14") :: - Row(1, java.sql.Date.valueOf("1983-08-04"), null) :: - Nil) + Row(1, java.sql.Date.valueOf("1983-08-04"), null) :: + Nil) // We put a `columnNameOfCorruptRecord` field in the middle of a schema val schemaWithCorrField2 = new StructType() @@ -1139,8 +1138,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te .csv(testFile(valueMalformedFile)) checkAnswer(df3, Row(null, "0,2013-111-11 12:13:14", null) :: - Row(1, null, java.sql.Date.valueOf("1983-08-04")) :: - Nil) + Row(1, null, 
java.sql.Date.valueOf("1983-08-04")) :: + Nil) val errMsg = intercept[AnalysisException] { spark @@ -1196,7 +1195,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te "92233720368547758070", "\n\n1.7976931348623157E308", "true", - null) + null) checkAnswer(df, expected) } } @@ -1606,10 +1605,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te odf.write.option("header", false).csv(path.getCanonicalPath) val ischema = new StructType().add("f2", DoubleType).add("f1", DoubleType) val idf = spark.read - .schema(ischema) - .option("header", false) - .option("enforceSchema", false) - .csv(path.getCanonicalPath) + .schema(ischema) + .option("header", false) + .option("enforceSchema", false) + .csv(path.getCanonicalPath) checkAnswer(idf, odf) } @@ -1676,11 +1675,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te test("SPARK-23786: warning should be printed if CSV header doesn't conform to schema") { class TestAppender extends AppenderSkeleton { var events = new java.util.ArrayList[LoggingEvent] - override def close(): Unit = {} - override def requiresLayout: Boolean = false - protected def append(event: LoggingEvent): Unit = events.add(event) } @@ -1799,7 +1795,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te assert(df.count() == expected) } - def checkCount(expected: Long): Unit = { val validRec = "1" val inputs = Seq(