Skip to content

Commit e482dc1

Browse files
committed
Backport the fix of 'read.json(rdd)' in #10559 to branch-1.6
1 parent 96e32db commit e482dc1

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

python/pyspark/sql/readwriter.py

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -177,7 +177,17 @@ def json(self, path, schema=None):
177177
elif type(path) == list:
178178
return self._df(self._jreader.json(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
179179
elif isinstance(path, RDD):
180-
return self._df(self._jreader.json(path._jrdd))
180+
def func(iterator):
181+
for x in iterator:
182+
if not isinstance(x, basestring):
183+
x = unicode(x)
184+
if isinstance(x, unicode):
185+
x = x.encode("utf-8")
186+
yield x
187+
keyed = path.mapPartitions(func)
188+
keyed._bypass_serializer = True
189+
jrdd = keyed._jrdd.map(self._sqlContext._jvm.BytesToString())
190+
return self._df(self._jreader.json(jrdd))
181191
else:
182192
raise TypeError("path can be only string or RDD")
183193

python/pyspark/sql/tests.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -326,7 +326,7 @@ def test_broadcast_in_udf(self):
326326

327327
def test_basic_functions(self):
328328
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
329-
df = self.sqlCtx.jsonRDD(rdd)
329+
df = self.sqlCtx.read.json(rdd)
330330
df.count()
331331
df.collect()
332332
df.schema
@@ -345,7 +345,7 @@ def test_basic_functions(self):
345345
df.collect()
346346

347347
def test_apply_schema_to_row(self):
348-
df = self.sqlCtx.jsonRDD(self.sc.parallelize(["""{"a":2}"""]))
348+
df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""]))
349349
df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema)
350350
self.assertEqual(df.collect(), df2.collect())
351351

@@ -821,7 +821,7 @@ def test_save_and_load_builder(self):
821821
def test_help_command(self):
822822
# Regression test for SPARK-5464
823823
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
824-
df = self.sqlCtx.jsonRDD(rdd)
824+
df = self.sqlCtx.read.json(rdd)
825825
# render_doc() reproduces the help() exception without printing output
826826
pydoc.render_doc(df)
827827
pydoc.render_doc(df.foo)

0 commit comments

Comments
 (0)