diff --git a/sdk/python/feast/infra/materialization/__init__.py b/sdk/python/feast/infra/materialization/__init__.py new file mode 100644 index 0000000000..6be653b26e --- /dev/null +++ b/sdk/python/feast/infra/materialization/__init__.py @@ -0,0 +1,13 @@ +from .batch_materialization_engine import ( + BatchMaterializationEngine, + MaterializationJob, + MaterializationTask, +) +from .local_engine import LocalMaterializationEngine + +__all__ = [ + "MaterializationJob", + "MaterializationTask", + "BatchMaterializationEngine", + "LocalMaterializationEngine", +] diff --git a/sdk/python/feast/infra/materialization/batch_materialization_engine.py b/sdk/python/feast/infra/materialization/batch_materialization_engine.py new file mode 100644 index 0000000000..773c685d6e --- /dev/null +++ b/sdk/python/feast/infra/materialization/batch_materialization_engine.py @@ -0,0 +1,122 @@ +import enum +from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import datetime +from typing import Callable, List, Optional, Sequence, Union + +from tqdm import tqdm + +from feast.batch_feature_view import BatchFeatureView +from feast.entity import Entity +from feast.feature_view import FeatureView +from feast.infra.offline_stores.offline_store import OfflineStore +from feast.infra.online_stores.online_store import OnlineStore +from feast.registry import BaseRegistry +from feast.repo_config import RepoConfig +from feast.stream_feature_view import StreamFeatureView + + +@dataclass +class MaterializationTask: + """ + A MaterializationTask represents a unit of data that needs to be materialized from an + offline store to an online store. + """ + + project: str + feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView] + start_time: datetime + end_time: datetime + tqdm_builder: Callable[[int], tqdm] + + +class MaterializationJobStatus(enum.Enum): + WAITING = 1 + RUNNING = 2 + AVAILABLE = 3 + ERROR = 4 + CANCELLING = 5 + CANCELLED = 6 + SUCCEEDED = 7 + + +class MaterializationJob(ABC): + """ + MaterializationJob represents an ongoing or executed process that materializes data as per the + definition of a materialization task. + """ + + task: MaterializationTask + + @abstractmethod + def status(self) -> MaterializationJobStatus: + ... + + @abstractmethod + def error(self) -> Optional[BaseException]: + ... + + @abstractmethod + def should_be_retried(self) -> bool: + ... + + @abstractmethod + def job_id(self) -> str: + ... + + @abstractmethod + def url(self) -> Optional[str]: + ... + + +class BatchMaterializationEngine(ABC): + def __init__( + self, + *, + repo_config: RepoConfig, + offline_store: OfflineStore, + online_store: OnlineStore, + **kwargs, + ): + self.repo_config = repo_config + self.offline_store = offline_store + self.online_store = online_store + + @abstractmethod + def update( + self, + project: str, + views_to_delete: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + views_to_keep: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + ): + """This method ensures that any necessary infrastructure or resources needed by the + engine are set up ahead of materialization.""" + + @abstractmethod + def materialize( + self, registry: BaseRegistry, tasks: List[MaterializationTask] + ) -> List[MaterializationJob]: + """ + Materialize data from the offline store to the online store for this feature repo. + Args: + registry: The feast registry containing the applied feature views. + tasks: A list of individual materialization tasks. + Returns: + A list of materialization jobs representing each task. + """ + ... + + @abstractmethod + def teardown_infra( + self, + project: str, + fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]], + entities: Sequence[Entity], + ): + """This method ensures that any infrastructure or resources set up by ``update()``are torn down.""" diff --git a/sdk/python/feast/infra/materialization/local_engine.py b/sdk/python/feast/infra/materialization/local_engine.py new file mode 100644 index 0000000000..4f775981ef --- /dev/null +++ b/sdk/python/feast/infra/materialization/local_engine.py @@ -0,0 +1,185 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Callable, List, Literal, Optional, Sequence, Union + +from tqdm import tqdm + +from feast.batch_feature_view import BatchFeatureView +from feast.entity import Entity +from feast.feature_view import FeatureView +from feast.infra.offline_stores.offline_store import OfflineStore +from feast.infra.online_stores.online_store import OnlineStore +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.stream_feature_view import StreamFeatureView + +from ...registry import BaseRegistry +from ...utils import ( + _convert_arrow_to_proto, + _get_column_names, + _run_pyarrow_field_mapping, +) +from .batch_materialization_engine import ( + BatchMaterializationEngine, + MaterializationJob, + MaterializationJobStatus, + MaterializationTask, +) + +DEFAULT_BATCH_SIZE = 10_000 + + +class LocalMaterializationEngineConfig(FeastConfigBaseModel): + """Batch Materialization Engine config for local in-process engine""" + + type: Literal["local"] = "local" + """ Type selector""" + + +@dataclass +class LocalMaterializationJob(MaterializationJob): + def __init__( + self, + job_id: str, + status: MaterializationJobStatus, + error: Optional[BaseException] = None, + ) -> None: + super().__init__() + self._job_id: str = job_id + self._status: MaterializationJobStatus = status + self._error: Optional[BaseException] = error + + def status(self) -> MaterializationJobStatus: + return self._status + + def error(self) -> Optional[BaseException]: + return self._error + + def should_be_retried(self) -> bool: + return False + + def job_id(self) -> str: + return self._job_id + + def url(self) -> Optional[str]: + return None + + +class LocalMaterializationEngine(BatchMaterializationEngine): + def update( + self, + project: str, + views_to_delete: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + views_to_keep: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + ): + # Nothing to set up. + pass + + def teardown_infra( + self, + project: str, + fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]], + entities: Sequence[Entity], + ): + # Nothing to tear down. + pass + + def __init__( + self, + *, + repo_config: RepoConfig, + offline_store: OfflineStore, + online_store: OnlineStore, + **kwargs, + ): + super().__init__( + repo_config=repo_config, + offline_store=offline_store, + online_store=online_store, + **kwargs, + ) + + def materialize( + self, registry, tasks: List[MaterializationTask] + ) -> List[MaterializationJob]: + return [ + self._materialize_one( + registry, + task.feature_view, + task.start_time, + task.end_time, + task.project, + task.tqdm_builder, + ) + for task in tasks + ] + + def _materialize_one( + self, + registry: BaseRegistry, + feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView], + start_date: datetime, + end_date: datetime, + project: str, + tqdm_builder: Callable[[int], tqdm], + ): + entities = [] + for entity_name in feature_view.entities: + entities.append(registry.get_entity(entity_name, project)) + + ( + join_key_columns, + feature_name_columns, + timestamp_field, + created_timestamp_column, + ) = _get_column_names(feature_view, entities) + + job_id = f"{feature_view.name}-{start_date}-{end_date}" + + try: + offline_job = self.offline_store.pull_latest_from_table_or_query( + config=self.repo_config, + data_source=feature_view.batch_source, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + start_date=start_date, + end_date=end_date, + ) + + table = offline_job.to_arrow() + + if feature_view.batch_source.field_mapping is not None: + table = _run_pyarrow_field_mapping( + table, feature_view.batch_source.field_mapping + ) + + join_key_to_value_type = { + entity.name: entity.dtype.to_value_type() + for entity in feature_view.entity_columns + } + + with tqdm_builder(table.num_rows) as pbar: + for batch in table.to_batches(DEFAULT_BATCH_SIZE): + rows_to_write = _convert_arrow_to_proto( + batch, feature_view, join_key_to_value_type + ) + self.online_store.online_write_batch( + self.repo_config, + feature_view, + rows_to_write, + lambda x: pbar.update(x), + ) + return LocalMaterializationJob( + job_id=job_id, status=MaterializationJobStatus.SUCCEEDED + ) + except BaseException as e: + return LocalMaterializationJob( + job_id=job_id, status=MaterializationJobStatus.ERROR, error=e + ) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 10012c2d80..d60d468174 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -29,14 +29,14 @@ DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, get_pyarrow_schema_from_batch_source, ) -from feast.infra.provider import ( - _get_requested_feature_views_to_features_dict, - _run_dask_field_mapping, -) from feast.registry import BaseRegistry from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage from feast.usage import log_exceptions_and_usage +from feast.utils import ( + _get_requested_feature_views_to_features_dict, + _run_dask_field_mapping, +) class FileOfflineStoreConfig(FeastConfigBaseModel): diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index abe8d4e4e5..8b963a864b 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -17,11 +17,10 @@ from feast.feature_view import FeatureView from feast.importer import import_class from feast.infra.offline_stores.offline_store import OfflineStore -from feast.infra.provider import _get_requested_feature_views_to_features_dict from feast.registry import BaseRegistry from feast.repo_config import RepoConfig from feast.type_map import feast_value_type_to_pa -from feast.utils import to_naive_utc +from feast.utils import _get_requested_feature_views_to_features_dict, to_naive_utc DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp" diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 8c6dd831dd..181d46a5a8 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -2,33 +2,42 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union import pandas as pd -import pyarrow import pyarrow as pa from tqdm import tqdm -from feast import FeatureService +from feast import importer +from feast.batch_feature_view import BatchFeatureView from feast.entity import Entity from feast.feature_logging import FeatureServiceLoggingSource +from feast.feature_service import FeatureService from feast.feature_view import FeatureView +from feast.infra.materialization import BatchMaterializationEngine, MaterializationTask +from feast.infra.materialization.batch_materialization_engine import ( + MaterializationJobStatus, +) from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.offline_stores.offline_utils import get_offline_store_from_config from feast.infra.online_stores.helpers import get_online_store_from_config -from feast.infra.provider import ( - Provider, - _convert_arrow_to_proto, - _get_column_names, - _run_field_mapping, -) +from feast.infra.provider import Provider from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import BaseRegistry from feast.repo_config import RepoConfig from feast.saved_dataset import SavedDataset +from feast.stream_feature_view import StreamFeatureView from feast.usage import RatioSampler, log_exceptions_and_usage, set_usage_attribute -from feast.utils import make_tzaware +from feast.utils import ( + _convert_arrow_to_proto, + _run_pyarrow_field_mapping, + make_tzaware, +) DEFAULT_BATCH_SIZE = 10_000 +BATCH_ENGINE_CLASS_FOR_TYPE = { + "local": "feast.infra.materialization.LocalMaterializationEngine", +} + class PassthroughProvider(Provider): """ @@ -41,6 +50,7 @@ def __init__(self, config: RepoConfig): self.repo_config = config self._offline_store = None self._online_store = None + self._batch_engine: Optional[BatchMaterializationEngine] = None @property def online_store(self): @@ -58,6 +68,46 @@ def offline_store(self): ) return self._offline_store + @property + def batch_engine(self) -> BatchMaterializationEngine: + if self._batch_engine: + return self._batch_engine + else: + engine_config = self.repo_config.batch_engine_config + config_is_dict = False + if isinstance(engine_config, str): + engine_config_type = engine_config + elif isinstance(engine_config, Dict): + if "type" not in engine_config: + raise ValueError("engine_config needs to have a `type` specified.") + engine_config_type = engine_config["type"] + config_is_dict = True + else: + raise RuntimeError( + f"Invalid config type specified for batch_engine: {type(engine_config)}" + ) + + if engine_config_type in BATCH_ENGINE_CLASS_FOR_TYPE: + engine_config_type = BATCH_ENGINE_CLASS_FOR_TYPE[engine_config_type] + engine_module, engine_class_name = engine_config_type.rsplit(".", 1) + engine_class = importer.import_class(engine_module, engine_class_name) + + if config_is_dict: + _batch_engine = engine_class( + repo_config=self.repo_config, + offline_store=self.offline_store, + online_store=self.online_store, + **engine_config, + ) + else: + _batch_engine = engine_class( + repo_config=self.repo_config, + offline_store=self.offline_store, + online_store=self.online_store, + ) + self._batch_engine = _batch_engine + return _batch_engine + def update_infra( self, project: str, @@ -137,7 +187,9 @@ def ingest_df( table = pa.Table.from_pandas(df) if feature_view.batch_source.field_mapping is not None: - table = _run_field_mapping(table, feature_view.batch_source.field_mapping) + table = _run_pyarrow_field_mapping( + table, feature_view.batch_source.field_mapping + ) join_keys = {entity.join_key: entity.value_type for entity in entities} rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) @@ -150,7 +202,9 @@ def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table) set_usage_attribute("provider", self.__class__.__name__) if feature_view.batch_source.field_mapping is not None: - table = _run_field_mapping(table, feature_view.batch_source.field_mapping) + table = _run_pyarrow_field_mapping( + table, feature_view.batch_source.field_mapping + ) self.offline_write_batch(self.repo_config, feature_view, table, None) @@ -165,50 +219,24 @@ def materialize_single_feature_view( tqdm_builder: Callable[[int], tqdm], ) -> None: set_usage_attribute("provider", self.__class__.__name__) - - entities = [] - for entity_name in feature_view.entities: - entities.append(registry.get_entity(entity_name, project)) - - ( - join_key_columns, - feature_name_columns, - timestamp_field, - created_timestamp_column, - ) = _get_column_names(feature_view, entities) - - offline_job = self.offline_store.pull_latest_from_table_or_query( - config=config, - data_source=feature_view.batch_source, - join_key_columns=join_key_columns, - feature_name_columns=feature_name_columns, - timestamp_field=timestamp_field, - created_timestamp_column=created_timestamp_column, - start_date=start_date, - end_date=end_date, + assert ( + isinstance(feature_view, BatchFeatureView) + or isinstance(feature_view, StreamFeatureView) + or isinstance(feature_view, FeatureView) + ), f"Unexpected type for {feature_view.name}: {type(feature_view)}" + task = MaterializationTask( + project=project, + feature_view=feature_view, + start_time=start_date, + end_time=end_date, + tqdm_builder=tqdm_builder, ) - - table = offline_job.to_arrow() - - if feature_view.batch_source.field_mapping is not None: - table = _run_field_mapping(table, feature_view.batch_source.field_mapping) - - join_key_to_value_type = { - entity.name: entity.dtype.to_value_type() - for entity in feature_view.entity_columns - } - - with tqdm_builder(table.num_rows) as pbar: - for batch in table.to_batches(DEFAULT_BATCH_SIZE): - rows_to_write = _convert_arrow_to_proto( - batch, feature_view, join_key_to_value_type - ) - self.online_write_batch( - self.repo_config, - feature_view, - rows_to_write, - lambda x: pbar.update(x), - ) + jobs = self.batch_engine.materialize(registry, [task]) + assert len(jobs) == 1 + if jobs[0].status() == MaterializationJobStatus.ERROR and jobs[0].error(): + e = jobs[0].error() + assert e + raise e def get_historical_features( self, @@ -260,7 +288,7 @@ def retrieve_saved_dataset( def write_feature_service_logs( self, feature_service: FeatureService, - logs: Union[pyarrow.Table, str], + logs: Union[pa.Table, str], config: RepoConfig, registry: BaseRegistry, ): diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index d2e37e69db..9695e4d736 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -1,29 +1,24 @@ import abc -from collections import defaultdict from datetime import datetime from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union -import dask.dataframe as dd import pandas as pd import pyarrow from tqdm import tqdm from feast import FeatureService, errors from feast.entity import Entity -from feast.feature_view import DUMMY_ENTITY_ID, FeatureView +from feast.feature_view import FeatureView from feast.importer import import_class from feast.infra.infra_object import Infra from feast.infra.offline_stores.offline_store import RetrievalJob -from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import BaseRegistry from feast.repo_config import RepoConfig from feast.saved_dataset import SavedDataset -from feast.type_map import python_values_to_proto_values -from feast.value_type import ValueType PROVIDERS_CLASS_FOR_TYPE = { "gcp": "feast.infra.gcp.GcpProvider", @@ -252,198 +247,3 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider: cls = import_class(module_name, class_name, "Provider") return cls(config) - - -def _get_requested_feature_views_to_features_dict( - feature_refs: List[str], - feature_views: List[FeatureView], - on_demand_feature_views: List[OnDemandFeatureView], -) -> Tuple[Dict[FeatureView, List[str]], Dict[OnDemandFeatureView, List[str]]]: - """Create a dict of FeatureView -> List[Feature] for all requested features. - Set full_feature_names to True to have feature names prefixed by their feature view name.""" - - feature_views_to_feature_map: Dict[FeatureView, List[str]] = defaultdict(list) - on_demand_feature_views_to_feature_map: Dict[ - OnDemandFeatureView, List[str] - ] = defaultdict(list) - - for ref in feature_refs: - ref_parts = ref.split(":") - feature_view_from_ref = ref_parts[0] - feature_from_ref = ref_parts[1] - - found = False - for fv in feature_views: - if fv.projection.name_to_use() == feature_view_from_ref: - found = True - feature_views_to_feature_map[fv].append(feature_from_ref) - for odfv in on_demand_feature_views: - if odfv.projection.name_to_use() == feature_view_from_ref: - found = True - on_demand_feature_views_to_feature_map[odfv].append(feature_from_ref) - - if not found: - raise ValueError(f"Could not find feature view from reference {ref}") - - return feature_views_to_feature_map, on_demand_feature_views_to_feature_map - - -def _get_column_names( - feature_view: FeatureView, entities: List[Entity] -) -> Tuple[List[str], List[str], str, Optional[str]]: - """ - If a field mapping exists, run it in reverse on the join keys, - feature names, event timestamp column, and created timestamp column - to get the names of the relevant columns in the offline feature store table. - - Returns: - Tuple containing the list of reverse-mapped join_keys, - reverse-mapped feature names, reverse-mapped event timestamp column, - and reverse-mapped created timestamp column that will be passed into - the query to the offline store. - """ - # if we have mapped fields, use the original field names in the call to the offline store - timestamp_field = feature_view.batch_source.timestamp_field - feature_names = [feature.name for feature in feature_view.features] - created_timestamp_column = feature_view.batch_source.created_timestamp_column - join_keys = [ - entity.join_key for entity in entities if entity.join_key != DUMMY_ENTITY_ID - ] - if feature_view.batch_source.field_mapping is not None: - reverse_field_mapping = { - v: k for k, v in feature_view.batch_source.field_mapping.items() - } - timestamp_field = ( - reverse_field_mapping[timestamp_field] - if timestamp_field in reverse_field_mapping.keys() - else timestamp_field - ) - created_timestamp_column = ( - reverse_field_mapping[created_timestamp_column] - if created_timestamp_column - and created_timestamp_column in reverse_field_mapping.keys() - else created_timestamp_column - ) - join_keys = [ - reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col - for col in join_keys - ] - feature_names = [ - reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col - for col in feature_names - ] - - # We need to exclude join keys and timestamp columns from the list of features, after they are mapped to - # their final column names via the `field_mapping` field of the source. - feature_names = [ - name - for name in feature_names - if name not in join_keys - and name != timestamp_field - and name != created_timestamp_column - ] - return ( - join_keys, - feature_names, - timestamp_field, - created_timestamp_column, - ) - - -def _run_field_mapping( - table: pyarrow.Table, field_mapping: Dict[str, str], -) -> pyarrow.Table: - # run field mapping in the forward direction - cols = table.column_names - mapped_cols = [ - field_mapping[col] if col in field_mapping.keys() else col for col in cols - ] - table = table.rename_columns(mapped_cols) - return table - - -def _run_dask_field_mapping( - table: dd.DataFrame, field_mapping: Dict[str, str], -): - if field_mapping: - # run field mapping in the forward direction - table = table.rename(columns=field_mapping) - table = table.persist() - - return table - - -def _coerce_datetime(ts): - """ - Depending on underlying time resolution, arrow to_pydict() sometimes returns pd - timestamp type (for nanosecond resolution), and sometimes you get standard python datetime - (for microsecond resolution). - While pd timestamp class is a subclass of python datetime, it doesn't always behave the - same way. We convert it to normal datetime so that consumers downstream don't have to deal - with these quirks. - """ - if isinstance(ts, pd.Timestamp): - return ts.to_pydatetime() - else: - return ts - - -def _convert_arrow_to_proto( - table: Union[pyarrow.Table, pyarrow.RecordBatch], - feature_view: FeatureView, - join_keys: Dict[str, ValueType], -) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: - # Avoid ChunkedArrays which guarentees `zero_copy_only` availiable. - if isinstance(table, pyarrow.Table): - table = table.to_batches()[0] - - columns = [ - (field.name, field.dtype.to_value_type()) for field in feature_view.features - ] + list(join_keys.items()) - - proto_values_by_column = { - column: python_values_to_proto_values( - table.column(column).to_numpy(zero_copy_only=False), value_type - ) - for column, value_type in columns - } - - entity_keys = [ - EntityKeyProto( - join_keys=join_keys, - entity_values=[proto_values_by_column[k][idx] for k in join_keys], - ) - for idx in range(table.num_rows) - ] - - # Serialize the features per row - feature_dict = { - feature.name: proto_values_by_column[feature.name] - for feature in feature_view.features - } - features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())] - - # Convert event_timestamps - event_timestamps = [ - _coerce_datetime(val) - for val in pd.to_datetime( - table.column(feature_view.batch_source.timestamp_field).to_numpy( - zero_copy_only=False - ) - ) - ] - - # Convert created_timestamps if they exist - if feature_view.batch_source.created_timestamp_column: - created_timestamps = [ - _coerce_datetime(val) - for val in pd.to_datetime( - table.column( - feature_view.batch_source.created_timestamp_column - ).to_numpy(zero_copy_only=False) - ) - ] - else: - created_timestamps = [None] * table.num_rows - - return list(zip(entity_keys, features, event_timestamps, created_timestamps)) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index b7cf1683dc..f315023ee1 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -120,6 +120,9 @@ class RepoConfig(FeastBaseModel): _offline_config: Any = Field(alias="offline_store") """ OfflineStoreConfig: Offline store configuration (optional depending on provider) """ + batch_engine_config: Any = Field(alias="batch_engine") + """ BatchMaterializationEngine: Batch materialization configuration (optional depending on provider)""" + feature_server: Optional[Any] """ FeatureServerConfig: Feature server configuration (optional depending on provider) """ @@ -155,6 +158,13 @@ def __init__(self, **data: Any): elif data["provider"] == "aws": self._online_config = "dynamodb" + self._batch_engine = None + if "batch_engine" in data: + self.batch_engine_config = data["batch_engine"] + else: + # Defaults to using local in-process materialization engine. + self.batch_engine_config = "local" + if isinstance(self.feature_server, Dict): self.feature_server = get_feature_server_config_from_type( self.feature_server["type"] diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index a40f423c53..9f18da38cd 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -1,8 +1,24 @@ +import typing +from collections import defaultdict from datetime import datetime +from typing import Dict, List, Optional, Tuple, Union +import pandas as pd +import pyarrow +from dask import dataframe as dd from dateutil.tz import tzlocal from pytz import utc +from feast.entity import Entity +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.type_map import python_values_to_proto_values +from feast.value_type import ValueType + +if typing.TYPE_CHECKING: + from feast.feature_view import FeatureView + from feast.on_demand_feature_view import OnDemandFeatureView + def make_tzaware(t: datetime) -> datetime: """We assume tz-naive datetimes are UTC""" @@ -24,3 +40,201 @@ def maybe_local_tz(t: datetime) -> datetime: return t.replace(tzinfo=tzlocal()) else: return t + + +def _get_requested_feature_views_to_features_dict( + feature_refs: List[str], + feature_views: List["FeatureView"], + on_demand_feature_views: List["OnDemandFeatureView"], +) -> Tuple[Dict["FeatureView", List[str]], Dict["OnDemandFeatureView", List[str]]]: + """Create a dict of FeatureView -> List[Feature] for all requested features. + Set full_feature_names to True to have feature names prefixed by their feature view name.""" + + feature_views_to_feature_map: Dict["FeatureView", List[str]] = defaultdict(list) + on_demand_feature_views_to_feature_map: Dict[ + "OnDemandFeatureView", List[str] + ] = defaultdict(list) + + for ref in feature_refs: + ref_parts = ref.split(":") + feature_view_from_ref = ref_parts[0] + feature_from_ref = ref_parts[1] + + found = False + for fv in feature_views: + if fv.projection.name_to_use() == feature_view_from_ref: + found = True + feature_views_to_feature_map[fv].append(feature_from_ref) + for odfv in on_demand_feature_views: + if odfv.projection.name_to_use() == feature_view_from_ref: + found = True + on_demand_feature_views_to_feature_map[odfv].append(feature_from_ref) + + if not found: + raise ValueError(f"Could not find feature view from reference {ref}") + + return feature_views_to_feature_map, on_demand_feature_views_to_feature_map + + +def _get_column_names( + feature_view: "FeatureView", entities: List[Entity] +) -> Tuple[List[str], List[str], str, Optional[str]]: + """ + If a field mapping exists, run it in reverse on the join keys, + feature names, event timestamp column, and created timestamp column + to get the names of the relevant columns in the offline feature store table. + + Returns: + Tuple containing the list of reverse-mapped join_keys, + reverse-mapped feature names, reverse-mapped event timestamp column, + and reverse-mapped created timestamp column that will be passed into + the query to the offline store. + """ + # if we have mapped fields, use the original field names in the call to the offline store + timestamp_field = feature_view.batch_source.timestamp_field + feature_names = [feature.name for feature in feature_view.features] + created_timestamp_column = feature_view.batch_source.created_timestamp_column + + from feast.feature_view import DUMMY_ENTITY_ID + + join_keys = [ + entity.join_key for entity in entities if entity.join_key != DUMMY_ENTITY_ID + ] + if feature_view.batch_source.field_mapping is not None: + reverse_field_mapping = { + v: k for k, v in feature_view.batch_source.field_mapping.items() + } + timestamp_field = ( + reverse_field_mapping[timestamp_field] + if timestamp_field in reverse_field_mapping.keys() + else timestamp_field + ) + created_timestamp_column = ( + reverse_field_mapping[created_timestamp_column] + if created_timestamp_column + and created_timestamp_column in reverse_field_mapping.keys() + else created_timestamp_column + ) + join_keys = [ + reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col + for col in join_keys + ] + feature_names = [ + reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col + for col in feature_names + ] + + # We need to exclude join keys and timestamp columns from the list of features, after they are mapped to + # their final column names via the `field_mapping` field of the source. + feature_names = [ + name + for name in feature_names + if name not in join_keys + and name != timestamp_field + and name != created_timestamp_column + ] + return ( + join_keys, + feature_names, + timestamp_field, + created_timestamp_column, + ) + + +def _run_pyarrow_field_mapping( + table: pyarrow.Table, field_mapping: Dict[str, str], +) -> pyarrow.Table: + # run field mapping in the forward direction + cols = table.column_names + mapped_cols = [ + field_mapping[col] if col in field_mapping.keys() else col for col in cols + ] + table = table.rename_columns(mapped_cols) + return table + + +def _run_dask_field_mapping( + table: dd.DataFrame, field_mapping: Dict[str, str], +): + if field_mapping: + # run field mapping in the forward direction + table = table.rename(columns=field_mapping) + table = table.persist() + + return table + + +def _coerce_datetime(ts): + """ + Depending on underlying time resolution, arrow to_pydict() sometimes returns pd + timestamp type (for nanosecond resolution), and sometimes you get standard python datetime + (for microsecond resolution). + While pd timestamp class is a subclass of python datetime, it doesn't always behave the + same way. We convert it to normal datetime so that consumers downstream don't have to deal + with these quirks. + """ + if isinstance(ts, pd.Timestamp): + return ts.to_pydatetime() + else: + return ts + + +def _convert_arrow_to_proto( + table: Union[pyarrow.Table, pyarrow.RecordBatch], + feature_view: "FeatureView", + join_keys: Dict[str, ValueType], +) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: + # Avoid ChunkedArrays which guarantees `zero_copy_only` available. + if isinstance(table, pyarrow.Table): + table = table.to_batches()[0] + + columns = [ + (field.name, field.dtype.to_value_type()) for field in feature_view.features + ] + list(join_keys.items()) + + proto_values_by_column = { + column: python_values_to_proto_values( + table.column(column).to_numpy(zero_copy_only=False), value_type + ) + for column, value_type in columns + } + + entity_keys = [ + EntityKeyProto( + join_keys=join_keys, + entity_values=[proto_values_by_column[k][idx] for k in join_keys], + ) + for idx in range(table.num_rows) + ] + + # Serialize the features per row + feature_dict = { + feature.name: proto_values_by_column[feature.name] + for feature in feature_view.features + } + features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())] + + # Convert event_timestamps + event_timestamps = [ + _coerce_datetime(val) + for val in pd.to_datetime( + table.column(feature_view.batch_source.timestamp_field).to_numpy( + zero_copy_only=False + ) + ) + ] + + # Convert created_timestamps if they exist + if feature_view.batch_source.created_timestamp_column: + created_timestamps = [ + _coerce_datetime(val) + for val in pd.to_datetime( + table.column( + feature_view.batch_source.created_timestamp_column + ).to_numpy(zero_copy_only=False) + ) + ] + else: + created_timestamps = [None] * table.num_rows + + return list(zip(entity_keys, features, event_timestamps, created_timestamps)) diff --git a/sdk/python/tests/unit/infra/test_provider.py b/sdk/python/tests/unit/infra/test_provider.py index 5ed5603b03..217a1361b4 100644 --- a/sdk/python/tests/unit/infra/test_provider.py +++ b/sdk/python/tests/unit/infra/test_provider.py @@ -18,8 +18,8 @@ from feast.entity import Entity from feast.feature_view import FeatureView from feast.field import Field -from feast.infra.provider import _get_column_names from feast.types import String +from feast.utils import _get_column_names def test_get_column_names_preserves_feature_ordering(): diff --git a/sdk/python/tests/utils/online_write_benchmark.py b/sdk/python/tests/utils/online_write_benchmark.py index 9f2f8ba60d..8a138f41db 100644 --- a/sdk/python/tests/utils/online_write_benchmark.py +++ b/sdk/python/tests/utils/online_write_benchmark.py @@ -14,9 +14,9 @@ from feast.feature_store import FeatureStore from feast.feature_view import FeatureView from feast.field import Field -from feast.infra.provider import _convert_arrow_to_proto from feast.repo_config import RepoConfig from feast.types import Float32, Int32 +from feast.utils import _convert_arrow_to_proto def create_driver_hourly_stats_feature_view(source):