Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python-centric feast deploy CLI #1362

Merged
merged 14 commits into from
Mar 10, 2021
36 changes: 36 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import json
import logging
import sys
from pathlib import Path
from typing import Dict

import click
Expand All @@ -26,6 +27,8 @@
from feast.entity import Entity
from feast.feature_table import FeatureTable
from feast.loaders.yaml import yaml_loader
from feast.repo_config import load_repo_config
woop marked this conversation as resolved.
Show resolved Hide resolved
from feast.repo_operations import apply_total, registry_dump, teardown

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -353,5 +356,38 @@ def project_list():
print(tabulate(table, headers=["NAME"], tablefmt="plain"))


@cli.command("apply")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
def apply_total_command(repo_path: str):
"""
Applies a feature repo
"""
repo_config = load_repo_config(Path(repo_path))
woop marked this conversation as resolved.
Show resolved Hide resolved

apply_total(repo_config, Path(repo_path).resolve())


@cli.command("teardown")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
def teardown_command(repo_path: str):
"""
Tear down infra for a a feature repo
woop marked this conversation as resolved.
Show resolved Hide resolved
"""
repo_config = load_repo_config(Path(repo_path))

teardown(repo_config, Path(repo_path).resolve())


@cli.command("registry-dump")
woop marked this conversation as resolved.
Show resolved Hide resolved
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
def registry_dump_command(repo_path: str):
"""
Prints contents of the metadata registry
"""
repo_config = load_repo_config(Path(repo_path))

registry_dump(repo_config)


if __name__ == "__main__":
cli()
1 change: 1 addition & 0 deletions sdk/python/feast/infra/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# from .provider import Provider
36 changes: 36 additions & 0 deletions sdk/python/feast/infra/local_sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os
import sqlite3
from typing import List

from feast import FeatureTable
from feast.infra.provider import Provider
from feast.repo_config import LocalOnlineStoreConfig


def _table_id(project: str, table: FeatureTable) -> str:
return f"{project}_{table.name}"


class LocalSqlite(Provider):
_db_path: str

def __init__(self, config: LocalOnlineStoreConfig):
self._db_path = config.path

def update_infra(
self,
project: str,
tables_to_delete: List[FeatureTable],
tables_to_keep: List[FeatureTable],
):
conn = sqlite3.connect(self._db_path)
for table in tables_to_keep:
conn.execute(
f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (key BLOB, value BLOB)"
)

for table in tables_to_delete:
conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")

def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None:
os.unlink(self._db_path)
49 changes: 49 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import abc
from typing import List

from feast import FeatureTable
from feast.repo_config import RepoConfig


class Provider(abc.ABC):
woop marked this conversation as resolved.
Show resolved Hide resolved
@abc.abstractmethod
def update_infra(
self,
project: str,
tables_to_delete: List[FeatureTable],
tables_to_keep: List[FeatureTable],
):
"""
Reconcile cloud resources with the objects declared in the feature repo.

Args:
tables_to_delete: Tables that were deleted from the feature repo, so provider needs to
clean up the corresponding cloud resources.
tables_to_keep: Tables that are still in the feature repo. Depending on implementation,
provider may or may not need to update the corresponding resources.
"""
...

@abc.abstractmethod
def teardown_infra(self, project: str, tables: List[FeatureTable]):
"""
Tear down all cloud resources for a repo.

Args:
tables: Tables that are declared in the feature repo.
"""
...


def get_provider(config: RepoConfig) -> Provider:
if config.provider == "gcp":
from feast.infra.gcp import Gcp

return Gcp(config.online_store.firestore)
elif config.provider == "local":
from feast.infra.local_sqlite import LocalSqlite

assert config.online_store.local is not None
woop marked this conversation as resolved.
Show resolved Hide resolved
return LocalSqlite(config.online_store.local)
else:
raise ValueError(config)
29 changes: 29 additions & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from pathlib import Path
from typing import NamedTuple, Optional

import yaml
from bindr import bind


class LocalOnlineStoreConfig(NamedTuple):
path: str

class FirestoreOnlineStoreConfig(NamedTuple):
pass

class OnlineStoreConfig(NamedTuple):
firestore: Optional[FirestoreOnlineStoreConfig] = None
local: Optional[LocalOnlineStoreConfig] = None


class RepoConfig(NamedTuple):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we just called this Config?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we already have a Config or two so I wanted to be a bit more specific here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should reconcile https://github.com/feast-dev/feast/blob/master/sdk/python/feast/feature_store_config.py and this since I think they're the same thing

metadata_store: str
project: str
provider: str
online_store: OnlineStoreConfig


