Skip to content

Commit

Permalink
feat: Use ASOF JOIN in Snowflake offline store query (feast-dev#4850)
Browse files Browse the repository at this point in the history
* Use ASOF JOIN in Snowflake offline store query

Signed-off-by: hkuepers <hanno.kuepers@ratepay.com>

* Fix Snowflake query template for entityless feature views

Signed-off-by: hkuepers <hanno.kuepers@ratepay.com>

* Remove quotes on subquery in snowflake template

Signed-off-by: hkuepers <hanno.kuepers@ratepay.com>

* Use __subquery in Snowflake template for preparation

Signed-off-by: hkuepers <hanno.kuepers@ratepay.com>

* Fix deduplication in Snowflake query string

Signed-off-by: hkuepers <hanno.kuepers@ratepay.com>

* Use event_timestamp in ttl cte

Signed-off-by: hkuepers <hanno.kuepers@ratepay.com>

---------

Signed-off-by: hkuepers <hanno.kuepers@ratepay.com>
Co-authored-by: hkuepers <hanno.kuepers@ratepay.com>
  • Loading branch information
nanohanno and hkuepers authored Dec 23, 2024
1 parent 3dbd58b commit 8f591a2
Showing 1 changed file with 44 additions and 86 deletions.
130 changes: 44 additions & 86 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,8 +716,8 @@ def _get_entity_df_event_timestamp_range(

MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
/*
Compute a deterministic hash for the `left_table_query_string` that will be used throughout
all the logic as the field to GROUP BY the data
0. Compute a deterministic hash for the `left_table_query_string` that will be used throughout
all the logic as the field to GROUP BY the data.
*/
WITH "entity_dataframe" AS (
SELECT *,
Expand All @@ -739,6 +739,10 @@ def _get_entity_df_event_timestamp_range(
{% for featureview in featureviews %}
/*
1. Only select the required columns with entities of the featureview.
*/
"{{ featureview.name }}__entity_dataframe" AS (
SELECT
{{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
Expand All @@ -752,20 +756,7 @@ def _get_entity_df_event_timestamp_range(
),
/*
This query template performs the point-in-time correctness join for a single feature set table
to the provided entity table.
1. We first join the current feature_view to the entity dataframe that has been passed.
This JOIN has the following logic:
- For each row of the entity dataframe, only keep the rows where the `timestamp_field`
is less than the one provided in the entity dataframe
- If there a TTL for the current feature_view, also keep the rows where the `timestamp_field`
is higher the the one provided minus the TTL
- For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
computed previously
The output of this CTE will contain all the necessary information and already filtered out most
of the data that is not relevant.
2. Use subquery to prepare event_timestamp, created_timestamp, entity columns and feature columns.
*/
"{{ featureview.name }}__subquery" AS (
Expand All @@ -777,94 +768,61 @@ def _get_entity_df_event_timestamp_range(
"{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE "{{ featureview.timestamp_field }}" <= '{{ featureview.max_event_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}
AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}'
{% endif %}
),
"{{ featureview.name }}__base" AS (
SELECT
"subquery".*,
"entity_dataframe"."entity_timestamp",
"entity_dataframe"."{{featureview.name}}__entity_row_unique_id"
FROM "{{ featureview.name }}__subquery" AS "subquery"
INNER JOIN "{{ featureview.name }}__entity_dataframe" AS "entity_dataframe"
ON TRUE
AND "subquery"."event_timestamp" <= "entity_dataframe"."entity_timestamp"
{% if featureview.ttl == 0 %}{% else %}
AND "subquery"."event_timestamp" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_dataframe"."entity_timestamp")
{% endif %}
{% for entity in featureview.entities %}
AND "subquery"."{{ entity }}" = "entity_dataframe"."{{ entity }}"
{% endfor %}
),
/*
2. If the `created_timestamp_column` has been set, we need to
deduplicate the data first. This is done by calculating the
`MAX(created_at_timestamp)` for each event_timestamp.
We then join the data on the next CTE
3. If the `created_timestamp_column` has been set, we need to
deduplicate the data first. This is done by calculating the
`MAX(created_at_timestamp)` for each event_timestamp and joining back on the subquery.
Otherwise, the ASOF JOIN can have unstable side effects
https://docs.snowflake.com/en/sql-reference/constructs/asof-join#expected-behavior-when-ties-exist-in-the-right-table
*/
{% if featureview.created_timestamp_column %}
"{{ featureview.name }}__dedup" AS (
SELECT
"{{featureview.name}}__entity_row_unique_id",
"event_timestamp",
MAX("created_timestamp") AS "created_timestamp"
FROM "{{ featureview.name }}__base"
GROUP BY "{{featureview.name}}__entity_row_unique_id", "event_timestamp"
SELECT *
FROM "{{ featureview.name }}__subquery"
INNER JOIN (
SELECT
{{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
"event_timestamp",
MAX("created_timestamp") AS "created_timestamp"
FROM "{{ featureview.name }}__subquery"
GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp"
)
USING({{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp", "created_timestamp")
),
{% endif %}
/*
3. The data has been filtered during the first CTE "*__base"
Thus we only need to compute the latest timestamp of each feature.
4. Make ASOF JOIN of deduplicated feature CTE on reduced entity dataframe.
*/
"{{ featureview.name }}__latest" AS (
"{{ featureview.name }}__asof_join" AS (
SELECT
"event_timestamp",
{% if featureview.created_timestamp_column %}"created_timestamp",{% endif %}
"{{featureview.name}}__entity_row_unique_id"
FROM
(
SELECT *,
ROW_NUMBER() OVER(
PARTITION BY "{{featureview.name}}__entity_row_unique_id"
ORDER BY "event_timestamp" DESC{% if featureview.created_timestamp_column %},"created_timestamp" DESC{% endif %}
) AS "row_number"
FROM "{{ featureview.name }}__base"
{% if featureview.created_timestamp_column %}
INNER JOIN "{{ featureview.name }}__dedup"
USING ("{{featureview.name}}__entity_row_unique_id", "event_timestamp", "created_timestamp")
{% endif %}
)
WHERE "row_number" = 1
e.*,
v.*
FROM "{{ featureview.name }}__entity_dataframe" e
ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}"{{ featureview.name }}__subquery"{% endif %} v
MATCH_CONDITION (e."entity_timestamp" >= v."event_timestamp")
{% if featureview.entities %} USING({{ featureview.entities | map('tojson') | join(', ')}}) {% endif %}
),
/*
4. Once we know the latest value of each feature for a given timestamp,
we can join again the data back to the original "base" dataset
5. If TTL is configured filter the CTE to remove rows where the feature values are older than the configured ttl.
*/
"{{ featureview.name }}__cleaned" AS (
SELECT "base".*
FROM "{{ featureview.name }}__base" AS "base"
INNER JOIN "{{ featureview.name }}__latest"
USING(
"{{featureview.name}}__entity_row_unique_id",
"event_timestamp"
{% if featureview.created_timestamp_column %}
,"created_timestamp"
{% endif %}
)
){% if loop.last %}{% else %}, {% endif %}
"{{ featureview.name }}__ttl" AS (
SELECT *
FROM "{{ featureview.name }}__asof_join"
{% if featureview.ttl == 0 %}{% else %}
WHERE "event_timestamp" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_timestamp")
{% endif %}
){% if loop.last %}{% else %}, {% endif %}
{% endfor %}
/*
Joins the outputs of multiple time travel joins to a single table.
Join the outputs of multiple time travel joins to a single table.
The entity_dataframe dataset being our source of truth here.
*/
Expand All @@ -877,7 +835,7 @@ def _get_entity_df_event_timestamp_range(
{% for feature in featureview.features %}
,{% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}
{% endfor %}
FROM "{{ featureview.name }}__cleaned"
) "{{ featureview.name }}__cleaned" USING ("{{featureview.name}}__entity_row_unique_id")
FROM "{{ featureview.name }}__ttl"
) "{{ featureview.name }}__ttl" USING ("{{featureview.name}}__entity_row_unique_id")
{% endfor %}
"""

0 comments on commit 8f591a2

Please sign in to comment.