Commit: Docs and mock client changes

aditya-nambiar committed Nov 9, 2023
1 parent 0cfc506 commit 482aff1
Showing 15 changed files with 294 additions and 126 deletions.
docs/examples/examples/ecommerce.py (15 changes: 10 additions & 5 deletions)

@@ -30,7 +30,13 @@
 
 
 # docsnip datasets
-@source(postgres.table("orders", cursor="timestamp"), every="1m", lateness="1d")
+@source(
+    postgres.table("orders", cursor="timestamp"),
+    every="1m",
+    lateness="1d",
+    tier="prod",
+)
+@source(Webhook(name="fennel_webhook").endpoint("Order"), tier="dev")
 @meta(owner="data-eng-oncall@fennel.ai")
 @dataset
 class Order:
@@ -89,15 +95,14 @@ def myextractor(cls, ts: pd.Series, uids: pd.Series, sellers: pd.Series):
 # We can write a unit test to verify that the feature is working as expected
 # docsnip test
 
-fake_webhook = Webhook(name="fennel_webhook")
-
 
 class TestUserLivestreamFeatures(unittest.TestCase):
     @mock
     def test_feature(self, client):
-        fake_Order = Order.with_source(fake_webhook.endpoint("Order"))
         client.sync(
-            datasets=[fake_Order, UserSellerOrders], featuresets=[UserSeller]
+            datasets=[Order, UserSellerOrders],
+            featuresets=[UserSeller],
+            tier="dev",
        )
         columns = ["uid", "product_id", "seller_id", "timestamp"]
         now = datetime.utcnow()
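This hunk pair is the commit's core pattern: instead of cloning the dataset onto a fake webhook via with_source, the dataset itself now declares a prod source and a dev webhook source, and client.sync(..., tier="dev") activates only the dev one. Below is a minimal, self-contained sketch of that test pattern; it reuses only decorators and calls visible in this diff, the Order fields come from the columns list above, and the webhook and owner names mirror the file:

import unittest
from datetime import datetime

import pandas as pd
import requests

from fennel.datasets import dataset
from fennel.lib.metadata import meta
from fennel.sources import source, Webhook
from fennel.test_lib import mock

webhook = Webhook(name="fennel_webhook")


@source(webhook.endpoint("Order"), tier="dev")
@meta(owner="data-eng-oncall@fennel.ai")
@dataset
class Order:
    uid: int
    product_id: int
    seller_id: int
    timestamp: datetime


class TestOrderIngestion(unittest.TestCase):
    @mock
    def test_ingest(self, client):
        # Syncing with tier="dev" registers only the dev-tier webhook source.
        client.sync(datasets=[Order], tier="dev")
        df = pd.DataFrame(
            [[1, 7, 3, datetime.utcnow()]],
            columns=["uid", "product_id", "seller_id", "timestamp"],
        )
        # Feed the dev source by logging rows against the webhook endpoint.
        response = client.log("fennel_webhook", "Order", df)
        assert response.status_code == requests.codes.OK, response.json()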
docs/examples/featuresets/overview.py (4 changes: 2 additions & 2 deletions)

@@ -81,15 +81,15 @@ class Movies:
     # invalid: both e1 & e2 output `over_3hrs`
     over_3hrs: bool = feature(id=3)
 
-    @extractor(tiers=["default"])
+    @extractor(tier=["default"])
     @inputs(duration)
     @outputs(over_2hrs, over_3hrs)
     def e1(cls, ts: pd.Series, durations: pd.Series) -> pd.DataFrame:
         two_hrs = durations > 2 * 3600
         three_hrs = durations > 3 * 3600
         return pd.DataFrame({"over_2hrs": two_hrs, "over_3hrs": three_hrs})
 
-    @extractor(tiers=["non-default"])
+    @extractor(tier=["non-default"])
     @inputs(duration)
     @outputs(over_3hrs)
     def e2(cls, ts: pd.Series, durations: pd.Series) -> pd.Series:
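Only the keyword changed in this file: @extractor now takes tier= instead of tiers=, still with a list of tier names. For readability, here is the first extractor reassembled from the hunk into a runnable snippet; the imports, the owner, and the feature ids for duration and over_2hrs are assumptions (the hunk only shows id=3), the rest is verbatim from the diff:

import pandas as pd

from fennel.featuresets import featureset, extractor, feature
from fennel.lib.metadata import meta
from fennel.lib.schema import inputs, outputs


@meta(owner="docs@fennel.ai")  # assumed owner, not shown in the hunk
@featureset
class Movies:
    duration: int = feature(id=1)  # assumed id
    over_2hrs: bool = feature(id=2)  # assumed id
    over_3hrs: bool = feature(id=3)

    # renamed keyword: tier= (was tiers=)
    @extractor(tier=["default"])
    @inputs(duration)
    @outputs(over_2hrs, over_3hrs)
    def e1(cls, ts: pd.Series, durations: pd.Series) -> pd.DataFrame:
        two_hrs = durations > 2 * 3600
        three_hrs = durations > 3 * 3600
        return pd.DataFrame({"over_2hrs": two_hrs, "over_3hrs": three_hrs})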
docs/examples/getting-started/quickstart.py (18 changes: 11 additions & 7 deletions)

@@ -23,14 +23,20 @@
 postgres = Postgres.get(name="my_rdbms")
 warehouse = Snowflake.get(name="my_warehouse")
 kafka = Kafka.get(name="my_kafka")
+webhook = Webhook(name="fennel_webhook")
 
 
 # /docsnip
 
 
 # docsnip datasets
 @dataset
-@source(postgres.table("product_info", cursor="last_modified"), every="1m")
+@source(
+    postgres.table("product_info", cursor="last_modified"),
+    every="1m",
+    tier="prod",
+)
+@source(webhook.endpoint("Product"), tier="dev")
 @meta(owner="chris@fennel.ai", tags=["PII"])
 class Product:
     product_id: int = field(key=True)
@@ -51,7 +57,8 @@ def get_expectations(cls):
 
 # ingesting realtime data from Kafka works exactly the same way
 @meta(owner="eva@fennel.ai")
-@source(kafka.topic("orders"), lateness="1h")
+@source(kafka.topic("orders"), lateness="1h", tier="prod")
+@source(webhook.endpoint("Order"), tier="dev")
 @dataset
 class Order:
     uid: int
@@ -122,15 +129,13 @@ def myextractor(cls, ts: pd.Series, uids: pd.Series, sellers: pd.Series):
 # docsnip sync
 from fennel.test_lib import MockClient
 
-webhook = Webhook(name="fennel_webhook")
 
 # client = Client('<FENNEL SERVER URL>') # uncomment this line to use a real Fennel server
 client = MockClient()  # comment this line to use a real Fennel server
-fake_Product = Product.with_source(webhook.endpoint("Product"))
-fake_Order = Order.with_source(webhook.endpoint("Order"))
 client.sync(
-    datasets=[fake_Order, fake_Product, UserSellerOrders],
+    datasets=[Order, Product, UserSellerOrders],
     featuresets=[UserSellerFeatures],
+    tier="dev",
 )
 
 now = datetime.utcnow()
@@ -149,7 +154,6 @@ def myextractor(cls, ts: pd.Series, uids: pd.Series, sellers: pd.Series):
 data = [[1, 1, now], [1, 2, now], [1, 3, now]]
 df = pd.DataFrame(data, columns=columns)
 response = client.log("fennel_webhook", "Order", df)
-print(response.json())
 assert response.status_code == requests.codes.OK, response.json()
 # /docsnip
 
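With the fake_* copies gone, the quickstart syncs the real Product and Order classes and selects dev-tier sources at sync time. A condensed, self-contained version of that flow is sketched below; product_id and last_modified come from the diff (the postgres cursor names the timestamp column), and any other Product fields are omitted:

from datetime import datetime

import pandas as pd
import requests

from fennel.datasets import dataset, field
from fennel.lib.metadata import meta
from fennel.sources import source, Webhook
from fennel.test_lib import MockClient

webhook = Webhook(name="fennel_webhook")


@source(webhook.endpoint("Product"), tier="dev")
@meta(owner="chris@fennel.ai", tags=["PII"])
@dataset
class Product:
    product_id: int = field(key=True)
    last_modified: datetime


client = MockClient()
# tier="dev" skips prod sources such as the postgres table above.
client.sync(datasets=[Product], tier="dev")

df = pd.DataFrame(
    [[1, datetime.utcnow()]], columns=["product_id", "last_modified"]
)
response = client.log("fennel_webhook", "Product", df)
assert response.status_code == requests.codes.OK, response.json()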
docs/examples/overview/concepts.py (17 changes: 10 additions & 7 deletions)

@@ -26,11 +26,15 @@ class UserDataset:
 
 postgres = Postgres.get(name="postgres")
 kafka = Kafka.get(name="kafka")
+webhook = Webhook(name="fennel_webhook")
 
 
 # docsnip external_data_sources
 @meta(owner="data-eng-oncall@fennel.ai")
-@source(postgres.table("user", cursor="update_timestamp"), every="1m")
+@source(
+    postgres.table("user", cursor="update_timestamp"), every="1m", tier="prod"
+)
+@source(webhook.endpoint("User"), tier="dev")
 @dataset
 class User:
     uid: int = field(key=True)
@@ -40,7 +44,8 @@ class User:
 
 
 @meta(owner="data-eng-oncall@fennel.ai")
-@source(kafka.topic("transactions"))
+@source(kafka.topic("transactions"), tier="prod")
+@source(webhook.endpoint("Transaction"), tier="dev")
 @dataset
 class Transaction:
     uid: int
@@ -118,15 +123,13 @@ def get_country(cls, ts: pd.Series, uids: pd.Series):
 
 # /docsnip
 
-webhook = Webhook(name="fennel_webhook")
-
 
 # Tests to ensure that there are no run time errors in the snippets
 @mock
 def test_overview(client):
-    fake_User = User.with_source(webhook.endpoint("User"))
-    fake_Transaction = Transaction.with_source(webhook.endpoint("Transaction"))
-    client.sync(datasets=[fake_User, fake_Transaction, UserTransactionsAbroad])
+    client.sync(
+        datasets=[User, Transaction, UserTransactionsAbroad], tier="dev"
+    )
     now = datetime.now()
     dob = now - timedelta(days=365 * 30)
     data = [
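Both datasets in this file now stack one @source decorator per tier, which is the general idiom: a dataset may declare several sources as long as each is pinned to a tier, and the sync call picks the matching one. A sketch of just that idiom (Kafka is assumed to be importable from fennel.sources alongside Webhook; Transaction's fields beyond uid are placeholders since the diff truncates the class body):

from datetime import datetime

from fennel.datasets import dataset
from fennel.lib.metadata import meta
from fennel.sources import source, Kafka, Webhook

kafka = Kafka.get(name="kafka")
webhook = Webhook(name="fennel_webhook")


@meta(owner="data-eng-oncall@fennel.ai")
@source(kafka.topic("transactions"), tier="prod")  # chosen by sync(..., tier="prod")
@source(webhook.endpoint("Transaction"), tier="dev")  # chosen by sync(..., tier="dev")
@dataset
class Transaction:
    uid: int
    amount: float  # placeholder field
    timestamp: datetime  # placeholder timestamp field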
fennel/client_tests/test_movie_tickets.py (20 changes: 12 additions & 8 deletions)

@@ -50,7 +50,7 @@ class ActorStats:
     revenue: int
     at: datetime
 
-    @pipeline(version=1)
+    @pipeline(version=1, tier="prod")
     @inputs(MovieInfo, TicketSale)
     def pipeline_join(cls, info: Dataset, sale: Dataset):
         uniq = sale.groupby("ticket_id").first()
@@ -73,7 +73,7 @@ def pipeline_join(cls, info: Dataset, sale: Dataset):
             ]
         )
 
-    @pipeline(version=2, active=True)
+    @pipeline(version=2, active=True, tier="prod")
     @inputs(MovieInfo, TicketSale)
     def pipeline_join_v2(cls, info: Dataset, sale: Dataset):
         def foo(df):
@@ -112,25 +112,29 @@ class RequestFeatures:
 class ActorFeatures:
     revenue: int = feature(id=1)
 
-    @extractor(depends_on=[ActorStats])
+    @extractor(depends_on=[ActorStats], tier="prod")
     @inputs(RequestFeatures.name)
     @outputs(revenue)
     def extract_revenue(cls, ts: pd.Series, name: pd.Series):
-        import sys
-
-        print(name, file=sys.stderr)
-        print("##", name.name, file=sys.stderr)
         df, _ = ActorStats.lookup(ts, name=name)  # type: ignore
         df = df.fillna(0)
         return df["revenue"]
 
+    @extractor(depends_on=[ActorStats], tier="staging")
+    @inputs(RequestFeatures.name)
+    @outputs(revenue)
+    def extract_revenue2(cls, ts: pd.Series, name: pd.Series):
+        df, _ = ActorStats.lookup(ts, name=name)  # type: ignore
+        df = df.fillna(0)
+        return df["revenue"] * 2
+
 
 class TestMovieTicketSale(unittest.TestCase):
     @mock
     def test_movie_ticket_sale(self, client):
         datasets = [MovieInfo, TicketSale, ActorStats]  # type: ignore
         featuresets = [ActorFeatures, RequestFeatures]
-        client.sync(datasets=datasets, featuresets=featuresets)  # type: ignore
+        client.sync(datasets=datasets, featuresets=featuresets, tier="prod")  # type: ignore
         client.sleep()
         data = [
             [
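With extract_revenue pinned to prod and the new extract_revenue2 pinned to staging, the same feature gets a tier-specific implementation, and syncing with tier="prod" registers exactly one of them. A hedged fragment illustrating how one could check that, written to live in this same file since it reuses the classes above (InternalTestClient is exported by fennel.test_lib, as the new test file below shows; the expected count is an inference, not an assertion from the commit):

def test_prod_tier_selection():
    view = InternalTestClient()
    for obj in [MovieInfo, TicketSale, ActorStats, RequestFeatures, ActorFeatures]:
        view.add(obj)

    sync_request = view._get_sync_request_proto("prod")
    # Expectation: the staging extractor is filtered out, leaving extract_revenue.
    assert len(sync_request.extractors) == 1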
fennel/client_tests/test_tier_selector.py (141 changes: 141 additions & 0 deletions)

@@ -0,0 +1,141 @@
+from fennel import Sum
+from fennel.featuresets import featureset, extractor, feature
+from fennel.lib.schema import outputs
+from datetime import datetime
+
+import pandas as pd
+from google.protobuf.json_format import ParseDict  # type: ignore
+from typing import List
+
+from fennel.datasets import dataset, pipeline, field, Dataset
+from fennel.lib.metadata import meta
+from fennel.lib.schema import inputs
+from fennel.lib.window import Window
+from fennel.sources import source, Webhook
+from fennel.test_lib import *
+
+webhook = Webhook(name="fennel_webhook")
+
+
+@meta(owner="abhay@fennel.ai")
+@source(webhook.endpoint("MovieInfo"), tier="prod")
+@source(webhook.endpoint("MovieInfo2"), tier="staging")
+@dataset
+class MovieInfo:
+    title: str = field(key=True)
+    actors: List[str]  # can be an empty list
+    release: datetime
+
+
+@meta(owner="abhay@fennel.ai")
+@source(webhook.endpoint("TicketSale"), tier="prod")
+@source(webhook.endpoint("TicketSale2"), tier="staging")
+@dataset
+class TicketSale:
+    ticket_id: str
+    title: str
+    price: int
+    at: datetime
+
+
+@meta(owner="abhay@fennel.ai")
+@dataset
+class ActorStats:
+    name: str = field(key=True)
+    revenue: int
+    at: datetime
+
+    @pipeline(version=1, tier="prod")
+    @inputs(MovieInfo, TicketSale)
+    def pipeline_join(cls, info: Dataset, sale: Dataset):
+        uniq = sale.groupby("ticket_id").first()
+        c = (
+            uniq.join(info, how="inner", on=["title"])
+            .explode(columns=["actors"])
+            .rename(columns={"actors": "name"})
+        )
+        # name -> Option[str]
+        schema = c.schema()
+        schema["name"] = str
+        c = c.transform(lambda x: x, schema)
+        return c.groupby("name").aggregate(
+            [
+                Sum(
+                    window=Window("forever"),
+                    of="price",
+                    into_field="revenue",
+                ),
+            ]
+        )
+
+    @pipeline(version=2, active=True, tier="staging")
+    @inputs(MovieInfo, TicketSale)
+    def pipeline_join_v2(cls, info: Dataset, sale: Dataset):
+        def foo(df):
+            df["price"] = df["price"] * 2
+            return df
+
+        uniq = sale.groupby("ticket_id").first()
+        c = (
+            uniq.join(info, how="inner", on=["title"])
+            .explode(columns=["actors"])
+            .rename(columns={"actors": "name"})
+        )
+        # name -> Option[str]
+        schema = c.schema()
+        schema["name"] = str
+        c = c.transform(foo, schema)
+        return c.groupby("name").aggregate(
+            [
+                Sum(
+                    window=Window("forever"),
+                    of="price",
+                    into_field="revenue",
+                ),
+            ]
+        )
+
+
+@meta(owner="zaki@fennel.ai")
+@featureset
+class RequestFeatures:
+    name: str = feature(id=1)
+
+
+@meta(owner="abhay@fennel.ai")
+@featureset
+class ActorFeatures:
+    revenue: int = feature(id=1)
+
+    @extractor(depends_on=[ActorStats], tier="prod")
+    @inputs(RequestFeatures.name)
+    @outputs(revenue)
+    def extract_revenue(cls, ts: pd.Series, name: pd.Series):
+        df, _ = ActorStats.lookup(ts, name=name)  # type: ignore
+        df = df.fillna(0)
+        return df["revenue"]
+
+    @extractor(depends_on=[ActorStats], tier="staging")
+    @inputs(RequestFeatures.name)
+    @outputs(revenue)
+    def extract_revenue2(cls, ts: pd.Series, name: pd.Series):
+        df, _ = ActorStats.lookup(ts, name=name)  # type: ignore
+        df = df.fillna(0)
+        return df["revenue"] * 2
+
+
+def test_tier_selector():
+    view = InternalTestClient()
+    view.add(MovieInfo)
+    view.add(TicketSale)
+    view.add(ActorStats)
+    view.add(RequestFeatures)
+    view.add(ActorFeatures)
+
+    sync_request = view._get_sync_request_proto("dev")
+    assert len(sync_request.feature_sets) == 2
+    assert len(sync_request.features) == 2
+    assert len(sync_request.datasets) == 3
+    assert len(sync_request.sources) == 0
+    assert len(sync_request.pipelines) == 0
+    assert len(sync_request.extractors) == 0
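The dev-tier assertions above pass because every source, pipeline, and extractor in this file is pinned to prod or staging, while datasets and featuresets carry no tier and always sync. By the same logic one would expect a prod view to keep one webhook endpoint per sourced dataset, pipeline_join, and extract_revenue; a sketch of that complementary check (an inference, not part of the commit):

def test_prod_tier_selector():
    view = InternalTestClient()
    for obj in [MovieInfo, TicketSale, ActorStats, RequestFeatures, ActorFeatures]:
        view.add(obj)

    sync_request = view._get_sync_request_proto("prod")
    assert len(sync_request.datasets) == 3
    assert len(sync_request.sources) == 2  # prod endpoints of MovieInfo, TicketSale
    assert len(sync_request.pipelines) == 1  # pipeline_join, tier="prod"
    assert len(sync_request.extractors) == 1  # extract_revenue, tier="prod"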