diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 47cf8bbc5b68..251625ae412c 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -103,13 +103,17 @@ def toPandas(self):
             try:
                 from pyspark.sql.pandas.types import _check_series_localize_timestamps
                 import pyarrow
-                batches = self._collect_as_arrow()
+                # Rename columns to avoid duplicated column names.
+                tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
+                batches = self.toDF(*tmp_column_names)._collect_as_arrow()
                 if len(batches) > 0:
                     table = pyarrow.Table.from_batches(batches)
                     # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
                     # values, but we should use datetime.date to match the behavior with when
                     # Arrow optimization is disabled.
                     pdf = table.to_pandas(date_as_object=True)
+                    # Rename back to the original column names.
+                    pdf.columns = self.columns
                     for field in self.schema:
                         if isinstance(field.dataType, TimestampType):
                             pdf[field.name] = \
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index b3f4d5cd6b94..9861178158f8 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -529,6 +529,19 @@ def test_to_pandas(self):
         self.assertEquals(types[4], np.object)  # datetime.date
         self.assertEquals(types[5], 'datetime64[ns]')
 
+    @unittest.skipIf(not have_pandas, pandas_requirement_message)
+    def test_to_pandas_with_duplicated_column_names(self):
+        import numpy as np
+
+        sql = "select 1 v, 1 v"
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
+
     @unittest.skipIf(not have_pandas, pandas_requirement_message)
     def test_to_pandas_on_cross_join(self):
         import numpy as np
@@ -540,12 +553,14 @@ def test_to_pandas_on_cross_join(self):
         select explode(sequence(1, 3)) v
         ) t2
         """
-        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
-            df = self.spark.sql(sql)
-            pdf = df.toPandas()
-            types = pdf.dtypes
-            self.assertEquals(types.iloc[0], np.int32)
-            self.assertEquals(types.iloc[1], np.int32)
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.crossJoin.enabled": True,
+                                "spark.sql.execution.arrow.pyspark.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
 
     @unittest.skipIf(have_pandas, "Required Pandas was found.")
     def test_to_pandas_required_pandas_not_found(self):
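
Note (not part of the patch): the rename-and-restore trick the first hunk applies can be sketched standalone. This is a minimal illustration, assuming an active SparkSession bound to a variable named `spark`; the query and temporary names are illustrative only, and `toDF`/`toPandas` are used as the public-API analogue of the internal `_collect_as_arrow` path patched above.

    # Sketch of the rename-and-restore workaround, assuming `spark` is an
    # existing SparkSession (hypothetical name, not from the patch).
    df = spark.sql("select 1 v, 1 v")  # duplicated column names

    # The Arrow conversion path mishandles duplicated column names (the
    # motivation for this patch), so give each column a unique temporary name.
    tmp_column_names = ['col_{}'.format(i) for i in range(len(df.columns))]
    pdf = df.toDF(*tmp_column_names).toPandas()

    # Restore the original (duplicated) names on the pandas side, where
    # duplicated labels are legal.
    pdf.columns = df.columns

This is also why the tests assert dtypes through positional access (`types.iloc[0]`) rather than by label: with duplicated column names, a label lookup like `pdf.dtypes['v']` would return a Series covering both columns instead of a single dtype.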