Parallelizing pytest (#206)
1. Invokes pytest with a `-n auto` argument to make multiple tests run in parallel (num parallel jobs = num cores); a rough programmatic equivalent is sketched after the run-python-tests.cmd diff below.
2. Installs pytest-xdist in all our pipelines to make the above possible.
3. Shortens the duration of TestOptimizerEvaluator.
4. Modifies the tests that use gRPC to attempt to start the service on 100 different ports before giving up (see the sketch after this list). This is the easiest way to make sure that all tests requiring gRPC can run in parallel. The alternative would be to have them all talk to a single instance, but that would turn an easy parallelization problem into a hard one, as we'd need to manage the lifetime of that single instance.
5. Relaxes the check in TestSmartCacheWithRemoteOptimizerV3.py, as the current one was a bit too ambitious and led to some flakiness.
6. Puts a band-aid on test_optimization_with_context. #207 hints at a long-term fix.
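
To make item 4 concrete, here is a minimal sketch of the "scan ports until one binds" pattern that the test setup code below follows, so that parallel xdist workers don't collide on a single port. The helper name `start_grpc_service` and the `make_server` callable are hypothetical stand-ins for the `OptimizerMicroserviceServer` construction in the actual diffs:

```python
import grpc


def start_grpc_service(make_server, logger, base_port=50051, max_num_tries=100):
    """Try successive ports until a server starts; re-raise after max_num_tries failures.

    `make_server` is any callable that builds a (not yet started) gRPC server
    bound to the given port; the tests below use OptimizerMicroserviceServer for this.
    """
    for num_tries, port in enumerate(range(base_port, base_port + max_num_tries), start=1):
        try:
            server = make_server(port)
            server.start()
            # Create the client channel against whichever port we managed to grab.
            channel = grpc.insecure_channel(f'localhost:{port}')
            return server, channel
        except Exception:
            logger.info(f"Failed to start the gRPC service on port {port}.")
            if num_tries == max_num_tries:
                raise
```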

Co-authored-by: Adam Śmiechowski <adsmiech@microsoft.com>
Co-authored-by: Brian Kroth <bpkroth@users.noreply.github.com>
3 people authored Dec 11, 2020
1 parent 8c3e91e commit 0ef1a92
Showing 7 changed files with 115 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -469,7 +469,7 @@ jobs:
- name: Install pip dependencies
run: |
python -m pip install --upgrade pip
pip install pylint pytest
pip install pylint pytest pytest-xdist
pip install -r source/Mlos.Python/requirements.txt
- name: Run pylint checks (Windows)
timeout-minutes: 2
2 changes: 1 addition & 1 deletion Dockerfile
@@ -174,7 +174,7 @@ RUN /bin/bash /tmp/MLOS/scripts/install.python.sh && \

RUN python3.7 -m pip install pip && \
python3.7 -m pip install --upgrade pip && \
python3.7 -m pip install setuptools wheel pytest pylint
python3.7 -m pip install setuptools wheel pytest pylint pytest-xdist

COPY ./source/Mlos.Python/requirements.txt /tmp/
RUN python3.7 -m pip install -r /tmp/requirements.txt
2 changes: 1 addition & 1 deletion scripts/run-python-tests.cmd
@@ -9,7 +9,7 @@ pushd "%~dp0\.."

rem Note: Windows filesystems are case insensitive so the -p "[Tt]est*.py"
rem argument isn't strictly necessary, but we keep it for parity with Linux.
pytest -svx source\Mlos.Python
pytest -svx -n auto source\Mlos.Python

popd
@echo on
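
For reference — and as a sketch only, not part of this commit — a rough Python-level equivalent of the updated invocation, assuming pytest and pytest-xdist are installed as the pipeline changes above arrange:

```python
import sys

import pytest

# Run the suite with xdist spawning one worker per CPU core ("-n auto").
# "-svx" keeps the existing flags: no output capture, verbose, stop on first failure.
exit_code = pytest.main(["-svx", "-n", "auto", "source/Mlos.Python"])
sys.exit(exit_code)
```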
@@ -175,7 +175,7 @@ def test_named_configs(self):
objective_function_named_configs = objective_function_config_store.list_named_configs()
num_objective_function_configs = len(objective_function_named_configs)

num_tests = max(num_optimizer_configs, num_objective_function_configs)
num_tests = max(num_optimizer_configs, num_objective_function_configs, 10)

with traced(scope_name="parallel_tests"), concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count()) as executor:
outstanding_futures = set()
136 changes: 74 additions & 62 deletions source/Mlos.Python/mlos/Optimizers/unit_tests/TestBayesianOptimizer.py
@@ -47,18 +47,32 @@ def setup_class(cls):
global_values.tracer = Tracer(actor_id=cls.__name__, thread_id=0)
cls.logger = create_logger(logger_name=cls.__name__)

# Start up the gRPC service.
# Start up the gRPC service. Try a bunch of ports before giving up, so we can run several instances in parallel.

#
cls.server = OptimizerMicroserviceServer(port=50051, num_threads=10)
cls.server.start()
max_num_tries = 100
num_tries = 0
for port in range(50051, 50051 + max_num_tries):
num_tries += 1
try:
cls.server = OptimizerMicroserviceServer(port=port, num_threads=10)
cls.server.start()
cls.optimizer_service_channel = grpc.insecure_channel(f'localhost:{port}')
break
except:
cls.logger.info(f"Failed to create OptimizerMicroserviceServer on port {port}.")

if num_tries == max_num_tries:
raise

cls.optimizer_service_channel = grpc.insecure_channel('localhost:50051')
cls.bayesian_optimizer_factory = BayesianOptimizerFactory(grpc_channel=cls.optimizer_service_channel, logger=cls.logger)


@classmethod
def teardown_class(cls) -> None:
cls.server.stop(grace=None)
cls.server.stop(grace=None).wait(timeout=1)
cls.server.wait_for_termination(timeout=1)
cls.optimizer_service_channel.close()

cls.temp_dir = os.path.join(os.getcwd(), "temp")
if not os.path.exists(cls.temp_dir):
@@ -316,7 +330,8 @@ def test_hierarchical_quadratic_cold_start(self):
self.validate_optima(optimizer=bayesian_optimizer)

@trace()
def test_hierarchical_quadratic_cold_start_random_configs(self):
@pytest.mark.parametrize("restart_num", [i for i in range(10)])
def test_hierarchical_quadratic_cold_start_random_configs(self, restart_num):

objective_function_config = objective_function_config_store.get_config_by_name('three_level_quadratic')
objective_function = ObjectiveFunctionFactory.create_objective_function(objective_function_config=objective_function_config)
@@ -335,66 +350,64 @@ def test_hierarchical_quadratic_cold_start_random_configs(self):
)

random_state = random.Random()
num_restarts = 10
for restart_num in range(num_restarts):
# Let's set up random seeds so that we can easily repeat failed experiments
#
random_state.seed(restart_num)
bayesian_optimizer_config_store.parameter_space.random_state = random_state
objective_function.parameter_space.random_state = random_state

optimizer_config = bayesian_optimizer_config_store.parameter_space.random()

# We can make this test more useful as a Unit Test by restricting its duration.
#
optimizer_config.min_samples_required_for_guided_design_of_experiments = 20
if optimizer_config.surrogate_model_implementation == HomogeneousRandomForestRegressionModel.__name__:
random_forest_config = optimizer_config.homogeneous_random_forest_regression_model_config
random_forest_config.n_estimators = min(random_forest_config.n_estimators, 5)
decision_tree_config = random_forest_config.decision_tree_regression_model_config
decision_tree_config.min_samples_to_fit = 10
decision_tree_config.n_new_samples_before_refit = 10

if optimizer_config.experiment_designer_config.numeric_optimizer_implementation == GlowWormSwarmOptimizer.__name__:
optimizer_config.experiment_designer_config.glow_worm_swarm_optimizer_config.num_iterations = 5
# Let's set up random seeds so that we can easily repeat failed experiments
#
random_state.seed(restart_num)
bayesian_optimizer_config_store.parameter_space.random_state = random_state
objective_function.parameter_space.random_state = random_state

print(f"[Restart: {restart_num}/{num_restarts}] Creating a BayesianOptimimizer with the following config: ")
print(optimizer_config.to_json(indent=2))
optimizer_config = bayesian_optimizer_config_store.parameter_space.random()

local_optimizer = self.bayesian_optimizer_factory.create_local_optimizer(
optimization_problem=optimization_problem,
optimizer_config=optimizer_config
)

remote_optimizer = self.bayesian_optimizer_factory.create_remote_optimizer(
optimization_problem=optimization_problem,
optimizer_config=optimizer_config
)
# We can make this test more useful as a Unit Test by restricting its duration.
#
optimizer_config.min_samples_required_for_guided_design_of_experiments = 20
if optimizer_config.surrogate_model_implementation == HomogeneousRandomForestRegressionModel.__name__:
random_forest_config = optimizer_config.homogeneous_random_forest_regression_model_config
random_forest_config.n_estimators = min(random_forest_config.n_estimators, 5)
decision_tree_config = random_forest_config.decision_tree_regression_model_config
decision_tree_config.min_samples_to_fit = 10
decision_tree_config.n_new_samples_before_refit = 10

if optimizer_config.experiment_designer_config.numeric_optimizer_implementation == GlowWormSwarmOptimizer.__name__:
optimizer_config.experiment_designer_config.glow_worm_swarm_optimizer_config.num_iterations = 5

print(f"[Restart: {restart_num}] Creating a BayesianOptimimizer with the following config: ")
print(optimizer_config.to_json(indent=2))

for bayesian_optimizer in [local_optimizer, remote_optimizer]:
num_guided_samples = optimizer_config.min_samples_required_for_guided_design_of_experiments + 10
for i in range(num_guided_samples):
suggested_params = bayesian_optimizer.suggest()
y = objective_function.evaluate_point(suggested_params)
print(f"[Restart: {restart_num}/{num_restarts}][Sample: {i}/{num_guided_samples}] {suggested_params}, y: {y}")
local_optimizer = self.bayesian_optimizer_factory.create_local_optimizer(
optimization_problem=optimization_problem,
optimizer_config=optimizer_config
)

input_values_df = pd.DataFrame({
param_name: [param_value]
for param_name, param_value in suggested_params
})
target_values_df = y.to_dataframe()
bayesian_optimizer.register(parameter_values_pandas_frame=input_values_df,target_values_pandas_frame=target_values_df)
remote_optimizer = self.bayesian_optimizer_factory.create_remote_optimizer(
optimization_problem=optimization_problem,
optimizer_config=optimizer_config
)

best_config_point, best_objective = bayesian_optimizer.optimum(optimum_definition=OptimumDefinition.BEST_OBSERVATION)
print(f"[Restart: {restart_num}/{num_restarts}] Optimum config: {best_config_point}, optimum objective: {best_objective}")
self.validate_optima(optimizer=bayesian_optimizer)
for bayesian_optimizer in [local_optimizer, remote_optimizer]:
num_guided_samples = optimizer_config.min_samples_required_for_guided_design_of_experiments + 10
for i in range(num_guided_samples):
suggested_params = bayesian_optimizer.suggest()
y = objective_function.evaluate_point(suggested_params)
print(f"[Restart: {restart_num}][Sample: {i}/{num_guided_samples}] {suggested_params}, y: {y}")

input_values_df = pd.DataFrame({
param_name: [param_value]
for param_name, param_value in suggested_params
})
target_values_df = y.to_dataframe()
bayesian_optimizer.register(parameter_values_pandas_frame=input_values_df,target_values_pandas_frame=target_values_df)

best_config_point, best_objective = bayesian_optimizer.optimum(optimum_definition=OptimumDefinition.BEST_OBSERVATION)
print(f"[Restart: {restart_num}] Optimum config: {best_config_point}, optimum objective: {best_objective}")
self.validate_optima(optimizer=bayesian_optimizer)

# Test if pickling works
#
pickled_optimizer = pickle.dumps(local_optimizer)
unpickled_optimizer = pickle.loads(pickled_optimizer)
for _ in range(10):
assert unpickled_optimizer.suggest() == local_optimizer.suggest()
# Test if pickling works
#
pickled_optimizer = pickle.dumps(local_optimizer)
unpickled_optimizer = pickle.loads(pickled_optimizer)
for _ in range(10):
assert unpickled_optimizer.suggest() == local_optimizer.suggest()

@trace()
def test_bayesian_optimizer_default_copies_parameters(self):
@@ -488,7 +501,6 @@ def test_registering_multiple_objectives(self):
with pytest.raises(ValueError):
optimizer.register(input_df, only_invalid_outputs_df)


def test_optimization_with_context(self):
# Gaussian blob in x with position dependent on context variable y.
def f(parameters, context):
@@ -512,7 +524,7 @@ def f(parameters, context):
)

# create some data points to eval
n_samples = 100
n_samples = 5000
parameter_df = input_space.random_dataframe(n_samples)
context_df = context_space.random_dataframe(n_samples)

@@ -35,12 +35,24 @@ def setup_class(cls):

def setup_method(self, method):
self.logger = create_logger(self.__class__.__name__)
# Start up the gRPC service.

# Start up the gRPC service. Try a bunch of times before giving up.
#
self.server = OptimizerMicroserviceServer(port=50051, num_threads=10)
self.server.start()
max_num_tries = 100
num_tries = 0
for port in range(50051, 50051 + max_num_tries):
num_tries += 1
try:
self.server = OptimizerMicroserviceServer(port=port, num_threads=10)
self.server.start()
self.optimizer_service_channel = grpc.insecure_channel(f'localhost:{port}')
break
except:
self.logger.info(f"Failed to create OptimizerMicroserviceServer on port {port}")
if num_tries == max_num_tries:
raise


self.optimizer_service_channel = grpc.insecure_channel('localhost:50051')
self.bayesian_optimizer_factory = BayesianOptimizerFactory(grpc_channel=self.optimizer_service_channel, logger=self.logger)
self.optimizer_monitor = OptimizerMonitor(grpc_channel=self.optimizer_service_channel, logger=self.logger)

@@ -36,19 +36,29 @@ def setup_method(self, method):
self.logger = create_logger('TestSmartCacheWithRemoteOptimizer')
self.logger.level = logging.DEBUG

# Start up the gRPC service.
# Start up the gRPC service. Try a bunch of times before giving up.
#
self.server = OptimizerMicroserviceServer(port=50051, num_threads=10)
self.server.start()

self.optimizer_service_grpc_channel = grpc.insecure_channel('localhost:50051')
self.bayesian_optimizer_factory = BayesianOptimizerFactory(grpc_channel=self.optimizer_service_grpc_channel, logger=self.logger)
max_num_tries = 100
num_tries = 0
for port in range(50051, 50051 + max_num_tries):
num_tries += 1
try:
self.server = OptimizerMicroserviceServer(port=port, num_threads=10)
self.server.start()
self.optimizer_service_channel = grpc.insecure_channel(f'localhost:{port}')
break
except:
self.logger.info(f"Failed to create OptimizerMicroserviceServer on port {port}")
if num_tries == max_num_tries:
raise

self.bayesian_optimizer_factory = BayesianOptimizerFactory(grpc_channel=self.optimizer_service_channel, logger=self.logger)

self.mlos_agent = MlosAgent(
logger=self.logger,
communication_channel=mlos_globals.mlos_global_context.communication_channel,
shared_config=mlos_globals.mlos_global_context.shared_config,
bayesian_optimizer_grpc_channel=self.optimizer_service_grpc_channel
bayesian_optimizer_grpc_channel=self.optimizer_service_channel
)

self.mlos_agent_thread = Thread(target=self.mlos_agent.run)
@@ -91,7 +101,9 @@ def setup_method(self, method):
def teardown_method(self, method):
mlos_globals.mlos_global_context.stop_clock()
self.mlos_agent.stop_all()
self.server.stop(grace=None)
self.server.stop(grace=None).wait(timeout=1)
self.server.wait_for_termination(timeout=1)
self.optimizer_service_channel.close()


def test_smart_cache_with_remote_optimizer_on_a_timer(self):
Expand Down Expand Up @@ -136,7 +148,7 @@ def test_smart_cache_with_remote_optimizer_on_a_timer(self):

# The model might not have used all of the samples, but it should have used a majority of them (I expect about 90%); 50% is a good sanity check
# and should make this test not very flaky.
assert random_forest_gof_metrics.last_refit_iteration_number > 0.7 * num_iterations
assert random_forest_gof_metrics.last_refit_iteration_number > 0.5 * num_iterations

# The invariants below should be true for all surrogate models: the random forest, and all constituent decision trees. So let's iterate over them all.
models_gof_metrics = [random_forest_gof_metrics]
