use new cluster for each dask test
shiyu1994 committed Sep 30, 2022
1 parent 982f07e commit 7ff172d
Showing 1 changed file with 53 additions and 56 deletions.
109 changes: 53 additions & 56 deletions tests/python_package_test/test_dask.py
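
In short, the diff replaces the module-scoped `cluster` and `cluster2` pytest fixtures with a plain `cluster()` factory plus a small `Client` subclass that closes its cluster on exit, so each test runs against a fresh, throwaway cluster. A minimal sketch of the pattern, assuming only `distributed` is installed (the test name and body are hypothetical stand-ins, not part of the diff):

from distributed import Client, LocalCluster


def cluster():
    # A fresh two-worker cluster per call, as in the diff below.
    return LocalCluster(n_workers=2, threads_per_worker=2, dashboard_address=None)


class ClientWrapper(Client):
    # On context exit, close the backing cluster as well as the client,
    # so every test tears down the cluster it created.
    def __exit__(self, exc_type, exc_value, traceback):
        super().__exit__(exc_type, exc_value, traceback)
        self.cluster.close()


def test_something():
    with ClientWrapper(cluster()) as client:
        ...  # the test body runs against its own short-lived cluster

This trades the speed of a shared module-scoped cluster for isolation: a test that hangs or leaks state can no longer poison the cluster for every test after it.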
@@ -61,18 +61,15 @@
]


-@pytest.fixture(scope='module')
def cluster():
    dask_cluster = LocalCluster(n_workers=2, threads_per_worker=2, dashboard_address=None)
-    yield dask_cluster
-    dask_cluster.close()
+    return dask_cluster


-@pytest.fixture(scope='module')
-def cluster2():
-    dask_cluster = LocalCluster(n_workers=2, threads_per_worker=2, dashboard_address=None)
-    yield dask_cluster
-    dask_cluster.close()
+class ClientWrapper(Client):
+    def __exit__(self, exc_type, exc_value, traceback):
+        super().__exit__(exc_type, exc_value, traceback)
+        self.cluster.close()


@pytest.fixture()
@@ -249,8 +246,8 @@ def _objective_logistic_regression(y_true, y_pred):
@pytest.mark.parametrize('task', ['binary-classification', 'multiclass-classification'])
@pytest.mark.parametrize('boosting_type', boosting_types)
@pytest.mark.parametrize('tree_learner', distributed_training_algorithms)
-def test_classifier(output, task, boosting_type, tree_learner, cluster):
-    with Client(cluster) as client:
+def test_classifier(output, task, boosting_type, tree_learner):
+    with ClientWrapper(cluster()) as client:
        X, y, w, _, dX, dy, dw, _ = _create_data(
            objective=task,
            output=output
@@ -344,8 +341,8 @@ def test_classifier(output, task, boosting_type, tree_learner, cluster):

@pytest.mark.parametrize('output', data_output + ['scipy_csc_matrix'])
@pytest.mark.parametrize('task', ['binary-classification', 'multiclass-classification'])
-def test_classifier_pred_contrib(output, task, cluster):
-    with Client(cluster) as client:
+def test_classifier_pred_contrib(output, task):
+    with ClientWrapper(cluster()) as client:
        X, y, w, _, dX, dy, dw, _ = _create_data(
            objective=task,
            output=output
@@ -440,8 +437,8 @@ def test_classifier_pred_contrib(output, task, cluster):

@pytest.mark.parametrize('output', data_output)
@pytest.mark.parametrize('task', ['binary-classification', 'multiclass-classification'])
-def test_classifier_custom_objective(output, task, cluster):
-    with Client(cluster) as client:
+def test_classifier_custom_objective(output, task):
+    with ClientWrapper(cluster()) as client:
        X, y, w, _, dX, dy, dw, _ = _create_data(
            objective=task,
            output=output,
@@ -539,7 +536,7 @@ def test_machines_to_worker_map_unparseable_host_names():


def test_assign_open_ports_to_workers(cluster):
-    with Client(cluster) as client:
+    with ClientWrapper(cluster()) as client:
        workers = client.scheduler_info()['workers'].keys()
        n_workers = len(workers)
        host_to_workers = lgb.dask._group_workers_by_host(workers)


def test_training_does_not_fail_on_port_conflicts(cluster):
-    with Client(cluster) as client:
+    with ClientWrapper(cluster()) as client:
        _, _, _, _, dX, dy, dw, _ = _create_data('binary-classification', output='array')

        lightgbm_default_port = 12400
@pytest.mark.parametrize('output', data_output)
@pytest.mark.parametrize('boosting_type', boosting_types)
@pytest.mark.parametrize('tree_learner', distributed_training_algorithms)
-def test_regressor(output, boosting_type, tree_learner, cluster):
-    with Client(cluster) as client:
+def test_regressor(output, boosting_type, tree_learner):
+    with ClientWrapper(cluster()) as client:
        X, y, w, _, dX, dy, dw, _ = _create_data(
            objective='regression',
            output=output
@@ -661,8 +658,8 @@ def test_regressor(output, boosting_type, tree_learner, cluster):


@pytest.mark.parametrize('output', data_output)
-def test_regressor_pred_contrib(output, cluster):
-    with Client(cluster) as client:
+def test_regressor_pred_contrib(output):
+    with ClientWrapper(cluster()) as client:
        X, y, w, _, dX, dy, dw, _ = _create_data(
            objective='regression',
            output=output
@@ -710,8 +707,8 @@ def test_regressor_pred_contrib(output, cluster):

@pytest.mark.parametrize('output', data_output)
@pytest.mark.parametrize('alpha', [.1, .5, .9])
-def test_regressor_quantile(output, alpha, cluster):
-    with Client(cluster) as client:
+def test_regressor_quantile(output, alpha):
+    with ClientWrapper(cluster()) as client:
        X, y, w, _, dX, dy, dw, _ = _create_data(
            objective='regression',
            output=output
@@ -757,8 +754,8 @@ def test_regressor_quantile(output, alpha, cluster):


@pytest.mark.parametrize('output', data_output)
-def test_regressor_custom_objective(output, cluster):
-    with Client(cluster) as client:
+def test_regressor_custom_objective(output):
+    with ClientWrapper(cluster()) as client:
        X, y, w, _, dX, dy, dw, _ = _create_data(
            objective='regression',
            output=output
@@ -810,8 +807,8 @@ def test_regressor_custom_objective(output, cluster):
@pytest.mark.parametrize('group', [None, group_sizes])
@pytest.mark.parametrize('boosting_type', boosting_types)
@pytest.mark.parametrize('tree_learner', distributed_training_algorithms)
-def test_ranker(output, group, boosting_type, tree_learner, cluster):
-    with Client(cluster) as client:
+def test_ranker(output, group, boosting_type, tree_learner):
+    with ClientWrapper(cluster()) as client:
        if output == 'dataframe-with-categorical':
            X, y, w, g, dX, dy, dw, dg = _create_data(
                objective='ranking',
@@ -915,8 +912,8 @@ def test_ranker(output, group, boosting_type, tree_learner, cluster):


@pytest.mark.parametrize('output', ['array', 'dataframe', 'dataframe-with-categorical'])
-def test_ranker_custom_objective(output, cluster):
-    with Client(cluster) as client:
+def test_ranker_custom_objective(output):
+    with ClientWrapper(cluster()) as client:
        if output == 'dataframe-with-categorical':
            X, y, w, g, dX, dy, dw, dg = _create_data(
                objective='ranking',
@@ -979,11 +976,11 @@ def test_ranker_custom_objective(output, cluster):
@pytest.mark.parametrize('output', data_output)
@pytest.mark.parametrize('eval_sizes', [[0.5, 1, 1.5], [0]])
@pytest.mark.parametrize('eval_names_prefix', ['specified', None])
-def test_eval_set_no_early_stopping(task, output, eval_sizes, eval_names_prefix, cluster):
+def test_eval_set_no_early_stopping(task, output, eval_sizes, eval_names_prefix):
    if task == 'ranking' and output == 'scipy_csr_matrix':
        pytest.skip('LGBMRanker is not currently tested on sparse matrices')

-    with Client(cluster) as client:
+    with ClientWrapper(cluster()) as client:
        # Use larger trainset to prevent premature stopping due to zero loss, causing num_trees() < n_estimators.
        # Use small chunk_size to avoid single-worker allocation of eval data partitions.
        n_samples = 1000
@@ -1128,8 +1125,8 @@ def test_eval_set_no_early_stopping(task, output, eval_sizes, eval_names_prefix, cluster):


@pytest.mark.parametrize('task', ['binary-classification', 'regression', 'ranking'])
-def test_eval_set_with_custom_eval_metric(task, cluster):
-    with Client(cluster) as client:
+def test_eval_set_with_custom_eval_metric(task):
+    with ClientWrapper(cluster()) as client:
        n_samples = 1000
        n_eval_samples = int(n_samples * 0.5)
        chunk_size = 10
@@ -1200,8 +1197,8 @@ def test_eval_set_with_custom_eval_metric(task, cluster):


@pytest.mark.parametrize('task', tasks)
-def test_training_works_if_client_not_provided_or_set_after_construction(task, cluster):
-    with Client(cluster) as client:
+def test_training_works_if_client_not_provided_or_set_after_construction(task):
+    with ClientWrapper(cluster()) as client:
        _, _, _, _, dX, dy, _, dg = _create_data(
            objective=task,
            output='array',
@@ -1265,17 +1262,17 @@ def test_training_works_if_client_not_provided_or_set_after_construction(task, cluster):
@pytest.mark.parametrize('serializer', ['pickle', 'joblib', 'cloudpickle'])
@pytest.mark.parametrize('task', tasks)
@pytest.mark.parametrize('set_client', [True, False])
-def test_model_and_local_version_are_picklable_whether_or_not_client_set_explicitly(serializer, task, set_client, tmp_path, cluster, cluster2):
+def test_model_and_local_version_are_picklable_whether_or_not_client_set_explicitly(serializer, task, set_client, tmp_path):

-    with Client(cluster) as client1:
+    with ClientWrapper(cluster()) as client1:
        # data on cluster1
        X_1, _, _, _, dX_1, dy_1, _, dg_1 = _create_data(
            objective=task,
            output='array',
            group=None
        )

-        with Client(cluster2) as client2:
+        with ClientWrapper(cluster()) as client2:
            # create identical data on cluster2
            X_2, _, _, _, dX_2, dy_2, _, dg_2 = _create_data(
                objective=task,
@@ -1430,7 +1427,7 @@ def test_model_and_local_version_are_picklable_whether_or_not_client_set_explicitly


def test_warns_and_continues_on_unrecognized_tree_learner(cluster):
-    with Client(cluster) as client:
+    with ClientWrapper(cluster()) as client:
        X = da.random.random((1e3, 10))
        y = da.random.random((1e3, 1))
        dask_regressor = lgb.DaskLGBMRegressor(


@pytest.mark.parametrize('tree_learner', ['data_parallel', 'voting_parallel'])
-def test_training_respects_tree_learner_aliases(tree_learner, cluster):
-    with Client(cluster) as client:
+def test_training_respects_tree_learner_aliases(tree_learner):
+    with ClientWrapper(cluster()) as client:
        task = 'regression'
        _, _, _, _, dX, dy, dw, dg = _create_data(objective=task, output='array')
        dask_factory = task_to_dask_factory[task]


def test_error_on_feature_parallel_tree_learner(cluster):
-    with Client(cluster) as client:
+    with ClientWrapper(cluster()) as client:
        X = da.random.random((100, 10), chunks=(50, 10))
        y = da.random.random(100, chunks=50)
        X, y = client.persist([X, y])


def test_errors(cluster):
-    with Client(cluster) as client:
+    with ClientWrapper(cluster()) as client:
        def f(part):
            raise Exception('foo')


@pytest.mark.parametrize('task', tasks)
@pytest.mark.parametrize('output', data_output)
-def test_training_succeeds_even_if_some_workers_do_not_have_any_data(task, output, cluster):
+def test_training_succeeds_even_if_some_workers_do_not_have_any_data(task, output):
    if task == 'ranking' and output == 'scipy_csr_matrix':
        pytest.skip('LGBMRanker is not currently tested on sparse matrices')

-    with Client(cluster) as client:
+    with ClientWrapper(cluster()) as client:
        def collection_to_single_partition(collection):
            """Merge the parts of a Dask collection into a single partition."""
            if collection is None:
@@ -1555,8 +1552,8 @@ def collection_to_single_partition(collection):


@pytest.mark.parametrize('task', tasks)
-def test_network_params_not_required_but_respected_if_given(task, listen_port, cluster):
-    with Client(cluster) as client:
+def test_network_params_not_required_but_respected_if_given(task, listen_port):
+    with ClientWrapper(cluster()) as client:
        _, _, _, _, dX, dy, _, dg = _create_data(
            objective=task,
            output='array',
@@ -1613,8 +1610,8 @@ def test_network_params_not_required_but_respected_if_given(task, listen_port, cluster):


@pytest.mark.parametrize('task', tasks)
-def test_machines_should_be_used_if_provided(task, cluster):
-    with Client(cluster) as client:
+def test_machines_should_be_used_if_provided(task):
+    with ClientWrapper(cluster()) as client:
        _, _, _, _, dX, dy, _, dg = _create_data(
            objective=task,
            output='array',
@@ -1715,8 +1712,8 @@ def test_dask_methods_and_sklearn_equivalents_have_similar_signatures(methods):


@pytest.mark.parametrize('task', tasks)
-def test_training_succeeds_when_data_is_dataframe_and_label_is_column_array(task, cluster):
-    with Client(cluster) as client:
+def test_training_succeeds_when_data_is_dataframe_and_label_is_column_array(task):
+    with ClientWrapper(cluster()) as client:
        _, _, _, _, dX, dy, dw, dg = _create_data(
            objective=task,
            output='dataframe',

@pytest.mark.parametrize('task', tasks)
@pytest.mark.parametrize('output', data_output)
-def test_init_score(task, output, cluster):
+def test_init_score(task, output):
    if task == 'ranking' and output == 'scipy_csr_matrix':
        pytest.skip('LGBMRanker is not currently tested on sparse matrices')

-    with Client(cluster) as client:
+    with ClientWrapper(cluster()) as client:
        _, _, _, _, dX, dy, dw, dg = _create_data(
            objective=task,
            output=output,
@@ -1794,8 +1791,8 @@ def _tested_estimators():

@pytest.mark.parametrize("estimator", _tested_estimators())
@pytest.mark.parametrize("check", sklearn_checks_to_run())
-def test_sklearn_integration(estimator, check, cluster):
-    with Client(cluster) as client:
+def test_sklearn_integration(estimator, check):
+    with ClientWrapper(cluster()) as client:
        estimator.set_params(local_listen_port=18000, time_out=5)
        name = type(estimator).__name__
        check(name, estimator)

@pytest.mark.parametrize('task', tasks)
@pytest.mark.parametrize('output', data_output)
-def test_predict_with_raw_score(task, output, cluster):
+def test_predict_with_raw_score(task, output):
    if task == 'ranking' and output == 'scipy_csr_matrix':
        pytest.skip('LGBMRanker is not currently tested on sparse matrices')

-    with Client(cluster) as client:
+    with ClientWrapper(cluster()) as client:
        _, _, _, _, dX, dy, _, dg = _create_data(
            objective=task,
            output=output,
