@@ -1003,7 +1003,7 @@ class SparkILoop(
 
   // NOTE: Must be public for visibility
   @DeveloperApi
-  def createSparkContext(): SparkContext = {
+  def createSparkSession(): SparkSession = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
     val jars = SparkILoop.getAddedJars
     val conf = new SparkConf()
@@ -1019,22 +1019,18 @@
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
-    sparkContext = new SparkContext(conf)
-    logInfo("Created spark context..")
-    Signaling.cancelOnInterrupt(sparkContext)
-    sparkContext
-  }
 
-  @DeveloperApi
-  // TODO: don't duplicate this code
-  def createSparkSession(): SparkSession = {
-    if (SparkSession.hiveClassesArePresent) {
+    val builder = SparkSession.builder.config(conf)
+    val sparkSession = if (SparkSession.hiveClassesArePresent) {
       logInfo("Creating Spark session with Hive support")
-      SparkSession.builder.enableHiveSupport().getOrCreate()
+      builder.enableHiveSupport().getOrCreate()
     } else {
       logInfo("Creating Spark session")
-      SparkSession.builder.getOrCreate()
+      builder.getOrCreate()
     }
+    sparkContext = sparkSession.sparkContext
+    Signaling.cancelOnInterrupt(sparkContext)
+    sparkSession
   }
 
   private def getMaster(): String = {
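For reference, here is a minimal standalone sketch of the pattern this hunk switches to: the session is built from the already-assembled SparkConf and the SparkContext is obtained from it instead of being constructed directly. The value names below are illustrative and not part of the patch.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Assemble the configuration up front, as the REPL does above.
val conf = new SparkConf().setIfMissing("spark.app.name", "Spark shell")

// Hand the conf to the builder. enableHiveSupport() requires the Hive classes on the
// classpath; the REPL guards this with the hiveClassesArePresent check shown in the diff.
val spark = SparkSession.builder
  .config(conf)
  .enableHiveSupport() // drop this line for a plain, non-Hive session
  .getOrCreate()

// The context is no longer created via `new SparkContext(conf)`; it comes from the session.
val sc = spark.sparkContext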
@@ -123,19 +123,14 @@ private[repl] trait SparkILoopInit {
   def initializeSpark() {
     intp.beQuietDuring {
       command("""
+        @transient val spark = org.apache.spark.repl.Main.interp.createSparkSession()
         @transient val sc = {
-          val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
+          val _sc = spark.sparkContext
           _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
           println("Spark context available as 'sc' " +
             s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-          _sc
-        }
-        """)
-      command("""
-        @transient val spark = {
-          val _session = org.apache.spark.repl.Main.interp.createSparkSession()
-          println("Spark session available as 'spark'.")
-          _session
+          _sc
         }
         """)
       command("import org.apache.spark.SparkContext._")
27 changes: 12 additions & 15 deletions repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -71,35 +71,32 @@ object Main extends Logging {
     }
   }
 
-  def createSparkContext(): SparkContext = {
+  def createSparkSession(): SparkSession = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
     conf.setIfMissing("spark.app.name", "Spark shell")
-      // SparkContext will detect this configuration and register it with the RpcEnv's
-      // file server, setting spark.repl.class.uri to the actual URI for executors to
-      // use. This is sort of ugly but since executors are started as part of SparkContext
-      // initialization in certain cases, there's an initialization order issue that prevents
-      // this from being set after SparkContext is instantiated.
-      .set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
+    // SparkContext will detect this configuration and register it with the RpcEnv's
+    // file server, setting spark.repl.class.uri to the actual URI for executors to
+    // use. This is sort of ugly but since executors are started as part of SparkContext
+    // initialization in certain cases, there's an initialization order issue that prevents
+    // this from being set after SparkContext is instantiated.
+    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri)
     }
     if (System.getenv("SPARK_HOME") != null) {
       conf.setSparkHome(System.getenv("SPARK_HOME"))
     }
-    sparkContext = new SparkContext(conf)
-    logInfo("Created spark context..")
-    Signaling.cancelOnInterrupt(sparkContext)
-    sparkContext
-  }
-
-  def createSparkSession(): SparkSession = {
+    val builder = SparkSession.builder.config(conf)
     if (SparkSession.hiveClassesArePresent) {
-      sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
+      sparkSession = builder.enableHiveSupport().getOrCreate()
Contributor: I guess we want to use builder.config("spark.sql.catalogImplementation", "hive")?

Contributor: that's what enableHiveSupport does?

Contributor: oh, right. We still have this method.
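A minimal sketch of the equivalence this exchange settles on, assuming the standard SparkSession.Builder API; neither line is part of the patch:

import org.apache.spark.sql.SparkSession

// What the patch relies on:
val viaHelper = SparkSession.builder.enableHiveSupport()

// The spelling floated in the first comment; enableHiveSupport() sets exactly this key,
// and additionally verifies that the Hive classes are on the classpath.
val viaConfig = SparkSession.builder.config("spark.sql.catalogImplementation", "hive")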

logInfo("Created Spark session with Hive support")
} else {
sparkSession = SparkSession.builder.getOrCreate()
sparkSession = builder.getOrCreate()
Contributor: At here, maybe it is better to explicitly set spark.sql.catalogImplementation to in-memory?

Contributor: hm I think it's better to keep that flag contained rather than duplicating it everywhere
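For completeness, the explicit spelling suggested here, which the patch deliberately leaves to SparkSession's default rather than duplicating the flag, would be:

val inMemory = SparkSession.builder.config("spark.sql.catalogImplementation", "in-memory")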

logInfo("Created Spark session")
}
sparkContext = sparkSession.sparkContext
Signaling.cancelOnInterrupt(sparkContext)
sparkSession
}
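A short sketch of the ordering constraint described by the inline comment in this hunk: the REPL's class output directory must already be on the conf when getOrCreate() runs, because executors can be launched while the SparkContext initializes and they read spark.repl.class.uri at startup. The path below is illustrative.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .set("spark.repl.class.outputDir", "/tmp/spark-repl-classes") // must be set before the session exists
val spark = SparkSession.builder.config(conf).getOrCreate()
// Setting the property via spark.conf.set(...) at this point would be too late: the context has
// already decided what to register with its file server during initialization.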

@@ -36,19 +36,14 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
   def initializeSpark() {
     intp.beQuietDuring {
       processLine("""
+        @transient val spark = org.apache.spark.repl.Main.createSparkSession()
         @transient val sc = {
-          val _sc = org.apache.spark.repl.Main.createSparkContext()
+          val _sc = spark.sparkContext
           _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
           println("Spark context available as 'sc' " +
             s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
-          _sc
-        }
-        """)
-      processLine("""
-        @transient val spark = {
-          val _session = org.apache.spark.repl.Main.createSparkSession()
-          println("Spark session available as 'spark'.")
-          _session
+          _sc
         }
         """)
       processLine("import org.apache.spark.SparkContext._")