diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index b58d976abbeb..1c013cf25b8a 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2127,10 +2127,14 @@ def toPandas(self):
                 from pyspark.sql.types import _check_dataframe_convert_date, \
                     _check_dataframe_localize_timestamps
                 import pyarrow
-                batches = self._collectAsArrow()
+                # Rename columns to avoid duplicated column names.
+                tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
+                batches = self.toDF(*tmp_column_names)._collectAsArrow()
                 if len(batches) > 0:
                     table = pyarrow.Table.from_batches(batches)
                     pdf = table.to_pandas()
+                    # Rename back to the original column names.
+                    pdf.columns = self.columns
                     pdf = _check_dataframe_convert_date(pdf, self.schema)
                     return _check_dataframe_localize_timestamps(pdf, timezone)
                 else:
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index d359e005fc9d..1bbf08d2056f 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3296,6 +3296,19 @@ def test_to_pandas(self):
         self.assertEquals(types[4], np.object)  # datetime.date
         self.assertEquals(types[5], 'datetime64[ns]')
 
+    @unittest.skipIf(not _have_pandas, _pandas_requirement_message)
+    def test_to_pandas_with_duplicated_column_names(self):
+        import numpy as np
+
+        sql = "select 1 v, 1 v"
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.execution.arrow.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
+
     @unittest.skipIf(not _have_pandas, _pandas_requirement_message)
     def test_to_pandas_on_cross_join(self):
         import numpy as np
@@ -3307,12 +3320,14 @@ def test_to_pandas_on_cross_join(self):
             select explode(sequence(1, 3)) v
           ) t2
         """
-        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
-            df = self.spark.sql(sql)
-            pdf = df.toPandas()
-            types = pdf.dtypes
-            self.assertEquals(types.iloc[0], np.int32)
-            self.assertEquals(types.iloc[1], np.int32)
+        for arrowEnabled in [False, True]:
+            with self.sql_conf({"spark.sql.crossJoin.enabled": True,
+                                "spark.sql.execution.arrow.enabled": arrowEnabled}):
+                df = self.spark.sql(sql)
+                pdf = df.toPandas()
+                types = pdf.dtypes
+                self.assertEquals(types.iloc[0], np.int32)
+                self.assertEquals(types.iloc[1], np.int32)
 
     @unittest.skipIf(_have_pandas, "Required Pandas was found.")
     def test_to_pandas_required_pandas_not_found(self):
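
For reference, below is a minimal sketch of the rename-and-restore pattern this patch applies inside toPandas, written as standalone user code. It assumes an active SparkSession named `spark` with pandas and pyarrow installed; the placeholder names (`col_0`, `col_1`, ...) mirror the `tmp_column_names` used in the patch.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")

    # Duplicated column name 'v', which the Arrow path did not handle
    # correctly before this change.
    df = spark.sql("select 1 v, 1 v")

    # Same idea as the patch: assign unique placeholder names so the Arrow
    # batches carry unambiguous field names, then restore the original
    # (duplicated) names on the resulting pandas DataFrame.
    tmp_column_names = ['col_{}'.format(i) for i in range(len(df.columns))]
    pdf = df.toDF(*tmp_column_names).toPandas()
    pdf.columns = df.columns

    print(pdf.dtypes)  # both 'v' columns resolve to int32, as the new test asserts

Renaming via toDF(*names) rather than touching the Arrow batches keeps the fix on the PySpark side only, and restoring `pdf.columns` afterwards is safe because pandas DataFrames permit duplicate column labels.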