@@ -21,9 +21,10 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}

-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.execution.command.{DescribeCommandBase, ExecutedCommandExec, ShowTablesCommand}
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
@@ -36,27 +36,43 @@ object HiveResult {
   * Returns the result as a hive compatible sequence of strings. This is used in tests and
   * `SparkSQLDriver` for CLI applications.
   */
-  def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match {
-    case ExecutedCommandExec(_: DescribeCommandBase) =>
-      // If it is a describe command for a Hive table, we want to have the output format
-      // be similar with Hive.
-      executedPlan.executeCollectPublic().map {
-        case Row(name: String, dataType: String, comment) =>
-          Seq(name, dataType,
-            Option(comment.asInstanceOf[String]).getOrElse(""))
-            .map(s => String.format(s"%-20s", s))
-            .mkString("\t")
-      }
-    // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
-    case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
-      command.executeCollect().map(_.getString(1))
-    case other =>
-      val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
-      // We need the types so we can output struct field names
-      val types = executedPlan.output.map(_.dataType)
-      // Reformat to match hive tab delimited output.
-      result.map(_.zip(types).map(e => toHiveString(e)))
-        .map(_.mkString("\t"))
+  def hiveResultString(ds: Dataset[_]): Seq[String] = {
+    val executedPlan = ds.queryExecution.executedPlan
+    executedPlan match {
+      case ExecutedCommandExec(_: DescribeCommandBase) =>
+        // If it is a describe command for a Hive table, we want to have the output format
+        // be similar with Hive.
+        executedPlan.executeCollectPublic().map {
+          case Row(name: String, dataType: String, comment) =>
+            Seq(name, dataType,
+              Option(comment.asInstanceOf[String]).getOrElse(""))
+              .map(s => String.format(s"%-20s", s))
+              .mkString("\t")
+        }
+      // SHOW TABLES in Hive only output table names,
+      // while ours output database, table name, isTemp.
+      case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
+        command.executeCollect().map(_.getString(1))
+      case _ =>
+        val sessionWithJava8DatetimeEnabled = {
+          val cloned = ds.sparkSession.cloneSession()
+          cloned.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, true)
Member: why is this always true?

Contributor: the old Date/Timestamp doesn't follow the new calendar and may produce wrong strings for some date/timestamp values.
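
An illustrative sketch of the mismatch being described (hypothetical values, not part of the diff): the legacy java.sql types render on the hybrid Julian/Gregorian calendar, while the java.time types use the proleptic Gregorian one.

import java.sql.Timestamp
import java.time.Instant

// 1582-10-14 exists in the proleptic Gregorian calendar but falls into
// the ten days the hybrid calendar skips, so the legacy type's toString
// can shift the printed day.
val instant = Instant.parse("1582-10-14T00:00:00Z")
println(instant)                 // 1582-10-14T00:00:00Z
println(Timestamp.from(instant)) // rendered on the hybrid calendar in the
                                 // default time zone; may print a different day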

Contributor: oh wait, we format Date/Timestamp with our own formatter, so this should be no problem.

Member:
-- !query
set spark.sql.datetime.java8API.enabled
-- !query schema
struct<key:string,value:string>
-- !query output
spark.sql.datetime.java8API.enabled	false


-- !query
set set spark.sql.session.timeZone=America/Los_Angeles
-- !query schema
struct<key:string,value:string>
-- !query output
set spark.sql.session.timeZone	America/Los_Angeles


-- !query
SELECT DATE_TRUNC('MILLENNIUM', DATE '1970-03-20')
-- !query schema
struct<date_trunc(MILLENNIUM, CAST(DATE '1970-03-20' AS TIMESTAMP)):timestamp>
-- !query output
1001-01-01 00:00:00

I removed this line and ran SQLQueryTestSuite with the cases above; the results are the same. Or does this problem exist only for the spark-sql script?

Member (author):
> Or does this problem exist only for the spark-sql script?

Only when the thrift-server is involved in the loop.

Member (@yaooqinn, Feb 28, 2020): I also ran these tests through ThriftServerQueryTestSuite and they pass.

Member:
spark-sql> set spark.sql.session.timeZone=America/Los_Angeles;
spark.sql.session.timeZone	America/Los_Angeles
spark-sql> SELECT DATE_TRUNC('MILLENNIUM', DATE '1970-03-20');
1001-01-01 00:00:00
spark-sql> select version();
3.1.0 b3dcb63a682bc31827a86cf381f157a81e9e07ac

Also correct with bin/spark-sql

Contributor: Yeah, I tested it too and it looks fine. Maybe some refactoring of how old Date/Timestamp values are formatted has already fixed it.

@yaooqinn can you send a PR to revert it? Let's see if all tests pass.

Member: OK
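
For reference, a hedged sketch (not part of the diff) of what this config changes for collected rows; it assumes a local SparkSession:

import java.time.LocalDate
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
// With the Java 8 API enabled, collected DATE values arrive as
// java.time.LocalDate instead of java.sql.Date, and Spark's own
// formatters render them on the proleptic Gregorian calendar.
spark.conf.set("spark.sql.datetime.java8API.enabled", "true")
val v = spark.sql("SELECT DATE '1582-10-15'").collect().head.get(0)
assert(v.isInstanceOf[LocalDate])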

+          cloned
+        }
+        sessionWithJava8DatetimeEnabled.withActive {
+          // We cannot collect the original dataset because its encoders could be created
+          // with disabled Java 8 date-time API.
+          val result: Seq[Seq[Any]] = Dataset.ofRows(ds.sparkSession, ds.logicalPlan)
Contributor: Found a problem: Dataset.ofRows will set the input session as active, so we should write Dataset.ofRows(sessionWithJava8DatetimeEnabled, ...) and remove the outer sessionWithJava8DatetimeEnabled.withActive.
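
A minimal sketch of that suggestion (illustrative only, reusing the names from the diff above):

// Pass the cloned session to Dataset.ofRows directly; ofRows activates
// the session it is given, so the enclosing withActive block around this
// expression becomes redundant.
val result: Seq[Seq[Any]] = Dataset.ofRows(sessionWithJava8DatetimeEnabled, ds.logicalPlan)
  .queryExecution
  .executedPlan
  .executeCollectPublic().map(_.toSeq).toSeq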

+            .queryExecution
+            .executedPlan
+            .executeCollectPublic().map(_.toSeq).toSeq
+          // We need the types so we can output struct field names
+          val types = executedPlan.output.map(_.dataType)
+          // Reformat to match hive tab delimited output.
+          result.map(_.zip(types).map(e => toHiveString(e)))
+            .map(_.mkString("\t"))
+        }
+    }
  }

private lazy val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
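
For reference, a usage sketch of the new entry point (hypothetical caller, not part of the diff):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.HiveResult

val spark = SparkSession.builder().master("local[1]").getOrCreate()
// hiveResultString now receives the Dataset itself, so it can clone the
// session, enable the Java 8 date-time API, and re-collect under it.
val df = spark.sql("SELECT DATE '1970-03-20' AS d")
HiveResult.hiveResultString(df).foreach(println) // prints 1970-03-20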
@@ -189,8 +189,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
example.split(" > ").toList.foreach(_ match {
case exampleRe(sql, output) =>
val df = clonedSpark.sql(sql)
-        val actual = unindentAndTrim(
-          hiveResultString(df.queryExecution.executedPlan).mkString("\n"))
+        val actual = unindentAndTrim(hiveResultString(df).mkString("\n"))
val expected = unindentAndTrim(output)
assert(actual === expected)
case _ =>
@@ -378,7 +378,6 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
localSparkSession.conf.set(SQLConf.ANSI_ENABLED.key, true)
case _ =>
}
-    localSparkSession.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, true)

if (configSet.nonEmpty) {
// Execute the list of set operation in order to add the desired configs
@@ -512,7 +511,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
val schema = df.schema.catalogString
// Get answer, but also get rid of the #1234 expression ids that show up in explain plans
val answer = SQLExecution.withNewExecutionId(df.queryExecution, Some(sql)) {
-      hiveResultString(df.queryExecution.executedPlan).map(replaceNotIncludedMsg)
+      hiveResultString(df).map(replaceNotIncludedMsg)
Contributor: In ThriftServerQueryTestSuite we get the result via JDBC, so there is no DataFrame created.

We should follow pgsql and return Java 8 datetime values when the config is enabled: https://jdbc.postgresql.org/documentation/head/8-date-time.html

cc @wangyum @yaooqinn
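
For context, the pgjdbc pattern referenced above hands back Java 8 date-time values through getObject with a target class; a rough sketch against a plain JDBC connection (hypothetical URL and table):

import java.sql.DriverManager
import java.time.{LocalDate, OffsetDateTime}

val conn = DriverManager.getConnection("jdbc:postgresql://localhost/test")
val rs = conn.createStatement().executeQuery("SELECT d, ts FROM events")
while (rs.next()) {
  // pgjdbc maps DATE to LocalDate and TIMESTAMP WITH TIME ZONE to OffsetDateTime
  val d = rs.getObject(1, classOf[LocalDate])
  val ts = rs.getObject(2, classOf[OffsetDateTime])
  println(s"$d $ts")
}
conn.close()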

}

// If the output is not pre-sorted, sort it.
@@ -25,11 +25,10 @@ class HiveResultSuite extends SharedSparkSession {
test("date formatting in hive result") {
val dates = Seq("2018-12-28", "1582-10-13", "1582-10-14", "1582-10-15")
val df = dates.toDF("a").selectExpr("cast(a as date) as b")
-    val executedPlan1 = df.queryExecution.executedPlan
-    val result = HiveResult.hiveResultString(executedPlan1)
+    val result = HiveResult.hiveResultString(df)
assert(result == dates)
-    val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
-    val result2 = HiveResult.hiveResultString(executedPlan2)
+    val df2 = df.selectExpr("array(b)")
+    val result2 = HiveResult.hiveResultString(df2)
assert(result2 == dates.map(x => s"[$x]"))
}

@@ -40,11 +39,10 @@
"1582-10-14 01:02:03",
"1582-10-15 01:02:03")
val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
-    val executedPlan1 = df.queryExecution.executedPlan
-    val result = HiveResult.hiveResultString(executedPlan1)
+    val result = HiveResult.hiveResultString(df)
assert(result == timestamps)
-    val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
-    val result2 = HiveResult.hiveResultString(executedPlan2)
+    val df2 = df.selectExpr("array(b)")
+    val result2 = HiveResult.hiveResultString(df2)
assert(result2 == timestamps.map(x => s"[$x]"))
}

@@ -57,15 +55,14 @@
test("decimal formatting in hive result") {
val df = Seq(new java.math.BigDecimal("1")).toDS()
Seq(2, 6, 18).foreach { scala =>
-      val executedPlan =
-        df.selectExpr(s"CAST(value AS decimal(38, $scala))").queryExecution.executedPlan
-      val result = HiveResult.hiveResultString(executedPlan)
+      val decimalDf = df.selectExpr(s"CAST(value AS decimal(38, $scala))")
+      val result = HiveResult.hiveResultString(decimalDf)
assert(result.head.split("\\.").last.length === scala)
}

-    val executedPlan = Seq(java.math.BigDecimal.ZERO).toDS()
-      .selectExpr(s"CAST(value AS decimal(38, 8))").queryExecution.executedPlan
-    val result = HiveResult.hiveResultString(executedPlan)
+    val df2 = Seq(java.math.BigDecimal.ZERO).toDS()
+      .selectExpr(s"CAST(value AS decimal(38, 8))")
+    val result = HiveResult.hiveResultString(df2)
assert(result.head === "0.00000000")
}
}
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver

import java.security.PrivilegedExceptionAction
import java.sql.{Date, Timestamp}
+import java.time.{Instant, LocalDate}
import java.util.{Arrays, Map => JMap, UUID}
import java.util.concurrent.RejectedExecutionException

@@ -178,7 +179,14 @@ private[hive] class SparkExecuteStatementOperation(
}
curCol += 1
}
-          resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
+          // Convert date-time instances to types that are acceptable by Hive libs
+          // used in conversions to strings.
+          val resultRow = row.map {
+            case i: Instant => Timestamp.from(i)
Member (@yaooqinn, Feb 28, 2020): There seem to be no Java 8 datetime values added to the row buffer here by SparkExecuteStatementOperation#addNonNullColumnValue:

https://github.com/apache/spark/pull/27552/files#diff-72dcd8f81a51c8a815159fdf0332acdcR84-R116

Contributor: Can you help fix it? I think we should output Java 8 datetime values if the config is enabled.

Member: We are limited by the hive-jdbc module (see https://github.com/apache/hive/blob/a7e704c679a00db68db9b9f921d133d79a32cfcc/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java#L427-L457); we might need our own JDBC driver implementation to achieve this.

+            case ld: LocalDate => Date.valueOf(ld)
+            case other => other
+          }.toArray.asInstanceOf[Array[Object]]
+          resultRowSet.addRow(resultRow)
curRow += 1
resultOffset += 1
}
@@ -60,9 +60,10 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
// TODO unify the error code
try {
context.sparkContext.setJobDescription(command)
-      val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
+      val df = context.sql(command)
+      val execution = df.queryExecution
hiveResponse = SQLExecution.withNewExecutionId(execution) {
-        hiveResultString(execution.executedPlan)
+        hiveResultString(df)
}
tableSchema = getResultSetSchema(execution)
new CommandProcessorResponse(0)
@@ -346,7 +346,9 @@ abstract class HiveComparisonTest
val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath))
def getResult(): Seq[String] = {
-        SQLExecution.withNewExecutionId(query)(hiveResultString(query.executedPlan))
+        SQLExecution.withNewExecutionId(query) {
+          hiveResultString(Dataset.ofRows(query.sparkSession, query.logical))
+        }
}
try { (query, prepareAnswer(query, getResult())) } catch {
case e: Throwable =>