From e0031360f83db32046337bdbc13c40b34fd47920 Mon Sep 17 00:00:00 2001
From: dzalkind <65573423+dzalkind@users.noreply.github.com>
Date: Tue, 10 Dec 2024 16:29:03 -0700
Subject: [PATCH] Add mpi_tools to rosco (#400)

* Add mpi_tools to rosco

* Only publish on macos-latest
---
 .github/workflows/Publish_ROSCO.yml        |   2 +-
 rosco/toolbox/ofTools/case_gen/run_FAST.py |   6 +-
 rosco/toolbox/ofTools/util/mpi_tools.py    | 139 +++++++++++++++++++++
 3 files changed, 143 insertions(+), 4 deletions(-)
 create mode 100644 rosco/toolbox/ofTools/util/mpi_tools.py

diff --git a/.github/workflows/Publish_ROSCO.yml b/.github/workflows/Publish_ROSCO.yml
index 2202c5fe..637b0264 100644
--- a/.github/workflows/Publish_ROSCO.yml
+++ b/.github/workflows/Publish_ROSCO.yml
@@ -12,7 +12,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        os: [ubuntu-latest, windows-latest, macos-12, macos-13, macos-14]
+        os: [ubuntu-latest, windows-latest, macOS-latest]
 
     steps:
       - name: Set up QEMU

diff --git a/rosco/toolbox/ofTools/case_gen/run_FAST.py b/rosco/toolbox/ofTools/case_gen/run_FAST.py
index 49061f5b..160a4d40 100644
--- a/rosco/toolbox/ofTools/case_gen/run_FAST.py
+++ b/rosco/toolbox/ofTools/case_gen/run_FAST.py
@@ -22,13 +22,13 @@
 try:
     from weis.aeroelasticse.runFAST_pywrapper import runFAST_pywrapper_batch
     in_weis = True
-except Exception:
+except Exception as e:
     from rosco.toolbox.ofTools.case_gen.runFAST_pywrapper import runFAST_pywrapper_batch
     in_weis = False
 #from rosco.toolbox.ofTools.case_gen.CaseGen_IEC import CaseGen_IEC
 from rosco.toolbox.ofTools.case_gen.CaseGen_General import CaseGen_General
 from rosco.toolbox.ofTools.case_gen import CaseLibrary as cl
-from wisdem.commonse.mpi_tools import MPI
+from openmdao.utils.mpi import MPI
 
 # Globals
 this_dir = os.path.dirname(os.path.abspath(__file__))
@@ -160,7 +160,7 @@ def run_FAST(self):
 
         # Management of parallelization, leave in for now
         if MPI:
-            from wisdem.commonse.mpi_tools import map_comm_heirarchical, subprocessor_loop, subprocessor_stop
+            from rosco.toolbox.ofTools.util.mpi_tools import map_comm_heirarchical, subprocessor_loop, subprocessor_stop
 
             n_OF_runs = len(case_list)
             available_cores = MPI.COMM_WORLD.Get_size()

diff --git a/rosco/toolbox/ofTools/util/mpi_tools.py b/rosco/toolbox/ofTools/util/mpi_tools.py
new file mode 100644
index 00000000..e53fcc85

import os
import sys

from openmdao.utils.mpi import MPI


def under_mpirun():
    """Return True if we're being executed under mpirun."""
    # this is a bit of a hack, but there appears to be
    # no consistent set of environment vars between MPI
    # implementations.
    for name in os.environ.keys():
        if (
            name == "OMPI_COMM_WORLD_RANK"
            or name == "MPIEXEC_HOSTNAME"
            or name.startswith("MPIR_")
            or name.startswith("MPICH_")
            or name.startswith("INTEL_ONEAPI_MPI_")
            or name.startswith("I_MPI_")
        ):
            return True
    return False


if under_mpirun():

    def debug(*msg):  # pragma: no cover
        newmsg = ["%d: " % MPI.COMM_WORLD.rank] + list(msg)
        for m in newmsg:
            sys.stdout.write("%s " % m)
        sys.stdout.write("\n")
        sys.stdout.flush()

else:
    MPI = None


def map_comm_heirarchical(n_DV, n_OF, openmp=False):
    """
    Hierarchical parallelization communicator mapping. Assumes a number of top-level
    processes equal to the number of design variables (x2 if central finite differencing
    is used), each with its associated number of OpenFAST simulations.

    When the openmp flag is turned on, the OpenFAST simulations are spread across nodes
    to leverage the OpenMP parallelization of OpenFAST. Cores that run under OpenMP are
    marked in the color map as 1000000, cores handling Python and the design variables
    are marked as 0, and the master core for each OpenFAST run is marked as 1.
    """
    if openmp:
        n_procs_per_node = 36  # Number of processors (cores) per node
        num_procs = MPI.COMM_WORLD.Get_size()
        n_nodes = num_procs / n_procs_per_node

        comm_map_down = {}
        comm_map_up = {}
        color_map = [1000000] * num_procs

        n_DV_per_node = n_DV / n_nodes

        for nn in range(int(n_nodes)):
            for n_dv in range(int(n_DV_per_node)):
                comm_map_down[nn * n_procs_per_node + n_dv] = [
                    int(n_DV_per_node) + n_dv * n_OF + nn * n_procs_per_node + j for j in range(n_OF)
                ]

                # This core handles Python, so its color map entry is 0
                color_map[nn * n_procs_per_node + n_dv] = int(0)
                # These cores handle OpenFAST, so their color map entries are 1
                for k in comm_map_down[nn * n_procs_per_node + n_dv]:
                    color_map[k] = int(1)

                for j in comm_map_down[nn * n_procs_per_node + n_dv]:
                    comm_map_up[j] = nn * n_procs_per_node + n_dv
    else:
        comm_map_down = {}
        comm_map_up = {}
        color_map = [0] * n_DV

        for i in range(n_DV):
            comm_map_down[i] = [n_DV + j + i * n_OF for j in range(n_OF)]
            color_map.extend([i + 1] * n_OF)

            for j in comm_map_down[i]:
                comm_map_up[j] = i

    return comm_map_down, comm_map_up, color_map


def subprocessor_loop(comm_map_up):
    """
    Subprocessor loop, waiting to receive a function and its arguments to evaluate.
    The function output is sent back to the parent rank. Loops until a stop signal
    is received.

    Input data format:
        data[0] = function to be evaluated
        data[1] = [list of arguments]
    If the function to be evaluated does not fit this format, a wrapper function
    should be created and passed that handles the setup, argument assignment, etc.
    for the actual function.

    Stop signal:
        data[0] = False
    """
    rank = MPI.COMM_WORLD.Get_rank()
    rank_target = comm_map_up[rank]

    while True:
        data = MPI.COMM_WORLD.recv(source=rank_target, tag=0)
        if data[0] is False:
            break
        func_execution, args = data
        output = func_execution(args)
        MPI.COMM_WORLD.send(output, dest=rank_target, tag=1)


def subprocessor_stop(comm_map_down):
    """
    Send the stop signal to the subprocessors.
    """
    for rank, subranks in comm_map_down.items():
        for subrank_i in subranks:
            MPI.COMM_WORLD.send([False], dest=subrank_i, tag=0)
    print("All MPI subranks closed.")


if __name__ == "__main__":
    comm_map_down, comm_map_up, color_map = map_comm_heirarchical(2, 4)
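
For reference, a minimal sketch (not part of the patch) of how the three helpers are
intended to compose. It assumes an `mpiexec -n 10 python example.py` launch so the rank
count matches `map_comm_heirarchical(2, 4)`, i.e. 2 design-variable ranks plus 2 x 4
OpenFAST worker ranks; `fake_openfast_run` is a hypothetical stand-in for a real
OpenFAST wrapper such as `runFAST_pywrapper_batch`:

    from openmdao.utils.mpi import MPI
    from rosco.toolbox.ofTools.util.mpi_tools import (
        map_comm_heirarchical,
        subprocessor_loop,
        subprocessor_stop,
    )


    def fake_openfast_run(args):
        # Hypothetical stand-in for a single OpenFAST case evaluation;
        # a real caller would run an OpenFAST case here.
        case_id = args[0]
        return case_id ** 2


    n_DV, n_OF = 2, 4  # must satisfy n_DV + n_DV * n_OF == MPI.COMM_WORLD.Get_size()
    comm_map_down, comm_map_up, _ = map_comm_heirarchical(n_DV, n_OF)
    rank = MPI.COMM_WORLD.Get_rank()

    if rank < n_DV:
        # Top-level (design variable) rank: farm one case out to each of its
        # workers with tag 0, collect the results with tag 1, then stop them.
        for worker in comm_map_down[rank]:
            MPI.COMM_WORLD.send([fake_openfast_run, [worker]], dest=worker, tag=0)
        results = [MPI.COMM_WORLD.recv(source=w, tag=1) for w in comm_map_down[rank]]
        print("rank %d collected %s" % (rank, results))
        subprocessor_stop({rank: comm_map_down[rank]})
    else:
        # Worker rank: block in the receive loop until the [False] stop signal.
        subprocessor_loop(comm_map_up)

Note that each worker receives only from its own parent (`recv(source=rank_target, ...)`
in `subprocessor_loop`), so the stop signal must come from that same parent rank, which
is why this sketch calls `subprocessor_stop` once per top-level rank rather than once
from rank 0.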