Switch basic dataframe operations to qpd backend & Add set operations (#64)

* switch join backend to qpd

* update

* fix pandas and dask, fail spark

* update

* update

* update

* add programming interface

* update
goodwanghan committed Oct 4, 2020
1 parent 4c6f074 commit 151e653
Showing 13 changed files with 709 additions and 180 deletions.
17 changes: 17 additions & 0 deletions README.md
@@ -117,3 +117,20 @@ If you installed the requirements manually, install the git hook scripts with:
```
pre-commit install
```


## Update History

### 0.4.1
* Added set operations to the programming interface: `union`, `subtract`, `intersect` (see the usage sketch after this list)
* Added `distinct` to the programming interface
* Ensured partitioning follows SQL convention: groups with null keys are NOT removed
* Switched `join`, `union`, `subtract`, `intersect`, `distinct` to QPD implementations, so they follow SQL convention
* Set operations in Fugue SQL can directly operate on Fugue statements (e.g. `TRANSFORM USING t1 UNION TRANSFORM USING t2`)
* Fixed bugs
* Added onboarding document for contributors
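
A minimal usage sketch of the new 0.4.1 set operations through the programming interface. The toy data and column names are illustrative, and the exact method signatures on the workflow dataframe are assumptions based on the changelog above:

```
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    a = dag.df([[0], [1]], "x:int")
    b = dag.df([[1], [2]], "x:int")
    a.union(b).show()                  # UNION (== UNION DISTINCT): 0, 1, 2
    a.union(b, distinct=False).show()  # UNION ALL: 0, 1, 1, 2
    a.subtract(b).show()               # EXCEPT DISTINCT: 0
    a.intersect(b).show()              # INTERSECT DISTINCT: 1
    a.distinct().show()                # SELECT DISTINCT *
```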

### <=0.4.0

* Main features of Fugue core and Fugue SQL
* Supported backends: Pandas, Spark and Dask
94 changes: 93 additions & 1 deletion fugue/execution/execution_engine.py
@@ -224,7 +224,7 @@ def join(
``inner``, ``left_outer``, ``right_outer``, ``full_outer``, ``cross``
:param on: it can always be inferred, but if you provide, it will be
validated against the inferred keys.
:param metadata: dict-like object to add to the joined dataframe,
:param metadata: dict-like object to add to the result dataframe,
defaults to None
:return: the joined dataframe
@@ -234,6 +234,98 @@
"""
raise NotImplementedError

@abstractmethod
def union(
self,
df1: DataFrame,
df2: DataFrame,
distinct: bool = True,
metadata: Any = None,
) -> DataFrame: # pragma: no cover
"""Join two dataframes
:param df1: the first dataframe
:param df2: the second dataframe
:param distinct: ``true`` for ``UNION`` (== ``UNION DISTINCT``),
``false`` for ``UNION ALL``
:param metadata: dict-like object to add to the result dataframe,
defaults to None
:return: the unioned dataframe
:Notice:
Currently, the schema of ``df1`` and ``df2`` must be identical, or
an exception will be thrown.
"""
raise NotImplementedError

@abstractmethod
def subtract(
self,
df1: DataFrame,
df2: DataFrame,
distinct: bool = True,
metadata: Any = None,
) -> DataFrame: # pragma: no cover
"""``df1 - df2``
:param df1: the first dataframe
:param df2: the second dataframe
:param distinct: ``true`` for ``EXCEPT`` (== ``EXCEPT DISTINCT``),
``false`` for ``EXCEPT ALL``
:param metadata: dict-like object to add to the result dataframe,
defaults to None
        :return: the subtracted dataframe
:Notice:
Currently, the schema of ``df1`` and ``df2`` must be identical, or
an exception will be thrown.
"""
raise NotImplementedError

@abstractmethod
def intersect(
self,
df1: DataFrame,
df2: DataFrame,
distinct: bool = True,
metadata: Any = None,
) -> DataFrame: # pragma: no cover
"""Intersect ``df1`` and ``df2``
:param df1: the first dataframe
:param df2: the second dataframe
:param distinct: ``true`` for ``INTERSECT`` (== ``INTERSECT DISTINCT``),
``false`` for ``INTERSECT ALL``
:param metadata: dict-like object to add to the result dataframe,
defaults to None
        :return: the intersected dataframe
:Notice:
Currently, the schema of ``df1`` and ``df2`` must be identical, or
an exception will be thrown.
"""
raise NotImplementedError

@abstractmethod
def distinct(
self,
df: DataFrame,
metadata: Any = None,
) -> DataFrame: # pragma: no cover
"""Equivalent to ``SELECT DISTINCT * FROM df``
:param df: dataframe
:param metadata: dict-like object to add to the result dataframe,
defaults to None
:type metadata: Any, optional
:return: [description]
:rtype: DataFrame
"""
pass

def zip(
self,
df1: DataFrame,
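
A short sketch of calling the new engine-level set operations directly. `NativeExecutionEngine` and `ArrayDataFrame` are existing Fugue classes and the signatures match the abstract methods above; the toy data is illustrative:

```
from fugue import ArrayDataFrame
from fugue.execution.native_execution_engine import NativeExecutionEngine

engine = NativeExecutionEngine()
a = ArrayDataFrame([[0], [1]], "x:int")
b = ArrayDataFrame([[1], [2]], "x:int")

print(engine.union(a, b, distinct=True).as_pandas())     # x: 0, 1, 2
print(engine.subtract(a, b, distinct=True).as_pandas())  # x: 0
print(engine.intersect(a, b, distinct=True).as_pandas()) # x: 1
print(engine.distinct(a).as_pandas())                    # x: 0, 1
```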
143 changes: 67 additions & 76 deletions fugue/execution/native_execution_engine.py
@@ -1,8 +1,8 @@
import logging
from typing import Any, Callable, Iterable, List, Optional, Union
from typing import Any, Callable, List, Optional, Union

import pandas as pd
import pyarrow as pa
from fugue._utils.io import load_df, save_df
from fugue.collections.partition import (
EMPTY_PARTITION_SPEC,
PartitionCursor,
@@ -22,13 +22,12 @@
ExecutionEngine,
SQLEngine,
)
from fugue._utils.io import load_df, save_df
from qpd_pandas.engine import PandasUtils
from sqlalchemy import create_engine
from triad.collections import Schema
from triad.collections.dict import ParamDict
from triad.collections.fs import FileSystem
from triad.utils.assertion import assert_or_throw
from triad.utils.pandas_like import PD_UTILS


class SqliteEngine(SQLEngine):
@@ -37,8 +36,8 @@ class SqliteEngine(SQLEngine):
:param execution_engine: the execution engine this sql engine will run on
"""

def __init__(self, execution_engine: ExecutionEngine) -> None:
return super().__init__(execution_engine)
def __init__(self, execution_engine: ExecutionEngine):
super().__init__(execution_engine)

def select(self, dfs: DataFrames, statement: str) -> DataFrame:
sql_engine = create_engine("sqlite:///:memory:")
@@ -78,6 +77,11 @@ def fs(self) -> FileSystem:
def default_sql_engine(self) -> SQLEngine:
return self._default_sql_engine

@property
def pl_utils(self) -> PandasUtils:
"""Pandas-like dataframe utils"""
return PandasUtils()

def stop(self) -> None: # pragma: no cover
return

@@ -134,7 +138,7 @@ def _map(pdf: pd.DataFrame) -> pd.DataFrame:
output_df = map_func(cursor, input_df)
return output_df.as_pandas()

result = PD_UTILS.safe_groupby_apply(
result = self.pl_utils.safe_groupby_apply(
df.as_pandas(), partition_spec.partition_by, _map
)
return PandasDataFrame(result, output_schema, metadata)
@@ -154,59 +158,64 @@ def join(
metadata: Any = None,
) -> DataFrame:
key_schema, output_schema = get_join_schemas(df1, df2, how=how, on=on)
how = how.lower().replace("_", "").replace(" ", "")
if how == "cross":
d1 = df1.as_pandas()
d2 = df2.as_pandas()
d1["__cross_join_index__"] = 1
d2["__cross_join_index__"] = 1
d = d1.merge(d2, on=("__cross_join_index__")).drop(
"__cross_join_index__", axis=1
)
return PandasDataFrame(d.reset_index(drop=True), output_schema, metadata)
if how in ["semi", "leftsemi"]:
d1 = df1.as_pandas()
d2 = df2.as_pandas()[key_schema.names]
d = d1.merge(d2, on=key_schema.names, how="inner")
return PandasDataFrame(d.reset_index(drop=True), output_schema, metadata)
if how in ["anti", "leftanti"]:
d1 = df1.as_pandas()
d2 = df2.as_pandas()[key_schema.names]
d2["__anti_join_dummy__"] = 1.0
d = d1.merge(d2, on=key_schema.names, how="left")
d = d[d.iloc[:, -1].isnull()]
return PandasDataFrame(
d.drop(["__anti_join_dummy__"], axis=1).reset_index(drop=True),
output_schema,
metadata,
)
fix_left, fix_right = False, False
if how in ["leftouter"]:
how = "left"
self._validate_outer_joinable(df2.schema, key_schema)
fix_right = True
if how in ["rightouter"]:
how = "right"
self._validate_outer_joinable(df1.schema, key_schema)
fix_left = True
if how in ["fullouter"]:
how = "outer"
self._validate_outer_joinable(df1.schema, key_schema)
self._validate_outer_joinable(df2.schema, key_schema)
fix_left, fix_right = True, True
d1 = df1.as_pandas()
d2 = df2.as_pandas()
d = d1.merge(d2, on=key_schema.names, how=how)
if fix_left:
d = self._fix_nan(
d, output_schema, df1.schema.exclude(list(df2.schema.keys())).keys()
)
if fix_right:
d = self._fix_nan(
d, output_schema, df2.schema.exclude(list(df1.schema.keys())).keys()
)
d = self.pl_utils.join(
df1.as_pandas(), df2.as_pandas(), join_type=how, on=key_schema.names
)
return PandasDataFrame(d.reset_index(drop=True), output_schema, metadata)

def union(
self,
df1: DataFrame,
df2: DataFrame,
distinct: bool = True,
metadata: Any = None,
) -> DataFrame:
assert_or_throw(
df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}")
)
d = self.pl_utils.union(df1.as_pandas(), df2.as_pandas(), unique=distinct)
return PandasDataFrame(d.reset_index(drop=True), df1.schema, metadata)

def subtract(
self,
df1: DataFrame,
df2: DataFrame,
distinct: bool = True,
metadata: Any = None,
) -> DataFrame:
assert_or_throw(
distinct, NotImplementedError("EXCEPT ALL for NativeExecutionEngine")
)
assert_or_throw(
df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}")
)
d = self.pl_utils.except_df(df1.as_pandas(), df2.as_pandas(), unique=distinct)
return PandasDataFrame(d.reset_index(drop=True), df1.schema, metadata)

def intersect(
self,
df1: DataFrame,
df2: DataFrame,
distinct: bool = True,
metadata: Any = None,
) -> DataFrame:
assert_or_throw(
distinct, NotImplementedError("INTERSECT ALL for NativeExecutionEngine")
)
assert_or_throw(
df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}")
)
d = self.pl_utils.intersect(df1.as_pandas(), df2.as_pandas(), unique=distinct)
return PandasDataFrame(d.reset_index(drop=True), df1.schema, metadata)

def distinct(
self,
df: DataFrame,
metadata: Any = None,
) -> DataFrame:
d = self.pl_utils.drop_duplicates(df.as_pandas())
return PandasDataFrame(d.reset_index(drop=True), df.schema, metadata)

def load_df(
self,
path: Union[str, List[str]],
@@ -236,21 +245,3 @@ def save_df(
)
df = self.to_df(df)
save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs)

def _validate_outer_joinable(self, schema: Schema, key_schema: Schema) -> None:
# TODO: this is to prevent wrong behavior of pandas, we may not need it
# s = schema - key_schema
# if any(pa.types.is_boolean(v) or pa.types.is_integer(v) for v in s.types):
# raise NotImplementedError(
# f"{schema} excluding {key_schema} is not outer joinable"
# )
return

def _fix_nan(
self, df: pd.DataFrame, schema: Schema, keys: Iterable[str]
) -> pd.DataFrame:
for key in keys:
if pa.types.is_floating(schema[key].type):
continue
df[key] = df[key].where(pd.notna(df[key]), None)
return df
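
The `pl_utils` calls above delegate to QPD's pandas utilities, which this commit relies on to follow SQL conventions (for example, NULL keys never matching in joins). A minimal sketch against `PandasUtils` directly, reusing the signatures shown in the diff; the sample frames and the `join_type` value are assumptions:

```
import pandas as pd
from qpd_pandas.engine import PandasUtils

u = PandasUtils()
a = pd.DataFrame({"k": [None, "x"], "v": [1, 2]})
b = pd.DataFrame({"k": [None, "x"], "w": [3, 4]})

# SQL convention: the None keys should not match each other
print(u.join(a, b, join_type="inner", on=["k"]))

c = pd.DataFrame({"k": [None, "x"], "v": [2, 2]})
print(u.union(a, c, unique=True))      # UNION DISTINCT
print(u.except_df(a, c, unique=True))  # EXCEPT DISTINCT
print(u.intersect(a, c, unique=True))  # INTERSECT DISTINCT
print(u.drop_duplicates(a))            # SELECT DISTINCT *
```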
10 changes: 6 additions & 4 deletions fugue/extensions/_builtins/__init__.py
@@ -1,12 +1,14 @@
# flake8: noqa
from fugue.extensions._builtins.outputters import Show, AssertEqual, Save
from fugue.extensions._builtins.creators import CreateData, Load
from fugue.extensions._builtins.outputters import AssertEqual, Save, Show
from fugue.extensions._builtins.processors import (
DropColumns,
Rename,
Distinct,
RunJoin,
RunTransformer,
RunSetOperation,
RunSQLSelect,
Rename,
DropColumns,
RunTransformer,
SelectColumns,
Zip,
)
25 changes: 24 additions & 1 deletion fugue/extensions/_builtins/processors.py
@@ -1,4 +1,4 @@
from typing import List, no_type_check
from typing import List, no_type_check, Any

from fugue.collections.partition import PartitionCursor
from fugue.dataframe import (
@@ -83,6 +83,29 @@ def process(self, dfs: DataFrames) -> DataFrame:
return df


class RunSetOperation(Processor):
def process(self, dfs: DataFrames) -> DataFrame:
if len(dfs) == 1:
return dfs[0]
how = self.params.get_or_throw("how", str)
func: Any = {
"union": self.execution_engine.union,
"subtract": self.execution_engine.subtract,
"intersect": self.execution_engine.intersect,
}[how]
distinct = self.params.get("distinct", True)
df = dfs[0]
for i in range(1, len(dfs)):
df = func(df, dfs[i], distinct=distinct)
return df


class Distinct(Processor):
def process(self, dfs: DataFrames) -> DataFrame:
assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input"))
return self.execution_engine.distinct(dfs[0])


class RunSQLSelect(Processor):
def process(self, dfs: DataFrames) -> DataFrame:
statement = self.params.get_or_throw("statement", str)
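
`RunSetOperation` above reduces its inputs pairwise from left to right, so `a UNION b UNION c` evaluates as `union(union(a, b), c)`. A sketch of the same fold in isolation, where `func` stands for any of the engine's `union`/`subtract`/`intersect` methods:

```
from functools import reduce

def fold_set_operation(dfs, func, distinct=True):
    # Mirrors RunSetOperation.process: combine pairwise, left to right
    return reduce(lambda x, y: func(x, y, distinct=distinct), dfs)
```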
