Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 127 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.spark.sql

import java.beans.Introspector
import java.util.Properties

import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
Expand All @@ -30,7 +28,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{CATALOG_IMPLEMENTATION, ConfigEntry}
import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalog.Catalog
import org.apache.spark.sql.catalyst._
Expand All @@ -49,7 +47,16 @@ import org.apache.spark.util.Utils


/**
* The entry point to Spark execution.
* The entry point to programming Spark with the Dataset and DataFrame API.
*
* To create a SparkSession, use the following builder pattern:
*
* {{{
* SparkSession.builder()
* .master("local")
*     .config("spark.some.config.option", "some-value")
* .getOrCreate()
* }}}
*/
class SparkSession private(
@transient val sparkContext: SparkContext,
Expand Down Expand Up @@ -635,6 +642,122 @@ class SparkSession private(

object SparkSession {

/**
* Builder for [[SparkSession]].
*/
class Builder {

  // Accumulates config key/value pairs until getOrCreate() materializes them
  // into a SparkConf. Access is guarded by `synchronized` on this Builder,
  // so a plain mutable map is sufficient.
  private[this] val options = new scala.collection.mutable.HashMap[String, String]

  /**
   * Sets a name for the application, which will be shown in the Spark web UI.
   *
   * @since 2.0.0
   */
  def appName(name: String): Builder = config("spark.app.name", name)

  /**
   * Sets a config option. Options set using this method are automatically propagated to
   * both [[SparkConf]] and SparkSession's own configuration.
   *
   * @since 2.0.0
   */
  def config(key: String, value: String): Builder = synchronized {
    options.put(key, value)
    this
  }

  /**
   * Sets a config option. Options set using this method are automatically propagated to
   * both [[SparkConf]] and SparkSession's own configuration.
   *
   * @since 2.0.0
   */
  def config(key: String, value: Long): Builder = synchronized {
    options.put(key, value.toString)
    this
  }

  /**
   * Sets a config option. Options set using this method are automatically propagated to
   * both [[SparkConf]] and SparkSession's own configuration.
   *
   * @since 2.0.0
   */
  def config(key: String, value: Double): Builder = synchronized {
    options.put(key, value.toString)
    this
  }

  /**
   * Sets a config option. Options set using this method are automatically propagated to
   * both [[SparkConf]] and SparkSession's own configuration.
   *
   * @since 2.0.0
   */
  def config(key: String, value: Boolean): Builder = synchronized {
    options.put(key, value.toString)
    this
  }

  /**
   * Sets a list of config options based on the given [[SparkConf]].
   *
   * @since 2.0.0
   */
  def config(conf: SparkConf): Builder = synchronized {
    options ++= conf.getAll
    this
  }

  /**
   * Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to
   * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
   *
   * @since 2.0.0
   */
  def master(master: String): Builder = config("spark.master", master)

  /**
   * Enables Hive support, including connectivity to a persistent Hive metastore, support for
   * Hive serdes, and Hive user-defined functions.
   *
   * @since 2.0.0
   */
  def enableHiveSupport(): Builder = synchronized {
    // Fail fast when the Hive classes are not on the classpath, rather than
    // deferring a ClassNotFoundException to session creation time.
    if (!hiveClassesArePresent) {
      throw new IllegalArgumentException(
        "Unable to instantiate SparkSession with Hive support because " +
          "Hive classes are not found.")
    }
    config(CATALOG_IMPLEMENTATION.key, "hive")
  }

  /**
   * Gets an existing [[SparkSession]] or, if there is no existing one, creates a new one
   * based on the options set in this builder.
   *
   * @since 2.0.0
   */
  def getOrCreate(): SparkSession = synchronized {
    // Step 1. Create a SparkConf
    // Step 2. Get a SparkContext
    // Step 3. Get a SparkSession
    val sparkConf = new SparkConf()
    for ((k, v) <- options) {
      sparkConf.set(k, v)
    }
    val sparkContext = SparkContext.getOrCreate(sparkConf)

    SQLContext.getOrCreate(sparkContext).sparkSession
  }
}

/**
 * Returns a new [[SparkSession.Builder]] that can be used to construct a [[SparkSession]].
 *
 * @since 2.0.0
 */
def builder: Builder = new Builder()

// Fully-qualified names of the Hive state classes that live in the separate
// sql/hive module. Kept as strings so this module has no compile-time
// dependency on Hive; presumably resolved reflectively when Hive support is
// enabled (e.g. by hiveClassesArePresent / session-state instantiation) —
// NOTE(review): usage is outside this chunk, confirm against the rest of the object.
private val HIVE_SHARED_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSharedState"
private val HIVE_SESSION_STATE_CLASS_NAME = "org.apache.spark.sql.hive.HiveSessionState"

Expand Down