Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use request.addfinalizer instead of the yield based approach in integ tests #2089

Merged
merged 3 commits into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions sdk/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import multiprocessing
from datetime import datetime, timedelta
from sys import platform
Expand All @@ -34,6 +35,8 @@
construct_universal_entities,
)

logger = logging.getLogger(__name__)


def pytest_configure(config):
if platform in ["darwin", "windows"]:
Expand Down Expand Up @@ -137,21 +140,33 @@ def simple_dataset_2() -> pd.DataFrame:
@pytest.fixture(
params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS]
)
def environment(request):
with construct_test_environment(request.param) as e:
yield e
def environment(request, tmp_path_factory, worker_id):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is tmp_path_factory needed here? doesn't look like it's being used

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah. Using pytest-xdist, each worker creates its own version of fixtures. I had a version of this PR that creates the fixture in a single worker and shares it across workers by pickling it, but that won't work because when a single fixture is used across workers, there's data corruption (because multiple processes write to the same registry location or sqlite online store).

tmp_path_factory is a holdover from there. Removing.

logger.info("Worker: %s, Request: %s", worker_id, request.param)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be debug level?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah

e = construct_test_environment(request.param)

def cleanup():
logger.info("Running cleanup in %s, Request: %s", worker_id, request.param)
e.feature_store.teardown()

request.addfinalizer(cleanup)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but isn't this code equivalent to the previous version with context managers?
I believe pytest under the hook does the same addfinalizer, https://github.com/pytest-dev/pytest/blob/main/src/_pytest/fixtures.py#L921

return e


@pytest.fixture()
def local_redis_environment():
with construct_test_environment(
IntegrationTestRepoConfig(online_store=REDIS_CONFIG)
) as e:
yield e
def local_redis_environment(request, worker_id):

e = construct_test_environment(IntegrationTestRepoConfig(online_store=REDIS_CONFIG))

def cleanup():
logger.info("Running cleanup in %s, Request: %s", worker_id, request.param)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is request.param supposed to represent here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

request.param refers to the config used to create the environment, from params=FULL_REPO_CONFIGS

e.feature_store.teardown()

request.addfinalizer(cleanup)
return e


@pytest.fixture(scope="session")
def universal_data_sources(environment):
def universal_data_sources(request, environment, tmp_path_factory, worker_id):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, why do we have tmp_path_factory and worker_id here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't. Removing.

entities = construct_universal_entities()
datasets = construct_universal_datasets(
entities, environment.start_date, environment.end_date
Expand All @@ -160,18 +175,24 @@ def universal_data_sources(environment):
datasets, environment.data_source_creator
)

yield entities, datasets, datasources
def cleanup():
environment.data_source_creator.teardown()

environment.data_source_creator.teardown()
request.addfinalizer(cleanup)

return entities, datasets, datasources


@pytest.fixture(scope="session")
def e2e_data_sources(environment: Environment):
def e2e_data_sources(environment: Environment, request):
df = create_dataset()
data_source = environment.data_source_creator.create_data_source(
df, environment.feature_store.project, field_mapping={"ts_1": "ts"},
)

yield df, data_source
def cleanup():
environment.data_source_creator.teardown()

request.addfinalizer(cleanup)

environment.data_source_creator.teardown()
return df, data_source
84 changes: 40 additions & 44 deletions sdk/python/tests/integration/feature_repos/repo_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import tempfile
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
Expand Down Expand Up @@ -235,7 +234,6 @@ def table_name_from_data_source(ds: DataSource) -> Optional[str]:
return None


@contextmanager
def construct_test_environment(
test_repo_config: IntegrationTestRepoConfig,
test_suite_name: str = "integration_test",
Expand All @@ -254,49 +252,47 @@ def construct_test_environment(
offline_store_config = offline_creator.create_offline_store_config()
online_store = test_repo_config.online_store

with tempfile.TemporaryDirectory() as repo_dir_name:
if test_repo_config.python_feature_server:
from feast.infra.feature_servers.aws_lambda.config import (
AwsLambdaFeatureServerConfig,
)

feature_server = AwsLambdaFeatureServerConfig(
enabled=True,
execution_role_name="arn:aws:iam::402087665549:role/lambda_execution_role",
)

registry = f"s3://feast-integration-tests/registries/{project}/registry.db"
else:
feature_server = None
registry = str(Path(repo_dir_name) / "registry.db")

config = RepoConfig(
registry=registry,
project=project,
provider=test_repo_config.provider,
offline_store=offline_store_config,
online_store=online_store,
repo_path=repo_dir_name,
feature_server=feature_server,
repo_dir_name = tempfile.mkdtemp()

if test_repo_config.python_feature_server:
from feast.infra.feature_servers.aws_lambda.config import (
AwsLambdaFeatureServerConfig,
)

# Create feature_store.yaml out of the config
with open(Path(repo_dir_name) / "feature_store.yaml", "w") as f:
yaml.safe_dump(json.loads(config.json()), f)

fs = FeatureStore(repo_dir_name)
# We need to initialize the registry, because if nothing is applied in the test before tearing down
# the feature store, that will cause the teardown method to blow up.
fs.registry._initialize_registry()
environment = Environment(
name=project,
test_repo_config=test_repo_config,
feature_store=fs,
data_source_creator=offline_creator,
python_feature_server=test_repo_config.python_feature_server,
feature_server = AwsLambdaFeatureServerConfig(
enabled=True,
execution_role_name="arn:aws:iam::402087665549:role/lambda_execution_role",
)

try:
yield environment
finally:
fs.teardown()
registry = f"s3://feast-integration-tests/registries/{project}/registry.db"
else:
feature_server = None
registry = str(Path(repo_dir_name) / "registry.db")

config = RepoConfig(
registry=registry,
project=project,
provider=test_repo_config.provider,
offline_store=offline_store_config,
online_store=online_store,
repo_path=repo_dir_name,
feature_server=feature_server,
)

# Create feature_store.yaml out of the config
with open(Path(repo_dir_name) / "feature_store.yaml", "w") as f:
yaml.safe_dump(json.loads(config.json()), f)

fs = FeatureStore(repo_dir_name)
# We need to initialize the registry, because if nothing is applied in the test before tearing down
# the feature store, that will cause the teardown method to blow up.
fs.registry._initialize_registry()
environment = Environment(
name=project,
test_repo_config=test_repo_config,
feature_store=fs,
data_source_creator=offline_creator,
python_feature_server=test_repo_config.python_feature_server,
)

return environment