From a096ba3927ad909f1ec5b0e5709baae5f3281317 Mon Sep 17 00:00:00 2001
From: Lilly
Date: Fri, 26 Jul 2024 14:33:00 -0700
Subject: [PATCH 01/23] update for keywords

---
 ads/features/batch_features/ad_auction_keywords.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/ads/features/batch_features/ad_auction_keywords.py b/ads/features/batch_features/ad_auction_keywords.py
index 325bb17..e2b08df 100644
--- a/ads/features/batch_features/ad_auction_keywords.py
+++ b/ads/features/batch_features/ad_auction_keywords.py
@@ -1,4 +1,5 @@
-from tecton import transformation, FilteredSource, batch_feature_view, const
+from tecton import transformation, FilteredSource, batch_feature_view, const, Attribute
+from tecton.types import String, Array, Int32, Bool
 from ads.entities import auction
 from ads.data_sources.ad_impressions import ad_impressions_batch
 from datetime import datetime, timedelta
@@ -32,6 +33,13 @@ def keyword_stats(input_data, keyword_column):
     mode='pipeline',
     sources=[FilteredSource(ad_impressions_batch)],
     entities=[auction],
+    timestamp_field="timestamp",
+    features=[
+        Attribute(name="auction_id", dtype=String),
+        Attribute(name="keyword_list", dtype=Array(String)),
+        Attribute(name="num_keywords", dtype=Int32),
+        Attribute(name="keyword_contains_bitcoin", dtype=Bool),
+    ],
     ttl=timedelta(days=1),
     batch_schedule=timedelta(days=1),
     online=False,
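
This first commit sets the pattern the whole series follows: output columns are declared up front through a `features` list (one `Attribute` per column) plus an explicit `timestamp_field`, rather than being inferred from the transformation. A minimal sketch of that declaration style under stated assumptions — the `spark_sql` body and the view name are illustrative (the real file uses `mode='pipeline'` with transformations), and later commits in the series drop the join-key column (`auction_id`) from `features`, so the sketch already omits it:

    from datetime import timedelta

    from tecton import Attribute, FilteredSource, batch_feature_view
    from tecton.types import Array, Bool, Int32, String

    from ads.entities import auction
    from ads.data_sources.ad_impressions import ad_impressions_batch


    @batch_feature_view(
        mode='spark_sql',  # illustrative; the repo's view is pipeline-mode
        sources=[FilteredSource(ad_impressions_batch)],
        entities=[auction],
        timestamp_field='timestamp',
        # One Attribute per materialized value column; the join key and the
        # timestamp column ride along via the entity and timestamp_field.
        features=[
            Attribute(name='keyword_list', dtype=Array(String)),
            Attribute(name='num_keywords', dtype=Int32),
            Attribute(name='keyword_contains_bitcoin', dtype=Bool),
        ],
        ttl=timedelta(days=1),
        batch_schedule=timedelta(days=1),
        online=False,
        offline=False,
    )
    def auction_keyword_features(ad_impressions):
        # Assumes the impressions source exposes these columns.
        return f'''
            SELECT auction_id, keyword_list, num_keywords, keyword_contains_bitcoin, timestamp
            FROM {ad_impressions}
        '''
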
From 574f918b750042fb4baead3a320302fa196dc0a4 Mon Sep 17 00:00:00 2001
From: Lilly
Date: Fri, 26 Jul 2024 15:53:59 -0700
Subject: [PATCH 02/23] update ads with features

---
 ads/entities.py                                      | 12 ++++++------
 .../batch_features/user_distinct_ad_count_7d.py      | 10 ++++++++--
 ads/features/feature_tables/ad_embeddings.py         |  8 ++++++--
 ads/features/feature_tables/user_embeddings.py       |  8 ++++++--
 .../user_ad_embedding_similarity.py                  |  9 +++++----
 .../user_query_embedding_similarity.py               |  7 ++-----
 .../stream_features/content_keyword_click_counts.py  | 11 +++++++----
 .../stream_features/user_ad_impression_counts.py     | 12 +++++++-----
 ads/features/stream_features/user_click_counts.py    | 12 +++++++-----
 .../stream_features/user_click_counts_push.py        | 12 +++++++-----
 .../stream_features/user_impression_counts.py        | 13 ++++++++-----
 11 files changed, 69 insertions(+), 45 deletions(-)

diff --git a/ads/entities.py b/ads/entities.py
index 7181dae..13c8ee4 100644
--- a/ads/entities.py
+++ b/ads/entities.py
@@ -1,9 +1,9 @@
 from tecton import Entity
-
+from tecton.types import Field, String
 
 ad = Entity(
     name='ad',
-    join_keys=['ad_id'],
+    join_keys=[Field('ad_id', String)],
     description='An ad',
     owner='demo-user@tecton.ai',
     tags={'release': 'production'}
@@ -11,7 +11,7 @@
 
 content = Entity(
     name="content",
-    join_keys=["content_id"],
+    join_keys=[Field('content_id', String)],
     description='Content ID',
     owner='demo-user@tecton.ai',
     tags={'release': 'production'}
@@ -19,14 +19,14 @@
 
 auction = Entity(
     name="auction",
-    join_keys=["auction_id"],
+    join_keys=[Field('auction_id', String)],
     description='Auction ID',
     owner='demo-user@tecton.ai',
 )
 
 user = Entity(
     name='ads_user',
-    join_keys=['user_id'],
+    join_keys=[Field('content_keyword', String)],
     description='A user of the platform',
     owner='demo-user@tecton.ai',
     tags={'release': 'production'}
@@ -34,7 +34,7 @@
 
 content_keyword = Entity(
     name='ContentKeyword',
-    join_keys=['content_keyword'],
+    join_keys=[Field('content_keyword', String)],
     description='The keyword describing the content this ad is being placed alongside.',
     owner='demo-user@tecton.ai',
     tags={'release': 'production'}
 )

diff --git a/ads/features/batch_features/user_distinct_ad_count_7d.py b/ads/features/batch_features/user_distinct_ad_count_7d.py
index b088724..c033e18 100644
--- a/ads/features/batch_features/user_distinct_ad_count_7d.py
+++ b/ads/features/batch_features/user_distinct_ad_count_7d.py
@@ -1,4 +1,5 @@
-from tecton import batch_feature_view, FilteredSource, materialization_context
+from tecton import batch_feature_view, FilteredSource, materialization_context, Attribute
+from tecton.types import String, Int64
 from ads.entities import user
 from ads.data_sources.ad_impressions import ad_impressions_batch
 from datetime import datetime, timedelta
@@ -16,7 +17,12 @@
     feature_start_time=datetime(2022, 5, 1),
     tags={'release': 'production', 'usecase': 'ads'},
     owner='demo-user@tecton.ai',
-    description='How many distinct advertisements a user has been shown in the last week'
+    description='How many distinct advertisements a user has been shown in the last week',
+    timestamp_field='timestamp',
+    features=[
+        Attribute(name='user_id', dtype=String),
+        Attribute(name='distinct_ad_count', dtype=Int64),
+    ]
 )
 def user_distinct_ad_count_7d(ad_impressions, context=materialization_context()):
     return f'''

diff --git a/ads/features/feature_tables/ad_embeddings.py b/ads/features/feature_tables/ad_embeddings.py
index 211abad..36e0a81 100644
--- a/ads/features/feature_tables/ad_embeddings.py
+++ b/ads/features/feature_tables/ad_embeddings.py
@@ -1,5 +1,5 @@
 from tecton.types import Field, String, Timestamp, Array, Float64
-from tecton import Entity, FeatureTable, DeltaConfig
+from tecton import FeatureTable, Attribute
 from ads.entities import ad
 from datetime import timedelta
@@ -13,7 +13,11 @@
 ad_embeddings = FeatureTable(
     name='ad_embeddings',
     entities=[ad],
-    schema=schema,
+    features=[
+        Attribute(name='ad_id', dtype=String),
+        Attribute(name='ad_embedding', dtype=Array(Float64)),
+    ],
+    timestamp_field='timestamp',
     online=True,
     offline=True,
     ttl=timedelta(days=10),

diff --git a/ads/features/feature_tables/user_embeddings.py b/ads/features/feature_tables/user_embeddings.py
index a992b0a..2eb3a99 100644
--- a/ads/features/feature_tables/user_embeddings.py
+++ b/ads/features/feature_tables/user_embeddings.py
@@ -1,5 +1,5 @@
 from tecton.types import Field, String, Timestamp, Array, Float64
-from tecton import Entity, FeatureTable, DeltaConfig
+from tecton import FeatureTable, Attribute
 from ads.entities import user
 from datetime import timedelta
@@ -14,7 +14,11 @@
 user_embeddings = FeatureTable(
     name='user_embeddings',
     entities=[user],
-    schema=schema,
+    features=[
+        Attribute(name='user_id', dtype=String),
+        Attribute(name='user_embedding', dtype=Array(Float64)),
+    ],
+    timestamp_field="timestamp",
     online=True,
     offline=True,
     ttl=timedelta(days=10),

diff --git a/ads/features/on_demand_feature_views/user_ad_embedding_similarity.py b/ads/features/on_demand_feature_views/user_ad_embedding_similarity.py
index 83a1879..da40044 100644
--- a/ads/features/on_demand_feature_views/user_ad_embedding_similarity.py
+++ b/ads/features/on_demand_feature_views/user_ad_embedding_similarity.py
@@ -1,14 +1,15 @@
-from tecton import on_demand_feature_view
-from tecton.types import Field, Float64
+from tecton import on_demand_feature_view, Attribute
+from tecton.types import Float64
 from ads.features.feature_tables.user_embeddings import user_embeddings
 from ads.features.feature_tables.ad_embeddings import ad_embeddings
 
-output_schema = [Field('cosine_similarity', Float64)]
 
 @on_demand_feature_view(
     sources=[ad_embeddings, user_embeddings],
     mode='python',
-    schema=output_schema,
+    features=[
+        Attribute(name='cosine_similarity', dtype=Float64)
+    ],
     owner='demo-user@tecton.ai',
     tags={'release': 'production'},
     description="Computes the cosine similarity between a precomputed ad embedding and a precomputed user embedding."

diff --git a/ads/features/on_demand_feature_views/user_query_embedding_similarity.py b/ads/features/on_demand_feature_views/user_query_embedding_similarity.py
index 85acdf3..cd35eca 100644
--- a/ads/features/on_demand_feature_views/user_query_embedding_similarity.py
+++ b/ads/features/on_demand_feature_views/user_query_embedding_similarity.py
@@ -1,4 +1,4 @@
-from tecton import RequestSource, on_demand_feature_view
+from tecton import RequestSource, on_demand_feature_view, Attribute
 from tecton.types import Field, Array, Float64
 from ads.features.feature_tables.user_embeddings import user_embeddings
 
@@ -6,13 +6,10 @@
 request_schema = [Field('query_embedding', Array(Float64))]
 request = RequestSource(schema=request_schema)
 
-output_schema = [Field('cosine_similarity', Float64)]
-
-
 @on_demand_feature_view(
     sources=[request, user_embeddings],
     mode='python',
-    schema=output_schema,
+    features=[Attribute(name='cosine_similarity', dtype=Float64)],
     owner='demo-user@tecton.ai',
     tags={'release': 'production'},
     description="Computes the cosine similarity between a query embedding and a precomputed user embedding."

diff --git a/ads/features/stream_features/content_keyword_click_counts.py b/ads/features/stream_features/content_keyword_click_counts.py
index 7457a4c..3ce0954 100644
--- a/ads/features/stream_features/content_keyword_click_counts.py
+++ b/ads/features/stream_features/content_keyword_click_counts.py
@@ -1,4 +1,6 @@
-from tecton import stream_feature_view, FilteredSource, Aggregation, DatabricksClusterConfig, StreamProcessingMode
+from tecton import stream_feature_view, FilteredSource, DatabricksClusterConfig, StreamProcessingMode, Aggregate
+from tecton.types import Field, Bool
+
 from ads.entities import content_keyword
 from ads.data_sources.ad_impressions import ad_impressions_stream
 from datetime import datetime, timedelta
@@ -14,10 +16,11 @@
     entities=[content_keyword],
     mode='pyspark',
     stream_processing_mode=StreamProcessingMode.CONTINUOUS,  # enable low latency streaming
-    aggregations=[
-        Aggregation(column='clicked', function='count', time_window=timedelta(minutes=1)),
-        Aggregation(column='clicked', function='count', time_window=timedelta(minutes=5)),
+    features=[
+        Aggregate(input_column=Field('column', Bool), function='count', time_window=timedelta(minutes=1)),
+        Aggregate(input_column=Field('column', Bool), function='count', time_window=timedelta(minutes=5)),
     ],
+    timestamp_field='timestamp',
     batch_compute=cluster_config,
     stream_compute=cluster_config,
     online=False,

diff --git a/ads/features/stream_features/user_ad_impression_counts.py b/ads/features/stream_features/user_ad_impression_counts.py
index 71705dc..737622c 100644
--- a/ads/features/stream_features/user_ad_impression_counts.py
+++ b/ads/features/stream_features/user_ad_impression_counts.py
@@ -1,4 +1,5 @@
-from tecton import stream_feature_view, Aggregation, FilteredSource
+from tecton import stream_feature_view, Aggregate, FilteredSource
+from tecton.types import Int32
 from ads.entities import ad, user
 from ads.data_sources.ad_impressions import ad_impressions_stream
 from datetime import datetime, timedelta
@@ -9,11 +10,12 @@
     entities=[user, ad],
     mode='spark_sql',
     aggregation_interval=timedelta(hours=1),
-    aggregations=[
-        Aggregation(column='impression', function='count', time_window=timedelta(hours=1)),
-        Aggregation(column='impression', function='count', time_window=timedelta(hours=24)),
-        Aggregation(column='impression', function='count', time_window=timedelta(hours=72)),
+    features=[
+        Aggregate(input_column=('impression', Int32), function='count', time_window=timedelta(hours=1)),
+        Aggregate(input_column=('impression', Int32), function='count', time_window=timedelta(hours=24)),
+        Aggregate(input_column=('impression', Int32), function='count', time_window=timedelta(hours=72)),
     ],
+    timestamp_field='timestamp',
     online=False,
     offline=False,
     batch_schedule=timedelta(days=1),

diff --git a/ads/features/stream_features/user_click_counts.py b/ads/features/stream_features/user_click_counts.py
index 984467e..28a213a 100644
--- a/ads/features/stream_features/user_click_counts.py
+++ b/ads/features/stream_features/user_click_counts.py
@@ -1,4 +1,6 @@
-from tecton import stream_feature_view, FilteredSource, Aggregation
+from tecton import stream_feature_view, FilteredSource, Aggregate
+from tecton.types import Field, Bool
+
 from ads.entities import user
 from ads.data_sources.ad_impressions import ad_impressions_stream
 from datetime import datetime, timedelta
@@ -9,10 +11,10 @@
     entities=[user],
     mode='pyspark',
     aggregation_interval=timedelta(hours=1),
-    aggregations=[
-        Aggregation(column='clicked', function='count', time_window=timedelta(hours=1)),
-        Aggregation(column='clicked', function='count', time_window=timedelta(hours=24)),
-        Aggregation(column='clicked', function='count', time_window=timedelta(hours=72)),
+    features=[
+        Aggregate(input_column=Field('clicked', Bool), function='count', time_window=timedelta(hours=1)),
+        Aggregate(input_column=Field('clicked', Bool), function='count', time_window=timedelta(hours=24)),
+        Aggregate(input_column=Field('clicked', Bool), function='count', time_window=timedelta(hours=72)),
     ],
     online=False,
     offline=False,

diff --git a/ads/features/stream_features/user_click_counts_push.py b/ads/features/stream_features/user_click_counts_push.py
index 8a18867..0882839 100644
--- a/ads/features/stream_features/user_click_counts_push.py
+++ b/ads/features/stream_features/user_click_counts_push.py
@@ -1,5 +1,6 @@
 from datetime import timedelta, datetime
-from tecton import StreamFeatureView, Aggregation
+from tecton import StreamFeatureView, Aggregation, Aggregate
+from tecton.types import Bool, Field
 from ads.entities import user
 from ads.data_sources.ad_impressions import user_click_push_source
@@ -15,11 +16,12 @@
     offline=True,
     feature_start_time=datetime(2023, 1, 1),
     alert_email="demo-user@tecton.ai",
-    aggregations=[
-        Aggregation(column='clicked', function='count', time_window=timedelta(hours=1)),
-        Aggregation(column='clicked', function='count', time_window=timedelta(hours=24)),
-        Aggregation(column='clicked', function='count', time_window=timedelta(hours=72)),
+    features=[
+        Aggregate(input_column=Field('clicked', Bool), function='count', time_window=timedelta(hours=1)),
+        Aggregate(input_column=Field('clicked', Bool), function='count', time_window=timedelta(hours=24)),
+        Aggregate(input_column=Field('clicked', Bool), function='count', time_window=timedelta(hours=72)),
     ],
+    timestamp_field='timestamp',
     tags={'release': 'production'},
     owner='demo-user@tecton.ai',
     description='The count of ad clicks for a user'

diff --git a/ads/features/stream_features/user_impression_counts.py b/ads/features/stream_features/user_impression_counts.py
index 503ff7c..7c7e2fd 100644
--- a/ads/features/stream_features/user_impression_counts.py
+++ b/ads/features/stream_features/user_impression_counts.py
@@ -1,4 +1,6 @@
-from tecton import stream_feature_view, FilteredSource, Aggregation
+from tecton import stream_feature_view, FilteredSource, Aggregation, Aggregate
+from tecton.types import Int32, Field
+
 from ads.entities import user
 from ads.data_sources.ad_impressions import ad_impressions_stream
 from datetime import datetime, timedelta
@@ -9,11 +11,12 @@
     entities=[user],
     mode='spark_sql',
     aggregation_interval=timedelta(hours=1),
-    aggregations=[
-        Aggregation(column='impression', function='count', time_window=timedelta(hours=1)),
-        Aggregation(column='impression', function='count', time_window=timedelta(hours=24)),
-        Aggregation(column='impression', function='count', time_window=timedelta(hours=72)),
+    features=[
+        Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=1)),
+        Aggregate(input_column=Aggregate('impression', Int32), function='count', time_window=timedelta(hours=24)),
+        Aggregate(input_column=Aggregate('impression', Int32), function='count', time_window=timedelta(hours=72)),
     ],
+    timestamp_field='timestamp',
     online=False,
     offline=False,
     batch_schedule=timedelta(days=1),
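
Window aggregations migrate the same way: each `Aggregate` pairs a typed input `Field` with a function name and a `time_window`, and the platform generates one feature per declaration. A compact sketch assembled from `user_click_counts` above — the list comprehension and the view/function names are illustrative, and `source=FilteredSource(...)` is assumed to match the repo's other stream views:

    from datetime import datetime, timedelta

    from tecton import Aggregate, FilteredSource, stream_feature_view
    from tecton.types import Bool, Field

    from ads.entities import user
    from ads.data_sources.ad_impressions import ad_impressions_stream


    @stream_feature_view(
        source=FilteredSource(ad_impressions_stream),
        entities=[user],
        mode='spark_sql',
        aggregation_interval=timedelta(hours=1),
        # One generated feature per (input column, function, window) triple.
        features=[
            Aggregate(input_column=Field('clicked', Bool), function='count', time_window=timedelta(hours=w))
            for w in (1, 24, 72)
        ],
        online=False,
        offline=False,
        batch_schedule=timedelta(days=1),
        feature_start_time=datetime(2022, 5, 1),
        timestamp_field='timestamp',
    )
    def click_count_features(ad_impressions):
        # Assumes the stream exposes user_id, clicked, and timestamp columns.
        return f"SELECT user_id, clicked, timestamp FROM {ad_impressions}"
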
From d9d9c9d7cb095977307b8c5eb616234f3e647a80 Mon Sep 17 00:00:00 2001
From: Lilly
Date: Fri, 26 Jul 2024 15:59:27 -0700
Subject: [PATCH 03/23] more fix

---
 .../stream_features/content_keyword_click_counts.py    |  6 +++---
 .../stream_features/content_keyword_clicks_push.py     | 10 ++++++++--
 .../stream_features/user_ad_impression_counts.py       |  8 ++++----
 ads/features/stream_features/user_impression_counts.py |  4 ++--
 4 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/ads/features/stream_features/content_keyword_click_counts.py b/ads/features/stream_features/content_keyword_click_counts.py
index 3ce0954..5f31b88 100644
--- a/ads/features/stream_features/content_keyword_click_counts.py
+++ b/ads/features/stream_features/content_keyword_click_counts.py
@@ -1,5 +1,5 @@
 from tecton import stream_feature_view, FilteredSource, DatabricksClusterConfig, StreamProcessingMode, Aggregate
-from tecton.types import Field, Bool
+from tecton.types import Field, Int64
 
 from ads.entities import content_keyword
 from ads.data_sources.ad_impressions import ad_impressions_stream
@@ -17,8 +17,8 @@
     mode='pyspark',
     stream_processing_mode=StreamProcessingMode.CONTINUOUS,  # enable low latency streaming
     features=[
-        Aggregate(input_column=Field('column', Bool), function='count', time_window=timedelta(minutes=1)),
-        Aggregate(input_column=Field('column', Bool), function='count', time_window=timedelta(minutes=5)),
+        Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(minutes=1)),
+        Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(minutes=5)),
     ],
     timestamp_field='timestamp',
     batch_compute=cluster_config,

diff --git a/ads/features/stream_features/content_keyword_clicks_push.py b/ads/features/stream_features/content_keyword_clicks_push.py
index 69c8d7f..c8aab63 100644
--- a/ads/features/stream_features/content_keyword_clicks_push.py
+++ b/ads/features/stream_features/content_keyword_clicks_push.py
@@ -1,5 +1,7 @@
 from datetime import timedelta, datetime
-from tecton import StreamFeatureView, FilteredSource
+from tecton import StreamFeatureView, FilteredSource, Attribute
+from tecton.types import Int64
+
 from ads.entities import content_keyword
 from ads.data_sources.ad_impressions import keyword_click_source
@@ -19,5 +21,9 @@
     ttl=timedelta(days=30),
     tags={'release': 'production'},
     owner='demo-user@tecton.ai',
-    description='The ad clicks for a content keyword'
+    description='The ad clicks for a content keyword',
+    timestamp_field='timestamp',
+    features=[
+        Attribute(name='clicked', dtype=Int64),
+    ]
 )

diff --git a/ads/features/stream_features/user_ad_impression_counts.py b/ads/features/stream_features/user_ad_impression_counts.py
index 737622c..6d85bed 100644
--- a/ads/features/stream_features/user_ad_impression_counts.py
+++ b/ads/features/stream_features/user_ad_impression_counts.py
@@ -1,5 +1,5 @@
 from tecton import stream_feature_view, Aggregate, FilteredSource
-from tecton.types import Int32
+from tecton.types import Int32, Field
 from ads.entities import ad, user
 from ads.data_sources.ad_impressions import ad_impressions_stream
 from datetime import datetime, timedelta
@@ -11,9 +11,9 @@
     mode='spark_sql',
     aggregation_interval=timedelta(hours=1),
     features=[
-        Aggregate(input_column=('impression', Int32), function='count', time_window=timedelta(hours=1)),
-        Aggregate(input_column=('impression', Int32), function='count', time_window=timedelta(hours=24)),
-        Aggregate(input_column=('impression', Int32), function='count', time_window=timedelta(hours=72)),
+        Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=1)),
+        Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=24)),
+        Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=72)),
     ],
     timestamp_field='timestamp',
     online=False,

diff --git a/ads/features/stream_features/user_impression_counts.py b/ads/features/stream_features/user_impression_counts.py
index 7c7e2fd..cb3a1ba 100644
--- a/ads/features/stream_features/user_impression_counts.py
+++ b/ads/features/stream_features/user_impression_counts.py
@@ -13,8 +13,8 @@
     aggregation_interval=timedelta(hours=1),
     features=[
         Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=1)),
-        Aggregate(input_column=Aggregate('impression', Int32), function='count', time_window=timedelta(hours=24)),
-        Aggregate(input_column=Aggregate('impression', Int32), function='count', time_window=timedelta(hours=72)),
+        Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=24)),
+        Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=72)),
     ],
     timestamp_field='timestamp',
     online=False,
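
The corrections above restore the intended invariant: `input_column` is always a `tecton.types.Field` naming a real column of the view's output — not a bare tuple and not a nested `Aggregate`. Written out as a standalone declaration (illustrative; such objects normally go straight into a view's `features` list):

    from datetime import timedelta

    from tecton import Aggregate
    from tecton.types import Field, Int32

    # A count of the 'impression' column over a 1-hour window; the Field type
    # must match the column the view's transformation actually produces.
    impressions_1h = Aggregate(
        input_column=Field('impression', Int32),
        function='count',
        time_window=timedelta(hours=1),
    )
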
From bd683633b47a8cede4671a465ea33d24e19f7195 Mon Sep 17 00:00:00 2001
From: Lilly
Date: Wed, 31 Jul 2024 17:27:09 -0700
Subject: [PATCH 04/23] update

---
 ads/entities.py                                      |  6 +++---
 ads/features/batch_features/ad_auction_keywords.py   |  1 -
 .../batch_features/user_distinct_ad_count_7d.py      |  1 -
 ads/features/feature_tables/ad_embeddings.py         | 11 ++---------
 ads/features/feature_tables/user_embeddings.py       |  8 --------
 .../stream_features/content_keyword_click_counts.py  |  6 +++---
 .../stream_features/content_keyword_clicks_push.py   |  4 ++--
 ads/features/stream_features/user_click_counts.py    | 11 ++++++-----
 .../stream_features/user_click_counts_push.py        |  8 ++++----
 9 files changed, 20 insertions(+), 36 deletions(-)

diff --git a/ads/entities.py b/ads/entities.py
index 13c8ee4..562e2bb 100644
--- a/ads/entities.py
+++ b/ads/entities.py
@@ -1,9 +1,9 @@
 from tecton import Entity
-from tecton.types import Field, String
+from tecton.types import Field, String, Int64
 
 ad = Entity(
     name='ad',
-    join_keys=[Field('ad_id', String)],
+    join_keys=[Field('ad_id', Int64)],
     description='An ad',
     owner='demo-user@tecton.ai',
     tags={'release': 'production'}
@@ -26,7 +26,7 @@
 
 user = Entity(
     name='ads_user',
-    join_keys=[Field('content_keyword', String)],
+    join_keys=[Field('user_id', String)],
     description='A user of the platform',
     owner='demo-user@tecton.ai',
     tags={'release': 'production'}

diff --git a/ads/features/batch_features/ad_auction_keywords.py b/ads/features/batch_features/ad_auction_keywords.py
index e2b08df..70a0aa8 100644
--- a/ads/features/batch_features/ad_auction_keywords.py
+++ b/ads/features/batch_features/ad_auction_keywords.py
@@ -35,7 +35,6 @@ def keyword_stats(input_data, keyword_column):
     entities=[auction],
     timestamp_field="timestamp",
     features=[
-        Attribute(name="auction_id", dtype=String),
         Attribute(name="keyword_list", dtype=Array(String)),
         Attribute(name="num_keywords", dtype=Int32),
         Attribute(name="keyword_contains_bitcoin", dtype=Bool),

diff --git a/ads/features/batch_features/user_distinct_ad_count_7d.py b/ads/features/batch_features/user_distinct_ad_count_7d.py
index c033e18..a989071 100644
--- a/ads/features/batch_features/user_distinct_ad_count_7d.py
+++ b/ads/features/batch_features/user_distinct_ad_count_7d.py
@@ -20,7 +20,6 @@
     description='How many distinct advertisements a user has been shown in the last week',
     timestamp_field='timestamp',
     features=[
-        Attribute(name='user_id', dtype=String),
         Attribute(name='distinct_ad_count', dtype=Int64),
     ]
 )

diff --git a/ads/features/feature_tables/ad_embeddings.py b/ads/features/feature_tables/ad_embeddings.py
index 36e0a81..e4c0900 100644
--- a/ads/features/feature_tables/ad_embeddings.py
+++ b/ads/features/feature_tables/ad_embeddings.py
@@ -1,20 +1,13 @@
-from tecton.types import Field, String, Timestamp, Array, Float64
+from tecton.types import Array, Float64
 from tecton import FeatureTable, Attribute
-from ads.entities import ad
 from datetime import timedelta
 
-schema = [
-    Field('ad_id', String),
-    Field('timestamp', Timestamp),
-    Field('ad_embedding', Array(Float64))
-]
-
+from ads.entities import ad
 
 ad_embeddings = FeatureTable(
     name='ad_embeddings',
     entities=[ad],
     features=[
-        Attribute(name='ad_id', dtype=String),
         Attribute(name='ad_embedding', dtype=Array(Float64)),
     ],
     timestamp_field='timestamp',

diff --git a/ads/features/feature_tables/user_embeddings.py b/ads/features/feature_tables/user_embeddings.py
index 2eb3a99..c444650 100644
--- a/ads/features/feature_tables/user_embeddings.py
+++ b/ads/features/feature_tables/user_embeddings.py
@@ -4,18 +4,10 @@
 from datetime import timedelta
 
-schema = [
-    Field('user_id', String),
-    Field('timestamp', Timestamp),
-    Field('user_embedding', Array(Float64))
-]
-
-
 user_embeddings = FeatureTable(
     name='user_embeddings',
     entities=[user],
     features=[
-        Attribute(name='user_id', dtype=String),
         Attribute(name='user_embedding', dtype=Array(Float64)),
     ],
     timestamp_field="timestamp",

diff --git a/ads/features/stream_features/content_keyword_click_counts.py b/ads/features/stream_features/content_keyword_click_counts.py
index 5f31b88..bd4cc32 100644
--- a/ads/features/stream_features/content_keyword_click_counts.py
+++ b/ads/features/stream_features/content_keyword_click_counts.py
@@ -1,5 +1,5 @@
 from tecton import stream_feature_view, FilteredSource, DatabricksClusterConfig, StreamProcessingMode, Aggregate
-from tecton.types import Field, Int64
+from tecton.types import Field, Int64, Int32
 
 from ads.entities import content_keyword
 from ads.data_sources.ad_impressions import ad_impressions_stream
@@ -17,8 +17,8 @@
     mode='pyspark',
     stream_processing_mode=StreamProcessingMode.CONTINUOUS,  # enable low latency streaming
     features=[
-        Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(minutes=1)),
-        Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(minutes=5)),
+        Aggregate(input_column=Field('clicked', Int32), function='count', time_window=timedelta(minutes=1)),
+        Aggregate(input_column=Field('clicked', Int32), function='count', time_window=timedelta(minutes=5)),
     ],
     timestamp_field='timestamp',
     batch_compute=cluster_config,

diff --git a/ads/features/stream_features/content_keyword_clicks_push.py b/ads/features/stream_features/content_keyword_clicks_push.py
index c8aab63..6b3fbca 100644
--- a/ads/features/stream_features/content_keyword_clicks_push.py
+++ b/ads/features/stream_features/content_keyword_clicks_push.py
@@ -1,7 +1,6 @@
 from datetime import timedelta, datetime
-from tecton import StreamFeatureView, FilteredSource, Attribute
+from tecton import StreamFeatureView, FilteredSource, BatchTriggerType, Attribute
 from tecton.types import Int64
-
 from ads.entities import content_keyword
 from ads.data_sources.ad_impressions import keyword_click_source
@@ -22,6 +21,7 @@
     tags={'release': 'production'},
     owner='demo-user@tecton.ai',
     description='The ad clicks for a content keyword',
+    batch_trigger=BatchTriggerType.MANUAL,
     timestamp_field='timestamp',
     features=[
         Attribute(name='clicked', dtype=Int64),

diff --git a/ads/features/stream_features/user_click_counts.py b/ads/features/stream_features/user_click_counts.py
index 28a213a..d686b8a 100644
--- a/ads/features/stream_features/user_click_counts.py
+++ b/ads/features/stream_features/user_click_counts.py
@@ -1,5 +1,5 @@
 from tecton import stream_feature_view, FilteredSource, Aggregate
-from tecton.types import Field, Bool
+from tecton.types import Field, Bool, Int64
 
 from ads.entities import user
 from ads.data_sources.ad_impressions import ad_impressions_stream
@@ -12,9 +12,9 @@
     mode='pyspark',
     aggregation_interval=timedelta(hours=1),
     features=[
-        Aggregate(input_column=Field('clicked', Bool), function='count', time_window=timedelta(hours=1)),
-        Aggregate(input_column=Field('clicked', Bool), function='count', time_window=timedelta(hours=24)),
-        Aggregate(input_column=Field('clicked', Bool), function='count', time_window=timedelta(hours=72)),
+        Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=1)),
+        Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=24)),
+        Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=72)),
     ],
     online=False,
     offline=False,
@@ -22,7 +22,8 @@
     feature_start_time=datetime(2022, 5, 1),
     tags={'release': 'production'},
     owner='demo-user@tecton.ai',
-    description='The count of ad clicks for a user'
+    description='The count of ad clicks for a user',
+    timestamp_field='timestamp'
 )
 def user_click_counts(ad_impressions):
     return ad_impressions.select(ad_impressions['user_uuid'].alias('user_id'), 'clicked', 'timestamp')

diff --git a/ads/features/stream_features/user_click_counts_push.py b/ads/features/stream_features/user_click_counts_push.py
index 0882839..05e9fa9 100644
--- a/ads/features/stream_features/user_click_counts_push.py
+++ b/ads/features/stream_features/user_click_counts_push.py
@@ -1,6 +1,6 @@
 from datetime import timedelta, datetime
 from tecton import StreamFeatureView, Aggregation, Aggregate
-from tecton.types import Bool, Field
+from tecton.types import Bool, Field, Int64
 from ads.entities import user
 from ads.data_sources.ad_impressions import user_click_push_source
@@ -17,9 +17,9 @@
     feature_start_time=datetime(2023, 1, 1),
     alert_email="demo-user@tecton.ai",
     features=[
-        Aggregate(input_column=Field('clicked', Bool), function='count', time_window=timedelta(hours=1)),
-        Aggregate(input_column=Field('clicked', Bool), function='count', time_window=timedelta(hours=24)),
-        Aggregate(input_column=Field('clicked', Bool), function='count', time_window=timedelta(hours=72)),
+        Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=1)),
+        Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=24)),
+        Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=72)),
     ],
     timestamp_field='timestamp',
     tags={'release': 'production'},

From 7c03554416cd2e70feaf6e537ac3ea5beda8fb17 Mon Sep 17 00:00:00 2001
From: Lilly
Date: Wed, 31 Jul 2024 17:36:32 -0700
Subject: [PATCH 05/23] update for test...

---
 requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 56c058e..b70902c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1 @@
-tecton[pyspark]~=0.7.0
+tecton[pyspark]~=0.10.0b30
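
After this cleanup, feature tables declare only their value columns; the join key and event timestamp are implied by the entity and `timestamp_field`. The converged form of `ad_embeddings` from the patch, shown in isolation:

    from datetime import timedelta

    from tecton import Attribute, FeatureTable
    from tecton.types import Array, Float64

    from ads.entities import ad

    # 'ad_id' (the entity join key) and 'timestamp' are implicit; only the
    # embedding value column is declared.
    ad_embeddings = FeatureTable(
        name='ad_embeddings',
        entities=[ad],
        features=[Attribute(name='ad_embedding', dtype=Array(Float64))],
        timestamp_field='timestamp',
        online=True,
        offline=True,
        ttl=timedelta(days=10),
    )
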
From 9606dc33801e2e593f3b2618eac6d0eec1771892 Mon Sep 17 00:00:00 2001
From: Lilly
Date: Wed, 31 Jul 2024 17:38:19 -0700
Subject: [PATCH 06/23] huh

---
 repo.yaml | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 58 insertions(+)
 create mode 100644 repo.yaml

diff --git a/repo.yaml b/repo.yaml
new file mode 100644
index 0000000..f5e9cf8
--- /dev/null
+++ b/repo.yaml
@@ -0,0 +1,58 @@
+# This is the Tecton repo config. It's used to configure how Tecton builds and applies your
+# feature definitions during `tecton plan/apply/test`.
+#
+# By default, the Tecton CLI will use the Repo Config specified at /repo.yaml, but you
+# can specify another file by using `tecton plan --config my_config.yaml`.
+
+# The `defaults` keyword specifies default parameter values for Tecton objects defined in your Feature Repository.
+# For example, you can set a default `tecton_materialization_runtime` for all Batch Feature Views.
+# Defaults can be overridden on a per-object basis in your Python feature definitions.
+# See Tecton's documentation for details on which Tecton objects are currently supported by the `defaults` keyword.
+
+defaults:
+  batch_feature_view:
+    tecton_materialization_runtime: 0.10.0b28
+    environment: # For Rift-based Batch Feature Views
+  stream_feature_view:
+    tecton_materialization_runtime: 0.10.0b28
+    environment: # For Rift-based Stream Feature Views
+  feature_table:
+    tecton_materialization_runtime: 0.10.0b28
+
+# Below is an example of other defaults that can be set using the `defaults` keyword.
+# defaults:
+#   batch_feature_view:
+#     tecton_materialization_runtime: 99.99.99
+#     online_store:
+#       kind: RedisConfig
+#     offline_store:
+#       kind: OfflineStoreConfig
+#       staging_table_format:
+#         kind: ParquetConfig
+#     batch_compute:
+#       kind: DatabricksClusterConfig
+#       instance_type: m5.xlarge
+#       number_of_workers: 2
+#       extra_pip_dependencies:
+#         - haversine==2.8.0
+#   stream_feature_view:
+#     tecton_materialization_runtime: 99.99.99
+#     stream_compute:
+#       kind: DatabricksClusterConfig
+#       instance_availability: on_demand
+#       instance_type: m5.2xlarge
+#       number_of_workers: 4
+#     offline_store:
+#       kind: OfflineStoreConfig
+#       staging_table_format:
+#         kind: ParquetConfig
+#   feature_table:
+#     tecton_materialization_runtime: 99.99.99
+#     batch_compute:
+#       kind: DatabricksClusterConfig
+#       instance_type: m5.xlarge
+#       number_of_workers: 2
+#     online_store:
+#       kind: RedisConfig
+#   feature_service:
+#     on_demand_environment: tecton-python-extended:0.4
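
The repo config above sets a repo-wide default `tecton_materialization_runtime` for batch and stream feature views and feature tables, and its comments note that defaults can be overridden per object in Python. A hedged sketch of such an override — whether the decorator accepts this keyword should be checked against the installed SDK's signature, and the view itself is invented for illustration from the fraud module that follows:

    from datetime import datetime, timedelta

    from tecton import Attribute, FilteredSource, batch_feature_view
    from tecton.types import Float64

    from fraud.entities import user
    from fraud.data_sources.transactions import transactions_batch


    @batch_feature_view(
        sources=[FilteredSource(transactions_batch)],
        entities=[user],
        mode='spark_sql',
        timestamp_field='timestamp',
        features=[Attribute('amt', Float64)],
        batch_schedule=timedelta(days=1),
        ttl=timedelta(days=30),
        feature_start_time=datetime(2022, 5, 1),
        # Assumed per-object override of the repo-wide default from repo.yaml.
        tecton_materialization_runtime='0.10.0b28',
    )
    def amount_feature(transactions_batch):
        return f"SELECT user_id, amt, timestamp FROM {transactions_batch}"
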
From 615acd509ec8f9d7008a40a94d0b3208c94a7ecf Mon Sep 17 00:00:00 2001
From: Lilly
Date: Tue, 17 Sep 2024 09:20:10 -0700
Subject: [PATCH 07/23] [features migration] upgrade fraud - merge after 1.0 release (#74)

https://linear.app/tecton/issue/FE-2120/update-the-samples-repo-to-use-the-features-parameter
---
 fraud/entities.py                             |  6 +++---
 .../batch_transaction_features.py             | 10 ++++++++--
 .../batch_features/merchant_fraud_rate.py     | 15 +++++++++------
 .../transaction_user_is_adult.py              | 10 ++++++++--
 ...distinct_merchant_transaction_count_30d.py | 11 +++++++----
 .../batch_features/user_credit_card_issuer.py |  7 ++++++-
 .../batch_features/user_date_of_birth.py      |  8 +++++++-
 ...distinct_merchant_transaction_count_30d.py | 11 +++++++++--
 .../batch_features/user_home_location.py      | 11 +++++++++--
 .../user_median_transaction_amount_30d.py     | 11 +++++++----
 .../user_merchant_transaction_count.py        | 15 +++++++++------
 ...user_transaction_aggregates_by_category.py | 17 ++++++++++-------
 .../batch_features/user_transaction_counts.py | 15 +++++++++------
 .../user_weekend_transaction_count_30d.py     | 11 +++++++----
 .../feature_tables/user_login_counts.py       | 13 ++++++-------
 .../complex_data_type.py                      | 13 ++++++-------
 .../transaction_amount_is_high.py             |  6 +++---
 ...ansaction_amount_is_higher_than_average.py |  6 +++---
 .../transaction_distance_from_home.py         |  6 +++---
 .../on_demand_feature_views/user_age.py       |  6 +++---
 .../user_to_user_distance.py                  |  4 ++--
 .../last_transaction_amount_pyspark.py        |  8 +++++++-
 .../last_transaction_amount_sql.py            |  8 +++++++-
 .../user_continuous_transaction_count.py      | 15 +++++++++------
 .../user_recent_transactions.py               |  9 ++++++---
 .../user_transaction_amount_metrics.py        | 19 +++++++++++--------
 recsys/entities.py                            |  5 +++--
 .../batch_features/article_features.py        |  8 +++++---
 .../batch_features/session_features.py        | 13 ++++++++-----
 29 files changed, 190 insertions(+), 107 deletions(-)

diff --git a/fraud/entities.py b/fraud/entities.py
index a1c4ba6..8396026 100644
--- a/fraud/entities.py
+++ b/fraud/entities.py
@@ -1,9 +1,9 @@
 from tecton import Entity
-
+from tecton.types import Field, String
 
 user = Entity(
     name='fraud_user',
-    join_keys=['user_id'],
+    join_keys=[Field('user_id', String)],
     description='A user of the platform',
     owner='demo-user@tecton.ai',
     tags={'release': 'production'}
@@ -11,7 +11,7 @@
 
 merchant = Entity(
     name='merchant',
-    join_keys=['merchant'],
+    join_keys=[Field('merchant', String)],
     description='A merchant',
     owner='demo-user@tecton.ai',
     tags={'release': 'production'}

diff --git a/fraud/features/batch_features/batch_transaction_features.py b/fraud/features/batch_features/batch_transaction_features.py
index d7261ad..d387bdc 100644
--- a/fraud/features/batch_features/batch_transaction_features.py
+++ b/fraud/features/batch_features/batch_transaction_features.py
@@ -1,4 +1,6 @@
-from tecton import batch_feature_view, FilteredSource
+from tecton import batch_feature_view, FilteredSource, Attribute
+from tecton.types import Float64
+
 from fraud.entities import user
 from fraud.data_sources.transactions import transactions_batch
 from datetime import datetime, timedelta
@@ -12,7 +14,11 @@
     feature_start_time=datetime(2022, 5, 1),
     batch_schedule=timedelta(days=1),
     ttl=timedelta(days=30),
-    description='Last user transaction amount (batch calculated)'
+    description='Last user transaction amount (batch calculated)',
+    features=[
+        Attribute('amt', Float64)
+    ],
+    timestamp_field='timestamp'
 )
 def last_transaction_amount(transactions_batch):
     return f'''

diff --git a/fraud/features/batch_features/merchant_fraud_rate.py b/fraud/features/batch_features/merchant_fraud_rate.py
index 7d605d1..7c4997e 100644
--- a/fraud/features/batch_features/merchant_fraud_rate.py
+++ b/fraud/features/batch_features/merchant_fraud_rate.py
@@ -1,4 +1,6 @@
-from tecton import batch_feature_view, Aggregation, FilteredSource
+from tecton import batch_feature_view, Aggregation, FilteredSource, Aggregate
+from tecton.types import Field, Bool, Int64, Int32
+
 from fraud.entities import merchant
 from fraud.data_sources.transactions import transactions_batch
 from datetime import datetime, timedelta
@@ -11,13 +13,14 @@
     online=True,
     offline=True,
     aggregation_interval=timedelta(days=1),
-    aggregations=[
-        Aggregation(column='is_fraud', function='mean', time_window=timedelta(days=1)),
-        Aggregation(column='is_fraud', function='mean', time_window=timedelta(days=30)),
-        Aggregation(column='is_fraud', function='mean', time_window=timedelta(days=90))
+    features=[
+        Aggregate(input_column=Field('is_fraud', Int32), function='mean', time_window=timedelta(days=1)),
+        Aggregate(input_column=Field('is_fraud', Int32), function='mean', time_window=timedelta(days=30)),
+        Aggregate(input_column=Field('is_fraud', Int32), function='mean', time_window=timedelta(days=90)),
     ],
     feature_start_time=datetime(2022, 5, 1),
-    description='The merchant fraud rate over series of time windows, updated daily.'
+    description='The merchant fraud rate over series of time windows, updated daily.',
+    timestamp_field='timestamp'
 )
 def merchant_fraud_rate(transactions_batch):
     return f'''

diff --git a/fraud/features/batch_features/transaction_user_is_adult.py b/fraud/features/batch_features/transaction_user_is_adult.py
index 5e611d5..69d4c9b 100644
--- a/fraud/features/batch_features/transaction_user_is_adult.py
+++ b/fraud/features/batch_features/transaction_user_is_adult.py
@@ -1,4 +1,6 @@
-from tecton import batch_feature_view, FilteredSource
+from tecton import batch_feature_view, FilteredSource, Attribute
+from tecton.types import Int32
+
 from fraud.entities import user
 from fraud.data_sources.fraud_users import fraud_users_batch
 from fraud.data_sources.transactions import transactions_batch
@@ -18,7 +20,11 @@
     ttl=timedelta(days=100),
     tags={'release': 'production'},
     owner='demo-user@tecton.ai',
-    description='Whether the user performing the transaction is over 18 years old.'
+    description='Whether the user performing the transaction is over 18 years old.',
+    features=[
+        Attribute('user_is_adult', Int32)
+    ],
+    timestamp_field='timestamp'
 )
 def transaction_user_is_adult(transactions_batch, fraud_users_batch):
     return f'''

diff --git a/fraud/features/batch_features/user_approx_distinct_merchant_transaction_count_30d.py b/fraud/features/batch_features/user_approx_distinct_merchant_transaction_count_30d.py
index 1463e86..d57c4a8 100644
--- a/fraud/features/batch_features/user_approx_distinct_merchant_transaction_count_30d.py
+++ b/fraud/features/batch_features/user_approx_distinct_merchant_transaction_count_30d.py
@@ -1,5 +1,7 @@
-from tecton import batch_feature_view, Aggregation, FilteredSource
+from tecton import batch_feature_view, Aggregation, FilteredSource, Aggregate
 from tecton.aggregation_functions import approx_count_distinct
+from tecton.types import Field, String
+
 from fraud.entities import user
 from fraud.data_sources.transactions import transactions_batch
 from datetime import datetime, timedelta
@@ -15,11 +17,12 @@
     offline=True,
     feature_start_time=datetime(2022, 4, 1),
     aggregation_interval=timedelta(days=1),
-    aggregations=[
-        Aggregation(column='merchant', function=approx_count_distinct(), time_window=timedelta(days=30))
+    features=[
+        Aggregate(input_column=Field('merchant', String), function=approx_count_distinct(), time_window=timedelta(days=30))
     ],
     tags={'release': 'production'},
-    description='How many transactions the user has made to distinct merchants in the last 30 days.'
+    description='How many transactions the user has made to distinct merchants in the last 30 days.',
+    timestamp_field='timestamp'
 )
 def user_approx_distinct_merchant_transaction_count_30d(transactions_batch):
     return f'''

diff --git a/fraud/features/batch_features/user_credit_card_issuer.py b/fraud/features/batch_features/user_credit_card_issuer.py
index 6a676cf..c4ce3da 100644
--- a/fraud/features/batch_features/user_credit_card_issuer.py
+++ b/fraud/features/batch_features/user_credit_card_issuer.py
@@ -1,4 +1,6 @@
-from tecton import batch_feature_view
+from tecton import batch_feature_view, Attribute
+from tecton.types import String
+
 from fraud.entities import user
 from fraud.data_sources.fraud_users import fraud_users_batch
 from datetime import datetime, timedelta
@@ -19,6 +21,9 @@
     tags={'release': 'production'},
     owner='demo-user@tecton.ai',
     description='User credit card issuer derived from the user credit card number.',
+    features=[
+        Attribute('credit_card_issuer', String)
+    ]
 )
 def user_credit_card_issuer(fraud_users_batch):
     return f'''

diff --git a/fraud/features/batch_features/user_date_of_birth.py b/fraud/features/batch_features/user_date_of_birth.py
index aecccb8..62a3b45 100644
--- a/fraud/features/batch_features/user_date_of_birth.py
+++ b/fraud/features/batch_features/user_date_of_birth.py
@@ -1,4 +1,6 @@
-from tecton import batch_feature_view
+from tecton import batch_feature_view, Attribute
+from tecton.types import String
+
 from fraud.entities import user
 from fraud.data_sources.fraud_users import fraud_users_batch
 from datetime import datetime, timedelta
@@ -17,6 +19,10 @@
     tags={'release': 'production'},
     owner='demo-user@tecton.ai',
     description='User date of birth, entered at signup.',
+    features=[
+        Attribute('user_date_of_birth', String)
+    ],
+    timestamp_field='timestamp'
 )
 def user_date_of_birth(fraud_users_batch):
     from pyspark.sql import functions as f

diff --git a/fraud/features/batch_features/user_distinct_merchant_transaction_count_30d.py b/fraud/features/batch_features/user_distinct_merchant_transaction_count_30d.py
index f0fb363..1e62e61 100644
--- a/fraud/features/batch_features/user_distinct_merchant_transaction_count_30d.py
+++ b/fraud/features/batch_features/user_distinct_merchant_transaction_count_30d.py
@@ -1,4 +1,6 @@
-from tecton import batch_feature_view, FilteredSource, materialization_context
+from tecton import batch_feature_view, FilteredSource, materialization_context, Attribute
+from tecton.types import Int64
+
 from fraud.entities import user
 from fraud.data_sources.transactions import transactions_batch
 from datetime import datetime, timedelta
@@ -21,7 +23,12 @@
     ttl=timedelta(days=2),
     owner='demo-user@tecton.ai',
     tags={'release': 'production'},
-    description='How many transactions the user has made to distinct merchants in the last 30 days.'
+    description='How many transactions the user has made to distinct merchants in the last 30 days.',
+    features=[
+        Attribute('distinct_merchant_transaction_count_30d', Int64)
+    ],
+    timestamp_field='timestamp'
+
 )
 def user_distinct_merchant_transaction_count_30d(transactions_batch, context=materialization_context()):
     return f'''

diff --git a/fraud/features/batch_features/user_home_location.py b/fraud/features/batch_features/user_home_location.py
index bd823c3..bc20d09 100644
--- a/fraud/features/batch_features/user_home_location.py
+++ b/fraud/features/batch_features/user_home_location.py
@@ -1,4 +1,6 @@
-from tecton import batch_feature_view
+from tecton import batch_feature_view, Attribute
+from tecton.types import Float64
+
 from fraud.entities import user
 from fraud.data_sources.fraud_users import fraud_users_batch
 from datetime import datetime, timedelta
@@ -17,7 +19,12 @@
     tags={'release': 'production'},
     owner='demo-user@tecton.ai',
     description='User date of birth, entered at signup.',
-    timestamp_field='signup_timestamp'
+    timestamp_field='signup_timestamp',
+    features=[
+        Attribute('lat', Float64),
+        Attribute('long', Float64),
+    ],
+
 )
 def user_home_location(fraud_users_batch):
     return f'''

diff --git a/fraud/features/batch_features/user_median_transaction_amount_30d.py b/fraud/features/batch_features/user_median_transaction_amount_30d.py
index 2aa2b03..59ca942 100644
--- a/fraud/features/batch_features/user_median_transaction_amount_30d.py
+++ b/fraud/features/batch_features/user_median_transaction_amount_30d.py
@@ -1,5 +1,7 @@
-from tecton import batch_feature_view, Aggregation, FilteredSource
+from tecton import batch_feature_view, Aggregation, FilteredSource, Aggregate
 from tecton.aggregation_functions import approx_percentile
+from tecton.types import Field, Float64
+
 from fraud.entities import user
 from fraud.data_sources.transactions import transactions_batch
 from datetime import datetime, timedelta
@@ -12,10 +14,11 @@
     offline=False,
     feature_start_time=datetime(2022, 5, 1),
     aggregation_interval=timedelta(days=1),
-    aggregations=[
-        Aggregation(column='amt', function=approx_percentile(percentile=0.5), time_window=timedelta(days=30)),
+    features=[
+        Aggregate(input_column=Field('amt', Float64), function=approx_percentile(percentile=0.5), time_window=timedelta(days=30))
     ],
-    description='Median transaction amount for a user over the last 30 days'
+    description='Median transaction amount for a user over the last 30 days',
+    timestamp_field='timestamp'
 )
 def user_median_transaction_amount_30d(transactions_batch):
     return f'''
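
Parameterized aggregation functions from `tecton.aggregation_functions` plug into `Aggregate` exactly like the string names 'count' and 'mean'. A sketch combining the two approximate aggregations above into a single illustrative view (the view name and SELECT body are stand-ins; the columns come from the repo's transactions source):

    from datetime import datetime, timedelta

    from tecton import Aggregate, FilteredSource, batch_feature_view
    from tecton.aggregation_functions import approx_count_distinct, approx_percentile
    from tecton.types import Field, Float64, String

    from fraud.entities import user
    from fraud.data_sources.transactions import transactions_batch


    @batch_feature_view(
        sources=[FilteredSource(transactions_batch)],
        entities=[user],
        mode='spark_sql',
        aggregation_interval=timedelta(days=1),
        # Parameterized functions and plain string names share one interface.
        features=[
            Aggregate(input_column=Field('merchant', String),
                      function=approx_count_distinct(),
                      time_window=timedelta(days=30)),
            Aggregate(input_column=Field('amt', Float64),
                      function=approx_percentile(percentile=0.5),
                      time_window=timedelta(days=30)),
        ],
        online=False,
        offline=False,
        feature_start_time=datetime(2022, 5, 1),
        timestamp_field='timestamp',
    )
    def merchant_amount_stats(transactions):
        return f"SELECT user_id, merchant, amt, timestamp FROM {transactions}"
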
diff --git a/fraud/features/batch_features/user_merchant_transaction_count.py b/fraud/features/batch_features/user_merchant_transaction_count.py
index 18fadd8..d9bd339 100644
--- a/fraud/features/batch_features/user_merchant_transaction_count.py
+++ b/fraud/features/batch_features/user_merchant_transaction_count.py
@@ -1,4 +1,6 @@
-from tecton import batch_feature_view, Aggregation, FilteredSource
+from tecton import batch_feature_view, FilteredSource, Aggregate
+from tecton.types import Field, Int32
+
 from fraud.entities import user, merchant
 from fraud.data_sources.transactions import transactions_batch
 from datetime import datetime, timedelta
@@ -9,17 +11,18 @@
     entities=[user, merchant],
     mode='spark_sql',
     aggregation_interval=timedelta(days=1),
-    aggregations=[
-        Aggregation(column='transaction', function='count', time_window=timedelta(days=1)),
-        Aggregation(column='transaction', function='count', time_window=timedelta(days=30)),
-        Aggregation(column='transaction', function='count', time_window=timedelta(days=90))
+    features=[
+        Aggregate(input_column=Field('transaction', Int32), function='count', time_window=timedelta(days=1)),
+        Aggregate(input_column=Field('transaction', Int32), function='count', time_window=timedelta(days=30)),
+        Aggregate(input_column=Field('transaction', Int32), function='count', time_window=timedelta(days=90)),
     ],
     online=True,
     offline=True,
     feature_start_time=datetime(2022, 5, 1),
     tags={'release': 'production'},
     owner='demo-user@tecton.ai',
-    description='User transaction counts at specific merchants over a series of time windows, updated daily.'
+    description='User transaction counts at specific merchants over a series of time windows, updated daily.',
+    timestamp_field='timestamp'
 )
 def user_merchant_transaction_counts(transactions):
     return f'''

diff --git a/fraud/features/batch_features/user_transaction_aggregates_by_category.py b/fraud/features/batch_features/user_transaction_aggregates_by_category.py
index 0f7fc94..82cb40b 100644
--- a/fraud/features/batch_features/user_transaction_aggregates_by_category.py
+++ b/fraud/features/batch_features/user_transaction_aggregates_by_category.py
@@ -1,4 +1,6 @@
-from tecton import batch_feature_view, Aggregation, FilteredSource
+from tecton import batch_feature_view, Aggregation, FilteredSource, Aggregate
+from tecton.types import Field, Float64
+
 from fraud.entities import user
 from fraud.data_sources.transactions import transactions_batch
 from datetime import datetime, timedelta
@@ -21,16 +23,16 @@
     'grocery_net',
 ]
 
-aggregations = [
+generated_features = [
     [
-        Aggregation(column=f'{category}_amt', function='sum', time_window=timedelta(days=30)),
-        Aggregation(column=f'{category}_amt', function='mean', time_window=timedelta(days=30))
+        Aggregate(input_column=Field(f'{category}_amt', Float64), function='sum', time_window=timedelta(days=30)),
+        Aggregate(input_column=Field(f'{category}_amt', Float64), function='mean', time_window=timedelta(days=30)),
     ] for category in CATEGORIES
 ]
 
 # Flatten the `aggregations` list of lists.
-aggregations = reduce(lambda l, r: l + r, aggregations)
+features = reduce(lambda l, r: l + r, generated_features)
 
 # This feature view produces aggregate metrics for each purchase category in a user's transaction history, e.g. how much
 # has the user spent on "health_fitness" in the past 30 days. This feature view creates two aggregate features for each
@@ -42,11 +44,12 @@
     entities=[user],
     mode='pyspark',
     aggregation_interval=timedelta(days=1),
-    aggregations=aggregations,
+    features=features,
     online=False,
     offline=False,
     feature_start_time=datetime(2022, 7, 1),
-    description='User transaction aggregate metrics split by purchase category.'
+    description='User transaction aggregate metrics split by purchase category.',
+    timestamp_field='timestamp'
 )
 def user_transaction_aggregates_by_category(transactions_df):
     from pyspark.sql.functions import col, when

diff --git a/fraud/features/batch_features/user_transaction_counts.py b/fraud/features/batch_features/user_transaction_counts.py
index 6916d23..985a7c6 100644
--- a/fraud/features/batch_features/user_transaction_counts.py
+++ b/fraud/features/batch_features/user_transaction_counts.py
@@ -1,4 +1,6 @@
-from tecton import batch_feature_view, FilteredSource, Aggregation
+from tecton import batch_feature_view, FilteredSource, Aggregation, Aggregate
+from tecton.types import Field, Int32
+
 from fraud.entities import user
 from fraud.data_sources.transactions import transactions_batch
 from datetime import datetime, timedelta
@@ -9,17 +11,18 @@
     entities=[user],
     mode='spark_sql',
     aggregation_interval=timedelta(days=1),
-    aggregations=[
-        Aggregation(column='transaction', function='count', time_window=timedelta(days=1)),
-        Aggregation(column='transaction', function='count', time_window=timedelta(days=30)),
-        Aggregation(column='transaction', function='count', time_window=timedelta(days=90))
+    features=[
+        Aggregate(input_column=Field('transaction', Int32), function='count', time_window=timedelta(days=1)),
+        Aggregate(input_column=Field('transaction', Int32), function='count', time_window=timedelta(days=30)),
+        Aggregate(input_column=Field('transaction', Int32), function='count', time_window=timedelta(days=90)),
     ],
     online=True,
     offline=True,
     feature_start_time=datetime(2022, 5, 1),
     tags={'release': 'production'},
     owner='demo-user@tecton.ai',
-    description='User transaction totals over a series of time windows, updated daily.'
+    description='User transaction totals over a series of time windows, updated daily.',
+    timestamp_field='timestamp'
 )
 def user_transaction_counts(transactions):
     return f'''

diff --git a/fraud/features/batch_features/user_weekend_transaction_count_30d.py b/fraud/features/batch_features/user_weekend_transaction_count_30d.py
index 08d0a35..9e87a13 100644
--- a/fraud/features/batch_features/user_weekend_transaction_count_30d.py
+++ b/fraud/features/batch_features/user_weekend_transaction_count_30d.py
@@ -1,4 +1,6 @@
-from tecton import batch_feature_view, FilteredSource, Aggregation
+from tecton import batch_feature_view, FilteredSource, Aggregation, Aggregate
+from tecton.types import Field, Int32
+
 from fraud.entities import user
 from fraud.data_sources.transactions import transactions_batch
 from datetime import datetime, timedelta
@@ -12,14 +14,15 @@ def is_weekend(input_df, timestamp_column):
     entities=[user],
     mode='pyspark',
     aggregation_interval=timedelta(days=1),
-    aggregations=[
-        Aggregation(column='is_weekend', function='count', time_window=timedelta(days=30)),
+    features=[
+        Aggregate(input_column=Field('is_weekend', Int32), function='count', time_window=timedelta(days=30)),
     ],
     online=False,
     offline=False,
     feature_start_time=datetime(2022, 5, 1),
     tags={'cost-center': 'finance'},
-    description='How many weekend transactions the user has made in the last 30 days.'
+    description='How many weekend transactions the user has made in the last 30 days.',
+    timestamp_field='timestamp'
 )
 def user_weekend_transaction_count_30d(transactions_batch):
     return is_weekend(transactions_batch, "timestamp") \

diff --git a/fraud/features/feature_tables/user_login_counts.py b/fraud/features/feature_tables/user_login_counts.py
index 0d3183d..3d3c697 100644
--- a/fraud/features/feature_tables/user_login_counts.py
+++ b/fraud/features/feature_tables/user_login_counts.py
@@ -1,24 +1,23 @@
-from tecton import Entity, FeatureTable
+from tecton import Entity, FeatureTable, Attribute
 from tecton.types import String, Timestamp, Int64, Field
 from fraud.entities import user
 from datetime import timedelta
 
-schema = [
-    Field('user_id', String),
-    Field('timestamp', Timestamp),
-    Field('user_login_count_7d', Int64),
-    Field('user_login_count_30d', Int64),
+features = [
+    Attribute('user_login_count_7d', Int64),
+    Attribute('user_login_count_30d', Int64),
 ]
 
 user_login_counts = FeatureTable(
     name='user_login_counts',
     entities=[user],
-    schema=schema,
+    features=features,
     online=True,
     offline=True,
     ttl=timedelta(days=7),
     owner='demo-user@tecton.ai',
     tags={'release': 'production'},
     description='User login counts over time.',
+    timestamp_field='timestamp'
 )

diff --git a/fraud/features/on_demand_feature_views/complex_data_type.py b/fraud/features/on_demand_feature_views/complex_data_type.py
index 3d2ed7b..acc8c1b 100644
--- a/fraud/features/on_demand_feature_views/complex_data_type.py
+++ b/fraud/features/on_demand_feature_views/complex_data_type.py
@@ -1,4 +1,4 @@
-from tecton import on_demand_feature_view
+from tecton import on_demand_feature_view, Attribute
 from tecton import RequestSource
 from tecton.types import Array
 from tecton.types import Field
@@ -20,18 +20,17 @@
     ]
 )
 
-output_schema = [
-    Field("output_string_map", Map(String, String)),
-    Field("output_two_dimensional_array", Array(Array(String))),
-    Field("output_simple_struct", Struct(
-        [
+features = [
+    Attribute("output_string_map", Map(String, String)),
+    Attribute("output_two_dimensional_array", Array(Array(String))),
+    Attribute("output_simple_struct", Struct([
             Field("string_field", String),
             Field("float64_field", Float64),
         ]
     )),
 ]
 
-@on_demand_feature_view(mode="python", sources=[request_source], schema=output_schema)
+@on_demand_feature_view(mode="python", sources=[request_source], features=features)
 def complex_data_type_odfv(request):
     # Transform map value
     output_string_map = request["string_map"]
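
On-demand feature views use the same `features` list of `Attribute`s; with `mode='python'` the function receives plain Python values and returns a dict keyed by feature name. A small self-contained sketch in the style of the embedding-similarity views from earlier patches (the request source and the norm feature are invented for illustration):

    from tecton import Attribute, RequestSource, on_demand_feature_view
    from tecton.types import Array, Field, Float64

    request = RequestSource(schema=[Field('query_embedding', Array(Float64))])


    @on_demand_feature_view(
        sources=[request],
        mode='python',
        # ODFV outputs are declared with Attribute, mirroring the other views.
        features=[Attribute('embedding_norm', Float64)],
        description='L2 norm of the query embedding.',
    )
    def embedding_norm(request):
        # Row-level transform over the request payload.
        return {'embedding_norm': sum(x * x for x in request['query_embedding']) ** 0.5}
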
diff --git a/fraud/features/on_demand_feature_views/transaction_amount_is_high.py b/fraud/features/on_demand_feature_views/transaction_amount_is_high.py
index ca9dd99..c126d23 100644
--- a/fraud/features/on_demand_feature_views/transaction_amount_is_high.py
+++ b/fraud/features/on_demand_feature_views/transaction_amount_is_high.py
@@ -1,15 +1,15 @@
-from tecton import RequestSource, on_demand_feature_view
+from tecton import RequestSource, on_demand_feature_view, Attribute
 from tecton.types import Float64, Field, Bool
 
 request_schema = [Field('amt', Float64)]
 transaction_request = RequestSource(schema=request_schema)
-output_schema = [Field('transaction_amount_is_high', Bool)]
+features = [Attribute('transaction_amount_is_high', Bool)]
 
 # An example of an on-demand feature view that depends only on a request source.
 @on_demand_feature_view(
     sources=[transaction_request],
     mode='python',
-    schema=output_schema,
+    features=features,
     description='The transaction amount is higher than $100.'
 )
 def transaction_amount_is_high(transaction_request):

diff --git a/fraud/features/on_demand_feature_views/transaction_amount_is_higher_than_average.py b/fraud/features/on_demand_feature_views/transaction_amount_is_higher_than_average.py
index 151bca4..f3ce43c 100644
--- a/fraud/features/on_demand_feature_views/transaction_amount_is_higher_than_average.py
+++ b/fraud/features/on_demand_feature_views/transaction_amount_is_higher_than_average.py
@@ -1,15 +1,15 @@
-from tecton import RequestSource, on_demand_feature_view
+from tecton import RequestSource, on_demand_feature_view, Attribute
 from tecton.types import String, Timestamp, Float64, Field, Bool
 from fraud.features.stream_features.user_transaction_amount_metrics import user_transaction_amount_metrics
 
 request_schema = [Field('amt', Float64)]
 transaction_request = RequestSource(schema=request_schema)
-output_schema = [Field('transaction_amount_is_higher_than_average', Bool)]
+features = [Attribute('transaction_amount_is_higher_than_average', Bool)]
 
 @on_demand_feature_view(
     sources=[transaction_request, user_transaction_amount_metrics],
     mode='python',
-    schema=output_schema,
+    features=features,
     description='The transaction amount is higher than the 1 day average.'
 )
 def transaction_amount_is_higher_than_average(transaction_request, user_transaction_amount_metrics):

diff --git a/fraud/features/on_demand_feature_views/transaction_distance_from_home.py b/fraud/features/on_demand_feature_views/transaction_distance_from_home.py
index 611107f..b1c5809 100644
--- a/fraud/features/on_demand_feature_views/transaction_distance_from_home.py
+++ b/fraud/features/on_demand_feature_views/transaction_distance_from_home.py
@@ -1,4 +1,4 @@
-from tecton import RequestSource, on_demand_feature_view
+from tecton import RequestSource, on_demand_feature_view, Attribute
 from tecton.types import String, Timestamp, Float64, Field
 from fraud.features.batch_features.user_home_location import user_home_location
 
@@ -7,12 +7,12 @@
     Field('long', Float64),
 ]
 request = RequestSource(schema=request_schema)
-output_schema = [Field('dist_km', Float64)]
+features = [Attribute('dist_km', Float64)]
 
 @on_demand_feature_view(
     sources=[request, user_home_location],
     mode='python',
-    schema=output_schema,
+    features=features,
     description="How far a transaction is from the user's home",
     environments=['tecton-python-extended:0.1', 'tecton-python-extended:0.2']
 )

diff --git a/fraud/features/on_demand_feature_views/user_age.py b/fraud/features/on_demand_feature_views/user_age.py
index 18edc21..2dcf381 100644
--- a/fraud/features/on_demand_feature_views/user_age.py
+++ b/fraud/features/on_demand_feature_views/user_age.py
@@ -1,16 +1,16 @@
-from tecton import RequestSource, on_demand_feature_view
+from tecton import RequestSource, on_demand_feature_view, Attribute
 from tecton.types import String, Timestamp, Int64, Field
 from fraud.features.batch_features.user_date_of_birth import user_date_of_birth
 
 request_schema = [Field('timestamp', String)]
 request = RequestSource(schema=request_schema)
-output_schema = [Field('user_age', Int64)]
+features = [Attribute('user_age', Int64)]
 
 @on_demand_feature_view(
     sources=[request, user_date_of_birth],
     mode='python',
-    schema=output_schema,
+    features=features,
     description="The user's age in days."
 )
 def user_age(request, user_date_of_birth):

diff --git a/fraud/features/on_demand_feature_views/user_to_user_distance.py b/fraud/features/on_demand_feature_views/user_to_user_distance.py
index 9090a88..359a409 100644
--- a/fraud/features/on_demand_feature_views/user_to_user_distance.py
+++ b/fraud/features/on_demand_feature_views/user_to_user_distance.py
@@ -1,4 +1,4 @@
-from tecton import RequestSource, on_demand_feature_view
+from tecton import RequestSource, on_demand_feature_view, Attribute
 from tecton.types import String, Timestamp, Float64, Field
 from fraud.features.batch_features.user_home_location import user_home_location
 
@@ -10,7 +10,7 @@
         user_home_location.with_join_key_map({"user_id": "recipient_id"}),
     ],
     mode='python',
-    schema=[Field('dist_km', Float64)],
+    features=[Attribute('dist_km', Float64)],
     description="How far apart two users' home locations are.",
     environments=['tecton-python-extended:0.1', 'tecton-python-extended:0.2']
 )

diff --git a/fraud/features/stream_features/last_transaction_amount_pyspark.py b/fraud/features/stream_features/last_transaction_amount_pyspark.py
index bf84474..9251a8b 100644
--- a/fraud/features/stream_features/last_transaction_amount_pyspark.py
+++ b/fraud/features/stream_features/last_transaction_amount_pyspark.py
@@ -1,4 +1,6 @@
-from tecton import stream_feature_view, FilteredSource
+from tecton import stream_feature_view, FilteredSource, Attribute
+from tecton.types import Float64
+
 from fraud.entities import user
 from fraud.data_sources.transactions import transactions_stream
 from datetime import datetime, timedelta
@@ -13,6 +15,10 @@
     batch_schedule=timedelta(days=1),
     ttl=timedelta(days=30),
     description='Last user transaction amount (stream calculated)',
+    features=[
+        Attribute('amt', Float64)
+    ],
+    timestamp_field='timestamp'
 )
 def last_transaction_amount_pyspark(transactions):
     from pyspark.sql import functions as f

diff --git a/fraud/features/stream_features/last_transaction_amount_sql.py b/fraud/features/stream_features/last_transaction_amount_sql.py
index 3ba853f..c365ad9 100644
--- a/fraud/features/stream_features/last_transaction_amount_sql.py
+++ b/fraud/features/stream_features/last_transaction_amount_sql.py
@@ -1,4 +1,6 @@
-from tecton import stream_feature_view, FilteredSource
+from tecton import stream_feature_view, FilteredSource, Attribute
+from tecton.types import Float64
+
 from fraud.entities import user
 from fraud.data_sources.transactions import transactions_stream
 from datetime import datetime, timedelta
@@ -13,6 +15,10 @@
     batch_schedule=timedelta(days=1),
     ttl=timedelta(days=30),
     description='Last user transaction amount (stream calculated)',
+    features=[
+        Attribute('amt', Float64)
+    ],
+    timestamp_field='timestamp'
 )
 def last_transaction_amount_sql(transactions):
     return f'''

diff --git a/fraud/features/stream_features/user_continuous_transaction_count.py b/fraud/features/stream_features/user_continuous_transaction_count.py
index 66991c0..01749e2 100644
--- a/fraud/features/stream_features/user_continuous_transaction_count.py
+++ b/fraud/features/stream_features/user_continuous_transaction_count.py
@@ -1,4 +1,6 @@
-from tecton import stream_feature_view, FilteredSource, Aggregation, StreamProcessingMode
+from tecton import stream_feature_view, FilteredSource, Aggregation, StreamProcessingMode, Attribute, Aggregate
+from tecton.types import Float64, Int32, Field
+
 from fraud.entities import user
 from fraud.data_sources.transactions import transactions_stream
 from datetime import datetime, timedelta
@@ -11,10 +13,10 @@
entities=[user], mode='spark_sql', stream_processing_mode=StreamProcessingMode.CONTINUOUS, - aggregations=[ - Aggregation(column='transaction', function='count', time_window=timedelta(minutes=1)), - Aggregation(column='transaction', function='count', time_window=timedelta(minutes=30)), - Aggregation(column='transaction', function='count', time_window=timedelta(hours=1)) + features=[ + Aggregate(input_column=Field('transaction', Int32), function='count', time_window=timedelta(minutes=1)), + Aggregate(input_column=Field('transaction', Int32), function='count', time_window=timedelta(minutes=30)), + Aggregate(input_column=Field('transaction', Int32), function='count', time_window=timedelta(hours=1)) ], online=False, offline=True, @@ -22,7 +24,8 @@ prevent_destroy=False, # Set to True to prevent accidental destructive changes or downtime. tags={'release': 'production'}, owner='demo-user@tecton.ai', - description='Number of transactions a user has made recently' + description='Number of transactions a user has made recently', + timestamp_field='timestamp' ) def user_continuous_transaction_count(transactions): return f''' diff --git a/fraud/features/stream_features/user_recent_transactions.py b/fraud/features/stream_features/user_recent_transactions.py index e860e38..2c44293 100644 --- a/fraud/features/stream_features/user_recent_transactions.py +++ b/fraud/features/stream_features/user_recent_transactions.py @@ -1,5 +1,7 @@ -from tecton import stream_feature_view, FilteredSource, Aggregation +from tecton import stream_feature_view, FilteredSource, Aggregate from tecton.aggregation_functions import last_distinct +from tecton.types import Field, String + from fraud.entities import user from fraud.data_sources.transactions import transactions_stream from datetime import datetime, timedelta @@ -12,9 +14,10 @@ mode='spark_sql', aggregation_interval=timedelta(minutes=10), # Defines how frequently feature values get updated in the online store batch_schedule=timedelta(days=1), # Defines how frequently batch jobs are scheduled to ingest into the offline store - aggregations=[ - Aggregation(column='amt', function=last_distinct(10), time_window=timedelta(hours=1)) + features=[ + Aggregate(input_column=Field('amt', String), function=last_distinct(10), time_window=timedelta(hours=1)) ], + timestamp_field='timestamp', online=False, offline=False, feature_start_time=datetime(2022, 5, 1), diff --git a/fraud/features/stream_features/user_transaction_amount_metrics.py b/fraud/features/stream_features/user_transaction_amount_metrics.py index 6f5d0f3..5c2874c 100644 --- a/fraud/features/stream_features/user_transaction_amount_metrics.py +++ b/fraud/features/stream_features/user_transaction_amount_metrics.py @@ -1,4 +1,6 @@ -from tecton import stream_feature_view, FilteredSource, Aggregation +from tecton import stream_feature_view, FilteredSource, Aggregation, Aggregate +from tecton.types import Field, Int64, Float64 + from fraud.entities import user from fraud.data_sources.transactions import transactions_stream from datetime import datetime, timedelta @@ -11,14 +13,15 @@ mode='spark_sql', aggregation_interval=timedelta(minutes=10), # Defines how frequently feature values get updated in the online store batch_schedule=timedelta(days=1), # Defines how frequently batch jobs are scheduled to ingest into the offline store - aggregations=[ - Aggregation(column='amt', function='sum', time_window=timedelta(hours=1)), - Aggregation(column='amt', function='sum', time_window=timedelta(days=1)), - Aggregation(column='amt', 
function='sum', time_window=timedelta(days=3)), - Aggregation(column='amt', function='mean', time_window=timedelta(hours=1)), - Aggregation(column='amt', function='mean', time_window=timedelta(days=1)), - Aggregation(column='amt', function='mean', time_window=timedelta(days=3)) + features=[ + Aggregate(input_column=Field('amt', Float64), function='sum', time_window=timedelta(hours=1)), + Aggregate(input_column=Field('amt', Float64), function='sum', time_window=timedelta(days=1)), + Aggregate(input_column=Field('amt', Float64), function='sum', time_window=timedelta(days=3)), + Aggregate(input_column=Field('amt', Float64), function='mean', time_window=timedelta(hours=1)), + Aggregate(input_column=Field('amt', Float64), function='mean', time_window=timedelta(days=1)), + Aggregate(input_column=Field('amt', Float64), function='mean', time_window=timedelta(days=3)), ], + timestamp_field='timestamp', online=True, offline=True, feature_start_time=datetime(2022, 5, 1), diff --git a/recsys/entities.py b/recsys/entities.py index bc211f1..8a7e828 100644 --- a/recsys/entities.py +++ b/recsys/entities.py @@ -1,8 +1,9 @@ from tecton import Entity +from tecton.types import Field, Int32 article = Entity( name='article', - join_keys=['aid'], + join_keys=[Field('aid', Int32)], description='Item on an ecommerce site', owner='demo-user@tecton.ai', tags={'release': 'production'} @@ -10,7 +11,7 @@ session = Entity( name="session", - join_keys=["session"], + join_keys=[Field("session", Int32)], description='A user session', owner='demo-user@tecton.ai', tags={'release': 'production'} diff --git a/recsys/features/batch_features/article_features.py b/recsys/features/batch_features/article_features.py index 527b13c..6aada4b 100644 --- a/recsys/features/batch_features/article_features.py +++ b/recsys/features/batch_features/article_features.py @@ -1,5 +1,7 @@ -from tecton import batch_feature_view, Aggregation +from tecton import batch_feature_view, Aggregate from tecton.aggregation_functions import approx_count_distinct +from tecton.types import Field, Int32 + from recsys.entities import article from recsys.data_sources.session_events import sessions_batch from datetime import timedelta @@ -12,8 +14,8 @@ mode="spark_sql", timestamp_field="ts", aggregation_interval=timedelta(days=1), - aggregations=[ - Aggregation(function=approx_count_distinct(), column="session", time_window=timedelta(days=30)), + features=[ + Aggregate(input_column=Field("session", Int32), function=approx_count_distinct(), time_window=timedelta(days=30)), ], ) def article_sessions(sessions_batch): diff --git a/recsys/features/batch_features/session_features.py b/recsys/features/batch_features/session_features.py index efb0b0c..f706889 100644 --- a/recsys/features/batch_features/session_features.py +++ b/recsys/features/batch_features/session_features.py @@ -1,5 +1,7 @@ -from tecton import batch_feature_view, Aggregation +from tecton import batch_feature_view, Aggregate from tecton.aggregation_functions import approx_count_distinct +from tecton.types import Field, Int32 + from recsys.entities import session from recsys.data_sources.session_events import sessions_batch from datetime import datetime, timedelta @@ -13,11 +15,12 @@ feature_start_time=datetime(2022, 7, 31), batch_schedule=timedelta(days=1), aggregation_interval=timedelta(days=1), - aggregations=[ - Aggregation(function=approx_count_distinct(), column="aid", time_window=timedelta(days=1)), - Aggregation(function=approx_count_distinct(), column="aid", time_window=timedelta(days=3)), - 
Aggregation(function=approx_count_distinct(), column="aid", time_window=timedelta(days=7)) + features=[ + Aggregate(input_column=Field('aid', Int32), function=approx_count_distinct(), time_window=timedelta(days=1)), + Aggregate(input_column=Field('aid', Int32), function=approx_count_distinct(), time_window=timedelta(days=3)), + Aggregate(input_column=Field('aid', Int32), function=approx_count_distinct(), time_window=timedelta(days=7)) ], + timestamp_field='ts', description='Number of articles in session over 1, 3, 7 days' ) def session_approx_count_articles(sessions_data): From fdc7451945e85cc89b1845cd4a67bf537ee86a98 Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 09:38:30 -0700 Subject: [PATCH 08/23] update runtimes, filtered source --- ads/features/batch_features/ad_auction_keywords.py | 4 ++-- .../batch_features/user_distinct_ad_count_7d.py | 7 ++++--- .../stream_features/content_keyword_click_counts.py | 10 ++++++---- .../stream_features/content_keyword_clicks_push.py | 2 +- .../stream_features/user_ad_impression_counts.py | 7 ++++--- ads/features/stream_features/user_click_counts.py | 9 +++++---- ads/features/stream_features/user_click_counts_push.py | 2 +- ads/features/stream_features/user_impression_counts.py | 7 ++++--- .../batch_features/batch_transaction_features.py | 4 ++-- fraud/features/batch_features/merchant_fraud_rate.py | 6 +++--- .../batch_features/transaction_user_is_adult.py | 4 ++-- ...r_approx_distinct_merchant_transaction_count_30d.py | 4 ++-- .../features/batch_features/user_credit_card_issuer.py | 2 +- fraud/features/batch_features/user_date_of_birth.py | 2 +- .../user_distinct_merchant_transaction_count_30d.py | 7 +++---- fraud/features/batch_features/user_home_location.py | 2 +- .../user_median_transaction_amount_30d.py | 4 ++-- .../batch_features/user_merchant_transaction_count.py | 4 ++-- .../user_transaction_aggregates_by_category.py | 4 ++-- .../features/batch_features/user_transaction_counts.py | 4 ++-- .../user_weekend_transaction_count_30d.py | 4 ++-- .../stream_features/last_transaction_amount_pyspark.py | 7 ++++--- .../stream_features/last_transaction_amount_sql.py | 2 +- .../user_continuous_transaction_count.py | 6 +++--- .../stream_features/user_recent_transactions.py | 7 ++++--- .../stream_features/user_transaction_amount_metrics.py | 6 +++--- recsys/features/batch_features/article_features.py | 2 +- recsys/features/batch_features/session_features.py | 2 +- 28 files changed, 69 insertions(+), 62 deletions(-) diff --git a/ads/features/batch_features/ad_auction_keywords.py b/ads/features/batch_features/ad_auction_keywords.py index 70a0aa8..d301d36 100644 --- a/ads/features/batch_features/ad_auction_keywords.py +++ b/ads/features/batch_features/ad_auction_keywords.py @@ -1,4 +1,4 @@ -from tecton import transformation, FilteredSource, batch_feature_view, const, Attribute +from tecton import transformation, batch_feature_view, const, Attribute from tecton.types import String, Array, Int32, Bool from ads.entities import auction from ads.data_sources.ad_impressions import ad_impressions_batch @@ -31,7 +31,7 @@ def keyword_stats(input_data, keyword_column): # array of words, then create metrics based on that array. 
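# A minimal sketch of the source-filtering change in this patch, assuming 1.0
# semantics: a source passed directly to `sources` is filtered to the
# materialization window by default (replacing the explicit 0.x
# `FilteredSource` wrapper), and `.unfiltered()` opts out. Both lines reuse
# this file's `ad_impressions_batch` import.
filtered_by_default = ad_impressions_batch         # 1.0 analogue of FilteredSource(ad_impressions_batch)
full_history = ad_impressions_batch.unfiltered()   # no materialization-window filtering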
@batch_feature_view( mode='pipeline', - sources=[FilteredSource(ad_impressions_batch)], + sources=[ad_impressions_batch], entities=[auction], timestamp_field="timestamp", features=[ diff --git a/ads/features/batch_features/user_distinct_ad_count_7d.py b/ads/features/batch_features/user_distinct_ad_count_7d.py index a989071..abf0757 100644 --- a/ads/features/batch_features/user_distinct_ad_count_7d.py +++ b/ads/features/batch_features/user_distinct_ad_count_7d.py @@ -1,12 +1,13 @@ -from tecton import batch_feature_view, FilteredSource, materialization_context, Attribute -from tecton.types import String, Int64 +from tecton import batch_feature_view, materialization_context, Attribute +from tecton import TectonTimeConstant +from tecton.types import Int64 from ads.entities import user from ads.data_sources.ad_impressions import ad_impressions_batch from datetime import datetime, timedelta @batch_feature_view( - sources=[FilteredSource(ad_impressions_batch, start_time_offset=timedelta(days=-6))], + sources=[ad_impressions_batch.select_range(start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=6), end_time=TectonTimeConstant.MATERIALIZATION_END_TIME)], entities=[user], mode='spark_sql', ttl=timedelta(days=1), diff --git a/ads/features/stream_features/content_keyword_click_counts.py b/ads/features/stream_features/content_keyword_click_counts.py index bd4cc32..03816d3 100644 --- a/ads/features/stream_features/content_keyword_click_counts.py +++ b/ads/features/stream_features/content_keyword_click_counts.py @@ -1,5 +1,6 @@ -from tecton import stream_feature_view, FilteredSource, DatabricksClusterConfig, StreamProcessingMode, Aggregate -from tecton.types import Field, Int64, Int32 +from tecton import stream_feature_view, DatabricksClusterConfig, StreamProcessingMode, Aggregate, \ + AggregationLeadingEdge +from tecton.types import Field, Int32 from ads.entities import content_keyword from ads.data_sources.ad_impressions import ad_impressions_stream @@ -12,7 +13,7 @@ ) @stream_feature_view( - source=FilteredSource(ad_impressions_stream), + source=ad_impressions_stream, entities=[content_keyword], mode='pyspark', stream_processing_mode=StreamProcessingMode.CONTINUOUS, # enable low latency streaming @@ -28,7 +29,8 @@ feature_start_time=datetime(2022, 5, 1), tags={'release': 'production'}, owner='demo-user@tecton.ai', - description='The count of ad impressions for a content_keyword' + description='The count of ad impressions for a content_keyword', + aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME ) def content_keyword_click_counts(ad_impressions): from pyspark.sql import functions as F diff --git a/ads/features/stream_features/content_keyword_clicks_push.py b/ads/features/stream_features/content_keyword_clicks_push.py index a04d90d..f9591ab 100644 --- a/ads/features/stream_features/content_keyword_clicks_push.py +++ b/ads/features/stream_features/content_keyword_clicks_push.py @@ -10,7 +10,7 @@ # https://docs.tecton.ai/using-the-ingestion-api/#creating-a-stream-feature-view-with-a-push-source content_keyword_click_counts_push = StreamFeatureView( name="keyword_clicks_fv", - source=keyword_click_source, + source=keyword_click_source.unfiltered(), entities=[content_keyword], online=True, offline=True, diff --git a/ads/features/stream_features/user_ad_impression_counts.py b/ads/features/stream_features/user_ad_impression_counts.py index 6d85bed..64c7df0 100644 --- a/ads/features/stream_features/user_ad_impression_counts.py +++ 
b/ads/features/stream_features/user_ad_impression_counts.py @@ -1,4 +1,4 @@ -from tecton import stream_feature_view, Aggregate, FilteredSource +from tecton import stream_feature_view, Aggregate, AggregationLeadingEdge from tecton.types import Int32, Field from ads.entities import ad, user from ads.data_sources.ad_impressions import ad_impressions_stream @@ -6,7 +6,7 @@ @stream_feature_view( - source=FilteredSource(ad_impressions_stream), + source=ad_impressions_stream, entities=[user, ad], mode='spark_sql', aggregation_interval=timedelta(hours=1), @@ -22,7 +22,8 @@ feature_start_time=datetime(2022, 5, 1), tags={'release': 'production'}, owner='demo-user@tecton.ai', - description='The count of impressions between a given user and a given ad' + description='The count of impressions between a given user and a given ad', + aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME ) def user_ad_impression_counts(ad_impressions): return f""" diff --git a/ads/features/stream_features/user_click_counts.py b/ads/features/stream_features/user_click_counts.py index d686b8a..0ac7ae2 100644 --- a/ads/features/stream_features/user_click_counts.py +++ b/ads/features/stream_features/user_click_counts.py @@ -1,5 +1,5 @@ -from tecton import stream_feature_view, FilteredSource, Aggregate -from tecton.types import Field, Bool, Int64 +from tecton import stream_feature_view, Aggregate, AggregationLeadingEdge +from tecton.types import Field, Int64 from ads.entities import user from ads.data_sources.ad_impressions import ad_impressions_stream @@ -7,7 +7,7 @@ @stream_feature_view( - source=FilteredSource(ad_impressions_stream), + source=ad_impressions_stream, entities=[user], mode='pyspark', aggregation_interval=timedelta(hours=1), @@ -23,7 +23,8 @@ tags={'release': 'production'}, owner='demo-user@tecton.ai', description='The count of ad clicks for a user', - timestamp_field='timestamp' + timestamp_field='timestamp', + aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME ) def user_click_counts(ad_impressions): return ad_impressions.select(ad_impressions['user_uuid'].alias('user_id'), 'clicked', 'timestamp') diff --git a/ads/features/stream_features/user_click_counts_push.py b/ads/features/stream_features/user_click_counts_push.py index 05e9fa9..2646631 100644 --- a/ads/features/stream_features/user_click_counts_push.py +++ b/ads/features/stream_features/user_click_counts_push.py @@ -10,7 +10,7 @@ # https://docs.tecton.ai/using-the-ingestion-api/#creating-a-stream-feature-view-with-a-push-source user_click_counts_push = StreamFeatureView( name="user_click_counts_wafv", - source=user_click_push_source, + source=user_click_push_source.unfiltered(), entities=[user], online=True, offline=True, diff --git a/ads/features/stream_features/user_impression_counts.py b/ads/features/stream_features/user_impression_counts.py index cb3a1ba..a460ba5 100644 --- a/ads/features/stream_features/user_impression_counts.py +++ b/ads/features/stream_features/user_impression_counts.py @@ -1,4 +1,4 @@ -from tecton import stream_feature_view, FilteredSource, Aggregation, Aggregate +from tecton import stream_feature_view, FilteredSource, Aggregation, Aggregate, AggregationLeadingEdge from tecton.types import Int32, Field from ads.entities import user @@ -7,7 +7,7 @@ @stream_feature_view( - source=FilteredSource(ad_impressions_stream), + source=ad_impressions_stream, entities=[user], mode='spark_sql', aggregation_interval=timedelta(hours=1), @@ -23,7 +23,8 @@ feature_start_time=datetime(2022, 5, 1), tags={'release': 
'production'}, owner='demo-user@tecton.ai', - description='The count of ad impressions for a user' + description='The count of ad impressions for a user', + aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME ) def user_impression_counts(ad_impressions): return f''' diff --git a/fraud/features/batch_features/batch_transaction_features.py b/fraud/features/batch_features/batch_transaction_features.py index d387bdc..31628ae 100644 --- a/fraud/features/batch_features/batch_transaction_features.py +++ b/fraud/features/batch_features/batch_transaction_features.py @@ -1,4 +1,4 @@ -from tecton import batch_feature_view, FilteredSource, Attribute +from tecton import batch_feature_view, Attribute from tecton.types import Float64 from fraud.entities import user @@ -6,7 +6,7 @@ from datetime import datetime, timedelta @batch_feature_view( - sources=[FilteredSource(transactions_batch)], + sources=[transactions_batch], entities=[user], mode='spark_sql', online=False, diff --git a/fraud/features/batch_features/merchant_fraud_rate.py b/fraud/features/batch_features/merchant_fraud_rate.py index 7c4997e..4c9c782 100644 --- a/fraud/features/batch_features/merchant_fraud_rate.py +++ b/fraud/features/batch_features/merchant_fraud_rate.py @@ -1,5 +1,5 @@ -from tecton import batch_feature_view, Aggregation, FilteredSource, Aggregate -from tecton.types import Field, Bool, Int64, Int32 +from tecton import batch_feature_view, Aggregate +from tecton.types import Field, Int32 from fraud.entities import merchant from fraud.data_sources.transactions import transactions_batch @@ -7,7 +7,7 @@ @batch_feature_view( - sources=[FilteredSource(transactions_batch)], + sources=[transactions_batch], entities=[merchant], mode='spark_sql', online=True, diff --git a/fraud/features/batch_features/transaction_user_is_adult.py b/fraud/features/batch_features/transaction_user_is_adult.py index 69d4c9b..5cdafb6 100644 --- a/fraud/features/batch_features/transaction_user_is_adult.py +++ b/fraud/features/batch_features/transaction_user_is_adult.py @@ -1,4 +1,4 @@ -from tecton import batch_feature_view, FilteredSource, Attribute +from tecton import batch_feature_view, Attribute from tecton.types import Int32 from fraud.entities import user @@ -10,7 +10,7 @@ # For every transaction, the following FeatureView precomputes a feature that indicates # whether a user was an adult as of the time of the transaction @batch_feature_view( - sources=[FilteredSource(transactions_batch), fraud_users_batch], + sources=[transactions_batch, fraud_users_batch.unfiltered()], entities=[user], mode='spark_sql', online=False, diff --git a/fraud/features/batch_features/user_approx_distinct_merchant_transaction_count_30d.py b/fraud/features/batch_features/user_approx_distinct_merchant_transaction_count_30d.py index d57c4a8..6de3fa8 100644 --- a/fraud/features/batch_features/user_approx_distinct_merchant_transaction_count_30d.py +++ b/fraud/features/batch_features/user_approx_distinct_merchant_transaction_count_30d.py @@ -1,4 +1,4 @@ -from tecton import batch_feature_view, Aggregation, FilteredSource, Aggregate +from tecton import batch_feature_view, Aggregate from tecton.aggregation_functions import approx_count_distinct from tecton.types import Field, String @@ -10,7 +10,7 @@ # This feature view is a simpler way to implement the features in user_distinct_merchant_transaction_count_30d. # Instead of using a "custom aggregation" with `incremental_backfills=True`, it uses Tecton's built-in `approx_count_distinct` aggregation. 
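# A hedged sketch of the built-in aggregation this view relies on, written as
# a standalone 1.0 `Aggregate`; the 'merchant' column name is taken from the
# transactions schema used elsewhere in this series.
from datetime import timedelta
from tecton import Aggregate
from tecton.aggregation_functions import approx_count_distinct
from tecton.types import Field, String

distinct_merchants_30d = Aggregate(
    input_column=Field('merchant', String),
    function=approx_count_distinct(),
    time_window=timedelta(days=30),
)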
@batch_feature_view( - sources=[FilteredSource(transactions_batch)], + sources=[transactions_batch], entities=[user], mode='spark_sql', online=True, diff --git a/fraud/features/batch_features/user_credit_card_issuer.py b/fraud/features/batch_features/user_credit_card_issuer.py index c4ce3da..1f61a0b 100644 --- a/fraud/features/batch_features/user_credit_card_issuer.py +++ b/fraud/features/batch_features/user_credit_card_issuer.py @@ -7,7 +7,7 @@ @batch_feature_view( - sources=[fraud_users_batch], + sources=[fraud_users_batch.unfiltered()], entities=[user], mode='spark_sql', online=False, diff --git a/fraud/features/batch_features/user_date_of_birth.py b/fraud/features/batch_features/user_date_of_birth.py index 62a3b45..c93648a 100644 --- a/fraud/features/batch_features/user_date_of_birth.py +++ b/fraud/features/batch_features/user_date_of_birth.py @@ -7,7 +7,7 @@ @batch_feature_view( - sources=[fraud_users_batch], + sources=[fraud_users_batch.unfiltered()], entities=[user], mode='pyspark', online=False, diff --git a/fraud/features/batch_features/user_distinct_merchant_transaction_count_30d.py b/fraud/features/batch_features/user_distinct_merchant_transaction_count_30d.py index 1e62e61..3196933 100644 --- a/fraud/features/batch_features/user_distinct_merchant_transaction_count_30d.py +++ b/fraud/features/batch_features/user_distinct_merchant_transaction_count_30d.py @@ -1,4 +1,4 @@ -from tecton import batch_feature_view, FilteredSource, materialization_context, Attribute +from tecton import batch_feature_view, materialization_context, Attribute, TectonTimeConstant from tecton.types import Int64 from fraud.entities import user @@ -12,7 +12,7 @@ # See this documentation for more info: # https://docs.tecton.ai/latest/overviews/framework/feature_views/batch/incremental_backfills.html. 
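# A hedged sketch of the 1.0 `select_range` equivalent of this view's 0.x
# `FilteredSource(transactions_batch, start_time_offset=timedelta(days=-29))`,
# widening the window backwards for the 30-day lookback. The hunk below first
# writes `days=20`; patch 10 later in the series corrects it to `days=29`.
from datetime import timedelta
from tecton import TectonTimeConstant

windowed_transactions = transactions_batch.select_range(
    start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=29),
    end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
)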
@batch_feature_view( - sources=[FilteredSource(transactions_batch, start_time_offset=timedelta(days=-29))], + sources=[transactions_batch.select_range(start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=20), end_time=TectonTimeConstant.MATERIALIZATION_END_TIME)], entities=[user], mode='spark_sql', online=True, @@ -27,8 +27,7 @@ features=[ Attribute('distinct_merchant_transaction_count_30d', Int64) ], - timestamp_field='timestamp' - + timestamp_field='timestamp', ) def user_distinct_merchant_transaction_count_30d(transactions_batch, context=materialization_context()): return f''' diff --git a/fraud/features/batch_features/user_home_location.py b/fraud/features/batch_features/user_home_location.py index bc20d09..ff8c89c 100644 --- a/fraud/features/batch_features/user_home_location.py +++ b/fraud/features/batch_features/user_home_location.py @@ -7,7 +7,7 @@ @batch_feature_view( - sources=[fraud_users_batch], + sources=[fraud_users_batch.unfiltered()], entities=[user], mode='spark_sql', online=True, diff --git a/fraud/features/batch_features/user_median_transaction_amount_30d.py b/fraud/features/batch_features/user_median_transaction_amount_30d.py index 59ca942..48c49b4 100644 --- a/fraud/features/batch_features/user_median_transaction_amount_30d.py +++ b/fraud/features/batch_features/user_median_transaction_amount_30d.py @@ -1,4 +1,4 @@ -from tecton import batch_feature_view, Aggregation, FilteredSource, Aggregate +from tecton import batch_feature_view, Aggregate from tecton.aggregation_functions import approx_percentile from tecton.types import Field, Float64 @@ -7,7 +7,7 @@ from datetime import datetime, timedelta @batch_feature_view( - sources=[FilteredSource(transactions_batch)], + sources=[transactions_batch], entities=[user], mode='spark_sql', online=False, diff --git a/fraud/features/batch_features/user_merchant_transaction_count.py b/fraud/features/batch_features/user_merchant_transaction_count.py index d9bd339..005d4aa 100644 --- a/fraud/features/batch_features/user_merchant_transaction_count.py +++ b/fraud/features/batch_features/user_merchant_transaction_count.py @@ -1,4 +1,4 @@ -from tecton import batch_feature_view, FilteredSource, Aggregate +from tecton import batch_feature_view, Aggregate from tecton.types import Field, Int32 from fraud.entities import user, merchant @@ -7,7 +7,7 @@ @batch_feature_view( - sources=[FilteredSource(transactions_batch)], + sources=[transactions_batch], entities=[user, merchant], mode='spark_sql', aggregation_interval=timedelta(days=1), diff --git a/fraud/features/batch_features/user_transaction_aggregates_by_category.py b/fraud/features/batch_features/user_transaction_aggregates_by_category.py index 82cb40b..67c5503 100644 --- a/fraud/features/batch_features/user_transaction_aggregates_by_category.py +++ b/fraud/features/batch_features/user_transaction_aggregates_by_category.py @@ -1,4 +1,4 @@ -from tecton import batch_feature_view, Aggregation, FilteredSource, Aggregate +from tecton import batch_feature_view, Aggregate from tecton.types import Field, Float64 from fraud.entities import user @@ -40,7 +40,7 @@ # # An equivalent feature view can be implemented in spark SQL using the PIVOT clause. 
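# A hedged sketch of the Spark SQL PIVOT alternative mentioned above; the
# category values and column names are illustrative assumptions, not the
# repo's actual schema.
PIVOT_SKETCH_SQL = """
SELECT *
FROM (SELECT user_id, timestamp, category, amt FROM transactions)
PIVOT (
    SUM(amt) AS amt_sum,
    AVG(amt) AS amt_mean
    FOR category IN ('gas_transport', 'grocery_pos')
)
"""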
@batch_feature_view( - sources=[FilteredSource(transactions_batch)], + sources=[transactions_batch], entities=[user], mode='pyspark', aggregation_interval=timedelta(days=1), diff --git a/fraud/features/batch_features/user_transaction_counts.py b/fraud/features/batch_features/user_transaction_counts.py index 985a7c6..cec78f8 100644 --- a/fraud/features/batch_features/user_transaction_counts.py +++ b/fraud/features/batch_features/user_transaction_counts.py @@ -1,4 +1,4 @@ -from tecton import batch_feature_view, FilteredSource, Aggregation, Aggregate +from tecton import batch_feature_view, Aggregate from tecton.types import Field, Int32 from fraud.entities import user @@ -7,7 +7,7 @@ @batch_feature_view( - sources=[FilteredSource(transactions_batch)], + sources=[transactions_batch], entities=[user], mode='spark_sql', aggregation_interval=timedelta(days=1), diff --git a/fraud/features/batch_features/user_weekend_transaction_count_30d.py b/fraud/features/batch_features/user_weekend_transaction_count_30d.py index 9e87a13..250fe03 100644 --- a/fraud/features/batch_features/user_weekend_transaction_count_30d.py +++ b/fraud/features/batch_features/user_weekend_transaction_count_30d.py @@ -1,4 +1,4 @@ -from tecton import batch_feature_view, FilteredSource, Aggregation, Aggregate +from tecton import batch_feature_view, Aggregate from tecton.types import Field, Int32 from fraud.entities import user @@ -10,7 +10,7 @@ def is_weekend(input_df, timestamp_column): return input_df.withColumn("is_weekend", dayofweek(to_timestamp(col(timestamp_column))).isin([1,7]).cast("int")) @batch_feature_view( - sources=[FilteredSource(transactions_batch)], + sources=[transactions_batch], entities=[user], mode='pyspark', aggregation_interval=timedelta(days=1), diff --git a/fraud/features/stream_features/last_transaction_amount_pyspark.py b/fraud/features/stream_features/last_transaction_amount_pyspark.py index 9251a8b..565278d 100644 --- a/fraud/features/stream_features/last_transaction_amount_pyspark.py +++ b/fraud/features/stream_features/last_transaction_amount_pyspark.py @@ -1,4 +1,4 @@ -from tecton import stream_feature_view, FilteredSource, Attribute +from tecton import stream_feature_view, Attribute, AggregationLeadingEdge from tecton.types import Float64 from fraud.entities import user @@ -6,7 +6,7 @@ from datetime import datetime, timedelta @stream_feature_view( - source=transactions_stream, + source=transactions_stream.unfiltered(), entities=[user], mode='pyspark', online=False, @@ -18,7 +18,8 @@ features=[ Attribute('amt', Float64) ], - timestamp_field='timestamp' + timestamp_field='timestamp', + aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME ) def last_transaction_amount_pyspark(transactions): from pyspark.sql import functions as f diff --git a/fraud/features/stream_features/last_transaction_amount_sql.py b/fraud/features/stream_features/last_transaction_amount_sql.py index c365ad9..5d03108 100644 --- a/fraud/features/stream_features/last_transaction_amount_sql.py +++ b/fraud/features/stream_features/last_transaction_amount_sql.py @@ -6,7 +6,7 @@ from datetime import datetime, timedelta @stream_feature_view( - source=transactions_stream, + source=transactions_stream.unfiltered(), entities=[user], mode='spark_sql', online=False, diff --git a/fraud/features/stream_features/user_continuous_transaction_count.py b/fraud/features/stream_features/user_continuous_transaction_count.py index 01749e2..aaffaae 100644 --- a/fraud/features/stream_features/user_continuous_transaction_count.py +++ 
b/fraud/features/stream_features/user_continuous_transaction_count.py @@ -1,5 +1,5 @@ -from tecton import stream_feature_view, FilteredSource, Aggregation, StreamProcessingMode, Attribute, Aggregate -from tecton.types import Float64, Int32, Field +from tecton import stream_feature_view, StreamProcessingMode, Aggregate +from tecton.types import Int32, Field from fraud.entities import user from fraud.data_sources.transactions import transactions_stream @@ -9,7 +9,7 @@ # It counts the number of non-fraudulent transactions per user over a 1min, 5min and 1h time window # The expected freshness for these features is <1second @stream_feature_view( - source=FilteredSource(transactions_stream), + source=transactions_stream, entities=[user], mode='spark_sql', stream_processing_mode=StreamProcessingMode.CONTINUOUS, diff --git a/fraud/features/stream_features/user_recent_transactions.py b/fraud/features/stream_features/user_recent_transactions.py index 2c44293..e71ba13 100644 --- a/fraud/features/stream_features/user_recent_transactions.py +++ b/fraud/features/stream_features/user_recent_transactions.py @@ -1,4 +1,4 @@ -from tecton import stream_feature_view, FilteredSource, Aggregate +from tecton import stream_feature_view, FilteredSource, Aggregate, AggregationLeadingEdge from tecton.aggregation_functions import last_distinct from tecton.types import Field, String @@ -9,7 +9,7 @@ # The following defines a sliding time window aggregation that collects the last N transaction amounts of a user @stream_feature_view( - source=FilteredSource(transactions_stream), + source=transactions_stream, entities=[user], mode='spark_sql', aggregation_interval=timedelta(minutes=10), # Defines how frequently feature values get updated in the online store @@ -24,7 +24,8 @@ prevent_destroy=False, # Set to True to prevent accidental destructive changes or downtime. 
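# (Hedged aside: the `aggregation_leading_edge` argument added just below
# chooses how the leading edge of each aggregation window advances;
# `LATEST_EVENT_TIME` ties it to the newest event observed on the stream,
# which appears to preserve the pre-1.0 windowing behavior.)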
tags={'release': 'production'}, owner='demo-user@tecton.ai', - description='Most recent 10 transaction amounts of a user' + description='Most recent 10 transaction amounts of a user', + aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME ) def user_recent_transactions(transactions): return f''' diff --git a/fraud/features/stream_features/user_transaction_amount_metrics.py b/fraud/features/stream_features/user_transaction_amount_metrics.py index 5c2874c..30a1fd8 100644 --- a/fraud/features/stream_features/user_transaction_amount_metrics.py +++ b/fraud/features/stream_features/user_transaction_amount_metrics.py @@ -1,5 +1,5 @@ -from tecton import stream_feature_view, FilteredSource, Aggregation, Aggregate -from tecton.types import Field, Int64, Float64 +from tecton import stream_feature_view, Aggregate +from tecton.types import Field, Float64 from fraud.entities import user from fraud.data_sources.transactions import transactions_stream @@ -8,7 +8,7 @@ # The following defines several sliding time window aggregations over a user's transaction amounts @stream_feature_view( - source=FilteredSource(transactions_stream), + source=transactions_stream, entities=[user], mode='spark_sql', aggregation_interval=timedelta(minutes=10), # Defines how frequently feature values get updated in the online store diff --git a/recsys/features/batch_features/article_features.py b/recsys/features/batch_features/article_features.py index 6aada4b..0f9067f 100644 --- a/recsys/features/batch_features/article_features.py +++ b/recsys/features/batch_features/article_features.py @@ -9,7 +9,7 @@ @batch_feature_view( description="Unique sessions with article interactions", - sources=[sessions_batch], + sources=[sessions_batch.unfiltered()], entities=[article], mode="spark_sql", timestamp_field="ts", diff --git a/recsys/features/batch_features/session_features.py b/recsys/features/batch_features/session_features.py index f706889..5bde210 100644 --- a/recsys/features/batch_features/session_features.py +++ b/recsys/features/batch_features/session_features.py @@ -7,7 +7,7 @@ from datetime import datetime, timedelta @batch_feature_view( - sources=[sessions_batch], + sources=[sessions_batch.unfiltered()], entities=[session], mode='spark_sql', online=False, From dd5713c3ceb2ae648f4a715ff54e66d091b65c9c Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 09:46:05 -0700 Subject: [PATCH 09/23] hi --- ads/features/batch_features/ad_auction_keywords.py | 2 +- ads/features/stream_features/user_impression_counts.py | 2 +- fraud/features/batch_features/tutorial_features.py | 2 +- .../features/stream_features/last_transaction_amount_sql.py | 5 +++-- .../stream_features/user_continuous_transaction_count.py | 5 +++-- fraud/features/stream_features/user_recent_transactions.py | 2 +- .../stream_features/user_transaction_amount_metrics.py | 5 +++-- 7 files changed, 13 insertions(+), 10 deletions(-) diff --git a/ads/features/batch_features/ad_auction_keywords.py b/ads/features/batch_features/ad_auction_keywords.py index d301d36..9db0584 100644 --- a/ads/features/batch_features/ad_auction_keywords.py +++ b/ads/features/batch_features/ad_auction_keywords.py @@ -31,7 +31,7 @@ def keyword_stats(input_data, keyword_column): # array of words, then create metrics based on that array. 
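# (Aside: patch 09 flips several sources to `.unfiltered()`. That fits
# snapshot-style inputs with no meaningful event-time window, such as
# `fraud_users_batch` and `sessions_batch` above; here it hands this pipeline
# the full impressions history, as the hunk below shows.)
ad_impressions_full = ad_impressions_batch.unfiltered()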
@batch_feature_view( mode='pipeline', - sources=[ad_impressions_batch], + sources=[ad_impressions_batch.unfiltered()], entities=[auction], timestamp_field="timestamp", features=[ diff --git a/ads/features/stream_features/user_impression_counts.py b/ads/features/stream_features/user_impression_counts.py index a460ba5..5afe590 100644 --- a/ads/features/stream_features/user_impression_counts.py +++ b/ads/features/stream_features/user_impression_counts.py @@ -1,4 +1,4 @@ -from tecton import stream_feature_view, FilteredSource, Aggregation, Aggregate, AggregationLeadingEdge +from tecton import stream_feature_view, Aggregate, AggregationLeadingEdge from tecton.types import Int32, Field from ads.entities import user diff --git a/fraud/features/batch_features/tutorial_features.py b/fraud/features/batch_features/tutorial_features.py index 74bf234..0db2cb6 100644 --- a/fraud/features/batch_features/tutorial_features.py +++ b/fraud/features/batch_features/tutorial_features.py @@ -1,4 +1,4 @@ -from tecton import batch_feature_view, Aggregation, FilteredSource +from tecton import batch_feature_view from fraud.entities import user from fraud.data_sources.transactions import transactions_batch from datetime import datetime, timedelta diff --git a/fraud/features/stream_features/last_transaction_amount_sql.py b/fraud/features/stream_features/last_transaction_amount_sql.py index 5d03108..b73e664 100644 --- a/fraud/features/stream_features/last_transaction_amount_sql.py +++ b/fraud/features/stream_features/last_transaction_amount_sql.py @@ -1,4 +1,4 @@ -from tecton import stream_feature_view, FilteredSource, Attribute +from tecton import stream_feature_view, Attribute, AggregationLeadingEdge from tecton.types import Float64 from fraud.entities import user @@ -18,7 +18,8 @@ features=[ Attribute('amt', Float64) ], - timestamp_field='timestamp' + timestamp_field='timestamp', + aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME ) def last_transaction_amount_sql(transactions): return f''' diff --git a/fraud/features/stream_features/user_continuous_transaction_count.py b/fraud/features/stream_features/user_continuous_transaction_count.py index aaffaae..e88178a 100644 --- a/fraud/features/stream_features/user_continuous_transaction_count.py +++ b/fraud/features/stream_features/user_continuous_transaction_count.py @@ -1,4 +1,4 @@ -from tecton import stream_feature_view, StreamProcessingMode, Aggregate +from tecton import stream_feature_view, StreamProcessingMode, Aggregate, AggregationLeadingEdge from tecton.types import Int32, Field from fraud.entities import user @@ -25,7 +25,8 @@ tags={'release': 'production'}, owner='demo-user@tecton.ai', description='Number of transactions a user has made recently', - timestamp_field='timestamp' + timestamp_field='timestamp', + aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME ) def user_continuous_transaction_count(transactions): return f''' diff --git a/fraud/features/stream_features/user_recent_transactions.py b/fraud/features/stream_features/user_recent_transactions.py index e71ba13..a5d3675 100644 --- a/fraud/features/stream_features/user_recent_transactions.py +++ b/fraud/features/stream_features/user_recent_transactions.py @@ -1,4 +1,4 @@ -from tecton import stream_feature_view, FilteredSource, Aggregate, AggregationLeadingEdge +from tecton import stream_feature_view, Aggregate, AggregationLeadingEdge from tecton.aggregation_functions import last_distinct from tecton.types import Field, String diff --git 
a/fraud/features/stream_features/user_transaction_amount_metrics.py b/fraud/features/stream_features/user_transaction_amount_metrics.py index 30a1fd8..0ccd9a1 100644 --- a/fraud/features/stream_features/user_transaction_amount_metrics.py +++ b/fraud/features/stream_features/user_transaction_amount_metrics.py @@ -1,4 +1,4 @@ -from tecton import stream_feature_view, Aggregate +from tecton import stream_feature_view, Aggregate, AggregationLeadingEdge from tecton.types import Field, Float64 from fraud.entities import user @@ -27,7 +27,8 @@ feature_start_time=datetime(2022, 5, 1), tags={'release': 'production'}, owner='demo-user@tecton.ai', - description='Transaction amount statistics and total over a series of time windows, updated every 10 minutes.' + description='Transaction amount statistics and total over a series of time windows, updated every 10 minutes.', + aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME ) def user_transaction_amount_metrics(transactions): return f''' From 4064cc5e59eac6098b858ca981b61962a1add711 Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 09:56:04 -0700 Subject: [PATCH 10/23] hi --- ads/data_sources/ad_impressions.py | 12 +++++++----- ads/features/batch_features/ad_auction_keywords.py | 2 +- .../user_ad_embedding_similarity.py | 4 ++-- .../user_query_embedding_similarity.py | 4 ++-- .../stream_features/content_keyword_clicks_push.py | 2 +- .../stream_features/user_click_counts_push.py | 2 +- .../user_distinct_merchant_transaction_count_30d.py | 2 +- .../on_demand_feature_views/complex_data_type.py | 4 ++-- .../transaction_amount_is_high.py | 4 ++-- .../transaction_amount_is_higher_than_average.py | 4 ++-- .../transaction_distance_from_home.py | 4 ++-- fraud/features/on_demand_feature_views/user_age.py | 4 ++-- .../on_demand_feature_views/user_to_user_distance.py | 4 ++-- 13 files changed, 27 insertions(+), 25 deletions(-) diff --git a/ads/data_sources/ad_impressions.py b/ads/data_sources/ad_impressions.py index fcfeab5..22d4860 100644 --- a/ads/data_sources/ad_impressions.py +++ b/ads/data_sources/ad_impressions.py @@ -1,4 +1,4 @@ -from tecton import HiveConfig, KinesisConfig, StreamSource, BatchSource, DatetimePartitionColumn, PushSource +from tecton import HiveConfig, KinesisConfig, StreamSource, BatchSource, DatetimePartitionColumn, PushConfig from datetime import timedelta from tecton.types import Field, Int64, String, Timestamp @@ -100,7 +100,7 @@ def ad_stream_translator(df): Field(name='timestamp', dtype=Timestamp), Field(name='clicked', dtype=Int64), ] -keyword_click_source = PushSource( +keyword_click_source = StreamSource( name="keyword_click_source", schema=input_schema, batch_config=ad_impressions_hive_config, @@ -109,7 +109,8 @@ def ad_stream_translator(df): batch config for backfilling and offline training data generation. 
""", owner="demo-user@tecton.ai", - tags={'release': 'staging'} + tags={'release': 'staging'}, + stream_config=PushConfig() ) user_schema = [ @@ -117,11 +118,12 @@ def ad_stream_translator(df): Field(name='timestamp', dtype=Timestamp), Field(name='clicked', dtype=Int64), ] -user_click_push_source = PushSource( +user_click_push_source = StreamSource( name="user_event_source", schema=user_schema, description="A push source for synchronous, online ingestion of ad-click events with user info.", owner="demo-user@tecton.ai", - tags={'release': 'staging'} + tags={'release': 'staging'}, + stream_config=PushConfig() ) diff --git a/ads/features/batch_features/ad_auction_keywords.py b/ads/features/batch_features/ad_auction_keywords.py index 9db0584..d301d36 100644 --- a/ads/features/batch_features/ad_auction_keywords.py +++ b/ads/features/batch_features/ad_auction_keywords.py @@ -31,7 +31,7 @@ def keyword_stats(input_data, keyword_column): # array of words, then create metrics based on that array. @batch_feature_view( mode='pipeline', - sources=[ad_impressions_batch.unfiltered()], + sources=[ad_impressions_batch], entities=[auction], timestamp_field="timestamp", features=[ diff --git a/ads/features/on_demand_feature_views/user_ad_embedding_similarity.py b/ads/features/on_demand_feature_views/user_ad_embedding_similarity.py index da40044..c62c21a 100644 --- a/ads/features/on_demand_feature_views/user_ad_embedding_similarity.py +++ b/ads/features/on_demand_feature_views/user_ad_embedding_similarity.py @@ -1,10 +1,10 @@ -from tecton import on_demand_feature_view, Attribute +from tecton import realtime_feature_view, Attribute from tecton.types import Float64 from ads.features.feature_tables.user_embeddings import user_embeddings from ads.features.feature_tables.ad_embeddings import ad_embeddings -@on_demand_feature_view( +@realtime_feature_view( sources=[ad_embeddings, user_embeddings], mode='python', features=[ diff --git a/ads/features/on_demand_feature_views/user_query_embedding_similarity.py b/ads/features/on_demand_feature_views/user_query_embedding_similarity.py index cd35eca..c53a7fc 100644 --- a/ads/features/on_demand_feature_views/user_query_embedding_similarity.py +++ b/ads/features/on_demand_feature_views/user_query_embedding_similarity.py @@ -1,4 +1,4 @@ -from tecton import RequestSource, on_demand_feature_view, Attribute +from tecton import RequestSource, realtime_feature_view, Attribute from tecton.types import Field, Array, Float64 from ads.features.feature_tables.user_embeddings import user_embeddings @@ -6,7 +6,7 @@ request_schema = [Field('query_embedding', Array(Float64))] request = RequestSource(schema=request_schema) -@on_demand_feature_view( +@realtime_feature_view( sources=[request, user_embeddings], mode='python', features=[Attribute(name='cosine_similarity', dtype=Float64)], diff --git a/ads/features/stream_features/content_keyword_clicks_push.py b/ads/features/stream_features/content_keyword_clicks_push.py index f9591ab..a04d90d 100644 --- a/ads/features/stream_features/content_keyword_clicks_push.py +++ b/ads/features/stream_features/content_keyword_clicks_push.py @@ -10,7 +10,7 @@ # https://docs.tecton.ai/using-the-ingestion-api/#creating-a-stream-feature-view-with-a-push-source content_keyword_click_counts_push = StreamFeatureView( name="keyword_clicks_fv", - source=keyword_click_source.unfiltered(), + source=keyword_click_source, entities=[content_keyword], online=True, offline=True, diff --git a/ads/features/stream_features/user_click_counts_push.py 
b/ads/features/stream_features/user_click_counts_push.py index 2646631..0b0b28d 100644 --- a/ads/features/stream_features/user_click_counts_push.py +++ b/ads/features/stream_features/user_click_counts_push.py @@ -1,5 +1,5 @@ from datetime import timedelta, datetime -from tecton import StreamFeatureView, Aggregation, Aggregate +from tecton import StreamFeatureView, Aggregate from tecton.types import Bool, Field, Int64 from ads.entities import user from ads.data_sources.ad_impressions import user_click_push_source diff --git a/fraud/features/batch_features/user_distinct_merchant_transaction_count_30d.py b/fraud/features/batch_features/user_distinct_merchant_transaction_count_30d.py index 3196933..6c5159e 100644 --- a/fraud/features/batch_features/user_distinct_merchant_transaction_count_30d.py +++ b/fraud/features/batch_features/user_distinct_merchant_transaction_count_30d.py @@ -12,7 +12,7 @@ # See this documentation for more info: # https://docs.tecton.ai/latest/overviews/framework/feature_views/batch/incremental_backfills.html. @batch_feature_view( - sources=[transactions_batch.select_range(start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=20), end_time=TectonTimeConstant.MATERIALIZATION_END_TIME)], + sources=[transactions_batch.select_range(start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=29), end_time=TectonTimeConstant.MATERIALIZATION_END_TIME)], entities=[user], mode='spark_sql', online=True, diff --git a/fraud/features/on_demand_feature_views/complex_data_type.py b/fraud/features/on_demand_feature_views/complex_data_type.py index acc8c1b..f3f3a3d 100644 --- a/fraud/features/on_demand_feature_views/complex_data_type.py +++ b/fraud/features/on_demand_feature_views/complex_data_type.py @@ -1,4 +1,4 @@ -from tecton import on_demand_feature_view, Attribute +from tecton import realtime_feature_view, Attribute from tecton import RequestSource from tecton.types import Array from tecton.types import Field @@ -30,7 +30,7 @@ )), ] -@on_demand_feature_view(mode="python", sources=[request_source], features=features) +@realtime_feature_view(mode="python", sources=[request_source], features=features) def complex_data_type_odfv(request): # Transform map value output_string_map = request["string_map"] diff --git a/fraud/features/on_demand_feature_views/transaction_amount_is_high.py b/fraud/features/on_demand_feature_views/transaction_amount_is_high.py index c126d23..ab04966 100644 --- a/fraud/features/on_demand_feature_views/transaction_amount_is_high.py +++ b/fraud/features/on_demand_feature_views/transaction_amount_is_high.py @@ -1,4 +1,4 @@ -from tecton import RequestSource, on_demand_feature_view, Attribute +from tecton import RequestSource, realtime_feature_view, Attribute from tecton.types import Float64, Field, Bool request_schema = [Field('amt', Float64)] @@ -6,7 +6,7 @@ features = [Attribute('transaction_amount_is_high', Bool)] # An example of an on-demand feature view that depends only on a request source. 
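# A minimal, hedged sketch of the two renames patch 10 applies:
# `on_demand_feature_view` becomes `realtime_feature_view`, and `PushSource`
# becomes a `StreamSource` with `stream_config=PushConfig()`. The decorator
# example below mirrors this file's request-source pattern; every name in it
# is illustrative.
from tecton import RequestSource, realtime_feature_view, Attribute
from tecton.types import Field, Float64, Bool

sketch_request = RequestSource(schema=[Field('amt', Float64)])

@realtime_feature_view(
    sources=[sketch_request],
    mode='python',
    features=[Attribute('amt_is_high_sketch', Bool)],
    description='Sketch of the 1.0 realtime_feature_view API.',
)
def amt_is_high_sketch(sketch_request):
    # Same shape as transaction_amount_is_high above: dict in, dict out.
    return {'amt_is_high_sketch': sketch_request['amt'] > 100}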
-@on_demand_feature_view( +@realtime_feature_view( sources=[transaction_request], mode='python', features=features, diff --git a/fraud/features/on_demand_feature_views/transaction_amount_is_higher_than_average.py b/fraud/features/on_demand_feature_views/transaction_amount_is_higher_than_average.py index f3ce43c..a17926a 100644 --- a/fraud/features/on_demand_feature_views/transaction_amount_is_higher_than_average.py +++ b/fraud/features/on_demand_feature_views/transaction_amount_is_higher_than_average.py @@ -1,4 +1,4 @@ -from tecton import RequestSource, on_demand_feature_view, Attribute +from tecton import RequestSource, realtime_feature_view, Attribute from tecton.types import String, Timestamp, Float64, Field, Bool from fraud.features.stream_features.user_transaction_amount_metrics import user_transaction_amount_metrics @@ -6,7 +6,7 @@ transaction_request = RequestSource(schema=request_schema) features = [Attribute('transaction_amount_is_higher_than_average', Bool)] -@on_demand_feature_view( +@realtime_feature_view( sources=[transaction_request, user_transaction_amount_metrics], mode='python', features=features, diff --git a/fraud/features/on_demand_feature_views/transaction_distance_from_home.py b/fraud/features/on_demand_feature_views/transaction_distance_from_home.py index b1c5809..4085d35 100644 --- a/fraud/features/on_demand_feature_views/transaction_distance_from_home.py +++ b/fraud/features/on_demand_feature_views/transaction_distance_from_home.py @@ -1,4 +1,4 @@ -from tecton import RequestSource, on_demand_feature_view, Attribute +from tecton import RequestSource, realtime_feature_view, Attribute from tecton.types import String, Timestamp, Float64, Field from fraud.features.batch_features.user_home_location import user_home_location @@ -9,7 +9,7 @@ request = RequestSource(schema=request_schema) features = [Attribute('dist_km', Float64)] -@on_demand_feature_view( +@realtime_feature_view( sources=[request, user_home_location], mode='python', features=features, diff --git a/fraud/features/on_demand_feature_views/user_age.py b/fraud/features/on_demand_feature_views/user_age.py index 2dcf381..eb60b5c 100644 --- a/fraud/features/on_demand_feature_views/user_age.py +++ b/fraud/features/on_demand_feature_views/user_age.py @@ -1,4 +1,4 @@ -from tecton import RequestSource, on_demand_feature_view, Attribute +from tecton import RequestSource, realtime_feature_view, Attribute from tecton.types import String, Timestamp, Int64, Field from fraud.features.batch_features.user_date_of_birth import user_date_of_birth @@ -7,7 +7,7 @@ request = RequestSource(schema=request_schema) features = [Attribute('user_age', Int64)] -@on_demand_feature_view( +@realtime_feature_view( sources=[request, user_date_of_birth], mode='python', features=features, diff --git a/fraud/features/on_demand_feature_views/user_to_user_distance.py b/fraud/features/on_demand_feature_views/user_to_user_distance.py index 359a409..00f5536 100644 --- a/fraud/features/on_demand_feature_views/user_to_user_distance.py +++ b/fraud/features/on_demand_feature_views/user_to_user_distance.py @@ -1,10 +1,10 @@ -from tecton import RequestSource, on_demand_feature_view, Attribute +from tecton import RequestSource, realtime_feature_view, Attribute from tecton.types import String, Timestamp, Float64, Field from fraud.features.batch_features.user_home_location import user_home_location # This on-demand feature uses the same user feature view as two separate inputs. 
The join keys # for the on-demand feature view are a "sender_id" and "recipient_id". -@on_demand_feature_view( +@realtime_feature_view( sources=[ user_home_location, user_home_location.with_join_key_map({"user_id": "recipient_id"}), From da750277080839e33b3fbd8bdd872e4aaa07551e Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 10:01:01 -0700 Subject: [PATCH 11/23] update --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index b70902c..f6d28b4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -tecton[pyspark]~=0.10.0b30 +tecton[pyspark]~=1.0.0 From e66630f56162da8b61b74a096df03ffa4465cb5b Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 10:06:33 -0700 Subject: [PATCH 12/23] tri --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index f6d28b4..6c19812 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -tecton[pyspark]~=1.0.0 +tecton~=1.0.0 From c11c0a1630b4a53e0b3d5727cbd6f4b2cd48f150 Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 10:08:33 -0700 Subject: [PATCH 13/23] hi --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 6c19812..f6d28b4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -tecton~=1.0.0 +tecton[pyspark]~=1.0.0 From 95e626a7d3213d94307b013bd4c98030c8b2a8b0 Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 10:21:55 -0700 Subject: [PATCH 14/23] wah --- .../stream_features/tests/test_user_recent_transactions.py | 1 + 1 file changed, 1 insertion(+) diff --git a/fraud/features/stream_features/tests/test_user_recent_transactions.py b/fraud/features/stream_features/tests/test_user_recent_transactions.py index afd09cf..0e160c2 100644 --- a/fraud/features/stream_features/tests/test_user_recent_transactions.py +++ b/fraud/features/stream_features/tests/test_user_recent_transactions.py @@ -15,6 +15,7 @@ # session is needed, then you can create your own and set it with `tecton.set_tecton_spark_session()`. @pytest.mark.skipif(os.environ.get("TECTON_TEST_SPARK") is None, reason="Requires JDK installation and $JAVA_HOME env variable to run, so we skip unless user sets the `TECTON_TEST_SPARK` env var.") def test_user_recent_transactions(my_custom_spark_session): + print(f'falala {pandas.__version__}') input_pandas_df = pandas.DataFrame({ "user_id": ["user_1", "user_1", "user_1", "user_2"], "timestamp": [datetime(2022, 5, 1)] * 4, From c8dde2fbef40b4aa0b0b4cbec4cf6e2c9ca08e83 Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 10:36:59 -0700 Subject: [PATCH 15/23] hi --- .../stream_features/tests/test_user_recent_transactions.py | 2 +- requirements.txt | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/fraud/features/stream_features/tests/test_user_recent_transactions.py b/fraud/features/stream_features/tests/test_user_recent_transactions.py index 0e160c2..39ceb91 100644 --- a/fraud/features/stream_features/tests/test_user_recent_transactions.py +++ b/fraud/features/stream_features/tests/test_user_recent_transactions.py @@ -15,7 +15,7 @@ # session is needed, then you can create your own and set it with `tecton.set_tecton_spark_session()`. 
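# (Hedged aside: the `pandas<2.0` pin added below likely works around pandas
# 2.x removing `DataFrame.iteritems()`, which older PySpark releases call
# inside `createDataFrame(pandas_df)`; patch 17 later builds one test's Spark
# frame from tuples instead.)
import pandas
assert int(pandas.__version__.split(".")[0]) < 2  # the constraint being pinned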
@pytest.mark.skipif(os.environ.get("TECTON_TEST_SPARK") is None, reason="Requires JDK installation and $JAVA_HOME env variable to run, so we skip unless user sets the `TECTON_TEST_SPARK` env var.") def test_user_recent_transactions(my_custom_spark_session): - print(f'falala {pandas.__version__}') + print(f'flalal {pandas.__version__}') input_pandas_df = pandas.DataFrame({ "user_id": ["user_1", "user_1", "user_1", "user_2"], "timestamp": [datetime(2022, 5, 1)] * 4, diff --git a/requirements.txt b/requirements.txt index f6d28b4..0d1c46b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ tecton[pyspark]~=1.0.0 +pandas<2.0 \ No newline at end of file From 035956adac1d9939915cf1c4288c6bdac0a49250 Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 10:45:46 -0700 Subject: [PATCH 16/23] tri --- .../batch_features/tests/test_user_credit_card_issuer.py | 3 ++- .../tests/test_user_distinct_merchant_transaction_count_30d.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/fraud/features/batch_features/tests/test_user_credit_card_issuer.py b/fraud/features/batch_features/tests/test_user_credit_card_issuer.py index fc83c5a..6f8b767 100644 --- a/fraud/features/batch_features/tests/test_user_credit_card_issuer.py +++ b/fraud/features/batch_features/tests/test_user_credit_card_issuer.py @@ -3,6 +3,7 @@ import pandas import pytest +import pytz from fraud.features.batch_features.user_credit_card_issuer import user_credit_card_issuer @@ -29,7 +30,7 @@ def test_user_credit_card_issuer_run(tecton_pytest_spark_session): expected = pandas.DataFrame({ "user_id": ["user_1", "user_2", "user_3", "user_4"], - "signup_timestamp": [datetime(2022, 5, 1)] * 4, + "signup_timestamp": [pytz.UTC.localize(datetime(2022, 5, 1))] * 4, "credit_card_issuer": ["other", "Visa", "MasterCard", "Discover"], }) diff --git a/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py b/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py index ed095b0..5363b7d 100644 --- a/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py +++ b/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py @@ -3,6 +3,7 @@ import pandas import pytest +import pytz from fraud.features.batch_features.user_distinct_merchant_transaction_count_30d import user_distinct_merchant_transaction_count_30d @@ -34,7 +35,7 @@ def test_user_distinct_merchant_transaction_count_30d(tecton_pytest_spark_sessio expected = pandas.DataFrame({ "user_id": ["user_1", "user_2"], - "timestamp": [datetime(2022, 5, 2) - timedelta(microseconds=1), datetime(2022, 5, 2) - timedelta(microseconds=1)], + "timestamp": [pytz.UTC.localize(datetime(2022, 5, 2) - timedelta(microseconds=1), datetime(2022, 5, 2) - timedelta(microseconds=1))], "distinct_merchant_transaction_count_30d": [2, 1], }) From 1047190f91667f40efb45ae82bb2de1d574a1b50 Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 11:24:40 -0700 Subject: [PATCH 17/23] fix dis --- ...distinct_merchant_transaction_count_30d.py | 27 ++++++++++++------- .../tests/test_user_recent_transactions.py | 26 +++++++++--------- requirements.txt | 3 +-- 3 files changed, 32 insertions(+), 24 deletions(-) diff --git a/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py b/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py index 5363b7d..c05df48 100644 --- 
a/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py +++ b/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py @@ -14,15 +14,22 @@ @pytest.mark.skipif(os.environ.get("TECTON_TEST_SPARK") is None, reason="Requires JDK installation and $JAVA_HOME env variable to run, so we skip unless user sets the `TECTON_TEST_SPARK` env var.") def test_user_distinct_merchant_transaction_count_30d(tecton_pytest_spark_session): timestamps = [datetime(2022, 4, 10), datetime(2022, 4, 25), datetime(2022, 5, 1)] - input_pandas_df = pandas.DataFrame({ - "user_id": ["user_1", "user_1", "user_2"], - "merchant": ["merchant_1", "merchant_2", "merchant_1"], - "timestamp": timestamps, - "partition_0": [ts.year for ts in timestamps], - "partition_1": [ts.month for ts in timestamps], - "partition_2": [ts.day for ts in timestamps], - }) - input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df) + + data = [ + ( + "user_1", "merchant_1", timestamps[0], timestamps[0].year, timestamps[0].month, timestamps[0].day + ), + ( + "user_1", "merchant_2", timestamps[1], timestamps[1].year, timestamps[1].month, timestamps[1].day + ), + ( + "user_2", "merchant_1", timestamps[2], timestamps[2].year, timestamps[2].month, timestamps[2].day + ), + ] + + schema = ["user_id", "merchant", "timestamp", "partition_0", "partition_1", "partition_2"] + + input_spark_df = tecton_pytest_spark_session.createDataFrame(data, schema) # Simulate materializing features for May 1st. output = user_distinct_merchant_transaction_count_30d.test_run( @@ -35,7 +42,7 @@ def test_user_distinct_merchant_transaction_count_30d(tecton_pytest_spark_sessio expected = pandas.DataFrame({ "user_id": ["user_1", "user_2"], - "timestamp": [pytz.UTC.localize(datetime(2022, 5, 2) - timedelta(microseconds=1), datetime(2022, 5, 2) - timedelta(microseconds=1))], + "timestamp": [pytz.UTC.localize(datetime(2022, 5, 2) - timedelta(microseconds=1)), pytz.UTC.localize(datetime(2022, 5, 2) - timedelta(microseconds=1))], "distinct_merchant_transaction_count_30d": [2, 1], }) diff --git a/fraud/features/stream_features/tests/test_user_recent_transactions.py b/fraud/features/stream_features/tests/test_user_recent_transactions.py index 39ceb91..edcdf5e 100644 --- a/fraud/features/stream_features/tests/test_user_recent_transactions.py +++ b/fraud/features/stream_features/tests/test_user_recent_transactions.py @@ -3,6 +3,7 @@ import pandas import pytest +import pytz import tecton from fraud.features.stream_features.user_recent_transactions import user_recent_transactions @@ -15,16 +16,17 @@ # session is needed, then you can create your own and set it with `tecton.set_tecton_spark_session()`. 
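Patch 17 replaces the pandas-built inputs with `createDataFrame(data, schema)` calls that take plain tuples and a bare column-name list; the same patch drops the `pandas<2.0` pin (see the requirements.txt hunk below), so sidestepping the pandas-to-Spark conversion path under pandas 2.x is a plausible motivation, though the commit message does not say. A standalone sketch of the pattern, assuming a local SparkSession:

    from datetime import datetime
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    # Rows as plain tuples; no intermediate pandas DataFrame.
    data = [
        ("user_1", "merchant_1", datetime(2022, 4, 10)),
        ("user_1", "merchant_2", datetime(2022, 4, 25)),
    ]
    # A bare list of names lets Spark infer the column types from the rows.
    schema = ["user_id", "merchant", "timestamp"]

    input_spark_df = spark.createDataFrame(data, schema)
    input_spark_df.show()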
@pytest.mark.skipif(os.environ.get("TECTON_TEST_SPARK") is None, reason="Requires JDK installation and $JAVA_HOME env variable to run, so we skip unless user sets the `TECTON_TEST_SPARK` env var.") def test_user_recent_transactions(my_custom_spark_session): - print(f'flalal {pandas.__version__}') - input_pandas_df = pandas.DataFrame({ - "user_id": ["user_1", "user_1", "user_1", "user_2"], - "timestamp": [datetime(2022, 5, 1)] * 4, - "amt": [100, 200, 300, 400], - "partition_0": ["2022"] * 4, - "partition_1": ["05"] * 4, - "partition_2": ["01"] * 4, - }) - input_spark_df = my_custom_spark_session.createDataFrame(input_pandas_df) + data = [ + ("user_1", datetime(2022, 5, 1), 100, "2022", "05", "01"), + ("user_1", datetime(2022, 5, 1), 200, "2022", "05", "01"), + ("user_1", datetime(2022, 5, 1), 300, "2022", "05", "01"), + ("user_2", datetime(2022, 5, 1), 400, "2022", "05", "01") + ] + + # Define the schema (column names) + schema = ["user_id", "timestamp", "amt", "partition_0", "partition_1", "partition_2"] + + input_spark_df = my_custom_spark_session.createDataFrame(data, schema) # Simulate materializing features for May 1st. output = user_recent_transactions.test_run( @@ -37,10 +39,10 @@ def test_user_recent_transactions(my_custom_spark_session): expected = pandas.DataFrame({ "user_id": ["user_1", "user_2"], - "amt_last_distinct_10_1h_10m": [["300", "200", "100"], ["400"]], + "amt_last_distinct_10_1h_10m": [["100", "200", "300"], ["400"]], # The result timestamp is rounded up to the nearest aggregation interval "end time". The aggregation interval # is ten minutes for this feature view. - "timestamp": [datetime(2022, 5, 1, 0, 10)] * 2, + "timestamp": [pytz.UTC.localize(datetime(2022, 5, 1, 7, 10))] * 2, }) pandas.testing.assert_frame_equal(actual, expected) diff --git a/requirements.txt b/requirements.txt index 0d1c46b..9e8ab75 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1 @@ -tecton[pyspark]~=1.0.0 -pandas<2.0 \ No newline at end of file +tecton[pyspark]~=1.0.0 \ No newline at end of file From 25269dd4c42c9f3a663e7b14241aa6f22ccf125c Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 11:55:10 -0700 Subject: [PATCH 18/23] fix --- .../tests/test_user_credit_card_issuer.py | 48 +++++++++++++------ ...distinct_merchant_transaction_count_30d.py | 1 + .../tests/test_user_recent_transactions.py | 1 + 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/fraud/features/batch_features/tests/test_user_credit_card_issuer.py b/fraud/features/batch_features/tests/test_user_credit_card_issuer.py index 6f8b767..c33e676 100644 --- a/fraud/features/batch_features/tests/test_user_credit_card_issuer.py +++ b/fraud/features/batch_features/tests/test_user_credit_card_issuer.py @@ -13,12 +13,22 @@ # session is needed, then you can create your own and set it with `tecton.set_tecton_spark_session()`. 
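Both rewritten tests pin their expected timestamps to UTC with `pytz.UTC.localize`, which attaches a timezone to a naive datetime without shifting the wall-clock value. A two-line illustration, assuming pytz is installed:

    from datetime import datetime
    import pytz

    naive = datetime(2022, 5, 1)      # no tzinfo attached
    aware = pytz.UTC.localize(naive)  # 2022-05-01 00:00:00+00:00
    print(aware.tzinfo)               # UTC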
@pytest.mark.skipif(os.environ.get("TECTON_TEST_SPARK") is None, reason="Requires JDK installation and $JAVA_HOME env variable to run, so we skip unless user sets the `TECTON_TEST_SPARK` env var.") def test_user_credit_card_issuer_run(tecton_pytest_spark_session): - input_pandas_df = pandas.DataFrame({ - "user_id": ["user_1", "user_2", "user_3", "user_4"], - "signup_timestamp": [datetime(2022, 5, 1)] * 4, - "cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000], - }) - input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df) + # Same signup timestamp for all users + signup_timestamp = datetime(2022, 5, 1) + + # Create the data + data = [ + ("user_1", signup_timestamp, 1000000000000000), + ("user_2", signup_timestamp, 4000000000000000), + ("user_3", signup_timestamp, 5000000000000000), + ("user_4", signup_timestamp, 6000000000000000) + ] + + # Define schema: user_id, signup_timestamp, cc_num + schema = ["user_id", "signup_timestamp", "cc_num"] + + # Convert to Spark DataFrame + input_spark_df = tecton_pytest_spark_session.createDataFrame(data, schema) # Simulate materializing features for May 1st. output = user_credit_card_issuer.test_run( @@ -30,9 +40,10 @@ def test_user_credit_card_issuer_run(tecton_pytest_spark_session): expected = pandas.DataFrame({ "user_id": ["user_1", "user_2", "user_3", "user_4"], - "signup_timestamp": [pytz.UTC.localize(datetime(2022, 5, 1))] * 4, + "signup_timestamp": [pytz.UTC.localize(datetime(2022, 5, 1, 0, 0, 0, 0))] * 4, "credit_card_issuer": ["other", "Visa", "MasterCard", "Discover"], }) + expected['signup_timestamp'] = expected['signup_timestamp'].astype('datetime64[us]') pandas.testing.assert_frame_equal(actual, expected) @@ -40,12 +51,20 @@ def test_user_credit_card_issuer_run(tecton_pytest_spark_session): @pytest.mark.skipif(os.environ.get("TECTON_TEST_SPARK") is None, reason="Requires JDK installation and $JAVA_HOME env variable to run, so we skip unless user sets the `TECTON_TEST_SPARK` env var.") def test_user_credit_card_issuer_ghf(tecton_pytest_spark_session): - input_pandas_df = pandas.DataFrame({ - "user_id": ["user_1", "user_2", "user_3", "user_4"], - "signup_timestamp": [datetime(2022, 5, 1)] * 4, - "cc_num": [1000000000000000, 4000000000000000, 5000000000000000, 6000000000000000], - }) - input_spark_df = tecton_pytest_spark_session.createDataFrame(input_pandas_df) + signup_timestamp = datetime(2022, 5, 1) + + data = [ + ("user_1", signup_timestamp, 1000000000000000), + ("user_2", signup_timestamp, 4000000000000000), + ("user_3", signup_timestamp, 5000000000000000), + ("user_4", signup_timestamp, 6000000000000000) + ] + + # Define schema: user_id, signup_timestamp, cc_num + schema = ["user_id", "signup_timestamp", "cc_num"] + + # Convert to Spark DataFrame + input_spark_df = tecton_pytest_spark_session.createDataFrame(data, schema) spine_df = pandas.DataFrame({ "user_id": ["user_1", "user_1", "user_2", "user_not_found"], @@ -59,9 +78,10 @@ def test_user_credit_card_issuer_ghf(tecton_pytest_spark_session): expected = pandas.DataFrame({ "user_id": ["user_1", "user_1", "user_2", "user_not_found"], - "timestamp": [datetime(2022, 5, 1), datetime(2022, 5, 2), datetime(2022, 6, 1), datetime(2022, 6, 1)], + "timestamp": [pytz.UTC.localize(datetime(2022, 5, 1)), pytz.UTC.localize(datetime(2022, 5, 2)), pytz.UTC.localize(datetime(2022, 6, 1)), pytz.UTC.localize(datetime(2022, 6, 1))], "user_credit_card_issuer__credit_card_issuer": [None, "other", "Visa", None], }) + expected['signup_timestamp'] = 
expected['signup_timestamp'].astype('datetime64[us]') # NOTE: because the Spark join has non-deterministic ordering, it is important to # sort the dataframe to avoid test flakes. diff --git a/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py b/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py index c05df48..dfb4985 100644 --- a/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py +++ b/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py @@ -45,6 +45,7 @@ def test_user_distinct_merchant_transaction_count_30d(tecton_pytest_spark_sessio "timestamp": [pytz.UTC.localize(datetime(2022, 5, 2) - timedelta(microseconds=1)), pytz.UTC.localize(datetime(2022, 5, 2) - timedelta(microseconds=1))], "distinct_merchant_transaction_count_30d": [2, 1], }) + expected['timestamp'] = expected['timestamp'].astype('datetime64[us]') pandas.testing.assert_frame_equal(actual, expected) diff --git a/fraud/features/stream_features/tests/test_user_recent_transactions.py b/fraud/features/stream_features/tests/test_user_recent_transactions.py index edcdf5e..706d788 100644 --- a/fraud/features/stream_features/tests/test_user_recent_transactions.py +++ b/fraud/features/stream_features/tests/test_user_recent_transactions.py @@ -44,6 +44,7 @@ def test_user_recent_transactions(my_custom_spark_session): # is ten minutes for this feature view. "timestamp": [pytz.UTC.localize(datetime(2022, 5, 1, 7, 10))] * 2, }) + expected['timestamp'] = expected['timestamp'].astype('datetime64[us]') pandas.testing.assert_frame_equal(actual, expected) From ca05d857b0685ef9f0946e872f210dada5214f21 Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 11:58:41 -0700 Subject: [PATCH 19/23] fix --- .../batch_features/tests/test_user_credit_card_issuer.py | 4 ++-- .../test_user_distinct_merchant_transaction_count_30d.py | 2 +- .../stream_features/tests/test_user_recent_transactions.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/fraud/features/batch_features/tests/test_user_credit_card_issuer.py b/fraud/features/batch_features/tests/test_user_credit_card_issuer.py index c33e676..c5de2e7 100644 --- a/fraud/features/batch_features/tests/test_user_credit_card_issuer.py +++ b/fraud/features/batch_features/tests/test_user_credit_card_issuer.py @@ -43,7 +43,7 @@ def test_user_credit_card_issuer_run(tecton_pytest_spark_session): "signup_timestamp": [pytz.UTC.localize(datetime(2022, 5, 1, 0, 0, 0, 0))] * 4, "credit_card_issuer": ["other", "Visa", "MasterCard", "Discover"], }) - expected['signup_timestamp'] = expected['signup_timestamp'].astype('datetime64[us]') + expected['signup_timestamp'] = expected['signup_timestamp'].astype('datetime64[us, UTC]') pandas.testing.assert_frame_equal(actual, expected) @@ -81,7 +81,7 @@ def test_user_credit_card_issuer_ghf(tecton_pytest_spark_session): "timestamp": [pytz.UTC.localize(datetime(2022, 5, 1)), pytz.UTC.localize(datetime(2022, 5, 2)), pytz.UTC.localize(datetime(2022, 6, 1)), pytz.UTC.localize(datetime(2022, 6, 1))], "user_credit_card_issuer__credit_card_issuer": [None, "other", "Visa", None], }) - expected['signup_timestamp'] = expected['signup_timestamp'].astype('datetime64[us]') + expected['timestamp'] = expected['timestamp'].astype('datetime64[us, UTC]') # NOTE: because the Spark join has non-deterministic ordering, it is important to # sort the dataframe to avoid test flakes. 
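The NOTE above is the standard guard against flaky assertions when a Spark join feeds the result: sort both frames on the join key and reset the index before comparing. A self-contained pandas sketch of the pattern:

    import pandas

    actual = pandas.DataFrame({"user_id": ["user_2", "user_1"], "n": [1, 2]})
    expected = pandas.DataFrame({"user_id": ["user_1", "user_2"], "n": [2, 1]})

    # Sorting removes the dependence on the join's output order.
    actual = actual.sort_values("user_id").reset_index(drop=True)
    expected = expected.sort_values("user_id").reset_index(drop=True)
    pandas.testing.assert_frame_equal(actual, expected)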
diff --git a/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py b/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py index dfb4985..e5c14fd 100644 --- a/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py +++ b/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py @@ -45,7 +45,7 @@ def test_user_distinct_merchant_transaction_count_30d(tecton_pytest_spark_sessio "timestamp": [pytz.UTC.localize(datetime(2022, 5, 2) - timedelta(microseconds=1)), pytz.UTC.localize(datetime(2022, 5, 2) - timedelta(microseconds=1))], "distinct_merchant_transaction_count_30d": [2, 1], }) - expected['timestamp'] = expected['timestamp'].astype('datetime64[us]') + expected['timestamp'] = expected['timestamp'].astype('datetime64[us, UTC]') pandas.testing.assert_frame_equal(actual, expected) diff --git a/fraud/features/stream_features/tests/test_user_recent_transactions.py b/fraud/features/stream_features/tests/test_user_recent_transactions.py index 706d788..d2acdfc 100644 --- a/fraud/features/stream_features/tests/test_user_recent_transactions.py +++ b/fraud/features/stream_features/tests/test_user_recent_transactions.py @@ -44,7 +44,7 @@ def test_user_recent_transactions(my_custom_spark_session): # is ten minutes for this feature view. "timestamp": [pytz.UTC.localize(datetime(2022, 5, 1, 7, 10))] * 2, }) - expected['timestamp'] = expected['timestamp'].astype('datetime64[us]') + expected['timestamp'] = expected['timestamp'].astype('datetime64[us, UTC]') pandas.testing.assert_frame_equal(actual, expected) From 878a4479f3ae46632af738d766c1b78cd84b9a8f Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 12:01:56 -0700 Subject: [PATCH 20/23] fix --- .../stream_features/tests/test_user_recent_transactions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fraud/features/stream_features/tests/test_user_recent_transactions.py b/fraud/features/stream_features/tests/test_user_recent_transactions.py index d2acdfc..ba40c42 100644 --- a/fraud/features/stream_features/tests/test_user_recent_transactions.py +++ b/fraud/features/stream_features/tests/test_user_recent_transactions.py @@ -42,7 +42,7 @@ def test_user_recent_transactions(my_custom_spark_session): "amt_last_distinct_10_1h_10m": [["100", "200", "300"], ["400"]], # The result timestamp is rounded up to the nearest aggregation interval "end time". The aggregation interval # is ten minutes for this feature view. 
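The comment above is why the corrected expected timestamp in the hunk below is 00:10 rather than 00:00: input events at 2022-05-01 00:00:00 fall in the ten-minute window whose end time is 00:10. A plain-Python sketch of that round-up arithmetic, as an illustration of the behavior the comment describes rather than Tecton's internal implementation:

    from datetime import datetime, timedelta

    def window_end(ts: datetime, interval: timedelta) -> datetime:
        # Count whole intervals since an arbitrary epoch, then step to the
        # next boundary: an event exactly on a boundary belongs to the
        # window that ends at the following boundary.
        epoch = datetime(1970, 1, 1)
        return epoch + ((ts - epoch) // interval + 1) * interval

    print(window_end(datetime(2022, 5, 1, 0, 0), timedelta(minutes=10)))
    # -> 2022-05-01 00:10:00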
- "timestamp": [pytz.UTC.localize(datetime(2022, 5, 1, 7, 10))] * 2, + "timestamp": [pytz.UTC.localize(datetime(2022, 5, 1, 0, 10))] * 2, }) expected['timestamp'] = expected['timestamp'].astype('datetime64[us, UTC]') From ff47c1d7d396a0a41adad546df12814b8676a9a7 Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 13:14:57 -0700 Subject: [PATCH 21/23] hi --- .../batch_features/tests/test_user_credit_card_issuer.py | 6 +++--- .../test_user_distinct_merchant_transaction_count_30d.py | 4 ++-- .../stream_features/tests/test_user_recent_transactions.py | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/fraud/features/batch_features/tests/test_user_credit_card_issuer.py b/fraud/features/batch_features/tests/test_user_credit_card_issuer.py index c5de2e7..dcc5eaa 100644 --- a/fraud/features/batch_features/tests/test_user_credit_card_issuer.py +++ b/fraud/features/batch_features/tests/test_user_credit_card_issuer.py @@ -1,5 +1,5 @@ import os -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import pandas import pytest @@ -40,7 +40,7 @@ def test_user_credit_card_issuer_run(tecton_pytest_spark_session): expected = pandas.DataFrame({ "user_id": ["user_1", "user_2", "user_3", "user_4"], - "signup_timestamp": [pytz.UTC.localize(datetime(2022, 5, 1, 0, 0, 0, 0))] * 4, + "signup_timestamp": [datetime(2022, 5, 1, 0, 0, 0, 0, tzinfo=timezone.utc)] * 4, "credit_card_issuer": ["other", "Visa", "MasterCard", "Discover"], }) expected['signup_timestamp'] = expected['signup_timestamp'].astype('datetime64[us, UTC]') @@ -78,7 +78,7 @@ def test_user_credit_card_issuer_ghf(tecton_pytest_spark_session): expected = pandas.DataFrame({ "user_id": ["user_1", "user_1", "user_2", "user_not_found"], - "timestamp": [pytz.UTC.localize(datetime(2022, 5, 1)), pytz.UTC.localize(datetime(2022, 5, 2)), pytz.UTC.localize(datetime(2022, 6, 1)), pytz.UTC.localize(datetime(2022, 6, 1))], + "timestamp": [datetime(2022, 5, 1, tzinfo=timezone.utc), datetime(2022, 5, 2, tzinfo=timezone.utc), datetime(2022, 6, 1, tzinfo=timezone.utc), datetime(2022, 6, 1, tzinfo=timezone.utc)], "user_credit_card_issuer__credit_card_issuer": [None, "other", "Visa", None], }) expected['timestamp'] = expected['timestamp'].astype('datetime64[us, UTC]') diff --git a/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py b/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py index e5c14fd..665b682 100644 --- a/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py +++ b/fraud/features/batch_features/tests/test_user_distinct_merchant_transaction_count_30d.py @@ -1,5 +1,5 @@ import os -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import pandas import pytest @@ -42,7 +42,7 @@ def test_user_distinct_merchant_transaction_count_30d(tecton_pytest_spark_sessio expected = pandas.DataFrame({ "user_id": ["user_1", "user_2"], - "timestamp": [pytz.UTC.localize(datetime(2022, 5, 2) - timedelta(microseconds=1)), pytz.UTC.localize(datetime(2022, 5, 2) - timedelta(microseconds=1))], + "timestamp": [datetime(2022, 5, 2, tzinfo=timezone.utc) - timedelta(microseconds=1), datetime(2022, 5, 2, tzinfo=timezone.utc) - timedelta(microseconds=1)], "distinct_merchant_transaction_count_30d": [2, 1], }) expected['timestamp'] = expected['timestamp'].astype('datetime64[us, UTC]') diff --git a/fraud/features/stream_features/tests/test_user_recent_transactions.py 
b/fraud/features/stream_features/tests/test_user_recent_transactions.py index ba40c42..8f631c2 100644 --- a/fraud/features/stream_features/tests/test_user_recent_transactions.py +++ b/fraud/features/stream_features/tests/test_user_recent_transactions.py @@ -42,7 +42,7 @@ def test_user_recent_transactions(my_custom_spark_session): "amt_last_distinct_10_1h_10m": [["100", "200", "300"], ["400"]], # The result timestamp is rounded up to the nearest aggregation interval "end time". The aggregation interval # is ten minutes for this feature view. - "timestamp": [pytz.UTC.localize(datetime(2022, 5, 1, 0, 10))] * 2, + "timestamp": [(datetime(2022, 5, 1, 0, 10, tzinfo=timezone.utc))] * 2, }) expected['timestamp'] = expected['timestamp'].astype('datetime64[us, UTC]') From 018b8ac5c5fbd5348d9139ec5c95e1ca9aa50201 Mon Sep 17 00:00:00 2001 From: Nick Joung Date: Tue, 17 Sep 2024 16:22:06 -0400 Subject: [PATCH 22/23] Update fraud/features/stream_features/tests/test_user_recent_transactions.py --- .../stream_features/tests/test_user_recent_transactions.py | 1 - 1 file changed, 1 deletion(-) diff --git a/fraud/features/stream_features/tests/test_user_recent_transactions.py b/fraud/features/stream_features/tests/test_user_recent_transactions.py index 8f631c2..7671f17 100644 --- a/fraud/features/stream_features/tests/test_user_recent_transactions.py +++ b/fraud/features/stream_features/tests/test_user_recent_transactions.py @@ -23,7 +23,6 @@ def test_user_recent_transactions(my_custom_spark_session): ("user_2", datetime(2022, 5, 1), 400, "2022", "05", "01") ] - # Define the schema (column names) schema = ["user_id", "timestamp", "amt", "partition_0", "partition_1", "partition_2"] input_spark_df = my_custom_spark_session.createDataFrame(data, schema) From cd5ff67ab0ecb9abdb123a21453b32d0e95ca55a Mon Sep 17 00:00:00 2001 From: Lilly Date: Tue, 17 Sep 2024 13:28:07 -0700 Subject: [PATCH 23/23] rm --- repo.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/repo.yaml b/repo.yaml index 580716d..6426474 100644 --- a/repo.yaml +++ b/repo.yaml @@ -12,10 +12,8 @@ defaults: batch_feature_view: tecton_materialization_runtime: 1.0.0 - environment: tecton-rift-core-1.0.0 # For Rift-based Batch Feature Views stream_feature_view: tecton_materialization_runtime: 1.0.0 - environment: tecton-rift-core-1.0.0 # For Rift-based Stream Feature Views feature_table: tecton_materialization_runtime: 1.0.0
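Taken together, patches 17 through 21 converge on the idiom the tests end with: inputs built from plain tuples, expected timestamps made timezone-aware with the standard library's `timezone.utc` instead of pytz, and the expected column cast to an explicit `datetime64[us, UTC]` dtype so the frame comparison passes without a dtype mismatch. A self-contained sketch of that final form, assuming pandas 2.x, which is consistent with the series dropping the `pandas<2.0` pin:

    from datetime import datetime, timezone
    import pandas

    expected = pandas.DataFrame({
        "user_id": ["user_1", "user_2"],
        "timestamp": [datetime(2022, 5, 1, 0, 10, tzinfo=timezone.utc)] * 2,
    })
    # Match the microsecond-resolution, UTC-tagged dtype of the computed output.
    expected["timestamp"] = expected["timestamp"].astype("datetime64[us, UTC]")
    print(expected.dtypes)  # timestamp    datetime64[us, UTC]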