Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] MPI update #328

Merged
merged 47 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
1739302
change to log message and log level in feature scoring
quaquel Nov 20, 2023
d22414a
adds a simple example using the MPI evaluator
quaquel Nov 27, 2023
be58e00
first step for making MPI work with WorkingDirectoryModels
quaquel Nov 27, 2023
d088b48
Update example_mpi_lake_model.py
quaquel Nov 27, 2023
506999b
ongoing work
quaquel Dec 1, 2023
85ac783
reorganizing evaluator code and renaming of modules
quaquel Dec 1, 2023
2681924
fixes location of mpi tests
quaquel Dec 1, 2023
f69cf80
some mocking fixes
quaquel Dec 1, 2023
ff6d6d6
Update futures_mpi.py
quaquel Dec 1, 2023
4fede84
Update futures_multiprocessing.py
quaquel Dec 1, 2023
510c4fd
attempt for doc fix
quaquel Dec 1, 2023
7e19066
and one more
quaquel Dec 1, 2023
222d4f5
another attempt
quaquel Dec 1, 2023
278b684
Update example_mpi_lake_model.py
quaquel Dec 1, 2023
284ecd6
Merge branch 'reorganization' into mpi_update
quaquel Dec 1, 2023
ca0d66a
cleanup and merging reorganization into mpi_update
quaquel Dec 1, 2023
0d144d9
logging works
quaquel Dec 1, 2023
d87f3f0
unit testing for mpi logging
quaquel Dec 2, 2023
5fd2214
Update ci.yml
quaquel Dec 2, 2023
4953b03
Update futures_util.py
quaquel Dec 2, 2023
59d8c6c
unit test for initializer
quaquel Dec 2, 2023
e7b1b30
more unit tests
quaquel Dec 3, 2023
39b004a
more unit tests
quaquel Dec 3, 2023
04d02d8
backport from mpi_update
quaquel Dec 3, 2023
9f26e71
Merge branch 'reorganization' into mpi_update
quaquel Dec 3, 2023
88c1318
Update test_futures_mpi.py
quaquel Dec 3, 2023
d365ab0
Merge branch 'master' into mpi_update
quaquel Dec 4, 2023
7ec29fc
Update __init__.py
quaquel Dec 6, 2023
d5b6158
start of testing in delftblue
quaquel Dec 6, 2023
042667d
another attempted fix
quaquel Dec 6, 2023
d340cf8
temporarily disabling logging
quaquel Dec 6, 2023
def91f5
more delftblue testing
quaquel Dec 7, 2023
68c8a0b
more testing
quaquel Dec 7, 2023
0d6017e
Update futures_mpi.py
quaquel Dec 7, 2023
94b03e4
reenable logging
quaquel Dec 7, 2023
d9b9594
Update futures_mpi.py
quaquel Dec 7, 2023
d07571e
another attempt
quaquel Dec 7, 2023
f233075
Update futures_mpi.py
quaquel Dec 7, 2023
f60973d
Update futures_mpi.py
quaquel Dec 7, 2023
d0d43fa
Update futures_mpi.py
quaquel Dec 7, 2023
41839bf
updated mpi tutorial
quaquel Dec 8, 2023
a185fd0
make it possible to control chunksize
quaquel Dec 8, 2023
bb7f245
Merge branch 'master' into mpi_update
quaquel Dec 8, 2023
9959e87
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 8, 2023
a690592
Update test_futures_mpi.py
quaquel Dec 8, 2023
8633407
Merge branch 'mpi_update' of https://github.com/quaquel/EMAworkbench …
quaquel Dec 8, 2023
7482825
test work again
quaquel Dec 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
run:
pytest --ignore=./test/test_connectors -v --cov=ema_workbench/em_framework --cov=ema_workbench/util --cov=ema_workbench/analysis
- name: Coveralls
if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.10'
if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.11'
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: coveralls --service=github
Expand Down
295 changes: 158 additions & 137 deletions docs/source/indepth_tutorial/mpi-evaluator.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions ema_workbench/em_framework/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
SequentialEvaluator,
Samplers,
)

