From b3f8283d43d91a9f2deeaab1728569044dcc4099 Mon Sep 17 00:00:00 2001 From: Satwant Rana Date: Fri, 13 Sep 2024 21:55:28 +0530 Subject: [PATCH 1/5] Support date and datetime types in Min and Max aggregations --- fennel/client_tests/test_date_type.py | 89 ++++++++++++++++++++++-- fennel/datasets/datasets.py | 30 ++++---- fennel/datasets/test_schema_validator.py | 2 +- fennel/testing/execute_aggregation.py | 33 +++++++-- 4 files changed, 127 insertions(+), 27 deletions(-) diff --git a/fennel/client_tests/test_date_type.py b/fennel/client_tests/test_date_type.py index babef9b79..a400637b9 100644 --- a/fennel/client_tests/test_date_type.py +++ b/fennel/client_tests/test_date_type.py @@ -27,6 +27,9 @@ webhook = Webhook(name="fennel_webhook") __owner__ = "eng@fennel.ai" +default_ts = datetime.fromtimestamp(0, timezone.utc) +default_date = default_ts.date() + @source(webhook.endpoint("Transactions"), disorder="14d", cdc="append") @dataset @@ -34,6 +37,8 @@ class Transactions: user_id: int amount: Decimal[2] is_debit: bool + transaction_ts: datetime + transaction_date: date timestamp: datetime = field(timestamp=True) @@ -45,6 +50,10 @@ class DebitDataset: sum: Decimal[2] min: Decimal[2] max: Decimal[2] + earliest_transaction_ts: datetime + latest_transaction_ts: datetime + earliest_transaction_date: date + latest_transaction_date: date stddev: float median: float count: int @@ -77,6 +86,30 @@ def pipeline(cls, event: Dataset): window=Continuous("forever"), default=0.0, ), + Min( + of="transaction_ts", + into_field="earliest_transaction_ts", + window=Continuous("forever"), + default=0.0, + ), + Max( + of="transaction_ts", + into_field="latest_transaction_ts", + window=Continuous("forever"), + default=0.0, + ), + Min( + of="transaction_date", + into_field="earliest_transaction_date", + window=Continuous("forever"), + default=0.0, + ), + Max( + of="transaction_date", + into_field="latest_transaction_date", + window=Continuous("forever"), + default=0.0, + ), Stddev( of="amount", into_field="stddev", @@ -104,6 +137,14 @@ class DebitFeatures: sum: Decimal[2] = F(DebitDataset.sum, default=0.0) min: Decimal[2] = F(DebitDataset.min, default=0.0) max: Decimal[2] = F(DebitDataset.max, default=0.0) + earliest_transaction_ts: datetime = F( + DebitDataset.earliest_transaction_ts, default=default_ts) + latest_transaction_ts: datetime = F( + DebitDataset.latest_transaction_ts, default=default_ts) + earliest_transaction_date: date = F( + DebitDataset.earliest_transaction_date, default=default_date) + latest_transaction_date: date = F( + DebitDataset.latest_transaction_date, default=default_date) stddev: float = F(DebitDataset.stddev, default=0.0) median: float = F(DebitDataset.median, default=0.0) @@ -121,6 +162,8 @@ def test_date_type(client): now = datetime.now(timezone.utc) now_l1d = now - timedelta(days=1) + now_date = now.date() + now_l1d_date = now_l1d.date() df = pd.DataFrame( { "user_id": [1, 1, 2, 2, 1], @@ -132,6 +175,8 @@ def test_date_type(client): 1100.10, ], "is_debit": [True, False, True, True, False], + "transaction_ts": [now, now_l1d, now_l1d, now, now], + "transaction_date": [now_date, now_l1d_date, now_l1d_date, now_date, now_date], "timestamp": [now, now_l1d, now_l1d, now_l1d, now], } ) @@ -147,6 +192,10 @@ def test_date_type(client): DebitFeatures.count, DebitFeatures.max, DebitFeatures.min, + DebitFeatures.earliest_transaction_ts, + DebitFeatures.latest_transaction_ts, + DebitFeatures.earliest_transaction_date, + DebitFeatures.latest_transaction_date, DebitFeatures.stddev, DebitFeatures.median, DebitFeatures.sum, @@ -156,16 +205,16 @@ def test_date_type(client): { "DebitFeatures.user_id": [1, 1, 2, 2, 3], "DebitFeatures.txn_date": [ - str(now.date()), - str(now_l1d.date()), - str(now.date()), - str(now_l1d.date()), - str(now.date()), + str(now_date), + str(now_l1d_date), + str(now_date), + str(now_l1d_date), + str(now_date), ], }, ), ) - assert df.shape == (5, 7) + assert df.shape == (5, 11) assert df["DebitFeatures.count"].tolist() == [1, 0, 0, 2, 0] assert df["DebitFeatures.avg"].tolist() == [ pytest.approx(1200.1), @@ -188,6 +237,34 @@ def test_date_type(client): PythonDecimal("1400.10"), PythonDecimal("0.00"), ] + assert df["DebitFeatures.earliest_transaction_ts"].tolist() == [ + now, + default_ts, + default_ts, + now_l1d, + default_ts, + ] + assert df["DebitFeatures.latest_transaction_ts"].tolist() == [ + now, + default_ts, + default_ts, + now, + default_ts, + ] + assert df["DebitFeatures.earliest_transaction_date"].tolist() == [ + now_date, + default_date, + default_date, + now_l1d_date, + default_date, + ] + assert df["DebitFeatures.latest_transaction_date"].tolist() == [ + now_date, + default_date, + default_date, + now_date, + default_date, + ] assert df["DebitFeatures.sum"].tolist() == [ PythonDecimal("1200.10"), PythonDecimal("0.00"), diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index 7dd6c882f..5e31a3b4f 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -2773,32 +2773,30 @@ def visitAggregate(self, obj) -> DSSchema: values[agg.into_field] = List[list_type] # type: ignore elif isinstance(agg, Min): dtype = input_schema.get_type(agg.of) - if ( - get_primitive_dtype_with_optional(dtype) - not in primitive_numeric_types - ): + primtive_dtype = get_primitive_dtype_with_optional(dtype) + allowed_types = primitive_numeric_types + [datetime.date, datetime.datetime] + if primtive_dtype not in allowed_types: raise TypeError( - f"invalid min: type of field `{agg.of}` is not int or float" + f"invalid min: type of field `{agg.of}` is not int, float, date or datetime" ) - if get_primitive_dtype_with_optional( - dtype - ) == pd.Int64Dtype and (int(agg.default) != agg.default): + if primtive_dtype == pd.Int64Dtype and ( + int(agg.default) != agg.default + ): raise TypeError( f"invalid min: default value `{agg.default}` not of type `int`" ) values[agg.into_field] = fennel_get_optional_inner(dtype) # type: ignore elif isinstance(agg, Max): dtype = input_schema.get_type(agg.of) - if ( - get_primitive_dtype_with_optional(dtype) - not in primitive_numeric_types - ): + primtive_dtype = get_primitive_dtype_with_optional(dtype) + allowed_types = primitive_numeric_types + [datetime.date, datetime.datetime] + if primtive_dtype not in allowed_types: raise TypeError( - f"invalid max: type of field `{agg.of}` is not int or float" + f"invalid max: type of field `{agg.of}` is not int, float, date or datetime" ) - if get_primitive_dtype_with_optional( - dtype - ) == pd.Int64Dtype and (int(agg.default) != agg.default): + if primtive_dtype == pd.Int64Dtype and ( + int(agg.default) != agg.default + ): raise TypeError( f"invalid max: default value `{agg.default}` not of type `int`" ) diff --git a/fennel/datasets/test_schema_validator.py b/fennel/datasets/test_schema_validator.py index 240fa238e..355d06b82 100644 --- a/fennel/datasets/test_schema_validator.py +++ b/fennel/datasets/test_schema_validator.py @@ -459,7 +459,7 @@ def pipeline(cls, a: Dataset): ) assert ( - str(e.value) == """invalid max: type of field `b` is not int or float""" + str(e.value) == """invalid max: type of field `b` is not int, float, date or datetime""" ) diff --git a/fennel/testing/execute_aggregation.py b/fennel/testing/execute_aggregation.py index 4d72a4209..77bda2ed7 100644 --- a/fennel/testing/execute_aggregation.py +++ b/fennel/testing/execute_aggregation.py @@ -229,24 +229,49 @@ def get_val(self): return list(self.vals[: self.k]) +class MinHeapObj(object): + def __init__(self, val): self.val = val + def __lt__(self, other): return self.val < other.val + def __eq__(self, other): return self.val == other.val + def __str__(self): return str(self.val) + def __hash__(self): return hash(self.val) + + +class MaxHeapObj(object): + def __init__(self, val): self.val = val + def __lt__(self, other): return self.val > other.val + def __eq__(self, other): return self.val == other.val + def __str__(self): return str(self.val) + def __hash__(self): return hash(self.val) + + class Heap: def __init__(self, heap_type="min"): self.elements = [] - self.heap_type = 1 if heap_type == "min" else -1 + self.heap_type = heap_type self.del_elements = set() def __len__(self): return len(self.elements) + def _get_heap_obj(self, element): + if self.heap_type == "min": + return MinHeapObj(element) + else: + return MaxHeapObj(element) + + def _get_orig_obj(self, heap_obj): + return heap_obj.val + def push(self, element): - element = self.heap_type * element + element = self._get_heap_obj(element) if element in self.del_elements: self.del_elements.remove(element) else: heapq.heappush(self.elements, element) def remove(self, element): - element = self.heap_type * element + element = self._get_heap_obj(element) if element in self.del_elements: return self.del_elements.add(element) @@ -257,7 +282,7 @@ def top(self): heapq.heappop(self.elements) if len(self.elements) == 0: return None - return self.elements[0] * self.heap_type + return self._get_orig_obj(self.elements[0]) class MinState(AggState): From 9bc98f59839c64d5539d56138883e8245729516c Mon Sep 17 00:00:00 2001 From: Satwant Rana Date: Wed, 18 Sep 2024 11:08:36 +0530 Subject: [PATCH 2/5] Fix an integration bug --- fennel/client_tests/test_date_type.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fennel/client_tests/test_date_type.py b/fennel/client_tests/test_date_type.py index a400637b9..0e2fd5b2c 100644 --- a/fennel/client_tests/test_date_type.py +++ b/fennel/client_tests/test_date_type.py @@ -160,7 +160,9 @@ def test_date_type(client): ) assert response.status_code == requests.codes.OK, response.json() - now = datetime.now(timezone.utc) + # microseconds are dropped when df is converted to json before + # passing to backend + now = datetime.now(timezone.utc).replace(microsecond=0) now_l1d = now - timedelta(days=1) now_date = now.date() now_l1d_date = now_l1d.date() From 5484de225a3a5fcf0377d5109fbe3ade96bb2c07 Mon Sep 17 00:00:00 2001 From: Satwant Rana Date: Wed, 9 Oct 2024 23:58:04 +0530 Subject: [PATCH 3/5] Address comments --- fennel/CHANGELOG.md | 3 +++ pyproject.toml | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 8b35b1dc7..ee6e595db 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.37] - 2024-10-10 +- Support date and datetime types in Min and Max aggregations + ## [1.5.36] - 2024-10-09 - Remove format parameter from query_offline. diff --git a/pyproject.toml b/pyproject.toml index 93b0450e2..1e6862c43 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.36" +version = "1.5.37" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }] From f0e64354fc81f89e1bad618597983010fef7df17 Mon Sep 17 00:00:00 2001 From: Satwant Rana Date: Thu, 10 Oct 2024 00:13:32 +0530 Subject: [PATCH 4/5] Fix linter errors --- fennel/testing/execute_aggregation.py | 38 ++++++++++++++++++++------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/fennel/testing/execute_aggregation.py b/fennel/testing/execute_aggregation.py index 77bda2ed7..a15dce7f8 100644 --- a/fennel/testing/execute_aggregation.py +++ b/fennel/testing/execute_aggregation.py @@ -230,19 +230,37 @@ def get_val(self): class MinHeapObj(object): - def __init__(self, val): self.val = val - def __lt__(self, other): return self.val < other.val - def __eq__(self, other): return self.val == other.val - def __str__(self): return str(self.val) - def __hash__(self): return hash(self.val) + def __init__(self, val): + self.val = val + + def __lt__(self, other): + return self.val < other.val + + def __eq__(self, other): + return self.val == other.val + + def __str__(self): + return str(self.val) + + def __hash__(self): + return hash(self.val) class MaxHeapObj(object): - def __init__(self, val): self.val = val - def __lt__(self, other): return self.val > other.val - def __eq__(self, other): return self.val == other.val - def __str__(self): return str(self.val) - def __hash__(self): return hash(self.val) + def __init__(self, val): + self.val = val + + def __lt__(self, other): + return self.val > other.val + + def __eq__(self, other): + return self.val == other.val + + def __str__(self): + return str(self.val) + + def __hash__(self): + return hash(self.val) class Heap: From fdb4d0f91e63d874174ca8519b22ba46057b1a95 Mon Sep 17 00:00:00 2001 From: Satwant Rana Date: Thu, 10 Oct 2024 00:22:22 +0530 Subject: [PATCH 5/5] Fix more linter errors --- fennel/client_tests/test_date_type.py | 20 +++++++++++++++----- fennel/datasets/datasets.py | 10 ++++++++-- fennel/datasets/test_schema_validator.py | 3 ++- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/fennel/client_tests/test_date_type.py b/fennel/client_tests/test_date_type.py index 0e2fd5b2c..278e17012 100644 --- a/fennel/client_tests/test_date_type.py +++ b/fennel/client_tests/test_date_type.py @@ -138,13 +138,17 @@ class DebitFeatures: min: Decimal[2] = F(DebitDataset.min, default=0.0) max: Decimal[2] = F(DebitDataset.max, default=0.0) earliest_transaction_ts: datetime = F( - DebitDataset.earliest_transaction_ts, default=default_ts) + DebitDataset.earliest_transaction_ts, default=default_ts + ) latest_transaction_ts: datetime = F( - DebitDataset.latest_transaction_ts, default=default_ts) + DebitDataset.latest_transaction_ts, default=default_ts + ) earliest_transaction_date: date = F( - DebitDataset.earliest_transaction_date, default=default_date) + DebitDataset.earliest_transaction_date, default=default_date + ) latest_transaction_date: date = F( - DebitDataset.latest_transaction_date, default=default_date) + DebitDataset.latest_transaction_date, default=default_date + ) stddev: float = F(DebitDataset.stddev, default=0.0) median: float = F(DebitDataset.median, default=0.0) @@ -178,7 +182,13 @@ def test_date_type(client): ], "is_debit": [True, False, True, True, False], "transaction_ts": [now, now_l1d, now_l1d, now, now], - "transaction_date": [now_date, now_l1d_date, now_l1d_date, now_date, now_date], + "transaction_date": [ + now_date, + now_l1d_date, + now_l1d_date, + now_date, + now_date, + ], "timestamp": [now, now_l1d, now_l1d, now_l1d, now], } ) diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index 5e31a3b4f..058ccb53e 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -2774,7 +2774,10 @@ def visitAggregate(self, obj) -> DSSchema: elif isinstance(agg, Min): dtype = input_schema.get_type(agg.of) primtive_dtype = get_primitive_dtype_with_optional(dtype) - allowed_types = primitive_numeric_types + [datetime.date, datetime.datetime] + allowed_types = primitive_numeric_types + [ + datetime.date, + datetime.datetime, + ] if primtive_dtype not in allowed_types: raise TypeError( f"invalid min: type of field `{agg.of}` is not int, float, date or datetime" @@ -2789,7 +2792,10 @@ def visitAggregate(self, obj) -> DSSchema: elif isinstance(agg, Max): dtype = input_schema.get_type(agg.of) primtive_dtype = get_primitive_dtype_with_optional(dtype) - allowed_types = primitive_numeric_types + [datetime.date, datetime.datetime] + allowed_types = primitive_numeric_types + [ + datetime.date, + datetime.datetime, + ] if primtive_dtype not in allowed_types: raise TypeError( f"invalid max: type of field `{agg.of}` is not int, float, date or datetime" diff --git a/fennel/datasets/test_schema_validator.py b/fennel/datasets/test_schema_validator.py index 355d06b82..cf10497ec 100644 --- a/fennel/datasets/test_schema_validator.py +++ b/fennel/datasets/test_schema_validator.py @@ -459,7 +459,8 @@ def pipeline(cls, a: Dataset): ) assert ( - str(e.value) == """invalid max: type of field `b` is not int, float, date or datetime""" + str(e.value) + == """invalid max: type of field `b` is not int, float, date or datetime""" )