Skip to content

Commit

Permalink
expectation: Disallow expectations on terminal datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-nambiar committed Sep 13, 2024
1 parent bd41ab7 commit 08ccfb1
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 15 deletions.
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.21] - 2024-09-12
- Raise an error if expectations are defined on terminal datasets.

## [1.5.20] - 2024-09-10
- Fix bug in using complex expressions in feature extractor.

Expand Down
11 changes: 9 additions & 2 deletions fennel/client_tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,10 @@ def t(df: pd.DataFrame) -> pd.DataFrame:
return x.assign("rating_orig", float, lambda df: df["rating_sq"] ** 0.5)


def square(x: int) -> int:
    """Return the square of *x* (used via @includes in pipeline_assign)."""
    return x * x


@meta(owner="test@test.com")
@dataset(index=True)
class MovieRatingAssign:
Expand All @@ -833,9 +837,12 @@ class MovieRatingAssign:
t: datetime

@pipeline
@includes(square)
@inputs(MovieRating)
def pipeline_assign(cls, m: Dataset):
rating_sq = m.assign("rating_sq", float, lambda df: df["rating"] ** 2)
rating_sq = m.assign(
"rating_sq", float, lambda df: square(df["rating"])
)
rating_cube = rating_sq.assign(
"rating_cube", float, lambda df: df["rating_sq"] * df["rating"]
)
Expand Down Expand Up @@ -868,7 +875,7 @@ def pipeline_assign(cls, m: Dataset):
rating_into_5=(col("rating") * 5).astype(float),
)
rating_transformed = rating_transformed.assign(
"some_const", str, lambda df: "some_const"
some_const=lit("some_const").astype(str)
)
return rating_transformed.drop("num_ratings", "sum_ratings", "rating")

Expand Down
28 changes: 20 additions & 8 deletions fennel/datasets/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import pandas as pd
from typing_extensions import Literal


from fennel.dtypes import Continuous
from fennel.expr.expr import TypedExpr
from fennel.expr.visitor import ExprPrinter
import fennel.gen.index_pb2 as index_proto
Expand Down Expand Up @@ -1644,18 +1646,20 @@ def get_terminal_schema(self) -> DSSchema:
def signature(self):
return f"{self._dataset_name}.{self._root}"

def set_terminal_node(self, node: _Node) -> bool:
def is_terminal_node(self, node: _Node) -> bool:
if node is None:
raise Exception(f"Pipeline {self.name} cannot return None.")
self.terminal_node = node
has_continuous_window = False
if isinstance(node, Aggregate):
# If any of the aggregates are exponential decay, then it is a terminal node
if any(isinstance(agg, ExpDecaySum) for agg in node.aggregates):
return True
return (
isinstance(node, Aggregate)
and node.emit_strategy == EmitStrategy.Eager
)
has_continuous_window = any(
isinstance(agg.window, Continuous) for agg in node.aggregates
)

return isinstance(node, Aggregate) and has_continuous_window

def set_dataset_name(self, ds_name: str):
self._dataset_name = ds_name
Expand Down Expand Up @@ -1916,11 +1920,10 @@ def _get_pipelines(self) -> List[Pipeline]:
f"Duplicate pipeline name {pipeline.name} for dataset {dataset_name}."
)
names.add(pipeline.name)
is_terminal = pipeline.set_terminal_node(
is_terminal = pipeline.is_terminal_node(
pipeline.func(self, *pipeline.inputs)
)
if is_terminal:
self.is_terminal = is_terminal
self.is_terminal = is_terminal
pipelines.append(pipeline)
pipelines[-1].set_dataset_name(dataset_name)

Expand Down Expand Up @@ -1959,6 +1962,14 @@ def _get_expectations(self):
expectation = getattr(method, GE_ATTR_FUNC)
if expectation is None:
return None

# Check that the dataset does not have a terminal aggregate node with Continuous windows
if self.is_terminal:
raise ValueError(
f"Dataset {self._name} has a terminal aggregate node with Continuous windows, we currently dont support expectations on continuous windows."
"This is because values are materialized into buckets which are combined at read time."
)

# Check that the expectation function only takes 1 parameter: cls.
sig = inspect.signature(expectation.func)
if len(sig.parameters) != 1:
Expand Down Expand Up @@ -2648,6 +2659,7 @@ def visitAggregate(self, obj) -> DSSchema:
f"Count unique aggregate `{agg}` must have `of` field."
)
if not is_hashable(input_schema.get_type(agg.of)):
dtype = input_schema.get_type(agg.of)
raise TypeError(
f"Cannot use count unique for field `{agg.of}` of "
f"type `{dtype_to_string(dtype)}`, as it is not " # type: ignore
Expand Down
74 changes: 73 additions & 1 deletion fennel/datasets/test_invalid_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
ExpDecaySum,
index,
)
from fennel.dtypes import struct, Window, Continuous, Session
from fennel.dtypes import struct, Window, Continuous, Session, Hopping
from fennel.expr import col
from fennel.lib import (
meta,
Expand Down Expand Up @@ -320,6 +320,78 @@ def transform(cls, rating: Dataset):
)


