Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make online store nullable #2224

Merged
18 changes: 10 additions & 8 deletions sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ def update_infra(
entities_to_keep: Sequence[Entity],
partial: bool,
):
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)
# Call update only if there is an online store
if self.online_store:
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)

if self.repo_config.feature_server and self.repo_config.feature_server.enabled:
if not enable_aws_lambda_feature_server(self.repo_config):
Expand Down
3 changes: 1 addition & 2 deletions sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
serialize_entity_key,
serialize_entity_key_prefix,
)
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto


def get_online_store_from_config(online_store_config: Any) -> OnlineStore:
mirayyuce marked this conversation as resolved.
Show resolved Hide resolved
def get_online_store_from_config(online_store_config: Any):
"""Creates an online store corresponding to the given online store config."""
module_name = online_store_config.__module__
qualified_name = type(online_store_config).__name__
Expand Down
26 changes: 17 additions & 9 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
from feast.infra.online_stores.helpers import get_online_store_from_config
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.provider import (
Provider,
_convert_arrow_to_proto,
Expand All @@ -35,7 +36,11 @@ def __init__(self, config: RepoConfig):

self.repo_config = config
self.offline_store = get_offline_store_from_config(config.offline_store)
self.online_store = get_online_store_from_config(config.online_store)
self.online_store = (
get_online_store_from_config(config.online_store)
if config.online_store
else None
)

def update_infra(
self,
Expand All @@ -47,14 +52,17 @@ def update_infra(
partial: bool,
):
set_usage_attribute("provider", self.__class__.__name__)
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)

# Call update only if there is an online store
if self.online_store:
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)

def teardown_infra(
self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity],
mirayyuce marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def _validate_online_store_config(cls, values):
if "online_store" not in values:
values["online_store"] = dict()

# Skip if we aren't creating the configuration from a dict
# Skip if we aren't creating the configuration from a dict or online store is null
if not isinstance(values["online_store"], Dict):
return values

Expand All @@ -176,6 +176,7 @@ def _validate_online_store_config(cls, values):
try:
online_config_class = get_online_config_from_type(online_store_type)
online_config_class(**values["online_store"])

mirayyuce marked this conversation as resolved.
Show resolved Hide resolved
except ValidationError as e:
raise ValidationError(
[ErrorWrapper(e, loc="online_store")], model=RepoConfig,
Expand All @@ -185,6 +186,7 @@ def _validate_online_store_config(cls, values):
@root_validator(pre=True)
def _validate_offline_store_config(cls, values):
# Set empty offline_store config if it isn't set explicitly

mirayyuce marked this conversation as resolved.
Show resolved Hide resolved
if "offline_store" not in values:
values["offline_store"] = dict()

Expand Down
67 changes: 66 additions & 1 deletion sdk/python/tests/integration/registration/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
import os
import tempfile
import uuid
from contextlib import contextmanager
from pathlib import Path, PosixPath
from textwrap import dedent
from typing import List

import pytest
import yaml
from assertpy import assertpy

from feast import FeatureStore, RepoConfig
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
)
from tests.integration.feature_repos.repo_configuration import FULL_REPO_CONFIGS
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.bigquery import (
BigQueryDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.file import (
FileDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.redshift import (
RedshiftDataSourceCreator,
)
from tests.utils.cli_utils import CliRunner, get_example_repo
from tests.utils.online_read_write_test import basic_rw_test

Expand All @@ -21,7 +35,6 @@
@pytest.mark.parametrize("test_repo_config", FULL_REPO_CONFIGS)
def test_universal_cli(test_repo_config) -> None:
project = f"test_universal_cli_{str(uuid.uuid4()).replace('-', '')[:8]}"

runner = CliRunner()

with tempfile.TemporaryDirectory() as repo_dir_name:
Expand Down Expand Up @@ -128,6 +141,58 @@ def make_feature_store_yaml(project, test_repo_config, repo_dir_name: PosixPath)
return yaml.safe_dump(config_dict)


NULLABLE_ONLINE_STORE_CONFIGS: List[IntegrationTestRepoConfig] = [
mirayyuce marked this conversation as resolved.
Show resolved Hide resolved
IntegrationTestRepoConfig(),
]
if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True":
NULLABLE_ONLINE_STORE_CONFIGS.extend(
[
IntegrationTestRepoConfig(
provider="gcp",
offline_store_creator=BigQueryDataSourceCreator,
online_store="null",
),
IntegrationTestRepoConfig(
provider="aws",
offline_store_creator=RedshiftDataSourceCreator,
online_store="null",
),
IntegrationTestRepoConfig(
mirayyuce marked this conversation as resolved.
Show resolved Hide resolved
provider="local",
offline_store_creator=FileDataSourceCreator,
online_store="null",
),
]
)


@pytest.mark.integration
@pytest.mark.parametrize("test_nullable_online_store", NULLABLE_ONLINE_STORE_CONFIGS)
def test_nullable_online_store(test_nullable_online_store) -> None:
project = f"test_nullable_online_store{str(uuid.uuid4()).replace('-', '')[:8]}"
runner = CliRunner()

with tempfile.TemporaryDirectory() as repo_dir_name:
try:
feature_store_yaml = make_feature_store_yaml(
project, test_nullable_online_store, repo_dir_name
)
config_dict = yaml.safe_load(feature_store_yaml)
config_dict["online_store"]["type"] = "null"
repo_path = Path(repo_dir_name)

repo_config = repo_path / "feature_store.yaml"

repo_config.write_text(dedent(feature_store_yaml))

repo_example = repo_path / "example.py"
repo_example.write_text(get_example_repo("example_feature_repo_1.py"))
result = runner.run(["apply"], cwd=repo_path)
assertpy.assert_that(result.returncode).is_equal_to(0)
finally:
runner.run(["teardown"], cwd=repo_path)


@contextmanager
def setup_third_party_provider_repo(provider_name: str):
with tempfile.TemporaryDirectory() as repo_dir_name:
Expand Down
43 changes: 43 additions & 0 deletions sdk/python/tests/integration/scaffolding/test_repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,49 @@ def _test_config(config_text, expect_error: Optional[str]):
return rc


def test_nullable_online_store_aws():
_test_config(
dedent(
"""
project: foo
registry: "registry.db"
provider: aws
online_store: null
"""
),
expect_error="__root__ -> offline_store -> cluster_id\n"
" field required (type=value_error.missing)",
)


def test_nullable_online_store_gcp():
_test_config(
dedent(
"""
project: foo
registry: "registry.db"
provider: gcp
online_store: null
"""
),
expect_error=None,
)


def test_nullable_online_store_local():
_test_config(
dedent(
"""
project: foo
registry: "registry.db"
provider: local
online_store: null
"""
),
expect_error=None,
)


def test_local_config():
_test_config(
dedent(
Expand Down