diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
index e240ed8e1f7..d9795435699 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
@@ -18,22 +18,27 @@
 package org.apache.kyuubi.engine.spark.schema
 
 import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import java.sql.Timestamp
-import java.time._
-import java.util.Date
+import java.time.ZoneId
 
 import scala.collection.JavaConverters._
 
 import org.apache.hive.service.rpc.thrift._
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
+import org.apache.spark.sql.execution.HiveResult
+import org.apache.spark.sql.execution.HiveResult.TimeFormatters
 import org.apache.spark.sql.types._
 
-import org.apache.kyuubi.engine.spark.schema.SchemaHelper.TIMESTAMP_NTZ
 import org.apache.kyuubi.util.RowSetUtils._
 
 object RowSet {
 
+  def getTimeFormatters(timeZone: ZoneId): TimeFormatters = {
+    val dateFormatter = DateFormatter()
+    val timestampFormatter = TimestampFormatter.getFractionFormatter(timeZone)
+    TimeFormatters(dateFormatter, timestampFormatter)
+  }
+
   def toTRowSet(
       bytes: Array[Byte],
       protocolVersion: TProtocolVersion): TRowSet = {
@@ -68,9 +73,9 @@ object RowSet {
   }
 
   def toRowBasedSet(rows: Seq[Row], schema: StructType, timeZone: ZoneId): TRowSet = {
-    var i = 0
     val rowSize = rows.length
     val tRows = new java.util.ArrayList[TRow](rowSize)
+    var i = 0
     while (i < rowSize) {
       val row = rows(i)
       val tRow = new TRow()
@@ -151,13 +156,8 @@ object RowSet {
         while (i < rowSize) {
           val row = rows(i)
           nulls.set(i, row.isNullAt(ordinal))
-          val value =
-            if (row.isNullAt(ordinal)) {
-              ""
-            } else {
-              toHiveString((row.get(ordinal), typ), timeZone)
-            }
-          values.add(value)
+          values.add(
+            HiveResult.toHiveString((row.get(ordinal), typ), false, getTimeFormatters(timeZone)))
           i += 1
         }
         TColumn.stringVal(new TStringColumn(values, nulls))
@@ -239,68 +239,15 @@ object RowSet {
         val tStrValue = new TStringValue
         if (!row.isNullAt(ordinal)) {
           tStrValue.setValue(
-            toHiveString((row.get(ordinal), types(ordinal).dataType), timeZone))
+            HiveResult.toHiveString(
+              (row.get(ordinal), types(ordinal).dataType),
+              false,
+              getTimeFormatters(timeZone)))
         }
         TColumnValue.stringVal(tStrValue)
     }
   }
 
-  /**
-   * A simpler impl of Spark's toHiveString
-   */
-  def toHiveString(dataWithType: (Any, DataType), timeZone: ZoneId): String = {
-    dataWithType match {
-      case (null, _) =>
-        // Only match nulls in nested type values
-        "null"
-
-      case (d: Date, DateType) =>
-        formatDate(d)
-
-      case (ld: LocalDate, DateType) =>
-        formatLocalDate(ld)
-
-      case (t: Timestamp, TimestampType) =>
-        formatTimestamp(t)
-
-      case (t: LocalDateTime, ntz) if ntz.getClass.getSimpleName.equals(TIMESTAMP_NTZ) =>
-        formatLocalDateTime(t)
-
-      case (i: Instant, TimestampType) =>
-        formatInstant(i, Option(timeZone))
-
-      case (bin: Array[Byte], BinaryType) =>
-        new String(bin, StandardCharsets.UTF_8)
-
-      case (decimal: java.math.BigDecimal, DecimalType()) =>
-        decimal.toPlainString
-
-      case (s: String, StringType) =>
-        // Only match string in nested type values
-        "\"" + s + "\""
-
-      case (d: Duration, _) => toDayTimeIntervalString(d)
-
-      case (p: Period, _) => toYearMonthIntervalString(p)
-
-      case (seq: scala.collection.Seq[_], ArrayType(typ, _)) =>
-        seq.map(v => (v, typ)).map(e => toHiveString(e, timeZone)).mkString("[", ",", "]")
-
-      case (m: Map[_, _], MapType(kType, vType, _)) =>
-        m.map { case (key, value) =>
-          toHiveString((key, kType), timeZone) + ":" + toHiveString((value, vType), timeZone)
-        }.toSeq.sorted.mkString("{", ",", "}")
-
-      case (struct: Row, StructType(fields)) =>
-        struct.toSeq.zip(fields).map { case (v, t) =>
-          s""""${t.name}":${toHiveString((v, t.dataType), timeZone)}"""
-        }.mkString("{", ",", "}")
-
-      case (other, _) =>
-        other.toString
-    }
-  }
-
   private def toTColumn(data: Array[Byte]): TColumn = {
     val values = new java.util.ArrayList[ByteBuffer](1)
     values.add(ByteBuffer.wrap(data))
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 23f7df21310..4b853582621 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -21,8 +21,9 @@ import java.time.ZoneId
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.execution.HiveResult
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.engine.spark.schema.RowSet
 
@@ -41,11 +42,11 @@
       val dt = DataType.fromDDL(schemaDDL)
       dt match {
         case StructType(Array(StructField(_, st: StructType, _, _))) =>
-          RowSet.toHiveString((row, st), timeZone)
+          HiveResult.toHiveString((row, st), true, RowSet.getTimeFormatters(timeZone))
         case StructType(Array(StructField(_, at: ArrayType, _, _))) =>
-          RowSet.toHiveString((row.toSeq.head, at), timeZone)
+          HiveResult.toHiveString((row.toSeq.head, at), true, RowSet.getTimeFormatters(timeZone))
         case StructType(Array(StructField(_, mt: MapType, _, _))) =>
-          RowSet.toHiveString((row.toSeq.head, mt), timeZone)
+          HiveResult.toHiveString((row.toSeq.head, mt), true, RowSet.getTimeFormatters(timeZone))
         case _ =>
           throw new UnsupportedOperationException
       }
@@ -54,7 +55,7 @@
     val cols = df.schema.map {
       case sf @ StructField(name, _: StructType, _, _) =>
         toHiveStringUDF(quotedCol(name), lit(sf.toDDL)).as(name)
-      case sf @ StructField(name, (_: MapType | _: ArrayType), _, _) =>
+      case sf @ StructField(name, _: MapType | _: ArrayType, _, _) =>
         toHiveStringUDF(struct(quotedCol(name)), lit(sf.toDDL)).as(name)
       case StructField(name, _, _, _) => quotedCol(name)
     }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
index 803eea3e6cd..6c91d1800db 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
@@ -26,11 +26,11 @@ import scala.collection.JavaConverters._
 
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.HiveResult
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 
 import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.engine.spark.schema.RowSet.toHiveString
 
 class RowSetSuite extends KyuubiFunSuite {
 
@@ -159,22 +159,28 @@ class RowSetSuite extends KyuubiFunSuite {
 
     val decCol = cols.next().getStringVal
     decCol.getValues.asScala.zipWithIndex.foreach {
-      case (b, 11) => assert(b.isEmpty)
+      case (b, 11) => assert(b === "NULL")
       case (b, i) => assert(b === s"$i.$i")
     }
 
     val dateCol = cols.next().getStringVal
     dateCol.getValues.asScala.zipWithIndex.foreach {
-      case (b, 11) => assert(b.isEmpty)
+      case (b, 11) => assert(b === "NULL")
       case (b, i) =>
-        assert(b === toHiveString((Date.valueOf(s"2018-11-${i + 1}"), DateType), zoneId))
+        assert(b === HiveResult.toHiveString(
+          (Date.valueOf(s"2018-11-${i + 1}"), DateType),
+          false,
+          HiveResult.getTimeFormatters))
     }
 
     val tsCol = cols.next().getStringVal
     tsCol.getValues.asScala.zipWithIndex.foreach {
-      case (b, 11) => assert(b.isEmpty)
+      case (b, 11) => assert(b === "NULL")
       case (b, i) => assert(b ===
-        toHiveString((Timestamp.valueOf(s"2018-11-17 13:33:33.$i"), TimestampType), zoneId))
+        HiveResult.toHiveString(
+          (Timestamp.valueOf(s"2018-11-17 13:33:33.$i"), TimestampType),
+          false,
+          HiveResult.getTimeFormatters))
     }
 
     val binCol = cols.next().getBinaryVal
@@ -185,23 +191,25 @@
 
     val arrCol = cols.next().getStringVal
     arrCol.getValues.asScala.zipWithIndex.foreach {
-      case (b, 11) => assert(b === "")
-      case (b, i) => assert(b === toHiveString(
+      case (b, 11) => assert(b === "NULL")
+      case (b, i) => assert(b === HiveResult.toHiveString(
         (Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq, ArrayType(DoubleType)),
-        zoneId))
+        false,
+        HiveResult.getTimeFormatters))
     }
 
     val mapCol = cols.next().getStringVal
     mapCol.getValues.asScala.zipWithIndex.foreach {
-      case (b, 11) => assert(b === "")
-      case (b, i) => assert(b === toHiveString(
+      case (b, 11) => assert(b === "NULL")
+      case (b, i) => assert(b === HiveResult.toHiveString(
         (Map(i -> java.lang.Double.valueOf(s"$i.$i")), MapType(IntegerType, DoubleType)),
-        zoneId))
+        false,
+        HiveResult.getTimeFormatters))
     }
 
     val intervalCol = cols.next().getStringVal
     intervalCol.getValues.asScala.zipWithIndex.foreach {
-      case (b, 11) => assert(b === "")
+      case (b, 11) => assert(b === "NULL")
      case (b, i) => assert(b === new CalendarInterval(i, i, i).toString)
     }
   }
@@ -237,7 +245,7 @@ class RowSetSuite extends KyuubiFunSuite {
     assert(r6.get(9).getStringVal.getValue === "2018-11-06")
 
     val r7 = iter.next().getColVals
-    assert(r7.get(10).getStringVal.getValue === "2018-11-17 13:33:33.600")
+    assert(r7.get(10).getStringVal.getValue === "2018-11-17 13:33:33.6")
     assert(r7.get(11).getStringVal.getValue === new String(
       Array.fill[Byte](6)(6.toByte),
       StandardCharsets.UTF_8))
@@ -245,7 +253,10 @@ class RowSetSuite extends KyuubiFunSuite {
     val r8 = iter.next().getColVals
     assert(r8.get(12).getStringVal.getValue === Array.fill(7)(7.7d).mkString("[", ",", "]"))
     assert(r8.get(13).getStringVal.getValue ===
-      toHiveString((Map(7 -> 7.7d), MapType(IntegerType, DoubleType)), zoneId))
+      HiveResult.toHiveString(
+        (Map(7 -> 7.7d), MapType(IntegerType, DoubleType)),
+        false,
+        HiveResult.getTimeFormatters))
 
     val r9 = iter.next().getColVals
     assert(r9.get(14).getStringVal.getValue === new CalendarInterval(8, 8, 8).toString)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
index 6eb6b6225ff..fca79c0f2a5 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
@@ -18,14 +18,11 @@
 package org.apache.kyuubi.util
 
 import java.nio.ByteBuffer
-import java.sql.Timestamp
-import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId}
+import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
 import java.time.chrono.IsoChronology
-import java.time.format.DateTimeFormatter
 import java.time.format.DateTimeFormatterBuilder
 import java.time.temporal.ChronoField
 import java.util.{Date, Locale}
-import java.util.concurrent.TimeUnit
 
 import scala.language.implicitConversions
 
@@ -37,24 +34,18 @@ private[kyuubi] object RowSetUtils {
   final private val SECOND_PER_HOUR: Long = SECOND_PER_MINUTE * 60L
   final private val SECOND_PER_DAY: Long = SECOND_PER_HOUR * 24L
 
-  private lazy val dateFormatter = {
-    createDateTimeFormatterBuilder().appendPattern("yyyy-MM-dd")
-      .toFormatter(Locale.US)
-      .withChronology(IsoChronology.INSTANCE)
-  }
+  private lazy val dateFormatter = createDateTimeFormatterBuilder()
+    .appendPattern("yyyy-MM-dd")
+    .toFormatter(Locale.US)
+    .withChronology(IsoChronology.INSTANCE)
 
   private lazy val legacyDateFormatter = FastDateFormat.getInstance("yyyy-MM-dd", Locale.US)
 
-  private lazy val timestampFormatter: DateTimeFormatter = {
-    createDateTimeFormatterBuilder().appendPattern("yyyy-MM-dd HH:mm:ss")
-      .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
-      .toFormatter(Locale.US)
-      .withChronology(IsoChronology.INSTANCE)
-  }
-
-  private lazy val legacyTimestampFormatter = {
-    FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", Locale.US)
-  }
+  private lazy val timestampFormatter = createDateTimeFormatterBuilder()
+    .appendPattern("yyyy-MM-dd HH:mm:ss")
+    .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+    .toFormatter(Locale.US)
+    .withChronology(IsoChronology.INSTANCE)
 
   private def createDateTimeFormatterBuilder(): DateTimeFormatterBuilder = {
     new DateTimeFormatterBuilder().parseCaseInsensitive()
@@ -77,34 +68,7 @@ private[kyuubi] object RowSetUtils {
       .getOrElse(timestampFormatter.format(i))
   }
 
-  def formatTimestamp(t: Timestamp): String = {
-    legacyTimestampFormatter.format(t)
-  }
-
   implicit def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = {
     ByteBuffer.wrap(bitSet.toByteArray)
   }
-
-  def toDayTimeIntervalString(d: Duration): String = {
-    var rest = d.getSeconds
-    var sign = ""
-    if (d.getSeconds < 0) {
-      sign = "-"
-      rest = -rest
-    }
-    val days = TimeUnit.SECONDS.toDays(rest)
-    rest %= SECOND_PER_DAY
-    val hours = TimeUnit.SECONDS.toHours(rest)
-    rest %= SECOND_PER_HOUR
-    val minutes = TimeUnit.SECONDS.toMinutes(rest)
-    val seconds = rest % SECOND_PER_MINUTE
-    f"$sign$days $hours%02d:$minutes%02d:$seconds%02d.${d.getNano}%09d"
-  }
-
-  def toYearMonthIntervalString(d: Period): String = {
-    val years = d.getYears
-    val months = d.getMonths
-    val sign = if (years < 0 || months < 0) "-" else ""
-    s"$sign${Math.abs(years)}-${Math.abs(months)}"
-  }
 }
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
index 6881677034e..7345b7523c9 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
@@ -159,7 +159,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
     }
   }
 
-  test("execute statement - select timestamp") {
+  test("execute statement - select timestamp - second") {
     withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("SELECT TIMESTAMP '2018-11-17 13:33:33' AS col")
       assert(resultSet.next())
@@ -171,6 +171,18 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
     }
   }
 
+  test("execute statement - select timestamp - millisecond") {
+    withJdbcStatement() { statement =>
+      val resultSet = statement.executeQuery("SELECT TIMESTAMP '2018-11-17 13:33:33.12345' AS col")
+      assert(resultSet.next())
+      assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2018-11-17 13:33:33.12345"))
+      val metaData = resultSet.getMetaData
+      assert(metaData.getColumnType(1) === java.sql.Types.TIMESTAMP)
+      assert(metaData.getPrecision(1) === 29)
+      assert(metaData.getScale(1) === 9)
+    }
+  }
+
   test("execute statement - select timestamp_ntz") {
     assume(SPARK_ENGINE_VERSION >= "3.4")
     withJdbcStatement() { statement =>
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
index 20ed55a1d62..fa914ce5d7d 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
@@ -105,6 +105,10 @@ public Object getMap(int ordinal) {
   }
 
   public Object get(int ordinal, TTypeId dataType) {
+    long seconds;
+    long milliseconds;
+    long microseconds;
+    int nanos;
     switch (dataType) {
       case BOOLEAN_TYPE:
         return getBoolean(ordinal);
@@ -127,13 +131,17 @@ public Object get(int ordinal, TTypeId dataType) {
       case STRING_TYPE:
         return getString(ordinal);
       case TIMESTAMP_TYPE:
-        return new Timestamp(getLong(ordinal) / 1000);
+        microseconds = getLong(ordinal);
+        nanos = (int) (microseconds % 1000000) * 1000;
+        Timestamp timestamp = new Timestamp(microseconds / 1000);
+        timestamp.setNanos(nanos);
+        return timestamp;
       case DATE_TYPE:
         return DateUtils.internalToDate(getInt(ordinal));
       case INTERVAL_DAY_TIME_TYPE:
-        long microseconds = getLong(ordinal);
-        long seconds = microseconds / 1000000;
-        int nanos = (int) (microseconds % 1000000) * 1000;
+        microseconds = getLong(ordinal);
+        seconds = microseconds / 1000000;
+        nanos = (int) (microseconds % 1000000) * 1000;
         return new HiveIntervalDayTime(seconds, nanos);
       case INTERVAL_YEAR_MONTH_TYPE:
         return new HiveIntervalYearMonth(getInt(ordinal));
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala
index 461a8287016..9ab627413d3 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala
@@ -132,5 +132,22 @@ class SparkSqlEngineSuite extends WithKyuubiServer with HiveJDBCTestHelper {
     }
   }
 
+  test("Spark session timezone format") {
+    withJdbcStatement() { statement =>
+      val setUTCResultSet = statement.executeQuery("set spark.sql.session.timeZone=UTC")
+      assert(setUTCResultSet.next())
+      val utcResultSet = statement.executeQuery("select from_utc_timestamp(from_unixtime(" +
+        "1670404535000/1000,'yyyy-MM-dd HH:mm:ss'),'GMT+08:00')")
+      assert(utcResultSet.next())
+      assert(utcResultSet.getString(1) === "2022-12-07 17:15:35.0")
+      val setGMT8ResultSet = statement.executeQuery("set spark.sql.session.timeZone=GMT+8")
+      assert(setGMT8ResultSet.next())
+      val gmt8ResultSet = statement.executeQuery("select from_utc_timestamp(from_unixtime(" +
+        "1670404535000/1000,'yyyy-MM-dd HH:mm:ss'),'GMT+08:00')")
+      assert(gmt8ResultSet.next())
+      assert(gmt8ResultSet.getString(1) === "2022-12-08 01:15:35.0")
+    }
+  }
+
   override protected def jdbcUrl: String = getJdbcUrl
 }