@@ -66,49 +66,52 @@ private[sql] class HiveSessionCatalog(
name: String,
clazz: Class[_],
input: Seq[Expression]): Expression = {

Try(super.makeFunctionExpression(name, clazz, input)).getOrElse {
var udfExpr: Option[Expression] = None
try {
// When we instantiate hive UDF wrapper class, we may throw exception if the input
// expressions don't satisfy the hive UDF, such as type mismatch, input number
// mismatch, etc. Here we catch the exception and throw AnalysisException instead.
if (classOf[UDF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
udfExpr = Some(HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[UDAF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveUDAFFunction(
name,
new HiveFunctionWrapper(clazz.getName),
input,
isUDAFBridgeRequired = true))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types.
// Current thread context classloader may not be the one loaded the class. Need to switch
// context classloader to initialize instance properly.
Utils.withContextClassLoader(clazz.getClassLoader) {
Contributor

Is it guaranteed that clazz.getClassLoader is the sharedState.jarClassLoader?

Contributor Author

If the class comes from the classpath (that is, it was not loaded through addJar), clazz.getClassLoader would be the Spark classloader rather than jarClassLoader, although jarClassLoader may still be able to load it because it contains the Spark classloader. So simply switching to jarClassLoader may work in most cases. Using the classloader that actually loaded the class we want to instantiate also covers classloaders that load classes dynamically.
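
For readers unfamiliar with the pattern being discussed, here is a minimal sketch of temporarily switching the thread context classloader to the loader that defined a class. It is not Spark's `Utils.withContextClassLoader` implementation; the object name `ContextLoaderSketch` and the helper `instantiateWithOwnLoader` are hypothetical and exist only for illustration.

```scala
object ContextLoaderSketch {
  /** Runs `body` with `loader` as the thread context classloader, then restores the old one. */
  def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
    val thread = Thread.currentThread()
    val previous = thread.getContextClassLoader
    thread.setContextClassLoader(loader)
    try body
    finally thread.setContextClassLoader(previous) // restored even if `body` throws
  }

  /** Hypothetical helper: instantiates `clazz` while its defining loader is the context loader. */
  def instantiateWithOwnLoader(clazz: Class[_]): Any = {
    // Classes defined by the bootstrap loader report null; keep the current context loader then.
    val owner = Option(clazz.getClassLoader)
      .getOrElse(Thread.currentThread().getContextClassLoader)
    withContextClassLoader(owner) {
      clazz.getDeclaredConstructor().newInstance()
    }
  }
}
```

Because the loader is taken from the class itself, the sketch does not need to know whether the class came from the application classpath or from a jar added at runtime.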

Try(super.makeFunctionExpression(name, clazz, input)).getOrElse {
var udfExpr: Option[Expression] = None
try {
// When we instantiate hive UDF wrapper class, we may throw exception if the input
// expressions don't satisfy the hive UDF, such as type mismatch, input number
// mismatch, etc. Here we catch the exception and throw AnalysisException instead.
if (classOf[UDF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.dataType // Force it to check input data types.
Contributor

Found a potential problem: here we call HiveSimpleUDF.dataType (which is a lazy val) to force the class to be loaded with the correct classloader.

However, if the expression gets transformed later, HiveSimpleUDF is copied, and calling HiveSimpleUDF.dataType on the copy re-triggers the class loading. At that point there is no guarantee that the correct classloader is used.

I think we should materialize the loaded class in HiveSimpleUDF.

@HeartSaVioR can you take a look?

Contributor Author

Thanks for pinging me.

Could you please confirm my understanding? What I know about this issue comes from debugging (essentially reverse-engineering), so I'm not sure I understand it 100%.

If my understanding is correct, this should be a simple reproducer:

// uses classloader which loads clazz
val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input)
udf.dataType
val newUdf = udf.makeCopy(Array.empty)
// change classloader which doesn't load clazz
newUdf.dataType

Contributor

Yup, HiveSimpleUDF needs to load the class when dataType is first called. So even if we load the class here in HiveSessionCatalog, once HiveSimpleUDF is copied during transformation, it needs to load the class again.
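
The concern can be modelled without Spark. The sketch below is a simplified stand-in, not Spark's actual classes: `LazyResolved` is a hypothetical case class whose lazy val resolves a class through the thread context classloader, and `demo` is a hypothetical driver. It assumes that a copy starts with an unevaluated lazy val, which is how Scala case classes behave.

```scala
import java.net.{URL, URLClassLoader}
import scala.util.Try

object LazyCopySketch {
  // Hypothetical stand-in for an expression that resolves its class lazily.
  final case class LazyResolved(className: String) {
    // Evaluated at most once per instance, using whichever context loader is current then.
    lazy val resolved: Class[_] =
      Thread.currentThread().getContextClassLoader.loadClass(className)
  }

  private def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
    val thread = Thread.currentThread()
    val previous = thread.getContextClassLoader
    thread.setContextClassLoader(loader)
    try body finally thread.setContextClassLoader(previous)
  }

  /** Returns (first resolution ok, original still ok later, copy ok under the wrong loader). */
  def demo(): (Boolean, Boolean, Boolean) = {
    val owningLoader = getClass.getClassLoader                 // can load this sketch's classes
    val isolated = new URLClassLoader(Array.empty[URL], null)  // bootstrap parent only
    val original = LazyResolved(classOf[LazyResolved].getName)

    // Force the lazy val while the right loader is in place.
    val firstOk = withContextClassLoader(owningLoader)(Try(original.resolved).isSuccess)

    // The copy is a fresh instance, so its lazy val has not been evaluated yet.
    val copied = original.copy()

    withContextClassLoader(isolated) {
      (firstOk, Try(original.resolved).isSuccess, Try(copied.resolved).isSuccess)
    }
  }
}
```

Under these assumptions `LazyCopySketch.demo()` should return `(true, true, false)`: the original keeps its already-resolved class, while the copy has to resolve again and fails because the isolated loader cannot see the class.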

Contributor Author

@HeartSaVioR HeartSaVioR Mar 28, 2020

I figured out that the code above doesn't give an error: HiveFunctionWrapper stores the instance, which is copied in makeCopy(), so once the instance is created it doesn't seem to require changing the classloader.

That said, the code below gives an error:

// uses classloader which loads clazz
val udf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input)
// make sure HiveFunctionWrapper.createFunction is not called here

// change classloader which doesn't load clazz
val newUdf = udf.makeCopy(udf.productIterator.map(_.asInstanceOf[AnyRef]).toArray)
newUdf.dataType

Interestingly, if we call makeCopy while the classloader that loads clazz is still in place, as below, it doesn't give any error either:

// uses classloader which loads clazz
val udf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input)
// make sure HiveFunctionWrapper.createFunction is not called here

val newUdf = udf.makeCopy(udf.productIterator.map(_.asInstanceOf[AnyRef]).toArray)
// change classloader which doesn't load clazz
newUdf.dataType

We force a call to .dataType after creating HiveXXXUDF, so if my understanding is correct it won't matter.

Could you please check whether my observation is correct, or let me know if I'm missing something?

Contributor Author

The experimental UT code I used is below (added to SQLQuerySuite.scala):

test("SPARK-26560 ...experimenting Wenchen's comment...") {
    // force to use Spark classloader as other test (even in other test suites) may change the
    // current thread's context classloader to jar classloader
    Utils.withContextClassLoader(Utils.getSparkClassLoader) {
      withUserDefinedFunction("udtf_count3" -> false) {
        val sparkClassLoader = Thread.currentThread().getContextClassLoader

        // This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly
        // modified version of GenericUDTFCount2 in hive/contrib, which emits the count for
        // three times.
        val jarPath = "src/test/noclasspath/TestUDTF-spark-26560.jar"
        val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"

        val className = "org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3"
        sql(
          s"""
             |CREATE FUNCTION udtf_count3
             |AS '$className'
             |USING JAR '$jarURL'
          """.stripMargin)

        assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader)

        // JAR will be loaded at first usage, and it will change the current thread's
        // context classloader to jar classloader in sharedState.
        // See SessionState.addJar for details.
        sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t")

        assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
        assert(Thread.currentThread().getContextClassLoader eq
          spark.sqlContext.sharedState.jarClassLoader)

        // uses classloader which loads clazz
        val name = "default.udtf_count3"

        val input = Array(AttributeReference("a", IntegerType, nullable = false)())
        val udf = HiveGenericUDTF(name, new HiveFunctionWrapper(className), input)
        // FIXME: uncommenting the line below makes the test pass
//        udf.dataType

        // Roll back to the original classloader and run query again. Without this line, the test
        // would pass, as thread's context classloader is changed to jar classloader. But thread
        // context classloader can be changed from others as well which would fail the query; one
        // example is spark-shell, which thread context classloader rolls back automatically. This
        // mimics the behavior of spark-shell.
        Thread.currentThread().setContextClassLoader(sparkClassLoader)

        // FIXME: doing this "within" the context classloader which loads the UDF class
        //   makes the test pass even if we comment out udf.dataType
        val newUdf = udf.makeCopy(udf.productIterator.map(_.asInstanceOf[AnyRef]).toArray)

        newUdf.dataType
      }
    }
  }

Contributor

OK, let me share my findings: if you look at HiveFunctionWrapper.createFunction, it says we don't cache the instance for Simple UDF:

    def createFunction[UDFType <: AnyRef](): UDFType = {
      if (instance != null) {
        instance.asInstanceOf[UDFType]
      } else {
        val func = Utils.getContextOrSparkClassLoader
          .loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
        if (!func.isInstanceOf[UDF]) {
          // We cache the function if it's no the Simple UDF,
          // as we always have to create new instance for Simple UDF
          instance = func
        }
        func
      }
    }

I don't know the history, but I assume "we always have to create new instance for Simple UDF" is correct. I think what we can do is cache the loaded Class as well as the instance.
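
One way to read the suggestion is sketched below. This is not the change that was eventually made to HiveFunctionWrapper: `FunctionWrapper`, its `cacheInstance` flag, and `DummyFunction` are hypothetical names, and the sketch uses the thread context classloader directly where the quoted code uses `Utils.getContextOrSparkClassLoader`. The idea is that the `Class` is resolved once, so later instantiations no longer depend on which context classloader happens to be current.

```scala
object CachedClassSketch {
  // Hypothetical function class with a public no-arg constructor, used only by the demo.
  class DummyFunction

  /** Hypothetical wrapper that always caches the Class and optionally caches the instance. */
  final class FunctionWrapper(val functionClassName: String, cacheInstance: Boolean) {
    private var loadedClass: Option[Class[_]] = None
    private var instance: Option[AnyRef] = None

    def createFunction(): AnyRef = synchronized {
      instance.getOrElse {
        // Resolve the class only on the first call; later calls reuse the cached Class.
        val clazz: Class[_] = loadedClass.getOrElse {
          val c: Class[_] =
            Thread.currentThread().getContextClassLoader.loadClass(functionClassName)
          loadedClass = Some(c)
          c
        }
        // A fresh instance per call unless the caller asked for instance caching.
        val func = clazz.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
        if (cacheInstance) instance = Some(func)
        func
      }
    }
  }
}
```

With `cacheInstance = false` the wrapper still returns a new object on every call, which keeps the "always create a new instance" behaviour for the simple UDF case while removing the repeated class lookup.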

Contributor Author

Oh OK. I missed the case where we don't cache the function. Thanks for the pointer!
I'll try to reproduce the finding and fix it without touching that assumption.

} else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
udfExpr = Some(HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[UDAF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveUDAFFunction(
name,
new HiveFunctionWrapper(clazz.getName),
input,
isUDAFBridgeRequired = true))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types.
}
} catch {
case NonFatal(e) =>
val noHandlerMsg = s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e"
val errorMsg =
if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
s"$noHandlerMsg\nPlease make sure your function overrides " +
"`public StructObjectInspector initialize(ObjectInspector[] args)`."
} else {
noHandlerMsg
}
val analysisException = new AnalysisException(errorMsg)
analysisException.setStackTrace(e.getStackTrace)
throw analysisException
}
udfExpr.getOrElse {
throw new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}'")
}
} catch {
case NonFatal(e) =>
val noHandlerMsg = s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e"
val errorMsg =
if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
s"$noHandlerMsg\nPlease make sure your function overrides " +
"`public StructObjectInspector initialize(ObjectInspector[] args)`."
} else {
noHandlerMsg
}
val analysisException = new AnalysisException(errorMsg)
analysisException.setStackTrace(e.getStackTrace)
throw analysisException
}
udfExpr.getOrElse {
throw new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}'")
}
}
}
1 change: 1 addition & 0 deletions sql/hive/src/test/noclasspath/README
@@ -0,0 +1 @@
Place files which are being used as resources of tests but shouldn't be added to classpath.
Binary file not shown.
@@ -42,6 +42,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

case class Nested1(f1: Nested2)
case class Nested2(f2: Nested3)
@@ -2491,4 +2492,51 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
}

test("SPARK-26560 Spark should be able to run Hive UDF using jar regardless of " +
"current thread context classloader") {
// force to use Spark classloader as other test (even in other test suites) may change the
// current thread's context classloader to jar classloader
Utils.withContextClassLoader(Utils.getSparkClassLoader) {
withUserDefinedFunction("udtf_count3" -> false) {
val sparkClassLoader = Thread.currentThread().getContextClassLoader

// This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly
// modified version of GenericUDTFCount2 in hive/contrib, which emits the count for
// three times.
val jarPath = "src/test/noclasspath/TestUDTF-spark-26560.jar"
val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"

sql(
s"""
|CREATE FUNCTION udtf_count3
|AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3'
|USING JAR '$jarURL'
""".stripMargin)

assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader)

// JAR will be loaded at first usage, and it will change the current thread's
// context classloader to jar classloader in sharedState.
// See SessionState.addJar for details.
checkAnswer(
sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
Row(3) :: Row(3) :: Row(3) :: Nil)

assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
assert(Thread.currentThread().getContextClassLoader eq
spark.sqlContext.sharedState.jarClassLoader)

// Roll back to the original classloader and run query again. Without this line, the test
// would pass, as thread's context classloader is changed to jar classloader. But thread
// context classloader can be changed from others as well which would fail the query; one
// example is spark-shell, which thread context classloader rolls back automatically. This
// mimics the behavior of spark-shell.
Thread.currentThread().setContextClassLoader(sparkClassLoader)
checkAnswer(
sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
Row(3) :: Row(3) :: Row(3) :: Nil)
}
}
}
}