from .optimization import (
Convergence,
EpsilonProgress,
Expand Down
9 changes: 7 additions & 2 deletions ema_workbench/em_framework/evaluators.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def finalize(self):
"""finalize the evaluator"""
raise NotImplementedError

def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
def evaluate_experiments(self, scenarios, policies, callback, combine="factorial", **kwargs):
"""used by ema_workbench"""
raise NotImplementedError

Expand Down Expand Up @@ -181,6 +181,7 @@ def perform_experiments(
lever_sampling=Samplers.LHS,
callback=None,
combine="factorial",
**kwargs,
):
"""convenience method for performing experiments.

Expand All @@ -202,6 +203,7 @@ def perform_experiments(
lever_sampling=lever_sampling,
callback=callback,
combine=combine,
**kwargs,
)

def optimize(
Expand Down Expand Up @@ -307,6 +309,7 @@ def perform_experiments(
return_callback=False,
combine="factorial",
log_progress=False,
**kwargs,
):
"""sample uncertainties and levers, and perform the resulting experiments
on each of the models
Expand Down Expand Up @@ -336,6 +339,8 @@ def perform_experiments(
then combined by cycling over the shortest of the two sets
of designs until the longest set of designs is exhausted.

Additional keyword arguments are passed on to evaluate_experiments of the evaluator

Returns
-------
tuple
Expand Down Expand Up @@ -397,7 +402,7 @@ def perform_experiments(
if not evaluator:
evaluator = SequentialEvaluator(models)

evaluator.evaluate_experiments(scenarios, policies, callback, combine=combine)
evaluator.evaluate_experiments(scenarios, policies, callback, combine=combine, **kwargs)

if callback.i != nr_of_exp:
raise EMAError(
Expand Down
4 changes: 2 additions & 2 deletions ema_workbench/em_framework/futures_ipyparallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,11 @@ def finalize(self):
self.logwatcher.stop()
cleanup(self.client)

def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
def evaluate_experiments(self, scenarios, policies, callback, combine="factorial", **kwargs):
ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)

lb_view = self.client.load_balanced_view()
results = lb_view.map(_run_experiment, ex_gen, ordered=False, block=False)
results = lb_view.map(_run_experiment, ex_gen, ordered=False, block=False, **kwargs)

for entry in results:
callback(*entry)
163 changes: 143 additions & 20 deletions ema_workbench/em_framework/futures_mpi.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,144 @@
import atexit
import copy
import logging
import os
import shutil
import threading
import time
import warnings

from logging.handlers import QueueHandler

from .evaluators import BaseEvaluator, experiment_generator
from .futures_util import setup_working_directories, finalizer, determine_rootdir
from .util import NamedObjectMap
from .model import AbstractModel
from .experiment_runner import ExperimentRunner
from ..util import get_module_logger
from ..util import get_module_logger, get_rootlogger

from ..util import ema_logging

__all__ = ["MPIEvaluator"]

_logger = get_module_logger(__name__)

experiment_runner = None

def run_experiment_mpi(packed_data):
from mpi4py.MPI import COMM_WORLD

rank = COMM_WORLD.Get_rank()
class RankFilter(logging.Filter):
    """Logging filter that stamps each record with the MPI rank.

    The rank is stored on the record as ``record.rank`` so formatters can
    reference ``%(rank)s`` (see the formatter set up in ``mpi_initializer``).
    """

    def __init__(self, rank):
        """
        Parameters
        ----------
        rank : int
               MPI rank of the worker process emitting the records
        """
        super().__init__()
        self.rank = rank

    def filter(self, record):
        # annotate rather than filter: always returns True so no record
        # is ever dropped
        record.rank = self.rank
        return True


def mpi_initializer(models, log_level, root_dir):
    """Initializer executed once in every MPI worker process.

    Creates the module-global ExperimentRunner used by run_experiment_mpi,
    connects the worker's logging to the "logwatcher" service published by
    the root process, and sets up per-worker working directories.

    Parameters
    ----------
    models : collection of AbstractModel instances
    log_level : int
                level set on the worker's MPIHandler
    root_dir : str or None
               root directory under which per-worker working directories
               are created; None if no model needs one
    """
    global experiment_runner
    from mpi4py import MPI

    rank = MPI.COMM_WORLD.Get_rank()

    # setup the experiment runner
    msis = NamedObjectMap(AbstractModel)
    msis.extend(models)
    experiment_runner = ExperimentRunner(msis)

    # setup the logging
    # look up the port published under "logwatcher" by the logwatcher
    # thread on the root process and connect to it; log records emitted
    # through MPIHandler travel over this intercommunicator
    info = MPI.INFO_NULL
    service = "logwatcher"
    port = MPI.Lookup_name(service)
    logcomm = MPI.COMM_WORLD.Connect(port, info, 0)

    root_logger = get_rootlogger()

    handler = MPIHandler(logcomm)
    # RankFilter stamps each record with this worker's rank for the formatter
    handler.addFilter(RankFilter(rank))
    handler.setLevel(log_level)
    handler.setFormatter(logging.Formatter("[worker %(rank)s/%(levelname)s] %(message)s"))
    root_logger.addHandler(handler)

    # setup the working directories
    tmpdir = setup_working_directories(models, root_dir)
    if tmpdir:
        # clean up the temporary working directory when the worker exits
        atexit.register(finalizer, os.path.abspath(tmpdir))

    # _logger.info(f"worker {rank} initialized")
    root_logger.info(f"worker {rank} initialized")


def logwatcher(stop_event):
    """Receive log records from MPI worker processes and re-emit them locally.

    Runs in a daemon thread on the root process (started in
    MPIEvaluator.initialize). Opens an MPI port, publishes it under the
    well-known name "logwatcher" so workers can look it up (see
    mpi_initializer), accepts the connection, and then forwards every
    received LogRecord to the matching local logger's handlers.

    Parameters
    ----------
    stop_event : threading.Event
                 set by the evaluator to request shutdown of this loop
    """
    from mpi4py import MPI

    rank = MPI.COMM_WORLD.Get_rank()

    info = MPI.INFO_NULL
    port = MPI.Open_port(info)
    # print(f"client: {rank} {port}")
    _logger.debug(f"opened port: {port}")

    # publish the port under a well-known service name for worker lookup
    service = "logwatcher"
    MPI.Publish_name(service, info, port)
    _logger.debug(f"published service: {service}")

    root = 0
    _logger.debug("waiting for client connection...")
    # blocks until the first worker connects
    comm = MPI.COMM_WORLD.Accept(port, info, root)
    _logger.debug("client connected...")

    # NOTE(review): recv blocks, so stop_event is only re-checked after a
    # record arrives; shutdown can therefore lag until the next message
    while not stop_event.is_set():
        if rank == root:
            record = comm.recv(None, MPI.ANY_SOURCE, tag=0)
            try:
                logger = logging.getLogger(record.name)
            except Exception as e:
                # AttributeError if record does not have a name attribute
                # TypeError record.name is not a string
                # NOTE(review): re-raising kills the logwatcher thread;
                # consider logging the failure and continuing instead
                raise e
            else:
                # bypass level checks and hand the record to the handlers
                logger.callHandlers(record)


def run_experiment_mpi(experiment):
    """Run a single experiment inside an MPI worker process.

    Relies on the module-global ``experiment_runner`` created by
    ``mpi_initializer`` in this worker.

    Parameters
    ----------
    experiment : Experiment instance

    Returns
    -------
    tuple
        the experiment and the outcomes produced by running it
    """
    _logger.debug(f"starting {experiment.experiment_id}")

    outcomes = experiment_runner.run_experiment(experiment)

    _logger.debug(f"completed {experiment.experiment_id}")

    return experiment, outcomes


class MPIHandler(QueueHandler):
    """Logging handler that ships records from an MPI worker to the master.

    Instead of a queue, records are sent over the MPI communicator that
    connects the worker to the logwatcher on the root process.
    """

    def __init__(self, communicator):
        """Set up the handler around the given communicator.

        Deliberately calls logging.Handler.__init__ rather than
        QueueHandler.__init__, since the latter requires a queue we
        do not have.
        """
        logging.Handler.__init__(self)
        self.communicator = communicator

    def emit(self, record):
        """Prepare the record for pickling and send it to rank 0, tag 0.

        Failures while sending are routed through handleError rather
        than raised, per logging handler convention.
        """
        prepared = self.prepare(record)
        try:
            self.communicator.send(prepared, 0, 0)
        except Exception:
            self.handleError(prepared)


class MPIEvaluator(BaseEvaluator):
"""Evaluator for experiments using MPI Pool Executor from mpi4py"""

Expand All @@ -43,12 +150,27 @@ def __init__(self, msis, n_processes=None, **kwargs):
FutureWarning,
)
self._pool = None
self.root_dir = None
self.stop_event = None
self.n_processes = n_processes

def initialize(self):
# Only import mpi4py if the MPIEvaluator is used, to avoid unnecessary dependencies.
from mpi4py.futures import MPIPoolExecutor

self.stop_event = threading.Event()
self.logwatcher_thread = threading.Thread(
name="logwatcher", target=logwatcher, daemon=True, args=(self.stop_event,)
)
self.logwatcher_thread.start()

self.root_dir = determine_rootdir(self._msis)
self._pool = MPIPoolExecutor(
max_workers=self.n_processes,
initializer=mpi_initializer,
initargs=(self._msis, _logger.level, self.root_dir),
) # Removed initializer arguments

self._pool = MPIPoolExecutor(max_workers=self.n_processes) # Removed initializer arguments
_logger.info(f"MPI pool started with {self._pool._max_workers} workers")
if self._pool._max_workers <= 10:
Expand All @@ -59,20 +181,21 @@ def initialize(self):

def finalize(self):
self._pool.shutdown()
self.stop_event.set()
_logger.info("MPI pool has been shut down")

def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)
experiments = list(ex_gen)
if self.root_dir:
shutil.rmtree(self.root_dir)

packed = [(experiment, experiment.model_name, self._msis) for experiment in experiments]
time.sleep(0.1)
_logger.info("MPI pool has been shut down")

_logger.info(
f"MPIEvaluator: Starting {len(packed)} experiments using MPI pool with {self._pool._max_workers} workers"
)
results = self._pool.map(run_experiment_mpi, packed)
def evaluate_experiments(self, scenarios, policies, callback, combine="factorial", **kwargs):
ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)
experiments = list(ex_gen)

