2 changes: 2 additions & 0 deletions docs/pyspark-migration-guide.md
@@ -87,6 +87,8 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html)
- Since Spark 3.0, `Column.getItem` is fixed such that it does not call `Column.apply`. Consequently, if `Column` is used as an argument to `getItem`, the indexing operator should be used.
For example, `map_col.getItem(col('id'))` should be replaced with `map_col[col('id')]`.

- Since Spark 3.0, `Row` field names are no longer sorted alphabetically when constructed with named arguments on Python 3.6 and above; instead, fields keep the order in which they were entered. To enable sorted fields by default, as in Spark 2.4, set the environment variable `PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true". For Python versions earlier than 3.6, the field names are always sorted alphabetically.
**Member:** nit: Could we mention that this must be set for all processes? For example: set the environment variable `PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true" for **executors and driver**. This env must be consistent on all executors and the driver; any inconsistency may cause failures or incorrect answers.

**Member:** +1. Let me fix it.
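
To illustrate the behavior difference described above (a minimal sketch, not part of this patch; per the change in `types.py` below, the variable is read when the `Row` class is defined, so it must be set before `pyspark` is imported):

```python
import os

# Must be set before pyspark is imported: the class attribute caching this
# setting is evaluated when the Row class is first defined.
os.environ["PYSPARK_ROW_FIELD_SORTING_ENABLED"] = "false"

from pyspark.sql import Row

# Spark 3.0 default on Python 3.6+: fields keep their entered order.
r = Row(b=1, a=2)
print(r)  # Row(b=1, a=2)

# With PYSPARK_ROW_FIELD_SORTING_ENABLED="true" (the Spark 2.4 behavior),
# the same call would print Row(a=2, b=1), since field names are sorted.
```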


## Upgrading from PySpark 2.3 to 2.4

- In PySpark, when Arrow optimization is enabled, `toPandas` previously just failed when Arrow optimization could not be used, whereas `createDataFrame` from a Pandas DataFrame allowed falling back to the non-optimized path. Now both `toPandas` and `createDataFrame` from a Pandas DataFrame allow the fallback by default; it can be switched off with `spark.sql.execution.arrow.fallback.enabled`.
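
A hedged sketch of how the fallback toggle is used (the session setup and sample data are illustrative, not from this patch):

```python
from pyspark.sql import SparkSession

# Enable Arrow optimization but disable the silent fallback, so failures
# surface as errors instead of quietly retrying without Arrow.
spark = (SparkSession.builder
         .appName("arrow-fallback-demo")
         .config("spark.sql.execution.arrow.enabled", "true")
         .config("spark.sql.execution.arrow.fallback.enabled", "false")
         .getOrCreate())

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
# With fallback disabled, this raises if Arrow cannot handle the schema,
# rather than falling back to the non-Arrow conversion path.
pdf = df.toPandas()
```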
13 changes: 13 additions & 0 deletions python/pyspark/sql/tests/test_types.py
@@ -968,6 +968,19 @@ def __init__(self, **kwargs):
```diff
             with self.assertRaises(exp, msg=msg):
                 _make_type_verifier(data_type, nullable=False)(obj)
 
+    @unittest.skipIf(sys.version_info[:2] < (3, 6), "Create Row without sorting fields")
+    def test_row_without_field_sorting(self):
+        sorting_enabled_tmp = Row._row_field_sorting_enabled
+        Row._row_field_sorting_enabled = False
+
+        r = Row(b=1, a=2)
+        TestRow = Row("b", "a")
+        expected = TestRow(1, 2)
+
+        self.assertEqual(r, expected)
+        self.assertEqual(repr(r), "Row(b=1, a=2)")
+        Row._row_field_sorting_enabled = sorting_enabled_tmp
+
 
 if __name__ == "__main__":
     from pyspark.sql.tests.test_types import *
```
56 changes: 45 additions & 11 deletions python/pyspark/sql/types.py
@@ -15,6 +15,7 @@
```diff
 # limitations under the License.
 #
 
+import os
 import sys
 import decimal
 import time
```
@@ -25,6 +26,7 @@
```diff
 import base64
 from array import array
 import ctypes
+import warnings
 
 if sys.version >= "3":
     long = int
```
@@ -1432,10 +1434,23 @@ class Row(tuple):

```diff
     ``key in row`` will search through row keys.
 
-    Row can be used to create a row object by using named arguments,
-    the fields will be sorted by names. It is not allowed to omit
-    a named argument to represent the value is None or missing. This should be
-    explicitly set to None in this case.
+    Row can be used to create a row object by using named arguments.
+    It is not allowed to omit a named argument to represent the value is
+    None or missing. This should be explicitly set to None in this case.
+
+    NOTE: As of Spark 3.0.0, Rows created from named arguments no longer have
+    field names sorted alphabetically and will be ordered in the position as
+    entered. To enable sorting for Rows compatible with Spark 2.x, set the
+    environment variable "PYSPARK_ROW_FIELD_SORTING_ENABLED" to "true". This
+    option is deprecated and will be removed in future versions of Spark. For
+    Python versions < 3.6, the order of named arguments is not guaranteed to
+    be the same as entered, see https://www.python.org/dev/peps/pep-0468. In
+    this case, a warning will be issued and the Row will fallback to sort the
+    field names automatically.
+
+    NOTE: Examples with Row in pydocs are run with the environment variable
+    "PYSPARK_ROW_FIELD_SORTING_ENABLED" set to "true" which results in output
+    where fields are sorted.
 
     >>> row = Row(name="Alice", age=11)
     >>> row
```
@@ -1474,21 +1489,40 @@ class Row(tuple):
```diff
     True
     """
 
-    def __new__(self, *args, **kwargs):
+    # Remove after Python < 3.6 dropped, see SPARK-29748
+    _row_field_sorting_enabled = \
+        os.environ.get('PYSPARK_ROW_FIELD_SORTING_ENABLED', 'false').lower() == 'true'
+
+    if _row_field_sorting_enabled:
+        warnings.warn("The environment variable 'PYSPARK_ROW_FIELD_SORTING_ENABLED' "
+                      "is deprecated and will be removed in future versions of Spark")
+
+    def __new__(cls, *args, **kwargs):
         if args and kwargs:
             raise ValueError("Can not use both args "
                              "and kwargs to create Row")
         if kwargs:
+            if not Row._row_field_sorting_enabled and sys.version_info[:2] < (3, 6):
+                warnings.warn("To use named arguments for Python version < 3.6, Row fields will be "
+                              "automatically sorted. This warning can be skipped by setting the "
+                              "environment variable 'PYSPARK_ROW_FIELD_SORTING_ENABLED' to 'true'.")
+                Row._row_field_sorting_enabled = True
+
             # create row objects
-            names = sorted(kwargs.keys())
```
**@HyukjinKwon** (Member), Dec 2, 2019, on `names = sorted(kwargs.keys())`: Actually, after a second thought, why don't we just have an env to switch the sorting on and off, disable it in Spark 3.0, and remove the env in Spark 3.1? I suspect it will need fewer changes (rather than having a separate class for legacy Row).

**Member Author:** Yeah, we could do that, but that doesn't solve the problem of the `__from_dict__` flag, which is not needed if there is no sorting. That flag isn't serialized, which causes different behavior after serialization.

**Member Author:** Hmm, actually it looks like it could be possible to only add the `__from_dict__` flag if sorting is enabled too. I can give that a try and see if it works, wdyt?

```diff
-            row = tuple.__new__(self, [kwargs[n] for n in names])
-            row.__fields__ = names
-            row.__from_dict__ = True
-            return row
+            if Row._row_field_sorting_enabled:
+                # Remove after Python < 3.6 dropped, see SPARK-29748
+                names = sorted(kwargs.keys())
+                row = tuple.__new__(cls, [kwargs[n] for n in names])
+                row.__fields__ = names
+                row.__from_dict__ = True
+            else:
+                row = tuple.__new__(cls, list(kwargs.values()))
+                row.__fields__ = list(kwargs.keys())
+
+            return row
         else:
             # create row class or objects
-            return tuple.__new__(self, args)
+            return tuple.__new__(cls, args)
 
     def asDict(self, recursive=False):
         """
```
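
The serialization concern raised in the thread above can be reproduced with a toy model (a sketch; `MiniRow` and `_make_minirow` are hypothetical names, and it assumes, as the comments imply, that Row pickles itself from field names and values only):

```python
import pickle


def _make_minirow(fields, values):
    """Rebuild a MiniRow from field names and values; the flag is not carried."""
    row = tuple.__new__(MiniRow, values)
    row.__fields__ = fields
    return row


class MiniRow(tuple):
    """Toy stand-in for pyspark's Row, for illustration only."""

    def __new__(cls, **kwargs):
        row = tuple.__new__(cls, list(kwargs.values()))
        row.__fields__ = list(kwargs.keys())
        row.__from_dict__ = True  # ad-hoc flag, like the one in the thread
        return row

    def __reduce__(self):
        # Pickle only fields and values -- __from_dict__ is dropped.
        return (_make_minirow, (self.__fields__, tuple(self)))


r = MiniRow(b=1, a=2)
print(getattr(r, "__from_dict__", False))   # True
r2 = pickle.loads(pickle.dumps(r))
print(getattr(r2, "__from_dict__", False))  # False: lost on the round trip
```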
3 changes: 2 additions & 1 deletion python/run-tests.py
@@ -74,7 +74,8 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
```diff
         'SPARK_TESTING': '1',
         'SPARK_PREPEND_CLASSES': '1',
         'PYSPARK_PYTHON': which(pyspark_python),
-        'PYSPARK_DRIVER_PYTHON': which(pyspark_python)
+        'PYSPARK_DRIVER_PYTHON': which(pyspark_python),
+        'PYSPARK_ROW_FIELD_SORTING_ENABLED': 'true'
     })
 
     # Create a unique temp directory under 'target/' for each run. The TMPDIR variable is
```