Skip to content

Commit

Permalink
source: support expressions in where
Browse files Browse the repository at this point in the history
  • Loading branch information
satrana42 authored and nonibansal committed Nov 12, 2024
1 parent 5933996 commit 6091464
Show file tree
Hide file tree
Showing 15 changed files with 1,977 additions and 321 deletions.
2 changes: 2 additions & 0 deletions docs/examples/api-reference/sources/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys
from datetime import datetime

from fennel.expr import col
from fennel.testing import mock

__owner__ = "nikhil@fennel.ai"
Expand Down Expand Up @@ -41,6 +42,7 @@ def test_source_decorator(client):
schema={"age": str},
), # converting age dtype to int
},
where=eval(col("age") >= 18, schema={"age": int}),
env="prod",
sample=Sample(0.2, using=["email"]),
bounded=True,
Expand Down
32 changes: 30 additions & 2 deletions docs/examples/concepts/source.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import sys
from datetime import datetime

from fennel.connectors import Kafka, S3, source
from fennel.connectors import Kafka, S3, eval, source
from fennel.datasets import dataset
from fennel.expr import col
from fennel.testing import mock

__owner__ = "owner@example.com"
Expand Down Expand Up @@ -70,7 +71,7 @@ class Order:


@mock
def test_filter_preproc_source(client):
def test_where_source(client):
if sys.version_info >= (3, 10):
s3 = S3(name="mys3")

Expand All @@ -94,3 +95,30 @@ class Order:
datasets=[Order],
)
# /docsnip


# Docs example: commits an S3-backed `Order` dataset whose source `where`
# filter is an expression built with `eval(col(...))` instead of a lambda.
# NOTE(review): this snippet is registered under the docsnip name `where`,
# which the lambda-based example earlier in this file also appears to use;
# confirm the docs tooling handles two snippets with the same name.
@mock
def test_expression_where_source(client):
# The example body only runs on Python 3.10 or newer.
if sys.version_info >= (3, 10):
s3 = S3(name="mys3")

# docsnip where
# docsnip-highlight start
@source(
s3.bucket("data", path="*"),
disorder="1w",
cdc="append",
where=eval(col("skuid") >= 1000, schema={"skuid": int}),
)
# docsnip-highlight end
@dataset
class Order:
uid: int
skuid: int
at: datetime

client.commit(
message="some commit msg",
datasets=[Order],
)
# /docsnip
9 changes: 9 additions & 0 deletions docs/pages/api-reference/decorators/source.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ for other parent types apart from struct, please reach out to Fennel support.

</Expandable>

<Expandable title="where" type="Optional[Callable | Eval]" defaultVal="None">
When present, filters the rows of the source dataset using the provided value.

`where` currently accepts two kinds of values:
* `Callable`: a lambda function that is used to filter rows.
* `Eval`: an expression that is used to filter rows, similar to the `eval` value
accepted by `preproc`.
</Expandable>


<Expandable title="bounded" type="bool" defaultVal="False">
When not set or set as `False`, it indicates that the source possesses infinite
Expand Down
5 changes: 3 additions & 2 deletions docs/pages/concepts/source.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,11 @@ of that column on another column.

### Where
The `where` field in the source provides a way to filter out some rows from the source
during ingestion. Provide the filter criteria as a lambda function.
during ingestion. Provide the filter criteria as a lambda function or an
expression.

<pre snippet="concepts/source#where" status="success"
message="Using lambda to filter during ingestion."
message="Using lambda or expression to filter during ingestion."
></pre>

### Since
Expand Down
4 changes: 4 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# Changelog

## [1.5.52] - 2024-11-12
- Source: support expressions in where

## [1.5.51] - 2024-11-09
- Fix bug in mock client for session based dedup operator

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import pandas as pd
import pytest

from fennel.connectors import Webhook, source
from fennel.connectors import Webhook, eval, source
from fennel.datasets import (
Dataset,
dataset,
field,
pipeline,
)
from fennel.expr import col
from fennel.lib import inputs, meta
from fennel.testing import mock

Expand All @@ -20,7 +21,7 @@

@pytest.mark.integration
@mock
def test_preproc_filter(client):
def test_where(client):
if sys.version_info >= (3, 10):

@meta(owner="nitin@fennel.ai")
Expand Down Expand Up @@ -91,3 +92,56 @@ def pipeline_window(cls, event: Dataset):
keys=pd.DataFrame({"user_id": [1, 2, 3, 4, 5]}),
)
assert df["age"].tolist() == [pd.NA, 1, pd.NA, pd.NA, pd.NA]


@pytest.mark.integration
@mock
def test_where_expression(client):
    """Check that an expression-based `where` filters rows at ingestion.

    The source on `A1` keeps only rows whose `age` is positive and leaves a
    remainder of 1 when divided by 100. `A2` holds the latest surviving row
    per `user_id`, so a lookup on `A2` shows which users had any row pass.
    """
    # The body is skipped on interpreters older than 3.10.
    if sys.version_info < (3, 10):
        return

    @meta(owner="satwant@fennel.ai")
    @source(
        webhook.endpoint("A1"),
        cdc="append",
        disorder="14d",
        env="prod",
        where=eval(
            (col("age") > 0) & (col("age") % 100 == 1), schema={"age": int}
        ),
    )
    @dataset
    class A1:
        user_id: int
        age: int
        t: datetime

    @dataset(index=True)
    class A2:
        user_id: int = field(key=True)
        age: int
        t: datetime

        @pipeline
        @inputs(A1)
        def pipeline_window(cls, event: Dataset):
            return event.groupby("user_id").latest()

    client.commit(datasets=[A1, A2], message="first_commit", env="prod")

    # Ten events sharing one timestamp; only (user_id=2, age=1) satisfies
    # the `where` expression.
    now = datetime.now(timezone.utc)
    user_ids = [1, 1, 1, 2, 2, 3, 4, 5, 5, 5]
    ages = [100, 11, 12, 1, 2, 2, 3, 3, 4, 5]
    events = pd.DataFrame(
        {
            "user_id": user_ids,
            "age": ages,
            "t": [now] * len(user_ids),
        }
    )

    client.log("fennel_webhook", "A1", events)
    client.sleep(30)

    lookup_keys = pd.DataFrame({"user_id": [1, 2, 3, 4, 5]})
    found, _ = client.lookup(A2, keys=lookup_keys)
    assert found["age"].tolist() == [pd.NA, 1, pd.NA, pd.NA, pd.NA]
1 change: 1 addition & 0 deletions fennel/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
Ref,
ref,
PreProcValue,
WhereValue,
at_timestamp,
Eval,
eval,
Expand Down
9 changes: 5 additions & 4 deletions fennel/connectors/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,19 @@ class Config:
arbitrary_types_allowed = True


def ref(ref_name: str) -> PreProcValue:
def ref(ref_name: str) -> Ref:
return Ref(name=ref_name)


def eval(
eval_type: Union[Callable, Expr, TypedExpr],
schema: Optional[Dict[str, Type]] = None,
) -> PreProcValue:
) -> Eval:
return Eval(eval_type=eval_type, additional_schema=schema)


PreProcValue = Union[Ref, Any, Eval]
WhereValue = Union[Callable, Eval]


