Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Lazy Infra Creation with online_store Option Set to True #144

Merged
merged 15 commits into from
Oct 22, 2024
28 changes: 28 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,20 @@ def materialize_incremental(
feature_views_to_materialize = self._get_feature_views_to_materialize(
feature_views
)

zabarn marked this conversation as resolved.
Show resolved Hide resolved
if getattr(self.config.online_store, "lazy_table_creation", False):
# feature_views_to_delete = self._get_feature_views_to_delete()
# don't delete any tables for now

self._get_provider().update_infra(
zabarn marked this conversation as resolved.
Show resolved Hide resolved
project=self.project,
tables_to_delete=[],
tables_to_keep=feature_views_to_materialize,
entities_to_delete=[],
entities_to_keep=[],
partial=True,
)

_print_materialization_log(
None,
end_date,
Expand Down Expand Up @@ -1350,6 +1364,20 @@ def materialize(
feature_views_to_materialize = self._get_feature_views_to_materialize(
feature_views
)

if getattr(self.config.online_store, "lazy_table_creation", False):
# feature_views_to_delete = self._get_feature_views_to_delete()
# don't delete any tables for now

self._get_provider().update_infra(
project=self.project,
tables_to_delete=[],
tables_to_keep=feature_views_to_materialize,
entities_to_delete=[],
entities_to_keep=[],
partial=True,
)

_print_materialization_log(
start_date,
end_date,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
event_ts TIMESTAMP,
created_ts TIMESTAMP,
PRIMARY KEY ((entity_key), feature_name)
) WITH CLUSTERING ORDER BY (feature_name ASC);
) WITH CLUSTERING ORDER BY (feature_name ASC) {optional_ttl_clause};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can change to WITH default_time_to_live = {ttl}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will handle ttl in a separate PR.

"""

DROP_TABLE_CQL_TEMPLATE = "DROP TABLE IF EXISTS {fqtable};"
Expand Down Expand Up @@ -124,7 +124,7 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel):
and password being the Client ID and Client Secret of the database token.
"""

type: Literal["cassandra"] = "cassandra"
type: Literal["cassandra", "scylladb"] = "cassandra"
"""Online store type selector."""

# settings for connection to Cassandra / Astra DB
Expand Down Expand Up @@ -153,6 +153,12 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel):
request_timeout: Optional[StrictFloat] = None
"""Request timeout in seconds."""

ttl: Optional[StrictInt] = None
Copy link
Collaborator

@EXPEbdodla EXPEbdodla Oct 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can set this to 0. ttl is discouraged due to tombstones. If users need it, they can set which comes with extra load on the cluster.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will handle ttl in a separate PR.

"""Time to live option"""

lazy_table_creation: Optional[bool] = False
"""If True, tables will be created on during materialization, rather than registration."""
zabarn marked this conversation as resolved.
Show resolved Hide resolved

class CassandraLoadBalancingPolicy(FeastConfigBaseModel):
"""
Configuration block related to the Cluster's load-balancing policy.
Expand Down Expand Up @@ -225,14 +231,25 @@ def _get_session(self, config: RepoConfig):
return self._session
if not self._session:
# configuration consistency checks
hosts = online_store_config.hosts
secure_bundle_path = online_store_config.secure_bundle_path
port = online_store_config.port or 9042
port = (
zabarn marked this conversation as resolved.
Show resolved Hide resolved
19042
if online_store_config.type == "scylladb"
else (online_store_config.port or 9042)
)
keyspace = online_store_config.keyspace
username = online_store_config.username
password = online_store_config.password
protocol_version = online_store_config.protocol_version
if online_store_config.type == "scylladb":
if online_store_config.load_balancing is None:
online_store_config.load_balancing = CassandraOnlineStoreConfig.CassandraLoadBalancingPolicy(
load_balancing_policy="TokenAwarePolicy(DCAwareRoundRobinPolicy)"
)
if online_store_config.protocol_version is None:
protocol_version = 4
zabarn marked this conversation as resolved.
Show resolved Hide resolved

hosts = online_store_config.hosts
db_directions = hosts or secure_bundle_path
if not db_directions or not keyspace:
raise CassandraInvalidConfig(E_CASSANDRA_NOT_CONFIGURED)
Expand Down Expand Up @@ -450,6 +467,13 @@ def update(
"""
project = config.project

if getattr(config.online_store, "lazy_table_creation", False):
logger.info(
f"Lazy table creation enabled. Skipping table creation for {project} online store."
)
# create tables during materialization
return

for table in tables_to_keep:
self._create_table(config, project, table)
for table in tables_to_delete:
Expand Down Expand Up @@ -562,7 +586,10 @@ def _create_table(self, config: RepoConfig, project: str, table: FeatureView):
fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table)
create_cql = self._get_cql_statement(config, "create", fqtable)
logger.info(f"Creating table {fqtable}.")
session.execute(create_cql)
if config.online_config.ttl:
session.execute(create_cql, parameters=config.online_config.ttl)
else:
session.execute(create_cql)
zabarn marked this conversation as resolved.
Show resolved Hide resolved

def _get_cql_statement(
self, config: RepoConfig, op_name: str, fqtable: str, **kwargs
Expand All @@ -579,9 +606,13 @@ def _get_cql_statement(
"""
session: Session = self._get_session(config)
template, prepare = CQL_TEMPLATE_MAP[op_name]
if op_name == "create" and config.online_config.ttl:
ttl_clause = " WITH default_time_to_live = ?"
else:
ttl_clause = None
zabarn marked this conversation as resolved.
Show resolved Hide resolved

statement = template.format(
fqtable=fqtable,
**kwargs,
fqtable=fqtable, optional_ttl_clause=ttl_clause, **kwargs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just pass the ttl value directly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will handle ttl in a separate PR.

)
if prepare:
# using the statement itself as key (no problem with that)
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
"postgres": "feast.infra.online_stores.contrib.postgres.PostgreSQLOnlineStore",
"hbase": "feast.infra.online_stores.contrib.hbase_online_store.hbase.HbaseOnlineStore",
"cassandra": "feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore",
"scylladb": "feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore",
"mysql": "feast.infra.online_stores.contrib.mysql_online_store.mysql.MySQLOnlineStore",
"hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore",
"milvus": "feast.expediagroup.vectordb.milvus_online_store.MilvusOnlineStore",
Expand Down
5 changes: 5 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@

GE_REQUIRED = ["great_expectations>=0.15.41"]

SCYLLADB_REQUIRED = [
"scylla-driver>=3.24.0,<4",
]

AZURE_REQUIRED = [
"azure-storage-blob>=0.37.0",
"azure-identity>=1.6.1",
Expand Down Expand Up @@ -477,6 +481,7 @@ def run(self):
"hbase": HBASE_REQUIRED,
"docs": DOCS_REQUIRED,
"cassandra": CASSANDRA_REQUIRED,
"scylladb": SCYLLADB_REQUIRED,
"hazelcast": HAZELCAST_REQUIRED,
"grpcio": GRPCIO_REQUIRED,
"ibis": IBIS_REQUIRED,
Expand Down
Loading