Skip to content

Commit

Permalink
Python: Add And, Or, Not, AlwaysTrue, AlwaysFalse expressions (#4466)
Browse files Browse the repository at this point in the history
  • Loading branch information
CircArgs authored Apr 24, 2022
1 parent b5f367b commit 449a743
Show file tree
Hide file tree
Showing 2 changed files with 266 additions and 0 deletions.
140 changes: 140 additions & 0 deletions python/src/iceberg/expressions/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
# under the License.
from abc import ABC, abstractmethod
from enum import Enum, auto
from functools import reduce
from typing import Any, Generic, TypeVar

from iceberg.files import StructProtocol
from iceberg.types import Singleton

T = TypeVar("T")

Expand Down Expand Up @@ -126,6 +128,144 @@ def __ge__(self, other):
return self.value >= other.value


class BooleanExpression(ABC):
"""base class for all boolean expressions"""

@abstractmethod
def __invert__(self) -> "BooleanExpression":
...


class And(BooleanExpression):
"""AND operation expression - logical conjunction"""

def __new__(cls, left: BooleanExpression, right: BooleanExpression, *rest: BooleanExpression):
if rest:
return reduce(And, (left, right, *rest))
if left is AlwaysFalse() or right is AlwaysFalse():
return AlwaysFalse()
elif left is AlwaysTrue():
return right
elif right is AlwaysTrue():
return left
self = super().__new__(cls)
self._left = left # type: ignore
self._right = right # type: ignore
return self

@property
def left(self) -> BooleanExpression:
return self._left # type: ignore

@property
def right(self) -> BooleanExpression:
return self._right # type: ignore

def __eq__(self, other) -> bool:
return id(self) == id(other) or (isinstance(other, And) and self.left == other.left and self.right == other.right)

def __invert__(self) -> "Or":
return Or(~self.left, ~self.right)

def __repr__(self) -> str:
return f"And({repr(self.left)}, {repr(self.right)})"

def __str__(self) -> str:
return f"({self.left} and {self.right})"


class Or(BooleanExpression):
"""OR operation expression - logical disjunction"""

def __new__(cls, left: BooleanExpression, right: BooleanExpression, *rest: BooleanExpression):
if rest:
return reduce(Or, (left, right, *rest))
if left is AlwaysTrue() or right is AlwaysTrue():
return AlwaysTrue()
elif left is AlwaysFalse():
return right
elif right is AlwaysFalse():
return left
self = super().__new__(cls)
self._left = left # type: ignore
self._right = right # type: ignore
return self

@property
def left(self) -> BooleanExpression:
return self._left # type: ignore

@property
def right(self) -> BooleanExpression:
return self._right # type: ignore

def __eq__(self, other) -> bool:
return id(self) == id(other) or (isinstance(other, Or) and self.left == other.left and self.right == other.right)

def __invert__(self) -> "And":
return And(~self.left, ~self.right)

def __repr__(self) -> str:
return f"Or({repr(self.left)}, {repr(self.right)})"

def __str__(self) -> str:
return f"({self.left} or {self.right})"


class Not(BooleanExpression):
"""NOT operation expression - logical negation"""

def __new__(cls, child: BooleanExpression):
if child is AlwaysTrue():
return AlwaysFalse()
elif child is AlwaysFalse():
return AlwaysTrue()
elif isinstance(child, Not):
return child.child
return super().__new__(cls)

def __init__(self, child):
self.child = child

def __eq__(self, other) -> bool:
return id(self) == id(other) or (isinstance(other, Not) and self.child == other.child)

def __invert__(self) -> BooleanExpression:
return self.child

def __repr__(self) -> str:
return f"Not({repr(self.child)})"

def __str__(self) -> str:
return f"(not {self.child})"


class AlwaysTrue(BooleanExpression, Singleton):
"""TRUE expression"""

def __invert__(self) -> "AlwaysFalse":
return AlwaysFalse()

def __repr__(self) -> str:
return "AlwaysTrue()"

def __str__(self) -> str:
return "true"


class AlwaysFalse(BooleanExpression, Singleton):
"""FALSE expression"""

def __invert__(self) -> "AlwaysTrue":
return AlwaysTrue()

def __repr__(self) -> str:
return "AlwaysTrue()"

def __str__(self) -> str:
return "false"


class Accessor:
"""An accessor for a specific position in a container that implements the StructProtocol"""

Expand Down
126 changes: 126 additions & 0 deletions python/tests/expressions/test_expressions_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import pytest

from iceberg.expressions import base
from iceberg.types import Singleton


@pytest.mark.parametrize(
Expand Down Expand Up @@ -61,6 +62,131 @@ def test_raise_on_no_negation_for_operation(operation):
assert str(exc_info.value) == f"No negation defined for operation {operation}"


class TestExpressionA(base.BooleanExpression, Singleton):
def __invert__(self):
return TestExpressionB()

def __repr__(self):
return "TestExpressionA()"

def __str__(self):
return "testexpra"


class TestExpressionB(base.BooleanExpression, Singleton):
def __invert__(self):
return TestExpressionA()

def __repr__(self):
return "TestExpressionB()"

def __str__(self):
return "testexprb"


@pytest.mark.parametrize(
"op, rep",
[
(
base.And(TestExpressionA(), TestExpressionB()),
"And(TestExpressionA(), TestExpressionB())",
),
(
base.Or(TestExpressionA(), TestExpressionB()),
"Or(TestExpressionA(), TestExpressionB())",
),
(base.Not(TestExpressionA()), "Not(TestExpressionA())"),
],
)
def test_reprs(op, rep):
assert repr(op) == rep


@pytest.mark.parametrize(
"op, string",
[
(base.And(TestExpressionA(), TestExpressionB()), "(testexpra and testexprb)"),
(base.Or(TestExpressionA(), TestExpressionB()), "(testexpra or testexprb)"),
(base.Not(TestExpressionA()), "(not testexpra)"),
],
)
def test_strs(op, string):
assert str(op) == string


@pytest.mark.parametrize(
"input, testexpra, testexprb",
[
(
base.And(TestExpressionA(), TestExpressionB()),
base.And(TestExpressionA(), TestExpressionB()),
base.Or(TestExpressionA(), TestExpressionB()),
),
(
base.Or(TestExpressionA(), TestExpressionB()),
base.Or(TestExpressionA(), TestExpressionB()),
base.And(TestExpressionA(), TestExpressionB()),
),
(base.Not(TestExpressionA()), base.Not(TestExpressionA()), TestExpressionB()),
(TestExpressionA(), TestExpressionA(), TestExpressionB()),
(TestExpressionB(), TestExpressionB(), TestExpressionA()),
],
)
def test_eq(input, testexpra, testexprb):
assert input == testexpra and input != testexprb


@pytest.mark.parametrize(
"input, exp",
[
(
base.And(TestExpressionA(), TestExpressionB()),
base.Or(TestExpressionB(), TestExpressionA()),
),
(
base.Or(TestExpressionA(), TestExpressionB()),
base.And(TestExpressionB(), TestExpressionA()),
),
(base.Not(TestExpressionA()), TestExpressionA()),
(TestExpressionA(), TestExpressionB()),
],
)
def test_negate(input, exp):
assert ~input == exp


@pytest.mark.parametrize(
"input, exp",
[
(
base.And(TestExpressionA(), TestExpressionB(), TestExpressionA()),
base.And(base.And(TestExpressionA(), TestExpressionB()), TestExpressionA()),
),
(
base.Or(TestExpressionA(), TestExpressionB(), TestExpressionA()),
base.Or(base.Or(TestExpressionA(), TestExpressionB()), TestExpressionA()),
),
(base.Not(base.Not(TestExpressionA())), TestExpressionA()),
],
)
def test_reduce(input, exp):
assert input == exp


@pytest.mark.parametrize(
"input, exp",
[
(base.And(base.AlwaysTrue(), TestExpressionB()), TestExpressionB()),
(base.And(base.AlwaysFalse(), TestExpressionB()), base.AlwaysFalse()),
(base.Or(base.AlwaysTrue(), TestExpressionB()), base.AlwaysTrue()),
(base.Or(base.AlwaysFalse(), TestExpressionB()), TestExpressionB()),
(base.Not(base.Not(TestExpressionA())), TestExpressionA()),
],
)
def test_base_AlwaysTrue_base_AlwaysFalse(input, exp):
assert input == exp


def test_accessor_base_class(foo_struct):
"""Test retrieving a value at a position of a container using an accessor"""

Expand Down

0 comments on commit 449a743

Please sign in to comment.