diff --git a/Makefile b/Makefile
index b44aaf0ee5..613d688055 100644
--- a/Makefile
+++ b/Makefile
@@ -331,6 +331,17 @@ test-python-universal-cassandra-no-cloud-providers:
 		not test_snowflake" \
 	sdk/python/tests
 
+test-python-universal-singlestore-online:
+	PYTHONPATH='.' \
+	FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.singlestore_repo_configuration \
+	PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.singlestore \
+	python -m pytest -n 8 --integration \
+		-k "not test_universal_cli and \
+			not gcs_registry and \
+			not s3_registry and \
+			not test_snowflake" \
+	sdk/python/tests
+
 test-python-universal:
 	python -m pytest -n 8 --integration sdk/python/tests
 
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
index 06c5edcc8b..f55ab0d92f 100644
--- a/docs/SUMMARY.md
+++ b/docs/SUMMARY.md
@@ -103,6 +103,7 @@
   * [Rockset (contrib)](reference/online-stores/rockset.md)
   * [Hazelcast (contrib)](reference/online-stores/hazelcast.md)
   * [ScyllaDB (contrib)](reference/online-stores/scylladb.md)
+  * [SingleStore (contrib)](reference/online-stores/singlestore.md)
 * [Providers](reference/providers/README.md)
   * [Local](reference/providers/local.md)
   * [Google Cloud Platform](reference/providers/google-cloud-platform.md)
diff --git a/docs/reference/online-stores/README.md b/docs/reference/online-stores/README.md
index b5f4eb8de8..0acf6701f9 100644
--- a/docs/reference/online-stores/README.md
+++ b/docs/reference/online-stores/README.md
@@ -64,4 +64,8 @@ Please see [Online Store](../../getting-started/architecture-and-components/onli
 
 {% content-ref url="remote.md" %}
 [remote.md](remote.md)
+{% endcontent-ref %}
+
+{% content-ref url="singlestore.md" %}
+[singlestore.md](singlestore.md)
 {% endcontent-ref %}
diff --git a/docs/reference/online-stores/singlestore.md b/docs/reference/online-stores/singlestore.md
new file mode 100644
index 0000000000..1777787f22
--- /dev/null
+++ b/docs/reference/online-stores/singlestore.md
@@ -0,0 +1,51 @@
+# SingleStore online store (contrib)
+
+## Description
+
+The SingleStore online store provides support for materializing feature values into a SingleStore database for serving online features.
+
+## Getting started
+To use this online store, you'll need to run `pip install 'feast[singlestore]'`. You can then get started by running `feast init` and setting up `feature_store.yaml` as described below.
+
+## Example
+
+{% code title="feature_store.yaml" %}
+```yaml
+project: my_feature_repo
+registry: data/registry.db
+provider: local
+online_store:
+  type: singlestore
+  host: DB_HOST
+  port: DB_PORT
+  database: DB_NAME
+  user: DB_USERNAME
+  password: DB_PASSWORD
+```
+{% endcode %}
+
+## Functionality Matrix
+
+The set of functionality supported by online stores is described in detail [here](overview.md#functionality).
+Below is a matrix indicating which functionality is supported by the SingleStore online store.
+
+|                                                            | SingleStore |
+| :--------------------------------------------------------- | :---------- |
+| write feature values to the online store                   | yes         |
+| read feature values from the online store                  | yes         |
+| update infrastructure (e.g. tables) in the online store    | yes         |
+| teardown infrastructure (e.g. tables) in the online store  | yes         |
+| generate a plan of infrastructure changes                  | no          |
+| support for on-demand transforms                           | yes         |
+| readable by Python SDK                                     | yes         |
+| readable by Java                                           | no          |
+| readable by Go                                             | no          |
+| support for entityless feature views                       | yes         |
+| support for concurrent writing to the same key             | no          |
+| support for ttl (time to live) at retrieval                | no          |
+| support for deleting expired data                          | no          |
+| collocated by feature view                                 | yes         |
+| collocated by feature service                               | no          |
+| collocated by entity key                                    | no          |
+
+To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix).
diff --git a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst
index d614438e3d..9d301fcd0d 100644
--- a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst
+++ b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst
@@ -89,6 +89,14 @@ feast.infra.online\_stores.contrib.postgres\_repo\_configuration module
    :undoc-members:
    :show-inheritance:
 
+feast.infra.online\_stores.contrib.singlestore\_repo\_configuration module
+---------------------------------------------------------------------------
+
+.. automodule:: feast.infra.online_stores.contrib.singlestore_repo_configuration
+   :members:
+   :undoc-members:
+   :show-inheritance:
+
 Module contents
 ---------------
diff --git a/sdk/python/docs/source/feast.infra.registry.contrib.postgres.rst b/sdk/python/docs/source/feast.infra.registry.contrib.postgres.rst
new file mode 100644
index 0000000000..3f31990805
--- /dev/null
+++ b/sdk/python/docs/source/feast.infra.registry.contrib.postgres.rst
@@ -0,0 +1,21 @@
+feast.infra.registry.contrib.postgres package
+=============================================
+
+Submodules
+----------
+
+feast.infra.registry.contrib.postgres.postgres\_registry\_store module
+-----------------------------------------------------------------------
+
+.. automodule:: feast.infra.registry.contrib.postgres.postgres_registry_store
+   :members:
+   :undoc-members:
+   :show-inheritance:
+
+Module contents
+---------------
+
+.. automodule:: feast.infra.registry.contrib.postgres
+   :members:
+   :undoc-members:
+   :show-inheritance:
diff --git a/sdk/python/docs/source/feast.infra.registry.contrib.rst b/sdk/python/docs/source/feast.infra.registry.contrib.rst
index 83417109b8..44b89736ad 100644
--- a/sdk/python/docs/source/feast.infra.registry.contrib.rst
+++ b/sdk/python/docs/source/feast.infra.registry.contrib.rst
@@ -8,6 +8,7 @@ Subpackages
    :maxdepth: 4
 
    feast.infra.registry.contrib.azure
+   feast.infra.registry.contrib.postgres
 
 Module contents
 ---------------
diff --git a/sdk/python/feast/infra/online_stores/contrib/singlestore_online_store/singlestore.py b/sdk/python/feast/infra/online_stores/contrib/singlestore_online_store/singlestore.py
new file mode 100644
index 0000000000..e17a059c1a
--- /dev/null
+++ b/sdk/python/feast/infra/online_stores/contrib/singlestore_online_store/singlestore.py
@@ -0,0 +1,235 @@
+from __future__ import absolute_import
+
+from collections import defaultdict
+from datetime import datetime
+from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple
+
+import pytz
+import singlestoredb
+from pydantic import StrictStr
+from singlestoredb.connection import Connection, Cursor
+from singlestoredb.exceptions import InterfaceError
+
+from feast import Entity, FeatureView, RepoConfig
+from feast.infra.key_encoding_utils import serialize_entity_key
+from feast.infra.online_stores.online_store import OnlineStore
+from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.protos.feast.types.Value_pb2 import Value as ValueProto
+from feast.repo_config import FeastConfigBaseModel
+
+
+class SingleStoreOnlineStoreConfig(FeastConfigBaseModel):
+    """
+    Configuration for the SingleStore online store.
+    NOTE: The class *must* end with the `OnlineStoreConfig` suffix.
+    """
+
+    type: Literal["singlestore"] = "singlestore"
+
+    host: Optional[StrictStr] = None
+    user: Optional[StrictStr] = None
+    password: Optional[StrictStr] = None
+    database: Optional[StrictStr] = None
+    port: Optional[int] = None
+
+
+class SingleStoreOnlineStore(OnlineStore):
+    """
+    An online store implementation that uses SingleStore.
+    NOTE: The class *must* end with the `OnlineStore` suffix.
+    """
+
+    _conn: Optional[Connection] = None
+
+    def _init_conn(self, config: RepoConfig) -> Connection:
+        online_store_config = config.online_store
+        assert isinstance(online_store_config, SingleStoreOnlineStoreConfig)
+        return singlestoredb.connect(
+            host=online_store_config.host or "127.0.0.1",
+            user=online_store_config.user or "test",
+            password=online_store_config.password or "test",
+            database=online_store_config.database or "feast",
+            port=online_store_config.port or 3306,
+            autocommit=True,
+        )
+
+    def _get_cursor(self, config: RepoConfig) -> Any:
+        # ping() will try to reconnect automatically.
+        # If that fails, fall back to creating a new connection.
+        if not self._conn:
+            self._conn = self._init_conn(config)
+        try:
+            self._conn.ping(reconnect=True)
+        except InterfaceError:
+            self._conn = self._init_conn(config)
+        return self._conn.cursor()
+
+    def online_write_batch(
+        self,
+        config: RepoConfig,
+        table: FeatureView,
+        data: List[
+            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
+        ],
+        progress: Optional[Callable[[int], Any]],
+    ) -> None:
+        project = config.project
+        with self._get_cursor(config) as cur:
+            insert_values = []
+            for entity_key, values, timestamp, created_ts in data:
+                entity_key_bin = serialize_entity_key(
+                    entity_key,
+                    entity_key_serialization_version=2,
+                ).hex()
+                timestamp = _to_naive_utc(timestamp)
+                if created_ts is not None:
+                    created_ts = _to_naive_utc(created_ts)
+
+                for feature_name, val in values.items():
+                    insert_values.append(
+                        (
+                            entity_key_bin,
+                            feature_name,
+                            val.SerializeToString(),
+                            timestamp,
+                            created_ts,
+                        )
+                    )
+            # Control the batch so that we can update the progress
+            batch_size = 50000
+            for i in range(0, len(insert_values), batch_size):
+                current_batch = insert_values[i : i + batch_size]
+                cur.executemany(
+                    f"""
+                    INSERT INTO {_table_id(project, table)}
+                    (entity_key, feature_name, value, event_ts, created_ts)
+                    values (%s, %s, %s, %s, %s)
+                    ON DUPLICATE KEY UPDATE
+                    value = VALUES(value),
+                    event_ts = VALUES(event_ts),
+                    created_ts = VALUES(created_ts);
+                    """,
+                    current_batch,
+                )
+                if progress:
+                    progress(len(current_batch))
+
+    def online_read(
+        self,
+        config: RepoConfig,
+        table: FeatureView,
+        entity_keys: List[EntityKeyProto],
+        requested_features: Optional[List[str]] = None,
+    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+        project = config.project
+        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
+        with self._get_cursor(config) as cur:
+            keys = []
+            for entity_key in entity_keys:
+                keys.append(
+                    serialize_entity_key(
+                        entity_key,
+                        entity_key_serialization_version=2,
+                    ).hex()
+                )
+
+            if not requested_features:
+                entity_key_placeholders = ",".join(["%s" for _ in keys])
+                cur.execute(
+                    f"""
+                    SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)}
+                    WHERE entity_key IN ({entity_key_placeholders})
+                    ORDER BY event_ts;
+                    """,
+                    tuple(keys),
+                )
+            else:
+                entity_key_placeholders = ",".join(["%s" for _ in keys])
+                requested_features_placeholders = ",".join(
+                    ["%s" for _ in requested_features]
+                )
+                cur.execute(
+                    f"""
+                    SELECT entity_key, feature_name, value, event_ts FROM {_table_id(project, table)}
+                    WHERE entity_key IN ({entity_key_placeholders}) and feature_name IN ({requested_features_placeholders})
+                    ORDER BY event_ts;
+                    """,
+                    tuple(keys + requested_features),
+                )
+            rows = cur.fetchall() or []
+
+            # Since we don't know the order returned from SingleStore we'll need
+            # to construct a dict to be able to quickly look up the correct row
+            # when we iterate through the keys since they are in the correct order
+            values_dict = defaultdict(list)
+            for row in rows:
+                values_dict[row[0]].append(row[1:])
+
+            for key in keys:
+                if key in values_dict:
+                    key_values = values_dict[key]
+                    res = {}
+                    res_ts: Optional[datetime] = None
+                    for feature_name, value_bin, event_ts in key_values:
+                        val = ValueProto()
+                        val.ParseFromString(bytes(value_bin))
+                        res[feature_name] = val
+                        res_ts = event_ts
+                    result.append((res_ts, res))
+                else:
+                    result.append((None, None))
+        return result
+
+    def update(
+        self,
+        config: RepoConfig,
+        tables_to_delete: Sequence[FeatureView],
+        tables_to_keep: Sequence[FeatureView],
+        entities_to_delete: Sequence[Entity],
+        entities_to_keep: Sequence[Entity],
+        partial: bool,
+    ) -> None:
+        project = config.project
+        with self._get_cursor(config) as cur:
+            # We don't create any special state for the entities in this implementation.
+            for table in tables_to_keep:
+                cur.execute(
+                    f"""CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512),
+                    feature_name VARCHAR(256),
+                    value BLOB,
+                    event_ts timestamp NULL DEFAULT NULL,
+                    created_ts timestamp NULL DEFAULT NULL,
+                    PRIMARY KEY(entity_key, feature_name),
+                    INDEX {_table_id(project, table)}_ek (entity_key))"""
+                )
+
+            for table in tables_to_delete:
+                _drop_table_and_index(cur, project, table)
+
+    def teardown(
+        self,
+        config: RepoConfig,
+        tables: Sequence[FeatureView],
+        entities: Sequence[Entity],
+    ) -> None:
+        project = config.project
+        with self._get_cursor(config) as cur:
+            for table in tables:
+                _drop_table_and_index(cur, project, table)
+
+
+def _drop_table_and_index(cur: Cursor, project: str, table: FeatureView) -> None:
+    table_name = _table_id(project, table)
+    cur.execute(f"DROP INDEX {table_name}_ek ON {table_name};")
+    cur.execute(f"DROP TABLE IF EXISTS {table_name}")
+
+
+def _table_id(project: str, table: FeatureView) -> str:
+    return f"{project}_{table.name}"
+
+
+def _to_naive_utc(ts: datetime) -> datetime:
+    if ts.tzinfo is None:
+        return ts
+    else:
+        return ts.astimezone(pytz.utc).replace(tzinfo=None)
diff --git a/sdk/python/feast/infra/online_stores/contrib/singlestore_repo_configuration.py b/sdk/python/feast/infra/online_stores/contrib/singlestore_repo_configuration.py
new file mode 100644
index 0000000000..2debe0f0ee
--- /dev/null
+++ b/sdk/python/feast/infra/online_stores/contrib/singlestore_repo_configuration.py
@@ -0,0 +1,10 @@
+from tests.integration.feature_repos.integration_test_repo_config import (
+    IntegrationTestRepoConfig,
+)
+from tests.integration.feature_repos.universal.online_store.singlestore import (
+    SingleStoreOnlineStoreCreator,
+)
+
+FULL_REPO_CONFIGS = [
+    IntegrationTestRepoConfig(online_store_creator=SingleStoreOnlineStoreCreator),
+]
diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py
index d5b3160b56..c69469d769 100644
--- a/sdk/python/feast/repo_config.py
+++ b/sdk/python/feast/repo_config.py
@@ -65,6 +65,7 @@
     "ikv": "feast.infra.online_stores.contrib.ikv_online_store.ikv.IKVOnlineStore",
     "elasticsearch": "feast.infra.online_stores.contrib.elasticsearch.ElasticSearchOnlineStore",
     "remote": "feast.infra.online_stores.remote.RemoteOnlineStore",
+    "singlestore": "feast.infra.online_stores.contrib.singlestore_online_store.singlestore.SingleStoreOnlineStore",
 }
 
 OFFLINE_STORE_CLASS_FOR_TYPE = {
diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt
index 97bdfc159b..a0faf3d9ef 100644
--- a/sdk/python/requirements/py3.10-ci-requirements.txt
+++ b/sdk/python/requirements/py3.10-ci-requirements.txt
@@ -68,7 +68,9 @@ botocore==1.34.99
     #   moto
     #   s3transfer
 build==1.2.1
-    # via pip-tools
+    # via
+    #   pip-tools
+    #   singlestoredb
 cachecontrol==0.14.0
     # via firebase-admin
 cachetools==5.3.3
@@ -505,6 +507,8 @@ pandas==2.2.2
     #   snowflake-connector-python
 pandocfilters==1.5.1
     # via nbconvert
+parsimonious==0.10.0
+    # via singlestoredb
 parso==0.8.4
     # via jedi
 parsy==2.1
@@ -610,6 +614,7 @@ pygments==2.18.0
 pyjwt[crypto]==2.8.0
     # via
     #   msal
+    #   singlestoredb
     #   snowflake-connector-python
 pymssql==2.3.0
 pymysql==1.1.1
@@ -705,6 +710,7 @@ requests==2.31.0
     #   msal
     #   requests-oauthlib
     #   responses
+    #   singlestoredb
     #   snowflake-connector-python
     #   sphinx
     #   trino
@@ -745,8 +751,10 @@ setuptools==70.0.0
     #   grpcio-tools
     #   kubernetes
     #   pip-tools
+    #   singlestoredb
 shellingham==1.5.4
     # via typer
+singlestoredb==1.3.1
 six==1.16.0
     # via
     #   asttokens
@@ -794,6 +802,8 @@ sqlalchemy-views==0.3.2
 sqlglot==20.11.0
     # via ibis-framework
 sqlite-vec==0.0.1a10
+sqlparams==6.0.1
+    # via singlestoredb
 stack-data==0.6.3
     # via ipython
 starlette==0.37.2
@@ -821,6 +831,7 @@ tomli==2.0.1
     #   pip-tools
     #   pytest
     #   pytest-env
+    #   singlestoredb
 tomlkit==0.12.5
     # via snowflake-connector-python
 toolz==0.12.1
@@ -946,7 +957,9 @@ websockets==12.0
 werkzeug==3.0.3
     # via moto
 wheel==0.43.0
-    # via pip-tools
+    # via
+    #   pip-tools
+    #   singlestoredb
 widgetsnbextension==4.0.11
     # via ipywidgets
 wrapt==1.16.0
diff --git a/sdk/python/requirements/py3.11-ci-requirements.txt b/sdk/python/requirements/py3.11-ci-requirements.txt
index f6db0af6bc..cea8cc22d0 100644
--- a/sdk/python/requirements/py3.11-ci-requirements.txt
+++ b/sdk/python/requirements/py3.11-ci-requirements.txt
@@ -64,7 +64,9 @@ botocore==1.34.99
     #   moto
     #   s3transfer
 build==1.2.1
-    # via pip-tools
+    # via
+    #   pip-tools
+    #   singlestoredb
 cachecontrol==0.14.0
     # via firebase-admin
 cachetools==5.3.3
@@ -496,6 +498,8 @@ pandas==2.2.2
     #   snowflake-connector-python
 pandocfilters==1.5.1
     # via nbconvert
+parsimonious==0.10.0
+    # via singlestoredb
 parso==0.8.4
     # via jedi
 parsy==2.1
@@ -601,6 +605,7 @@ pygments==2.18.0
 pyjwt[crypto]==2.8.0
     # via
     #   msal
+    #   singlestoredb
     #   snowflake-connector-python
 pymssql==2.3.0
 pymysql==1.1.1
@@ -696,6 +701,7 @@ requests==2.31.0
     #   msal
     #   requests-oauthlib
     #   responses
+    #   singlestoredb
     #   snowflake-connector-python
     #   sphinx
     #   trino
@@ -736,8 +742,10 @@ setuptools==70.0.0
     #   grpcio-tools
     #   kubernetes
     #   pip-tools
+    #   singlestoredb
 shellingham==1.5.4
     # via typer
+singlestoredb==1.3.1
 six==1.16.0
     # via
     #   asttokens
@@ -785,6 +793,8 @@ sqlalchemy-views==0.3.2
 sqlglot==20.11.0
     # via ibis-framework
 sqlite-vec==0.0.1a10
+sqlparams==6.0.1
+    # via singlestoredb
 stack-data==0.6.3
     # via ipython
 starlette==0.37.2
@@ -925,7 +935,9 @@ websockets==12.0
 werkzeug==3.0.3
     # via moto
 wheel==0.43.0
-    # via pip-tools
+    # via
+    #   pip-tools
+    #   singlestoredb
 widgetsnbextension==4.0.11
     # via ipywidgets
 wrapt==1.16.0
diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt
index 135b65a0cc..d7df488a88 100644
--- a/sdk/python/requirements/py3.9-ci-requirements.txt
+++ b/sdk/python/requirements/py3.9-ci-requirements.txt
@@ -68,7 +68,9 @@ botocore==1.34.99
    #   moto
     #   s3transfer
 build==1.2.1
-    # via pip-tools
+    # via
+    #   pip-tools
+    #   singlestoredb
 cachecontrol==0.14.0
     # via firebase-admin
 cachetools==5.3.3
@@ -514,6 +516,8 @@ pandas==2.2.2
     #   snowflake-connector-python
 pandocfilters==1.5.1
     # via nbconvert
+parsimonious==0.10.0
+    # via singlestoredb
 parso==0.8.4
     # via jedi
 parsy==2.1
@@ -619,6 +623,7 @@ pygments==2.18.0
 pyjwt[crypto]==2.8.0
     # via
     #   msal
+    #   singlestoredb
     #   snowflake-connector-python
 pymssql==2.3.0
 pymysql==1.1.1
@@ -714,6 +719,7 @@ requests==2.31.0
     #   msal
     #   requests-oauthlib
     #   responses
+    #   singlestoredb
     #   snowflake-connector-python
     #   sphinx
     #   trino
@@ -756,8 +762,10 @@ setuptools==70.0.0
     #   grpcio-tools
     #   kubernetes
     #   pip-tools
+    #   singlestoredb
 shellingham==1.5.4
     # via typer
+singlestoredb==1.3.1
 six==1.16.0
     # via
     #   asttokens
@@ -805,6 +813,8 @@ sqlalchemy-views==0.3.2
 sqlglot==20.11.0
     # via ibis-framework
 sqlite-vec==0.0.1a10
+sqlparams==6.0.1
+    # via singlestoredb
 stack-data==0.6.3
     # via ipython
 starlette==0.37.2
@@ -832,6 +842,7 @@ tomli==2.0.1
     #   pip-tools
     #   pytest
     #   pytest-env
+    #   singlestoredb
 tomlkit==0.12.5
     # via snowflake-connector-python
 toolz==0.12.1
@@ -960,7 +971,9 @@ websockets==12.0
 werkzeug==3.0.3
     # via moto
 wheel==0.43.0
-    # via pip-tools
+    # via
+    #   pip-tools
+    #   singlestoredb
 widgetsnbextension==4.0.11
     # via ipywidgets
 wrapt==1.16.0
diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/singlestore.py b/sdk/python/tests/integration/feature_repos/universal/online_store/singlestore.py
new file mode 100644
index 0000000000..d3a02421d0
--- /dev/null
+++ b/sdk/python/tests/integration/feature_repos/universal/online_store/singlestore.py
@@ -0,0 +1,43 @@
+import subprocess
+import time
+from typing import Dict
+
+from testcontainers.core.container import DockerContainer
+
+from tests.integration.feature_repos.universal.online_store_creator import (
+    OnlineStoreCreator,
+)
+
+
+class SingleStoreOnlineStoreCreator(OnlineStoreCreator):
+    def __init__(self, project_name: str, **kwargs):
+        super().__init__(project_name)
+        self.container = (
+            DockerContainer("ghcr.io/singlestore-labs/singlestoredb-dev:latest")
+            .with_exposed_ports(3306)
+            .with_env("USER", "root")
+            .with_env("ROOT_PASSWORD", "test")
+            # This license key is authorized solely for use in SingleStore Feast tests and is subject to strict usage restrictions.
+            # If you want a free SingleStore license for your own use, please visit https://www.singlestore.com/cloud-trial/
+            .with_env(
+                "LICENSE_KEY",
+                "BGIxODZiYTg1YWUxYjRlODRhYzRjMGFmYTA1OTkxYzgyAAAAAAAAAAABAAAAAAAAACgwNQIZANx4NIXJ7CWvKYYb3wIyRXxBY7fdAnLeSwIYLy2Q0jA124GAkl04yuGrD59Zpv85DVYXAA==",
+            )
+        )
+
+    def create_online_store(self) -> Dict[str, str]:
+        self.container.start()
+        time.sleep(30)
+        exposed_port = self.container.get_exposed_port("3306")
+        command = f"mysql -uroot -ptest -P {exposed_port} -e 'CREATE DATABASE feast;'"
+        subprocess.run(command, shell=True, check=True)
+        return {
+            "type": "singlestore",
+            "user": "root",
+            "password": "test",
+            "database": "feast",
+            "port": exposed_port,
+        }
+
+    def teardown(self):
+        self.container.stop()
diff --git a/setup.py b/setup.py
index 9b3d0e55e6..2160ff0e91 100644
--- a/setup.py
+++ b/setup.py
@@ -155,6 +155,8 @@
 
 ELASTICSEARCH_REQUIRED = ["elasticsearch>=8.13.0"]
 
+SINGLESTORE_REQUIRED = ["singlestoredb"]
+
 CI_REQUIRED = (
     [
         "build",
@@ -218,6 +220,7 @@
     + DELTA_REQUIRED
     + ELASTICSEARCH_REQUIRED
     + SQLITE_VEC_REQUIRED
+    + SINGLESTORE_REQUIRED
 )
 
 DOCS_REQUIRED = CI_REQUIRED
@@ -386,6 +389,7 @@ def run(self):
         "delta": DELTA_REQUIRED,
         "elasticsearch": ELASTICSEARCH_REQUIRED,
         "sqlite_vec": SQLITE_VEC_REQUIRED,
+        "singlestore": SINGLESTORE_REQUIRED,
     },
     include_package_data=True,
     license="Apache",
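
Not part of the patch above, but for reviewers who want to try the new store locally, here is a minimal end-to-end sketch using the standard Feast SDK. It assumes `feast init` and `feast apply` have already been run with the `feature_store.yaml` shown in docs/reference/online-stores/singlestore.md (filled in with real connection values), and it borrows the `driver_hourly_stats` feature view and `driver_id` join key from the default `feast init` template; substitute your own feature views and entities as needed.

```python
# Illustrative sketch only: requires a running SingleStore instance and a repo
# created with `feast init` / `feast apply` whose feature_store.yaml uses
# online_store.type: singlestore (see the docs page added in this change).
from datetime import datetime

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Load the latest feature values from the offline store into SingleStore.
store.materialize_incremental(end_date=datetime.utcnow())

# Read features back for a single entity; feature view and entity names below
# come from the default `feast init` template and are placeholders.
online_features = store.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:avg_daily_trips",
    ],
    entity_rows=[{"driver_id": 1001}],
).to_dict()

print(online_features)
```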
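Likewise illustrative rather than part of the change: a quick way to inspect what `SingleStoreOnlineStore.update()` and `online_write_batch()` actually persist. Each feature view gets a table named `<project>_<feature view name>` with one row per (entity_key, feature_name) pair, where `value` is a serialized `ValueProto` blob. The connection settings and the `my_project_driver_hourly_stats` table name below are placeholders, not values defined by this patch.

```python
# Illustrative sketch: peek at the rows materialized into SingleStore.
import singlestoredb

from feast.protos.feast.types.Value_pb2 import Value as ValueProto

# Placeholder connection details; match them to your feature_store.yaml.
conn = singlestoredb.connect(
    host="127.0.0.1", port=3306, user="root", password="test", database="feast"
)
with conn.cursor() as cur:
    # "my_project_driver_hourly_stats" is a hypothetical <project>_<feature_view> table.
    cur.execute(
        "SELECT entity_key, feature_name, value, event_ts "
        "FROM my_project_driver_hourly_stats LIMIT 5"
    )
    for entity_key, feature_name, value_bin, event_ts in cur.fetchall():
        # Values are stored as serialized ValueProto blobs, keyed by the hex-encoded entity key.
        val = ValueProto()
        val.ParseFromString(bytes(value_bin))
        print(entity_key, feature_name, val, event_ts)
```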