Skip to content

Commit

Permalink
Reorganization of evaluator code and renaming of modules (#320)
Browse files Browse the repository at this point in the history
Reorganizing evaluator code and renaming of modules

In preparation for future updates to the mpi code, all evaluator code has been reorganized. All evaluators go into their respective modules, and code shared among two or more goes into a new util package. All relevant modules also have been renamed to future_{style of parallelization}
  • Loading branch information
quaquel authored Dec 8, 2023
1 parent 2ae0278 commit 2b1ad48
Show file tree
Hide file tree
Showing 20 changed files with 479 additions and 428 deletions.
1 change: 1 addition & 0 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ nbsphinx
myst
pyscaffold
myst-parser
ipyparallel

# General requirements
numpy
Expand Down
6 changes: 4 additions & 2 deletions docs/source/api_index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ Exploratory modeling framework
../../ema_documentation/em_framework/experiment_runner.rst
../../ema_documentation/em_framework/callbacks.rst
../../ema_documentation/em_framework/points.rst
../../ema_documentation/em_framework/ema_multiprocessing.rst
../../ema_documentation/em_framework/ema_ipyparallel.rst
../../ema_documentation/em_framework/futures_multiprocessing.rst
../../ema_documentation/em_framework/futures_ipyparallel.rst
../../ema_documentation/em_framework/futures_mpi.rst
../../ema_documentation/em_framework/futures_util.rst
../../ema_documentation/em_framework/util.rst

Connectors
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
**************************
:mod:`futures_ipyparallel`
**************************

.. automodule:: ema_workbench.em_framework.futures_ipyparallel
:members:
6 changes: 6 additions & 0 deletions docs/source/ema_documentation/em_framework/futures_mpi.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
******************
:mod:`futures_mpi`
******************

.. automodule:: ema_workbench.em_framework.futures_mpi
:members:
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
******************************
:mod:`futures_multiprocessing`
******************************

.. automodule:: ema_workbench.em_framework.futures_multiprocessing
:members:
6 changes: 6 additions & 0 deletions docs/source/ema_documentation/em_framework/futures_util.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*******************
:mod:`futures_util`
*******************

.. automodule:: ema_workbench.em_framework.futures_util
:members:
12 changes: 6 additions & 6 deletions ema_workbench/em_framework/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import warnings

__all__ = [
"ema_parallel",
"parameters",
"model",
"outcomes",
Expand Down Expand Up @@ -77,8 +76,6 @@
from .evaluators import (
perform_experiments,
optimize,
MPIEvaluator,
MultiprocessingEvaluator,
SequentialEvaluator,
Samplers,
)
Expand All @@ -96,12 +93,15 @@
to_problem,
to_robust_problem,
)
from .outputspace_exploration import OutputSpaceExploration

try:
from .evaluators import IpyparallelEvaluator
from .futures_ipyparallel import IpyparallelEvaluator
except ImportError:
warnings.warn("ipyparallel not installed - IpyparalleEvaluator not available")
IpyparallelEvaluator = None
warnings.warn("ipyparallel not available", ImportWarning)

from .futures_multiprocessing import MultiprocessingEvaluator
from .futures_mpi import MPIEvaluator
from .outputspace_exploration import OutputSpaceExploration

del warnings
235 changes: 0 additions & 235 deletions ema_workbench/em_framework/evaluators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,12 @@
"""
import enum
import multiprocessing
import numbers
import os
import random
import shutil
import string
import sys
import threading
import warnings
import logging

from ema_workbench.em_framework.samplers import AbstractSampler
from .callbacks import DefaultCallback
from .points import experiment_generator, Scenario, Policy
from .ema_multiprocessing import LogQueueReader, initializer, add_tasks
from .experiment_runner import ExperimentRunner
from .model import AbstractModel
from .optimization import (
Expand All @@ -45,26 +36,12 @@
from .util import NamedObjectMap, determine_objects
from ..util import EMAError, get_module_logger, ema_logging

warnings.simplefilter("once", ImportWarning)

try:
from .ema_ipyparallel import (
start_logwatcher,
set_engine_logger,
initialize_engines,
cleanup,
_run_experiment,
)
except (ImportError, ModuleNotFoundError):
warnings.warn("ipyparallel not installed - IpyparalleEvaluator not available")

# Created on 5 Mar 2017
#
# .. codeauthor::jhkwakkel <j.h.kwakkel (at) tudelft (dot) nl>

__all__ = [
"MultiprocessingEvaluator",
"IpyparallelEvaluator",
"optimize",
"perform_experiments",
"SequentialEvaluator",
Expand Down Expand Up @@ -314,218 +291,6 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial
os.chdir(cwd)


class MultiprocessingEvaluator(BaseEvaluator):
    """Evaluator for experiments using a multiprocessing pool.

    Parameters
    ----------
    msis : collection of models
    n_processes : int, optional
        number of worker processes to use. A negative number is
        interpreted as the number of logical cores minus that number
        (e.g., on a 12 thread processor, -2 results in using 10 threads).
        If None, all available cores are used.
    maxtasksperchild : int, optional
        passed on to multiprocessing.Pool; the number of tasks a worker
        process may complete before being replaced by a fresh process.

    Notes
    -----
    The maximum number of available processes is multiprocessing.cpu_count();
    on Windows this can never be higher than 61.

    """

    def __init__(self, msis, n_processes=None, maxtasksperchild=None, **kwargs):
        super().__init__(msis, **kwargs)

        self._pool = None

        # determine the upper bound on the number of worker processes
        max_processes = multiprocessing.cpu_count()
        if sys.platform == "win32":
            # on windows the max number of processes is currently
            # still limited to 61
            max_processes = min(max_processes, 61)

        if isinstance(n_processes, int):
            if n_processes > 0:
                if max_processes < n_processes:
                    # fix: message said "more then"
                    warnings.warn(
                        f"The number of processes cannot be more than {max_processes}", UserWarning
                    )
                self.n_processes = min(n_processes, max_processes)
            else:
                # negative (or zero) n_processes: use cpu_count minus
                # |n_processes|, but always keep at least one worker
                self.n_processes = max(max_processes + n_processes, 1)
        elif n_processes is None:
            self.n_processes = max_processes
        else:
            # fix: error message referred to "max_processes" although the
            # offending argument is n_processes
            raise ValueError(f"n_processes must be an integer or None, not {type(n_processes)}")

        self.maxtasksperchild = maxtasksperchild

    def initialize(self):
        """Start the pool and its supporting infrastructure.

        Sets up a queue-based log reader so that log records emitted in the
        worker processes reach the main process's logger, and creates a
        randomly named temporary root directory when every model has a
        working directory.
        """
        log_queue = multiprocessing.Queue()

        log_queue_reader = LogQueueReader(log_queue)
        log_queue_reader.start()

        try:
            loglevel = ema_logging._rootlogger.getEffectiveLevel()
        except AttributeError:
            loglevel = 30  # fall back to logging.WARNING

        # check if we need a working directory
        for model in self._msis:
            try:
                model.working_directory
            except AttributeError:
                self.root_dir = None
                break
        else:
            # all models have a working directory, so create a temporary
            # root directory for the per-worker copies
            random_part = [random.choice(string.ascii_letters + string.digits) for _ in range(5)]
            random_part = "".join(random_part)
            self.root_dir = os.path.abspath("tmp" + random_part)
            os.makedirs(self.root_dir)

        self._pool = multiprocessing.Pool(
            self.n_processes,
            initializer,
            (self._msis, log_queue, loglevel, self.root_dir),
            self.maxtasksperchild,
        )
        # the pool may have started fewer workers than requested
        self.n_processes = self._pool._processes
        _logger.info(f"pool started with {self.n_processes} workers")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        _logger.info("terminating pool")

        if exc_type is not None:
            # When an exception is thrown stop accepting new jobs
            # and abort pending jobs without waiting.
            self._pool.terminate()
            return False

        super().__exit__(exc_type, exc_value, traceback)

    def finalize(self):
        # Stop accepting new jobs and wait for pending jobs to finish.
        self._pool.close()
        self._pool.join()

        if self.root_dir:
            shutil.rmtree(self.root_dir)

    def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
        """Run all experiments on the pool; results are reported via callback."""
        ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)
        add_tasks(self.n_processes, self._pool, ex_gen, callback)


class MPIEvaluator(BaseEvaluator):
    """Evaluator for experiments using MPI Pool Executor from mpi4py"""

    def __init__(self, msis, n_processes=None, **kwargs):
        super().__init__(msis, **kwargs)
        warnings.warn(
            "The MPIEvaluator is experimental. Its interface and functionality might change in future releases.\n"
            "We welcome your feedback at: https://github.com/quaquel/EMAworkbench/discussions/311",
            FutureWarning,
        )
        self.n_processes = n_processes
        self._pool = None

    def initialize(self):
        """Create the MPI pool and report its size."""
        # Only import mpi4py if the MPIEvaluator is used, to avoid unnecessary dependencies.
        from mpi4py.futures import MPIPoolExecutor

        self._pool = MPIPoolExecutor(max_workers=self.n_processes)  # Removed initializer arguments
        n_workers = self._pool._max_workers
        _logger.info(f"MPI pool started with {n_workers} workers")

        # with a handful of workers the MPI overhead may outweigh the gain
        if n_workers <= 10:
            _logger.warning(
                f"With only a few workers ({n_workers}), the MPIEvaluator may be slower than the Sequential- or MultiprocessingEvaluator"
            )
        return self

    def finalize(self):
        self._pool.shutdown()
        _logger.info("MPI pool has been shut down")

    def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
        """Fan the experiments out over the MPI pool and feed results to callback."""
        ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)

        # each task bundles the experiment with the models so that a worker
        # can run it without any other shared state
        packed = [(experiment, experiment.model_name, self._msis) for experiment in ex_gen]

        _logger.info(
            f"MPIEvaluator: Starting {len(packed)} experiments using MPI pool with {self._pool._max_workers} workers"
        )
        results = self._pool.map(run_experiment_mpi, packed)
        _logger.info(f"MPIEvaluator: Completed all {len(packed)} experiments")

        for entry in results:
            callback(*entry)
        _logger.info(f"MPIEvaluator: Callback completed for all {len(packed)} experiments")


def run_experiment_mpi(packed_data):
    """Run a single experiment on an MPI worker.

    Parameters
    ----------
    packed_data : tuple of (experiment, model_name, msis)
        the experiment to run together with the collection of models

    Returns
    -------
    tuple of (experiment, outcomes)

    """
    from mpi4py.MPI import COMM_WORLD

    experiment, model_name, msis = packed_data

    rank = COMM_WORLD.Get_rank()
    _logger.debug(f"MPI Rank {rank}: starting {repr(experiment)}")

    # build a fresh runner on the worker from the transferred models
    model_map = NamedObjectMap(AbstractModel)
    model_map.extend(msis)
    runner = ExperimentRunner(model_map)

    outcomes = runner.run_experiment(experiment)

    _logger.debug(f"MPI Rank {rank}: completed {experiment}")

    return experiment, outcomes


class IpyparallelEvaluator(BaseEvaluator):
    """Evaluator for experiments using an ipyparallel pool.

    Parameters
    ----------
    msis : collection of models
    client : ipyparallel Client instance
        connected to the cluster on which the experiments are to be run

    """

    def __init__(self, msis, client, **kwargs):
        super().__init__(msis, **kwargs)
        self.client = client

    def initialize(self):
        import ipyparallel

        _logger.debug("starting ipyparallel pool")

        # some platforms lack threading.TIMEOUT_MAX; fall back to a large number
        try:
            TIMEOUT_MAX = threading.TIMEOUT_MAX
        except AttributeError:
            TIMEOUT_MAX = 1e10  # noqa
        # NOTE(review): monkeypatches an ipyparallel internal; presumably works
        # around problems with its very large default wait timeout — confirm
        ipyparallel.client.asyncresult._FOREVER = TIMEOUT_MAX
        # update loggers on all engines
        self.client[:].apply_sync(set_engine_logger)

        _logger.debug("initializing engines")
        initialize_engines(self.client, self._msis, os.getcwd())

        # logwatcher collects log messages sent back by the engines
        self.logwatcher, self.logwatcher_thread = start_logwatcher()

        _logger.debug("successfully started ipyparallel pool")
        _logger.info("performing experiments using ipyparallel")

        return self

    def finalize(self):
        # stop the logwatcher before cleaning up the state pushed to the engines
        self.logwatcher.stop()
        cleanup(self.client)

    def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
        # ordered=False so results are handled as soon as any engine finishes
        ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)

        lb_view = self.client.load_balanced_view()
        results = lb_view.map(_run_experiment, ex_gen, ordered=False, block=False)

        for entry in results:
            callback(*entry)


def perform_experiments(
models,
scenarios=0,
Expand Down
Loading

0 comments on commit 2b1ad48

Please sign in to comment.