From fa5551bade595f2b5a84e49de4a530ec34721ab1 Mon Sep 17 00:00:00 2001 From: Matt Delacour Date: Mon, 28 Jun 2021 15:46:57 -0400 Subject: [PATCH] Use RetrievalJob instead of creating a new OfflineJob object Signed-off-by: Matt Delacour --- .../feast/infra/offline_stores/bigquery.py | 27 +++++------------ sdk/python/feast/infra/offline_stores/file.py | 30 ++++++------------- .../infra/offline_stores/offline_store.py | 10 ++----- .../feast/infra/offline_stores/redshift.py | 4 +-- 4 files changed, 21 insertions(+), 50 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index d41c92f8cc..0c961a5048 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -15,11 +15,7 @@ from feast.data_source import BigQuerySource, DataSource from feast.errors import FeastProviderLoginError from feast.feature_view import FeatureView -from feast.infra.offline_stores.offline_store import ( - OfflineJob, - OfflineStore, - RetrievalJob, -) +from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob from feast.infra.provider import ( DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, _get_requested_feature_views_to_features_dict, @@ -63,7 +59,7 @@ def pull_latest_from_table_or_query( created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, - ) -> OfflineJob: + ) -> RetrievalJob: assert isinstance(data_source, BigQuerySource) from_expression = data_source.get_table_query_string() @@ -78,6 +74,7 @@ def pull_latest_from_table_or_query( timestamp_desc_string = " DESC, ".join(timestamps) + " DESC" field_string = ", ".join(join_key_columns + feature_name_columns + timestamps) + client = _get_bigquery_client(project=config.offline_store.project_id) query = f""" SELECT {field_string} FROM ( @@ -88,8 +85,7 @@ def pull_latest_from_table_or_query( ) WHERE _feast_row = 1 """ - - return BigQueryOfflineJob(query=query, config=config) + return BigQueryRetrievalJob(query=query, client=client, config=config) @staticmethod def get_historical_features( @@ -225,15 +221,6 @@ def _infer_event_timestamp_from_dataframe(entity_df: pandas.DataFrame) -> str: ) -class BigQueryOfflineJob(OfflineJob): - def __init__(self, query: str, config: RepoConfig): - self.query = query - self.client = _get_bigquery_client(project=config.offline_store.project_id) - - def to_table(self) -> pyarrow.Table: - return self.client.query(self.query).to_arrow() - - class BigQueryRetrievalJob(RetrievalJob): def __init__(self, query, client, config): self.query = query @@ -269,9 +256,6 @@ def _block_until_done(): if not job_config: today = date.today().strftime("%Y%m%d") rand_id = str(uuid.uuid4())[:7] - dataset_project = ( - self.config.offline_store.project_id or self.client.project - ) path = f"{self.client.project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}" job_config = bigquery.QueryJobConfig(destination=path) @@ -291,6 +275,9 @@ def _block_until_done(): print(f"Done writing to '{job_config.destination}'.") return str(job_config.destination) + def to_table(self) -> pyarrow.Table: + return self.client.query(self.query).to_arrow() + @dataclass(frozen=True) class FeatureViewQueryContext: diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index bb81bf842b..c61162f81f 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -9,11 +9,7 @@ from feast.data_source import DataSource, FileSource from feast.errors import FeastJoinKeysDuringMaterialization from feast.feature_view import FeatureView -from feast.infra.offline_stores.offline_store import ( - OfflineJob, - OfflineStore, - RetrievalJob, -) +from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob from feast.infra.provider import ( DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, _get_requested_feature_views_to_features_dict, @@ -30,19 +26,6 @@ class FileOfflineStoreConfig(FeastConfigBaseModel): """ Offline store type selector""" -class FileOfflineJob(OfflineJob): - def __init__(self, evaluation_function: Callable): - """Initialize a lazy historical retrieval job""" - - # The evaluation function executes a stored procedure to compute a historical retrieval. - self.evaluation_function = evaluation_function - - def to_table(self): - # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment. - df = self.evaluation_function() - return pyarrow.Table.from_pandas(df) - - class FileRetrievalJob(RetrievalJob): def __init__(self, evaluation_function: Callable): """Initialize a lazy historical retrieval job""" @@ -55,6 +38,11 @@ def to_df(self): df = self.evaluation_function() return df + def to_table(self): + # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment. + df = self.evaluation_function() + return pyarrow.Table.from_pandas(df) + class FileOfflineStore(OfflineStore): @staticmethod @@ -65,7 +53,7 @@ def get_historical_features( entity_df: Union[pd.DataFrame, str], registry: Registry, project: str, - ) -> FileRetrievalJob: + ) -> RetrievalJob: if not isinstance(entity_df, pd.DataFrame): raise ValueError( f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}" @@ -230,7 +218,7 @@ def pull_latest_from_table_or_query( created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, - ) -> FileOfflineJob: + ) -> RetrievalJob: assert isinstance(data_source, FileSource) # Create lazy function that is only called from the RetrievalJob object @@ -274,4 +262,4 @@ def evaluate_offline_job(): ) return last_values_df[columns_to_extract] - return FileOfflineJob(evaluation_function=evaluate_offline_job) + return FileRetrievalJob(evaluation_function=evaluate_offline_job) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index acd0b78f8a..6e2394b44b 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -28,17 +28,13 @@ class RetrievalJob(ABC): """RetrievalJob is used to manage the execution of a historical feature retrieval""" @abstractmethod - def to_df(self): + def to_df(self) -> pd.DataFrame: """Return dataset as Pandas DataFrame synchronously""" pass - -class OfflineJob(ABC): - """OfflineJob is used to manage the execution of a specific logic of the offline store""" - @abstractmethod def to_table(self) -> pyarrow.Table: - """Return dataset as Pandas DataFrame synchronously""" + """Return dataset as pyarrow Table synchronously""" pass @@ -59,7 +55,7 @@ def pull_latest_from_table_or_query( created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, - ) -> OfflineJob: + ) -> RetrievalJob: """ Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column have all already been mapped to column names of the source table and those column names are the values passed diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 06a437564a..f15b9af451 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -2,7 +2,6 @@ from typing import List, Optional, Union import pandas as pd -import pyarrow from pydantic import StrictStr from pydantic.typing import Literal @@ -38,6 +37,7 @@ class RedshiftOfflineStoreConfig(FeastConfigBaseModel): class RedshiftOfflineStore(OfflineStore): @staticmethod def pull_latest_from_table_or_query( + config: RepoConfig, data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], @@ -45,7 +45,7 @@ def pull_latest_from_table_or_query( created_timestamp_column: Optional[str], start_date: datetime, end_date: datetime, - ) -> pyarrow.Table: + ) -> RetrievalJob: pass @staticmethod