From 151e653638163aa01848c08b3a05fd56a4f3fb51 Mon Sep 17 00:00:00 2001
From: Han Wang
Date: Sat, 3 Oct 2020 23:07:33 -0700
Subject: [PATCH] Switch basic dataframe operations to qpd backend & Add set
 operations (#64)

* switch join backend to qpd
* update
* fix pandas and dask, fail spark
* update
* update
* update
* add programming interface
* update
---
 README.md                                  |  17 +++
 fugue/execution/execution_engine.py        |  94 +++++++++++-
 fugue/execution/native_execution_engine.py | 143 +++++++++---------
 fugue/extensions/_builtins/__init__.py     |  10 +-
 fugue/extensions/_builtins/processors.py   |  25 +++-
 fugue/workflow/workflow.py                 | 124 +++++++++++++++-
 fugue_dask/_utils.py                       |   4 +-
 fugue_dask/execution_engine.py             | 165 ++++++++----------
 fugue_spark/execution_engine.py            |  61 ++++++++
 fugue_test/builtin_suite.py                | 107 +++++++++++++
 fugue_test/execution_suite.py              | 135 +++++++++++++++--
 fugue_version/__init__.py                  |   2 +-
 setup.py                                   |   2 +-
 13 files changed, 709 insertions(+), 180 deletions(-)

diff --git a/README.md b/README.md
index c897621f..84064727 100644
--- a/README.md
+++ b/README.md
@@ -117,3 +117,20 @@ If you installed the requirements manually, install the git hook scripts with:
 ```
 pre-commit install
 ```
+
+
+## Update History
+
+### 0.4.1
+* Added set operations to the programming interface: `union`, `subtract`, `intersect`
+* Added `distinct` to the programming interface
+* Ensured partitioning follows the SQL convention: groups with null keys are NOT removed
+* Switched `join`, `union`, `subtract`, `intersect`, `distinct` to QPD implementations, so they follow the SQL convention
+* Set operations in Fugue SQL can directly operate on Fugue statements (e.g. `TRANSFORM USING t1 UNION TRANSFORM USING t2`)
+* Fixed bugs
+* Added an onboarding document for contributors
+
+### <=0.4.0
+
+* Main features of Fugue core and Fugue SQL
+* Supported backends: Pandas, Spark and Dask
diff --git a/fugue/execution/execution_engine.py b/fugue/execution/execution_engine.py
index e3bfbfaa..4233f1d5 100644
--- a/fugue/execution/execution_engine.py
+++ b/fugue/execution/execution_engine.py
@@ -224,7 +224,7 @@ def join(
             ``inner``, ``left_outer``, ``right_outer``, ``full_outer``, ``cross``
         :param on: it can always be inferred, but if you provide, it will be
             validated against the inferred keys.
-        :param metadata: dict-like object to add to the joined dataframe,
+        :param metadata: dict-like object to add to the result dataframe,
             defaults to None
         :return: the joined dataframe
 
@@ -234,6 +234,98 @@ def join(
         """
         raise NotImplementedError
 
+    @abstractmethod
+    def union(
+        self,
+        df1: DataFrame,
+        df2: DataFrame,
+        distinct: bool = True,
+        metadata: Any = None,
+    ) -> DataFrame:  # pragma: no cover
+        """Union ``df1`` and ``df2``
+
+        :param df1: the first dataframe
+        :param df2: the second dataframe
+        :param distinct: ``true`` for ``UNION`` (== ``UNION DISTINCT``),
+            ``false`` for ``UNION ALL``
+        :param metadata: dict-like object to add to the result dataframe,
+            defaults to None
+        :return: the unioned dataframe
+
+        :Notice:
+
+        Currently, the schema of ``df1`` and ``df2`` must be identical, or
+        an exception will be thrown.
+        """
+        raise NotImplementedError
+
+    @abstractmethod
+    def subtract(
+        self,
+        df1: DataFrame,
+        df2: DataFrame,
+        distinct: bool = True,
+        metadata: Any = None,
+    ) -> DataFrame:  # pragma: no cover
+        """Subtract ``df2`` from ``df1`` (``df1 - df2``)
+
+        :param df1: the first dataframe
+        :param df2: the second dataframe
+        :param distinct: ``true`` for ``EXCEPT`` (== ``EXCEPT DISTINCT``),
+            ``false`` for ``EXCEPT ALL``
+        :param metadata: dict-like object to add to the result dataframe,
+            defaults to None
+        :return: the subtracted dataframe
+
+        :Notice:
+
+        Currently, the schema of ``df1`` and ``df2`` must be identical, or
+        an exception will be thrown.
+        """
+        raise NotImplementedError
+
+    @abstractmethod
+    def intersect(
+        self,
+        df1: DataFrame,
+        df2: DataFrame,
+        distinct: bool = True,
+        metadata: Any = None,
+    ) -> DataFrame:  # pragma: no cover
+        """Intersect ``df1`` and ``df2``
+
+        :param df1: the first dataframe
+        :param df2: the second dataframe
+        :param distinct: ``true`` for ``INTERSECT`` (== ``INTERSECT DISTINCT``),
+            ``false`` for ``INTERSECT ALL``
+        :param metadata: dict-like object to add to the result dataframe,
+            defaults to None
+        :return: the intersected dataframe
+
+        :Notice:
+
+        Currently, the schema of ``df1`` and ``df2`` must be identical, or
+        an exception will be thrown.
+        """
+        raise NotImplementedError
+
+    @abstractmethod
+    def distinct(
+        self,
+        df: DataFrame,
+        metadata: Any = None,
+    ) -> DataFrame:  # pragma: no cover
+        """Equivalent to ``SELECT DISTINCT * FROM df``
+
+        :param df: the dataframe
+        :param metadata: dict-like object to add to the result dataframe,
+            defaults to None
+        :type metadata: Any, optional
+        :return: the dataframe with unique rows
+        :rtype: DataFrame
+        """
+        raise NotImplementedError
+
     def zip(
         self,
         df1: DataFrame,
diff --git a/fugue/execution/native_execution_engine.py b/fugue/execution/native_execution_engine.py
index f2a8689c..4e59573e 100644
--- a/fugue/execution/native_execution_engine.py
+++ b/fugue/execution/native_execution_engine.py
@@ -1,8 +1,8 @@
 import logging
-from typing import Any, Callable, Iterable, List, Optional, Union
+from typing import Any, Callable, List, Optional, Union
 
 import pandas as pd
-import pyarrow as pa
+from fugue._utils.io import load_df, save_df
 from fugue.collections.partition import (
     EMPTY_PARTITION_SPEC,
     PartitionCursor,
     PartitionSpec,
@@ -22,13 +22,12 @@
     ExecutionEngine,
     SQLEngine,
 )
-from fugue._utils.io import load_df, save_df
+from qpd_pandas.engine import PandasUtils
 from sqlalchemy import create_engine
 from triad.collections import Schema
 from triad.collections.dict import ParamDict
 from triad.collections.fs import FileSystem
 from triad.utils.assertion import assert_or_throw
-from triad.utils.pandas_like import PD_UTILS
 
 
 class SqliteEngine(SQLEngine):
@@ -37,8 +36,8 @@ class SqliteEngine(SQLEngine):
     :param execution_engine: the execution engine this sql engine will run on
     """
 
-    def __init__(self, execution_engine: ExecutionEngine) -> None:
-        return super().__init__(execution_engine)
+    def __init__(self, execution_engine: ExecutionEngine):
+        super().__init__(execution_engine)
 
     def select(self, dfs: DataFrames, statement: str) -> DataFrame:
         sql_engine = create_engine("sqlite:///:memory:")
@@ -78,6 +77,11 @@ def fs(self) -> FileSystem:
     def default_sql_engine(self) -> SQLEngine:
         return self._default_sql_engine
 
+    @property
+    def pl_utils(self) -> PandasUtils:
+        """Pandas-like dataframe utils"""
+        return PandasUtils()
+
     def stop(self) -> None:  # pragma: no cover
         return
 
@@ -134,7 +138,7 @@ def _map(pdf: pd.DataFrame) -> pd.DataFrame:
             output_df = map_func(cursor, input_df)
             return
output_df.as_pandas() - result = PD_UTILS.safe_groupby_apply( + result = self.pl_utils.safe_groupby_apply( df.as_pandas(), partition_spec.partition_by, _map ) return PandasDataFrame(result, output_schema, metadata) @@ -154,59 +158,64 @@ def join( metadata: Any = None, ) -> DataFrame: key_schema, output_schema = get_join_schemas(df1, df2, how=how, on=on) - how = how.lower().replace("_", "").replace(" ", "") - if how == "cross": - d1 = df1.as_pandas() - d2 = df2.as_pandas() - d1["__cross_join_index__"] = 1 - d2["__cross_join_index__"] = 1 - d = d1.merge(d2, on=("__cross_join_index__")).drop( - "__cross_join_index__", axis=1 - ) - return PandasDataFrame(d.reset_index(drop=True), output_schema, metadata) - if how in ["semi", "leftsemi"]: - d1 = df1.as_pandas() - d2 = df2.as_pandas()[key_schema.names] - d = d1.merge(d2, on=key_schema.names, how="inner") - return PandasDataFrame(d.reset_index(drop=True), output_schema, metadata) - if how in ["anti", "leftanti"]: - d1 = df1.as_pandas() - d2 = df2.as_pandas()[key_schema.names] - d2["__anti_join_dummy__"] = 1.0 - d = d1.merge(d2, on=key_schema.names, how="left") - d = d[d.iloc[:, -1].isnull()] - return PandasDataFrame( - d.drop(["__anti_join_dummy__"], axis=1).reset_index(drop=True), - output_schema, - metadata, - ) - fix_left, fix_right = False, False - if how in ["leftouter"]: - how = "left" - self._validate_outer_joinable(df2.schema, key_schema) - fix_right = True - if how in ["rightouter"]: - how = "right" - self._validate_outer_joinable(df1.schema, key_schema) - fix_left = True - if how in ["fullouter"]: - how = "outer" - self._validate_outer_joinable(df1.schema, key_schema) - self._validate_outer_joinable(df2.schema, key_schema) - fix_left, fix_right = True, True - d1 = df1.as_pandas() - d2 = df2.as_pandas() - d = d1.merge(d2, on=key_schema.names, how=how) - if fix_left: - d = self._fix_nan( - d, output_schema, df1.schema.exclude(list(df2.schema.keys())).keys() - ) - if fix_right: - d = self._fix_nan( - d, output_schema, df2.schema.exclude(list(df1.schema.keys())).keys() - ) + d = self.pl_utils.join( + df1.as_pandas(), df2.as_pandas(), join_type=how, on=key_schema.names + ) return PandasDataFrame(d.reset_index(drop=True), output_schema, metadata) + def union( + self, + df1: DataFrame, + df2: DataFrame, + distinct: bool = True, + metadata: Any = None, + ) -> DataFrame: + assert_or_throw( + df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}") + ) + d = self.pl_utils.union(df1.as_pandas(), df2.as_pandas(), unique=distinct) + return PandasDataFrame(d.reset_index(drop=True), df1.schema, metadata) + + def subtract( + self, + df1: DataFrame, + df2: DataFrame, + distinct: bool = True, + metadata: Any = None, + ) -> DataFrame: + assert_or_throw( + distinct, NotImplementedError("EXCEPT ALL for NativeExecutionEngine") + ) + assert_or_throw( + df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}") + ) + d = self.pl_utils.except_df(df1.as_pandas(), df2.as_pandas(), unique=distinct) + return PandasDataFrame(d.reset_index(drop=True), df1.schema, metadata) + + def intersect( + self, + df1: DataFrame, + df2: DataFrame, + distinct: bool = True, + metadata: Any = None, + ) -> DataFrame: + assert_or_throw( + distinct, NotImplementedError("INTERSECT ALL for NativeExecutionEngine") + ) + assert_or_throw( + df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}") + ) + d = self.pl_utils.intersect(df1.as_pandas(), df2.as_pandas(), unique=distinct) + return PandasDataFrame(d.reset_index(drop=True), df1.schema, metadata) + 
+ def distinct( + self, + df: DataFrame, + metadata: Any = None, + ) -> DataFrame: + d = self.pl_utils.drop_duplicates(df.as_pandas()) + return PandasDataFrame(d.reset_index(drop=True), df.schema, metadata) + def load_df( self, path: Union[str, List[str]], @@ -236,21 +245,3 @@ def save_df( ) df = self.to_df(df) save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs) - - def _validate_outer_joinable(self, schema: Schema, key_schema: Schema) -> None: - # TODO: this is to prevent wrong behavior of pandas, we may not need it - # s = schema - key_schema - # if any(pa.types.is_boolean(v) or pa.types.is_integer(v) for v in s.types): - # raise NotImplementedError( - # f"{schema} excluding {key_schema} is not outer joinable" - # ) - return - - def _fix_nan( - self, df: pd.DataFrame, schema: Schema, keys: Iterable[str] - ) -> pd.DataFrame: - for key in keys: - if pa.types.is_floating(schema[key].type): - continue - df[key] = df[key].where(pd.notna(df[key]), None) - return df diff --git a/fugue/extensions/_builtins/__init__.py b/fugue/extensions/_builtins/__init__.py index 7dd53956..431c9d83 100644 --- a/fugue/extensions/_builtins/__init__.py +++ b/fugue/extensions/_builtins/__init__.py @@ -1,12 +1,14 @@ # flake8: noqa -from fugue.extensions._builtins.outputters import Show, AssertEqual, Save from fugue.extensions._builtins.creators import CreateData, Load +from fugue.extensions._builtins.outputters import AssertEqual, Save, Show from fugue.extensions._builtins.processors import ( + DropColumns, + Rename, + Distinct, RunJoin, - RunTransformer, + RunSetOperation, RunSQLSelect, - Rename, - DropColumns, + RunTransformer, SelectColumns, Zip, ) diff --git a/fugue/extensions/_builtins/processors.py b/fugue/extensions/_builtins/processors.py index ac91307b..23755fbf 100644 --- a/fugue/extensions/_builtins/processors.py +++ b/fugue/extensions/_builtins/processors.py @@ -1,4 +1,4 @@ -from typing import List, no_type_check +from typing import List, no_type_check, Any from fugue.collections.partition import PartitionCursor from fugue.dataframe import ( @@ -83,6 +83,29 @@ def process(self, dfs: DataFrames) -> DataFrame: return df +class RunSetOperation(Processor): + def process(self, dfs: DataFrames) -> DataFrame: + if len(dfs) == 1: + return dfs[0] + how = self.params.get_or_throw("how", str) + func: Any = { + "union": self.execution_engine.union, + "subtract": self.execution_engine.subtract, + "intersect": self.execution_engine.intersect, + }[how] + distinct = self.params.get("distinct", True) + df = dfs[0] + for i in range(1, len(dfs)): + df = func(df, dfs[i], distinct=distinct) + return df + + +class Distinct(Processor): + def process(self, dfs: DataFrames) -> DataFrame: + assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input")) + return self.execution_engine.distinct(dfs[0]) + + class RunSQLSelect(Processor): def process(self, dfs: DataFrames) -> DataFrame: statement = self.params.get_or_throw("statement", str) diff --git a/fugue/workflow/workflow.py b/fugue/workflow/workflow.py index cc8d122e..f5247182 100644 --- a/fugue/workflow/workflow.py +++ b/fugue/workflow/workflow.py @@ -14,6 +14,8 @@ Load, Rename, RunJoin, + Distinct, + RunSetOperation, RunSQLSelect, RunTransformer, Save, @@ -380,6 +382,62 @@ def cross_join(self: TDF, *dfs: Any) -> TDF: """ return self.join(*dfs, how="cross") + def union(self: TDF, *dfs: Any, distinct: bool = True) -> TDF: + """Union this dataframe with ``dfs``. 
+
+        :param dfs: |DataFramesLikeObject|
+        :param distinct: whether to perform `distinct` after union,
+            defaults to True
+        :return: unioned dataframe
+
+        :Notice:
+
+        Currently, all dataframes in ``dfs`` must have identical schema, otherwise
+        an exception will be thrown.
+        """
+        df = self.workflow.union(self, *dfs, distinct=distinct)
+        return self._to_self_type(df)
+
+    def subtract(self: TDF, *dfs: Any, distinct: bool = True) -> TDF:
+        """Subtract ``dfs`` from this dataframe.
+
+        :param dfs: |DataFramesLikeObject|
+        :param distinct: whether to perform `distinct` after subtraction,
+            defaults to True
+        :return: subtracted dataframe
+
+        :Notice:
+
+        Currently, all dataframes in ``dfs`` must have identical schema, otherwise
+        an exception will be thrown.
+        """
+        df = self.workflow.subtract(self, *dfs, distinct=distinct)
+        return self._to_self_type(df)
+
+    def intersect(self: TDF, *dfs: Any, distinct: bool = True) -> TDF:
+        """Intersect this dataframe with ``dfs``.
+
+        :param dfs: |DataFramesLikeObject|
+        :param distinct: whether to perform `distinct` after intersection,
+            defaults to True
+        :return: intersected dataframe
+
+        :Notice:
+
+        Currently, all dataframes in ``dfs`` must have identical schema, otherwise
+        an exception will be thrown.
+        """
+        df = self.workflow.intersect(self, *dfs, distinct=distinct)
+        return self._to_self_type(df)
+
+    def distinct(self: TDF) -> TDF:
+        """Get distinct dataframe. Equivalent to ``SELECT DISTINCT * FROM df``
+
+        :return: dataframe with unique records
+        """
+        df = self.workflow.process(self, using=Distinct)
+        return self._to_self_type(df)
+
     def checkpoint(self: TDF, namespace: Any = None) -> TDF:
         """[CURRENTLY NO EFFECT] set checkpoint for the current dataframe
 
@@ -885,7 +943,7 @@ def show(
 
     def join(
         self, *dfs: Any, how: str, on: Optional[Iterable[str]] = None
-    ) -> WorkflowDataFrame:  # pragma: no cover
+    ) -> WorkflowDataFrame:
         """Join dataframes.
 
         |ReadJoin|
@@ -899,6 +957,70 @@ def join(
         _on: List[str] = list(on) if on is not None else []
         return self.process(*dfs, using=RunJoin, params=dict(how=how, on=_on))
 
+    def set_op(self, how: str, *dfs: Any, distinct: bool = True) -> WorkflowDataFrame:
+        """Union, subtract or intersect dataframes.
+
+        :param how: can accept ``union``, ``subtract``, ``intersect``
+        :param dfs: |DataFramesLikeObject|
+        :param distinct: whether to perform `distinct` after the set operation,
+            defaults to True
+        :return: result dataframe of the set operation
+
+        :Notice:
+
+        Currently, all dataframes in ``dfs`` must have identical schema, otherwise
+        an exception will be thrown.
+        """
+        return self.process(
+            *dfs, using=RunSetOperation, params=dict(how=how, distinct=distinct)
+        )
+
+    def union(self, *dfs: Any, distinct: bool = True) -> WorkflowDataFrame:
+        """Union dataframes in ``dfs``.
+
+        :param dfs: |DataFramesLikeObject|
+        :param distinct: whether to perform `distinct` after union,
+            defaults to True
+        :return: unioned dataframe
+
+        :Notice:
+
+        Currently, all dataframes in ``dfs`` must have identical schema, otherwise
+        an exception will be thrown.
+        """
+        return self.set_op("union", *dfs, distinct=distinct)
+
+    def subtract(self, *dfs: Any, distinct: bool = True) -> WorkflowDataFrame:
+        """Subtract ``dfs[1:]`` from ``dfs[0]``.
+
+        :param dfs: |DataFramesLikeObject|
+        :param distinct: whether to perform `distinct` after subtraction,
+            defaults to True
+        :return: subtracted dataframe
+
+        :Notice:
+
+        Currently, all dataframes in ``dfs`` must have identical schema, otherwise
+        an exception will be thrown.
+        """
+        return self.set_op("subtract", *dfs, distinct=distinct)
+
+    def intersect(self, *dfs: Any, distinct: bool = True) -> WorkflowDataFrame:
+        """Intersect dataframes in ``dfs``.
+
+        :param dfs: |DataFramesLikeObject|
+        :param distinct: whether to perform `distinct` after intersection,
+            defaults to True
+        :return: intersected dataframe
+
+        :Notice:
+
+        Currently, all dataframes in ``dfs`` must have identical schema, otherwise
+        an exception will be thrown.
+        """
+        return self.set_op("intersect", *dfs, distinct=distinct)
+
     def zip(
         self,
         *dfs: Any,
diff --git a/fugue_dask/_utils.py b/fugue_dask/_utils.py
index 3975579e..1d198f71 100644
--- a/fugue_dask/_utils.py
+++ b/fugue_dask/_utils.py
@@ -3,10 +3,10 @@
 import dask.dataframe as pd
 import pandas
 import pyarrow as pa
-from triad.utils.pandas_like import PandasLikeUtils
+from qpd_dask.engine import DaskUtils as DaskUtilsBase
 
 
-class DaskUtils(PandasLikeUtils[pd.DataFrame]):
+class DaskUtils(DaskUtilsBase):
     def as_arrow(
         self, df: pd.DataFrame, schema: Optional[pa.Schema] = None
     ) -> pa.Table:
diff --git a/fugue_dask/execution_engine.py b/fugue_dask/execution_engine.py
index 7b3a78be..cffe2ee8 100644
--- a/fugue_dask/execution_engine.py
+++ b/fugue_dask/execution_engine.py
@@ -1,31 +1,31 @@
 import logging
-from typing import Any, Callable, Iterable, List, Optional, Union
+from typing import Any, Callable, List, Optional, Union
 
 import dask.dataframe as pd
-import pyarrow as pa
+from fugue._utils.io import load_df, save_df
 from fugue.collections.partition import (
     EMPTY_PARTITION_SPEC,
     PartitionCursor,
     PartitionSpec,
 )
 from fugue.constants import KEYWORD_CORECOUNT, KEYWORD_ROWCOUNT
-from fugue.dataframe import DataFrame, LocalDataFrame, PandasDataFrame, DataFrames
+from fugue.dataframe import DataFrame, DataFrames, LocalDataFrame, PandasDataFrame
 from fugue.dataframe.utils import get_join_schemas
 from fugue.execution.execution_engine import (
     _DEFAULT_JOIN_KEYS,
     ExecutionEngine,
     SQLEngine,
 )
-from fugue._utils.io import load_df, save_df
-from fugue_dask.dataframe import DEFAULT_CONFIG, DaskDataFrame
-from fugue_dask._utils import DASK_UTILS
+from qpd_dask import run_sql_on_dask
 from triad.collections import Schema
 from triad.collections.dict import ParamDict
 from triad.collections.fs import FileSystem
 from triad.utils.assertion import assert_or_throw
 from triad.utils.hash import to_uuid
 from triad.utils.threading import RunOnce
-from qpd_dask import run_sql_on_dask
+
+from fugue_dask._utils import DaskUtils
+from fugue_dask.dataframe import DEFAULT_CONFIG, DaskDataFrame
 
 
 class QPDDaskEngine(SQLEngine):
@@ -34,8 +34,8 @@ class QPDDaskEngine(SQLEngine):
     :param execution_engine: the execution engine this sql engine will run on
     """
 
-    def __init__(self, execution_engine: ExecutionEngine) -> None:
-        return super().__init__(execution_engine)
+    def __init__(self, execution_engine: ExecutionEngine):
+        super().__init__(execution_engine)
 
     def select(self, dfs: DataFrames, statement: str) -> DataFrame:
         dask_dfs = {
@@ -83,6 +83,11 @@ def fs(self) -> FileSystem:
     def default_sql_engine(self) -> SQLEngine:
         return self._default_sql_engine
 
+    @property
+    def pl_utils(self) -> DaskUtils:
+        """Pandas-like dataframe utils"""
+        return DaskUtils()
+
     def stop(self) -> None:  # pragma: no cover
         """It
does nothing""" return @@ -195,7 +200,7 @@ def _map(pdf: Any) -> pd.DataFrame: result = pdf.native.map_partitions(_map, meta=output_schema.pandas_dtype) else: df = self.repartition(df, PartitionSpec(num=partition_spec.num_partitions)) - result = DASK_UTILS.safe_groupby_apply( + result = self.pl_utils.safe_groupby_apply( df.native, partition_spec.partition_by, _map, @@ -218,60 +223,72 @@ def join( metadata: Any = None, ) -> DataFrame: key_schema, output_schema = get_join_schemas(df1, df2, how=how, on=on) - how = how.lower().replace("_", "").replace(" ", "") - if how == "cross": - d1 = self.to_df(df1).native - d2 = self.to_df(df2).native - d1["__cross_join_index__"] = 1 - d2["__cross_join_index__"] = 1 - d = d1.merge(d2, on=("__cross_join_index__")).drop( - "__cross_join_index__", axis=1 - ) - return DaskDataFrame(d.reset_index(drop=True), output_schema, metadata) - if how in ["semi", "leftsemi"]: - d1 = self.to_df(df1).native - d2 = self.to_df(df2).native[key_schema.names] - d = d1.merge(d2, on=key_schema.names, how="inner") - return DaskDataFrame(d.reset_index(drop=True), output_schema, metadata) - if how in ["anti", "leftanti"]: - d1 = self.to_df(df1).native - d2 = self.to_df(df2).native[key_schema.names] - if DASK_UTILS.empty(d1) or DASK_UTILS.empty(d2): - return df1 - d2["__anti_join_dummy__"] = 1.0 - d = d1.merge(d2, on=key_schema.names, how="left") - d = d[d["__anti_join_dummy__"].isnull()] - return DaskDataFrame( - d.drop(["__anti_join_dummy__"], axis=1).reset_index(drop=True), - output_schema, - metadata, - ) - fix_left, fix_right = False, False - if how in ["leftouter"]: - how = "left" - self._validate_outer_joinable(df2.schema, key_schema) - fix_right = True - if how in ["rightouter"]: - how = "right" - self._validate_outer_joinable(df1.schema, key_schema) - fix_left = True - if how in ["fullouter"]: - how = "outer" - self._validate_outer_joinable(df1.schema, key_schema) - self._validate_outer_joinable(df2.schema, key_schema) - fix_left, fix_right = True, True - d1 = self.to_df(df1).native - d2 = self.to_df(df2).native - d = d1.merge(d2, on=key_schema.names, how=how) - if fix_left: - d = self._fix_nan( - d, output_schema, df1.schema.exclude(list(df2.schema.keys())).keys() - ) - if fix_right: - d = self._fix_nan( - d, output_schema, df2.schema.exclude(list(df1.schema.keys())).keys() - ) - return DaskDataFrame(d.reset_index(drop=True), output_schema, metadata) + d = self.pl_utils.join( + self.to_df(df1).native, + self.to_df(df2).native, + join_type=how, + on=key_schema.names, + ) + return DaskDataFrame(d, output_schema, metadata) + + def union( + self, + df1: DataFrame, + df2: DataFrame, + distinct: bool = True, + metadata: Any = None, + ) -> DataFrame: + assert_or_throw( + df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}") + ) + d = self.pl_utils.union( + self.to_df(df1).native, self.to_df(df2).native, unique=distinct + ) + return DaskDataFrame(d, df1.schema, metadata) + + def subtract( + self, + df1: DataFrame, + df2: DataFrame, + distinct: bool = True, + metadata: Any = None, + ) -> DataFrame: + assert_or_throw( + distinct, NotImplementedError("EXCEPT ALL for DaskExecutionEngine") + ) + assert_or_throw( + df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}") + ) + d = self.pl_utils.except_df( + self.to_df(df1).native, self.to_df(df2).native, unique=distinct + ) + return DaskDataFrame(d, df1.schema, metadata) + + def intersect( + self, + df1: DataFrame, + df2: DataFrame, + distinct: bool = True, + metadata: Any = None, + ) -> DataFrame: + 
assert_or_throw( + distinct, NotImplementedError("INTERSECT ALL for DaskExecutionEngine") + ) + assert_or_throw( + df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}") + ) + d = self.pl_utils.intersect( + self.to_df(df1).native, self.to_df(df2).native, unique=distinct + ) + return DaskDataFrame(d, df1.schema, metadata) + + def distinct( + self, + df: DataFrame, + metadata: Any = None, + ) -> DataFrame: + d = self.pl_utils.drop_duplicates(self.to_df(df).native) + return DaskDataFrame(d, df.schema, metadata) def load_df( self, @@ -302,23 +319,3 @@ def save_df( ) df = self.to_df(df).as_local() save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs) - - def _validate_outer_joinable(self, schema: Schema, key_schema: Schema) -> None: - # TODO: this is to prevent wrong behavior of pandas, we may not need it - # s = schema - key_schema - # if any(pa.types.is_boolean(v) or pa.types.is_integer(v) for v in s.types): - # raise NotImplementedError( - # f"{schema} excluding {key_schema} is not outer joinable" - # ) - return - - def _fix_nan( - self, df: pd.DataFrame, schema: Schema, keys: Iterable[str] - ) -> pd.DataFrame: - if DASK_UTILS.empty(df): - return df - for key in keys: - if pa.types.is_floating(schema[key].type): - continue - df[key] = df[key].where(df[key].notnull(), None) - return df diff --git a/fugue_spark/execution_engine.py b/fugue_spark/execution_engine.py index 32a5d71d..3ef94fbb 100644 --- a/fugue_spark/execution_engine.py +++ b/fugue_spark/execution_engine.py @@ -284,6 +284,67 @@ def join( res = d1.join(d2, on=key_schema.names, how=how).select(*cols) return self.to_df(res, output_schema, metadata) + def union( + self, + df1: DataFrame, + df2: DataFrame, + distinct: bool = True, + metadata: Any = None, + ) -> DataFrame: + assert_or_throw( + df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}") + ) + d1 = self.to_df(df1).native + d2 = self.to_df(df2).native + d = d1.union(d2) + if distinct: + d = d.distinct() + return self.to_df(d, df1.schema, metadata) + + def subtract( + self, + df1: DataFrame, + df2: DataFrame, + distinct: bool = True, + metadata: Any = None, + ) -> DataFrame: + assert_or_throw( + df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}") + ) + d1 = self.to_df(df1).native + d2 = self.to_df(df2).native + if distinct: + d: Any = d1.subtract(d2) + else: # pragma: no cover + d = d1.exceptAll(d2) + return self.to_df(d, df1.schema, metadata) + + def intersect( + self, + df1: DataFrame, + df2: DataFrame, + distinct: bool = True, + metadata: Any = None, + ) -> DataFrame: + assert_or_throw( + df1.schema == df2.schema, ValueError(f"{df1.schema} != {df2.schema}") + ) + d1 = self.to_df(df1).native + d2 = self.to_df(df2).native + if distinct: + d: Any = d1.intersect(d2) + else: # pragma: no cover + d = d1.intersectAll(d2) + return self.to_df(d, df1.schema, metadata) + + def distinct( + self, + df: DataFrame, + metadata: Any = None, + ) -> DataFrame: + d = self.to_df(df).native.distinct() + return self.to_df(d, df.schema, metadata) + def load_df( self, path: Union[str, List[str]], diff --git a/fugue_test/builtin_suite.py b/fugue_test/builtin_suite.py index 534e3d17..d3ddc386 100644 --- a/fugue_test/builtin_suite.py +++ b/fugue_test/builtin_suite.py @@ -28,6 +28,9 @@ class BuiltInTests(object): test suite than :class:`~fugue_test.execution_suite.ExecutionEngineTests`. Any new :class:`~fugue.execution.execution_engine.ExecutionEngine` should also pass this test suite. 
+
+    Whenever you add a method to FugueWorkflow or WorkflowDataFrame, you should
+    add corresponding tests here
     """
 
     class Tests(TestCase):
@@ -357,6 +360,110 @@ def test_select(self):
             aa = dag.select("* FROM", a)
             dag.select("* FROM", b).assert_eq(aa)
 
+        def test_union(self):
+            with self.dag() as dag:
+                a = dag.df([[1, 10], [2, None], [2, None]], "x:long,y:double")
+                b = dag.df([[2, None], [2, 20]], "x:long,y:double")
+                c = dag.df([[1, 10], [2, 20]], "x:long,y:double")
+                a.union().assert_eq(a)
+                a.union(b, c).assert_eq(
+                    ArrayDataFrame(
+                        [
+                            [1, 10],
+                            [2, None],
+                            [2, 20],
+                        ],
+                        "x:long,y:double",
+                    )
+                )
+                a.union(b, c, distinct=False).assert_eq(
+                    ArrayDataFrame(
+                        [
+                            [1, 10],
+                            [2, None],
+                            [2, None],
+                            [2, None],
+                            [2, 20],
+                            [1, 10],
+                            [2, 20],
+                        ],
+                        "x:long,y:double",
+                    )
+                )
+
+        def test_intersect(self):
+            with self.dag() as dag:
+                a = dag.df([[1, 10], [2, None], [2, None]], "x:long,y:double")
+                b = dag.df([[2, None], [2, 20]], "x:long,y:double")
+                c = dag.df([[1, 10], [2, 20]], "x:long,y:double")
+                # d = dag.df([[1, 10], [2, 20], [2, None]], "x:long,y:double")
+                a.intersect(b).assert_eq(
+                    ArrayDataFrame(
+                        [[2, None]],
+                        "x:long,y:double",
+                    )
+                )
+                a.intersect(b, c).assert_eq(
+                    ArrayDataFrame(
+                        [],
+                        "x:long,y:double",
+                    )
+                )
+                # TODO: INTERSECT ALL is not implemented (QPD issue)
+                # a.intersect(b, distinct=False).assert_eq(
+                #     ArrayDataFrame(
+                #         [[2, None], [2, None]],
+                #         "x:long,y:double",
+                #     )
+                # )
+                # a.intersect(b, d, distinct=False).assert_eq(
+                #     ArrayDataFrame(
+                #         [[2, None], [2, None]],
+                #         "x:long,y:double",
+                #     )
+                # )
+
+        def test_subtract(self):
+            with self.dag() as dag:
+                a = dag.df([[1, 10], [2, None], [2, None]], "x:long,y:double")
+                b = dag.df([[2, None], [2, 20]], "x:long,y:double")
+                c = dag.df([[1, 10], [2, 20]], "x:long,y:double")
+                a.subtract(b).assert_eq(
+                    ArrayDataFrame(
+                        [[1, 10]],
+                        "x:long,y:double",
+                    )
+                )
+                a.subtract(c).assert_eq(
+                    ArrayDataFrame(
+                        [[2, None]],
+                        "x:long,y:double",
+                    )
+                )
+                # TODO: EXCEPT ALL is not implemented (QPD issue)
+                # a.subtract(c, distinct=False).assert_eq(
+                #     ArrayDataFrame(
+                #         [[2, None], [2, None]],
+                #         "x:long,y:double",
+                #     )
+                # )
+                a.subtract(b, c).assert_eq(
+                    ArrayDataFrame(
+                        [],
+                        "x:long,y:double",
+                    )
+                )
+
+        def test_distinct(self):
+            with self.dag() as dag:
+                a = dag.df([[1, 10], [2, None], [2, None]], "x:long,y:double")
+                a.distinct().assert_eq(
+                    ArrayDataFrame(
+                        [[1, 10], [2, None]],
+                        "x:long,y:double",
+                    )
+                )
+
         def test_col_ops(self):
             with self.dag() as dag:
                 a = dag.df([[1, 10], [2, 20]], "x:long,y:long")
diff --git a/fugue_test/execution_suite.py b/fugue_test/execution_suite.py
index a4f552bb..edf23926 100644
--- a/fugue_test/execution_suite.py
+++ b/fugue_test/execution_suite.py
@@ -133,22 +133,50 @@ def with_nat(cursor, data):
                 return PandasDataFrame(df, schema)
 
             e = self.engine
+            # test with multiple keys with null values
+            o = ArrayDataFrame(
+                [[1, None, 1], [1, None, 0], [None, None, 1]],
+                "a:double,b:double,c:int",
+                dict(a=1),
+            )
+            c = e.map(
+                o, select_top, o.schema, PartitionSpec(by=["a", "b"], presort="c")
+            )
+            df_eq(
+                c,
+                [[1, None, 0], [None, None, 1]],
+                "a:double,b:double,c:int",
+                throw=True,
+            )
             # test datetime with nat
             dt = datetime.now()
             o = ArrayDataFrame(
-                [[dt, 2], [None, 2], [None, 1], [dt, 5], [None, 4]],
-                "a:datetime,b:int",
+                [
+                    [dt, 2, 1],
+                    [None, 2, None],
+                    [None, 1, None],
+                    [dt, 5, 1],
+                    [None, 4, None],
+                ],
+                "a:datetime,b:int,c:double",
                 dict(a=1),
             )
             c = e.map(
-                o, select_top, o.schema, PartitionSpec(by=["a"], presort="b DESC")
+                o,
select_top, o.schema, PartitionSpec(by=["a", "c"], presort="b DESC") + ) + df_eq( + c, + [[None, 4, None], [dt, 5, 1]], + "a:datetime,b:int,c:double", + throw=True, + ) + d = e.map( + c, with_nat, "a:datetime,b:int,c:double,nat:datetime", PartitionSpec() ) - df_eq(c, [[None, 4], [dt, 5]], "a:datetime,b:int", throw=True) - d = e.map(c, with_nat, "a:datetime,b:int,nat:datetime", PartitionSpec()) df_eq( d, - [[None, 4, None], [dt, 5, None]], - "a:datetime,b:int,nat:datetime", + [[None, 4, None, None], [dt, 5, 1, None]], + "a:datetime,b:int,c:double,nat:datetime", throw=True, ) # test list @@ -247,7 +275,7 @@ def test__join_outer(self): c, [[1, "2", 6.0], [3, "4", None]], "a:int,b:str,c:double", throw=True ) c = e.join(b, a, how="left_outer", on=["a"]) - assert c.as_pandas().values.tolist()[1][2] is None + # assert c.as_pandas().values.tolist()[1][2] is None df_eq( c, [[6.0, 1, "2"], [2.0, 7, None]], "c:double,a:int,b:str", throw=True ) @@ -255,7 +283,7 @@ def test__join_outer(self): a = e.to_df([[1, "2"], [3, "4"]], "a:int,b:str") b = e.to_df([["6", 1], ["2", 7]], "c:str,a:int") c = e.join(a, b, how="right_outer", on=["a"]) - assert c.as_pandas().values.tolist()[1][1] is None + # assert c.as_pandas().values.tolist()[1][1] is None df_eq(c, [[1, "2", "6"], [7, None, "2"]], "a:int,b:str,c:str", throw=True) c = e.join(a, b, how="full_outer", on=["a"]) @@ -327,6 +355,95 @@ def test__join_anti(self): c = e.join(a, b, how="anti", on=["a"]) df_eq(c, [], "a:int,b:int", throw=True) + def test__join_with_null_keys(self): + # SQL will not match null values + e = self.engine + a = e.to_df([[1, 2, 3], [4, None, 6]], "a:double,b:double,c:int") + b = e.to_df([[1, 2, 33], [4, None, 63]], "a:double,b:double,d:int") + c = e.join(a, b, how="INNER") + df_eq(c, [[1, 2, 3, 33]], "a:double,b:double,c:int,d:int", throw=True) + + def test_union(self): + e = self.engine + a = e.to_df([[1, 2, 3], [4, None, 6]], "a:double,b:double,c:int") + b = e.to_df([[1, 2, 33], [4, None, 6]], "a:double,b:double,c:int") + c = e.union(a, b, metadata=dict(a=1)) + df_eq( + c, + [[1, 2, 3], [4, None, 6], [1, 2, 33]], + "a:double,b:double,c:int", + metadata=dict(a=1), + throw=True, + ) + c = e.union(a, b, distinct=False) + df_eq( + c, + [[1, 2, 3], [4, None, 6], [1, 2, 33], [4, None, 6]], + "a:double,b:double,c:int", + throw=True, + ) + + def test_subtract(self): + e = self.engine + a = e.to_df([[1, 2, 3], [1, 2, 3], [4, None, 6]], "a:double,b:double,c:int") + b = e.to_df([[1, 2, 33], [4, None, 6]], "a:double,b:double,c:int") + c = e.subtract(a, b, metadata=dict(a=1)) + df_eq( + c, + [[1, 2, 3]], + "a:double,b:double,c:int", + metadata=dict(a=1), + throw=True, + ) + # TODO: EXCEPT ALL is not implemented (QPD issue) + # c = e.subtract(a, b, distinct=False) + # df_eq( + # c, + # [[1, 2, 3], [1, 2, 3]], + # "a:double,b:double,c:int", + # throw=True, + # ) + + def test_intersect(self): + e = self.engine + a = e.to_df( + [[1, 2, 3], [4, None, 6], [4, None, 6]], "a:double,b:double,c:int" + ) + b = e.to_df( + [[1, 2, 33], [4, None, 6], [4, None, 6], [4, None, 6]], + "a:double,b:double,c:int", + ) + c = e.intersect(a, b, metadata=dict(a=1)) + df_eq( + c, + [[4, None, 6]], + "a:double,b:double,c:int", + metadata=dict(a=1), + throw=True, + ) + # TODO: INTERSECT ALL is not implemented (QPD issue) + # c = e.intersect(a, b, distinct=False) + # df_eq( + # c, + # [[4, None, 6], [4, None, 6]], + # "a:double,b:double,c:int", + # throw=True, + # ) + + def test_distinct(self): + e = self.engine + a = e.to_df( + [[4, None, 6], [1, 2, 3], [4, None, 6]], 
"a:double,b:double,c:int" + ) + c = e.distinct(a, metadata=dict(a=1)) + df_eq( + c, + [[4, None, 6], [1, 2, 3]], + "a:double,b:double,c:int", + metadata=dict(a=1), + throw=True, + ) + def test__serialize_by_partition(self): e = self.engine a = e.to_df([[1, 2], [3, 4], [1, 5]], "a:int,b:int") diff --git a/fugue_version/__init__.py b/fugue_version/__init__.py index 6a9beea8..3d26edf7 100644 --- a/fugue_version/__init__.py +++ b/fugue_version/__init__.py @@ -1 +1 @@ -__version__ = "0.4.0" +__version__ = "0.4.1" diff --git a/setup.py b/setup.py index 301ec909..f37216e5 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ install_requires=[ "triad>=0.4.0", "adagio>=0.2.1", - "qpd>=0.2.2", + "qpd>=0.2.4", "sqlalchemy", "pyarrow>=0.15.1", ],