Skip to content

Commit

Permalink
Add support for now in expressions.
Browse files Browse the repository at this point in the history
  • Loading branch information
nonibansal committed Sep 30, 2024
1 parent 15ec345 commit e652502
Show file tree
Hide file tree
Showing 12 changed files with 443 additions and 173 deletions.
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.32] - 2024-09-27
- Add support for now in expressions.

## [1.5.31] - 2024-09-26
- Fix bug in signature of assign operator.

Expand Down
149 changes: 149 additions & 0 deletions fennel/client_tests/test_expr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from datetime import datetime, timezone, timedelta
from typing import Optional

import pandas as pd
import pytest

from fennel._vendor import requests
from fennel.connectors import source, Webhook
from fennel.datasets import dataset, field
from fennel.expr import col
from fennel.featuresets import featureset, feature as F
from fennel.testing import mock

webhook = Webhook(name="fennel_webhook")
__owner__ = "eng@fennel.ai"


@source(webhook.endpoint("UserInfoDataset"), disorder="14d", cdc="upsert")
@dataset(index=True)
class UserInfoDataset:
user_id: int = field(key=True)
name: str
birthdate: datetime
country: str
ts: datetime = field(timestamp=True)


@pytest.mark.integration
@mock
def test_now(client):
from fennel.expr import now

@featureset
class UserInfoFeatures:
user_id: int
name: Optional[str] = F(UserInfoDataset.name)
birthdate: Optional[datetime] = F(UserInfoDataset.birthdate)
age: Optional[int] = F(now().dt.since(col("birthdate"), unit="year"))
country: Optional[str] = F(UserInfoDataset.country)

# Sync the dataset
response = client.commit(
message="msg",
datasets=[UserInfoDataset],
featuresets=[UserInfoFeatures],
)
assert response.status_code == requests.codes.OK, response.json()

now = datetime.now(timezone.utc)
now_1y = now - timedelta(days=365)
df = pd.DataFrame(
{
"user_id": [1, 2, 3, 4, 5],
"name": ["Ross", "Monica", "Chandler", "Joey", "Rachel"],
"birthdate": [
datetime(1970, 1, 1, tzinfo=timezone.utc),
datetime(1980, 3, 12, tzinfo=timezone.utc),
datetime(1990, 5, 15, tzinfo=timezone.utc),
datetime(1997, 12, 24, tzinfo=timezone.utc),
datetime(2001, 1, 21, tzinfo=timezone.utc),
],
"country": ["India", "USA", "Africa", "UK", "Chile"],
"ts": [now_1y, now_1y, now_1y, now_1y, now_1y],
}
)
response = client.log("fennel_webhook", "UserInfoDataset", df)
assert response.status_code == requests.codes.OK, response.json()

client.sleep()

# Querying UserInfoFeatures
df = client.query(
inputs=[UserInfoFeatures.user_id],
outputs=[
UserInfoFeatures.name,
UserInfoFeatures.age,
UserInfoFeatures.country,
],
input_dataframe=pd.DataFrame(
{"UserInfoFeatures.user_id": [1, 2, 3, 4, 5, 6]}
),
)
assert df.shape == (6, 3)
assert df["UserInfoFeatures.name"].tolist() == [
"Ross",
"Monica",
"Chandler",
"Joey",
"Rachel",
pd.NA,
]
assert df["UserInfoFeatures.age"].tolist() == [54, 44, 34, 26, 23, pd.NA]
assert df["UserInfoFeatures.country"].tolist() == [
"India",
"USA",
"Africa",
"UK",
"Chile",
pd.NA,
]

if not client.is_integration_client():
df = client.query_offline(
inputs=[UserInfoFeatures.user_id],
outputs=[
UserInfoFeatures.name,
UserInfoFeatures.age,
UserInfoFeatures.country,
],
input_dataframe=pd.DataFrame(
{
"UserInfoFeatures.user_id": [1, 2, 3, 4, 5, 6],
"timestamp": [
now_1y,
now_1y,
now_1y,
now_1y,
now_1y,
now_1y,
],
}
),
timestamp_column="timestamp",
)
assert df.shape == (6, 4)
assert df["UserInfoFeatures.name"].tolist() == [
"Ross",
"Monica",
"Chandler",
"Joey",
"Rachel",
pd.NA,
]
assert df["UserInfoFeatures.age"].tolist() == [
53,
43,
33,
25,
22,
pd.NA,
]
assert df["UserInfoFeatures.country"].tolist() == [
"India",
"USA",
"Africa",
"UK",
"Chile",
pd.NA,
]
1 change: 1 addition & 0 deletions fennel/expr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
var,
datetime,
from_epoch,
now,
Expr,
InvalidExprException,
)
25 changes: 24 additions & 1 deletion fennel/expr/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Any, Callable, Dict, Type, Optional
import pandas as pd

from fennel._vendor.pydantic import BaseModel # type: ignore
from fennel.dtypes.dtypes import FENNEL_STRUCT, FENNEL_STRUCT_SRC_CODE
from fennel.internal_lib.schema.schema import (
convert_dtype_to_arrow_type_with_nullable,
Expand All @@ -17,6 +18,7 @@
from fennel_data_lib import assign, type_of, matches

import fennel.gen.schema_pb2 as schema_proto
import fennel.gen.expr_pb2 as expr_proto
from fennel.internal_lib.schema import (
get_datatype,
cast_col_to_arrow_dtype,
Expand All @@ -27,6 +29,10 @@
)


class EvalContext(BaseModel):
now_col_name: Optional[str] = None


class InvalidExprException(Exception):
pass

Expand Down Expand Up @@ -360,6 +366,7 @@ def eval(
schema: Dict,
output_dtype: Optional[Type] = None,
parse=True,
context: Optional[EvalContext] = None
) -> pd.Series:
from fennel.expr.serializer import ExprSerializer

Expand Down Expand Up @@ -410,8 +417,12 @@ def pa_to_pd(pa_data, ret_type, parse=True):
ret_type = output_dtype

serialized_ret_type = get_datatype(ret_type).SerializeToString()
if context is None:
serialized_context = expr_proto.EvalContext().SerializeToString()
else:
serialized_context = expr_proto.EvalContext(**context.dict()).SerializeToString()
arrow_col = assign(
proto_bytes, df_pa, proto_schema, serialized_ret_type
proto_bytes, df_pa, proto_schema, serialized_ret_type, serialized_context
)
return pa_to_pd(arrow_col, ret_type, parse)

Expand Down Expand Up @@ -1089,6 +1100,14 @@ def __str__(self) -> str:
return f"fillnull({self.expr}, {self.fill})"


class Now(Expr):
def __init__(self):
super(Now, self).__init__()

def __str__(self) -> str:
return "now()"


class MakeStruct(Expr):
def __init__(self, fields: Dict[str, Expr], type: Type):
self.fields = fields
Expand Down Expand Up @@ -1183,3 +1202,7 @@ def datetime(
),
DateTimeNoop(),
)


def now() -> Now:
return Now()
5 changes: 5 additions & 0 deletions fennel/expr/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ def visitDateTimeLiteral(self, obj):
expr.datetime_literal.CopyFrom(datetime_literal)
return expr

def visitNow(self, obj):
expr = proto.Expr()
expr.now.CopyFrom(proto.Now())
return expr


def val_as_json(val: Any) -> str:
if isinstance(val, str):
Expand Down
37 changes: 34 additions & 3 deletions fennel/expr/test_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
import random
import pandas as pd
from dataclasses import dataclass, fields
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Dict, Optional, List
from fennel.datasets import dataset

from fennel.dtypes.dtypes import struct
from fennel.expr import col, when, lit
from fennel.expr.expr import TimeUnit, from_epoch, make_struct
from fennel.expr import col, when, lit, now
from fennel.expr.expr import (
TimeUnit,
from_epoch,
make_struct,
)
from fennel.expr.visitor import ExprPrinter, FetchReferences
from fennel.expr.serializer import ExprSerializer
from google.protobuf.json_format import ParseDict # type: ignore
Expand Down Expand Up @@ -1345,3 +1349,30 @@ def test_isnull():

for case in cases:
check_test_case(case)


def test_now():
cases = [
ExprTestCase(
expr=now().dt.since(col("birthdate"), unit="day"),
df=pd.DataFrame(
{
"birthdate": [
datetime.now(timezone.utc),
datetime.now(timezone.utc),
None,
datetime.now(timezone.utc),
],
}
),
schema={"birthdate": Optional[datetime]},
display='SINCE(NOW(), col("birthdate"), unit=TimeUnit.DAY)',
refs={"birthdate"},
eval_result=[0, 0, pd.NA, 0],
expected_dtype=Optional[int],
proto_json=None,
),
]

for case in cases:
check_test_case(case)
14 changes: 14 additions & 0 deletions fennel/expr/visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
DictNoop,
_DateTime,
DateTimeNoop,
Now,
)


Expand Down Expand Up @@ -115,6 +116,10 @@ def visit(self, obj):

elif isinstance(obj, DateTimeFromEpoch):
ret = self.visitDateTimeFromEpoch(obj)

elif isinstance(obj, Now):
ret = self.visitNow(obj)

else:
raise InvalidExprException("invalid expression type: %s" % obj)

Expand Down Expand Up @@ -180,6 +185,9 @@ def visitDateTimeFromEpoch(self, obj):
def visitDateTimeLiteral(self, obj):
raise NotImplementedError

def visitNow(self, obj):
raise NotImplementedError


class ExprPrinter(Visitor):

Expand Down Expand Up @@ -344,6 +352,9 @@ def visitDateTimeFromEpoch(self, obj):
def visitDateTimeLiteral(self, obj):
return f"DATETIME({obj.year}, {obj.month}, {obj.day}, {obj.hour}, {obj.minute}, {obj.second}, {obj.microsecond}, timezone={obj.timezone})"

def visitNow(self, obj):
return "NOW()"


class FetchReferences(Visitor):

Expand Down Expand Up @@ -444,3 +455,6 @@ def visitDateTimeFromEpoch(self, obj):

def visitDateTimeLiteral(self, obj):
pass

def visitNow(self, obj):
pass
234 changes: 119 additions & 115 deletions fennel/gen/expr_pb2.py

Large diffs are not rendered by default.

Loading

0 comments on commit e652502

Please sign in to comment.