diff --git a/docs/sql-pyspark-pandas-with-arrow.md b/docs/sql-pyspark-pandas-with-arrow.md
index d18ca0beb0fc6..23477bbb93831 100644
--- a/docs/sql-pyspark-pandas-with-arrow.md
+++ b/docs/sql-pyspark-pandas-with-arrow.md
@@ -7,7 +7,7 @@ displayTitle: PySpark Usage Guide for Pandas with Apache Arrow
 * Table of contents
 {:toc}

-## Apache Arrow in Spark
+## Apache Arrow in PySpark

 Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
 data between JVM and Python processes. This currently is most beneficial to Python users that
@@ -20,7 +20,7 @@ working with Arrow-enabled data.

 If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the
 SQL module with the command `pip install pyspark[sql]`. Otherwise, you must ensure that PyArrow
-is installed and available on all cluster nodes. The current supported version is 0.8.0.
+is installed and available on all cluster nodes. The current supported version is 0.12.1.
 You can install using pip or conda from the conda-forge channel. See PyArrow
 [installation](https://arrow.apache.org/docs/python/install.html) for details.

@@ -128,8 +128,7 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p
 ### Supported SQL Types

 Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`,
-`ArrayType` of `TimestampType`, and nested `StructType`. `BinaryType` is supported only when
-installed PyArrow is equal to or higher than 0.10.0.
+`ArrayType` of `TimestampType`, and nested `StructType`.

 ### Setting Arrow Batch Size

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index d9047a59695a1..0423f2ab3169c 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -265,10 +265,14 @@ def __init__(self, timezone, safecheck, assign_cols_by_name):
         self._safecheck = safecheck
         self._assign_cols_by_name = assign_cols_by_name

-    def arrow_to_pandas(self, arrow_column, data_type):
-        from pyspark.sql.types import _arrow_column_to_pandas, _check_series_localize_timestamps
+    def arrow_to_pandas(self, arrow_column):
+        from pyspark.sql.types import _check_series_localize_timestamps
+
+        # If the given column is a date type column, creates a series of datetime.date directly
+        # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
+        # datetime64[ns] type handling.
+        s = arrow_column.to_pandas(date_as_object=True)
-        s = _arrow_column_to_pandas(arrow_column, data_type)
         s = _check_series_localize_timestamps(s, self._timezone)
         return s

@@ -280,8 +284,6 @@ def _create_batch(self, series):
         :param series: A single pandas.Series, list of Series, or list of (series, arrow_type)
         :return: Arrow RecordBatch
         """
-        import decimal
-        from distutils.version import LooseVersion
         import pandas as pd
         import pyarrow as pa
         from pyspark.sql.types import _check_series_convert_timestamps_internal
@@ -294,24 +296,10 @@ def _create_batch(self, series):
         def create_array(s, t):
             mask = s.isnull()
             # Ensure timestamp series are in expected form for Spark internal representation
-            # TODO: maybe don't need None check anymore as of Arrow 0.9.1
             if t is not None and pa.types.is_timestamp(t):
                 s = _check_series_convert_timestamps_internal(s.fillna(0), self._timezone)
                 # TODO: need cast after Arrow conversion, ns values cause error with pandas 0.19.2
                 return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
-            elif t is not None and pa.types.is_string(t) and sys.version < '3':
-                # TODO: need decode before converting to Arrow in Python 2
-                # TODO: don't need as of Arrow 0.9.1
-                return pa.Array.from_pandas(s.apply(
-                    lambda v: v.decode("utf-8") if isinstance(v, str) else v), mask=mask, type=t)
-            elif t is not None and pa.types.is_decimal(t) and \
-                    LooseVersion("0.9.0") <= LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-                # TODO: see ARROW-2432. Remove when the minimum PyArrow version becomes 0.10.0.
-                return pa.Array.from_pandas(s.apply(
-                    lambda v: decimal.Decimal('NaN') if v is None else v), mask=mask, type=t)
-            elif LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
-                # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0.
-                return pa.Array.from_pandas(s, mask=mask, type=t)

             try:
                 array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
@@ -345,12 +333,7 @@ def create_array(s, t):
                               for i, field in enumerate(t)]
                 struct_arrs, struct_names = zip(*arrs_names)
-
-                # TODO: from_arrays args switched for v0.9.0, remove when bump min pyarrow version
-                if LooseVersion(pa.__version__) < LooseVersion("0.9.0"):
-                    arrs.append(pa.StructArray.from_arrays(struct_names, struct_arrs))
-                else:
-                    arrs.append(pa.StructArray.from_arrays(struct_arrs, struct_names))
+                arrs.append(pa.StructArray.from_arrays(struct_arrs, struct_names))
             else:
                 arrs.append(create_array(s, t))

@@ -370,10 +353,8 @@ def load_stream(self, stream):
         """
         batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
         import pyarrow as pa
-        from pyspark.sql.types import from_arrow_type
         for batch in batches:
-            yield [self.arrow_to_pandas(c, from_arrow_type(c.type))
-                   for c in pa.Table.from_batches([batch]).itercolumns()]
+            yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]

     def __repr__(self):
         return "ArrowStreamPandasSerializer"
@@ -389,17 +370,17 @@ def __init__(self, timezone, safecheck, assign_cols_by_name, df_for_struct=False
             .__init__(timezone, safecheck, assign_cols_by_name)
         self._df_for_struct = df_for_struct

-    def arrow_to_pandas(self, arrow_column, data_type):
-        from pyspark.sql.types import StructType, \
-            _arrow_column_to_pandas, _check_dataframe_localize_timestamps
+    def arrow_to_pandas(self, arrow_column):
+        import pyarrow.types as types

-        if self._df_for_struct and type(data_type) == StructType:
+        if self._df_for_struct and types.is_struct(arrow_column.type):
             import pandas as pd
-            series = [_arrow_column_to_pandas(column, field.dataType).rename(field.name)
-                      for column, field in zip(arrow_column.flatten(), data_type)]
-            s = _check_dataframe_localize_timestamps(pd.concat(series, axis=1), self._timezone)
+            series = [super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(column)
+                      .rename(field.name)
+                      for column, field in zip(arrow_column.flatten(), arrow_column.type)]
+            s = pd.concat(series, axis=1)
         else:
-            s = super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(arrow_column, data_type)
+            s = super(ArrowStreamPandasUDFSerializer, self).arrow_to_pandas(arrow_column)
         return s

     def dump_stream(self, iterator, stream):
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 472d2969b3e19..7b20728cd730b 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2107,13 +2107,15 @@ def toPandas(self):
         # of PyArrow is found, if 'spark.sql.execution.arrow.enabled' is enabled.
         if use_arrow:
             try:
-                from pyspark.sql.types import _arrow_table_to_pandas, \
-                    _check_dataframe_localize_timestamps
+                from pyspark.sql.types import _check_dataframe_localize_timestamps
                 import pyarrow
                 batches = self._collectAsArrow()
                 if len(batches) > 0:
                     table = pyarrow.Table.from_batches(batches)
-                    pdf = _arrow_table_to_pandas(table, self.schema)
+                    # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
+                    # values, but we should use datetime.date to match the behavior with when
+                    # Arrow optimization is disabled.
+                    pdf = table.to_pandas(date_as_object=True)
                     return _check_dataframe_localize_timestamps(pdf, timezone)
                 else:
                     return pd.DataFrame.from_records([], columns=self.columns)
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 6c07cb91a625c..5b2300e0b81f5 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -545,11 +545,7 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone):

         # Create the Spark schema from list of names passed in with Arrow types
         if isinstance(schema, (list, tuple)):
-            if LooseVersion(pa.__version__) < LooseVersion("0.12.0"):
-                temp_batch = pa.RecordBatch.from_pandas(pdf[0:100], preserve_index=False)
-                arrow_schema = temp_batch.schema
-            else:
-                arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)
+            arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False)

             # TODO(rshkv): Remove when we stop supporting Python 2 (#678)
             if sys.version < '3' and LooseVersion(pa.__version__) >= LooseVersion("0.10.0"):
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index fd0b443171793..c8f11b4cfee65 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -47,7 +47,6 @@ class ArrowTests(ReusedSQLTestCase):
     def setUpClass(cls):
         from datetime import date, datetime
         from decimal import Decimal
-        from distutils.version import LooseVersion

         super(ArrowTests, cls).setUpClass()
         cls.warnings_lock = threading.Lock()
@@ -69,23 +68,16 @@ def setUpClass(cls):
             StructField("5_double_t", DoubleType(), True),
             StructField("6_decimal_t", DecimalType(38, 18), True),
             StructField("7_date_t", DateType(), True),
-            StructField("8_timestamp_t", TimestampType(), True)])
+            StructField("8_timestamp_t", TimestampType(), True),
+            StructField("9_binary_t", BinaryType(), True)])
         cls.data = [(u"a", 1, 10, 0.2, 2.0, Decimal("2.0"),
-                     date(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1)),
+                     date(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1), bytearray(b"a")),
                     (u"b", 2, 20, 0.4, 4.0, Decimal("4.0"),
-                     date(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2)),
+                     date(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2), bytearray(b"bb")),
                     (u"c", 3, 30, 0.8, 6.0, Decimal("6.0"),
-                     date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3)),
+                     date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3), bytearray(b"ccc")),
                     (u"d", 4, 40, 1.0, 8.0, Decimal("8.0"),
-                     date(2262, 4, 12), datetime(2262, 3, 3, 3, 3, 3))]
-
-        # TODO: remove version check once minimum pyarrow version is 0.10.0
-        if LooseVersion("0.10.0") <= LooseVersion(pa.__version__):
-            cls.schema.add(StructField("9_binary_t", BinaryType(), True))
-            cls.data[0] = cls.data[0] + (bytearray(b"a"),)
-            cls.data[1] = cls.data[1] + (bytearray(b"bb"),)
-            cls.data[2] = cls.data[2] + (bytearray(b"ccc"),)
-            cls.data[3] = cls.data[3] + (bytearray(b"dddd"),)
+                     date(2262, 4, 12), datetime(2262, 3, 3, 3, 3, 3), bytearray(b"dddd"))]

     @classmethod
     def tearDownClass(cls):
@@ -124,8 +116,6 @@ def test_toPandas_fallback_enabled(self):
             assert_frame_equal(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))

     def test_toPandas_fallback_disabled(self):
-        from distutils.version import LooseVersion
-
         schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
         df = self.spark.createDataFrame([(None,)], schema=schema)
         with QuietTest(self.sc):
@@ -133,14 +123,6 @@ def test_toPandas_fallback_disabled(self):
             with self.assertRaisesRegexp(Exception, 'Unsupported type'):
                 df.toPandas()

-        # TODO: remove BinaryType check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            schema = StructType([StructField("binary", BinaryType(), True)])
-            df = self.spark.createDataFrame([(None,)], schema=schema)
-            with QuietTest(self.sc):
-                with self.assertRaisesRegexp(Exception, 'Unsupported type.*BinaryType'):
-                    df.toPandas()
-
     def test_null_conversion(self):
         df_null = self.spark.createDataFrame([tuple([None for _ in range(len(self.data[0]))])] +
                                              self.data)
@@ -391,20 +373,11 @@ def test_createDataFrame_fallback_enabled(self):
                 self.assertEqual(df.collect(), [Row(a={u'a': 1})])

     def test_createDataFrame_fallback_disabled(self):
-        from distutils.version import LooseVersion
-
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(TypeError, 'Unsupported type'):
                 self.spark.createDataFrame(
                     pd.DataFrame([[{u'a': 1}]]), "a: map<string, int>")

-        # TODO: remove BinaryType check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            with QuietTest(self.sc):
-                with self.assertRaisesRegexp(TypeError, 'Unsupported type.*BinaryType'):
-                    self.spark.createDataFrame(
-                        pd.DataFrame([[{'a': b'aaa'}]]), "a: binary")
-
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
         # Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00 am
diff --git a/python/pyspark/sql/tests/test_pandas_udf.py b/python/pyspark/sql/tests/test_pandas_udf.py
index fd6d4e18ed2ab..72dc05bb02cdc 100644
--- a/python/pyspark/sql/tests/test_pandas_udf.py
+++ b/python/pyspark/sql/tests/test_pandas_udf.py
@@ -198,10 +198,8 @@ def foofoo(x, y):
         )

     def test_pandas_udf_detect_unsafe_type_conversion(self):
-        from distutils.version import LooseVersion
         import pandas as pd
         import numpy as np
-        import pyarrow as pa

         values = [1.0] * 3
         pdf = pd.DataFrame({'A': values})
@@ -209,15 +207,14 @@ def test_pandas_udf_detect_unsafe_type_conversion(self):

         @pandas_udf(returnType="int")
         def udf(column):
-            return pd.Series(np.linspace(0, 1, 3))
+            return pd.Series(np.linspace(0, 1, len(column)))

         # Since 0.11.0, PyArrow supports the feature to raise an error for unsafe cast.
-        if LooseVersion(pa.__version__) >= LooseVersion("0.11.0"):
-            with self.sql_conf({
-                    "spark.sql.execution.pandas.arrowSafeTypeConversion": True}):
-                with self.assertRaisesRegexp(Exception,
-                                             "Exception thrown when converting pandas.Series"):
-                    df.select(['A']).withColumn('udf', udf('A')).collect()
+        with self.sql_conf({
+                "spark.sql.execution.pandas.arrowSafeTypeConversion": True}):
+            with self.assertRaisesRegexp(Exception,
+                                         "Exception thrown when converting pandas.Series"):
+                df.select(['A']).withColumn('udf', udf('A')).collect()

         # Disabling Arrow safe type check.
         with self.sql_conf({
@@ -225,35 +222,24 @@ def udf(column):
             df.select(['A']).withColumn('udf', udf('A')).collect()

     def test_pandas_udf_arrow_overflow(self):
-        from distutils.version import LooseVersion
         import pandas as pd
-        import pyarrow as pa

         df = self.spark.range(0, 1)

         @pandas_udf(returnType="byte")
         def udf(column):
-            return pd.Series([128])
-
-        # Arrow 0.11.0+ allows enabling or disabling safe type check.
-        if LooseVersion(pa.__version__) >= LooseVersion("0.11.0"):
-            # When enabling safe type check, Arrow 0.11.0+ disallows overflow cast.
-            with self.sql_conf({
-                    "spark.sql.execution.pandas.arrowSafeTypeConversion": True}):
-                with self.assertRaisesRegexp(Exception,
-                                             "Exception thrown when converting pandas.Series"):
-                    df.withColumn('udf', udf('id')).collect()
-
-            # Disabling safe type check, let Arrow do the cast anyway.
-            with self.sql_conf({"spark.sql.execution.pandas.arrowSafeTypeConversion": False}):
+            return pd.Series([128] * len(column))
+
+        # When enabling safe type check, Arrow 0.11.0+ disallows overflow cast.
+        with self.sql_conf({
+                "spark.sql.execution.pandas.arrowSafeTypeConversion": True}):
+            with self.assertRaisesRegexp(Exception,
+                                         "Exception thrown when converting pandas.Series"):
+                df.withColumn('udf', udf('id')).collect()
-        else:
-            # SQL config `arrowSafeTypeConversion` no matters for older Arrow.
-            # Overflow cast causes an error.
-            with self.sql_conf({"spark.sql.execution.pandas.arrowSafeTypeConversion": False}):
-                with self.assertRaisesRegexp(Exception,
-                                             "Integer value out of bounds"):
-                    df.withColumn('udf', udf('id')).collect()
+
+        # Disabling safe type check, let Arrow do the cast anyway.
+        with self.sql_conf({"spark.sql.execution.pandas.arrowSafeTypeConversion": False}):
+            df.withColumn('udf', udf('id')).collect()


 if __name__ == "__main__":
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
index c8bad99a7705e..1d87c636ab34e 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -21,7 +21,6 @@
 from collections import OrderedDict
 from decimal import Decimal
-from distutils.version import LooseVersion

 from pyspark.sql import Row
 from pyspark.sql.functions import array, explode, col, lit, udf, sum, pandas_udf, PandasUDFType
@@ -65,20 +64,17 @@ def test_supported_types(self):
             1, 2, 3, 4, 5,
             1.1, 2.2,
             Decimal(1.123),
-            [1, 2, 2], True, 'hello'
+            [1, 2, 2], True, 'hello',
+            bytearray([0x01, 0x02])
         ]
         output_fields = [
             ('id', IntegerType()), ('byte', ByteType()), ('short', ShortType()),
             ('int', IntegerType()), ('long', LongType()), ('float', FloatType()),
             ('double', DoubleType()), ('decim', DecimalType(10, 3)),
-            ('array', ArrayType(IntegerType())), ('bool', BooleanType()), ('str', StringType())
+            ('array', ArrayType(IntegerType())), ('bool', BooleanType()), ('str', StringType()),
+            ('bin', BinaryType())
         ]
-
-        # TODO: Add BinaryType to variables above once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) >= LooseVersion("0.10.0"):
-            values.append(bytearray([0x01, 0x02]))
-            output_fields.append(('bin', BinaryType()))
-
         output_schema = StructType([StructField(*x) for x in output_fields])
         df = self.spark.createDataFrame([values], schema=output_schema)
@@ -95,6 +91,7 @@ def test_supported_types(self):
                 bool=False if pdf.bool else True,
                 str=pdf.str + 'there',
                 array=pdf.array,
+                bin=pdf.bin
             ),
             output_schema,
             PandasUDFType.GROUPED_MAP
@@ -112,6 +109,7 @@ def test_supported_types(self):
                 bool=False if pdf.bool else True,
                 str=pdf.str + 'there',
                 array=pdf.array,
+                bin=pdf.bin
             ),
             output_schema,
             PandasUDFType.GROUPED_MAP
@@ -130,6 +128,7 @@ def test_supported_types(self):
                 bool=False if pdf.bool else True,
                 str=pdf.str + 'there',
                 array=pdf.array,
+                bin=pdf.bin
             ),
             output_schema,
             PandasUDFType.GROUPED_MAP
@@ -291,10 +290,6 @@ def test_unsupported_types(self):
             StructField('struct', StructType([StructField('l', LongType())])),
         ]

-        # TODO: Remove this if-statement once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            unsupported_types.append(StructField('bin', BinaryType()))
-
         for unsupported_type in unsupported_types:
             schema = StructType([StructField('id', LongType(), True), unsupported_type])
             with QuietTest(self.sc):
@@ -466,13 +461,8 @@ def invalid_positional_types(pdf):
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
                 grouped_df.apply(column_name_typo).collect()
-            if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
-                # TODO: see ARROW-1949. Remove when the minimum PyArrow version becomes 0.11.0.
-                with self.assertRaisesRegexp(Exception, "No cast implemented"):
-                    grouped_df.apply(invalid_positional_types).collect()
-            else:
-                with self.assertRaisesRegexp(Exception, "an integer is required"):
-                    grouped_df.apply(invalid_positional_types).collect()
+            with self.assertRaisesRegexp(Exception, "an integer is required"):
+                grouped_df.apply(invalid_positional_types).collect()

     def test_positional_assignment_conf(self):
         with self.sql_conf({
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index dd0bd20635d6f..8138696137bc0 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -28,7 +28,6 @@
 from datetime import date, datetime
 from decimal import Decimal
-from distutils.version import LooseVersion

 from pyspark.rdd import PythonEvalType
 from pyspark.sql import Column
@@ -240,19 +239,12 @@ def test_vectorized_udf_datatype_string(self):
         self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_null_binary(self):
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            with QuietTest(self.sc):
-                with self.assertRaisesRegexp(
-                        NotImplementedError,
-                        'Invalid returnType.*scalar Pandas UDF.*BinaryType'):
-                    pandas_udf(lambda x: x, BinaryType())
-        else:
-            data = [(bytearray(b"a"),), (None,), (bytearray(b"bb"),), (bytearray(b"ccc"),)]
-            schema = StructType().add("binary", BinaryType())
-            df = self.spark.createDataFrame(data, schema)
-            str_f = pandas_udf(lambda x: x, BinaryType())
-            res = df.select(str_f(col('binary')))
-            self.assertEquals(df.collect(), res.collect())
+        data = [(bytearray(b"a"),), (None,), (bytearray(b"bb"),), (bytearray(b"ccc"),)]
+        schema = StructType().add("binary", BinaryType())
+        df = self.spark.createDataFrame(data, schema)
+        str_f = pandas_udf(lambda x: x, BinaryType())
+        res = df.select(str_f(col('binary')))
+        self.assertEquals(df.collect(), res.collect())

     def test_vectorized_udf_array_type(self):
         data = [([1, 2],), ([3, 4],)]
@@ -293,15 +285,7 @@ def func(id):
         struct_f = pandas_udf(lambda x: x, return_type)
         actual = df.select(struct_f(struct(col('id'), col('id').cast('string').alias('str'))))
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            with QuietTest(self.sc):
-                from py4j.protocol import Py4JJavaError
-                with self.assertRaisesRegexp(
-                        Py4JJavaError,
-                        'Unsupported type in conversion from Arrow'):
-                    self.assertEqual(expected, actual.collect())
-        else:
-            self.assertEqual(expected, actual.collect())
+        self.assertEqual(expected, actual.collect())

     def test_vectorized_udf_struct_complex(self):
         df = self.spark.range(10)
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 0e299fbc3d53e..115ad0e7f5b14 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1584,7 +1584,6 @@ def convert(self, obj, gateway_client):

 def to_arrow_type(dt):
     """ Convert Spark data type to pyarrow type """
-    from distutils.version import LooseVersion
     import pyarrow as pa
     if type(dt) == BooleanType:
         arrow_type = pa.bool_()
@@ -1605,10 +1604,6 @@ def to_arrow_type(dt):
     elif type(dt) == StringType:
         arrow_type = pa.string()
     elif type(dt) == BinaryType:
-        # TODO: remove version check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            raise TypeError("Unsupported type in conversion to Arrow: " + str(dt) +
-                            "\nPlease install pyarrow >= 0.10.0 for BinaryType support.")
         arrow_type = pa.binary()
     elif type(dt) == DateType:
         arrow_type = pa.date32()
@@ -1642,8 +1637,6 @@ def to_arrow_schema(schema):

 def from_arrow_type(at):
     """ Convert pyarrow type to Spark data type. """
-    from distutils.version import LooseVersion
-    import pyarrow as pa
     import pyarrow.types as types
     if types.is_boolean(at):
         spark_type = BooleanType()
@@ -1664,10 +1657,6 @@ def from_arrow_type(at):
     elif types.is_string(at):
         spark_type = StringType()
     elif types.is_binary(at):
-        # TODO: remove version check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            raise TypeError("Unsupported type in conversion from Arrow: " + str(at) +
-                            "\nPlease install pyarrow >= 0.10.0 for BinaryType support.")
         spark_type = BinaryType()
     elif types.is_date32(at):
         spark_type = DateType()
@@ -1684,10 +1673,6 @@ def from_arrow_type(at):
         spark_type = ArrayType(from_arrow_type(at.value_type))
     elif types.is_struct(at):
-        # TODO: remove version check once minimum pyarrow version is 0.10.0
-        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
-            raise TypeError("Unsupported type in conversion from Arrow: " + str(at) +
-                            "\nPlease install pyarrow >= 0.10.0 for StructType support.")
         if any(types.is_struct(field.type) for field in at):
             raise TypeError("Nested StructType not supported in conversion from Arrow: " + str(at))
         return StructType(
@@ -1706,61 +1691,13 @@ def from_arrow_schema(arrow_schema):
                        for field in arrow_schema])


-def _arrow_column_to_pandas(column, data_type):
-    """ Convert Arrow Column to pandas Series.
-
-    :param series: pyarrow.lib.Column
-    :param data_type: a Spark data type for the column
-    """
-    import pandas as pd
-    import pyarrow as pa
-    from distutils.version import LooseVersion
-    # If the given column is a date type column, creates a series of datetime.date directly instead
-    # of creating datetime64[ns] as intermediate data to avoid overflow caused by datetime64[ns]
-    # type handling.
-    if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
-        if type(data_type) == DateType:
-            return pd.Series(column.to_pylist(), name=column.name)
-        else:
-            return column.to_pandas()
-    else:
-        # Since Arrow 0.11.0, support date_as_object to return datetime.date instead of
-        # np.datetime64.
-        return column.to_pandas(date_as_object=True)
-
-
-def _arrow_table_to_pandas(table, schema):
-    """ Convert Arrow Table to pandas DataFrame.
-
-    Pandas DataFrame created from PyArrow uses datetime64[ns] for date type values, but we should
-    use datetime.date to match the behavior with when Arrow optimization is disabled.
-
-    :param table: pyarrow.lib.Table
-    :param schema: a Spark schema of the pyarrow.lib.Table
-    """
-    import pandas as pd
-    import pyarrow as pa
-    from distutils.version import LooseVersion
-    # If the given table contains a date type column, use `_arrow_column_to_pandas` for pyarrow<0.11
-    # or use `date_as_object` option for pyarrow>=0.11 to avoid creating datetime64[ns] as
-    # intermediate data.
-    if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
-        if any(type(field.dataType) == DateType for field in schema):
-            return pd.concat([_arrow_column_to_pandas(column, field.dataType)
-                              for column, field in zip(table.itercolumns(), schema)], axis=1)
-        else:
-            return table.to_pandas()
-    else:
-        return table.to_pandas(date_as_object=True)
-
-
 def _infer_binary_columns_as_arrow_string(schema, pandas_df):
     import pandas as pd
     import pyarrow as pa

     for field_index, field in enumerate(schema):
         if field.type == pa.binary() and \
-                pd.api.types.infer_dtype(pandas_df.iloc[:, field_index]) == "string":
+                pd.api.types.infer_dtype(pandas_df.iloc[:, field_index], skipna=True) == "string":
             field_as_string = pa.field(field.name, pa.string())
             schema = schema.set(field_index, field_as_string)

diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index bdb3a1467f1d8..709d3a0642616 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -136,7 +136,7 @@ def require_minimum_pyarrow_version():
     """ Raise ImportError if minimum version of pyarrow is not installed
     """
     # TODO(HyukjinKwon): Relocate and deduplicate the version specification.
-    minimum_pyarrow_version = "0.8.0"
+    minimum_pyarrow_version = "0.12.1"

     from distutils.version import LooseVersion
     try:
diff --git a/python/setup.py b/python/setup.py
index 22f0940db93e1..503c721d5a534 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -100,10 +100,11 @@ def _supports_symlinks():
           file=sys.stderr)
     sys.exit(-1)

-# If you are changing the versions here, please also change ./python/pyspark/sql/utils.py and
-# ./python/run-tests.py. In case of Arrow, you should also check ./pom.xml.
+# If you are changing the versions here, please also change ./python/pyspark/sql/utils.py
+# For Arrow, you should also check ./pom.xml and ensure there are no breaking changes in the
+# binary format protocol with the Java version, see ARROW_HOME/format/* for specifications.
 _minimum_pandas_version = "0.19.2"
-_minimum_pyarrow_version = "0.8.0"
+_minimum_pyarrow_version = "0.12.1"

 try:
     # We copy the shell script to be under pyspark/python/pyspark so that the launcher scripts
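
Note: the serializer and `toPandas()` changes above drop the removed `_arrow_column_to_pandas` / `_arrow_table_to_pandas` helpers in favor of PyArrow's own `to_pandas(date_as_object=True)`, which is available unconditionally once the minimum version is 0.12.1. The following is a minimal standalone sketch of that behavior, plain PyArrow with no Spark involved; the column name `d` and the construction of the table are illustrative, while the 2262-04-12 value mirrors the boundary date used in the tests above:

```python
import datetime

import pyarrow as pa

# 2262-04-12 is just past the largest date representable as datetime64[ns],
# which is why the ArrowTests data uses it as a boundary case.
arr = pa.array([datetime.date(1969, 1, 1), datetime.date(2262, 4, 12)],
               type=pa.date32())
table = pa.Table.from_arrays([arr], names=["d"])

# date_as_object=True keeps the values as plain datetime.date objects instead
# of coercing them into a datetime64[ns] column, so no overflow can occur.
pdf = table.to_pandas(date_as_object=True)
print(pdf["d"].dtype)     # object
print(type(pdf["d"][0]))  # <class 'datetime.date'>
```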
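Note: with the LooseVersion branches removed, `spark.sql.execution.pandas.arrowSafeTypeConversion` now governs unsafe casts unconditionally, which is what the rewritten overflow test exercises. A rough usage sketch follows; the session setup (local master, app name) and the try/except handling are assumptions for illustration, not part of the patch:

```python
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = (SparkSession.builder
         .master("local[1]")
         .appName("arrow-safe-cast-sketch")
         .getOrCreate())

@pandas_udf("byte")
def overflow_udf(column):
    # 128 does not fit into a signed byte, so casting it is unsafe.
    return pd.Series([128] * len(column))

df = spark.range(1)

# With safe conversion enabled, the overflowing cast is rejected at collect time.
spark.conf.set("spark.sql.execution.pandas.arrowSafeTypeConversion", "true")
try:
    df.withColumn("v", overflow_udf("id")).collect()
except Exception as e:
    print("rejected:", type(e).__name__)

# With the check disabled, Arrow performs the cast anyway (the value wraps around).
spark.conf.set("spark.sql.execution.pandas.arrowSafeTypeConversion", "false")
print(df.withColumn("v", overflow_udf("id")).collect())

spark.stop()
```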