Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Commit

Permalink
final code
Browse files Browse the repository at this point in the history
  • Loading branch information
wintonzheng committed Nov 15, 2023
1 parent cbe2f2e commit 8aa6779
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 66 deletions.
52 changes: 1 addition & 51 deletions wyvern/feature_store/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,43 +466,11 @@ async def get_historical_features(
async def get_historical_features_v2(
data: GetHistoricalFeaturesRequestV2,
) -> GetHistoricalFeaturesResponseV2:
# TODO: validate the data input: lengths of requests, timestamps and all the entities should be the same
# if "request" not in data.entities:
# raise HTTPException(
# status_code=400,
# detail="request is required in entities",
# )
# length_of_requests = len(data.entities["request"])
# length_of_timestamps = len(data.timestamps)
# if length_of_requests != length_of_timestamps:
# raise HTTPException(
# status_code=400,
# detail=(
# f"Length of requests({length_of_requests}) and "
# f"timestamps({length_of_timestamps}) should be the same"
# ),
# )
# if length_of_requests > MAX_HISTORICAL_REQUEST_SIZE:
# raise HTTPException(
# status_code=400,
# detail=(
# f"The max size of requests is {MAX_HISTORICAL_REQUEST_SIZE}. Got {length_of_requests} requests."
# ),
# )
# for key, value in data.entities.items():
# if len(value) != length_of_requests:
# raise HTTPException(
# status_code=400,
# detail=f"Length of requests({length_of_requests}) and {key}({len(value)}) should be the same",
# )

# Generate a 10-digit hex for the request
random_id = secrets.token_hex(5)

# convert the data input to pandas dataframe
realtime_features, feast_features = separate_real_time_features(data.features)
# TODO: analyze all the realtime features and generate all the composite feature columns in the dataframe
# the column name will be the composite feature name
valid_realtime_features: List[str] = []
composite_entities: Dict[str, List[str]] = {}
for realtime_feature in realtime_features:
Expand All @@ -515,22 +483,6 @@ async def get_historical_features_v2(
continue

if len(entity_names) == 2:
entity_name_1 = entity_names[0]
entity_name_2 = entity_names[1]

# TODO: valid entity_name_1 and entity_name_2 are in the table columns
# if entity_name_1 not in data.entities:
# logger.warning(
# f"Realtime feature {realtime_feature} depends on "
# f"entity={entity_name_1}, which is not found in entities",
# )
# continue
# if entity_name_2 not in data.entities:
# logger.warning(
# f"Realtime feature {realtime_feature} depends on "
# f"entity={entity_name_2}, which is not found in entities",
# )
# continue
composite_entities[entity_type_column] = entity_names
valid_realtime_features.append(realtime_feature)

Expand All @@ -542,13 +494,11 @@ async def get_historical_features_v2(
)
composite_historical_feature_table = f"HISTORICAL_FEATURES_{random_id}"

# TODO: send this sql to snowflake to create temporary table with this select_sql query
select_sql = f"""
CREATE TABLE {composite_historical_feature_table} AS
CREATE TEMPORARY TABLE {composite_historical_feature_table} AS
SELECT *, {composite_columns}, TIMESTAMP as event_timestamp
FROM {data.table}
"""

snowflake_ctx = generate_snowflake_ctx()
snowflake_ctx.cursor().execute(select_sql)

Expand Down
38 changes: 23 additions & 15 deletions wyvern/feature_store/historical_feature_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ def build_and_merge_realtime_pivot_tables(
],
)
feature_names_pivot_raw = ",".join(
["\"'{fn}'\" as {fn}" for fn in curr_feature_names_underscore],
[f"\"'{fn}'\" as {fn}" for fn in curr_feature_names_underscore],
)

# TODO: send this sql to snowflake
pivot_sql = f"""
CREATE TABLE {next_table} AS (
CREATE TEMPORARY TABLE {next_table} AS (
WITH PIVOT_DATA AS (
SELECT F.REQUEST_ID AS REQUEST,
F.API_SOURCE,
Expand All @@ -200,14 +200,16 @@ def build_and_merge_realtime_pivot_tables(
{prev_table}.*,{feature_names_with_pivot_table_str}
FROM
{prev_table}
LEFT JOIN PIVOT_TABLE ON {prev_table}.REQUEST = PIVOT_TABLE.REQUEST AND {prev_table}.{entity_identifier_type} = PIVOT_TABLE.FEATURE_IDENTIFIER
LEFT JOIN PIVOT_TABLE ON
{prev_table}.REQUEST = PIVOT_TABLE.REQUEST AND
{prev_table}.{entity_identifier_type} = PIVOT_TABLE.FEATURE_IDENTIFIER
)
"""
context.cursor().execute(pivot_sql)
counter += 1
prev_table = next_table
next_table = f"{composite_table}_{counter}"
return next_table
return prev_table


def process_historical_real_time_features_requests(
Expand Down Expand Up @@ -436,6 +438,8 @@ def build_and_merge_feast_tables(
f"Entity name should be singular or composite: {entity_name}",
)

feature_columns = [fn.replace(":", "__") for fn in feature_names]

# TODO: validate that all entities are in the entity_df_table
# for entity in entities:
# if entity not in entity_values:
Expand All @@ -454,7 +458,7 @@ def build_and_merge_feast_tables(

# dedupe (IDENTIFIER, event_timestamp)
identifier_table_sql = f"""
WITH identifier_table_sql_dedupe AS ({identifier_table_sql_dupe})
WITH identifier_table_sql_dupe AS ({identifier_table_sql_dupe})
SELECT IDENTIFIER, event_timestamp
FROM identifier_table_sql_dupe
WHERE rn = 1
Expand All @@ -471,30 +475,34 @@ def build_and_merge_feast_tables(
# in the format "feast_entity_df_" followed by a hex string (UUID without dashes)
result_sql = re.sub(
r'"feast_entity_df_[0-9a-f]{32}"',
'"identifier_tbl"',
"identifier_tbl",
result_sql,
flags=re.IGNORECASE,
)
new_feast_table_sql = f"""
CREATE TABLE {next_table}_feast AS (
WITH identifier_tbl_dupe AS ({identifier_table_sql_dupe}),
identifier_tbl AS (
SELECT IDENTIFIER, event_timestamp
FROM identifier_tbl_dupe,
WHERE rn = 1
),
CREATE TEMPORARY TABLE {next_table}_feast AS (
WITH identifier_tbl_dupe AS ({identifier_table_sql_dupe}),
identifier_tbl AS (
SELECT IDENTIFIER, event_timestamp
FROM identifier_tbl_dupe
WHERE rn = 1
),
{result_sql}
)
"""
context.cursor().execute(new_feast_table_sql)

# left join to the previous composite table
picked_feature_columns_str = ", ".join(
[f'{next_table}_feast."{c}"' for c in feature_columns],
)
new_composite_table_sql = f"""
CREATE TABLE {next_table} AS (
SELECT *
SELECT {prev_table}.*, {picked_feature_columns_str}
FROM {prev_table}
LEFT JOIN {next_table}_feast
ON {prev_table}.{identifier_column} = {next_table}_feast.IDENTIFIER and {prev_table}.event_timestamp = {next_table}_feast.event_timestamp
ON {prev_table}.{identifier_column} = {next_table}_feast.IDENTIFIER and
{prev_table}.event_timestamp = {next_table}_feast.event_timestamp
)
"""
context.cursor().execute(new_composite_table_sql)
Expand Down

0 comments on commit 8aa6779

Please sign in to comment.