Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tests] Added more explicit lifecycle management to Ray clusters during tests #2447

Merged
merged 11 commits into the base branch from the contributor's branch
Sep 5, 2022
Merged
2 changes: 1 addition & 1 deletion .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
@@ -151,7 +151,7 @@ jobs:

- name: Tests
run: |
pytest -v --timeout 300 --durations 10 -m "$MARKERS" --junitxml pytest.xml tests
pytest -v --timeout 300 --durations 100 -m "$MARKERS" --junitxml pytest.xml tests

- name: Upload Unit Test Results
if: always()
20 changes: 14 additions & 6 deletions ludwig/backend/ray.py
Original file line number Diff line number Diff line change
@@ -802,12 +802,7 @@ def __init__(self, processor=None, trainer=None, loader=None, use_legacy=False,
self._use_legacy = use_legacy

def initialize(self):
if not ray.is_initialized():
try:
ray.init("auto", ignore_reinit_error=True)
except ConnectionError:
logger.info("Initializing new Ray cluster...")
ray.init(ignore_reinit_error=True)
initialize_ray()

dask.config.set(scheduler=ray_dask_get)
# Disable placement groups on dask
@@ -944,3 +939,16 @@ def num_nodes(self) -> int:
if not ray.is_initialized():
return 1
return len(ray.nodes())


def initialize_ray():
    """Attach this process to a Ray cluster.

    First tries to join an already-running cluster via the ``"auto"`` address;
    if no cluster is reachable (``ConnectionError``), falls back to starting a
    local one through :func:`init_ray_local`. No-op when Ray is already
    initialized in this process.
    """
    if ray.is_initialized():
        return
    try:
        # Join an existing cluster if one is reachable.
        ray.init("auto", ignore_reinit_error=True)
    except ConnectionError:
        # No cluster found -- spin up a local one instead.
        init_ray_local()


def init_ray_local():
    """Start a brand-new local Ray cluster in this process.

    Split out as a separate function so tests can patch it to forbid implicit
    local cluster creation (see tests/conftest.py ``setup_tests`` fixture).
    """
    logger.info("Initializing new Ray cluster...")
    ray.init(ignore_reinit_error=True)
8 changes: 2 additions & 6 deletions ludwig/hyperopt/execution.py
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@

from ludwig.api import LudwigModel
from ludwig.backend import initialize_backend, RAY
from ludwig.backend.ray import initialize_ray
from ludwig.callbacks import Callback
from ludwig.constants import (
COLUMN,
@@ -159,12 +160,7 @@ def __init__(
self.output_feature = output_feature
self.metric = metric
self.split = split
if not ray.is_initialized():
try:
ray.init("auto", ignore_reinit_error=True)
except ConnectionError:
logger.info("Initializing new Ray cluster...")
ray.init(ignore_reinit_error=True)
initialize_ray()
self.search_space, self.decode_ctx = self._get_search_space(parameters)
self.num_samples = num_samples
self.goal = goal
9 changes: 3 additions & 6 deletions ludwig/utils/automl/ray_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os

from ludwig.backend.ray import initialize_ray

try:
import ray
except ImportError:
@@ -23,8 +24,4 @@ def _ray_init():
# Forcibly terminate trial requested to stop after this amount of time passes
os.environ.setdefault("TUNE_FORCE_TRIAL_CLEANUP_S", "120")

try:
ray.init("auto", ignore_reinit_error=True)
except ConnectionError:
logging.info("Initializing new Ray cluster...")
ray.init()
initialize_ray()
44 changes: 40 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
import os
import tempfile
import uuid
from unittest import mock

import pytest

@@ -24,6 +25,20 @@
from tests.integration_tests.utils import category_feature, generate_data, text_feature


@pytest.fixture(autouse=True)
def setup_tests(request):
    """Autouse guard for distributed tests.

    For tests marked ``distributed``, patch ``init_ray_local`` so that any
    attempt to implicitly start a local Ray cluster raises, forcing tests to
    manage Ray lifecycle explicitly via the ``ray_cluster_*`` fixtures.
    Non-distributed tests are left untouched, since Ray may not even be
    installed in those environments.
    See: https://stackoverflow.com/a/38763328
    """
    if "distributed" in request.keywords:
        with mock.patch("ludwig.backend.ray.init_ray_local") as patched_init:
            patched_init.side_effect = RuntimeError("Ray must be initialized explicitly when running tests")
            yield patched_init
    else:
        yield


@pytest.fixture()
def csv_filename():
"""Yields a csv filename for holding temporary data."""
@@ -93,14 +108,35 @@ def hyperopt_results():


@pytest.fixture(scope="module")
def ray_cluster_2cpu(request):
    """Module-scoped fixture providing a 2-CPU Ray cluster for the test module."""
    # NOTE(review): this span contained both the pre- and post-change fixture
    # signatures from the diff; only the post-change version (taking `request`)
    # is kept, since `_ray_start` now needs `request` to inspect test markers.
    with _ray_start(request, num_cpus=2):
        yield


@pytest.fixture(scope="module")
def ray_cluster_3cpu(request):
    """Module-scoped fixture providing a 3-CPU Ray cluster for the test module."""
    with _ray_start(request, num_cpus=3):
        yield


@pytest.fixture(scope="module")
def ray_cluster_7cpu(request):
    """Module-scoped fixture providing a 7-CPU Ray cluster for the test module."""
    with _ray_start(request, num_cpus=7):
        yield


@contextlib.contextmanager
def _ray_start(**kwargs):
import ray
def _ray_start(request, **kwargs):
try:
import ray
except ImportError:
if "distributed" in request.keywords:
raise

# Allow this fixture to run in environments where Ray is not installed
# for parameterized tests that mix Ray with non-Ray backends
yield None
return

init_kwargs = _get_default_ray_kwargs()
init_kwargs.update(kwargs)
9 changes: 7 additions & 2 deletions tests/integration_tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -281,11 +281,16 @@ def test_preprocess_cli(tmpdir, csv_filename):
_run_ludwig("preprocess", dataset=dataset_filename, preprocessing_config=config_filename)


@pytest.mark.distributed
@pytest.mark.parametrize("second_seed_offset", [0, 1])
@pytest.mark.parametrize("random_seed", [1919, 31])
@pytest.mark.parametrize("type_of_run", ["train", "experiment"])
@pytest.mark.parametrize("backend", ["local", "horovod"])
@pytest.mark.parametrize(
"backend",
[
pytest.param("local", id="local"),
pytest.param("horovod", id="horovod", marks=pytest.mark.distributed),
],
)
def test_reproducible_cli_runs(
backend: str, type_of_run: str, random_seed: int, second_seed_offset: int, csv_filename: str, tmpdir: pathlib.Path
) -> None:
50 changes: 14 additions & 36 deletions tests/integration_tests/test_gbm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import contextlib
import os

import pytest
@@ -14,20 +13,6 @@
from tests.integration_tests.utils import binary_feature, category_feature, generate_data, number_feature, text_feature


@contextlib.contextmanager
def ray_start(num_cpus=3, num_gpus=None):
    """Run a temporary local Ray cluster, guaranteeing shutdown on exit.

    Yields the ``ray.init`` context; the small object-store size keeps the
    test cluster's memory footprint down.
    """
    init_context = _ray.init(
        num_cpus=num_cpus,
        num_gpus=num_gpus,
        include_dashboard=False,
        object_store_memory=150 * 1024 * 1024,
    )
    try:
        yield init_context
    finally:
        # Always tear the cluster down, even if the test body raised.
        _ray.shutdown()


@pytest.fixture(scope="module")
def local_backend():
    """Backend config dict selecting the local (non-Ray) backend."""
    return dict(type="local")
@@ -75,9 +60,8 @@ def test_local_gbm_output_not_supported(tmpdir, local_backend):


@pytest.mark.distributed
def test_ray_gbm_output_not_supported(tmpdir, ray_backend):
with ray_start():
run_test_gbm_output_not_supported(tmpdir, ray_backend)
def test_ray_gbm_output_not_supported(tmpdir, ray_backend, ray_cluster_3cpu):
    # Ray variant of the local test; cluster lifecycle is owned by the
    # ray_cluster_3cpu fixture rather than a per-test context manager.
    run_test_gbm_output_not_supported(tmpdir, ray_backend)


def run_test_gbm_multiple_outputs(tmpdir, backend_config):
@@ -109,9 +93,8 @@ def test_local_gbm_multiple_outputs(tmpdir, local_backend):


@pytest.mark.distributed
def test_ray_gbm_multiple_outputs(tmpdir, ray_backend):
with ray_start():
run_test_gbm_multiple_outputs(tmpdir, ray_backend)
def test_ray_gbm_multiple_outputs(tmpdir, ray_backend, ray_cluster_3cpu):
    # Ray variant of the local test; cluster lifecycle is owned by the
    # ray_cluster_3cpu fixture rather than a per-test context manager.
    run_test_gbm_multiple_outputs(tmpdir, ray_backend)


def run_test_gbm_binary(tmpdir, backend_config):
@@ -154,9 +137,8 @@ def test_local_gbm_binary(tmpdir, local_backend):


@pytest.mark.distributed
def test_ray_gbm_binary(tmpdir, ray_backend):
with ray_start():
run_test_gbm_binary(tmpdir, ray_backend)
def test_ray_gbm_binary(tmpdir, ray_backend, ray_cluster_3cpu):
    # Ray variant of the local test; cluster lifecycle is owned by the
    # ray_cluster_3cpu fixture rather than a per-test context manager.
    run_test_gbm_binary(tmpdir, ray_backend)


def run_test_gbm_non_number_inputs(tmpdir, backend_config):
@@ -199,9 +181,8 @@ def test_local_gbm_non_number_inputs(tmpdir, local_backend):


@pytest.mark.distributed
def test_ray_gbm_non_number_inputs(tmpdir, ray_backend):
with ray_start():
run_test_gbm_non_number_inputs(tmpdir, ray_backend)
def test_ray_gbm_non_number_inputs(tmpdir, ray_backend, ray_cluster_3cpu):
    # Ray variant of the local test; cluster lifecycle is owned by the
    # ray_cluster_3cpu fixture rather than a per-test context manager.
    run_test_gbm_non_number_inputs(tmpdir, ray_backend)


def run_test_gbm_category(tmpdir, backend_config):
@@ -246,9 +227,8 @@ def test_local_gbm_category(tmpdir, local_backend):


@pytest.mark.distributed
def test_ray_gbm_category(tmpdir, ray_backend):
with ray_start():
run_test_gbm_category(tmpdir, ray_backend)
def test_ray_gbm_category(tmpdir, ray_backend, ray_cluster_3cpu):
    # Ray variant of the local test; cluster lifecycle is owned by the
    # ray_cluster_3cpu fixture rather than a per-test context manager.
    run_test_gbm_category(tmpdir, ray_backend)


def run_test_gbm_number(tmpdir, backend_config):
@@ -298,9 +278,8 @@ def test_local_gbm_number(tmpdir, local_backend):


@pytest.mark.distributed
def test_ray_gbm_number(tmpdir, ray_backend):
with ray_start():
run_test_gbm_number(tmpdir, ray_backend)
def test_ray_gbm_number(tmpdir, ray_backend, ray_cluster_3cpu):
    # Ray variant of the local test; cluster lifecycle is owned by the
    # ray_cluster_3cpu fixture rather than a per-test context manager.
    run_test_gbm_number(tmpdir, ray_backend)


def run_test_gbm_schema(backend_config):
@@ -328,6 +307,5 @@ def test_local_gbm_schema(local_backend):


@pytest.mark.distributed
def test_ray_gbm_schema(ray_backend):
with ray_start():
run_test_gbm_schema(ray_backend)
def test_ray_gbm_schema(ray_backend, ray_cluster_3cpu):
    # Ray variant of the local test; cluster lifecycle is owned by the
    # ray_cluster_3cpu fixture rather than a per-test context manager.
    run_test_gbm_schema(ray_backend)
Loading