Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support date and datetime types in Min and Max aggregations #560

Merged
merged 5 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.37] - 2024-10-10
- Support date and datetime types in Min and Max aggregations

## [1.5.36] - 2024-10-09
- Remove format parameter from query_offline.

Expand Down
103 changes: 96 additions & 7 deletions fennel/client_tests/test_date_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@
webhook = Webhook(name="fennel_webhook")
__owner__ = "eng@fennel.ai"

default_ts = datetime.fromtimestamp(0, timezone.utc)
default_date = default_ts.date()


@source(webhook.endpoint("Transactions"), disorder="14d", cdc="append")
@dataset
class Transactions:
user_id: int
amount: Decimal[2]
is_debit: bool
transaction_ts: datetime
transaction_date: date
timestamp: datetime = field(timestamp=True)


Expand All @@ -45,6 +50,10 @@ class DebitDataset:
sum: Decimal[2]
min: Decimal[2]
max: Decimal[2]
earliest_transaction_ts: datetime
latest_transaction_ts: datetime
earliest_transaction_date: date
latest_transaction_date: date
stddev: float
median: float
count: int
Expand Down Expand Up @@ -77,6 +86,30 @@ def pipeline(cls, event: Dataset):
window=Continuous("forever"),
default=0.0,
),
Min(
of="transaction_ts",
into_field="earliest_transaction_ts",
window=Continuous("forever"),
default=0.0,
),
Max(
of="transaction_ts",
into_field="latest_transaction_ts",
window=Continuous("forever"),
default=0.0,
),
Min(
of="transaction_date",
into_field="earliest_transaction_date",
window=Continuous("forever"),
default=0.0,
),
Max(
of="transaction_date",
into_field="latest_transaction_date",
window=Continuous("forever"),
default=0.0,
),
Stddev(
of="amount",
into_field="stddev",
Expand Down Expand Up @@ -104,6 +137,18 @@ class DebitFeatures:
sum: Decimal[2] = F(DebitDataset.sum, default=0.0)
min: Decimal[2] = F(DebitDataset.min, default=0.0)
max: Decimal[2] = F(DebitDataset.max, default=0.0)
earliest_transaction_ts: datetime = F(
DebitDataset.earliest_transaction_ts, default=default_ts
)
latest_transaction_ts: datetime = F(
DebitDataset.latest_transaction_ts, default=default_ts
)
earliest_transaction_date: date = F(
DebitDataset.earliest_transaction_date, default=default_date
)
latest_transaction_date: date = F(
DebitDataset.latest_transaction_date, default=default_date
)
stddev: float = F(DebitDataset.stddev, default=0.0)
median: float = F(DebitDataset.median, default=0.0)

Expand All @@ -119,8 +164,12 @@ def test_date_type(client):
)
assert response.status_code == requests.codes.OK, response.json()

now = datetime.now(timezone.utc)
# microseconds are dropped when df is converted to json before
# passing to backend
now = datetime.now(timezone.utc).replace(microsecond=0)
now_l1d = now - timedelta(days=1)
now_date = now.date()
now_l1d_date = now_l1d.date()
df = pd.DataFrame(
{
"user_id": [1, 1, 2, 2, 1],
Expand All @@ -132,6 +181,14 @@ def test_date_type(client):
1100.10,
],
"is_debit": [True, False, True, True, False],
"transaction_ts": [now, now_l1d, now_l1d, now, now],
"transaction_date": [
now_date,
now_l1d_date,
now_l1d_date,
now_date,
now_date,
],
"timestamp": [now, now_l1d, now_l1d, now_l1d, now],
}
)
Expand All @@ -147,6 +204,10 @@ def test_date_type(client):
DebitFeatures.count,
DebitFeatures.max,
DebitFeatures.min,
DebitFeatures.earliest_transaction_ts,
DebitFeatures.latest_transaction_ts,
DebitFeatures.earliest_transaction_date,
DebitFeatures.latest_transaction_date,
DebitFeatures.stddev,
DebitFeatures.median,
DebitFeatures.sum,
Expand All @@ -156,16 +217,16 @@ def test_date_type(client):
{
"DebitFeatures.user_id": [1, 1, 2, 2, 3],
"DebitFeatures.txn_date": [
str(now.date()),
str(now_l1d.date()),
str(now.date()),
str(now_l1d.date()),
str(now.date()),
str(now_date),
str(now_l1d_date),
str(now_date),
str(now_l1d_date),
str(now_date),
],
},
),
)
assert df.shape == (5, 7)
assert df.shape == (5, 11)
assert df["DebitFeatures.count"].tolist() == [1, 0, 0, 2, 0]
assert df["DebitFeatures.avg"].tolist() == [
pytest.approx(1200.1),
Expand All @@ -188,6 +249,34 @@ def test_date_type(client):
PythonDecimal("1400.10"),
PythonDecimal("0.00"),
]
assert df["DebitFeatures.earliest_transaction_ts"].tolist() == [
now,
default_ts,
default_ts,
now_l1d,
default_ts,
]
assert df["DebitFeatures.latest_transaction_ts"].tolist() == [
now,
default_ts,
default_ts,
now,
default_ts,
]
assert df["DebitFeatures.earliest_transaction_date"].tolist() == [
now_date,
default_date,
default_date,
now_l1d_date,
default_date,
]
assert df["DebitFeatures.latest_transaction_date"].tolist() == [
now_date,
default_date,
default_date,
now_date,
default_date,
]
assert df["DebitFeatures.sum"].tolist() == [
PythonDecimal("1200.10"),
PythonDecimal("0.00"),
Expand Down
36 changes: 20 additions & 16 deletions fennel/datasets/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2773,32 +2773,36 @@ def visitAggregate(self, obj) -> DSSchema:
values[agg.into_field] = List[list_type] # type: ignore
elif isinstance(agg, Min):
dtype = input_schema.get_type(agg.of)
if (
get_primitive_dtype_with_optional(dtype)
not in primitive_numeric_types
):
primtive_dtype = get_primitive_dtype_with_optional(dtype)
allowed_types = primitive_numeric_types + [
datetime.date,
datetime.datetime,
]
if primtive_dtype not in allowed_types:
raise TypeError(
f"invalid min: type of field `{agg.of}` is not int or float"
f"invalid min: type of field `{agg.of}` is not int, float, date or datetime"
)
if get_primitive_dtype_with_optional(
dtype
) == pd.Int64Dtype and (int(agg.default) != agg.default):
if primtive_dtype == pd.Int64Dtype and (
int(agg.default) != agg.default
):
raise TypeError(
f"invalid min: default value `{agg.default}` not of type `int`"
)
values[agg.into_field] = fennel_get_optional_inner(dtype) # type: ignore
elif isinstance(agg, Max):
dtype = input_schema.get_type(agg.of)
if (
get_primitive_dtype_with_optional(dtype)
not in primitive_numeric_types
):
primtive_dtype = get_primitive_dtype_with_optional(dtype)
allowed_types = primitive_numeric_types + [
datetime.date,
datetime.datetime,
]
if primtive_dtype not in allowed_types:
raise TypeError(
f"invalid max: type of field `{agg.of}` is not int or float"
f"invalid max: type of field `{agg.of}` is not int, float, date or datetime"
)
if get_primitive_dtype_with_optional(
dtype
) == pd.Int64Dtype and (int(agg.default) != agg.default):
if primtive_dtype == pd.Int64Dtype and (
int(agg.default) != agg.default
):
raise TypeError(
f"invalid max: default value `{agg.default}` not of type `int`"
)
Expand Down
3 changes: 2 additions & 1 deletion fennel/datasets/test_schema_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,8 @@ def pipeline(cls, a: Dataset):
)

assert (
str(e.value) == """invalid max: type of field `b` is not int or float"""
str(e.value)
== """invalid max: type of field `b` is not int, float, date or datetime"""
)


Expand Down
51 changes: 47 additions & 4 deletions fennel/testing/execute_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,24 +229,67 @@ def get_val(self):
return list(self.vals[: self.k])


class MinHeapObj(object):
    """Comparable wrapper ordering values ascending, for use with heapq.

    Wrapping values lets the heap hold arbitrary comparable types
    (ints, floats, dates, datetimes) behind a uniform interface.
    Equality and hashing delegate to the wrapped value so wrappers can
    be looked up in sets (e.g. a lazy-deletion set).
    """

    def __init__(self, val):
        # The wrapped value; must support <, ==, and hash().
        self.val = val

    def __lt__(self, other):
        lhs, rhs = self.val, other.val
        # Natural ascending order -> heapq acts as a min-heap.
        return lhs < rhs

    def __eq__(self, other):
        lhs, rhs = self.val, other.val
        return lhs == rhs

    def __hash__(self):
        # Hash by wrapped value, consistent with __eq__.
        return hash(self.val)

    def __str__(self):
        return str(self.val)


class MaxHeapObj(object):
    """Comparable wrapper ordering values descending, for use with heapq.

    ``__lt__`` is deliberately reversed: the "smallest" heap item holds
    the LARGEST wrapped value, so Python's min-heap pops the maximum
    first. Equality and hashing delegate to the wrapped value so
    wrappers can be looked up in sets (e.g. a lazy-deletion set).
    """

    def __init__(self, val):
        # The wrapped value; must support <, ==, and hash().
        self.val = val

    def __lt__(self, other):
        lhs, rhs = self.val, other.val
        # Reversed comparison turns heapq's min-heap into a max-heap.
        return lhs > rhs

    def __eq__(self, other):
        lhs, rhs = self.val, other.val
        return lhs == rhs

    def __hash__(self):
        # Hash by wrapped value, consistent with __eq__.
        return hash(self.val)

    def __str__(self):
        return str(self.val)


class Heap:
def __init__(self, heap_type="min"):
self.elements = []
self.heap_type = 1 if heap_type == "min" else -1
self.heap_type = heap_type
self.del_elements = set()

def __len__(self):
return len(self.elements)

def _get_heap_obj(self, element):
if self.heap_type == "min":
return MinHeapObj(element)
else:
return MaxHeapObj(element)

def _get_orig_obj(self, heap_obj):
return heap_obj.val

def push(self, element):
element = self.heap_type * element
element = self._get_heap_obj(element)
if element in self.del_elements:
self.del_elements.remove(element)
else:
heapq.heappush(self.elements, element)

def remove(self, element):
element = self.heap_type * element
element = self._get_heap_obj(element)
if element in self.del_elements:
return
self.del_elements.add(element)
Expand All @@ -257,7 +300,7 @@ def top(self):
heapq.heappop(self.elements)
if len(self.elements) == 0:
return None
return self.elements[0] * self.heap_type
return self._get_orig_obj(self.elements[0])


class MinState(AggState):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "fennel-ai"
version = "1.5.36"
version = "1.5.37"
description = "The modern realtime feature engineering platform"
authors = ["Fennel AI <developers@fennel.ai>"]
packages = [{ include = "fennel" }]
Expand Down
Loading