From f7fde9e5b99ae822066892585b6c73d2e4dbff79 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Mon, 22 Apr 2019 19:30:31 +0900 Subject: [PATCH 1/3] [SPARK-27276][PYTHON][SQL] Increase minimum version of pyarrow to 0.12.1 and remove prior workarounds This increases the minimum support version of pyarrow to 0.12.1 and removes workarounds in pyspark to remain compatible with prior versions. This means that users will need to have at least pyarrow 0.12.1 installed and available in the cluster or an `ImportError` will be raised to indicate an upgrade is needed. Existing tests using: Python 2.7.15, pyarrow 0.12.1, pandas 0.24.2 Python 3.6.7, pyarrow 0.12.1, pandas 0.24.0 Closes #24298 from BryanCutler/arrow-bump-min-pyarrow-SPARK-27276. Authored-by: Bryan Cutler Signed-off-by: HyukjinKwon --- python/pyspark/serializers.py | 53 +++++----------- python/pyspark/sql/dataframe.py | 8 ++- python/pyspark/sql/session.py | 6 +- python/pyspark/sql/tests/test_arrow.py | 39 ++---------- python/pyspark/sql/tests/test_pandas_udf.py | 48 +++++--------- .../sql/tests/test_pandas_udf_grouped_map.py | 28 +++------ .../sql/tests/test_pandas_udf_scalar.py | 30 +++------ python/pyspark/sql/types.py | 63 ------------------- python/pyspark/sql/utils.py | 2 +- python/setup.py | 7 ++- 10 files changed, 67 insertions(+), 217 deletions(-) diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index d9047a59695a1..0423f2ab3169c 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -265,10 +265,14 @@ def __init__(self, timezone, safecheck, assign_cols_by_name): self._safecheck = safecheck self._assign_cols_by_name = assign_cols_by_name - def arrow_to_pandas(self, arrow_column, data_type): - from pyspark.sql.types import _arrow_column_to_pandas, _check_series_localize_timestamps + def arrow_to_pandas(self, arrow_column): + from pyspark.sql.types import _check_series_localize_timestamps + + # If the given column is a date type column, creates a series of datetime.date directly + # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by + # datetime64[ns] type handling. 
+ s = arrow_column.to_pandas(date_as_object=True) - s = _arrow_column_to_pandas(arrow_column, data_type) s = _check_series_localize_timestamps(s, self._timezone) return s @@ -280,8 +284,6 @@ def _create_batch(self, series): :param series: A single pandas.Series, list of Series, or list of (series, arrow_type) :return: Arrow RecordBatch """ - import decimal - from distutils.version import LooseVersion import pandas as pd import pyarrow as pa from pyspark.sql.types import _check_series_convert_timestamps_internal @@ -294,24 +296,10 @@ def _create_batch(self, series): def create_array(s, t): mask = s.isnull() # Ensure timestamp series are in expected form for Spark internal representation - # TODO: maybe don't need None check anymore as of Arrow 0.9.1 if t is not None and pa.types.is_timestamp(t): s = _check_series_convert_timestamps_internal(s.fillna(0), self._timezone) # TODO: need cast after Arrow conversion, ns values cause error with pandas 0.19.2 return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False) - elif t is not None and pa.types.is_string(t) and sys.version < '3': - # TODO: need decode before converting to Arrow in Python 2 - # TODO: don't need as of Arrow 0.9.1 - return pa.Array.from_pandas(s.apply( - lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t) - elif t is not None and pa.types.is_decimal(t) and \ - LooseVersion("0.9.0") <= LooseVersion(pa.__version__) < LooseVersion("0.10.0"): - # TODO: see ARROW-2432. Remove when the minimum PyArrow version becomes 0.10.0. - return pa.Array.from_pandas(s.apply( - lambda v: decimal.Decimal('NaN') if v is None else v), mask=mask, type=t) - elif LooseVersion(pa.__version__) < LooseVersion("0.11.0"): - # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0. 
- return pa.Array.from_pandas(s, mask=mask, type=t) try: array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck) @@ -345,12 +333,7 @@ def create_array(s, t): for i, field in enumerate(t)] struct_arrs, struct_names = zip(*arrs_names) - - # TODO: from_arrays args switched for v0.9.0, remove when bump min pyarrow version - if LooseVersion(pa.__version__) < LooseVersion("0.9.0"): - arrs.append(pa.StructArray.from_arrays(struct_names, struct_arrs)) - else: - arrs.append(pa.StructArray.from_arrays(struct_arrs, struct_names)) + arrs.append(pa.StructArray.from_arrays(struct_arrs, struct_names)) else: arrs.append(create_array(s, t)) @@ -370,10 +353,8 @@ def load_stream(self, stream): """ batches = super(ArrowStreamPandasSerializer, self).load_stream(stream) import pyarrow as pa - from pyspark.sql.types import from_arrow_type for batch in batches: - yield [self.arrow_to_pandas(c, from_arrow_type(c.type)) - for c in pa.Table.from_batches([batch]).itercolumns()] + yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()] def __repr__(self): return "ArrowStreamPandasSerializer" @@ -389,17 +370,17 @@ def __init__(self, timezone, safecheck, assign_cols_by_name, df_for_struct=False .__init__(timezone, safecheck, assign_cols_by_name) self._df_for_struct = df_for_struct - def arrow_to_pandas(self, arrow_column, data_type): - from pyspark.sql.types import StructType, \ - _arrow_column_to_pandas, _check_dataframe_localize_timestamps + def arrow_to_pandas(self, arrow_column): + import pyarrow.types as types - if self._df_for_struct and type(data_type) == StructType: + if self._df_for_struct and types.is_struct(arrow_column.type): import pandas as pd - series = [_arrow_column_to_pandas(column, field.dataType).rename(field.name) - for column, field in zip(arrow_column.flatten(), data_type)] - s = _check_dataframe_localize_timestamps(pd.concat(series, axis=1), self._timezone) + series = [super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(column) + .rename(field.name) + for column, field in zip(arrow_column.flatten(), arrow_column.type)] + s = pd.concat(series, axis=1) else: - s = super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(arrow_column, data_type) + s = super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(arrow_column) return s def dump_stream(self, iterator, stream): diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 472d2969b3e19..7b20728cd730b 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -2107,13 +2107,15 @@ def toPandas(self): # of PyArrow is found, if 'spark.sql.execution.arrow.enabled' is enabled. if use_arrow: try: - from pyspark.sql.types import _arrow_table_to_pandas, \ - _check_dataframe_localize_timestamps + from pyspark.sql.types import _check_dataframe_localize_timestamps import pyarrow batches = self._collectAsArrow() if len(batches) > 0: table = pyarrow.Table.from_batches(batches) - pdf = _arrow_table_to_pandas(table, self.schema) + # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type + # values, but we should use datetime.date to match the behavior with when + # Arrow optimization is disabled. 
+ pdf = table.to_pandas(date_as_object=True) return _check_dataframe_localize_timestamps(pdf, timezone) else: return pd.DataFrame.from_records([], columns=self.columns) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 6c07cb91a625c..5b2300e0b81f5 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -545,11 +545,7 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone): # Create the Spark schema from list of names passed in with Arrow types if isinstance(schema, (list, tuple)): - if LooseVersion(pa.__version__) < LooseVersion("0.12.0"): - temp_batch = pa.RecordBatch.from_pandas(pdf[0:100], preserve_index=False) - arrow_schema = temp_batch.schema - else: - arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False) + arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False) # TODO(rshkv): Remove when we stop supporting Python 2 (#678) if sys.version < '3' and LooseVersion(pa.__version__) >= LooseVersion("0.10.0"): diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index fd0b443171793..c8f11b4cfee65 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -47,7 +47,6 @@ class ArrowTests(ReusedSQLTestCase): def setUpClass(cls): from datetime import date, datetime from decimal import Decimal - from distutils.version import LooseVersion super(ArrowTests, cls).setUpClass() cls.warnings_lock = threading.Lock() @@ -69,23 +68,16 @@ def setUpClass(cls): StructField("5_double_t", DoubleType(), True), StructField("6_decimal_t", DecimalType(38, 18), True), StructField("7_date_t", DateType(), True), - StructField("8_timestamp_t", TimestampType(), True)]) + StructField("8_timestamp_t", TimestampType(), True), + StructField("9_binary_t", BinaryType(), True)]) cls.data = [(u"a", 1, 10, 0.2, 2.0, Decimal("2.0"), - date(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1)), + date(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1), bytearray(b"a")), (u"b", 2, 20, 0.4, 4.0, Decimal("4.0"), - date(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2)), + date(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2), bytearray(b"bb")), (u"c", 3, 30, 0.8, 6.0, Decimal("6.0"), - date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3)), + date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3), bytearray(b"ccc")), (u"d", 4, 40, 1.0, 8.0, Decimal("8.0"), - date(2262, 4, 12), datetime(2262, 3, 3, 3, 3, 3))] - - # TODO: remove version check once minimum pyarrow version is 0.10.0 - if LooseVersion("0.10.0") <= LooseVersion(pa.__version__): - cls.schema.add(StructField("9_binary_t", BinaryType(), True)) - cls.data[0] = cls.data[0] + (bytearray(b"a"),) - cls.data[1] = cls.data[1] + (bytearray(b"bb"),) - cls.data[2] = cls.data[2] + (bytearray(b"ccc"),) - cls.data[3] = cls.data[3] + (bytearray(b"dddd"),) + date(2262, 4, 12), datetime(2262, 3, 3, 3, 3, 3), bytearray(b"dddd"))] @classmethod def tearDownClass(cls): @@ -124,8 +116,6 @@ def test_toPandas_fallback_enabled(self): assert_frame_equal(pdf, pd.DataFrame({u'map': [{u'a': 1}]})) def test_toPandas_fallback_disabled(self): - from distutils.version import LooseVersion - schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) df = self.spark.createDataFrame([(None,)], schema=schema) with QuietTest(self.sc): @@ -133,14 +123,6 @@ def test_toPandas_fallback_disabled(self): with self.assertRaisesRegexp(Exception, 'Unsupported type'): df.toPandas() - # TODO: remove BinaryType check once minimum pyarrow version is 0.10.0 - if 
LooseVersion(pa.__version__) < LooseVersion("0.10.0"): - schema = StructType([StructField("binary", BinaryType(), True)]) - df = self.spark.createDataFrame([(None,)], schema=schema) - with QuietTest(self.sc): - with self.assertRaisesRegexp(Exception, 'Unsupported type.*BinaryType'): - df.toPandas() - def test_null_conversion(self): df_null = self.spark.createDataFrame([tuple([None for _ in range(len(self.data[0]))])] + self.data) @@ -391,20 +373,11 @@ def test_createDataFrame_fallback_enabled(self): self.assertEqual(df.collect(), [Row(a={u'a': 1})]) def test_createDataFrame_fallback_disabled(self): - from distutils.version import LooseVersion - with QuietTest(self.sc): with self.assertRaisesRegexp(TypeError, 'Unsupported type'): self.spark.createDataFrame( pd.DataFrame([[{u'a': 1}]]), "a: map") - # TODO: remove BinaryType check once minimum pyarrow version is 0.10.0 - if LooseVersion(pa.__version__) < LooseVersion("0.10.0"): - with QuietTest(self.sc): - with self.assertRaisesRegexp(TypeError, 'Unsupported type.*BinaryType'): - self.spark.createDataFrame( - pd.DataFrame([[{'a': b'aaa'}]]), "a: binary") - # Regression test for SPARK-23314 def test_timestamp_dst(self): # Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am diff --git a/python/pyspark/sql/tests/test_pandas_udf.py b/python/pyspark/sql/tests/test_pandas_udf.py index fd6d4e18ed2ab..72dc05bb02cdc 100644 --- a/python/pyspark/sql/tests/test_pandas_udf.py +++ b/python/pyspark/sql/tests/test_pandas_udf.py @@ -198,10 +198,8 @@ def foofoo(x, y): ) def test_pandas_udf_detect_unsafe_type_conversion(self): - from distutils.version import LooseVersion import pandas as pd import numpy as np - import pyarrow as pa values = [1.0] * 3 pdf = pd.DataFrame({'A': values}) @@ -209,15 +207,14 @@ def test_pandas_udf_detect_unsafe_type_conversion(self): @pandas_udf(returnType="int") def udf(column): - return pd.Series(np.linspace(0, 1, 3)) + return pd.Series(np.linspace(0, 1, len(column))) # Since 0.11.0, PyArrow supports the feature to raise an error for unsafe cast. - if LooseVersion(pa.__version__) >= LooseVersion("0.11.0"): - with self.sql_conf({ - "spark.sql.execution.pandas.arrowSafeTypeConversion": True}): - with self.assertRaisesRegexp(Exception, - "Exception thrown when converting pandas.Series"): - df.select(['A']).withColumn('udf', udf('A')).collect() + with self.sql_conf({ + "spark.sql.execution.pandas.arrowSafeTypeConversion": True}): + with self.assertRaisesRegexp(Exception, + "Exception thrown when converting pandas.Series"): + df.select(['A']).withColumn('udf', udf('A')).collect() # Disabling Arrow safe type check. with self.sql_conf({ @@ -225,35 +222,24 @@ def udf(column): df.select(['A']).withColumn('udf', udf('A')).collect() def test_pandas_udf_arrow_overflow(self): - from distutils.version import LooseVersion import pandas as pd - import pyarrow as pa df = self.spark.range(0, 1) @pandas_udf(returnType="byte") def udf(column): - return pd.Series([128]) - - # Arrow 0.11.0+ allows enabling or disabling safe type check. - if LooseVersion(pa.__version__) >= LooseVersion("0.11.0"): - # When enabling safe type check, Arrow 0.11.0+ disallows overflow cast. - with self.sql_conf({ - "spark.sql.execution.pandas.arrowSafeTypeConversion": True}): - with self.assertRaisesRegexp(Exception, - "Exception thrown when converting pandas.Series"): - df.withColumn('udf', udf('id')).collect() - - # Disabling safe type check, let Arrow do the cast anyway. 
- with self.sql_conf({"spark.sql.execution.pandas.arrowSafeTypeConversion": False}): + return pd.Series([128] * len(column)) + + # When enabling safe type check, Arrow 0.11.0+ disallows overflow cast. + with self.sql_conf({ + "spark.sql.execution.pandas.arrowSafeTypeConversion": True}): + with self.assertRaisesRegexp(Exception, + "Exception thrown when converting pandas.Series"): df.withColumn('udf', udf('id')).collect() - else: - # SQL config `arrowSafeTypeConversion` no matters for older Arrow. - # Overflow cast causes an error. - with self.sql_conf({"spark.sql.execution.pandas.arrowSafeTypeConversion": False}): - with self.assertRaisesRegexp(Exception, - "Integer value out of bounds"): - df.withColumn('udf', udf('id')).collect() + + # Disabling safe type check, let Arrow do the cast anyway. + with self.sql_conf({"spark.sql.execution.pandas.arrowSafeTypeConversion": False}): + df.withColumn('udf', udf('id')).collect() if __name__ == "__main__": diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py index c8bad99a7705e..1d87c636ab34e 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py @@ -21,7 +21,6 @@ from collections import OrderedDict from decimal import Decimal -from distutils.version import LooseVersion from pyspark.sql import Row from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType @@ -65,20 +64,17 @@ def test_supported_types(self): 1, 2, 3, 4, 5, 1.1, 2.2, Decimal(1.123), - [1, 2, 2], True, 'hello' + [1, 2, 2], True, 'hello', + bytearray([0x01, 0x02]) ] output_fields = [ ('id', IntegerType()), ('byte', ByteType()), ('short', ShortType()), ('int', IntegerType()), ('long', LongType()), ('float', FloatType()), ('double', DoubleType()), ('decim', DecimalType(10, 3)), - ('array', ArrayType(IntegerType())), ('bool', BooleanType()), ('str', StringType()) + ('array', ArrayType(IntegerType())), ('bool', BooleanType()), ('str', StringType()), + ('bin', BinaryType()) ] - # TODO: Add BinaryType to variables above once minimum pyarrow version is 0.10.0 - if LooseVersion(pa.__version__) >= LooseVersion("0.10.0"): - values.append(bytearray([0x01, 0x02])) - output_fields.append(('bin', BinaryType())) - output_schema = StructType([StructField(*x) for x in output_fields]) df = self.spark.createDataFrame([values], schema=output_schema) @@ -95,6 +91,7 @@ def test_supported_types(self): bool=False if pdf.bool else True, str=pdf.str + 'there', array=pdf.array, + bin=pdf.bin ), output_schema, PandasUDFType.GROUPED_MAP @@ -112,6 +109,7 @@ def test_supported_types(self): bool=False if pdf.bool else True, str=pdf.str + 'there', array=pdf.array, + bin=pdf.bin ), output_schema, PandasUDFType.GROUPED_MAP @@ -130,6 +128,7 @@ def test_supported_types(self): bool=False if pdf.bool else True, str=pdf.str + 'there', array=pdf.array, + bin=pdf.bin ), output_schema, PandasUDFType.GROUPED_MAP @@ -291,10 +290,6 @@ def test_unsupported_types(self): StructField('struct', StructType([StructField('l', LongType())])), ] - # TODO: Remove this if-statement once minimum pyarrow version is 0.10.0 - if LooseVersion(pa.__version__) < LooseVersion("0.10.0"): - unsupported_types.append(StructField('bin', BinaryType())) - for unsupported_type in unsupported_types: schema = StructType([StructField('id', LongType(), True), unsupported_type]) with QuietTest(self.sc): @@ -466,13 +461,8 @@ def invalid_positional_types(pdf): with 
QuietTest(self.sc): with self.assertRaisesRegexp(Exception, "KeyError: 'id'"): grouped_df.apply(column_name_typo).collect() - if LooseVersion(pa.__version__) < LooseVersion("0.11.0"): - # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0. - with self.assertRaisesRegexp(Exception, "No cast implemented"): - grouped_df.apply(invalid_positional_types).collect() - else: - with self.assertRaisesRegexp(Exception, "an integer is required"): - grouped_df.apply(invalid_positional_types).collect() + with self.assertRaisesRegexp(Exception, "an integer is required"): + grouped_df.apply(invalid_positional_types).collect() def test_positional_assignment_conf(self): with self.sql_conf({ diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py index dd0bd20635d6f..8138696137bc0 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py +++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py @@ -28,7 +28,6 @@ from datetime import date, datetime from decimal import Decimal -from distutils.version import LooseVersion from pyspark.rdd import PythonEvalType from pyspark.sql import Column @@ -240,19 +239,12 @@ def test_vectorized_udf_datatype_string(self): self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_null_binary(self): - if LooseVersion(pa.__version__) < LooseVersion("0.10.0"): - with QuietTest(self.sc): - with self.assertRaisesRegexp( - NotImplementedError, - 'Invalid returnType.*scalar Pandas UDF.*BinaryType'): - pandas_udf(lambda x: x, BinaryType()) - else: - data = [(bytearray(b"a"),), (None,), (bytearray(b"bb"),), (bytearray(b"ccc"),)] - schema = StructType().add("binary", BinaryType()) - df = self.spark.createDataFrame(data, schema) - str_f = pandas_udf(lambda x: x, BinaryType()) - res = df.select(str_f(col('binary'))) - self.assertEquals(df.collect(), res.collect()) + data = [(bytearray(b"a"),), (None,), (bytearray(b"bb"),), (bytearray(b"ccc"),)] + schema = StructType().add("binary", BinaryType()) + df = self.spark.createDataFrame(data, schema) + str_f = pandas_udf(lambda x: x, BinaryType()) + res = df.select(str_f(col('binary'))) + self.assertEquals(df.collect(), res.collect()) def test_vectorized_udf_array_type(self): data = [([1, 2],), ([3, 4],)] @@ -293,15 +285,7 @@ def func(id): struct_f = pandas_udf(lambda x: x, return_type) actual = df.select(struct_f(struct(col('id'), col('id').cast('string').alias('str')))) - if LooseVersion(pa.__version__) < LooseVersion("0.10.0"): - with QuietTest(self.sc): - from py4j.protocol import Py4JJavaError - with self.assertRaisesRegexp( - Py4JJavaError, - 'Unsupported type in conversion from Arrow'): - self.assertEqual(expected, actual.collect()) - else: - self.assertEqual(expected, actual.collect()) + self.assertEqual(expected, actual.collect()) def test_vectorized_udf_struct_complex(self): df = self.spark.range(10) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 0e299fbc3d53e..041d19ca64161 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1584,7 +1584,6 @@ def convert(self, obj, gateway_client): def to_arrow_type(dt): """ Convert Spark data type to pyarrow type """ - from distutils.version import LooseVersion import pyarrow as pa if type(dt) == BooleanType: arrow_type = pa.bool_() @@ -1605,10 +1604,6 @@ def to_arrow_type(dt): elif type(dt) == StringType: arrow_type = pa.string() elif type(dt) == BinaryType: - # TODO: remove version check once minimum pyarrow version is 0.10.0 - if 
LooseVersion(pa.__version__) < LooseVersion("0.10.0"): - raise TypeError("Unsupported type in conversion to Arrow: " + str(dt) + - "\nPlease install pyarrow >= 0.10.0 for BinaryType support.") arrow_type = pa.binary() elif type(dt) == DateType: arrow_type = pa.date32() @@ -1642,8 +1637,6 @@ def to_arrow_schema(schema): def from_arrow_type(at): """ Convert pyarrow type to Spark data type. """ - from distutils.version import LooseVersion - import pyarrow as pa import pyarrow.types as types if types.is_boolean(at): spark_type = BooleanType() @@ -1664,10 +1657,6 @@ def from_arrow_type(at): elif types.is_string(at): spark_type = StringType() elif types.is_binary(at): - # TODO: remove version check once minimum pyarrow version is 0.10.0 - if LooseVersion(pa.__version__) < LooseVersion("0.10.0"): - raise TypeError("Unsupported type in conversion from Arrow: " + str(at) + - "\nPlease install pyarrow >= 0.10.0 for BinaryType support.") spark_type = BinaryType() elif types.is_date32(at): spark_type = DateType() @@ -1684,10 +1673,6 @@ def from_arrow_type(at): spark_type = ArrayType(from_arrow_type(at.value_type)) elif types.is_struct(at): - # TODO: remove version check once minimum pyarrow version is 0.10.0 - if LooseVersion(pa.__version__) < LooseVersion("0.10.0"): - raise TypeError("Unsupported type in conversion from Arrow: " + str(at) + - "\nPlease install pyarrow >= 0.10.0 for StructType support.") if any(types.is_struct(field.type) for field in at): raise TypeError("Nested StructType not supported in conversion from Arrow: " + str(at)) return StructType( @@ -1706,54 +1691,6 @@ def from_arrow_schema(arrow_schema): for field in arrow_schema]) -def _arrow_column_to_pandas(column, data_type): - """ Convert Arrow Column to pandas Series. - - :param series: pyarrow.lib.Column - :param data_type: a Spark data type for the column - """ - import pandas as pd - import pyarrow as pa - from distutils.version import LooseVersion - # If the given column is a date type column, creates a series of datetime.date directly instead - # of creating datetime64[ns] as intermediate data to avoid overflow caused by datetime64[ns] - # type handling. - if LooseVersion(pa.__version__) < LooseVersion("0.11.0"): - if type(data_type) == DateType: - return pd.Series(column.to_pylist(), name=column.name) - else: - return column.to_pandas() - else: - # Since Arrow 0.11.0, support date_as_object to return datetime.date instead of - # np.datetime64. - return column.to_pandas(date_as_object=True) - - -def _arrow_table_to_pandas(table, schema): - """ Convert Arrow Table to pandas DataFrame. - - Pandas DataFrame created from PyArrow uses datetime64[ns] for date type values, but we should - use datetime.date to match the behavior with when Arrow optimization is disabled. - - :param table: pyarrow.lib.Table - :param schema: a Spark schema of the pyarrow.lib.Table - """ - import pandas as pd - import pyarrow as pa - from distutils.version import LooseVersion - # If the given table contains a date type column, use `_arrow_column_to_pandas` for pyarrow<0.11 - # or use `date_as_object` option for pyarrow>=0.11 to avoid creating datetime64[ns] as - # intermediate data. 
- if LooseVersion(pa.__version__) < LooseVersion("0.11.0"): - if any(type(field.dataType) == DateType for field in schema): - return pd.concat([_arrow_column_to_pandas(column, field.dataType) - for column, field in zip(table.itercolumns(), schema)], axis=1) - else: - return table.to_pandas() - else: - return table.to_pandas(date_as_object=True) - - def _infer_binary_columns_as_arrow_string(schema, pandas_df): import pandas as pd import pyarrow as pa diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index bdb3a1467f1d8..709d3a0642616 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -136,7 +136,7 @@ def require_minimum_pyarrow_version(): """ Raise ImportError if minimum version of pyarrow is not installed """ # TODO(HyukjinKwon): Relocate and deduplicate the version specification. - minimum_pyarrow_version = "0.8.0" + minimum_pyarrow_version = "0.12.1" from distutils.version import LooseVersion try: diff --git a/python/setup.py b/python/setup.py index 22f0940db93e1..503c721d5a534 100644 --- a/python/setup.py +++ b/python/setup.py @@ -100,10 +100,11 @@ def _supports_symlinks(): file=sys.stderr) sys.exit(-1) -# If you are changing the versions here, please also change ./python/pyspark/sql/utils.py and -# ./python/run-tests.py. In case of Arrow, you should also check ./pom.xml. +# If you are changing the versions here, please also change ./python/pyspark/sql/utils.py +# For Arrow, you should also check ./pom.xml and ensure there are no breaking changes in the +# binary format protocol with the Java version, see ARROW_HOME/format/* for specifications. _minimum_pandas_version = "0.19.2" -_minimum_pyarrow_version = "0.8.0" +_minimum_pyarrow_version = "0.12.1" try: # We copy the shell script to be under pyspark/python/pyspark so that the launcher scripts From cc90b214ea4155dd2d8d1c421ad98884ee794a88 Mon Sep 17 00:00:00 2001 From: Will Raschkowski Date: Sun, 17 May 2020 19:14:22 +0200 Subject: [PATCH 2/3] Fix pandas infer_dtype warning --- python/pyspark/sql/types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 041d19ca64161..115ad0e7f5b14 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1697,7 +1697,7 @@ def _infer_binary_columns_as_arrow_string(schema, pandas_df): for field_index, field in enumerate(schema): if field.type == pa.binary() and \ - pd.api.types.infer_dtype(pandas_df.iloc[:, field_index]) == "string": + pd.api.types.infer_dtype(pandas_df.iloc[:, field_index], skipna=True) == "string": field_as_string = pa.field(field.name, pa.string()) schema = schema.set(field_index, field_as_string) From 87e4b7454199912330b6e0b2631e4b952a2155dd Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Wed, 1 May 2019 10:13:43 -0700 Subject: [PATCH 3/3] [SPARK-27276][PYTHON][DOCS][FOLLOW-UP] Update documentation about Arrow version in PySpark as well ## What changes were proposed in this pull request? Looks updating documentation from 0.8.0 to 0.12.1 was missed. ## How was this patch tested? N/A Closes #24504 from HyukjinKwon/SPARK-27276-followup. 
Authored-by: HyukjinKwon Signed-off-by: Bryan Cutler --- docs/sql-pyspark-pandas-with-arrow.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/sql-pyspark-pandas-with-arrow.md b/docs/sql-pyspark-pandas-with-arrow.md index d18ca0beb0fc6..23477bbb93831 100644 --- a/docs/sql-pyspark-pandas-with-arrow.md +++ b/docs/sql-pyspark-pandas-with-arrow.md @@ -7,7 +7,7 @@ displayTitle: PySpark Usage Guide for Pandas with Apache Arrow * Table of contents {:toc} -## Apache Arrow in Spark +## Apache Arrow in PySpark Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and Python processes. This currently is most beneficial to Python users that @@ -20,7 +20,7 @@ working with Arrow-enabled data. If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the SQL module with the command `pip install pyspark[sql]`. Otherwise, you must ensure that PyArrow -is installed and available on all cluster nodes. The current supported version is 0.8.0. +is installed and available on all cluster nodes. The current supported version is 0.12.1. You can install using pip or conda from the conda-forge channel. See PyArrow [installation](https://arrow.apache.org/docs/python/install.html) for details. @@ -128,8 +128,7 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p ### Supported SQL Types Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`, -`ArrayType` of `TimestampType`, and nested `StructType`. `BinaryType` is supported only when -installed PyArrow is equal to or higher than 0.10.0. +`ArrayType` of `TimestampType`, and nested `StructType`. ### Setting Arrow Batch Size
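
The following standalone sketch (not part of the diff, and assuming only local installs of pandas and pyarrow >= 0.12.1) illustrates the two behaviors this series leans on: a hard minimum-version check in place of the scattered `LooseVersion` branches, and `Table.to_pandas(date_as_object=True)`, which yields `datetime.date` values and so avoids the `datetime64[ns]` overflow the removed workarounds guarded against. The helper below is a simplified stand-in for `require_minimum_pyarrow_version` in `python/pyspark/sql/utils.py`; the real function takes no arguments.

```python
# Standalone sketch; assumes pandas and pyarrow >= 0.12.1 are installed locally.
import datetime
from distutils.version import LooseVersion

import pandas as pd
import pyarrow as pa


def require_minimum_pyarrow_version(minimum="0.12.1"):
    # Simplified stand-in for the check in python/pyspark/sql/utils.py:
    # raise ImportError so callers know an upgrade (not a code change) is needed.
    if LooseVersion(pa.__version__) < LooseVersion(minimum):
        raise ImportError("PyArrow >= %s must be installed; found %s"
                          % (minimum, pa.__version__))


require_minimum_pyarrow_version()

# Since Arrow 0.11.0, date_as_object=True returns datetime.date values instead
# of datetime64[ns]; dates past 2262-04-11 would overflow the ns representation,
# which is why the patch passes this flag instead of special-casing DateType.
table = pa.Table.from_pandas(
    pd.DataFrame({"d": [datetime.date(2262, 4, 12)]}), preserve_index=False)
pdf = table.to_pandas(date_as_object=True)
assert isinstance(pdf["d"].iloc[0], datetime.date)
```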