Skip to content

Commit

Permalink
Use RetrievalJob instead of creating a new OfflineJob object
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Delacour <matt.delacour@shopify.com>
  • Loading branch information
Matt Delacour committed Jun 28, 2021
1 parent 9df3864 commit fa5551b
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 50 deletions.
27 changes: 7 additions & 20 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
from feast.data_source import BigQuerySource, DataSource
from feast.errors import FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import (
OfflineJob,
OfflineStore,
RetrievalJob,
)
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.provider import (
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
_get_requested_feature_views_to_features_dict,
Expand Down Expand Up @@ -63,7 +59,7 @@ def pull_latest_from_table_or_query(
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
) -> OfflineJob:
) -> RetrievalJob:
assert isinstance(data_source, BigQuerySource)
from_expression = data_source.get_table_query_string()

Expand All @@ -78,6 +74,7 @@ def pull_latest_from_table_or_query(
timestamp_desc_string = " DESC, ".join(timestamps) + " DESC"
field_string = ", ".join(join_key_columns + feature_name_columns + timestamps)

client = _get_bigquery_client(project=config.offline_store.project_id)
query = f"""
SELECT {field_string}
FROM (
Expand All @@ -88,8 +85,7 @@ def pull_latest_from_table_or_query(
)
WHERE _feast_row = 1
"""

return BigQueryOfflineJob(query=query, config=config)
return BigQueryRetrievalJob(query=query, client=client, config=config)

@staticmethod
def get_historical_features(
Expand Down Expand Up @@ -225,15 +221,6 @@ def _infer_event_timestamp_from_dataframe(entity_df: pandas.DataFrame) -> str:
)


class BigQueryOfflineJob(OfflineJob):
def __init__(self, query: str, config: RepoConfig):
self.query = query
self.client = _get_bigquery_client(project=config.offline_store.project_id)

def to_table(self) -> pyarrow.Table:
return self.client.query(self.query).to_arrow()


class BigQueryRetrievalJob(RetrievalJob):
def __init__(self, query, client, config):
self.query = query
Expand Down Expand Up @@ -269,9 +256,6 @@ def _block_until_done():
if not job_config:
today = date.today().strftime("%Y%m%d")
rand_id = str(uuid.uuid4())[:7]
dataset_project = (
self.config.offline_store.project_id or self.client.project
)
path = f"{self.client.project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
job_config = bigquery.QueryJobConfig(destination=path)

Expand All @@ -291,6 +275,9 @@ def _block_until_done():
print(f"Done writing to '{job_config.destination}'.")
return str(job_config.destination)

def to_table(self) -> pyarrow.Table:
return self.client.query(self.query).to_arrow()


@dataclass(frozen=True)
class FeatureViewQueryContext:
Expand Down
30 changes: 9 additions & 21 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@
from feast.data_source import DataSource, FileSource
from feast.errors import FeastJoinKeysDuringMaterialization
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import (
OfflineJob,
OfflineStore,
RetrievalJob,
)
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.provider import (
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
_get_requested_feature_views_to_features_dict,
Expand All @@ -30,19 +26,6 @@ class FileOfflineStoreConfig(FeastConfigBaseModel):
""" Offline store type selector"""


class FileOfflineJob(OfflineJob):
def __init__(self, evaluation_function: Callable):
"""Initialize a lazy historical retrieval job"""

# The evaluation function executes a stored procedure to compute a historical retrieval.
self.evaluation_function = evaluation_function

def to_table(self):
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
df = self.evaluation_function()
return pyarrow.Table.from_pandas(df)


class FileRetrievalJob(RetrievalJob):
def __init__(self, evaluation_function: Callable):
"""Initialize a lazy historical retrieval job"""
Expand All @@ -55,6 +38,11 @@ def to_df(self):
df = self.evaluation_function()
return df

def to_table(self):
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
df = self.evaluation_function()
return pyarrow.Table.from_pandas(df)


class FileOfflineStore(OfflineStore):
@staticmethod
Expand All @@ -65,7 +53,7 @@ def get_historical_features(
entity_df: Union[pd.DataFrame, str],
registry: Registry,
project: str,
) -> FileRetrievalJob:
) -> RetrievalJob:
if not isinstance(entity_df, pd.DataFrame):
raise ValueError(
f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}"
Expand Down Expand Up @@ -230,7 +218,7 @@ def pull_latest_from_table_or_query(
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
) -> FileOfflineJob:
) -> RetrievalJob:
assert isinstance(data_source, FileSource)

# Create lazy function that is only called from the RetrievalJob object
Expand Down Expand Up @@ -274,4 +262,4 @@ def evaluate_offline_job():
)
return last_values_df[columns_to_extract]

return FileOfflineJob(evaluation_function=evaluate_offline_job)
return FileRetrievalJob(evaluation_function=evaluate_offline_job)
10 changes: 3 additions & 7 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,13 @@ class RetrievalJob(ABC):
"""RetrievalJob is used to manage the execution of a historical feature retrieval"""

@abstractmethod
def to_df(self):
def to_df(self) -> pd.DataFrame:
"""Return dataset as Pandas DataFrame synchronously"""
pass


class OfflineJob(ABC):
"""OfflineJob is used to manage the execution of a specific logic of the offline store"""

@abstractmethod
def to_table(self) -> pyarrow.Table:
"""Return dataset as Pandas DataFrame synchronously"""
"""Return dataset as pyarrow Table synchronously"""
pass


Expand All @@ -59,7 +55,7 @@ def pull_latest_from_table_or_query(
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
) -> OfflineJob:
) -> RetrievalJob:
"""
Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column
have all already been mapped to column names of the source table and those column names are the values passed
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import List, Optional, Union

import pandas as pd
import pyarrow
from pydantic import StrictStr
from pydantic.typing import Literal

Expand Down Expand Up @@ -38,14 +37,15 @@ class RedshiftOfflineStoreConfig(FeastConfigBaseModel):
class RedshiftOfflineStore(OfflineStore):
@staticmethod
def pull_latest_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
) -> pyarrow.Table:
) -> RetrievalJob:
pass

@staticmethod
Expand Down

0 comments on commit fa5551b

Please sign in to comment.