Add as_dicts to Fugue API #522

Merged
merged 6 commits on Oct 27, 2023
4 changes: 2 additions & 2 deletions .github/workflows/test_ray.yml
@@ -21,7 +21,7 @@ concurrency:

jobs:
test_ray_lower_bound:
name: Ray 2.1.0
name: Ray 2.4.0
runs-on: ubuntu-latest

steps:
@@ -33,7 +33,7 @@ jobs:
- name: Install dependencies
run: make devenv
- name: Setup Ray
run: pip install ray[data]==2.1.0 pyarrow==6.0.1 pandas==1.5.3 'pydantic<2'
run: pip install ray[data]==2.4.0 pyarrow==6.0.1 pandas==1.5.3 'pydantic<2'
- name: Test
run: make testray

1 change: 1 addition & 0 deletions fugue/api.py
@@ -6,6 +6,7 @@
as_array_iterable,
as_arrow,
as_dict_iterable,
as_dicts,
as_fugue_df,
as_pandas,
drop_columns,
21 changes: 19 additions & 2 deletions fugue/dataframe/api.py
@@ -116,15 +116,32 @@ def as_array_iterable(
return as_fugue_df(df).as_array_iterable(columns=columns, type_safe=type_safe)


@fugue_plugin
def as_dicts(
df: AnyDataFrame, columns: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
"""Convert any dataframe to a list of python dicts

:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to extract, defaults to None
:return: a list of python dicts

.. note::

The default implementation enforces ``type_safe`` True
"""
return as_fugue_df(df).as_dicts(columns=columns)
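For context, a minimal usage sketch of the new top-level function, assuming a pandas DataFrame as input (the data here is hypothetical):

import pandas as pd
import fugue.api as fa

df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

# convert any recognized dataframe to a list of python dicts
fa.as_dicts(df)                 # [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]
# restrict (and reorder) the extracted columns
fa.as_dicts(df, columns=["b"])  # [{'b': 'x'}, {'b': 'y'}]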


@fugue_plugin
def as_dict_iterable(
df: AnyDataFrame, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
"""Convert any dataframe to iterable of native python dicts
"""Convert any dataframe to iterable of python dicts

:param df: the object that can be recognized as a dataframe by Fugue
:param columns: columns to extract, defaults to None
:return: iterable of native python dicts
:return: iterable of python dicts

.. note::

59 changes: 48 additions & 11 deletions fugue/dataframe/arrow_dataframe.py
@@ -21,6 +21,10 @@

from .api import (
alter_columns,
as_array,
as_array_iterable,
as_dict_iterable,
as_dicts,
as_pandas,
drop_columns,
get_column_names,
@@ -30,6 +34,12 @@
select_columns,
)
from .dataframe import DataFrame, LocalBoundedDataFrame, _input_schema
from .utils import (
pa_table_as_array,
pa_table_as_array_iterable,
pa_table_as_dict_iterable,
pa_table_as_dicts,
)


class ArrowDataFrame(LocalBoundedDataFrame):
@@ -174,21 +184,20 @@ def as_arrow(self, type_safe: bool = False) -> pa.Table:
def as_array(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
return list(self.as_array_iterable(columns, type_safe=type_safe))
return pa_table_as_array(self.native, columns=columns)

def as_dicts(self, columns: Optional[List[str]] = None) -> List[Dict[str, Any]]:
return pa_table_as_dicts(self.native, columns=columns)

def as_array_iterable(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
if self.empty:
return
if columns is not None:
for x in self[columns].as_array_iterable(type_safe=type_safe):
yield x
else:
d = self.native.to_pydict()
cols = [d[n] for n in self.columns]
for arr in zip(*cols):
yield list(arr)
yield from pa_table_as_array_iterable(self.native, columns=columns)

def as_dict_iterable(
self, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
yield from pa_table_as_dict_iterable(self.native, columns=columns)


@as_local.candidate(lambda df: isinstance(df, pa.Table))
@@ -212,6 +221,34 @@ def _pa_table_as_pandas(df: pa.Table) -> pd.DataFrame:
)


@as_array.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_as_array(
df: pa.Table, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
return pa_table_as_array(df, columns=columns)


@as_array_iterable.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_as_array_iterable(
df: pa.Table, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
yield from pa_table_as_array_iterable(df, columns=columns)


@as_dicts.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_as_dicts(
df: pa.Table, columns: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
return pa_table_as_dicts(df, columns=columns)


@as_dict_iterable.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_as_dict_iterable(
df: pa.Table, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
yield from pa_table_as_dict_iterable(df, columns=columns)
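These candidates let the fugue.api functions dispatch directly on a pyarrow.Table. A small sketch with hypothetical data (the pa_table_as_* helpers themselves live in fugue/dataframe/utils.py, which is not part of this diff):

import pyarrow as pa
import fugue.api as fa

tbl = pa.table({"a": [1, 2], "b": ["x", "y"]})

# the list and iterable forms produce the same dicts
assert fa.as_dicts(tbl) == [{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]
assert list(fa.as_dict_iterable(tbl)) == fa.as_dicts(tbl)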


@alter_columns.candidate(lambda df, *args, **kwargs: isinstance(df, pa.Table))
def _pa_table_alter_columns(
df: pa.Table, columns: Any, as_fugue: bool = False
22 changes: 20 additions & 2 deletions fugue/dataframe/dataframe.py
@@ -237,13 +237,31 @@ def head(
"""
raise NotImplementedError

def as_dicts(self, columns: Optional[List[str]] = None) -> List[Dict[str, Any]]:
"""Convert to a list of python dicts

:param columns: columns to extract, defaults to None
:return: a list of python dicts

.. note::

The default implementation enforces ``type_safe`` True
"""
if columns is None:
columns = self.columns
idx = range(len(columns))
return [
{columns[i]: x[i] for i in idx}
for x in self.as_array(columns, type_safe=True)
]
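As a quick illustration of the default path, any subclass that only implements as_array gets as_dicts for free. A sketch with hypothetical data, using ArrayDataFrame, which does not appear to override this method:

from fugue.dataframe import ArrayDataFrame

fdf = ArrayDataFrame([[0, "x"], [1, "y"]], "a:int,b:str")

# column names are zipped with each as_array(type_safe=True) row
assert fdf.as_dicts() == [{"a": 0, "b": "x"}, {"a": 1, "b": "y"}]
assert fdf.as_dicts(["b"]) == [{"b": "x"}, {"b": "y"}]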

def as_dict_iterable(
self, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
"""Convert to iterable of native python dicts
"""Convert to iterable of python dicts

:param columns: columns to extract, defaults to None
:return: iterable of native python dicts
:return: iterable of python dicts

.. note::

2 changes: 1 addition & 1 deletion fugue/dataframe/function_wrapper.py
@@ -269,7 +269,7 @@ def count(self, df: EmptyAwareIterable[List[Any]]) -> int:
class _ListDictParam(_LocalNoSchemaDataFrameParam):
@no_type_check
def to_input_data(self, df: DataFrame, ctx: Any) -> List[Dict[str, Any]]:
return list(df.as_local().as_dict_iterable())
return df.as_local().as_dicts()

@no_type_check
def to_output_df(
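The practical effect of this change: user functions annotated with List[Dict[str, Any]] now receive their input through as_dicts() rather than list(as_dict_iterable()). A hedged usage sketch (the transformer and schema below are hypothetical):

from typing import Any, Dict, List

import pandas as pd
import fugue.api as fa

def add_flag(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # "rows" is materialized via DataFrame.as_dicts() after this change
    return [{**r, "flag": r["a"] > 1} for r in rows]

out = fa.transform(pd.DataFrame({"a": [1, 2]}), add_flag, schema="*,flag:bool")
print(out)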
3 changes: 3 additions & 0 deletions fugue/dataframe/iterable_dataframe.py
@@ -105,6 +105,9 @@ def as_array(
) -> List[Any]:
return list(self.as_array_iterable(columns, type_safe=type_safe))

def as_dicts(self, columns: Optional[List[str]] = None) -> List[Dict[str, Any]]:
return list(self.as_dict_iterable(columns))

def as_array_iterable(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
73 changes: 73 additions & 0 deletions fugue/dataframe/pandas_dataframe.py
@@ -1,8 +1,11 @@
from typing import Any, Dict, Iterable, List, Optional, Tuple

import pandas as pd
import pyarrow as pa
from triad import assert_or_throw
from triad.collections.schema import Schema
from triad.utils.pandas_like import PD_UTILS
from triad.utils.pyarrow import pa_batch_to_dicts

from fugue.dataset.api import (
as_fugue_dataset,
@@ -17,6 +20,10 @@
from fugue.exceptions import FugueDataFrameOperationError

from .api import (
as_array,
as_array_iterable,
as_dict_iterable,
as_dicts,
drop_columns,
get_column_names,
get_schema,
@@ -134,6 +141,9 @@ def alter_columns(self, columns: Any) -> DataFrame:
return self
return PandasDataFrame(self.native, new_schema)

def as_arrow(self, type_safe: bool = False) -> pa.Table:
return PD_UTILS.as_arrow(self.native, schema=self.schema.pa_schema)

def as_array(
self, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
@@ -150,6 +160,18 @@ def as_array_iterable(
):
yield row

def as_dicts(self, columns: Optional[List[str]] = None) -> List[Dict[str, Any]]:
res: List[Dict[str, Any]] = []
for block in _to_dicts(self.native, columns, self.schema):
res += block
return res

def as_dict_iterable(
self, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
for block in _to_dicts(self.native, columns, self.schema):
yield from block
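A small usage sketch of the pandas-backed methods with hypothetical data; both go through the batch-wise arrow conversion in _to_dicts further down:

import pandas as pd
from fugue.dataframe import PandasDataFrame

pdf = PandasDataFrame(pd.DataFrame({"a": [1, 2], "b": [0.5, 1.5]}))

assert pdf.as_dicts() == [{"a": 1, "b": 0.5}, {"a": 2, "b": 1.5}]
assert list(pdf.as_dict_iterable(["a"])) == [{"a": 1}, {"a": 2}]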

def head(
self, n: int, columns: Optional[List[str]] = None
) -> LocalBoundedDataFrame:
@@ -272,6 +294,43 @@ def _pd_head(
return _adjust_df(df.head(n), as_fugue=as_fugue)


@as_array.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _pd_as_array(
df: pd.DataFrame, columns: Optional[List[str]] = None, type_safe: bool = False
) -> List[Any]:
return list(_pd_as_array_iterable(df, columns, type_safe=type_safe))


@as_array_iterable.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _pd_as_array_iterable(
df: pd.DataFrame, columns: Optional[List[str]] = None, type_safe: bool = False
) -> Iterable[Any]:
for row in PD_UTILS.as_array_iterable(
df,
columns=columns,
type_safe=type_safe,
):
yield row


@as_dicts.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _pd_as_dicts(
df: pd.DataFrame, columns: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
res: List[Dict[str, Any]] = []
for block in _to_dicts(df, columns):
res += block
return res


@as_dict_iterable.candidate(lambda df, *args, **kwargs: isinstance(df, pd.DataFrame))
def _pd_as_dict_iterable(
df: pd.DataFrame, columns: Optional[List[str]] = None
) -> Iterable[Dict[str, Any]]:
for block in _to_dicts(df, columns):
yield from block


def _adjust_df(res: pd.DataFrame, as_fugue: bool):
return res if not as_fugue else PandasDataFrame(res)

@@ -280,3 +339,17 @@ def _assert_no_missing(df: pd.DataFrame, columns: Iterable[Any]) -> None:
missing = [x for x in columns if x not in df.columns]
if len(missing) > 0:
raise FugueDataFrameOperationError(f"found nonexistent columns: {missing}")


def _to_dicts(
df: pd.DataFrame,
columns: Optional[List[str]] = None,
schema: Optional[Schema] = None,
) -> Iterable[List[Dict[str, Any]]]:
cols = list(df.columns) if columns is None else columns
assert_or_throw(len(cols) > 0, ValueError("columns cannot be empty"))
pa_schema = schema.extract(cols).pa_schema if schema is not None else None
adf = PD_UTILS.as_arrow(df[cols], schema=pa_schema)
for batch in adf.to_batches():
if batch.num_rows > 0:
yield pa_batch_to_dicts(batch)
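Design note: converting batch by batch lets as_dict_iterable stream results without materializing the whole table as dicts, while as_dicts simply concatenates the blocks. A rough pure-pyarrow sketch of the same idea, with RecordBatch.to_pylist standing in for triad's pa_batch_to_dicts (whose exact behavior is not shown in this diff):

from typing import Any, Dict, Iterable, List

import pyarrow as pa

def _batches_to_dicts(tbl: pa.Table) -> Iterable[List[Dict[str, Any]]]:
    # yield one list of row dicts per non-empty record batch
    for batch in tbl.to_batches():
        if batch.num_rows > 0:
            yield batch.to_pylist()  # stand-in for pa_batch_to_dicts(batch)

tbl = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
rows: List[Dict[str, Any]] = []
for block in _batches_to_dicts(tbl):
    rows += block
print(rows)  # [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}, {'a': 3, 'b': 'z'}]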