diff --git a/providers/google/docs/operators/cloud/vertex_ai.rst b/providers/google/docs/operators/cloud/vertex_ai.rst index 646cb4fee51a0..6e23cd2d934d8 100644 --- a/providers/google/docs/operators/cloud/vertex_ai.rst +++ b/providers/google/docs/operators/cloud/vertex_ai.rst @@ -664,6 +664,36 @@ The operator returns the cached content response in :ref:`XCom <concepts:xcom>` Interacting with Vertex AI Feature Store ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +To create a feature online store you can use +:class:`~airflow.providers.google.cloud.operators.vertex_ai.feature_store.CreateFeatureOnlineStoreOperator`. +The operator returns the created feature online store in :ref:`XCom <concepts:xcom>` under the ``return_value`` key. + +.. exampleinclude:: /../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py + :language: python + :dedent: 4 + :start-after: [START how_to_cloud_vertex_ai_create_feature_online_store_operator] + :end-before: [END how_to_cloud_vertex_ai_create_feature_online_store_operator] + +To create a feature view you can use +:class:`~airflow.providers.google.cloud.operators.vertex_ai.feature_store.CreateFeatureViewOperator`. +The operator returns the created feature view in :ref:`XCom <concepts:xcom>` under the ``return_value`` key. + +.. exampleinclude:: /../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py + :language: python + :dedent: 4 + :start-after: [START how_to_cloud_vertex_ai_create_feature_view_store_operator] + :end-before: [END how_to_cloud_vertex_ai_create_feature_view_store_operator] + +To get a feature online store you can use +:class:`~airflow.providers.google.cloud.operators.vertex_ai.feature_store.GetFeatureOnlineStoreOperator`. +The operator returns the feature online store details in :ref:`XCom <concepts:xcom>` under the ``return_value`` key. + +.. 
exampleinclude:: /../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py + :language: python + :dedent: 4 + :start-after: [START how_to_cloud_vertex_ai_get_feature_online_store_operator] + :end-before: [END how_to_cloud_vertex_ai_get_feature_online_store_operator] + To get a feature view sync job you can use :class:`~airflow.providers.google.cloud.operators.vertex_ai.feature_store.GetFeatureViewSyncOperator`. The operator returns sync job results in :ref:`XCom <concepts:xcom>` under ``return_value`` key. @@ -693,6 +723,33 @@ To check if Feature View Sync succeeded you can use :start-after: [START how_to_cloud_vertex_ai_feature_store_feature_view_sync_sensor] :end-before: [END how_to_cloud_vertex_ai_feature_store_feature_view_sync_sensor] +To fetch feature values data you can use the +:class:`~airflow.providers.google.cloud.operators.vertex_ai.feature_store.FetchFeatureValuesOperator`. + +.. exampleinclude:: /../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py + :language: python + :dedent: 4 + :start-after: [START how_to_cloud_vertex_ai_fetch_feature_values_operator] + :end-before: [END how_to_cloud_vertex_ai_fetch_feature_values_operator] + +To delete the feature view you can use +:class:`~airflow.providers.google.cloud.operators.vertex_ai.feature_store.DeleteFeatureViewOperator`. + +.. exampleinclude:: /../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py + :language: python + :dedent: 4 + :start-after: [START how_to_cloud_vertex_ai_delete_feature_view_operator] + :end-before: [END how_to_cloud_vertex_ai_delete_feature_view_operator] + +To delete the feature online store you can use +:class:`~airflow.providers.google.cloud.operators.vertex_ai.feature_store.DeleteFeatureOnlineStoreOperator`. + +.. 
exampleinclude:: /../../google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py + :language: python + :dedent: 4 + :start-after: [START how_to_cloud_vertex_ai_delete_feature_online_store_operator] + :end-before: [END how_to_cloud_vertex_ai_delete_feature_online_store_operator] + Interacting with Ray on Vertex AI Cluster ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/vertex_ai/feature_store.py b/providers/google/src/airflow/providers/google/cloud/hooks/vertex_ai/feature_store.py index 69c7b69f8dad5..118ea6a864f82 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/vertex_ai/feature_store.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/vertex_ai/feature_store.py @@ -18,15 +18,32 @@ from __future__ import annotations +from collections.abc import Sequence +from typing import ( + TYPE_CHECKING, +) + from google.api_core.client_options import ClientOptions +from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault from google.cloud.aiplatform_v1beta1 import ( FeatureOnlineStoreAdminServiceClient, + FeatureOnlineStoreServiceClient, ) from airflow.exceptions import AirflowException from airflow.providers.google.common.consts import CLIENT_INFO from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID, GoogleBaseHook +if TYPE_CHECKING: + from google.api_core.operation import Operation + from google.api_core.retry import Retry + from google.cloud.aiplatform_v1beta1.types import ( + FeatureOnlineStore, + FeatureView, + FeatureViewDataKey, + FetchFeatureValuesResponse, + ) + class FeatureStoreHook(GoogleBaseHook): """ @@ -48,6 +65,19 @@ class FeatureStoreHook(GoogleBaseHook): originating account. 
""" + @staticmethod + def _get_client_options( + location: str | None = None, + custom_endpoint: str | None = None, + ) -> ClientOptions: + if custom_endpoint: + client_options = ClientOptions(api_endpoint=custom_endpoint) + elif location and location != "global": + client_options = ClientOptions(api_endpoint=f"{location}-aiplatform.googleapis.com:443") + else: + client_options = ClientOptions() + return client_options + def get_feature_online_store_admin_service_client( self, location: str | None = None, @@ -62,12 +92,153 @@ def get_feature_online_store_admin_service_client( If provided and not 'global', the client will be configured to use the region-specific API endpoint. """ - if location and location != "global": - client_options = ClientOptions(api_endpoint=f"{location}-aiplatform.googleapis.com:443") - else: - client_options = ClientOptions() return FeatureOnlineStoreAdminServiceClient( - credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options + credentials=self.get_credentials(), + client_info=CLIENT_INFO, + client_options=self._get_client_options(location), + ) + + def get_feature_online_store_service_client( + self, + location: str | None = None, + custom_endpoint: str | None = None, + ) -> FeatureOnlineStoreServiceClient: + return FeatureOnlineStoreServiceClient( + credentials=self.get_credentials(), + client_info=CLIENT_INFO, + client_options=self._get_client_options(location, custom_endpoint), + ) + + @GoogleBaseHook.fallback_to_default_project_id + def create_feature_online_store( + self, + feature_online_store_id: str, + feature_online_store: FeatureOnlineStore, + project_id: str = PROVIDE_PROJECT_ID, + location: str | None = None, + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Create and sends request for Feature Online store. + + This method initiates VertexAI Feature Online Store creation request. 
+ Feature Online Store aims to serve and manage features data as a part of VertexAI MLOps. + + :param feature_online_store_id: The ID of the online feature store. + :param feature_online_store: The configuration of the online repository. + :param project_id: The ID of the Google Cloud project that contains the + feature store. If not provided, will attempt to determine from the environment. + :param location: The Google Cloud region where the feature store is located + (e.g., 'us-central1', 'us-east1'). + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + """ + client = self.get_feature_online_store_admin_service_client(location) + return client.create_feature_online_store( + request={ + "parent": f"projects/{project_id}/locations/{location}", + "feature_online_store_id": feature_online_store_id, + "feature_online_store": feature_online_store, + }, + timeout=timeout, + retry=retry, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def get_feature_online_store( + self, + feature_online_store_id: str, + project_id: str = PROVIDE_PROJECT_ID, + location: str | None = None, + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + ) -> FeatureOnlineStore: + """ + Get Feature Online store data. + + Get the FeatureOnlineStore details. + Vertex AI Feature Online Store provides a centralized repository for serving ML features + and embedding indexes at low latency. + + :param feature_online_store_id: The ID of the online feature store. + :param project_id: The ID of the Google Cloud project that contains the + feature store. If not provided, will attempt to determine from the environment. + :param location: The Google Cloud region where the feature store is located + (e.g., 'us-central1', 'us-east1'). 
+ :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + """ + client = self.get_feature_online_store_admin_service_client(location) + return client.get_feature_online_store( + name=f"projects/{project_id}/locations/{location}/featureOnlineStores/{feature_online_store_id}", + timeout=timeout, + retry=retry, + metadata=metadata, + ) + + @staticmethod + def _get_featurestore_public_endpoint(feature_online_store: FeatureOnlineStore): + public_endpoint = None + featurestore_data = type(feature_online_store).to_dict(feature_online_store) + if "dedicated_serving_endpoint" in featurestore_data: + public_endpoint = featurestore_data["dedicated_serving_endpoint"].get( + "public_endpoint_domain_name" + ) + return public_endpoint + + @GoogleBaseHook.fallback_to_default_project_id + def create_feature_view( + self, + feature_view_id: str, + feature_view: FeatureView, + feature_online_store_id: str, + project_id: str = PROVIDE_PROJECT_ID, + run_sync_immediately: bool = False, + location: str | None = None, + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Create request for Feature View creation. + + This method initiates VertexAI Feature View request for the existing Feature Online Store. + Feature View represents features and data according to the source provided. + + :param feature_view_id: The ID to use for the FeatureView, which will + become the final component of the FeatureView's resource name. + This value may be up to 60 characters, and valid characters are ``[a-z0-9_]``. + The first character cannot be a number. The value must be unique within a FeatureOnlineStore. + :param feature_view: The configuration of the FeatureView to create. + :param feature_online_store_id: The ID of the online feature store. 
+ :param run_sync_immediately: If set to true, one on demand sync will be run + immediately, regardless the FeatureView.sync_config. + :param project_id: The ID of the Google Cloud project that contains the + feature store. If not provided, will attempt to determine from the environment. + :param location: The Google Cloud region where the feature store is located + (e.g., 'us-central1', 'us-east1'). + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + """ + client = self.get_feature_online_store_admin_service_client(location) + return client.create_feature_view( + request={ + "parent": f"projects/{project_id}/locations/" + f"{location}/featureOnlineStores/{feature_online_store_id}", + "feature_view_id": feature_view_id, + "feature_view": feature_view, + "run_sync_immediately": run_sync_immediately, + }, + timeout=timeout, + retry=retry, + metadata=metadata, ) def get_feature_view_sync( @@ -135,13 +306,142 @@ def sync_feature_view( environment. 
""" client = self.get_feature_online_store_admin_service_client(location) - feature_view = f"projects/{project_id}/locations/{location}/featureOnlineStores/{feature_online_store_id}/featureViews/{feature_view_id}" + feature_view = ( + f"projects/{project_id}/locations/{location}/featureOnlineStores/" + f"{feature_online_store_id}/featureViews/{feature_view_id}" + ) try: response = client.sync_feature_view(feature_view=feature_view) - return str(response.feature_view_sync) except Exception as e: self.log.error("Failed to sync feature view: %s", str(e)) raise AirflowException(str(e)) + + @GoogleBaseHook.fallback_to_default_project_id + def fetch_feature_values( + self, + feature_view_id: str, + feature_online_store_id: str, + entity_id: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, + endpoint_domain_name: str | None = None, + data_key: FeatureViewDataKey | None = None, + data_format: int | None = None, + location: str | None = None, + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + ) -> FetchFeatureValuesResponse: + """ + Fetch data from the Feature View provided. + + This method fetches data from existing Feature view, filtered by provided (or default) data_key. + Helps to retrieve actual features data hosted in the VertexAI Feature Store. + + :param entity_id: Simple ID to identify Entity to fetch feature values for. + :param endpoint_domain_name: Optional. Public domain name, hosting the content of Optimized + Feature Online store. Should be omitted, if bigtable configuration provided for the FeatureStore, + and default feature store endpoint will be used, based on location provided. + :param feature_view_id: The FeatureView ID to fetch data from. + :param feature_online_store_id: The ID of the online feature store. + :param data_key: Optional. The request key to fetch feature values for. + :param data_format: Optional. Response data format. 
If not set, FeatureViewDataFormat.KEY_VALUE + will be used. + :param project_id: The ID of the Google Cloud project that contains the + feature store. If not provided, will attempt to determine from the + environment. + :param location: The Google Cloud region where the feature store is located + (e.g., 'us-central1', 'us-east1'). + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. + """ + data_client = self.get_feature_online_store_service_client(location, endpoint_domain_name) + return data_client.fetch_feature_values( + request={ + "id": entity_id, + "feature_view": f"projects/{project_id}/locations/{location}/featureOnlineStores/" + f"{feature_online_store_id}/featureViews/{feature_view_id}", + "data_key": data_key, + "data_format": data_format, + }, + timeout=timeout, + retry=retry, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def delete_feature_view( + self, + feature_view_id: str, + feature_online_store_id: str, + project_id: str = PROVIDE_PROJECT_ID, + location: str | None = None, + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Delete the Feature View. + + This method deletes the Feature View from the Feature Online Store. + + :param feature_view_id: The ID to use for the FeatureView, to be deleted. + :param feature_online_store_id: The ID of the online feature store. + :param project_id: The ID of the Google Cloud project that contains the + feature store. If not provided, will attempt to determine from the + environment. + :param location: The Google Cloud region where the feature store is located + (e.g., 'us-central1', 'us-east1'). + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. 
+ :param metadata: Strings which should be sent along with the request as metadata. + """ + client = self.get_feature_online_store_admin_service_client(location) + return client.delete_feature_view( + name=f"projects/{project_id}/locations/{location}/featureOnlineStores/{feature_online_store_id}" + f"/featureViews/{feature_view_id}", + timeout=timeout, + retry=retry, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def delete_feature_online_store( + self, + feature_online_store_id: str, + force: bool = False, + project_id: str = PROVIDE_PROJECT_ID, + location: str | None = None, + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Delete the FeatureOnlineStore. + + This method sends the deletion request for the Feature Online Store and returns the operation. + The FeatureOnlineStore must not contain any FeatureViews unless ``force`` is set to true. + + :param feature_online_store_id: The ID of the online feature store. + :param force: If set to true, any FeatureViews and Features for this FeatureOnlineStore + will also be deleted. + :param project_id: The ID of the Google Cloud project that contains the + feature store. If not provided, will attempt to determine from the + environment. + :param location: The Google Cloud region where the feature store is located + (e.g., 'us-central1', 'us-east1'). + :param retry: Designation of what errors, if any, should be retried. + :param timeout: The timeout for this request. + :param metadata: Strings which should be sent along with the request as metadata. 
+ """ + client = self.get_feature_online_store_admin_service_client(location) + return client.delete_feature_online_store( + name=f"projects/{project_id}/locations/{location}/featureOnlineStores/{feature_online_store_id}", + force=force, + timeout=timeout, + retry=retry, + metadata=metadata, + ) diff --git a/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/feature_store.py b/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/feature_store.py index 318ff25a3a915..2fe98b75221d4 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/feature_store.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/feature_store.py @@ -22,10 +22,23 @@ from collections.abc import Sequence from typing import TYPE_CHECKING, Any +from google.api_core.exceptions import GoogleAPICallError +from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault +from google.cloud.aiplatform_v1beta1.types import FeatureViewDataFormat + +from airflow.exceptions import AirflowException from airflow.providers.google.cloud.hooks.vertex_ai.feature_store import FeatureStoreHook from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator +from airflow.providers.google.common.hooks.operation_helpers import OperationHelper if TYPE_CHECKING: + from google.api_core.retry import Retry + from google.cloud.aiplatform_v1beta1.types import ( + FeatureOnlineStore, + FeatureView, + FeatureViewDataKey, + ) + from airflow.utils.context import Context @@ -101,6 +114,257 @@ def execute(self, context: Context) -> str: return response +class CreateFeatureOnlineStoreOperator(GoogleCloudBaseOperator, OperationHelper): + """ + Create the Feature Online store. + + This method initiates VertexAI Feature Online Store creation request. + Feature Online Store aims to serve and manage features data as a part of VertexAI MLOps. + + :param project_id: Required. 
The ID of the Google Cloud project that contains the feature store. + This is used to identify which project's resources to interact with. + :param location: Required. The location of the feature store (e.g., 'us-central1', 'us-east1'). + This specifies the Google Cloud region where the feature store resources are located. + :param feature_online_store_id: Required. The ID of the online feature store + to be created. This store serves as the online serving layer. + :param feature_online_store: FeatureOnlineStore configuration object of the feature online store + to be created. + :param gcp_conn_id: The connection ID to use for connecting to Google Cloud Platform. + Defaults to 'google_cloud_default'. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials. Can be either a single account or a chain of accounts required to + get the access_token of the last account in the list, which will be impersonated + in the request. If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. If set as a sequence, the identities + from the list must grant Service Account Token Creator IAM role to the directly + preceding identity, with first account from the list granting this role to the + originating account. 
+ """ + + template_fields: Sequence[str] = ( + "project_id", + "location", + "feature_online_store_id", + ) + + def __init__( + self, + *, + project_id: str, + location: str, + feature_online_store_id: str, + feature_online_store: FeatureOnlineStore, + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + + self.project_id = project_id + self.location = location + self.feature_online_store_id = feature_online_store_id + self.feature_online_store = feature_online_store + self.timeout = timeout + self.retry = retry + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> dict[str, Any]: + """Create the Feature Online Store, wait for the operation result and return it as a dict.""" + hook = FeatureStoreHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + self.log.info("Creating the Feature Online Store...") + result_operation = hook.create_feature_online_store( + project_id=self.project_id, + location=self.location, + feature_online_store_id=self.feature_online_store_id, + feature_online_store=self.feature_online_store, + timeout=self.timeout, + retry=self.retry, + metadata=self.metadata, + ) + op_result = self.wait_for_operation_result(operation=result_operation) + self.log.info("The Feature Online Store has been created: %s", self.feature_online_store_id) + result = type(op_result).to_dict(op_result) + return result + + +class GetFeatureOnlineStoreOperator(GoogleCloudBaseOperator, OperationHelper): + """ + Get Feature Online store instance. + + This operator retrieves the details of an existing VertexAI Feature Online Store. + Feature Online Store aims to serve and manage features data as a part of VertexAI MLOps. + + :param project_id: Required. 
The ID of the Google Cloud project that contains the feature store. + This is used to identify which project's resources to interact with. + :param location: Required. The location of the feature store (e.g., 'us-central1', 'us-east1'). + This specifies the Google Cloud region where the feature store resources are located. + :param feature_online_store_id: Required. The ID of the online feature store + to retrieve. This store serves as the online serving layer. + :param gcp_conn_id: The connection ID to use for connecting to Google Cloud Platform. + Defaults to 'google_cloud_default'. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials. Can be either a single account or a chain of accounts required to + get the access_token of the last account in the list, which will be impersonated + in the request. If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. If set as a sequence, the identities + from the list must grant Service Account Token Creator IAM role to the directly + preceding identity, with first account from the list granting this role to the + originating account. 
+ """ + + template_fields: Sequence[str] = ( + "project_id", + "location", + "feature_online_store_id", + ) + + def __init__( + self, + *, + project_id: str, + location: str, + feature_online_store_id: str, + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + + self.project_id = project_id + self.location = location + self.feature_online_store_id = feature_online_store_id + self.timeout = timeout + self.retry = retry + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> dict[str, Any]: + """Get the Feature Online Store and return it as a dict; Google API errors become AirflowException.""" + hook = FeatureStoreHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + self.log.info("Get the Feature Online Store id: %s...", self.feature_online_store_id) + try: + result = hook.get_feature_online_store( + project_id=self.project_id, + location=self.location, + feature_online_store_id=self.feature_online_store_id, + timeout=self.timeout, + retry=self.retry, + metadata=self.metadata, + ) + except GoogleAPICallError as ex: + exc_msg = f"Google API error getting {self.feature_online_store_id} Feature Online Store instance" + raise AirflowException(exc_msg) from ex + + result = type(result).to_dict(result) + self.log.info("The Feature Online Store has been retrieved: %s", self.feature_online_store_id) + return result + + +class CreateFeatureViewOperator(GoogleCloudBaseOperator, OperationHelper): + """ + Create request for Feature View creation. + + This method initiates VertexAI Feature View request for the existing Feature Online Store. + Feature View represents features and data according to the source provided. 
+ + :param feature_view_id: The ID to use for the FeatureView, which will become the final component + of the FeatureView's resource name. This value may be up to 60 characters, and valid characters + are ``[a-z0-9_]``. The first character cannot be a number. + The value must be unique within a FeatureOnlineStore. + :param feature_view: The configuration of the FeatureView to create. + :param feature_online_store_id: The ID of the online feature store. + :param run_sync_immediately: If set to true, one on demand sync will be run + immediately, regardless the FeatureView.sync_config. + :param project_id: Required. The ID of the Google Cloud project that contains the feature store. + This is used to identify which project's resources to interact with. + :param location: Required. The location of the feature store (e.g., 'us-central1', 'us-east1'). + This specifies the Google Cloud region where the feature store resources are located. + :param gcp_conn_id: The connection ID to use for connecting to Google Cloud Platform. + Defaults to 'google_cloud_default'. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials. Can be either a single account or a chain of accounts required to + get the access_token of the last account in the list, which will be impersonated + in the request. If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. If set as a sequence, the identities + from the list must grant Service Account Token Creator IAM role to the directly + preceding identity, with first account from the list granting this role to the + originating account. 
+ """ + + template_fields: Sequence[str] = ( + "project_id", + "location", + "feature_online_store_id", + ) + + def __init__( + self, + *, + feature_view_id: str, + feature_view: FeatureView, + feature_online_store_id: str, + run_sync_immediately: bool = False, + project_id: str, + location: str, + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.project_id = project_id + self.location = location + self.feature_view_id = feature_view_id + self.feature_view = feature_view + self.run_sync_immediately = run_sync_immediately + self.feature_online_store_id = feature_online_store_id + self.timeout = timeout + self.retry = retry + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> dict[str, Any]: + """Execute the get feature view sync operation.""" + hook = FeatureStoreHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + self.log.info("Creating the Online Store Feature View...") + result_operation = hook.create_feature_view( + project_id=self.project_id, + location=self.location, + feature_view_id=self.feature_view_id, + feature_view=self.feature_view, + feature_online_store_id=self.feature_online_store_id, + run_sync_immediately=self.run_sync_immediately, + timeout=self.timeout, + retry=self.retry, + metadata=self.metadata, + ) + op_result = self.wait_for_operation_result(operation=result_operation) + self.log.info("The Online Store Feature View has been created: %s", self.feature_online_store_id) + result = type(op_result).to_dict(op_result) + return result + + class GetFeatureViewSyncOperator(GoogleCloudBaseOperator): """ Retrieve the status and details of a Feature View synchronization 
operation. @@ -161,3 +425,270 @@ def execute(self, context: Context) -> dict[str, Any]: self.log.info(response) return response + + +class FetchFeatureValuesOperator(GoogleCloudBaseOperator, OperationHelper): + """ + Fetch features data from the Feature View provided. + + This method fetches data from existing Feature view, filtered by provided (or default) data_key. + Helps to retrieve actual features data hosted in the VertexAI Feature Store. + + :param entity_id: Simple ID to identify Entity to fetch feature values for. + :param feature_view_id: The FeatureView ID to fetch data from. + :param feature_online_store_id: The ID of the online feature store. + :param data_key: The request key to fetch feature values for. + :param project_id: Required. The ID of the Google Cloud project that contains the feature store. + This is used to identify which project's resources to interact with. + :param location: Required. The location of the feature store (e.g., 'us-central1', 'us-east1'). + This specifies the Google Cloud region where the feature store resources are located. + :param gcp_conn_id: The connection ID to use for connecting to Google Cloud Platform. + Defaults to 'google_cloud_default'. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials. Can be either a single account or a chain of accounts required to + get the access_token of the last account in the list, which will be impersonated + in the request. If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. If set as a sequence, the identities + from the list must grant Service Account Token Creator IAM role to the directly + preceding identity, with first account from the list granting this role to the + originating account. 
+ """ + + template_fields: Sequence[str] = ( + "project_id", + "location", + "feature_online_store_id", + "feature_view_id", + "entity_id", + ) + + def __init__( + self, + *, + feature_view_id: str, + feature_online_store_id: str, + project_id: str, + location: str, + entity_id: str | None = None, + data_key: FeatureViewDataKey | None = None, + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.project_id = project_id + self.location = location + self.entity_id = entity_id + self.feature_view_id = feature_view_id + self.feature_online_store_id = feature_online_store_id + self.data_key = data_key + self.timeout = timeout + self.retry = retry + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> dict[str, Any]: + """Look up the store's public endpoint (if any), fetch the feature values and return them as a dict.""" + hook = FeatureStoreHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + try: + feature_online_store = hook.get_feature_online_store( + feature_online_store_id=self.feature_online_store_id, + project_id=self.project_id, + location=self.location, + ) + public_domain_name = hook._get_featurestore_public_endpoint(feature_online_store) + except GoogleAPICallError as ex: + exc_msg = f"Google API error getting {self.feature_online_store_id} Feature Online Store instance" + raise AirflowException(exc_msg) from ex + + self.log.info( + "Fetching data from the Feature View %s, Online Feature Store %s.", + self.feature_view_id, + self.feature_online_store_id, + ) + request_result = hook.fetch_feature_values( + project_id=self.project_id, + location=self.location, + endpoint_domain_name=public_domain_name, + entity_id=self.entity_id, + 
feature_view_id=self.feature_view_id, + feature_online_store_id=self.feature_online_store_id, + data_key=self.data_key, + data_format=FeatureViewDataFormat.KEY_VALUE, + timeout=self.timeout, + retry=self.retry, + metadata=self.metadata, + ) + self.log.info( + "Fetching data from the Feature View %s, Online Feature Store %s. is finished.", + self.feature_view_id, + self.feature_online_store_id, + ) + result = type(request_result).to_dict(request_result) + return result + + +class DeleteFeatureOnlineStoreOperator(GoogleCloudBaseOperator, OperationHelper): + """ + Delete the Feature Online Store. + + This method initiates a Vertex AI Feature Online Store deletion request. + The store must not contain any FeatureViews for the deletion to succeed, unless ``force`` is set. + + + :param project_id: Required. The ID of the Google Cloud project that contains the feature store. + This is used to identify which project's resources to interact with. + :param location: Required. The location of the feature store (e.g., 'us-central1', 'us-east1'). + This specifies the Google Cloud region where the feature store resources are located. + :param feature_online_store_id: Required. The ID of the online feature store + to be deleted. This store serves as the online serving layer. + :param force: If set to true, any FeatureViews and Features for this FeatureOnlineStore + will also be deleted. + :param gcp_conn_id: The connection ID to use for connecting to Google Cloud Platform. + Defaults to 'google_cloud_default'. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials. Can be either a single account or a chain of accounts required to + get the access_token of the last account in the list, which will be impersonated + in the request. If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. 
If set as a sequence, the identities + from the list must grant Service Account Token Creator IAM role to the directly + preceding identity, with first account from the list granting this role to the + originating account. + """ + + template_fields: Sequence[str] = ( + "project_id", + "location", + "feature_online_store_id", + ) + + def __init__( + self, + *, + project_id: str, + location: str, + feature_online_store_id: str, + force: bool = False, + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.project_id = project_id + self.location = location + self.feature_online_store_id = feature_online_store_id + self.force = force + self.timeout = timeout + self.retry = retry + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> dict[str, Any]: + """Execute the delete feature online store operation.""" + hook = FeatureStoreHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + self.log.info("Deleting the Feature Online Store...") + + result_operation = hook.delete_feature_online_store( + project_id=self.project_id, + location=self.location, + feature_online_store_id=self.feature_online_store_id, + force=self.force, + timeout=self.timeout, + retry=self.retry, + metadata=self.metadata, + ) + self.wait_for_operation_result(operation=result_operation) + self.log.info("The Feature Online Store deletion has been complete: %s", self.feature_online_store_id) + + return {"result": f"The {self.feature_online_store_id} has been deleted."} + + +class DeleteFeatureViewOperator(GoogleCloudBaseOperator, OperationHelper): + """ + Delete the Feature View. 
+ + This method deletes the Feature View from the Feature Online Store. + + :param project_id: Required. The ID of the Google Cloud project that contains the feature store. + This is used to identify which project's resources to interact with. + :param location: Required. The location of the feature store (e.g., 'us-central1', 'us-east1'). + This specifies the Google Cloud region where the feature store resources are located. + :param feature_online_store_id: Required. The ID of the online feature store that contains + the feature view to be deleted. This store serves as the online serving layer. + :param gcp_conn_id: The connection ID to use for connecting to Google Cloud Platform. + Defaults to 'google_cloud_default'. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials. Can be either a single account or a chain of accounts required to + get the access_token of the last account in the list, which will be impersonated + in the request. If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. If set as a sequence, the identities + from the list must grant Service Account Token Creator IAM role to the directly + preceding identity, with first account from the list granting this role to the + originating account. 
+ """ + + template_fields: Sequence[str] = ( + "project_id", + "location", + "feature_online_store_id", + ) + + def __init__( + self, + *, + project_id: str, + location: str, + feature_online_store_id: str, + feature_view_id: str, + timeout: float | _MethodDefault = DEFAULT, + retry: Retry | _MethodDefault | None = DEFAULT, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + impersonation_chain: str | Sequence[str] | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.project_id = project_id + self.location = location + self.feature_online_store_id = feature_online_store_id + self.feature_view_id = feature_view_id + self.timeout = timeout + self.retry = retry + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> dict[str, Any]: + """Execute the delete feature view operation.""" + hook = FeatureStoreHook( + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + ) + self.log.info("Deleting the Feature View %s ... 
", self.feature_view_id) + result_operation = hook.delete_feature_view( + project_id=self.project_id, + location=self.location, + feature_online_store_id=self.feature_online_store_id, + feature_view_id=self.feature_view_id, + timeout=self.timeout, + retry=self.retry, + metadata=self.metadata, + ) + self.wait_for_operation_result(operation=result_operation) + self.log.info("The Feature View deletion has been complete: %s", self.feature_view_id) + + return {"result": f"The {self.feature_view_id} has been deleted."} diff --git a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py index 3d0794fa85cbd..7a2252d73c182 100644 --- a/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py +++ b/providers/google/tests/system/google/cloud/vertex_ai/example_vertex_ai_feature_store.py @@ -23,30 +23,134 @@ from __future__ import annotations import os -from datetime import datetime +from datetime import datetime, timedelta + +from google.cloud.aiplatform_v1beta1 import FeatureOnlineStore, FeatureView, FeatureViewDataKey from airflow import DAG +from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryCreateEmptyDatasetOperator, + BigQueryCreateTableOperator, + BigQueryDeleteDatasetOperator, +) from airflow.providers.google.cloud.operators.vertex_ai.feature_store import ( + CreateFeatureOnlineStoreOperator, + CreateFeatureViewOperator, + DeleteFeatureOnlineStoreOperator, + DeleteFeatureViewOperator, + FetchFeatureValuesOperator, + GetFeatureOnlineStoreOperator, GetFeatureViewSyncOperator, SyncFeatureViewOperator, ) from airflow.providers.google.cloud.sensors.vertex_ai.feature_store import FeatureViewSyncSensor +from airflow.utils.trigger_rule import TriggerRule PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") DAG_ID = 
"vertex_ai_feature_store_dag" REGION = "us-central1" -FEATURE_ONLINE_STORE_ID = "my_feature_online_store_unique" -FEATURE_VIEW_ID = "feature_view_publications" +BQ_LOCATION = "US" +BQ_DATASET_ID = "bq_ds_featurestore_demo" +BQ_VIEW_ID = "product_features_view" +BQ_VIEW_FQN = f"{PROJECT_ID}.{BQ_DATASET_ID}.{BQ_VIEW_ID}" + +FEATURE_ONLINE_STORE_ID = f"my_feature_online_store_unique_{ENV_ID}" +FEATURE_VIEW_ID = "feature_view_product" +FEATURE_VIEW_DATA_KEY = {"key": "28098"} + +FEATURE_EXTRACT_QUERY = """ + WITH + product_order_agg AS ( + SELECT cast(product_id as string) as entity_id, + countif(status in ("Shipped", "Complete")) as good_order_count, + countif(status in ("Returned", "Cancelled")) as bad_order_count + FROM `bigquery-public-data.thelook_ecommerce.order_items` + WHERE + timestamp_trunc(created_at, day) >= timestamp_trunc(timestamp_sub(CURRENT_TIMESTAMP(), interval 30 day), day) and + timestamp_trunc(created_at, day) < timestamp_trunc(CURRENT_TIMESTAMP(), day) + group by 1 + order by entity_id), + product_basic AS ( + SELECT cast(id as string) AS entity_id, + lower(name) as name, + lower(category) as category, + lower(brand) as brand, + cost, + retail_price + FROM bigquery-public-data.thelook_ecommerce.products) + SELECT *, current_timestamp() as feature_timestamp + FROM product_basic + LEFT OUTER JOIN product_order_agg + USING (entity_id) + """ + with DAG( dag_id=DAG_ID, description="Sample DAG with Vertex AI Feature Store operations.", schedule="@once", - start_date=datetime(2024, 1, 1), + start_date=datetime(2025, 6, 1), catchup=False, tags=["example", "vertex_ai", "feature_store"], ) as dag: + create_bq_dataset = BigQueryCreateEmptyDatasetOperator( + task_id="create_bq_dataset", + dataset_id=BQ_DATASET_ID, + project_id=PROJECT_ID, + location=BQ_LOCATION, + ) + + create_bq_table_view = BigQueryCreateTableOperator( + task_id="create_bq_table_view", + project_id=PROJECT_ID, + location=BQ_LOCATION, + dataset_id=BQ_DATASET_ID, + table_id=BQ_VIEW_ID, + 
table_resource={ + "view": { + "query": FEATURE_EXTRACT_QUERY, + "useLegacySql": False, + } + }, + ) + # [START how_to_cloud_vertex_ai_create_feature_online_store_operator] + create_feature_online_store = CreateFeatureOnlineStoreOperator( + task_id="create_feature_online_store", + project_id=PROJECT_ID, + location=REGION, + feature_online_store_id=FEATURE_ONLINE_STORE_ID, + feature_online_store=FeatureOnlineStore(optimized=FeatureOnlineStore.Optimized()), + ) + # [END how_to_cloud_vertex_ai_create_feature_online_store_operator] + + # [START how_to_cloud_vertex_ai_create_feature_view_store_operator] + create_feature_view = CreateFeatureViewOperator( + task_id="create_feature_view", + project_id=PROJECT_ID, + location=REGION, + feature_online_store_id=FEATURE_ONLINE_STORE_ID, + feature_view_id=FEATURE_VIEW_ID, + feature_view=FeatureView( + big_query_source=FeatureView.BigQuerySource( + uri=f"bq://{BQ_VIEW_FQN}", + entity_id_columns=["entity_id"], + ), + sync_config=FeatureView.SyncConfig(cron="TZ=America/Los_Angeles 56 * * * *"), + ), + ) + # [END how_to_cloud_vertex_ai_create_feature_view_store_operator] + + # [START how_to_cloud_vertex_ai_get_feature_online_store_operator] + get_feature_online_store = GetFeatureOnlineStoreOperator( + task_id="get_feature_online_store", + project_id=PROJECT_ID, + location=REGION, + feature_online_store_id=FEATURE_ONLINE_STORE_ID, + ) + # [END how_to_cloud_vertex_ai_get_feature_online_store_operator] + # [START how_to_cloud_vertex_ai_feature_store_sync_feature_view_operator] sync_task = SyncFeatureViewOperator( task_id="sync_task", @@ -63,7 +167,7 @@ location=REGION, feature_view_sync_name="{{ task_instance.xcom_pull(task_ids='sync_task', key='return_value')}}", poke_interval=60, # Check every minute - timeout=600, # Timeout after 10 minutes + timeout=1200, # Timeout after 20 minutes mode="reschedule", ) # [END how_to_cloud_vertex_ai_feature_store_feature_view_sync_sensor] @@ -76,7 +180,62 @@ ) # [END 
how_to_cloud_vertex_ai_feature_store_get_feature_view_sync_operator] - sync_task >> wait_for_sync >> get_task + # [START how_to_cloud_vertex_ai_fetch_feature_values_operator] + fetch_feature_data = FetchFeatureValuesOperator( + task_id="fetch_feature_data", + project_id=PROJECT_ID, + location=REGION, + feature_online_store_id=FEATURE_ONLINE_STORE_ID, + feature_view_id=FEATURE_VIEW_ID, + data_key=FeatureViewDataKey(FEATURE_VIEW_DATA_KEY), + retries=3, + retry_delay=timedelta(minutes=3), + ) + # [END how_to_cloud_vertex_ai_fetch_feature_values_operator] + + # [START how_to_cloud_vertex_ai_delete_feature_view_operator] + delete_feature_view = DeleteFeatureViewOperator( + task_id="delete_feature_view", + project_id=PROJECT_ID, + location=REGION, + feature_online_store_id=FEATURE_ONLINE_STORE_ID, + feature_view_id=FEATURE_VIEW_ID, + ) + # [END how_to_cloud_vertex_ai_delete_feature_view_operator] + + # [START how_to_cloud_vertex_ai_delete_feature_online_store_operator] + delete_feature_online_store = DeleteFeatureOnlineStoreOperator( + task_id="delete_feature_online_store", + project_id=PROJECT_ID, + location=REGION, + feature_online_store_id=FEATURE_ONLINE_STORE_ID, + ) + # [END how_to_cloud_vertex_ai_delete_feature_online_store_operator] + + delete_bq_dataset = BigQueryDeleteDatasetOperator( + task_id="delete_bq_dataset", + dataset_id=BQ_DATASET_ID, + delete_contents=True, + trigger_rule=TriggerRule.ALL_DONE, + ) + + # TEST SETUP + ( + create_bq_dataset + >> create_bq_table_view + # TEST BODY + >> create_feature_online_store + >> get_feature_online_store + >> create_feature_view + >> sync_task + >> wait_for_sync + >> get_task + >> fetch_feature_data + # TEST TEARDOWN + >> delete_feature_view + >> delete_feature_online_store + >> delete_bq_dataset + ) from tests_common.test_utils.watcher import watcher diff --git a/providers/google/tests/unit/google/cloud/operators/vertex_ai/test_feature_store.py 
b/providers/google/tests/unit/google/cloud/operators/vertex_ai/test_feature_store.py index 5340b69d72009..4443a23ace1ea 100644 --- a/providers/google/tests/unit/google/cloud/operators/vertex_ai/test_feature_store.py +++ b/providers/google/tests/unit/google/cloud/operators/vertex_ai/test_feature_store.py @@ -19,7 +19,21 @@ from unittest import mock +from google.cloud.aiplatform_v1beta1.types import ( + FeatureOnlineStore, + FeatureView, + FeatureViewDataFormat, + FeatureViewDataKey, + FetchFeatureValuesResponse, +) + from airflow.providers.google.cloud.operators.vertex_ai.feature_store import ( + CreateFeatureOnlineStoreOperator, + CreateFeatureViewOperator, + DeleteFeatureOnlineStoreOperator, + DeleteFeatureViewOperator, + FetchFeatureValuesOperator, + GetFeatureOnlineStoreOperator, GetFeatureViewSyncOperator, SyncFeatureViewOperator, ) @@ -33,7 +47,17 @@ IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"] FEATURE_ONLINE_STORE_ID = "test-store" FEATURE_VIEW_ID = "test-view" -FEATURE_VIEW_SYNC_NAME = f"projects/{GCP_PROJECT}/locations/{GCP_LOCATION}/featureOnlineStores/{FEATURE_ONLINE_STORE_ID}/featureViews/{FEATURE_VIEW_ID}/featureViewSyncs/sync123" +FEATURE_ONLINE_STORE_NAME = ( + f"projects/{GCP_PROJECT}/locations/{GCP_LOCATION}/featureOnlineStores/{FEATURE_ONLINE_STORE_ID}" +) +FEATURE_VIEW_SYNC_NAME = ( + f"projects/{GCP_PROJECT}/locations/{GCP_LOCATION}/featureOnlineStores/" + f"{FEATURE_ONLINE_STORE_ID}/featureViews/{FEATURE_VIEW_ID}/featureViewSyncs/sync123" +) +FEATURE_VIEW_NAME = ( + f"projects/{GCP_PROJECT}/locations/{GCP_LOCATION}/featureOnlineStores/" + f"{FEATURE_ONLINE_STORE_ID}/featureViews/{FEATURE_VIEW_ID}" +) class TestSyncFeatureViewOperator: @@ -42,10 +66,8 @@ def test_execute(self, mock_hook_class): # Create the mock hook and set up its return value mock_hook = mock.MagicMock() mock_hook_class.return_value = mock_hook - # Set up the return value for sync_feature_view to match the hook implementation 
mock_hook.sync_feature_view.return_value = FEATURE_VIEW_SYNC_NAME - op = SyncFeatureViewOperator( task_id=TASK_ID, project_id=GCP_PROJECT, @@ -57,13 +79,11 @@ def test_execute(self, mock_hook_class): ) response = op.execute(context={"ti": mock.MagicMock()}) - # Verify hook initialization mock_hook_class.assert_called_once_with( gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, ) - # Verify hook method call mock_hook.sync_feature_view.assert_called_once_with( project_id=GCP_PROJECT, @@ -71,7 +91,6 @@ def test_execute(self, mock_hook_class): feature_online_store_id=FEATURE_ONLINE_STORE_ID, feature_view_id=FEATURE_VIEW_ID, ) - # Verify response matches expected value assert response == FEATURE_VIEW_SYNC_NAME @@ -82,17 +101,14 @@ def test_execute(self, mock_hook_class): # Create the mock hook and set up expected response mock_hook = mock.MagicMock() mock_hook_class.return_value = mock_hook - expected_response = { "name": FEATURE_VIEW_SYNC_NAME, "start_time": 1000, "end_time": 2000, "sync_summary": {"row_synced": 500, "total_slot": 4}, } - # Set up the return value for get_feature_view_sync to match the hook implementation mock_hook.get_feature_view_sync.return_value = expected_response - op = GetFeatureViewSyncOperator( task_id=TASK_ID, location=GCP_LOCATION, @@ -100,20 +116,295 @@ def test_execute(self, mock_hook_class): gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, ) - response = op.execute(context={"ti": mock.MagicMock()}) - # Verify hook initialization mock_hook_class.assert_called_once_with( gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, ) - # Verify hook method call mock_hook.get_feature_view_sync.assert_called_once_with( location=GCP_LOCATION, feature_view_sync_name=FEATURE_VIEW_SYNC_NAME, ) - # Verify response matches expected structure assert response == expected_response + + +class TestCreateFeatureOnlineStoreOperator: + @mock.patch(VERTEX_AI_PATH.format("feature_store.FeatureStoreHook")) + def 
test_execute(self, mock_hook_class): + FEATURE_ONLINE_STORE_CONF = FeatureOnlineStore({"name": FEATURE_ONLINE_STORE_NAME, "optimized": {}}) + sample_result = { + "etag": "", + "labels": {}, + "name": FEATURE_ONLINE_STORE_NAME, + "optimized": {}, + "satisfies_pzi": False, + "satisfies_pzs": False, + "state": 0, + } + mock_hook = mock.MagicMock() + mock_hook_class.return_value = mock_hook + # Set up the return value for hook method to match the hook implementation + sample_operation = mock.MagicMock() + sample_operation.result.return_value = FEATURE_ONLINE_STORE_CONF + mock_hook.create_feature_online_store.return_value = sample_operation + mock_hook.return_value.wait_for_operation_result.side_effect = lambda operation: operation.result() + common_kwargs = { + "project_id": GCP_PROJECT, + "location": GCP_LOCATION, + "feature_online_store_id": FEATURE_ONLINE_STORE_ID, + "feature_online_store": FEATURE_ONLINE_STORE_CONF, + "metadata": (), + "timeout": 100, + "retry": None, + } + op = CreateFeatureOnlineStoreOperator( + task_id=TASK_ID, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + **common_kwargs, + ) + result = op.execute(context={"ti": mock.MagicMock()}) + # Verify hook initialization + mock_hook_class.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + # Verify hook method call + mock_hook.create_feature_online_store.assert_called_once_with(**common_kwargs) + # Verify result matches expected value + assert result == sample_result + + +class TestCreateFeatureViewOperator: + @mock.patch(VERTEX_AI_PATH.format("feature_store.FeatureStoreHook")) + def test_execute(self, mock_hook_class): + feature_view_conf_params = { + "big_query_source": FeatureView.BigQuerySource( + uri="bq://{BQ_TABLE}", + entity_id_columns=["entity_id"], + ), + "sync_config": FeatureView.SyncConfig(cron="TZ=Europe/London 56 * * * *"), + } + FEATURE_VIEW_CONF = FeatureView(**feature_view_conf_params) + + sample_result = { + 
"big_query_source": {"entity_id_columns": ["entity_id"], "uri": "bq://{BQ_TABLE}"}, + "etag": "", + "labels": {}, + "name": "projects/test-project/locations/us-central1/featureOnlineStores/test-store" + "/featureViews/test-view", + "satisfies_pzi": False, + "satisfies_pzs": False, + "service_account_email": "", + "service_agent_type": 0, + "sync_config": {"cron": "TZ=Europe/London 56 * * * *"}, + } + mock_hook = mock.MagicMock() + mock_hook_class.return_value = mock_hook + # Set up the return value for hook method to match the hook implementation + sample_operation = mock.MagicMock() + sample_operation.result.return_value = FeatureView( + {**feature_view_conf_params, **{"name": FEATURE_VIEW_NAME}} + ) + mock_hook.create_feature_view.return_value = sample_operation + mock_hook.return_value.wait_for_operation_result.side_effect = lambda operation: operation.result() + common_kwargs = { + "project_id": GCP_PROJECT, + "location": GCP_LOCATION, + "feature_online_store_id": FEATURE_ONLINE_STORE_ID, + "feature_view_id": FEATURE_VIEW_ID, + "feature_view": FEATURE_VIEW_CONF, + "run_sync_immediately": False, + "metadata": (), + "timeout": 100, + "retry": None, + } + op = CreateFeatureViewOperator( + task_id=TASK_ID, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + **common_kwargs, + ) + result = op.execute(context={"ti": mock.MagicMock()}) + # Verify hook initialization + mock_hook_class.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + # Verify hook method call + mock_hook.create_feature_view.assert_called_once_with(**common_kwargs) + # Verify result matches expected value + assert result == sample_result + + +class TestFetchFeatureValuesOperator: + @mock.patch(VERTEX_AI_PATH.format("feature_store.FeatureStoreHook")) + def test_execute(self, mock_hook_class): + ENTITY_ID = "entity-id" + FEATURE_VIEW_DATA_KEY = {"key": "28098"} + sample_result = { + "key_values": { + "features": [ + {"name": "brand", 
"value": {"string_value": "rip curl"}}, + {"name": "cost", "value": {"double_value": 36.56684834767282}}, + {"name": "feature_timestamp", "value": {"int64_value": "1750151356612667"}}, + ] + } + } + mock_hook = mock.MagicMock() + mock_hook_class.return_value = mock_hook + # Set up the return value for hook method to match the hook implementation + mock_hook.get_feature_online_store = mock.MagicMock() + PUBLIC_DOMAIN_NAME = "public.domain.url" + get_public_domain_mock = mock.MagicMock() + get_public_domain_mock.return_value = PUBLIC_DOMAIN_NAME + mock_hook._get_featurestore_public_endpoint = get_public_domain_mock + mock_hook.fetch_feature_values.return_value = FetchFeatureValuesResponse(sample_result) + common_kwargs = { + "project_id": GCP_PROJECT, + "location": GCP_LOCATION, + "feature_online_store_id": FEATURE_ONLINE_STORE_ID, + "feature_view_id": FEATURE_VIEW_ID, + "entity_id": ENTITY_ID, + "data_key": FeatureViewDataKey(FEATURE_VIEW_DATA_KEY), + "metadata": (), + "timeout": 100, + "retry": None, + } + op = FetchFeatureValuesOperator( + task_id=TASK_ID, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + **common_kwargs, + ) + result = op.execute(context={"ti": mock.MagicMock()}) + # Verify hook initialization + mock_hook_class.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + # Verify hook method call + mock_hook.fetch_feature_values.assert_called_once_with( + data_format=FeatureViewDataFormat.KEY_VALUE, + endpoint_domain_name=PUBLIC_DOMAIN_NAME, + **common_kwargs, + ) + # Verify result matches expected value + assert result == sample_result + + +class TestGetFeatureOnlineStoreOperator: + @mock.patch(VERTEX_AI_PATH.format("feature_store.FeatureStoreHook")) + def test_execute(self, mock_hook_class): + # Create the mock hook and set up its return value + mock_hook = mock.MagicMock() + mock_hook_class.return_value = mock_hook + # Set up the return value for get_feature_online_store to match 
the hook implementation + SAMPLE_RESPONSE = { + "etag": "", + "labels": {}, + "name": FEATURE_ONLINE_STORE_ID, + "satisfies_pzi": False, + "satisfies_pzs": False, + "state": 0, + } + mock_hook.get_feature_online_store.return_value = FeatureOnlineStore(SAMPLE_RESPONSE) + common_kwargs = { + "project_id": GCP_PROJECT, + "location": GCP_LOCATION, + "feature_online_store_id": FEATURE_ONLINE_STORE_ID, + "metadata": (), + "timeout": 100, + "retry": None, + } + op = GetFeatureOnlineStoreOperator( + task_id=TASK_ID, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + **common_kwargs, + ) + response = op.execute(context={"ti": mock.MagicMock()}) + # Verify hook initialization + mock_hook_class.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + # Verify hook method call + mock_hook.get_feature_online_store.assert_called_once_with(**common_kwargs) + # Verify response matches expected value + assert response == SAMPLE_RESPONSE + + +class TestDeleteFeatureOnlineStoreOperator: + @mock.patch(VERTEX_AI_PATH.format("feature_store.FeatureStoreHook")) + def test_execute(self, mock_hook_class): + # Create the mock hook and set up its return value + mock_hook = mock.MagicMock() + mock_hook_class.return_value = mock_hook + sample_operation = mock.MagicMock() + mock_hook.delete_feature_online_store.return_value = sample_operation + mock_hook.return_value.wait_for_operation_result.side_effect = lambda operation: operation.result() + common_kwargs = { + "project_id": GCP_PROJECT, + "location": GCP_LOCATION, + "feature_online_store_id": FEATURE_ONLINE_STORE_ID, + "force": False, + "metadata": (), + "timeout": 100, + "retry": None, + } + op = DeleteFeatureOnlineStoreOperator( + task_id=TASK_ID, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + **common_kwargs, + ) + response = op.execute(context={"ti": mock.MagicMock()}) + # Verify hook initialization + mock_hook_class.assert_called_once_with( + 
gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + # Verify hook method call + mock_hook.delete_feature_online_store.assert_called_once_with(**common_kwargs) + # Verify response matches expected value + assert response == {"result": f"The {FEATURE_ONLINE_STORE_ID} has been deleted."} + + +class TestDeleteFeatureViewOperator: + @mock.patch(VERTEX_AI_PATH.format("feature_store.FeatureStoreHook")) + def test_execute(self, mock_hook_class): + # Create the mock hook and set up its return value + mock_hook = mock.MagicMock() + mock_hook_class.return_value = mock_hook + sample_operation = mock.MagicMock() + mock_hook.delete_feature_view.return_value = sample_operation + mock_hook.return_value.wait_for_operation_result.side_effect = lambda operation: operation.result() + common_kwargs = { + "project_id": GCP_PROJECT, + "location": GCP_LOCATION, + "feature_online_store_id": FEATURE_ONLINE_STORE_ID, + "feature_view_id": FEATURE_VIEW_ID, + "metadata": (), + "timeout": 100, + "retry": None, + } + op = DeleteFeatureViewOperator( + task_id=TASK_ID, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + **common_kwargs, + ) + response = op.execute(context={"ti": mock.MagicMock()}) + # Verify hook initialization + mock_hook_class.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + # Verify hook method call + mock_hook.delete_feature_view.assert_called_once_with(**common_kwargs) + # Verify response matches expected value + assert response == {"result": f"The {FEATURE_VIEW_ID} has been deleted."}