From e7d9900687f2b78d024d0c713d900a224fd4c9bb Mon Sep 17 00:00:00 2001 From: Satwant Rana <4613501+satrana42@users.noreply.github.com> Date: Fri, 27 Sep 2024 22:09:36 +0530 Subject: [PATCH] Add dropnull to FirstK (#556) Add dropnull to FirstK aggregation to filter out None values from Optional input. --- .../api-reference/aggregations/firstk.py | 2 + fennel/CHANGELOG.md | 3 + fennel/client_tests/test_dataset.py | 105 ++++++++++++++++++ fennel/client_tests/test_social_network.py | 1 + fennel/client_tests/test_struct_type.py | 1 + fennel/datasets/aggregate.py | 2 + fennel/datasets/datasets.py | 12 ++ fennel/datasets/test_dataset.py | 1 + fennel/gen/spec_pb2.py | 28 ++--- fennel/gen/spec_pb2.pyi | 5 +- fennel/testing/execute_aggregation.py | 13 ++- fennel/testing/test_execute_aggregation.py | 38 ++++++- pyproject.toml | 2 +- 13 files changed, 193 insertions(+), 20 deletions(-) diff --git a/docs/examples/api-reference/aggregations/firstk.py b/docs/examples/api-reference/aggregations/firstk.py index 05cb1cd5b..9cd20a326 100644 --- a/docs/examples/api-reference/aggregations/firstk.py +++ b/docs/examples/api-reference/aggregations/firstk.py @@ -42,6 +42,7 @@ def firstk_pipeline(cls, ds: Dataset): limit=10, dedup=False, window=Continuous("1d"), + dropnull=False, ), # docsnip-highlight end ) @@ -143,6 +144,7 @@ def bad_pipeline(cls, ds: Dataset): limit=10, dedup=False, window=Continuous("1d"), + dropnull=False, ), # docsnip-highlight end ) diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 2a1e346d7..8d04212cf 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.32] - 2024-09-27 +- Add dropnull to FirstK + ## [1.5.31] - 2024-09-26 - Fix bug in signature of assign operator. diff --git a/fennel/client_tests/test_dataset.py b/fennel/client_tests/test_dataset.py index 17cd04859..600a1b9ce 100644 --- a/fennel/client_tests/test_dataset.py +++ b/fennel/client_tests/test_dataset.py @@ -19,6 +19,7 @@ pipeline, Dataset, LastK, + FirstK, Min, Max, Sum, @@ -4514,6 +4515,110 @@ def pipeline(cls, event: Dataset): ] +@pytest.mark.integration +@mock +def test_firstk_null(client): + @dataset + @source(webhook.endpoint("A"), disorder="14d", cdc="append") + class A: + id: int + value: Optional[int] + ts: datetime + + @dataset(index=True) + class B: + id: int = field(key=True) + value: List[Optional[int]] + ts: datetime + + @pipeline + @inputs(A) + def pipeline(cls, event: Dataset): + return event.groupby("id").aggregate( + value=FirstK( + of="value", + window=Continuous("forever"), + dedup=False, + limit=2, + dropnull=False, + ) + ) + + client.commit(datasets=[A, B], message="test") + + now = datetime.now(timezone.utc) + yesterday = now - timedelta(days=1) + day_before_yesterday = now - timedelta(days=2) + df = pd.DataFrame( + { + "id": [1, 1, 2, 2, 3, 3], + "value": [None, 1, None, 2, None, 3], + "ts": [yesterday, now, now, day_before_yesterday, now, yesterday], + } + ) + df["value"] = df["value"].astype("Int64") + client.log("fennel_webhook", "A", df) + client.sleep(30) + + results, _ = client.lookup( + B, + keys=pd.DataFrame({"id": [1, 2, 3]}), + ) + assert results["value"].tolist() == [[pd.NA, 1], [2, pd.NA], [3, pd.NA]] + + +@pytest.mark.integration +@mock +def test_firstk_dropnull(client): + @dataset + @source(webhook.endpoint("A"), disorder="14d", cdc="append") + class A: + id: int + value: Optional[int] + ts: datetime + + @dataset(index=True) + class B: + id: int = field(key=True) + value: List[int] + ts: datetime + + @pipeline + @inputs(A) + def pipeline(cls, event: Dataset): + return event.groupby("id").aggregate( + value=FirstK( + of="value", + window=Continuous("forever"), + dedup=False, + limit=2, + dropnull=True, + ) + ) + + client.commit(datasets=[A, B], message="test") + + now = datetime.now(timezone.utc) + yesterday = now - timedelta(days=1) + day_before_yesterday = now - timedelta(days=2) + df = pd.DataFrame( + { + "id": [1, 1, 2, 2, 3, 3], + "value": [None, 1, None, 2, None, 3], + "ts": [yesterday, now, now, day_before_yesterday, now, yesterday], + } + ) + df["value"] = df["value"].astype("Int64") + client.log("fennel_webhook", "A", df) + client.sleep(30) + + results, _ = client.lookup( + B, + keys=pd.DataFrame({"id": [1, 2, 3]}), + ) + assert results["value"].tolist() == [[1], [2], [3]] + + @mock def test_unkey_operator(client): @dataset diff --git a/fennel/client_tests/test_social_network.py b/fennel/client_tests/test_social_network.py index 87510744a..32e2f16d0 100644 --- a/fennel/client_tests/test_social_network.py +++ b/fennel/client_tests/test_social_network.py @@ -183,6 +183,7 @@ def first_viewed_post(cls, view_data: Dataset): window=Continuous("forever"), limit=1, dedup=False, + dropnull=False, ) ) diff --git a/fennel/client_tests/test_struct_type.py b/fennel/client_tests/test_struct_type.py index 0b9cdbe5c..a5fcd4833 100644 --- a/fennel/client_tests/test_struct_type.py +++ b/fennel/client_tests/test_struct_type.py @@ -79,6 +79,7 @@ def movie_info(cls, movie_cast: Dataset): window=Continuous("forever"), limit=3, dedup=False, + dropnull=False, ), ] ) diff --git a/fennel/datasets/aggregate.py b/fennel/datasets/aggregate.py index bef430600..4ddb3f835 100644 --- a/fennel/datasets/aggregate.py +++ b/fennel/datasets/aggregate.py @@ -284,6 +284,7 @@ class FirstK(AggregateType): of: str limit: int dedup: bool + dropnull: bool def to_proto(self): if self.window is None: @@ -295,6 +296,7 @@ def to_proto(self): of=self.of, limit=self.limit, dedup=self.dedup, + dropnull=self.dropnull, ) ) diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index 0224bc5d7..80591b120 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -688,6 +688,12 @@ def dsschema(self): values[agg.into_field] = List[list_type] # type: ignore elif isinstance(agg, FirstK): dtype = input_schema.get_type(agg.of) + if agg.dropnull: + if not fennel_is_optional(dtype): + raise TypeError( + "dropnull is only allowed for optional types" + ) + dtype = fennel_get_optional_inner(dtype) list_type = get_python_type_from_pd(dtype) values[agg.into_field] = List[list_type] # type: ignore elif isinstance(agg, Stddev): @@ -2740,6 +2746,12 @@ def visitAggregate(self, obj) -> DSSchema: values[agg.into_field] = List[list_type] # type: ignore elif isinstance(agg, FirstK): dtype = input_schema.get_type(agg.of) + if agg.dropnull: + if not fennel_is_optional(dtype): + raise TypeError( + "dropnull is only allowed for optional types" + ) + dtype = fennel_get_optional_inner(dtype) list_type = get_python_type_from_pd(dtype) values[agg.into_field] = List[list_type] # type: ignore elif isinstance(agg, Min): diff --git a/fennel/datasets/test_dataset.py b/fennel/datasets/test_dataset.py index 4fbdf328c..37f0f0684 100644 --- a/fennel/datasets/test_dataset.py +++ b/fennel/datasets/test_dataset.py @@ -290,6 +290,7 @@ def create_aggregated_dataset(cls, user_info: Dataset): dedup=True, window=Continuous("forever"), into_field=str(cls.countries), + dropnull=False, ), ] ) diff --git a/fennel/gen/spec_pb2.py b/fennel/gen/spec_pb2.py index b1a8f493e..f2a47489a 100644 --- a/fennel/gen/spec_pb2.py +++ b/fennel/gen/spec_pb2.py @@ -14,7 +14,7 @@ import fennel.gen.window_pb2 as window__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nspec.proto\x12\x11\x66\x65nnel.proto.spec\x1a\x0cwindow.proto\"\x8f\x04\n\x07PreSpec\x12%\n\x03sum\x18\x01 \x01(\x0b\x32\x16.fennel.proto.spec.SumH\x00\x12-\n\x07\x61verage\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.spec.AverageH\x00\x12)\n\x05\x63ount\x18\x03 \x01(\x0b\x32\x18.fennel.proto.spec.CountH\x00\x12*\n\x06last_k\x18\x04 \x01(\x0b\x32\x18.fennel.proto.spec.LastKH\x00\x12%\n\x03min\x18\x05 \x01(\x0b\x32\x16.fennel.proto.spec.MinH\x00\x12%\n\x03max\x18\x06 \x01(\x0b\x32\x16.fennel.proto.spec.MaxH\x00\x12+\n\x06stddev\x18\x07 \x01(\x0b\x32\x19.fennel.proto.spec.StddevH\x00\x12/\n\x08\x64istinct\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.spec.DistinctH\x00\x12/\n\x08quantile\x18\t \x01(\x0b\x32\x1b.fennel.proto.spec.QuantileH\x00\x12\x41\n\texp_decay\x18\n \x01(\x0b\x32,.fennel.proto.spec.ExponentialDecayAggregateH\x00\x12,\n\x07\x66irst_k\x18\x0b \x01(\x0b\x32\x19.fennel.proto.spec.FirstKH\x00\x42\t\n\x07variant\"L\n\x03Sum\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"a\n\x07\x41verage\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"\x80\x01\n\x05\x43ount\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x06window\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0e\n\x06unique\x18\x03 \x01(\x08\x12\x0e\n\x06\x61pprox\x18\x04 \x01(\x08\x12\n\n\x02of\x18\x05 \x01(\t\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"~\n\x05LastK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"m\n\x06\x46irstK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"]\n\x03Min\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"]\n\x03Max\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"`\n\x06Stddev\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"c\n\x08\x44istinct\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x04 \x01(\x08\"\x95\x01\n\x08Quantile\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x14\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01H\x00\x88\x01\x01\x12\x10\n\x08quantile\x18\x05 \x01(\x01\x12\x0e\n\x06\x61pprox\x18\x06 \x01(\x08\x42\n\n\x08_default\"}\n\x19\x45xponentialDecayAggregate\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x19\n\x11half_life_seconds\x18\x04 \x01(\rb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nspec.proto\x12\x11\x66\x65nnel.proto.spec\x1a\x0cwindow.proto\"\x8f\x04\n\x07PreSpec\x12%\n\x03sum\x18\x01 \x01(\x0b\x32\x16.fennel.proto.spec.SumH\x00\x12-\n\x07\x61verage\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.spec.AverageH\x00\x12)\n\x05\x63ount\x18\x03 \x01(\x0b\x32\x18.fennel.proto.spec.CountH\x00\x12*\n\x06last_k\x18\x04 \x01(\x0b\x32\x18.fennel.proto.spec.LastKH\x00\x12%\n\x03min\x18\x05 \x01(\x0b\x32\x16.fennel.proto.spec.MinH\x00\x12%\n\x03max\x18\x06 \x01(\x0b\x32\x16.fennel.proto.spec.MaxH\x00\x12+\n\x06stddev\x18\x07 \x01(\x0b\x32\x19.fennel.proto.spec.StddevH\x00\x12/\n\x08\x64istinct\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.spec.DistinctH\x00\x12/\n\x08quantile\x18\t \x01(\x0b\x32\x1b.fennel.proto.spec.QuantileH\x00\x12\x41\n\texp_decay\x18\n \x01(\x0b\x32,.fennel.proto.spec.ExponentialDecayAggregateH\x00\x12,\n\x07\x66irst_k\x18\x0b \x01(\x0b\x32\x19.fennel.proto.spec.FirstKH\x00\x42\t\n\x07variant\"L\n\x03Sum\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"a\n\x07\x41verage\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"\x80\x01\n\x05\x43ount\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x06window\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0e\n\x06unique\x18\x03 \x01(\x08\x12\x0e\n\x06\x61pprox\x18\x04 \x01(\x08\x12\n\n\x02of\x18\x05 \x01(\t\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"~\n\x05LastK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"\x7f\n\x06\x46irstK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x06 \x01(\x08\"]\n\x03Min\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"]\n\x03Max\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"`\n\x06Stddev\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"c\n\x08\x44istinct\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x10\n\x08\x64ropnull\x18\x04 \x01(\x08\"\x95\x01\n\x08Quantile\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x14\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01H\x00\x88\x01\x01\x12\x10\n\x08quantile\x18\x05 \x01(\x01\x12\x0e\n\x06\x61pprox\x18\x06 \x01(\x08\x42\n\n\x08_default\"}\n\x19\x45xponentialDecayAggregate\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x19\n\x11half_life_seconds\x18\x04 \x01(\rb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -32,17 +32,17 @@ _globals['_LASTK']._serialized_start=885 _globals['_LASTK']._serialized_end=1011 _globals['_FIRSTK']._serialized_start=1013 - _globals['_FIRSTK']._serialized_end=1122 - _globals['_MIN']._serialized_start=1124 - _globals['_MIN']._serialized_end=1217 - _globals['_MAX']._serialized_start=1219 - _globals['_MAX']._serialized_end=1312 - _globals['_STDDEV']._serialized_start=1314 - _globals['_STDDEV']._serialized_end=1410 - _globals['_DISTINCT']._serialized_start=1412 - _globals['_DISTINCT']._serialized_end=1511 - _globals['_QUANTILE']._serialized_start=1514 - _globals['_QUANTILE']._serialized_end=1663 - _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_start=1665 - _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_end=1790 + _globals['_FIRSTK']._serialized_end=1140 + _globals['_MIN']._serialized_start=1142 + _globals['_MIN']._serialized_end=1235 + _globals['_MAX']._serialized_start=1237 + _globals['_MAX']._serialized_end=1330 + _globals['_STDDEV']._serialized_start=1332 + _globals['_STDDEV']._serialized_end=1428 + _globals['_DISTINCT']._serialized_start=1430 + _globals['_DISTINCT']._serialized_end=1529 + _globals['_QUANTILE']._serialized_start=1532 + _globals['_QUANTILE']._serialized_end=1681 + _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_start=1683 + _globals['_EXPONENTIALDECAYAGGREGATE']._serialized_end=1808 # @@protoc_insertion_point(module_scope) diff --git a/fennel/gen/spec_pb2.pyi b/fennel/gen/spec_pb2.pyi index aca068cfe..99baeba0a 100644 --- a/fennel/gen/spec_pb2.pyi +++ b/fennel/gen/spec_pb2.pyi @@ -195,12 +195,14 @@ class FirstK(google.protobuf.message.Message): LIMIT_FIELD_NUMBER: builtins.int DEDUP_FIELD_NUMBER: builtins.int WINDOW_FIELD_NUMBER: builtins.int + DROPNULL_FIELD_NUMBER: builtins.int of: builtins.str name: builtins.str limit: builtins.int dedup: builtins.bool @property def window(self) -> window_pb2.Window: ... + dropnull: builtins.bool def __init__( self, *, @@ -209,9 +211,10 @@ class FirstK(google.protobuf.message.Message): limit: builtins.int = ..., dedup: builtins.bool = ..., window: window_pb2.Window | None = ..., + dropnull: builtins.bool = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["window", b"window"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["dedup", b"dedup", "limit", b"limit", "name", b"name", "of", b"of", "window", b"window"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["dedup", b"dedup", "dropnull", b"dropnull", "limit", b"limit", "name", b"name", "of", b"of", "window", b"window"]) -> None: ... global___FirstK = FirstK diff --git a/fennel/testing/execute_aggregation.py b/fennel/testing/execute_aggregation.py index 01de14972..4d72a4209 100644 --- a/fennel/testing/execute_aggregation.py +++ b/fennel/testing/execute_aggregation.py @@ -186,12 +186,19 @@ def get_val(self): class FirstKState(AggState): - def __init__(self, k, dedup): + def __init__(self, k, dedup, dropnull): self.k = k self.dedeup = dedup + self.dropnull = dropnull self.vals = [] + def is_null(self, val): + return (val is None) or pd.isnull(val) + def add_val_to_state(self, val, _ts): + if self.dropnull and self.is_null(val): + return list(self.vals[: self.k]) + self.vals.append(val) if not self.dedeup: return list(self.vals[: self.k]) @@ -848,7 +855,9 @@ def get_aggregated_df( aggregate.limit, aggregate.dedup, aggregate.dropnull ) elif isinstance(aggregate, FirstK): - state[key] = FirstKState(aggregate.limit, aggregate.dedup) + state[key] = FirstKState( + aggregate.limit, aggregate.dedup, aggregate.dropnull + ) elif isinstance(aggregate, Min): state[key] = MinState(aggregate.default) elif isinstance(aggregate, Max): diff --git a/fennel/testing/test_execute_aggregation.py b/fennel/testing/test_execute_aggregation.py index e00431bf8..13743c7f2 100644 --- a/fennel/testing/test_execute_aggregation.py +++ b/fennel/testing/test_execute_aggregation.py @@ -100,7 +100,7 @@ def test_lastk_state_dedup(): def test_firstk_state(): - state = FirstKState(k=3, dedup=False) + state = FirstKState(k=3, dedup=False, dropnull=False) now = datetime.now(timezone.utc) assert state.add_val_to_state(1, now) == [1] assert state.add_val_to_state(2, now) == [1, 2] @@ -115,7 +115,7 @@ def test_firstk_state(): def test_firstk_state_dedup(): - state = FirstKState(k=3, dedup=True) + state = FirstKState(k=3, dedup=True, dropnull=False) now = datetime.now(timezone.utc) assert state.add_val_to_state(1, now) == [1] assert state.add_val_to_state(2, now) == [1, 2] @@ -128,6 +128,40 @@ def test_firstk_state_dedup(): assert state.del_val_from_state(4, now) == [1, 2] +def test_firstk_state_null(): + state = FirstKState(k=3, dedup=False, dropnull=False) + now = datetime.now(timezone.utc) + assert state.add_val_to_state(1, now) == [1] + assert state.add_val_to_state(None, now) == [1, None] + assert state.add_val_to_state(2, now) == [1, None, 2] + assert state.add_val_to_state(3, now) == [1, None, 2] + assert state.add_val_to_state(None, now) == [1, None, 2] + assert state.add_val_to_state(4, now) == [1, None, 2] + + assert state.del_val_from_state(2, now) == [1, None, 3] + assert state.del_val_from_state(3, now) == [1, None, None] + assert state.del_val_from_state(None, now) == [1, None, 4] + assert state.del_val_from_state(4, now) == [1, None] + assert state.del_val_from_state(None, now) == [1] + + +def test_firstk_state_dropnull(): + state = FirstKState(k=3, dedup=False, dropnull=True) + now = datetime.now(timezone.utc) + assert state.add_val_to_state(1, now) == [1] + assert state.add_val_to_state(None, now) == [1] + assert state.add_val_to_state(2, now) == [1, 2] + assert state.add_val_to_state(3, now) == [1, 2, 3] + assert state.add_val_to_state(4, now) == [1, 2, 3] + assert state.add_val_to_state(None, now) == [1, 2, 3] + assert state.add_val_to_state(5, now) == [1, 2, 3] + + assert state.del_val_from_state(3, now) == [1, 2, 4] + assert state.del_val_from_state(4, now) == [1, 2, 5] + assert state.del_val_from_state(5, now) == [1, 2] + assert state.del_val_from_state(5, now) == [1, 2] + + def test_min_state(): state = MinState(default=3.0) now = datetime.now(timezone.utc) diff --git a/pyproject.toml b/pyproject.toml index 122c864fe..5936c8e19 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.31" +version = "1.5.32" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }]