diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index c9be1b9d100b0..dfb6abaf34507 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -17,19 +17,26 @@ package org.apache.spark.sql.hive
 
+import java.io.File
+import java.net.URI
+
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.ql.session.SessionState.ResourceType
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
+import org.apache.hadoop.util.Shell
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -37,7 +44,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, DoubleType}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 
 private[sql] class HiveSessionCatalog(
@@ -135,6 +142,36 @@ private[sql] class HiveSessionCatalog(
     }
   }
 
+  override def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
+    logDebug("loading hive permanent function resources")
+    resources.foreach { resource =>
+      val resourceType = resource.resourceType match {
+        case JarResource => ResourceType.JAR
+        case FileResource => ResourceType.FILE
+        case ArchiveResource => ResourceType.ARCHIVE
+      }
+      // On Windows, resource paths such as C:\dir\my.jar are not valid URIs,
+      // so parse them with Hadoop's Path instead of java.net.URI.
+      val uri = if (!Shell.WINDOWS) {
+        new URI(resource.uri)
+      } else {
+        new Path(resource.uri).toUri
+      }
+      val scheme = if (uri.getScheme == null) null else uri.getScheme.toLowerCase
+      if (scheme == null || scheme == "file") {
+        // Resources without a scheme, or with the file scheme, are already local.
+        functionResourceLoader.loadResource(resource)
+      } else {
+        // Remote resources (e.g. on HDFS): let Hive's SessionState download a
+        // local copy, and register its directory for deletion at JVM shutdown.
+        val sessionState = SessionState.get()
+        val localPath = sessionState.add_resource(resourceType, resource.uri)
+        ShutdownHookManager.registerShutdownDeleteDir(new File(localPath).getParentFile)
+        functionResourceLoader.loadResource(FunctionResource(resource.resourceType, localPath))
+      }
+    }
+  }
+
   override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
     try {
       lookupFunction0(name, children)
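
For reviewers, the heart of the patch is the scheme dispatch in loadFunctionResources: a resource URI with no scheme or the file scheme is loaded in place, while anything else (hdfs, s3, viewfs, ...) is first localized through Hive's SessionState. The standalone sketch below is our illustration of that check, not code from the patch; the names ResourceSchemeCheck and isAlreadyLocal are made up for the example.

import java.net.URI
import java.util.Locale

// Hypothetical helper mirroring the patch's scheme check.
object ResourceSchemeCheck {
  // True when loadFunctionResources would hand the resource straight to
  // functionResourceLoader; false when Hive would first download a local copy.
  def isAlreadyLocal(resourceUri: String): Boolean = {
    val scheme = Option(new URI(resourceUri).getScheme).map(_.toLowerCase(Locale.ROOT))
    // Option.forall is true when the scheme is absent or equals "file",
    // matching the patch's `scheme == null || scheme == "file"` test.
    scheme.forall(_ == "file")
  }

  def main(args: Array[String]): Unit = {
    println(isAlreadyLocal("/tmp/udfs/my_udf.jar"))                 // true: no scheme
    println(isAlreadyLocal("file:///tmp/udfs/my_udf.jar"))          // true: file scheme
    println(isAlreadyLocal("hdfs://namenode:8020/udfs/my_udf.jar")) // false: remote, Hive localizes it
  }
}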