Skip to content

Commit

Permalink
Add dropnull to FirstK (#556)
Browse files Browse the repository at this point in the history
Add dropnull to FirstK aggregation to filter out None values from Optional input.
  • Loading branch information
satrana42 authored Sep 27, 2024
1 parent 15ec345 commit e7d9900
Show file tree
Hide file tree
Showing 13 changed files with 193 additions and 20 deletions.
2 changes: 2 additions & 0 deletions docs/examples/api-reference/aggregations/firstk.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def firstk_pipeline(cls, ds: Dataset):
limit=10,
dedup=False,
window=Continuous("1d"),
dropnull=False,
),
# docsnip-highlight end
)
Expand Down Expand Up @@ -143,6 +144,7 @@ def bad_pipeline(cls, ds: Dataset):
limit=10,
dedup=False,
window=Continuous("1d"),
dropnull=False,
),
# docsnip-highlight end
)
Expand Down
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.32] - 2024-09-27
- Add dropnull to FirstK

## [1.5.31] - 2024-09-26
- Fix bug in signature of assign operator.

Expand Down
105 changes: 105 additions & 0 deletions fennel/client_tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
pipeline,
Dataset,
LastK,
FirstK,
Min,
Max,
Sum,
Expand Down Expand Up @@ -4514,6 +4515,110 @@ def pipeline(cls, event: Dataset):
]


@pytest.mark.integration
@mock
def test_firstk_null(client):
@dataset
@source(webhook.endpoint("A"), disorder="14d", cdc="append")
class A:
id: int
value: Optional[int]
ts: datetime

@dataset(index=True)
class B:
id: int = field(key=True)
value: List[Optional[int]]
ts: datetime

@pipeline
@inputs(A)
def pipeline(cls, event: Dataset):
return event.groupby("id").aggregate(
value=FirstK(
of="value",
window=Continuous("forever"),
dedup=False,
limit=2,
dropnull=False,
)
)

client.commit(datasets=[A, B], message="test")

now = datetime.now(timezone.utc)
yesterday = now - timedelta(days=1)
day_before_yesterday = now - timedelta(days=2)
df = pd.DataFrame(
{
"id": [1, 1, 2, 2, 3, 3],
"value": [None, 1, None, 2, None, 3],
"ts": [yesterday, now, now, day_before_yesterday, now, yesterday],
}
)
df["value"] = df["value"].astype("Int64")
client.log("fennel_webhook", "A", df)
client.sleep(30)

results, _ = client.lookup(
B,
keys=pd.DataFrame({"id": [1, 2, 3]}),
)
assert results["value"].tolist() == [[pd.NA, 1], [2, pd.NA], [3, pd.NA]]


@pytest.mark.integration
@mock
def test_firstk_dropnull(client):
@dataset
@source(webhook.endpoint("A"), disorder="14d", cdc="append")
class A:
id: int
value: Optional[int]
ts: datetime

@dataset(index=True)
class B:
id: int = field(key=True)
value: List[int]
ts: datetime

@pipeline
@inputs(A)
def pipeline(cls, event: Dataset):
return event.groupby("id").aggregate(
value=FirstK(
of="value",
window=Continuous("forever"),
dedup=False,
limit=2,
dropnull=True,
)
)

client.commit(datasets=[A, B], message="test")

now = datetime.now(timezone.utc)
yesterday = now - timedelta(days=1)
day_before_yesterday = now - timedelta(days=2)
df = pd.DataFrame(
{
"id": [1, 1, 2, 2, 3, 3],
"value": [None, 1, None, 2, None, 3],
"ts": [yesterday, now, now, day_before_yesterday, now, yesterday],
}
)
df["value"] = df["value"].astype("Int64")
client.log("fennel_webhook", "A", df)
client.sleep(30)

results, _ = client.lookup(
B,
keys=pd.DataFrame({"id": [1, 2, 3]}),
)
assert results["value"].tolist() == [[1], [2], [3]]


@mock
def test_unkey_operator(client):
@dataset
Expand Down
1 change: 1 addition & 0 deletions fennel/client_tests/test_social_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def first_viewed_post(cls, view_data: Dataset):
window=Continuous("forever"),
limit=1,
dedup=False,
dropnull=False,
)
)

Expand Down
1 change: 1 addition & 0 deletions fennel/client_tests/test_struct_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def movie_info(cls, movie_cast: Dataset):
window=Continuous("forever"),
limit=3,
dedup=False,
dropnull=False,
),
]
)
Expand Down
2 changes: 2 additions & 0 deletions fennel/datasets/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ class FirstK(AggregateType):
of: str
limit: int
dedup: bool
dropnull: bool

def to_proto(self):
if self.window is None:
Expand All @@ -295,6 +296,7 @@ def to_proto(self):
of=self.of,
limit=self.limit,
dedup=self.dedup,
dropnull=self.dropnull,
)
)

Expand Down
12 changes: 12 additions & 0 deletions fennel/datasets/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,12 @@ def dsschema(self):
values[agg.into_field] = List[list_type] # type: ignore
elif isinstance(agg, FirstK):
dtype = input_schema.get_type(agg.of)
if agg.dropnull:
if not fennel_is_optional(dtype):
raise TypeError(
"dropnull is only allowed for optional types"
)
dtype = fennel_get_optional_inner(dtype)
list_type = get_python_type_from_pd(dtype)
values[agg.into_field] = List[list_type] # type: ignore
elif isinstance(agg, Stddev):
Expand Down Expand Up @@ -2740,6 +2746,12 @@ def visitAggregate(self, obj) -> DSSchema:
values[agg.into_field] = List[list_type] # type: ignore
elif isinstance(agg, FirstK):
dtype = input_schema.get_type(agg.of)
if agg.dropnull:
if not fennel_is_optional(dtype):
raise TypeError(
"dropnull is only allowed for optional types"
)
dtype = fennel_get_optional_inner(dtype)
list_type = get_python_type_from_pd(dtype)
values[agg.into_field] = List[list_type] # type: ignore
elif isinstance(agg, Min):
Expand Down
1 change: 1 addition & 0 deletions fennel/datasets/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ def create_aggregated_dataset(cls, user_info: Dataset):
dedup=True,
window=Continuous("forever"),
into_field=str(cls.countries),
dropnull=False,
),
]
)
Expand Down
28 changes: 14 additions & 14 deletions fennel/gen/spec_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion fennel/gen/spec_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,14 @@ class FirstK(google.protobuf.message.Message):
LIMIT_FIELD_NUMBER: builtins.int
DEDUP_FIELD_NUMBER: builtins.int
WINDOW_FIELD_NUMBER: builtins.int
DROPNULL_FIELD_NUMBER: builtins.int
of: builtins.str
name: builtins.str
limit: builtins.int
dedup: builtins.bool
@property
def window(self) -> window_pb2.Window: ...
dropnull: builtins.bool
def __init__(
self,
*,
Expand All @@ -209,9 +211,10 @@ class FirstK(google.protobuf.message.Message):
limit: builtins.int = ...,
dedup: builtins.bool = ...,
window: window_pb2.Window | None = ...,
dropnull: builtins.bool = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["window", b"window"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["dedup", b"dedup", "limit", b"limit", "name", b"name", "of", b"of", "window", b"window"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["dedup", b"dedup", "dropnull", b"dropnull", "limit", b"limit", "name", b"name", "of", b"of", "window", b"window"]) -> None: ...

global___FirstK = FirstK

Expand Down
13 changes: 11 additions & 2 deletions fennel/testing/execute_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,19 @@ def get_val(self):


class FirstKState(AggState):
def __init__(self, k, dedup):
def __init__(self, k, dedup, dropnull):
self.k = k
self.dedeup = dedup
self.dropnull = dropnull
self.vals = []

def is_null(self, val):
return (val is None) or pd.isnull(val)

def add_val_to_state(self, val, _ts):
if self.dropnull and self.is_null(val):
return list(self.vals[: self.k])

self.vals.append(val)
if not self.dedeup:
return list(self.vals[: self.k])
Expand Down Expand Up @@ -848,7 +855,9 @@ def get_aggregated_df(
aggregate.limit, aggregate.dedup, aggregate.dropnull
)
elif isinstance(aggregate, FirstK):
state[key] = FirstKState(aggregate.limit, aggregate.dedup)
state[key] = FirstKState(
aggregate.limit, aggregate.dedup, aggregate.dropnull
)
elif isinstance(aggregate, Min):
state[key] = MinState(aggregate.default)
elif isinstance(aggregate, Max):
Expand Down
38 changes: 36 additions & 2 deletions fennel/testing/test_execute_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def test_lastk_state_dedup():


def test_firstk_state():
state = FirstKState(k=3, dedup=False)
state = FirstKState(k=3, dedup=False, dropnull=False)
now = datetime.now(timezone.utc)
assert state.add_val_to_state(1, now) == [1]
assert state.add_val_to_state(2, now) == [1, 2]
Expand All @@ -115,7 +115,7 @@ def test_firstk_state():


def test_firstk_state_dedup():
state = FirstKState(k=3, dedup=True)
state = FirstKState(k=3, dedup=True, dropnull=False)
now = datetime.now(timezone.utc)
assert state.add_val_to_state(1, now) == [1]
assert state.add_val_to_state(2, now) == [1, 2]
Expand All @@ -128,6 +128,40 @@ def test_firstk_state_dedup():
assert state.del_val_from_state(4, now) == [1, 2]


def test_firstk_state_null():
state = FirstKState(k=3, dedup=False, dropnull=False)
now = datetime.now(timezone.utc)
assert state.add_val_to_state(1, now) == [1]
assert state.add_val_to_state(None, now) == [1, None]
assert state.add_val_to_state(2, now) == [1, None, 2]
assert state.add_val_to_state(3, now) == [1, None, 2]
assert state.add_val_to_state(None, now) == [1, None, 2]
assert state.add_val_to_state(4, now) == [1, None, 2]

assert state.del_val_from_state(2, now) == [1, None, 3]
assert state.del_val_from_state(3, now) == [1, None, None]
assert state.del_val_from_state(None, now) == [1, None, 4]
assert state.del_val_from_state(4, now) == [1, None]
assert state.del_val_from_state(None, now) == [1]


def test_firstk_state_dropnull():
state = FirstKState(k=3, dedup=False, dropnull=True)
now = datetime.now(timezone.utc)
assert state.add_val_to_state(1, now) == [1]
assert state.add_val_to_state(None, now) == [1]
assert state.add_val_to_state(2, now) == [1, 2]
assert state.add_val_to_state(3, now) == [1, 2, 3]
assert state.add_val_to_state(4, now) == [1, 2, 3]
assert state.add_val_to_state(None, now) == [1, 2, 3]
assert state.add_val_to_state(5, now) == [1, 2, 3]

assert state.del_val_from_state(3, now) == [1, 2, 4]
assert state.del_val_from_state(4, now) == [1, 2, 5]
assert state.del_val_from_state(5, now) == [1, 2]
assert state.del_val_from_state(5, now) == [1, 2]


def test_min_state():
state = MinState(default=3.0)
now = datetime.now(timezone.utc)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "fennel-ai"
version = "1.5.31"
version = "1.5.32"
description = "The modern realtime feature engineering platform"
authors = ["Fennel AI <developers@fennel.ai>"]
packages = [{ include = "fennel" }]
Expand Down

0 comments on commit e7d9900

Please sign in to comment.