python/pyspark/sql/pandas/conversion.py (5 additions & 1 deletion)
@@ -103,13 +103,17 @@ def toPandas(self):
         try:
             from pyspark.sql.pandas.types import _check_series_localize_timestamps
             import pyarrow
-            batches = self._collect_as_arrow()
+            # Rename columns to avoid duplicated column names.
+            tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
+            batches = self.toDF(*tmp_column_names)._collect_as_arrow()
             if len(batches) > 0:
                 table = pyarrow.Table.from_batches(batches)
                 # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                 # values, but we should use datetime.date to match the behavior with when
                 # Arrow optimization is disabled.
                 pdf = table.to_pandas(date_as_object=True)
+                # Rename back to the original column names.
+                pdf.columns = self.columns
                 for field in self.schema:
                     if isinstance(field.dataType, TimestampType):
                         pdf[field.name] = \
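The point of the rename round-trip above: the Arrow collection path does not cope well with duplicated field names (for example from "select 1 v, 1 v"), so the patch collects the batches under unique temporary names ('col_0', 'col_1', ...) and restores the original names only after conversion to pandas. Below is a minimal sketch of the same round-trip in plain pyarrow/pandas, with no Spark session; the data and the int64 dtypes are illustrative (Spark's integer type would map to int32 via its own schema):

    # Minimal sketch of the rename round-trip (plain pyarrow/pandas, no Spark).
    import pyarrow as pa

    original_columns = ['v', 'v']  # duplicated names, as in "select 1 v, 1 v"

    # Collect the data under unique temporary names, mirroring the diff.
    tmp_column_names = ['col_{}'.format(i) for i in range(len(original_columns))]
    batch = pa.RecordBatch.from_arrays([pa.array([1]), pa.array([1])],
                                       names=tmp_column_names)
    table = pa.Table.from_batches([batch])

    # Convert while the names are still unique, then rename back.
    pdf = table.to_pandas(date_as_object=True)
    pdf.columns = original_columns

    print(list(pdf.columns))  # ['v', 'v'] -- duplicated names preserved
    print(list(pdf.dtypes))   # [dtype('int64'), dtype('int64')]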
python/pyspark/sql/tests/test_dataframe.py (21 additions & 6 deletions)
@@ -529,6 +529,19 @@ def test_to_pandas(self):
         self.assertEquals(types[4], np.object)  # datetime.date
         self.assertEquals(types[5], 'datetime64[ns]')
 
+    @unittest.skipIf(not have_pandas, pandas_requirement_message)
+    def test_to_pandas_with_duplicated_column_names(self):
+        import numpy as np
+
+        sql = "select 1 v, 1 v"
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
+
     @unittest.skipIf(not have_pandas, pandas_requirement_message)
     def test_to_pandas_on_cross_join(self):
         import numpy as np
@@ -540,12 +553,14 @@ def test_to_pandas_on_cross_join(self):
             select explode(sequence(1, 3)) v
         ) t2
         """
-        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
-            df = self.spark.sql(sql)
-            pdf = df.toPandas()
-            types = pdf.dtypes
-            self.assertEquals(types.iloc[0], np.int32)
-            self.assertEquals(types.iloc[1], np.int32)
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.crossJoin.enabled": True,
+                                "spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
 
     @unittest.skipIf(have_pandas, "Required Pandas was found.")
     def test_to_pandas_required_pandas_not_found(self):
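For reference, a standalone way to exercise the scenario the new tests pin down; this assumes a local PySpark installation with the patch plus pandas and pyarrow, and the session setup here is illustrative:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    for arrow_enabled in [False, True]:
        spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", arrow_enabled)
        pdf = spark.sql("select 1 v, 1 v").toPandas()
        # With the fix, both the Arrow and non-Arrow paths return two
        # int32 columns, both named 'v'.
        print(arrow_enabled, list(pdf.columns), list(pdf.dtypes))
    spark.stop()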