Add notion of OfflineJob
Signed-off-by: Matt Delacour <matt.delacour@shopify.com>
Tsotne Tabidze authored and Matt Delacour committed Jun 28, 2021
1 parent b9dd955 commit 9df3864
Showing 8 changed files with 114 additions and 64 deletions.
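In outline: pull_latest_from_table_or_query no longer returns a pyarrow.Table eagerly but an OfflineJob handle whose to_table() performs the actual work, and RepoConfig is now threaded through so each store can construct its own client. A standalone toy of the new control flow (Feast classes are stubbed here; only the shape of the API matches the diff):

    import pandas as pd
    import pyarrow

    class StubOfflineJob:  # stand-in for the new OfflineJob ABC
        def __init__(self, fn):
            self._fn = fn

        def to_table(self) -> pyarrow.Table:
            # Work happens here, not when the job is constructed.
            return pyarrow.Table.from_pandas(self._fn())

    def pull_latest_from_table_or_query() -> StubOfflineJob:
        # Returns a cheap handle; no I/O is performed yet.
        return StubOfflineJob(lambda: pd.DataFrame({"driver_id": [1], "conv_rate": [0.9]}))

    job = pull_latest_from_table_or_query()  # before this commit: an eager pyarrow.Table
    table = job.to_table()                   # now: materialization happens on demand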
26 changes: 14 additions & 12 deletions sdk/python/feast/feature_store.py
@@ -386,12 +386,13 @@ def tqdm_builder(length):
end_date = utils.make_tzaware(end_date)

provider.materialize_single_feature_view(
-feature_view,
-start_date,
-end_date,
-self._registry,
-self.project,
-tqdm_builder,
+config=self.config,
+feature_view=feature_view,
+start_date=start_date,
+end_date=end_date,
+registry=self._registry,
+project=self.project,
+tqdm_builder=tqdm_builder,
)

self._registry.apply_materialization(
@@ -464,12 +465,13 @@ def tqdm_builder(length):
end_date = utils.make_tzaware(end_date)

provider.materialize_single_feature_view(
-feature_view,
-start_date,
-end_date,
-self._registry,
-self.project,
-tqdm_builder,
+config=self.config,
+feature_view=feature_view,
+start_date=start_date,
+end_date=end_date,
+registry=self._registry,
+project=self.project,
+tqdm_builder=tqdm_builder,
)

self._registry.apply_materialization(
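Note the call sites also switch from positional to keyword arguments, which keeps them readable and robust now that a leading config parameter has been inserted. A hypothetical micro-example (not Feast code) of the mis-binding that positional calls would invite:

    def materialize(config, feature_view, start_date, end_date):
        return config, feature_view

    # A stale positional call written against the old order,
    # materialize(fv, start, end), would now bind fv to config and
    # start to feature_view before failing with a confusing TypeError.
    materialize(config="cfg", feature_view="fv", start_date=0, end_date=1)  # unambiguous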
5 changes: 4 additions & 1 deletion sdk/python/feast/infra/gcp.py
@@ -81,6 +81,7 @@ def online_read(

def materialize_single_feature_view(
self,
+config: RepoConfig,
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
@@ -99,7 +100,8 @@ def materialize_single_feature_view(
created_timestamp_column,
) = _get_column_names(feature_view, entities)

-table = self.offline_store.pull_latest_from_table_or_query(
+offline_job = self.offline_store.pull_latest_from_table_or_query(
+config=config,
data_source=feature_view.input,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
@@ -108,6 +110,7 @@
start_date=start_date,
end_date=end_date,
)
+table = offline_job.to_table()

if feature_view.input.field_mapping is not None:
table = _run_field_mapping(table, feature_view.input.field_mapping)
5 changes: 4 additions & 1 deletion sdk/python/feast/infra/local.py
@@ -80,6 +80,7 @@ def online_read(

def materialize_single_feature_view(
self,
+config: RepoConfig,
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
@@ -98,15 +99,17 @@
created_timestamp_column,
) = _get_column_names(feature_view, entities)

-table = self.offline_store.pull_latest_from_table_or_query(
+offline_job = self.offline_store.pull_latest_from_table_or_query(
data_source=feature_view.input,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
event_timestamp_column=event_timestamp_column,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
+config=config,
)
+table = offline_job.to_table()

if feature_view.input.field_mapping is not None:
table = _run_field_mapping(table, feature_view.input.field_mapping)
40 changes: 23 additions & 17 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -15,10 +15,13 @@
from feast.data_source import BigQuerySource, DataSource
from feast.errors import FeastProviderLoginError
from feast.feature_view import FeatureView
-from feast.infra.offline_stores.offline_store import OfflineStore
+from feast.infra.offline_stores.offline_store import (
+    OfflineJob,
+    OfflineStore,
+    RetrievalJob,
+)
from feast.infra.provider import (
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
-RetrievalJob,
_get_requested_feature_views_to_features_dict,
)
from feast.registry import Registry
@@ -52,14 +55,15 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
class BigQueryOfflineStore(OfflineStore):
@staticmethod
def pull_latest_from_table_or_query(
+config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
-) -> pyarrow.Table:
+) -> OfflineJob:
assert isinstance(data_source, BigQuerySource)
from_expression = data_source.get_table_query_string()

@@ -85,13 +89,7 @@ def pull_latest_from_table_or_query(
WHERE _feast_row = 1
"""

-return BigQueryOfflineStore._pull_query(query)
-
-@staticmethod
-def _pull_query(query: str) -> pyarrow.Table:
-    client = _get_bigquery_client()
-    query_job = client.query(query)
-    return query_job.to_arrow()
+return BigQueryOfflineJob(query=query, config=config)

@staticmethod
def get_historical_features(
@@ -103,19 +101,18 @@ def get_historical_features(
project: str,
) -> RetrievalJob:
# TODO: Add entity_df validation in order to fail before interacting with BigQuery
+assert isinstance(config.offline_store, BigQueryOfflineStoreConfig)

-client = _get_bigquery_client()

+client = _get_bigquery_client(project=config.offline_store.project_id)
expected_join_keys = _get_join_keys(project, feature_views, registry)

-assert isinstance(config.offline_store, BigQueryOfflineStoreConfig)
-dataset_project = config.offline_store.project_id or client.project

table = _upload_entity_df_into_bigquery(
client=client,
project=config.project,
dataset_name=config.offline_store.dataset,
-dataset_project=dataset_project,
+dataset_project=client.project,
entity_df=entity_df,
)

@@ -228,6 +225,15 @@ def _infer_event_timestamp_from_dataframe(entity_df: pandas.DataFrame) -> str:
)


+class BigQueryOfflineJob(OfflineJob):
+    def __init__(self, query: str, config: RepoConfig):
+        self.query = query
+        self.client = _get_bigquery_client(project=config.offline_store.project_id)
+
+    def to_table(self) -> pyarrow.Table:
+        return self.client.query(self.query).to_arrow()


class BigQueryRetrievalJob(RetrievalJob):
def __init__(self, query, client, config):
self.query = query
@@ -266,7 +272,7 @@ def _block_until_done():
dataset_project = (
self.config.offline_store.project_id or self.client.project
)
-path = f"{dataset_project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
+path = f"{self.client.project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
job_config = bigquery.QueryJobConfig(destination=path)

bq_job = self.client.query(self.query, job_config=job_config)
@@ -446,9 +452,9 @@ def build_point_in_time_query(
return query


-def _get_bigquery_client():
+def _get_bigquery_client(project: Optional[str] = None):
try:
-client = bigquery.Client()
+client = bigquery.Client(project=project)
except DefaultCredentialsError as e:
raise FeastProviderLoginError(
str(e)
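Constructing a BigQueryOfflineJob only records the SQL and builds a client; the query is submitted when to_table() runs. A hedged usage sketch (assumes GCP credentials and a valid RepoConfig; the query text is a placeholder):

    job = BigQueryOfflineJob(query="SELECT 1 AS dummy", config=repo_config)  # nothing submitted yet
    table = job.to_table()  # client.query(...) executes here and blocks for the result

Note also that the client is now created with config.offline_store.project_id, which is why the destination path above can simply use self.client.project.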
88 changes: 56 additions & 32 deletions sdk/python/feast/infra/offline_stores/file.py
@@ -9,7 +9,11 @@
from feast.data_source import DataSource, FileSource
from feast.errors import FeastJoinKeysDuringMaterialization
from feast.feature_view import FeatureView
-from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
+from feast.infra.offline_stores.offline_store import (
+    OfflineJob,
+    OfflineStore,
+    RetrievalJob,
+)
from feast.infra.provider import (
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
_get_requested_feature_views_to_features_dict,
@@ -26,6 +30,19 @@ class FileOfflineStoreConfig(FeastConfigBaseModel):
""" Offline store type selector"""


+class FileOfflineJob(OfflineJob):
+    def __init__(self, evaluation_function: Callable):
+        """Initialize a lazy historical retrieval job"""
+
+        # The evaluation function executes a stored procedure to compute a historical retrieval.
+        self.evaluation_function = evaluation_function
+
+    def to_table(self):
+        # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
+        df = self.evaluation_function()
+        return pyarrow.Table.from_pandas(df)


class FileRetrievalJob(RetrievalJob):
def __init__(self, evaluation_function: Callable):
"""Initialize a lazy historical retrieval job"""
@@ -205,49 +222,56 @@ def evaluate_historical_retrieval():

@staticmethod
def pull_latest_from_table_or_query(
+config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
-) -> pyarrow.Table:
+) -> FileOfflineJob:
assert isinstance(data_source, FileSource)

-source_df = pd.read_parquet(data_source.path)
-# Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC
-source_df[event_timestamp_column] = source_df[event_timestamp_column].apply(
-    lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc)
-)
-if created_timestamp_column:
-    source_df[created_timestamp_column] = source_df[
-        created_timestamp_column
-    ].apply(lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc))
-
-source_columns = set(source_df.columns)
-if not set(join_key_columns).issubset(source_columns):
-    raise FeastJoinKeysDuringMaterialization(
-        data_source.path, set(join_key_columns), source_columns
+# Create lazy function that is only called from the RetrievalJob object
+def evaluate_offline_job():
+    source_df = pd.read_parquet(data_source.path)
+    # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC
+    source_df[event_timestamp_column] = source_df[event_timestamp_column].apply(
+        lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc)
+    )
+    if created_timestamp_column:
+        source_df[created_timestamp_column] = source_df[
+            created_timestamp_column
+        ].apply(
+            lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc)
+        )

-ts_columns = (
-    [event_timestamp_column, created_timestamp_column]
-    if created_timestamp_column
-    else [event_timestamp_column]
-)
+    source_columns = set(source_df.columns)
+    if not set(join_key_columns).issubset(source_columns):
+        raise FeastJoinKeysDuringMaterialization(
+            data_source.path, set(join_key_columns), source_columns
+        )

-source_df.sort_values(by=ts_columns, inplace=True)
+    ts_columns = (
+        [event_timestamp_column, created_timestamp_column]
+        if created_timestamp_column
+        else [event_timestamp_column]
+    )

-filtered_df = source_df[
-    (source_df[event_timestamp_column] >= start_date)
-    & (source_df[event_timestamp_column] < end_date)
-]
-last_values_df = filtered_df.drop_duplicates(
-    join_key_columns, keep="last", ignore_index=True
-)
+    source_df.sort_values(by=ts_columns, inplace=True)

+    filtered_df = source_df[
+        (source_df[event_timestamp_column] >= start_date)
+        & (source_df[event_timestamp_column] < end_date)
+    ]
+    last_values_df = filtered_df.drop_duplicates(
+        join_key_columns, keep="last", ignore_index=True
+    )

-columns_to_extract = set(join_key_columns + feature_name_columns + ts_columns)
-table = pyarrow.Table.from_pandas(last_values_df[columns_to_extract])
+    columns_to_extract = set(
+        join_key_columns + feature_name_columns + ts_columns
+    )
+    return last_values_df[columns_to_extract]

-return table
+return FileOfflineJob(evaluation_function=evaluate_offline_job)
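The file store gets the same deferral from a plain closure: pull_latest_from_table_or_query now just captures its arguments in evaluate_offline_job and hands that function to FileOfflineJob. A self-contained toy of the closure-plus-job pattern (illustrative names, not Feast code):

    from typing import Callable

    import pandas as pd
    import pyarrow

    class LazyJob:
        """Toy stand-in for FileOfflineJob: defers all work to to_table()."""

        def __init__(self, evaluation_function: Callable[[], pd.DataFrame]):
            self.evaluation_function = evaluation_function

        def to_table(self) -> pyarrow.Table:
            return pyarrow.Table.from_pandas(self.evaluation_function())

    def pull_latest(df: pd.DataFrame, key: str, ts: str) -> LazyJob:
        def evaluate() -> pd.DataFrame:
            # Runs only when to_table() is called.
            return df.sort_values(ts).drop_duplicates(key, keep="last")

        return LazyJob(evaluation_function=evaluate)

    job = pull_latest(pd.DataFrame({"k": [1, 1], "t": [1, 2]}), "k", "t")
    print(job.to_table())  # the sort/dedup happens here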
12 changes: 11 additions & 1 deletion sdk/python/feast/infra/offline_stores/offline_store.py
@@ -33,6 +33,15 @@ def to_df(self):
pass


+class OfflineJob(ABC):
+    """OfflineJob manages the execution of a specific piece of offline store logic"""
+
+    @abstractmethod
+    def to_table(self) -> pyarrow.Table:
+        """Return the dataset as a pyarrow Table synchronously"""
+        pass


class OfflineStore(ABC):
"""
OfflineStore is an object used for all interaction between Feast and the service used for offline storage of
Expand All @@ -42,14 +51,15 @@ class OfflineStore(ABC):
@staticmethod
@abstractmethod
def pull_latest_from_table_or_query(
+config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
-) -> pyarrow.Table:
+) -> OfflineJob:
"""
Note that join_key_columns, feature_name_columns, event_timestamp_column, and created_timestamp_column
have all already been mapped to column names of the source table and those column names are the values passed
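Under the revised contract, a custom offline store implements pull_latest_from_table_or_query to return its own OfflineJob subclass; each job decides what execution means (BigQuery submits a query, the file store calls a closure). A minimal hypothetical subclass (invented for illustration; the import path matches this commit's layout):

    import pandas as pd
    import pyarrow

    from feast.infra.offline_stores.offline_store import OfflineJob

    class InMemoryOfflineJob(OfflineJob):  # hypothetical, not part of this commit
        def __init__(self, df: pd.DataFrame):
            self.df = df

        def to_table(self) -> pyarrow.Table:
            return pyarrow.Table.from_pandas(self.df)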
1 change: 1 addition & 0 deletions sdk/python/feast/infra/provider.py
@@ -97,6 +97,7 @@ def online_write_batch(
@abc.abstractmethod
def materialize_single_feature_view(
self,
+config: RepoConfig,
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
1 change: 1 addition & 0 deletions sdk/python/tests/foo_provider.py
@@ -45,6 +45,7 @@ def online_write_batch(

def materialize_single_feature_view(
self,
+config: RepoConfig,
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
