From 8a354bef55ce9cc0fa77fa1c3a9d62c16438ca1b Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 12 Oct 2015 13:50:34 -0700 Subject: [PATCH] [SPARK-11042] [SQL] Add a mechanism to ban creating multiple root SQLContexts/HiveContexts in a JVM https://issues.apache.org/jira/browse/SPARK-11042 Author: Yin Huai Closes #9058 from yhuai/SPARK-11042. --- .../scala/org/apache/spark/sql/SQLConf.scala | 10 ++ .../org/apache/spark/sql/SQLContext.scala | 42 +++++++- .../spark/sql/MultiSQLContextsSuite.scala | 99 +++++++++++++++++++ .../apache/spark/sql/hive/HiveContext.scala | 12 ++- 4 files changed, 156 insertions(+), 7 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 47397c4be3cb6..f62df9bdebcc0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -186,6 +186,16 @@ private[spark] object SQLConf { import SQLConfEntry._ + val ALLOW_MULTIPLE_CONTEXTS = booleanConf("spark.sql.allowMultipleContexts", + defaultValue = Some(true), + doc = "When set to true, creating multiple SQLContexts/HiveContexts is allowed. " + + "When set to false, only one SQLContext/HiveContext is allowed to be created " + + "through the constructor (new SQLContexts/HiveContexts created through newSession " + + "method is allowed). Please note that this conf needs to be set in Spark Conf. 
Once " + + "a SQLContext/HiveContext has been created, changing the value of this conf will not " + + "have effect.", + isPublic = true) + val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed", defaultValue = Some(true), doc = "When set to true Spark SQL will automatically select a compression codec for each " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 2bdfd82af0adb..1bd291389241a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -26,7 +26,7 @@ import scala.collection.immutable import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal -import org.apache.spark.SparkContext +import org.apache.spark.{SparkException, SparkContext} import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD @@ -64,14 +64,37 @@ import org.apache.spark.util.Utils */ class SQLContext private[sql]( @transient val sparkContext: SparkContext, - @transient protected[sql] val cacheManager: CacheManager) + @transient protected[sql] val cacheManager: CacheManager, + val isRootContext: Boolean) extends org.apache.spark.Logging with Serializable { self => - def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager) + def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager, true) def this(sparkContext: JavaSparkContext) = this(sparkContext.sc) + // If spark.sql.allowMultipleContexts is false, we will throw an exception if a user + // wants to create a new root SQLContext (a SQLContext that is not created by newSession). 
+ private val allowMultipleContexts = + sparkContext.conf.getBoolean( + SQLConf.ALLOW_MULTIPLE_CONTEXTS.key, + SQLConf.ALLOW_MULTIPLE_CONTEXTS.defaultValue.get) + + // Assert no root SQLContext is running when allowMultipleContexts is false. + { + if (!allowMultipleContexts && isRootContext) { + SQLContext.getInstantiatedContextOption() match { + case Some(rootSQLContext) => + val errMsg = "Only one SQLContext/HiveContext may be running in this JVM. " + + s"It is recommended to use SQLContext.getOrCreate to get the instantiated " + + s"SQLContext/HiveContext. To ignore this error, " + + s"set ${SQLConf.ALLOW_MULTIPLE_CONTEXTS.key} = true in SparkConf." + throw new SparkException(errMsg) + case None => // OK + } + } + } + /** * Returns a SQLContext as new session, with separated SQL configurations, temporary tables, * registered functions, but sharing the same SparkContext and CacheManager. @@ -79,7 +102,10 @@ class SQLContext private[sql]( * @since 1.6.0 */ def newSession(): SQLContext = { - new SQLContext(sparkContext, cacheManager) + new SQLContext( + sparkContext = sparkContext, + cacheManager = cacheManager, + isRootContext = false) } /** @@ -1239,6 +1265,10 @@ object SQLContext { instantiatedContext.compareAndSet(null, sqlContext) } + private[sql] def getInstantiatedContextOption(): Option[SQLContext] = { + Option(instantiatedContext.get()) + } + /** * Changes the SQLContext that will be returned in this thread and its children when * SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives @@ -1260,6 +1290,10 @@ object SQLContext { activeContext.remove() } + private[sql] def getActiveContextOption(): Option[SQLContext] = { + Option(activeContext.get()) + } + /** * Converts an iterator of Java Beans to InternalRow using the provided * bean info & schema. 
This is not related to the singleton, but is a static diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala new file mode 100644 index 0000000000000..0e8fcb6a858b1 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala @@ -0,0 +1,99 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.spark.sql + +import org.apache.spark._ +import org.scalatest.BeforeAndAfterAll + +class MultiSQLContextsSuite extends SparkFunSuite with BeforeAndAfterAll { + + private var originalActiveSQLContext: Option[SQLContext] = _ + private var originalInstantiatedSQLContext: Option[SQLContext] = _ + private var sparkConf: SparkConf = _ + + override protected def beforeAll(): Unit = { + originalActiveSQLContext = SQLContext.getActiveContextOption() + originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption() + + SQLContext.clearActive() + originalInstantiatedSQLContext.foreach(ctx => SQLContext.clearInstantiatedContext(ctx)) + sparkConf = + new SparkConf(false) + .setMaster("local[*]") + .setAppName("test") + .set("spark.ui.enabled", "false") + .set("spark.driver.allowMultipleContexts", "true") + } + + override protected def afterAll(): Unit = { + // Set these states back. + originalActiveSQLContext.foreach(ctx => SQLContext.setActive(ctx)) + originalInstantiatedSQLContext.foreach(ctx => SQLContext.setInstantiatedContext(ctx)) + } + + def testNewSession(rootSQLContext: SQLContext): Unit = { + // Make sure we can successfully create new Session. + rootSQLContext.newSession() + + // Reset the state. It is always safe to clear the active context. + SQLContext.clearActive() + } + + def testCreatingNewSQLContext(allowsMultipleContexts: Boolean): Unit = { + val conf = + sparkConf + .clone + .set(SQLConf.ALLOW_MULTIPLE_CONTEXTS.key, allowsMultipleContexts.toString) + val sparkContext = new SparkContext(conf) + + try { + if (allowsMultipleContexts) { + new SQLContext(sparkContext) + SQLContext.clearActive() + } else { + // If allowsMultipleContexts is false, make sure we can get the error. 
+ val message = intercept[SparkException] { + new SQLContext(sparkContext) + }.getMessage + assert(message.contains("Only one SQLContext/HiveContext may be running")) + } + } finally { + sparkContext.stop() + } + } + + test("test the flag to disallow creating multiple root SQLContext") { + Seq(false, true).foreach { allowMultipleSQLContexts => + val conf = + sparkConf + .clone + .set(SQLConf.ALLOW_MULTIPLE_CONTEXTS.key, allowMultipleSQLContexts.toString) + val sc = new SparkContext(conf) + try { + val rootSQLContext = new SQLContext(sc) + testNewSession(rootSQLContext) + testNewSession(rootSQLContext) + testCreatingNewSQLContext(allowMultipleSQLContexts) + + SQLContext.clearInstantiatedContext(rootSQLContext) + } finally { + sc.stop() + } + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index dad1e2347c387..ddeadd3eb737d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -89,10 +89,11 @@ class HiveContext private[hive]( sc: SparkContext, cacheManager: CacheManager, @transient execHive: ClientWrapper, - @transient metaHive: ClientInterface) extends SQLContext(sc, cacheManager) with Logging { + @transient metaHive: ClientInterface, + isRootContext: Boolean) extends SQLContext(sc, cacheManager, isRootContext) with Logging { self => - def this(sc: SparkContext) = this(sc, new CacheManager, null, null) + def this(sc: SparkContext) = this(sc, new CacheManager, null, null, true) def this(sc: JavaSparkContext) = this(sc.sc) import org.apache.spark.sql.hive.HiveContext._ @@ -105,7 +106,12 @@ class HiveContext private[hive]( * and Hive client (both of execution and metadata) with existing HiveContext. 
*/ override def newSession(): HiveContext = { - new HiveContext(sc, cacheManager, executionHive.newSession(), metadataHive.newSession()) + new HiveContext( + sc = sc, + cacheManager = cacheManager, + execHive = executionHive.newSession(), + metaHive = metadataHive.newSession(), + isRootContext = false) } /**