Skip to content

Conversation

@HeartSaVioR
Copy link
Contributor

What changes were proposed in this pull request?

This patch fixes the issue that external listeners are not initialized properly when spark.sql.hive.metastore.jars is set to either "maven" or custom list of jar.
("builtin" is not a case here - all jars in Spark classloader are also available in separate classloader)

The culprit is lazy initialization (lazy val or passing builder function) & thread context classloader. HiveClient leverages IsolatedClientLoader to properly load Hive and relevant libraries without issue - to not mess up with Spark classpath it uses separate classloader with leveraging thread context classloader.

But there's a messed-up case - SessionState is being initialized while HiveClient changed the thread context classloader from Spark classloader to Hive isolated one, and streaming query listeners are loaded from changed classloader while initializing SessionState.

This patch forces initializing SessionState in SparkSQLEnv to avoid such case.

Why are the changes needed?

ClassNotFoundException could occur in spark-sql with specific configuration, as explained above.

Does this PR introduce any user-facing change?

No, as I don't think end users assume the classloader of external listeners is only containing jars for Hive client.

How was this patch tested?

New UT added which fails on master branch and passes with the patch.

The error message with master branch when running UT:

java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':;
org.apache.spark.sql.AnalysisException: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':;
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:109)
	at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:221)
	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:147)
	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:137)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:59)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.$anonfun$new$2(SparkSQLEnvSuite.scala:44)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.withSystemProperties(SparkSQLEnvSuite.scala:61)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.$anonfun$new$1(SparkSQLEnvSuite.scala:43)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
	at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
	at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
	at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:56)
	at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:393)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:376)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
	at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite.run(Suite.scala:1124)
	at org.scalatest.Suite.run$(Suite.scala:1106)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
	at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:56)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1349)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1343)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1033)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1011)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011)
	at org.scalatest.tools.Runner$.run(Runner.scala:850)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
Caused by: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':
	at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1054)
	at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:156)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:154)
	at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:151)
	at org.apache.spark.sql.SparkSession.$anonfun$new$3(SparkSession.scala:105)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.SparkSession.$anonfun$new$1(SparkSession.scala:105)
	at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:164)
	at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:183)
	at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:127)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:300)
	at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:421)
	at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:314)
	at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:68)
	at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:67)
	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:221)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
	... 58 more
Caused by: java.lang.ClassNotFoundException: test.custom.listener.DummyQueryExecutionListener
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:206)
	at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:2746)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
	at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:2744)
	at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1(QueryExecutionListener.scala:83)
	at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1$adapted(QueryExecutionListener.scala:82)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.util.ExecutionListenerManager.<init>(QueryExecutionListener.scala:82)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$listenerManager$2(BaseSessionStateBuilder.scala:293)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.listenerManager(BaseSessionStateBuilder.scala:293)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:320)
	at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1051)
	... 80 more

…HiveClient in SparkSQLEnv

### What changes were proposed in this pull request?

This patch fixes the issue that external listeners are not initialized properly when `spark.sql.hive.metastore.jars` is set to either "maven" or custom list of jar.
("builtin" is not a case here - all jars in Spark classloader are also available in separate classloader)

The culprit is lazy initialization (lazy val or passing builder function) & thread context classloader. HiveClient leverages IsolatedClientLoader to properly load Hive and relevant libraries without issue - to not mess up with Spark classpath it uses separate classloader with leveraging thread context classloader.

But there's a messed-up case - SessionState is being initialized while HiveClient changed the thread context classloader from Spark classloader to Hive isolated one, and streaming query listeners are loaded from changed classloader while initializing SessionState.

This patch forces initializing SessionState in SparkSQLEnv to avoid such case.

### Why are the changes needed?

ClassNotFoundException could occur in spark-sql with specific configuration, as explained above.

### Does this PR introduce any user-facing change?

No, as I don't think end users assume the classloader of external listeners is only containing jars for Hive client.

### How was this patch tested?

New UT added which fails on master branch and passes with the patch.

The error message with master branch when running UT:

```
java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':;
org.apache.spark.sql.AnalysisException: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':;
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:109)
	at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:221)
	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:147)
	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:137)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:59)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.$anonfun$new$2(SparkSQLEnvSuite.scala:44)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.withSystemProperties(SparkSQLEnvSuite.scala:61)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.$anonfun$new$1(SparkSQLEnvSuite.scala:43)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
	at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
	at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
	at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:56)
	at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:393)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:376)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
	at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite.run(Suite.scala:1124)
	at org.scalatest.Suite.run$(Suite.scala:1106)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
	at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:56)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1349)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1343)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1033)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1011)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011)
	at org.scalatest.tools.Runner$.run(Runner.scala:850)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
Caused by: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':
	at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1054)
	at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:156)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:154)
	at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:151)
	at org.apache.spark.sql.SparkSession.$anonfun$new$3(SparkSession.scala:105)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.SparkSession.$anonfun$new$1(SparkSession.scala:105)
	at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:164)
	at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:183)
	at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:127)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:300)
	at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:421)
	at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:314)
	at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:68)
	at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:67)
	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:221)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
	... 58 more
Caused by: java.lang.ClassNotFoundException: test.custom.listener.DummyQueryExecutionListener
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:206)
	at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:2746)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
	at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:2744)
	at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1(QueryExecutionListener.scala:83)
	at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1$adapted(QueryExecutionListener.scala:82)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.util.ExecutionListenerManager.<init>(QueryExecutionListener.scala:82)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$listenerManager$2(BaseSessionStateBuilder.scala:293)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.listenerManager(BaseSessionStateBuilder.scala:293)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:320)
	at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1051)
	... 80 more
```

