Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Commit

Permalink
feast query
Browse files Browse the repository at this point in the history
  • Loading branch information
wintonzheng committed Nov 14, 2023
1 parent 6cb1524 commit f879950
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 0 deletions.
1 change: 1 addition & 0 deletions wyvern/feature_store/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
)
from wyvern.config import settings
from wyvern.feature_store.historical_feature_util import (
build_and_merge_feast_tables,
build_and_merge_realtime_pivot_tables,
build_historical_real_time_feature_requests,
build_historical_registry_feature_requests,
Expand Down
82 changes: 82 additions & 0 deletions wyvern/feature_store/historical_feature_util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import logging
import re
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
Expand Down Expand Up @@ -406,6 +407,87 @@ def build_historical_registry_feature_requests(
return requests


def build_and_merge_feast_tables(
store: FeatureStore,
feature_names: List[str],
composite_table: str,
) -> str:
features_grouped_by_entities = group_registry_features_by_entities(
feature_names,
store=store,
)
counter = 0
prev_table = composite_table
next_table = f"{composite_table}_0"
for entity_name, feature_names in features_grouped_by_entities.items():
if not feature_names:
continue

if FULL_FEATURE_NAME_SEPARATOR in entity_name:
entities = entity_name.split(FULL_FEATURE_NAME_SEPARATOR)
else:
entities = [entity_name]

if len(entities) > 2:
raise ValueError(
f"Entity name should be singular or composite: {entity_name}",
)

# TODO: validate that all entities are in the entity_df_table
# for entity in entities:
# if entity not in entity_values:
# raise ValueError(
# f"{feature_names} depends on {entity}. Could not find entity values: {entity}",
# )
identifier_column = SQL_COLUMN_SEPARATOR.join(entities)
identifier_table_sql_dupe = f"""
SELECT
{identifier_column} AS IDENTIFIER,
event_timestamp,
ROW_NUMBER() OVER (PARTITION BY IDENTIFIER, event_timestamp ORDER BY (SELECT NULL)) as rn
FROM {composite_table}
WHERE {identifier_column} is NOT NULL
"""

# dedupe (IDENTIFIER, event_timestamp)
identifier_table_sql = f"""
WITH identifier_table_sql_dupe AS ({identifier_table_sql_dupe})
SELECT IDENTIFIER, event_timestamp
FROM identifier_table_sql_dupe
WHERE rn = 1
"""
result = store.get_historical_features(
entity_df=identifier_table_sql,
features=feature_names or [],
full_feature_names=True,
)
result_sql = result.to_sql()
# Strip the leading "WITH " (WITH plus an empty space)
result_sql = result_sql.replace("WITH ", "")
# Replace the table name with 'identifier_tbl', assuming the table name is always
# in the format "feast_entity_df_" followed by a hex string (UUID without dashes)
result_sql = re.sub(
r'"feast_entity_df_[0-9a-f]{32}"',
'"identifier_tbl"',
result_sql,
flags=re.IGNORECASE,
)
final_sql = f"""
WITH identifier_tbl_dupe AS ({identifier_table_sql_dupe}),
identifier_tbl AS (
SELECT IDENTIFIER, event_timestamp
FROM identifier_tbl_dupe,
WHERE rn = 1
),
{result_sql}
"""
counter += 1
prev_table = next_table
next_table = f"{composite_table}_{counter}"

return ""


def process_historical_registry_features_requests(
store: FeatureStore,
requests: List[GetFeastHistoricalFeaturesRequest],
Expand Down

0 comments on commit f879950

Please sign in to comment.