output transform (#97)
* output transform

* update

* fugue sql update

* update sql

* update comment
goodwanghan committed Nov 6, 2020
1 parent 53d03e9 commit 3a8eea5
Showing 23 changed files with 8,390 additions and 7,248 deletions.
82 changes: 75 additions & 7 deletions fugue/_utils/interfaceless.py
@@ -1,3 +1,4 @@
import copy
import inspect
import re
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, get_type_hints
@@ -16,8 +17,8 @@
from triad.collections import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import get_full_type_path, to_type
from triad.utils.iter import EmptyAwareIterable, make_empty_aware
from triad.utils.hash import to_uuid
from triad.utils.iter import EmptyAwareIterable, make_empty_aware

_COMMENT_SCHEMA_ANNOTATION = "schema"

@@ -83,11 +84,27 @@ def dummy():
return res.replace(" ", "")


def is_class_method(func: Callable) -> bool:
sig = inspect.signature(func)
# TODO: this is not the best way
return "self" in sig.parameters


class FunctionWrapper(object):
def __init__(self, func: Callable, params_re: str = ".*", return_re: str = ".*"):
self._params, self._rt = _parse_function(func, params_re, return_re)
def __init__(
self,
func: Callable,
params_re: str = ".*",
return_re: str = ".*",
):
self._class_method, self._params, self._rt = _parse_function(
func, params_re, return_re
)
self._func = func

def __deepcopy__(self, memo: Any) -> Any:
return copy.copy(self)

def __call__(self, *args: Any, **kwargs: Any) -> Any:
return self._func(*args, **kwargs)

@@ -108,6 +125,7 @@ def run( # noqa: C901
kwargs: Dict[str, Any],
ignore_unknown: bool = False,
output_schema: Any = None,
output: bool = True,
) -> Any:
p: Dict[str, Any] = {}
for i in range(len(args)):
@@ -136,20 +154,29 @@
elif not ignore_unknown and len(p) > 0:
raise ValueError(f"{p} are not acceptable parameters")
rt = self._func(**rargs)
if not output:
if isinstance(self._rt, _DataFrameParamBase):
self._rt.count(rt)
return
if isinstance(self._rt, _DataFrameParamBase):
return self._rt.to_output_df(rt, output_schema)
return rt


def _parse_function(
func: Callable, params_re: str = ".*", return_re: str = ".*"
) -> Tuple[IndexedOrderedDict[str, "_FuncParam"], "_FuncParam"]:
) -> Tuple[bool, IndexedOrderedDict[str, "_FuncParam"], "_FuncParam"]:
sig = inspect.signature(func)
annotations = get_type_hints(func)
res: IndexedOrderedDict[str, "_FuncParam"] = IndexedOrderedDict()
class_method = False
for k, w in sig.parameters.items():
anno = annotations.get(k, w.annotation)
res[k] = _parse_param(anno, w)
if k == "self":
res[k] = _SelfParam(w)
class_method = True
else:
anno = annotations.get(k, w.annotation)
res[k] = _parse_param(anno, w)
anno = annotations.get("return", sig.return_annotation)
rt = _parse_param(anno, None, none_as_other=False)
params_str = "".join(x.code for x in res.values())
@@ -159,7 +186,7 @@ def _parse_function(
assert_or_throw(
re.match(return_re, rt.code), TypeError(f"Return type not valid {rt}")
)
return res, rt
return class_method, res, rt


def _parse_param( # noqa: C901
@@ -238,6 +265,9 @@ def to_input_data(self, df: DataFrame) -> Any: # pragma: no cover
def to_output_df(self, df: Any, schema: Any) -> DataFrame: # pragma: no cover
raise NotImplementedError

def count(self, df: Any) -> int: # pragma: no cover
raise NotImplementedError


class _DataFrameParam(_DataFrameParamBase):
def __init__(self, param: Optional[inspect.Parameter]):
@@ -253,6 +283,12 @@ def to_output_df(self, output: DataFrame, schema: Any) -> DataFrame:
)
return output

def count(self, df: DataFrame) -> int:
if df.is_bounded:
return df.count()
else:
return sum(1 for _ in df.as_array_iterable())


class _LocalDataFrameParam(_DataFrameParamBase):
def __init__(self, param: Optional[inspect.Parameter]):
@@ -268,6 +304,12 @@ def to_output_df(self, output: LocalDataFrame, schema: Any) -> DataFrame:
)
return output

def count(self, df: LocalDataFrame) -> int:
if df.is_bounded:
return df.count()
else:
return sum(1 for _ in df.as_array_iterable())


class _ListListParam(_DataFrameParamBase):
def __init__(self, param: Optional[inspect.Parameter]):
@@ -279,6 +321,9 @@ def to_input_data(self, df: DataFrame) -> List[List[Any]]:
def to_output_df(self, output: List[List[Any]], schema: Any) -> DataFrame:
return ArrayDataFrame(output, schema)

def count(self, df: List[List[Any]]) -> int:
return len(df)


class _IterableListParam(_DataFrameParamBase):
def __init__(self, param: Optional[inspect.Parameter]):
@@ -290,6 +335,9 @@ def to_input_data(self, df: DataFrame) -> Iterable[List[Any]]:
def to_output_df(self, output: Iterable[List[Any]], schema: Any) -> DataFrame:
return IterableDataFrame(output, schema)

def count(self, df: Iterable[List[Any]]) -> int:
return sum(1 for _ in df)


class _EmptyAwareIterableListParam(_DataFrameParamBase):
def __init__(self, param: Optional[inspect.Parameter]):
@@ -303,6 +351,9 @@ def to_output_df(
) -> DataFrame:
return IterableDataFrame(output, schema)

def count(self, df: EmptyAwareIterable[List[Any]]) -> int:
return sum(1 for _ in df)


class _ListDictParam(_DataFrameParamBase):
def __init__(self, param: Optional[inspect.Parameter]):
@@ -320,6 +371,9 @@ def get_all() -> Iterable[List[Any]]:

return IterableDataFrame(get_all(), schema)

def count(self, df: List[Dict[str, Any]]) -> int:
return len(df)


class _IterableDictParam(_DataFrameParamBase):
def __init__(self, param: Optional[inspect.Parameter]):
@@ -337,6 +391,9 @@ def get_all() -> Iterable[List[Any]]:

return IterableDataFrame(get_all(), schema)

def count(self, df: Iterable[Dict[str, Any]]) -> int:
return sum(1 for _ in df)


class _EmptyAwareIterableDictParam(_DataFrameParamBase):
def __init__(self, param: Optional[inspect.Parameter]):
@@ -356,6 +413,9 @@ def get_all() -> Iterable[List[Any]]:

return IterableDataFrame(get_all(), schema)

def count(self, df: EmptyAwareIterable[Dict[str, Any]]) -> int:
return sum(1 for _ in df)


class _PandasParam(_DataFrameParamBase):
def __init__(self, param: Optional[inspect.Parameter]):
@@ -367,12 +427,20 @@ def to_input_data(self, df: DataFrame) -> pd.DataFrame:
def to_output_df(self, output: pd.DataFrame, schema: Any) -> DataFrame:
return PandasDataFrame(output, schema)

def count(self, df: pd.DataFrame) -> int:
return df.shape[0]


class _NoneParam(_FuncParam):
def __init__(self, param: Optional[inspect.Parameter]):
super().__init__(param, "NoneType", "n")


class _SelfParam(_FuncParam):
def __init__(self, param: Optional[inspect.Parameter]):
super().__init__(param, "[Self]", "0")


class _OtherParam(_FuncParam):
def __init__(self, param: Optional[inspect.Parameter]):
super().__init__(param, "[Other]", "x")
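The new count() implementations above all follow one rule: bounded frames report their size directly, while unbounded or iterable inputs are drained and tallied, so a lazy transform's side effects still execute even when its output is discarded. A minimal, self-contained sketch of that rule (a hypothetical helper, not part of this diff):

from typing import Any, Iterable, List, Union

def consume_and_count(rows: Union[List[Any], Iterable[Any]]) -> int:
    # lists are counted in O(1) via len(); plain iterables are
    # exhausted element by element so lazy generators still run
    if isinstance(rows, list):
        return len(rows)
    return sum(1 for _ in rows)

assert consume_and_count([[0, "a"], [1, "b"]]) == 2
assert consume_and_count(row for row in [[0, "a"]]) == 1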
14 changes: 9 additions & 5 deletions fugue/extensions/__init__.py
@@ -1,10 +1,14 @@
# flake8: noqa
from fugue.extensions.creator import Creator, creator
from fugue.extensions.outputter import Outputter, outputter
from fugue.extensions.processor import Processor, processor
from fugue.extensions.transformer import (
Transformer,
CoTransformer,
transformer,
OutputCoTransformer,
OutputTransformer,
Transformer,
cotransformer,
output_cotransformer,
output_transformer,
transformer,
)
from fugue.extensions.creator import Creator, creator
from fugue.extensions.processor import Processor, processor
from fugue.extensions.outputter import Outputter, outputter
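With OutputTransformer and output_transformer now exported, a side-effect-only transform can be a plain function annotated to return None, so no output schema comment is needed. A hedged sketch of such a function (the workflow-level hookup is not shown in this diff; the annotation used is one the interfaceless parser above recognizes):

from typing import Any, Dict, Iterable

def print_rows(df: Iterable[Dict[str, Any]]) -> None:
    # consumes a partition purely for its side effect; per the new
    # run(..., output=False) path above, any return value is dropped
    for row in df:
        print(row)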
1 change: 1 addition & 0 deletions fugue/extensions/_builtins/__init__.py
@@ -3,6 +3,7 @@
from fugue.extensions._builtins.outputters import (
AssertEqual,
AssertNotEqual,
RunOutputTransformer,
Save,
Show,
)
122 changes: 118 additions & 4 deletions fugue/extensions/_builtins/outputters.py
@@ -1,10 +1,19 @@
from threading import RLock
from typing import List, no_type_check

from fugue.dataframe import DataFrames
from fugue.dataframe.utils import _df_eq as df_eq
from fugue.collections.partition import PartitionCursor
from fugue.dataframe import DataFrame, DataFrames, LocalDataFrame
from fugue.dataframe.array_dataframe import ArrayDataFrame
from fugue.dataframe.utils import _df_eq, to_local_bounded_df
from fugue.exceptions import FugueWorkflowError
from fugue.execution.execution_engine import _generate_comap_empty_dfs
from fugue.extensions.outputter import Outputter
from fugue.extensions.transformer.convert import _to_output_transformer
from fugue.extensions.transformer.transformer import CoTransformer, Transformer
from triad.collections.dict import ParamDict
from triad.collections.schema import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import to_type


class Show(Outputter):
@@ -31,15 +40,15 @@ def process(self, dfs: DataFrames) -> None:
assert_or_throw(len(dfs) > 1, FugueWorkflowError("can't accept single input"))
expected = dfs[0]
for i in range(1, len(dfs)):
df_eq(expected, dfs[i], throw=True, **self.params)
_df_eq(expected, dfs[i], throw=True, **self.params)


class AssertNotEqual(Outputter):
def process(self, dfs: DataFrames) -> None:
assert_or_throw(len(dfs) > 1, FugueWorkflowError("can't accept single input"))
expected = dfs[0]
for i in range(1, len(dfs)):
assert not df_eq(expected, dfs[i], throw=False, **self.params)
assert not _df_eq(expected, dfs[i], throw=False, **self.params)


class Save(Outputter):
@@ -61,3 +70,108 @@ def process(self, dfs: DataFrames) -> None:
force_single=force_single,
**kwargs
)


class RunOutputTransformer(Outputter):
@no_type_check
def process(self, dfs: DataFrames) -> None:
df = dfs[0]
tf = _to_output_transformer(
self.params.get_or_none("transformer", object),
)
tf._workflow_conf = self.execution_engine.conf
tf._params = self.params.get("params", ParamDict()) # type: ignore
tf._partition_spec = self.partition_spec # type: ignore
ie = self.params.get("ignore_errors", [])
self._ignore_errors = [to_type(x, Exception) for x in ie]
tf.validate_on_runtime(df)
if isinstance(tf, Transformer):
self.transform(df, tf)
else:
self.cotransform(df, tf)

def transform(self, df: DataFrame, tf: Transformer) -> None:
tf._key_schema = self.partition_spec.get_key_schema(df.schema) # type: ignore
tf._output_schema = Schema(tf.get_output_schema(df)) # type: ignore
tr = _TransformerRunner(df, tf, self._ignore_errors) # type: ignore
df = self.execution_engine.map(
df=df,
map_func=tr.run,
output_schema=tf.output_schema, # type: ignore
partition_spec=tf.partition_spec,
on_init=tr.on_init,
)
self.execution_engine.persist(df, lazy=False)

@no_type_check
def cotransform(self, df: DataFrame, tf: CoTransformer) -> None:
assert_or_throw(
df.metadata.get("serialized", False), "must use serialized dataframe"
)
tf._key_schema = df.schema - list(df.metadata["serialized_cols"].values())
# TODO: currently, get_output_schema only gets empty dataframes
empty_dfs = _generate_comap_empty_dfs(
df.metadata["schemas"], df.metadata.get("serialized_has_name", False)
)
tf._output_schema = Schema(tf.get_output_schema(empty_dfs))
tr = _CoTransformerRunner(df, tf, self._ignore_errors)
df = self.execution_engine.comap(
df=df,
map_func=tr.run,
output_schema=tf.output_schema,
partition_spec=tf.partition_spec,
on_init=tr.on_init,
)
self.execution_engine.persist(df, lazy=False)


class _TransformerRunner(object):
def __init__(
self, df: DataFrame, transformer: Transformer, ignore_errors: List[type]
):
self.schema = df.schema
self.metadata = df.metadata
self.transformer = transformer
self.ignore_errors = tuple(ignore_errors)

def run(self, cursor: PartitionCursor, df: LocalDataFrame) -> LocalDataFrame:
self.transformer._cursor = cursor # type: ignore
df._metadata = self.metadata
try:
to_local_bounded_df(self.transformer.transform(df))
return ArrayDataFrame([], self.transformer.output_schema)
except self.ignore_errors: # type: ignore
return ArrayDataFrame([], self.transformer.output_schema)

def on_init(self, partition_no: int, df: DataFrame) -> None:
s = self.transformer.partition_spec
self.transformer._cursor = s.get_cursor( # type: ignore
self.schema, partition_no
)
df._metadata = self.metadata
self.transformer.on_init(df)


class _CoTransformerRunner(object):
def __init__(
self, df: DataFrame, transformer: CoTransformer, ignore_errors: List[type]
):
self.schema = df.schema
self.metadata = df.metadata
self.transformer = transformer
self.ignore_errors = tuple(ignore_errors)

def run(self, cursor: PartitionCursor, dfs: DataFrames) -> LocalDataFrame:
self.transformer._cursor = cursor # type: ignore
try:
to_local_bounded_df(self.transformer.transform(dfs))
return ArrayDataFrame([], self.transformer.output_schema)
except self.ignore_errors: # type: ignore
return ArrayDataFrame([], self.transformer.output_schema)

def on_init(self, partition_no: int, dfs: DataFrames) -> None:
s = self.transformer.partition_spec
self.transformer._cursor = s.get_cursor( # type: ignore
self.schema, partition_no
)
self.transformer.on_init(dfs)
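Both runners above share the same idiom: invoke the user's transform for its side effects, swallow only the configured exception types, and return an empty ArrayDataFrame so the engine's map contract still holds. A standalone sketch of that idiom (not the Fugue classes themselves):

from typing import Any, Callable, List, Tuple, Type

def run_for_side_effects(
    transform: Callable[[List[Any]], Any],
    partition: List[Any],
    ignore_errors: Tuple[Type[BaseException], ...],
) -> List[Any]:
    try:
        transform(partition)  # result discarded; only side effects matter
    except ignore_errors:
        pass  # configured error types are tolerated per partition
    return []  # empty result keeps the map() contract satisfied

# ZeroDivisionError is swallowed; any other error would propagate
assert run_for_side_effects(lambda p: 1 / 0, [], (ZeroDivisionError,)) == []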