def test_expectations_on_aggregated_datasets():
    """Expectations on a terminal dataset (Continuous-window aggregate) must
    raise ValueError; discrete (Hopping) windows remain allowed."""
    # Case 1: Continuous("forever") aggregate makes the dataset terminal, so
    # defining @expectations on it should be rejected at class-definition time.
    with pytest.raises(ValueError) as e:

        @meta(owner="test@test.com")
        @dataset
        class PositiveRatingActivity:
            cnt_rating: int
            movie: str = field(key=True)
            t: datetime

            @expectations
            def dataset_expectations(cls):
                return [
                    expect_column_values_to_be_between(
                        column=str(cls.cnt_rating), min_value=0, max_value=100
                    ),
                ]

            @pipeline
            @inputs(RatingActivity)
            def filter_positive_ratings(cls, rating: Dataset):
                filtered_ds = rating.filter(lambda df: df["rating"] >= 3.5)
                filter2 = filtered_ds.filter(
                    lambda df: df["movie"].isin(["Jumanji", "Titanic", "RaOne"])
                )
                # Continuous window => values are bucketed and combined at
                # read time, which is why expectations are unsupported here.
                return filter2.groupby("movie").aggregate(
                    [
                        Count(
                            window=Continuous("forever"),
                            into_field=str(cls.cnt_rating),
                        ),
                    ],
                )

    # The message is two adjacent string literals concatenated in the source,
    # hence the missing space in "windows.This" — assert the exact text.
    assert (
        str(e.value)
        == "Dataset PositiveRatingActivity has a terminal aggregate node with Continuous windows, we currently dont support expectations on continuous windows.This is because values are materialized into buckets which are combined at read time."
    )

    # Case 2: same dataset shape but with a discrete Hopping window — the
    # class definition must succeed (no exception expected).
    # Discrete window is fine
    @meta(owner="test@test.com")
    @dataset
    class PositiveRatingActivity2:
        cnt_rating: int
        movie: str = field(key=True)
        t: datetime

        @expectations
        def dataset_expectations(cls):
            return [
                expect_column_values_to_be_between(
                    column=str(cls.cnt_rating), min_value=0, max_value=100
                ),
            ]

        @pipeline
        @inputs(RatingActivity)
        def filter_positive_ratings(cls, rating: Dataset):
            filtered_ds = rating.filter(lambda df: df["rating"] >= 3.5)
            filter2 = filtered_ds.filter(
                lambda df: df["movie"].isin(["Jumanji", "Titanic", "RaOne"])
            )
            # Hopping("7d", "1d") is a discrete window, so expectations are
            # permitted on this dataset.
            return filter2.groupby("movie").aggregate(
                [
                    Count(
                        window=Hopping("7d", "1d"),
                        into_field=str(cls.cnt_rating),
                    ),
                ],
            )


def test_incorrect_aggregate():
with pytest.raises(ValueError) as e:

Expand Down
4 changes: 1 addition & 3 deletions fennel/internal_lib/to_proto/to_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,7 @@ def source_from_ds(
all_sources: List[connectors.DataConnector] = getattr(
ds, connectors.SOURCE_FIELD
)
return [
source for source in all_sources if source.envs.is_entity_selected(env)
]
return [source for source in all_sources if source.envs.is_entity_selected(env)]


def source_proto_from_ds(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "fennel-ai"
version = "1.5.20"
version = "1.5.21"
description = "The modern realtime feature engineering platform"
authors = ["Fennel AI <developers@fennel.ai>"]
packages = [{ include = "fennel" }]
Expand Down

0 comments on commit 08ccfb1

Please sign in to comment.