diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 4b97c12db..7d700f59a 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.15] - 2024-09-01 +- Allow `version` to be used as a field name in a dataset + ## [1.5.14] - 2024-08-29 - Add support for S3 batch sink diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index f3c6d5c7e..db67ff950 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -1963,10 +1963,6 @@ def on_demand(self): def fields(self): return self._fields - @property - def version(self): - return self._version - # --------------------------------------------------------------------- # Index @@ -2114,7 +2110,7 @@ def indices_from_ds( if index_obj.online: online_index = index_proto.OnlineIndex( - ds_version=obj.version, + ds_version=obj._version, ds_name=obj._name, index_type=index_type, duration=index_proto.IndexDuration(forever="forever"), @@ -2126,7 +2122,7 @@ def indices_from_ds( offline_index = None elif index_obj.offline == IndexDuration.forever: offline_index = index_proto.OfflineIndex( - ds_version=obj.version, + ds_version=obj._version, ds_name=obj._name, index_type=index_type, duration=index_proto.IndexDuration(forever="forever"), diff --git a/fennel/datasets/test_dataset.py b/fennel/datasets/test_dataset.py index 5ab16babe..0b41bab55 100644 --- a/fennel/datasets/test_dataset.py +++ b/fennel/datasets/test_dataset.py @@ -41,6 +41,7 @@ class UserInfoDataset: age: int = field().meta(description="Users age lol") # type: ignore account_creation_date: datetime country: Optional[str] + version: int timestamp: datetime = field(timestamp=True) @@ -58,7 +59,6 @@ def test_simple_dataset(): { "name": "UserInfoDataset", "metadata": {"owner": "ml-eng@fennel.ai"}, - "version": 1, "dsschema": { "keys": { "fields": [ @@ -81,6 +81,7 @@ def test_simple_dataset(): "optionalType": {"of": {"stringType": {}}} }, }, + {"name": "version", "dtype": {"intType": {}}}, ] }, "timestamp":
"timestamp", @@ -88,8 +89,9 @@ def test_simple_dataset(): "history": "63072000s", "retention": "63072000s", "fieldMetadata": { - "age": {"description": "Users age lol"}, + "version": {}, "name": {}, + "age": {"description": "Users age lol"}, "account_creation_date": {}, "country": {}, "user_id": {"owner": "xyz@fennel.ai"}, @@ -99,6 +101,7 @@ def test_simple_dataset(): }, "pycode": {}, "isSourceDataset": True, + "version": 1, } ], "sources": [ @@ -118,8 +121,8 @@ def test_simple_dataset(): }, "dataset": "UserInfoDataset", "dsVersion": 1, - "cdc": "Upsert", "disorder": "1209600s", + "cdc": "Upsert", } ], "extdbs": [ diff --git a/fennel/internal_lib/to_proto/to_proto.py b/fennel/internal_lib/to_proto/to_proto.py index 6cee7c1c1..1c566a0d2 100644 --- a/fennel/internal_lib/to_proto/to_proto.py +++ b/fennel/internal_lib/to_proto/to_proto.py @@ -497,7 +497,7 @@ def sinks_from_ds( ) return [ - _conn_to_sink_proto(sink, ds._name, ds.version) + _conn_to_sink_proto(sink, ds._name, ds._version) for sink in filtered_sinks ] @@ -830,7 +830,7 @@ def _webhook_to_source_proto( ), disorder=to_duration_proto(connector.disorder), dataset=dataset._name, - ds_version=dataset.version, + ds_version=dataset._version, cdc=to_cdc_proto(connector.cdc), pre_proc=_pre_proc_to_proto(connector.pre_proc), bounded=connector.bounded, @@ -871,7 +871,7 @@ def _kafka_conn_to_source_proto( ), disorder=to_duration_proto(connector.disorder), dataset=dataset._name, - ds_version=dataset.version, + ds_version=dataset._version, cdc=to_cdc_proto(connector.cdc), pre_proc=_pre_proc_to_proto(connector.pre_proc), starting_from=_to_timestamp_proto(connector.since), @@ -968,7 +968,7 @@ def _s3_conn_to_source_proto( source = connector_proto.Source( table=ext_table, dataset=dataset._name, - ds_version=dataset.version, + ds_version=dataset._version, every=to_duration_proto(connector.every), disorder=to_duration_proto(connector.disorder), starting_from=_to_timestamp_proto(connector.since), @@ -1163,7 +1163,7 @@ def 
_bigquery_conn_to_source_proto( connector_proto.Source( table=ext_table, dataset=dataset._name, - ds_version=dataset.version, + ds_version=dataset._version, cursor=connector.cursor, every=to_duration_proto(connector.every), disorder=to_duration_proto(connector.disorder), @@ -1237,7 +1237,7 @@ def _redshift_conn_to_source_proto( connector_proto.Source( table=ext_table, dataset=dataset._name, - ds_version=dataset.version, + ds_version=dataset._version, cursor=connector.cursor, every=to_duration_proto(connector.every), disorder=to_duration_proto(connector.disorder), @@ -1333,7 +1333,7 @@ def _mongo_conn_to_source_proto( connector_proto.Source( table=ext_table, dataset=dataset._name, - ds_version=dataset.version, + ds_version=dataset._version, cursor=connector.cursor, every=to_duration_proto(connector.every), disorder=to_duration_proto(connector.disorder), @@ -1409,7 +1409,7 @@ def _pubsub_conn_to_source_proto( ), disorder=to_duration_proto(connector.disorder), dataset=dataset._name, - ds_version=dataset.version, + ds_version=dataset._version, cdc=to_cdc_proto(connector.cdc), pre_proc=_pre_proc_to_proto(connector.pre_proc), bounded=connector.bounded, @@ -1448,7 +1448,7 @@ def _snowflake_conn_to_source_proto( connector_proto.Source( table=ext_table, dataset=dataset._name, - ds_version=dataset.version, + ds_version=dataset._version, cursor=connector.cursor, every=to_duration_proto(connector.every), disorder=to_duration_proto(connector.disorder), @@ -1530,7 +1530,7 @@ def _mysql_conn_to_source_proto( connector_proto.Source( table=ext_table, dataset=dataset._name, - ds_version=dataset.version, + ds_version=dataset._version, cursor=connector.cursor, every=to_duration_proto(connector.every), disorder=to_duration_proto(connector.disorder), @@ -1621,7 +1621,7 @@ def _pg_conn_to_source_proto( connector_proto.Source( table=ext_table, dataset=dataset._name, - ds_version=dataset.version, + ds_version=dataset._version, cursor=connector.cursor, 
every=to_duration_proto(connector.every), disorder=to_duration_proto(connector.disorder), @@ -1711,7 +1711,7 @@ def _kinesis_conn_to_source_proto( connector_proto.Source( table=ext_table, dataset=dataset._name, - ds_version=dataset.version, + ds_version=dataset._version, disorder=to_duration_proto(connector.disorder), cdc=to_cdc_proto(connector.cdc), starting_from=_to_timestamp_proto(connector.since), diff --git a/fennel/testing/mock_client.py b/fennel/testing/mock_client.py index c93838745..8bff7322d 100644 --- a/fennel/testing/mock_client.py +++ b/fennel/testing/mock_client.py @@ -166,9 +166,9 @@ def is_new_dataset_eligible(dataset_new, dataset_old): 1. If version same and old == new then eligible. 2. new != old then version should be higher. """ - if dataset_new.version > dataset_old.version: + if dataset_new._version > dataset_old._version: return True - elif dataset_new.version < dataset_old.version: + elif dataset_new._version < dataset_old._version: return False else: return dataset_old.signature() == dataset_new.signature() diff --git a/pyproject.toml b/pyproject.toml index 4b7e06cff..da96109de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.14" +version = "1.5.15" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }] @@ -41,6 +41,7 @@ pyspelling = "^2.8.2" # A pyyaml bug when using cython, https://github.com/yaml/pyyaml/issues/601 pyyaml = "^6.0.1" + [build-system] requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api"