def preproc_has_indirection(preproc: Optional[Dict[str, PreProcValue]]):
Expand Down Expand Up @@ -120,7 +121,7 @@ def source(
preproc: Optional[Dict[str, PreProcValue]] = None,
bounded: bool = False,
idleness: Optional[Duration] = None,
where: Optional[Callable] = None,
where: Optional[WhereValue] = None,
sample: Optional[Union[float, Sample]] = None,
) -> Callable[[T], Any]:
"""
Expand Down Expand Up @@ -815,7 +816,7 @@ class DataConnector:
pre_proc: Optional[Dict[str, PreProcValue]] = None
bounded: bool = False
idleness: Optional[Duration] = None
where: Optional[Callable] = None
where: Optional[WhereValue] = None
sample: Optional[Union[float, Sample]] = None
how: Optional[Literal["incremental", "recreate"] | SnapshotData] = None
create: Optional[bool] = None
Expand Down
83 changes: 82 additions & 1 deletion fennel/connectors/test_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4188,7 +4188,7 @@ def test_valid_preproc_value():
)


def test_filter_preproc():
def test_where():
if sys.version_info >= (3, 10):

@source(
Expand Down Expand Up @@ -4261,6 +4261,87 @@ class UserInfoDataset:
)


def test_where_expression():
    """Verify the source proto generated for an expression-based `where`.

    A MySQL-backed dataset is declared with `where=eval(col("user_id") == 1)`
    and the resulting sync request is compared against a hand-written
    `Source` proto that carries the filter schema and filter expression.
    """
    # The body is skipped on interpreters older than 3.10.
    if sys.version_info < (3, 10):
        return

    @source(
        mysql.table(
            "users",
            cursor="added_on",
        ),
        every="1h",
        disorder="20h",
        bounded=True,
        idleness="1h",
        cdc="upsert",
        where=eval(col("user_id") == 1, schema={"user_id": int}),
    )
    @meta(owner="test@test.com")
    @dataset
    class UserInfoDataset:
        user_id: int = field(key=True)
        name: str
        gender: str
        # Users date of birth
        dob: str
        age: int
        account_creation_date: datetime
        country: Optional[str]
        timestamp: datetime = field(timestamp=True)

    view = InternalTestClient()
    view.add(UserInfoDataset)
    sync_request = view._get_sync_request_proto()
    assert len(sync_request.sources) == 1
    actual_source = sync_request.sources[0]

    # Expected connection details of the MySQL database behind the table.
    mysql_db = {
        "name": "mysql",
        "mysql": {
            "host": "localhost",
            "database": "test",
            "user": "root",
            "password": "root",
            "port": 3306,
        },
    }
    # Expected encoding of the `where` clause: its schema and `user_id == 1`.
    filter_schema = {
        "fields": [{"name": "user_id", "dtype": {"intType": {}}}]
    }
    filter_expr = {
        "binary": {
            "left": {"ref": {"name": "user_id"}},
            "right": {
                "jsonLiteral": {
                    "literal": "1",
                    "dtype": {"intType": {}},
                }
            },
            "op": "EQ",
        }
    }
    expected_dict = {
        "table": {
            "mysqlTable": {
                "db": mysql_db,
                "tableName": "users",
            }
        },
        "dataset": "UserInfoDataset",
        "dsVersion": 1,
        "every": "3600s",
        "cursor": "added_on",
        "disorder": "72000s",
        "timestampField": "timestamp",
        "cdc": "Upsert",
        "bounded": True,
        "idleness": "3600s",
        "filter_schema": filter_schema,
        "filter_expr": filter_expr,
    }

    expected_source_request = ParseDict(expected_dict, connector_proto.Source())
    assert actual_source == expected_source_request, error_message(
        actual_source, expected_source_request
    )


def test_assign_python_preproc():
if sys.version_info >= (3, 10):

Expand Down
43 changes: 43 additions & 0 deletions fennel/connectors/test_invalid_connectors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime, timezone
from lib2to3.fixes.fix_tuple_params import tuple_name
import pandas as pd
from typing import Optional

from fennel.connectors.connectors import Protobuf, ref, eval
Expand All @@ -18,6 +19,7 @@
at_timestamp,
BigQuery,
S3Connector,
Webhook,
HTTP,
Certificate,
)
Expand All @@ -29,6 +31,10 @@
# noinspection PyUnresolvedReferences
from fennel.testing import *


__owner__ = "test@test.com"
webhook = Webhook(name="fennel_webhook")

mysql = MySQL(
name="mysql",
host="localhost",
Expand Down Expand Up @@ -1042,3 +1048,40 @@ class UserInfoDataset:
'`age` is of type `int` in Dataset `UserInfoDataset`, can not be cast to `float`. Full expression: `col("val1")`'
== str(e.value)
)


@mock
def test_invalid_where(client):
    """Logging must fail when the `where` schema disagrees with the data.

    The source declares `age` as `float` in the schema passed to `eval`,
    while the dataset field is `int` and the logged dataframe carries integer
    ages. `client.log` is expected to raise with the schema-validation
    message asserted at the end of the test.
    """

    @source(
        webhook.endpoint("UserInfoDataset"),
        cdc="upsert",
        disorder="14d",
        env="prod",
        where=eval(col("age") >= 18, schema={"age": float}),
    )
    @meta(owner="test@test.com")
    @dataset
    class UserInfoDataset:
        user_id: int = field(key=True)
        age: int
        timestamp: datetime = field(timestamp=True)

    client.commit(datasets=[UserInfoDataset], message="test")

    # Build the input outside `pytest.raises` so that a failure while
    # constructing the fixture cannot be mistaken for the expected error.
    now = datetime.now(timezone.utc)
    # NOTE(review): the time column is named `t` although the dataset's
    # timestamp field is `timestamp`; this looks copied from another test.
    # Confirm it is intentional before renaming it.
    df = pd.DataFrame(
        {
            "user_id": [1, 1, 1, 2, 2, 3, 4, 5, 5, 5],
            "age": [100, 11, 12, 1, 2, 2, 3, 3, 4, 5],
            "t": [now] * 10,
        }
    )

    # Only the log call itself is allowed to raise.
    with pytest.raises(Exception) as e:
        client.log("fennel_webhook", "UserInfoDataset", df)

    # Checked after the context manager exits; inside the block it would be
    # unreachable once `client.log` raises.
    assert (
        "Schema validation failed during data insertion to `UserInfoDataset`: "
        "Error using where for dataset `UserInfoDataset`: "
        "Field `age` defined in schema for eval where has different type in the dataframe."
        == str(e.value)
    )
Loading

0 comments on commit 6091464

Please sign in to comment.