diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 8b35b1dc..ee6e595d 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.37] - 2024-10-10 +- Support date and datetime types in Min and Max aggregations + ## [1.5.36] - 2024-10-09 - Remove format parameter from query_offline. diff --git a/fennel/client_tests/test_date_type.py b/fennel/client_tests/test_date_type.py index babef9b7..278e1701 100644 --- a/fennel/client_tests/test_date_type.py +++ b/fennel/client_tests/test_date_type.py @@ -27,6 +27,9 @@ webhook = Webhook(name="fennel_webhook") __owner__ = "eng@fennel.ai" +default_ts = datetime.fromtimestamp(0, timezone.utc) +default_date = default_ts.date() + @source(webhook.endpoint("Transactions"), disorder="14d", cdc="append") @dataset @@ -34,6 +37,8 @@ class Transactions: user_id: int amount: Decimal[2] is_debit: bool + transaction_ts: datetime + transaction_date: date timestamp: datetime = field(timestamp=True) @@ -45,6 +50,10 @@ class DebitDataset: sum: Decimal[2] min: Decimal[2] max: Decimal[2] + earliest_transaction_ts: datetime + latest_transaction_ts: datetime + earliest_transaction_date: date + latest_transaction_date: date stddev: float median: float count: int @@ -77,6 +86,30 @@ def pipeline(cls, event: Dataset): window=Continuous("forever"), default=0.0, ), + Min( + of="transaction_ts", + into_field="earliest_transaction_ts", + window=Continuous("forever"), + default=0.0, + ), + Max( + of="transaction_ts", + into_field="latest_transaction_ts", + window=Continuous("forever"), + default=0.0, + ), + Min( + of="transaction_date", + into_field="earliest_transaction_date", + window=Continuous("forever"), + default=0.0, + ), + Max( + of="transaction_date", + into_field="latest_transaction_date", + window=Continuous("forever"), + default=0.0, + ), Stddev( of="amount", into_field="stddev", @@ -104,6 +137,18 @@ class DebitFeatures: sum: Decimal[2] = F(DebitDataset.sum, default=0.0) min: Decimal[2] = 
F(DebitDataset.min, default=0.0) max: Decimal[2] = F(DebitDataset.max, default=0.0) + earliest_transaction_ts: datetime = F( + DebitDataset.earliest_transaction_ts, default=default_ts + ) + latest_transaction_ts: datetime = F( + DebitDataset.latest_transaction_ts, default=default_ts + ) + earliest_transaction_date: date = F( + DebitDataset.earliest_transaction_date, default=default_date + ) + latest_transaction_date: date = F( + DebitDataset.latest_transaction_date, default=default_date + ) stddev: float = F(DebitDataset.stddev, default=0.0) median: float = F(DebitDataset.median, default=0.0) @@ -119,8 +164,12 @@ def test_date_type(client): ) assert response.status_code == requests.codes.OK, response.json() - now = datetime.now(timezone.utc) + # microseconds are dropped when df is converted to json before + # passing to backend + now = datetime.now(timezone.utc).replace(microsecond=0) now_l1d = now - timedelta(days=1) + now_date = now.date() + now_l1d_date = now_l1d.date() df = pd.DataFrame( { "user_id": [1, 1, 2, 2, 1], @@ -132,6 +181,14 @@ def test_date_type(client): 1100.10, ], "is_debit": [True, False, True, True, False], + "transaction_ts": [now, now_l1d, now_l1d, now, now], + "transaction_date": [ + now_date, + now_l1d_date, + now_l1d_date, + now_date, + now_date, + ], "timestamp": [now, now_l1d, now_l1d, now_l1d, now], } ) @@ -147,6 +204,10 @@ def test_date_type(client): DebitFeatures.count, DebitFeatures.max, DebitFeatures.min, + DebitFeatures.earliest_transaction_ts, + DebitFeatures.latest_transaction_ts, + DebitFeatures.earliest_transaction_date, + DebitFeatures.latest_transaction_date, DebitFeatures.stddev, DebitFeatures.median, DebitFeatures.sum, @@ -156,16 +217,16 @@ def test_date_type(client): { "DebitFeatures.user_id": [1, 1, 2, 2, 3], "DebitFeatures.txn_date": [ - str(now.date()), - str(now_l1d.date()), - str(now.date()), - str(now_l1d.date()), - str(now.date()), + str(now_date), + str(now_l1d_date), + str(now_date), + str(now_l1d_date), + 
str(now_date), ], }, ), ) - assert df.shape == (5, 7) + assert df.shape == (5, 11) assert df["DebitFeatures.count"].tolist() == [1, 0, 0, 2, 0] assert df["DebitFeatures.avg"].tolist() == [ pytest.approx(1200.1), @@ -188,6 +249,34 @@ def test_date_type(client): PythonDecimal("1400.10"), PythonDecimal("0.00"), ] + assert df["DebitFeatures.earliest_transaction_ts"].tolist() == [ + now, + default_ts, + default_ts, + now_l1d, + default_ts, + ] + assert df["DebitFeatures.latest_transaction_ts"].tolist() == [ + now, + default_ts, + default_ts, + now, + default_ts, + ] + assert df["DebitFeatures.earliest_transaction_date"].tolist() == [ + now_date, + default_date, + default_date, + now_l1d_date, + default_date, + ] + assert df["DebitFeatures.latest_transaction_date"].tolist() == [ + now_date, + default_date, + default_date, + now_date, + default_date, + ] assert df["DebitFeatures.sum"].tolist() == [ PythonDecimal("1200.10"), PythonDecimal("0.00"), diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index 7dd6c882..058ccb53 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -2773,32 +2773,36 @@ def visitAggregate(self, obj) -> DSSchema: values[agg.into_field] = List[list_type] # type: ignore elif isinstance(agg, Min): dtype = input_schema.get_type(agg.of) - if ( - get_primitive_dtype_with_optional(dtype) - not in primitive_numeric_types - ): + primitive_dtype = get_primitive_dtype_with_optional(dtype) + allowed_types = primitive_numeric_types + [ + datetime.date, + datetime.datetime, + ] + if primitive_dtype not in allowed_types: raise TypeError( - f"invalid min: type of field `{agg.of}` is not int or float" + f"invalid min: type of field `{agg.of}` is not int, float, date or datetime" ) - if get_primitive_dtype_with_optional( - dtype - ) == pd.Int64Dtype and (int(agg.default) != agg.default): + if primitive_dtype == pd.Int64Dtype and ( + int(agg.default) != agg.default + ): raise TypeError( f"invalid min: default value 
`{agg.default}` not of type `int`" ) values[agg.into_field] = fennel_get_optional_inner(dtype) # type: ignore elif isinstance(agg, Max): dtype = input_schema.get_type(agg.of) - if ( - get_primitive_dtype_with_optional(dtype) - not in primitive_numeric_types - ): + primitive_dtype = get_primitive_dtype_with_optional(dtype) + allowed_types = primitive_numeric_types + [ + datetime.date, + datetime.datetime, + ] + if primitive_dtype not in allowed_types: raise TypeError( - f"invalid max: type of field `{agg.of}` is not int or float" + f"invalid max: type of field `{agg.of}` is not int, float, date or datetime" ) - if get_primitive_dtype_with_optional( - dtype - ) == pd.Int64Dtype and (int(agg.default) != agg.default): + if primitive_dtype == pd.Int64Dtype and ( + int(agg.default) != agg.default + ): raise TypeError( f"invalid max: default value `{agg.default}` not of type `int`" ) diff --git a/fennel/datasets/test_schema_validator.py b/fennel/datasets/test_schema_validator.py index 240fa238..cf10497e 100644 --- a/fennel/datasets/test_schema_validator.py +++ b/fennel/datasets/test_schema_validator.py @@ -459,7 +459,8 @@ def pipeline(cls, a: Dataset): ) assert ( - str(e.value) == """invalid max: type of field `b` is not int or float""" + str(e.value) + == """invalid max: type of field `b` is not int, float, date or datetime""" ) diff --git a/fennel/testing/execute_aggregation.py b/fennel/testing/execute_aggregation.py index 4d72a420..a15dce7f 100644 --- a/fennel/testing/execute_aggregation.py +++ b/fennel/testing/execute_aggregation.py @@ -229,24 +229,67 @@ def get_val(self): return list(self.vals[: self.k]) +class MinHeapObj(object): + def __init__(self, val): + self.val = val + + def __lt__(self, other): + return self.val < other.val + + def __eq__(self, other): + return self.val == other.val + + def __str__(self): + return str(self.val) + + def __hash__(self): + return hash(self.val) + + +class MaxHeapObj(object): + def __init__(self, val): + self.val = val + + def 
__lt__(self, other): + return self.val > other.val + + def __eq__(self, other): + return self.val == other.val + + def __str__(self): + return str(self.val) + + def __hash__(self): + return hash(self.val) + + class Heap: def __init__(self, heap_type="min"): self.elements = [] - self.heap_type = 1 if heap_type == "min" else -1 + self.heap_type = heap_type self.del_elements = set() def __len__(self): return len(self.elements) + def _get_heap_obj(self, element): + if self.heap_type == "min": + return MinHeapObj(element) + else: + return MaxHeapObj(element) + + def _get_orig_obj(self, heap_obj): + return heap_obj.val + def push(self, element): - element = self.heap_type * element + element = self._get_heap_obj(element) if element in self.del_elements: self.del_elements.remove(element) else: heapq.heappush(self.elements, element) def remove(self, element): - element = self.heap_type * element + element = self._get_heap_obj(element) if element in self.del_elements: return self.del_elements.add(element) @@ -257,7 +300,7 @@ def top(self): heapq.heappop(self.elements) if len(self.elements) == 0: return None - return self.elements[0] * self.heap_type + return self._get_orig_obj(self.elements[0]) class MinState(AggState): diff --git a/pyproject.toml b/pyproject.toml index 93b0450e..1e6862c4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.36" +version = "1.5.37" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }]