Skip to content

Commit 4ddaff8

Browse files
SPARK-25003: Address Reviewer Comments
1 parent 67d9772 commit 4ddaff8

File tree

3 files changed

+26
-44
lines changed

3 files changed

+26
-44
lines changed

python/pyspark/sql/session.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,16 +212,13 @@ def __init__(self, sparkContext, jsparkSession=None):
212212
self._sc = sparkContext
213213
self._jsc = self._sc._jsc
214214
self._jvm = self._sc._jvm
215-
216215
if jsparkSession is None:
217216
if self._jvm.SparkSession.getDefaultSession().isDefined() \
218217
and not self._jvm.SparkSession.getDefaultSession().get() \
219218
.sparkContext().isStopped():
220219
jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
221220
else:
222-
extensions = self._sc._jvm.org.apache.spark.sql\
223-
.SparkSessionExtensions(self._jsc.getConf())
224-
jsparkSession = self._jvm.SparkSession(self._jsc.sc(), extensions)
221+
jsparkSession = self._jvm.SparkSession(self._jsc.sc())
225222

226223
self._jsparkSession = jsparkSession
227224
self._jwrapped = self._jsparkSession.sqlContext()

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,7 @@ class SparkSession private(
8686

8787
private[sql] def this(sc: SparkContext) {
8888
this(sc, None, None, new SparkSessionExtensions)
89-
}
90-
91-
private[sql] def this(sc: SparkContext, extensions: SparkSessionExtensions) {
92-
this(sc, None, None, extensions)
89+
SparkSession.applyExtensionsFromConf(sc.getConf, this.extensions)
9390
}
9491

9592
sparkContext.assertNotStopped()
@@ -939,7 +936,7 @@ object SparkSession extends Logging {
939936
// Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
940937
}
941938

942-
SparkSessionExtensions.applyExtensionsFromConf(sparkContext.conf, extensions)
939+
applyExtensionsFromConf(sparkContext.conf, extensions)
943940

944941
session = new SparkSession(sparkContext, None, None, extensions)
945942
options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
@@ -1124,4 +1121,27 @@ object SparkSession extends Logging {
11241121
SparkSession.clearDefaultSession()
11251122
}
11261123
}
1124+
1125+
/**
1126+
* Initialize extensions if the user has defined a configurator class in their SparkConf.
1127+
* This class will be applied to the extensions passed into this function.
1128+
*/
1129+
private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: SparkSessionExtensions) {
1130+
val extensionConfOption = conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
1131+
if (extensionConfOption.isDefined) {
1132+
val extensionConfClassName = extensionConfOption.get
1133+
try {
1134+
val extensionConfClass = Utils.classForName(extensionConfClassName)
1135+
val extensionConf = extensionConfClass.newInstance()
1136+
.asInstanceOf[SparkSessionExtensions => Unit]
1137+
extensionConf(extensions)
1138+
} catch {
1139+
// Ignore the error if we cannot find the class or when the class has the wrong type.
1140+
case e@(_: ClassCastException |
1141+
_: ClassNotFoundException |
1142+
_: NoClassDefFoundError) =>
1143+
logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e)
1144+
}
1145+
}
1146+
}
11271147
}

sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,10 @@ package org.apache.spark.sql
1919

2020
import scala.collection.mutable
2121

22-
import org.apache.spark.SparkConf
2322
import org.apache.spark.annotation.{DeveloperApi, Experimental, InterfaceStability}
24-
import org.apache.spark.internal.Logging
2523
import org.apache.spark.sql.catalyst.parser.ParserInterface
2624
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2725
import org.apache.spark.sql.catalyst.rules.Rule
28-
import org.apache.spark.sql.internal.StaticSQLConf
29-
import org.apache.spark.util.Utils
3026

3127
/**
3228
* :: Experimental ::
@@ -70,11 +66,6 @@ class SparkSessionExtensions {
7066
type StrategyBuilder = SparkSession => Strategy
7167
type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
7268

73-
private[sql] def this(conf: SparkConf) {
74-
this()
75-
SparkSessionExtensions.applyExtensionsFromConf(conf, this)
76-
}
77-
7869
private[this] val resolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]
7970

8071
/**
@@ -178,29 +169,3 @@ class SparkSessionExtensions {
178169
parserBuilders += builder
179170
}
180171
}
181-
182-
object SparkSessionExtensions extends Logging {
183-
184-
/**
185-
* Initialize extensions if the user has defined a configurator class in their SparkConf.
186-
* This class will be applied to the extensions passed into this function.
187-
*/
188-
private[sql] def applyExtensionsFromConf(conf: SparkConf, extensions: SparkSessionExtensions) {
189-
val extensionConfOption = conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
190-
if (extensionConfOption.isDefined) {
191-
val extensionConfClassName = extensionConfOption.get
192-
try {
193-
val extensionConfClass = Utils.classForName(extensionConfClassName)
194-
val extensionConf = extensionConfClass.newInstance()
195-
.asInstanceOf[SparkSessionExtensions => Unit]
196-
extensionConf(extensions)
197-
} catch {
198-
// Ignore the error if we cannot find the class or when the class has the wrong type.
199-
case e@(_: ClassCastException |
200-
_: ClassNotFoundException |
201-
_: NoClassDefFoundError) =>
202-
logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e)
203-
}
204-
}
205-
}
206-
}

Comments (0)