diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 0f78ea97c..0c613b8b2 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -14,7 +14,7 @@ from .util import NamedObjectMap from .model import AbstractModel from .experiment_runner import ExperimentRunner -from ..util import get_module_logger, get_rootlogger +from ..util import get_module_logger, get_rootlogger, method_logger from ..util import ema_logging @@ -71,7 +71,7 @@ def mpi_initializer(models, log_level, root_dir): root_logger.info(f"worker {rank} initialized") -def logwatcher(stop_event): +def logwatcher(start_event, stop_event): from mpi4py import MPI rank = MPI.COMM_WORLD.Get_rank() @@ -84,6 +84,7 @@ def logwatcher(stop_event): service = "logwatcher" MPI.Publish_name(service, info, port) _logger.debug(f"published service: {service}") + start_event.set() root = 0 _logger.debug("waiting for client connection...") @@ -96,12 +97,18 @@ def logwatcher(stop_event): try: logger = logging.getLogger(record.name) except Exception as e: - # AttributeError if record does not have a name attribute - # TypeError record.name is not a string - raise e + if record.msg is None: + _logger.debug("received sentinel") + break + else: + # AttributeError if record does not have a name attribute + # TypeError record.name is not a string + raise e else: logger.callHandlers(record) + _logger.info("closing logwatcher") + def run_experiment_mpi(experiment): _logger.debug(f"starting {experiment.experiment_id}") @@ -113,6 +120,15 @@ def run_experiment_mpi(experiment): return experiment, outcomes +def send_sentinel(): + record = logging.makeLogRecord(dict(level=logging.CRITICAL, msg=None, name=42)) + + for handler in get_rootlogger().handlers: + if isinstance(handler, MPIHandler): + _logger.debug("sending sentinel") + handler.communicator.send(record, 0, 0) + + class MPIHandler(QueueHandler): """ This handler sends events from the 
worker process to the master process @@ -154,24 +170,33 @@ def __init__(self, msis, n_processes=None, **kwargs): self.stop_event = None self.n_processes = n_processes + @method_logger(__name__) def initialize(self): # Only import mpi4py if the MPIEvaluator is used, to avoid unnecessary dependencies. from mpi4py.futures import MPIPoolExecutor + start_event = threading.Event() self.stop_event = threading.Event() self.logwatcher_thread = threading.Thread( - name="logwatcher", target=logwatcher, daemon=True, args=(self.stop_event,) + name="logwatcher", + target=logwatcher, + daemon=False, + args=( + start_event, + self.stop_event, + ), ) self.logwatcher_thread.start() + start_event.wait() + _logger.info("logwatcher server started") self.root_dir = determine_rootdir(self._msis) self._pool = MPIPoolExecutor( max_workers=self.n_processes, initializer=mpi_initializer, initargs=(self._msis, _logger.level, self.root_dir), - ) # Removed initializer arguments + ) - self._pool = MPIPoolExecutor(max_workers=self.n_processes) # Removed initializer arguments _logger.info(f"MPI pool started with {self._pool._max_workers} workers") if self._pool._max_workers <= 10: _logger.warning( @@ -179,10 +204,16 @@ def initialize(self): ) return self + @method_logger(__name__) def finalize(self): - self._pool.shutdown() + # submit sentinel self.stop_event.set() - _logger.info("MPI pool has been shut down") + self._pool.submit(send_sentinel) + self._pool.shutdown() + self.logwatcher_thread.join(timeout=60) + + if self.logwatcher_thread.is_alive(): + _logger.warning("logwatcher thread did not shut down within the 60s join timeout") if self.root_dir: shutil.rmtree(self.root_dir) @@ -190,9 +221,9 @@ def finalize(self): time.sleep(0.1) _logger.info("MPI pool has been shut down") + @method_logger(__name__) def evaluate_experiments(self, scenarios, policies, callback, combine="factorial", **kwargs): - ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine) - experiments = 
list(experiment_generator(scenarios, self._msis, policies, combine=combine)) results = self._pool.map(run_experiment_mpi, experiments, **kwargs) for experiment, outcomes in results: diff --git a/ema_workbench/examples/example_mpi_lake_model.py b/ema_workbench/examples/example_mpi_lake_model.py index 157e03f31..99e92ba68 100644 --- a/ema_workbench/examples/example_mpi_lake_model.py +++ b/ema_workbench/examples/example_mpi_lake_model.py @@ -9,11 +9,6 @@ import math import time -# FIXME -import sys - -sys.path.insert(0, "/Users/jhkwakkel/Documents/GitHub/EMAworkbench") - import numpy as np from scipy.optimize import brentq @@ -88,10 +83,13 @@ def lake_problem( if __name__ == "__main__": + import ema_workbench + # run with mpiexec -n 1 -usize {ntasks} python example_mpi_lake_model.py starttime = time.perf_counter() ema_logging.log_to_stderr(ema_logging.INFO, pass_root_logger_level=True) + ema_logging.get_rootlogger().info(f"{ema_workbench.__version__}") # instantiate the model lake_model = Model("lakeproblem", function=lake_problem) diff --git a/ema_workbench/examples/slurm_script.sh b/ema_workbench/examples/slurm_script.sh index 7b8c2819d..395961529 100644 --- a/ema_workbench/examples/slurm_script.sh +++ b/ema_workbench/examples/slurm_script.sh @@ -1,8 +1,8 @@ #!/bin/bash #SBATCH --job-name="Python_test" -#SBATCH --time=00:02:00 -#SBATCH --ntasks=10 +#SBATCH --time=00:06:00 +#SBATCH --ntasks=8 #SBATCH --cpus-per-task=1 #SBATCH --partition=compute #SBATCH --mem-per-cpu=4GB @@ -17,6 +17,7 @@ module load py-mpi4py module load py-pip pip install ipyparallel -pip install --user -e git+https://github.com/quaquel/EMAworkbench@mpi_update#egg=ema-workbench +pip install --user -e git+https://github.com/quaquel/EMAworkbench@mpi_fixes#egg=ema_workbench mpiexec -n 1 python3 example_mpi_lake_model.py +