Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Commit

Permalink
sql works
Browse files Browse the repository at this point in the history
  • Loading branch information
wintonzheng committed Oct 29, 2023
1 parent 2efd778 commit 62c951b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 18 deletions.
23 changes: 12 additions & 11 deletions wyvern/feature_store/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,25 +532,26 @@ async def get_historical_features_v2(
composite_entities[entity_type_column] = entity_names
valid_realtime_features.append(realtime_feature)

# TODO: generate all the composite feature columns in the remote composite table
# this needs to be a sql query that generates a temporary table RT_{HEX_ID}_COMPOSITE
# for entity_type_column in composite_entities:
# entity_name1, entity_name2 = composite_entities[entity_type_column]
# df[entity_type_column] = df[entity_name1] + ":" + df[entity_name2]

composite_columns = ",".join(
[
":".join(entities) + f" as {entity_type_column}"
" || ':' || ".join(entities) + f" AS {entity_type_column}"
for entity_type_column, entities in composite_entities.items()
],
)
composite_historical_feature_table = f"HISTORICAL_FEATURES_{hex_id}"

# TODO: send this sql to snowflake to create temporary table with this select_sql query
select_sql = f"""
SELECT *, {composite_columns}, timestamp as event_timestamp
CREATE TABLE {composite_historical_feature_table} AS
SELECT *, {composite_columns}, TIMESTAMP as event_timestamp
FROM {data.table}
"""
# create temporary table with this select_sql query

build_and_merge_realtime_pivot_tables(hex_id, realtime_features)
result_table = build_and_merge_realtime_pivot_tables(
valid_realtime_features,
data.table,
composite_historical_feature_table,
)

# feast_requests = build_historical_registry_feature_requests(
# store=store,
Expand Down Expand Up @@ -590,7 +591,7 @@ async def get_historical_features_v2(
# final_df["timestamp"] = final_df["timestamp"].astype(str)

return GetHistoricalFeaturesResponseV2(
result_table="result_table",
result_table=result_table,
)

return app
Expand Down
69 changes: 62 additions & 7 deletions wyvern/feature_store/historical_feature_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,10 @@ def build_historical_real_time_feature_requests(


def build_and_merge_realtime_pivot_tables(
hex_id: str,
full_feature_names: List[str],
) -> Dict[str, RequestEntityIdentifierObjects]:
input_table: str,
composite_table: str,
) -> str:
"""
Build historical real-time feature requests grouped by entity types so that we can process them in parallel.
Expand All @@ -135,7 +136,15 @@ def build_and_merge_realtime_pivot_tables(
features_grouped_by_entity = group_realtime_features_by_entity_type(
full_feature_names=full_feature_names,
)
result_dict: Dict[str, RequestEntityIdentifierObjects] = {}
counter = 0

# prev_table is the previous temporary composite table
prev_table = composite_table
# next_table is the next temporary composite table joined with the next entity type
next_table = f"{composite_table}_0"

# iterate through all the entity types.
# For each entity type, build a new temporary composite table with all the features for this entity type
for (
entity_identifier_type,
curr_feature_names,
Expand All @@ -145,11 +154,57 @@ def build_and_merge_realtime_pivot_tables(
if len(entity_list) > 2:
logger.warning("Invalid entity_identifier_type={entity_identifier_type}")
continue
# TODO: PIVOT to generate

# TODO: merge the PIVOT table into the _COMPOSITE table
curr_feature_names_underscore = [
fn.replace(":", "__", 1) for fn in curr_feature_names
]
entity_identifier_type_val = ":".join(entity_list)
feature_names_sql_str = ",".join(
[f"'{fn}'" for fn in curr_feature_names_underscore],
)
feature_names_with_pivot_table_str = ",".join(
[
f"PIVOT_TABLE.{feature_name}"
for feature_name in curr_feature_names_underscore
],
)
feature_names_pivot_raw = ",".join(
["\"'{fn}'\" as {fn}" for fn in curr_feature_names_underscore],
)

return result_dict
# TODO: send this sql to snowflake
pivot_sql = f"""
CREATE TABLE {next_table} AS (
WITH PIVOT_DATA AS (
SELECT F.REQUEST_ID AS REQUEST,
F.API_SOURCE,
F.EVENT_TYPE,
F.FEATURE_IDENTIFIER,
F.FEATURE_IDENTIFIER_TYPE,
REPLACE(F.FEATURE_NAME, ':', '__') AS FEATURE_NAME,
F.FEATURE_VALUE
FROM FEATURE_LOGS F
INNER JOIN (SELECT DISTINCT REQUEST FROM {input_table}) T
ON F.REQUEST_ID = T.REQUEST
WHERE F.FEATURE_IDENTIFIER_TYPE = '{entity_identifier_type_val}'
), PIVOT_TABLE_RAW AS (
SELECT *
FROM PIVOT_DATA
PIVOT(MAX(FEATURE_VALUE) FOR FEATURE_NAME IN ({feature_names_sql_str}))
), PIVOT_TABLE AS (
SELECT REQUEST, FEATURE_IDENTIFIER, FEATURE_IDENTIFIER_TYPE, {feature_names_pivot_raw}
FROM PIVOT_TABLE_RAW
)
SELECT
{prev_table}.*,{feature_names_with_pivot_table_str}
FROM
{prev_table}
LEFT JOIN PIVOT_TABLE ON {prev_table}.REQUEST = PIVOT_TABLE.REQUEST AND {prev_table}.{entity_identifier_type} = PIVOT_TABLE.FEATURE_IDENTIFIER
)
"""
counter += 1
prev_table = next_table
next_table = f"{composite_table}_{counter}"
return next_table


def process_historical_real_time_features_requests(
Expand Down

0 comments on commit 62c951b

Please sign in to comment.