Improve comments to be LLM friendly, improve Spark compatibility #511

Merged 13 commits on Oct 8, 2023
6 changes: 4 additions & 2 deletions .github/workflows/test_all.yml
@@ -25,7 +25,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8]
python-version: [3.8, "3.10"]

steps:
- uses: actions/checkout@v2
@@ -36,10 +36,12 @@ jobs:
- name: Install dependencies
run: make devenv
- name: Lint
if: matrix.python-version == '3.10'
run: make lint
- name: Test
run: make test
- name: "Upload coverage to Codecov"
if: matrix.python-version == '3.10'
uses: codecov/codecov-action@v3
with:
fail_ci_if_error: false
@@ -49,7 +51,7 @@
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.9, "3.10"]
python-version: [3.9]

steps:
- uses: actions/checkout@v2
2 changes: 1 addition & 1 deletion README.md
@@ -252,7 +252,7 @@ Feel free to message us on [Slack](http://slack.fugue.ai). We also have [contrib
* [How LyftLearn Democratizes Distributed Compute through Kubernetes Spark and Fugue](https://eng.lyft.com/how-lyftlearn-democratizes-distributed-compute-through-kubernetes-spark-and-fugue-c0875b97c3d9)
* [Clobotics - Large Scale Image Processing with Spark through Fugue](https://medium.com/fugue-project/large-scale-image-processing-with-spark-through-fugue-e510b9813da8)
* [Architecture for a data lake REST API using Delta Lake, Fugue & Spark (article by bitsofinfo)](https://bitsofinfo.wordpress.com/2023/08/14/data-lake-rest-api-delta-lake-fugue-spark)

### Mentioned Uses

* [Productionizing Data Science at Interos, Inc. (LinkedIn post by Anthony Holten)](https://www.linkedin.com/posts/anthony-holten_pandas-spark-dask-activity-7022628193983459328-QvcF)
47 changes: 33 additions & 14 deletions fugue/dataframe/api.py
@@ -11,12 +11,12 @@

@fugue_plugin
def is_df(df: Any) -> bool:
"""Whether ``df`` is a DataFrame like object"""
"""Whether the input object is any type of DataFrame"""
return isinstance(df, DataFrame)


def get_native_as_df(df: AnyDataFrame) -> AnyDataFrame:
"""Return the dataframe form of the input ``df``.
"""Return the dataframe form of any dataframe.
If ``df`` is a :class:`~.DataFrame`, then call the
:meth:`~.DataFrame.native_as_df`, otherwise, it depends on whether there is
a corresponding function handling it.
@@ -30,30 +30,49 @@ def get_native_as_df(df: AnyDataFrame) -> AnyDataFrame:

@fugue_plugin
def get_schema(df: AnyDataFrame) -> Schema:
"""Get the schema of the ``df``
"""The generic function to get the schema of any dataframe

:param df: the object that can be recognized as a dataframe by Fugue
:return: the Schema object

.. admonition:: Examples

.. code-block:: python

import fugue.api as fa
import pandas as pd

df = pd.DataFrame([[0,1],[2,3]], columns=["a","b"])
fa.get_schema(df) # == Schema("a:long,b:long")

.. related_topics
How to get schema of any dataframe using Fugue?
"""
return as_fugue_df(df).schema


@fugue_plugin
def as_pandas(df: AnyDataFrame) -> pd.DataFrame:
"""Convert ``df`` to a Pandas DataFrame
"""The generic function to convert any dataframe to a Pandas DataFrame

:param df: the object that can be recognized as a dataframe by Fugue
:return: the Pandas DataFrame

.. related_topics
How to convert any dataframe to a pandas dataframe?
"""
return as_fugue_df(df).as_pandas()


@fugue_plugin
def as_arrow(df: AnyDataFrame) -> pa.Table:
"""Convert ``df`` to a PyArrow Table
"""The generic function to convert any dataframe to a PyArrow Table

:param df: the object that can be recognized as a dataframe by Fugue
:return: the PyArrow Table

.. related_topics
How to convert any dataframe to a pyarrow dataframe?
"""
return as_fugue_df(df).as_arrow()

@@ -62,7 +81,7 @@ def as_arrow(df: AnyDataFrame) -> pa.Table:
def as_array(
df: AnyDataFrame, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]: # pragma: no cover
"""Convert df to 2-dimensional native python array
"""The generic function to convert any dataframe to a 2-dimensional python array

:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to extract, defaults to None
@@ -81,7 +100,7 @@ def as_array(
def as_array_iterable(
df: AnyDataFrame, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]: # pragma: no cover
"""Convert df to iterable of native python arrays
"""The generic function to convert any dataframe to iterable of python arrays

:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to extract, defaults to None
@@ -101,7 +120,7 @@ def as_array_iterable(
def as_dict_iterable(
df: AnyDataFrame, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
"""Convert df to iterable of native python dicts
"""Convert any dataframe to iterable of native python dicts

:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to extract, defaults to None
@@ -116,7 +135,7 @@ def as_dict_iterable(

@fugue_plugin
def peek_array(df: AnyDataFrame) -> List[Any]:
"""Peek the first row of the dataframe as an array
"""Peek the first row of any dataframe as an array

:param df: the object that can be recognized as a dataframe by Fugue
:return: the first row as an array
@@ -126,7 +145,7 @@ def peek_array(df: AnyDataFrame) -> List[Any]:

@fugue_plugin
def peek_dict(df: AnyDataFrame) -> Dict[str, Any]:
"""Peek the first row of the dataframe as a array
"""Peek the first row of any dataframe as a array

:param df: the object that can be recognized as a dataframe by Fugue
:return: the first row as a dict
@@ -141,7 +160,7 @@ def head(
columns: Optional[List[str]] = None,
as_fugue: bool = False,
) -> AnyDataFrame:
"""Get first n rows of the dataframe as a new local bounded dataframe
"""Get first n rows of any dataframe as a new local bounded dataframe

:param n: number of rows
:param columns: selected columns, defaults to None (all columns)
@@ -160,7 +179,7 @@ def head(
def alter_columns(
df: AnyDataFrame, columns: Any, as_fugue: bool = False
) -> AnyDataFrame:
"""Change column types
"""Change column data types of any dataframe

:param df: the object that can be recognized as a dataframe by Fugue
:param columns: |SchemaLikeObject|,
@@ -178,7 +197,7 @@ def alter_columns(
def drop_columns(
df: AnyDataFrame, columns: List[str], as_fugue: bool = False
) -> AnyDataFrame:
"""Drop certain columns and return a new dataframe
"""Drop certain columns of any dataframe

:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to drop
@@ -194,7 +213,7 @@ def drop_columns(
def select_columns(
df: AnyDataFrame, columns: List[Any], as_fugue: bool = False
) -> AnyDataFrame:
"""Select certain columns and return a new dataframe
"""Select certain columns of any dataframe and return a new dataframe

:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to return
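The example added to `get_schema` generalizes to the other accessors touched in this file. A minimal sketch of how they read on a plain pandas DataFrame, assuming the functions are used through `fugue.api` as in that example and that `head` takes the dataframe followed by the row count:

```python
# Minimal sketch: the generic accessors applied to a pandas DataFrame.
# Any dataframe recognized by Fugue (Spark, Dask, ...) is handled the same way.
import pandas as pd
import fugue.api as fa

df = pd.DataFrame([[0, 1], [2, 3]], columns=["a", "b"])

print(fa.get_schema(df))              # a:long,b:long
pandas_df = fa.as_pandas(df)          # pandas.DataFrame
arrow_table = fa.as_arrow(df)         # pyarrow.Table
print(fa.peek_dict(df))               # {'a': 0, 'b': 1}
print(fa.head(df, 1, columns=["a"]))  # first row, column "a" only
```
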
6 changes: 4 additions & 2 deletions fugue/dataframe/utils.py
@@ -82,17 +82,19 @@ def _df_eq(
), f"schema mismatch {df.schema.pa_schema}, {df2.schema.pa_schema}"
if not check_content:
return True
cols: Any = df1.columns
if no_pandas:
dd1 = [[x.__repr__()] for x in df1.as_array_iterable(type_safe=True)]
dd2 = [[x.__repr__()] for x in df2.as_array_iterable(type_safe=True)]
d1 = pd.DataFrame(dd1, columns=["data"])
d2 = pd.DataFrame(dd2, columns=["data"])
cols = ["data"]
else:
d1 = df1.as_pandas()
d2 = df2.as_pandas()
if not check_order:
d1 = d1.sort_values(df1.columns)
d2 = d2.sort_values(df1.columns)
d1 = d1.sort_values(cols)
d2 = d2.sort_values(cols)
d1 = d1.reset_index(drop=True)
d2 = d2.reset_index(drop=True)
pd.testing.assert_frame_equal(
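The new `cols` variable fixes the sort in the `no_pandas` branch: there the frames are rebuilt with a single `data` column, so sorting by `df1.columns` would fail. A small illustration with made-up column names:

```python
import pandas as pd

# In no_pandas mode every row is repr-serialized into one "data" column.
d1 = pd.DataFrame([["[2, 3]"], ["[0, 1]"]], columns=["data"])

try:
    d1.sort_values(["a", "b"])   # old behavior: sort by the original columns
except KeyError as err:
    print("original columns are gone:", err)

print(d1.sort_values(["data"]).reset_index(drop=True))  # new behavior: sort by cols
```
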
2 changes: 1 addition & 1 deletion fugue/execution/native_execution_engine.py
@@ -143,7 +143,7 @@ def map_dataframe(
if (
isinstance(output_df, PandasDataFrame)
and output_df.schema != output_schema
):
): # pragma: no cover
output_df = PandasDataFrame(output_df.native, output_schema)
assert_or_throw(
output_df.schema == output_schema,
5 changes: 5 additions & 0 deletions fugue_dask/_io.py
@@ -1,5 +1,6 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import fsspec
import fs as pfs
import pandas as pd
from dask import dataframe as dd
@@ -96,6 +97,8 @@ def _load_parquet(


def _save_csv(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None:
fs, path = fsspec.core.url_to_fs(p.uri)
fs.makedirs(path, exist_ok=True)
df.native.to_csv(
pfs.path.combine(p.uri, "*.csv"), **{"index": False, "header": False, **kwargs}
)
@@ -145,6 +148,8 @@ def _load_csv( # noqa: C901


def _save_json(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None:
fs, path = fsspec.core.url_to_fs(p.uri)
fs.makedirs(path, exist_ok=True)
df.native.to_json(pfs.path.combine(p.uri, "*.json"), **kwargs)


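The two `makedirs` calls ensure the target directory exists before Dask writes its per-partition files, since not every fsspec filesystem creates it implicitly on write. A rough equivalent of the fix, using a hypothetical local path:

```python
import fsspec
import pandas as pd
from dask import dataframe as dd

uri = "/tmp/fugue_demo_csv"               # hypothetical output directory
fs, path = fsspec.core.url_to_fs(uri)     # resolve filesystem and normalized path
fs.makedirs(path, exist_ok=True)          # what _save_csv / _save_json now do first

ddf = dd.from_pandas(pd.DataFrame({"a": [0, 2], "b": [1, 3]}), npartitions=2)
ddf.to_csv(uri + "/*.csv", index=False, header=False)  # one file per partition
```
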
39 changes: 32 additions & 7 deletions fugue_spark/_utils/convert.py
@@ -1,5 +1,6 @@
import pickle
from typing import Any, Iterable, List, Tuple
from typing import Any, Iterable, List, Tuple, Optional


import pandas as pd
import pyarrow as pa
@@ -16,7 +17,7 @@
)
from triad.collections import Schema
from triad.utils.assertion import assert_arg_not_none, assert_or_throw
from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP
from triad.utils.pyarrow import TRIAD_DEFAULT_TIMESTAMP, cast_pa_table
from triad.utils.schema import quote_name

import fugue.api as fa
@@ -41,7 +42,7 @@ def pandas_udf_can_accept(schema: Schema, is_input: bool) -> bool:
return False
to_arrow_schema(from_arrow_schema(schema.pa_schema))
return True
except Exception:
except Exception: # pragma: no cover
return False


@@ -132,7 +133,7 @@ def to_type_safe_input(rows: Iterable[ps.Row], schema: Schema) -> Iterable[List[
if r[i] is not None:
r[i] = r[i].asDict(recursive=True)
yield r
else:
else: # pragma: no cover
for row in rows:
data = row.asDict(recursive=True)
r = [data[n] for n in schema.names]
@@ -173,14 +174,14 @@ def pd_to_spark_df(


def to_pandas(df: ps.DataFrame) -> pd.DataFrame:
if pd.__version__ < "2" or not any(
if version.parse(pd.__version__) < version.parse("2.0.0") or not any(
isinstance(x.dataType, (pt.TimestampType, TimestampNTZType))
for x in df.schema.fields
):
return df.toPandas()
else:
else: # pragma: no cover

def serialize(dfs): # pragma: no cover
def serialize(dfs):
for df in dfs:
data = pickle.dumps(df)
yield pd.DataFrame([[data]], columns=["data"])
@@ -189,6 +190,30 @@ def serialize(dfs): # pragma: no cover
return pd.concat(pickle.loads(x.data) for x in sdf.collect())


def to_arrow(df: ps.DataFrame) -> pa.Table:
schema = to_schema(df.schema)
destruct: Optional[bool] = None
try:
jconf = df.sparkSession._jconf
if jconf.arrowPySparkEnabled() and pandas_udf_can_accept(
schema, is_input=False
):
destruct = jconf.arrowPySparkSelfDestructEnabled()
except Exception: # pragma: no cover
# older spark does not have this config
pass
if destruct is not None and hasattr(df, "_collect_as_arrow"):
batches = df._collect_as_arrow(split_batches=destruct)
if len(batches) == 0:
return schema.create_empty_arrow_table()
table = pa.Table.from_batches(batches)
del batches
return cast_pa_table(table, schema.pa_schema)
else: # pragma: no cover
# df.toPandas has bugs on nested types
return pa.Table.from_pylist(df.collect(), schema=schema.pa_schema)


# TODO: the following function always set nullable to true,
# but should we use field.nullable?
def _to_arrow_type(dt: pt.DataType) -> pa.DataType:
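Two notes on this file: the pandas check now uses `version.parse` instead of string comparison (lexicographic comparison would, for example, treat "10.0.0" as smaller than "2"), and the new `to_arrow` prefers Spark's Arrow collection when the session allows it, falling back to row collection otherwise. A hedged sketch of how the Arrow path might be exercised, assuming `fa.as_arrow` dispatches Spark DataFrames through this helper:

```python
import fugue.api as fa
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # enables the fast _collect_as_arrow path that to_arrow checks for
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)
sdf = spark.createDataFrame([(0, 1), (2, 3)], schema="a long, b long")

table = fa.as_arrow(sdf)  # pyarrow.Table cast to the schema derived from Spark
print(table.schema)       # a: int64, b: int64
```
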
4 changes: 3 additions & 1 deletion fugue_spark/_utils/io.py
@@ -9,7 +9,7 @@

from fugue._utils.io import FileParser, save_df
from fugue.collections.partition import PartitionSpec
from fugue.dataframe import DataFrame
from fugue.dataframe import DataFrame, PandasDataFrame
from fugue_spark.dataframe import SparkDataFrame

from .convert import to_schema, to_spark_schema
@@ -62,6 +62,8 @@ def save_df(
writer.save(uri)
else:
ldf = df.as_local()
if isinstance(ldf, PandasDataFrame) and hasattr(ldf.native, "attrs"):
ldf.native.attrs = {} # pragma: no cover
save_df(ldf, uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs)

def _get_writer(
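The `attrs` reset is a small compatibility guard: pandas DataFrames can carry arbitrary metadata in `.attrs`, and, as I read the change, that metadata can get in the way when the local frame is handed to the writer, so it is emptied first. A hypothetical illustration of what gets cleared:

```python
import pandas as pd

pdf = pd.DataFrame({"a": [0, 2], "b": [1, 3]})
pdf.attrs["lineage"] = {"source": "hypothetical"}  # metadata attached upstream
print(pdf.attrs)                                   # {'lineage': {'source': 'hypothetical'}}

pdf.attrs = {}                                     # what save_df now does before writing
print(pdf.attrs)                                   # {}
```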