diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index e48ff6e5b06..d2627fd99fd 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -88,9 +88,9 @@ class ExecutePython(
         val output = response.map(_.content.getOutput()).getOrElse("")
         val ename = response.map(_.content.getEname()).getOrElse("")
         val evalue = response.map(_.content.getEvalue()).getOrElse("")
-        val traceback = response.map(_.content.getTraceback()).getOrElse(Array.empty)
+        val traceback = response.map(_.content.getTraceback()).getOrElse(Seq.empty)
         iter =
-          new ArrayFetchIterator[Row](Array(Row(output, status, ename, evalue, Row(traceback: _*))))
+          new ArrayFetchIterator[Row](Array(Row(output, status, ename, evalue, traceback)))
         setState(OperationState.FINISHED)
       } else {
         throw KyuubiSQLException(s"Interpret error:\n$statement\n $response")
@@ -210,7 +210,7 @@ case class SessionPythonWorker(
     stdin.flush()
     val pythonResponse = Option(stdout.readLine()).map(ExecutePython.fromJson[PythonResponse](_))
     // throw exception if internal python code fail
-    if (internal && pythonResponse.map(_.content.status) != Some(PythonResponse.OK_STATUS)) {
+    if (internal && !pythonResponse.map(_.content.status).contains(PythonResponse.OK_STATUS)) {
       throw KyuubiSQLException(s"Internal python code $code failure: $pythonResponse")
     }
     pythonResponse
@@ -328,7 +328,7 @@ object ExecutePython extends Logging {
   }

   // for test
-  def defaultSparkHome(): String = {
+  def defaultSparkHome: String = {
     val homeDirFilter: FilenameFilter = (dir: File, name: String) =>
       dir.isDirectory && name.contains("spark-") && !name.contains("-engine")
     // get from kyuubi-server/../externals/kyuubi-download/target
@@ -418,7 +418,7 @@ case class PythonResponseContent(
     data: Map[String, String],
     ename: String,
     evalue: String,
-    traceback: Array[String],
+    traceback: Seq[String],
     status: String) {
   def getOutput(): String = {
     Option(data)
@@ -431,7 +431,7 @@ case class PythonResponseContent(
   def getEvalue(): String = {
     Option(evalue).getOrElse("")
   }
-  def getTraceback(): Array[String] = {
-    Option(traceback).getOrElse(Array.empty)
+  def getTraceback(): Seq[String] = {
+    Option(traceback).getOrElse(Seq.empty)
   }
 }
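A note on the SessionPythonWorker change above: `pythonResponse.map(_.content.status) != Some(PythonResponse.OK_STATUS)` becomes `!pythonResponse.map(_.content.status).contains(...)`. A tiny self-contained sketch of why `Option.contains` is the clearer spelling (the status values here are made up, not Kyuubi's):

object OptionContainsSketch extends App {
  val okStatus = "ok"

  // Stand-ins for pythonResponse.map(_.content.status)
  val missing: Option[String] = None
  val failed: Option[String] = Some("error")
  val succeeded: Option[String] = Some("ok")

  Seq(missing, failed, succeeded).foreach { status =>
    // Reads as "the status is not exactly okStatus" and needs no Some(...) wrapper.
    val isFailure = !status.contains(okStatus)
    println(s"$status -> failure = $isFailure")
  }
}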
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index b62ef6745e7..06884534d9c 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -24,6 +24,7 @@ import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressU
 import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
 import org.apache.spark.kyuubi.SparkUtilsHelper.redact
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType

 import org.apache.kyuubi.{KyuubiSQLException, Utils}
@@ -135,27 +136,35 @@ abstract class SparkOperation(session: Session)
 spark.sparkContext.setLocalProperty
   protected def withLocalProperties[T](f: => T): T = {
-    try {
-      spark.sparkContext.setJobGroup(statementId, redactedStatement, forceCancel)
-      spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, session.user)
-      spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, statementId)
-      schedulerPool match {
-        case Some(pool) =>
-          spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, pool)
-        case None =>
-      }
-      if (isSessionUserSignEnabled) {
-        setSessionUserSign()
-      }
+    SQLConf.withExistingConf(spark.sessionState.conf) {
+      val originalSession = SparkSession.getActiveSession
+      try {
+        SparkSession.setActiveSession(spark)
+        spark.sparkContext.setJobGroup(statementId, redactedStatement, forceCancel)
+        spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, session.user)
+        spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, statementId)
+        schedulerPool match {
+          case Some(pool) =>
+            spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, pool)
+          case None =>
+        }
+        if (isSessionUserSignEnabled) {
+          setSessionUserSign()
+        }

-      f
-    } finally {
-      spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, null)
-      spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, null)
-      spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, null)
-      spark.sparkContext.clearJobGroup()
-      if (isSessionUserSignEnabled) {
-        clearSessionUserSign()
+        f
+      } finally {
+        spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, null)
+        spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, null)
+        spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, null)
+        spark.sparkContext.clearJobGroup()
+        if (isSessionUserSignEnabled) {
+          clearSessionUserSign()
+        }
+        originalSession match {
+          case Some(session) => SparkSession.setActiveSession(session)
+          case None => SparkSession.clearActiveSession()
+        }
       }
     }
   }
@@ -246,7 +255,7 @@ abstract class SparkOperation(session: Session)
       } else {
         val taken = iter.take(rowSetSize)
         RowSet.toTRowSet(
-          taken.toList.asInstanceOf[List[Row]],
+          taken.toSeq.asInstanceOf[Seq[Row]],
           resultSchema,
           getProtocolVersion,
           timeZone)
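The withLocalProperties rewrite above pins two pieces of thread-local state for the duration of the operation: the session's SQLConf (via SQLConf.withExistingConf) and the active SparkSession, restoring whatever was active before. A rough standalone sketch of that save-and-restore idiom, assuming Spark 3.x; the helper name is mine, not the Kyuubi API:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

object ActiveSessionSketch {
  // Run `body` with `spark` pinned as the active session and its session conf
  // installed as the thread-local SQLConf, then restore the previous state.
  def withSessionPinned[T](spark: SparkSession)(body: => T): T = {
    SQLConf.withExistingConf(spark.sessionState.conf) {
      val original = SparkSession.getActiveSession
      try {
        SparkSession.setActiveSession(spark)
        body
      } finally {
        original match {
          case Some(s) => SparkSession.setActiveSession(s)
          case None => SparkSession.clearActiveSession()
        }
      }
    }
  }
}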
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
index 8cc88156ba5..7be70403d5d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
@@ -18,22 +18,25 @@ package org.apache.kyuubi.engine.spark.schema

 import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import java.sql.Timestamp
-import java.time._
-import java.util.Date
+import java.time.ZoneId

 import scala.collection.JavaConverters._

 import org.apache.hive.service.rpc.thrift._
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.HiveResult
 import org.apache.spark.sql.types._

-import org.apache.kyuubi.engine.spark.schema.SchemaHelper.TIMESTAMP_NTZ
 import org.apache.kyuubi.util.RowSetUtils._

 object RowSet {

+  def toHiveString(valueAndType: (Any, DataType), nested: Boolean = false): String = {
+    // compatible w/ Spark 3.1 and above
+    val timeFormatters = HiveResult.getTimeFormatters
+    HiveResult.toHiveString(valueAndType, nested, timeFormatters)
+  }
+
   def toTRowSet(
       bytes: Array[Byte],
       protocolVersion: TProtocolVersion): TRowSet = {
@@ -68,9 +71,9 @@ object RowSet {
   }

   def toRowBasedSet(rows: Seq[Row], schema: StructType, timeZone: ZoneId): TRowSet = {
-    var i = 0
     val rowSize = rows.length
     val tRows = new java.util.ArrayList[TRow](rowSize)
+    var i = 0
     while (i < rowSize) {
       val row = rows(i)
       val tRow = new TRow()
@@ -151,13 +154,7 @@ object RowSet {
       while (i < rowSize) {
         val row = rows(i)
         nulls.set(i, row.isNullAt(ordinal))
-        val value =
-          if (row.isNullAt(ordinal)) {
-            ""
-          } else {
-            toHiveString((row.get(ordinal), typ), timeZone)
-          }
-        values.add(value)
+        values.add(toHiveString(row.get(ordinal) -> typ))
         i += 1
       }
       TColumn.stringVal(new TStringColumn(values, nulls))
@@ -238,69 +235,12 @@ object RowSet {
       case _ =>
         val tStrValue = new TStringValue
         if (!row.isNullAt(ordinal)) {
-          tStrValue.setValue(
-            toHiveString((row.get(ordinal), types(ordinal).dataType), timeZone))
+          tStrValue.setValue(toHiveString(row.get(ordinal) -> types(ordinal).dataType))
         }
         TColumnValue.stringVal(tStrValue)
     }
   }

-  /**
-   * A simpler impl of Spark's toHiveString
-   */
-  def toHiveString(dataWithType: (Any, DataType), timeZone: ZoneId): String = {
-    dataWithType match {
-      case (null, _) =>
-        // Only match nulls in nested type values
-        "null"
-
-      case (d: Date, DateType) =>
-        formatDate(d)
-
-      case (ld: LocalDate, DateType) =>
-        formatLocalDate(ld)
-
-      case (t: Timestamp, TimestampType) =>
-        formatTimestamp(t, Option(timeZone))
-
-      case (t: LocalDateTime, ntz) if ntz.getClass.getSimpleName.equals(TIMESTAMP_NTZ) =>
-        formatLocalDateTime(t)
-
-      case (i: Instant, TimestampType) =>
-        formatInstant(i, Option(timeZone))
-
-      case (bin: Array[Byte], BinaryType) =>
-        new String(bin, StandardCharsets.UTF_8)
-
-      case (decimal: java.math.BigDecimal, DecimalType()) =>
-        decimal.toPlainString
-
-      case (s: String, StringType) =>
-        // Only match string in nested type values
-        "\"" + s + "\""
-
-      case (d: Duration, _) => toDayTimeIntervalString(d)
-
-      case (p: Period, _) => toYearMonthIntervalString(p)
-
-      case (seq: scala.collection.Seq[_], ArrayType(typ, _)) =>
-        seq.map(v => (v, typ)).map(e => toHiveString(e, timeZone)).mkString("[", ",", "]")
-
-      case (m: Map[_, _], MapType(kType, vType, _)) =>
-        m.map { case (key, value) =>
-          toHiveString((key, kType), timeZone) + ":" + toHiveString((value, vType), timeZone)
-        }.toSeq.sorted.mkString("{", ",", "}")
-
-      case (struct: Row, StructType(fields)) =>
-        struct.toSeq.zip(fields).map { case (v, t) =>
-          s""""${t.name}":${toHiveString((v, t.dataType), timeZone)}"""
-        }.mkString("{", ",", "}")
-
-      case (other, _) =>
-        other.toString
-    }
-  }
-
   private def toTColumn(data: Array[Byte]): TColumn = {
     val values = new java.util.ArrayList[ByteBuffer](1)
     values.add(ByteBuffer.wrap(data))
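The new RowSet.toHiveString above delegates rendering to Spark's own HiveResult, so top-level NULLs, timestamp fraction trimming, and nested types all match spark-sql output instead of the removed hand-rolled formatter. A small sketch of how that delegation behaves, assuming a Spark 3.1+ runtime on the classpath; the sample values and expected strings mirror the test changes below, not an authoritative spec:

import java.sql.Timestamp

import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.types._

object HiveStringSketch extends App {
  // Picks up the current session time zone and legacy-format settings.
  val timeFormatters = HiveResult.getTimeFormatters

  def render(v: Any, dt: DataType, nested: Boolean = false): String =
    HiveResult.toHiveString((v, dt), nested, timeFormatters)

  println(render(null, StringType)) // top-level null -> NULL
  println(render(Timestamp.valueOf("2018-11-17 13:33:33.600"), TimestampType)) // trailing zeros trimmed -> ...13:33:33.6
  println(render(Seq(1.1d, 2.2d), ArrayType(DoubleType))) // [1.1,2.2]
}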
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 23f7df21310..46c3bce4d73 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -22,7 +22,7 @@ import java.time.ZoneId

 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+import org.apache.spark.sql.types._

 import org.apache.kyuubi.engine.spark.schema.RowSet

@@ -41,11 +41,11 @@ object SparkDatasetHelper {
     val dt = DataType.fromDDL(schemaDDL)
     dt match {
       case StructType(Array(StructField(_, st: StructType, _, _))) =>
-        RowSet.toHiveString((row, st), timeZone)
+        RowSet.toHiveString((row, st), nested = true)
       case StructType(Array(StructField(_, at: ArrayType, _, _))) =>
-        RowSet.toHiveString((row.toSeq.head, at), timeZone)
+        RowSet.toHiveString((row.toSeq.head, at), nested = true)
       case StructType(Array(StructField(_, mt: MapType, _, _))) =>
-        RowSet.toHiveString((row.toSeq.head, mt), timeZone)
+        RowSet.toHiveString((row.toSeq.head, mt), nested = true)
       case _ =>
         throw new UnsupportedOperationException
     }
@@ -54,7 +54,7 @@ object SparkDatasetHelper {
     val cols = df.schema.map {
       case sf @ StructField(name, _: StructType, _, _) =>
         toHiveStringUDF(quotedCol(name), lit(sf.toDDL)).as(name)
-      case sf @ StructField(name, (_: MapType | _: ArrayType), _, _) =>
+      case sf @ StructField(name, _: MapType | _: ArrayType, _, _) =>
         toHiveStringUDF(struct(quotedCol(name)), lit(sf.toDDL)).as(name)
       case StructField(name, _, _, _) => quotedCol(name)
     }
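SparkDatasetHelper above ships each nested column's schema into the UDF as a DDL string and rebuilds the DataType with DataType.fromDDL before dispatching on struct/array/map. A minimal sketch of just that round-trip, with an illustrative field; the exact DDL text may vary slightly across Spark versions:

import org.apache.spark.sql.types._

object SchemaDdlSketch extends App {
  val field = StructField("m", MapType(IntegerType, DoubleType))

  // toDDL produces something like: m MAP<INT, DOUBLE> (possibly back-quoted)
  val ddl = field.toDDL
  println(ddl)

  // fromDDL parses it back into a single-field struct, which is what the
  // StructType(Array(StructField(_, mt: MapType, _, _))) pattern above matches.
  DataType.fromDDL(ddl) match {
    case StructType(Array(StructField(name, mt: MapType, _, _))) =>
      println(s"column $name is a map: $mt")
    case other =>
      println(s"unexpected shape: $other")
  }
}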
(b, 11) => assert(b === "NULL") + case (b, i) => assert(b === RowSet.toHiveString( + Map(i -> java.lang.Double.valueOf(s"$i.$i")) -> MapType(IntegerType, DoubleType))) } val intervalCol = cols.next().getStringVal intervalCol.getValues.asScala.zipWithIndex.foreach { - case (b, 11) => assert(b === "") + case (b, 11) => assert(b === "NULL") case (b, i) => assert(b === new CalendarInterval(i, i, i).toString) } } @@ -237,7 +234,7 @@ class RowSetSuite extends KyuubiFunSuite { assert(r6.get(9).getStringVal.getValue === "2018-11-06") val r7 = iter.next().getColVals - assert(r7.get(10).getStringVal.getValue === "2018-11-17 13:33:33.600") + assert(r7.get(10).getStringVal.getValue === "2018-11-17 13:33:33.6") assert(r7.get(11).getStringVal.getValue === new String( Array.fill[Byte](6)(6.toByte), StandardCharsets.UTF_8)) @@ -245,7 +242,7 @@ class RowSetSuite extends KyuubiFunSuite { val r8 = iter.next().getColVals assert(r8.get(12).getStringVal.getValue === Array.fill(7)(7.7d).mkString("[", ",", "]")) assert(r8.get(13).getStringVal.getValue === - toHiveString((Map(7 -> 7.7d), MapType(IntegerType, DoubleType)), zoneId)) + RowSet.toHiveString(Map(7 -> 7.7d) -> MapType(IntegerType, DoubleType))) val r9 = iter.next().getColVals assert(r9.get(14).getStringVal.getValue === new CalendarInterval(8, 8, 8).toString) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala index 82417a73092..fca79c0f2a5 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala @@ -18,14 +18,11 @@ package org.apache.kyuubi.util import java.nio.ByteBuffer -import java.sql.Timestamp -import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId} +import java.time.{Instant, LocalDate, LocalDateTime, ZoneId} import java.time.chrono.IsoChronology -import java.time.format.DateTimeFormatter import java.time.format.DateTimeFormatterBuilder import java.time.temporal.ChronoField -import java.util.{Date, Locale, TimeZone} -import java.util.concurrent.TimeUnit +import java.util.{Date, Locale} import scala.language.implicitConversions @@ -37,24 +34,18 @@ private[kyuubi] object RowSetUtils { final private val SECOND_PER_HOUR: Long = SECOND_PER_MINUTE * 60L final private val SECOND_PER_DAY: Long = SECOND_PER_HOUR * 24L - private lazy val dateFormatter = { - createDateTimeFormatterBuilder().appendPattern("yyyy-MM-dd") - .toFormatter(Locale.US) - .withChronology(IsoChronology.INSTANCE) - } + private lazy val dateFormatter = createDateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd") + .toFormatter(Locale.US) + .withChronology(IsoChronology.INSTANCE) private lazy val legacyDateFormatter = FastDateFormat.getInstance("yyyy-MM-dd", Locale.US) - private lazy val timestampFormatter: DateTimeFormatter = { - createDateTimeFormatterBuilder().appendPattern("yyyy-MM-dd HH:mm:ss") - .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true) - .toFormatter(Locale.US) - .withChronology(IsoChronology.INSTANCE) - } - - private lazy val legacyTimestampFormatter = { - FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", Locale.US) - } + private lazy val timestampFormatter = createDateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd HH:mm:ss") + .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true) + .toFormatter(Locale.US) + .withChronology(IsoChronology.INSTANCE) private def createDateTimeFormatterBuilder(): DateTimeFormatterBuilder = { new 
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
index 82417a73092..fca79c0f2a5 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
@@ -18,14 +18,11 @@
 package org.apache.kyuubi.util

 import java.nio.ByteBuffer
-import java.sql.Timestamp
-import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId}
+import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
 import java.time.chrono.IsoChronology
-import java.time.format.DateTimeFormatter
 import java.time.format.DateTimeFormatterBuilder
 import java.time.temporal.ChronoField
-import java.util.{Date, Locale, TimeZone}
-import java.util.concurrent.TimeUnit
+import java.util.{Date, Locale}

 import scala.language.implicitConversions

@@ -37,24 +34,18 @@ private[kyuubi] object RowSetUtils {
   final private val SECOND_PER_HOUR: Long = SECOND_PER_MINUTE * 60L
   final private val SECOND_PER_DAY: Long = SECOND_PER_HOUR * 24L

-  private lazy val dateFormatter = {
-    createDateTimeFormatterBuilder().appendPattern("yyyy-MM-dd")
-      .toFormatter(Locale.US)
-      .withChronology(IsoChronology.INSTANCE)
-  }
+  private lazy val dateFormatter = createDateTimeFormatterBuilder()
+    .appendPattern("yyyy-MM-dd")
+    .toFormatter(Locale.US)
+    .withChronology(IsoChronology.INSTANCE)

   private lazy val legacyDateFormatter = FastDateFormat.getInstance("yyyy-MM-dd", Locale.US)

-  private lazy val timestampFormatter: DateTimeFormatter = {
-    createDateTimeFormatterBuilder().appendPattern("yyyy-MM-dd HH:mm:ss")
-      .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
-      .toFormatter(Locale.US)
-      .withChronology(IsoChronology.INSTANCE)
-  }
-
-  private lazy val legacyTimestampFormatter = {
-    FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", Locale.US)
-  }
+  private lazy val timestampFormatter = createDateTimeFormatterBuilder()
+    .appendPattern("yyyy-MM-dd HH:mm:ss")
+    .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+    .toFormatter(Locale.US)
+    .withChronology(IsoChronology.INSTANCE)

   private def createDateTimeFormatterBuilder(): DateTimeFormatterBuilder = {
     new DateTimeFormatterBuilder().parseCaseInsensitive()
@@ -77,40 +68,7 @@ private[kyuubi] object RowSetUtils {
       .getOrElse(timestampFormatter.format(i))
   }

-  def formatTimestamp(t: Timestamp, timeZone: Option[ZoneId] = None): String = {
-    timeZone.map(zoneId => {
-      FastDateFormat.getInstance(
-        legacyTimestampFormatter.getPattern,
-        TimeZone.getTimeZone(zoneId),
-        legacyTimestampFormatter.getLocale)
-        .format(t)
-    }).getOrElse(legacyTimestampFormatter.format(t))
-  }
-
   implicit def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = {
     ByteBuffer.wrap(bitSet.toByteArray)
   }
-
-  def toDayTimeIntervalString(d: Duration): String = {
-    var rest = d.getSeconds
-    var sign = ""
-    if (d.getSeconds < 0) {
-      sign = "-"
-      rest = -rest
-    }
-    val days = TimeUnit.SECONDS.toDays(rest)
-    rest %= SECOND_PER_DAY
-    val hours = TimeUnit.SECONDS.toHours(rest)
-    rest %= SECOND_PER_HOUR
-    val minutes = TimeUnit.SECONDS.toMinutes(rest)
-    val seconds = rest % SECOND_PER_MINUTE
-    f"$sign$days $hours%02d:$minutes%02d:$seconds%02d.${d.getNano}%09d"
-  }
-
-  def toYearMonthIntervalString(d: Period): String = {
-    val years = d.getYears
-    val months = d.getMonths
-    val sign = if (years < 0 || months < 0) "-" else ""
-    s"$sign${Math.abs(years)}-${Math.abs(months)}"
-  }
 }
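The simplified formatters above keep the appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true) call, which is what lets timestamps carry anywhere from zero to nine fractional digits with trailing zeros dropped. A tiny standalone sketch of that behavior using the same builder settings (the sample values are mine):

import java.time.LocalDateTime
import java.time.chrono.IsoChronology
import java.time.format.DateTimeFormatterBuilder
import java.time.temporal.ChronoField
import java.util.Locale

object FractionFormatterSketch extends App {
  val formatter = new DateTimeFormatterBuilder()
    .parseCaseInsensitive()
    .appendPattern("yyyy-MM-dd HH:mm:ss")
    // 0..9 fractional digits; nothing (not even the dot) is printed for a zero nano part
    .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
    .toFormatter(Locale.US)
    .withChronology(IsoChronology.INSTANCE)

  println(formatter.format(LocalDateTime.of(2018, 11, 17, 13, 33, 33)))
  // 2018-11-17 13:33:33
  println(formatter.format(LocalDateTime.of(2018, 11, 17, 13, 33, 33, 600000000)))
  // 2018-11-17 13:33:33.6
}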
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
index 6881677034e..3164ae496b3 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
@@ -159,9 +159,10 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
     }
   }

-  test("execute statement - select timestamp") {
+  test("execute statement - select timestamp - second") {
     withJdbcStatement() { statement =>
-      val resultSet = statement.executeQuery("SELECT TIMESTAMP '2018-11-17 13:33:33' AS col")
+      val resultSet = statement.executeQuery(
+        "SELECT TIMESTAMP '2018-11-17 13:33:33' AS col")
       assert(resultSet.next())
       assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2018-11-17 13:33:33"))
       val metaData = resultSet.getMetaData
@@ -171,13 +172,39 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
     }
   }

+  test("execute statement - select timestamp - millisecond") {
+    withJdbcStatement() { statement =>
+      val resultSet = statement.executeQuery(
+        "SELECT TIMESTAMP '2018-11-17 13:33:33.12345' AS col")
+      assert(resultSet.next())
+      assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2018-11-17 13:33:33.12345"))
+      val metaData = resultSet.getMetaData
+      assert(metaData.getColumnType(1) === java.sql.Types.TIMESTAMP)
+      assert(metaData.getPrecision(1) === 29)
+      assert(metaData.getScale(1) === 9)
+    }
+  }
+
+  test("execute statement - select timestamp - overflow") {
+    withJdbcStatement() { statement =>
+      val resultSet = statement.executeQuery(
+        "SELECT TIMESTAMP '2018-11-17 13:33:33.1234567' AS col")
+      assert(resultSet.next())
+      assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2018-11-17 13:33:33.123456"))
+      val metaData = resultSet.getMetaData
+      assert(metaData.getColumnType(1) === java.sql.Types.TIMESTAMP)
+      assert(metaData.getPrecision(1) === 29)
+      assert(metaData.getScale(1) === 9)
+    }
+  }
+
   test("execute statement - select timestamp_ntz") {
     assume(SPARK_ENGINE_VERSION >= "3.4")
     withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery(
-        "SELECT make_timestamp_ntz(2022, 03, 24, 18, 08, 31.800) AS col")
+        "SELECT make_timestamp_ntz(2022, 03, 24, 18, 08, 31.8888) AS col")
       assert(resultSet.next())
-      assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2022-03-24 18:08:31.800"))
+      assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2022-03-24 18:08:31.8888"))
       val metaData = resultSet.getMetaData
       assert(metaData.getColumnType(1) === java.sql.Types.TIMESTAMP)
       assert(metaData.getPrecision(1) === 29)
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
index 20ed55a1d62..fa914ce5d7d 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
@@ -105,6 +105,10 @@ public Object getMap(int ordinal) {
   }

   public Object get(int ordinal, TTypeId dataType) {
+    long seconds;
+    long milliseconds;
+    long microseconds;
+    int nanos;
     switch (dataType) {
       case BOOLEAN_TYPE:
         return getBoolean(ordinal);
@@ -127,13 +131,17 @@ public Object get(int ordinal, TTypeId dataType) {
       case STRING_TYPE:
         return getString(ordinal);
       case TIMESTAMP_TYPE:
-        return new Timestamp(getLong(ordinal) / 1000);
+        microseconds = getLong(ordinal);
+        nanos = (int) (microseconds % 1000000) * 1000;
+        Timestamp timestamp = new Timestamp(microseconds / 1000);
+        timestamp.setNanos(nanos);
+        return timestamp;
       case DATE_TYPE:
         return DateUtils.internalToDate(getInt(ordinal));
       case INTERVAL_DAY_TIME_TYPE:
-        long microseconds = getLong(ordinal);
-        long seconds = microseconds / 1000000;
-        int nanos = (int) (microseconds % 1000000) * 1000;
+        microseconds = getLong(ordinal);
+        seconds = microseconds / 1000000;
+        nanos = (int) (microseconds % 1000000) * 1000;
         return new HiveIntervalDayTime(seconds, nanos);
       case INTERVAL_YEAR_MONTH_TYPE:
         return new HiveIntervalYearMonth(getInt(ordinal));
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala
index 1e35d2f1dc8..9ab627413d3 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala
@@ -139,13 +139,13 @@ class SparkSqlEngineSuite extends WithKyuubiServer with HiveJDBCTestHelper {
       val utcResultSet = statement.executeQuery("select from_utc_timestamp(from_unixtime(" +
         "1670404535000/1000,'yyyy-MM-dd HH:mm:ss'),'GMT+08:00')")
       assert(utcResultSet.next())
-      assert(utcResultSet.getString(1) == "2022-12-07 17:15:35.0")
+      assert(utcResultSet.getString(1) === "2022-12-07 17:15:35.0")

       val setGMT8ResultSet = statement.executeQuery("set spark.sql.session.timeZone=GMT+8")
       assert(setGMT8ResultSet.next())
       val gmt8ResultSet = statement.executeQuery("select from_utc_timestamp(from_unixtime(" +
         "1670404535000/1000,'yyyy-MM-dd HH:mm:ss'),'GMT+08:00')")
       assert(gmt8ResultSet.next())
-      assert(gmt8ResultSet.getString(1) == "2022-12-08 01:15:35.0")
+      assert(gmt8ResultSet.getString(1) === "2022-12-08 01:15:35.0")
     }
   }
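One last note on the JDBC Arrow change above: the client now rebuilds java.sql.Timestamp from an epoch-microsecond value instead of truncating to milliseconds. A short Scala sketch of the same arithmetic, with an arbitrary sample value:

import java.sql.Timestamp

object MicrosToTimestampSketch extends App {
  // Epoch microseconds, roughly what an Arrow timestamp vector hands back.
  val microseconds = 1542461613123456L // 2018-11-17 13:33:33.123456 UTC

  // Milliseconds position the timestamp; setNanos restores the full fractional second.
  val timestamp = new Timestamp(microseconds / 1000)
  timestamp.setNanos((microseconds % 1000000).toInt * 1000)

  println(timestamp) // rendered in the JVM's local time zone
}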