diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index 7391f424169..75fc3d91b14 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -429,22 +429,23 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co ### Operation -| Key | Default | Meaning | Type | Since | -|-----------------------------------------|---------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------| -| kyuubi.operation.idle.timeout | PT3H | Operation will be closed when it's not accessed for this duration of time | duration | 1.0.0 | -| kyuubi.operation.interrupt.on.cancel | true | When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished. | boolean | 1.2.0 | -| kyuubi.operation.language | SQL | Choose a programing language for the following inputs | string | 1.5.0 | -| kyuubi.operation.log.dir.root | server_operation_logs | Root directory for query operation log at server-side. | string | 1.4.0 | -| kyuubi.operation.plan.only.excludes | ResetCommand,SetCommand,SetNamespaceCommand,UseStatement,SetCatalogAndNamespace | Comma-separated list of query plan names, in the form of simple class names, i.e, for `SET abc=xyz`, the value will be `SetCommand`. For those auxiliary plans, such as `switch databases`, `set properties`, or `create temporary view` etc., which are used for setup evaluating environments for analyzing actual queries, we can use this config to exclude them and let them take effect. See also kyuubi.operation.plan.only.mode. | seq | 1.5.0 | -| kyuubi.operation.plan.only.mode | none | Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, and the Flink engine supports 'parse', 'physical', and 'execution', other engines do not support planOnly currently. | string | 1.4.0 | -| kyuubi.operation.plan.only.output.style | plain | Configures the planOnly output style. The value can be 'plain' or 'json', and the default value is 'plain'. This configuration supports only the output styles of the Spark engine | string | 1.7.0 | -| kyuubi.operation.progress.enabled | false | Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`. | boolean | 1.6.0 | -| kyuubi.operation.query.timeout | <undefined> | Timeout for query executions at server-side, take effect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take full control of whether the query should timeout or not. If set, client-side timeout is capped at this point. 
To cancel the queries right away without waiting for task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together. | duration | 1.2.0 | -| kyuubi.operation.result.format | thrift | Specify the result format, available configs are: | string | 1.7.0 | -| kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. | int | 1.6.0 | -| kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 | -| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 | -| kyuubi.operation.status.polling.timeout | PT5S | Timeout(ms) for long polling asynchronous running sql query's status | duration | 1.0.0 | +| Key | Default | Meaning | Type | Since | +|-------------------------------------------------|---------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------| +| kyuubi.operation.idle.timeout | PT3H | Operation will be closed when it's not accessed for this duration of time | duration | 1.0.0 | +| kyuubi.operation.interrupt.on.cancel | true | When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished. | boolean | 1.2.0 | +| kyuubi.operation.language | SQL | Choose a programing language for the following inputs | string | 1.5.0 | +| kyuubi.operation.log.dir.root | server_operation_logs | Root directory for query operation log at server-side. | string | 1.4.0 | +| kyuubi.operation.plan.only.excludes | ResetCommand,SetCommand,SetNamespaceCommand,UseStatement,SetCatalogAndNamespace | Comma-separated list of query plan names, in the form of simple class names, i.e, for `SET abc=xyz`, the value will be `SetCommand`. For those auxiliary plans, such as `switch databases`, `set properties`, or `create temporary view` etc., which are used for setup evaluating environments for analyzing actual queries, we can use this config to exclude them and let them take effect. See also kyuubi.operation.plan.only.mode. | seq | 1.5.0 | +| kyuubi.operation.plan.only.mode | none | Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, and the Flink engine supports 'parse', 'physical', and 'execution', other engines do not support planOnly currently. | string | 1.4.0 | +| kyuubi.operation.plan.only.output.style | plain | Configures the planOnly output style. The value can be 'plain' or 'json', and the default value is 'plain'. 
This configuration supports only the output styles of the Spark engine | string | 1.7.0 | +| kyuubi.operation.progress.enabled | false | Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`. | boolean | 1.6.0 | +| kyuubi.operation.query.timeout | <undefined> | Timeout for query executions at server-side, take effect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take full control of whether the query should timeout or not. If set, client-side timeout is capped at this point. To cancel the queries right away without waiting for task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together. | duration | 1.2.0 | +| kyuubi.operation.result.arrow.timestampAsString | false | When true, arrow-based rowsets will convert columns of type timestamp to strings for transmission. | boolean | 1.7.0 | +| kyuubi.operation.result.format | thrift | Specify the result format, available configs are: | string | 1.7.0 | +| kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. | int | 1.6.0 | +| kyuubi.operation.scheduler.pool | <undefined> | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 | +| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 | +| kyuubi.operation.status.polling.timeout | PT5S | Timeout(ms) for long polling asynchronous running sql query's status | duration | 1.0.0 | ### Server diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index fac90f7ead1..2b90525c1ec 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -162,17 +162,12 @@ class ExecuteStatement( } } - // TODO:(fchen) make this configurable - val kyuubiBeelineConvertToString = true - def convertComplexType(df: DataFrame): DataFrame = { - if (kyuubiBeelineConvertToString) { - SparkDatasetHelper.convertTopLevelComplexTypeToHiveString(df) - } else { - df - } + SparkDatasetHelper.convertTopLevelComplexTypeToHiveString(df, timestampAsString) } override def getResultSetMetadataHints(): Seq[String] = - Seq(s"__kyuubi_operation_result_format__=$resultFormat") + Seq( + s"__kyuubi_operation_result_format__=$resultFormat", + s"__kyuubi_operation_result_arrow_timestampAsString__=$timestampAsString") } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index 06884534d9c..a6a7fc896af 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -24,7 +24,7 @@ import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressU import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener} import org.apache.spark.kyuubi.SparkUtilsHelper.redact import org.apache.spark.sql.{DataFrame, Row, SparkSession} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.types.StructType import org.apache.kyuubi.{KyuubiSQLException, Utils} @@ -136,7 +136,7 @@ abstract class SparkOperation(session: Session) spark.sparkContext.setLocalProperty protected def withLocalProperties[T](f: => T): T = { - SQLConf.withExistingConf(spark.sessionState.conf) { + SQLExecution.withSQLConfPropagated(spark) { val originalSession = SparkSession.getActiveSession try { SparkSession.setActiveSession(spark) @@ -279,6 +279,10 @@ abstract class SparkOperation(session: Session) spark.conf.get("kyuubi.operation.result.format", "thrift") } + protected def timestampAsString: Boolean = { + spark.conf.get("kyuubi.operation.result.arrow.timestampAsString", "false").toBoolean + } + protected def setSessionUserSign(): Unit = { ( session.conf.get(KYUUBI_SESSION_SIGN_PUBLICKEY), diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala index 46c3bce4d73..1a542937338 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.kyuubi -import java.time.ZoneId - import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ @@ -31,12 +29,13 @@ object SparkDatasetHelper { ds.toArrowBatchRdd } - def convertTopLevelComplexTypeToHiveString(df: DataFrame): DataFrame = { - val timeZone = ZoneId.of(df.sparkSession.sessionState.conf.sessionLocalTimeZone) + def convertTopLevelComplexTypeToHiveString( + df: DataFrame, + timestampAsString: Boolean): DataFrame = { val quotedCol = (name: String) => col(quoteIfNeeded(name)) - // an udf to call `RowSet.toHiveString` on complex types(struct/array/map). + // an udf to call `RowSet.toHiveString` on complex types(struct/array/map) and timestamp type. 
val toHiveStringUDF = udf[String, Row, String]((row, schemaDDL) => { val dt = DataType.fromDDL(schemaDDL) dt match { @@ -46,6 +45,8 @@ object SparkDatasetHelper { RowSet.toHiveString((row.toSeq.head, at), nested = true) case StructType(Array(StructField(_, mt: MapType, _, _))) => RowSet.toHiveString((row.toSeq.head, mt), nested = true) + case StructType(Array(StructField(_, tt: TimestampType, _, _))) => + RowSet.toHiveString((row.toSeq.head, tt), nested = true) case _ => throw new UnsupportedOperationException } @@ -56,6 +57,8 @@ object SparkDatasetHelper { toHiveStringUDF(quotedCol(name), lit(sf.toDDL)).as(name) case sf @ StructField(name, _: MapType | _: ArrayType, _, _) => toHiveStringUDF(struct(quotedCol(name)), lit(sf.toDDL)).as(name) + case sf @ StructField(name, _: TimestampType, _, _) if timestampAsString => + toHiveStringUDF(struct(quotedCol(name)), lit(sf.toDDL)).as(name) case StructField(name, _, _, _) => quotedCol(name) } df.select(cols: _*) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala index e464569147c..60cc528912d 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala @@ -35,6 +35,13 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp override def resultFormat: String = "arrow" + override def beforeEach(): Unit = { + super.beforeEach() + withJdbcStatement() { statement => + checkResultSetFormat(statement, "arrow") + } + } + test("detect resultSet format") { withJdbcStatement() { statement => checkResultSetFormat(statement, "arrow") @@ -43,7 +50,42 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp } } - def checkResultSetFormat(statement: Statement, expectFormat: String): Unit = { + test("Spark session timezone format") { + withJdbcStatement() { statement => + def check(expect: String): Unit = { + val query = + """ + |SELECT + | from_utc_timestamp( + | from_unixtime( + | 1670404535000 / 1000, 'yyyy-MM-dd HH:mm:ss' + | ), + | 'GMT+08:00' + | ) + |""".stripMargin + val resultSet = statement.executeQuery(query) + assert(resultSet.next()) + assert(resultSet.getString(1) == expect) + } + + def setTimeZone(timeZone: String): Unit = { + val rs = statement.executeQuery(s"set spark.sql.session.timeZone=$timeZone") + assert(rs.next()) + } + + Seq("true", "false").foreach { timestampAsString => + statement.executeQuery( + s"set ${KyuubiConf.ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING.key}=$timestampAsString") + checkArrowBasedRowSetTimestampAsString(statement, timestampAsString) + setTimeZone("UTC") + check("2022-12-07 17:15:35.0") + setTimeZone("GMT+8") + check("2022-12-08 01:15:35.0") + } + } + } + + private def checkResultSetFormat(statement: Statement, expectFormat: String): Unit = { val query = s""" |SELECT '$${hivevar:${KyuubiConf.OPERATION_RESULT_FORMAT.key}}' AS col @@ -52,4 +94,16 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp assert(resultSet.next()) assert(resultSet.getString("col") === expectFormat) } + + private def checkArrowBasedRowSetTimestampAsString( + statement: Statement, + expect: String): Unit = { + val query = + s""" + |SELECT 
'$${hivevar:${KyuubiConf.ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING.key}}' AS col + |""".stripMargin + val resultSet = statement.executeQuery(query) + assert(resultSet.next()) + assert(resultSet.getString("col") === expect) + } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 01bd46bd384..0c6a30c24e2 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1676,6 +1676,14 @@ object KyuubiConf { .transform(_.toLowerCase(Locale.ROOT)) .createWithDefault("thrift") + val ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING: ConfigEntry[Boolean] = + buildConf("kyuubi.operation.result.arrow.timestampAsString") + .doc("When true, arrow-based rowsets will convert columns of type timestamp to strings for" + + " transmission.") + .version("1.7.0") + .booleanConf + .createWithDefault(false) + val SERVER_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] = buildConf("kyuubi.operation.log.dir.root") .doc("Root directory for query operation log at server-side.") diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumnAttributes.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumnAttributes.java index 06fb398999a..b0257cfff09 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumnAttributes.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumnAttributes.java @@ -20,7 +20,7 @@ public class JdbcColumnAttributes { public int precision = 0; public int scale = 0; - public String timeZone = ""; + public String timeZone = null; public JdbcColumnAttributes() {} diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java index c3e75c0ea0e..ef5008503aa 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java @@ -50,6 +50,7 @@ public abstract class KyuubiArrowBasedResultSet implements SQLResultSet { protected Schema arrowSchema; protected VectorSchemaRoot root; protected ArrowColumnarBatchRow row; + protected boolean timestampAsString = true; protected BufferAllocator allocator; @@ -312,11 +313,18 @@ private Object getColumnValue(int columnIndex) throws SQLException { if (wasNull) { return null; } else { - return row.get(columnIndex - 1, columnType); + JdbcColumnAttributes attributes = columnAttributes.get(columnIndex - 1); + return row.get( + columnIndex - 1, + columnType, + attributes == null ? 
null : attributes.timeZone, + timestampAsString); } } catch (Exception e) { - e.printStackTrace(); - throw new KyuubiSQLException("Unrecognized column type:", e); + throw new KyuubiSQLException( + String.format( + "Error getting row of type %s at column index %d", columnType, columnIndex - 1), + e); } } diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowQueryResultSet.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowQueryResultSet.java index 1f2af29dc16..fda70f463e9 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowQueryResultSet.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowQueryResultSet.java @@ -58,9 +58,6 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet { private boolean isScrollable = false; private boolean fetchFirst = false; - // TODO:(fchen) make this configurable - protected boolean convertComplexTypeToString = true; - private final TProtocolVersion protocol; public static class Builder { @@ -87,6 +84,8 @@ public static class Builder { private boolean isScrollable = false; private ReentrantLock transportLock = null; + private boolean timestampAsString = true; + public Builder(Statement statement) throws SQLException { this.statement = statement; this.connection = statement.getConnection(); @@ -153,6 +152,11 @@ public Builder setScrollable(boolean setScrollable) { return this; } + public Builder setTimestampAsString(boolean timestampAsString) { + this.timestampAsString = timestampAsString; + return this; + } + public Builder setTransportLock(ReentrantLock transportLock) { this.transportLock = transportLock; return this; @@ -189,10 +193,10 @@ protected KyuubiArrowQueryResultSet(Builder builder) throws SQLException { this.maxRows = builder.maxRows; } this.isScrollable = builder.isScrollable; + this.timestampAsString = builder.timestampAsString; this.protocol = builder.getProtocolVersion(); arrowSchema = - ArrowUtils.toArrowSchema( - columnNames, convertComplexTypeToStringType(columnTypes), columnAttributes); + ArrowUtils.toArrowSchema(columnNames, convertToStringType(columnTypes), columnAttributes); if (allocator == null) { initArrowSchemaAndAllocator(); } @@ -271,8 +275,7 @@ private void retrieveSchema() throws SQLException { columnAttributes.add(getColumnAttributes(primitiveTypeEntry)); } arrowSchema = - ArrowUtils.toArrowSchema( - columnNames, convertComplexTypeToStringType(columnTypes), columnAttributes); + ArrowUtils.toArrowSchema(columnNames, convertToStringType(columnTypes), columnAttributes); } catch (SQLException eS) { throw eS; // rethrow the SQLException as is } catch (Exception ex) { @@ -480,22 +483,25 @@ public boolean isClosed() { return isClosed; } - private List convertComplexTypeToStringType(List colTypes) { - if (convertComplexTypeToString) { - return colTypes.stream() - .map( - type -> { - if (type == TTypeId.ARRAY_TYPE - || type == TTypeId.MAP_TYPE - || type == TTypeId.STRUCT_TYPE) { - return TTypeId.STRING_TYPE; - } else { - return type; - } - }) - .collect(Collectors.toList()); - } else { - return colTypes; - } + /** + * 1. the complex types (map/array/struct) are always converted to string type to transport 2. if + * the user set `timestampAsString = true`, then the timestamp type will be converted to string + * type too. 
+ */ + private List convertToStringType(List colTypes) { + return colTypes.stream() + .map( + type -> { + if ((type == TTypeId.ARRAY_TYPE + || type == TTypeId.MAP_TYPE + || type == TTypeId.STRUCT_TYPE) // complex type (map/array/struct) + // timestamp type + || (type == TTypeId.TIMESTAMP_TYPE && timestampAsString)) { + return TTypeId.STRING_TYPE; + } else { + return type; + } + }) + .collect(Collectors.toList()); } } diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java index ab7c06a5589..b452ca6aa36 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java @@ -37,6 +37,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable { public static final Logger LOG = LoggerFactory.getLogger(KyuubiStatement.class.getName()); public static final int DEFAULT_FETCH_SIZE = 1000; public static final String DEFAULT_RESULT_FORMAT = "thrift"; + public static final String DEFAULT_ARROW_TIMESTAMP_AS_STRING = "false"; private final KyuubiConnection connection; private TCLIService.Iface client; private TOperationHandle stmtHandle = null; @@ -45,7 +46,8 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable { private int fetchSize = DEFAULT_FETCH_SIZE; private boolean isScrollableResultset = false; private boolean isOperationComplete = false; - private Map properties = new HashMap<>(); + + private Map properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); /** * We need to keep a reference to the result set to support the following: * statement.execute(String sql); @@ -213,6 +215,11 @@ private boolean executeWithConfOverlay(String sql, Map confOverl LOG.info("kyuubi.operation.result.format: " + resultFormat); switch (resultFormat) { case "arrow": + boolean timestampAsString = + Boolean.parseBoolean( + properties.getOrDefault( + "__kyuubi_operation_result_arrow_timestampAsString__", + DEFAULT_ARROW_TIMESTAMP_AS_STRING)); resultSet = new KyuubiArrowQueryResultSet.Builder(this) .setClient(client) @@ -222,6 +229,7 @@ private boolean executeWithConfOverlay(String sql, Map confOverl .setFetchSize(fetchSize) .setScrollable(isScrollableResultset) .setSchema(columnNames, columnTypes, columnAttributes) + .setTimestampAsString(timestampAsString) .build(); break; default: @@ -270,6 +278,11 @@ public boolean executeAsync(String sql) throws SQLException { LOG.info("kyuubi.operation.result.format: " + resultFormat); switch (resultFormat) { case "arrow": + boolean timestampAsString = + Boolean.parseBoolean( + properties.getOrDefault( + "__kyuubi_operation_result_arrow_timestampAsString__", + DEFAULT_ARROW_TIMESTAMP_AS_STRING)); resultSet = new KyuubiArrowQueryResultSet.Builder(this) .setClient(client) @@ -279,6 +292,7 @@ public boolean executeAsync(String sql) throws SQLException { .setFetchSize(fetchSize) .setScrollable(isScrollableResultset) .setSchema(columnNames, columnTypes, columnAttributes) + .setTimestampAsString(timestampAsString) .build(); break; default: diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java index fa914ce5d7d..373867069b4 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java +++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java @@ -19,6 +19,8 @@ import java.math.BigDecimal; import java.sql.Timestamp; +import java.time.LocalDateTime; +import org.apache.arrow.vector.util.DateUtility; import org.apache.hive.service.rpc.thrift.TTypeId; import org.apache.kyuubi.jdbc.hive.common.DateUtils; import org.apache.kyuubi.jdbc.hive.common.HiveIntervalDayTime; @@ -104,7 +106,7 @@ public Object getMap(int ordinal) { throw new UnsupportedOperationException(); } - public Object get(int ordinal, TTypeId dataType) { + public Object get(int ordinal, TTypeId dataType, String timeZone, boolean timestampAsString) { long seconds; long milliseconds; long microseconds; @@ -131,17 +133,19 @@ public Object get(int ordinal, TTypeId dataType) { case STRING_TYPE: return getString(ordinal); case TIMESTAMP_TYPE: - microseconds = getLong(ordinal); - nanos = (int) (microseconds % 1000000) * 1000; - Timestamp timestamp = new Timestamp(microseconds / 1000); - timestamp.setNanos(nanos); - return timestamp; + if (timestampAsString) { + return Timestamp.valueOf(getString(ordinal)); + } else { + LocalDateTime localDateTime = + DateUtility.getLocalDateTimeFromEpochMicro(getLong(ordinal), timeZone); + return Timestamp.valueOf(localDateTime); + } case DATE_TYPE: return DateUtils.internalToDate(getInt(ordinal)); case INTERVAL_DAY_TIME_TYPE: microseconds = getLong(ordinal); - seconds = microseconds / 1000000; - nanos = (int) (microseconds % 1000000) * 1000; + seconds = microseconds / 1_000_000; + nanos = (int) (microseconds % 1_000_000) * 1_000; return new HiveIntervalDayTime(seconds, nanos); case INTERVAL_YEAR_MONTH_TYPE: return new HiveIntervalYearMonth(getInt(ordinal));
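
Note on the client-side decoding this patch introduces: the sketch below is illustrative only, under assumed names (`TimestampDecodingSketch`, `decode`); the real logic lives in `ArrowColumnarBatchRow#get`, which uses Arrow's `DateUtility` rather than plain `java.time`. It shows the two paths the driver now chooses between for TIMESTAMP columns in arrow-based result sets.

```scala
import java.sql.Timestamp
import java.time.{Instant, ZoneId}
import java.time.temporal.ChronoUnit

// Minimal sketch (not driver code) of the timestamp decoding decision.
object TimestampDecodingSketch {

  // `raw` is either the Hive-formatted string produced by RowSet.toHiveString
  // (timestampAsString = true) or the Arrow epoch-microsecond Long (false).
  def decode(timestampAsString: Boolean, raw: Any, sessionTimeZone: String): Timestamp =
    if (timestampAsString) {
      // Already rendered on the engine side, e.g. "2022-12-08 01:15:35.0".
      Timestamp.valueOf(raw.asInstanceOf[String])
    } else {
      // Re-localize the epoch microseconds using the time zone carried in
      // JdbcColumnAttributes.timeZone (now null rather than "" when absent).
      val micros = raw.asInstanceOf[Long]
      val local = Instant.EPOCH
        .plus(micros, ChronoUnit.MICROS)
        .atZone(ZoneId.of(sessionTimeZone))
        .toLocalDateTime
      Timestamp.valueOf(local)
    }
}
```

Either way, the value a client sees tracks the engine's `spark.sql.session.timeZone`, which is what the new `SparkArrowbasedOperationSuite` test asserts.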
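
The flag is negotiated automatically: the engine advertises it through the `__kyuubi_operation_result_arrow_timestampAsString__` metadata hint returned by `getResultSetMetadataHints`, and `KyuubiStatement` forwards it to `KyuubiArrowQueryResultSet.Builder#setTimestampAsString`, so a client only needs to set the Kyuubi configs. On the expected values in the new test: 1670404535 seconds is 2022-12-07 09:15:35 UTC; `from_unixtime` renders it in the session time zone and `from_utc_timestamp(..., 'GMT+08:00')` then shifts the result by eight hours, giving `2022-12-07 17:15:35.0` under UTC and `2022-12-08 01:15:35.0` under GMT+8. The usage sketch below assumes a reachable Kyuubi endpoint; the JDBC URL, credentials, and query are placeholders.

```scala
import java.sql.DriverManager

// Usage sketch only; host, port and credentials are placeholders.
object ArrowTimestampUsageSketch extends App {
  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/default", "user", "")
  val stmt = conn.createStatement()
  try {
    // Ask the engine for arrow-encoded result sets with string-encoded timestamps.
    stmt.executeQuery("SET kyuubi.operation.result.format=arrow")
    stmt.executeQuery("SET kyuubi.operation.result.arrow.timestampAsString=true")
    // Timestamp rendering follows the engine-side session time zone.
    stmt.executeQuery("SET spark.sql.session.timeZone=GMT+8")

    val rs = stmt.executeQuery("SELECT timestamp'2022-12-08 01:15:35' AS ts")
    while (rs.next()) {
      // Decoded through the string path because the flag is true.
      println(rs.getTimestamp(1))
    }
  } finally {
    stmt.close()
    conn.close()
  }
}
```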