def load_repo_config(repo_path: Path) -> RepoConfig:
with open(repo_path / "feature_store.yaml") as f:
raw_config = yaml.safe_load(f)
return bind(RepoConfig, raw_config)
98 changes: 98 additions & 0 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import importlib
import os
import sys
from pathlib import Path
from typing import List, NamedTuple

from feast import Entity, FeatureTable
from feast.infra.provider import get_provider
from feast.registry import Registry
from feast.repo_config import RepoConfig


def py_path_to_module(path: Path, repo_root: Path) -> str:
return (
str(path.relative_to(repo_root))[: -len(".py")]
.replace("./", "")
.replace("/", ".")
)


class ParsedRepo(NamedTuple):
feature_tables: List[FeatureTable]
entities: List[Entity]


def parse_repo(repo_root: Path) -> ParsedRepo:
""" Collect feature table definitions from feature repo """
res = ParsedRepo(feature_tables=[], entities=[])

# FIXME: process subdirs but exclude hidden ones
repo_files = [p.resolve() for p in repo_root.glob("*.py")]

for repo_file in repo_files:

module_path = py_path_to_module(repo_file, repo_root)

print(f"Processing {repo_file} as {module_path}")
module = importlib.import_module(module_path)

for attr_name in dir(module):
obj = getattr(module, attr_name)
if isinstance(obj, FeatureTable):
res.feature_tables.append(obj)
elif isinstance(obj, Entity):
res.entities.append(obj)
return res


def apply_total(repo_config: RepoConfig, repo_path: Path):
os.chdir(repo_path)
sys.path.append("")

project = repo_config.project
registry = Registry(repo_config.metadata_store)
repo = parse_repo(repo_path)

for entity in repo.entities:
registry.apply_entity(entity, project=project)

repo_table_names = set(t.name for t in repo.feature_tables)
tables_to_delete = []
for registry_table in registry.list_feature_tables(project=project):
if registry_table.name not in repo_table_names:
tables_to_delete.append(registry_table)

# Delete tables that should not exist
for registry_table in tables_to_delete:
registry.delete_feature_table(registry_table.name, project=project)

for table in repo.feature_tables:
registry.apply_feature_table(table, project)

infra_provider = get_provider(repo_config)
infra_provider.update_infra(
project, tables_to_delete=tables_to_delete, tables_to_keep=repo.feature_tables
)

print("Done!")


def teardown(repo_config: RepoConfig, repo_path: Path):
registry = Registry(repo_config.metadata_store)
project = repo_config.project
registry_tables = registry.list_feature_tables(project=project)
infra_provider = get_provider(repo_config)
infra_provider.teardown_infra(project, tables=registry_tables)


def registry_dump(repo_config: RepoConfig):
""" For debugging only: output contents of the metadata registry """

project = repo_config.project
registry = Registry(repo_config.metadata_store)

for entity in registry.list_entities(project=project):
print(entity)
for table in registry.list_feature_tables(project=project):
print(table)
1 change: 1 addition & 0 deletions sdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"numpy<1.20.0",
"google",
"kubernetes==12.0.*",
"bindr",
]

# README file from Feast repo root directory
Expand Down
55 changes: 55 additions & 0 deletions sdk/python/tests/cli/test_cli_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import subprocess
import sys
import tempfile
from pathlib import Path
from textwrap import dedent
from typing import List

from feast import cli


class CliRunner:
"""
NB. We can't use test runner helper from click here, since it doesn't start a new Python
interpreter. And we need a new interpreter for each test since we dynamically import
modules from the feature repo, and it is hard to clean up that state otherwise.
"""

def run(self, args: List[str], cwd: Path) -> subprocess.CompletedProcess:
woop marked this conversation as resolved.
Show resolved Hide resolved
return subprocess.run([sys.executable, cli.__file__] + args, cwd=cwd)


class TestCliLocal:

def test_basic(self) -> None:
runner = CliRunner()
with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name:

repo_path = Path(repo_dir_name)
data_path = Path(data_dir_name)

repo_config = repo_path / "feature_store.yaml"

repo_config.write_text(
dedent(
f"""
project: foo
metadata_store: {data_path / "metadata.db"}
provider: local
online_store:
local:
path: {data_path / "online_store.db"}
"""
)
)

repo_example = repo_path / "example.py"
repo_example.write_text(
(Path(__file__).parent / "example_feature_repo_1.py").read_text()
)

result = runner.run(["apply", str(repo_path)], cwd=repo_path)
assert result.returncode == 0

result = runner.run(["teardown", str(repo_path)], cwd=repo_path)
assert result.returncode == 0