Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.8.0 #39

Merged
merged 5 commits into from
Oct 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Change Log

## 0.8.0

- patch classes if they have piping operator method
- auto register numpy ufuncs
- pump executing to 1.1.1 to fix pwwang/datar#149

## 0.7.6

- 🐛 Fix `numpy.ndarray` as data argument for verbs
Expand Down
4 changes: 2 additions & 2 deletions pipda/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from .verb import (
Verb,
VerbCall,
register_piping,
register_verb,
)
from .piping import register_piping

__version__ = "0.7.6"
__version__ = "0.8.0"
33 changes: 31 additions & 2 deletions pipda/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

from abc import ABC, abstractmethod
from functools import partialmethod
from typing import Any, TYPE_CHECKING
from typing import TYPE_CHECKING, Any, Callable

from .context import ContextBase

if TYPE_CHECKING:
from .operator import OperatorCall
from .function import FunctionCall
from .reference import ReferenceAttr, ReferenceItem

OPERATORS = {
Expand Down Expand Up @@ -55,7 +56,35 @@ class Expression(ABC):
"""The abstract Expression class"""

_pipda_operator = None
__array_ufunc__ = None

def __array_ufunc__(
self,
ufunc: Callable,
method: str,
*inputs: Any,
**kwargs: Any,
) -> FunctionCall:
"""Allow numpy ufunc to work on Expression objects"""

from .piping import PIPING_OPS
from .verb import VerbCall

if (
ufunc.__name__ == PIPING_OPS[VerbCall.PIPING][2]
and isinstance(inputs[1], VerbCall)
and len(inputs) == 2
and method == "__call__"
):
# We can't patch numpy.ndarray
return inputs[1]._pipda_eval(inputs[0])

from .function import Function, FunctionCall

if method == "reduce":
ufunc = ufunc.reduce

fun = Function(ufunc, None, {})
return FunctionCall(fun, *inputs, **kwargs)

def __hash__(self) -> int:
"""Make it hashable"""
Expand Down
26 changes: 19 additions & 7 deletions pipda/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,27 @@ def _pipda_eval(self, data: Any, context: ContextType = None) -> Any:
},
)

bound = func.bind_arguments(*self._pipda_args, **self._pipda_kwargs)
context = func.contexts["_"] or context
extra_contexts = func.extra_contexts
for key, val in bound.arguments.items():
ctx = extra_contexts["_"].get(key, context)
val = evaluate_expr(val, data, ctx)
bound.arguments[key] = val
extra_contexts = func.extra_contexts["_"]

return func.func(*bound.args, **bound.kwargs)
if extra_contexts:
bound = func.bind_arguments(*self._pipda_args, **self._pipda_kwargs)

for key, val in bound.arguments.items():
ctx = extra_contexts.get(key, context)
val = evaluate_expr(val, data, ctx)
bound.arguments[key] = val

return func.func(*bound.args, **bound.kwargs)

# we don't need signature if there is no extra context
return func.func(
*(evaluate_expr(arg, data, context) for arg in self._pipda_args),
**{
key: evaluate_expr(val, data, context)
for key, val in self._pipda_kwargs.items()
},
)


class Registered(ABC):
Expand Down
177 changes: 177 additions & 0 deletions pipda/piping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import ast
import functools
from typing import Type, Dict, Callable

from .verb import VerbCall

PIPING_OPS = {
# op: (method, ast node, numpy ufunc name)
">>": ("__rrshift__", ast.RShift, "right_shift"),
"|": ("__ror__", ast.BitOr, "bitwise_or"),
"//": ("__rfloordiv__", ast.FloorDiv, "floor_divide"),
"@": ("__rmatmul__", ast.MatMult, "matmul"),
"%": ("__rmod__", ast.Mod, "remainder"),
"&": ("__rand__", ast.BitAnd, "bitwise_and"),
"^": ("__rxor__", ast.BitXor, "bitwise_xor"),
}

PATCHED_CLASSES: Dict[Type, Dict[str, Callable]] = {
# kls:
# {} # registered but not patched
# {"method": <method>, "imethod": <imethod>} # patched
}


def _patch_cls_method(kls: Type, method: str) -> None:
"""Borrowed from https://github.com/sspipe/sspipe"""
try:
original = getattr(kls, method)
except AttributeError:
return

PATCHED_CLASSES[kls][method] = original

@functools.wraps(original)
def wrapper(self, x, *args, **kwargs):
if isinstance(x, VerbCall):
return NotImplemented
return original(self, x, *args, **kwargs)

setattr(kls, method, wrapper)


def _unpatch_cls_method(kls: Type, method: str) -> None:
if method in PATCHED_CLASSES[kls]:
setattr(kls, method, PATCHED_CLASSES[kls].pop(method))


def _patch_cls_operator(kls: Type, op: str) -> None:
method = PIPING_OPS[op][0].replace("__r", "__")
imethod = PIPING_OPS[op][0].replace("__r", "__i")
_patch_cls_method(kls, method)
_patch_cls_method(kls, imethod)


def _unpatch_cls_operator(kls: Type, op: str) -> None:
method = PIPING_OPS[op][0].replace("__r", "__")
imethod = PIPING_OPS[op][0].replace("__r", "__i")
_unpatch_cls_method(kls, method)
_unpatch_cls_method(kls, imethod)


def patch_classes(*classes: Type) -> None:
"""Patch the classes in case it has piping operator defined

For example, DataFrame.__or__ has already been defined, so we need to
patch it to force it to use __ror__ of VerbCall if `|` is registered
for piping.

Args:
classes: The classes to patch
"""
for kls in classes:
if kls not in PATCHED_CLASSES:
PATCHED_CLASSES[kls] = {}

