Skip to content

Commit

Permalink
Improve extension parsing logic (#74)
Browse files Browse the repository at this point in the history
* Improve extension parsing logic

* update sql logic

* update

* update
  • Loading branch information
goodwanghan committed Oct 7, 2020
1 parent 07f24c6 commit b51f706
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 72 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,14 @@ pre-commit install

## Update History

### 0.4.2

* Require pandas 1.0 because of parquet schema
* Improved Fugue SQL extension parsing logic
* Doc for contributors to setup their environment

### 0.4.1

* Added set operations to programming interface: `union`, `subtract`, `intersect`
* Added `distinct` to programming interface
* Ensured partitioning follows SQL convention: groups with null keys are NOT removed
Expand Down
16 changes: 12 additions & 4 deletions fugue/extensions/creator/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from fugue._utils.interfaceless import FunctionWrapper, parse_output_schema_from_comment
from triad.collections import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import to_function, to_instance
from triad.utils.convert import to_function, to_instance, get_caller_global_local_vars
from triad.utils.hash import to_uuid


Expand All @@ -23,14 +23,22 @@ def deco(func: Callable) -> _FuncAsCreator:
return deco


def _to_creator(obj: Any, schema: Any = None) -> Creator:
def _to_creator(
obj: Any,
schema: Any = None,
global_vars: Optional[Dict[str, Any]] = None,
local_vars: Optional[Dict[str, Any]] = None,
) -> Creator:
global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars)
exp: Optional[Exception] = None
try:
return copy.copy(to_instance(obj, Creator))
return copy.copy(
to_instance(obj, Creator, global_vars=global_vars, local_vars=local_vars)
)
except Exception as e:
exp = e
try:
f = to_function(obj)
f = to_function(obj, global_vars=global_vars, local_vars=local_vars)
# this is for string expression of function with decorator
if isinstance(f, Creator):
return copy.copy(f)
Expand Down
15 changes: 11 additions & 4 deletions fugue/extensions/outputter/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from fugue.dataframe import DataFrames
from fugue.exceptions import FugueInterfacelessError
from triad.utils.convert import to_function, to_instance
from triad.utils.convert import get_caller_global_local_vars, to_function, to_instance
from fugue.extensions.outputter.outputter import Outputter
from fugue._utils.interfaceless import FunctionWrapper
from triad.utils.hash import to_uuid
Expand All @@ -21,14 +21,21 @@ def deco(func: Callable) -> _FuncAsOutputter:
return deco


def _to_outputter(obj: Any) -> Outputter:
def _to_outputter(
obj: Any,
global_vars: Optional[Dict[str, Any]] = None,
local_vars: Optional[Dict[str, Any]] = None,
) -> Outputter:
global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars)
exp: Optional[Exception] = None
try:
return copy.copy(to_instance(obj, Outputter))
return copy.copy(
to_instance(obj, Outputter, global_vars=global_vars, local_vars=local_vars)
)
except Exception as e:
exp = e
try:
f = to_function(obj)
f = to_function(obj, global_vars=global_vars, local_vars=local_vars)
# this is for string expression of function with decorator
if isinstance(f, Outputter):
return copy.copy(f)
Expand Down
16 changes: 12 additions & 4 deletions fugue/extensions/processor/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from fugue._utils.interfaceless import FunctionWrapper, parse_output_schema_from_comment
from triad.collections import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import to_function, to_instance
from triad.utils.convert import get_caller_global_local_vars, to_function, to_instance
from triad.utils.hash import to_uuid


Expand All @@ -23,14 +23,22 @@ def deco(func: Callable) -> _FuncAsProcessor:
return deco


def _to_processor(obj: Any, schema: Any = None) -> Processor:
def _to_processor(
obj: Any,
schema: Any = None,
global_vars: Optional[Dict[str, Any]] = None,
local_vars: Optional[Dict[str, Any]] = None,
) -> Processor:
global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars)
exp: Optional[Exception] = None
try:
return copy.copy(to_instance(obj, Processor))
return copy.copy(
to_instance(obj, Processor, global_vars=global_vars, local_vars=local_vars)
)
except Exception as e:
exp = e
try:
f = to_function(obj)
f = to_function(obj, global_vars=global_vars, local_vars=local_vars)
# this is for string expression of function with decorator
if isinstance(f, Processor):
return copy.copy(f)
Expand Down
28 changes: 20 additions & 8 deletions fugue/extensions/transformer/convert.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import copy
from typing import Any, Callable, List, Optional, Union, no_type_check
from typing import Any, Callable, Dict, List, Optional, Union, no_type_check

