diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 0893f46c89dce..73bc638e0ee35 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -54,6 +54,8 @@ license: |
 
   - Since Spark 3.3, nulls are written as empty strings in CSV data source by default. In Spark 3.2 or earlier, nulls were written as empty strings as quoted empty strings, `""`. To restore the previous behavior, set `nullValue` to `""`.
 
+  - Since Spark 3.3, Spark Thrift Server returns the available system function metadata only once instead of once per matching database, and sets the function schema to `SYSTEM`. In Spark 3.2 or earlier, Spark Thrift Server returned all system function metadata for every database, which resulted in duplicates. To restore the behavior before Spark 3.3, you can set `spark.sql.thriftserver.uniqueSystemFunctions` to `false`.
+
   - Since Spark 3.3, DESCRIBE FUNCTION fails if the function does not exist. In Spark 3.2 or earlier, DESCRIBE FUNCTION can still run and print "Function: func_name not found".
 
   - Since Spark 3.3, the table property `external` becomes reserved. Certain commands will fail if you specify the `external` property, such as `CREATE TABLE ... TBLPROPERTIES` and `ALTER TABLE ... SET TBLPROPERTIES`. In Spark 3.2 and earlier, the table property `external` is silently ignored. You can set `spark.sql.legacy.notReserveProperties` to `true` to restore the old behavior.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 7a3809378cfd1..aab3549148da1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -324,6 +324,9 @@ object FunctionRegistry {
 
   val FUNC_ALIAS = TreeNodeTag[String]("functionAliasName")
 
+  val builtinFunctionScope = "SYSTEM"
+  val userFunctionScope = "USER"
+
   // Note: Whenever we add a new entry here, make sure we also update ExpressionToSQLSuite
   val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
     // misc non-aggregate functions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 464768ac7ce2b..09bc130cc8644 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1776,9 +1776,11 @@ class SessionCatalog(
     // The session catalog caches some persistent functions in the FunctionRegistry
     // so there can be duplicates.
     functions.map {
-      case f if FunctionRegistry.functionSet.contains(f) => (f, "SYSTEM")
-      case f if TableFunctionRegistry.functionSet.contains(f) => (f, "SYSTEM")
-      case f => (f, "USER")
+      case f if FunctionRegistry.functionSet.contains(f) =>
+        (f, FunctionRegistry.builtinFunctionScope)
+      case f if TableFunctionRegistry.functionSet.contains(f) =>
+        (f, FunctionRegistry.builtinFunctionScope)
+      case f => (f, FunctionRegistry.userFunctionScope)
     }.distinct
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 59a896a29b6f2..b21612be9cc5e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1169,6 +1169,14 @@ object SQLConf {
     .intConf
     .createWithDefault(200)
 
+  val THRIFTSERVER_UNIQUE_SYSTEM_FUNCTIONS =
+    buildConf("spark.sql.thriftserver.uniqueSystemFunctions")
+      .doc("When true, Spark Thrift Server will return the available system function metadata " +
+        "for databases only once, and Spark will set the function schema as 'SYSTEM'.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
   // This is used to set the default data source
   val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
     .doc("The default data source to use in input/output.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index f2c3bfdb90037..361ef7567b2f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -173,8 +173,9 @@ case class ShowFunctionsCommand(
       sparkSession.sessionState.catalog
         .listFunctions(db, pattern.getOrElse("*"))
        .collect {
-          case (f, "USER") if showUserFunctions => f.unquotedString
-          case (f, "SYSTEM") if showSystemFunctions => f.unquotedString
+          case (f, FunctionRegistry.userFunctionScope) if showUserFunctions => f.unquotedString
+          case (f, FunctionRegistry.builtinFunctionScope) if showSystemFunctions =>
+            f.unquotedString
        }
     // Hard code "<>", "!=", "between", "case", and "||"
     // for now as there is no corresponding functions.
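
Note on the USER/SYSTEM split above: the new scope constants are also observable from plain SQL, since `ShowFunctionsCommand` filters on them. The following is a minimal sketch, not part of the patch; the object name and the `plus_one` UDF are illustrative, and it assumes a local Spark 3.3+ build with this change on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object ShowFunctionScopesExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("show-function-scopes")
      .getOrCreate()

    // Built-in expressions are tagged with FunctionRegistry.builtinFunctionScope ("SYSTEM"),
    // so they are listed by SHOW SYSTEM FUNCTIONS.
    spark.sql("SHOW SYSTEM FUNCTIONS LIKE 'overla*'").show(false)

    // A temporary function registered in the session is tagged with userFunctionScope ("USER").
    spark.udf.register("plus_one", (x: Int) => x + 1)
    spark.sql("SHOW USER FUNCTIONS LIKE 'plus_one'").show(false)

    spark.stop()
  }
}
```
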
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
index 352528e26e318..80345d0fc5e2e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
@@ -29,6 +29,9 @@ import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRegistry}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Spark's own GetFunctionsOperation
@@ -80,8 +83,21 @@ private[hive] class SparkGetFunctionsOperation(
       parentSession.getUsername)
 
     try {
+      val separateDisplaySystemFunctions =
+        sqlContext.conf.getConf(SQLConf.THRIFTSERVER_UNIQUE_SYSTEM_FUNCTIONS)
+      var matchedBuiltInFunctions = if (separateDisplaySystemFunctions && functionPattern == "*"
+          && matchingDbs.nonEmpty) {
+        FunctionRegistry.functionSet ++ TableFunctionRegistry.functionSet
+      } else {
+        Set.empty[FunctionIdentifier]
+      }
       matchingDbs.foreach { db =>
         catalog.listFunctions(db, functionPattern).foreach {
+          case (funcIdentifier, FunctionRegistry.`builtinFunctionScope`)
+              if separateDisplaySystemFunctions =>
+            if (!matchedBuiltInFunctions.contains(funcIdentifier)) {
+              matchedBuiltInFunctions += funcIdentifier
+            }
           case (funcIdentifier, _) =>
             val info = catalog.lookupFunctionInfo(funcIdentifier)
             val rowData = Array[AnyRef](
@@ -94,6 +110,17 @@ private[hive] class SparkGetFunctionsOperation(
             rowSet.addRow(rowData);
         }
       }
+      matchedBuiltInFunctions.foreach { functionIdentifier =>
+        val info = catalog.lookupFunctionInfo(functionIdentifier)
+        val rowData = Array[AnyRef](
+          DEFAULT_HIVE_CATALOG, // FUNCTION_CAT
+          FunctionRegistry.builtinFunctionScope, // FUNCTION_SCHEM
+          functionIdentifier.funcName, // FUNCTION_NAME
+          s"Usage: ${info.getUsage}\nExtended Usage:${info.getExtended}", // REMARKS
+          DatabaseMetaData.functionResultUnknown.asInstanceOf[AnyRef], // FUNCTION_TYPE
+          info.getClassName) // SPECIFIC_NAME
+        rowSet.addRow(rowData);
+      }
       setState(OperationState.FINISHED)
     } catch onError()
 
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
index a3f1a064f073a..62fc86e0d8838 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
@@ -210,11 +210,14 @@ class SparkMetadataOperationSuite extends HiveThriftServer2TestBase {
   }
 
   test("Spark's own GetFunctionsOperation(SparkGetFunctionsOperation)") {
-    def checkResult(rs: ResultSet, functionNames: Seq[String]): Unit = {
+    def checkResult(
+        rs: ResultSet,
+        functionNames: Seq[String],
+        functionSchema: String = "default"): Unit = {
       functionNames.foreach { func =>
         val exprInfo = FunctionRegistry.expressions(func)._1
         assert(rs.next())
-        assert(rs.getString("FUNCTION_SCHEM") === "default")
+        assert(rs.getString("FUNCTION_SCHEM") === functionSchema)
         assert(rs.getString("FUNCTION_NAME") === exprInfo.getName)
         assert(rs.getString("REMARKS") ===
           s"Usage: ${exprInfo.getUsage}\nExtended Usage:${exprInfo.getExtended}")
@@ -226,6 +229,7 @@ class SparkMetadataOperationSuite extends HiveThriftServer2TestBase {
     }
 
     withJdbcStatement() { statement =>
+      statement.execute(s"SET ${SQLConf.THRIFTSERVER_UNIQUE_SYSTEM_FUNCTIONS.key}=false")
       val metaData = statement.getConnection.getMetaData
       // Hive does not have an overlay function, we use overlay to test.
       checkResult(metaData.getFunctions(null, null, "overlay"), Seq("overlay"))
@@ -236,6 +240,23 @@ class SparkMetadataOperationSuite extends HiveThriftServer2TestBase {
       checkResult(metaData.getFunctions(null, "default", "shift*"),
         Seq("shiftleft", "shiftright", "shiftrightunsigned"))
       checkResult(metaData.getFunctions(null, "default", "upPer"), Seq("upper"))
+
+      statement.execute(s"SET ${SQLConf.THRIFTSERVER_UNIQUE_SYSTEM_FUNCTIONS.key}=true")
+      checkResult(metaData.getFunctions(null, null, "overlay"), Seq("overlay"),
+        FunctionRegistry.builtinFunctionScope)
+      checkResult(metaData.getFunctions(null, null, "overla*"), Seq("overlay"),
+        FunctionRegistry.builtinFunctionScope)
+      checkResult(metaData.getFunctions(null, "", "overla*"), Seq("overlay"),
+        FunctionRegistry.builtinFunctionScope)
+      checkResult(metaData.getFunctions(null, null, "does-not-exist*"), Seq.empty,
+        FunctionRegistry.builtinFunctionScope)
+      checkResult(metaData.getFunctions(null, "default", "overlay"), Seq("overlay"),
+        FunctionRegistry.builtinFunctionScope)
+      checkResult(metaData.getFunctions(null, "default", "shift*"),
+        Seq("shiftleft", "shiftright", "shiftrightunsigned"),
+        FunctionRegistry.builtinFunctionScope)
+      checkResult(metaData.getFunctions(null, "default", "upPer"), Seq("upper"),
+        FunctionRegistry.builtinFunctionScope)
     }
   }
 
@@ -685,4 +706,36 @@ class SparkMetadataOperationSuite extends HiveThriftServer2TestBase {
       }
     }
   }
+
+  test("SPARK-37173: SparkGetFunctionsOperation returns builtin functions only once") {
+    def checkFunctions(
+        rs: ResultSet,
+        functionName: String,
+        expectedFunctionSchemas: Seq[String],
+        repeats: Int): Unit = {
+      var nums = 0
+      var functionSchemas = Seq.empty[String]
+      while (rs.next()) {
+        if (rs.getString("FUNCTION_NAME") == functionName) {
+          functionSchemas = functionSchemas :+ rs.getString("FUNCTION_SCHEM")
+          nums += 1
+        }
+      }
+      assert(nums === repeats)
+      functionSchemas.zip(expectedFunctionSchemas).foreach { case (actual, expected) =>
+        assert(actual === expected)
+      }
+    }
+
+    withDatabase("test_spark_37173") { statement =>
+      statement.execute("CREATE DATABASE IF NOT EXISTS test_spark_37173")
+      statement.execute(s"SET ${SQLConf.THRIFTSERVER_UNIQUE_SYSTEM_FUNCTIONS.key}=false")
+      val metaData = statement.getConnection.getMetaData
+      checkFunctions(metaData.getFunctions(null, "*", "*"),
+        "length", Seq("default", "test_spark_37173"), 2)
+      statement.execute(s"SET ${SQLConf.THRIFTSERVER_UNIQUE_SYSTEM_FUNCTIONS.key}=true")
+      checkFunctions(metaData.getFunctions(null, "*", "*"),
+        "length", Seq("SYSTEM"), 1)
+    }
+  }
 }
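
Usage note: the behavior exercised by the new test can also be observed from any JDBC client of a running Spark Thrift Server. The sketch below is illustrative only, not part of the patch; the object name, connection URL, port, and credentials are assumptions, and it requires the Hive JDBC driver on the classpath. With `spark.sql.thriftserver.uniqueSystemFunctions` left at its default (`true`), a builtin such as `length` should come back once with `FUNCTION_SCHEM` = `SYSTEM`; with it set to `false`, once per matching database.

```scala
import java.sql.DriverManager

object GetFunctionsClientSketch {
  def main(args: Array[String]): Unit = {
    // Assumed endpoint of a running Spark Thrift Server; adjust host, port and user as needed.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      // DatabaseMetaData.getFunctions(catalog, schemaPattern, functionNamePattern)
      val rs = conn.getMetaData.getFunctions(null, "*", "length")
      while (rs.next()) {
        // Prints one "SYSTEM  length" row under the new default behavior.
        println(s"${rs.getString("FUNCTION_SCHEM")}\t${rs.getString("FUNCTION_NAME")}")
      }
    } finally {
      conn.close()
    }
  }
}
```
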