From f8799500a64e0c92dcf7a58486053f8c5a0e4fa7 Mon Sep 17 00:00:00 2001 From: Shuchang Zheng Date: Thu, 2 Nov 2023 18:06:34 +0800 Subject: [PATCH] feast query --- wyvern/feature_store/feature_server.py | 1 + .../feature_store/historical_feature_util.py | 82 +++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/wyvern/feature_store/feature_server.py b/wyvern/feature_store/feature_server.py index 5c6af6f..c2586b0 100644 --- a/wyvern/feature_store/feature_server.py +++ b/wyvern/feature_store/feature_server.py @@ -34,6 +34,7 @@ ) from wyvern.config import settings from wyvern.feature_store.historical_feature_util import ( + build_and_merge_feast_tables, build_and_merge_realtime_pivot_tables, build_historical_real_time_feature_requests, build_historical_registry_feature_requests, diff --git a/wyvern/feature_store/historical_feature_util.py b/wyvern/feature_store/historical_feature_util.py index db86fba..5bdf323 100644 --- a/wyvern/feature_store/historical_feature_util.py +++ b/wyvern/feature_store/historical_feature_util.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import logging +import re from collections import defaultdict from datetime import datetime from typing import Any, Dict, List, Optional, Tuple @@ -406,6 +407,87 @@ def build_historical_registry_feature_requests( return requests +def build_and_merge_feast_tables( + store: FeatureStore, + feature_names: List[str], + composite_table: str, +) -> str: + features_grouped_by_entities = group_registry_features_by_entities( + feature_names, + store=store, + ) + counter = 0 + prev_table = composite_table + next_table = f"{composite_table}_0" + for entity_name, feature_names in features_grouped_by_entities.items(): + if not feature_names: + continue + + if FULL_FEATURE_NAME_SEPARATOR in entity_name: + entities = entity_name.split(FULL_FEATURE_NAME_SEPARATOR) + else: + entities = [entity_name] + + if len(entities) > 2: + raise ValueError( + f"Entity name should be singular or composite: {entity_name}", + ) + + # TODO: validate that all entities are in the entity_df_table + # for entity in entities: + # if entity not in entity_values: + # raise ValueError( + # f"{feature_names} depends on {entity}. Could not find entity values: {entity}", + # ) + identifier_column = SQL_COLUMN_SEPARATOR.join(entities) + identifier_table_sql_dupe = f""" + SELECT + {identifier_column} AS IDENTIFIER, + event_timestamp, + ROW_NUMBER() OVER (PARTITION BY IDENTIFIER, event_timestamp ORDER BY (SELECT NULL)) as rn + FROM {composite_table} + WHERE {identifier_column} is NOT NULL + """ + + # dedupe (IDENTIFIER, event_timestamp) + identifier_table_sql = f""" + WITH identifier_table_sql_dupe AS ({identifier_table_sql_dupe}) + SELECT IDENTIFIER, event_timestamp + FROM identifier_table_sql_dupe + WHERE rn = 1 + """ + result = store.get_historical_features( + entity_df=identifier_table_sql, + features=feature_names or [], + full_feature_names=True, + ) + result_sql = result.to_sql() + # Strip the leading "WITH " (WITH plus an empty space) + result_sql = result_sql.replace("WITH ", "") + # Replace the table name with 'identifier_tbl', assuming the table name is always + # in the format "feast_entity_df_" followed by a hex string (UUID without dashes) + result_sql = re.sub( + r'"feast_entity_df_[0-9a-f]{32}"', + '"identifier_tbl"', + result_sql, + flags=re.IGNORECASE, + ) + final_sql = f""" + WITH identifier_tbl_dupe AS ({identifier_table_sql_dupe}), + identifier_tbl AS ( + SELECT IDENTIFIER, event_timestamp + FROM identifier_tbl_dupe, + WHERE rn = 1 + ), + {result_sql} + """ + counter += 1 + prev_table = next_table + next_table = f"{composite_table}_{counter}" + + return "" + + def process_historical_registry_features_requests( store: FeatureStore, requests: List[GetFeastHistoricalFeaturesRequest],