if not PATCHED_CLASSES[kls]:
_patch_cls_operator(kls, VerbCall.PIPING)


def unpatch_classes(*classes: Type) -> None:
"""Unpatch the classes

Args:
classes: The classes to unpatch
"""
for kls in classes:
if PATCHED_CLASSES[kls]:
_unpatch_cls_operator(kls, VerbCall.PIPING)
# Don't patch it in the future
del PATCHED_CLASSES[kls]


def _patch_all(op: str) -> None:
"""Patch all registered classes that has the operator defined

Args:
op: The operator used for piping
Avaiable: ">>", "|", "//", "@", "%", "&" and "^"
un: Unpatch the classes
"""
for kls in PATCHED_CLASSES:
_patch_cls_operator(kls, op)


def _unpatch_all(op: str) -> None:
"""Unpatch all registered classes

Args:
op: The operator used for piping
Avaiable: ">>", "|", "//", "@", "%", "&" and "^"
"""
for kls in PATCHED_CLASSES:
_unpatch_cls_operator(kls, op)


def _patch_default_classes() -> None:
"""Patch the default/commonly used classes"""

try:
import pandas
patch_classes(
pandas.DataFrame,
pandas.Series,
pandas.Index,
pandas.Categorical,
)
except ImportError:
pass

try: # pragma: no cover
from modin import pandas
patch_classes(
pandas.DataFrame,
pandas.Series,
pandas.Index,
pandas.Categorical,
)
except ImportError:
pass

try: # pragma: no cover
import torch
patch_classes(torch.Tensor)
except ImportError:
pass

try: # pragma: no cover
from django.db.models import query
patch_classes(query.QuerySet)
except ImportError:
pass


def register_piping(op: str) -> None:
"""Register the piping operator for verbs

Args:
op: The operator used for piping
Avaiable: ">>", "|", "//", "@", "%", "&" and "^"
"""
if op not in PIPING_OPS:
raise ValueError(f"Unsupported piping operator: {op}")

if VerbCall.PIPING:
orig_method = VerbCall.__orig_opmethod__
curr_method = PIPING_OPS[VerbCall.PIPING][0]
setattr(VerbCall, curr_method, orig_method)
_unpatch_all(VerbCall.PIPING)

VerbCall.PIPING = op
VerbCall.__orig_opmethod__ = getattr(VerbCall, PIPING_OPS[op][0])
setattr(VerbCall, PIPING_OPS[op][0], VerbCall._pipda_eval)
_patch_all(op)


register_piping(">>")
_patch_default_classes()
3 changes: 2 additions & 1 deletion pipda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ def is_piping_verbcall(verb: str, fallback: str) -> bool:
True if it is a piping verb call, otherwise False
"""
from executing import Source
from .verb import PIPING_OPS, VerbCall
from .verb import VerbCall
from .piping import PIPING_OPS

frame = sys._getframe(2)
node = Source.executing(frame).node
Expand Down
61 changes: 20 additions & 41 deletions pipda/verb.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Provide verb definition"""
from __future__ import annotations

import ast
from enum import Enum
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -29,16 +28,6 @@
from inspect import Signature
from .context import ContextType

PIPING_OPS = {
">>": ("__rrshift__", ast.RShift),
"|": ("__ror__", ast.BitOr),
"//": ("__rfloordiv__", ast.FloorDiv),
"@": ("__rmatmul__", ast.MatMult),
"%": ("__rmod__", ast.Mod),
"&": ("__rand__", ast.BitAnd),
"^": ("__rxor__", ast.BitXor),
}


class VerbCall(Expression):
"""A verb call
Expand Down Expand Up @@ -91,17 +80,28 @@ def _pipda_eval(self, data: Any, context: ContextType = None) -> Any:
self._pipda_func.extra_contexts.get(func, None)
or self._pipda_func.extra_contexts["_"]
)
bound = self._pipda_func.bind_arguments(
if extra_contexts:
bound = self._pipda_func.bind_arguments(
data,
*self._pipda_args,
**self._pipda_kwargs,
)
for key, val in bound.arguments.items():
ctx = extra_contexts.get(key, context)
val = evaluate_expr(val, data, ctx)
bound.arguments[key] = val

return func(*bound.args, **bound.kwargs)

# we don't need signature if there is no extra context
return func(
data,
*self._pipda_args,
**self._pipda_kwargs,
*(evaluate_expr(arg, data, context) for arg in self._pipda_args),
**{
key: evaluate_expr(val, data, context)
for key, val in self._pipda_kwargs.items()
},
)
for key, val in bound.arguments.items():
ctx = extra_contexts.get(key, context)
val = evaluate_expr(val, data, ctx)
bound.arguments[key] = val

return func(*bound.args, **bound.kwargs)


class Verb(Registered):
Expand Down Expand Up @@ -298,24 +298,3 @@ def register_verb(
dep=dep,
ast_fallback=ast_fallback,
)


def register_piping(op: str) -> None:
"""Register the piping operator for verbs

Args:
op: The operator used for piping
Avaiable: ">>", "|", "//", "@", "%", "&" and "^"
"""
if op not in PIPING_OPS:
raise ValueError(f"Unsupported piping operator: {op}")

if VerbCall.PIPING:
curr_method = PIPING_OPS[VerbCall.PIPING][0]
delattr(VerbCall, curr_method)

VerbCall.PIPING = op
setattr(VerbCall, PIPING_OPS[op][0], VerbCall._pipda_eval)


register_piping(">>")
Loading