diff --git a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py index aff0c6c42c..2636cf95e2 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py +++ b/sdk/python/feast/infra/online_stores/contrib/hbase_online_store/hbase.py @@ -8,7 +8,7 @@ from feast import Entity from feast.feature_view import FeatureView -from feast.infra.key_encoding_utils import serialize_entity_key +from feast.infra.online_stores.helpers import compute_entity_id from feast.infra.online_stores.online_store import OnlineStore from feast.infra.utils.hbase_utils import HbaseConstants, HbaseUtils from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -104,14 +104,15 @@ def online_write_batch( hbase = HbaseUtils(self._get_conn(config)) project = config.project - table_name = _table_id(project, table) + table_name = self._table_id(project, table) b = hbase.batch(table_name) for entity_key, values, timestamp, created_ts in data: - row_key = serialize_entity_key( + row_key = self._hbase_row_key( entity_key, - entity_key_serialization_version=config.entity_key_serialization_version, - ).hex() + feature_view_name=table.name, + config=config, + ) values_dict = {} for feature_name, val in values.items(): values_dict[ @@ -133,6 +134,9 @@ def online_write_batch( b.put(row_key, values_dict) b.send() + if progress: + progress(len(data)) + @log_exceptions_and_usage(online_store="hbase") def online_read( self, @@ -152,15 +156,16 @@ def online_read( """ hbase = HbaseUtils(self._get_conn(config)) project = config.project - table_name = _table_id(project, table) + table_name = self._table_id(project, table) result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] row_keys = [ - serialize_entity_key( + self._hbase_row_key( entity_key, - entity_key_serialization_version=config.entity_key_serialization_version, - ).hex() + feature_view_name=table.name, + config=config, + ) for entity_key in entity_keys ] rows = hbase.rows(table_name, row_keys=row_keys) @@ -206,12 +211,12 @@ def update( # We don't create any special state for the entites in this implementation. for table in tables_to_keep: - table_name = _table_id(project, table) + table_name = self._table_id(project, table) if not hbase.check_if_table_exist(table_name): hbase.create_table_with_default_cf(table_name) for table in tables_to_delete: - table_name = _table_id(project, table) + table_name = self._table_id(project, table) hbase.delete_table(table_name) def teardown( @@ -231,16 +236,43 @@ def teardown( project = config.project for table in tables: - table_name = _table_id(project, table) + table_name = self._table_id(project, table) hbase.delete_table(table_name) + def _hbase_row_key( + self, + entity_key: EntityKeyProto, + feature_view_name: str, + config: RepoConfig, + ) -> bytes: + """ + Computes the HBase row key for a given entity key and feature view name. -def _table_id(project: str, table: FeatureView) -> str: - """ - Returns table name given the project_name and the feature_view. + Args: + entity_key (EntityKeyProto): The entity key to compute the row key for. + feature_view_name (str): The name of the feature view to compute the row key for. + config (RepoConfig): The configuration for the Feast repository. - Args: - project: Name of the feast project. - table: Feast FeatureView. - """ - return f"{project}_{table.name}" + Returns: + bytes: The HBase row key for the given entity key and feature view name. + """ + entity_id = compute_entity_id( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + # Even though `entity_id` uniquely identifies an entity, we use the same table + # for multiple feature_views with the same set of entities. + # To uniquely identify the row for a feature_view, we suffix the name of the feature_view itself. + # This also ensures that features for entities from various feature_views are + # colocated. + return f"{entity_id}#{feature_view_name}".encode() + + def _table_id(self, project: str, table: FeatureView) -> str: + """ + Returns table name given the project_name and the feature_view. + + Args: + project: Name of the feast project. + table: Feast FeatureView. + """ + return f"{project}:{table.name}"