From b71e27cadcdc71a685ca72d7a0df36ea3b4d8def Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Wed, 20 Mar 2024 13:24:28 +0100 Subject: [PATCH 01/22] remove dubble initialization of pool --- ema_workbench/em_framework/futures_mpi.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 48b812f50..b550acfaa 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -169,9 +169,8 @@ def initialize(self): max_workers=self.n_processes, initializer=mpi_initializer, initargs=(self._msis, _logger.level, self.root_dir), - ) # Removed initializer arguments + ) - self._pool = MPIPoolExecutor(max_workers=self.n_processes) # Removed initializer arguments _logger.info(f"MPI pool started with {self._pool._max_workers} workers") if self._pool._max_workers <= 10: _logger.warning( From 435050fac7ed935344cc9d2f317c0952c13e34cf Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Wed, 20 Mar 2024 13:24:36 +0100 Subject: [PATCH 02/22] update mpi example --- ema_workbench/examples/example_mpi_lake_model.py | 5 ----- ema_workbench/examples/slurm_script.sh | 4 ++-- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/ema_workbench/examples/example_mpi_lake_model.py b/ema_workbench/examples/example_mpi_lake_model.py index 157e03f31..d23c1703a 100644 --- a/ema_workbench/examples/example_mpi_lake_model.py +++ b/ema_workbench/examples/example_mpi_lake_model.py @@ -9,11 +9,6 @@ import math import time -# FIXME -import sys - -sys.path.insert(0, "/Users/jhkwakkel/Documents/GitHub/EMAworkbench") - import numpy as np from scipy.optimize import brentq diff --git a/ema_workbench/examples/slurm_script.sh b/ema_workbench/examples/slurm_script.sh index 7b8c2819d..60b6479e6 100644 --- a/ema_workbench/examples/slurm_script.sh +++ b/ema_workbench/examples/slurm_script.sh @@ -1,7 +1,7 @@ #!/bin/bash #SBATCH --job-name="Python_test" -#SBATCH --time=00:02:00 +#SBATCH --time=00:06:00 #SBATCH --ntasks=10 #SBATCH --cpus-per-task=1 #SBATCH --partition=compute @@ -17,6 +17,6 @@ module load py-mpi4py module load py-pip pip install ipyparallel -pip install --user -e git+https://github.com/quaquel/EMAworkbench@mpi_update#egg=ema-workbench +pip install --user ema_workbench mpiexec -n 1 python3 example_mpi_lake_model.py From fc3fff496989a21c12ef2acf223a1c5263ba4647 Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Wed, 20 Mar 2024 20:57:31 +0100 Subject: [PATCH 03/22] additional method logging --- ema_workbench/em_framework/futures_mpi.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 4ea090c99..89659ccf1 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -14,7 +14,7 @@ from .util import NamedObjectMap from .model import AbstractModel from .experiment_runner import ExperimentRunner -from ..util import get_module_logger, get_rootlogger +from ..util import get_module_logger, get_rootlogger, method_logger from ..util import ema_logging @@ -154,6 +154,7 @@ def __init__(self, msis, n_processes=None, **kwargs): self.stop_event = None self.n_processes = n_processes + @method_logger(__name__) def initialize(self): # Only import mpi4py if the MPIEvaluator is used, to avoid unnecessary dependencies. from mpi4py.futures import MPIPoolExecutor @@ -178,6 +179,7 @@ def initialize(self): ) return self + @method_logger(__name__) def finalize(self): self._pool.shutdown() self.stop_event.set() @@ -189,6 +191,7 @@ def finalize(self): time.sleep(0.1) _logger.info("MPI pool has been shut down") + @method_logger(__name__) def evaluate_experiments(self, scenarios, policies, callback, combine="factorial", **kwargs): ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine) experiments = list(ex_gen) From 28411d53a48083570b1df5753362a5f7ba5680fc Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Wed, 20 Mar 2024 21:16:39 +0100 Subject: [PATCH 04/22] additional log message on closing logwatcher --- ema_workbench/em_framework/futures_mpi.py | 2 ++ ema_workbench/examples/slurm_script.sh | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 89659ccf1..9cc3eb6dd 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -101,6 +101,8 @@ def logwatcher(stop_event): raise e else: logger.callHandlers(record) + else: + _logger.debug("closing logwatchter") def run_experiment_mpi(experiment): diff --git a/ema_workbench/examples/slurm_script.sh b/ema_workbench/examples/slurm_script.sh index 60b6479e6..395961529 100644 --- a/ema_workbench/examples/slurm_script.sh +++ b/ema_workbench/examples/slurm_script.sh @@ -2,7 +2,7 @@ #SBATCH --job-name="Python_test" #SBATCH --time=00:06:00 -#SBATCH --ntasks=10 +#SBATCH --ntasks=8 #SBATCH --cpus-per-task=1 #SBATCH --partition=compute #SBATCH --mem-per-cpu=4GB @@ -17,6 +17,7 @@ module load py-mpi4py module load py-pip pip install ipyparallel -pip install --user ema_workbench +pip install --user -e git+https://github.com/quaquel/EMAworkbench@mpi_fixes#egg=ema_workbench mpiexec -n 1 python3 example_mpi_lake_model.py + From 123337eed36e03a66d8eda253d9e29c43f90f86e Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Wed, 20 Mar 2024 21:22:08 +0100 Subject: [PATCH 05/22] make logwatcher thread non deamonic --- ema_workbench/em_framework/futures_mpi.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 9cc3eb6dd..feef6d4bf 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -163,7 +163,7 @@ def initialize(self): self.stop_event = threading.Event() self.logwatcher_thread = threading.Thread( - name="logwatcher", target=logwatcher, daemon=True, args=(self.stop_event,) + name="logwatcher", target=logwatcher, daemon=False, args=(self.stop_event,) ) self.logwatcher_thread.start() @@ -185,6 +185,11 @@ def initialize(self): def finalize(self): self._pool.shutdown() self.stop_event.set() + self.logwatcher_thread.join(timeout=60) + + if self.logwatcher_thread.is_alive(): + _logger.warning(f"houston we have a problem") + _logger.info("MPI pool has been shut down") if self.root_dir: @@ -195,8 +200,7 @@ def finalize(self): @method_logger(__name__) def evaluate_experiments(self, scenarios, policies, callback, combine="factorial", **kwargs): - ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine) - experiments = list(ex_gen) + experiments = list(experiment_generator(scenarios, self._msis, policies, combine=combine)) results = self._pool.map(run_experiment_mpi, experiments, **kwargs) for experiment, outcomes in results: From 279b477892730c65854f64bcc1d26cf00851aa72 Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Wed, 20 Mar 2024 21:48:14 +0100 Subject: [PATCH 06/22] Update futures_mpi.py --- ema_workbench/em_framework/futures_mpi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index feef6d4bf..2c7a857d9 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -102,7 +102,7 @@ def logwatcher(stop_event): else: logger.callHandlers(record) else: - _logger.debug("closing logwatchter") + _logger.info("closing logwatcher") def run_experiment_mpi(experiment): From b9119f73a9ce0e9b4633da1eb558c70f9abdc933 Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 09:19:04 +0100 Subject: [PATCH 07/22] shift to non-blocking mpi receive --- ema_workbench/em_framework/futures_mpi.py | 35 ++++++++++++++++--- .../examples/example_mpi_lake_model.py | 3 ++ 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 2c7a857d9..0ba4b2e40 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -91,19 +91,46 @@ def logwatcher(stop_event): _logger.debug("client connected...") while not stop_event.is_set(): - if rank == root: - record = comm.recv(None, MPI.ANY_SOURCE, tag=0) + record = comm.irecv(None, MPI.ANY_SOURCE, tag=0) + success = False + message = None + start_time = time.time() + + while not success or ( + (time.time() - start_time()) > 10 + ): # what is a good timeout number given initialization of worker processes? + success, message = record.test() + time.sleep(1) + + if success: try: - logger = logging.getLogger(record.name) + logger = logging.getLogger(message.name) except Exception as e: # AttributeError if record does not have a name attribute # TypeError record.name is not a string raise e else: - logger.callHandlers(record) + logger.callHandlers(message) + else: + record.cancel() + record.Free() else: _logger.info("closing logwatcher") + # while not stop_event.is_set(): + # if rank == root: + # record = comm.recv(None, MPI.ANY_SOURCE, tag=0) + # try: + # logger = logging.getLogger(record.name) + # except Exception as e: + # # AttributeError if record does not have a name attribute + # # TypeError record.name is not a string + # raise e + # else: + # logger.callHandlers(record) + # else: + # _logger.info("closing logwatcher") + def run_experiment_mpi(experiment): _logger.debug(f"starting {experiment.experiment_id}") diff --git a/ema_workbench/examples/example_mpi_lake_model.py b/ema_workbench/examples/example_mpi_lake_model.py index d23c1703a..99e92ba68 100644 --- a/ema_workbench/examples/example_mpi_lake_model.py +++ b/ema_workbench/examples/example_mpi_lake_model.py @@ -83,10 +83,13 @@ def lake_problem( if __name__ == "__main__": + import ema_workbench + # run with mpiexec -n 1 -usize {ntasks} python example_mpi_lake_model.py starttime = time.perf_counter() ema_logging.log_to_stderr(ema_logging.INFO, pass_root_logger_level=True) + ema_logging.get_rootlogger().info(f"{ema_workbench.__version__}") # instantiate the model lake_model = Model("lakeproblem", function=lake_problem) From 3165d306c6fabc587a365308cea3cd91d6313709 Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 09:28:33 +0100 Subject: [PATCH 08/22] typo fix --- ema_workbench/em_framework/futures_mpi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 0ba4b2e40..6e1c1803e 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -97,7 +97,7 @@ def logwatcher(stop_event): start_time = time.time() while not success or ( - (time.time() - start_time()) > 10 + (time.time() - start_time) > 10 ): # what is a good timeout number given initialization of worker processes? success, message = record.test() time.sleep(1) From 8d8eefc1bb0a7412191e0c28de96bb1ce65757f2 Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 11:21:57 +0100 Subject: [PATCH 09/22] try sentinel approach --- ema_workbench/em_framework/futures_mpi.py | 77 ++++++++++++++--------- 1 file changed, 46 insertions(+), 31 deletions(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 6e1c1803e..9791b5a49 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -90,47 +90,51 @@ def logwatcher(stop_event): comm = MPI.COMM_WORLD.Accept(port, info, root) _logger.debug("client connected...") - while not stop_event.is_set(): - record = comm.irecv(None, MPI.ANY_SOURCE, tag=0) - success = False - message = None - start_time = time.time() - - while not success or ( - (time.time() - start_time) > 10 - ): # what is a good timeout number given initialization of worker processes? - success, message = record.test() - time.sleep(1) - - if success: - try: - logger = logging.getLogger(message.name) - except Exception as e: - # AttributeError if record does not have a name attribute - # TypeError record.name is not a string - raise e - else: - logger.callHandlers(message) - else: - record.cancel() - record.Free() - else: - _logger.info("closing logwatcher") - # while not stop_event.is_set(): - # if rank == root: - # record = comm.recv(None, MPI.ANY_SOURCE, tag=0) + # record = comm.irecv(None, MPI.ANY_SOURCE, tag=0) + # success = False + # message = None + # start_time = time.time() + # + # while not success or ( + # (time.time() - start_time) > 10 + # ): # what is a good timeout number given initialization of worker processes? + # success, message = record.test() + # time.sleep(1) + # + # if success: # try: - # logger = logging.getLogger(record.name) + # logger = logging.getLogger(message.name) # except Exception as e: # # AttributeError if record does not have a name attribute # # TypeError record.name is not a string # raise e # else: - # logger.callHandlers(record) + # logger.callHandlers(message) + # else: + # record.cancel() + # record.Free() # else: # _logger.info("closing logwatcher") + while not stop_event.is_set(): + if rank == root: + record = comm.recv(None, MPI.ANY_SOURCE, tag=0) + try: + logger = logging.getLogger(record.name) + except Exception as e: + if record.msg is None: + _logger.info("received sentinel") + break + else: + # AttributeError if record does not have a name attribute + # TypeError record.name is not a string + raise e + else: + logger.callHandlers(record) + else: + _logger.info("closing logwatcher") + def run_experiment_mpi(experiment): _logger.debug(f"starting {experiment.experiment_id}") @@ -142,6 +146,14 @@ def run_experiment_mpi(experiment): return experiment, outcomes +def send_sentinel(): + record = logging.makeLogRecord({}) + + for handler in _logger.handlers: + if isinstance(handler, MPIHandler): + handler.emit(record) + + class MPIHandler(QueueHandler): """ This handler sends events from the worker process to the master process @@ -210,6 +222,9 @@ def initialize(self): @method_logger(__name__) def finalize(self): + # submit sentinel + self._pool.submit() + self._pool.shutdown() self.stop_event.set() self.logwatcher_thread.join(timeout=60) From 36a6387ce8438e9275b3297a257ce3dc42832e26 Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 11:25:08 +0100 Subject: [PATCH 10/22] me being stupid --- ema_workbench/em_framework/futures_mpi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 9791b5a49..a972a0c13 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -223,7 +223,7 @@ def initialize(self): @method_logger(__name__) def finalize(self): # submit sentinel - self._pool.submit() + self._pool.submit(send_sentinel) self._pool.shutdown() self.stop_event.set() From 8375b4777b5f71a7d939b1a71b62a137d83a850b Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 11:34:20 +0100 Subject: [PATCH 11/22] some more tests --- ema_workbench/em_framework/futures_mpi.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index a972a0c13..9594c705e 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -129,6 +129,7 @@ def logwatcher(stop_event): else: # AttributeError if record does not have a name attribute # TypeError record.name is not a string + _logger.info("some exception {e}") raise e else: logger.callHandlers(record) @@ -148,6 +149,7 @@ def run_experiment_mpi(experiment): def send_sentinel(): record = logging.makeLogRecord({}) + _logger.info("sending sentinel") for handler in _logger.handlers: if isinstance(handler, MPIHandler): @@ -223,7 +225,7 @@ def initialize(self): @method_logger(__name__) def finalize(self): # submit sentinel - self._pool.submit(send_sentinel) + f = self._pool.submit(send_sentinel) self._pool.shutdown() self.stop_event.set() @@ -232,8 +234,6 @@ def finalize(self): if self.logwatcher_thread.is_alive(): _logger.warning(f"houston we have a problem") - _logger.info("MPI pool has been shut down") - if self.root_dir: shutil.rmtree(self.root_dir) From 3abc5c223266d95b6d9783e709c018e0b91985ab Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 11:40:46 +0100 Subject: [PATCH 12/22] Update futures_mpi.py --- ema_workbench/em_framework/futures_mpi.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 9594c705e..215b7edd6 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -148,11 +148,11 @@ def run_experiment_mpi(experiment): def send_sentinel(): - record = logging.makeLogRecord({}) - _logger.info("sending sentinel") + record = logging.makeLogRecord(dict(level=logging.CRITICAL, msg=None)) for handler in _logger.handlers: if isinstance(handler, MPIHandler): + _logger.info("sending sentinel") handler.emit(record) From c8d5cc2a977118632d94fc613aedc305248607a2 Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 11:45:23 +0100 Subject: [PATCH 13/22] Update futures_mpi.py --- ema_workbench/em_framework/futures_mpi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 215b7edd6..c6cb8375d 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -150,7 +150,7 @@ def run_experiment_mpi(experiment): def send_sentinel(): record = logging.makeLogRecord(dict(level=logging.CRITICAL, msg=None)) - for handler in _logger.handlers: + for handler in get_rootlogger().handlers: if isinstance(handler, MPIHandler): _logger.info("sending sentinel") handler.emit(record) From 979cfa448ca973db52ba960c98dfd3cf7f08f970 Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 11:49:13 +0100 Subject: [PATCH 14/22] Update futures_mpi.py --- ema_workbench/em_framework/futures_mpi.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index c6cb8375d..aae1ed591 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -225,10 +225,9 @@ def initialize(self): @method_logger(__name__) def finalize(self): # submit sentinel - f = self._pool.submit(send_sentinel) - - self._pool.shutdown() self.stop_event.set() + self._pool.submit(send_sentinel) + self._pool.shutdown() self.logwatcher_thread.join(timeout=60) if self.logwatcher_thread.is_alive(): From a87b3a852b96076d0e2c543f7e960e4c64e23a5d Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 11:52:34 +0100 Subject: [PATCH 15/22] change log message level --- ema_workbench/em_framework/futures_mpi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index aae1ed591..5cf77e233 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -152,7 +152,7 @@ def send_sentinel(): for handler in get_rootlogger().handlers: if isinstance(handler, MPIHandler): - _logger.info("sending sentinel") + _logger.debug("sending sentinel") handler.emit(record) From 6b446774a5c19a476c70fdff7fae33d904d8ba97 Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 13:20:23 +0100 Subject: [PATCH 16/22] Update futures_mpi.py --- ema_workbench/em_framework/futures_mpi.py | 29 +---------------------- 1 file changed, 1 insertion(+), 28 deletions(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 5cf77e233..a5b117602 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -90,46 +90,19 @@ def logwatcher(stop_event): comm = MPI.COMM_WORLD.Accept(port, info, root) _logger.debug("client connected...") - # while not stop_event.is_set(): - # record = comm.irecv(None, MPI.ANY_SOURCE, tag=0) - # success = False - # message = None - # start_time = time.time() - # - # while not success or ( - # (time.time() - start_time) > 10 - # ): # what is a good timeout number given initialization of worker processes? - # success, message = record.test() - # time.sleep(1) - # - # if success: - # try: - # logger = logging.getLogger(message.name) - # except Exception as e: - # # AttributeError if record does not have a name attribute - # # TypeError record.name is not a string - # raise e - # else: - # logger.callHandlers(message) - # else: - # record.cancel() - # record.Free() - # else: - # _logger.info("closing logwatcher") - while not stop_event.is_set(): if rank == root: record = comm.recv(None, MPI.ANY_SOURCE, tag=0) try: logger = logging.getLogger(record.name) except Exception as e: + _logger.info(repr(record)) if record.msg is None: _logger.info("received sentinel") break else: # AttributeError if record does not have a name attribute # TypeError record.name is not a string - _logger.info("some exception {e}") raise e else: logger.callHandlers(record) From b2be0b44204888e81b4eeb27f0e7d53a642d59de Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 13:27:33 +0100 Subject: [PATCH 17/22] Update futures_mpi.py --- ema_workbench/em_framework/futures_mpi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index a5b117602..c560b1648 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -121,7 +121,7 @@ def run_experiment_mpi(experiment): def send_sentinel(): - record = logging.makeLogRecord(dict(level=logging.CRITICAL, msg=None)) + record = logging.makeLogRecord(dict(level=logging.CRITICAL, msg=None, name=42)) for handler in get_rootlogger().handlers: if isinstance(handler, MPIHandler): From 7c0eb9c4e20329b15daf9c60a775636d72cc81ca Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 13:54:09 +0100 Subject: [PATCH 18/22] Update futures_mpi.py --- ema_workbench/em_framework/futures_mpi.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index c560b1648..b6f0db9d1 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -126,7 +126,8 @@ def send_sentinel(): for handler in get_rootlogger().handlers: if isinstance(handler, MPIHandler): _logger.debug("sending sentinel") - handler.emit(record) + # handler.emit(record) + handler.communicator.send(record, 0, 0) class MPIHandler(QueueHandler): From 9260bfee7f2349ed9d2b399938e3dcc8dfdfa30d Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 15:00:34 +0100 Subject: [PATCH 19/22] Update futures_mpi.py --- ema_workbench/em_framework/futures_mpi.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index b6f0db9d1..9423a8fc0 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -106,8 +106,8 @@ def logwatcher(stop_event): raise e else: logger.callHandlers(record) - else: - _logger.info("closing logwatcher") + + _logger.info("closing logwatcher") def run_experiment_mpi(experiment): @@ -126,7 +126,6 @@ def send_sentinel(): for handler in get_rootlogger().handlers: if isinstance(handler, MPIHandler): _logger.debug("sending sentinel") - # handler.emit(record) handler.communicator.send(record, 0, 0) From 6b48e3e34cd5060832b062f9ce29d255121682b8 Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 15:09:50 +0100 Subject: [PATCH 20/22] cleaning up log messages --- ema_workbench/em_framework/futures_mpi.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 9423a8fc0..4c955c7f2 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -96,9 +96,8 @@ def logwatcher(stop_event): try: logger = logging.getLogger(record.name) except Exception as e: - _logger.info(repr(record)) if record.msg is None: - _logger.info("received sentinel") + _logger.debug("received sentinel") break else: # AttributeError if record does not have a name attribute From 32ec39a1ea0fdc17cc907c20943e9b88dcb8b7fa Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 19:29:43 +0100 Subject: [PATCH 21/22] add explicit wait for starting logwatcher service before launching pool --- ema_workbench/em_framework/futures_mpi.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 4c955c7f2..3a88d9ee6 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -71,7 +71,7 @@ def mpi_initializer(models, log_level, root_dir): root_logger.info(f"worker {rank} initialized") -def logwatcher(stop_event): +def logwatcher(start_event, stop_event): from mpi4py import MPI rank = MPI.COMM_WORLD.Get_rank() @@ -84,6 +84,7 @@ def logwatcher(stop_event): service = "logwatcher" MPI.Publish_name(service, info, port) _logger.debug(f"published service: {service}") + start_event.set() root = 0 _logger.debug("waiting for client connection...") @@ -174,11 +175,19 @@ def initialize(self): # Only import mpi4py if the MPIEvaluator is used, to avoid unnecessary dependencies. from mpi4py.futures import MPIPoolExecutor + start_event = threading.Event() self.stop_event = threading.Event() self.logwatcher_thread = threading.Thread( - name="logwatcher", target=logwatcher, daemon=False, args=(self.stop_event,) + name="logwatcher", + target=logwatcher, + daemon=False, + args=( + start_event, + self.stop_event, + ), ) self.logwatcher_thread.start() + start_event.wait() self.root_dir = determine_rootdir(self._msis) self._pool = MPIPoolExecutor( From 40979c46990f51ff35f8ce014e8fc8b52cb6366d Mon Sep 17 00:00:00 2001 From: Jan Kwakkel Date: Thu, 28 Mar 2024 19:38:00 +0100 Subject: [PATCH 22/22] Update futures_mpi.py --- ema_workbench/em_framework/futures_mpi.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ema_workbench/em_framework/futures_mpi.py b/ema_workbench/em_framework/futures_mpi.py index 3a88d9ee6..0c613b8b2 100644 --- a/ema_workbench/em_framework/futures_mpi.py +++ b/ema_workbench/em_framework/futures_mpi.py @@ -188,6 +188,7 @@ def initialize(self): ) self.logwatcher_thread.start() start_event.wait() + _logger.info("logwatcher server started") self.root_dir = determine_rootdir(self._msis) self._pool = MPIPoolExecutor(