From 9f03d32ced46b6136ad109fbae94a321bd756098 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Mon, 1 Mar 2021 10:38:54 -0800 Subject: [PATCH 01/13] Python-centric feast deploy CLI Signed-off-by: Oleg Avdeev --- sdk/python/feast/cli.py | 36 +++++++++++ sdk/python/feast/infra/__init__.py | 1 + sdk/python/feast/infra/firestore.py | 65 +++++++++++++++++++ sdk/python/feast/infra/provider.py | 44 +++++++++++++ sdk/python/feast/repo_config.py | 27 ++++++++ sdk/python/feast/repo_operations.py | 98 +++++++++++++++++++++++++++++ sdk/python/setup.py | 1 + 7 files changed, 272 insertions(+) create mode 100644 sdk/python/feast/infra/__init__.py create mode 100644 sdk/python/feast/infra/firestore.py create mode 100644 sdk/python/feast/infra/provider.py create mode 100644 sdk/python/feast/repo_config.py create mode 100644 sdk/python/feast/repo_operations.py diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 8b7b5c5f11..c12a0e91f1 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -15,6 +15,7 @@ import json import logging import sys +from pathlib import Path from typing import Dict import click @@ -26,6 +27,8 @@ from feast.entity import Entity from feast.feature_table import FeatureTable from feast.loaders.yaml import yaml_loader +from feast.repo_config import load_repo_config +from feast.repo_operations import apply_total, registry_dump, teardown _logger = logging.getLogger(__name__) @@ -353,5 +356,38 @@ def project_list(): print(tabulate(table, headers=["NAME"], tablefmt="plain")) +@cli.command("apply") +@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True)) +def apply_total_command(repo_path: str): + """ + Applies a feature repo + """ + repo_config = load_repo_config(Path(repo_path)) + + apply_total(repo_config, Path(repo_path).resolve()) + + +@cli.command("teardown") +@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True)) +def teardown_command(repo_path: str): + """ + Tear down infra for a a feature repo + """ + repo_config = load_repo_config(Path(repo_path)) + + teardown(repo_config, Path(repo_path).resolve()) + + +@cli.command("registry-dump") +@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True)) +def registry_dump_command(repo_path: str): + """ + Prints contents of the metadata registry + """ + repo_config = load_repo_config(Path(repo_path)) + + registry_dump(repo_config) + + if __name__ == "__main__": cli() diff --git a/sdk/python/feast/infra/__init__.py b/sdk/python/feast/infra/__init__.py new file mode 100644 index 0000000000..c49186db43 --- /dev/null +++ b/sdk/python/feast/infra/__init__.py @@ -0,0 +1 @@ +# from .provider import Provider diff --git a/sdk/python/feast/infra/firestore.py b/sdk/python/feast/infra/firestore.py new file mode 100644 index 0000000000..cfb106bfcd --- /dev/null +++ b/sdk/python/feast/infra/firestore.py @@ -0,0 +1,65 @@ +from typing import List + +from feast import FeatureTable +from feast.infra.provider import Provider + + +def _table_id(project: str, table: FeatureTable) -> str: + return f"{project}_{table.name}" + + +def _delete_collection(coll_ref, batch_size=1000) -> None: + """ + Delete Firebase collection. There is no way to delete the entire collections, so we have to + delete documents in the collection one by one. + """ + while True: + docs = coll_ref.limit(batch_size).stream() + deleted = 0 + + for doc in docs: + doc.reference.delete() + deleted = deleted + 1 + + if deleted < batch_size: + return + + +class Firestore(Provider): + def update_infra( + self, + project: str, + tables_to_delete: List[FeatureTable], + tables_to_keep: List[FeatureTable], + ): + import firebase_admin + from firebase_admin import firestore + + firebase_admin.initialize_app() + db = firestore.client() + + table_id = lambda t: _table_id(project, table=t) + + for table in tables_to_keep: + db.collection(project).document(table_id(table)).set( + {"created_at": firestore.SERVER_TIMESTAMP} + ) + + for table in tables_to_delete: + _delete_collection( + db.collection(project).document(table_id(table)).collection("values") + ) + db.collection(project).document(table_id(table)).delete() + + def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None: + import firebase_admin + from firebase_admin import firestore + + firebase_admin.initialize_app() + db = firestore.client() + table_id = lambda t: _table_id(project, table=t) + for table in tables: + _delete_collection( + db.collection(project).document(table_id(table)).collection("values") + ) + db.collection(project).document(table_id(table)).delete() diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py new file mode 100644 index 0000000000..103d38df0b --- /dev/null +++ b/sdk/python/feast/infra/provider.py @@ -0,0 +1,44 @@ +import abc +from typing import List + +from feast import FeatureTable +from feast.repo_config import FirestoreConfig, OnlineStoreConfig + + +class Provider(abc.ABC): + @abc.abstractmethod + def update_infra( + self, + project: str, + tables_to_delete: List[FeatureTable], + tables_to_keep: List[FeatureTable], + ): + """ + Reconcile cloud resources with the objects declared in the feature repo. + + Args: + tables_to_delete: Tables that were deleted from the feature repo, so provider needs to + clean up the corresponding cloud resources. + tables_to_keep: Tables that are still in the feature repo. Depending on implementation, + provider may or may not need to update the corresponding resources. + """ + ... + + @abc.abstractmethod + def teardown_infra(self, project: str, tables: List[FeatureTable]): + """ + Tear down all cloud resources for a repo. + + Args: + tables: Tables that are declared in the feature repo. + """ + ... + + +def get_provider(config: OnlineStoreConfig) -> Provider: + if config.type == "firestore": + from feast.infra.firestore import Firestore + + return Firestore() + else: + raise ValueError(config) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py new file mode 100644 index 0000000000..22b0646913 --- /dev/null +++ b/sdk/python/feast/repo_config.py @@ -0,0 +1,27 @@ +from pathlib import Path +from typing import NamedTuple, Optional + +import yaml +from bindr import bind + + +class FirestoreConfig(NamedTuple): + dummy: Optional[str] = None + + +class OnlineStoreConfig(NamedTuple): + type: str + firestore: Optional[FirestoreConfig] = None + + +class RepoConfig(NamedTuple): + metadata_store: str + project: str + provider: str + online_store: OnlineStoreConfig + + +def load_repo_config(repo_path: Path) -> RepoConfig: + with open(repo_path / "feature_store.yaml") as f: + raw_config = yaml.safe_load(f) + return bind(RepoConfig, raw_config) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py new file mode 100644 index 0000000000..b91f5d8bf2 --- /dev/null +++ b/sdk/python/feast/repo_operations.py @@ -0,0 +1,98 @@ +import importlib +import os +import sys +from pathlib import Path +from typing import List, NamedTuple + +from feast import Entity, FeatureTable +from feast.infra.provider import get_provider +from feast.registry import Registry +from feast.repo_config import RepoConfig + + +def py_path_to_module(path: Path, repo_root: Path) -> str: + return ( + str(path.relative_to(repo_root))[: -len(".py")] + .replace("./", "") + .replace("/", ".") + ) + + +class ParsedRepo(NamedTuple): + feature_tables: List[FeatureTable] + entities: List[Entity] + + +def parse_repo(repo_root: Path) -> ParsedRepo: + """ Collect feature table definitions from feature repo """ + res = ParsedRepo(feature_tables=[], entities=[]) + + # FIXME: process subdirs but exclude hidden ones + repo_files = [p.resolve() for p in repo_root.glob("*.py")] + + for repo_file in repo_files: + + module_path = py_path_to_module(repo_file, repo_root) + + print(f"Processing {repo_file} as {module_path}") + module = importlib.import_module(module_path) + + for attr_name in dir(module): + obj = getattr(module, attr_name) + if isinstance(obj, FeatureTable): + res.feature_tables.append(obj) + elif isinstance(obj, Entity): + res.entities.append(obj) + return res + + +def apply_total(repo_config: RepoConfig, repo_path: Path): + os.chdir(repo_path) + sys.path.append("") + + project = repo_config.project + registry = Registry(repo_config.metadata_store) + repo = parse_repo(repo_path) + + for entity in repo.entities: + registry.apply_entity(entity, project=project) + + repo_table_names = set(t.name for t in repo.feature_tables) + tables_to_delete = [] + for registry_table in registry.list_feature_tables(project=project): + if registry_table.name not in repo_table_names: + tables_to_delete.append(registry_table) + + # Delete tables that should not exist + for registry_table in tables_to_delete: + registry.delete_feature_table(registry_table.name, project=project) + + for table in repo.feature_tables: + registry.apply_feature_table(table, project) + + infra_provider = get_provider(repo_config.online_store) + infra_provider.update_infra( + project, tables_to_delete=tables_to_delete, tables_to_keep=repo.feature_tables + ) + + print("Done!") + + +def teardown(repo_config: RepoConfig, repo_path: Path): + registry = Registry(repo_config.metadata_store) + project = repo_config.project + registry_tables = registry.list_feature_tables(project=project) + infra_provider = get_provider(repo_config.online_store) + infra_provider.teardown_infra(project, tables=registry_tables) + + +def registry_dump(repo_config: RepoConfig): + """ For debugging only: output contents of the metadata registry """ + + project = repo_config.project + registry = Registry(repo_config.metadata_store) + + for entity in registry.list_entities(project=project): + print(entity) + for table in registry.list_feature_tables(project=project): + print(table) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index c8ceb5d479..6f23d68f4a 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -45,6 +45,7 @@ "numpy<1.20.0", "google", "kubernetes==12.0.*", + "bindr", ] # README file from Feast repo root directory From 290a57bf59646bfdbf83b832349f2f3c8b9d00c5 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Wed, 3 Mar 2021 09:05:56 -0800 Subject: [PATCH 02/13] sqlite provider Signed-off-by: Oleg Avdeev --- sdk/python/feast/infra/local_sqlite.py | 34 ++++++++++++++++++++++++++ sdk/python/feast/infra/provider.py | 4 +++ sdk/python/feast/repo_config.py | 5 ++++ sdk/python/tests/cli/test_cli_local.py | 3 +++ 4 files changed, 46 insertions(+) create mode 100644 sdk/python/feast/infra/local_sqlite.py create mode 100644 sdk/python/tests/cli/test_cli_local.py diff --git a/sdk/python/feast/infra/local_sqlite.py b/sdk/python/feast/infra/local_sqlite.py new file mode 100644 index 0000000000..34dd1921d0 --- /dev/null +++ b/sdk/python/feast/infra/local_sqlite.py @@ -0,0 +1,34 @@ +import os +import sqlite3 +from typing import List + +from feast import FeatureTable +from feast.infra.provider import Provider +from feast.repo_config import LocalConfig + + +def _table_id(project: str, table: FeatureTable) -> str: + return f"{project}_{table.name}" + + +class Sqlite(Provider): + def __init__(self, config: LocalConfig): + self._db_path = config.path + + def update_infra( + self, + project: str, + tables_to_delete: List[FeatureTable], + tables_to_keep: List[FeatureTable], + ): + conn = sqlite3.connect(self._db_path) + for table in tables_to_keep: + conn.execute( + f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (key BLOB, value BLOB)" + ) + + for table in tables_to_delete: + conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}") + + def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None: + os.unlink(self._db_path) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 103d38df0b..9c176702fb 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -40,5 +40,9 @@ def get_provider(config: OnlineStoreConfig) -> Provider: from feast.infra.firestore import Firestore return Firestore() + elif config.type == "local": + from feast.infra.local_sqlite import Sqlite + + return Sqlite(config.local) else: raise ValueError(config) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 22b0646913..eb911c2451 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -9,9 +9,14 @@ class FirestoreConfig(NamedTuple): dummy: Optional[str] = None +class LocalConfig(NamedTuple): + path: Optional[str] = None + + class OnlineStoreConfig(NamedTuple): type: str firestore: Optional[FirestoreConfig] = None + local: Optional[LocalConfig] = None class RepoConfig(NamedTuple): diff --git a/sdk/python/tests/cli/test_cli_local.py b/sdk/python/tests/cli/test_cli_local.py new file mode 100644 index 0000000000..2abb87b23c --- /dev/null +++ b/sdk/python/tests/cli/test_cli_local.py @@ -0,0 +1,3 @@ +class TestCliLocal: + def test_hello(self): + pass From 88433d88ff717fc50fdeedf2edf9c34bd995e060 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Thu, 4 Mar 2021 07:42:24 -0800 Subject: [PATCH 03/13] add a proper test for local Signed-off-by: Oleg Avdeev --- sdk/python/feast/infra/firestore.py | 65 -------------------------- sdk/python/feast/infra/local_sqlite.py | 8 ++-- sdk/python/feast/infra/provider.py | 17 +++---- sdk/python/feast/repo_config.py | 12 ++--- sdk/python/feast/repo_operations.py | 4 +- sdk/python/tests/cli/test_cli_local.py | 55 +++++++++++++++++++++- 6 files changed, 72 insertions(+), 89 deletions(-) delete mode 100644 sdk/python/feast/infra/firestore.py diff --git a/sdk/python/feast/infra/firestore.py b/sdk/python/feast/infra/firestore.py deleted file mode 100644 index cfb106bfcd..0000000000 --- a/sdk/python/feast/infra/firestore.py +++ /dev/null @@ -1,65 +0,0 @@ -from typing import List - -from feast import FeatureTable -from feast.infra.provider import Provider - - -def _table_id(project: str, table: FeatureTable) -> str: - return f"{project}_{table.name}" - - -def _delete_collection(coll_ref, batch_size=1000) -> None: - """ - Delete Firebase collection. There is no way to delete the entire collections, so we have to - delete documents in the collection one by one. - """ - while True: - docs = coll_ref.limit(batch_size).stream() - deleted = 0 - - for doc in docs: - doc.reference.delete() - deleted = deleted + 1 - - if deleted < batch_size: - return - - -class Firestore(Provider): - def update_infra( - self, - project: str, - tables_to_delete: List[FeatureTable], - tables_to_keep: List[FeatureTable], - ): - import firebase_admin - from firebase_admin import firestore - - firebase_admin.initialize_app() - db = firestore.client() - - table_id = lambda t: _table_id(project, table=t) - - for table in tables_to_keep: - db.collection(project).document(table_id(table)).set( - {"created_at": firestore.SERVER_TIMESTAMP} - ) - - for table in tables_to_delete: - _delete_collection( - db.collection(project).document(table_id(table)).collection("values") - ) - db.collection(project).document(table_id(table)).delete() - - def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None: - import firebase_admin - from firebase_admin import firestore - - firebase_admin.initialize_app() - db = firestore.client() - table_id = lambda t: _table_id(project, table=t) - for table in tables: - _delete_collection( - db.collection(project).document(table_id(table)).collection("values") - ) - db.collection(project).document(table_id(table)).delete() diff --git a/sdk/python/feast/infra/local_sqlite.py b/sdk/python/feast/infra/local_sqlite.py index 34dd1921d0..fbd43a6dfb 100644 --- a/sdk/python/feast/infra/local_sqlite.py +++ b/sdk/python/feast/infra/local_sqlite.py @@ -4,15 +4,17 @@ from feast import FeatureTable from feast.infra.provider import Provider -from feast.repo_config import LocalConfig +from feast.repo_config import LocalOnlineStoreConfig def _table_id(project: str, table: FeatureTable) -> str: return f"{project}_{table.name}" -class Sqlite(Provider): - def __init__(self, config: LocalConfig): +class LocalSqlite(Provider): + _db_path: str + + def __init__(self, config: LocalOnlineStoreConfig): self._db_path = config.path def update_infra( diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 9c176702fb..a9636d4af0 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -2,7 +2,7 @@ from typing import List from feast import FeatureTable -from feast.repo_config import FirestoreConfig, OnlineStoreConfig +from feast.repo_config import RepoConfig class Provider(abc.ABC): @@ -35,14 +35,15 @@ def teardown_infra(self, project: str, tables: List[FeatureTable]): ... -def get_provider(config: OnlineStoreConfig) -> Provider: - if config.type == "firestore": - from feast.infra.firestore import Firestore +def get_provider(config: RepoConfig) -> Provider: + if config.provider == "gcp": + from feast.infra.gcp import Gcp - return Firestore() - elif config.type == "local": - from feast.infra.local_sqlite import Sqlite + return Gcp() + elif config.provider == "local": + from feast.infra.local_sqlite import LocalSqlite - return Sqlite(config.local) + assert config.online_store.local is not None + return LocalSqlite(config.online_store.local) else: raise ValueError(config) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index eb911c2451..500fd00398 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -5,18 +5,12 @@ from bindr import bind -class FirestoreConfig(NamedTuple): - dummy: Optional[str] = None - - -class LocalConfig(NamedTuple): - path: Optional[str] = None +class LocalOnlineStoreConfig(NamedTuple): + path: str class OnlineStoreConfig(NamedTuple): - type: str - firestore: Optional[FirestoreConfig] = None - local: Optional[LocalConfig] = None + local: Optional[LocalOnlineStoreConfig] = None class RepoConfig(NamedTuple): diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index b91f5d8bf2..706458e6c6 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -70,7 +70,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): for table in repo.feature_tables: registry.apply_feature_table(table, project) - infra_provider = get_provider(repo_config.online_store) + infra_provider = get_provider(repo_config) infra_provider.update_infra( project, tables_to_delete=tables_to_delete, tables_to_keep=repo.feature_tables ) @@ -82,7 +82,7 @@ def teardown(repo_config: RepoConfig, repo_path: Path): registry = Registry(repo_config.metadata_store) project = repo_config.project registry_tables = registry.list_feature_tables(project=project) - infra_provider = get_provider(repo_config.online_store) + infra_provider = get_provider(repo_config) infra_provider.teardown_infra(project, tables=registry_tables) diff --git a/sdk/python/tests/cli/test_cli_local.py b/sdk/python/tests/cli/test_cli_local.py index 2abb87b23c..efbea0627a 100644 --- a/sdk/python/tests/cli/test_cli_local.py +++ b/sdk/python/tests/cli/test_cli_local.py @@ -1,3 +1,54 @@ +import subprocess +import sys +import tempfile +from pathlib import Path +from textwrap import dedent +from typing import List + +from feast import cli + + +class CliRunner: + """ + NB. We can't use test runner helper from click here, since it doesn't start a new Python + interpreter. And we need a new interpreter for each test since we dynamically import + modules from the feature repo, and it is hard to clean up that state otherwise. + """ + + def run(self, args: List[str], cwd: Path) -> subprocess.CompletedProcess: + return subprocess.run([sys.executable, cli.__file__] + args, cwd=cwd) + + class TestCliLocal: - def test_hello(self): - pass + def test_hello(self) -> None: + runner = CliRunner() + with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: + + repo_path = Path(repo_dir_name) + data_path = Path(data_dir_name) + + repo_config = repo_path / "feature_store.yaml" + + repo_config.write_text( + dedent( + f""" + project: foo + metadata_store: {data_path / "metadata.db"} + provider: gcp + online_store: + local: + path: {data_path / "online_store.db"} + """ + ) + ) + + repo_example = repo_path / "example.py" + repo_example.write_text( + (Path(__file__).parent / "example_feature_repo_1.py").read_text() + ) + + result = runner.run(["apply", str(repo_path)], cwd=repo_path) + assert result.returncode == 0 + + result = runner.run(["teardown", str(repo_path)], cwd=repo_path) + assert result.returncode == 0 From bfd29022777bbf93ce6d223b0fa2b1bf20bee157 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Fri, 5 Mar 2021 20:49:33 -0800 Subject: [PATCH 04/13] gcp test Signed-off-by: Oleg Avdeev --- sdk/python/feast/infra/provider.py | 2 +- sdk/python/feast/repo_config.py | 3 +++ sdk/python/tests/cli/test_cli_local.py | 5 +++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index a9636d4af0..925c33180a 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -39,7 +39,7 @@ def get_provider(config: RepoConfig) -> Provider: if config.provider == "gcp": from feast.infra.gcp import Gcp - return Gcp() + return Gcp(config.online_store.firestore) elif config.provider == "local": from feast.infra.local_sqlite import LocalSqlite diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 500fd00398..fbe1898d16 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -8,8 +8,11 @@ class LocalOnlineStoreConfig(NamedTuple): path: str +class FirestoreOnlineStoreConfig(NamedTuple): + pass class OnlineStoreConfig(NamedTuple): + firestore: Optional[FirestoreOnlineStoreConfig] = None local: Optional[LocalOnlineStoreConfig] = None diff --git a/sdk/python/tests/cli/test_cli_local.py b/sdk/python/tests/cli/test_cli_local.py index efbea0627a..93aaa4fb22 100644 --- a/sdk/python/tests/cli/test_cli_local.py +++ b/sdk/python/tests/cli/test_cli_local.py @@ -20,7 +20,8 @@ def run(self, args: List[str], cwd: Path) -> subprocess.CompletedProcess: class TestCliLocal: - def test_hello(self) -> None: + + def test_basic(self) -> None: runner = CliRunner() with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: @@ -34,7 +35,7 @@ def test_hello(self) -> None: f""" project: foo metadata_store: {data_path / "metadata.db"} - provider: gcp + provider: local online_store: local: path: {data_path / "online_store.db"} From 9d9c3da073c52bced2e32d153b060bd87f0fd9ec Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Mon, 8 Mar 2021 18:20:59 -0800 Subject: [PATCH 05/13] add missing files and mark integration test Signed-off-by: Oleg Avdeev --- sdk/python/feast/repo_config.py | 2 + .../tests/cli/example_feature_repo_1.py | 28 +++++++++ sdk/python/tests/cli/test_cli_local.py | 1 - sdk/python/tests/cli/test_firebase.py | 63 +++++++++++++++++++ 4 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 sdk/python/tests/cli/example_feature_repo_1.py create mode 100644 sdk/python/tests/cli/test_firebase.py diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index fbe1898d16..349740693e 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -8,9 +8,11 @@ class LocalOnlineStoreConfig(NamedTuple): path: str + class FirestoreOnlineStoreConfig(NamedTuple): pass + class OnlineStoreConfig(NamedTuple): firestore: Optional[FirestoreOnlineStoreConfig] = None local: Optional[LocalOnlineStoreConfig] = None diff --git a/sdk/python/tests/cli/example_feature_repo_1.py b/sdk/python/tests/cli/example_feature_repo_1.py new file mode 100644 index 0000000000..4a32700f5e --- /dev/null +++ b/sdk/python/tests/cli/example_feature_repo_1.py @@ -0,0 +1,28 @@ +from google.protobuf.duration_pb2 import Duration + +from feast import BigQuerySource, Entity, Feature, FeatureTable, ValueType + +driver_locations_source = BigQuerySource( + table_ref="rh_prod.ride_hailing_co.drivers", + event_timestamp_column="event_timestamp", + created_timestamp_column="created_timestamp", +) + + +driver = Entity( + name="driver", # The name is derived from this argument, not object name. + value_type=ValueType.INT64, + description="driver id", +) + + +driver_locations = FeatureTable( + name="driver_locations", + entities=["driver"], + max_age=Duration(seconds=86400 * 1), + features=[ + Feature(name="lat", dtype=ValueType.FLOAT), + Feature(name="lon", dtype=ValueType.STRING), + ], + batch_source=driver_locations_source, +) diff --git a/sdk/python/tests/cli/test_cli_local.py b/sdk/python/tests/cli/test_cli_local.py index 93aaa4fb22..cb3628b953 100644 --- a/sdk/python/tests/cli/test_cli_local.py +++ b/sdk/python/tests/cli/test_cli_local.py @@ -20,7 +20,6 @@ def run(self, args: List[str], cwd: Path) -> subprocess.CompletedProcess: class TestCliLocal: - def test_basic(self) -> None: runner = CliRunner() with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: diff --git a/sdk/python/tests/cli/test_firebase.py b/sdk/python/tests/cli/test_firebase.py new file mode 100644 index 0000000000..f6a8c5c1e6 --- /dev/null +++ b/sdk/python/tests/cli/test_firebase.py @@ -0,0 +1,63 @@ +import random +import string +import subprocess +import sys +import tempfile +from pathlib import Path +from textwrap import dedent +from typing import List + +import pytest + +from feast import cli + + +class CliRunner: + """ + NB. We can't use test runner helper from click here, since it doesn't start a new Python + interpreter. And we need a new interpreter for each test since we dynamically import + modules from the feature repo, and it is hard to clean up that state otherwise. + """ + + def run(self, args: List[str], cwd: Path) -> subprocess.CompletedProcess: + return subprocess.run([sys.executable, cli.__file__] + args, cwd=cwd) + + +@pytest.mark.integration +class TestCliGcp: + def setup_method(self): + self._project_id = "".join( + random.choice(string.ascii_lowercase + string.digits) for _ in range(10) + ) + + def test_basic(self) -> None: + runner = CliRunner() + with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: + + repo_path = Path(repo_dir_name) + data_path = Path(data_dir_name) + + repo_config = repo_path / "feature_store.yaml" + + repo_config.write_text( + dedent( + f""" + project: {self._project_id} + metadata_store: {data_path / "metadata.db"} + provider: gcp + online_store: + firestore: + """ + ) + ) + + repo_example = repo_path / "example.py" + repo_example.write_text( + (Path(__file__).parent / "example_feature_repo_1.py").read_text() + ) + + result = runner.run(["apply", str(repo_path)], cwd=repo_path) + assert result.returncode == 0 + + result = runner.run(["teardown", str(repo_path)], cwd=repo_path) + assert result.returncode == 0 From e14329e7fbe9aa93431f95599bb171f1baef4a93 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Mon, 8 Mar 2021 18:28:58 -0800 Subject: [PATCH 06/13] reconcile configs Signed-off-by: Oleg Avdeev --- sdk/python/feast/feature_store.py | 8 ++-- sdk/python/feast/feature_store_config.py | 53 ------------------------ 2 files changed, 4 insertions(+), 57 deletions(-) delete mode 100644 sdk/python/feast/feature_store_config.py diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index aea899e1ab..0f76c6eeef 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -13,7 +13,7 @@ # limitations under the License. from typing import Optional -from feast.feature_store_config import Config +from feast.repo_config import RepoConfig, load_repo_config class FeatureStore: @@ -22,13 +22,13 @@ class FeatureStore: """ def __init__( - self, config_path: Optional[str], config: Optional[Config], + self, config_path: Optional[str], config: Optional[RepoConfig], ): if config_path is None or config is None: raise Exception("You cannot specify both config_path and config") if config is not None: self.config = config elif config_path is not None: - self.config = Config.from_path(config_path) + self.config = load_repo_config(config_path) else: - self.config = Config() + self.config = RepoConfig() diff --git a/sdk/python/feast/feature_store_config.py b/sdk/python/feast/feature_store_config.py deleted file mode 100644 index b26d1e0bd5..0000000000 --- a/sdk/python/feast/feature_store_config.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2019 The Feast Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from typing import Optional - -import yaml - - -class Config: - """ - Configuration object that contains all possible configuration options for a FeatureStore. - """ - - def __init__( - self, - provider: Optional[str], - online_store: Optional[str], - metadata_store: Optional[str], - ): - self.provider = provider if (provider is not None) else "local" - self.online_store = online_store if (online_store is not None) else "local" - self.metadata_store = ( - metadata_store if (metadata_store is not None) else "./metadata_store" - ) - - @classmethod - def from_path(cls, config_path: str): - """ - Construct the configuration object from a filepath containing a yaml file. - - Example yaml file: - - provider: gcp - online_store: firestore - metadata_store: gs://my_bucket/metadata_store - """ - with open(config_path, "r") as f: - config_dict = yaml.safe_load(f) - return cls( - provider=config_dict.get("provider"), - online_store=config_dict.get("online_store"), - metadata_store=config_dict.get("metadata_store"), - ) From 3b198f87b38708e197e7f700af21d469c245179f Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Mon, 8 Mar 2021 18:30:43 -0800 Subject: [PATCH 07/13] add missing file Signed-off-by: Oleg Avdeev --- sdk/python/feast/infra/gcp.py | 78 +++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 sdk/python/feast/infra/gcp.py diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py new file mode 100644 index 0000000000..1a47ba2556 --- /dev/null +++ b/sdk/python/feast/infra/gcp.py @@ -0,0 +1,78 @@ +from typing import List, Optional + +from feast import FeatureTable +from feast.infra.provider import Provider +from feast.repo_config import FirestoreOnlineStoreConfig + + +def _table_id(project: str, table: FeatureTable) -> str: + return f"{project}_{table.name}" + + +def _delete_collection(coll_ref, batch_size=1000) -> None: + """ + Delete Firebase collection. There is no way to delete the entire collections, so we have to + delete documents in the collection one by one. + """ + while True: + docs = coll_ref.limit(batch_size).stream() + deleted = 0 + + for doc in docs: + doc.reference.delete() + deleted = deleted + 1 + + if deleted < batch_size: + return + + +class Gcp(Provider): + def __init__(self, config: Optional[FirestoreOnlineStoreConfig]): + pass + + def _initialize_app(self): + import firebase_admin + + return firebase_admin.initialize_app() + + def update_infra( + self, + project: str, + tables_to_delete: List[FeatureTable], + tables_to_keep: List[FeatureTable], + ): + from firebase_admin import firestore + + def table_id(t): + return _table_id(project, table=t) + + app = self._initialize_app() + + db = firestore.client(app=app) + + for table in tables_to_keep: + db.collection(project).document(table_id(table)).set( + {"created_at": firestore.SERVER_TIMESTAMP} + ) + + for table in tables_to_delete: + _delete_collection( + db.collection(project).document(table_id(table)).collection("values") + ) + db.collection(project).document(table_id(table)).delete() + + def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None: + from firebase_admin import firestore + + def table_id(t): + return _table_id(project, table=t) + + app = self._initialize_app() + + db = firestore.client(app=app) + + for table in tables: + _delete_collection( + db.collection(project).document(table_id(table)).collection("values") + ) + db.collection(project).document(table_id(table)).delete() From c867b5077374f7983cf8a65038b0edaa0dff2998 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Tue, 9 Mar 2021 09:48:00 -0800 Subject: [PATCH 08/13] add missing dep Signed-off-by: Oleg Avdeev --- sdk/python/requirements-ci.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/requirements-ci.txt b/sdk/python/requirements-ci.txt index 24f1d7dd70..99238ef83e 100644 --- a/sdk/python/requirements-ci.txt +++ b/sdk/python/requirements-ci.txt @@ -22,3 +22,4 @@ pytest-mock==1.10.4 PyYAML==5.3.1 great-expectations==0.13.2 adlfs==0.5.9 +firebase-admin==4.5.2 From 404543b5869b8f325f44459ac010c9fe02dfcc93 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Tue, 9 Mar 2021 10:19:23 -0800 Subject: [PATCH 09/13] add missing dep Signed-off-by: Oleg Avdeev --- sdk/python/requirements-ci.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/requirements-ci.txt b/sdk/python/requirements-ci.txt index 99238ef83e..4c058871b9 100644 --- a/sdk/python/requirements-ci.txt +++ b/sdk/python/requirements-ci.txt @@ -23,3 +23,4 @@ PyYAML==5.3.1 great-expectations==0.13.2 adlfs==0.5.9 firebase-admin==4.5.2 +google-cloud-firestore==2.0.2 From cfbc72ccc5bcb1a362d8e924f3b4e67e473a3a99 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Tue, 9 Mar 2021 10:32:46 -0800 Subject: [PATCH 10/13] fix deps again Signed-off-by: Oleg Avdeev --- sdk/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 6f23d68f4a..7660a7bcdd 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -29,7 +29,7 @@ "google-api-core==1.22.4", "google-cloud-bigquery==1.18.*", "google-cloud-storage==1.20.*", - "google-cloud-core==1.0.*", + "google-cloud-core==1.4.*", "googleapis-common-protos==1.52.*", "google-cloud-bigquery-storage==0.7.*", "grpcio==1.31.0", From ffe1879d3a1fa2e614928e6b9a6d929800dcb990 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Tue, 9 Mar 2021 16:32:05 -0800 Subject: [PATCH 11/13] use datastore not firestore Signed-off-by: Oleg Avdeev --- sdk/python/feast/infra/gcp.py | 80 ++++++++++++++------------------- sdk/python/feast/repo_config.py | 6 +-- sdk/python/requirements-ci.txt | 2 +- 3 files changed, 38 insertions(+), 50 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 1a47ba2556..1614de00a5 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -1,39 +1,34 @@ -from typing import List, Optional +from datetime import datetime +from typing import List from feast import FeatureTable from feast.infra.provider import Provider -from feast.repo_config import FirestoreOnlineStoreConfig +from feast.repo_config import DatastoreOnlineStoreConfig -def _table_id(project: str, table: FeatureTable) -> str: - return f"{project}_{table.name}" - - -def _delete_collection(coll_ref, batch_size=1000) -> None: +def _delete_all_values(client, key) -> None: """ - Delete Firebase collection. There is no way to delete the entire collections, so we have to - delete documents in the collection one by one. + Delete all keys under the key path in firestore. """ while True: - docs = coll_ref.limit(batch_size).stream() - deleted = 0 - - for doc in docs: - doc.reference.delete() - deleted = deleted + 1 - - if deleted < batch_size: + query = client.query(kind="Value", ancestor=key) + entities = list(query.fetch(limit=1000)) + if not entities: return + for entity in entities: + print("Deleting: {}".format(entity)) + client.delete(entity.key) + class Gcp(Provider): - def __init__(self, config: Optional[FirestoreOnlineStoreConfig]): - pass + def __init__(self, config: DatastoreOnlineStoreConfig): + self._project_id = config.project_id - def _initialize_app(self): - import firebase_admin + def _initialize_client(self): + from google.cloud import datastore - return firebase_admin.initialize_app() + return datastore.Client(self.project_id) def update_infra( self, @@ -41,38 +36,31 @@ def update_infra( tables_to_delete: List[FeatureTable], tables_to_keep: List[FeatureTable], ): - from firebase_admin import firestore - - def table_id(t): - return _table_id(project, table=t) - - app = self._initialize_app() + from google.cloud import datastore - db = firestore.client(app=app) + client = self._initialize_client() for table in tables_to_keep: - db.collection(project).document(table_id(table)).set( - {"created_at": firestore.SERVER_TIMESTAMP} - ) + key = client.key("FeastProject", project, "FeatureTable", table.name) + entity = datastore.Entity(key=key) + entity.update({"created_at": datetime.utcnow()}) + client.put(entity) for table in tables_to_delete: - _delete_collection( - db.collection(project).document(table_id(table)).collection("values") + _delete_all_values( + client, client.key("FeastProject", project, "FeatureTable", table.name) ) - db.collection(project).document(table_id(table)).delete() - - def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None: - from firebase_admin import firestore - def table_id(t): - return _table_id(project, table=t) + key = client.key("FeastProject", project, "FeatureTable", table.name) + client.delete(key) - app = self._initialize_app() - - db = firestore.client(app=app) + def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None: + client = self._initialize_client() for table in tables: - _delete_collection( - db.collection(project).document(table_id(table)).collection("values") + _delete_all_values( + client, client.key("FeastProject", project, "FeatureTable", table.name) ) - db.collection(project).document(table_id(table)).delete() + + key = client.key("FeastProject", project, "FeatureTable", table.name) + client.delete(key) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 349740693e..aae9c217d1 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -9,12 +9,12 @@ class LocalOnlineStoreConfig(NamedTuple): path: str -class FirestoreOnlineStoreConfig(NamedTuple): - pass +class DatastoreOnlineStoreConfig(NamedTuple): + project_id: str class OnlineStoreConfig(NamedTuple): - firestore: Optional[FirestoreOnlineStoreConfig] = None + datastore: Optional[DatastoreOnlineStoreConfig] = None local: Optional[LocalOnlineStoreConfig] = None diff --git a/sdk/python/requirements-ci.txt b/sdk/python/requirements-ci.txt index 4c058871b9..5049778cb2 100644 --- a/sdk/python/requirements-ci.txt +++ b/sdk/python/requirements-ci.txt @@ -23,4 +23,4 @@ PyYAML==5.3.1 great-expectations==0.13.2 adlfs==0.5.9 firebase-admin==4.5.2 -google-cloud-firestore==2.0.2 +google-cloud-datastore==2.1.0 From 4373a04f6d966330e5d7d8a53f111dc219d65685 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Tue, 9 Mar 2021 16:44:37 -0800 Subject: [PATCH 12/13] fix tests Signed-off-by: Oleg Avdeev --- sdk/python/feast/infra/gcp.py | 16 ++++++++++++---- sdk/python/feast/infra/provider.py | 2 +- .../cli/{test_firebase.py => test_datastore.py} | 2 +- 3 files changed, 14 insertions(+), 6 deletions(-) rename sdk/python/tests/cli/{test_firebase.py => test_datastore.py} (98%) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 1614de00a5..e1508df068 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import List +from typing import List, Optional from feast import FeatureTable from feast.infra.provider import Provider @@ -22,13 +22,21 @@ def _delete_all_values(client, key) -> None: class Gcp(Provider): - def __init__(self, config: DatastoreOnlineStoreConfig): - self._project_id = config.project_id + _project_id: Optional[str] + + def __init__(self, config: Optional[DatastoreOnlineStoreConfig]): + if config: + self._project_id = config.project_id + else: + self._project_id = None def _initialize_client(self): from google.cloud import datastore - return datastore.Client(self.project_id) + if self._project_id is not None: + return datastore.Client(self.project_id) + else: + return datastore.Client() def update_infra( self, diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 925c33180a..52689e9543 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -39,7 +39,7 @@ def get_provider(config: RepoConfig) -> Provider: if config.provider == "gcp": from feast.infra.gcp import Gcp - return Gcp(config.online_store.firestore) + return Gcp(config.online_store.datastore) elif config.provider == "local": from feast.infra.local_sqlite import LocalSqlite diff --git a/sdk/python/tests/cli/test_firebase.py b/sdk/python/tests/cli/test_datastore.py similarity index 98% rename from sdk/python/tests/cli/test_firebase.py rename to sdk/python/tests/cli/test_datastore.py index f6a8c5c1e6..c2ef7d350e 100644 --- a/sdk/python/tests/cli/test_firebase.py +++ b/sdk/python/tests/cli/test_datastore.py @@ -46,7 +46,7 @@ def test_basic(self) -> None: metadata_store: {data_path / "metadata.db"} provider: gcp online_store: - firestore: + datastore: """ ) ) From 8bc09171386d7311111207f4119234042846c3b4 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Tue, 9 Mar 2021 19:21:08 -0800 Subject: [PATCH 13/13] comments Signed-off-by: Oleg Avdeev --- sdk/python/feast/cli.py | 2 +- sdk/python/feast/infra/gcp.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index c12a0e91f1..12aeada765 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -371,7 +371,7 @@ def apply_total_command(repo_path: str): @click.argument("repo_path", type=click.Path(dir_okay=True, exists=True)) def teardown_command(repo_path: str): """ - Tear down infra for a a feature repo + Tear down infra for a feature repo """ repo_config = load_repo_config(Path(repo_path)) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index e1508df068..8c3882bb1e 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -8,7 +8,7 @@ def _delete_all_values(client, key) -> None: """ - Delete all keys under the key path in firestore. + Delete all data under the key path in datastore. """ while True: query = client.query(kind="Value", ancestor=key) @@ -59,6 +59,7 @@ def update_infra( client, client.key("FeastProject", project, "FeatureTable", table.name) ) + # Delete the table metadata datastore entity key = client.key("FeastProject", project, "FeatureTable", table.name) client.delete(key) @@ -70,5 +71,6 @@ def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None: client, client.key("FeastProject", project, "FeatureTable", table.name) ) + # Delete the table metadata datastore entity key = client.key("FeastProject", project, "FeatureTable", table.name) client.delete(key)