Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update sample repo for 1.0 #73

Merged
merged 26 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions ads/entities.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,40 @@
from tecton import Entity

from tecton.types import Field, String, Int64

ad = Entity(
name='ad',
join_keys=['ad_id'],
join_keys=[Field('ad_id', Int64)],
description='An ad',
owner='demo-user@tecton.ai',
tags={'release': 'production'}
)

content = Entity(
name="content",
join_keys=["content_id"],
join_keys=[Field('content_id', String)],
description='Content ID',
owner='demo-user@tecton.ai',
tags={'release': 'production'}
)

auction = Entity(
name="auction",
join_keys=["auction_id"],
join_keys=[Field('auction_id', String)],
description='Auction ID',
owner='demo-user@tecton.ai',
)

user = Entity(
name='ads_user',
join_keys=['user_id'],
join_keys=[Field('user_id', String)],
description='A user of the platform',
owner='demo-user@tecton.ai',
tags={'release': 'production'}
)

content_keyword = Entity(
name='ContentKeyword',
join_keys=['content_keyword'],
join_keys=[Field('content_keyword', String)],
description='The keyword describing the content this ad is being placed alongside.',
owner='demo-user@tecton.ai',
tags={'release': 'production'}
Expand Down
9 changes: 8 additions & 1 deletion ads/features/batch_features/ad_auction_keywords.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from tecton import transformation, FilteredSource, batch_feature_view, const
from tecton import transformation, FilteredSource, batch_feature_view, const, Attribute
from tecton.types import String, Array, Int32, Bool
from ads.entities import auction
from ads.data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta
Expand Down Expand Up @@ -32,6 +33,12 @@ def keyword_stats(input_data, keyword_column):
mode='pipeline',
sources=[FilteredSource(ad_impressions_batch)],
entities=[auction],
timestamp_field="timestamp",
features=[
Attribute(name="keyword_list", dtype=Array(String)),
Attribute(name="num_keywords", dtype=Int32),
Attribute(name="keyword_contains_bitcoin", dtype=Bool),
],
ttl=timedelta(days=1),
batch_schedule=timedelta(days=1),
online=False,
Expand Down
9 changes: 7 additions & 2 deletions ads/features/batch_features/user_distinct_ad_count_7d.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from tecton import batch_feature_view, FilteredSource, materialization_context
from tecton import batch_feature_view, FilteredSource, materialization_context, Attribute
from tecton.types import String, Int64
from ads.entities import user
from ads.data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta
Expand All @@ -16,7 +17,11 @@
feature_start_time=datetime(2022, 5, 1),
tags={'release': 'production', 'usecase': 'ads'},
owner='demo-user@tecton.ai',
description='How many distinct advertisements a user has been shown in the last week'
description='How many distinct advertisements a user has been shown in the last week',
timestamp_field='timestamp',
features=[
Attribute(name='distinct_ad_count', dtype=Int64),
]
)
def user_distinct_ad_count_7d(ad_impressions, context=materialization_context()):
return f'''
Expand Down
17 changes: 7 additions & 10 deletions ads/features/feature_tables/ad_embeddings.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
from tecton.types import Field, String, Timestamp, Array, Float64
from tecton import Entity, FeatureTable, DeltaConfig
from ads.entities import ad
from tecton.types import Array, Float64
from tecton import FeatureTable, Attribute
from datetime import timedelta

schema = [
Field('ad_id', String),
Field('timestamp', Timestamp),
lilly-tecton marked this conversation as resolved.
Show resolved Hide resolved
Field('ad_embedding', Array(Float64))
]

from ads.entities import ad

ad_embeddings = FeatureTable(
name='ad_embeddings',
entities=[ad],
schema=schema,
features=[
Attribute(name='ad_embedding', dtype=Array(Float64)),
],
timestamp_field='timestamp',
online=True,
offline=True,
ttl=timedelta(days=10),
Expand Down
14 changes: 5 additions & 9 deletions ads/features/feature_tables/user_embeddings.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
from tecton.types import Field, String, Timestamp, Array, Float64
from tecton import Entity, FeatureTable, DeltaConfig
from tecton import FeatureTable, Attribute
from ads.entities import user
from datetime import timedelta


schema = [
Field('user_id', String),
Field('timestamp', Timestamp),
Field('user_embedding', Array(Float64))
]


user_embeddings = FeatureTable(
name='user_embeddings',
entities=[user],
schema=schema,
features=[
Attribute(name='user_embedding', dtype=Array(Float64)),
],
timestamp_field="timestamp",
online=True,
offline=True,
ttl=timedelta(days=10),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from tecton import on_demand_feature_view
from tecton.types import Field, Float64
from tecton import on_demand_feature_view, Attribute
from tecton.types import Float64
from ads.features.feature_tables.user_embeddings import user_embeddings
from ads.features.feature_tables.ad_embeddings import ad_embeddings

output_schema = [Field('cosine_similarity', Float64)]

@on_demand_feature_view(
sources=[ad_embeddings, user_embeddings],
mode='python',
schema=output_schema,
features=[
Attribute(name='cosine_similarity', dtype=Float64)
],
owner='demo-user@tecton.ai',
tags={'release': 'production'},
description="Computes the cosine similarity between a precomputed ad embedding and a precomputed user embedding."
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
from tecton import RequestSource, on_demand_feature_view
from tecton import RequestSource, on_demand_feature_view, Attribute
from tecton.types import Field, Array, Float64
from ads.features.feature_tables.user_embeddings import user_embeddings


request_schema = [Field('query_embedding', Array(Float64))]
request = RequestSource(schema=request_schema)

output_schema = [Field('cosine_similarity', Float64)]


@on_demand_feature_view(
sources=[request, user_embeddings],
mode='python',
schema=output_schema,
features=[Attribute(name='cosine_similarity', dtype=Float64)],
owner='demo-user@tecton.ai',
tags={'release': 'production'},
description="Computes the cosine similarity between a query embedding and a precomputed user embedding."
Expand Down
11 changes: 7 additions & 4 deletions ads/features/stream_features/content_keyword_click_counts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from tecton import stream_feature_view, FilteredSource, Aggregation, DatabricksClusterConfig, StreamProcessingMode
from tecton import stream_feature_view, FilteredSource, DatabricksClusterConfig, StreamProcessingMode, Aggregate
from tecton.types import Field, Int64, Int32

from ads.entities import content_keyword
from ads.data_sources.ad_impressions import ad_impressions_stream
from datetime import datetime, timedelta
Expand All @@ -14,10 +16,11 @@
entities=[content_keyword],
mode='pyspark',
stream_processing_mode=StreamProcessingMode.CONTINUOUS, # enable low latency streaming
aggregations=[
Aggregation(column='clicked', function='count', time_window=timedelta(minutes=1)),
Aggregation(column='clicked', function='count', time_window=timedelta(minutes=5)),
features=[
Aggregate(input_column=Field('clicked', Int32), function='count', time_window=timedelta(minutes=1)),
Aggregate(input_column=Field('clicked', Int32), function='count', time_window=timedelta(minutes=5)),
],
timestamp_field='timestamp',
batch_compute=cluster_config,
stream_compute=cluster_config,
online=False,
Expand Down
10 changes: 8 additions & 2 deletions ads/features/stream_features/content_keyword_clicks_push.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import timedelta, datetime
from tecton import StreamFeatureView, FilteredSource
from tecton import StreamFeatureView, FilteredSource, BatchTriggerType, Attribute
from tecton.types import Int64
from ads.entities import content_keyword
from ads.data_sources.ad_impressions import keyword_click_source

Expand All @@ -19,5 +20,10 @@
ttl=timedelta(days=30),
tags={'release': 'production'},
owner='demo-user@tecton.ai',
description='The ad clicks for a content keyword'
description='The ad clicks for a content keyword',
batch_trigger=BatchTriggerType.MANUAL,
lilly-tecton marked this conversation as resolved.
Show resolved Hide resolved
timestamp_field='timestamp',
features=[
Attribute(name='clicked', dtype=Int64),
]
)
12 changes: 7 additions & 5 deletions ads/features/stream_features/user_ad_impression_counts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from tecton import stream_feature_view, Aggregation, FilteredSource
from tecton import stream_feature_view, Aggregate, FilteredSource
from tecton.types import Int32, Field
from ads.entities import ad, user
from ads.data_sources.ad_impressions import ad_impressions_stream
from datetime import datetime, timedelta
Expand All @@ -9,11 +10,12 @@
entities=[user, ad],
mode='spark_sql',
aggregation_interval=timedelta(hours=1),
aggregations=[
Aggregation(column='impression', function='count', time_window=timedelta(hours=1)),
Aggregation(column='impression', function='count', time_window=timedelta(hours=24)),
Aggregation(column='impression', function='count', time_window=timedelta(hours=72)),
features=[
Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=1)),
Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=24)),
Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=72)),
],
timestamp_field='timestamp',
online=False,
offline=False,
batch_schedule=timedelta(days=1),
Expand Down
15 changes: 9 additions & 6 deletions ads/features/stream_features/user_click_counts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from tecton import stream_feature_view, FilteredSource, Aggregation
from tecton import stream_feature_view, FilteredSource, Aggregate
from tecton.types import Field, Bool, Int64

from ads.entities import user
from ads.data_sources.ad_impressions import ad_impressions_stream
from datetime import datetime, timedelta
Expand All @@ -9,18 +11,19 @@
entities=[user],
mode='pyspark',
aggregation_interval=timedelta(hours=1),
aggregations=[
Aggregation(column='clicked', function='count', time_window=timedelta(hours=1)),
Aggregation(column='clicked', function='count', time_window=timedelta(hours=24)),
Aggregation(column='clicked', function='count', time_window=timedelta(hours=72)),
features=[
Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=1)),
Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=24)),
Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=72)),
],
online=False,
offline=False,
batch_schedule=timedelta(days=1),
feature_start_time=datetime(2022, 5, 1),
tags={'release': 'production'},
owner='demo-user@tecton.ai',
description='The count of ad clicks for a user'
description='The count of ad clicks for a user',
timestamp_field='timestamp'
)
def user_click_counts(ad_impressions):
    """Project the impressions input down to the columns this view needs.

    The ``user_uuid`` column is exposed under the name ``user_id``; the
    ``clicked`` and ``timestamp`` columns are passed through unchanged.

    NOTE(review): ``ad_impressions`` is presumably a Spark DataFrame, since
    the body uses column indexing, ``.alias`` and ``.select`` -- confirm
    against the decorator's ``mode``.
    """
    # Rename the source key column to the name used in the returned columns.
    user_id_column = ad_impressions['user_uuid'].alias('user_id')
    return ad_impressions.select(user_id_column, 'clicked', 'timestamp')
12 changes: 7 additions & 5 deletions ads/features/stream_features/user_click_counts_push.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import timedelta, datetime
from tecton import StreamFeatureView, Aggregation
from tecton import StreamFeatureView, Aggregation, Aggregate
from tecton.types import Bool, Field, Int64
from ads.entities import user
from ads.data_sources.ad_impressions import user_click_push_source

Expand All @@ -15,11 +16,12 @@
offline=True,
feature_start_time=datetime(2023, 1, 1),
alert_email="demo-user@tecton.ai",
aggregations=[
Aggregation(column='clicked', function='count', time_window=timedelta(hours=1)),
Aggregation(column='clicked', function='count', time_window=timedelta(hours=24)),
Aggregation(column='clicked', function='count', time_window=timedelta(hours=72)),
features=[
Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=1)),
Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=24)),
Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=72)),
],
timestamp_field='timestamp',
tags={'release': 'production'},
owner='demo-user@tecton.ai',
description='The count of ad clicks for a user'
Expand Down
13 changes: 8 additions & 5 deletions ads/features/stream_features/user_impression_counts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from tecton import stream_feature_view, FilteredSource, Aggregation
from tecton import stream_feature_view, FilteredSource, Aggregation, Aggregate
from tecton.types import Int32, Field

from ads.entities import user
from ads.data_sources.ad_impressions import ad_impressions_stream
from datetime import datetime, timedelta
Expand All @@ -9,11 +11,12 @@
entities=[user],
mode='spark_sql',
aggregation_interval=timedelta(hours=1),
aggregations=[
Aggregation(column='impression', function='count', time_window=timedelta(hours=1)),
Aggregation(column='impression', function='count', time_window=timedelta(hours=24)),
Aggregation(column='impression', function='count', time_window=timedelta(hours=72)),
features=[
Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=1)),
Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=24)),
Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=72)),
],
timestamp_field='timestamp',
online=False,
offline=False,
batch_schedule=timedelta(days=1),
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
tecton[pyspark]~=0.7.0
tecton[pyspark]~=0.10.0b30
Loading