Skip to content

Commit

Permalink
[SPARK-11042] [SQL] Add a mechanism to ban creating multiple root SQL…
Browse files Browse the repository at this point in the history
…Contexts/HiveContexts in a JVM

https://issues.apache.org/jira/browse/SPARK-11042

Author: Yin Huai <yhuai@databricks.com>

Closes apache#9058 from yhuai/SPARK-11042.
  • Loading branch information
yhuai authored and davies committed Oct 12, 2015
1 parent 2e572c4 commit 8a354be
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 7 deletions.
10 changes: 10 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ private[spark] object SQLConf {

import SQLConfEntry._

// Controls whether more than one root SQLContext/HiveContext may be constructed.
// Every fragment of `doc` except the last ends with a space so that the concatenated
// text keeps its word and sentence boundaries.
val ALLOW_MULTIPLE_CONTEXTS = booleanConf("spark.sql.allowMultipleContexts",
  defaultValue = Some(true),
  doc = "When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
    "When set to false, only one SQLContext/HiveContext is allowed to be created " +
    "through the constructor (new SQLContexts/HiveContexts created through newSession " +
    "method is allowed). Please note that this conf needs to be set in Spark Conf. Once " +
    "a SQLContext/HiveContext has been created, changing the value of this conf will not " +
    "have effect.",
  isPublic = true)

val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
defaultValue = Some(true),
doc = "When set to true Spark SQL will automatically select a compression codec for each " +
Expand Down
42 changes: 38 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.collection.immutable
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal

import org.apache.spark.SparkContext
import org.apache.spark.{SparkException, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -64,22 +64,48 @@ import org.apache.spark.util.Utils
*/
class SQLContext private[sql](
@transient val sparkContext: SparkContext,
@transient protected[sql] val cacheManager: CacheManager)
@transient protected[sql] val cacheManager: CacheManager,
val isRootContext: Boolean)
extends org.apache.spark.Logging with Serializable {

self =>

def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager)
def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager, true)
def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)

// If spark.sql.allowMultipleContexts is false, we will throw an exception if a user
// wants to create a new root SQLContext (a SQLContext that is not created by newSession).
// The value is read from the SparkContext's conf, falling back to the entry's default.
private val allowMultipleContexts = {
  val confEntry = SQLConf.ALLOW_MULTIPLE_CONTEXTS
  sparkContext.conf.getBoolean(confEntry.key, confEntry.defaultValue.get)
}

// Constructor-time guard: when multiple contexts are disallowed and this instance is a
// root context, fail if an instantiated SQLContext is already registered. Contexts built
// with isRootContext = false (see newSession) skip the check.
if (!allowMultipleContexts && isRootContext) {
  if (SQLContext.getInstantiatedContextOption().isDefined) {
    val errMsg =
      "Only one SQLContext/HiveContext may be running in this JVM. " +
        "It is recommended to use SQLContext.getOrCreate to get the instantiated " +
        "SQLContext/HiveContext. To ignore this error, " +
        s"set ${SQLConf.ALLOW_MULTIPLE_CONTEXTS.key} = true in SparkConf."
    throw new SparkException(errMsg)
  }
}

/**
* Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
* registered functions, but sharing the same SparkContext and CacheManager.
*
* @since 1.6.0
*/
def newSession(): SQLContext = {
new SQLContext(sparkContext, cacheManager)
new SQLContext(
sparkContext = sparkContext,
cacheManager = cacheManager,
isRootContext = false)
}

/**
Expand Down Expand Up @@ -1239,6 +1265,10 @@ object SQLContext {
instantiatedContext.compareAndSet(null, sqlContext)
}

/**
 * Returns the SQLContext currently held by `instantiatedContext`, or `None` when the
 * reference holds null.
 */
private[sql] def getInstantiatedContextOption(): Option[SQLContext] = {
  val current = instantiatedContext.get()
  Option(current)
}

/**
* Changes the SQLContext that will be returned in this thread and its children when
* SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
Expand All @@ -1260,6 +1290,10 @@ object SQLContext {
activeContext.remove()
}

/**
 * Returns the SQLContext currently held by `activeContext`, or `None` when it holds null.
 * NOTE(review): `activeContext` appears to be thread-scoped (see setActive/clearActive);
 * confirm against its declaration.
 */
private[sql] def getActiveContextOption(): Option[SQLContext] = {
  val current = activeContext.get()
  Option(current)
}

/**
* Converts an iterator of Java Beans to InternalRow using the provided
* bean info & schema. This is not related to the singleton, but is a static
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark._
import org.scalatest.BeforeAndAfterAll

/**
 * Checks the behavior of the `spark.sql.allowMultipleContexts` flag: new sessions can always
 * be created from a root SQLContext, while constructing a second root SQLContext succeeds
 * only when the flag is true.
 *
 * The suite saves the active/instantiated SQLContext found at start-up, clears them for the
 * duration of the tests, and puts them back afterwards.
 */
class MultiSQLContextsSuite extends SparkFunSuite with BeforeAndAfterAll {

  // State captured in beforeAll() so that afterAll() can restore it.
  private var originalActiveSQLContext: Option[SQLContext] = _
  private var originalInstantiatedSQLContext: Option[SQLContext] = _
  // Base configuration; each test clones it and sets the flag under test.
  private var sparkConf: SparkConf = _

  override protected def beforeAll(): Unit = {
    super.beforeAll()
    originalActiveSQLContext = SQLContext.getActiveContextOption()
    originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption()

    // Start from a clean slate: no active and no instantiated SQLContext.
    SQLContext.clearActive()
    originalInstantiatedSQLContext.foreach(ctx => SQLContext.clearInstantiatedContext(ctx))
    sparkConf =
      new SparkConf(false)
        .setMaster("local[*]")
        .setAppName("test")
        .set("spark.ui.enabled", "false")
        // testCreatingNewSQLContext starts a second SparkContext while the first one is
        // still running; presumably this setting is what permits that.
        .set("spark.driver.allowMultipleContexts", "true")
  }

  override protected def afterAll(): Unit = {
    try {
      // Set these states back.
      originalActiveSQLContext.foreach(ctx => SQLContext.setActive(ctx))
      originalInstantiatedSQLContext.foreach(ctx => SQLContext.setInstantiatedContext(ctx))
    } finally {
      super.afterAll()
    }
  }

  /** Creates a new session from `rootSQLContext` and then clears the active context. */
  def testNewSession(rootSQLContext: SQLContext): Unit = {
    // Make sure we can successfully create new Session.
    rootSQLContext.newSession()

    // Reset the state. It is always safe to clear the active context.
    SQLContext.clearActive()
  }

  /**
   * Tries to construct another root SQLContext on a fresh SparkContext. Expects success when
   * `allowsMultipleContexts` is true and a SparkException otherwise. The SparkContext created
   * here is always stopped.
   */
  def testCreatingNewSQLContext(allowsMultipleContexts: Boolean): Unit = {
    val conf =
      sparkConf
        .clone
        .set(SQLConf.ALLOW_MULTIPLE_CONTEXTS.key, allowsMultipleContexts.toString)
    val sparkContext = new SparkContext(conf)

    try {
      if (allowsMultipleContexts) {
        new SQLContext(sparkContext)
        SQLContext.clearActive()
      } else {
        // If allowsMultipleContexts is false, make sure we can get the error.
        val message = intercept[SparkException] {
          new SQLContext(sparkContext)
        }.getMessage
        assert(message.contains("Only one SQLContext/HiveContext may be running"))
      }
    } finally {
      sparkContext.stop()
    }
  }

  test("test the flag to disallow creating multiple root SQLContext") {
    Seq(false, true).foreach { allowMultipleSQLContexts =>
      val conf =
        sparkConf
          .clone
          .set(SQLConf.ALLOW_MULTIPLE_CONTEXTS.key, allowMultipleSQLContexts.toString)
      val sc = new SparkContext(conf)
      try {
        val rootSQLContext = new SQLContext(sc)
        try {
          testNewSession(rootSQLContext)
          testNewSession(rootSQLContext)
          testCreatingNewSQLContext(allowMultipleSQLContexts)
        } finally {
          // Always unregister the root context, even when an assertion above fails, so that
          // it cannot leak into the next iteration or into other suites.
          SQLContext.clearInstantiatedContext(rootSQLContext)
        }
      } finally {
        sc.stop()
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ class HiveContext private[hive](
sc: SparkContext,
cacheManager: CacheManager,
@transient execHive: ClientWrapper,
@transient metaHive: ClientInterface) extends SQLContext(sc, cacheManager) with Logging {
@transient metaHive: ClientInterface,
isRootContext: Boolean) extends SQLContext(sc, cacheManager, isRootContext) with Logging {
self =>

def this(sc: SparkContext) = this(sc, new CacheManager, null, null)
def this(sc: SparkContext) = this(sc, new CacheManager, null, null, true)
def this(sc: JavaSparkContext) = this(sc.sc)

import org.apache.spark.sql.hive.HiveContext._
Expand All @@ -105,7 +106,12 @@ class HiveContext private[hive](
* and Hive client (both of execution and metadata) with existing HiveContext.
*/
override def newSession(): HiveContext = {
new HiveContext(sc, cacheManager, executionHive.newSession(), metadataHive.newSession())
new HiveContext(
sc = sc,
cacheManager = cacheManager,
execHive = executionHive.newSession(),
metaHive = metadataHive.newSession(),
isRootContext = false)
}

/**
Expand Down

0 comments on commit 8a354be

Please sign in to comment.