Skip to content

Commit

Permalink
feat: Scaffold for unified push api (feast-dev#2796)
Browse files Browse the repository at this point in the history
* Scaffolding for offline store push

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix api

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fixed

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba authored Jun 15, 2022
1 parent bdecb10 commit 1bd0930
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 12 deletions.
22 changes: 22 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,28 @@ def write_to_online_store(
provider = self._get_provider()
provider.ingest_df(feature_view, entities, df)

@log_exceptions_and_usage
def write_to_offline_store(
    self,
    feature_view_name: str,
    df: pd.DataFrame,
    allow_registry_cache: bool = True,
):
    """
    Persists the dataframe directly into the offline store for the named feature view.

    The feature view is resolved by name: a stream feature view is tried first, and
    if none exists under that name, a regular feature view is used instead. The
    actual write is delegated to the provider's ``ingest_df_to_offline_store``.

    Args:
        feature_view_name: Name of the (stream) feature view to write to.
        df: DataFrame containing the feature rows to persist.
        allow_registry_cache: Whether the registry lookup may use its cache.
    """
    # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
    try:
        # Prefer a stream feature view with this name, if one is registered.
        feature_view = self.get_stream_feature_view(
            feature_view_name, allow_registry_cache=allow_registry_cache
        )
    except FeatureViewNotFoundException:
        # Fall back to a regular feature view of the same name.
        feature_view = self.get_feature_view(
            feature_view_name, allow_registry_cache=allow_registry_cache
        )
    provider = self._get_provider()
    provider.ingest_df_to_offline_store(feature_view, df)

@log_exceptions_and_usage
def get_online_features(
self,
Expand Down
24 changes: 23 additions & 1 deletion sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Union
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Union

import pandas as pd
import pyarrow
Expand Down Expand Up @@ -270,3 +270,25 @@ def write_logged_features(
This is an optional method that could be supported only be some stores.
"""
raise NotImplementedError()

@staticmethod
def offline_write_batch(
    config: RepoConfig,
    table: FeatureView,
    data: pd.DataFrame,
    progress: Optional[Callable[[int], Any]],
):
    """
    Write features to a specified destination in the offline store.

    Data can be appended to an existing table (destination) or a new one will be
    created automatically (if it doesn't exist). Hence, this function can be called
    repeatedly with the same destination config to write features.

    This is an optional method; concrete offline stores that do not support direct
    batch writes leave it unimplemented.

    Args:
        config: Repo configuration object.
        table: FeatureView to write the data to.
        data: DataFrame containing feature data and timestamp column for
            historical feature retrieval.
        progress: Optional function to be called once every mini-batch of rows is
            written to the offline store. Can be used to display progress.

    Raises:
        NotImplementedError: Always, in this base class; subclasses may override.
    """
    raise NotImplementedError()
17 changes: 14 additions & 3 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas
import pandas as pd
import pyarrow
import pyarrow as pa
from tqdm import tqdm
Expand Down Expand Up @@ -100,6 +100,17 @@ def online_write_batch(
if self.online_store:
self.online_store.online_write_batch(config, table, data, progress)

def offline_write_batch(
    self,
    config: RepoConfig,
    table: FeatureView,
    data: pd.DataFrame,
    progress: Optional[Callable[[int], Any]],
) -> None:
    """
    Writes a batch of feature rows to the configured offline store.

    Records a usage attribute for telemetry, then delegates to the underlying
    offline store's ``offline_write_batch``. A no-op when no offline store is
    configured.

    Args:
        config: Repo configuration object.
        table: FeatureView to write the data to.
        data: DataFrame containing the feature rows to write.
        progress: Optional callback invoked per mini-batch of written rows.
    """
    set_usage_attribute("provider", self.__class__.__name__)
    if self.offline_store:
        self.offline_store.offline_write_batch(config, table, data, progress)

@log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001))
def online_read(
self,
Expand All @@ -117,7 +128,7 @@ def online_read(
return result

def ingest_df(
self, feature_view: FeatureView, entities: List[Entity], df: pandas.DataFrame,
self, feature_view: FeatureView, entities: List[Entity], df: pd.DataFrame,
):
set_usage_attribute("provider", self.__class__.__name__)
table = pa.Table.from_pandas(df)
Expand Down Expand Up @@ -193,7 +204,7 @@ def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pandas.DataFrame, str],
entity_df: Union[pd.DataFrame, str],
registry: BaseRegistry,
project: str,
full_feature_names: bool,
Expand Down
24 changes: 16 additions & 8 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import dask.dataframe as dd
import pandas
import pandas as pd
import pyarrow
from tqdm import tqdm

Expand Down Expand Up @@ -119,13 +119,21 @@ def online_write_batch(
...

def ingest_df(
    self, feature_view: FeatureView, entities: List[Entity], df: pd.DataFrame,
):
    """
    Ingests a DataFrame directly into the online store.

    Optional hook: this base implementation is intentionally a no-op; providers
    that support direct online ingestion override it.

    Args:
        feature_view: The feature view the rows belong to.
        entities: Entities associated with the feature view.
        df: DataFrame of feature rows to ingest.
    """
    pass

def ingest_df_to_offline_store(
    self, feature_view: FeatureView, df: pd.DataFrame,
):
    """
    Ingests a DataFrame directly into the offline store.

    Optional hook: this base implementation is intentionally a no-op; providers
    that support direct offline ingestion override it.

    Args:
        feature_view: The feature view the rows belong to.
        df: DataFrame of feature rows to ingest.
    """
    pass

@abc.abstractmethod
def materialize_single_feature_view(
self,
Expand All @@ -145,7 +153,7 @@ def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pandas.DataFrame, str],
entity_df: Union[pd.DataFrame, str],
registry: BaseRegistry,
project: str,
full_feature_names: bool,
Expand Down Expand Up @@ -367,14 +375,14 @@ def _run_dask_field_mapping(

def _coerce_datetime(ts):
"""
Depending on underlying time resolution, arrow to_pydict() sometimes returns pandas
Depending on underlying time resolution, arrow to_pydict() sometimes returns pd
timestamp type (for nanosecond resolution), and sometimes you get standard python datetime
(for microsecond resolution).
While pandas timestamp class is a subclass of python datetime, it doesn't always behave the
While pd timestamp class is a subclass of python datetime, it doesn't always behave the
same way. We convert it to normal datetime so that consumers downstream don't have to deal
with these quirks.
"""
if isinstance(ts, pandas.Timestamp):
if isinstance(ts, pd.Timestamp):
return ts.to_pydatetime()
else:
return ts
Expand Down Expand Up @@ -418,7 +426,7 @@ def _convert_arrow_to_proto(
# Convert event_timestamps
event_timestamps = [
_coerce_datetime(val)
for val in pandas.to_datetime(
for val in pd.to_datetime(
table.column(feature_view.batch_source.timestamp_field).to_numpy(
zero_copy_only=False
)
Expand All @@ -429,7 +437,7 @@ def _convert_arrow_to_proto(
if feature_view.batch_source.created_timestamp_column:
created_timestamps = [
_coerce_datetime(val)
for val in pandas.to_datetime(
for val in pd.to_datetime(
table.column(
feature_view.batch_source.created_timestamp_column
).to_numpy(zero_copy_only=False)
Expand Down

0 comments on commit 1bd0930

Please sign in to comment.