Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -818,15 +818,13 @@ class ALSCleanerSuite extends SparkFunSuite {
FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
try {
conf.set("spark.local.dir", localDir.getAbsolutePath)
val sc = new SparkContext("local[2]", "test", conf)
val sc = new SparkContext("local[2]", "ALSCleanerSuite", conf)
try {
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
// Generate test data
val (training, _) = ALSSuite.genImplicitTestData(sc, 20, 5, 1, 0.2, 0)
// Implicitly test the cleaning of parents during ALS training
val spark = SparkSession.builder
.master("local[2]")
.appName("ALSCleanerSuite")
.sparkContext(sc)
.getOrCreate()
import spark.implicits._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ private[ml] object TreeTests extends SparkFunSuite {
categoricalFeatures: Map[Int, Int],
numClasses: Int): DataFrame = {
val spark = SparkSession.builder()
.master("local[2]")
.appName("TreeTests")
.sparkContext(data.sparkContext)
.getOrCreate()
import spark.implicits._
Expand Down
23 changes: 8 additions & 15 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,8 @@ object SparkSession {

private[this] var userSuppliedContext: Option[SparkContext] = None

// The `SparkConf` inside the given `SparkContext` may get changed if you specify some options
// for this builder.
private[spark] def sparkContext(sparkContext: SparkContext): Builder = synchronized {
userSuppliedContext = Option(sparkContext)
this
Expand Down Expand Up @@ -859,7 +861,7 @@ object SparkSession {
*
* @since 2.2.0
*/
def withExtensions(f: SparkSessionExtensions => Unit): Builder = {
def withExtensions(f: SparkSessionExtensions => Unit): Builder = synchronized {
f(extensions)
this
}
Expand Down Expand Up @@ -904,22 +906,14 @@ object SparkSession {

// No active nor global default session. Create a new one.
val sparkContext = userSuppliedContext.getOrElse {
// set app name if not given
val randomAppName = java.util.UUID.randomUUID().toString
val sparkConf = new SparkConf()
options.foreach { case (k, v) => sparkConf.set(k, v) }
Copy link
Member Author

@dongjoon-hyun dongjoon-hyun Jul 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is changed from the previous patch: the options are now copied into a new `SparkConf` before the `SparkContext` is created.

if (!sparkConf.contains("spark.app.name")) {
sparkConf.setAppName(randomAppName)
}
val sc = SparkContext.getOrCreate(sparkConf)
// maybe this is an existing SparkContext, update its SparkConf which maybe used
// by SparkSession
options.foreach { case (k, v) => sc.conf.set(k, v) }
if (!sc.conf.contains("spark.app.name")) {
sc.conf.setAppName(randomAppName)
}
sc
// set a random app name if not given.
sparkConf.setAppName(options.getOrElse("spark.app.name",
java.util.UUID.randomUUID().toString))
SparkContext.getOrCreate(sparkConf)
}
options.foreach { case (k, v) => sparkContext.conf.set(k, v) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only concern is that the `sparkContext` could be shared by multiple sessions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then, what do you think about #18501, which uses `initialSessionOptions`? That is used only once.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already did this in https://github.com/apache/spark/pull/18512/files#diff-d91c284798f1c98bf03a31855e26d71cL917 .

The only difference is about userSuppliedContext, which is given by a private method, so we should be able to check if it's safe or not.

Copy link
Member

@gatorsmile gatorsmile Jul 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not fully private. We expose it publicly in HiveContext.scala.

    sparkContext.stop()
    val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1")
    val sparkContext2 = new SparkContext(conf)
    val session =
      SparkSession.builder().config("key2", "value2").sparkContext(sparkContext2).getOrCreate()
    assert(session.conf.get("key1") == "value1")
    assert(session.conf.get("key2") == "value2")
    assert(session.sparkContext.conf.get("key1") == "value1")
    assert(session.sparkContext.conf.get("key2").isEmpty) // <-- This line will fail after the changes of this PR.
    assert(session.sparkContext.conf.get("spark.app.name") == "test")
    session.stop()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing behavior is weird. See #18517


// Initialize extensions if the user has defined a configurator class.
val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
Expand All @@ -940,7 +934,6 @@ object SparkSession {
}

session = new SparkSession(sparkContext, None, None, extensions)
options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
defaultSession.set(session)

// Register a successfully instantiated context to the singleton. This should be at the
Expand Down