
Commit ee214ef

ueshin authored and HyukjinKwon committed
[SPARK-25525][SQL][PYSPARK] Do not update conf for existing SparkContext in SparkSession.getOrCreate.
## What changes were proposed in this pull request?

In [SPARK-20946](https://issues.apache.org/jira/browse/SPARK-20946), we modified `SparkSession.getOrCreate` to not update the conf of an existing `SparkContext`, because a `SparkContext` is shared by all sessions. We should not update it on the PySpark side either.

## How was this patch tested?

Added tests.

Closes #22545 from ueshin/issues/SPARK-25525/not_update_existing_conf.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
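In practice this means a builder option applied on top of a live `SparkContext` is visible through the session's runtime conf but is no longer written back into the shared `SparkConf`. A minimal sketch of the new behavior, mirroring the tests added below (the local master and app name are illustrative):

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Create a SparkContext first; it is shared by every session built on it.
sc = SparkContext("local[4]", "demo", conf=SparkConf().set("key1", "value1"))
session = SparkSession.builder.config("key2", "value2").getOrCreate()

session.conf.get("key2")       # 'value2' -- applied to the session's SQL conf
sc.getConf().contains("key2")  # False   -- the shared SparkConf is left alone

session.stop()
sc.stop()
```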
1 parent 5def10e commit ee214ef

2 files changed (+49, -11 lines)


python/pyspark/sql/session.py

Lines changed: 4 additions & 10 deletions
```diff
@@ -156,7 +156,7 @@ def getOrCreate(self):
             default.
 
             >>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
-            >>> s1.conf.get("k1") == s1.sparkContext.getConf().get("k1") == "v1"
+            >>> s1.conf.get("k1") == "v1"
             True
 
             In case an existing SparkSession is returned, the config options specified
@@ -179,19 +179,13 @@ def getOrCreate(self):
                     sparkConf = SparkConf()
                     for key, value in self._options.items():
                         sparkConf.set(key, value)
-                    sc = SparkContext.getOrCreate(sparkConf)
                     # This SparkContext may be an existing one.
-                    for key, value in self._options.items():
-                        # we need to propagate the confs
-                        # before we create the SparkSession. Otherwise, confs like
-                        # warehouse path and metastore url will not be set correctly (
-                        # these confs cannot be changed once the SparkSession is created).
-                        sc._conf.set(key, value)
+                    sc = SparkContext.getOrCreate(sparkConf)
+                    # Do not update `SparkConf` for existing `SparkContext`, as it's shared
+                    # by all sessions.
                     session = SparkSession(sc)
                 for key, value in self._options.items():
                     session._jsparkSession.sessionState().conf().setConfString(key, value)
-                for key, value in self._options.items():
-                    session.sparkContext._conf.set(key, value)
                 return session
 
     builder = Builder()
```
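Read straight through (reconstructed from the hunks above, with the surrounding builder boilerplate elided), the creation path of `getOrCreate` now looks like this; the final loop applies the builder's options to the session's SQL conf whether the `SparkSession` is new or reused:

```python
sparkConf = SparkConf()
for key, value in self._options.items():
    sparkConf.set(key, value)
# This SparkContext may be an existing one.
sc = SparkContext.getOrCreate(sparkConf)
# Do not update `SparkConf` for existing `SparkContext`, as it's shared
# by all sessions.
session = SparkSession(sc)
for key, value in self._options.items():
    session._jsparkSession.sessionState().conf().setConfString(key, value)
return session
```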

python/pyspark/sql/tests.py

Lines changed: 45 additions & 1 deletion
```diff
@@ -80,7 +80,7 @@
 _have_pyarrow = _pyarrow_requirement_message is None
 _test_compiled = _test_not_compiled_message is None
 
-from pyspark import SparkContext
+from pyspark import SparkConf, SparkContext
 from pyspark.sql import SparkSession, SQLContext, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type, _make_type_verifier
@@ -283,6 +283,50 @@ def test_invalid_create_row(self):
         self.assertRaises(ValueError, lambda: row_class(1, 2, 3))
 
 
+class SparkSessionBuilderTests(unittest.TestCase):
+
+    def test_create_spark_context_first_then_spark_session(self):
+        sc = None
+        session = None
+        try:
+            conf = SparkConf().set("key1", "value1")
+            sc = SparkContext('local[4]', "SessionBuilderTests", conf=conf)
+            session = SparkSession.builder.config("key2", "value2").getOrCreate()
+
+            self.assertEqual(session.conf.get("key1"), "value1")
+            self.assertEqual(session.conf.get("key2"), "value2")
+            self.assertEqual(session.sparkContext, sc)
+
+            self.assertFalse(sc.getConf().contains("key2"))
+            self.assertEqual(sc.getConf().get("key1"), "value1")
+        finally:
+            if session is not None:
+                session.stop()
+            if sc is not None:
+                sc.stop()
+
+    def test_another_spark_session(self):
+        session1 = None
+        session2 = None
+        try:
+            session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
+            session2 = SparkSession.builder.config("key2", "value2").getOrCreate()
+
+            self.assertEqual(session1.conf.get("key1"), "value1")
+            self.assertEqual(session2.conf.get("key1"), "value1")
+            self.assertEqual(session1.conf.get("key2"), "value2")
+            self.assertEqual(session2.conf.get("key2"), "value2")
+            self.assertEqual(session1.sparkContext, session2.sparkContext)
+
+            self.assertEqual(session1.sparkContext.getConf().get("key1"), "value1")
+            self.assertFalse(session1.sparkContext.getConf().contains("key2"))
+        finally:
+            if session1 is not None:
+                session1.stop()
+            if session2 is not None:
+                session2.stop()
+
+
 class SQLTests(ReusedSQLTestCase):
 
     @classmethod
```
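The second test pins down the companion guarantee when no context exists up front: both builders resolve to one shared `SparkContext`, every session sees the options from either builder, and none of them leak into the context's `SparkConf`. Condensed into an interactive sketch (assuming a fresh driver with no prior context or session):

```python
from pyspark.sql import SparkSession

session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
session2 = SparkSession.builder.config("key2", "value2").getOrCreate()

session2.conf.get("key1")                         # 'value1'
session1.conf.get("key2")                         # 'value2'
session1.sparkContext == session2.sparkContext    # True  -- one shared context
session1.sparkContext.getConf().contains("key2")  # False -- SparkConf untouched

session1.stop()
```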