_logger.info(f"MPIEvaluator: Completed all {len(packed)} experiments")
results = self._pool.map(run_experiment_mpi, experiments, **kwargs)
for experiment, outcomes in results:
callback(experiment, outcomes)
_logger.info(f"MPIEvaluator: Callback completed for all {len(packed)} experiments")

_logger.info(f"MPIEvaluator: Completed all {len(experiments)} experiments")
19 changes: 11 additions & 8 deletions ema_workbench/examples/example_mpi_lake_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

"""
import math
import sys
import time

# FIXME
import sys

sys.path.insert(0, "/Users/jhkwakkel/Documents/GitHub/EMAworkbench")

import numpy as np
Expand All @@ -22,8 +23,8 @@
Constant,
ema_logging,
MPIEvaluator,
save_results,
)
from ema_workbench.em_framework.evaluators import Samplers


def lake_problem(
Expand Down Expand Up @@ -86,10 +87,10 @@ def lake_problem(


if __name__ == "__main__":
# run with mpiexec -n 4 python -m mpi4py.futures example_mpi_lake_model.py
starttime = time.time()
# run with mpiexec -n 1 -usize {ntasks} python example_mpi_lake_model.py
starttime = time.perf_counter()

ema_logging.log_to_stderr(ema_logging.INFO, pass_root_logger_level=False)
ema_logging.log_to_stderr(ema_logging.INFO, pass_root_logger_level=True)

# instantiate the model
lake_model = Model("lakeproblem", function=lake_problem)
Expand Down Expand Up @@ -119,10 +120,12 @@ def lake_problem(
lake_model.constants = [Constant("alpha", 0.41), Constant("nsamples", 150)]

# generate some random policies by sampling over levers
n_scenarios = 1000
n_scenarios = 10000
n_policies = 4

with MPIEvaluator(lake_model) as evaluator:
res = evaluator.perform_experiments(n_scenarios, n_policies)
res = evaluator.perform_experiments(n_scenarios, n_policies, chunksize=250)

save_results(res, "test.tar.gz")

print(time.time() - starttime)
print(time.perf_counter() - starttime)
22 changes: 22 additions & 0 deletions ema_workbench/examples/slurm_script.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/bash
# SLURM batch script for running the MPI lake model example.
# Submit with: sbatch slurm_script.sh

#SBATCH --job-name="Python_test"
#SBATCH --time=00:02:00
# 10 MPI task slots in total; mpiexec below launches a single task —
# presumably the MPIPoolExecutor spawns workers into the remaining
# slots (TODO confirm against example_mpi_lake_model.py usage notes)
#SBATCH --ntasks=10
#SBATCH --cpus-per-task=1
#SBATCH --partition=compute
#SBATCH --mem-per-cpu=4GB
#SBATCH --account=research-tpm-mas

# load the cluster toolchain and the python/MPI stack
module load 2023r1
module load openmpi
module load python
module load py-numpy
module load py-scipy
module load py-mpi4py
module load py-pip

# install remaining dependencies into the user site
pip install ipyparallel
pip install --user -e git+https://github.com/quaquel/EMAworkbench@mpi_update#egg=ema-workbench

mpiexec -n 1 python3 example_mpi_lake_model.py
Binary file added ema_workbench/examples/test.tar.gz
Binary file not shown.
Loading