from fugue._utils.interfaceless import FunctionWrapper, parse_output_schema_from_comment
from fugue.dataframe import DataFrame, DataFrames, LocalDataFrame
from fugue.exceptions import FugueInterfacelessError
from fugue.extensions.transformer.transformer import CoTransformer, Transformer
from fugue._utils.interfaceless import FunctionWrapper, parse_output_schema_from_comment
from triad.collections.schema import Schema
from triad.utils.assertion import assert_arg_not_none
from triad.utils.convert import to_function, to_instance
from triad.utils.convert import get_caller_global_local_vars, to_function, to_instance
from triad.utils.hash import to_uuid


Expand Down Expand Up @@ -36,19 +36,31 @@ def deco(func: Callable) -> _FuncAsCoTransformer:


def _to_transformer( # noqa: C901
obj: Any, schema: Any = None
obj: Any,
schema: Any = None,
global_vars: Optional[Dict[str, Any]] = None,
local_vars: Optional[Dict[str, Any]] = None,
) -> Union[Transformer, CoTransformer]:
global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars)
exp: Optional[Exception] = None
try:
return copy.copy(to_instance(obj, Transformer))
return copy.copy(
to_instance(
obj, Transformer, global_vars=global_vars, local_vars=local_vars
)
)
except Exception as e:
exp = e
try:
return copy.copy(to_instance(obj, CoTransformer))
return copy.copy(
to_instance(
obj, CoTransformer, global_vars=global_vars, local_vars=local_vars
)
)
except Exception as e:
exp = e
try:
f = to_function(obj)
f = to_function(obj, global_vars=global_vars, local_vars=local_vars)
# this is for string expression of function with decorator
if isinstance(f, Transformer):
return copy.copy(f)
Expand All @@ -57,7 +69,7 @@ def _to_transformer( # noqa: C901
except Exception as e:
exp = e
try:
f = to_function(obj)
f = to_function(obj, global_vars=global_vars, local_vars=local_vars)
# this is for string expression of function with decorator
if isinstance(f, CoTransformer):
return copy.copy(f)
Expand Down
41 changes: 34 additions & 7 deletions fugue/workflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from triad.collections.dict import ParamDict
from triad.utils.assertion import assert_or_throw


_DEFAULT_IGNORE_ERRORS: List[Any] = []

TDF = TypeVar("TDF", bound="WorkflowDataFrame")
Expand Down Expand Up @@ -132,7 +133,7 @@ def process(
Please read the :ref:`Processor Tutorial <tutorial:/tutorials/processor.ipynb>`
:param using: processor-like object
:param using: processor-like object, can't be a string expression
:param schema: |SchemaLikeObject|, defaults to None. The processor
will be able to access this value from
:meth:`~fugue.extensions.context.ExtensionContext.output_schema`
Expand All @@ -145,6 +146,9 @@ def process(
:return: result dataframe
:rtype: :class:`~.WorkflowDataFrame`
"""
assert_or_throw(
not isinstance(using, str), f"processor {using} can't be string expression"
)
if pre_partition is None:
pre_partition = self._metadata.get("pre_partition", PartitionSpec())
df = self.workflow.process(
Expand All @@ -158,14 +162,17 @@ def output(self, using: Any, params: Any = None, pre_partition: Any = None) -> N
Please read the :ref:`Outputter Tutorial <tutorial:/tutorials/outputter.ipynb>`
:param using: outputter-like object
:param using: outputter-like object, can't be a string expression
:param params: |ParamsLikeObject| to run the outputter, defaults to None.
The outputter will be able to access this value from
:meth:`~fugue.extensions.context.ExtensionContext.params`
:param pre_partition: |PartitionLikeObject|, defaults to None.
The outputter will be able to access this value from
:meth:`~fugue.extensions.context.ExtensionContext.partition_spec`
"""
assert_or_throw(
not isinstance(using, str), f"outputter {using} can't be string expression"
)
if pre_partition is None:
pre_partition = self._metadata.get("pre_partition", PartitionSpec())
self.workflow.output(
Expand Down Expand Up @@ -232,7 +239,7 @@ def transform(
Please read the
:ref:`Transformer Tutorial <tutorial:/tutorials/transformer.ipynb>`
:param using: transformer-like object
:param using: transformer-like object, can't be a string expression
:param schema: |SchemaLikeObject|, defaults to None. The transformer
will be able to access this value from
:meth:`~fugue.extensions.context.ExtensionContext.output_schema`
Expand All @@ -247,6 +254,10 @@ def transform(
:return: the transformed dataframe
:rtype: :class:`~.WorkflowDataFrame`
"""
assert_or_throw(
not isinstance(using, str),
f"transformer {using} can't be string expression",
)
if pre_partition is None:
pre_partition = self._metadata.get("pre_partition", PartitionSpec())
df = self.workflow.transform(
Expand Down Expand Up @@ -783,7 +794,7 @@ def create(
Please read the :ref:`Creator Tutorial <tutorial:/tutorials/creator.ipynb>`
:param using: creator-like object
:param using: creator-like object, can't be a string expression
:param schema: |SchemaLikeObject|, defaults to None. The creator
will be able to access this value from
:meth:`~fugue.extensions.context.ExtensionContext.output_schema`
Expand All @@ -795,6 +806,10 @@ def create(
:meth:`~fugue.extensions.context.ExtensionContext.partition_spec`
:return: result dataframe
"""
assert_or_throw(
not isinstance(using, str),
f"creator {using} can't be string expression",
)
task = Create(creator=using, schema=schema, params=params)
return self.add(task)

Expand All @@ -811,7 +826,7 @@ def process(
Please read the :ref:`Processor Tutorial <tutorial:/tutorials/processor.ipynb>`
:param dfs: |DataFramesLikeObject|
:param using: processor-like object
:param using: processor-like object, can't be a string expression
:param schema: |SchemaLikeObject|, defaults to None. The processor
will be able to access this value from
:meth:`~fugue.extensions.context.ExtensionContext.output_schema`
Expand All @@ -824,6 +839,10 @@ def process(
:return: result dataframe
"""
assert_or_throw(
not isinstance(using, str),
f"processor {using} can't be string expression",
)
dfs = self._to_dfs(*dfs)
task = Process(
len(dfs),
Expand All @@ -845,14 +864,18 @@ def output(
Please read the :ref:`Outputter Tutorial <tutorial:/tutorials/outputter.ipynb>`
:param using: outputter-like object
:param using: outputter-like object, can't be a string expression
:param params: |ParamsLikeObject| to run the outputter, defaults to None.
The outputter will be able to access this value from
:meth:`~fugue.extensions.context.ExtensionContext.params`
:param pre_partition: |PartitionLikeObject|, defaults to None.
The outputter will be able to access this value from
:meth:`~fugue.extensions.context.ExtensionContext.partition_spec`
"""
assert_or_throw(
not isinstance(using, str),
f"outputter {using} can't be string expression",
)
dfs = self._to_dfs(*dfs)
task = Output(
len(dfs),
Expand Down Expand Up @@ -1080,7 +1103,7 @@ def transform(
:ref:`Transformer Tutorial <tutorial:/tutorials/transformer.ipynb>`
:param dfs: |DataFramesLikeObject|
:param using: transformer-like object
:param using: transformer-like object, can't be a string expression
:param schema: |SchemaLikeObject|, defaults to None. The transformer
will be able to access this value from
:meth:`~fugue.extensions.context.ExtensionContext.output_schema`
Expand All @@ -1094,6 +1117,10 @@ def transform(
defaults to empty list
:return: the transformed dataframe
"""
assert_or_throw(
not isinstance(using, str),
f"transformer {using} can't be string expression",
)
assert_or_throw(
len(dfs) == 1,
NotImplementedError("transform supports only single dataframe"),
Expand Down
Loading

0 comments on commit b51f706

Please sign in to comment.