diff --git a/docs/pages/api-reference/operators/join.md b/docs/pages/api-reference/operators/join.md index 549984d66..7004cf930 100644 --- a/docs/pages/api-reference/operators/join.md +++ b/docs/pages/api-reference/operators/join.md @@ -48,7 +48,8 @@ sides must have the same data types. Optional kwarg specifying the time window relative to the left side timestamp within which the join should be performed. This can be seen as adding another -condition to join like `WHERE left_time - d1 < right_time AND right_time < left_time + d1` +condition to join like `WHERE left_time - d1 < right_time AND right_time < left_time + d2` +where (d1, d2) = within. - The first value in the tuple represents how far back in time should a join happen. The term "forever" means that we can go infinitely back in time when searching for an event to join from the left-hand side data. diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 462489e1d..21c2eef2d 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.19] - 2024-09-06 +- Add ability to specify fields in join + ## [1.5.18] - 2024-09-05 - Struct initializer + arrow fixes + type promotion in assign diff --git a/fennel/client_tests/test_dataset.py b/fennel/client_tests/test_dataset.py index 6d3e1619b..d270df70b 100644 --- a/fennel/client_tests/test_dataset.py +++ b/fennel/client_tests/test_dataset.py @@ -1397,6 +1397,151 @@ def pipeline_join(cls, info: Dataset, sale: Dataset): ] +@meta(owner="test@test.com") +@source( + webhook.endpoint("MovieRevenueWithRightFields"), + disorder="14d", + cdc="upsert", +) +@dataset(index=True) +class MovieRevenueWithRightFields: + movie: oneof(str, ["Jumanji", "Titanic", "RaOne", "ABC"]) = field( # type: ignore + key=True + ) + revenue: int + extra_field: int + t: datetime + + +@meta(owner="satwant@fennel.ai") +@dataset(index=True) +class MovieStatsWithRightFields: + movie: oneof(str, ["Jumanji", "Titanic", "RaOne", "ABC"]) = field( # type: ignore + key=True + ) + rating: float + revenue_in_millions: float + t: Optional[datetime] + ts: datetime = field(timestamp=True) + + @pipeline + @inputs(MovieRating, MovieRevenueWithRightFields) + def pipeline_join(cls, rating: Dataset, revenue: Dataset): + def to_millions(df: pd.DataFrame) -> pd.DataFrame: + df[str(cls.revenue_in_millions)] = df["revenue"] / 1000000 + df[str(cls.revenue_in_millions)].fillna(-1, inplace=True) + return df[ + [ + str(cls.movie), + str(cls.t), + str(cls.ts), + str(cls.revenue_in_millions), + str(cls.rating), + ] + ] + + rating = rating.rename({"t": "ts"}) # type: ignore + c = rating.join( + revenue, how="left", on=[str(cls.movie)], fields=["revenue", "t"] + ) + # Transform provides additional columns which will be filtered out. 
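+        # At this point `c` holds only the rhs value column `revenue` and the
+        # rhs timestamp `t` (Optional, since how="left"); `extra_field` was
+        # already dropped by the join itself via `fields`.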
+        return c.transform(
+            to_millions,
+            schema={
+                str(cls.movie): oneof(
+                    str, ["Jumanji", "Titanic", "RaOne", "ABC"]
+                ),
+                str(cls.rating): float,
+                str(cls.t): Optional[datetime],
+                str(cls.ts): datetime,
+                str(cls.revenue_in_millions): float,
+            },
+        )
+
+
+class TestBasicJoinWithRightFields(unittest.TestCase):
+    @pytest.mark.integration
+    @mock
+    def test_basic_join_with_fields(self, client):
+        # Sync the dataset
+        client.commit(
+            message="msg",
+            datasets=[
+                MovieRating,
+                MovieRevenueWithRightFields,
+                MovieStatsWithRightFields,
+                RatingActivity,
+            ],
+        )
+        now = datetime.now(timezone.utc)
+        one_hour_ago = now - timedelta(hours=1)
+        data = [
+            ["Jumanji", 4, 343, 789, one_hour_ago],
+            ["Titanic", 5, 729, 1232, now],
+        ]
+        columns = ["movie", "rating", "num_ratings", "sum_ratings", "t"]
+        df = pd.DataFrame(data, columns=columns)
+        response = client.log("fennel_webhook", "MovieRating", df)
+        assert response.status_code == requests.codes.OK, response.json()
+
+        two_hours_ago = now - timedelta(hours=2)
+        data = [
+            ["Jumanji", 2000000, 1, two_hours_ago],
+            ["Titanic", 50000000, 2, now],
+        ]
+        columns = ["movie", "revenue", "extra_field", "t"]
+        df = pd.DataFrame(data, columns=columns)
+        response = client.log(
+            "fennel_webhook", "MovieRevenueWithRightFields", df
+        )
+        assert response.status_code == requests.codes.OK, response.json()
+        client.sleep()
+
+        # Do some lookups to verify pipeline_join is working as expected
+        keys = pd.DataFrame({"movie": ["Jumanji", "Titanic"]})
+        df, _ = client.lookup(
+            "MovieStatsWithRightFields",
+            keys=keys,
+        )
+        assert df.shape == (2, 5)
+        assert df["movie"].tolist() == ["Jumanji", "Titanic"]
+        assert df["rating"].tolist() == [4, 5]
+        assert df["revenue_in_millions"].tolist() == [2, 50]
+        assert df["t"].tolist() == [two_hours_ago, now]
+        assert "extra_field" not in df.columns
+
+        # Do some lookup at various timestamps in the past
+        ts = pd.Series([two_hours_ago, one_hour_ago, one_hour_ago, now])
+        keys = pd.DataFrame(
+            {"movie": ["Jumanji", "Jumanji", "Titanic", "Titanic"]}
+        )
+        df, _ = client.lookup(
+            "MovieStatsWithRightFields",
+            timestamps=ts,
+            keys=keys,
+        )
+        assert df.shape == (4, 5)
+        assert df["movie"].tolist() == [
+            "Jumanji",
+            "Jumanji",
+            "Titanic",
+            "Titanic",
+        ]
+        assert pd.isna(df["rating"].tolist()[0])
+        assert df["rating"].tolist()[1] == 4
+        assert pd.isna(df["rating"].tolist()[2])
+        assert df["rating"].tolist()[3] == 5
+        assert pd.isna(df["revenue_in_millions"].tolist()[0])
+        assert df["revenue_in_millions"].tolist()[1] == 2
+        assert pd.isna(df["revenue_in_millions"].tolist()[2])
+        assert df["revenue_in_millions"].tolist()[3] == 50
+        assert pd.isna(df["t"].tolist()[0])
+        assert df["t"].tolist()[1] == two_hours_ago
+        assert pd.isna(df["t"].tolist()[2])
+        assert df["t"].tolist()[3] == now
+        assert "extra_field" not in df.columns
+
+
 class TestBasicAggregate(unittest.TestCase):
     @pytest.mark.integration
     @mock
diff --git a/fennel/client_tests/test_social_network.py b/fennel/client_tests/test_social_network.py
index 966e54d38..424543b51 100644
--- a/fennel/client_tests/test_social_network.py
+++ b/fennel/client_tests/test_social_network.py
@@ -41,6 +41,19 @@ class PostInfo:
     timestamp: datetime
 
 
+@source(
+    webhook.endpoint("PostInfoWithRightFields"), disorder="14d", cdc="upsert"
+)
+@dataset(index=True)
+@meta(owner="data-eng@myspace.com")
+class PostInfoWithRightFields:
+    title: str
+    category: str  # type: ignore
+    post_id: int = field(key=True)
+    timestamp: datetime
+    extra_field: str
+
+
 @meta(owner="data-eng@myspace.com")
 @dataset
 @source(webhook.endpoint("ViewData"), disorder="14d", cdc="append")
@@ -100,6 +113,25 @@ def count_user_views(cls, view_data: Dataset, post_info: Dataset):
     )
 
 
+@meta(owner="ml-eng@myspace.com")
+@dataset(index=True)
+class UserCategoryDatasetWithRightFields:
+    user_id: str = field(key=True)
+    category: str = field(key=True)
+    num_views: int
+    time_stamp: datetime
+
+    @pipeline
+    @inputs(ViewData, PostInfoWithRightFields)
+    def count_user_views(cls, view_data: Dataset, post_info: Dataset):
+        post_info_enriched = view_data.join(
+            post_info, how="inner", on=["post_id"], fields=["title", "category"]
+        )
+        return post_info_enriched.groupby("user_id", "category").aggregate(
+            [Count(window=Continuous("6y 8s"), into_field="num_views")]
+        )
+
+
 @meta(owner="ml-eng@myspace.com")
 @dataset(index=True)
 class LastViewedPost:
@@ -166,6 +198,28 @@ def extract_user_views(cls, ts: pd.Series, user_ids: pd.Series):
         return views["num_views"]
 
 
+@meta(owner="feature-team@myspace.com")
+@featureset
+class UserFeaturesWithRightFields:
+    user_id: str = F(Request.user_id)  # type: ignore
+    num_views: int
+    category: str = F(Request.category)  # type: ignore
+    num_category_views: int = F(UserCategoryDatasetWithRightFields.num_views, default=0)  # type: ignore
+    category_view_ratio: float = F(col("num_category_views") / col("num_views"))
+    last_viewed_post: int = F(LastViewedPost.post_id, default=-1)  # type: ignore
+    last_viewed_post2: List[int] = F(
+        LastViewedPostByAgg.post_id, default=[-1]  # type: ignore
+    )
+
+    @extractor(deps=[UserViewsDataset])  # type: ignore
+    @inputs(Request.user_id)
+    @outputs("num_views")
+    def extract_user_views(cls, ts: pd.Series, user_ids: pd.Series):
+        views, _ = UserViewsDataset.lookup(ts, user_id=user_ids)  # type: ignore
+        views = views.fillna(0)
+        return views["num_views"]
+
+
 @pytest.mark.integration
 @mock
 def test_social_network(client):
@@ -259,6 +313,107 @@ def test_social_network(client):
     assert df.shape == (1998, 4)
 
 
+@pytest.mark.integration
+@mock
+def test_social_network_with_fields(client):
+    client.commit(
+        message="social network",
+        datasets=[
+            UserInfo,
+            PostInfoWithRightFields,
+            ViewData,
+            CityInfo,
+            UserViewsDataset,
+            UserCategoryDatasetWithRightFields,
+            LastViewedPost,
+            LastViewedPostByAgg,
+        ],
+        featuresets=[Request, UserFeaturesWithRightFields],
+    )
+    user_data_df = pd.read_csv("fennel/client_tests/data/user_data.csv")
+    post_data_df = pd.read_csv("fennel/client_tests/data/post_data.csv")
+    post_data_len = len(post_data_df.index)
+    post_data_df["extra_field"] = list(range(0, post_data_len))
+    view_data_df = pd.read_csv("fennel/client_tests/data/view_data_sampled.csv")
+    ts = "2018-01-01 00:00:00"
+    user_data_df["timestamp"] = ts
+    post_data_df["timestamp"] = ts
+    view_data_df["time_stamp"] = view_data_df["time_stamp"].apply(
+        lambda x: datetime.strptime(x, "%m/%d/%Y %H:%M %p")
+    )
+    # Advance all timestamps by 4 years
+    user_data_df["timestamp"] = pd.to_datetime(
+        user_data_df["timestamp"]
+    ) + pd.DateOffset(years=4)
+    post_data_df["timestamp"] = pd.to_datetime(
+        post_data_df["timestamp"]
+    ) + pd.DateOffset(years=4)
+    view_data_df["time_stamp"] = view_data_df["time_stamp"] + pd.DateOffset(
+        years=4
+    )
+
+    res = client.log("fennel_webhook", "UserInfo", user_data_df)
+    assert res.status_code == requests.codes.OK, res.json()
+    res = client.log("fennel_webhook", "PostInfoWithRightFields", post_data_df)
+    assert res.status_code == requests.codes.OK, res.json()
+    res = client.log("fennel_webhook", "ViewData", view_data_df)
+    assert res.status_code 
== requests.codes.OK, res.json() + + if client.is_integration_client(): + client.sleep(120) + + keys = pd.DataFrame( + { + "city": ["Wufeng", "Coyaima", "San Angelo"], + "gender": ["Male", "Male", "Female"], + } + ) + + df, found = client.lookup( + "CityInfo", + keys=keys, + ) + assert found.to_list() == [True, True, True] + + feature_df = client.query( + outputs=[UserFeaturesWithRightFields], + inputs=[Request.user_id, Request.category], + input_dataframe=pd.DataFrame( + { + "Request.user_id": [ + "5eece14efc13ae6609000000", + "5eece14efc13ae660900003c", + ], + "Request.category": ["banking", "programming"], + } + ), + ) + assert ( + feature_df["UserFeaturesWithRightFields.num_views"].to_list(), + feature_df["UserFeaturesWithRightFields.num_category_views"].to_list(), + feature_df["UserFeaturesWithRightFields.category_view_ratio"].to_list(), + ) == ([2, 4], [0, 1], [0.0, 0.25]) + + # Assert that both the last_viewed_post and last_viewed_post2 features are extracted correctly + last_post_viewed = feature_df[ + "UserFeaturesWithRightFields.last_viewed_post" + ].to_list() + last_post_viewed2 = [ + x[0] + for x in feature_df[ + "UserFeaturesWithRightFields.last_viewed_post2" + ].to_list() + ] + assert last_post_viewed == [936609766, 735291550] + assert last_post_viewed2 == last_post_viewed + + if client.is_integration_client(): + return + df = client.get_dataset_df("UserCategoryDatasetWithRightFields") + assert "extra_field" not in df.columns + assert df.shape == (1998, 4) + + @mock def test_social_network_with_mock_log(client): client.commit( diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index f4daa5340..c5b6b4b53 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -301,6 +301,7 @@ def join( left_on: Optional[List[str]] = None, right_on: Optional[List[str]] = None, within: Tuple[Duration, Duration] = ("forever", "0s"), + fields: Optional[List[str]] = None, ) -> Join: if not isinstance(other, Dataset) and isinstance(other, _Node): raise ValueError( @@ -309,7 +310,7 @@ def join( ) if not isinstance(other, _Node): raise TypeError("Cannot join with a non-dataset object") - return Join(self, other, within, how, on, left_on, right_on) + return Join(self, other, within, how, on, left_on, right_on, fields) def rename(self, columns: Dict[str, str]) -> _Node: return Rename(self, columns) @@ -935,6 +936,7 @@ def __init__( on: Optional[List[str]] = None, left_on: Optional[List[str]] = None, right_on: Optional[List[str]] = None, + fields: Optional[List[str]] = None, # Currently not supported lsuffix: str = "", rsuffix: str = "", @@ -963,6 +965,7 @@ def __init__( self.right_on = right_on self.within = within self.how = how + self.fields = fields self.lsuffix = lsuffix self.rsuffix = rsuffix self.node.out_edges.append(self) @@ -976,6 +979,7 @@ def signature(self): self.left_on, self.right_on, self.how, + self.fields, self.lsuffix, self.rsuffix, ) @@ -987,6 +991,7 @@ def signature(self): self.right_on, self.within, self.how, + self.fields, self.lsuffix, self.rsuffix, ) @@ -1004,6 +1009,8 @@ def make_types_optional(types: Dict[str, Type]) -> Dict[str, Type]: self.dataset.dsschema().values ) + right_ts = self.dataset.dsschema().timestamp + rhs_keys = set(self.dataset.dsschema().keys) join_keys = set(self.on) if self.on is not None else set(self.right_on) # Ensure on or right_on are the keys of the right dataset @@ -1034,15 +1041,42 @@ def make_types_optional(types: Dict[str, Type]) -> Dict[str, Type]: if self.how == "left": right_value_schema = 
make_types_optional(right_value_schema)
 
-        # Add right value columns to left schema. Check for column name collisions
+        # If fields is set, check that it only contains the right schema's value columns and timestamp
+        if self.fields is not None and len(self.fields) > 0:
+            allowed_col_names = [x for x in right_value_schema.keys()] + [
+                right_ts
+            ]
+            for col_name in self.fields:
+                if col_name not in allowed_col_names:
+                    raise ValueError(
+                        f"fields member `{col_name}` not present in allowed fields {allowed_col_names} of right input "
+                        f"{self.dataset.dsschema().name}"
+                    )
+
+        # Add right value columns to left schema, checking for column name collisions. When fields is set, keep only the value columns it lists.
         joined_dsschema = copy.deepcopy(left_dsschema)
         for col, dtype in right_value_schema.items():
             if col in left_schema:
                 raise ValueError(
                     f"Column name collision. `{col}` already exists in schema of left input {left_dsschema.name}, while joining with {self.dataset.dsschema().name}"
                 )
+            if (
+                self.fields is not None
+                and len(self.fields) > 0
+                and col not in self.fields
+            ):
+                continue
             joined_dsschema.append_value_column(col, dtype)
 
+        # Add timestamp column if present in fields
+        if self.fields is not None and right_ts in self.fields:
+            if self.how == "left":
+                joined_dsschema.append_value_column(
+                    right_ts, Optional[datetime.datetime]
+                )
+            else:
+                joined_dsschema.append_value_column(right_ts, datetime.datetime)
+
         return joined_dsschema
 
@@ -2842,6 +2876,27 @@ def validate_right_index(right_dataset: Dataset):
             f'"how" in {output_schema_name} must be either "inner" or "left" for `{output_schema_name}`'
         )
 
+        if obj.fields is not None and len(obj.fields) > 0:
+            allowed_fields = [x for x in right_schema.values.keys()] + [
+                right_schema.timestamp
+            ]
+            for field in obj.fields:
+                if field not in allowed_fields:
+                    raise ValueError(
+                        f"Field `{field}` specified in fields {obj.fields} "
+                        f"doesn't exist in allowed fields {allowed_fields} of "
+                        f"right schema of {output_schema_name}."
+                    )
+
+            if (
+                right_schema.timestamp in obj.fields
+                and right_schema.timestamp in left_schema.fields()
+            ):
+                raise ValueError(
+                    f"Field `{right_schema.timestamp}` specified in fields {obj.fields} "
+                    f"already exists in left schema of {output_schema_name}."
+ ) + output_schema = obj.dsschema() output_schema.name = output_schema_name return output_schema diff --git a/fennel/datasets/test_dataset.py b/fennel/datasets/test_dataset.py index 6cffdc67c..beff4b084 100644 --- a/fennel/datasets/test_dataset.py +++ b/fennel/datasets/test_dataset.py @@ -595,7 +595,7 @@ class ABCDataset: @includes(add_one) @inputs(A, B) def pipeline1(cls, a: Dataset, b: Dataset): - return a.join(b, how="left", left_on=["a1"], right_on=["b1"]) + return a.join(b, how="inner", left_on=["a1"], right_on=["b1"]) view = InternalTestClient() view.add(ABCDataset) @@ -637,14 +637,14 @@ def add_one(x: int): @includes(add_one) @inputs(A, B) def pipeline1(cls, a: Dataset, b: Dataset): - return a.join(b, how="left", left_on=["a1"], right_on=["b1"]) + return a.join(b, how="inner", left_on=["a1"], right_on=["b1"]) """ assert expected_gen_code == pipeline_req.pycode.generated_code expected_source_code = """@pipeline @includes(add_one) @inputs(A, B) def pipeline1(cls, a: Dataset, b: Dataset): - return a.join(b, how="left", left_on=["a1"], right_on=["b1"]) + return a.join(b, how="inner", left_on=["a1"], right_on=["b1"]) """ assert expected_source_code == pipeline_req.pycode.source_code p = { @@ -696,17 +696,17 @@ def pipeline1(cls, a: Dataset, b: Dataset): ) operator_req = sync_request.operators[2] o = { - "id": "12a2088d8d7a0d265a7bd3f694fc81aa", - "is_root": True, - "pipeline_name": "pipeline1", - "dataset_name": "ABCDataset", + "id": "b8a998fc5e47160f2ef4d3e4570d6bab", + "isRoot": True, + "pipelineName": "pipeline1", + "datasetName": "ABCDataset", "join": { - "lhs_operand_id": "A", - "rhs_dsref_operand_id": "B", + "lhsOperandId": "A", + "rhsDsrefOperandId": "B", "on": {"a1": "b1"}, - "how": 0, + "how": "Inner", }, - "ds_version": 1, + "dsVersion": 1, } expected_operator_request = ParseDict(o, ds_proto.Operator()) assert operator_req == expected_operator_request, error_message( @@ -868,17 +868,16 @@ def pipeline1(cls, a: Dataset, b: Dataset): ) operator_req = sync_request.operators[2] o = { - "id": "12a2088d8d7a0d265a7bd3f694fc81aa", - "is_root": True, - "pipeline_name": "pipeline1", - "dataset_name": "ABCDatasetDefault", + "id": "3338ee30aac1dc899789da9fc78fa025", + "isRoot": True, + "pipelineName": "pipeline1", + "datasetName": "ABCDatasetDefault", "join": { - "lhs_operand_id": "A", - "rhs_dsref_operand_id": "B", + "lhsOperandId": "A", + "rhsDsrefOperandId": "B", "on": {"a1": "b1"}, - "how": 0, }, - "ds_version": 1, + "dsVersion": 1, } expected_operator_request = ParseDict(o, ds_proto.Operator()) assert operator_req == expected_operator_request, error_message( @@ -966,17 +965,16 @@ def pipeline1(cls, a: Dataset, b: Dataset): ) operator_req = sync_request.operators[2] o = { - "id": "12a2088d8d7a0d265a7bd3f694fc81aa", - "is_root": True, - "pipeline_name": "pipeline1", - "dataset_name": "ABCDatasetDefault", + "id": "3338ee30aac1dc899789da9fc78fa025", + "isRoot": True, + "pipelineName": "pipeline1", + "datasetName": "ABCDatasetDefault", "join": { - "lhs_operand_id": "A", - "rhs_dsref_operand_id": "B", + "lhsOperandId": "A", + "rhsDsrefOperandId": "B", "on": {"a1": "b1"}, - "how": 0, }, - "ds_version": 1, + "dsVersion": 1, } expected_operator_request = ParseDict(o, ds_proto.Operator()) assert operator_req == expected_operator_request, error_message( @@ -1064,18 +1062,17 @@ def pipeline1(cls, a: Dataset, b: Dataset): ) operator_req = sync_request.operators[2] o = { - "id": "12a2088d8d7a0d265a7bd3f694fc81aa", - "is_root": True, - "pipeline_name": "pipeline1", - "dataset_name": "ABDatasetLow", 
+ "id": "3338ee30aac1dc899789da9fc78fa025", + "isRoot": True, + "pipelineName": "pipeline1", + "datasetName": "ABDatasetLow", "join": { - "lhs_operand_id": "A", - "rhs_dsref_operand_id": "B", + "lhsOperandId": "A", + "rhsDsrefOperandId": "B", "on": {"a1": "b1"}, - "within_low": "3600s", - "how": 0, + "withinLow": "3600s", }, - "ds_version": 1, + "dsVersion": 1, } expected_operator_request = ParseDict(o, ds_proto.Operator()) assert operator_req == expected_operator_request, error_message( @@ -1163,18 +1160,17 @@ def pipeline1(cls, a: Dataset, b: Dataset): ) operator_req = sync_request.operators[2] o = { - "id": "12a2088d8d7a0d265a7bd3f694fc81aa", - "is_root": True, - "pipeline_name": "pipeline1", - "dataset_name": "ABDatasetHigh", + "id": "3338ee30aac1dc899789da9fc78fa025", + "isRoot": True, + "pipelineName": "pipeline1", + "datasetName": "ABDatasetHigh", "join": { - "lhs_operand_id": "A", - "rhs_dsref_operand_id": "B", + "lhsOperandId": "A", + "rhsDsrefOperandId": "B", "on": {"a1": "b1"}, - "how": 0, - "within_high": "86400s", + "withinHigh": "86400s", }, - "ds_version": 1, + "dsVersion": 1, } expected_operator_request = ParseDict(o, ds_proto.Operator()) assert operator_req == expected_operator_request, error_message( @@ -1262,19 +1258,18 @@ def pipeline1(cls, a: Dataset, b: Dataset): ) operator_req = sync_request.operators[2] o = { - "id": "12a2088d8d7a0d265a7bd3f694fc81aa", - "is_root": True, - "pipeline_name": "pipeline1", - "dataset_name": "ABDataset", + "id": "3338ee30aac1dc899789da9fc78fa025", + "isRoot": True, + "pipelineName": "pipeline1", + "datasetName": "ABDataset", "join": { - "lhs_operand_id": "A", - "rhs_dsref_operand_id": "B", + "lhsOperandId": "A", + "rhsDsrefOperandId": "B", "on": {"a1": "b1"}, - "how": 0, - "within_low": "259200s", - "within_high": "31536000s", + "withinLow": "259200s", + "withinHigh": "31536000s", }, - "ds_version": 1, + "dsVersion": 1, } expected_operator_request = ParseDict(o, ds_proto.Operator()) assert operator_req == expected_operator_request, error_message( @@ -1479,16 +1474,15 @@ def extract_info(df: pd.DataFrame) -> pd.DataFrame: operator_req = sync_request.operators[3] o = { - "id": "4202e94cf2e47bf5bcc94fd57aee8d0f", + "id": "246863a3fc1191098d24b4034f704851", "pipelineName": "create_fraud_dataset", "datasetName": "FraudReportAggregatedDataset", "join": { "lhsOperandId": "101097826c6986ddb25ce924985d9217", "rhsDsrefOperandId": "UserInfoDataset", "on": {"user_id": "user_id"}, - "how": 0, }, - "ds_version": 1, + "dsVersion": 1, } expected_operator_request = ParseDict(o, ds_proto.Operator()) assert operator_req == expected_operator_request, error_message( @@ -1497,11 +1491,11 @@ def extract_info(df: pd.DataFrame) -> pd.DataFrame: operator_req = erase_operator_pycode(sync_request.operators[4]) o = { - "id": "bfa10d216f843625785d24e6b9d890fb", + "id": "6158406804b946bda0c38a994229e995", "pipelineName": "create_fraud_dataset", "datasetName": "FraudReportAggregatedDataset", "transform": { - "operandId": "4202e94cf2e47bf5bcc94fd57aee8d0f", + "operandId": "246863a3fc1191098d24b4034f704851", "schema": { "user_id": {"intType": {}}, "merchant_id": {"intType": {}}, @@ -1518,14 +1512,14 @@ def extract_info(df: pd.DataFrame) -> pd.DataFrame: operator_req = sync_request.operators[5] o = { - "id": "acd519ba5789e767383099d0561e07c8", + "id": "c898276d34d964b833155b0e36a4ba2b", "pipelineName": "create_fraud_dataset", "datasetName": "FraudReportAggregatedDataset", "dedup": { - "operandId": "bfa10d216f843625785d24e6b9d890fb", + "operandId": 
"6158406804b946bda0c38a994229e995", "columns": ["user_id", "merchant_id"], }, - "ds_version": 1, + "dsVersion": 1, } expected_operator_request = ParseDict(o, ds_proto.Operator()) assert operator_req == expected_operator_request, error_message( @@ -1534,12 +1528,12 @@ def extract_info(df: pd.DataFrame) -> pd.DataFrame: operator_req = sync_request.operators[6] o = { - "id": "0b381d6b2444c390000402aaa4485a26", + "id": "45877eefa2fe6d8dbd5aba2fb07e5cb5", "isRoot": True, "pipelineName": "create_fraud_dataset", "datasetName": "FraudReportAggregatedDataset", "aggregate": { - "operandId": "acd519ba5789e767383099d0561e07c8", + "operandId": "c898276d34d964b833155b0e36a4ba2b", "keys": ["merchant_id"], "specs": [ { @@ -1556,17 +1550,17 @@ def extract_info(df: pd.DataFrame) -> pd.DataFrame: }, { "quantile": { + "of": "transaction_amount", "name": "median_transaction_amount", "window": {"sliding": {"duration": "604800s"}}, - "quantile": 0.5, "default": 0.0, - "of": "transaction_amount", + "quantile": 0.5, "approx": True, } }, ], }, - "ds_version": 1, + "dsVersion": 1, } expected_operator_request = ParseDict(o, ds_proto.Operator()) assert operator_req == expected_operator_request, error_message( diff --git a/fennel/datasets/test_invalid_dataset.py b/fennel/datasets/test_invalid_dataset.py index c15904b4c..012546503 100644 --- a/fennel/datasets/test_invalid_dataset.py +++ b/fennel/datasets/test_invalid_dataset.py @@ -764,6 +764,109 @@ def create_pipeline(cls, a: Dataset, b: Dataset): ) +def test_dataset_incorrect_join_fields(): + with pytest.raises(ValueError) as e: + + @dataset + class XYZ: + user_id: int + name: str + timestamp: datetime + + @dataset(index=True) + class ABC: + user_id: int = field(key=True) + age: int + timestamp: datetime + + @dataset + class XYZJoinedABC: + user_id: int + name: str + age: int + timestamp: datetime + + @pipeline + @inputs(XYZ, ABC) + def create_pipeline(cls, a: Dataset, b: Dataset): + c = a.join(b, how="inner", on=["user_id"], fields=["rank"]) # type: ignore + return c + + assert ( + str(e.value) + == "Field `rank` specified in fields ['rank'] doesn't exist in " + "allowed fields ['age', 'timestamp'] of right schema of " + "'[Pipeline:create_pipeline]->join node'." + ) + + with pytest.raises(ValueError) as e: + + @dataset + class XYZ: + user_id: int + name: str + timestamp: datetime + + @dataset(index=True) + class ABC: + user_id: int = field(key=True) + age: int + timestamp: datetime + + @dataset + class XYZJoinedABC1: + user_id: int + name: str + age: int + timestamp: datetime + + @pipeline + @inputs(XYZ, ABC) + def create_pipeline(cls, a: Dataset, b: Dataset): + c = a.join(b, how="inner", on=["user_id"], fields=["user_id"]) # type: ignore + return c + + assert ( + str(e.value) + == "Field `user_id` specified in fields ['user_id'] doesn't exist in " + "allowed fields ['age', 'timestamp'] of right schema of " + "'[Pipeline:create_pipeline]->join node'." 
+ ) + + with pytest.raises(ValueError) as e: + + @dataset + class XYZ: + user_id: int + name: str + timestamp: datetime + + @dataset(index=True) + class ABC: + user_id: int = field(key=True) + age: int + timestamp: datetime + + @dataset + class XYZJoinedABC2: + user_id: int + name: str + age: int + timestamp: datetime + + @pipeline + @inputs(XYZ, ABC) + def create_pipeline(cls, a: Dataset, b: Dataset): + c = a.join(b, how="inner", on=["user_id"], fields=["timestamp"]) # type: ignore + return c + + assert ( + str(e.value) + == "Field `timestamp` specified in fields ['timestamp'] already " + "exists in left schema of '[Pipeline:create_pipeline]->join node'." + ) + + def test_dataset_incorrect_join_bounds(): with pytest.raises(ValueError) as e: diff --git a/fennel/gen/expr_pb2.py b/fennel/gen/expr_pb2.py index 32387c6bc..d3391b295 100644 --- a/fennel/gen/expr_pb2.py +++ b/fennel/gen/expr_pb2.py @@ -14,7 +14,7 @@ import fennel.gen.schema_pb2 as schema__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nexpr.proto\x12\x11\x66\x65nnel.proto.expr\x1a\x0cschema.proto\"\x9a\x06\n\x04\x45xpr\x12%\n\x03ref\x18\x01 \x01(\x0b\x32\x16.fennel.proto.expr.RefH\x00\x12\x36\n\x0cjson_literal\x18\x02 \x01(\x0b\x32\x1e.fennel.proto.expr.JsonLiteralH\x00\x12)\n\x05unary\x18\x04 \x01(\x0b\x32\x18.fennel.proto.expr.UnaryH\x00\x12\'\n\x04\x63\x61se\x18\x05 \x01(\x0b\x32\x17.fennel.proto.expr.CaseH\x00\x12+\n\x06\x62inary\x18\x06 \x01(\x0b\x32\x19.fennel.proto.expr.BinaryH\x00\x12+\n\x06isnull\x18\x07 \x01(\x0b\x32\x19.fennel.proto.expr.IsNullH\x00\x12/\n\x08\x66illnull\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.expr.FillNullH\x00\x12,\n\x07list_fn\x18\t \x01(\x0b\x32\x19.fennel.proto.expr.ListFnH\x00\x12,\n\x07math_fn\x18\n \x01(\x0b\x32\x19.fennel.proto.expr.MathFnH\x00\x12\x30\n\tstruct_fn\x18\x0b \x01(\x0b\x32\x1b.fennel.proto.expr.StructFnH\x00\x12,\n\x07\x64ict_fn\x18\x0c \x01(\x0b\x32\x19.fennel.proto.expr.DictFnH\x00\x12\x30\n\tstring_fn\x18\r \x01(\x0b\x32\x1b.fennel.proto.expr.StringFnH\x00\x12\x34\n\x0b\x64\x61tetime_fn\x18\x0e \x01(\x0b\x32\x1d.fennel.proto.expr.DateTimeFnH\x00\x12>\n\x10\x64\x61tetime_literal\x18\x0f \x01(\x0b\x32\".fennel.proto.expr.DatetimeLiteralH\x00\x12\x34\n\x0bmake_struct\x18\x10 \x01(\x0b\x32\x1d.fennel.proto.expr.MakeStructH\x00\x12\x32\n\nfrom_epoch\x18\x11 \x01(\x0b\x32\x1c.fennel.proto.expr.FromEpochH\x00\x42\x06\n\x04node\"a\n\tFromEpoch\x12)\n\x08\x64uration\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12)\n\x04unit\x18\x02 \x01(\x0e\x32\x1b.fennel.proto.expr.TimeUnit\"\xad\x01\n\x0f\x44\x61tetimeLiteral\x12\x0c\n\x04year\x18\x01 \x01(\r\x12\r\n\x05month\x18\x02 \x01(\r\x12\x0b\n\x03\x64\x61y\x18\x03 \x01(\r\x12\x0c\n\x04hour\x18\x04 \x01(\r\x12\x0e\n\x06minute\x18\x05 \x01(\r\x12\x0e\n\x06second\x18\x06 \x01(\r\x12\x13\n\x0bmicrosecond\x18\x07 \x01(\r\x12-\n\x08timezone\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.expr.Timezone\"\xc5\x01\n\nMakeStruct\x12\x34\n\x0bstruct_type\x18\x01 \x01(\x0b\x32\x1f.fennel.proto.schema.StructType\x12\x39\n\x06\x66ields\x18\x02 \x03(\x0b\x32).fennel.proto.expr.MakeStruct.FieldsEntry\x1a\x46\n\x0b\x46ieldsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12&\n\x05value\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr:\x02\x38\x01\"L\n\x0bJsonLiteral\x12\x0f\n\x07literal\x18\x01 \x01(\t\x12,\n\x05\x64type\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\"\x13\n\x03Ref\x12\x0c\n\x04name\x18\x01 \x01(\t\"Y\n\x05Unary\x12&\n\x02op\x18\x01 \x01(\x0e\x32\x1a.fennel.proto.expr.UnaryOp\x12(\n\x07operand\x18\x02 
\x01(\x0b\x32\x17.fennel.proto.expr.Expr\"}\n\x06\x42inary\x12%\n\x04left\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12&\n\x05right\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12$\n\x02op\x18\x03 \x01(\x0e\x32\x18.fennel.proto.expr.BinOp\"b\n\x04\x43\x61se\x12.\n\twhen_then\x18\x01 \x03(\x0b\x32\x1b.fennel.proto.expr.WhenThen\x12*\n\totherwise\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"X\n\x08WhenThen\x12%\n\x04when\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12%\n\x04then\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"2\n\x06IsNull\x12(\n\x07operand\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"[\n\x08\x46illNull\x12(\n\x07operand\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12%\n\x04\x66ill\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"\x93\x01\n\x06ListOp\x12%\n\x03len\x18\x01 \x01(\x0b\x32\x16.fennel.proto.expr.LenH\x00\x12&\n\x03get\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.ExprH\x00\x12/\n\x08\x63ontains\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.expr.ContainsH\x00\x42\t\n\x07\x66n_type\"\x05\n\x03Len\"4\n\x08\x43ontains\x12(\n\x07\x65lement\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"V\n\x06ListFn\x12%\n\x04list\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12%\n\x02\x66n\x18\x02 \x01(\x0b\x32\x19.fennel.proto.expr.ListOp\"\xb9\x01\n\x06MathOp\x12)\n\x05round\x18\x01 \x01(\x0b\x32\x18.fennel.proto.expr.RoundH\x00\x12%\n\x03\x61\x62s\x18\x02 \x01(\x0b\x32\x16.fennel.proto.expr.AbsH\x00\x12\'\n\x04\x63\x65il\x18\x03 \x01(\x0b\x32\x17.fennel.proto.expr.CeilH\x00\x12)\n\x05\x66loor\x18\x04 \x01(\x0b\x32\x18.fennel.proto.expr.FloorH\x00\x42\t\n\x07\x66n_type\"\x1a\n\x05Round\x12\x11\n\tprecision\x18\x01 \x01(\x05\"\x05\n\x03\x41\x62s\"\x06\n\x04\x43\x65il\"\x07\n\x05\x46loor\"Y\n\x06MathFn\x12(\n\x07operand\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12%\n\x02\x66n\x18\x02 \x01(\x0b\x32\x19.fennel.proto.expr.MathOp\"&\n\x08StructOp\x12\x0f\n\x05\x66ield\x18\x01 \x01(\tH\x00\x42\t\n\x07\x66n_type\"\\\n\x08StructFn\x12\'\n\x06struct\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12\'\n\x02\x66n\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.expr.StructOp\"a\n\x07\x44ictGet\x12&\n\x05\x66ield\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12.\n\rdefault_value\x18\x03 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"\x96\x01\n\x06\x44ictOp\x12%\n\x03len\x18\x01 \x01(\x0b\x32\x16.fennel.proto.expr.LenH\x00\x12)\n\x03get\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.expr.DictGetH\x00\x12/\n\x08\x63ontains\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.expr.ContainsH\x00\x42\t\n\x07\x66n_type\"V\n\x06\x44ictFn\x12%\n\x04\x64ict\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12%\n\x02\x66n\x18\x02 \x01(\x0b\x32\x19.fennel.proto.expr.DictOp\"\xc5\x03\n\x08StringOp\x12%\n\x03len\x18\x01 \x01(\x0b\x32\x16.fennel.proto.expr.LenH\x00\x12-\n\x07tolower\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.expr.ToLowerH\x00\x12-\n\x07toupper\x18\x03 \x01(\x0b\x32\x1a.fennel.proto.expr.ToUpperH\x00\x12/\n\x08\x63ontains\x18\x04 \x01(\x0b\x32\x1b.fennel.proto.expr.ContainsH\x00\x12\x33\n\nstartswith\x18\x05 \x01(\x0b\x32\x1d.fennel.proto.expr.StartsWithH\x00\x12/\n\x08\x65ndswith\x18\x06 \x01(\x0b\x32\x1b.fennel.proto.expr.EndsWithH\x00\x12+\n\x06\x63oncat\x18\x07 \x01(\x0b\x32\x19.fennel.proto.expr.ConcatH\x00\x12/\n\x08strptime\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.expr.StrptimeH\x00\x12\x34\n\x0bjson_decode\x18\t \x01(\x0b\x32\x1d.fennel.proto.expr.JsonDecodeH\x00\x42\t\n\x07\x66n_type\"\x1c\n\x08Timezone\x12\x10\n\x08timezone\x18\x01 
\x01(\t\":\n\nJsonDecode\x12,\n\x05\x64type\x18\x01 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\"I\n\x08Strptime\x12\x0e\n\x06\x66ormat\x18\x01 \x01(\t\x12-\n\x08timezone\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.expr.Timezone\"\t\n\x07ToLower\"\t\n\x07ToUpper\"2\n\nStartsWith\x12$\n\x03key\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"0\n\x08\x45ndsWith\x12$\n\x03key\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"0\n\x06\x43oncat\x12&\n\x05other\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"\\\n\x08StringFn\x12\'\n\x06string\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12\'\n\x02\x66n\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.expr.StringOp\"b\n\nDateTimeFn\x12)\n\x08\x64\x61tetime\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12)\n\x02\x66n\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.expr.DateTimeOp\"\xd2\x01\n\nDateTimeOp\x12)\n\x05since\x18\x01 \x01(\x0b\x32\x18.fennel.proto.expr.SinceH\x00\x12\x34\n\x0bsince_epoch\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.expr.SinceEpochH\x00\x12/\n\x08strftime\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.expr.StrftimeH\x00\x12\'\n\x04part\x18\x04 \x01(\x0b\x32\x17.fennel.proto.expr.PartH\x00\x42\t\n\x07\x66n_type\"Z\n\x05Since\x12&\n\x05other\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12)\n\x04unit\x18\x02 \x01(\x0e\x32\x1b.fennel.proto.expr.TimeUnit\"7\n\nSinceEpoch\x12)\n\x04unit\x18\x01 \x01(\x0e\x32\x1b.fennel.proto.expr.TimeUnit\"\x1a\n\x08Strftime\x12\x0e\n\x06\x66ormat\x18\x01 \x01(\t\"1\n\x04Part\x12)\n\x04unit\x18\x01 \x01(\x0e\x32\x1b.fennel.proto.expr.TimeUnit*\x1b\n\x07UnaryOp\x12\x07\n\x03NEG\x10\x00\x12\x07\n\x03NOT\x10\x01*\x86\x01\n\x05\x42inOp\x12\x07\n\x03\x41\x44\x44\x10\x00\x12\x07\n\x03SUB\x10\x01\x12\x07\n\x03MUL\x10\x02\x12\x07\n\x03\x44IV\x10\x03\x12\x07\n\x03MOD\x10\x04\x12\r\n\tFLOOR_DIV\x10\x05\x12\x06\n\x02\x45Q\x10\x06\x12\x06\n\x02NE\x10\x07\x12\x06\n\x02GT\x10\x08\x12\x07\n\x03GTE\x10\t\x12\x06\n\x02LT\x10\n\x12\x07\n\x03LTE\x10\x0b\x12\x07\n\x03\x41ND\x10\x0c\x12\x06\n\x02OR\x10\r*\x83\x01\n\x08TimeUnit\x12\x0b\n\x07UNKNOWN\x10\x00\x12\n\n\x06SECOND\x10\x01\x12\n\n\x06MINUTE\x10\x02\x12\x08\n\x04HOUR\x10\x03\x12\x07\n\x03\x44\x41Y\x10\x04\x12\x08\n\x04WEEK\x10\x05\x12\t\n\x05MONTH\x10\x06\x12\x08\n\x04YEAR\x10\x07\x12\x0f\n\x0bMICROSECOND\x10\x08\x12\x0f\n\x0bMILLISECOND\x10\tb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nexpr.proto\x12\x11\x66\x65nnel.proto.expr\x1a\x0cschema.proto\"\x9a\x06\n\x04\x45xpr\x12%\n\x03ref\x18\x01 \x01(\x0b\x32\x16.fennel.proto.expr.RefH\x00\x12\x36\n\x0cjson_literal\x18\x02 \x01(\x0b\x32\x1e.fennel.proto.expr.JsonLiteralH\x00\x12)\n\x05unary\x18\x04 \x01(\x0b\x32\x18.fennel.proto.expr.UnaryH\x00\x12\'\n\x04\x63\x61se\x18\x05 \x01(\x0b\x32\x17.fennel.proto.expr.CaseH\x00\x12+\n\x06\x62inary\x18\x06 \x01(\x0b\x32\x19.fennel.proto.expr.BinaryH\x00\x12+\n\x06isnull\x18\x07 \x01(\x0b\x32\x19.fennel.proto.expr.IsNullH\x00\x12/\n\x08\x66illnull\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.expr.FillNullH\x00\x12,\n\x07list_fn\x18\t \x01(\x0b\x32\x19.fennel.proto.expr.ListFnH\x00\x12,\n\x07math_fn\x18\n \x01(\x0b\x32\x19.fennel.proto.expr.MathFnH\x00\x12\x30\n\tstruct_fn\x18\x0b \x01(\x0b\x32\x1b.fennel.proto.expr.StructFnH\x00\x12,\n\x07\x64ict_fn\x18\x0c \x01(\x0b\x32\x19.fennel.proto.expr.DictFnH\x00\x12\x30\n\tstring_fn\x18\r \x01(\x0b\x32\x1b.fennel.proto.expr.StringFnH\x00\x12\x34\n\x0b\x64\x61tetime_fn\x18\x0e \x01(\x0b\x32\x1d.fennel.proto.expr.DateTimeFnH\x00\x12>\n\x10\x64\x61tetime_literal\x18\x0f 
\x01(\x0b\x32\".fennel.proto.expr.DatetimeLiteralH\x00\x12\x34\n\x0bmake_struct\x18\x10 \x01(\x0b\x32\x1d.fennel.proto.expr.MakeStructH\x00\x12\x32\n\nfrom_epoch\x18\x11 \x01(\x0b\x32\x1c.fennel.proto.expr.FromEpochH\x00\x42\x06\n\x04node\"a\n\tFromEpoch\x12)\n\x08\x64uration\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12)\n\x04unit\x18\x02 \x01(\x0e\x32\x1b.fennel.proto.expr.TimeUnit\"\xad\x01\n\x0f\x44\x61tetimeLiteral\x12\x0c\n\x04year\x18\x01 \x01(\r\x12\r\n\x05month\x18\x02 \x01(\r\x12\x0b\n\x03\x64\x61y\x18\x03 \x01(\r\x12\x0c\n\x04hour\x18\x04 \x01(\r\x12\x0e\n\x06minute\x18\x05 \x01(\r\x12\x0e\n\x06second\x18\x06 \x01(\r\x12\x13\n\x0bmicrosecond\x18\x07 \x01(\r\x12-\n\x08timezone\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.expr.Timezone\"\xc5\x01\n\nMakeStruct\x12\x34\n\x0bstruct_type\x18\x01 \x01(\x0b\x32\x1f.fennel.proto.schema.StructType\x12\x39\n\x06\x66ields\x18\x02 \x03(\x0b\x32).fennel.proto.expr.MakeStruct.FieldsEntry\x1a\x46\n\x0b\x46ieldsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12&\n\x05value\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr:\x02\x38\x01\"L\n\x0bJsonLiteral\x12\x0f\n\x07literal\x18\x01 \x01(\t\x12,\n\x05\x64type\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\"\x13\n\x03Ref\x12\x0c\n\x04name\x18\x01 \x01(\t\"Y\n\x05Unary\x12&\n\x02op\x18\x01 \x01(\x0e\x32\x1a.fennel.proto.expr.UnaryOp\x12(\n\x07operand\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"}\n\x06\x42inary\x12%\n\x04left\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12&\n\x05right\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12$\n\x02op\x18\x03 \x01(\x0e\x32\x18.fennel.proto.expr.BinOp\"b\n\x04\x43\x61se\x12.\n\twhen_then\x18\x01 \x03(\x0b\x32\x1b.fennel.proto.expr.WhenThen\x12*\n\totherwise\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"X\n\x08WhenThen\x12%\n\x04when\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12%\n\x04then\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"2\n\x06IsNull\x12(\n\x07operand\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"[\n\x08\x46illNull\x12(\n\x07operand\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12%\n\x04\x66ill\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"\xc3\x01\n\x06ListOp\x12%\n\x03len\x18\x01 \x01(\x0b\x32\x16.fennel.proto.expr.LenH\x00\x12&\n\x03get\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.ExprH\x00\x12/\n\x08\x63ontains\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.expr.ContainsH\x00\x12.\n\x08has_null\x18\x04 \x01(\x0b\x32\x1a.fennel.proto.expr.HasNullH\x00\x42\t\n\x07\x66n_type\"\x05\n\x03Len\"\t\n\x07HasNull\"4\n\x08\x43ontains\x12(\n\x07\x65lement\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"V\n\x06ListFn\x12%\n\x04list\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12%\n\x02\x66n\x18\x02 \x01(\x0b\x32\x19.fennel.proto.expr.ListOp\"\xb9\x01\n\x06MathOp\x12)\n\x05round\x18\x01 \x01(\x0b\x32\x18.fennel.proto.expr.RoundH\x00\x12%\n\x03\x61\x62s\x18\x02 \x01(\x0b\x32\x16.fennel.proto.expr.AbsH\x00\x12\'\n\x04\x63\x65il\x18\x03 \x01(\x0b\x32\x17.fennel.proto.expr.CeilH\x00\x12)\n\x05\x66loor\x18\x04 \x01(\x0b\x32\x18.fennel.proto.expr.FloorH\x00\x42\t\n\x07\x66n_type\"\x1a\n\x05Round\x12\x11\n\tprecision\x18\x01 \x01(\x05\"\x05\n\x03\x41\x62s\"\x06\n\x04\x43\x65il\"\x07\n\x05\x46loor\"Y\n\x06MathFn\x12(\n\x07operand\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12%\n\x02\x66n\x18\x02 \x01(\x0b\x32\x19.fennel.proto.expr.MathOp\"&\n\x08StructOp\x12\x0f\n\x05\x66ield\x18\x01 \x01(\tH\x00\x42\t\n\x07\x66n_type\"\\\n\x08StructFn\x12\'\n\x06struct\x18\x01 
\x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12\'\n\x02\x66n\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.expr.StructOp\"a\n\x07\x44ictGet\x12&\n\x05\x66ield\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12.\n\rdefault_value\x18\x03 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"\x96\x01\n\x06\x44ictOp\x12%\n\x03len\x18\x01 \x01(\x0b\x32\x16.fennel.proto.expr.LenH\x00\x12)\n\x03get\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.expr.DictGetH\x00\x12/\n\x08\x63ontains\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.expr.ContainsH\x00\x42\t\n\x07\x66n_type\"V\n\x06\x44ictFn\x12%\n\x04\x64ict\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12%\n\x02\x66n\x18\x02 \x01(\x0b\x32\x19.fennel.proto.expr.DictOp\"\xc5\x03\n\x08StringOp\x12%\n\x03len\x18\x01 \x01(\x0b\x32\x16.fennel.proto.expr.LenH\x00\x12-\n\x07tolower\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.expr.ToLowerH\x00\x12-\n\x07toupper\x18\x03 \x01(\x0b\x32\x1a.fennel.proto.expr.ToUpperH\x00\x12/\n\x08\x63ontains\x18\x04 \x01(\x0b\x32\x1b.fennel.proto.expr.ContainsH\x00\x12\x33\n\nstartswith\x18\x05 \x01(\x0b\x32\x1d.fennel.proto.expr.StartsWithH\x00\x12/\n\x08\x65ndswith\x18\x06 \x01(\x0b\x32\x1b.fennel.proto.expr.EndsWithH\x00\x12+\n\x06\x63oncat\x18\x07 \x01(\x0b\x32\x19.fennel.proto.expr.ConcatH\x00\x12/\n\x08strptime\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.expr.StrptimeH\x00\x12\x34\n\x0bjson_decode\x18\t \x01(\x0b\x32\x1d.fennel.proto.expr.JsonDecodeH\x00\x42\t\n\x07\x66n_type\"\x1c\n\x08Timezone\x12\x10\n\x08timezone\x18\x01 \x01(\t\":\n\nJsonDecode\x12,\n\x05\x64type\x18\x01 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\"I\n\x08Strptime\x12\x0e\n\x06\x66ormat\x18\x01 \x01(\t\x12-\n\x08timezone\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.expr.Timezone\"\t\n\x07ToLower\"\t\n\x07ToUpper\"2\n\nStartsWith\x12$\n\x03key\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"0\n\x08\x45ndsWith\x12$\n\x03key\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"0\n\x06\x43oncat\x12&\n\x05other\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\"\\\n\x08StringFn\x12\'\n\x06string\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12\'\n\x02\x66n\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.expr.StringOp\"b\n\nDateTimeFn\x12)\n\x08\x64\x61tetime\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12)\n\x02\x66n\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.expr.DateTimeOp\"\xd2\x01\n\nDateTimeOp\x12)\n\x05since\x18\x01 \x01(\x0b\x32\x18.fennel.proto.expr.SinceH\x00\x12\x34\n\x0bsince_epoch\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.expr.SinceEpochH\x00\x12/\n\x08strftime\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.expr.StrftimeH\x00\x12\'\n\x04part\x18\x04 \x01(\x0b\x32\x17.fennel.proto.expr.PartH\x00\x42\t\n\x07\x66n_type\"Z\n\x05Since\x12&\n\x05other\x18\x01 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12)\n\x04unit\x18\x02 \x01(\x0e\x32\x1b.fennel.proto.expr.TimeUnit\"7\n\nSinceEpoch\x12)\n\x04unit\x18\x01 \x01(\x0e\x32\x1b.fennel.proto.expr.TimeUnit\"\x1a\n\x08Strftime\x12\x0e\n\x06\x66ormat\x18\x01 \x01(\t\"1\n\x04Part\x12)\n\x04unit\x18\x01 
\x01(\x0e\x32\x1b.fennel.proto.expr.TimeUnit*\x1b\n\x07UnaryOp\x12\x07\n\x03NEG\x10\x00\x12\x07\n\x03NOT\x10\x01*\x86\x01\n\x05\x42inOp\x12\x07\n\x03\x41\x44\x44\x10\x00\x12\x07\n\x03SUB\x10\x01\x12\x07\n\x03MUL\x10\x02\x12\x07\n\x03\x44IV\x10\x03\x12\x07\n\x03MOD\x10\x04\x12\r\n\tFLOOR_DIV\x10\x05\x12\x06\n\x02\x45Q\x10\x06\x12\x06\n\x02NE\x10\x07\x12\x06\n\x02GT\x10\x08\x12\x07\n\x03GTE\x10\t\x12\x06\n\x02LT\x10\n\x12\x07\n\x03LTE\x10\x0b\x12\x07\n\x03\x41ND\x10\x0c\x12\x06\n\x02OR\x10\r*\x83\x01\n\x08TimeUnit\x12\x0b\n\x07UNKNOWN\x10\x00\x12\n\n\x06SECOND\x10\x01\x12\n\n\x06MINUTE\x10\x02\x12\x08\n\x04HOUR\x10\x03\x12\x07\n\x03\x44\x41Y\x10\x04\x12\x08\n\x04WEEK\x10\x05\x12\t\n\x05MONTH\x10\x06\x12\x08\n\x04YEAR\x10\x07\x12\x0f\n\x0bMICROSECOND\x10\x08\x12\x0f\n\x0bMILLISECOND\x10\tb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -23,12 +23,12 @@ DESCRIPTOR._options = None _globals['_MAKESTRUCT_FIELDSENTRY']._options = None _globals['_MAKESTRUCT_FIELDSENTRY']._serialized_options = b'8\001' - _globals['_UNARYOP']._serialized_start=4505 - _globals['_UNARYOP']._serialized_end=4532 - _globals['_BINOP']._serialized_start=4535 - _globals['_BINOP']._serialized_end=4669 - _globals['_TIMEUNIT']._serialized_start=4672 - _globals['_TIMEUNIT']._serialized_end=4803 + _globals['_UNARYOP']._serialized_start=4564 + _globals['_UNARYOP']._serialized_end=4591 + _globals['_BINOP']._serialized_start=4594 + _globals['_BINOP']._serialized_end=4728 + _globals['_TIMEUNIT']._serialized_start=4731 + _globals['_TIMEUNIT']._serialized_end=4862 _globals['_EXPR']._serialized_start=48 _globals['_EXPR']._serialized_end=842 _globals['_FROMEPOCH']._serialized_start=844 @@ -56,65 +56,67 @@ _globals['_FILLNULL']._serialized_start=1878 _globals['_FILLNULL']._serialized_end=1969 _globals['_LISTOP']._serialized_start=1972 - _globals['_LISTOP']._serialized_end=2119 - _globals['_LEN']._serialized_start=2121 - _globals['_LEN']._serialized_end=2126 - _globals['_CONTAINS']._serialized_start=2128 - _globals['_CONTAINS']._serialized_end=2180 - _globals['_LISTFN']._serialized_start=2182 - _globals['_LISTFN']._serialized_end=2268 - _globals['_MATHOP']._serialized_start=2271 - _globals['_MATHOP']._serialized_end=2456 - _globals['_ROUND']._serialized_start=2458 - _globals['_ROUND']._serialized_end=2484 - _globals['_ABS']._serialized_start=2486 - _globals['_ABS']._serialized_end=2491 - _globals['_CEIL']._serialized_start=2493 - _globals['_CEIL']._serialized_end=2499 - _globals['_FLOOR']._serialized_start=2501 - _globals['_FLOOR']._serialized_end=2508 - _globals['_MATHFN']._serialized_start=2510 - _globals['_MATHFN']._serialized_end=2599 - _globals['_STRUCTOP']._serialized_start=2601 - _globals['_STRUCTOP']._serialized_end=2639 - _globals['_STRUCTFN']._serialized_start=2641 - _globals['_STRUCTFN']._serialized_end=2733 - _globals['_DICTGET']._serialized_start=2735 - _globals['_DICTGET']._serialized_end=2832 - _globals['_DICTOP']._serialized_start=2835 - _globals['_DICTOP']._serialized_end=2985 - _globals['_DICTFN']._serialized_start=2987 - _globals['_DICTFN']._serialized_end=3073 - _globals['_STRINGOP']._serialized_start=3076 - _globals['_STRINGOP']._serialized_end=3529 - _globals['_TIMEZONE']._serialized_start=3531 - _globals['_TIMEZONE']._serialized_end=3559 - _globals['_JSONDECODE']._serialized_start=3561 - _globals['_JSONDECODE']._serialized_end=3619 - _globals['_STRPTIME']._serialized_start=3621 - _globals['_STRPTIME']._serialized_end=3694 - _globals['_TOLOWER']._serialized_start=3696 
- _globals['_TOLOWER']._serialized_end=3705 - _globals['_TOUPPER']._serialized_start=3707 - _globals['_TOUPPER']._serialized_end=3716 - _globals['_STARTSWITH']._serialized_start=3718 - _globals['_STARTSWITH']._serialized_end=3768 - _globals['_ENDSWITH']._serialized_start=3770 - _globals['_ENDSWITH']._serialized_end=3818 - _globals['_CONCAT']._serialized_start=3820 - _globals['_CONCAT']._serialized_end=3868 - _globals['_STRINGFN']._serialized_start=3870 - _globals['_STRINGFN']._serialized_end=3962 - _globals['_DATETIMEFN']._serialized_start=3964 - _globals['_DATETIMEFN']._serialized_end=4062 - _globals['_DATETIMEOP']._serialized_start=4065 - _globals['_DATETIMEOP']._serialized_end=4275 - _globals['_SINCE']._serialized_start=4277 - _globals['_SINCE']._serialized_end=4367 - _globals['_SINCEEPOCH']._serialized_start=4369 - _globals['_SINCEEPOCH']._serialized_end=4424 - _globals['_STRFTIME']._serialized_start=4426 - _globals['_STRFTIME']._serialized_end=4452 - _globals['_PART']._serialized_start=4454 - _globals['_PART']._serialized_end=4503 + _globals['_LISTOP']._serialized_end=2167 + _globals['_LEN']._serialized_start=2169 + _globals['_LEN']._serialized_end=2174 + _globals['_HASNULL']._serialized_start=2176 + _globals['_HASNULL']._serialized_end=2185 + _globals['_CONTAINS']._serialized_start=2187 + _globals['_CONTAINS']._serialized_end=2239 + _globals['_LISTFN']._serialized_start=2241 + _globals['_LISTFN']._serialized_end=2327 + _globals['_MATHOP']._serialized_start=2330 + _globals['_MATHOP']._serialized_end=2515 + _globals['_ROUND']._serialized_start=2517 + _globals['_ROUND']._serialized_end=2543 + _globals['_ABS']._serialized_start=2545 + _globals['_ABS']._serialized_end=2550 + _globals['_CEIL']._serialized_start=2552 + _globals['_CEIL']._serialized_end=2558 + _globals['_FLOOR']._serialized_start=2560 + _globals['_FLOOR']._serialized_end=2567 + _globals['_MATHFN']._serialized_start=2569 + _globals['_MATHFN']._serialized_end=2658 + _globals['_STRUCTOP']._serialized_start=2660 + _globals['_STRUCTOP']._serialized_end=2698 + _globals['_STRUCTFN']._serialized_start=2700 + _globals['_STRUCTFN']._serialized_end=2792 + _globals['_DICTGET']._serialized_start=2794 + _globals['_DICTGET']._serialized_end=2891 + _globals['_DICTOP']._serialized_start=2894 + _globals['_DICTOP']._serialized_end=3044 + _globals['_DICTFN']._serialized_start=3046 + _globals['_DICTFN']._serialized_end=3132 + _globals['_STRINGOP']._serialized_start=3135 + _globals['_STRINGOP']._serialized_end=3588 + _globals['_TIMEZONE']._serialized_start=3590 + _globals['_TIMEZONE']._serialized_end=3618 + _globals['_JSONDECODE']._serialized_start=3620 + _globals['_JSONDECODE']._serialized_end=3678 + _globals['_STRPTIME']._serialized_start=3680 + _globals['_STRPTIME']._serialized_end=3753 + _globals['_TOLOWER']._serialized_start=3755 + _globals['_TOLOWER']._serialized_end=3764 + _globals['_TOUPPER']._serialized_start=3766 + _globals['_TOUPPER']._serialized_end=3775 + _globals['_STARTSWITH']._serialized_start=3777 + _globals['_STARTSWITH']._serialized_end=3827 + _globals['_ENDSWITH']._serialized_start=3829 + _globals['_ENDSWITH']._serialized_end=3877 + _globals['_CONCAT']._serialized_start=3879 + _globals['_CONCAT']._serialized_end=3927 + _globals['_STRINGFN']._serialized_start=3929 + _globals['_STRINGFN']._serialized_end=4021 + _globals['_DATETIMEFN']._serialized_start=4023 + _globals['_DATETIMEFN']._serialized_end=4121 + _globals['_DATETIMEOP']._serialized_start=4124 + _globals['_DATETIMEOP']._serialized_end=4334 + 
_globals['_SINCE']._serialized_start=4336 + _globals['_SINCE']._serialized_end=4426 + _globals['_SINCEEPOCH']._serialized_start=4428 + _globals['_SINCEEPOCH']._serialized_end=4483 + _globals['_STRFTIME']._serialized_start=4485 + _globals['_STRFTIME']._serialized_end=4511 + _globals['_PART']._serialized_start=4513 + _globals['_PART']._serialized_end=4562 # @@protoc_insertion_point(module_scope) diff --git a/fennel/gen/expr_pb2.pyi b/fennel/gen/expr_pb2.pyi index 9c95e3c27..b423c09cc 100644 --- a/fennel/gen/expr_pb2.pyi +++ b/fennel/gen/expr_pb2.pyi @@ -453,6 +453,7 @@ class ListOp(google.protobuf.message.Message): LEN_FIELD_NUMBER: builtins.int GET_FIELD_NUMBER: builtins.int CONTAINS_FIELD_NUMBER: builtins.int + HAS_NULL_FIELD_NUMBER: builtins.int @property def len(self) -> global___Len: ... @property @@ -461,16 +462,19 @@ class ListOp(google.protobuf.message.Message): @property def contains(self) -> global___Contains: """Check if the list contains an element""" + @property + def has_null(self) -> global___HasNull: ... def __init__( self, *, len: global___Len | None = ..., get: global___Expr | None = ..., contains: global___Contains | None = ..., + has_null: global___HasNull | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["contains", b"contains", "fn_type", b"fn_type", "get", b"get", "len", b"len"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["contains", b"contains", "fn_type", b"fn_type", "get", b"get", "len", b"len"]) -> None: ... - def WhichOneof(self, oneof_group: typing_extensions.Literal["fn_type", b"fn_type"]) -> typing_extensions.Literal["len", "get", "contains"] | None: ... + def HasField(self, field_name: typing_extensions.Literal["contains", b"contains", "fn_type", b"fn_type", "get", b"get", "has_null", b"has_null", "len", b"len"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["contains", b"contains", "fn_type", b"fn_type", "get", b"get", "has_null", b"has_null", "len", b"len"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["fn_type", b"fn_type"]) -> typing_extensions.Literal["len", "get", "contains", "has_null"] | None: ... global___ListOp = ListOp @@ -484,6 +488,16 @@ class Len(google.protobuf.message.Message): global___Len = Len +@typing_extensions.final +class HasNull(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + def __init__( + self, + ) -> None: ... 
+
+global___HasNull = HasNull
+
 @typing_extensions.final
 class Contains(google.protobuf.message.Message):
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
diff --git a/fennel/internal_lib/to_proto/serializer.py b/fennel/internal_lib/to_proto/serializer.py
index 65a8b2af6..95ff21cfd 100644
--- a/fennel/internal_lib/to_proto/serializer.py
+++ b/fennel/internal_lib/to_proto/serializer.py
@@ -258,6 +258,7 @@ def visitJoin(self, obj):
                 ),
                 within_low=within_low,
                 within_high=within_high,
+                rhs_fields=obj.fields,
             ),
         )
 
diff --git a/fennel/testing/executor.py b/fennel/testing/executor.py
index 54ada5b29..8b554acd2 100644
--- a/fennel/testing/executor.py
+++ b/fennel/testing/executor.py
@@ -496,6 +496,13 @@ def visitJoin(self, obj) -> Optional[NodeRet]:
         right_df = right_df.rename(
             columns={right_timestamp_field: tmp_right_ts}
         )
+        # Preserve the rhs timestamp field if the join requests it
+        if (
+            obj.fields is not None
+            and len(obj.fields) > 0
+            and right_timestamp_field in obj.fields
+        ):
+            right_df[right_timestamp_field] = right_df[tmp_right_ts]
 
         # Set the value of the left timestamp - this is the timestamp that will be used for the join
         # - to be the upper bound of the join query (this is the max ts of a valid right dataset entry)
@@ -656,6 +663,33 @@ def emited_ts(row):
             columns=[tmp_ts_low, ts_query_field, tmp_left_ts, tmp_right_ts],
             inplace=True,
         )
+
+        if obj.fields is not None and len(obj.fields) > 0:
+            all_right_fields = [f.name for f in right_ret.fields]  # type: ignore
+            for col_name in obj.fields:
+                if col_name in right_ret.key_fields:  # type: ignore
+                    raise Exception(
+                        f"fields member {col_name} cannot be one of right dataframe's "
+                        f"key fields {right_ret.key_fields}"  # type: ignore
+                    )
+                if col_name not in all_right_fields:
+                    raise Exception(
+                        f"fields member {col_name} not present in right dataframe's "  # type: ignore
+                        f"fields {right_ret.fields}"
+                    )
+
+            cols_to_drop = []
+            for field in right_ret.fields:  # type: ignore
+                col_name = field.name
+                if (
+                    col_name not in right_ret.key_fields  # type: ignore
+                    and col_name != right_ret.timestamp_field  # type: ignore
+                    and col_name not in obj.fields
+                ):
+                    cols_to_drop.append(col_name)
+
+            merged_df.drop(columns=cols_to_drop, inplace=True)
+
         # sort the dataframe by the timestamp
         sorted_df = merged_df.sort_values(left_timestamp_field)
 
diff --git a/pyproject.toml b/pyproject.toml
index edd57735a..0c3654d35 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "fennel-ai"
-version = "1.5.18"
+version = "1.5.19"
 description = "The modern realtime feature engineering platform"
 authors = ["Fennel AI "]
 packages = [{ include = "fennel" }]
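
A minimal sketch of the `fields` kwarg introduced in this patch (dataset and
field names below are illustrative, not taken from the patch; webhook sources
are elided for brevity, and import paths mirror the test files touched above):
`fields` restricts which right-hand-side columns survive the join, and may
list only rhs value columns plus, optionally, the rhs timestamp.

    from datetime import datetime

    from fennel.datasets import Dataset, dataset, field, pipeline
    from fennel.lib import inputs


    @dataset
    class Txn:
        user_id: int
        amount: float
        t: datetime


    # The rhs of a join must be a keyed, indexed dataset.
    @dataset(index=True)
    class User:
        user_id: int = field(key=True)
        age: int
        city: str
        t: datetime


    @dataset
    class TxnWithAge:
        user_id: int
        amount: float
        age: int
        t: datetime

        @pipeline
        @inputs(Txn, User)
        def enrich(cls, txn: Dataset, user: Dataset):
            # fields=["age"] keeps only the rhs value column `age`; `city`
            # never enters the joined schema. The rhs timestamp is omitted
            # here because a field named `t` already exists on the left
            # side, which the new validation rejects.
            return txn.join(user, how="inner", on=["user_id"], fields=["age"])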