@HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Dec 27, 2019

What changes were proposed in this pull request?

This patch is based on #23921 but is revised to be simpler, and it adds a UT to test the behavior.
(This patch contains the commit from #23921 to retain credit.)

Spark loads new JARs for ADD JAR and CREATE FUNCTION ... USING JAR into the jar classloader in shared state, and changes the current thread's context classloader to the jar classloader, because many other parts of the code rely on the current thread's context classloader.

This works as long as subsequent queries run in the same thread and the thread's context classloader does not change. Once the context classloader of the current thread is switched back for any reason, Spark fails to create an instance of the class for the function.

This bug mostly affects spark-shell, as spark-shell rolls back the current thread's context classloader at every prompt. It may also affect a job-server setup, where queries may run in multiple threads.

This patch fixes the issue by switching the context classloader to the classloader that loaded the class. Fortunately, the FunctionBuilder created by makeFunctionBuilder captures the Class in its closure, so the Class itself is available regardless of the current thread's context classloader.
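
As a rough illustration of the idea (not the code from this patch), the sketch below temporarily installs the loader that defined a class as the thread's context classloader while an instance is created, then restores the previous loader. The object name `ContextClassLoaderSketch` and the helper `instantiate` are hypothetical; `withContextClassLoader` here is a local stand-in and is only assumed to behave like the Spark utility of the same name.

```scala
object ContextClassLoaderSketch {
  /** Runs `body` with `loader` as the context classloader, then restores the previous one. */
  def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
    val thread = Thread.currentThread()
    val previous = thread.getContextClassLoader
    thread.setContextClassLoader(loader)
    try body
    finally thread.setContextClassLoader(previous)
  }

  /** Hypothetical helper: creates an instance while the class's own loader is in effect. */
  def instantiate(clazz: Class[_]): AnyRef =
    withContextClassLoader(clazz.getClassLoader) {
      // Code called from here that consults the context classloader
      // sees the loader that defined `clazz`.
      clazz.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    }
}
```

Because the previous loader is restored in a `finally` block, the caller's thread is left untouched even if instantiation throws.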

Why are the changes needed?

Without this patch, a Hive UDF created with USING JAR fails the second time an end user executes it in spark-shell.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New UT.


private var threadContextClassLoader: ClassLoader = _

override protected def beforeEach(): Unit = {
Contributor Author

@HeartSaVioR HeartSaVioR Dec 27, 2019

beforeEach and afterEach are needed to make the new UT pass, because some tests change the current thread's context classloader to the jar classloader.
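
A minimal sketch of such a save-and-restore pair, written without ScalaTest so it stands alone. The object name `ContextClassLoaderFixture` is hypothetical, and the real suite overrides the ScalaTest hooks rather than defining plain methods like these.

```scala
object ContextClassLoaderFixture {
  // Loader that was in effect before the test started.
  private var saved: ClassLoader = null

  /** Remember the current context classloader before a test runs. */
  def beforeEach(): Unit = {
    saved = Thread.currentThread().getContextClassLoader
  }

  /** Undo whatever the test did to the context classloader. */
  def afterEach(): Unit = {
    Thread.currentThread().setContextClassLoader(saved)
  }
}
```

With this in place, a test that leaves the jar classloader installed no longer leaks it into the next test on the same thread.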

@HeartSaVioR
Contributor Author

The ideal fix would be to not change the current thread's context classloader in addJar and still make things work. I guess many places rely on the context classloader, so that would require broader changes.

@HeartSaVioR
Contributor Author

I mentioned Hive UDFs because I haven't heard of creating a Spark UDF from a JAR. Please let me know if that's not the case.

@SparkQA

SparkQA commented Dec 27, 2019

Test build #115861 has finished for PR 27025 at commit 02188e0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

…lassloader for tests in SQLQuerySuite didn't work
@SparkQA

SparkQA commented Dec 28, 2019

Test build #115880 has finished for PR 27025 at commit b801ac5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

|CREATE FUNCTION udtf_count3
|AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3'
|USING JAR '$jarURL'
""".stripMargin)
Member

nit: indent

@maropu
Member

maropu commented Dec 28, 2019

sql/hive/src/test/TestUDTF-dynamicload.jar is a weird path..., can we avoid this?

@HeartSaVioR
Contributor Author

> sql/hive/src/test/TestUDTF-dynamicload.jar is a weird path..., can we avoid this?

Hmm... the jar shouldn't be on the classpath. What about sql/hive/src/test/data/TestUDTF.jar, plus a README.txt in sql/hive/src/test/data explaining that this path shouldn't be on the classpath?

@maropu
Member

maropu commented Dec 28, 2019

sorry, but I don't have a smart idea, either... cc: @HyukjinKwon @cloud-fan

@SparkQA

SparkQA commented Dec 29, 2019

Test build #115896 has finished for PR 27025 at commit 418e027.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Dec 29, 2019

17:55:15.408 ERROR org.apache.spark.SparkContext: Failed to add file:///home/jenkins/workspace/SparkPullRequestBuilder/sql/hive/src/test/noclasspath/TestUDTF.jar to Spark environment
java.lang.IllegalArgumentException: requirement failed: File TestUDTF.jar was already registered with a different path (old path = /home/jenkins/workspace/SparkPullRequestBuilder/sql/hive/target/scala-2.12/test-classes/TestUDTF.jar, new path = /home/jenkins/workspace/SparkPullRequestBuilder/sql/hive/src/test/noclasspath/TestUDTF.jar
        at scala.Predef$.require(Predef.scala:281)
        at org.apache.spark.rpc.netty.NettyStreamManager.addJar(NettyStreamManager.scala:79)
        at org.apache.spark.SparkContext.addLocalJarFile$1(SparkContext.scala:1853)
        at org.apache.spark.SparkContext.addJar(SparkContext.scala:1901)
        at org.apache.spark.sql.internal.SessionResourceLoader.addJar(SessionState.scala:166)
        at org.apache.spark.sql.hive.HiveSessionResourceLoader.addJar(HiveSessionStateBuilder.scala:118)
        at org.apache.spark.sql.internal.SessionResourceLoader.loadResource(SessionState.scala:149)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.$anonfun$loadFunctionResources$1(SessionCatalog.scala:1287)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.$anonfun$loadFunctionResources$1$adapted(SessionCatalog.scala:1287)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.loadFunctionResources(SessionCatalog.scala:1287)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1428)
        at org.apache.spark.sql.hive.HiveSessionCatalog.super$lookupFunction(HiveSessionCatalog.scala:135)
        at org.apache.spark.sql.hive.HiveSessionCatalog.$anonfun$lookupFunction0$2(HiveSessionCatalog.scala:135)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction0(HiveSessionCatalog.scala:135)
        at org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction(HiveSessionCatalog.scala:121)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$15$$anonfun$applyOrElse$96.$anonfun$applyOrElse$99(Analyzer.scala:1739)
        at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$15$$anonfun$applyOrElse$96.applyOrElse(Analyzer.scala:1739)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$15$$anonfun$applyOrElse$96.applyOrElse(Analyzer.scala:1722)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:291)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
...

It doesn't seem to allow the same file name, so I'll just rename the jar to TestUDTF-spark-26560.jar, after the JIRA issue.
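
The error message suggests that registered JARs are keyed by file name rather than by full path. A hedged sketch of that kind of check follows; `JarRegistrySketch` and the returned URI format are hypothetical, and this is an assumption about the behavior behind the stack trace, not a copy of NettyStreamManager.

```scala
import java.io.File
import java.util.concurrent.ConcurrentHashMap

object JarRegistrySketch {
  // Keyed by file name only, so two different paths with the same name collide.
  private val jars = new ConcurrentHashMap[String, File]()

  /** Registers `file`; fails if another path already claimed the same file name. */
  def addJar(file: File): String = {
    val existing = jars.putIfAbsent(file.getName, file)
    require(existing == null || existing == file,
      s"File ${file.getName} was already registered with a different path " +
        s"(old path = $existing, new path = $file)")
    s"/jars/${file.getName}" // hypothetical URI layout
  }
}
```

Under this assumption, registering target/.../TestUDTF.jar first and src/test/noclasspath/TestUDTF.jar later fails with an IllegalArgumentException, while giving the second jar a distinct file name avoids the clash.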

@SparkQA

SparkQA commented Dec 29, 2019

Test build #115913 has finished for PR 27025 at commit 988061b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 30, 2019

Test build #115953 has finished for PR 27025 at commit 39a2171.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types.
// Current thread context classloader may not be the one loaded the class. Need to switch
// context classloader to initialize instance properly.
Utils.withContextClassLoader(clazz.getClassLoader) {
Contributor

is it guaranteed that clazz.getClassLoader is the sharedState.jarClassLoader?

Contributor Author

If the class comes from the classpath (not loaded via addJar), clazz.getClassLoader would be the Spark classloader instead of jarClassLoader, though jarClassLoader may be able to load it too, as it contains the Spark classloader. So simply switching to jarClassLoader may work in most cases, but this approach also works for any classloader that loads classes dynamically, since we use the classloader that actually "loaded" the class we want to instantiate.
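
The distinction between a loader that can find a class and the loader that defined it can be seen with plain JVM classloaders. In the sketch below, `child` is only a stand-in for jarClassLoader (an empty URLClassLoader layered on top of an existing loader), and `DefiningLoaderSketch` is a hypothetical name.

```scala
import java.net.{URL, URLClassLoader}

object DefiningLoaderSketch {
  /** Returns (child returned the very same Class object, child is its defining loader). */
  def run(): (Boolean, Boolean) = {
    // A class that is already available without any dynamically added JAR.
    val onClasspath = classOf[scala.collection.mutable.ArrayBuffer[_]]
    val parent = onClasspath.getClassLoader
    // Stand-in for jarClassLoader: delegates to `parent`, has no JARs of its own.
    val child = new URLClassLoader(Array.empty[URL], parent)
    val viaChild = child.loadClass(onClasspath.getName)
    (viaChild eq onClasspath, viaChild.getClassLoader eq child)
  }
}
```

`run()` yields `(true, false)`: the child can load the class through delegation, yet `getClassLoader` still reports the loader that defined it. That is why using `clazz.getClassLoader` covers both classpath classes and classes that came from a dynamically added JAR.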

Contributor

@cloud-fan cloud-fan left a comment

LGTM except one question

@cloud-fan cloud-fan closed this in 5d870ef Jan 2, 2020
@cloud-fan
Contributor

thanks, merging to master!

@HeartSaVioR can you open another PR for 2.4?

@HeartSaVioR
Contributor Author

Thanks all for reviewing and merging!

> @HeartSaVioR can you open another PR for 2.4?

Sure, I'll submit a PR for 2.4 as well. Thanks!

@HeartSaVioR HeartSaVioR deleted the SPARK-26560-revised branch January 2, 2020 08:04
HeartSaVioR added a commit to HeartSaVioR/spark that referenced this pull request Jan 2, 2020
…ardless of current thread context classloader

Closes apache#27025 from HeartSaVioR/SPARK-26560-revised.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Co-authored-by: nivo091 <nivedeeta.singh@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
// mismatch, etc. Here we catch the exception and throw AnalysisException instead.
if (classOf[UDF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.dataType // Force it to check input data types.
Contributor

Found a potential problem: here we call HiveSimpleUDF.dataType (which is a lazy val) to force the class to be loaded with the correct classloader.

However, if the expression gets transformed later, which copies HiveSimpleUDF, then calling HiveSimpleUDF.dataType will re-trigger the class loading, and at that point there is no guarantee that the correct classloader is used.

I think we should materialize the loaded class in HiveSimpleUDF.

@HeartSaVioR can you take a look?
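
The general concern can be illustrated without Spark: a lazy val belongs to one instance, so a copy starts uninitialized and evaluates the initializer again. `FakeUdf` below is a hypothetical stand-in for an expression such as HiveSimpleUDF, and the counter stands in for class loading.

```scala
object LazyValCopySketch {
  // Counts how many times the "class loading" initializer ran.
  var loads = 0

  final case class FakeUdf(className: String) {
    lazy val dataType: String = {
      loads += 1 // stands in for loading the class via the context classloader
      s"resolved:$className"
    }
  }

  /** Returns the number of initializations after touching the original and a copy. */
  def run(): Int = {
    loads = 0
    val udf = FakeUdf("com.example.MyUdf")
    udf.dataType
    udf.dataType          // cached on this instance, no second load
    val copied = udf.copy()
    copied.dataType       // the copy evaluates the lazy val again
    loads
  }
}
```

`run()` returns 2: forcing the lazy val on the original does nothing for the copy, so whatever classloader is in effect when the copy is first used decides whether loading succeeds.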

Contributor Author

Thanks for pinging me.

Could you please confirm my understanding? My knowledge of this issue comes from debugging (essentially reverse-engineering), so I'm not sure I get it 100%.

If my understanding is correct, this seems to be a simple reproducer:

// uses classloader which loads clazz
val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input)
udf.dataType
val newUdf = udf.makeCopy(Array.empty)
// change classloader which doesn't load clazz
newUdf.dataType

Contributor

yup, HiveSimpleUDF needs to load the class when dataType is first called. So even if we load the class here in HiveSessionCatalog, once HiveSimpleUDF is copied during transformation, it needs to load the class again.

Contributor Author

@HeartSaVioR HeartSaVioR Mar 28, 2020

I figured out that the code above doesn't give an error: HiveFunctionWrapper stores the instance, which is copied in makeCopy(), so once the instance is created it doesn't seem to require changing the classloader.

That said, the code below gives an error:

// uses classloader which loads clazz
val udf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input)
// make sure HiveFunctionWrapper.createFunction is not called here

// change classloader which doesn't load clazz
val newUdf = udf.makeCopy(udf.productIterator.map(_.asInstanceOf[AnyRef]).toArray)
newUdf.dataType

Interestingly, if we call makeCopy under the classloader which loads clazz, as below, it doesn't give any error either:

// uses classloader which loads clazz
val udf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input)
// make sure HiveFunctionWrapper.createFunction is not called here

val newUdf = udf.makeCopy(udf.productIterator.map(_.asInstanceOf[AnyRef]).toArray)
// change classloader which doesn't load clazz
newUdf.dataType

We force a call to .dataType right after creating HiveXXXUDF, so if my understanding is correct, it won't matter.

Could you please check whether my observation is correct, or let me know if I'm missing something?
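
One way to picture the observation about the cached instance: if the wrapper object is shared between the original expression and its copy, an instance created once through the wrapper is visible to both. The names `Wrapper`, `Udtf`, and `SharedWrapperSketch` are hypothetical, and the sketch assumes (as the comment above suggests) that a copy keeps a reference to the same wrapper.

```scala
object SharedWrapperSketch {
  final class Wrapper(val className: String) {
    var loads = 0
    private var instance: AnyRef = null

    /** Creates the function once and returns the cached instance afterwards. */
    def createFunction(): AnyRef = {
      if (instance == null) {
        loads += 1            // stands in for loading the class by name
        instance = new Object
      }
      instance
    }
  }

  final case class Udtf(name: String, wrapper: Wrapper) {
    lazy val function: AnyRef = wrapper.createFunction()
  }

  /** Returns (number of loads, whether the copy sees the same instance). */
  def run(): (Int, Boolean) = {
    val udtf = Udtf("udtf_count3", new Wrapper("com.example.Count3"))
    udtf.function                     // first and only load
    val copied = udtf.copy()          // shares the same Wrapper reference
    val same = copied.function eq udtf.function
    (udtf.wrapper.loads, same)
  }
}
```

`run()` yields `(1, true)`: the copy re-evaluates its own lazy val, but the wrapper hands back the cached instance, so no second class lookup happens. This only holds when the wrapper actually caches the instance.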

Contributor Author

The experimental UT code I used is below (added to SQLQuerySuite.scala) :

test("SPARK-26560 ...experimenting Wenchen's comment...") {
    // force to use Spark classloader as other test (even in other test suites) may change the
    // current thread's context classloader to jar classloader
    Utils.withContextClassLoader(Utils.getSparkClassLoader) {
      withUserDefinedFunction("udtf_count3" -> false) {
        val sparkClassLoader = Thread.currentThread().getContextClassLoader

        // This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly
        // modified version of GenericUDTFCount2 in hive/contrib, which emits the count for
        // three times.
        val jarPath = "src/test/noclasspath/TestUDTF-spark-26560.jar"
        val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"

        val className = "org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3"
        sql(
          s"""
             |CREATE FUNCTION udtf_count3
             |AS '$className'
             |USING JAR '$jarURL'
          """.stripMargin)

        assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader)

        // JAR will be loaded at first usage, and it will change the current thread's
        // context classloader to jar classloader in sharedState.
        // See SessionState.addJar for details.
        sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t")

        assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
        assert(Thread.currentThread().getContextClassLoader eq
          spark.sqlContext.sharedState.jarClassLoader)

        // uses classloader which loads clazz
        val name = "default.udtf_count3"

        val input = Array(AttributeReference("a", IntegerType, nullable = false)())
        val udf = HiveGenericUDTF(name, new HiveFunctionWrapper(className), input)
        // FIXME: uncommenting the line below makes the test pass
        // udf.dataType

        // Roll back to the original classloader and run query again. Without this line, the test
        // would pass, as thread's context classloader is changed to jar classloader. But thread
        // context classloader can be changed from others as well which would fail the query; one
        // example is spark-shell, which thread context classloader rolls back automatically. This
        // mimics the behavior of spark-shell.
        Thread.currentThread().setContextClassLoader(sparkClassLoader)

        // FIXME: doing this "within" the context classloader which loads the UDF class
        //   makes the test pass even if we comment out udf.dataType
        val newUdf = udf.makeCopy(udf.productIterator.map(_.asInstanceOf[AnyRef]).toArray)

        newUdf.dataType
      }
    }
  }

Contributor

OK let me put my findings: If you look at HiveFunctionWrapper.createFunction, it says we don't cache the instance for Simple UDF

    def createFunction[UDFType <: AnyRef](): UDFType = {
      if (instance != null) {
        instance.asInstanceOf[UDFType]
      } else {
        val func = Utils.getContextOrSparkClassLoader
          .loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
        if (!func.isInstanceOf[UDF]) {
          // We cache the function if it's not the Simple UDF,
          // as we always have to create new instance for Simple UDF
          instance = func
        }
        func
      }
    }

I don't know the history but I assume "we always have to create new instance for Simple UDF" is correct. I think what we can do is to cache the loaded Class as well as the instance.
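
A hedged sketch of what caching the loaded Class could look like: the class is looked up by name once, and every later call creates a fresh instance from the cached Class object, so the context classloader in effect at that later time no longer matters. `ClassCachingWrapper` is a hypothetical name and this is not the change that was eventually made to HiveFunctionWrapper.

```scala
object CachedClassSketch {
  /** Hypothetical wrapper: resolves the class once, then reuses the Class object. */
  final class ClassCachingWrapper(className: String) {
    private var cachedClass: Class[_] = null

    private def resolve(): Class[_] = synchronized {
      if (cachedClass == null) {
        // Only the first call depends on the current context classloader.
        val loader = Option(Thread.currentThread().getContextClassLoader)
          .getOrElse(ClassLoader.getSystemClassLoader)
        cachedClass = loader.loadClass(className)
      }
      cachedClass
    }

    /** Always returns a new instance, but never looks the class up by name twice. */
    def createFunction(): AnyRef =
      resolve().getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
  }
}
```

This keeps the "always create a new instance for Simple UDF" assumption intact, since only the Class is cached and not the function object.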

Contributor Author

Oh OK. I missed the case where we don't cache the function. Thanks for the pointer!
I'll try to reproduce the finding and fix it without touching that assumption.