Closes apache#26258 from HeartSaVioR/SPARK-29604.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@HeartSaVioR
Copy link
Contributor Author

cc. @dongjoon-hyun This is branch-2.4 version of #26258. Please take a look at. Thanks!

@dongjoon-hyun
Copy link
Member

Thank you, @HeartSaVioR !

/** Only exposed for testing. */
private[sql] def listListeners(): Array[StreamingQueryListener] = {
listenerBus.listeners.asScala.toArray
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. This API is added at 3.0.0 only.


class DummyQueryExecutionListener extends QueryExecutionListener {
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {}
override def onFailure(funcName: String, qe: QueryExecution, error: Exception): Unit = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. Merged to branch-2.4.

The logic is the same. I verified the change on SparkSQLEnv.scala locally and the other exposed reader functions are not used by the other places.

dongjoon-hyun pushed a commit that referenced this pull request Oct 30, 2019
…zing HiveClient in SparkSQLEnv

### What changes were proposed in this pull request?

This patch fixes the issue that external listeners are not initialized properly when `spark.sql.hive.metastore.jars` is set to either "maven" or custom list of jar.
("builtin" is not a case here - all jars in Spark classloader are also available in separate classloader)

The culprit is lazy initialization (lazy val or passing builder function) & thread context classloader. HiveClient leverages IsolatedClientLoader to properly load Hive and relevant libraries without issue - to not mess up with Spark classpath it uses separate classloader with leveraging thread context classloader.

But there's a messed-up case - SessionState is being initialized while HiveClient changed the thread context classloader from Spark classloader to Hive isolated one, and streaming query listeners are loaded from changed classloader while initializing SessionState.

This patch forces initializing SessionState in SparkSQLEnv to avoid such case.

### Why are the changes needed?

ClassNotFoundException could occur in spark-sql with specific configuration, as explained above.

### Does this PR introduce any user-facing change?

No, as I don't think end users assume the classloader of external listeners is only containing jars for Hive client.

### How was this patch tested?

New UT added which fails on master branch and passes with the patch.

The error message with master branch when running UT:

```
java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':;
org.apache.spark.sql.AnalysisException: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':;
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:109)
	at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:221)
	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:147)
	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:137)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:59)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.$anonfun$new$2(SparkSQLEnvSuite.scala:44)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.withSystemProperties(SparkSQLEnvSuite.scala:61)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite.$anonfun$new$1(SparkSQLEnvSuite.scala:43)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
	at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
	at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
	at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:56)
	at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:393)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:376)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
	at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite.run(Suite.scala:1124)
	at org.scalatest.Suite.run$(Suite.scala:1106)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
	at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:56)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1349)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1343)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1033)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1011)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011)
	at org.scalatest.tools.Runner$.run(Runner.scala:850)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
Caused by: java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':
	at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1054)
	at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:156)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:154)
	at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:151)
	at org.apache.spark.sql.SparkSession.$anonfun$new$3(SparkSession.scala:105)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.SparkSession.$anonfun$new$1(SparkSession.scala:105)
	at org.apache.spark.sql.internal.SQLConf$.get(SQLConf.scala:164)
	at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:183)
	at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:127)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:300)
	at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:421)
	at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:314)
	at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:68)
	at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:67)
	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:221)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
	... 58 more
Caused by: java.lang.ClassNotFoundException: test.custom.listener.DummyQueryExecutionListener
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:206)
	at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:2746)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
	at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:2744)
	at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1(QueryExecutionListener.scala:83)
	at org.apache.spark.sql.util.ExecutionListenerManager.$anonfun$new$1$adapted(QueryExecutionListener.scala:82)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.util.ExecutionListenerManager.<init>(QueryExecutionListener.scala:82)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$listenerManager$2(BaseSessionStateBuilder.scala:293)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.listenerManager(BaseSessionStateBuilder.scala:293)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:320)
	at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1051)
	... 80 more
```

Closes #26316 from HeartSaVioR/SPARK-29604-branch-2.4.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@SparkQA
Copy link

SparkQA commented Oct 30, 2019

Test build #112905 has finished for PR 26316 at commit d0560b1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

Thanks for quick handling!

@HeartSaVioR HeartSaVioR deleted the SPARK-29604-branch-2.4 branch October 30, 2019 12:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants