65 changes: 22 additions & 43 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -64,9 +64,8 @@ import org.apache.spark.util.logging.DriverLogger
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
  *
- * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
- * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
- *
+ * @note Only one `SparkContext` should be active per JVM. You must `stop()` the
+ *   active `SparkContext` before creating a new one.
  * @param config a Spark Config object describing the application configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
  */
@@ -75,14 +74,10 @@ class SparkContext(config: SparkConf) extends Logging {
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
 
-  // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
-  private val allowMultipleContexts: Boolean =
-    config.getBoolean("spark.driver.allowMultipleContexts", false)
-
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having started construction.
   // NOTE: this must be placed at the beginning of the SparkContext constructor.
-  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
+  SparkContext.markPartiallyConstructed(this)
 
   val startTime = System.currentTimeMillis()
 
@@ -2392,7 +2387,7 @@
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having finished construction.
   // NOTE: this must be placed at the end of the SparkContext constructor.
-  SparkContext.setActiveContext(this, allowMultipleContexts)
+  SparkContext.setActiveContext(this)
 }
 
 /**
@@ -2409,43 +2404,35 @@ object SparkContext extends Logging {
   private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
 
   /**
-   * The active, fully-constructed SparkContext.  If no SparkContext is active, then this is `null`.
+   * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`.
    *
-   * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
+   * Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`.
    */
   private val activeContext: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null)
 
   /**
-   * Points to a partially-constructed SparkContext if some thread is in the SparkContext
+   * Points to a partially-constructed SparkContext if another thread is in the SparkContext
    * constructor, or `None` if no SparkContext is being constructed.
    *
-   * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
+   * Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`.
    */
   private var contextBeingConstructed: Option[SparkContext] = None
 
   /**
    * Called to ensure that no other SparkContext is running in this JVM.
    *
    * Throws an exception if a running context is detected and logs a warning if another thread is
-   * constructing a SparkContext.  This warning is necessary because the current locking scheme
+   * constructing a SparkContext. This warning is necessary because the current locking scheme
    * prevents us from reliably distinguishing between cases where another context is being
    * constructed and cases where another constructor threw an exception.
    */
-  private def assertNoOtherContextIsRunning(
-      sc: SparkContext,
-      allowMultipleContexts: Boolean): Unit = {
+  private def assertNoOtherContextIsRunning(sc: SparkContext): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
       Option(activeContext.get()).filter(_ ne sc).foreach { ctx =>
-        val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
-          " To ignore this error, set spark.driver.allowMultipleContexts = true. " +
+        val errMsg = "Only one SparkContext should be running in this JVM (see SPARK-2243)." +
           s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
-        val exception = new SparkException(errMsg)
-        if (allowMultipleContexts) {
-          logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
-        } else {
-          throw exception
-        }
+        throw new SparkException(errMsg)
       }
 
       contextBeingConstructed.filter(_ ne sc).foreach { otherContext =>
@@ -2454,7 +2441,7 @@ object SparkContext extends Logging {
         val otherContextCreationSite =
           Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location")
         val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" +
-          " constructor). This may indicate an error, since only one SparkContext may be" +
+          " constructor). This may indicate an error, since only one SparkContext should be" +
           " running in this JVM (see SPARK-2243)." +
           s" The other SparkContext was created at:\n$otherContextCreationSite"
         logWarning(warnMsg)
@@ -2467,8 +2454,6 @@
    * singleton object. Because we can only have one active SparkContext per JVM,
    * this is useful when applications may wish to share a SparkContext.
    *
-   * @note This function cannot be used to create multiple SparkContext instances
-   *   even if multiple contexts are allowed.
    * @param config `SparkConfig` that will be used for initialisation of the `SparkContext`
    * @return current `SparkContext` (or a new one if it wasn't created before the function call)
    */
@@ -2477,7 +2462,7 @@
     // from assertNoOtherContextIsRunning within setActiveContext
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
       if (activeContext.get() == null) {
-        setActiveContext(new SparkContext(config), allowMultipleContexts = false)
+        setActiveContext(new SparkContext(config))
       } else {
         if (config.getAll.nonEmpty) {
           logWarning("Using an existing SparkContext; some configuration may not take effect.")
@@ -2494,14 +2479,12 @@
    *
    * This method allows not passing a SparkConf (useful if just retrieving).
    *
-   * @note This function cannot be used to create multiple SparkContext instances
-   *   even if multiple contexts are allowed.
    * @return current `SparkContext` (or a new one if wasn't created before the function call)
    */
   def getOrCreate(): SparkContext = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
       if (activeContext.get() == null) {
-        setActiveContext(new SparkContext(), allowMultipleContexts = false)
+        setActiveContext(new SparkContext())
       }
       activeContext.get()
     }
@@ -2516,16 +2499,14 @@
 
   /**
    * Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
-   * running.  Throws an exception if a running context is detected and logs a warning if another
-   * thread is constructing a SparkContext.  This warning is necessary because the current locking
+   * running. Throws an exception if a running context is detected and logs a warning if another
+   * thread is constructing a SparkContext. This warning is necessary because the current locking
    * scheme prevents us from reliably distinguishing between cases where another context is being
    * constructed and cases where another constructor threw an exception.
    */
-  private[spark] def markPartiallyConstructed(
-      sc: SparkContext,
-      allowMultipleContexts: Boolean): Unit = {
+  private[spark] def markPartiallyConstructed(sc: SparkContext): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
-      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
+      assertNoOtherContextIsRunning(sc)
       contextBeingConstructed = Some(sc)
     }
   }
@@ -2534,18 +2515,16 @@
    * Called at the end of the SparkContext constructor to ensure that no other SparkContext has
    * raced with this constructor and started.
    */
-  private[spark] def setActiveContext(
-      sc: SparkContext,
-      allowMultipleContexts: Boolean): Unit = {
+  private[spark] def setActiveContext(sc: SparkContext): Unit = {
     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
-      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
+      assertNoOtherContextIsRunning(sc)
       contextBeingConstructed = None
       activeContext.set(sc)
     }
   }
 
   /**
-   * Clears the active SparkContext metadata.  This is called by `SparkContext#stop()`.  It's
+   * Clears the active SparkContext metadata. This is called by `SparkContext#stop()`. It's
    * also called in unit tests to prevent a flood of warnings from test suites that don't / can't
    * properly clean up their SparkContexts.
    */
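As a sanity check on the behavior this file's changes enforce, here is a minimal sketch (not part of the diff; the app name and local master are arbitrary): a second `SparkContext` constructed in the same JVM now always throws, and the only supported path is to `stop()` the active context first.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

val conf = new SparkConf().setAppName("demo").setMaster("local[*]")
val sc = new SparkContext(conf)

// A second construction attempt in the same JVM now always fails;
// there is no longer a configuration flag to downgrade this to a warning.
try {
  new SparkContext(conf)
} catch {
  case e: SparkException => println(s"rejected: ${e.getMessage}")
}

// Supported pattern: stop the active context, then create a new one.
sc.stop()
val sc2 = new SparkContext(conf)
sc2.stop()
```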
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -40,8 +40,8 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD}
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
  * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
  *
- * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
- * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
+ * @note Only one `SparkContext` should be active per JVM. You must `stop()` the
+ *   active `SparkContext` before creating a new one.
  */
 class JavaSparkContext(val sc: SparkContext) extends Closeable {
 
19 changes: 1 addition & 18 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -44,7 +44,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
   test("Only one SparkContext may be active at a time") {
     // Regression test for SPARK-4180
     val conf = new SparkConf().setAppName("test").setMaster("local")
-      .set("spark.driver.allowMultipleContexts", "false")
     sc = new SparkContext(conf)
     val envBefore = SparkEnv.get
     // A SparkContext is already running, so we shouldn't be able to create a second one
@@ -58,7 +57,7 @@
   }
 
   test("Can still construct a new SparkContext after failing to construct a previous one") {
-    val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "false")
+    val conf = new SparkConf()
     // This is an invalid configuration (no app name or master URL)
     intercept[SparkException] {
       new SparkContext(conf)
@@ -67,18 +66,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     sc = new SparkContext(conf.setMaster("local").setAppName("test"))
   }
 
-  test("Check for multiple SparkContexts can be disabled via undocumented debug option") {
-    var secondSparkContext: SparkContext = null
-    try {
-      val conf = new SparkConf().setAppName("test").setMaster("local")
-        .set("spark.driver.allowMultipleContexts", "true")
-      sc = new SparkContext(conf)
-      secondSparkContext = new SparkContext(conf)
-    } finally {
-      Option(secondSparkContext).foreach(_.stop())
-    }
-  }
-
   test("Test getOrCreate") {
     var sc2: SparkContext = null
     SparkContext.clearActiveContext()
@@ -92,10 +79,6 @@
     assert(sc === sc2)
     assert(sc eq sc2)
 
-    // Try creating second context to confirm that it's still possible, if desired
-    sc2 = new SparkContext(new SparkConf().setAppName("test3").setMaster("local")
-      .set("spark.driver.allowMultipleContexts", "true"))
-
     sc2.stop()
   }
 
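The retained `getOrCreate` test above exercises the sharing path that remains the supported way to reuse a context. A small usage sketch (illustrative only, assuming a local master):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// getOrCreate returns the active context when one exists; the SparkConf is
// only applied if a new context actually has to be constructed.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("shared").setMaster("local[*]"))
val same = SparkContext.getOrCreate() // conf may be omitted when just retrieving
assert(sc eq same)
sc.stop()
```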
3 changes: 1 addition & 2 deletions core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -25,8 +25,7 @@ import org.apache.spark.util.AccumulatorV2
 
 class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
   test("launch of backend and scheduler") {
-    val conf = new SparkConf().setMaster("myclusterManager").
-      setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
+    val conf = new SparkConf().setMaster("myclusterManager").setAppName("testcm")
     sc = new SparkContext(conf)
     // check if the scheduler components are created and initialized
     sc.schedulerBackend match {
2 changes: 1 addition & 1 deletion docs/rdd-programming-guide.md
@@ -138,7 +138,7 @@ The first thing a Spark program must do is to create a [SparkContext](api/scala/
 how to access a cluster. To create a `SparkContext` you first need to build a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) object
 that contains information about your application.
 
-Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before creating a new one.
+Only one SparkContext should be active per JVM. You must `stop()` the active SparkContext before creating a new one.
 
 {% highlight scala %}
 val conf = new SparkConf().setAppName(appName).setMaster(master)
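Extending the guide's snippet to show the full lifecycle the new wording implies (a sketch; `appName` and `master` are the values defined earlier in the guide):

```scala
val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc = new SparkContext(conf)
// ... run jobs against sc ...
sc.stop() // required before another SparkContext may be created in this JVM
```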
4 changes: 4 additions & 0 deletions project/MimaExcludes.scala
@@ -220,6 +220,10 @@ object MimaExcludes {
     // [SPARK-26139] Implement shuffle write metrics in SQL
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ShuffleDependency.this"),
 
+    // [SPARK-26362][CORE] Remove 'spark.driver.allowMultipleContexts' to disallow multiple creation of SparkContexts
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.setActiveContext"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.markPartiallyConstructed"),
+
     // Data Source V2 API changes
     (problem: Problem) => problem match {
       case MissingClassProblem(cls) =>
3 changes: 3 additions & 0 deletions python/pyspark/context.py
@@ -63,6 +63,9 @@ class SparkContext(object):
     Main entry point for Spark functionality. A SparkContext represents the
     connection to a Spark cluster, and can be used to create L{RDD} and
     broadcast variables on that cluster.
+
+    .. note:: Only one :class:`SparkContext` should be active per JVM. You must `stop()`
+        the active :class:`SparkContext` before creating a new one.
     """
 
     _gateway = None
1 change: 0 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
@@ -263,7 +263,6 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         .setMaster("local[*]")
         .setAppName("test")
         .set("spark.ui.enabled", "false")
-        .set("spark.driver.allowMultipleContexts", "true")
         .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
         .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
         .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")