@@ -17,27 +17,34 @@

package org.apache.spark.sql.hive

import java.io.File
import java.net.URI

import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.session.SessionState.ResourceType
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
import org.apache.hadoop.util.Shell

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DecimalType, DoubleType}
import org.apache.spark.util.Utils
import org.apache.spark.util.{ShutdownHookManager, Utils}


private[sql] class HiveSessionCatalog(
@@ -135,6 +142,35 @@ private[sql] class HiveSessionCatalog(
    }
  }

  override def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
    logDebug("loading hive permanent function resources")
    resources.foreach { resource =>
      val resourceType = resource.resourceType match {
        case JarResource => ResourceType.JAR
        case FileResource => ResourceType.FILE
        case ArchiveResource => ResourceType.ARCHIVE
      }
      // On Windows a bare local path is not a valid URI, so go through
      // Hadoop's Path to obtain one.
      val uri = if (!Shell.WINDOWS) {
        new URI(resource.uri)
      } else {
        new Path(resource.uri).toUri
      }
      val scheme = if (uri.getScheme == null) null else uri.getScheme.toLowerCase
      if (scheme == null || scheme == "file") {
        // Resources that are already local can be loaded directly.
        functionResourceLoader.loadResource(resource)
      } else {
        // Remote resources (e.g. on HDFS) are first downloaded to a local
        // path by Hive's SessionState.
        val sessionState = SessionState.get()
        val localPath = sessionState.add_resource(resourceType, resource.uri)

Review comment (Member): Are you trying to create a Hive UDF via Spark, then call it through Hive?

Reply (Contributor, author): No, I just use sessionState.add_resource to download the resources.
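
For context, a minimal sketch of what that add_resource call does, assuming Hive 1.2's SessionState API and a hypothetical HDFS path (SessionState.get() returns the thread-local session, so a Hive session must already have been started):

import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.session.SessionState.ResourceType

// Downloads the remote JAR into the session's local resource directory and
// returns the localized filesystem path (the HDFS URI here is hypothetical).
val localJar: String = SessionState.get().add_resource(
  ResourceType.JAR, "hdfs:///tmp/udfs/my-udf.jar")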

        // Delete the download directory at JVM exit, then load the local copy.
        ShutdownHookManager.registerShutdownDeleteDir(new File(localPath).getParentFile)
        functionResourceLoader.loadResource(FunctionResource(resource.resourceType, localPath))
      }
    }
  }

  override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
    try {
      lookupFunction0(name, children)
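
Taken together, the new loadFunctionResources path lets a permanent function whose resources live on a remote filesystem be resolved from Spark. A minimal usage sketch, assuming a Hive-enabled session; the function, class, and JAR names below are hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("remote-udf-example")
  .enableHiveSupport() // routes function resolution through HiveSessionCatalog
  .getOrCreate()

// The JAR lives on HDFS: on first lookup the catalog sees the "hdfs" scheme,
// downloads the JAR via SessionState.add_resource, and loads the local copy.
spark.sql(
  """CREATE FUNCTION my_upper AS 'com.example.hive.MyUpperUDF'
    |USING JAR 'hdfs:///tmp/udfs/my-udfs.jar'""".stripMargin)

spark.sql("SELECT my_upper('hello')").show()

Because the downloaded copy lands in a session-local temporary directory, the ShutdownHookManager hook registered above removes it when the JVM exits.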