[DEV 3941] Add server support for discrete aggregation (#529)
Add server support for discrete aggregation
nonibansal committed Aug 20, 2024
1 parent 7e7eb6b commit 6fbad75
Showing 14 changed files with 390 additions and 130 deletions.
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

+## [1.5.5] - 2024-08-20
+- Enable discrete aggregation with lookback
+
## [1.5.4] - 2024-08-14
- Add support for removal of auto extractors

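As a quick orientation, this is roughly what the new lookback parameter looks like at the call site, distilled from the test added below. A sketch only: the import paths are assumptions, not shown in this diff.

```python
# Sketch distilled from the new test below; import paths are assumed.
from fennel.datasets import Count, Sum       # assumed module path
from fennel.dtypes import Hopping, Tumbling  # defined in fennel/dtypes/dtypes.py

# Discrete windows now accept an optional lookback, which serves results
# as of `lookback` behind the frontier instead of the frontier itself.
count_spec = Count(window=Tumbling("1h", lookback="1h"))
sum_spec = Sum(window=Hopping("1h", "30m", lookback="1h"), of="amount")
```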
94 changes: 85 additions & 9 deletions fennel/client_tests/test_discrete_window_aggregation.py
@@ -22,15 +22,17 @@
webhook = Webhook(name="fennel_webhook")


+@source(webhook.endpoint("Transactions"), cdc="append", disorder="14d")
+@dataset
+class Transactions:
+    id: str
+    user_id: str
+    amount: float
+    t: datetime
+
+
@mock
def test_discrete_hopping_tumbling_window_aggregation(client):
-    @source(webhook.endpoint("Transactions"), cdc="append", disorder="14d")
-    @dataset
-    class Transactions:
-        id: str
-        user_id: str
-        amount: float
-        t: datetime

    @dataset(index=True)
    class Stats:
@@ -122,7 +124,7 @@ def pipeline(cls, event: Dataset):
        EventCount,
        keys=pd.DataFrame({"user_id": [1, 2, 3]}),
    )
-    assert df["count"].tolist() == [0, 0, 0]
+    assert df["count"].tolist() == [0, 3, 2]

    # Test offline lookups
    df, _ = client.lookup(
@@ -219,5 +221,79 @@ def pipeline(cls, event: Dataset):
        EventCountAggregate,
        keys=pd.DataFrame({"user_id": [1, 2, 3]}),
    )
-    expected = [2.454, 1.756, 1.648]
+    expected = [2.454, 1.613, 1.537]
    assert df["avg"].tolist() == pytest.approx(expected, abs=1e-3)
+
+
+@mock
+def test_discrete_aggregation_with_lookback(client):
+
+    @dataset(index=True)
+    class Stats:
+        user_id: str = field(key=True)
+        sum: float
+        count: int
+        t: datetime
+
+    @pipeline
+    @inputs(Transactions)
+    def pipeline(cls, event: Dataset):
+        return (
+            event.dedup("id")
+            .groupby("user_id")
+            .aggregate(
+                count=Count(window=Tumbling("1h", lookback="1h")),
+                sum=Sum(
+                    window=Hopping("1h", "30m", lookback="1h"), of="amount"
+                ),
+            )
+        )
+
+    client.commit(datasets=[Transactions, Stats], message="first_commit")
+
+    now = datetime(2024, 1, 1, 15, 30, 0, tzinfo=timezone.utc)
+    now_1h = now - timedelta(hours=1)
+    now_2h = now - timedelta(hours=2)
+    now_3h = now - timedelta(hours=3)
+
+    # Frontier -> 2024-01-01 15:30:00
+    # With lookback as one hour
+    # Count -> (2024-01-01 13:00:00, 2024-01-01 14:00:00)
+    # Sum -> (2024-01-01 13:30:00, 2024-01-01 14:30:00)
+
+    df = pd.DataFrame(
+        {
+            "id": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+            "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 4],
+            "amount": [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000],
+            "t": [
+                now_1h,
+                now_2h,
+                now_3h,
+                now_1h,
+                now_2h,
+                now_3h,
+                now_1h,
+                now_2h,
+                now_3h,
+                now,
+            ],
+        }
+    )
+    client.log("fennel_webhook", "Transactions", df)
+    client.sleep()
+
+    df, _ = client.lookup(
+        Stats,
+        keys=pd.DataFrame({"user_id": [1, 2, 3]}),
+        timestamps=pd.Series([now, now, now]),
+    )
+    assert df["count"].tolist() == [1, 1, 1]
+    assert df["sum"].tolist() == [200, 500, 800]
+
+    df, _ = client.lookup(
+        Stats,
+        keys=pd.DataFrame({"user_id": [1, 2, 3]}),
+    )
+    assert df["count"].tolist() == [1, 1, 1]
+    assert df["sum"].tolist() == [200, 500, 800]
11 changes: 11 additions & 0 deletions fennel/datasets/datasets.py
@@ -2548,6 +2548,7 @@ def visitAggregate(self, obj) -> DSSchema:

        found_discrete = False
        found_non_discrete = False
+        lookback = None
        for agg in obj.aggregates:

            # If default window is present in groupby then each aggregate spec cannot have window different from
@@ -2577,6 +2578,16 @@ def visitAggregate(self, obj) -> DSSchema:
                )
            found_non_discrete = True

+            # Check that lookback is the same across all windows
+            if isinstance(agg.window, (Hopping, Tumbling)):
+                curr_lookback = agg.window.lookback_total_seconds()
+                if lookback is not None and curr_lookback != lookback:
+                    raise ValueError(
+                        f"Windows in all specs must have same lookback, found {lookback} and {curr_lookback} in "
+                        f"pipeline `{self.pipeline_name}`."
+                    )
+                lookback = curr_lookback
+
            exceptions = agg.validate()
            if exceptions is not None:
                raise ValueError(f"Invalid aggregate `{agg}`: {exceptions}")
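The new check is easiest to see in isolation: within a single aggregate, every discrete window must carry the same lookback. A standalone sketch of the rule, not fennel's actual code path:

```python
def check_same_lookback(lookbacks_seconds):
    # Mirrors the validation above: reject mixed lookbacks in one aggregate.
    seen = None
    for lb in lookbacks_seconds:
        if seen is not None and lb != seen:
            raise ValueError(
                f"Windows in all specs must have same lookback, found {seen} and {lb}."
            )
        seen = lb

check_same_lookback([3600, 3600])  # ok: both windows use lookback="1h"
try:
    check_same_lookback([3600, 7200])  # lookback="1h" mixed with "2h"
except ValueError as e:
    print(e)
```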
Expand Down
53 changes: 43 additions & 10 deletions fennel/dtypes/dtypes.py
@@ -4,7 +4,15 @@
from dataclasses import dataclass
from datetime import datetime
from textwrap import dedent
-from typing import TYPE_CHECKING, Union, List, get_args, ForwardRef, Any
+from typing import (
+    TYPE_CHECKING,
+    Union,
+    List,
+    get_args,
+    ForwardRef,
+    Any,
+    Optional,
+)

import google.protobuf.duration_pb2 as duration_proto # type: ignore
import pandas as pd
@@ -333,6 +341,7 @@ def signature(self) -> str:
@dataclass
class Tumbling:
    duration: str
+    lookback: Optional[str] = None

    def __post_init__(self):
        """
@@ -342,6 +351,11 @@ def __post_init__(self):
            raise ValueError(
                "'forever' is not a valid duration value for Tumbling window."
            )
+        if self.lookback:
+            try:
+                duration_to_timedelta(self.lookback)
+            except Exception as e:
+                raise ValueError(f"Failed when parsing lookback : {e}.")
        try:
            duration_to_timedelta(self.duration)
        except Exception as e:
@@ -363,11 +377,17 @@ def duration_total_seconds(self) -> int:
    def stride_total_seconds(self) -> int:
        return int(duration_to_timedelta(self.duration).total_seconds())

+    def lookback_total_seconds(self) -> int:
+        if self.lookback is None:
+            return 0
+        return int(duration_to_timedelta(self.lookback).total_seconds())
+


@dataclass
class Hopping:
    duration: str
    stride: str
+    lookback: Optional[str] = None

    def __post_init__(self):
        """
@@ -377,9 +397,12 @@ def __post_init__(self):
            stride_timedelta = duration_to_timedelta(self.stride)
        except Exception as e:
            raise ValueError(f"Failed when parsing stride : {e}.")
-        if self.duration == "forever":
-            raise ValueError("Forever hopping window is not yet supported.")
-        else:
+        if self.lookback:
+            try:
+                duration_to_timedelta(self.lookback)
+            except Exception as e:
+                raise ValueError(f"Failed when parsing lookback : {e}.")
+        if self.duration != "forever":
            try:
                duration_timedelta = duration_to_timedelta(self.duration)
            except Exception as e:
@@ -390,14 +413,19 @@ def to_proto(self) -> window_proto.Window:
            )

    def to_proto(self) -> window_proto.Window:
-        duration = duration_proto.Duration()
-        duration.FromTimedelta(duration_to_timedelta(self.duration))
-
        stride = duration_proto.Duration()
        stride.FromTimedelta(duration_to_timedelta(self.stride))
-        return window_proto.Window(
-            hopping=window_proto.Hopping(duration=duration, stride=stride)
-        )
+
+        if self.duration != "forever":
+            duration = duration_proto.Duration()
+            duration.FromTimedelta(duration_to_timedelta(self.duration))
+            return window_proto.Window(
+                hopping=window_proto.Hopping(duration=duration, stride=stride)
+            )
+        else:
+            return window_proto.Window(
+                forever_hopping=window_proto.ForeverHopping(stride=stride)
+            )

    def signature(self) -> str:
        return f"hopping-{self.duration}-{self.stride}"
@@ -412,6 +440,11 @@ def duration_total_seconds(self) -> int:
    def stride_total_seconds(self) -> int:
        return int(duration_to_timedelta(self.stride).total_seconds())

+    def lookback_total_seconds(self) -> int:
+        if self.lookback is None:
+            return 0
+        return int(duration_to_timedelta(self.lookback).total_seconds())
+

@dataclass
class Session:
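A short usage sketch of the new field on the window dataclasses above; the fennel.dtypes import path is an assumption, and duration_to_timedelta is fennel's duration parser:

```python
from fennel.dtypes import Hopping, Tumbling  # assumed import path

t = Tumbling("1h", lookback="30m")
assert t.lookback_total_seconds() == 1800

# lookback defaults to None, which the new helper reports as 0 seconds.
assert Tumbling("1h").lookback_total_seconds() == 0

h = Hopping("1h", "30m", lookback="1h")
assert h.lookback_total_seconds() == 3600

# A malformed lookback fails fast in __post_init__.
try:
    Tumbling("1h", lookback="not-a-duration")
except ValueError as e:
    print(e)  # Failed when parsing lookback : ...
```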
120 changes: 60 additions & 60 deletions fennel/gen/dataset_pb2.py

Large diffs are not rendered by default.

32 changes: 17 additions & 15 deletions fennel/gen/dataset_pb2.pyi
@@ -25,6 +25,21 @@

DESCRIPTOR: google.protobuf.descriptor.FileDescriptor

+class _EmitStrategy:
+    ValueType = typing.NewType("ValueType", builtins.int)
+    V: typing_extensions.TypeAlias = ValueType
+
+class _EmitStrategyEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[_EmitStrategy.ValueType], builtins.type):
+    DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
+    Eager: _EmitStrategy.ValueType  # 0
+    Final: _EmitStrategy.ValueType  # 1
+
+class EmitStrategy(_EmitStrategy, metaclass=_EmitStrategyEnumTypeWrapper): ...
+
+Eager: EmitStrategy.ValueType  # 0
+Final: EmitStrategy.ValueType  # 1
+global___EmitStrategy = EmitStrategy
+
@typing_extensions.final
class CoreDataset(google.protobuf.message.Message):
    DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -281,19 +296,6 @@ global___Operator = Operator
class Aggregate(google.protobuf.message.Message):
    DESCRIPTOR: google.protobuf.descriptor.Descriptor

-    class _EmitStrategy:
-        ValueType = typing.NewType("ValueType", builtins.int)
-        V: typing_extensions.TypeAlias = ValueType
-
-    class _EmitStrategyEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Aggregate._EmitStrategy.ValueType], builtins.type):  # noqa: F821
-        DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
-        Eager: Aggregate._EmitStrategy.ValueType  # 0
-        Final: Aggregate._EmitStrategy.ValueType  # 1
-
-    class EmitStrategy(_EmitStrategy, metaclass=_EmitStrategyEnumTypeWrapper): ...
-    Eager: Aggregate.EmitStrategy.ValueType  # 0
-    Final: Aggregate.EmitStrategy.ValueType  # 1
-
    OPERAND_ID_FIELD_NUMBER: builtins.int
    KEYS_FIELD_NUMBER: builtins.int
    SPECS_FIELD_NUMBER: builtins.int
@@ -306,7 +308,7 @@
    @property
    def specs(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[spec_pb2.PreSpec]: ...
    along: builtins.str
-    emit_strategy: global___Aggregate.EmitStrategy.ValueType
+    emit_strategy: global___EmitStrategy.ValueType
    operand_name: builtins.str
    """NOTE: FOLLOWING PROPERTIES ARE SET BY THE SERVER AND WILL BE IGNORED BY
    THE CLIENT
@@ -318,7 +320,7 @@
        keys: collections.abc.Iterable[builtins.str] | None = ...,
        specs: collections.abc.Iterable[spec_pb2.PreSpec] | None = ...,
        along: builtins.str | None = ...,
-        emit_strategy: global___Aggregate.EmitStrategy.ValueType = ...,
+        emit_strategy: global___EmitStrategy.ValueType = ...,
        operand_name: builtins.str = ...,
    ) -> None: ...
    def HasField(self, field_name: typing_extensions.Literal["_along", b"_along", "along", b"along"]) -> builtins.bool: ...
30 changes: 16 additions & 14 deletions fennel/gen/window_pb2.py

Some generated files are not rendered by default.

