From ab24cd5ed9aaac2e66e3f11e89afd3793460c9fe Mon Sep 17 00:00:00 2001 From: Aditya Nambiar Date: Tue, 20 Aug 2024 11:40:25 -0700 Subject: [PATCH] expr: Read side expressions (#531) * expr: Read side expressions * expr: Rename F to col * feature: Bring back F for feature * Improve error messages --- docs/examples/api-reference/client/commit.py | 4 +- docs/examples/featuresets/overview.py | 12 +- .../testing-and-ci-cd/ci_cd/featuresets.py | 2 +- docs/pages/data-quality/metaflags.md | 8 +- docs/pages/orphan/composite-features.md | 20 +-- docs/pages/orphan/request-based-features.md | 12 +- fennel/CHANGELOG.md | 3 + fennel/__init__.py | 2 +- .../test_complex_autogen_extractor.py | 3 +- fennel/client_tests/test_dataset.py | 18 +-- fennel/client_tests/test_featureset.py | 104 +++++++++++++++- fennel/client_tests/test_social_network.py | 31 +---- fennel/connectors/test_connectors.py | 24 ++-- fennel/datasets/datasets.py | 4 +- fennel/datasets/test_invalid_dataset.py | 24 ++-- fennel/expr/__init__.py | 2 +- fennel/expr/expr.py | 10 +- fennel/expr/test_expr.py | 99 ++++++++++----- fennel/expr/visitor.py | 84 +++++++++++++ fennel/featuresets/featureset.py | 99 +++++++++++++-- fennel/featuresets/test_featureset.py | 117 +++++++++++++++++- fennel/featuresets/test_invalid_featureset.py | 71 +++++++++++ fennel/gen/featureset_pb2.py | 35 +++--- fennel/gen/featureset_pb2.pyi | 12 +- fennel/gen/pycode_pb2.pyi | 2 +- .../graph_algorithms/extractor_order.py | 6 +- fennel/internal_lib/to_proto/source_code.py | 2 + fennel/internal_lib/to_proto/to_proto.py | 13 +- fennel/testing/query_engine.py | 31 +++++ fennel/testing/test_cast_df_to_schema.py | 4 +- 30 files changed, 691 insertions(+), 167 deletions(-) diff --git a/docs/examples/api-reference/client/commit.py b/docs/examples/api-reference/client/commit.py index 035362822..21dc6fac2 100644 --- a/docs/examples/api-reference/client/commit.py +++ b/docs/examples/api-reference/client/commit.py @@ -61,7 +61,7 @@ def 
test_incremental(client): # docsnip incremental from fennel.datasets import dataset, field from fennel.connectors import source, Webhook - from fennel.featuresets import featureset, feature, extractor + from fennel.featuresets import featureset, feature as F, extractor from fennel.lib import inputs, outputs webhook = Webhook(name="some_webhook") @@ -82,7 +82,7 @@ class Transaction: @featureset class TransactionFeatures: txid: int - amount: int = feature(Transaction.amount, default=0) + amount: int = F(Transaction.amount, default=0) amount_is_high: bool @extractor(env="bronze") diff --git a/docs/examples/featuresets/overview.py b/docs/examples/featuresets/overview.py index 21dcfb514..947ed159c 100644 --- a/docs/examples/featuresets/overview.py +++ b/docs/examples/featuresets/overview.py @@ -13,7 +13,7 @@ def test_featureset_overview(): # docsnip featureset - from fennel.featuresets import featureset, extractor + from fennel.featuresets import featureset, extractor, feature as F from fennel.lib import inputs, outputs @featureset @@ -75,7 +75,7 @@ def fn(cls, ts: pd.Series, durations: pd.Series) -> pd.Series: @mock def test_featureset_auto_extractors(client): from fennel.datasets import dataset, field - from fennel.featuresets import featureset, extractor + from fennel.featuresets import featureset, extractor, feature as F from fennel.lib import inputs, outputs from fennel.connectors import source, Webhook @@ -94,7 +94,7 @@ class Movie: class MovieFeatures: id: int # docsnip-highlight next-line - duration: int = feature(Movie.duration, default=-1) + duration: int = F(Movie.duration, default=-1) over_2hrs: bool @extractor @@ -148,7 +148,7 @@ def is_over_2hrs(cls, ts: pd.Series, durations: pd.Series) -> pd.Series: @mock def test_featureset_alias(client): - from fennel.featuresets import featureset, feature, extractor + from fennel.featuresets import featureset, feature as F, extractor from fennel.lib import inputs, outputs from fennel.connectors import source, Webhook 
@@ -168,8 +168,8 @@ class Request: @featureset class MovieFeatures: - id: int = feature(Request.movie_id) # docsnip-highlight - duration: int = feature(Movie.duration, default=-1) + id: int = F(Request.movie_id) # docsnip-highlight + duration: int = F(Movie.duration, default=-1) # /docsnip diff --git a/docs/examples/testing-and-ci-cd/ci_cd/featuresets.py b/docs/examples/testing-and-ci-cd/ci_cd/featuresets.py index 7aabc8354..f550aad5c 100644 --- a/docs/examples/testing-and-ci-cd/ci_cd/featuresets.py +++ b/docs/examples/testing-and-ci-cd/ci_cd/featuresets.py @@ -7,7 +7,7 @@ # docsnip gh_action_featureset -from fennel import featureset, feature as F +from fennel.featuresets import featureset, feature as F @featureset diff --git a/docs/pages/data-quality/metaflags.md b/docs/pages/data-quality/metaflags.md index 8c119b0ce..79d90b8cf 100644 --- a/docs/pages/data-quality/metaflags.md +++ b/docs/pages/data-quality/metaflags.md @@ -36,10 +36,10 @@ class User: @meta(owner='feed-team@xyz.ai') @featureset class UserFeatures: - uid: int = feature() - zip: str = feature().meta(tags=['PII']) - bmi: float = feature().meta(owner='alan@xyz.ai') - bmr: float = feature().meta(deprecated=True) + uid: int = F() + zip: str = F().meta(tags=['PII']) + bmi: float = F().meta(owner='alan@xyz.ai') + bmr: float = F().meta(deprecated=True) .. @meta(description='based on algorithm specified here: bit.ly/xyy123') diff --git a/docs/pages/orphan/composite-features.md b/docs/pages/orphan/composite-features.md index 6d56d7308..4e6ced5da 100644 --- a/docs/pages/orphan/composite-features.md +++ b/docs/pages/orphan/composite-features.md @@ -11,14 +11,14 @@ In many cases, it's important to write features about a composite of two or more ```python @featureset class User: - id: int = feature() - name: str = feature() + id: int = F() + name: str = F() .. @featureset class Post: - id: int = feature() - creator_uid: int = feature() + id: int = F() + creator_uid: int = F() ... 
@extractor @@ -29,9 +29,9 @@ class Post: @featureset class UserCreator: # describes features for (uid, creator_uid) pairs - viewer: int = feature() - creator: int = feature() - affinity: float = feature() + viewer: int = F() + creator: int = F() + affinity: float = F() ... @extractor @@ -43,9 +43,9 @@ class UserCreator: @featureset class UserPost: # describes features for (uid, pid) pairs - uid: int = feature() - pid: int = feature() - viewer_author_affinity = feature() + uid: int = F() + pid: int = F() + viewer_author_affinity = F() ... @extractor diff --git a/docs/pages/orphan/request-based-features.md b/docs/pages/orphan/request-based-features.md index 55b026e17..85a4a7029 100644 --- a/docs/pages/orphan/request-based-features.md +++ b/docs/pages/orphan/request-based-features.md @@ -13,16 +13,16 @@ are very naturally modeled in Fennel. Let's look at one good way of doing this: ```python @featureset class SearchRequest: - time: datetime = feature() - ip: str = feature() - device_type: str = feature() - query: str = feature() + time: datetime = F() + ip: str = F() + device_type: str = F() + query: str = F() @featureset class UserFeatures: - uid: int = feature() + uid: int = F() ... - ctr_by_device_type: float = feature() + ctr_by_device_type: float = F() .. 
@extractor diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 48eafd34d..d7d6f264e 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.6] - 2024-08-20 +- Add support for expression based extractors + ## [1.5.5] - 2024-08-20 - Enable discrete aggregation with lookback diff --git a/fennel/__init__.py b/fennel/__init__.py index 57de1f18e..cbe921db3 100644 --- a/fennel/__init__.py +++ b/fennel/__init__.py @@ -15,5 +15,5 @@ AggregateType, Stddev, ) -from fennel.featuresets import featureset, feature, extractor +from fennel.featuresets import featureset, feature as F, extractor from fennel.lib import meta diff --git a/fennel/client_tests/test_complex_autogen_extractor.py b/fennel/client_tests/test_complex_autogen_extractor.py index 60d5ef8ce..b869b2388 100644 --- a/fennel/client_tests/test_complex_autogen_extractor.py +++ b/fennel/client_tests/test_complex_autogen_extractor.py @@ -5,7 +5,8 @@ import pytest from dateutil.relativedelta import relativedelta # type: ignore -from fennel import meta, Count, featureset, feature as F, extractor +from fennel import meta, Count, featureset, extractor +from fennel.featuresets import feature as F from fennel.connectors import Webhook from fennel.connectors import source from fennel.datasets import dataset, field, pipeline, Dataset diff --git a/fennel/client_tests/test_dataset.py b/fennel/client_tests/test_dataset.py index ac40068b9..6001c994d 100644 --- a/fennel/client_tests/test_dataset.py +++ b/fennel/client_tests/test_dataset.py @@ -12,7 +12,7 @@ import fennel._vendor.requests as requests from fennel.connectors import source, Webhook, ref -from fennel.expr import F +from fennel.expr import col from fennel.datasets import ( dataset, field, @@ -935,9 +935,11 @@ class MovieRatingAssignExpr: @inputs(MovieRating) def pipeline_assign(cls, m: Dataset): rating_transformed = m.assign( - rating_sq=(F("rating") * F("rating")).astype(float), - rating_cube=(F("rating") * F("rating") * 
F("rating")).astype(float), - rating_into_5=(F("rating") * 5).astype(float), + rating_sq=(col("rating") * col("rating")).astype(float), + rating_cube=(col("rating") * col("rating") * col("rating")).astype( + float + ), + rating_into_5=(col("rating") * 5).astype(float), ) return rating_transformed.drop("num_ratings", "sum_ratings", "rating") @@ -1873,11 +1875,11 @@ class PositiveRatingActivityExpr: @pipeline @inputs(RatingActivity) def filter_positive_ratings(cls, rating: Dataset): - filtered_ds = rating.filter(F("rating") >= 3.5) + filtered_ds = rating.filter(col("rating") >= 3.5) filter2 = filtered_ds.filter( - (F("movie") == "Jumanji") - | (F("movie") == "Titanic") - | (F("movie") == "RaOne") + (col("movie") == "Jumanji") + | (col("movie") == "Titanic") + | (col("movie") == "RaOne") ) return filter2.groupby("movie").aggregate( Count(window=Continuous("forever"), into_field=str(cls.cnt_rating)), diff --git a/fennel/client_tests/test_featureset.py b/fennel/client_tests/test_featureset.py index c3e03bc70..062fa7311 100644 --- a/fennel/client_tests/test_featureset.py +++ b/fennel/client_tests/test_featureset.py @@ -19,6 +19,7 @@ expectations, expect_column_values_to_be_between, ) +from fennel.expr import col from fennel.testing import mock, log ################################################################################ @@ -94,6 +95,8 @@ class UserInfoMultipleExtractor: is_name_common: bool age_reciprocal: float age_doubled: int + age_reciprocal_expr: float = F(1 / (col("age") / (3600.0 * 24)) + 0.01) + age_double_expr: int = F(col("age") * 2) @extractor(deps=[UserInfoDataset]) # type: ignore @inputs("userid") @@ -154,7 +157,6 @@ def get_common_names(cls): class TestSimpleExtractor(unittest.TestCase): @pytest.mark.integration def test_get_age_and_name_features(self): - print("Running test_get_age_and_name_features") age = pd.Series([32, 24]) name = pd.Series(["John", "Rahul"]) assert UserInfoMultipleExtractor.all() == [ @@ -167,6 +169,8 @@ def 
test_get_age_and_name_features(self): "UserInfoMultipleExtractor.is_name_common", "UserInfoMultipleExtractor.age_reciprocal", "UserInfoMultipleExtractor.age_doubled", + "UserInfoMultipleExtractor.age_reciprocal_expr", + "UserInfoMultipleExtractor.age_double_expr", ] ts = pd.Series([datetime(2020, 1, 1), datetime(2020, 1, 1)]) df = UserInfoMultipleExtractor.get_age_and_name_features( @@ -235,6 +239,90 @@ def test_simple_extractor(self, client): res["UserInfoMultipleExtractor.age_doubled"].tolist(), [64, 48] ) + @pytest.mark.integration + @mock + def test_e2e_query(self, client): + client.commit( + message="some commit msg", + datasets=[UserInfoDataset], + featuresets=[UserInfoMultipleExtractor], + ) + now = datetime.now(timezone.utc) + data = [ + [18232, "John", 32, "USA", now], + [18234, "Monica", 24, "Chile", now], + ] + columns = ["user_id", "name", "age", "country", "timestamp"] + input_df = pd.DataFrame(data, columns=columns) + response = client.log("fennel_webhook", "UserInfoDataset", input_df) + assert response.status_code == requests.codes.OK, response.json() + client.sleep() + + feature_df = client.query( + outputs=[UserInfoMultipleExtractor], + inputs=[UserInfoMultipleExtractor.userid], + input_dataframe=pd.DataFrame( + {"UserInfoMultipleExtractor.userid": [18232, 18234]} + ), + ) + assert feature_df.shape == (2, 11) + assert feature_df.columns.tolist() == [ + "UserInfoMultipleExtractor.userid", + "UserInfoMultipleExtractor.name", + "UserInfoMultipleExtractor.country_geoid", + "UserInfoMultipleExtractor.age", + "UserInfoMultipleExtractor.age_squared", + "UserInfoMultipleExtractor.age_cubed", + "UserInfoMultipleExtractor.is_name_common", + "UserInfoMultipleExtractor.age_reciprocal", + "UserInfoMultipleExtractor.age_doubled", + "UserInfoMultipleExtractor.age_reciprocal_expr", + "UserInfoMultipleExtractor.age_double_expr", + ] + assert feature_df["UserInfoMultipleExtractor.userid"].tolist() == [ + 18232, + 18234, + ] + + assert 
feature_df["UserInfoMultipleExtractor.age"].tolist() == [32, 24] + assert feature_df["UserInfoMultipleExtractor.age_squared"].tolist() == [ + 1024, + 576, + ] + assert feature_df["UserInfoMultipleExtractor.age_cubed"].tolist() == [ + 32768, + 13824, + ] + assert feature_df[ + "UserInfoMultipleExtractor.is_name_common" + ].tolist() == [ + True, + False, + ] + expected_age_reciprocal = [2700.01, 3600.01] + assert ( + feature_df["UserInfoMultipleExtractor.age_reciprocal"].tolist() + == expected_age_reciprocal + ) + assert feature_df["UserInfoMultipleExtractor.age_doubled"].tolist() == [ + 64, + 48, + ] + assert feature_df[ + "UserInfoMultipleExtractor.country_geoid" + ].tolist() == [5, 3] + assert feature_df["UserInfoMultipleExtractor.name"].tolist() == [ + "John", + "Monica", + ] + assert ( + feature_df["UserInfoMultipleExtractor.age_reciprocal_expr"].tolist() + == expected_age_reciprocal + ) + assert feature_df[ + "UserInfoMultipleExtractor.age_double_expr" + ].tolist() == [64, 48] + @struct class Velocity: @@ -420,11 +508,23 @@ def test_dag_resolution2(self, client): {"UserInfoMultipleExtractor.userid": [18232, 18234]} ), ) - self.assertEqual(feature_df.shape, (2, 9)) + self.assertEqual(feature_df.shape, (2, 11)) self.assertEqual( list(feature_df["UserInfoMultipleExtractor.age_reciprocal"]), [2700.01, 3600.01], ) + self.assertEqual( + list(feature_df["UserInfoMultipleExtractor.age_doubled"]), + [64, 48], + ) + self.assertEqual( + list(feature_df["UserInfoMultipleExtractor.age_reciprocal_expr"]), + [2700.01, 3600.01], + ) + self.assertEqual( + list(feature_df["UserInfoMultipleExtractor.age_double_expr"]), + [64, 48], + ) @featureset diff --git a/fennel/client_tests/test_social_network.py b/fennel/client_tests/test_social_network.py index b598b8fe6..966e54d38 100644 --- a/fennel/client_tests/test_social_network.py +++ b/fennel/client_tests/test_social_network.py @@ -12,6 +12,7 @@ from fennel.featuresets import featureset, feature as F, extractor from fennel.lib 
import meta, inputs, outputs from fennel.testing import mock, log +from fennel.expr import col webhook = Webhook(name="fennel_webhook") @@ -148,8 +149,9 @@ class Request: class UserFeatures: user_id: str = F(Request.user_id) # type: ignore num_views: int - num_category_views: int - category_view_ratio: float + category: str = F(Request.category) # type: ignore + num_category_views: int = F(UserCategoryDataset.num_views, default=0) # type: ignore + category_view_ratio: float = F(col("num_category_views") / col("num_views")) last_viewed_post: int = F(LastViewedPost.post_id, default=-1) # type: ignore last_viewed_post2: List[int] = F( LastViewedPostByAgg.post_id, default=[-1] # type: ignore @@ -161,33 +163,8 @@ class UserFeatures: def extract_user_views(cls, ts: pd.Series, user_ids: pd.Series): views, _ = UserViewsDataset.lookup(ts, user_id=user_ids) # type: ignore views = views.fillna(0) - return views["num_views"] - @extractor(deps=[UserCategoryDataset, UserViewsDataset]) # type: ignore - @inputs(Request.user_id, Request.category) - @outputs("category_view_ratio", "num_category_views") - def extractor_category_view( - cls, - ts: pd.Series, - user_ids: pd.Series, - categories: pd.Series, - ): - category_views, _ = UserCategoryDataset.lookup( # type: ignore - ts, user_id=user_ids, category=categories - ) - views, _ = UserViewsDataset.lookup(ts, user_id=user_ids) # type: ignore - category_views = category_views.fillna(0) - views = views.fillna(0.001) - category_views["num_views"] = category_views["num_views"].astype(int) - category_view_ratio = category_views["num_views"] / views["num_views"] - return pd.DataFrame( - { - "category_view_ratio": category_view_ratio, - "num_category_views": category_views["num_views"], - } - ) - @pytest.mark.integration @mock diff --git a/fennel/connectors/test_connectors.py b/fennel/connectors/test_connectors.py index 64d7a340f..d266f73b9 100644 --- a/fennel/connectors/test_connectors.py +++ b/fennel/connectors/test_connectors.py @@ 
-2285,7 +2285,7 @@ class UserInfoDataset: source_request = sync_request.sources[0] s = { "table": { - "mysql_table": { + "mysqlTable": { "db": { "name": "mysql", "mysql": { @@ -2296,24 +2296,24 @@ class UserInfoDataset: "port": 3306, }, }, - "table_name": "users", - }, + "tableName": "users", + } }, "dataset": "UserInfoDataset", + "dsVersion": 1, "every": "3600s", - "cdc": "Upsert", + "cursor": "added_on", "disorder": "72000s", + "timestampField": "timestamp", + "cdc": "Upsert", "bounded": True, "idleness": "3600s", - "cursor": "added_on", - "timestamp_field": "timestamp", - "dsVersion": 1, "filter": { - "entry_point": "UserInfoDataset_wrapper_2e6e95b302_filter", - "source_code": 'lambda df: df["user_id"] == 1', - "core_code": 'lambda df: df["user_id"] == 1', - "generated_code": '\n\n@meta(owner="test@test.com")\n@dataset\nclass UserInfoDataset:\n user_id: int = field(key=True)\n name: str\n gender: str\n # Users date of birth\n dob: str\n age: int\n account_creation_date: datetime\n country: Optional[str]\n timestamp: datetime = field(timestamp=True)\n\n\n @classmethod\n def wrapper_2e6e95b302(cls, *args, **kwargs):\n _fennel_internal = lambda df: df["user_id"] == 1\n return _fennel_internal(*args, **kwargs)\n\n\ndef UserInfoDataset_wrapper_2e6e95b302(*args, **kwargs):\n _fennel_internal = UserInfoDataset.__fennel_original_cls__\n return getattr(_fennel_internal, "wrapper_2e6e95b302")(*args, **kwargs)\n\ndef UserInfoDataset_wrapper_2e6e95b302_filter(df: pd.DataFrame) -> pd.DataFrame:\n return df[UserInfoDataset_wrapper_2e6e95b302(df)]\n ', - "imports": "__fennel_gen_code__=True\nimport pandas as pd\nimport numpy as np\nimport json\nimport os\nimport sys\nfrom datetime import datetime, date\nimport time\nimport random\nimport math\nimport re\nfrom enum import Enum\nfrom typing import *\nfrom collections import defaultdict\nfrom fennel.connectors.connectors import *\nfrom fennel.datasets import *\nfrom fennel.featuresets import *\nfrom fennel.featuresets import 
feature as F\nfrom fennel.lib.expectations import *\nfrom fennel.internal_lib.schema import *\nfrom fennel.internal_lib.utils import *\nfrom fennel.lib.params import *\nfrom fennel.dtypes.dtypes import *\nfrom fennel.datasets.aggregate import *\nfrom fennel.lib.includes import includes\nfrom fennel.lib.metadata import meta\nfrom fennel.lib import secrets, bucketize\nfrom fennel.datasets.datasets import dataset_lookup\n", + "entryPoint": "UserInfoDataset_wrapper_2e6e95b302_filter", + "sourceCode": 'lambda df: df["user_id"] == 1', + "coreCode": 'lambda df: df["user_id"] == 1', + "generatedCode": '\n\n@meta(owner="test@test.com")\n@dataset\nclass UserInfoDataset:\n user_id: int = field(key=True)\n name: str\n gender: str\n # Users date of birth\n dob: str\n age: int\n account_creation_date: datetime\n country: Optional[str]\n timestamp: datetime = field(timestamp=True)\n\n\n @classmethod\n def wrapper_2e6e95b302(cls, *args, **kwargs):\n _fennel_internal = lambda df: df["user_id"] == 1\n return _fennel_internal(*args, **kwargs)\n\n\ndef UserInfoDataset_wrapper_2e6e95b302(*args, **kwargs):\n _fennel_internal = UserInfoDataset.__fennel_original_cls__\n return getattr(_fennel_internal, "wrapper_2e6e95b302")(*args, **kwargs)\n\ndef UserInfoDataset_wrapper_2e6e95b302_filter(df: pd.DataFrame) -> pd.DataFrame:\n return df[UserInfoDataset_wrapper_2e6e95b302(df)]\n ', + "imports": "__fennel_gen_code__=True\nimport pandas as pd\nimport numpy as np\nimport json\nimport os\nimport sys\nfrom datetime import datetime, date\nimport time\nimport random\nimport math\nimport re\nfrom enum import Enum\nfrom typing import *\nfrom collections import defaultdict\nfrom fennel.connectors.connectors import *\nfrom fennel.datasets import *\nfrom fennel.featuresets import *\nfrom fennel.featuresets import feature\nfrom fennel.featuresets import feature as F\nfrom fennel.lib.expectations import *\nfrom fennel.internal_lib.schema import *\nfrom fennel.internal_lib.utils import *\nfrom 
fennel.lib.params import *\nfrom fennel.dtypes.dtypes import *\nfrom fennel.datasets.aggregate import *\nfrom fennel.lib.includes import includes\nfrom fennel.lib.metadata import meta\nfrom fennel.lib import secrets, bucketize\nfrom fennel.datasets.datasets import dataset_lookup\nfrom fennel.expr import col\n", }, } expected_source_request = ParseDict(s, connector_proto.Source()) diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index 078aad15b..4140c2efe 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -256,8 +256,8 @@ def assign(self, *args, **kwargs) -> _Node: >>> ds.assign("new_column", int, lambda x: x.old_column + 1) 2. Assigning one or more columns: >>> ds.assign( - ... new_column1=(F("old_column1") + 1).astype(int), - ... new_column2=(F("old_column2") + 2).astype(int), + ... new_column1=(col("old_column1") + 1).astype(int), + ... new_column2=(col("old_column2") + 2).astype(int), ... ) """ diff --git a/fennel/datasets/test_invalid_dataset.py b/fennel/datasets/test_invalid_dataset.py index 3ab4667b8..6e55f77bf 100644 --- a/fennel/datasets/test_invalid_dataset.py +++ b/fennel/datasets/test_invalid_dataset.py @@ -18,7 +18,7 @@ index, ) from fennel.dtypes import struct, Window, Continuous, Session -from fennel.expr import F +from fennel.expr import col from fennel.lib import ( meta, inputs, @@ -240,14 +240,16 @@ class RatingActivityTransformed: @inputs(RatingActivity) def transform(cls, rating: Dataset): return rating.assign( - rating_sq=(F("rating") * F("rating")).astype(str), - movie_suffixed=F("movie").str.concat("_suffix").astype(int), + rating_sq=(col("rating") * col("rating")).astype(str), + movie_suffixed=col("movie") + .str.concat("_suffix") + .astype(int), ).drop("rating", "movie") expected_err = ( "found type errors in assign node of `RatingActivityTransformed.transform`:\n" - + "\t'rating_sq' is of type `str`, can not be cast to `float`. 
Full expression: `(Ref('rating') * Ref('rating'))`\n" - + "\t'movie_suffixed' is of type `int`, can not be cast to `str`. Full expression: `Ref('movie') + \"_suffix\"`" + + "\t'rating_sq' is of type `str`, can not be cast to `float`. Full expression: `(col('rating') * col('rating'))`\n" + + "\t'movie_suffixed' is of type `int`, can not be cast to `str`. Full expression: `col('movie') + \"_suffix\"`" ) assert str(e.value) == expected_err @@ -266,8 +268,10 @@ class RatingActivityTransformed2: @inputs(RatingActivity) def transform(cls, rating: Dataset): return rating.assign( - rating_sq=(F("rating") * F("rating")).astype(float), - movie_suffixed=F("movie").str.concat("_suffix").astype(str), + rating_sq=(col("rating") * col("rating")).astype(float), + movie_suffixed=col("movie") + .str.concat("_suffix") + .astype(str), ).drop("rating", "movie") assert ( @@ -289,8 +293,8 @@ class RatingActivityTransformed3: @inputs(RatingActivity) def transform(cls, rating: Dataset): return rating.assign( - rating_sq=(F("rating") % F("rating")).astype(float), - movie_suffixed=(F("movie") + "_suffix").astype(str), + rating_sq=(col("rating") % col("rating")).astype(float), + movie_suffixed=(col("movie") + "_suffix").astype(str), ).drop("rating", "movie") assert ( @@ -312,7 +316,7 @@ class RatingActivityFiltered: @pipeline @inputs(RatingActivity) def transform(cls, rating: Dataset): - return rating.filter(F("rating") + 3.5).drop("movie") + return rating.filter(col("rating") + 3.5).drop("movie") assert ( str(e.value) diff --git a/fennel/expr/__init__.py b/fennel/expr/__init__.py index 243f83e54..d1c29db03 100644 --- a/fennel/expr/__init__.py +++ b/fennel/expr/__init__.py @@ -1 +1 @@ -from fennel.expr.expr import F, lit, when, Expr +from fennel.expr.expr import col, lit, when, Expr diff --git a/fennel/expr/expr.py b/fennel/expr/expr.py index d45f93968..bf3e6a627 100644 --- a/fennel/expr/expr.py +++ b/fennel/expr/expr.py @@ -390,6 +390,12 @@ def pa_to_pd(pa_data): arrow_col = 
eval(proto_bytes, df_pa, proto_schema) return pa_to_pd(arrow_col) + def __str__(self) -> str: # type: ignore + from fennel.expr.visitor import ExprPrinter + + printer = ExprPrinter() + return printer.print(self.root) + class _Bool(Expr): def __init__(self, expr: Expr): @@ -759,7 +765,7 @@ def __init__(self, col: str): super(Ref, self).__init__() def __str__(self) -> str: - return f"Ref('{self._col}')" + return f"col('{self._col}')" class IsNull(Expr): @@ -801,7 +807,7 @@ def make_expr(v: Any) -> Any: ################################################################# -def F(col: str) -> Expr: +def col(col: str) -> Expr: return Ref(col) diff --git a/fennel/expr/test_expr.py b/fennel/expr/test_expr.py index f8e04bb93..30b176d09 100644 --- a/fennel/expr/test_expr.py +++ b/fennel/expr/test_expr.py @@ -4,8 +4,8 @@ from typing import Dict from fennel.datasets import dataset -from fennel.expr import F, when -from fennel.expr.visitor import ExprPrinter +from fennel.expr import col, when +from fennel.expr.visitor import ExprPrinter, FetchReferences from fennel.expr.serializer import ExprSerializer from google.protobuf.json_format import ParseDict # type: ignore from fennel.gen.expr_pb2 import Expr @@ -13,18 +13,21 @@ def test_basic_expr1(): - expr = (F("num") + F("d")).isnull() + expr = (col("num") + col("d")).isnull() df = pd.DataFrame({"num": [1, 2, 3, 4], "d": [5, 6, 7, 8]}) assert expr.typeof({"num": int, "d": int}) == bool ret = expr.eval(df, {"num": int, "d": int}) assert ret.tolist() == [False, False, False, False] + ref_extractor = FetchReferences() + ref_extractor.visit(expr.root) + assert ref_extractor.refs == {"num", "d"} def test_basic_expr2(): - expr = F("a") + F("b") + 3 + expr = col("a") + col("b") + 3 printer = ExprPrinter() - expected = "((Ref('a') + Ref('b')) + 3)" + expected = "((col('a') + col('b')) + 3)" assert expected == printer.print(expr.root) serializer = ExprSerializer() proto_expr = serializer.serialize(expr.root) @@ -58,11 +61,15 @@ class 
TestDataset: assert ret.tolist() == [9, 11, 13, 15] assert expr.typeof({"a": int, "b": int}) == int + ref_extractor = FetchReferences() + ref_extractor.visit(expr.root) + assert ref_extractor.refs == {"a", "b"} + def test_math_expr(): - expr = (F("a").num.floor() + 3.2).num.ceil() + expr = (col("a").num.floor() + 3.2).num.ceil() printer = ExprPrinter() - expected = "CEIL((FLOOR(Ref('a')) + 3.2))" + expected = "CEIL((FLOOR(col('a')) + 3.2))" assert expected == printer.print(expr.root) serializer = ExprSerializer() proto_expr = serializer.serialize(expr.root) @@ -94,23 +101,37 @@ def test_math_expr(): assert ret.tolist() == [5, 6, 7, 8] assert expr.typeof({"a": float}) == int + ref_extractor = FetchReferences() + ref_extractor.visit(expr.root) + assert ref_extractor.refs == {"a"} + expr = ( - when(F("a").num.floor() > 5) - .then(F("b")) - .when(F("a") > 3) - .then(F("a")) - .otherwise(1) + when(col("a").num.floor() > 5) + .then(col("b")) + .when(col("a") > 3) + .then(col("a")) + .otherwise(1 + col("d")) + ) + df = pd.DataFrame( + { + "a": [1.4, 3.2, 6.1, 4.8], + "b": [100, 200, 300, 400], + "d": [1, 2, 3, 4], + } ) - df = pd.DataFrame({"a": [1.4, 3.2, 6.1, 4.8], "b": [100, 200, 300, 400]}) - ret = expr.eval(df, {"a": float, "b": int}) - assert ret.tolist() == [1, 3.2, 300, 4.8] - assert expr.typeof({"a": float, "b": int}) == float + ret = expr.eval(df, {"a": float, "b": int, "d": int}) + assert ret.tolist() == [2.0, 3.2, 300, 4.8] + assert expr.typeof({"a": float, "b": int, "d": int}) == float + + ref_extractor = FetchReferences() + ref_extractor.visit(expr.root) + assert ref_extractor.refs == {"a", "b", "d"} def test_bool_expr(): - expr = (F("a") == 5) | ((F("b") == "random") & (F("c") == 3.2)) + expr = (col("a") == 5) | ((col("b") == "random") & (col("c") == 3.2)) printer = ExprPrinter() - expected = """((Ref('a') == 5) or ((Ref('b') == "random") and (Ref('c') == 3.2)))""" + expected = """((col('a') == 5) or ((col('b') == "random") and (col('c') == 3.2)))""" assert 
expected == printer.print(expr.root) df = pd.DataFrame( @@ -124,20 +145,31 @@ def test_bool_expr(): assert ret.tolist() == [False, True, False, True] assert expr.typeof({"a": int, "b": str, "c": float}) == bool + ref_extractor = FetchReferences() + ref_extractor.visit(expr.root) + assert ref_extractor.refs == {"a", "b", "c"} + def test_str_expr(): - expr = (F("a").str.concat(F("b"))).str.lower().len().ceil() + expr = (col("a").str.concat(col("b"))).str.lower().len().ceil() printer = ExprPrinter() - expected = "CEIL(LEN(LOWER(Ref('a') + Ref('b'))))" + expected = "CEIL(LEN(LOWER(col('a') + col('b'))))" assert expected == printer.print(expr.root) + ref_extractor = FetchReferences() + ref_extractor.visit(expr.root) + assert ref_extractor.refs == {"a", "b"} expr = ( - when(((F("a").str.concat(F("b"))).str.upper()).str.contains(F("c"))) - .then(F("b")) + when( + ((col("a").str.concat(col("b"))).str.upper()).str.contains(col("c")) + ) + .then(col("b")) .otherwise("No Match") ) - expected = """WHEN CONTAINS(UPPER(Ref('a') + Ref('b')), Ref('c')) THEN Ref('b') ELSE "No Match\"""" + expected = """WHEN CONTAINS(UPPER(col('a') + col('b')), col('c')) THEN col('b') ELSE "No Match\"""" assert expected == printer.print(expr.root) + ref_extractor = FetchReferences() + assert ref_extractor.fetch(expr.root) == {"a", "b", "c"} df = pd.DataFrame( { "a": ["p", "BRandomS", "CRandomStrin", "tqz"], @@ -159,15 +191,15 @@ def test_str_expr(): ] assert expr.typeof({"a": str, "b": str, "c": str}) == str expr = ( - when(F("a").str.contains("p")) - .then(F("b")) - .when(F("b").str.contains("b")) - .then(F("a")) - .when(F("c").str.contains("C")) - .then(F("c")) + when(col("a").str.contains("p")) + .then(col("b")) + .when(col("b").str.contains("b")) + .then(col("a")) + .when(col("c").str.contains("C")) + .then(col("c")) .otherwise("No Match") ) - expected = """WHEN CONTAINS(Ref('a'), "p") THEN Ref('b') WHEN CONTAINS(Ref('b'), "b") THEN Ref('a') WHEN CONTAINS(Ref('c'), "C") THEN Ref('c') ELSE "No 
Match\"""" + expected = """WHEN CONTAINS(col('a'), "p") THEN col('b') WHEN CONTAINS(col('b'), "b") THEN col('a') WHEN CONTAINS(col('c'), "C") THEN col('c') ELSE "No Match\"""" assert expected == printer.print(expr.root) serializer = ExprSerializer() proto_expr = serializer.serialize(expr.root) @@ -252,13 +284,16 @@ def test_str_expr(): def test_dict_op(): - expr = (F("a").dict.get("x") + F("a").dict.get("y")).num.ceil() + F( + expr = (col("a").dict.get("x") + col("a").dict.get("y")).num.ceil() + col( "a" ).dict.len() printer = ExprPrinter() expected = ( - """(CEIL((Ref('a').get("x") + Ref('a').get("y"))) + LEN(Ref('a')))""" + """(CEIL((col('a').get("x") + col('a').get("y"))) + LEN(col('a')))""" ) + ref_extractor = FetchReferences() + ref_extractor.visit(expr.root) + assert ref_extractor.refs == {"a"} assert expected == printer.print(expr.root) serializer = ExprSerializer() proto_expr = serializer.serialize(expr.root) diff --git a/fennel/expr/visitor.py b/fennel/expr/visitor.py index 781e4a245..b668c8ccb 100644 --- a/fennel/expr/visitor.py +++ b/fennel/expr/visitor.py @@ -236,3 +236,87 @@ def visitDict(self, obj): return f"{self.visit(obj.expr)}.get('{self.visit(obj.op.key)}', {self.visit(obj.op.default)})" elif isinstance(obj.op, DictLen): return f"LEN({self.visit(obj.expr)})" + + +class FetchReferences(Visitor): + + def __init__(self): + self.refs = set() + + def fetch(self, obj): + self.visit(obj) + return self.refs + + def visitRef(self, obj): + self.refs.add(obj._col) + + def visitUnary(self, obj): + self.visit(obj.expr) + + def visitBinary(self, obj): + self.visit(obj.left) + self.visit(obj.right) + + def visitIsNull(self, obj): + self.visit(obj.expr) + + def visitFillNull(self, obj): + self.visit(obj.expr) + self.visit(obj.fill) + + def visitWhen(self, obj): + cur_when = obj + when_then_pairs: List[When, Then] = [] + while cur_when is not None: + if cur_when._then is None: + raise InvalidExprException( + f"THEN clause missing for WHEN clause 
{self.visit(cur_when)}" + ) + when_then_pairs.append((cur_when, cur_when._then)) + cur_when = cur_when._then._chained_when + + for when, then in when_then_pairs: + self.visit(when.expr) + self.visit(then.expr) + + if when_then_pairs[-1][1]._otherwise is not None: + self.visit(when_then_pairs[-1][1]._otherwise.expr) + + def visitThen(self, obj): + self.visit(obj.expr) + + def visitOtherwise(self, obj): + self.visit(obj.expr) + + def visitNumber(self, obj): + self.visit(obj.operand) + + def visitString(self, obj): + self.visit(obj.operand) + if isinstance(obj.op, StrContains): + self.visit(obj.op.item) + elif isinstance(obj.op, Concat): + self.visit(obj.op.other) + + def visitDict(self, obj): + self.visit(obj.expr) + if isinstance(obj.op, DictContains): + self.visit(obj.op.item) + elif isinstance(obj.op, DictGet): + self.visit(obj.op.key) + if obj.op.default is not None: + self.visit(obj.op.default) + + def visitList(self, obj): + for item in obj.items: + self.visit(item) + + def visitStruct(self, obj): + for field in obj.fields: + self.visit(field) + + def visitLiteral(self, obj): + pass + + def visitBool(self, obj): + self.visit(obj.expr) diff --git a/fennel/featuresets/featureset.py b/fennel/featuresets/featureset.py index 2bd4227f1..79a5166e8 100644 --- a/fennel/featuresets/featureset.py +++ b/fennel/featuresets/featureset.py @@ -19,11 +19,14 @@ from fennel.datasets import Dataset, Field from fennel.datasets.datasets import get_index, IndexDuration +from fennel.expr.expr import Expr +from fennel.expr.visitor import FetchReferences from fennel.gen.featureset_pb2 import ExtractorType from fennel.internal_lib.schema import ( validate_val_with_dtype, fennel_get_optional_inner, ) +from fennel.internal_lib.utils.utils import dtype_to_string from fennel.lib import FENNEL_GEN_CODE_MARKER from fennel.lib.expectations import Expectations, GE_ATTR_FUNC from fennel.lib.includes import EnvSelector @@ -74,13 +77,17 @@ def feature( ) -> T: # type: ignore if len(args) > 1: raise 
TypeError( - f"Please reference to only one feature or one field at a time found : {args}" + f"Please refer to only one feature/field/expression at a time found : {args}" ) if default is not None: if len(args) == 0: raise TypeError( 'Please specify a reference to a field of a dataset to use "default" param' ) + if isinstance(args[0], Expr): + raise ValueError( + f"error in expression based extractor '{args[0]}'; can not set default value for expressions, maybe use fillnull instead?" + ) if not isinstance(args[0], Field): raise TypeError( f"'Please specify a reference to a field of a dataset to use \"default\" param', found arg: `{args[0]}` and default: `{default}`" @@ -93,7 +100,10 @@ def feature( # Rest of fields filled in later ) if len(args) != 0: - feature_obj.ref = args[0] + if isinstance(args[0], Expr): + feature_obj._expr = args[0] + else: + feature_obj.ref = args[0] else: feature_obj.ref = None feature_obj.ref_default = default @@ -346,6 +356,7 @@ class Feature: _ref_default: Optional[Any] = None _ref_version: Optional[int] = None _ref_env: Optional[Union[str, List[str]]] = None + _expr: Optional[Expr] = None _name: str = "" _featureset_name: str = "" fqn_: str = "" @@ -551,6 +562,68 @@ def _add_feature_names_as_attributes(self): def all(self) -> List[Feature]: return self._features + def _get_expression_extractors(self) -> List[Extractor]: + extractors = [] + for feature in self._features: + expr = feature._expr + if expr is None: + continue + if not isinstance(expr, Expr): + raise TypeError( + f"Expected an Expr object for feature {feature.name} " + f"but found {type(expr)}" + ) + + ref_extractor = FetchReferences() + user_defined_inputs = ref_extractor.fetch(expr) + extractor = Extractor( + name=f"_fennel_expr_{feature.fqn()}", + extractor_type=ExtractorType.EXPR, + user_defined_inputs=list(user_defined_inputs), + user_defined_outputs=[feature], + version=feature.ref_version or 0, + func=None, + derived_extractor_info=None, + depends_on=None, + 
env=feature.ref_env, + ) + # Converting the string in extractor inputs decorator to Feature object + inputs = [] + for input in extractor.user_defined_inputs: + if isinstance(input, str): + try: + inputs.append(self._feature_map[input]) + except KeyError: + raise ValueError( + f"extractor for '{feature}' refers to feature col('{input}') not present in '{self._name}'; 'col' can only reference features from the same featureset" + ) + elif isinstance(input, Feature): + inputs.append(input) + else: + raise ValueError( + f"Parameter `{input}` is not a feature, but a " + f"`{type(input)}`, and hence not supported as an input for the extractor " + f"`{extractor.name}`" + ) + + extractor.expr = expr + extractor.inputs = inputs + input_types = {inp.name: inp.dtype for inp in inputs} + computed_dtype = expr.typeof(input_types) + if computed_dtype != feature.dtype: + raise TypeError( + f"expression '{expr}' for feature '{feature.name}' is of type '{dtype_to_string(feature.dtype)}' not '{dtype_to_string(computed_dtype)}'" + ) + extractor.featureset = self._name + extractor.outputs = [feature] + feature_meta = get_meta(feature) + if feature_meta: + extractor = cast( + Extractor, meta(**feature_meta.dict())(extractor) + ) + extractors.append(extractor) + return extractors + def _get_generated_extractors( self, ) -> List[Extractor]: @@ -574,15 +647,15 @@ def _get_generated_extractors( # aliasing if isinstance(ref, Feature): extractor = Extractor( - f"_fennel_alias_{ref.fqn()}", - ExtractorType.ALIAS, - [ref], - [feature], - feature.ref_version, - None, - None, - None, - feature.ref_env, + name=f"_fennel_alias_{ref.fqn()}", + extractor_type=ExtractorType.ALIAS, + user_defined_inputs=[ref], + user_defined_outputs=[feature], + version=feature.ref_version, + func=None, + derived_extractor_info=None, + depends_on=None, + env=feature.ref_env, ) extractor.outputs = [feature] extractor.inputs = [ref] @@ -641,6 +714,8 @@ def _get_generated_extractors( def _get_extractors(self) -> 
List[Extractor]: extractors = [] + # Expression based extractors + extractors.extend(list(self._get_expression_extractors())) # getting auto generated extractors for features extractors.extend(list(self._get_generated_extractors())) @@ -835,6 +910,8 @@ class Extractor: func: Optional[Callable] derived_extractor_info: Optional[DatasetLookupInfo] featureset: str + # Expression based extractors + expr: Optional[Expr] # depended on datasets: used for autogenerated extractors depends_on: List[Dataset] diff --git a/fennel/featuresets/test_featureset.py b/fennel/featuresets/test_featureset.py index 739657b60..3fd005446 100644 --- a/fennel/featuresets/test_featureset.py +++ b/fennel/featuresets/test_featureset.py @@ -11,6 +11,7 @@ from fennel.featuresets import featureset, extractor, feature as F from fennel.lib import meta, inputs, outputs, desc from fennel.testing import * +from fennel.expr import col webhook = Webhook(name="fennel_webhook") @@ -49,10 +50,16 @@ class UserInfo: # The users gender among male/female/non-binary gender: str age: int = F().meta(owner="aditya@fennel.ai") + age_sq: int = F(col("age") * col("age")) + age_double: int = F(col("age") * 2) + height: int + weight: float + bmi: float = F(col("weight") / col("height") / col("height") * 2.20462) income: int = F().meta(deprecated=True) @extractor(deps=[UserInfoDataset], version=2) @inputs(User.id, User.age) + @outputs("userid", "home_geoid", "gender", "age", "income") def get_user_info( cls, ts: pd.Series, user_id: pd.Series, user_age: pd.Series ): @@ -75,12 +82,17 @@ def get_user_info( "UserInfo.home_geoid", "UserInfo.gender", "UserInfo.age", + "UserInfo.age_sq", + "UserInfo.age_double", + "UserInfo.height", + "UserInfo.weight", + "UserInfo.bmi", "UserInfo.income", ] sync_request = view._get_sync_request_proto() assert len(sync_request.feature_sets) == 2 - assert len(sync_request.extractors) == 1 - assert len(sync_request.features) == 7 + assert len(sync_request.extractors) == 4 + assert 
len(sync_request.features) == 12 featureset_request = sync_request.feature_sets[0] f = { "name": "UserInfo", @@ -145,6 +157,61 @@ def get_user_info( actual_feature, expected_feature ) actual_feature = sync_request.features[4] + f = { + "name": "age_sq", + "dtype": {"int_type": {}}, + "metadata": {}, + "feature_set_name": "UserInfo", + } + expected_feature = ParseDict(f, fs_proto.Feature()) + assert actual_feature == expected_feature, error_message( + actual_feature, expected_feature + ) + actual_feature = sync_request.features[5] + f = { + "name": "age_double", + "dtype": {"int_type": {}}, + "metadata": {}, + "feature_set_name": "UserInfo", + } + expected_feature = ParseDict(f, fs_proto.Feature()) + assert actual_feature == expected_feature, error_message( + actual_feature, expected_feature + ) + actual_feature = sync_request.features[6] + f = { + "name": "height", + "dtype": {"int_type": {}}, + "metadata": {}, + "feature_set_name": "UserInfo", + } + expected_feature = ParseDict(f, fs_proto.Feature()) + assert actual_feature == expected_feature, error_message( + actual_feature, expected_feature + ) + actual_feature = sync_request.features[7] + f = { + "name": "weight", + "dtype": {"double_type": {}}, + "metadata": {}, + "feature_set_name": "UserInfo", + } + expected_feature = ParseDict(f, fs_proto.Feature()) + assert actual_feature == expected_feature, error_message( + actual_feature, expected_feature + ) + actual_feature = sync_request.features[8] + f = { + "name": "bmi", + "dtype": {"double_type": {}}, + "metadata": {}, + "feature_set_name": "UserInfo", + } + expected_feature = ParseDict(f, fs_proto.Feature()) + assert actual_feature == expected_feature, error_message( + actual_feature, expected_feature + ) + actual_feature = sync_request.features[9] f = { "name": "income", "dtype": {"int_type": {}}, @@ -157,7 +224,51 @@ def get_user_info( ) # extractors - actual_extractor = erase_extractor_pycode(sync_request.extractors[0]) + actual_extractor = 
sync_request.extractors[0] + e = { + "name": "_fennel_expr_UserInfo.age_sq", + "inputs": [{"feature": {"featureSetName": "UserInfo", "name": "age"}}], + "features": ["age_sq"], + "metadata": {}, + "featureSetName": "UserInfo", + "extractorType": "EXPR", + "expr": { + "binary": { + "left": {"ref": {"name": "age"}}, + "right": {"ref": {"name": "age"}}, + "op": "MUL", + } + }, + } + expected_extractor = ParseDict(e, fs_proto.Extractor()) + assert actual_extractor == expected_extractor, error_message( + actual_extractor, expected_extractor + ) + + actual_extractor = sync_request.extractors[1] + e = { + "name": "_fennel_expr_UserInfo.age_double", + "inputs": [{"feature": {"featureSetName": "UserInfo", "name": "age"}}], + "features": ["age_double"], + "metadata": {}, + "featureSetName": "UserInfo", + "extractorType": "EXPR", + "expr": { + "binary": { + "left": {"ref": {"name": "age"}}, + "right": { + "jsonLiteral": {"literal": "2", "dtype": {"intType": {}}} + }, + "op": "MUL", + } + }, + } + expected_extractor = ParseDict(e, fs_proto.Extractor()) + assert actual_extractor == expected_extractor, error_message( + actual_extractor, expected_extractor + ) + + actual_extractor = erase_extractor_pycode(sync_request.extractors[3]) e = { "name": "get_user_info", "datasets": ["UserInfoDataset"], diff --git a/fennel/featuresets/test_invalid_featureset.py b/fennel/featuresets/test_invalid_featureset.py index 6de383b8b..13bd08e66 100644 --- a/fennel/featuresets/test_invalid_featureset.py +++ b/fennel/featuresets/test_invalid_featureset.py @@ -10,6 +10,7 @@ from fennel.datasets import dataset, field from fennel.featuresets import featureset, extractor, feature as F from fennel.lib import inputs, outputs +from fennel.expr import col # noinspection PyUnresolvedReferences from fennel.testing import * @@ -385,3 +386,73 @@ class B: str(e.value) == "'Please specify a reference to a field of a dataset to use \"default\" param', found arg: `user_id` and default: `0`" ) + + +@mock +def 
test_invalid_expr_feature(client): + + # Using a feature that is not defined in the featureset + with pytest.raises(ValueError) as e: + + @featureset + class UserInfo3: + user_id: int + home_geoid: int + age: int = F(UserInfoDataset.age, default=0) + age_squared: int = F(col("Age") * col("Age")) + credit_score: int + + assert ( + str(e.value) + == "extractor for 'age_squared' refers to feature col('Age') not present in 'UserInfo3'; 'col' can only reference features from the same featureset" + ) + + # Using default value for an expression feature + with pytest.raises(ValueError) as e: + + @featureset + class UserInfo4: + user_id: int + home_geoid: int + age: int = F(UserInfoDataset.age, default=0) + age_squared: int = F(col("age") * col("age"), default=0) + credit_score: int + + assert ( + str(e.value) + == "error in expression based extractor 'col('age') * col('age')'; can not set default value for expressions, maybe use fillnull instead?" + ) + + # Incorrect type for an expression feature + with pytest.raises(TypeError) as e: + + @featureset + class UserInfo5: + user_id: int + home_geoid: int + age: int = F(UserInfoDataset.age, default=0) + age_squared: str = F(col("age") * col("age")) + credit_score: int + + assert ( + str(e.value) + == "expression 'col('age') * col('age')' for feature 'age_squared' is of type 'str' not 'int'" + ) + + # Using dataset field in expression feature + with pytest.raises(ValueError) as e: + + @featureset + class UserInfo6: + user_id: int + home_geoid: int + age: int = F(UserInfoDataset.age, default=0) + age_squared: int = F( + col("UserInfoDataset.age") * col("UserInfoDataset.age") + ) + credit_score: int + + assert ( + str(e.value) + == "extractor for 'age_squared' refers to feature col('UserInfoDataset.age') not present in 'UserInfo6'; 'col' can only reference features from the same featureset" + ) diff --git a/fennel/gen/featureset_pb2.py b/fennel/gen/featureset_pb2.py index 741675039..b72b61600 100644 --- 
a/fennel/gen/featureset_pb2.py +++ b/fennel/gen/featureset_pb2.py @@ -14,29 +14,30 @@ import fennel.gen.metadata_pb2 as metadata__pb2 import fennel.gen.schema_pb2 as schema__pb2 import fennel.gen.pycode_pb2 as pycode__pb2 +import fennel.gen.expr_pb2 as expr__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x66\x65\x61tureset.proto\x12\x17\x66\x65nnel.proto.featureset\x1a\x0emetadata.proto\x1a\x0cschema.proto\x1a\x0cpycode.proto\"\x8c\x01\n\x0e\x43oreFeatureset\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x08metadata\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12+\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x0c\n\x04tags\x18\x04 \x03(\t\"\xa0\x01\n\x07\x46\x65\x61ture\x12\x0c\n\x04name\x18\x01 \x01(\t\x12,\n\x05\x64type\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x12\x31\n\x08metadata\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x04 \x01(\t\x12\x0c\n\x04tags\x18\x05 \x03(\t\"S\n\x0f\x46ieldLookupInfo\x12)\n\x05\x66ield\x18\x01 \x01(\x0b\x32\x1a.fennel.proto.schema.Field\x12\x15\n\rdefault_value\x18\x03 \x01(\t\"\xa0\x03\n\tExtractor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tasets\x18\x02 \x03(\t\x12.\n\x06inputs\x18\x03 \x03(\x0b\x32\x1e.fennel.proto.featureset.Input\x12\x10\n\x08\x66\x65\x61tures\x18\x04 \x03(\t\x12\x31\n\x08metadata\x18\x05 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x0f\n\x07version\x18\x06 \x01(\x05\x12+\n\x06pycode\x18\x07 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x08 \x01(\t\x12>\n\x0e\x65xtractor_type\x18\t \x01(\x0e\x32&.fennel.proto.featureset.ExtractorType\x12>\n\nfield_info\x18\n \x01(\x0b\x32(.fennel.proto.featureset.FieldLookupInfoH\x00\x12\x0c\n\x04tags\x18\x0b \x03(\tB\x18\n\x16\x64\x65rived_extractor_info\"s\n\x05Input\x12\x37\n\x07\x66\x65\x61ture\x18\x01 
\x01(\x0b\x32&.fennel.proto.featureset.Input.Feature\x1a\x31\n\x07\x46\x65\x61ture\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\"\xad\x01\n\x05Model\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x30\n\x06inputs\x18\x02 \x03(\x0b\x32 .fennel.proto.featureset.Feature\x12\x31\n\x07outputs\x18\x03 \x03(\x0b\x32 .fennel.proto.featureset.Feature\x12\x31\n\x08metadata\x18\x04 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata*3\n\rExtractorType\x12\x0b\n\x07PY_FUNC\x10\x00\x12\n\n\x06LOOKUP\x10\x01\x12\t\n\x05\x41LIAS\x10\x02\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x66\x65\x61tureset.proto\x12\x17\x66\x65nnel.proto.featureset\x1a\x0emetadata.proto\x1a\x0cschema.proto\x1a\x0cpycode.proto\x1a\nexpr.proto\"\x8c\x01\n\x0e\x43oreFeatureset\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x08metadata\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12+\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x0c\n\x04tags\x18\x04 \x03(\t\"\xa0\x01\n\x07\x46\x65\x61ture\x12\x0c\n\x04name\x18\x01 \x01(\t\x12,\n\x05\x64type\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x12\x31\n\x08metadata\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x04 \x01(\t\x12\x0c\n\x04tags\x18\x05 \x03(\t\"S\n\x0f\x46ieldLookupInfo\x12)\n\x05\x66ield\x18\x01 \x01(\x0b\x32\x1a.fennel.proto.schema.Field\x12\x15\n\rdefault_value\x18\x03 \x01(\t\"\xc7\x03\n\tExtractor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tasets\x18\x02 \x03(\t\x12.\n\x06inputs\x18\x03 \x03(\x0b\x32\x1e.fennel.proto.featureset.Input\x12\x10\n\x08\x66\x65\x61tures\x18\x04 \x03(\t\x12\x31\n\x08metadata\x18\x05 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x0f\n\x07version\x18\x06 \x01(\x05\x12+\n\x06pycode\x18\x07 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x08 \x01(\t\x12>\n\x0e\x65xtractor_type\x18\t 
\x01(\x0e\x32&.fennel.proto.featureset.ExtractorType\x12>\n\nfield_info\x18\n \x01(\x0b\x32(.fennel.proto.featureset.FieldLookupInfoH\x00\x12%\n\x04\x65xpr\x18\x0c \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12\x0c\n\x04tags\x18\x0b \x03(\tB\x18\n\x16\x64\x65rived_extractor_info\"s\n\x05Input\x12\x37\n\x07\x66\x65\x61ture\x18\x01 \x01(\x0b\x32&.fennel.proto.featureset.Input.Feature\x1a\x31\n\x07\x46\x65\x61ture\x12\x18\n\x10\x66\x65\x61ture_set_name\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\"\xad\x01\n\x05Model\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x30\n\x06inputs\x18\x02 \x03(\x0b\x32 .fennel.proto.featureset.Feature\x12\x31\n\x07outputs\x18\x03 \x03(\x0b\x32 .fennel.proto.featureset.Feature\x12\x31\n\x08metadata\x18\x04 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata*=\n\rExtractorType\x12\x0b\n\x07PY_FUNC\x10\x00\x12\n\n\x06LOOKUP\x10\x01\x12\t\n\x05\x41LIAS\x10\x02\x12\x08\n\x04\x45XPR\x10\x03\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'featureset_pb2', _globals) if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None - _globals['_EXTRACTORTYPE']._serialized_start=1192 - _globals['_EXTRACTORTYPE']._serialized_end=1243 - _globals['_COREFEATURESET']._serialized_start=90 - _globals['_COREFEATURESET']._serialized_end=230 - _globals['_FEATURE']._serialized_start=233 - _globals['_FEATURE']._serialized_end=393 - _globals['_FIELDLOOKUPINFO']._serialized_start=395 - _globals['_FIELDLOOKUPINFO']._serialized_end=478 - _globals['_EXTRACTOR']._serialized_start=481 - _globals['_EXTRACTOR']._serialized_end=897 - _globals['_INPUT']._serialized_start=899 - _globals['_INPUT']._serialized_end=1014 - _globals['_INPUT_FEATURE']._serialized_start=965 - _globals['_INPUT_FEATURE']._serialized_end=1014 - _globals['_MODEL']._serialized_start=1017 - _globals['_MODEL']._serialized_end=1190 + _globals['_EXTRACTORTYPE']._serialized_start=1243 + 
_globals['_EXTRACTORTYPE']._serialized_end=1304 + _globals['_COREFEATURESET']._serialized_start=102 + _globals['_COREFEATURESET']._serialized_end=242 + _globals['_FEATURE']._serialized_start=245 + _globals['_FEATURE']._serialized_end=405 + _globals['_FIELDLOOKUPINFO']._serialized_start=407 + _globals['_FIELDLOOKUPINFO']._serialized_end=490 + _globals['_EXTRACTOR']._serialized_start=493 + _globals['_EXTRACTOR']._serialized_end=948 + _globals['_INPUT']._serialized_start=950 + _globals['_INPUT']._serialized_end=1065 + _globals['_INPUT_FEATURE']._serialized_start=1016 + _globals['_INPUT_FEATURE']._serialized_end=1065 + _globals['_MODEL']._serialized_start=1068 + _globals['_MODEL']._serialized_end=1241 # @@protoc_insertion_point(module_scope) diff --git a/fennel/gen/featureset_pb2.pyi b/fennel/gen/featureset_pb2.pyi index 0b4c0edd2..e511ac948 100644 --- a/fennel/gen/featureset_pb2.pyi +++ b/fennel/gen/featureset_pb2.pyi @@ -4,6 +4,7 @@ isort:skip_file """ import builtins import collections.abc +import expr_pb2 import google.protobuf.descriptor import google.protobuf.internal.containers import google.protobuf.internal.enum_type_wrapper @@ -31,6 +32,7 @@ class _ExtractorTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._ """user supplied python extractor function""" LOOKUP: _ExtractorType.ValueType # 1 ALIAS: _ExtractorType.ValueType # 2 + EXPR: _ExtractorType.ValueType # 3 class ExtractorType(_ExtractorType, metaclass=_ExtractorTypeEnumTypeWrapper): ... 
@@ -38,6 +40,7 @@ PY_FUNC: ExtractorType.ValueType # 0 """user supplied python extractor function""" LOOKUP: ExtractorType.ValueType # 1 ALIAS: ExtractorType.ValueType # 2 +EXPR: ExtractorType.ValueType # 3 global___ExtractorType = ExtractorType @typing_extensions.final @@ -134,6 +137,7 @@ class Extractor(google.protobuf.message.Message): FEATURE_SET_NAME_FIELD_NUMBER: builtins.int EXTRACTOR_TYPE_FIELD_NUMBER: builtins.int FIELD_INFO_FIELD_NUMBER: builtins.int + EXPR_FIELD_NUMBER: builtins.int TAGS_FIELD_NUMBER: builtins.int name: builtins.str @property @@ -157,6 +161,9 @@ class Extractor(google.protobuf.message.Message): required iff extractor_type == LOOKUP """ @property + def expr(self) -> expr_pb2.Expr: + """required iff extractor_type == EXPR""" + @property def tags(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ... def __init__( self, @@ -171,10 +178,11 @@ class Extractor(google.protobuf.message.Message): feature_set_name: builtins.str = ..., extractor_type: global___ExtractorType.ValueType = ..., field_info: global___FieldLookupInfo | None = ..., + expr: expr_pb2.Expr | None = ..., tags: collections.abc.Iterable[builtins.str] | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["derived_extractor_info", b"derived_extractor_info", "field_info", b"field_info", "metadata", b"metadata", "pycode", b"pycode"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["datasets", b"datasets", "derived_extractor_info", b"derived_extractor_info", "extractor_type", b"extractor_type", "feature_set_name", b"feature_set_name", "features", b"features", "field_info", b"field_info", "inputs", b"inputs", "metadata", b"metadata", "name", b"name", "pycode", b"pycode", "tags", b"tags", "version", b"version"]) -> None: ... 
+    def HasField(self, field_name: typing_extensions.Literal["derived_extractor_info", b"derived_extractor_info", "expr", b"expr", "field_info", b"field_info", "metadata", b"metadata", "pycode", b"pycode"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing_extensions.Literal["datasets", b"datasets", "derived_extractor_info", b"derived_extractor_info", "expr", b"expr", "extractor_type", b"extractor_type", "feature_set_name", b"feature_set_name", "features", b"features", "field_info", b"field_info", "inputs", b"inputs", "metadata", b"metadata", "name", b"name", "pycode", b"pycode", "tags", b"tags", "version", b"version"]) -> None: ...
     def WhichOneof(self, oneof_group: typing_extensions.Literal["derived_extractor_info", b"derived_extractor_info"]) -> typing_extensions.Literal["field_info"] | None: ...

global___Extractor = Extractor
diff --git a/fennel/gen/pycode_pb2.pyi b/fennel/gen/pycode_pb2.pyi
index 9b997c4f8..3ec819c89 100644
--- a/fennel/gen/pycode_pb2.pyi
+++ b/fennel/gen/pycode_pb2.pyi
@@ -100,7 +100,7 @@ class PyCode(google.protobuf.message.Message):
 global___PyCode = PyCode
 
 @typing_extensions.final
 class UDF(google.protobuf.message.Message):
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
     PYCODE_FIELD_NUMBER: builtins.int
diff --git a/fennel/internal_lib/graph_algorithms/extractor_order.py b/fennel/internal_lib/graph_algorithms/extractor_order.py
index 378e9c021..6f8edce77 100644
--- a/fennel/internal_lib/graph_algorithms/extractor_order.py
+++ b/fennel/internal_lib/graph_algorithms/extractor_order.py
@@ -77,18 +77,18 @@ def get_feature_vertex(f: Union[Feature, Featureset, str]) -> str:
     if isinstance(f, Feature):
         featureset = f.featureset_name
         caps_only = "".join([c for c in featureset if c.isupper()])
-        return f"F({caps_only}.{f.name})"
+        return f"col({caps_only}.{f.name})"
     elif isinstance(f, Featureset):
         raise ValueError(
             "Featureset is not supported as an input to an extractor"
         )
     elif type(f) 
is tuple:
         return "DF(" + ",".join([get_feature_vertex(f) for f in f]) + ")"
     elif isinstance(f, str):
         featureset = f.split(".")[0]
         caps_only = "".join([c for c in featureset if c.isupper()])
         feature_name = f.split(".")[1]
-        return f"F({caps_only}.{feature_name})"
+        return f"col({caps_only}.{feature_name})"
     raise ValueError(f"Unknown type {type(f)}")
 
     vertices = set()
diff --git a/fennel/internal_lib/to_proto/source_code.py b/fennel/internal_lib/to_proto/source_code.py
index 63cb83ad6..dc275f876 100644
--- a/fennel/internal_lib/to_proto/source_code.py
+++ b/fennel/internal_lib/to_proto/source_code.py
@@ -402,6 +402,7 @@ def get_all_imports() -> str:
         "from fennel.connectors.connectors import *",
         "from fennel.datasets import *",
         "from fennel.featuresets import *",
+        "from fennel.featuresets import feature",
         "from fennel.featuresets import feature as F",
         "from fennel.lib.expectations import *",
         "from fennel.internal_lib.schema import *",
@@ -413,6 +414,7 @@ def get_all_imports() -> str:
         "from fennel.lib.metadata import meta",
         "from fennel.lib import secrets, bucketize",
         "from fennel.datasets.datasets import dataset_lookup",
+        "from fennel.expr import col",
     ]
 
     gen_code_marker = f"{FENNEL_GEN_CODE_MARKER}=True\n"
diff --git a/fennel/internal_lib/to_proto/to_proto.py b/fennel/internal_lib/to_proto/to_proto.py
index ddc468c7c..229a84304 100644
--- a/fennel/internal_lib/to_proto/to_proto.py
+++ b/fennel/internal_lib/to_proto/to_proto.py
@@ -11,6 +11,7 @@
 from google.protobuf.wrappers_pb2 import StringValue
 
 import fennel.connectors as connectors
+from fennel.expr.serializer import ExprSerializer
 import fennel.gen.connector_pb2 as connector_proto
 import fennel.gen.dataset_pb2 as ds_proto
 import fennel.gen.expectations_pb2 as exp_proto
@@ -508,7 +509,7 @@ def featureset_to_proto(fs: Featureset) -> fs_proto.CoreFeatureset:
     import numpy as np
     from typing import List, Dict, Tuple, Optional, Union, Any, 
no_type_check from fennel.featuresets import * - from fennel.featuresets import featureset, feature + from fennel.featuresets import featureset, feature as F from fennel.lib.metadata import meta from fennel.lib.includes import includes from fennel.internal_lib.schema import * @@ -623,6 +624,15 @@ def _extractor_to_proto( extractor.derived_extractor_info ) + proto_expr = None + if extractor.extractor_type == ExtractorType.EXPR: + if extractor.expr is None: + raise TypeError( + f"Expr extractor `{extractor.name}` must have an expr" + ) + serializer = ExprSerializer() + proto_expr = serializer.serialize(extractor.expr.root) + if extractor.extractor_type == ExtractorType.PY_FUNC: metadata = get_metadata_proto(extractor.func) else: @@ -641,6 +651,7 @@ def _extractor_to_proto( feature_set_name=extractor.featureset, extractor_type=extractor.extractor_type, field_info=extractor_field_info, + expr=proto_expr, ) return proto_extractor diff --git a/fennel/testing/query_engine.py b/fennel/testing/query_engine.py index 3c62142f0..03ab7ad6b 100644 --- a/fennel/testing/query_engine.py +++ b/fennel/testing/query_engine.py @@ -168,6 +168,13 @@ def run_extractors( self._check_schema_exceptions(output, dsschema, extractor.name) continue + if extractor.extractor_type == ProtoExtractorType.EXPR: + output = self._compute_expr_extractor( + extractor, intermediate_data + ) + self._check_schema_exceptions(output, dsschema, extractor.name) + continue + allowed_datasets = self._get_allowed_datasets(extractor) fennel.datasets.datasets.dataset_lookup = ( data_engine.get_dataset_lookup_impl( @@ -346,6 +353,30 @@ def _check_schema_exceptions( f"invalid schema for data: {exceptions}" ) + def _compute_expr_extractor( + self, + extractor: Extractor, + intermediate_data: Dict[str, pd.Series], + ) -> pd.Series: + if len(extractor.outputs) != 1: + raise ValueError( + f"expression based extractor outputs {len(extractor.outputs)} features, expected one" + ) + if len(extractor.depends_on) != 0: + raise 
ValueError( + f"extractor for feature {extractor.outputs[0]} depends on {len(extractor.depends_on)} datasets, expression based extractors can not depend on datasets" + ) + input_features = { + k.name: intermediate_data[k.fqn()] for k in extractor.inputs # type: ignore + } + expr = extractor.expr + input_schema = {f.name: f.dtype for f in extractor.inputs} + df = pd.DataFrame(input_features) + res = expr.eval(df, input_schema) # type: ignore + res.name = extractor.fqn_output_features()[0] + intermediate_data[extractor.fqn_output_features()[0]] = res + return res + def _compute_lookup_extractor( self, data_engine: DataEngine, diff --git a/fennel/testing/test_cast_df_to_schema.py b/fennel/testing/test_cast_df_to_schema.py index 8b08e1848..24086f4c3 100644 --- a/fennel/testing/test_cast_df_to_schema.py +++ b/fennel/testing/test_cast_df_to_schema.py @@ -7,7 +7,7 @@ from fennel.datasets import dataset, field from fennel.dtypes import between, oneof, regex -from fennel.featuresets import featureset, feature +from fennel.featuresets import featureset, feature as F from fennel.gen import schema_pb2 as schema_proto from fennel.internal_lib.to_proto.to_proto import fields_to_dsschema from fennel.testing import mock @@ -578,7 +578,7 @@ class UserPhone: @featureset class UserFeatures: user_id: int - latest_phone_update: Optional[datetime] = feature(UserPhone.updated_at) + latest_phone_update: Optional[datetime] = F(UserPhone.updated_at) client.commit( datasets=[UserPhone], featuresets=[UserFeatures], message="first-commit"