Skip to content

Commit

Permalink
feat: enable feature store batch serve to Pandas DataFrame; fix: read instances uri for batch serve (#983)
Browse files Browse the repository at this point in the history

* feat: add batch_serve_to_df, add unit tests, add integration tests; update: setup.py; fix: read_instances bug for batch_serve_to_*

* fix: add self.wait() before self._parse_resource_name, add ignore_index=True for pd.concat and return pd.DataFrame upon empty frames

* fix: setup.py versioning conflict

* fix: optional_sync
  • Loading branch information
morgandu authored Feb 4, 2022
1 parent 11d9af3 commit e0fec36
Show file tree
Hide file tree
Showing 6 changed files with 565 additions and 189 deletions.
47 changes: 37 additions & 10 deletions google/cloud/aiplatform/featurestore/entity_type.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -123,16 +123,21 @@ def __init__(
location=self.location, credentials=credentials,
)

@property
def featurestore_name(self) -> str:
"""Full qualified resource name of the managed featurestore in which this EntityType is."""
def _get_featurestore_name(self) -> str:
"""Gets full qualified resource name of the managed featurestore in which this EntityType is."""
entity_type_name_components = self._parse_resource_name(self.resource_name)
return featurestore.Featurestore._format_resource_name(
project=entity_type_name_components["project"],
location=entity_type_name_components["location"],
featurestore=entity_type_name_components["featurestore"],
)

@property
def featurestore_name(self) -> str:
"""Full qualified resource name of the managed featurestore in which this EntityType is."""
self.wait()
return self._get_featurestore_name()

def get_featurestore(self) -> "featurestore.Featurestore":
"""Retrieves the managed featurestore in which this EntityType is.
Expand All @@ -141,7 +146,7 @@ def get_featurestore(self) -> "featurestore.Featurestore":
"""
return featurestore.Featurestore(self.featurestore_name)

def get_feature(self, feature_id: str) -> "featurestore.Feature":
def _get_feature(self, feature_id: str) -> "featurestore.Feature":
"""Retrieves an existing managed feature in this EntityType.
Args:
Expand All @@ -151,7 +156,6 @@ def get_feature(self, feature_id: str) -> "featurestore.Feature":
featurestore.Feature - The managed feature resource object.
"""
entity_type_name_components = self._parse_resource_name(self.resource_name)

return featurestore.Feature(
feature_name=featurestore.Feature._format_resource_name(
project=entity_type_name_components["project"],
Expand All @@ -162,6 +166,18 @@ def get_feature(self, feature_id: str) -> "featurestore.Feature":
)
)

def get_feature(self, feature_id: str) -> "featurestore.Feature":
"""Retrieves an existing managed feature in this EntityType.
Args:
feature_id (str):
Required. The managed feature resource ID in this EntityType.
Returns:
featurestore.Feature - The managed feature resource object.
"""
self.wait()
return self._get_feature(feature_id=feature_id)

def update(
self,
description: Optional[str] = None,
Expand Down Expand Up @@ -202,6 +218,7 @@ def update(
Returns:
EntityType - The updated entityType resource object.
"""
self.wait()
update_mask = list()

if description:
Expand Down Expand Up @@ -380,6 +397,7 @@ def list_features(
Returns:
List[featurestore.Feature] - A list of managed feature resource objects.
"""
self.wait()
return featurestore.Feature.list(
entity_type_name=self.resource_name, filter=filter, order_by=order_by,
)
Expand All @@ -399,7 +417,7 @@ def delete_features(self, feature_ids: List[str], sync: bool = True,) -> None:
"""
features = []
for feature_id in feature_ids:
feature = self.get_feature(feature_id=feature_id)
feature = self._get_feature(feature_id=feature_id)
feature.delete(sync=False)
features.append(feature)

Expand Down Expand Up @@ -626,6 +644,7 @@ def create_feature(
featurestore.Feature - feature resource object
"""
self.wait()
return featurestore.Feature.create(
feature_id=feature_id,
value_type=value_type,
Expand Down Expand Up @@ -761,8 +780,9 @@ def batch_create_features(

return self

@staticmethod
def _validate_and_get_import_feature_values_request(
self,
entity_type_name: str,
feature_ids: List[str],
feature_time: Union[str, datetime.datetime],
data_source: Union[gca_io.AvroSource, gca_io.BigQuerySource, gca_io.CsvSource],
Expand All @@ -773,6 +793,8 @@ def _validate_and_get_import_feature_values_request(
) -> gca_featurestore_service.ImportFeatureValuesRequest:
"""Validates and get import feature values request.
Args:
entity_type_name (str):
Required. A fully-qualified entityType resource name.
feature_ids (List[str]):
Required. IDs of the Feature to import values
of. The Features must exist in the target
Expand Down Expand Up @@ -840,7 +862,7 @@ def _validate_and_get_import_feature_values_request(
]

import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest(
entity_type=self.resource_name,
entity_type=entity_type_name,
feature_specs=feature_specs,
entity_id_field=entity_id_field,
disable_online_serving=disable_online_serving,
Expand Down Expand Up @@ -992,6 +1014,7 @@ def ingest_from_bq(
bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri)

import_feature_values_request = self._validate_and_get_import_feature_values_request(
entity_type_name=self.resource_name,
feature_ids=feature_ids,
feature_time=feature_time,
data_source=bigquery_source,
Expand Down Expand Up @@ -1114,6 +1137,7 @@ def ingest_from_gcs(
data_source = gca_io.AvroSource(gcs_source=gcs_source)

import_feature_values_request = self._validate_and_get_import_feature_values_request(
entity_type_name=self.resource_name,
feature_ids=feature_ids,
feature_time=feature_time,
data_source=data_source,
Expand Down Expand Up @@ -1213,6 +1237,7 @@ def ingest_from_df(
project=self.project, credentials=self.credentials
)

self.wait()
entity_type_name_components = self._parse_resource_name(self.resource_name)
featurestore_id, entity_type_id = (
entity_type_name_components["featurestore"],
Expand All @@ -1222,6 +1247,8 @@ def ingest_from_df(
temp_bq_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace(
"-", "_"
)

# TODO(b/216497263): Add support for resource project does not match initializer.global_config.project
temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[
:1024
]
Expand Down Expand Up @@ -1297,7 +1324,7 @@ def read(
Returns:
pd.DataFrame: entities' feature values in DataFrame
"""

self.wait()
if isinstance(feature_ids, str):
feature_ids = [feature_ids]

Expand Down Expand Up @@ -1339,7 +1366,7 @@ def read(
feature_descriptor.id for feature_descriptor in header.feature_descriptors
]

return EntityType._construct_dataframe(
return self._construct_dataframe(
feature_ids=feature_ids, entity_views=entity_views,
)

Expand Down
25 changes: 17 additions & 8 deletions google/cloud/aiplatform/featurestore/feature.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -122,17 +122,21 @@ def __init__(
else featurestore_id,
)

@property
def featurestore_name(self) -> str:
"""Full qualified resource name of the managed featurestore in which this Feature is."""
def _get_featurestore_name(self) -> str:
"""Gets full qualified resource name of the managed featurestore in which this Feature is."""
feature_path_components = self._parse_resource_name(self.resource_name)

return featurestore.Featurestore._format_resource_name(
project=feature_path_components["project"],
location=feature_path_components["location"],
featurestore=feature_path_components["featurestore"],
)

@property
def featurestore_name(self) -> str:
"""Full qualified resource name of the managed featurestore in which this Feature is."""
self.wait()
return self._get_featurestore_name()

def get_featurestore(self) -> "featurestore.Featurestore":
"""Retrieves the managed featurestore in which this Feature is.
Expand All @@ -141,18 +145,22 @@ def get_featurestore(self) -> "featurestore.Featurestore":
"""
return featurestore.Featurestore(featurestore_name=self.featurestore_name)

@property
def entity_type_name(self) -> str:
"""Full qualified resource name of the managed entityType in which this Feature is."""
def _get_entity_type_name(self) -> str:
"""Gets full qualified resource name of the managed entityType in which this Feature is."""
feature_path_components = self._parse_resource_name(self.resource_name)

return featurestore.EntityType._format_resource_name(
project=feature_path_components["project"],
location=feature_path_components["location"],
featurestore=feature_path_components["featurestore"],
entity_type=feature_path_components["entity_type"],
)

@property
def entity_type_name(self) -> str:
"""Full qualified resource name of the managed entityType in which this Feature is."""
self.wait()
return self._get_entity_type_name()

def get_entity_type(self) -> "featurestore.EntityType":
"""Retrieves the managed entityType in which this Feature is.
Expand Down Expand Up @@ -203,6 +211,7 @@ def update(
Returns:
Feature - The updated feature resource object.
"""
self.wait()
update_mask = list()

if description:
Expand Down
Loading

0 comments on commit e0fec36

Please sign in to comment.