From 80ab231b746b72e34c9ad2612eb89c0f070ca73d Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Fri, 12 May 2023 17:07:28 +0200 Subject: [PATCH 01/29] working state --- python/hsfs/constructor/query.py | 42 +++++++++++++++---- python/hsfs/core/feature_view_engine.py | 4 +- .../core/transformation_function_engine.py | 16 +------ 3 files changed, 38 insertions(+), 24 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index c7cbe1b2ad..eb1f44c4a9 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -133,12 +133,18 @@ def read( schema, ) - def _collect_features(self): - features = [] - features.extend(self.features) - for j in self.joins: - features.extend(j.query.features) - return features + def _collect_features(self, with_prefix=True): + feature_map = {} + for feat in self._left_features: + feature_map[feat.name] = feat + for join in self.joins: + for feat in join.query._left_features: + if join.prefix and with_prefix: + feature_map[join.prefix + feat.name] = feat + else: + feature_map[feat.name] = feature_map.get(feat.name, []) + [ + feat + ] def show(self, n: int, online: Optional[bool] = False): """Show the first N rows of the Query. @@ -177,7 +183,7 @@ def join( If no join keys are specified, Hopsworks will use the maximal matching subset of the primary keys of the feature groups you are joining. - Joins of one level are supported, no neted joins. + Joins of one level are supported, no nested joins. !!! example "Join two feature groups" ```python @@ -532,3 +538,25 @@ def is_time_travel(self): @property def joins(self): return self._joins + + def __getattr__(self, name): + try: + return self.__getitem__(name) + except KeyError: + raise AttributeError( + f"'Query' object has no attribute '{name}'. " + "If you are trying to access a feature, fall back on " + "using the `get_feature` method." + ) + + def __getitem__(self, name): + if not isinstance(name, str): + raise TypeError( + f"Expected type `str`, got `{type(name)}`. " + "Features are accessible by name." + ) + feature = [f for f in self.__getattribute__("_features") if f.name == name] + if len(feature) == 1: + return feature[0] + else: + raise KeyError(f"'FeatureGroup' object has no feature called '{name}'.") diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index 9c46c2693d..149f12e685 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -82,13 +82,13 @@ def save(self, feature_view_obj): # If provided label matches multiple columns without prefix, then raise exception because it is ambiguous. prefix_feature_map = {} feature_map = {} - for feat in feature_view_obj.query.features: + for feat in feature_view_obj.query._left_features: prefix_feature_map[feat.name] = ( feat.name, feature_view_obj.query._left_feature_group, ) for join in feature_view_obj.query.joins: - for feat in join.query.features: + for feat in join.query._left_features: if join.prefix: prefix_feature_map[join.prefix + feat.name] = ( feat.name, diff --git a/python/hsfs/core/transformation_function_engine.py b/python/hsfs/core/transformation_function_engine.py index bde1267d6b..47188d5817 100644 --- a/python/hsfs/core/transformation_function_engine.py +++ b/python/hsfs/core/transformation_function_engine.py @@ -106,21 +106,7 @@ def attach_transformation_fn(training_dataset_obj=None, feature_view_obj=None): # If provided feature matches multiple columns without prefix, then raise exception because it is ambiguous. prefix_feature_map = {} feature_map = {} - for feat in target_obj.query.features: - prefix_feature_map[feat.name] = ( - feat.name, - target_obj.query._left_feature_group, - ) - for join in target_obj.query.joins: - for feat in join.query.features: - if join.prefix: - prefix_feature_map[join.prefix + feat.name] = ( - feat.name, - join.query._left_feature_group, - ) - feature_map[feat.name] = feature_map.get(feat.name, []) + [ - join.query._left_feature_group - ] + if target_obj._transformation_functions: for ( From 51f8c46f234a980a841e3c7ce6302760d4c41191 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Fri, 12 May 2023 19:28:38 +0200 Subject: [PATCH 02/29] refactored query parameters --- python/hsfs/constructor/query.py | 141 +++++++++++++----- python/hsfs/core/arrow_flight_client.py | 140 +++++------------ python/hsfs/core/feature_view_engine.py | 54 ++----- .../core/transformation_function_engine.py | 56 ++----- python/tests/core/test_arrow_flight_client.py | 9 +- 5 files changed, 165 insertions(+), 235 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index eb1f44c4a9..e463cd7bf6 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -21,8 +21,10 @@ from hsfs import util, engine, feature_group from hsfs.core import query_constructor_api, storage_connector_api, arrow_flight_client +from hsfs.feature import Feature from hsfs.constructor import join from hsfs.constructor.filter import Filter, Logic +from hsfs.client.exceptions import FeatureStoreException class Query: @@ -50,6 +52,96 @@ def __init__( self._storage_connector_api = storage_connector_api.StorageConnectorApi( feature_store_id ) + ( + self._features_map, + self._feature_list, + self._featuregroups_map, + self._filters, + ) = self._initialize_collections() + + def _initialize_collections(self): + feature_map = {} + features_list = [] + featuregroups_map = {self._left_feature_group._id: self._left_feature_group} + filters = self._filter + + for feat in self._left_features: + features_list.append(feat) + feature_map[feat.name] = feat + for join_obj in self.joins: + featuregroups_map[ + join_obj.query._left_feature_group._id + ] = join_obj.query._left_feature_group + if filters is None: + filters = join_obj.query._filter + elif join_obj.query._filter is not None: + filters = filters & join_obj.query._filter + + for feat in join_obj.query._left_features: + features_list.append(feat) + if join_obj.prefix: + name_with_prefix = join_obj.prefix + feat.name + if name_with_prefix in feature_map: + raise FeatureStoreException( + f"Feature name {name_with_prefix} already exists in query. Consider changing the prefix." + ) + feature_map[join_obj.prefix + feat.name] = feat + feature_map[feat.name] = ( + feat.name if feat.name not in feature_map else None + ) + + return feature_map, features_list, featuregroups_map, filters + + def _update_collections(self): + ( + self._features_map, + self._feature_list, + self._featuregroups_map, + self._filters, + ) = self._initialize_collections() + + @property + def featuregroups(self): + return list(self._featuregroups_map.values()) + + @property + def features(self) -> List[Feature]: + return self.features_list + + @property + def filters(self) -> Optional[Filter]: + return self._filters + + def get_featuregroup(self, id: int): + if id not in self._featuregroups_map: + raise FeatureStoreException(f"Feature group id {id} not found in query.") + return self._featuregroups_map[id] + + def get_feature(self, feature_name: str) -> Feature: + if feature_name not in self._features_map: + raise FeatureStoreException( + f"Feature name {feature_name} not found in query." + ) + feat = self._features_map[feature_name] + if feat is None: + raise FeatureStoreException( + f"Feature name {feature_name} is ambiguous. Consider using the prefix." + ) + return feat + + def __getattr__(self, name): + try: + return self.__getitem__(name) + except KeyError: + raise AttributeError(f"'Query' object has no attribute '{name}'. ") + + def __getitem__(self, name): + if not isinstance(name, str): + raise TypeError( + f"Expected type `str`, got `{type(name)}`. " + "Features are accessible by name." + ) + return self.get_feature(name) def _prep_read(self, online, read_options): fs_query = self._query_constructor_api.construct_query(self) @@ -61,7 +153,7 @@ def _prep_read(self, online, read_options): online_conn = None if engine.get_instance().is_flyingduck_query_supported(self, read_options): - sql_query = arrow_flight_client.get_instance()._construct_query_object( + sql_query = arrow_flight_client.get_instance().create_query_object( self, sql_query ) else: @@ -133,19 +225,6 @@ def read( schema, ) - def _collect_features(self, with_prefix=True): - feature_map = {} - for feat in self._left_features: - feature_map[feat.name] = feat - for join in self.joins: - for feat in join.query._left_features: - if join.prefix and with_prefix: - feature_map[join.prefix + feat.name] = feat - else: - feature_map[feat.name] = feature_map.get(feat.name, []) + [ - feat - ] - def show(self, n: int, online: Optional[bool] = False): """Show the first N rows of the Query. @@ -223,6 +302,9 @@ def join( self._joins.append( join.Join(sub_query, on, left_on, right_on, join_type.upper(), prefix) ) + + self._update_collections() + return self def as_of( @@ -398,6 +480,9 @@ def filter(self, f: Union[Filter, Logic]): ) elif self._filter is not None: self._filter = self._filter & f + + self._update_collections() + return self def from_cache_feature_group_only(self): @@ -510,10 +595,6 @@ def left_feature_group_start_time(self, left_feature_group_start_time): def left_feature_group_end_time(self, left_feature_group_end_time): self._left_feature_group_end_time = left_feature_group_end_time - @property - def features(self): - return self._left_features - def append_feature(self, feature): """ !!! example @@ -528,6 +609,8 @@ def append_feature(self, feature): """ self._left_features.append(feature) + self._update_collections() + def is_time_travel(self): return ( self.left_feature_group_start_time @@ -538,25 +621,3 @@ def is_time_travel(self): @property def joins(self): return self._joins - - def __getattr__(self, name): - try: - return self.__getitem__(name) - except KeyError: - raise AttributeError( - f"'Query' object has no attribute '{name}'. " - "If you are trying to access a feature, fall back on " - "using the `get_feature` method." - ) - - def __getitem__(self, name): - if not isinstance(name, str): - raise TypeError( - f"Expected type `str`, got `{type(name)}`. " - "Features are accessible by name." - ) - feature = [f for f in self.__getattribute__("_features") if f.name == name] - if len(feature) == 1: - return feature[0] - else: - raise KeyError(f"'FeatureGroup' object has no feature called '{name}'.") diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 79d98dc841..0bd1a1a60a 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -201,127 +201,54 @@ def read_path(self, path): descriptor = pyarrow.flight.FlightDescriptor.for_path(path) return self._get_dataset(descriptor) - def _construct_query_object(self, query, query_str): - ( - featuregroups, - features, - filters, - ) = self._collect_featuregroups_features_and_filters(query) + def is_flyingduck_query_object(self, query_obj): + return isinstance(query_obj, dict) and "query_string" in query_obj + + def create_query_object(self, query, query_str): + features = {} + for fg in query.featuregroups: + fg_name = self._serialize_featuregroup_name(fg) + features[fg_name] = [feat.name for feat in fg.features] + filters = self._serialize_filter_expression(query) + for feature in features: + features[feature] = list(features[feature]) query = { "query_string": self._translate_to_duckdb(query, query_str), - "featuregroups": featuregroups, "features": features, "filters": filters, } return query - def is_flyingduck_query_object(self, query_obj): - return isinstance(query_obj, dict) and "query_string" in query_obj - - def _translate_to_duckdb(self, query, query_str): - return query_str.replace( - f"`{query._left_feature_group.feature_store_name}`.`", - f"`{query._left_feature_group._get_project_name()}.", - ).replace("`", '"') - - def _update_features(self, features, fg_name, new_features): - updated_features = features.get(fg_name, set()) - updated_features.update(new_features) - features[fg_name] = updated_features - - def _collect_featuregroups_features_and_filters(self, query): - ( - featuregroups, - features, - filters, - ) = self._collect_featuregroups_features_and_filters_rec(query) - filters = self._filter_to_expression(filters, featuregroups, features) - for feature in features: - features[feature] = list(features[feature]) - return featuregroups, features, filters - - def _collect_featuregroups_features_and_filters_rec(self, query): - featuregroups = {} - fg = query._left_feature_group - fg_name = f"{fg._get_project_name()}.{fg.name}_{fg.version}" # featurestore.name_version - featuregroups[fg._id] = fg_name - filters = query._filter - - features = {fg_name: set([feat._name for feat in query._left_features])} - - if fg.event_time: - features[fg_name].update([fg.event_time]) - if fg.primary_key: - features[fg_name].update(fg.primary_key) - for join in query._joins: - join_fg = join._query._left_feature_group - join_fg_name = ( - f"{join_fg._get_project_name()}.{join_fg.name}_{join_fg.version}" - ) - left_on = join._on if len(join._on) > 0 else join._left_on - right_on = join._on if len(join._on) > 0 else join._right_on + def _serialize_featuregroup_name(self, fg): + return f"{fg._get_project_name()}.{fg.name}_{fg.version}" # featurestore.name_version - self._update_features(features, fg_name, [feat._name for feat in left_on]) - self._update_features( - features, join_fg_name, [feat._name for feat in right_on] - ) - ( - join_featuregroups, - join_features, - join_filters, - ) = self._collect_featuregroups_features_and_filters_rec(join._query) - featuregroups.update(join_featuregroups) - for join_fg_name in join_features: - self._update_features( - features, join_fg_name, join_features[join_fg_name] - ) - filters = (filters & join_filters) if join_filters is not None else filters - - return featuregroups, features, filters - - def _filter_to_expression(self, filters, featuregroups, features): - if not filters: + def _serialize_filter_expression(self, query): + if query.filters is None: return None - return self._resolve_logic(filters, featuregroups, features) + return self._serialize_logic(query.filters, query) - def _resolve_logic(self, logic, featuregroups, features): + def _serialize_logic(self, logic, query): return { "type": "logic", "logic_type": logic._type, - "left_filter": self._resolve_filter_or_logic( - logic._left_f, logic._left_l, featuregroups, features + "left_filter": self._serialize_filter_or_logic( + logic._left_f, logic._left_l, query ), - "right_filter": self._resolve_filter_or_logic( - logic._right_f, logic._right_l, featuregroups, features + "right_filter": self._serialize_filter_or_logic( + logic._right_f, logic._right_l, query ), } - def _resolve_filter_or_logic(self, filter, logic, featuregroups, features): + def _serialize_filter_or_logic(self, filter, logic, query): if filter: - return self._resolve_filter(filter, featuregroups, features) + return self._serialize_filter(filter, query) elif logic: - return self._resolve_logic(logic, featuregroups, features) + return self._serialize_logic(logic, query) else: return None - def _get_full_feature_name(self, feature, featuregroups, features): - featuregroup_name = None - - if feature._feature_group_id is None: - for fg_name, fg_features in features.items(): - if feature._name in fg_features: - featuregroup_name = fg_name - break - elif feature._feature_group_id in featuregroups: - featuregroup_name = featuregroups[feature._feature_group_id] - - if featuregroup_name is None: - raise FeatureStoreException(f"Feature {feature._name} not found in query") - - return f"{featuregroup_name}.{feature._name}" - - def _resolve_filter(self, filter, featuregroups, features): + def _serialize_filter(self, filter, query): if isinstance(filter._value, datetime.datetime): filter_value = filter._value.strftime("%Y-%m-%d %H:%M:%S") else: @@ -331,11 +258,22 @@ def _resolve_filter(self, filter, featuregroups, features): "type": "filter", "condition": filter._condition, "value": filter_value, - "feature": self._get_full_feature_name( - filter._feature, featuregroups, features - ), + "feature": self._serialize_feature_name(filter._feature, query), "numeric": False, # For backwards compatibility } + def _serialize_feature_name(self, feature, query): + if feature._feature_group_id is None: + feature = query.get_feature(feature._name) + fg = query.get_featuregroup(feature._feature_group_id) + fg_name = self._serialize_featuregroup_name(fg) + return f"{fg_name}.{feature._name}" + + def _translate_to_duckdb(self, query, query_str): + return query_str.replace( + f"`{query._left_feature_group.feature_store_name}`.`", + f"`{query._left_feature_group._get_project_name()}.", + ).replace("`", '"') + def _info_to_ticket(self, info): return info.endpoints[0].ticket diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index 149f12e685..3851be3121 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -76,53 +76,17 @@ def save(self, feature_view_obj): " feature view does not support time travel query." ) if feature_view_obj.labels: - # If provided label matches column with prefix, then attach label. - # If provided label matches only one column without prefix, then attach label. (For - # backward compatibility purpose, as of v3.0, labels are matched to columns without prefix.) - # If provided label matches multiple columns without prefix, then raise exception because it is ambiguous. - prefix_feature_map = {} - feature_map = {} - for feat in feature_view_obj.query._left_features: - prefix_feature_map[feat.name] = ( - feat.name, - feature_view_obj.query._left_feature_group, - ) - for join in feature_view_obj.query.joins: - for feat in join.query._left_features: - if join.prefix: - prefix_feature_map[join.prefix + feat.name] = ( - feat.name, - join.query._left_feature_group, - ) - feature_map[feat.name] = feature_map.get(feat.name, []) + [ - join.query._left_feature_group - ] - for label_name in feature_view_obj.labels: - if label_name in prefix_feature_map: - feature_view_obj._features.append( - training_dataset_feature.TrainingDatasetFeature( - name=prefix_feature_map[label_name][0], - label=True, - featuregroup=prefix_feature_map[label_name][1], - ) - ) - elif label_name in feature_map: - if len(feature_map[label_name]) > 1: - raise FeatureStoreException( - FeatureViewEngine.AMBIGUOUS_LABEL_ERROR.format(label_name) - ) - feature_view_obj._features.append( - training_dataset_feature.TrainingDatasetFeature( - name=label_name, - label=True, - featuregroup=feature_map[label_name][0], - ) - ) - else: - raise FeatureStoreException( - FeatureViewEngine.LABEL_NOT_EXIST_ERROR.format(label_name) + feature = feature_view_obj.query.get_feature(label_name) + feature_view_obj._features.append( + training_dataset_feature.TrainingDatasetFeature( + name=feature.name, + label=True, + featuregroup=feature_view_obj.query.get_featuregroup( + feature.feature_group_id + ), ) + ) self._transformation_function_engine.attach_transformation_fn(feature_view_obj) updated_fv = self._feature_view_api.post(feature_view_obj) self.attach_transformation_function(updated_fv) diff --git a/python/hsfs/core/transformation_function_engine.py b/python/hsfs/core/transformation_function_engine.py index 47188d5817..580b8add4b 100644 --- a/python/hsfs/core/transformation_function_engine.py +++ b/python/hsfs/core/transformation_function_engine.py @@ -19,7 +19,6 @@ from functools import partial from hsfs import training_dataset, training_dataset_feature -from hsfs.client.exceptions import FeatureStoreException from hsfs.core import transformation_function_api, statistics_api from hsfs.core.builtin_transformation_function import BuiltInTransformationFunction from hsfs import util @@ -100,13 +99,6 @@ def attach_transformation_fn(training_dataset_obj=None, feature_view_obj=None): target_obj = training_dataset_obj # todo why provide td and fv just to convert to target_obj? else: target_obj = feature_view_obj - # If provided feature matches column with prefix, then attach transformation function. - # If provided feature matches only one column without prefix, then attach transformation function. (For - # backward compatibility purpose, as of v3.0, features are matched to columns without prefix.) - # If provided feature matches multiple columns without prefix, then raise exception because it is ambiguous. - prefix_feature_map = {} - feature_map = {} - if target_obj._transformation_functions: for ( @@ -117,42 +109,20 @@ def attach_transformation_fn(training_dataset_obj=None, feature_view_obj=None): raise ValueError( "Online transformations for training dataset labels are not supported." ) - if feature_name in prefix_feature_map: - target_obj._features.append( - training_dataset_feature.TrainingDatasetFeature( - name=feature_name, - feature_group_feature_name=prefix_feature_map[feature_name][ - 0 - ], - featuregroup=prefix_feature_map[feature_name][1], - type=transformation_fn.output_type, - label=False, - transformation_function=transformation_fn, - ) - ) - elif feature_name in feature_map: - if len(feature_map[feature_name]) > 1: - raise FeatureStoreException( - TransformationFunctionEngine.AMBIGUOUS_FEATURE_ERROR.format( - feature_name - ) - ) - target_obj._features.append( - training_dataset_feature.TrainingDatasetFeature( - name=feature_name, - feature_group_feature_name=feature_name, - featuregroup=feature_map[feature_name][0], - type=transformation_fn.output_type, - label=False, - transformation_function=transformation_fn, - ) - ) - else: - raise FeatureStoreException( - TransformationFunctionEngine.FEATURE_NOT_EXIST_ERROR.format( - feature_name - ) + + feature = target_obj.query.get_feature(feature_name) + target_obj._features.append( + training_dataset_feature.TrainingDatasetFeature( + name=feature_name, + feature_group_feature_name=feature.name, + featuregroup=target_obj.query.get_featuregroup( + feature.feature_group_id + ), + type=transformation_fn.output_type, + label=False, + transformation_function=transformation_fn, ) + ) def is_builtin(self, transformation_fn_instance): return ( diff --git a/python/tests/core/test_arrow_flight_client.py b/python/tests/core/test_arrow_flight_client.py index 924f10a5d6..160e4f72aa 100644 --- a/python/tests/core/test_arrow_flight_client.py +++ b/python/tests/core/test_arrow_flight_client.py @@ -266,14 +266,13 @@ def test_construct_query_object(self, mocker, backend_fixtures): ) # Act - query_object = arrow_flight_client.get_instance()._construct_query_object( + query_object = arrow_flight_client.get_instance().create_query_object( query, "SELECT * FROM..." ) # Assert query_object_reference = { "query_string": "SELECT * FROM...", - "featuregroups": {15: "test.fg_test_1"}, "features": {"test.fg_test_1": ["intt", "stringt"]}, "filters": { "type": "logic", @@ -341,14 +340,13 @@ def test_construct_query_object_datetime_filter(self, mocker, backend_fixtures): ) # Act - query_object = arrow_flight_client.get_instance()._construct_query_object( + query_object = arrow_flight_client.get_instance().create_query_object( query, "SELECT * FROM..." ) # Assert query_object_reference = { "query_string": "SELECT * FROM...", - "featuregroups": {15: "test.fg_test_1"}, "features": {"test.fg_test_1": ["intt", "stringt"]}, "filters": { "type": "logic", @@ -380,14 +378,13 @@ def test_construct_query_object_without_fs(self, mocker, backend_fixtures): query = test_fg1.select_all().filter(Feature("intt") > 500) # Act - query_object = arrow_flight_client.get_instance()._construct_query_object( + query_object = arrow_flight_client.get_instance().create_query_object( query, "SELECT * FROM..." ) # Assert query_object_reference = { "query_string": "SELECT * FROM...", - "featuregroups": {15: "test.fg_test_1"}, "features": {"test.fg_test_1": ["intt", "stringt"]}, "filters": { "type": "logic", From 5e7856dd1303b58748b0cd2151e16fece202c8bd Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Fri, 12 May 2023 19:31:20 +0200 Subject: [PATCH 03/29] removed numeric parameter --- python/hsfs/core/arrow_flight_client.py | 1 - python/tests/core/test_arrow_flight_client.py | 6 ------ 2 files changed, 7 deletions(-) diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 0bd1a1a60a..727e40aca4 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -259,7 +259,6 @@ def _serialize_filter(self, filter, query): "condition": filter._condition, "value": filter_value, "feature": self._serialize_feature_name(filter._feature, query), - "numeric": False, # For backwards compatibility } def _serialize_feature_name(self, feature, query): diff --git a/python/tests/core/test_arrow_flight_client.py b/python/tests/core/test_arrow_flight_client.py index 160e4f72aa..84da098824 100644 --- a/python/tests/core/test_arrow_flight_client.py +++ b/python/tests/core/test_arrow_flight_client.py @@ -288,14 +288,12 @@ def test_construct_query_object(self, mocker, backend_fixtures): "condition": "GREATER_THAN", "value": 500, "feature": "test.fg_test_1.intt", - "numeric": False, }, "right_filter": { "type": "filter", "condition": "LESS_THAN", "value": 0.1, "feature": "test.fg_test_1.stringt", - "numeric": False, }, }, "right_filter": { @@ -303,7 +301,6 @@ def test_construct_query_object(self, mocker, backend_fixtures): "condition": "LESS_THAN", "value": 700, "feature": "test.fg_test_1.intt", - "numeric": False, }, }, "right_filter": { @@ -314,7 +311,6 @@ def test_construct_query_object(self, mocker, backend_fixtures): "condition": "GREATER_THAN", "value": 500, "feature": "test.fg_test_1.intt", - "numeric": False, }, "right_filter": None, }, @@ -356,7 +352,6 @@ def test_construct_query_object_datetime_filter(self, mocker, backend_fixtures): "condition": "GREATER_THAN", "value": "2011-03-01 12:57:02", "feature": "test.fg_test_1.intt", - "numeric": False, }, "right_filter": None, }, @@ -394,7 +389,6 @@ def test_construct_query_object_without_fs(self, mocker, backend_fixtures): "condition": "GREATER_THAN", "value": 500, "feature": "test.fg_test_1.intt", - "numeric": False, }, "right_filter": None, }, From 1ae8865cd1e9ebb92814dffdfc4d7baf449dbac1 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Fri, 12 May 2023 19:46:55 +0200 Subject: [PATCH 04/29] fix bug --- python/hsfs/constructor/query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index e463cd7bf6..b4fa9e8b59 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -106,7 +106,7 @@ def featuregroups(self): @property def features(self) -> List[Feature]: - return self.features_list + return self._feature_list @property def filters(self) -> Optional[Filter]: From 3dba7fe25650f8257aef30d830214a8187004652 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Fri, 12 May 2023 20:02:39 +0200 Subject: [PATCH 05/29] look in all features in all featuregroups --- python/hsfs/core/arrow_flight_client.py | 43 +++++++++++++++++-------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 727e40aca4..45f628defd 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -209,7 +209,7 @@ def create_query_object(self, query, query_str): for fg in query.featuregroups: fg_name = self._serialize_featuregroup_name(fg) features[fg_name] = [feat.name for feat in fg.features] - filters = self._serialize_filter_expression(query) + filters = self._serialize_filter_expression(query, features) for feature in features: features[feature] = list(features[feature]) @@ -223,32 +223,32 @@ def create_query_object(self, query, query_str): def _serialize_featuregroup_name(self, fg): return f"{fg._get_project_name()}.{fg.name}_{fg.version}" # featurestore.name_version - def _serialize_filter_expression(self, query): + def _serialize_filter_expression(self, query, all_features): if query.filters is None: return None - return self._serialize_logic(query.filters, query) + return self._serialize_logic(query.filters, query, all_features) - def _serialize_logic(self, logic, query): + def _serialize_logic(self, logic, query, all_features): return { "type": "logic", "logic_type": logic._type, "left_filter": self._serialize_filter_or_logic( - logic._left_f, logic._left_l, query + logic._left_f, logic._left_l, query, all_features ), "right_filter": self._serialize_filter_or_logic( - logic._right_f, logic._right_l, query + logic._right_f, logic._right_l, query, all_features ), } - def _serialize_filter_or_logic(self, filter, logic, query): + def _serialize_filter_or_logic(self, filter, logic, query, all_features): if filter: - return self._serialize_filter(filter, query) + return self._serialize_filter(filter, query, all_features) elif logic: - return self._serialize_logic(logic, query) + return self._serialize_logic(logic, query, all_features) else: return None - def _serialize_filter(self, filter, query): + def _serialize_filter(self, filter, query, all_features): if isinstance(filter._value, datetime.datetime): filter_value = filter._value.strftime("%Y-%m-%d %H:%M:%S") else: @@ -258,13 +258,28 @@ def _serialize_filter(self, filter, query): "type": "filter", "condition": filter._condition, "value": filter_value, - "feature": self._serialize_feature_name(filter._feature, query), + "feature": self._serialize_feature_name( + filter._feature, query, all_features + ), } - def _serialize_feature_name(self, feature, query): - if feature._feature_group_id is None: + def _serialize_feature_name(self, feature, query, all_features): + fg_id = feature._feature_group_id + if fg_id is None: # featuregroup id not set + # 1) search for feature in all featuregroups + for fg in all_features: + if feature._name in all_features[fg]: + fg_id = feature._feature_group_id + break + if fg_id is None: # featuregroup id still not set + # 2) search for feature including query prefixes feature = query.get_feature(feature._name) - fg = query.get_featuregroup(feature._feature_group_id) + fg_id = feature._feature_group_id + if fg_id is None: + raise FeatureStoreException( + f"Could not find feature {feature._name} in any of the featuregroups in the query" + ) + fg = query.get_featuregroup(fg_id) fg_name = self._serialize_featuregroup_name(fg) return f"{fg_name}.{feature._name}" From 48f1145a4d3b67c6647400a75952b66beb7a7c6b Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Fri, 12 May 2023 20:12:12 +0200 Subject: [PATCH 06/29] support excluded features --- python/hsfs/core/arrow_flight_client.py | 41 +++++++++---------- python/tests/core/test_arrow_flight_client.py | 37 +++++++++++++++++ 2 files changed, 57 insertions(+), 21 deletions(-) diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 45f628defd..2650780e4a 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -209,7 +209,7 @@ def create_query_object(self, query, query_str): for fg in query.featuregroups: fg_name = self._serialize_featuregroup_name(fg) features[fg_name] = [feat.name for feat in fg.features] - filters = self._serialize_filter_expression(query, features) + filters = self._serialize_filter_expression(query) for feature in features: features[feature] = list(features[feature]) @@ -223,32 +223,32 @@ def create_query_object(self, query, query_str): def _serialize_featuregroup_name(self, fg): return f"{fg._get_project_name()}.{fg.name}_{fg.version}" # featurestore.name_version - def _serialize_filter_expression(self, query, all_features): + def _serialize_filter_expression(self, query): if query.filters is None: return None - return self._serialize_logic(query.filters, query, all_features) + return self._serialize_logic(query.filters, query) - def _serialize_logic(self, logic, query, all_features): + def _serialize_logic(self, logic, query): return { "type": "logic", "logic_type": logic._type, "left_filter": self._serialize_filter_or_logic( - logic._left_f, logic._left_l, query, all_features + logic._left_f, logic._left_l, query ), "right_filter": self._serialize_filter_or_logic( - logic._right_f, logic._right_l, query, all_features + logic._right_f, logic._right_l, query ), } - def _serialize_filter_or_logic(self, filter, logic, query, all_features): + def _serialize_filter_or_logic(self, filter, logic, query): if filter: - return self._serialize_filter(filter, query, all_features) + return self._serialize_filter(filter, query) elif logic: - return self._serialize_logic(logic, query, all_features) + return self._serialize_logic(logic, query) else: return None - def _serialize_filter(self, filter, query, all_features): + def _serialize_filter(self, filter, query): if isinstance(filter._value, datetime.datetime): filter_value = filter._value.strftime("%Y-%m-%d %H:%M:%S") else: @@ -258,30 +258,29 @@ def _serialize_filter(self, filter, query, all_features): "type": "filter", "condition": filter._condition, "value": filter_value, - "feature": self._serialize_feature_name( - filter._feature, query, all_features - ), + "feature": self._serialize_feature_name(filter._feature, query), } - def _serialize_feature_name(self, feature, query, all_features): + def _serialize_feature_name(self, feature, query): fg_id = feature._feature_group_id if fg_id is None: # featuregroup id not set # 1) search for feature in all featuregroups - for fg in all_features: - if feature._name in all_features[fg]: - fg_id = feature._feature_group_id - break + for fg in query.featuregroups: + for feat in fg.features: + if feature.name == feat.name: + fg_id = feat._feature_group_id + break if fg_id is None: # featuregroup id still not set # 2) search for feature including query prefixes - feature = query.get_feature(feature._name) + feature = query.get_feature(feature.name) fg_id = feature._feature_group_id if fg_id is None: raise FeatureStoreException( - f"Could not find feature {feature._name} in any of the featuregroups in the query" + f"Could not find feature {feature.name} in any of the featuregroups in the query" ) fg = query.get_featuregroup(fg_id) fg_name = self._serialize_featuregroup_name(fg) - return f"{fg_name}.{feature._name}" + return f"{fg_name}.{feature.name}" def _translate_to_duckdb(self, query, query_str): return query_str.replace( diff --git a/python/tests/core/test_arrow_flight_client.py b/python/tests/core/test_arrow_flight_client.py index 84da098824..81e7fbef5c 100644 --- a/python/tests/core/test_arrow_flight_client.py +++ b/python/tests/core/test_arrow_flight_client.py @@ -399,3 +399,40 @@ def test_construct_query_object_without_fs(self, mocker, backend_fixtures): } assert str(query_object_reference) == str(query_object) + + def test_construct_query_object_without_fs_excluded(self, mocker, backend_fixtures): + # Arrange + self._arrange_engine_mocks(mocker, backend_fixtures) + json1 = backend_fixtures["feature_group"]["get"]["response"] + test_fg1 = feature_group.FeatureGroup.from_response_json(json1) + mocker.patch("hsfs.constructor.query.Query.to_string", return_value="") + mocker.patch("hsfs.constructor.query.Query._to_string", return_value="") + query = test_fg1.select_except(["intt"]).filter(Feature("intt") > 500) + + # Act + query_object = arrow_flight_client.get_instance().create_query_object( + query, "SELECT * FROM..." + ) + + # Assert + query_object_reference = { + "query_string": "SELECT * FROM...", + "features": {"test.fg_test_1": ["intt", "stringt"]}, + "filters": { + "type": "logic", + "logic_type": "SINGLE", + "left_filter": { + "type": "filter", + "condition": "GREATER_THAN", + "value": 500, + "feature": "test.fg_test_1.intt", + }, + "right_filter": None, + }, + } + + query_object["features"] = { + key: sorted(value) for key, value in query_object["features"].items() + } + + assert str(query_object_reference) == str(query_object) From eb57d6ef37342c658018a0a721c1ace87a2377bc Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Fri, 12 May 2023 20:27:23 +0200 Subject: [PATCH 07/29] added features property to featureview --- python/hsfs/feature_view.py | 5 +++++ python/hsfs/training_dataset_feature.py | 3 +++ 2 files changed, 8 insertions(+) diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index 45e1ed53eb..3ed3d19f4e 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -2260,6 +2260,11 @@ def schema(self): """Feature view schema.""" return self._features + @property + def features(self): + """Feature view schema. (alias)""" + return self._features + @schema.setter def schema(self, features): self._features = features diff --git a/python/hsfs/training_dataset_feature.py b/python/hsfs/training_dataset_feature.py index 8760c3a023..d09fc03efc 100644 --- a/python/hsfs/training_dataset_feature.py +++ b/python/hsfs/training_dataset_feature.py @@ -111,3 +111,6 @@ def transformation_function(self, transformation_function): @property def feature_group(self): return self._feature_group + + def __repr__(self): + return f"Training Dataset Feature({self._name!r}, {self._type!r}, {self._index!r}, {self._label}, {self._feature_group_feature_name}, {self._feature_group.id!r})" From c45f51c83c7512b8e2e5829b123a1990d3ab9703 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Fri, 12 May 2023 20:31:34 +0200 Subject: [PATCH 08/29] include transformation functions in repr --- python/hsfs/training_dataset_feature.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/training_dataset_feature.py b/python/hsfs/training_dataset_feature.py index d09fc03efc..71005580c3 100644 --- a/python/hsfs/training_dataset_feature.py +++ b/python/hsfs/training_dataset_feature.py @@ -113,4 +113,4 @@ def feature_group(self): return self._feature_group def __repr__(self): - return f"Training Dataset Feature({self._name!r}, {self._type!r}, {self._index!r}, {self._label}, {self._feature_group_feature_name}, {self._feature_group.id!r})" + return f"Training Dataset Feature({self._name!r}, {self._type!r}, {self._index!r}, {self._label}, {self._transformation_function}, {self._feature_group_feature_name}, {self._feature_group.id!r})" From 8cab66a0bd9f951e1583ed7ac4b893412303ffe3 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Fri, 12 May 2023 20:44:09 +0200 Subject: [PATCH 09/29] fix column check on join --- python/hsfs/constructor/query.py | 18 ++++++++++++++---- python/tests/core/test_arrow_flight_client.py | 1 + 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index b4fa9e8b59..a3de52aa29 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -66,8 +66,12 @@ def _initialize_collections(self): filters = self._filter for feat in self._left_features: - features_list.append(feat) + if feat.name in feature_map: + raise FeatureStoreException( + f"Feature name {feat.name} already exists in query." + ) feature_map[feat.name] = feat + features_list.append(feat) for join_obj in self.joins: featuregroups_map[ join_obj.query._left_feature_group._id @@ -86,9 +90,15 @@ def _initialize_collections(self): f"Feature name {name_with_prefix} already exists in query. Consider changing the prefix." ) feature_map[join_obj.prefix + feat.name] = feat - feature_map[feat.name] = ( - feat.name if feat.name not in feature_map else None - ) + feature_map[feat.name] = ( + feat if feat.name not in feature_map else None + ) + else: + if feat.name in feature_map: + raise FeatureStoreException( + f"Feature name {feat.name} already exists in query." + ) + feature_map[feat.name] = feat return feature_map, features_list, featuregroups_map, filters diff --git a/python/tests/core/test_arrow_flight_client.py b/python/tests/core/test_arrow_flight_client.py index 81e7fbef5c..565f9a91c3 100644 --- a/python/tests/core/test_arrow_flight_client.py +++ b/python/tests/core/test_arrow_flight_client.py @@ -261,6 +261,7 @@ def test_construct_query_object(self, mocker, backend_fixtures): test_fg2.filter(test_fg2.features[0] > 500), left_on=["intt"], right_on=["intt"], + prefix="test_", ) .filter(test_fg1.features[0] < 700) ) From f8b0809337ee15a14267838eba7f5e1c099f9f0d Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Fri, 12 May 2023 20:58:59 +0200 Subject: [PATCH 10/29] updated warning messages --- python/hsfs/constructor/query.py | 8 +++----- python/hsfs/core/feature_view_engine.py | 7 ------- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index a3de52aa29..f972302d9b 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -67,9 +67,7 @@ def _initialize_collections(self): for feat in self._left_features: if feat.name in feature_map: - raise FeatureStoreException( - f"Feature name {feat.name} already exists in query." - ) + raise FeatureStoreException(f"Feature name {feat.name} is not unique.") feature_map[feat.name] = feat features_list.append(feat) for join_obj in self.joins: @@ -96,7 +94,7 @@ def _initialize_collections(self): else: if feat.name in feature_map: raise FeatureStoreException( - f"Feature name {feat.name} already exists in query." + f"Feature name {feat.name} already exists in query. Consider using a prefix." ) feature_map[feat.name] = feat @@ -135,7 +133,7 @@ def get_feature(self, feature_name: str) -> Feature: feat = self._features_map[feature_name] if feat is None: raise FeatureStoreException( - f"Feature name {feature_name} is ambiguous. Consider using the prefix." + f"Feature name {feature_name} is ambiguous. Consider using a prefix." ) return feat diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index 3851be3121..aa045007ad 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -37,13 +37,6 @@ class FeatureViewEngine: _TRAINING_DATA_API_PATH = "trainingdatasets" _OVERWRITE = "overwrite" _APPEND = "append" - AMBIGUOUS_LABEL_ERROR = ( - "Provided label '{}' is ambiguous and exists in more than one feature groups. " - "You can provide the label with the prefix you specify in the join." - ) - LABEL_NOT_EXIST_ERROR = ( - "Provided label '{}' do not exist in any of the feature groups." - ) def __init__(self, feature_store_id): self._feature_store_id = feature_store_id From 30aa09550a53d84f262855533a321d533b7eedd6 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 16 May 2023 17:10:21 +0200 Subject: [PATCH 11/29] more refactoring, fixed tests --- python/hsfs/constructor/query.py | 264 +++++++++++------- python/hsfs/core/arrow_flight_client.py | 28 +- python/hsfs/core/feature_view_engine.py | 8 +- .../core/transformation_function_engine.py | 8 +- python/tests/constructor/test_query.py | 2 +- python/tests/core/test_feature_view_engine.py | 13 +- 6 files changed, 199 insertions(+), 124 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index f972302d9b..59bc3bb16e 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -28,6 +28,19 @@ class Query: + ERROR_MESSAGE_ALREADY_EXISTS = "Feature name {} already exists in query." + ERROR_MESSAGE_CHANGE_PREFIX = ( + "Feature name {} already exists in query. Consider changing the prefix." + ) + ERROR_MESSAGE_USE_PREFIX = ( + "Feature name {} already exists in query. Consider using a prefix." + ) + ERROR_MESSAGE_FEATURE_NOT_UNIQUE = "Feature name {} is not unique." + ERROR_MESSAGE_FEATURE_AMBIGUOUS = ( + "Feature name {} is ambiguous. Consider using a prefix." + ) + ERROR_MESSAGE_FEATURE_NOT_FOUND = "Feature name {} not found in query." + def __init__( self, left_feature_group, @@ -52,104 +65,81 @@ def __init__( self._storage_connector_api = storage_connector_api.StorageConnectorApi( feature_store_id ) - ( - self._features_map, - self._feature_list, - self._featuregroups_map, - self._filters, - ) = self._initialize_collections() - - def _initialize_collections(self): - feature_map = {} - features_list = [] - featuregroups_map = {self._left_feature_group._id: self._left_feature_group} - filters = self._filter + self._populate_collections() + + def _check_join(self, join_obj): + for feat in join_obj.query._left_features: + prefix = join_obj.prefix + if self._feature_exists_in_query(feat.name, prefix): + name = f"{prefix}{feat.name}" if prefix else feat.name + message = ( + Query.ERROR_MESSAGE_CHANGE_PREFIX + if prefix + else Query.ERROR_MESSAGE_USE_PREFIX + ) + raise FeatureStoreException(message.format(name)) + + def _feature_exists_in_query(self, feature_name, prefix=None): + existing_features = self._query_features.get(feature_name, []) + if any([feature[1] == prefix for feature in existing_features]): + return True + if prefix: + name_with_prefix = f"{prefix}{feature_name}" + existing_features = self._query_features.get(name_with_prefix, []) + return any([feature[1] is None for feature in existing_features]) + + return False + + def _add_to_collection(self, feat, prefix, featuregroup, query_feature=True): + collection = ( + self._query_features if query_feature else self._featuregroup_features + ) + feature_entry = (feat, prefix, featuregroup) + collection[feat.name] = collection.get(feat.name, []) + [feature_entry] + if prefix: + name_with_prefix = f"{prefix}{feat.name}" + collection[name_with_prefix] = collection.get(name_with_prefix, []) + [ + feature_entry + ] + if query_feature: + self._feature_list.append(feature_entry) + + def _populate_collections(self): + self._featuregroups = {self._left_feature_group} + self._query_features = {} + self._featuregroup_features = {} + self._feature_list = [] + self._filters = self._filter for feat in self._left_features: - if feat.name in feature_map: - raise FeatureStoreException(f"Feature name {feat.name} is not unique.") - feature_map[feat.name] = feat - features_list.append(feat) + if self._feature_exists_in_query(feat.name): + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_NOT_UNIQUE.format(feat.name) + ) + self._add_to_collection(feat, None, self._left_feature_group) + for feat in self._left_feature_group.features: + self._add_to_collection( + feat, None, self._left_feature_group, query_feature=False + ) for join_obj in self.joins: - featuregroups_map[ - join_obj.query._left_feature_group._id - ] = join_obj.query._left_feature_group - if filters is None: - filters = join_obj.query._filter + self._featuregroups.add(join_obj.query._left_feature_group) + + if self._filters is None: + self._filters = join_obj.query._filter elif join_obj.query._filter is not None: - filters = filters & join_obj.query._filter + self._filters = self._filters & join_obj.query._filter for feat in join_obj.query._left_features: - features_list.append(feat) - if join_obj.prefix: - name_with_prefix = join_obj.prefix + feat.name - if name_with_prefix in feature_map: - raise FeatureStoreException( - f"Feature name {name_with_prefix} already exists in query. Consider changing the prefix." - ) - feature_map[join_obj.prefix + feat.name] = feat - feature_map[feat.name] = ( - feat if feat.name not in feature_map else None - ) - else: - if feat.name in feature_map: - raise FeatureStoreException( - f"Feature name {feat.name} already exists in query. Consider using a prefix." - ) - feature_map[feat.name] = feat - - return feature_map, features_list, featuregroups_map, filters - - def _update_collections(self): - ( - self._features_map, - self._feature_list, - self._featuregroups_map, - self._filters, - ) = self._initialize_collections() - - @property - def featuregroups(self): - return list(self._featuregroups_map.values()) - - @property - def features(self) -> List[Feature]: - return self._feature_list - - @property - def filters(self) -> Optional[Filter]: - return self._filters - - def get_featuregroup(self, id: int): - if id not in self._featuregroups_map: - raise FeatureStoreException(f"Feature group id {id} not found in query.") - return self._featuregroups_map[id] - - def get_feature(self, feature_name: str) -> Feature: - if feature_name not in self._features_map: - raise FeatureStoreException( - f"Feature name {feature_name} not found in query." - ) - feat = self._features_map[feature_name] - if feat is None: - raise FeatureStoreException( - f"Feature name {feature_name} is ambiguous. Consider using a prefix." - ) - return feat - - def __getattr__(self, name): - try: - return self.__getitem__(name) - except KeyError: - raise AttributeError(f"'Query' object has no attribute '{name}'. ") - - def __getitem__(self, name): - if not isinstance(name, str): - raise TypeError( - f"Expected type `str`, got `{type(name)}`. " - "Features are accessible by name." - ) - return self.get_feature(name) + self._add_to_collection( + feat, join_obj.prefix, join_obj.query._left_feature_group + ) + for feat in join_obj.query._left_feature_group.features: + self._add_to_collection( + feat, + join_obj.prefix, + join_obj.query._left_feature_group, + query_feature=False, + ) def _prep_read(self, online, read_options): fs_query = self._query_constructor_api.construct_query(self) @@ -307,11 +297,15 @@ def join( # Returns `Query`: A new Query object representing the join. """ - self._joins.append( - join.Join(sub_query, on, left_on, right_on, join_type.upper(), prefix) + new_join = join.Join( + sub_query, on, left_on, right_on, join_type.upper(), prefix ) - self._update_collections() + self._check_join(new_join) + + self._joins.append(new_join) + + self._populate_collections() return self @@ -489,7 +483,7 @@ def filter(self, f: Union[Filter, Logic]): elif self._filter is not None: self._filter = self._filter & f - self._update_collections() + self._populate_collections() return self @@ -615,9 +609,14 @@ def append_feature(self, feature): query.append_feature('feature_name') ``` """ + if self._feature_exists_in_query(feature.name): + raise FeatureStoreException( + Query.ERROR_MESSAGE_ALREADY_EXISTS.format(feature.name) + ) + self._left_features.append(feature) - self._update_collections() + self._populate_collections() def is_time_travel(self): return ( @@ -629,3 +628,74 @@ def is_time_travel(self): @property def joins(self): return self._joins + + @property + def featuregroups(self): + return list(self._featuregroups) + + @property + def filters(self): + return self._filters + + def _filter_properties(self, feat, with_prefix=False, with_featuregroup=False): + if with_prefix and with_featuregroup: + return feat + elif with_prefix: + return (feat[0], feat[1]) + elif with_featuregroup: + return (feat[0], feat[2]) + else: + return feat[0] + + @property + def features(self, with_prefix=False, with_featuregroup=False): + return [ + self._filter_properties(feat, with_prefix, with_featuregroup) + for feat in self._feature_list + ] + + def get_feature( + self, + feature_name: str, + with_prefix=False, + with_featuregroup=False, + include_unselected=False, + ) -> Feature: + feature_lookup = ( + self._query_features + if not include_unselected + else self._featuregroup_features + ) + if feature_name not in feature_lookup: + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_NOT_FOUND.format(feature_name) + ) + feats = feature_lookup[feature_name] + + # if only one feature with this name, return it + if len(feats) == 1: + return self._filter_properties(feats[0], with_prefix, with_featuregroup) + + # if there are multiple features with this name, return the one without prefix + for feat in feats: + if feat[1] is None: + return self._filter_properties(feat, with_prefix, with_featuregroup) + + # there are multiple features with this name and all have prefix, raise exception + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format(feature_name) + ) + + def __getattr__(self, name): + try: + return self.__getitem__(name) + except FeatureStoreException: + raise AttributeError(f"'Query' object has no attribute '{name}'. ") + + def __getitem__(self, name): + if not isinstance(name, str): + raise TypeError( + f"Expected type `str`, got `{type(name)}`. " + "Features are accessible by name." + ) + return self.get_feature(name) diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 2650780e4a..eb1e9ba50f 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -263,23 +263,25 @@ def _serialize_filter(self, filter, query): def _serialize_feature_name(self, feature, query): fg_id = feature._feature_group_id - if fg_id is None: # featuregroup id not set - # 1) search for feature in all featuregroups - for fg in query.featuregroups: - for feat in fg.features: - if feature.name == feat.name: - fg_id = feat._feature_group_id - break - if fg_id is None: # featuregroup id still not set - # 2) search for feature including query prefixes - feature = query.get_feature(feature.name) - fg_id = feature._feature_group_id + featuregroup = None + if fg_id is None: + # find featuregroup by feature name + feat, featuregroup = query.get_feature( + feature.name, with_featuregroup=True, include_unselected=True + ) + else: + # find featuregroup by featuregroup id + for fg in query.featuregroups: + if fg.id == fg_id: + featuregroup = fg + + if featuregroup is None: raise FeatureStoreException( f"Could not find feature {feature.name} in any of the featuregroups in the query" ) - fg = query.get_featuregroup(fg_id) - fg_name = self._serialize_featuregroup_name(fg) + + fg_name = self._serialize_featuregroup_name(featuregroup) return f"{fg_name}.{feature.name}" def _translate_to_duckdb(self, query, query_str): diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index aa045007ad..ba4010c2ba 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -70,14 +70,14 @@ def save(self, feature_view_obj): ) if feature_view_obj.labels: for label_name in feature_view_obj.labels: - feature = feature_view_obj.query.get_feature(label_name) + feature, featuregroup = feature_view_obj.query.get_feature( + label_name, with_featuregroup=True + ) feature_view_obj._features.append( training_dataset_feature.TrainingDatasetFeature( name=feature.name, label=True, - featuregroup=feature_view_obj.query.get_featuregroup( - feature.feature_group_id - ), + featuregroup=featuregroup, ) ) self._transformation_function_engine.attach_transformation_fn(feature_view_obj) diff --git a/python/hsfs/core/transformation_function_engine.py b/python/hsfs/core/transformation_function_engine.py index 580b8add4b..90775dfe77 100644 --- a/python/hsfs/core/transformation_function_engine.py +++ b/python/hsfs/core/transformation_function_engine.py @@ -110,14 +110,14 @@ def attach_transformation_fn(training_dataset_obj=None, feature_view_obj=None): "Online transformations for training dataset labels are not supported." ) - feature = target_obj.query.get_feature(feature_name) + feature, featuregroup = target_obj.query.get_feature( + feature_name, with_featuregroup=True + ) target_obj._features.append( training_dataset_feature.TrainingDatasetFeature( name=feature_name, feature_group_feature_name=feature.name, - featuregroup=target_obj.query.get_featuregroup( - feature.feature_group_id - ), + featuregroup=featuregroup, type=transformation_fn.output_type, label=False, transformation_function=transformation_fn, diff --git a/python/tests/constructor/test_query.py b/python/tests/constructor/test_query.py index 18c531fb09..3e1a2ab594 100644 --- a/python/tests/constructor/test_query.py +++ b/python/tests/constructor/test_query.py @@ -157,7 +157,7 @@ def test_collect_feature(self, mocker, backend_fixtures): mocker.patch("hsfs.engine.get_type", return_value="python") q = query.Query.from_response_json(backend_fixtures["query"]["get"]["response"]) - features = q._collect_features() + features = q.features feature_names = [feature.name for feature in features] expected_feature_names = ["test_left_features", "test_left_features2"] diff --git a/python/tests/core/test_feature_view_engine.py b/python/tests/core/test_feature_view_engine.py index 31a73f78f4..fb471c7b35 100644 --- a/python/tests/core/test_feature_view_engine.py +++ b/python/tests/core/test_feature_view_engine.py @@ -27,7 +27,7 @@ from hsfs.client.exceptions import FeatureStoreException from hsfs.constructor import fs_query from hsfs.core import feature_view_engine -from hsfs.core.feature_view_engine import FeatureViewEngine +from hsfs.constructor.query import Query engine.init("python") fg1 = feature_group.FeatureGroup( @@ -181,7 +181,9 @@ def test_save_time_travel_sub_query(self, mocker): fv = feature_view.FeatureView( name="fv_name", - query=fg1.select_all().join(fg2.select_all().as_of("20221010")), + query=fg1.select_all().join( + fg2.select_all().as_of("20221010"), prefix="fg2" + ), featurestore_id=feature_store_id, ) @@ -296,7 +298,7 @@ def test_save_multiple_label_selected_1(self, mocker): mocker, _query, "label", - FeatureViewEngine.AMBIGUOUS_LABEL_ERROR.format("label"), + Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format("label"), ) def test_save_multiple_label_selected_2(self, mocker): @@ -313,6 +315,7 @@ def test_save_multiple_label_selected_3(self, mocker): .join(fg2.select_all(), prefix="fg2_") .join(fg3.select_all(), prefix="fg3_") ) + print(fg2["label"]) self.template_save_label_success(mocker, _query, "fg3_label", fg3.id) def test_save_label_selected_in_join_only_1(self, mocker): @@ -329,7 +332,7 @@ def test_save_label_selected_in_join_only_3(self, mocker): mocker, _query, "none", - FeatureViewEngine.LABEL_NOT_EXIST_ERROR.format("none"), + Query.ERROR_MESSAGE_FEATURE_NOT_FOUND.format("none"), ) def test_save_label_self_join_1(self, mocker): @@ -637,7 +640,7 @@ def testFunction(): query=query, featurestore_id=feature_store_id, ) - fv.schema = query._collect_features() + fv.schema = query.features # Act fv_engine.attach_transformation_function(fv) From 0a4516a9733b137ad83b3227b4fccd24dd94cab3 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 16 May 2023 17:40:17 +0200 Subject: [PATCH 12/29] fixed tests, added filter check --- python/hsfs/constructor/query.py | 69 +++++++++++++++++++------ python/hsfs/core/arrow_flight_client.py | 22 +------- 2 files changed, 56 insertions(+), 35 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 59bc3bb16e..3d2436d5c8 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -21,7 +21,6 @@ from hsfs import util, engine, feature_group from hsfs.core import query_constructor_api, storage_connector_api, arrow_flight_client -from hsfs.feature import Feature from hsfs.constructor import join from hsfs.constructor.filter import Filter, Logic from hsfs.client.exceptions import FeatureStoreException @@ -40,6 +39,9 @@ class Query: "Feature name {} is ambiguous. Consider using a prefix." ) ERROR_MESSAGE_FEATURE_NOT_FOUND = "Feature name {} not found in query." + ERROR_MESSAGE_FEATURE_NOT_FOUND_FG = ( + "Feature name {} not found in any of the featuregroups in this query." + ) def __init__( self, @@ -65,19 +67,8 @@ def __init__( self._storage_connector_api = storage_connector_api.StorageConnectorApi( feature_store_id ) - self._populate_collections() - - def _check_join(self, join_obj): - for feat in join_obj.query._left_features: - prefix = join_obj.prefix - if self._feature_exists_in_query(feat.name, prefix): - name = f"{prefix}{feat.name}" if prefix else feat.name - message = ( - Query.ERROR_MESSAGE_CHANGE_PREFIX - if prefix - else Query.ERROR_MESSAGE_USE_PREFIX - ) - raise FeatureStoreException(message.format(name)) + if self._left_feature_group is not None and self._left_features is not None: + self._populate_collections() def _feature_exists_in_query(self, feature_name, prefix=None): existing_features = self._query_features.get(feature_name, []) @@ -309,6 +300,18 @@ def join( return self + def _check_join(self, join_obj): + for feat in join_obj.query._left_features: + prefix = join_obj.prefix + if self._feature_exists_in_query(feat.name, prefix): + name = f"{prefix}{feat.name}" if prefix else feat.name + message = ( + Query.ERROR_MESSAGE_CHANGE_PREFIX + if prefix + else Query.ERROR_MESSAGE_USE_PREFIX + ) + raise FeatureStoreException(message.format(name)) + def as_of( self, wallclock_time: Optional[Union[str, int, datetime, date]] = None, @@ -471,6 +474,8 @@ def filter(self, f: Union[Filter, Logic]): # Returns `Query`. The query object with the applied filter. """ + self._check_filter(f) + if self._filter is None: if isinstance(f, Filter): self._filter = Logic.Single(left_f=f) @@ -487,6 +492,22 @@ def filter(self, f: Union[Filter, Logic]): return self + def _check_filter(self, f): + if f is None: + return + + if isinstance(f, Filter): + self.get_featuregroup_by_feature(f._feature) + elif isinstance(f, Logic): + self._check_filter(f._left_f) + self._check_filter(f._right_f) + self._check_filter(f._left_l) + self._check_filter(f._right_l) + else: + raise TypeError( + "Expected type `Filter` or `Logic`, got `{}`".format(type(f)) + ) + def from_cache_feature_group_only(self): for _query in [join.query for join in self._joins] + [self]: if not isinstance(_query._left_feature_group, feature_group.FeatureGroup): @@ -654,13 +675,31 @@ def features(self, with_prefix=False, with_featuregroup=False): for feat in self._feature_list ] + def get_featuregroup_by_feature(self, feature): + fg_id = feature._feature_group_id + + if fg_id is None: + # find featuregroup by feature name + return self.get_feature( + feature.name, with_featuregroup=True, include_unselected=True + )[1] + else: + # find featuregroup by featuregroup id + for fg in self.featuregroups: + if fg.id == fg_id: + return fg + + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_NOT_FOUND_FG.format(feature.name) + ) + def get_feature( self, feature_name: str, with_prefix=False, with_featuregroup=False, include_unselected=False, - ) -> Feature: + ): feature_lookup = ( self._query_features if not include_unselected diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index eb1e9ba50f..804dc3b084 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -262,26 +262,8 @@ def _serialize_filter(self, filter, query): } def _serialize_feature_name(self, feature, query): - fg_id = feature._feature_group_id - featuregroup = None - - if fg_id is None: - # find featuregroup by feature name - feat, featuregroup = query.get_feature( - feature.name, with_featuregroup=True, include_unselected=True - ) - else: - # find featuregroup by featuregroup id - for fg in query.featuregroups: - if fg.id == fg_id: - featuregroup = fg - - if featuregroup is None: - raise FeatureStoreException( - f"Could not find feature {feature.name} in any of the featuregroups in the query" - ) - - fg_name = self._serialize_featuregroup_name(featuregroup) + fg = query.get_featuregroup_by_feature(feature) + fg_name = self._serialize_featuregroup_name(fg) return f"{fg_name}.{feature.name}" def _translate_to_duckdb(self, query, query_str): From 6ad85c9f797c9d44ebdff68bc62ed047cf29d32c Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 16 May 2023 17:52:20 +0200 Subject: [PATCH 13/29] fixed test --- .../tests/core/test_transformation_function_engine.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/python/tests/core/test_transformation_function_engine.py b/python/tests/core/test_transformation_function_engine.py index 7b5b9fdff5..4c1e2d42a0 100644 --- a/python/tests/core/test_transformation_function_engine.py +++ b/python/tests/core/test_transformation_function_engine.py @@ -29,6 +29,7 @@ ) from hsfs.client.exceptions import FeatureStoreException from hsfs.core import transformation_function_engine +from hsfs.constructor.query import Query fg1 = feature_group.FeatureGroup( name="test1", @@ -483,8 +484,8 @@ def testFunction(): query_no_prefix = ( fg1.select_all() - .join(fg2.select(["tf1_name"]), on=["id"]) - .join(fg3.select(["tf_name", "tf1_name"]), on=["id"]) + .join(fg2.select(["tf1_name"]), on=["id"], prefix="fg2") + .join(fg3.select(["tf_name", "tf1_name"]), on=["id"], prefix="fg3") ) tf = transformation_function.TransformationFunction( @@ -513,10 +514,8 @@ def testFunction(): ) # Assert - assert ( - str(e_info.value) - == "Provided feature 'tf1_name' in transformation functions is ambiguous and exists in more than one " - "feature groups.You can provide the feature with the prefix that was specified in the join." + assert str(e_info.value) == Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format( + "tf1_name" ) def test_attach_transformation_fn_fv_labels(self, mocker): From 6196ec58a587be102991121964254c5fe95d0915 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 16 May 2023 18:30:09 +0200 Subject: [PATCH 14/29] fixed ambiguity issue with filter params --- python/hsfs/constructor/query.py | 40 +++++++------------ python/hsfs/core/feature_view_engine.py | 4 +- .../core/transformation_function_engine.py | 4 +- 3 files changed, 19 insertions(+), 29 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 3d2436d5c8..d42cbeb8be 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -658,31 +658,18 @@ def featuregroups(self): def filters(self): return self._filters - def _filter_properties(self, feat, with_prefix=False, with_featuregroup=False): - if with_prefix and with_featuregroup: - return feat - elif with_prefix: - return (feat[0], feat[1]) - elif with_featuregroup: - return (feat[0], feat[2]) - else: - return feat[0] - @property - def features(self, with_prefix=False, with_featuregroup=False): - return [ - self._filter_properties(feat, with_prefix, with_featuregroup) - for feat in self._feature_list - ] + def features(self): + return [feat[0] for feat in self._feature_list] def get_featuregroup_by_feature(self, feature): fg_id = feature._feature_group_id if fg_id is None: # find featuregroup by feature name - return self.get_feature( - feature.name, with_featuregroup=True, include_unselected=True - )[1] + return self.get_feature_obj( + feature.name, include_unselected=True, resolve_ambiguity=False + )[2] else: # find featuregroup by featuregroup id for fg in self.featuregroups: @@ -693,12 +680,11 @@ def get_featuregroup_by_feature(self, feature): Query.ERROR_MESSAGE_FEATURE_NOT_FOUND_FG.format(feature.name) ) - def get_feature( + def get_feature_obj( self, feature_name: str, - with_prefix=False, - with_featuregroup=False, include_unselected=False, + resolve_ambiguity=True, ): feature_lookup = ( self._query_features @@ -713,18 +699,22 @@ def get_feature( # if only one feature with this name, return it if len(feats) == 1: - return self._filter_properties(feats[0], with_prefix, with_featuregroup) + return feats[0] # if there are multiple features with this name, return the one without prefix - for feat in feats: - if feat[1] is None: - return self._filter_properties(feat, with_prefix, with_featuregroup) + if resolve_ambiguity: + for feat in feats: + if feat[1] is None: + return feat # there are multiple features with this name and all have prefix, raise exception raise FeatureStoreException( Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format(feature_name) ) + def get_feature(self, feature_name): + return self.get_feature_obj(feature_name)[0] + def __getattr__(self, name): try: return self.__getitem__(name) diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index ba4010c2ba..1332b8edf3 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -70,8 +70,8 @@ def save(self, feature_view_obj): ) if feature_view_obj.labels: for label_name in feature_view_obj.labels: - feature, featuregroup = feature_view_obj.query.get_feature( - label_name, with_featuregroup=True + feature, prefix, featuregroup = feature_view_obj.query.get_feature_obj( + label_name, add_featuregroup=True ) feature_view_obj._features.append( training_dataset_feature.TrainingDatasetFeature( diff --git a/python/hsfs/core/transformation_function_engine.py b/python/hsfs/core/transformation_function_engine.py index 90775dfe77..8bcf6326e6 100644 --- a/python/hsfs/core/transformation_function_engine.py +++ b/python/hsfs/core/transformation_function_engine.py @@ -110,8 +110,8 @@ def attach_transformation_fn(training_dataset_obj=None, feature_view_obj=None): "Online transformations for training dataset labels are not supported." ) - feature, featuregroup = target_obj.query.get_feature( - feature_name, with_featuregroup=True + feature, prefix, featuregroup = target_obj.query.get_feature_obj( + feature_name, add_featuregroup=True ) target_obj._features.append( training_dataset_feature.TrainingDatasetFeature( From 12bcda5966a96bcbc9d33afaa42dcc4a5c4a29ee Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 16 May 2023 18:44:02 +0200 Subject: [PATCH 15/29] fixed tests and cleanup --- python/hsfs/constructor/query.py | 12 ++++++------ python/hsfs/core/arrow_flight_client.py | 2 +- python/hsfs/core/feature_view_engine.py | 2 +- python/hsfs/core/transformation_function_engine.py | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index d42cbeb8be..7d55c14c84 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -38,6 +38,7 @@ class Query: ERROR_MESSAGE_FEATURE_AMBIGUOUS = ( "Feature name {} is ambiguous. Consider using a prefix." ) + ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG = "Feature name {} is ambiguous. Consider accessing the feature through the FeatureGroup object." ERROR_MESSAGE_FEATURE_NOT_FOUND = "Feature name {} not found in query." ERROR_MESSAGE_FEATURE_NOT_FOUND_FG = ( "Feature name {} not found in any of the featuregroups in this query." @@ -666,12 +667,10 @@ def get_featuregroup_by_feature(self, feature): fg_id = feature._feature_group_id if fg_id is None: - # find featuregroup by feature name return self.get_feature_obj( feature.name, include_unselected=True, resolve_ambiguity=False )[2] else: - # find featuregroup by featuregroup id for fg in self.featuregroups: if fg.id == fg_id: return fg @@ -697,19 +696,20 @@ def get_feature_obj( ) feats = feature_lookup[feature_name] - # if only one feature with this name, return it if len(feats) == 1: return feats[0] - # if there are multiple features with this name, return the one without prefix if resolve_ambiguity: for feat in feats: if feat[1] is None: return feat - # there are multiple features with this name and all have prefix, raise exception + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format(feature_name) + ) + raise FeatureStoreException( - Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format(feature_name) + Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG.format(feature_name) ) def get_feature(self, feature_name): diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 804dc3b084..760d6b1062 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -221,7 +221,7 @@ def create_query_object(self, query, query_str): return query def _serialize_featuregroup_name(self, fg): - return f"{fg._get_project_name()}.{fg.name}_{fg.version}" # featurestore.name_version + return f"{fg._get_project_name()}.{fg.name}_{fg.version}" def _serialize_filter_expression(self, query): if query.filters is None: diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index 1332b8edf3..3e91df0095 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -71,7 +71,7 @@ def save(self, feature_view_obj): if feature_view_obj.labels: for label_name in feature_view_obj.labels: feature, prefix, featuregroup = feature_view_obj.query.get_feature_obj( - label_name, add_featuregroup=True + label_name ) feature_view_obj._features.append( training_dataset_feature.TrainingDatasetFeature( diff --git a/python/hsfs/core/transformation_function_engine.py b/python/hsfs/core/transformation_function_engine.py index 8bcf6326e6..935d9d848a 100644 --- a/python/hsfs/core/transformation_function_engine.py +++ b/python/hsfs/core/transformation_function_engine.py @@ -111,7 +111,7 @@ def attach_transformation_fn(training_dataset_obj=None, feature_view_obj=None): ) feature, prefix, featuregroup = target_obj.query.get_feature_obj( - feature_name, add_featuregroup=True + feature_name ) target_obj._features.append( training_dataset_feature.TrainingDatasetFeature( From 3a2068d1c848d44f872eefe9664b9d7463592eba Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 16 May 2023 18:52:15 +0200 Subject: [PATCH 16/29] check full filter --- python/hsfs/constructor/query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 7d55c14c84..768813f346 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -475,7 +475,7 @@ def filter(self, f: Union[Filter, Logic]): # Returns `Query`. The query object with the applied filter. """ - self._check_filter(f) + self._check_filter(self._filter & f) if self._filter is None: if isinstance(f, Filter): From ed61159e3a2c9102199dfa8232a213f3df5833a0 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 16 May 2023 18:59:46 +0200 Subject: [PATCH 17/29] fix append feature and check filter there as well --- python/hsfs/constructor/query.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 768813f346..0f37f346d3 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -313,6 +313,13 @@ def _check_join(self, join_obj): ) raise FeatureStoreException(message.format(name)) + new_filter = None + if self._filters is None: + new_filter = join_obj.query._filter + elif join_obj.query._filter is not None: + new_filter = self._filters & join_obj.query._filter + self._check_filter(new_filter) + def as_of( self, wallclock_time: Optional[Union[str, int, datetime, date]] = None, @@ -631,10 +638,13 @@ def append_feature(self, feature): query.append_feature('feature_name') ``` """ + feature = util.validate_feature(feature) + if self._feature_exists_in_query(feature.name): raise FeatureStoreException( Query.ERROR_MESSAGE_ALREADY_EXISTS.format(feature.name) ) + self._check_filter(self._filters) self._left_features.append(feature) From f2dd9a877d0a35def90e608c2fbde8b20e55bee2 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 16 May 2023 19:05:49 +0200 Subject: [PATCH 18/29] don't check merged filter on .filter --- python/hsfs/constructor/query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 0f37f346d3..16292fa669 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -482,7 +482,7 @@ def filter(self, f: Union[Filter, Logic]): # Returns `Query`. The query object with the applied filter. """ - self._check_filter(self._filter & f) + self._check_filter(f) if self._filter is None: if isinstance(f, Filter): From 1d2538b76dbc9abb95827bc01009be8af44ae474 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 16 May 2023 20:16:54 +0200 Subject: [PATCH 19/29] working state --- python/hsfs/constructor/query.py | 98 +++++++++++-------- python/hsfs/core/arrow_flight_client.py | 2 +- python/hsfs/core/feature_view_engine.py | 2 +- .../core/transformation_function_engine.py | 2 +- python/tests/core/test_arrow_flight_client.py | 2 +- 5 files changed, 61 insertions(+), 45 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 16292fa669..4b54e08306 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -24,6 +24,7 @@ from hsfs.constructor import join from hsfs.constructor.filter import Filter, Logic from hsfs.client.exceptions import FeatureStoreException +from hsfs.feature import Feature class Query: @@ -88,19 +89,19 @@ def _add_to_collection(self, feat, prefix, featuregroup, query_feature=True): ) feature_entry = (feat, prefix, featuregroup) collection[feat.name] = collection.get(feat.name, []) + [feature_entry] - if prefix: - name_with_prefix = f"{prefix}{feat.name}" - collection[name_with_prefix] = collection.get(name_with_prefix, []) + [ - feature_entry - ] if query_feature: - self._feature_list.append(feature_entry) + if prefix: + name_with_prefix = f"{prefix}{feat.name}" + collection[name_with_prefix] = collection.get(name_with_prefix, []) + [ + feature_entry + ] + self._query_feature_list.append(feature_entry) def _populate_collections(self): self._featuregroups = {self._left_feature_group} self._query_features = {} + self._query_feature_list = [] self._featuregroup_features = {} - self._feature_list = [] self._filters = self._filter for feat in self._left_features: @@ -200,7 +201,7 @@ def read( and "pandas_types" in read_options and read_options["pandas_types"] ): - schema = self._collect_features() + schema = self.features if len(self.joins) > 0 or None in [f.type for f in schema]: raise ValueError( "Pandas types casting only supported for feature_group.read()/query.select_all()" @@ -301,8 +302,18 @@ def join( return self + def _merge_lookups(self, lookup1, lookup2): + merged_lookup = {} + for lookup1_key in lookup1: + merged_lookup[lookup1_key] = lookup1[lookup1_key] + for lookup2_key in lookup2: + merged_lookup[lookup2_key] = merged_lookup.get(lookup2_key, []) + lookup2[lookup2_key] + return merged_lookup + def _check_join(self, join_obj): - for feat in join_obj.query._left_features: + additional_featuregroup_features = {} + for fg in join_obj.query.featuregroups: + prefix = join_obj.prefix if self._feature_exists_in_query(feat.name, prefix): name = f"{prefix}{feat.name}" if prefix else feat.name @@ -318,6 +329,14 @@ def _check_join(self, join_obj): new_filter = join_obj.query._filter elif join_obj.query._filter is not None: new_filter = self._filters & join_obj.query._filter + + featuregroup_lookup = self._featuregroup_features + if additional_featuregroup_features is not None: + featuregroup_lookup = self._merge_lookups(featuregroup_lookup, + additional_featuregroup_features) + + featuregroups_to_check = self.featuregroups + (additional_featuregroups or []) + self._check_filter(new_filter) def as_of( @@ -505,7 +524,7 @@ def _check_filter(self, f): return if isinstance(f, Filter): - self.get_featuregroup_by_feature(f._feature) + self.find_feature_in_featuregroups(f._feature) elif isinstance(f, Logic): self._check_filter(f._left_f) self._check_filter(f._right_f) @@ -671,59 +690,56 @@ def filters(self): @property def features(self): - return [feat[0] for feat in self._feature_list] + return [feat[0] for feat in self._query_feature_list] + + def find_feature_in_featuregroups(self, feature: Feature, featuregroup_features=None, featuregroups=None): + if featuregroup_features is None: + featuregroup_features = self._featuregroup_features + + if featuregroups is None: + featuregroups = self.featuregroups - def get_featuregroup_by_feature(self, feature): fg_id = feature._feature_group_id + for fg in featuregroups: + if fg.id == fg_id: + return fg - if fg_id is None: - return self.get_feature_obj( - feature.name, include_unselected=True, resolve_ambiguity=False - )[2] - else: - for fg in self.featuregroups: - if fg.id == fg_id: - return fg + featuregroups_found = featuregroup_features.get(feature.name, []) + + if len(featuregroups_found) == 1: + return featuregroups_found[0] + elif len(featuregroups_found) == 0: + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_NOT_FOUND_FG.format(feature.name) + ) raise FeatureStoreException( - Query.ERROR_MESSAGE_FEATURE_NOT_FOUND_FG.format(feature.name) + Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG.format(feature.name) ) - def get_feature_obj( + def find_feature_in_query( self, feature_name: str, - include_unselected=False, - resolve_ambiguity=True, ): - feature_lookup = ( - self._query_features - if not include_unselected - else self._featuregroup_features - ) - if feature_name not in feature_lookup: + if feature_name not in self._query_features: raise FeatureStoreException( Query.ERROR_MESSAGE_FEATURE_NOT_FOUND.format(feature_name) ) - feats = feature_lookup[feature_name] + feats = self._query_features[feature_name] if len(feats) == 1: return feats[0] - if resolve_ambiguity: - for feat in feats: - if feat[1] is None: - return feat - - raise FeatureStoreException( - Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format(feature_name) - ) + for feat in feats: + if feat[1] is None: + return feat raise FeatureStoreException( - Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG.format(feature_name) + Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format(feature_name) ) def get_feature(self, feature_name): - return self.get_feature_obj(feature_name)[0] + return self.find_feature_in_query(feature_name)[0] def __getattr__(self, name): try: diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 760d6b1062..8d3201c1da 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -262,7 +262,7 @@ def _serialize_filter(self, filter, query): } def _serialize_feature_name(self, feature, query): - fg = query.get_featuregroup_by_feature(feature) + fg = query.find_feature_in_featuregroups(feature) fg_name = self._serialize_featuregroup_name(fg) return f"{fg_name}.{feature.name}" diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index 3e91df0095..a62fa6cc5e 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -70,7 +70,7 @@ def save(self, feature_view_obj): ) if feature_view_obj.labels: for label_name in feature_view_obj.labels: - feature, prefix, featuregroup = feature_view_obj.query.get_feature_obj( + feature, prefix, featuregroup = feature_view_obj.query.find_feature_in_query( label_name ) feature_view_obj._features.append( diff --git a/python/hsfs/core/transformation_function_engine.py b/python/hsfs/core/transformation_function_engine.py index 935d9d848a..89ead6c119 100644 --- a/python/hsfs/core/transformation_function_engine.py +++ b/python/hsfs/core/transformation_function_engine.py @@ -110,7 +110,7 @@ def attach_transformation_fn(training_dataset_obj=None, feature_view_obj=None): "Online transformations for training dataset labels are not supported." ) - feature, prefix, featuregroup = target_obj.query.get_feature_obj( + feature, prefix, featuregroup = target_obj.query.find_feature_in_query( feature_name ) target_obj._features.append( diff --git a/python/tests/core/test_arrow_flight_client.py b/python/tests/core/test_arrow_flight_client.py index 565f9a91c3..50c34e36a9 100644 --- a/python/tests/core/test_arrow_flight_client.py +++ b/python/tests/core/test_arrow_flight_client.py @@ -258,7 +258,7 @@ def test_construct_query_object(self, mocker, backend_fixtures): test_fg1.select_all() .filter((test_fg1.features[0] > 500) & (test_fg1.features[1] < 0.1)) .join( - test_fg2.filter(test_fg2.features[0] > 500), + test_fg2.filter(Feature("intt") > 500), left_on=["intt"], right_on=["intt"], prefix="test_", From 0bbae9e5011124c885afe8a516eaa0858a1e272a Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 16 May 2023 20:29:09 +0200 Subject: [PATCH 20/29] removed checks --- python/hsfs/constructor/query.py | 90 ++----------------- python/hsfs/core/feature_view_engine.py | 8 +- python/tests/core/test_arrow_flight_client.py | 2 +- 3 files changed, 11 insertions(+), 89 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 4b54e08306..98b8e6230f 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -28,14 +28,7 @@ class Query: - ERROR_MESSAGE_ALREADY_EXISTS = "Feature name {} already exists in query." - ERROR_MESSAGE_CHANGE_PREFIX = ( - "Feature name {} already exists in query. Consider changing the prefix." - ) - ERROR_MESSAGE_USE_PREFIX = ( - "Feature name {} already exists in query. Consider using a prefix." - ) - ERROR_MESSAGE_FEATURE_NOT_UNIQUE = "Feature name {} is not unique." + ERROR_MESSAGE_FEATURE_AMBIGUOUS = ( "Feature name {} is ambiguous. Consider using a prefix." ) @@ -105,10 +98,6 @@ def _populate_collections(self): self._filters = self._filter for feat in self._left_features: - if self._feature_exists_in_query(feat.name): - raise FeatureStoreException( - Query.ERROR_MESSAGE_FEATURE_NOT_UNIQUE.format(feat.name) - ) self._add_to_collection(feat, None, self._left_feature_group) for feat in self._left_feature_group.features: self._add_to_collection( @@ -294,51 +283,12 @@ def join( sub_query, on, left_on, right_on, join_type.upper(), prefix ) - self._check_join(new_join) - self._joins.append(new_join) self._populate_collections() return self - def _merge_lookups(self, lookup1, lookup2): - merged_lookup = {} - for lookup1_key in lookup1: - merged_lookup[lookup1_key] = lookup1[lookup1_key] - for lookup2_key in lookup2: - merged_lookup[lookup2_key] = merged_lookup.get(lookup2_key, []) + lookup2[lookup2_key] - return merged_lookup - - def _check_join(self, join_obj): - additional_featuregroup_features = {} - for fg in join_obj.query.featuregroups: - - prefix = join_obj.prefix - if self._feature_exists_in_query(feat.name, prefix): - name = f"{prefix}{feat.name}" if prefix else feat.name - message = ( - Query.ERROR_MESSAGE_CHANGE_PREFIX - if prefix - else Query.ERROR_MESSAGE_USE_PREFIX - ) - raise FeatureStoreException(message.format(name)) - - new_filter = None - if self._filters is None: - new_filter = join_obj.query._filter - elif join_obj.query._filter is not None: - new_filter = self._filters & join_obj.query._filter - - featuregroup_lookup = self._featuregroup_features - if additional_featuregroup_features is not None: - featuregroup_lookup = self._merge_lookups(featuregroup_lookup, - additional_featuregroup_features) - - featuregroups_to_check = self.featuregroups + (additional_featuregroups or []) - - self._check_filter(new_filter) - def as_of( self, wallclock_time: Optional[Union[str, int, datetime, date]] = None, @@ -501,8 +451,6 @@ def filter(self, f: Union[Filter, Logic]): # Returns `Query`. The query object with the applied filter. """ - self._check_filter(f) - if self._filter is None: if isinstance(f, Filter): self._filter = Logic.Single(left_f=f) @@ -519,22 +467,6 @@ def filter(self, f: Union[Filter, Logic]): return self - def _check_filter(self, f): - if f is None: - return - - if isinstance(f, Filter): - self.find_feature_in_featuregroups(f._feature) - elif isinstance(f, Logic): - self._check_filter(f._left_f) - self._check_filter(f._right_f) - self._check_filter(f._left_l) - self._check_filter(f._right_l) - else: - raise TypeError( - "Expected type `Filter` or `Logic`, got `{}`".format(type(f)) - ) - def from_cache_feature_group_only(self): for _query in [join.query for join in self._joins] + [self]: if not isinstance(_query._left_feature_group, feature_group.FeatureGroup): @@ -659,12 +591,6 @@ def append_feature(self, feature): """ feature = util.validate_feature(feature) - if self._feature_exists_in_query(feature.name): - raise FeatureStoreException( - Query.ERROR_MESSAGE_ALREADY_EXISTS.format(feature.name) - ) - self._check_filter(self._filters) - self._left_features.append(feature) self._populate_collections() @@ -692,22 +618,16 @@ def filters(self): def features(self): return [feat[0] for feat in self._query_feature_list] - def find_feature_in_featuregroups(self, feature: Feature, featuregroup_features=None, featuregroups=None): - if featuregroup_features is None: - featuregroup_features = self._featuregroup_features - - if featuregroups is None: - featuregroups = self.featuregroups - + def find_feature_in_featuregroups(self, feature: Feature): fg_id = feature._feature_group_id - for fg in featuregroups: + for fg in self.featuregroups: if fg.id == fg_id: return fg - featuregroups_found = featuregroup_features.get(feature.name, []) + featuregroups_found = self._featuregroup_features.get(feature.name, []) if len(featuregroups_found) == 1: - return featuregroups_found[0] + return featuregroups_found[0][2] elif len(featuregroups_found) == 0: raise FeatureStoreException( Query.ERROR_MESSAGE_FEATURE_NOT_FOUND_FG.format(feature.name) diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index a62fa6cc5e..92671445ef 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -70,9 +70,11 @@ def save(self, feature_view_obj): ) if feature_view_obj.labels: for label_name in feature_view_obj.labels: - feature, prefix, featuregroup = feature_view_obj.query.find_feature_in_query( - label_name - ) + ( + feature, + prefix, + featuregroup, + ) = feature_view_obj.query.find_feature_in_query(label_name) feature_view_obj._features.append( training_dataset_feature.TrainingDatasetFeature( name=feature.name, diff --git a/python/tests/core/test_arrow_flight_client.py b/python/tests/core/test_arrow_flight_client.py index 50c34e36a9..565f9a91c3 100644 --- a/python/tests/core/test_arrow_flight_client.py +++ b/python/tests/core/test_arrow_flight_client.py @@ -258,7 +258,7 @@ def test_construct_query_object(self, mocker, backend_fixtures): test_fg1.select_all() .filter((test_fg1.features[0] > 500) & (test_fg1.features[1] < 0.1)) .join( - test_fg2.filter(Feature("intt") > 500), + test_fg2.filter(test_fg2.features[0] > 500), left_on=["intt"], right_on=["intt"], prefix="test_", From bdd4beca24365d0dd27f330f5bda230b6f02cc84 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 16 May 2023 20:35:00 +0200 Subject: [PATCH 21/29] some renaming --- python/hsfs/constructor/query.py | 17 +++-------------- python/hsfs/core/arrow_flight_client.py | 2 +- python/hsfs/core/feature_view_engine.py | 2 +- .../hsfs/core/transformation_function_engine.py | 2 +- 4 files changed, 6 insertions(+), 17 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 98b8e6230f..ca896a024e 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -65,17 +65,6 @@ def __init__( if self._left_feature_group is not None and self._left_features is not None: self._populate_collections() - def _feature_exists_in_query(self, feature_name, prefix=None): - existing_features = self._query_features.get(feature_name, []) - if any([feature[1] == prefix for feature in existing_features]): - return True - if prefix: - name_with_prefix = f"{prefix}{feature_name}" - existing_features = self._query_features.get(name_with_prefix, []) - return any([feature[1] is None for feature in existing_features]) - - return False - def _add_to_collection(self, feat, prefix, featuregroup, query_feature=True): collection = ( self._query_features if query_feature else self._featuregroup_features @@ -618,7 +607,7 @@ def filters(self): def features(self): return [feat[0] for feat in self._query_feature_list] - def find_feature_in_featuregroups(self, feature: Feature): + def get_featuregroup_by_feature(self, feature: Feature): fg_id = feature._feature_group_id for fg in self.featuregroups: if fg.id == fg_id: @@ -637,7 +626,7 @@ def find_feature_in_featuregroups(self, feature: Feature): Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG.format(feature.name) ) - def find_feature_in_query( + def get_feature_by_name( self, feature_name: str, ): @@ -659,7 +648,7 @@ def find_feature_in_query( ) def get_feature(self, feature_name): - return self.find_feature_in_query(feature_name)[0] + return self.get_feature_by_name(feature_name)[0] def __getattr__(self, name): try: diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 8d3201c1da..760d6b1062 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -262,7 +262,7 @@ def _serialize_filter(self, filter, query): } def _serialize_feature_name(self, feature, query): - fg = query.find_feature_in_featuregroups(feature) + fg = query.get_featuregroup_by_feature(feature) fg_name = self._serialize_featuregroup_name(fg) return f"{fg_name}.{feature.name}" diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index 92671445ef..407fd2e187 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -74,7 +74,7 @@ def save(self, feature_view_obj): feature, prefix, featuregroup, - ) = feature_view_obj.query.find_feature_in_query(label_name) + ) = feature_view_obj.query.get_feature_by_name(label_name) feature_view_obj._features.append( training_dataset_feature.TrainingDatasetFeature( name=feature.name, diff --git a/python/hsfs/core/transformation_function_engine.py b/python/hsfs/core/transformation_function_engine.py index 89ead6c119..8137e31e84 100644 --- a/python/hsfs/core/transformation_function_engine.py +++ b/python/hsfs/core/transformation_function_engine.py @@ -110,7 +110,7 @@ def attach_transformation_fn(training_dataset_obj=None, feature_view_obj=None): "Online transformations for training dataset labels are not supported." ) - feature, prefix, featuregroup = target_obj.query.find_feature_in_query( + feature, prefix, featuregroup = target_obj.query.get_feature_by_name( feature_name ) target_obj._features.append( From dd7b49c75644e72a4b6735845811b01f195cbeb9 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 16 May 2023 20:58:05 +0200 Subject: [PATCH 22/29] made lookup function private/protected --- python/hsfs/constructor/query.py | 12 +++++------- python/hsfs/core/arrow_flight_client.py | 2 +- python/hsfs/core/feature_view_engine.py | 2 +- python/hsfs/core/transformation_function_engine.py | 2 +- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index ca896a024e..b688d2b59f 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -268,12 +268,10 @@ def join( # Returns `Query`: A new Query object representing the join. """ - new_join = join.Join( - sub_query, on, left_on, right_on, join_type.upper(), prefix + self._joins.append( + join.Join(sub_query, on, left_on, right_on, join_type.upper(), prefix) ) - self._joins.append(new_join) - self._populate_collections() return self @@ -607,7 +605,7 @@ def filters(self): def features(self): return [feat[0] for feat in self._query_feature_list] - def get_featuregroup_by_feature(self, feature: Feature): + def _get_featuregroup_by_feature(self, feature: Feature): fg_id = feature._feature_group_id for fg in self.featuregroups: if fg.id == fg_id: @@ -626,7 +624,7 @@ def get_featuregroup_by_feature(self, feature: Feature): Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG.format(feature.name) ) - def get_feature_by_name( + def _get_feature_by_name( self, feature_name: str, ): @@ -648,7 +646,7 @@ def get_feature_by_name( ) def get_feature(self, feature_name): - return self.get_feature_by_name(feature_name)[0] + return self._get_feature_by_name(feature_name)[0] def __getattr__(self, name): try: diff --git a/python/hsfs/core/arrow_flight_client.py b/python/hsfs/core/arrow_flight_client.py index 760d6b1062..8159985774 100644 --- a/python/hsfs/core/arrow_flight_client.py +++ b/python/hsfs/core/arrow_flight_client.py @@ -262,7 +262,7 @@ def _serialize_filter(self, filter, query): } def _serialize_feature_name(self, feature, query): - fg = query.get_featuregroup_by_feature(feature) + fg = query._get_featuregroup_by_feature(feature) fg_name = self._serialize_featuregroup_name(fg) return f"{fg_name}.{feature.name}" diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index 407fd2e187..3eb7ce8325 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -74,7 +74,7 @@ def save(self, feature_view_obj): feature, prefix, featuregroup, - ) = feature_view_obj.query.get_feature_by_name(label_name) + ) = feature_view_obj.query._get_feature_by_name(label_name) feature_view_obj._features.append( training_dataset_feature.TrainingDatasetFeature( name=feature.name, diff --git a/python/hsfs/core/transformation_function_engine.py b/python/hsfs/core/transformation_function_engine.py index 8137e31e84..dc8b0fd243 100644 --- a/python/hsfs/core/transformation_function_engine.py +++ b/python/hsfs/core/transformation_function_engine.py @@ -110,7 +110,7 @@ def attach_transformation_fn(training_dataset_obj=None, feature_view_obj=None): "Online transformations for training dataset labels are not supported." ) - feature, prefix, featuregroup = target_obj.query.get_feature_by_name( + feature, prefix, featuregroup = target_obj.query._get_feature_by_name( feature_name ) target_obj._features.append( From 042cde5ed065dcf1906b885e19d3067425570992 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Wed, 17 May 2023 11:12:40 +0200 Subject: [PATCH 23/29] added docs --- python/hsfs/constructor/query.py | 122 ++++++++++-------- python/hsfs/core/feature_view_engine.py | 2 +- python/tests/core/test_feature_view_engine.py | 12 +- 3 files changed, 75 insertions(+), 61 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index b688d2b59f..9fd587ae67 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -112,6 +112,46 @@ def _populate_collections(self): query_feature=False, ) + def _get_featuregroup_by_feature(self, feature: Feature): + fg_id = feature._feature_group_id + for fg in self.featuregroups: + if fg.id == fg_id: + return fg + + featuregroups_found = self._featuregroup_features.get(feature.name, []) + + if len(featuregroups_found) == 1: + return featuregroups_found[0][2] + elif len(featuregroups_found) == 0: + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_NOT_FOUND_FG.format(feature.name) + ) + + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG.format(feature.name) + ) + + def _get_feature_by_name( + self, + feature_name: str, + ): + if feature_name not in self._query_features: + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_NOT_FOUND.format(feature_name) + ) + feats = self._query_features[feature_name] + + if len(feats) == 1: + return feats[0] + + for feat in feats: + if feat[1] is None: + return feat + + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format(feature_name) + ) + def _prep_read(self, online, read_options): fs_query = self._query_constructor_api.construct_query(self) sql_query = self._to_string(fs_query, online) @@ -454,12 +494,6 @@ def filter(self, f: Union[Filter, Logic]): return self - def from_cache_feature_group_only(self): - for _query in [join.query for join in self._joins] + [self]: - if not isinstance(_query._left_feature_group, feature_group.FeatureGroup): - return False - return True - def json(self): return json.dumps(self, cls=util.FeatureStoreEncoder) @@ -510,7 +544,7 @@ def _hopsworks_json(cls, json_dict): It does not fully deserialize the message as the usecase is to send it straight back to Hopsworks to read the content of the query - Args: + Arguments: json_dict (str): a json string containing a query object Returns: @@ -550,10 +584,12 @@ def __str__(self): @property def left_feature_group_start_time(self): + """Start time of time travel for the left feature group.""" return self._left_feature_group_start_time @property def left_feature_group_end_time(self): + """End time of time travel for the left feature group.""" return self._left_feature_group_end_time @left_feature_group_start_time.setter @@ -566,15 +602,10 @@ def left_feature_group_end_time(self, left_feature_group_end_time): def append_feature(self, feature): """ - !!! example - ```python - fg1 = fs.get_feature_group("...") - fg2 = fs.get_feature_group("...") + Append a feature to the query. - query = fg1.select_all().join(fg2.select_all()) - - query.append_feature('feature_name') - ``` + # Arguments + feature: `[str, Feature]`. Name of the feature to append to the query. """ feature = util.validate_feature(feature) @@ -583,69 +614,52 @@ def append_feature(self, feature): self._populate_collections() def is_time_travel(self): + """Query contains time travel""" return ( self.left_feature_group_start_time or self.left_feature_group_end_time or any([_join.query.is_time_travel() for _join in self._joins]) ) + def is_cache_feature_group_only(self): + """Query contains only cached feature groups""" + return any( + [ + not isinstance(fg, feature_group.FeatureGroup) + for fg in self.featuregroups + ] + ) + @property def joins(self): + """List of joins in the query""" return self._joins @property def featuregroups(self): + """List of feature groups used in the query""" return list(self._featuregroups) @property def filters(self): + """All filters used in the query""" return self._filters @property def features(self): + """List of all features in the query""" return [feat[0] for feat in self._query_feature_list] - def _get_featuregroup_by_feature(self, feature: Feature): - fg_id = feature._feature_group_id - for fg in self.featuregroups: - if fg.id == fg_id: - return fg - - featuregroups_found = self._featuregroup_features.get(feature.name, []) - - if len(featuregroups_found) == 1: - return featuregroups_found[0][2] - elif len(featuregroups_found) == 0: - raise FeatureStoreException( - Query.ERROR_MESSAGE_FEATURE_NOT_FOUND_FG.format(feature.name) - ) - - raise FeatureStoreException( - Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG.format(feature.name) - ) - - def _get_feature_by_name( - self, - feature_name: str, - ): - if feature_name not in self._query_features: - raise FeatureStoreException( - Query.ERROR_MESSAGE_FEATURE_NOT_FOUND.format(feature_name) - ) - feats = self._query_features[feature_name] - - if len(feats) == 1: - return feats[0] - - for feat in feats: - if feat[1] is None: - return feat + def get_feature(self, feature_name): + """ + Get a feature by name. - raise FeatureStoreException( - Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format(feature_name) - ) + # Arguments + feature_name: `str`. Name of the feature to get. - def get_feature(self, feature_name): + # Returns + `Feature`. Feature object. + """ return self._get_feature_by_name(feature_name)[0] def __getattr__(self, name): diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index 3eb7ce8325..bfa9441e2c 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -558,7 +558,7 @@ def get_parent_feature_groups(self, feature_view_obj): def _check_feature_group_accessibility(self, feature_view_obj): if ( engine.get_type() == "python" or engine.get_type() == "hive" - ) and not feature_view_obj.query.from_cache_feature_group_only(): + ) and not feature_view_obj.query.is_cache_feature_group_only(): raise NotImplementedError( "Python kernel can only read from cached feature group." " Please use `feature_view.create_training_data` instead." diff --git a/python/tests/core/test_feature_view_engine.py b/python/tests/core/test_feature_view_engine.py index fb471c7b35..e8ddb35259 100644 --- a/python/tests/core/test_feature_view_engine.py +++ b/python/tests/core/test_feature_view_engine.py @@ -1914,7 +1914,7 @@ def test_check_feature_group_accessibility(self, mocker): labels=[], ) - mock_constructor_query.from_cache_feature_group_only.return_value = False + mock_constructor_query.is_cache_feature_group_only.return_value = False # Act fv_engine._check_feature_group_accessibility(feature_view_obj=fv) @@ -1942,7 +1942,7 @@ def test_check_feature_group_accessibility_cache_feature_group(self, mocker): labels=[], ) - mock_constructor_query.from_cache_feature_group_only.return_value = True + mock_constructor_query.is_cache_feature_group_only.return_value = True # Act fv_engine._check_feature_group_accessibility(feature_view_obj=fv) @@ -1970,7 +1970,7 @@ def test_check_feature_group_accessibility_get_type_python(self, mocker): labels=[], ) - mock_constructor_query.from_cache_feature_group_only.return_value = False + mock_constructor_query.is_cache_feature_group_only.return_value = False mock_engine_get_type.return_value = "python" # Act @@ -2004,7 +2004,7 @@ def test_check_feature_group_accessibility_get_type_hive(self, mocker): labels=[], ) - mock_constructor_query.from_cache_feature_group_only.return_value = False + mock_constructor_query.is_cache_feature_group_only.return_value = False mock_engine_get_type.return_value = "hive" # Act @@ -2040,7 +2040,7 @@ def test_check_feature_group_accessibility_cache_feature_group_get_type_python( labels=[], ) - mock_constructor_query.from_cache_feature_group_only.return_value = True + mock_constructor_query.is_cache_feature_group_only.return_value = True mock_engine_get_type.return_value = "python" # Act @@ -2071,7 +2071,7 @@ def test_check_feature_group_accessibility_cache_feature_group_get_type_hive( labels=[], ) - mock_constructor_query.from_cache_feature_group_only.return_value = True + mock_constructor_query.is_cache_feature_group_only.return_value = True mock_engine_get_type.return_value = "hive" # Act From bbc2ee5ad04678f7687d12b7f31e5bca0efdb8d3 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Wed, 17 May 2023 11:33:23 +0200 Subject: [PATCH 24/29] fixed cached check --- python/hsfs/constructor/query.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 9fd587ae67..8ddda454b0 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -623,11 +623,8 @@ def is_time_travel(self): def is_cache_feature_group_only(self): """Query contains only cached feature groups""" - return any( - [ - not isinstance(fg, feature_group.FeatureGroup) - for fg in self.featuregroups - ] + return all( + [isinstance(fg, feature_group.FeatureGroup) for fg in self.featuregroups] ) @property From f1a31333cc1eeca83b09d8542a98242ad29cbded Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Wed, 17 May 2023 15:41:38 +0200 Subject: [PATCH 25/29] updated error messages, reverted transformation function test changes --- python/hsfs/constructor/query.py | 21 ++++++++++++------- .../test_transformation_function_engine.py | 6 +++--- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 8ddda454b0..c19c7947d7 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -30,12 +30,19 @@ class Query: ERROR_MESSAGE_FEATURE_AMBIGUOUS = ( - "Feature name {} is ambiguous. Consider using a prefix." + "Provided feature name '{}' is ambiguous and exists in more than one feature group. " + "Consider prepending the prefix specified in the join." + ) + ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG = ( + "Feature name '{}' is ambiguous and exists in more than one feature group. " + "Consider accessing the feature through the feature group object when specifying the query." + ) + ERROR_MESSAGE_FEATURE_NOT_FOUND = ( + "Feature name '{}' could not found be found in query." ) - ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG = "Feature name {} is ambiguous. Consider accessing the feature through the FeatureGroup object." - ERROR_MESSAGE_FEATURE_NOT_FOUND = "Feature name {} not found in query." ERROR_MESSAGE_FEATURE_NOT_FOUND_FG = ( - "Feature name {} not found in any of the featuregroups in this query." + "Feature name '{}' could not be found in " + "any of the featuregroups in this query." ) def __init__( @@ -144,9 +151,9 @@ def _get_feature_by_name( if len(feats) == 1: return feats[0] - for feat in feats: - if feat[1] is None: - return feat + feats_without_prefix = [feat for feat in feats if feat[1] is None] + if len(feats_without_prefix) == 1: + return feats_without_prefix[0] raise FeatureStoreException( Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format(feature_name) diff --git a/python/tests/core/test_transformation_function_engine.py b/python/tests/core/test_transformation_function_engine.py index 4c1e2d42a0..fe4e527b15 100644 --- a/python/tests/core/test_transformation_function_engine.py +++ b/python/tests/core/test_transformation_function_engine.py @@ -484,8 +484,8 @@ def testFunction(): query_no_prefix = ( fg1.select_all() - .join(fg2.select(["tf1_name"]), on=["id"], prefix="fg2") - .join(fg3.select(["tf_name", "tf1_name"]), on=["id"], prefix="fg3") + .join(fg2.select(["tf1_name"]), on=["id"]) + .join(fg3.select(["tf_name", "tf1_name"]), on=["id"]) ) tf = transformation_function.TransformationFunction( @@ -515,7 +515,7 @@ def testFunction(): # Assert assert str(e_info.value) == Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format( - "tf1_name" + "tf_name" ) def test_attach_transformation_fn_fv_labels(self, mocker): From 0a09939ee5453c9712fdf43d701f431b0d5dc22c Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Wed, 17 May 2023 19:08:04 +0200 Subject: [PATCH 26/29] added tests --- python/hsfs/constructor/query.py | 2 + python/tests/constructor/test_query.py | 216 ++++++++++++++++++++++++- 2 files changed, 212 insertions(+), 6 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index c19c7947d7..940ff696a2 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -620,6 +620,8 @@ def append_feature(self, feature): self._populate_collections() + return self + def is_time_travel(self): """Query contains time travel""" return ( diff --git a/python/tests/constructor/test_query.py b/python/tests/constructor/test_query.py index 3e1a2ab594..0e4b1b4cd5 100644 --- a/python/tests/constructor/test_query.py +++ b/python/tests/constructor/test_query.py @@ -13,13 +13,58 @@ # See the License for the specific language governing permissions and # limitations under the License. # - - +import pytest from hsfs import feature_group, feature from hsfs.constructor import query, join, filter +from hsfs.client.exceptions import FeatureStoreException class TestQuery: + fg1 = feature_group.FeatureGroup( + name="test1", + version=1, + featurestore_id=99, + primary_key=[], + partition_key=[], + features=[ + feature.Feature("id", feature_group_id=11), + feature.Feature("label", feature_group_id=11), + feature.Feature("tf_name", feature_group_id=11), + ], + id=11, + stream=False, + ) + + fg2 = feature_group.FeatureGroup( + name="test2", + version=1, + featurestore_id=99, + primary_key=[], + partition_key=[], + features=[ + feature.Feature("id", feature_group_id=12), + feature.Feature("tf1_name", feature_group_id=12), + ], + id=12, + stream=False, + ) + + fg3 = feature_group.FeatureGroup( + name="test3", + version=1, + featurestore_id=99, + primary_key=[], + partition_key=[], + features=[ + feature.Feature("id", feature_group_id=13), + feature.Feature("tf_name", feature_group_id=13), + feature.Feature("tf1_name", feature_group_id=13), + feature.Feature("tf3_name", feature_group_id=13), + ], + id=13, + stream=False, + ) + def test_from_response_json_python(self, mocker, backend_fixtures): # Arrange mocker.patch("hsfs.engine.get_type", return_value="python") @@ -155,11 +200,170 @@ def test_as_of(self, mocker, backend_fixtures): def test_collect_feature(self, mocker, backend_fixtures): mocker.patch("hsfs.engine.get_type", return_value="python") - q = query.Query.from_response_json(backend_fixtures["query"]["get"]["response"]) + + # Act + q = TestQuery.fg1.select(["label"]).join(TestQuery.fg2.select(["tf1_name"])) features = q.features feature_names = [feature.name for feature in features] - expected_feature_names = ["test_left_features", "test_left_features2"] - assert len(feature_names) == len(expected_feature_names) - assert feature_names == expected_feature_names + expected_features = [TestQuery.fg1["label"], TestQuery.fg2["tf1_name"]] + expected_feature_names = ["label", "tf1_name"] + + # Assert + assert len(feature_names) == len(expected_features) + for i, feat in enumerate(expected_features): + assert feat.name == expected_feature_names[i] + + def test_collect_featuregroups(self, mocker, backend_fixtures): + mocker.patch("hsfs.engine.get_type", return_value="python") + + # Act + q = ( + TestQuery.fg1.select(["label"]) + .join(TestQuery.fg2.select(["tf1_name"])) + .join(TestQuery.fg2.select(["tf1_name"])) + ) + expected_featuregroups = [TestQuery.fg1, TestQuery.fg2] + + # Assert + assert len(q.featuregroups) == len(expected_featuregroups) + assert set(q.featuregroups) == set(expected_featuregroups) + + def test_append_feature(self, mocker, backend_fixtures): + mocker.patch("hsfs.engine.get_type", return_value="python") + + # Act + q = TestQuery.fg1.select([TestQuery.fg1["label"]]).append_feature("id") + expected_features = [TestQuery.fg1["label"], feature.Feature("id")] + + # Assert + assert len(q.features) == len(expected_features) + for i, feat in enumerate(expected_features): + assert feat.name == expected_features[i].name + + def test_get_feature(self, mocker, backend_fixtures): + mocker.patch("hsfs.engine.get_type", return_value="python") + + # Act + q = TestQuery.fg1.select(TestQuery.fg1["label"]).join( + TestQuery.fg2.select(TestQuery.fg2["tf1_name"]) + ) + + # Assert + assert id(q.get_feature("tf1_name")) == id( + TestQuery.fg2.get_feature("tf1_name") + ) + + def test_get_index(self, mocker, backend_fixtures): + mocker.patch("hsfs.engine.get_type", return_value="python") + + # Act + q = TestQuery.fg1.select(TestQuery.fg1["label"]).join( + TestQuery.fg2.select(TestQuery.fg2["tf1_name"]) + ) + + # Assert + assert id(q.get_feature("tf1_name")) == id( + TestQuery.fg2.get_feature("tf1_name") + ) + + def test_get_attr(self, mocker, backend_fixtures): + mocker.patch("hsfs.engine.get_type", return_value="python") + + # Act + q = TestQuery.fg1.select(TestQuery.fg1["label"]).join( + TestQuery.fg2.select(TestQuery.fg2["tf1_name"]) + ) + + # Assert + assert id(q.get_feature("tf1_name")) == id( + TestQuery.fg2.get_feature("tf1_name") + ) + + def test_get_feature_by_name(self, mocker, backend_fixtures): + mocker.patch("hsfs.engine.get_type", return_value="python") + + # Act + q = ( + TestQuery.fg1.select_all() + .join(TestQuery.fg2.select_all()) + .join(TestQuery.fg3.select_all()) + ) + + # Assert + assert ( + q._get_feature_by_name("tf3_name")[0].name == TestQuery.fg3["tf3_name"].name + ) + assert ( + q._get_feature_by_name("tf3_name")[0].feature_group_id + == TestQuery.fg3["tf3_name"].feature_group_id + ) + + def test_get_feature_by_name_prefix(self, mocker, backend_fixtures): + mocker.patch("hsfs.engine.get_type", return_value="python") + + # Act + q = ( + TestQuery.fg1.select_all() + .join(TestQuery.fg2.select_all()) + .join(TestQuery.fg3.select_all(), prefix="fg3") + ) + + # Assert + assert ( + q._get_feature_by_name("tf_name")[0].name == TestQuery.fg1["tf_name"].name + ) + assert ( + q._get_feature_by_name("tf_name")[0].feature_group_id + == TestQuery.fg1["tf_name"].feature_group_id + ) + + def test_get_feature_by_name_ambiguous(self, mocker, backend_fixtures): + mocker.patch("hsfs.engine.get_type", return_value="python") + + # Act + q = ( + TestQuery.fg1.select_all() + .join(TestQuery.fg2.select_all()) + .join(TestQuery.fg3.select_all(), prefix="fg3") + ) + + # Assert + with pytest.raises(FeatureStoreException) as e_info: + q._get_feature_by_name("id")[0] + + assert str(e_info.value) == query.Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format( + "id" + ) + + def test_get_feature_by_feature_ambiguous(self, mocker, backend_fixtures): + mocker.patch("hsfs.engine.get_type", return_value="python") + + # Act + q = ( + TestQuery.fg1.select_all() + .join(TestQuery.fg2.select_all()) + .join(TestQuery.fg3.select_all(), prefix="fg3") + ) + + # Assert + with pytest.raises(FeatureStoreException) as e_info: + q._get_featuregroup_by_feature(feature.Feature("id"))[0] + + assert str( + e_info.value + ) == query.Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG.format("id") + + def test_get_feature_by_feature_non_ambiguous(self, mocker, backend_fixtures): + mocker.patch("hsfs.engine.get_type", return_value="python") + + # Act + q = ( + TestQuery.fg1.select_all() + .join(TestQuery.fg2.select_all()) + .join(TestQuery.fg3.select_all(), prefix="fg3") + ) + + # Assert + assert q._get_featuregroup_by_feature(TestQuery.fg3["id"]) == TestQuery.fg3 From 61f83ef39a4d11fba4132cfbd8490bf87fa56858 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Sat, 27 May 2023 17:28:09 +0200 Subject: [PATCH 27/29] added add_features_to_collection method --- python/hsfs/constructor/query.py | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 940ff696a2..b7342ef0f7 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -86,6 +86,12 @@ def _add_to_collection(self, feat, prefix, featuregroup, query_feature=True): ] self._query_feature_list.append(feature_entry) + def _add_features_to_collection(self, features, prefix, featuregroup): + for feat in features: + self._add_to_collection(feat, prefix, featuregroup) + for feat in featuregroup.features: + self._add_to_collection(feat, prefix, featuregroup, query_feature=False) + def _populate_collections(self): self._featuregroups = {self._left_feature_group} self._query_features = {} @@ -93,12 +99,9 @@ def _populate_collections(self): self._featuregroup_features = {} self._filters = self._filter - for feat in self._left_features: - self._add_to_collection(feat, None, self._left_feature_group) - for feat in self._left_feature_group.features: - self._add_to_collection( - feat, None, self._left_feature_group, query_feature=False - ) + self._add_features_to_collection( + self._left_features, None, self._left_feature_group + ) for join_obj in self.joins: self._featuregroups.add(join_obj.query._left_feature_group) @@ -107,17 +110,11 @@ def _populate_collections(self): elif join_obj.query._filter is not None: self._filters = self._filters & join_obj.query._filter - for feat in join_obj.query._left_features: - self._add_to_collection( - feat, join_obj.prefix, join_obj.query._left_feature_group - ) - for feat in join_obj.query._left_feature_group.features: - self._add_to_collection( - feat, - join_obj.prefix, - join_obj.query._left_feature_group, - query_feature=False, - ) + self._add_features_to_collection( + join_obj.query._left_features, + join_obj.prefix, + join_obj.query._left_feature_group, + ) def _get_featuregroup_by_feature(self, feature: Feature): fg_id = feature._feature_group_id From 742c5019fcb22b216fe9dd876b72fcbfd3800034 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 6 Jun 2023 18:28:50 +0200 Subject: [PATCH 28/29] collect features on request --- python/hsfs/constructor/query.py | 195 ++++++++++++++++--------------- 1 file changed, 100 insertions(+), 95 deletions(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index b7342ef0f7..c968de2915 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -69,92 +69,6 @@ def __init__( self._storage_connector_api = storage_connector_api.StorageConnectorApi( feature_store_id ) - if self._left_feature_group is not None and self._left_features is not None: - self._populate_collections() - - def _add_to_collection(self, feat, prefix, featuregroup, query_feature=True): - collection = ( - self._query_features if query_feature else self._featuregroup_features - ) - feature_entry = (feat, prefix, featuregroup) - collection[feat.name] = collection.get(feat.name, []) + [feature_entry] - if query_feature: - if prefix: - name_with_prefix = f"{prefix}{feat.name}" - collection[name_with_prefix] = collection.get(name_with_prefix, []) + [ - feature_entry - ] - self._query_feature_list.append(feature_entry) - - def _add_features_to_collection(self, features, prefix, featuregroup): - for feat in features: - self._add_to_collection(feat, prefix, featuregroup) - for feat in featuregroup.features: - self._add_to_collection(feat, prefix, featuregroup, query_feature=False) - - def _populate_collections(self): - self._featuregroups = {self._left_feature_group} - self._query_features = {} - self._query_feature_list = [] - self._featuregroup_features = {} - self._filters = self._filter - - self._add_features_to_collection( - self._left_features, None, self._left_feature_group - ) - for join_obj in self.joins: - self._featuregroups.add(join_obj.query._left_feature_group) - - if self._filters is None: - self._filters = join_obj.query._filter - elif join_obj.query._filter is not None: - self._filters = self._filters & join_obj.query._filter - - self._add_features_to_collection( - join_obj.query._left_features, - join_obj.prefix, - join_obj.query._left_feature_group, - ) - - def _get_featuregroup_by_feature(self, feature: Feature): - fg_id = feature._feature_group_id - for fg in self.featuregroups: - if fg.id == fg_id: - return fg - - featuregroups_found = self._featuregroup_features.get(feature.name, []) - - if len(featuregroups_found) == 1: - return featuregroups_found[0][2] - elif len(featuregroups_found) == 0: - raise FeatureStoreException( - Query.ERROR_MESSAGE_FEATURE_NOT_FOUND_FG.format(feature.name) - ) - - raise FeatureStoreException( - Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG.format(feature.name) - ) - - def _get_feature_by_name( - self, - feature_name: str, - ): - if feature_name not in self._query_features: - raise FeatureStoreException( - Query.ERROR_MESSAGE_FEATURE_NOT_FOUND.format(feature_name) - ) - feats = self._query_features[feature_name] - - if len(feats) == 1: - return feats[0] - - feats_without_prefix = [feat for feat in feats if feat[1] is None] - if len(feats_without_prefix) == 1: - return feats_without_prefix[0] - - raise FeatureStoreException( - Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format(feature_name) - ) def _prep_read(self, online, read_options): fs_query = self._query_constructor_api.construct_query(self) @@ -316,8 +230,6 @@ def join( join.Join(sub_query, on, left_on, right_on, join_type.upper(), prefix) ) - self._populate_collections() - return self def as_of( @@ -494,8 +406,6 @@ def filter(self, f: Union[Filter, Logic]): elif self._filter is not None: self._filter = self._filter & f - self._populate_collections() - return self def json(self): @@ -615,8 +525,6 @@ def append_feature(self, feature): self._left_features.append(feature) - self._populate_collections() - return self def is_time_travel(self): @@ -633,6 +541,85 @@ def is_cache_feature_group_only(self): [isinstance(fg, feature_group.FeatureGroup) for fg in self.featuregroups] ) + def _get_featuregroup_by_feature(self, feature: Feature): + # search for feature by id, and return the fg object + fg_id = feature._feature_group_id + for fg in self.featuregroups: + if fg.id == fg_id: + return fg + + # search for feature by name and collect fg objects + featuregroup_features = {} + for feat in self._left_feature_group.features: + featuregroup_features[feat.name] = featuregroup_features.get( + feat.name, [] + ) + [self._left_feature_group] + for join_obj in self.joins: + for feat in join_obj.query._left_feature_group.features: + featuregroup_features[feat.name] = featuregroup_features.get( + feat.name, [] + ) + [join_obj.query._left_feature_group] + featuregroups_found = featuregroup_features.get(feature.name, []) + + if len(featuregroups_found) > 1: + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG.format(feature.name) + ) + elif len(featuregroups_found) == 1: + return featuregroups_found[0] + + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_NOT_FOUND_FG.format(feature.name) + ) + + def _get_feature_by_name( + self, + feature_name: str, + ): + # collect a dict that maps feature names -> (feature, prefix, fg) + query_features = {} + for feat in self._left_features: + feature_entry = (feat, None, self._left_feature_group) + query_features[feat.name] = query_features.get(feat.name, []) + [ + feature_entry + ] + for join_obj in self.joins: + for feat in join_obj.query._left_features: + feature_entry = ( + feat, + join_obj.prefix, + join_obj.query._left_feature_group, + ) + query_features[feat.name] = query_features.get(feat.name, []) + [ + feature_entry + ] + # if the join has a prefix, add a lookup for "prefix.feature_name" + if join_obj.prefix: + name_with_prefix = f"{join_obj.prefix}{feat.name}" + query_features[name_with_prefix] = query_features.get( + name_with_prefix, [] + ) + [feature_entry] + + if feature_name not in query_features: + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_NOT_FOUND.format(feature_name) + ) + + # return (feature, prefix, fg) tuple, if only one match was found + feats = query_features[feature_name] + if len(feats) == 1: + return feats[0] + + # if multiple matches were found, return the one without prefix + feats_without_prefix = [feat for feat in feats if feat[1] is None] + if len(feats_without_prefix) == 1: + return feats_without_prefix[0] + + # there were multiple matches and + raise FeatureStoreException( + Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format(feature_name) + ) + @property def joins(self): """List of joins in the query""" @@ -641,17 +628,35 @@ def joins(self): @property def featuregroups(self): """List of feature groups used in the query""" - return list(self._featuregroups) + featuregroups = {self._left_feature_group} + for join_obj in self.joins: + featuregroups.add(join_obj.query._left_feature_group) + return list(featuregroups) @property def filters(self): """All filters used in the query""" - return self._filters + filters = self._filter + for join_obj in self.joins: + if filters is None: + filters = join_obj.query._filter + elif join_obj.query._filter is not None: + filters = filters & join_obj.query._filter + + return filters @property def features(self): """List of all features in the query""" - return [feat[0] for feat in self._query_feature_list] + features = [] + for feat in self._left_features: + features.append(feat) + + for join_obj in self.joins: + for feat in join_obj.query._left_features: + features.append(feat) + + return features def get_feature(self, feature_name): """ From 1c35b47663de240dd3c81cfaa624ce88008ee7b2 Mon Sep 17 00:00:00 2001 From: tdoehmen Date: Tue, 6 Jun 2023 18:31:40 +0200 Subject: [PATCH 29/29] fixed comment --- python/hsfs/constructor/query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 282906459e..e8ef5751a3 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -631,7 +631,7 @@ def _get_feature_by_name( if len(feats_without_prefix) == 1: return feats_without_prefix[0] - # there were multiple matches and + # there were multiple ambiguous matches raise FeatureStoreException( Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format(feature_name) )