Skip to content

Commit

Permalink
ds: Allow version to be a field name (#548)
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-nambiar authored Sep 2, 2024
1 parent 3feb525 commit ba94c86
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 24 deletions.
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.5.15] - 2024-09-01
- Allow `version` to be used as a field name in a dataset

## [1.5.14] - 2024-08-29
- Add support for S3 batch sink

Expand Down
8 changes: 2 additions & 6 deletions fennel/datasets/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1963,10 +1963,6 @@ def on_demand(self):
def fields(self):
return self._fields

@property
def version(self):
return self._version


# ---------------------------------------------------------------------
# Index
Expand Down Expand Up @@ -2114,7 +2110,7 @@ def indices_from_ds(

if index_obj.online:
online_index = index_proto.OnlineIndex(
ds_version=obj.version,
ds_version=obj._version,
ds_name=obj._name,
index_type=index_type,
duration=index_proto.IndexDuration(forever="forever"),
Expand All @@ -2126,7 +2122,7 @@ def indices_from_ds(
offline_index = None
elif index_obj.offline == IndexDuration.forever:
offline_index = index_proto.OfflineIndex(
ds_version=obj.version,
ds_version=obj._version,
ds_name=obj._name,
index_type=index_type,
duration=index_proto.IndexDuration(forever="forever"),
Expand Down
9 changes: 6 additions & 3 deletions fennel/datasets/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class UserInfoDataset:
age: int = field().meta(description="Users age lol") # type: ignore
account_creation_date: datetime
country: Optional[str]
version: int
timestamp: datetime = field(timestamp=True)


Expand All @@ -58,7 +59,6 @@ def test_simple_dataset():
{
"name": "UserInfoDataset",
"metadata": {"owner": "ml-eng@fennel.ai"},
"version": 1,
"dsschema": {
"keys": {
"fields": [
Expand All @@ -81,15 +81,17 @@ def test_simple_dataset():
"optionalType": {"of": {"stringType": {}}}
},
},
{"name": "version", "dtype": {"intType": {}}},
]
},
"timestamp": "timestamp",
},
"history": "63072000s",
"retention": "63072000s",
"fieldMetadata": {
"age": {"description": "Users age lol"},
"version": {},
"name": {},
"age": {"description": "Users age lol"},
"account_creation_date": {},
"country": {},
"user_id": {"owner": "xyz@fennel.ai"},
Expand All @@ -99,6 +101,7 @@ def test_simple_dataset():
},
"pycode": {},
"isSourceDataset": True,
"version": 1,
}
],
"sources": [
Expand All @@ -118,8 +121,8 @@ def test_simple_dataset():
},
"dataset": "UserInfoDataset",
"dsVersion": 1,
"cdc": "Upsert",
"disorder": "1209600s",
"cdc": "Upsert",
}
],
"extdbs": [
Expand Down
24 changes: 12 additions & 12 deletions fennel/internal_lib/to_proto/to_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ def sinks_from_ds(
)

return [
_conn_to_sink_proto(sink, ds._name, ds.version)
_conn_to_sink_proto(sink, ds._name, ds._version)
for sink in filtered_sinks
]

Expand Down Expand Up @@ -830,7 +830,7 @@ def _webhook_to_source_proto(
),
disorder=to_duration_proto(connector.disorder),
dataset=dataset._name,
ds_version=dataset.version,
ds_version=dataset._version,
cdc=to_cdc_proto(connector.cdc),
pre_proc=_pre_proc_to_proto(connector.pre_proc),
bounded=connector.bounded,
Expand Down Expand Up @@ -871,7 +871,7 @@ def _kafka_conn_to_source_proto(
),
disorder=to_duration_proto(connector.disorder),
dataset=dataset._name,
ds_version=dataset.version,
ds_version=dataset._version,
cdc=to_cdc_proto(connector.cdc),
pre_proc=_pre_proc_to_proto(connector.pre_proc),
starting_from=_to_timestamp_proto(connector.since),
Expand Down Expand Up @@ -968,7 +968,7 @@ def _s3_conn_to_source_proto(
source = connector_proto.Source(
table=ext_table,
dataset=dataset._name,
ds_version=dataset.version,
ds_version=dataset._version,
every=to_duration_proto(connector.every),
disorder=to_duration_proto(connector.disorder),
starting_from=_to_timestamp_proto(connector.since),
Expand Down Expand Up @@ -1163,7 +1163,7 @@ def _bigquery_conn_to_source_proto(
connector_proto.Source(
table=ext_table,
dataset=dataset._name,
ds_version=dataset.version,
ds_version=dataset._version,
cursor=connector.cursor,
every=to_duration_proto(connector.every),
disorder=to_duration_proto(connector.disorder),
Expand Down Expand Up @@ -1237,7 +1237,7 @@ def _redshift_conn_to_source_proto(
connector_proto.Source(
table=ext_table,
dataset=dataset._name,
ds_version=dataset.version,
ds_version=dataset._version,
cursor=connector.cursor,
every=to_duration_proto(connector.every),
disorder=to_duration_proto(connector.disorder),
Expand Down Expand Up @@ -1333,7 +1333,7 @@ def _mongo_conn_to_source_proto(
connector_proto.Source(
table=ext_table,
dataset=dataset._name,
ds_version=dataset.version,
ds_version=dataset._version,
cursor=connector.cursor,
every=to_duration_proto(connector.every),
disorder=to_duration_proto(connector.disorder),
Expand Down Expand Up @@ -1409,7 +1409,7 @@ def _pubsub_conn_to_source_proto(
),
disorder=to_duration_proto(connector.disorder),
dataset=dataset._name,
ds_version=dataset.version,
ds_version=dataset._version,
cdc=to_cdc_proto(connector.cdc),
pre_proc=_pre_proc_to_proto(connector.pre_proc),
bounded=connector.bounded,
Expand Down Expand Up @@ -1448,7 +1448,7 @@ def _snowflake_conn_to_source_proto(
connector_proto.Source(
table=ext_table,
dataset=dataset._name,
ds_version=dataset.version,
ds_version=dataset._version,
cursor=connector.cursor,
every=to_duration_proto(connector.every),
disorder=to_duration_proto(connector.disorder),
Expand Down Expand Up @@ -1530,7 +1530,7 @@ def _mysql_conn_to_source_proto(
connector_proto.Source(
table=ext_table,
dataset=dataset._name,
ds_version=dataset.version,
ds_version=dataset._version,
cursor=connector.cursor,
every=to_duration_proto(connector.every),
disorder=to_duration_proto(connector.disorder),
Expand Down Expand Up @@ -1621,7 +1621,7 @@ def _pg_conn_to_source_proto(
connector_proto.Source(
table=ext_table,
dataset=dataset._name,
ds_version=dataset.version,
ds_version=dataset._version,
cursor=connector.cursor,
every=to_duration_proto(connector.every),
disorder=to_duration_proto(connector.disorder),
Expand Down Expand Up @@ -1711,7 +1711,7 @@ def _kinesis_conn_to_source_proto(
connector_proto.Source(
table=ext_table,
dataset=dataset._name,
ds_version=dataset.version,
ds_version=dataset._version,
disorder=to_duration_proto(connector.disorder),
cdc=to_cdc_proto(connector.cdc),
starting_from=_to_timestamp_proto(connector.since),
Expand Down
4 changes: 2 additions & 2 deletions fennel/testing/mock_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ def is_new_dataset_eligible(dataset_new, dataset_old):
1. If version same and old == new then eligible.
2. new != old then version should be higher.
"""
if dataset_new.version > dataset_old.version:
if dataset_new._version > dataset_old._version:
return True
elif dataset_new.version < dataset_old.version:
elif dataset_new._version < dataset_old._version:
return False
else:
return dataset_old.signature() == dataset_new.signature()
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "fennel-ai"
version = "1.5.14"
version = "1.5.15"
description = "The modern realtime feature engineering platform"
authors = ["Fennel AI <developers@fennel.ai>"]
packages = [{ include = "fennel" }]
Expand Down Expand Up @@ -41,6 +41,7 @@ pyspelling = "^2.8.2"
# A pyyaml bug when using cython, https://github.com/yaml/pyyaml/issues/601
pyyaml = "^6.0.1"


[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
Expand Down

0 comments on commit ba94c86

Please sign in to comment.