diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index c69d3c019..6b096b591 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.21] - 2024-09-12 +- Raise an error if expectations are defined on terminal datasets. + ## [1.5.20] - 2024-09-10 - Fix bug in using complex expressions in feature extractor. diff --git a/fennel/client_tests/test_dataset.py b/fennel/client_tests/test_dataset.py index f6dc6c6fe..de7bee43c 100644 --- a/fennel/client_tests/test_dataset.py +++ b/fennel/client_tests/test_dataset.py @@ -821,6 +821,10 @@ def t(df: pd.DataFrame) -> pd.DataFrame: return x.assign("rating_orig", float, lambda df: df["rating_sq"] ** 0.5) +def square(x: int) -> int: + return x**2 + + @meta(owner="test@test.com") @dataset(index=True) class MovieRatingAssign: @@ -833,9 +837,12 @@ class MovieRatingAssign: t: datetime @pipeline + @includes(square) @inputs(MovieRating) def pipeline_assign(cls, m: Dataset): - rating_sq = m.assign("rating_sq", float, lambda df: df["rating"] ** 2) + rating_sq = m.assign( + "rating_sq", float, lambda df: square(df["rating"]) + ) rating_cube = rating_sq.assign( "rating_cube", float, lambda df: df["rating_sq"] * df["rating"] ) @@ -868,7 +875,7 @@ def pipeline_assign(cls, m: Dataset): rating_into_5=(col("rating") * 5).astype(float), ) rating_transformed = rating_transformed.assign( - "some_const", str, lambda df: "some_const" + some_const=lit("some_const").astype(str) ) return rating_transformed.drop("num_ratings", "sum_ratings", "rating") diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index c5b6b4b53..de3b7e688 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -26,6 +26,8 @@ import pandas as pd from typing_extensions import Literal + +from fennel.dtypes import Continuous from fennel.expr.expr import TypedExpr from fennel.expr.visitor import ExprPrinter import fennel.gen.index_pb2 as index_proto @@ -1644,18 +1646,20 @@ def get_terminal_schema(self) -> DSSchema: def signature(self): return f"{self._dataset_name}.{self._root}" - def set_terminal_node(self, node: _Node) -> bool: + def is_terminal_node(self, node: _Node) -> bool: if node is None: raise Exception(f"Pipeline {self.name} cannot return None.") self.terminal_node = node + has_continuous_window = False if isinstance(node, Aggregate): # If any of the aggregates are exponential decay, then it is a terminal node if any(isinstance(agg, ExpDecaySum) for agg in node.aggregates): return True - return ( - isinstance(node, Aggregate) - and node.emit_strategy == EmitStrategy.Eager - ) + has_continuous_window = any( + isinstance(agg.window, Continuous) for agg in node.aggregates + ) + + return isinstance(node, Aggregate) and has_continuous_window def set_dataset_name(self, ds_name: str): self._dataset_name = ds_name @@ -1916,11 +1920,10 @@ def _get_pipelines(self) -> List[Pipeline]: f"Duplicate pipeline name {pipeline.name} for dataset {dataset_name}." ) names.add(pipeline.name) - is_terminal = pipeline.set_terminal_node( + is_terminal = pipeline.is_terminal_node( pipeline.func(self, *pipeline.inputs) ) - if is_terminal: - self.is_terminal = is_terminal + self.is_terminal = is_terminal pipelines.append(pipeline) pipelines[-1].set_dataset_name(dataset_name) @@ -1959,6 +1962,14 @@ def _get_expectations(self): expectation = getattr(method, GE_ATTR_FUNC) if expectation is None: return None + + # Check that the dataset does not have a terminal aggregate node with Continuous windows + if self.is_terminal: + raise ValueError( + f"Dataset {self._name} has a terminal aggregate node with Continuous windows, we currently dont support expectations on continuous windows." + "This is because values are materialized into buckets which are combined at read time." + ) + # Check that the expectation function only takes 1 parameter: cls. sig = inspect.signature(expectation.func) if len(sig.parameters) != 1: @@ -2648,6 +2659,7 @@ def visitAggregate(self, obj) -> DSSchema: f"Count unique aggregate `{agg}` must have `of` field." ) if not is_hashable(input_schema.get_type(agg.of)): + dtype = input_schema.get_type(agg.of) raise TypeError( f"Cannot use count unique for field `{agg.of}` of " f"type `{dtype_to_string(dtype)}`, as it is not " # type: ignore diff --git a/fennel/datasets/test_invalid_dataset.py b/fennel/datasets/test_invalid_dataset.py index 012546503..456f60fef 100644 --- a/fennel/datasets/test_invalid_dataset.py +++ b/fennel/datasets/test_invalid_dataset.py @@ -17,7 +17,7 @@ ExpDecaySum, index, ) -from fennel.dtypes import struct, Window, Continuous, Session +from fennel.dtypes import struct, Window, Continuous, Session, Hopping from fennel.expr import col from fennel.lib import ( meta, @@ -320,6 +320,78 @@ def transform(cls, rating: Dataset): ) +def test_expectations_on_aggregated_datasets(): + with pytest.raises(ValueError) as e: + + @meta(owner="test@test.com") + @dataset + class PositiveRatingActivity: + cnt_rating: int + movie: str = field(key=True) + t: datetime + + @expectations + def dataset_expectations(cls): + return [ + expect_column_values_to_be_between( + column=str(cls.cnt_rating), min_value=0, max_value=100 + ), + ] + + @pipeline + @inputs(RatingActivity) + def filter_positive_ratings(cls, rating: Dataset): + filtered_ds = rating.filter(lambda df: df["rating"] >= 3.5) + filter2 = filtered_ds.filter( + lambda df: df["movie"].isin(["Jumanji", "Titanic", "RaOne"]) + ) + return filter2.groupby("movie").aggregate( + [ + Count( + window=Continuous("forever"), + into_field=str(cls.cnt_rating), + ), + ], + ) + + assert ( + str(e.value) + == "Dataset PositiveRatingActivity has a terminal aggregate node with Continuous windows, we currently dont support expectations on continuous windows.This is because values are materialized into buckets which are combined at read time." + ) + + # Discrete window is fine + @meta(owner="test@test.com") + @dataset + class PositiveRatingActivity2: + cnt_rating: int + movie: str = field(key=True) + t: datetime + + @expectations + def dataset_expectations(cls): + return [ + expect_column_values_to_be_between( + column=str(cls.cnt_rating), min_value=0, max_value=100 + ), + ] + + @pipeline + @inputs(RatingActivity) + def filter_positive_ratings(cls, rating: Dataset): + filtered_ds = rating.filter(lambda df: df["rating"] >= 3.5) + filter2 = filtered_ds.filter( + lambda df: df["movie"].isin(["Jumanji", "Titanic", "RaOne"]) + ) + return filter2.groupby("movie").aggregate( + [ + Count( + window=Hopping("7d", "1d"), + into_field=str(cls.cnt_rating), + ), + ], + ) + + def test_incorrect_aggregate(): with pytest.raises(ValueError) as e: diff --git a/pyproject.toml b/pyproject.toml index c7d178698..9393b5543 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.20" +version = "1.5.21" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }]