python/pyspark/sql/dataframe.py (6 changes: 5 additions & 1 deletion)

@@ -2127,10 +2127,14 @@ def toPandas(self):
                 from pyspark.sql.types import _check_dataframe_convert_date, \
                     _check_dataframe_localize_timestamps
                 import pyarrow
-                batches = self._collectAsArrow()
+                # Rename columns to avoid duplicated column names.
+                tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
+                batches = self.toDF(*tmp_column_names)._collectAsArrow()
                 if len(batches) > 0:
                     table = pyarrow.Table.from_batches(batches)
                     pdf = table.to_pandas()
+                    # Rename back to the original column names.
+                    pdf.columns = self.columns
                     pdf = _check_dataframe_convert_date(pdf, self.schema)
                     return _check_dataframe_localize_timestamps(pdf, timezone)
                 else:
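For context, here is a minimal, standalone sketch (not part of the patch; pandas only) of why duplicated column names break name-based lookups, and how the rename-and-restore trick above sidesteps the ambiguity:

import pandas as pd

# A frame with duplicated column names, like the result of "select 1 v, 1 v".
pdf = pd.DataFrame([[1, 2]], columns=['v', 'v'])

# Name-based selection is ambiguous: pdf['v'] returns a two-column DataFrame,
# not a Series, which breaks any helper that expects one column per name lookup.
print(type(pdf['v']))  # <class 'pandas.core.frame.DataFrame'>

# The patch's workaround: assign unique temporary names, do the name-based
# work, then restore the original names (column positions never change).
original_names = list(pdf.columns)
pdf.columns = ['col_{}'.format(i) for i in range(len(pdf.columns))]
print(type(pdf['col_0']))  # <class 'pandas.core.series.Series'>
pdf.columns = original_names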
python/pyspark/sql/tests.py (27 changes: 21 additions & 6 deletions)

@@ -3296,6 +3296,19 @@ def test_to_pandas(self):
         self.assertEquals(types[4], np.object)  # datetime.date
         self.assertEquals(types[5], 'datetime64[ns]')
 
+    @unittest.skipIf(not _have_pandas, _pandas_requirement_message)
+    def test_to_pandas_with_duplicated_column_names(self):
+        import numpy as np
+
+        sql = "select 1 v, 1 v"
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.execution.arrow.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
+
     @unittest.skipIf(not _have_pandas, _pandas_requirement_message)
     def test_to_pandas_on_cross_join(self):
         import numpy as np
@@ -3307,12 +3320,14 @@ def test_to_pandas_on_cross_join(self):
         select explode(sequence(1, 3)) v
         ) t2
         """
-        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
-            df = self.spark.sql(sql)
-            pdf = df.toPandas()
-            types = pdf.dtypes
-            self.assertEquals(types.iloc[0], np.int32)
-            self.assertEquals(types.iloc[1], np.int32)
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.crossJoin.enabled": True,
+                                "spark.sql.execution.arrow.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
 
     @unittest.skipIf(_have_pandas, "Required Pandas was found.")
     def test_to_pandas_required_pandas_not_found(self):
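With the patch applied, a round trip like the one the new test exercises should preserve duplicated column names under both code paths. A hedged usage sketch (assumes a local Spark session with pyarrow installed; the session setup here is illustrative, not part of the patch):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Both result columns are named 'v', which the Arrow path previously
# did not handle correctly.
pdf = spark.sql("select 1 v, 1 v").toPandas()
print(list(pdf.columns))   # ['v', 'v']
print(pdf.dtypes.iloc[0])  # int32
spark.stop()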