From a94685b4c0012ca1fd28142937e439a9262b41fa Mon Sep 17 00:00:00 2001 From: Han Wang Date: Tue, 5 Sep 2023 07:25:05 +0000 Subject: [PATCH 01/13] Improve comments to be LLM friendly --- fugue/dataframe/api.py | 47 +++++++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/fugue/dataframe/api.py b/fugue/dataframe/api.py index 265af619..e27ddec7 100644 --- a/fugue/dataframe/api.py +++ b/fugue/dataframe/api.py @@ -11,12 +11,12 @@ @fugue_plugin def is_df(df: Any) -> bool: - """Whether ``df`` is a DataFrame like object""" + """Whether the input object is any type of DataFrame""" return isinstance(df, DataFrame) def get_native_as_df(df: AnyDataFrame) -> AnyDataFrame: - """Return the dataframe form of the input ``df``. + """Return the dataframe form of any dataframe. If ``df`` is a :class:`~.DataFrame`, then call the :meth:`~.DataFrame.native_as_df`, otherwise, it depends on whether there is a correspondent function handling it. @@ -30,30 +30,49 @@ def get_native_as_df(df: AnyDataFrame) -> AnyDataFrame: @fugue_plugin def get_schema(df: AnyDataFrame) -> Schema: - """Get the schema of the ``df`` + """The generic function to get the schema of any dataframe :param df: the object that can be recognized as a dataframe by Fugue :return: the Schema object + + .. admonition:: Examples + + .. code-block:: python + + import fugue.api as fa + import pandas as pd + + df = pd.DataFrame([[0,1],[2,3]], columns=["a","b"]) + fa.get_schema(df) # == Schema("a:long,b:long") + + .. related_topics + How to get schema of any dataframe using Fugue? """ return as_fugue_df(df).schema @fugue_plugin def as_pandas(df: AnyDataFrame) -> pd.DataFrame: - """Convert ``df`` to a Pandas DataFrame + """The generic function to convert any dataframe to a Pandas DataFrame :param df: the object that can be recognized as a dataframe by Fugue :return: the Pandas DataFrame + + .. related_topics + How to convert any dataframe to a pandas dataframe? """ return as_fugue_df(df).as_pandas() @fugue_plugin def as_arrow(df: AnyDataFrame) -> pa.Table: - """Convert ``df`` to a PyArrow Table + """The generic function to convert any dataframe to a PyArrow Table :param df: the object that can be recognized as a dataframe by Fugue :return: the PyArrow Table + + .. related_topics + How to convert any dataframe to a pyarrow dataframe? 
""" return as_fugue_df(df).as_arrow() @@ -62,7 +81,7 @@ def as_arrow(df: AnyDataFrame) -> pa.Table: def as_array( df: AnyDataFrame, columns: Optional[List[str]] = None, type_safe: bool = False ) -> List[Any]: # pragma: no cover - """Convert df to 2-dimensional native python array + """The generic function to convert any dataframe to a 2-dimensional python array :param df: the object that can be recognized as a dataframe by Fugue :param columns: columns to extract, defaults to None @@ -81,7 +100,7 @@ def as_array( def as_array_iterable( df: AnyDataFrame, columns: Optional[List[str]] = None, type_safe: bool = False ) -> Iterable[Any]: # pragma: no cover - """Convert df to iterable of native python arrays + """The generic function to convert any dataframe to iterable of python arrays :param df: the object that can be recognized as a dataframe by Fugue :param columns: columns to extract, defaults to None @@ -101,7 +120,7 @@ def as_array_iterable( def as_dict_iterable( df: AnyDataFrame, columns: Optional[List[str]] = None ) -> Iterable[Dict[str, Any]]: - """Convert df to iterable of native python dicts + """Convert any dataframe to iterable of native python dicts :param df: the object that can be recognized as a dataframe by Fugue :param columns: columns to extract, defaults to None @@ -116,7 +135,7 @@ def as_dict_iterable( @fugue_plugin def peek_array(df: AnyDataFrame) -> List[Any]: - """Peek the first row of the dataframe as an array + """Peek the first row of any dataframe as an array :param df: the object that can be recognized as a dataframe by Fugue :return: the first row as an array @@ -126,7 +145,7 @@ def peek_array(df: AnyDataFrame) -> List[Any]: @fugue_plugin def peek_dict(df: AnyDataFrame) -> Dict[str, Any]: - """Peek the first row of the dataframe as a array + """Peek the first row of any dataframe as a array :param df: the object that can be recognized as a dataframe by Fugue :return: the first row as a dict @@ -141,7 +160,7 @@ def head( columns: Optional[List[str]] = None, as_fugue: bool = False, ) -> AnyDataFrame: - """Get first n rows of the dataframe as a new local bounded dataframe + """Get first n rows of any dataframe as a new local bounded dataframe :param n: number of rows :param columns: selected columns, defaults to None (all columns) @@ -160,7 +179,7 @@ def head( def alter_columns( df: AnyDataFrame, columns: Any, as_fugue: bool = False ) -> AnyDataFrame: - """Change column types + """Change column data types of any dataframe :param df: the object that can be recognized as a dataframe by Fugue :param columns: |SchemaLikeObject|, @@ -178,7 +197,7 @@ def alter_columns( def drop_columns( df: AnyDataFrame, columns: List[str], as_fugue: bool = False ) -> AnyDataFrame: - """Drop certain columns and return a new dataframe + """Drop certain columns of any dataframe :param df: the object that can be recognized as a dataframe by Fugue :param columns: columns to drop @@ -194,7 +213,7 @@ def drop_columns( def select_columns( df: AnyDataFrame, columns: List[Any], as_fugue: bool = False ) -> AnyDataFrame: - """Select certain columns and return a new dataframe + """Select certain columns of any dataframe and return a new dataframe :param df: the object that can be recognized as a dataframe by Fugue :param columns: columns to return From c07e364f51e9f11d5fdc618de083a93f914231c0 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Tue, 3 Oct 2023 06:21:57 +0000 Subject: [PATCH 02/13] update --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md 
index f546e29f..89f9dd80 100644 --- a/README.md +++ b/README.md @@ -252,7 +252,7 @@ Feel free to message us on [Slack](http://slack.fugue.ai). We also have [contrib * [How LyftLearn Democratizes Distributed Compute through Kubernetes Spark and Fugue](https://eng.lyft.com/how-lyftlearn-democratizes-distributed-compute-through-kubernetes-spark-and-fugue-c0875b97c3d9) * [Clobotics - Large Scale Image Processing with Spark through Fugue](https://medium.com/fugue-project/large-scale-image-processing-with-spark-through-fugue-e510b9813da8) * [Architecture for a data lake REST API using Delta Lake, Fugue & Spark (article by bitsofinfo)](https://bitsofinfo.wordpress.com/2023/08/14/data-lake-rest-api-delta-lake-fugue-spark) - + ### Mentioned Uses * [Productionizing Data Science at Interos, Inc. (LinkedIn post by Anthony Holten)](https://www.linkedin.com/posts/anthony-holten_pandas-spark-dask-activity-7022628193983459328-QvcF) From f968de7ed728ad98ed9c95acb7ae774ee8ea06d7 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Tue, 3 Oct 2023 07:56:19 +0000 Subject: [PATCH 03/13] update --- fugue_dask/_io.py | 5 +++++ fugue_spark/_utils/io.py | 4 +++- scripts/setupsparkconnect.sh | 4 ++-- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/fugue_dask/_io.py b/fugue_dask/_io.py index 70375841..e4d0001b 100644 --- a/fugue_dask/_io.py +++ b/fugue_dask/_io.py @@ -1,5 +1,6 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Union +import fsspec import fs as pfs import pandas as pd from dask import dataframe as dd @@ -96,6 +97,8 @@ def _load_parquet( def _save_csv(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None: + fs, path = fsspec.core.url_to_fs(p.uri) + fs.makedirs(path, exist_ok=True) df.native.to_csv( pfs.path.combine(p.uri, "*.csv"), **{"index": False, "header": False, **kwargs} ) @@ -145,6 +148,8 @@ def _load_csv( # noqa: C901 def _save_json(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None: + fs, path = fsspec.core.url_to_fs(p.uri) + fs.makedirs(path, exist_ok=True) df.native.to_json(pfs.path.combine(p.uri, "*.json"), **kwargs) diff --git a/fugue_spark/_utils/io.py b/fugue_spark/_utils/io.py index fd4e8bf9..9bf3d0a4 100644 --- a/fugue_spark/_utils/io.py +++ b/fugue_spark/_utils/io.py @@ -9,7 +9,7 @@ from fugue._utils.io import FileParser, save_df from fugue.collections.partition import PartitionSpec -from fugue.dataframe import DataFrame +from fugue.dataframe import DataFrame, PandasDataFrame from fugue_spark.dataframe import SparkDataFrame from .convert import to_schema, to_spark_schema @@ -62,6 +62,8 @@ def save_df( writer.save(uri) else: ldf = df.as_local() + if isinstance(ldf, PandasDataFrame) and hasattr(ldf.native, "attrs"): + ldf.native.attrs = {} save_df(ldf, uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs) def _get_writer( diff --git a/scripts/setupsparkconnect.sh b/scripts/setupsparkconnect.sh index 87c28c12..8e1911fc 100644 --- a/scripts/setupsparkconnect.sh +++ b/scripts/setupsparkconnect.sh @@ -1,3 +1,3 @@ -wget https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz -O - | tar -xz -C /tmp +wget https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz -O - | tar -xz -C /tmp # export SPARK_NO_DAEMONIZE=1 -bash /tmp/spark-3.4.0-bin-hadoop3/sbin/start-connect-server.sh --jars https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.4.0/spark-connect_2.12-3.4.0.jar +bash /tmp/spark-3.5.0-bin-hadoop3/sbin/start-connect-server.sh --jars 
https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.0/spark-connect_2.12-3.5.0.jar From 6a45db849c561c8617f3b9cc95bb1cf0b8bfe2e6 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Wed, 4 Oct 2023 06:49:29 +0000 Subject: [PATCH 04/13] update --- tests/fugue_spark/test_dataframe.py | 3 +-- tests/fugue_spark/test_execution_engine.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/fugue_spark/test_dataframe.py b/tests/fugue_spark/test_dataframe.py index d4e19dc5..9b706280 100644 --- a/tests/fugue_spark/test_dataframe.py +++ b/tests/fugue_spark/test_dataframe.py @@ -23,8 +23,7 @@ def init_session(self, spark_session): self.spark_session = spark_session def df(self, data: Any = None, schema: Any = None) -> SparkDataFrame: - session = SparkSession.builder.getOrCreate() - engine = SparkExecutionEngine(session) + engine = SparkExecutionEngine(self.spark_session) return engine.to_df(data, schema=schema) def test_alter_columns_invalid(self): diff --git a/tests/fugue_spark/test_execution_engine.py b/tests/fugue_spark/test_execution_engine.py index f54b4864..fba2b0a3 100644 --- a/tests/fugue_spark/test_execution_engine.py +++ b/tests/fugue_spark/test_execution_engine.py @@ -224,7 +224,7 @@ def test_any_column_name(self): def test_default_session(self): a = FugueWorkflow().df([[0]], "a:int") - df_eq(a.compute(SparkExecutionEngine), [[0]], "a:int") + df_eq(a.compute(self.spark_session), [[0]], "a:int") def test_repartition(self): with FugueWorkflow() as dag: From 40d9c1e999422021bc13c3bda7beacac138fdad5 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Wed, 4 Oct 2023 07:01:53 +0000 Subject: [PATCH 05/13] update --- tests/fugue_spark/test_execution_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fugue_spark/test_execution_engine.py b/tests/fugue_spark/test_execution_engine.py index fba2b0a3..e7e20133 100644 --- a/tests/fugue_spark/test_execution_engine.py +++ b/tests/fugue_spark/test_execution_engine.py @@ -212,7 +212,7 @@ def make_engine(self): def test_df_init(self): sdf = self.spark_session.createDataFrame([[1.1]], "a:double") a = FugueWorkflow().df(sdf) - df_eq(a.compute(SparkExecutionEngine), [[1.1]], "a:double") + df_eq(a.compute(self.spark_session), [[1.1]], "a:double") def test_yield_table(self): pass From 571a700d763653d94349103c8c74c9a056d9c912 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Wed, 4 Oct 2023 07:59:51 +0000 Subject: [PATCH 06/13] update --- fugue/dataframe/utils.py | 6 ++++-- fugue_test/builtin_suite.py | 2 +- tests/fugue_spark/utils/test_convert.py | 5 ++++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/fugue/dataframe/utils.py b/fugue/dataframe/utils.py index 952bfb72..83d5a36c 100644 --- a/fugue/dataframe/utils.py +++ b/fugue/dataframe/utils.py @@ -82,17 +82,19 @@ def _df_eq( ), f"schema mismatch {df.schema.pa_schema}, {df2.schema.pa_schema}" if not check_content: return True + cols: Any = df1.columns if no_pandas: dd1 = [[x.__repr__()] for x in df1.as_array_iterable(type_safe=True)] dd2 = [[x.__repr__()] for x in df2.as_array_iterable(type_safe=True)] d1 = pd.DataFrame(dd1, columns=["data"]) d2 = pd.DataFrame(dd2, columns=["data"]) + cols = ["data"] else: d1 = df1.as_pandas() d2 = df2.as_pandas() if not check_order: - d1 = d1.sort_values(df1.columns) - d2 = d2.sort_values(df1.columns) + d1 = d1.sort_values(cols) + d2 = d2.sort_values(cols) d1 = d1.reset_index(drop=True) d2 = d2.reset_index(drop=True) pd.testing.assert_frame_equal( diff --git a/fugue_test/builtin_suite.py 
b/fugue_test/builtin_suite.py index 510aa330..aa3af323 100644 --- a/fugue_test/builtin_suite.py +++ b/fugue_test/builtin_suite.py @@ -1329,7 +1329,7 @@ def transform(self, df): [[datetime.date(2020, 1, 1), datetime.datetime(2020, 1, 2)]], "a:date,b:datetime", ) - b.assert_eq(a) + b.assert_eq(a, no_pandas=True) c = dag.df([["2020-01-01", "2020-01-01 00:00:00"]], "a:date,b:datetime") c.transform(T2).assert_eq(c) c.partition(by=["a"]).transform(T2).assert_eq(c) diff --git a/tests/fugue_spark/utils/test_convert.py b/tests/fugue_spark/utils/test_convert.py index 07b4da58..55265049 100644 --- a/tests/fugue_spark/utils/test_convert.py +++ b/tests/fugue_spark/utils/test_convert.py @@ -1,5 +1,7 @@ import pyarrow as pa from pytest import raises +import pyspark +from packaging import version from triad import Schema from fugue_spark._utils.convert import ( @@ -15,7 +17,8 @@ def test_pandas_udf_can_accept(): for is_input in [True, False]: assert pandas_udf_can_accept(Schema("a:int,b:str"), is_input) assert pandas_udf_can_accept(Schema("a:int,b:[str],c:[float]"), is_input) - assert not pandas_udf_can_accept(Schema("a:int,b:[datetime]"), is_input) + if version.parse(pyspark.__version__) < version.parse("3.5"): + assert not pandas_udf_can_accept(Schema("a:int,b:[datetime]"), is_input) assert pandas_udf_can_accept(Schema("a:int,b:{a:int}"), True) assert not pandas_udf_can_accept(Schema("a:int,b:{a:int}"), False) From 19d424dd79d66e72d95b9131c28696b855b0754b Mon Sep 17 00:00:00 2001 From: Han Wang Date: Wed, 4 Oct 2023 08:15:21 +0000 Subject: [PATCH 07/13] update --- fugue/execution/native_execution_engine.py | 2 +- tests/fugue/execution/test_naive_execution_engine.py | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/fugue/execution/native_execution_engine.py b/fugue/execution/native_execution_engine.py index b9f5b8cc..6cae5f10 100644 --- a/fugue/execution/native_execution_engine.py +++ b/fugue/execution/native_execution_engine.py @@ -143,7 +143,7 @@ def map_dataframe( if ( isinstance(output_df, PandasDataFrame) and output_df.schema != output_schema - ): + ): # pragma: no cover output_df = PandasDataFrame(output_df.native, output_schema) assert_or_throw( output_df.schema == output_schema, diff --git a/tests/fugue/execution/test_naive_execution_engine.py b/tests/fugue/execution/test_naive_execution_engine.py index d7a0bcc7..051a31cd 100644 --- a/tests/fugue/execution/test_naive_execution_engine.py +++ b/tests/fugue/execution/test_naive_execution_engine.py @@ -4,7 +4,7 @@ import pyarrow as pa import fugue.api as fa -from fugue import FugueWorkflow, NativeExecutionEngine, QPDPandasEngine +from fugue import FugueWorkflow, NativeExecutionEngine, QPDPandasEngine, PandasDataFrame from fugue.execution.execution_engine import _get_file_threshold from fugue_test.builtin_suite import BuiltInTests from fugue_test.execution_suite import ExecutionEngineTests @@ -99,6 +99,15 @@ def num_part(df: pd.DataFrame) -> pd.DataFrame: res = fa.transform(df, num_part, schema="*,b:long", partition=2) assert res.values.tolist() == [[1, 2], [2, 2], [3, 1]] + def test_pandas_df_convert(self): + def tr(df: pd.DataFrame) -> pd.DataFrame: + return df + + pdf = PandasDataFrame(pd.DataFrame(dict(a=[1]))) + df = fa.transform(pdf, tr, schema="a:str") + assert fa.get_schema(df) == "a:str" + assert fa.as_array(df) == [["1"]] + def test_get_file_threshold(): assert -1 == _get_file_threshold(None) From b2e3bb63f795ff9a16e3c093724f592c25431e5e Mon Sep 17 00:00:00 2001 From: Han Wang Date: Thu, 5 Oct 2023 02:01:00 
+0000 Subject: [PATCH 08/13] Switch method for Spark df to arrow --- fugue_spark/_utils/convert.py | 18 ++++++++++++++++-- fugue_spark/dataframe.py | 25 +++++++++++++++++++++++-- fugue_spark/execution_engine.py | 10 +++++++--- 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py index 062ff565..090cdf6c 100644 --- a/fugue_spark/_utils/convert.py +++ b/fugue_spark/_utils/convert.py @@ -16,7 +16,7 @@ ) from triad.collections import Schema from triad.utils.assertion import assert_arg_not_none, assert_or_throw -from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP +from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP, cast_pa_table from triad.utils.schema import quote_name import fugue.api as fa @@ -41,7 +41,7 @@ def pandas_udf_can_accept(schema: Schema, is_input: bool) -> bool: return False to_arrow_schema(from_arrow_schema(schema.pa_schema)) return True - except Exception: + except Exception: # pragma: no cover return False @@ -189,6 +189,20 @@ def serialize(dfs): # pragma: no cover return pd.concat(pickle.loads(x.data) for x in sdf.collect()) +def to_arrow(df: ps.DataFrame) -> pa.Table: + schema = to_schema(df.schema) + if hasattr(df, "_collect_as_arrow"): + destruct = df.sparkSession._jconf.arrowPySparkSelfDestructEnabled() + batches = df._collect_as_arrow(split_batches=destruct) + if len(batches) == 0: + return schema.create_empty_arrow_table() + table = pa.Table.from_batches(batches) + del batches + else: # pragma: no cover + table = pa.Table.from_pandas(to_pandas(df)) + return cast_pa_table(table, schema.pa_schema) + + # TODO: the following function always set nullable to true, # but should we use field.nullable? def _to_arrow_type(dt: pt.DataType) -> pa.DataType: diff --git a/fugue_spark/dataframe.py b/fugue_spark/dataframe.py index f1e133b3..aab9f29d 100644 --- a/fugue_spark/dataframe.py +++ b/fugue_spark/dataframe.py @@ -17,7 +17,9 @@ ) from fugue.exceptions import FugueDataFrameOperationError from fugue.plugins import ( + as_arrow, as_local_bounded, + as_pandas, count, drop_columns, get_column_names, @@ -31,7 +33,13 @@ select_columns, ) -from ._utils.convert import to_cast_expression, to_pandas, to_schema, to_type_safe_input +from ._utils.convert import ( + to_arrow, + to_cast_expression, + to_pandas, + to_schema, + to_type_safe_input, +) from ._utils.misc import is_spark_connect, is_spark_dataframe @@ -128,7 +136,10 @@ def _select_cols(self, cols: List[Any]) -> DataFrame: return SparkDataFrame(self.native[schema.names]) def as_pandas(self) -> pd.DataFrame: - return to_pandas(self.native) + return _spark_df_as_pandas(self.native) + + def as_arrow(self, type_safe: bool = False) -> pa.Table: + return _spark_df_as_arrow(self.native) def rename(self, columns: Dict[str, str]) -> DataFrame: try: @@ -192,6 +203,16 @@ def _spark_is_df(df: ps.DataFrame) -> bool: return True +@as_arrow.candidate(lambda df: isinstance(df, ps.DataFrame)) +def _spark_df_as_arrow(df: ps.DataFrame) -> pd.DataFrame: + return to_arrow(df) + + +@as_pandas.candidate(lambda df: isinstance(df, ps.DataFrame)) +def _spark_df_as_pandas(df: ps.DataFrame) -> pd.DataFrame: + return to_pandas(df) + + @get_num_partitions.candidate(lambda df: is_spark_dataframe(df)) def _spark_num_partitions(df: ps.DataFrame) -> int: return df.rdd.getNumPartitions() diff --git a/fugue_spark/execution_engine.py b/fugue_spark/execution_engine.py index b895d6ce..4a3d917b 100644 --- a/fugue_spark/execution_engine.py +++ b/fugue_spark/execution_engine.py @@ -5,6 +5,7 @@ 
import pandas as pd import pyarrow as pa import pyspark.sql as ps +from py4j.protocol import Py4JError from pyspark import StorageLevel from pyspark.rdd import RDD from pyspark.sql import SparkSession @@ -350,9 +351,12 @@ def __init__(self, spark_session: Optional[SparkSession] = None, conf: Any = Non self._spark_session = spark_session cf = dict(FUGUE_SPARK_DEFAULT_CONF) if not self.is_spark_connect: - cf.update( - {x[0]: x[1] for x in spark_session.sparkContext.getConf().getAll()} - ) + try: + spark_conf = spark_session.sparkContext.getConf() + cf.update({x[0]: x[1] for x in spark_conf.getAll()}) + except Py4JError: # pragma: no cover: + # edge case: https://github.com/fugue-project/fugue/issues/517z + pass cf.update(ParamDict(conf)) super().__init__(cf) self._lock = SerializableRLock() From d09c08a45277ea7e53ce052da6111339ee858d69 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Thu, 5 Oct 2023 05:12:58 +0000 Subject: [PATCH 09/13] update --- fugue_spark/_utils/convert.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py index 090cdf6c..3756f874 100644 --- a/fugue_spark/_utils/convert.py +++ b/fugue_spark/_utils/convert.py @@ -1,5 +1,6 @@ import pickle -from typing import Any, Iterable, List, Tuple +from typing import Any, Iterable, List, Tuple, Optional + import pandas as pd import pyarrow as pa @@ -191,8 +192,13 @@ def serialize(dfs): # pragma: no cover def to_arrow(df: ps.DataFrame) -> pa.Table: schema = to_schema(df.schema) - if hasattr(df, "_collect_as_arrow"): + destruct: Optional[bool] = None + try: destruct = df.sparkSession._jconf.arrowPySparkSelfDestructEnabled() + except Exception: # pragma: no cover + # older spark does not have this config + pass + if destruct is not None and hasattr(df, "_collect_as_arrow"): batches = df._collect_as_arrow(split_batches=destruct) if len(batches) == 0: return schema.create_empty_arrow_table() From 600be4e0e07b17f495e0cba4a78889a2fbed1880 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Thu, 5 Oct 2023 05:46:58 +0000 Subject: [PATCH 10/13] update --- fugue_spark/_utils/convert.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py index 3756f874..e693a98b 100644 --- a/fugue_spark/_utils/convert.py +++ b/fugue_spark/_utils/convert.py @@ -194,7 +194,11 @@ def to_arrow(df: ps.DataFrame) -> pa.Table: schema = to_schema(df.schema) destruct: Optional[bool] = None try: - destruct = df.sparkSession._jconf.arrowPySparkSelfDestructEnabled() + jconf = df.sparkSession._jconf + if jconf.arrowPySparkEnabled() and pandas_udf_can_accept( + schema, is_input=False + ): + destruct = jconf.arrowPySparkSelfDestructEnabled() except Exception: # pragma: no cover # older spark does not have this config pass From 70a5c1a2d87b6aa1866d199a4b3e521a46033f5d Mon Sep 17 00:00:00 2001 From: Han Wang Date: Thu, 5 Oct 2023 06:52:44 +0000 Subject: [PATCH 11/13] update --- fugue_spark/_utils/convert.py | 7 ++++--- fugue_spark/dataframe.py | 9 ++------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py index e693a98b..3328c563 100644 --- a/fugue_spark/_utils/convert.py +++ b/fugue_spark/_utils/convert.py @@ -174,7 +174,7 @@ def pd_to_spark_df( def to_pandas(df: ps.DataFrame) -> pd.DataFrame: - if pd.__version__ < "2" or not any( + if version.parse(pd.__version__) < version.parse("2.0.0") or not any( isinstance(x.dataType, (pt.TimestampType, 
TimestampNTZType)) for x in df.schema.fields ): @@ -208,9 +208,10 @@ def to_arrow(df: ps.DataFrame) -> pa.Table: return schema.create_empty_arrow_table() table = pa.Table.from_batches(batches) del batches + return cast_pa_table(table, schema.pa_schema) else: # pragma: no cover - table = pa.Table.from_pandas(to_pandas(df)) - return cast_pa_table(table, schema.pa_schema) + # df.toPandas has bugs on nested types + return pa.Table.from_pylist(df.collect(), schema=schema.pa_schema) # TODO: the following function always set nullable to true, diff --git a/fugue_spark/dataframe.py b/fugue_spark/dataframe.py index aab9f29d..81806091 100644 --- a/fugue_spark/dataframe.py +++ b/fugue_spark/dataframe.py @@ -9,11 +9,10 @@ from triad.utils.assertion import assert_or_throw from fugue.dataframe import ( - ArrayDataFrame, + ArrowDataFrame, DataFrame, IterableDataFrame, LocalBoundedDataFrame, - PandasDataFrame, ) from fugue.exceptions import FugueDataFrameOperationError from fugue.plugins import ( @@ -100,11 +99,7 @@ def is_bounded(self) -> bool: return True def as_local_bounded(self) -> LocalBoundedDataFrame: - if any(pa.types.is_nested(t) for t in self.schema.types): - data = list(to_type_safe_input(self.native.collect(), self.schema)) - res: LocalBoundedDataFrame = ArrayDataFrame(data, self.schema) - else: - res = PandasDataFrame(self.as_pandas(), self.schema) + res = ArrowDataFrame(self.as_arrow()) if self.has_metadata: res.reset_metadata(self.metadata) return res From 2f9f36482c6396f7de6ee33e83eb6f38aac855d4 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Thu, 5 Oct 2023 07:23:59 +0000 Subject: [PATCH 12/13] update --- .github/workflows/test_all.yml | 6 ++++-- fugue_spark/_utils/io.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test_all.yml b/.github/workflows/test_all.yml index 1b7a531f..8130aca8 100644 --- a/.github/workflows/test_all.yml +++ b/.github/workflows/test_all.yml @@ -25,7 +25,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.8] + python-version: [3.8, "3.10"] steps: - uses: actions/checkout@v2 @@ -36,10 +36,12 @@ jobs: - name: Install dependencies run: make devenv - name: Lint + if: matrix.python-version == '3.10' run: make lint - name: Test run: make test - name: "Upload coverage to Codecov" + if: matrix.python-version == '3.10' uses: codecov/codecov-action@v3 with: fail_ci_if_error: false @@ -49,7 +51,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.9, "3.10"] + python-version: [3.9] steps: - uses: actions/checkout@v2 diff --git a/fugue_spark/_utils/io.py b/fugue_spark/_utils/io.py index 9bf3d0a4..c36a8856 100644 --- a/fugue_spark/_utils/io.py +++ b/fugue_spark/_utils/io.py @@ -63,7 +63,7 @@ def save_df( else: ldf = df.as_local() if isinstance(ldf, PandasDataFrame) and hasattr(ldf.native, "attrs"): - ldf.native.attrs = {} + ldf.native.attrs = {} # pragma: no cover save_df(ldf, uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs) def _get_writer( From 937c431281a411ea09540360ce5cb2f7da890418 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Thu, 5 Oct 2023 08:02:10 +0000 Subject: [PATCH 13/13] update --- fugue_spark/_utils/convert.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fugue_spark/_utils/convert.py b/fugue_spark/_utils/convert.py index 3328c563..20029a79 100644 --- a/fugue_spark/_utils/convert.py +++ b/fugue_spark/_utils/convert.py @@ -133,7 +133,7 @@ def to_type_safe_input(rows: Iterable[ps.Row], schema: Schema) -> Iterable[List[ if r[i] is not None: r[i] = 
r[i].asDict(recursive=True) yield r - else: + else: # pragma: no cover for row in rows: data = row.asDict(recursive=True) r = [data[n] for n in schema.names] @@ -179,9 +179,9 @@ def to_pandas(df: ps.DataFrame) -> pd.DataFrame: for x in df.schema.fields ): return df.toPandas() - else: + else: # pragma: no cover - def serialize(dfs): # pragma: no cover + def serialize(dfs): for df in dfs: data = pickle.dumps(df) yield pd.DataFrame([[data]], columns=["data"])
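
Usage sketch (illustrative only, not part of the commits above): how the fugue.api functions whose docstrings are reworked in PATCH 01, and whose Spark-to-Arrow conversion path is switched in PATCH 08, are typically called. The pandas input is just an example; any object Fugue recognizes as a dataframe (including a pyspark.sql.DataFrame) goes through the same calls.

    import pandas as pd
    import fugue.api as fa

    # the same example used in the get_schema docstring added by PATCH 01
    df = pd.DataFrame([[0, 1], [2, 3]], columns=["a", "b"])

    fa.get_schema(df)   # == Schema("a:long,b:long")
    fa.as_arrow(df)     # pyarrow.Table; for a Spark DataFrame this now goes through
                        # the _collect_as_arrow-based to_arrow() added in PATCH 08
    fa.as_pandas(df)    # pandas.DataFrame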