From 0b393955797960905d7d47246270e499014f0581 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Mon, 14 Oct 2024 15:54:34 -0500 Subject: [PATCH 01/15] test changes --- sdk/python/feast/feature_store.py | 28 +++++++++++++++ .../cassandra_online_store.py | 36 ++++++++++++++++--- sdk/python/feast/repo_config.py | 1 + setup.py | 5 +++ 4 files changed, 65 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index ce5919eaf1..405fa894d4 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1255,6 +1255,20 @@ def materialize_incremental( feature_views_to_materialize = self._get_feature_views_to_materialize( feature_views ) + + if getattr(self.config.online_store, "lazy_table_creation", False): + # feature_views_to_delete = self._get_feature_views_to_delete() + # don't delete any tables for now + + self._get_provider().update_infra( + project=self.project, + tables_to_delete=[], + tables_to_keep=feature_views_to_materialize, + entities_to_delete=[], + entities_to_keep=[], + partial=True, + ) + _print_materialization_log( None, end_date, @@ -1350,6 +1364,20 @@ def materialize( feature_views_to_materialize = self._get_feature_views_to_materialize( feature_views ) + + if getattr(self.config.online_store, "lazy_table_creation", False): + # feature_views_to_delete = self._get_feature_views_to_delete() + # don't delete any tables for now + + self._get_provider().update_infra( + project=self.project, + tables_to_delete=[], + tables_to_keep=feature_views_to_materialize, + entities_to_delete=[], + entities_to_keep=[], + partial=True, + ) + _print_materialization_log( start_date, end_date, diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 0870bc709d..584f34b38e 100644 --- 
a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -88,7 +88,7 @@ event_ts TIMESTAMP, created_ts TIMESTAMP, PRIMARY KEY ((entity_key), feature_name) - ) WITH CLUSTERING ORDER BY (feature_name ASC); + ) WITH CLUSTERING ORDER BY (feature_name ASC) {optional_ttl_clause}; """ DROP_TABLE_CQL_TEMPLATE = "DROP TABLE IF EXISTS {fqtable};" @@ -153,6 +153,12 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel): request_timeout: Optional[StrictFloat] = None """Request timeout in seconds.""" + ttl: Optional[StrictInt] = None + '''Time to live option''' + + lazy_table_creation: Optional[bool] = False + """If True, tables will be created on during materialization, rather than registration.""" + class CassandraLoadBalancingPolicy(FeastConfigBaseModel): """ Configuration block related to the Cluster's load-balancing policy. @@ -225,14 +231,21 @@ def _get_session(self, config: RepoConfig): return self._session if not self._session: # configuration consistency checks - hosts = online_store_config.hosts secure_bundle_path = online_store_config.secure_bundle_path - port = online_store_config.port or 9042 + port = 19042 if online_store_config.type == "scylladb" else ( + online_store_config.port or 9042) keyspace = online_store_config.keyspace username = online_store_config.username password = online_store_config.password protocol_version = online_store_config.protocol_version + if online_store_config.type == "scylladb": + # Using the shard aware functionality + if online_store_config.load_balancing is None: + online_store_config.load_balancing = "TokenAwarePolicy(DCAwareRoundRobinPolicy)" + if online_store_config.protocol_version is None: + protocol_version = 4 + hosts = online_store_config.hosts db_directions = hosts or secure_bundle_path if not db_directions or not keyspace: raise CassandraInvalidConfig(E_CASSANDRA_NOT_CONFIGURED) @@ 
-450,6 +463,10 @@ def update( """ project = config.project + if config.online_config.lazy_table_creation: + # create tables during materialization + return + for table in tables_to_keep: self._create_table(config, project, table) for table in tables_to_delete: @@ -562,7 +579,10 @@ def _create_table(self, config: RepoConfig, project: str, table: FeatureView): fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) create_cql = self._get_cql_statement(config, "create", fqtable) logger.info(f"Creating table {fqtable}.") - session.execute(create_cql) + if config.online_config.ttl: + session.execute(create_cql, parameters=config.online_config.ttl) + else: + session.execute(create_cql) def _get_cql_statement( self, config: RepoConfig, op_name: str, fqtable: str, **kwargs @@ -579,9 +599,15 @@ def _get_cql_statement( """ session: Session = self._get_session(config) template, prepare = CQL_TEMPLATE_MAP[op_name] + if op_name == "create" and config.online_config.ttl: + ttl_clause = " WITH default_time_to_live = ?" 
+ else: + ttl_clause = None + statement = template.format( fqtable=fqtable, - **kwargs, + optional_ttl_clause=ttl_clause, + **kwargs ) if prepare: # using the statement itself as key (no problem with that) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 2b6679eaa0..92f47b130a 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -62,6 +62,7 @@ "postgres": "feast.infra.online_stores.contrib.postgres.PostgreSQLOnlineStore", "hbase": "feast.infra.online_stores.contrib.hbase_online_store.hbase.HbaseOnlineStore", "cassandra": "feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore", + "scylladb": "feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore", "mysql": "feast.infra.online_stores.contrib.mysql_online_store.mysql.MySQLOnlineStore", "hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore", "milvus": "feast.expediagroup.vectordb.milvus_online_store.MilvusOnlineStore", diff --git a/setup.py b/setup.py index b04ad502d5..669733f5f9 100644 --- a/setup.py +++ b/setup.py @@ -119,6 +119,10 @@ GE_REQUIRED = ["great_expectations>=0.15.41"] +SCYLLADB_REQUIRED = [ + "scylla-driver>=3.24.0,<4", +] + AZURE_REQUIRED = [ "azure-storage-blob>=0.37.0", "azure-identity>=1.6.1", @@ -477,6 +481,7 @@ def run(self): "hbase": HBASE_REQUIRED, "docs": DOCS_REQUIRED, "cassandra": CASSANDRA_REQUIRED, + "scylladb": SCYLLADB_REQUIRED, "hazelcast": HAZELCAST_REQUIRED, "grpcio": GRPCIO_REQUIRED, "ibis": IBIS_REQUIRED, From 4d384911ff3143a49976499502dc13fc6744c7fa Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Tue, 15 Oct 2024 14:04:02 -0500 Subject: [PATCH 02/15] fix: allow for scylladb in feature_store.yaml --- .../contrib/cassandra_online_store/cassandra_online_store.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git 
a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 584f34b38e..50d5d50db8 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -124,7 +124,7 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel): and password being the Client ID and Client Secret of the database token. """ - type: Literal["cassandra"] = "cassandra" + type: Literal["cassandra", "scylladb"] = "cassandra" """Online store type selector.""" # settings for connection to Cassandra / Astra DB @@ -463,7 +463,8 @@ def update( """ project = config.project - if config.online_config.lazy_table_creation: + if getattr(config.online_store, "lazy_table_creation", False): + logger.info(f"Lazy table creation enabled. Skipping table creation for {project} online store.") # create tables during materialization return From 6c5c8d727edd922341125aa8d9fa5a0c3b189c54 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Tue, 15 Oct 2024 14:13:37 -0500 Subject: [PATCH 03/15] fix: linter issue --- .../contrib/cassandra_online_store/cassandra_online_store.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 50d5d50db8..22e59b58ad 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -239,9 +239,10 @@ def _get_session(self, config: RepoConfig): password = online_store_config.password protocol_version = online_store_config.protocol_version if 
online_store_config.type == "scylladb": - # Using the shard aware functionality if online_store_config.load_balancing is None: - online_store_config.load_balancing = "TokenAwarePolicy(DCAwareRoundRobinPolicy)" + online_store_config.load_balancing = CassandraOnlineStoreConfig.CassandraLoadBalancingPolicy( + load_balancing_policy="TokenAwarePolicy(DCAwareRoundRobinPolicy)" + ) if online_store_config.protocol_version is None: protocol_version = 4 From efd3835c5389912ab1059d6c4dbf3dd99bd13180 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Tue, 15 Oct 2024 14:18:42 -0500 Subject: [PATCH 04/15] fix: linter issue --- .../cassandra_online_store.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 22e59b58ad..d59b4e1efd 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -154,7 +154,7 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel): """Request timeout in seconds.""" ttl: Optional[StrictInt] = None - '''Time to live option''' + """Time to live option""" lazy_table_creation: Optional[bool] = False """If True, tables will be created on during materialization, rather than registration.""" @@ -232,8 +232,11 @@ def _get_session(self, config: RepoConfig): if not self._session: # configuration consistency checks secure_bundle_path = online_store_config.secure_bundle_path - port = 19042 if online_store_config.type == "scylladb" else ( - online_store_config.port or 9042) + port = ( + 19042 + if online_store_config.type == "scylladb" + else (online_store_config.port or 9042) + ) keyspace = online_store_config.keyspace username = online_store_config.username password = 
online_store_config.password @@ -465,7 +468,9 @@ def update( project = config.project if getattr(config.online_store, "lazy_table_creation", False): - logger.info(f"Lazy table creation enabled. Skipping table creation for {project} online store.") + logger.info( + f"Lazy table creation enabled. Skipping table creation for {project} online store." + ) # create tables during materialization return @@ -607,9 +612,7 @@ def _get_cql_statement( ttl_clause = None statement = template.format( - fqtable=fqtable, - optional_ttl_clause=ttl_clause, - **kwargs + fqtable=fqtable, optional_ttl_clause=ttl_clause, **kwargs ) if prepare: # using the statement itself as key (no problem with that) From 7da2b5b74f68ce06c2bf0fd91d1e16a78be2f119 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 16 Oct 2024 14:11:21 -0500 Subject: [PATCH 05/15] fix materialization/registration logic --- sdk/python/feast/expediagroup/provider/expedia.py | 11 +++++++++++ sdk/python/feast/feature_store.py | 6 ++++-- .../cassandra_online_store/cassandra_online_store.py | 7 ------- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/expediagroup/provider/expedia.py b/sdk/python/feast/expediagroup/provider/expedia.py index b242e00437..4eb2c56d06 100644 --- a/sdk/python/feast/expediagroup/provider/expedia.py +++ b/sdk/python/feast/expediagroup/provider/expedia.py @@ -68,8 +68,19 @@ def update_infra( entities_to_delete: Sequence[Entity], entities_to_keep: Sequence[Entity], partial: bool, + materialization_update: bool = False, ): if self.online_store: + if self.online_store.type == "scylladb" and materialization_update: + self.online_store.update( + config=self.repo_config, + tables_to_delete=tables_to_delete, + tables_to_keep=tables_to_keep, + entities_to_keep=entities_to_keep, + entities_to_delete=entities_to_delete, + partial=partial, + ) + if tables_to_delete: logger.info( f"Data associated to {[feature_view.name for feature_view in tables_to_delete]} feature views will be 
deleted from the online store based on ttl defined if the entities are not shared with other feature views" diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 405fa894d4..522db65164 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1256,7 +1256,7 @@ def materialize_incremental( feature_views ) - if getattr(self.config.online_store, "lazy_table_creation", False): + if getattr(self.config.online_store, "lazy_table_creation", False) and self.config.provider == "expedia": # feature_views_to_delete = self._get_feature_views_to_delete() # don't delete any tables for now @@ -1267,6 +1267,7 @@ def materialize_incremental( entities_to_delete=[], entities_to_keep=[], partial=True, + materialization_update=True, ) _print_materialization_log( @@ -1365,7 +1366,7 @@ def materialize( feature_views ) - if getattr(self.config.online_store, "lazy_table_creation", False): + if getattr(self.config.online_store, "lazy_table_creation", False) and self.config.provider == "expedia": # feature_views_to_delete = self._get_feature_views_to_delete() # don't delete any tables for now @@ -1376,6 +1377,7 @@ def materialize( entities_to_delete=[], entities_to_keep=[], partial=True, + materialization_update=True, ) _print_materialization_log( diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index d59b4e1efd..ffae6babe1 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -467,13 +467,6 @@ def update( """ project = config.project - if getattr(config.online_store, "lazy_table_creation", False): - logger.info( - f"Lazy table creation enabled. Skipping table creation for {project} online store." 
- ) - # create tables during materialization - return - for table in tables_to_keep: self._create_table(config, project, table) for table in tables_to_delete: From a46a0ec43635cd6ae1fdda8740dae4957c19f522 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 16 Oct 2024 14:26:08 -0500 Subject: [PATCH 06/15] fix: use kwargs in expedia provider update infra --- sdk/python/feast/expediagroup/provider/expedia.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/expediagroup/provider/expedia.py b/sdk/python/feast/expediagroup/provider/expedia.py index 4eb2c56d06..2433c4920d 100644 --- a/sdk/python/feast/expediagroup/provider/expedia.py +++ b/sdk/python/feast/expediagroup/provider/expedia.py @@ -68,8 +68,10 @@ def update_infra( entities_to_delete: Sequence[Entity], entities_to_keep: Sequence[Entity], partial: bool, - materialization_update: bool = False, + **kwargs ): + materialization_update = kwargs.get("materialization_update", False) + if self.online_store: if self.online_store.type == "scylladb" and materialization_update: self.online_store.update( From 46d1992a074f43b065eabd8e8dcdffd10b9d9a1d Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 16 Oct 2024 14:41:53 -0500 Subject: [PATCH 07/15] try to ignore keyword lint error --- sdk/python/feast/expediagroup/provider/expedia.py | 4 +--- sdk/python/feast/feature_store.py | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/expediagroup/provider/expedia.py b/sdk/python/feast/expediagroup/provider/expedia.py index 2433c4920d..4eb2c56d06 100644 --- a/sdk/python/feast/expediagroup/provider/expedia.py +++ b/sdk/python/feast/expediagroup/provider/expedia.py @@ -68,10 +68,8 @@ def update_infra( entities_to_delete: Sequence[Entity], entities_to_keep: Sequence[Entity], partial: bool, - **kwargs + materialization_update: bool = False, ): - materialization_update = kwargs.get("materialization_update", False) - if self.online_store: if 
self.online_store.type == "scylladb" and materialization_update: self.online_store.update( diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 522db65164..b80dafffb5 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1267,7 +1267,7 @@ def materialize_incremental( entities_to_delete=[], entities_to_keep=[], partial=True, - materialization_update=True, + materialization_update=True, # type: ignore ) _print_materialization_log( @@ -1377,7 +1377,7 @@ def materialize( entities_to_delete=[], entities_to_keep=[], partial=True, - materialization_update=True, + materialization_update=True, # type: ignore ) _print_materialization_log( From 222ee63bfab110470ec4a87aeb09328293ce9e3d Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 16 Oct 2024 14:45:12 -0500 Subject: [PATCH 08/15] fix: lint formatting --- sdk/python/feast/feature_store.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index b80dafffb5..fbe45d3d9a 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1256,7 +1256,10 @@ def materialize_incremental( feature_views ) - if getattr(self.config.online_store, "lazy_table_creation", False) and self.config.provider == "expedia": + if ( + getattr(self.config.online_store, "lazy_table_creation", False) + and self.config.provider == "expedia" + ): # feature_views_to_delete = self._get_feature_views_to_delete() # don't delete any tables for now @@ -1267,7 +1270,7 @@ def materialize_incremental( entities_to_delete=[], entities_to_keep=[], partial=True, - materialization_update=True, # type: ignore + materialization_update=True, # type: ignore ) _print_materialization_log( @@ -1366,7 +1369,10 @@ def materialize( feature_views ) - if getattr(self.config.online_store, "lazy_table_creation", False) and self.config.provider == "expedia": + if ( + 
getattr(self.config.online_store, "lazy_table_creation", False) + and self.config.provider == "expedia" + ): # feature_views_to_delete = self._get_feature_views_to_delete() # don't delete any tables for now @@ -1377,7 +1383,7 @@ def materialize( entities_to_delete=[], entities_to_keep=[], partial=True, - materialization_update=True, # type: ignore + materialization_update=True, # type: ignore ) _print_materialization_log( From 678a790fef489e9ab0109098f23382eb3c70d347 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 16 Oct 2024 14:53:41 -0500 Subject: [PATCH 09/15] allow for non-lazy table creation --- sdk/python/feast/expediagroup/provider/expedia.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/expediagroup/provider/expedia.py b/sdk/python/feast/expediagroup/provider/expedia.py index 4eb2c56d06..d69177ec9b 100644 --- a/sdk/python/feast/expediagroup/provider/expedia.py +++ b/sdk/python/feast/expediagroup/provider/expedia.py @@ -71,7 +71,7 @@ def update_infra( materialization_update: bool = False, ): if self.online_store: - if self.online_store.type == "scylladb" and materialization_update: + if materialization_update or not getattr(self.repo_config.online_store, "lazy_table_creation", False): self.online_store.update( config=self.repo_config, tables_to_delete=tables_to_delete, From a50905c55898378cdf24f2e117a95b2c68d9daed Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 16 Oct 2024 14:56:25 -0500 Subject: [PATCH 10/15] fix: lint formatting --- sdk/python/feast/expediagroup/provider/expedia.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/expediagroup/provider/expedia.py b/sdk/python/feast/expediagroup/provider/expedia.py index d69177ec9b..029d301e16 100644 --- a/sdk/python/feast/expediagroup/provider/expedia.py +++ b/sdk/python/feast/expediagroup/provider/expedia.py @@ -71,7 +71,9 @@ def update_infra( materialization_update: bool = False, ): if self.online_store: - if 
materialization_update or not getattr(self.repo_config.online_store, "lazy_table_creation", False): + if materialization_update or not getattr( + self.repo_config.online_store, "lazy_table_creation", False + ): self.online_store.update( config=self.repo_config, tables_to_delete=tables_to_delete, From a16e1f2278711811865ae5e2152feafeb1a676ee Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 16 Oct 2024 15:03:39 -0500 Subject: [PATCH 11/15] dont update for non-scylladb --- sdk/python/feast/expediagroup/provider/expedia.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/expediagroup/provider/expedia.py b/sdk/python/feast/expediagroup/provider/expedia.py index 029d301e16..6bef40b8a9 100644 --- a/sdk/python/feast/expediagroup/provider/expedia.py +++ b/sdk/python/feast/expediagroup/provider/expedia.py @@ -70,10 +70,12 @@ def update_infra( partial: bool, materialization_update: bool = False, ): + scylla_no_lazy_table_creation = self.repo_config.online_store.type == "scylladb" and not getattr( + self.repo_config.online_store, "lazy_table_creation", False + ) + if self.online_store: - if materialization_update or not getattr( - self.repo_config.online_store, "lazy_table_creation", False - ): + if materialization_update or scylla_no_lazy_table_creation: self.online_store.update( config=self.repo_config, tables_to_delete=tables_to_delete, From 2fc023d48d8b6e97cd4d0bc38407f39baf2179f9 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 16 Oct 2024 15:07:08 -0500 Subject: [PATCH 12/15] fix: lint formatting --- sdk/python/feast/expediagroup/provider/expedia.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/expediagroup/provider/expedia.py b/sdk/python/feast/expediagroup/provider/expedia.py index 6bef40b8a9..6e9dfa6c93 100644 --- a/sdk/python/feast/expediagroup/provider/expedia.py +++ b/sdk/python/feast/expediagroup/provider/expedia.py @@ -70,8 +70,9 @@ def update_infra( partial: bool, 
materialization_update: bool = False, ): - scylla_no_lazy_table_creation = self.repo_config.online_store.type == "scylladb" and not getattr( - self.repo_config.online_store, "lazy_table_creation", False + scylla_no_lazy_table_creation = ( + self.repo_config.online_store.type == "scylladb" + and not getattr(self.repo_config.online_store, "lazy_table_creation", False) ) if self.online_store: From 50d3c48f4270238bb0432ceda319d07e11375c41 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Mon, 21 Oct 2024 10:08:45 -0500 Subject: [PATCH 13/15] fix: address Bhargav's comments --- .../feast/expediagroup/provider/expedia.py | 16 --------- sdk/python/feast/feature_store.py | 17 +++++----- .../cassandra_online_store.py | 33 ++++--------------- 3 files changed, 15 insertions(+), 51 deletions(-) diff --git a/sdk/python/feast/expediagroup/provider/expedia.py b/sdk/python/feast/expediagroup/provider/expedia.py index 6e9dfa6c93..b242e00437 100644 --- a/sdk/python/feast/expediagroup/provider/expedia.py +++ b/sdk/python/feast/expediagroup/provider/expedia.py @@ -68,24 +68,8 @@ def update_infra( entities_to_delete: Sequence[Entity], entities_to_keep: Sequence[Entity], partial: bool, - materialization_update: bool = False, ): - scylla_no_lazy_table_creation = ( - self.repo_config.online_store.type == "scylladb" - and not getattr(self.repo_config.online_store, "lazy_table_creation", False) - ) - if self.online_store: - if materialization_update or scylla_no_lazy_table_creation: - self.online_store.update( - config=self.repo_config, - tables_to_delete=tables_to_delete, - tables_to_keep=tables_to_keep, - entities_to_keep=entities_to_keep, - entities_to_delete=entities_to_delete, - partial=partial, - ) - if tables_to_delete: logger.info( f"Data associated to {[feature_view.name for feature_view in tables_to_delete]} feature views will be deleted from the online store based on ttl defined if the entities are not shared with other feature views" diff --git 
a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index fbe45d3d9a..22669ff66e 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -991,14 +991,15 @@ def apply( ) tables_to_keep: List[FeatureView] = views_to_update + sfvs_to_update # type: ignore - self._get_provider().update_infra( - project=self.project, - tables_to_delete=tables_to_delete, - tables_to_keep=tables_to_keep, - entities_to_delete=entities_to_delete if not partial else [], - entities_to_keep=entities_to_update, - partial=partial, - ) + if not getattr(self.config.online_store, "lazy_table_creation", False): + self._get_provider().update_infra( + project=self.project, + tables_to_delete=tables_to_delete, + tables_to_keep=tables_to_keep, + entities_to_delete=entities_to_delete if not partial else [], + entities_to_keep=entities_to_update, + partial=partial, + ) self._registry.commit() diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index ffae6babe1..869201e9fc 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -88,7 +88,7 @@ event_ts TIMESTAMP, created_ts TIMESTAMP, PRIMARY KEY ((entity_key), feature_name) - ) WITH CLUSTERING ORDER BY (feature_name ASC) {optional_ttl_clause}; + ) WITH CLUSTERING ORDER BY (feature_name ASC); """ DROP_TABLE_CQL_TEMPLATE = "DROP TABLE IF EXISTS {fqtable};" @@ -153,9 +153,6 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel): request_timeout: Optional[StrictFloat] = None """Request timeout in seconds.""" - ttl: Optional[StrictInt] = None - """Time to live option""" - lazy_table_creation: Optional[bool] = False """If True, tables will be created on during materialization, rather than registration.""" 
@@ -231,25 +228,14 @@ def _get_session(self, config: RepoConfig): return self._session if not self._session: # configuration consistency checks + hosts = online_store_config.hosts secure_bundle_path = online_store_config.secure_bundle_path - port = ( - 19042 - if online_store_config.type == "scylladb" - else (online_store_config.port or 9042) - ) + port = online_store_config.port or 9042 keyspace = online_store_config.keyspace username = online_store_config.username password = online_store_config.password protocol_version = online_store_config.protocol_version - if online_store_config.type == "scylladb": - if online_store_config.load_balancing is None: - online_store_config.load_balancing = CassandraOnlineStoreConfig.CassandraLoadBalancingPolicy( - load_balancing_policy="TokenAwarePolicy(DCAwareRoundRobinPolicy)" - ) - if online_store_config.protocol_version is None: - protocol_version = 4 - hosts = online_store_config.hosts db_directions = hosts or secure_bundle_path if not db_directions or not keyspace: raise CassandraInvalidConfig(E_CASSANDRA_NOT_CONFIGURED) @@ -579,10 +565,7 @@ def _create_table(self, config: RepoConfig, project: str, table: FeatureView): fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) create_cql = self._get_cql_statement(config, "create", fqtable) logger.info(f"Creating table {fqtable}.") - if config.online_config.ttl: - session.execute(create_cql, parameters=config.online_config.ttl) - else: - session.execute(create_cql) + session.execute(create_cql) def _get_cql_statement( self, config: RepoConfig, op_name: str, fqtable: str, **kwargs @@ -599,13 +582,9 @@ def _get_cql_statement( """ session: Session = self._get_session(config) template, prepare = CQL_TEMPLATE_MAP[op_name] - if op_name == "create" and config.online_config.ttl: - ttl_clause = " WITH default_time_to_live = ?" 
- else: - ttl_clause = None - statement = template.format( - fqtable=fqtable, optional_ttl_clause=ttl_clause, **kwargs + fqtable=fqtable, + **kwargs, ) if prepare: # using the statement itself as key (no problem with that) From 84e268ceb6419562d5524688edd5d1a7384e3bfd Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Mon, 21 Oct 2024 10:09:56 -0500 Subject: [PATCH 14/15] fix: address Bhargav's comments 2 --- sdk/python/feast/feature_store.py | 34 ------------------------------- 1 file changed, 34 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 22669ff66e..44d7242bd0 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1257,23 +1257,6 @@ def materialize_incremental( feature_views ) - if ( - getattr(self.config.online_store, "lazy_table_creation", False) - and self.config.provider == "expedia" - ): - # feature_views_to_delete = self._get_feature_views_to_delete() - # don't delete any tables for now - - self._get_provider().update_infra( - project=self.project, - tables_to_delete=[], - tables_to_keep=feature_views_to_materialize, - entities_to_delete=[], - entities_to_keep=[], - partial=True, - materialization_update=True, # type: ignore - ) - _print_materialization_log( None, end_date, @@ -1370,23 +1353,6 @@ def materialize( feature_views ) - if ( - getattr(self.config.online_store, "lazy_table_creation", False) - and self.config.provider == "expedia" - ): - # feature_views_to_delete = self._get_feature_views_to_delete() - # don't delete any tables for now - - self._get_provider().update_infra( - project=self.project, - tables_to_delete=[], - tables_to_keep=feature_views_to_materialize, - entities_to_delete=[], - entities_to_keep=[], - partial=True, - materialization_update=True, # type: ignore - ) - _print_materialization_log( start_date, end_date, From aafc6e5ca7c290249954094e461d611aab5da1ea Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Tue, 22 Oct 2024 16:24:41 -0500 
Subject: [PATCH 15/15] fix: update lazy_table_creation docstring --- .../contrib/cassandra_online_store/cassandra_online_store.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index 869201e9fc..af747650e2 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -154,7 +154,10 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel): """Request timeout in seconds.""" lazy_table_creation: Optional[bool] = False - """If True, tables will be created on during materialization, rather than registration.""" + """ + If True, tables will be created during materialization, rather than registration. + Table deletion is not currently supported in this mode. + """ class CassandraLoadBalancingPolicy(FeastConfigBaseModel): """