update sample repo for 1.0 #73

Merged
merged 26 commits on Sep 17, 2024
12 changes: 7 additions & 5 deletions ads/data_sources/ad_impressions.py
@@ -1,4 +1,4 @@
from tecton import HiveConfig, KinesisConfig, StreamSource, BatchSource, DatetimePartitionColumn, PushSource
from tecton import HiveConfig, KinesisConfig, StreamSource, BatchSource, DatetimePartitionColumn, PushConfig
from datetime import timedelta
from tecton.types import Field, Int64, String, Timestamp

@@ -100,7 +100,7 @@ def ad_stream_translator(df):
Field(name='timestamp', dtype=Timestamp),
Field(name='clicked', dtype=Int64),
]
keyword_click_source = PushSource(
keyword_click_source = StreamSource(
name="keyword_click_source",
schema=input_schema,
batch_config=ad_impressions_hive_config,
@@ -109,19 +109,21 @@ def ad_stream_translator(df):
batch config for backfilling and offline training data generation.
""",
owner="demo-user@tecton.ai",
tags={'release': 'staging'}
tags={'release': 'staging'},
stream_config=PushConfig()
)

user_schema = [
Field(name='user_id', dtype=String),
Field(name='timestamp', dtype=Timestamp),
Field(name='clicked', dtype=Int64),
]
user_click_push_source = PushSource(
user_click_push_source = StreamSource(
name="user_event_source",
schema=user_schema,
description="A push source for synchronous, online ingestion of ad-click events with user info.",
owner="demo-user@tecton.ai",
tags={'release': 'staging'}
tags={'release': 'staging'},
stream_config=PushConfig()
)

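The 1.0 pattern both sources in this file move to: push-based ingestion is declared as a StreamSource whose stream_config is a PushConfig, instead of a separate PushSource class (keyword_click_source additionally keeps a batch_config for backfilling and offline training data generation). A minimal standalone sketch of the second source, assembled from the diff above:

from tecton import StreamSource, PushConfig
from tecton.types import Field, Int64, String, Timestamp

# 1.0: push ingestion is a stream_config on StreamSource, not a separate PushSource class.
user_schema = [
    Field(name='user_id', dtype=String),
    Field(name='timestamp', dtype=Timestamp),
    Field(name='clicked', dtype=Int64),
]

user_click_push_source = StreamSource(
    name='user_event_source',
    schema=user_schema,
    stream_config=PushConfig(),
    description='A push source for synchronous, online ingestion of ad-click events with user info.',
    owner='demo-user@tecton.ai',
    tags={'release': 'staging'},
)
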
12 changes: 6 additions & 6 deletions ads/entities.py
@@ -1,40 +1,40 @@
from tecton import Entity

from tecton.types import Field, String, Int64

ad = Entity(
name='ad',
join_keys=['ad_id'],
join_keys=[Field('ad_id', Int64)],
description='An ad',
owner='demo-user@tecton.ai',
tags={'release': 'production'}
)

content = Entity(
name="content",
join_keys=["content_id"],
join_keys=[Field('content_id', String)],
description='Content ID',
owner='demo-user@tecton.ai',
tags={'release': 'production'}
)

auction = Entity(
name="auction",
join_keys=["auction_id"],
join_keys=[Field('auction_id', String)],
description='Auction ID',
owner='demo-user@tecton.ai',
)

user = Entity(
name='ads_user',
join_keys=['user_id'],
join_keys=[Field('user_id', String)],
description='A user of the platform',
owner='demo-user@tecton.ai',
tags={'release': 'production'}
)

content_keyword = Entity(
name='ContentKeyword',
join_keys=['content_keyword'],
join_keys=[Field('content_keyword', String)],
description='The keyword describing the content this ad is being placed alongside.',
owner='demo-user@tecton.ai',
tags={'release': 'production'}
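In 1.0, Entity join keys are declared as typed Field objects rather than bare column-name strings. A minimal sketch of one entity from this file:

from tecton import Entity
from tecton.types import Field, String

# 1.0: join keys carry an explicit dtype via Field instead of a plain column name.
user = Entity(
    name='ads_user',
    join_keys=[Field('user_id', String)],
    description='A user of the platform',
    owner='demo-user@tecton.ai',
    tags={'release': 'production'},
)
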
11 changes: 9 additions & 2 deletions ads/features/batch_features/ad_auction_keywords.py
@@ -1,4 +1,5 @@
from tecton import transformation, FilteredSource, batch_feature_view, const
from tecton import transformation, batch_feature_view, const, Attribute
from tecton.types import String, Array, Int32, Bool
from ads.entities import auction
from ads.data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta
@@ -30,8 +31,14 @@ def keyword_stats(input_data, keyword_column):
# array of words, then create metrics based on that array.
@batch_feature_view(
mode='pipeline',
sources=[FilteredSource(ad_impressions_batch)],
sources=[ad_impressions_batch],
entities=[auction],
timestamp_field="timestamp",
features=[
Attribute(name="keyword_list", dtype=Array(String)),
Attribute(name="num_keywords", dtype=Int32),
Attribute(name="keyword_contains_bitcoin", dtype=Bool),
],
ttl=timedelta(days=1),
batch_schedule=timedelta(days=1),
online=False,
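In 1.0, the view's output columns move into features=[Attribute(...)] on the decorator, a timestamp_field is declared, and sources take the data source object directly instead of wrapping it in FilteredSource. A minimal sketch of that pattern; it uses mode='spark_sql' and an illustrative query for brevity, whereas the repo's view uses mode='pipeline' with a separate transformation:

from datetime import timedelta
from tecton import batch_feature_view, Attribute
from tecton.types import Array, Bool, Int32, String

from ads.data_sources.ad_impressions import ad_impressions_batch
from ads.entities import auction

# 1.0: output columns are declared as Attributes; the source is passed directly.
@batch_feature_view(
    mode='spark_sql',                 # illustrative; the repo's version uses mode='pipeline'
    sources=[ad_impressions_batch],
    entities=[auction],
    timestamp_field='timestamp',
    features=[
        Attribute(name='keyword_list', dtype=Array(String)),
        Attribute(name='num_keywords', dtype=Int32),
        Attribute(name='keyword_contains_bitcoin', dtype=Bool),
    ],
    ttl=timedelta(days=1),
    batch_schedule=timedelta(days=1),
    online=False,
)
def keyword_stats_sketch(ad_impressions):
    # Split the keyword string into an array of words and derive metrics from it.
    return f'''
        SELECT
            auction_id,
            timestamp,
            split(content_keyword, ' ') AS keyword_list,
            size(split(content_keyword, ' ')) AS num_keywords,
            array_contains(split(content_keyword, ' '), 'bitcoin') AS keyword_contains_bitcoin
        FROM {ad_impressions}
    '''
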
12 changes: 9 additions & 3 deletions ads/features/batch_features/user_distinct_ad_count_7d.py
@@ -1,11 +1,13 @@
from tecton import batch_feature_view, FilteredSource, materialization_context
from tecton import batch_feature_view, materialization_context, Attribute
from tecton import TectonTimeConstant
from tecton.types import Int64
from ads.entities import user
from ads.data_sources.ad_impressions import ad_impressions_batch
from datetime import datetime, timedelta


@batch_feature_view(
sources=[FilteredSource(ad_impressions_batch, start_time_offset=timedelta(days=-6))],
sources=[ad_impressions_batch.select_range(start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=6), end_time=TectonTimeConstant.MATERIALIZATION_END_TIME)],
entities=[user],
mode='spark_sql',
ttl=timedelta(days=1),
@@ -16,7 +18,11 @@
feature_start_time=datetime(2022, 5, 1),
tags={'release': 'production', 'usecase': 'ads'},
owner='demo-user@tecton.ai',
description='How many distinct advertisements a user has been shown in the last week'
description='How many distinct advertisements a user has been shown in the last week',
timestamp_field='timestamp',
features=[
Attribute(name='distinct_ad_count', dtype=Int64),
]
)
def user_distinct_ad_count_7d(ad_impressions, context=materialization_context()):
return f'''
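The FilteredSource(source, start_time_offset=timedelta(days=-6)) wrapper becomes an explicit select_range() window anchored on the materialization window. A sketch under stated assumptions: the SQL body, batch_schedule, and the incremental_backfills flag are illustrative, since those parts of the file are elided in this diff:

from datetime import datetime, timedelta
from tecton import batch_feature_view, materialization_context, Attribute, TectonTimeConstant
from tecton.types import Int64

from ads.data_sources.ad_impressions import ad_impressions_batch
from ads.entities import user

@batch_feature_view(
    sources=[
        # 1.0: the 7-day lookback is expressed as a select_range over the materialization window.
        ad_impressions_batch.select_range(
            start_time=TectonTimeConstant.MATERIALIZATION_START_TIME - timedelta(days=6),
            end_time=TectonTimeConstant.MATERIALIZATION_END_TIME,
        )
    ],
    entities=[user],
    mode='spark_sql',
    ttl=timedelta(days=1),
    batch_schedule=timedelta(days=1),   # assumed
    incremental_backfills=True,         # assumed: one query per materialization period for the rolling window
    feature_start_time=datetime(2022, 5, 1),
    timestamp_field='timestamp',
    features=[Attribute(name='distinct_ad_count', dtype=Int64)],
    owner='demo-user@tecton.ai',
    description='How many distinct advertisements a user has been shown in the last week',
)
def user_distinct_ad_count_7d(ad_impressions, context=materialization_context()):
    # Illustrative SQL; the repo's actual query is elided in the diff.
    return f'''
        SELECT
            user_uuid AS user_id,
            COUNT(DISTINCT ad_id) AS distinct_ad_count,
            TO_TIMESTAMP('{context.end_time}') AS timestamp
        FROM {ad_impressions}
        GROUP BY user_uuid
    '''
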
7 changes: 5 additions & 2 deletions ads/features/feature_tables/ad_embeddings.py
@@ -1,5 +1,5 @@
from tecton.types import Field, Timestamp, Array, Float64, Int64
from tecton import FeatureTable
from tecton import FeatureTable, Attribute
from datetime import timedelta

from ads.entities import ad
@@ -14,7 +14,10 @@
ad_embeddings = FeatureTable(
name='ad_embeddings',
entities=[ad],
schema=schema,
features=[
Attribute(name='ad_embedding', dtype=Array(Float64)),
],
timestamp_field='timestamp',
online=True,
offline=True,
ttl=timedelta(days=10),
14 changes: 5 additions & 9 deletions ads/features/feature_tables/user_embeddings.py
@@ -1,20 +1,16 @@
from tecton.types import Field, String, Timestamp, Array, Float64
from tecton import Entity, FeatureTable, DeltaConfig
from tecton import FeatureTable, Attribute
from ads.entities import user
from datetime import timedelta


schema = [
Field('user_id', String),
Field('timestamp', Timestamp),
Field('user_embedding', Array(Float64))
]


user_embeddings = FeatureTable(
name='user_embeddings',
entities=[user],
schema=schema,
features=[
Attribute(name='user_embedding', dtype=Array(Float64)),
],
timestamp_field="timestamp",
online=True,
offline=True,
ttl=timedelta(days=10),
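In 1.0, a FeatureTable no longer takes a full schema covering join keys, timestamp, and values; it lists only the value columns as Attributes plus an explicit timestamp_field. The user_embeddings table from this diff, consolidated:

from datetime import timedelta
from tecton import FeatureTable, Attribute
from tecton.types import Array, Float64

from ads.entities import user

# 1.0: only the value column is declared; join keys come from the entity, and the
# event time column is named via timestamp_field.
user_embeddings = FeatureTable(
    name='user_embeddings',
    entities=[user],
    features=[Attribute(name='user_embedding', dtype=Array(Float64))],
    timestamp_field='timestamp',
    online=True,
    offline=True,
    ttl=timedelta(days=10),
)
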
@@ -1,14 +1,15 @@
from tecton import on_demand_feature_view
from tecton.types import Field, Float64
from tecton import realtime_feature_view, Attribute
from tecton.types import Float64
from ads.features.feature_tables.user_embeddings import user_embeddings
from ads.features.feature_tables.ad_embeddings import ad_embeddings

output_schema = [Field('cosine_similarity', Float64)]

@on_demand_feature_view(
@realtime_feature_view(
sources=[ad_embeddings, user_embeddings],
mode='python',
schema=output_schema,
features=[
Attribute(name='cosine_similarity', dtype=Float64)
],
owner='demo-user@tecton.ai',
tags={'release': 'production'},
description="Computes the cosine similarity between a precomputed ad embedding and a precomputed user embedding."
@@ -1,18 +1,15 @@
from tecton import RequestSource, on_demand_feature_view
from tecton import RequestSource, realtime_feature_view, Attribute
from tecton.types import Field, Array, Float64
from ads.features.feature_tables.user_embeddings import user_embeddings


request_schema = [Field('query_embedding', Array(Float64))]
request = RequestSource(schema=request_schema)

output_schema = [Field('cosine_similarity', Float64)]


@on_demand_feature_view(
@realtime_feature_view(
sources=[request, user_embeddings],
mode='python',
schema=output_schema,
features=[Attribute(name='cosine_similarity', dtype=Float64)],
owner='demo-user@tecton.ai',
tags={'release': 'production'},
description="Computes the cosine similarity between a query embedding and a precomputed user embedding."
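@on_demand_feature_view becomes @realtime_feature_view, and the output schema moves into features=[Attribute(...)]. A sketch of this view; the function body is hypothetical since the real implementation is elided in the diff:

import math

from tecton import RequestSource, realtime_feature_view, Attribute
from tecton.types import Field, Array, Float64

from ads.features.feature_tables.user_embeddings import user_embeddings

request_schema = [Field('query_embedding', Array(Float64))]
request = RequestSource(schema=request_schema)

# 1.0: the output column is declared as an Attribute instead of a separate output schema.
@realtime_feature_view(
    sources=[request, user_embeddings],
    mode='python',
    features=[Attribute(name='cosine_similarity', dtype=Float64)],
    description='Computes the cosine similarity between a query embedding and a precomputed user embedding.',
)
def user_query_embedding_similarity(request, user_embeddings):
    # Hypothetical body for illustration only.
    a = request['query_embedding'] or []
    b = user_embeddings['user_embedding'] or []
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return {'cosine_similarity': dot / norm if norm else 0.0}
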
17 changes: 11 additions & 6 deletions ads/features/stream_features/content_keyword_click_counts.py
@@ -1,4 +1,7 @@
from tecton import stream_feature_view, FilteredSource, Aggregation, DatabricksClusterConfig, StreamProcessingMode
from tecton import stream_feature_view, DatabricksClusterConfig, StreamProcessingMode, Aggregate, \
AggregationLeadingEdge
from tecton.types import Field, Int32

from ads.entities import content_keyword
from ads.data_sources.ad_impressions import ad_impressions_stream
from datetime import datetime, timedelta
@@ -10,22 +13,24 @@
)

@stream_feature_view(
source=FilteredSource(ad_impressions_stream),
source=ad_impressions_stream,
entities=[content_keyword],
mode='pyspark',
stream_processing_mode=StreamProcessingMode.CONTINUOUS, # enable low latency streaming
aggregations=[
Aggregation(column='clicked', function='count', time_window=timedelta(minutes=1)),
Aggregation(column='clicked', function='count', time_window=timedelta(minutes=5)),
features=[
Aggregate(input_column=Field('clicked', Int32), function='count', time_window=timedelta(minutes=1)),
Aggregate(input_column=Field('clicked', Int32), function='count', time_window=timedelta(minutes=5)),
],
timestamp_field='timestamp',
batch_compute=cluster_config,
stream_compute=cluster_config,
online=False,
offline=False,
feature_start_time=datetime(2022, 5, 1),
tags={'release': 'production'},
owner='demo-user@tecton.ai',
description='The count of ad impressions for a content_keyword'
description='The count of ad impressions for a content_keyword',
aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME
)
def content_keyword_click_counts(ad_impressions):
from pyspark.sql import functions as F
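Aggregation(column=..., ...) becomes Aggregate(input_column=Field(...), ...) under features, the FilteredSource wrapper is dropped, and aggregation_leading_edge is set explicitly. A sketch of this view with the Databricks cluster configs omitted and a hypothetical transformation body:

from datetime import datetime, timedelta
from tecton import stream_feature_view, Aggregate, AggregationLeadingEdge, StreamProcessingMode
from tecton.types import Field, Int32

from ads.entities import content_keyword
from ads.data_sources.ad_impressions import ad_impressions_stream

@stream_feature_view(
    source=ad_impressions_stream,
    entities=[content_keyword],
    mode='pyspark',
    stream_processing_mode=StreamProcessingMode.CONTINUOUS,  # enable low-latency streaming
    features=[
        # 1.0: aggregations take a typed input_column via Field.
        Aggregate(input_column=Field('clicked', Int32), function='count', time_window=timedelta(minutes=1)),
        Aggregate(input_column=Field('clicked', Int32), function='count', time_window=timedelta(minutes=5)),
    ],
    timestamp_field='timestamp',
    online=False,
    offline=False,
    feature_start_time=datetime(2022, 5, 1),
    aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME,
)
def content_keyword_click_counts(ad_impressions):
    # Hypothetical column selection; the repo's transformation is elided in the diff.
    from pyspark.sql import functions as F
    return ad_impressions.select(
        F.col('content_keyword'),
        F.col('clicked'),
        F.col('timestamp'),
    )
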
11 changes: 8 additions & 3 deletions ads/features/stream_features/content_keyword_clicks_push.py
@@ -1,5 +1,6 @@
from datetime import timedelta, datetime
from tecton import StreamFeatureView, FilteredSource, BatchTriggerType
from tecton import StreamFeatureView, BatchTriggerType, Attribute
from tecton.types import Int64
from ads.entities import content_keyword
from ads.data_sources.ad_impressions import keyword_click_source

@@ -9,7 +10,7 @@
# https://docs.tecton.ai/using-the-ingestion-api/#creating-a-stream-feature-view-with-a-push-source
content_keyword_click_counts_push = StreamFeatureView(
name="keyword_clicks_fv",
source=FilteredSource(keyword_click_source),
source=keyword_click_source,
entities=[content_keyword],
online=True,
offline=True,
@@ -20,5 +21,9 @@
tags={'release': 'production'},
owner='demo-user@tecton.ai',
description='The ad clicks for a content keyword',
batch_trigger=BatchTriggerType.MANUAL
batch_trigger=BatchTriggerType.MANUAL,
timestamp_field='timestamp',
features=[
Attribute(name='clicked', dtype=Int64),
]
)
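The same 1.0 conventions apply to the class-based StreamFeatureView over the push source: the source is passed directly, timestamp_field is explicit, and the ingested column is declared as an Attribute. A consolidated sketch; feature_start_time and ttl are illustrative values since the middle of this file is elided in the diff:

from datetime import datetime, timedelta
from tecton import StreamFeatureView, BatchTriggerType, Attribute
from tecton.types import Int64

from ads.entities import content_keyword
from ads.data_sources.ad_impressions import keyword_click_source

content_keyword_click_counts_push = StreamFeatureView(
    name='keyword_clicks_fv',
    source=keyword_click_source,            # passed directly, no FilteredSource wrapper
    entities=[content_keyword],
    online=True,
    offline=True,
    feature_start_time=datetime(2023, 1, 1),  # illustrative; the repo's value is elided in the diff
    ttl=timedelta(days=7),                    # illustrative
    batch_trigger=BatchTriggerType.MANUAL,
    timestamp_field='timestamp',
    features=[Attribute(name='clicked', dtype=Int64)],
    tags={'release': 'production'},
    owner='demo-user@tecton.ai',
    description='The ad clicks for a content keyword',
)
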
17 changes: 10 additions & 7 deletions ads/features/stream_features/user_ad_impression_counts.py
@@ -1,26 +1,29 @@
from tecton import stream_feature_view, Aggregation, FilteredSource
from tecton import stream_feature_view, Aggregate, AggregationLeadingEdge
from tecton.types import Int32, Field
from ads.entities import ad, user
from ads.data_sources.ad_impressions import ad_impressions_stream
from datetime import datetime, timedelta


@stream_feature_view(
source=FilteredSource(ad_impressions_stream),
source=ad_impressions_stream,
entities=[user, ad],
mode='spark_sql',
aggregation_interval=timedelta(hours=1),
aggregations=[
Aggregation(column='impression', function='count', time_window=timedelta(hours=1)),
Aggregation(column='impression', function='count', time_window=timedelta(hours=24)),
Aggregation(column='impression', function='count', time_window=timedelta(hours=72)),
features=[
Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=1)),
Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=24)),
Aggregate(input_column=Field('impression', Int32), function='count', time_window=timedelta(hours=72)),
],
timestamp_field='timestamp',
online=False,
offline=False,
batch_schedule=timedelta(days=1),
feature_start_time=datetime(2022, 5, 1),
tags={'release': 'production'},
owner='demo-user@tecton.ai',
description='The count of impressions between a given user and a given ad'
description='The count of impressions between a given user and a given ad',
aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME
)
def user_ad_impression_counts(ad_impressions):
return f"""
18 changes: 11 additions & 7 deletions ads/features/stream_features/user_click_counts.py
@@ -1,26 +1,30 @@
from tecton import stream_feature_view, FilteredSource, Aggregation
from tecton import stream_feature_view, Aggregate, AggregationLeadingEdge
from tecton.types import Field, Int64

from ads.entities import user
from ads.data_sources.ad_impressions import ad_impressions_stream
from datetime import datetime, timedelta


@stream_feature_view(
source=FilteredSource(ad_impressions_stream),
source=ad_impressions_stream,
entities=[user],
mode='pyspark',
aggregation_interval=timedelta(hours=1),
aggregations=[
Aggregation(column='clicked', function='count', time_window=timedelta(hours=1)),
Aggregation(column='clicked', function='count', time_window=timedelta(hours=24)),
Aggregation(column='clicked', function='count', time_window=timedelta(hours=72)),
features=[
Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=1)),
Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=24)),
Aggregate(input_column=Field('clicked', Int64), function='count', time_window=timedelta(hours=72)),
],
online=False,
offline=False,
batch_schedule=timedelta(days=1),
feature_start_time=datetime(2022, 5, 1),
tags={'release': 'production'},
owner='demo-user@tecton.ai',
description='The count of ad clicks for a user'
description='The count of ad clicks for a user',
timestamp_field='timestamp',
aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME
)
def user_click_counts(ad_impressions):
return ad_impressions.select(ad_impressions['user_uuid'].alias('user_id'), 'clicked', 'timestamp')