Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fstore 855 #17

Open
wants to merge 34 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
80ab231
working state
tdoehmen May 12, 2023
84df78c
Merge branch 'master' of https://github.com/tdoehmen/feature-store-ap…
tdoehmen May 12, 2023
51f8c46
refactored query parameters
tdoehmen May 12, 2023
5e7856d
removed numeric parameter
tdoehmen May 12, 2023
1ae8865
fix bug
tdoehmen May 12, 2023
3dba7fe
look in all features in all featuregroups
tdoehmen May 12, 2023
48f1145
support excluded features
tdoehmen May 12, 2023
eb57d6e
added features property to featureview
tdoehmen May 12, 2023
c45f51c
include transformation functions in repr
tdoehmen May 12, 2023
8cab66a
fix column check on join
tdoehmen May 12, 2023
f8b0809
updated warning messages
tdoehmen May 12, 2023
30aa095
more refactoring, fixed tests
tdoehmen May 16, 2023
0a4516a
fixed tests, added filter check
tdoehmen May 16, 2023
6ad85c9
fixed test
tdoehmen May 16, 2023
6196ec5
fixed ambiguity issue with filter params
tdoehmen May 16, 2023
12bcda5
fixed tests and cleanup
tdoehmen May 16, 2023
3a2068d
check full filter
tdoehmen May 16, 2023
ed61159
fix append feature and check filter there as well
tdoehmen May 16, 2023
f2dd9a8
don't check merged filter on .filter
tdoehmen May 16, 2023
1d2538b
working state
tdoehmen May 16, 2023
0bbae9e
removed checks
tdoehmen May 16, 2023
bdd4bec
some renaming
tdoehmen May 16, 2023
dd7b49c
made lookup function private/protected
tdoehmen May 16, 2023
042cde5
added docs
tdoehmen May 17, 2023
bbc2ee5
fixed cached check
tdoehmen May 17, 2023
f1a3133
updated error messages, reverted transformation function test changes
tdoehmen May 17, 2023
0a09939
added tests
tdoehmen May 17, 2023
61f83ef
added add_features_to_collection method
tdoehmen May 27, 2023
ad21415
Merge branch 'master' into FSTORE-855
tdoehmen Jun 1, 2023
742c501
collect features on request
tdoehmen Jun 6, 2023
eb95349
Merge branch 'FSTORE-855' of https://github.com/tdoehmen/feature-stor…
tdoehmen Jun 6, 2023
9bc8b40
Merge branch 'master' into FSTORE-855
tdoehmen Jun 6, 2023
1c35b47
fixed comment
tdoehmen Jun 6, 2023
0e9146c
Merge branch 'master' into FSTORE-855
tdoehmen Jun 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 180 additions & 29 deletions python/hsfs/constructor/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,28 @@
from hsfs.core import query_constructor_api, storage_connector_api, arrow_flight_client
from hsfs.constructor import join
from hsfs.constructor.filter import Filter, Logic
from hsfs.client.exceptions import FeatureStoreException
from hsfs.feature import Feature


class Query:

ERROR_MESSAGE_FEATURE_AMBIGUOUS = (
"Provided feature name '{}' is ambiguous and exists in more than one feature group. "
"Consider prepending the prefix specified in the join."
)
ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG = (
"Feature name '{}' is ambiguous and exists in more than one feature group. "
"Consider accessing the feature through the feature group object when specifying the query."
)
ERROR_MESSAGE_FEATURE_NOT_FOUND = (
"Feature name '{}' could not found be found in query."
)
ERROR_MESSAGE_FEATURE_NOT_FOUND_FG = (
"Feature name '{}' could not be found in "
"any of the featuregroups in this query."
)

def __init__(
self,
left_feature_group,
Expand Down Expand Up @@ -61,7 +80,7 @@ def _prep_read(self, online, read_options):
online_conn = None

if engine.get_instance().is_flyingduck_query_supported(self, read_options):
sql_query = arrow_flight_client.get_instance()._construct_query_object(
sql_query = arrow_flight_client.get_instance().create_query_object(
self, sql_query
)
else:
Expand Down Expand Up @@ -121,7 +140,7 @@ def read(
and "pandas_types" in read_options
and read_options["pandas_types"]
):
schema = self._collect_features()
schema = self.features
if len(self.joins) > 0 or None in [f.type for f in schema]:
raise ValueError(
"Pandas types casting only supported for feature_group.read()/query.select_all()"
Expand All @@ -136,13 +155,6 @@ def read(
schema,
)

def _collect_features(self):
features = []
features.extend(self.features)
for j in self.joins:
features.extend(j.query.features)
return features

def show(self, n: int, online: Optional[bool] = False):
"""Show the first N rows of the Query.

Expand Down Expand Up @@ -180,7 +192,7 @@ def join(

If no join keys are specified, Hopsworks will use the maximal matching subset of
the primary keys of the feature groups you are joining.
Joins of one level are supported, no neted joins.
Joins of one level are supported, no nested joins.

!!! example "Join two feature groups"
```python
Expand Down Expand Up @@ -220,6 +232,7 @@ def join(
self._joins.append(
join.Join(sub_query, on, left_on, right_on, join_type.upper(), prefix)
)

return self

def as_of(
Expand Down Expand Up @@ -395,13 +408,8 @@ def filter(self, f: Union[Filter, Logic]):
)
elif self._filter is not None:
self._filter = self._filter & f
return self

def from_cache_feature_group_only(self):
for _query in [join.query for join in self._joins] + [self]:
if not isinstance(_query._left_feature_group, feature_group.FeatureGroup):
return False
return True
return self

def json(self):
return json.dumps(self, cls=util.FeatureStoreEncoder)
Expand Down Expand Up @@ -466,7 +474,7 @@ def _hopsworks_json(cls, json_dict):
It does not fully deserialize the message as the usecase is to
send it straight back to Hopsworks to read the content of the query

Args:
Arguments:
json_dict (str): a json string containing a query object

Returns:
Expand Down Expand Up @@ -506,10 +514,12 @@ def __str__(self):

@property
def left_feature_group_start_time(self):
"""Start time of time travel for the left feature group."""
return self._left_feature_group_start_time

@property
def left_feature_group_end_time(self):
"""End time of time travel for the left feature group."""
return self._left_feature_group_end_time

@left_feature_group_start_time.setter
Expand All @@ -520,31 +530,172 @@ def left_feature_group_start_time(self, left_feature_group_start_time):
def left_feature_group_end_time(self, left_feature_group_end_time):
self._left_feature_group_end_time = left_feature_group_end_time

@property
def features(self):
return self._left_features

def append_feature(self, feature):
"""
!!! example
```python
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
Append a feature to the query.

query = fg1.select_all().join(fg2.select_all())

query.append_feature('feature_name')
```
# Arguments
feature: `[str, Feature]`. Name of the feature to append to the query.
"""
feature = util.validate_feature(feature)

self._left_features.append(feature)

return self

def is_time_travel(self):
"""Query contains time travel"""
return (
self.left_feature_group_start_time
or self.left_feature_group_end_time
or any([_join.query.is_time_travel() for _join in self._joins])
)

def is_cache_feature_group_only(self):
"""Query contains only cached feature groups"""
return all(
[isinstance(fg, feature_group.FeatureGroup) for fg in self.featuregroups]
)

def _get_featuregroup_by_feature(self, feature: Feature):
# search for feature by id, and return the fg object
fg_id = feature._feature_group_id
for fg in self.featuregroups:
if fg.id == fg_id:
return fg

# search for feature by name and collect fg objects
featuregroup_features = {}
for feat in self._left_feature_group.features:
featuregroup_features[feat.name] = featuregroup_features.get(
feat.name, []
) + [self._left_feature_group]
for join_obj in self.joins:
for feat in join_obj.query._left_feature_group.features:
featuregroup_features[feat.name] = featuregroup_features.get(
feat.name, []
) + [join_obj.query._left_feature_group]
featuregroups_found = featuregroup_features.get(feature.name, [])

if len(featuregroups_found) > 1:
raise FeatureStoreException(
Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS_FG.format(feature.name)
)
elif len(featuregroups_found) == 1:
return featuregroups_found[0]

raise FeatureStoreException(
Query.ERROR_MESSAGE_FEATURE_NOT_FOUND_FG.format(feature.name)
)

def _get_feature_by_name(
self,
feature_name: str,
):
# collect a dict that maps feature names -> (feature, prefix, fg)
query_features = {}
for feat in self._left_features:
feature_entry = (feat, None, self._left_feature_group)
query_features[feat.name] = query_features.get(feat.name, []) + [
feature_entry
]
for join_obj in self.joins:
for feat in join_obj.query._left_features:
feature_entry = (
feat,
join_obj.prefix,
join_obj.query._left_feature_group,
)
query_features[feat.name] = query_features.get(feat.name, []) + [
feature_entry
]
# if the join has a prefix, add a lookup for "prefix.feature_name"
if join_obj.prefix:
name_with_prefix = f"{join_obj.prefix}{feat.name}"
query_features[name_with_prefix] = query_features.get(
name_with_prefix, []
) + [feature_entry]

if feature_name not in query_features:
raise FeatureStoreException(
Query.ERROR_MESSAGE_FEATURE_NOT_FOUND.format(feature_name)
)

# return (feature, prefix, fg) tuple, if only one match was found
feats = query_features[feature_name]
if len(feats) == 1:
return feats[0]

# if multiple matches were found, return the one without prefix
feats_without_prefix = [feat for feat in feats if feat[1] is None]
if len(feats_without_prefix) == 1:
return feats_without_prefix[0]

# there were multiple ambiguous matches
raise FeatureStoreException(
Query.ERROR_MESSAGE_FEATURE_AMBIGUOUS.format(feature_name)
)

@property
def joins(self):
"""List of joins in the query"""
return self._joins

@property
def featuregroups(self):
"""List of feature groups used in the query"""
featuregroups = {self._left_feature_group}
for join_obj in self.joins:
featuregroups.add(join_obj.query._left_feature_group)
return list(featuregroups)

@property
def filters(self):
"""All filters used in the query"""
filters = self._filter
for join_obj in self.joins:
if filters is None:
filters = join_obj.query._filter
elif join_obj.query._filter is not None:
filters = filters & join_obj.query._filter

return filters

@property
def features(self):
"""List of all features in the query"""
features = []
for feat in self._left_features:
features.append(feat)

for join_obj in self.joins:
for feat in join_obj.query._left_features:
features.append(feat)

return features

def get_feature(self, feature_name):
"""
Get a feature by name.

# Arguments
feature_name: `str`. Name of the feature to get.

# Returns
`Feature`. Feature object.
"""
return self._get_feature_by_name(feature_name)[0]

def __getattr__(self, name):
try:
return self.__getitem__(name)
except FeatureStoreException:
raise AttributeError(f"'Query' object has no attribute '{name}'. ")

def __getitem__(self, name):
if not isinstance(name, str):
raise TypeError(
f"Expected type `str`, got `{type(name)}`. "
"Features are accessible by name."
)
return self.get_feature(name)
Loading