add dedup window support (#584)
* add dedup window support

* add documentation for dedup operator

---------

Co-authored-by: Hoang Phan <hoang@fennel.ai>
petrpan26 and Hoang Phan authored Oct 17, 2024
1 parent 44456e0 commit 3b708c6
Showing 29 changed files with 1,419 additions and 1,018 deletions.
3 changes: 3 additions & 0 deletions .wordlist.txt
@@ -214,6 +214,8 @@ deanonymized
debezium
declaratively
dedup
deduped
deduping
deltalake
denormalize
destructions
@@ -275,6 +277,7 @@ kwargs
lastName
latencies
lifecycle
lookback
lookup
lookups
metaflags
174 changes: 174 additions & 0 deletions docs/examples/api-reference/operators/dedup.py
@@ -147,3 +147,177 @@ def dedup_by_all_pipeline(cls, ds: Dataset):
datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
datetime(2021, 1, 2, 0, 0, 0, tzinfo=timezone.utc),
]

@mock
def test_dedup_with_session_window(self, client):
# docsnip dedup_with_session_window
from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from fennel.dtypes import Session

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
txid: int
uid: int
amount: int
timestamp: datetime

@dataset
class Deduped:
txid: int
uid: int
amount: int
timestamp: datetime

@pipeline
@inputs(Transaction)
def dedup_by_all_pipeline(cls, ds: Dataset):
# docsnip-highlight next-line
return ds.dedup(by="txid", window=Session(gap="10s"))

# /docsnip

client.commit(message="some msg", datasets=[Transaction, Deduped])
# log some rows to the transaction dataset, with some duplicates
client.log(
"webhook",
"Transaction",
pd.DataFrame(
[
{
"txid": 1,
"uid": 1,
"amount": 10,
"timestamp": "2021-01-01T00:00:00",
},
{
"txid": 1,
"uid": 3,
"amount": 30,
"timestamp": "2021-01-01T00:00:11",
},
{
"txid": 1,
"uid": 30,
"amount": 40,
"timestamp": "2021-01-01T00:00:11",
},
{
"txid": 1,
"uid": 20,
"amount": 20,
"timestamp": "2021-01-01T00:00:02",
},
{
"txid": 1,
"uid": 4,
"amount": 40,
"timestamp": "2021-01-01T00:00:21",
},
{
"txid": 2,
"uid": 4,
"amount": 40,
"timestamp": "2021-01-01T00:00:21",
},
]
),
)
# read back the contents of the Deduped dataset
df = client.get_dataset_df("Deduped")
assert df["txid"].tolist() == [1, 1, 2]
assert df["uid"].tolist() == [30, 4, 4]
assert df["amount"].tolist() == [40, 40, 40]
assert df["timestamp"].tolist() == [
datetime(2021, 1, 1, 0, 0, 11, tzinfo=timezone.utc),
datetime(2021, 1, 1, 0, 0, 21, tzinfo=timezone.utc),
datetime(2021, 1, 1, 0, 0, 21, tzinfo=timezone.utc),
]

@mock
def test_dedup_with_tumbling_window(self, client):
# docsnip dedup_with_tumbling_window
from fennel.datasets import dataset, pipeline, Dataset
from fennel.lib import inputs
from fennel.connectors import source, Webhook
from fennel.dtypes import Tumbling

webhook = Webhook(name="webhook")

@source(webhook.endpoint("Transaction"), disorder="14d", cdc="append")
@dataset
class Transaction:
txid: int
uid: int
amount: int
timestamp: datetime

@dataset
class Deduped:
txid: int
uid: int
amount: int
timestamp: datetime

@pipeline
@inputs(Transaction)
def dedup_by_all_pipeline(cls, ds: Dataset):
# docsnip-highlight next-line
return ds.dedup(by="txid", window=Tumbling(duration="10s"))

# /docsnip

client.commit(message="some msg", datasets=[Transaction, Deduped])
# log some rows to the transaction dataset, with some duplicates
client.log(
"webhook",
"Transaction",
pd.DataFrame(
[
{
"txid": 1,
"uid": 1,
"amount": 10,
"timestamp": "2021-01-01T00:00:00",
},
{
"txid": 1,
"uid": 3,
"amount": 30,
"timestamp": "2021-01-01T00:00:09",
},
{
"txid": 1,
"uid": 4,
"amount": 40,
"timestamp": "2021-01-01T00:00:09",
},
{
"txid": 1,
"uid": 2,
"amount": 20,
"timestamp": "2021-01-01T00:00:19",
},
{
"txid": 2,
"uid": 2,
"amount": 20,
"timestamp": "2021-01-01T00:00:05",
},
]
),
)
# read back the contents of the Deduped dataset
df = client.get_dataset_df("Deduped")
assert df["txid"].tolist() == [2, 1, 1]
assert df["uid"].tolist() == [2, 4, 2]
assert df["amount"].tolist() == [20, 40, 20]
assert df["timestamp"].tolist() == [
datetime(2021, 1, 1, 0, 0, 5, tzinfo=timezone.utc),
datetime(2021, 1, 1, 0, 0, 9, tzinfo=timezone.utc),
datetime(2021, 1, 1, 0, 0, 19, tzinfo=timezone.utc),
]
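To see why the tumbling example keeps exactly these three rows, here is a small plain-Python sketch (illustrative only, not Fennel code) of how `Tumbling(duration="10s")` buckets the logged events. Which of several duplicates survives is taken from the expected output above (the last row per bucket), not from a documented guarantee.

```python
# Plain-Python intuition for the tumbling example above (illustrative only).
# Tumbling windows of 10s partition time into [0, 10), [10, 20), ... buckets;
# rows that share the same `by` value and the same bucket are duplicates.
events = [  # (txid, seconds after 2021-01-01T00:00:00), in logged order
    (1, 0), (1, 9), (1, 9), (1, 19), (2, 5),
]
duration = 10
kept = {}
for txid, ts in events:
    bucket = ts // duration      # which tumbling window the row falls into
    kept[(txid, bucket)] = ts    # later duplicates in a bucket replace earlier ones

print(sorted(kept))  # [(1, 0), (1, 1), (2, 0)] -> three survivors, matching the
                     # asserted txids [2, 1, 1] at 00:00:05, 00:00:09 and 00:00:19
```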
23 changes: 21 additions & 2 deletions docs/pages/api-reference/operators/dedup.md
@@ -12,8 +12,15 @@ Operator to dedup keyless datasets (e.g. event streams).
The list of columns to use for identifying duplicates. If not specified, all
the columns are used for identifying duplicates.

Two rows of the input dataset are considered duplicates if and only if they have
the same values for the timestamp column and all the `by` columns.
If `window` is specified, two rows of the input dataset are considered duplicates when they fall in the same window and have the same values for all the `by` columns.

If `window` is not specified, two rows are considered duplicates when they have exactly the same values for the timestamp column and all the `by` columns.
</Expandable>

<Expandable title="window" type="Optional[Tumbling | Session]" defaultVal="None">

The window used to group rows for deduping. If not specified, rows are deduped using only the `by` columns and the timestamp.

</Expandable>
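For intuition about the "same window" wording above, the following plain-Python sketch (illustrative only, not Fennel code) replays the txid=1 events from the session-window example with `Session(gap="10s")`: an event extends the current session when it arrives within the gap of the previous event, otherwise it starts a new session. The boundary behaviour (a gap of exactly 10s starting a new session) is inferred from that example's expected output rather than from a spec.

```python
# Illustrative only: session grouping of the txid=1 timestamps from the
# session-window example (seconds after 2021-01-01T00:00:00), gap of 10s.
events = [0, 2, 11, 11, 21]  # already sorted by event time
gap = 10

sessions, current = [], [events[0]]
for ts in events[1:]:
    if ts - current[-1] < gap:   # within the gap -> same session
        current.append(ts)
    else:                        # gap reached -> close the session, start a new one
        sessions.append(current)
        current = [ts]
sessions.append(current)

print(sessions)  # [[0, 2, 11, 11], [21]] -> one row survives per session,
                 # matching the txid=1 rows kept at 00:00:11 and 00:00:21
```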

<pre snippet="api-reference/operators/dedup#basic" status="success"
@@ -24,6 +31,14 @@ the same values for the timestamp column and all the `by` columns.
message="Dedup using all the fields">
</pre>

<pre snippet="api-reference/operators/dedup#dedup_with_session_window" status="success"
message="Dedup using session window">
</pre>

<pre snippet="api-reference/operators/dedup#dedup_with_tumbling_window" status="success"
message="Dedup using tumbling window">
</pre>

#### Returns
<Expandable type="Dataset">
Returns a keyless dataset having the same schema as the input dataset but with
@@ -35,3 +50,7 @@ some duplicated rows filtered out.
Commit error to apply dedup on a keyed dataset.
</Expandable>


<Expandable title="Dedup on hopping window or tumbling window with lookback">
Dedup on hopping window or tumbling window with lookback is not supported.
</Expandable>
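A minimal sketch of what that restriction means in practice, mirroring the validation and tests added in this commit (the import location of `Hopping` is assumed to match `Session` and `Tumbling`):

```python
# Illustrative only: which window arguments the new dedup validation accepts.
from fennel.dtypes import Hopping, Session, Tumbling  # Hopping's location is assumed

Session(gap="10s")             # ok: ds.dedup(by="txid", window=Session(gap="10s"))
Tumbling(duration="10s")       # ok: ds.dedup(by="txid", window=Tumbling(duration="10s"))
Hopping("1d", "1h")            # passing this to dedup raises TypeError
Tumbling("1d", lookback="1d")  # passing this to dedup raises ValueError
```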
31 changes: 26 additions & 5 deletions fennel/datasets/datasets.py
@@ -348,7 +348,12 @@ def select(self, *args, columns: Optional[List[str]] = None) -> _Node:
)
return Select(self, cols, drop_cols)

def dedup(self, *args, by: Optional[List[str]] = None) -> _Node:
def dedup(
self,
*args,
by: Optional[List[str]] = None,
window: Optional[Union[Session, Tumbling]] = None,
) -> _Node:
# If 'by' is not provided, dedup by all value fields.
# Note: we don't use key fields because dedup cannot be applied on keyed datasets.
collist: List[str] = []
Expand All @@ -365,7 +370,7 @@ def dedup(self, *args, by: Optional[List[str]] = None) -> _Node:
"Invalid arguments to dedup. Must specify either 'by' or positional arguments."
)

return Dedup(self, collist)
return Dedup(self, collist, window)

def explode(self, *args, columns: List[str] = None) -> _Node:
columns = _Node.__get_list_args(*args, columns=columns, name="explode")
@@ -853,16 +858,32 @@ def dsschema(self):


class Dedup(_Node):
def __init__(self, node: _Node, by: List[str]):
def __init__(
self,
node: _Node,
by: List[str],
window: Optional[Union[Session, Tumbling]] = None,
):
super().__init__()
self.node = node
self.by = by
self.node.out_edges.append(self)
self.window = window

if window is not None:
if not isinstance(window, (Session, Tumbling)):
raise TypeError(
f"invalid dedup operator: 'window' can either be Session or Tumbling but found {type(window).__name__}"
)
if isinstance(window, Tumbling) and window.lookback != "0s":
raise ValueError(
"invalid dedup: not allowed to specify 'lookback' in the tumble window"
)

def signature(self):
if isinstance(self.node, Dataset):
return fhash(self.node._name, self.by)
return fhash(self.node.signature(), self.by)
return fhash(self.node._name, self.by, self.window)
return fhash(self.node.signature(), self.by, self.window)

def dsschema(self):
return self.node.dsschema()
11 changes: 7 additions & 4 deletions fennel/datasets/test_dataset.py
@@ -1351,7 +1351,9 @@ def extract_info(df: pd.DataFrame) -> pd.DataFrame:
"user_id": int,
},
)
ds_deduped = ds_transform.dedup(by=["user_id", "merchant_id"])
ds_deduped = ds_transform.dedup(
by=["user_id", "merchant_id"], window=Session("1d")
)
return ds_deduped.groupby("merchant_id").aggregate(
[
Count(
@@ -1534,12 +1536,13 @@ def extract_info(df: pd.DataFrame) -> pd.DataFrame:

operator_req = sync_request.operators[5]
o = {
"id": "c898276d34d964b833155b0e36a4ba2b",
"id": "4522d140b6400791dd56c6017fb720fe",
"pipelineName": "create_fraud_dataset",
"datasetName": "FraudReportAggregatedDataset",
"dedup": {
"operandId": "6158406804b946bda0c38a994229e995",
"columns": ["user_id", "merchant_id"],
"windowType": {"session": {"gap": "86400s"}},
},
"dsVersion": 1,
}
@@ -1550,12 +1553,12 @@ def extract_info(df: pd.DataFrame) -> pd.DataFrame:

operator_req = sync_request.operators[6]
o = {
"id": "45877eefa2fe6d8dbd5aba2fb07e5cb5",
"id": "70bead822edd4fd97c4198df5047511b",
"isRoot": True,
"pipelineName": "create_fraud_dataset",
"datasetName": "FraudReportAggregatedDataset",
"aggregate": {
"operandId": "c898276d34d964b833155b0e36a4ba2b",
"operandId": "4522d140b6400791dd56c6017fb720fe",
"keys": ["merchant_id"],
"specs": [
{
50 changes: 50 additions & 0 deletions fennel/datasets/test_schema_validator.py
@@ -839,6 +839,56 @@ def pipeline_dedup(cls, rating: Dataset):
)


# dedup is not supported with a hopping window
def test_dedup_with_hopping_window_fails():
with pytest.raises(TypeError) as e:

@meta(owner="abhay@fennel.ai")
@dataset
class MovieStats:
movie: str = field(key=True)
rating: float
revenue: int
t: datetime

@pipeline
@inputs(MovieRating)
def pipeline_dedup(cls, rating: Dataset):
return rating.dedup(
by=[MovieRating.movie], window=Hopping("1d", "1h") # type: ignore
)

assert (
str(e.value)
== """invalid dedup operator: 'window' can either be Session or Tumbling but found Hopping"""
)


# dedup is not supported with a tumbling window with a lookback
def test_dedup_with_tumbling_window_with_lookback_fails():
with pytest.raises(ValueError) as e:

@meta(owner="abhay@fennel.ai")
@dataset
class MovieStats:
movie: str = field(key=True)
rating: float
revenue: int
t: datetime

@pipeline
@inputs(MovieRating)
def pipeline_dedup(cls, rating: Dataset):
return rating.dedup(
by=[MovieRating.movie], window=Tumbling("1d", lookback="1d")
)

assert (
str(e.value)
== """invalid dedup: not allowed to specify 'lookback' in the tumble window"""
)


# Schema of deduped dataset should match source dataset
def test_dedup_schema_different_fails():
with pytest.raises(TypeError) as e: