Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DEV 4622] Add support for now in expressions. #572

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ sidebar:
- "api-reference/expressions/fillnull"
- "api-reference/expressions/lit"
- "api-reference/expressions/not"
- "api-reference/expressions/now"
- "api-reference/expressions/typeof"
- "api-reference/expressions/when"

Expand Down
25 changes: 25 additions & 0 deletions docs/examples/api-reference/expressions/basic.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import datetime

import pytest
from typing import Optional
import pandas as pd
Expand Down Expand Up @@ -156,3 +158,26 @@ def test_lit():
df = pd.DataFrame({"x": pd.Series([1, 2, None], dtype=pd.Int64Dtype())})
assert expr.eval(df, schema={"x": Optional[int]}).tolist() == [2, 3, pd.NA]
# /docsnip


def test_now():
    # `now()` reads the real clock, so fixed calendar birthdates paired with
    # hard-coded ages go stale as soon as someone has their next birthday.
    # Birthdates are therefore derived from the current time, offset by
    # N years plus ~half a year, which keeps the expected ages stable.
    from datetime import timedelta

    # docsnip expr_now
    from fennel.expr import now, col

    # docsnip-highlight next-line
    expr = now().dt.since(col("birthdate"), "year")

    assert (
        expr.typeof(schema={"birthdate": Optional[datetime]}) == Optional[int]
    )

    # can be evaluated with a dataframe
    # birthdates roughly 26.5 and 23.5 years before the current time
    today = datetime.now()
    df = pd.DataFrame(
        {
            "birthdate": [
                today - timedelta(days=26 * 365 + 180),
                today - timedelta(days=23 * 365 + 180),
                None,
            ]
        }
    )
    assert expr.eval(df, schema={"birthdate": Optional[datetime]}).tolist() == [
        26,
        23,
        pd.NA,
    ]
    # /docsnip
18 changes: 18 additions & 0 deletions docs/pages/api-reference/expressions/now.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
title: Now
order: 0
status: published
---
### Now

Function to get the current timestamp, similar to what `datetime.now` does in Python.

<pre snippet="api-reference/expressions/basic#expr_now"
message="Using now to get age of a person" status="success">
</pre>

#### Returns
<Expandable type="Any">
Returns an expression object denoting the current timestamp. The type of
the resulting expression is datetime.
</Expandable>
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.33] - 2024-09-30
- Add support for now in expressions.

## [1.5.32] - 2024-09-27
- Add dropnull to FirstK

Expand Down
149 changes: 149 additions & 0 deletions fennel/client_tests/test_expr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from datetime import datetime, timezone, timedelta
from typing import Optional

import pandas as pd
import pytest

from fennel._vendor import requests
from fennel.connectors import source, Webhook
from fennel.datasets import dataset, field
from fennel.expr import col
from fennel.featuresets import featureset, feature as F
from fennel.testing import mock

webhook = Webhook(name="fennel_webhook")
__owner__ = "eng@fennel.ai"


# Test dataset of per-user profile rows, fed through the "UserInfoDataset"
# endpoint of the module-level `webhook` (upsert CDC, 14d disorder).
@source(webhook.endpoint("UserInfoDataset"), disorder="14d", cdc="upsert")
@dataset(index=True)
class UserInfoDataset:
    user_id: int = field(key=True)  # key field of the dataset
    name: str
    birthdate: datetime
    country: str
    ts: datetime = field(timestamp=True)  # timestamp field of each row


@pytest.mark.integration
@mock
def test_now(client):
    """Check `now()` inside a feature expression, online and offline.

    Commits `UserInfoDataset` plus a featureset whose `age` feature is
    `now().dt.since(col("birthdate"), unit="year")`, logs five users, and
    verifies the queried ages. The offline query is issued one year in the
    past, so every age is expected to be one lower there.

    Birthdates are derived from the current time (N years plus ~half a year
    back) rather than fixed calendar dates, so the expected ages do not go
    stale as the wall clock advances.
    """
    from fennel.expr import now

    @featureset
    class UserInfoFeatures:
        user_id: int
        name: Optional[str] = F(UserInfoDataset.name)
        birthdate: Optional[datetime] = F(UserInfoDataset.birthdate)
        age: Optional[int] = F(now().dt.since(col("birthdate"), unit="year"))
        country: Optional[str] = F(UserInfoDataset.country)

    # Sync the dataset
    response = client.commit(
        message="msg",
        datasets=[UserInfoDataset],
        featuresets=[UserInfoFeatures],
    )
    assert response.status_code == requests.codes.OK, response.json()

    # Named `current_ts` (not `now`) so the `now` expression imported above
    # is not rebound within this function.
    current_ts = datetime.now(timezone.utc)
    one_year_ago = current_ts - timedelta(days=365)

    user_ids = [1, 2, 3, 4, 5]
    names = ["Ross", "Monica", "Chandler", "Joey", "Rachel"]
    countries = ["India", "USA", "Africa", "UK", "Chile"]
    ages = [54, 44, 34, 26, 23]
    # The extra 180 days keep each user well away from a birthday boundary.
    birthdates = [
        current_ts - timedelta(days=365 * age + 180) for age in ages
    ]

    df = pd.DataFrame(
        {
            "user_id": user_ids,
            "name": names,
            "birthdate": birthdates,
            "country": countries,
            "ts": [one_year_ago] * len(user_ids),
        }
    )
    response = client.log("fennel_webhook", "UserInfoDataset", df)
    assert response.status_code == requests.codes.OK, response.json()

    client.sleep()

    # user_id 6 was never logged, so every feature for it should be NA.
    query_ids = user_ids + [6]
    expected_names = names + [pd.NA]
    expected_countries = countries + [pd.NA]

    # Querying UserInfoFeatures
    df = client.query(
        inputs=[UserInfoFeatures.user_id],
        outputs=[
            UserInfoFeatures.name,
            UserInfoFeatures.age,
            UserInfoFeatures.country,
        ],
        input_dataframe=pd.DataFrame({"UserInfoFeatures.user_id": query_ids}),
    )
    assert df.shape == (6, 3)
    assert df["UserInfoFeatures.name"].tolist() == expected_names
    assert df["UserInfoFeatures.age"].tolist() == ages + [pd.NA]
    assert df["UserInfoFeatures.country"].tolist() == expected_countries

    if not client.is_integration_client():
        df = client.query_offline(
            inputs=[UserInfoFeatures.user_id],
            outputs=[
                UserInfoFeatures.name,
                UserInfoFeatures.age,
                UserInfoFeatures.country,
            ],
            input_dataframe=pd.DataFrame(
                {
                    "UserInfoFeatures.user_id": query_ids,
                    "timestamp": [one_year_ago] * len(query_ids),
                }
            ),
            timestamp_column="timestamp",
        )
        assert df.shape == (6, 4)
        assert df["UserInfoFeatures.name"].tolist() == expected_names
        # Evaluated as of one year ago, so everyone is a year younger.
        assert df["UserInfoFeatures.age"].tolist() == [
            age - 1 for age in ages
        ] + [pd.NA]
        assert df["UserInfoFeatures.country"].tolist() == expected_countries
2 changes: 1 addition & 1 deletion fennel/datasets/test_invalid_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ def transform(cls, rating: Dataset):

assert (
str(e2.value)
== """invalid assign - '[Pipeline:transform]->assign node' error in expression for column `movie_suffixed`: Failed to compile expression: invalid expression: both sides of '+' must be numeric types but found String & String, left: col("movie"), right: lit(String("_suffix"))"""
== """invalid assign - '[Pipeline:transform]->assign node' error in expression for column `movie_suffixed`: failed to compile expression: invalid expression: both sides of '+' must be numeric types but found String & String, left: col("movie"), right: lit(String("_suffix"))"""
)


Expand Down
1 change: 1 addition & 0 deletions fennel/expr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
var,
datetime,
from_epoch,
now,
Expr,
InvalidExprException,
)
31 changes: 30 additions & 1 deletion fennel/expr/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Any, Callable, Dict, Type, Optional
import pandas as pd

from fennel._vendor.pydantic import BaseModel # type: ignore
from fennel.dtypes.dtypes import FENNEL_STRUCT, FENNEL_STRUCT_SRC_CODE
from fennel.internal_lib.schema.schema import (
convert_dtype_to_arrow_type_with_nullable,
Expand All @@ -17,6 +18,7 @@
from fennel_data_lib import assign, type_of, matches

import fennel.gen.schema_pb2 as schema_proto
import fennel.gen.expr_pb2 as expr_proto
from fennel.internal_lib.schema import (
get_datatype,
cast_col_to_arrow_dtype,
Expand All @@ -27,6 +29,10 @@
)


class EvalContext(BaseModel):
    """Optional settings passed to `Expr.eval`.

    `Expr.eval` forwards the fields of this model into an
    `expr_proto.EvalContext` message and hands the serialized bytes to the
    evaluator.
    """

    # NOTE(review): presumably the name of a dataframe column whose values
    # stand in for the current time when `now()` is evaluated — confirm
    # against the evaluator. None leaves the proto field unset.
    now_col_name: Optional[str] = None


class InvalidExprException(Exception):
pass

Expand Down Expand Up @@ -360,6 +366,7 @@ def eval(
schema: Dict,
output_dtype: Optional[Type] = None,
parse=True,
context: Optional[EvalContext] = None,
) -> pd.Series:
from fennel.expr.serializer import ExprSerializer

Expand Down Expand Up @@ -410,8 +417,18 @@ def pa_to_pd(pa_data, ret_type, parse=True):
ret_type = output_dtype

serialized_ret_type = get_datatype(ret_type).SerializeToString()
if context is None:
serialized_context = expr_proto.EvalContext().SerializeToString()
else:
serialized_context = expr_proto.EvalContext(
**context.dict()
).SerializeToString()
arrow_col = assign(
proto_bytes, df_pa, proto_schema, serialized_ret_type
proto_bytes,
df_pa,
proto_schema,
serialized_ret_type,
serialized_context,
)
return pa_to_pd(arrow_col, ret_type, parse)

Expand Down Expand Up @@ -1089,6 +1106,14 @@ def __str__(self) -> str:
return f"fillnull({self.expr}, {self.fill})"


class Now(Expr):
    """Operand-free expression node that prints as ``now()``."""

    def __init__(self):
        super().__init__()

    def __str__(self) -> str:
        return "now()"


class MakeStruct(Expr):
def __init__(self, fields: Dict[str, Expr], type: Type):
self.fields = fields
Expand Down Expand Up @@ -1183,3 +1208,7 @@ def datetime(
),
DateTimeNoop(),
)


def now() -> Now:
    """Build and return a new `Now` expression node."""
    return Now()
5 changes: 5 additions & 0 deletions fennel/expr/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ def visitDateTimeLiteral(self, obj):
expr.datetime_literal.CopyFrom(datetime_literal)
return expr

def visitNow(self, obj):
    """Serialize a `Now` node as an `Expr` proto with its `now` field set.

    `obj` is not read.
    """
    node = proto.Expr()
    node.now.CopyFrom(proto.Now())
    return node


def val_as_json(val: Any) -> str:
if isinstance(val, str):
Expand Down
37 changes: 34 additions & 3 deletions fennel/expr/test_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
import random
import pandas as pd
from dataclasses import dataclass, fields
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Dict, Optional, List
from fennel.datasets import dataset

from fennel.dtypes.dtypes import struct
from fennel.expr import col, when, lit
from fennel.expr.expr import TimeUnit, from_epoch, make_struct
from fennel.expr import col, when, lit, now
from fennel.expr.expr import (
TimeUnit,
from_epoch,
make_struct,
)
from fennel.expr.visitor import ExprPrinter, FetchReferences
from fennel.expr.serializer import ExprSerializer
from google.protobuf.json_format import ParseDict # type: ignore
Expand Down Expand Up @@ -1345,3 +1349,30 @@ def test_isnull():

for case in cases:
check_test_case(case)


def test_now():
    """`now()` minus a birthdate taken at call time is 0 days; null stays NA."""

    def utc_now():
        return datetime.now(timezone.utc)

    birthdates = pd.DataFrame(
        {"birthdate": [utc_now(), utc_now(), None, utc_now()]}
    )
    case = ExprTestCase(
        expr=now().dt.since(col("birthdate"), unit="day"),
        df=birthdates,
        schema={"birthdate": Optional[datetime]},
        display='SINCE(NOW(), col("birthdate"), unit=TimeUnit.DAY)',
        refs={"birthdate"},
        eval_result=[0, 0, pd.NA, 0],
        expected_dtype=Optional[int],
        proto_json=None,
    )
    check_test_case(case)
Loading
Loading