diff --git a/CHANGELOG.md b/CHANGELOG.md
index ed6789b8..66144da3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,16 @@
 # CHANGELOG
-## v3.0.1 (#203)
+## v3.0.2 (#204)
+System Wisteria GPU Upgrades
+
+- Bugfix: Fujitsu tasktime for individual job submission was using the `walltime` value, not the `tasktime` value
+- Combined and condensed the main System functionality from the Wisteria child class into the Fujitsu parent class. Previously the Wisteria child class overrode most of Fujitsu's functionality, which defeated the purpose of inheritance
+- New custom run and submit scripts for Wisteria GPU, with better comments, making them easier for others to modify
+- Added new `rscgrps` to include GPU partitions on Wisteria
+- Improved run call header for easier switching between CPU and GPU nodes
+
+## v3.0.1 (#203)
+Quality of Life Updates
 
 - Solver now automatically generates VTK files for Models and Gradients at the end of each iteration
 - New function `solver.specfem.make_output_vtk_files` that generates .vtk files for all files in the output/ directory
diff --git a/pyproject.toml b/pyproject.toml
index 556b8520..dab19fd5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "seisflows"
-version = "3.0.1"
+version = "3.0.2"
 description = "An automated workflow tool for full waveform inversion"
 readme = "README.md"
 requires-python = ">=3.7"
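The `walltime`/`tasktime` bugfix listed above concerns the per-task `-L elapse=` directive that the Fujitsu module builds for `pjsub`. A minimal sketch of the distinction, assuming (as elsewhere in the SeisFlows `Cluster` module) that both values are given in minutes and formatted through `datetime.timedelta`; the numbers are hypothetical:

```python
from datetime import timedelta

# Hypothetical values: the main workflow may need hours (walltime) while each
# individual simulation only needs a fraction of that (tasktime)
walltime, tasktime = 240, 30  # [minutes]

# Each submitted task now requests only its own tasktime ...
print(f"-L elapse={timedelta(minutes=tasktime)}")  # -L elapse=0:30:00
# ... rather than the walltime reserved for the entire workflow
print(f"-L elapse={timedelta(minutes=walltime)}")  # -L elapse=4:00:00
```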
diff --git a/seisflows/system/fujitsu.py b/seisflows/system/fujitsu.py
index 3cdb0c69..1bc23f16 100644
--- a/seisflows/system/fujitsu.py
+++ b/seisflows/system/fujitsu.py
@@ -4,6 +4,7 @@ Computing Suite).
 
 .. note::
+
     The nickname `PJM`, based on the batch job script directives, may be
     used as a shorthand to refer to the Fujitsu job management system.
 
@@ -18,10 +19,10 @@
 import subprocess
 from datetime import timedelta
 
-from seisflows import ROOT_DIR, logger
+from seisflows import logger
 from seisflows.system.cluster import Cluster
 from seisflows.tools import msg
-from seisflows.tools.config import pickle_function_list
+from seisflows.tools.config import pickle_function_list, import_seisflows
 
 class Fujitsu(Cluster):
@@ -75,7 +76,6 @@ def __init__(self, ntask_max=100, pjm_args="", **kwargs):
         self._failed_states = ["CANCEL", "HOLD", "ERROR"]
         self._pending_states = ["QUEUED", "RUNNING"]
 
-
     def check(self):
         """
         Checks parameters and paths
@@ -146,42 +146,51 @@ def run_call_header(self):
             f"-N {self.title}",  # job name
             f"-o {os.path.join(self.path.log_files, '%j')}",
             f"-j",  # merge stderr with stdout
-            f"-L elapse={self._walltime}",  # [[hour:]minute:]second
+            f"-L elapse={self._tasktime}",  # [[hour:]minute:]second
             f"-L node={self.nodes}",
             f"--mpi proc={self.nproc}",
         ])
 
         return _call
+
+    def submit(self, workdir=None, parameter_file="parameters.yaml",
+               direct=True):
+        """
+        Submit the main workflow to the System. Two options are available:
+        running the workflow directly on the current process as a Python job,
+        or submitting it as a separate subprocess.
-    def submit(self, workdir=None, parameter_file="parameters.yaml"):
-        """
-        Submits the main workflow job as a separate job submitted directly to
-        the system that is running the master job.
-
-        .. note::
-
-            Fujitsu scheduler doesn't allow command line arugments
-            (e.g., --workdir), so these are assumed to be default values where
-            the workdir is ${pwd} and the parameter file is called
-            'parameters.yaml'
-
         :type workdir: str
         :param workdir: path to the current working directory
         :type parameter_file: str
         :param parameter_file: paramter file file name used to instantiate
-            the SeisFlows package
-        """
-        # e.g., submit -w ./ -p parameters.yaml
-        submit_call = " ".join([
-            f"{self.submit_call_header}",
-            f"{self.submit_workflow}",
-        ])
-
-        logger.debug(submit_call)
-        try:
-            subprocess.run(submit_call, shell=True)
-        except subprocess.CalledProcessError as e:
-            logger.critical(f"SeisFlows master job has failed with: {e}")
-            sys.exit(-1)
+            the SeisFlows package
+        :type direct: bool
+        :param direct: (used for overriding system modules) submits the main
+            workflow job directly to the login node as a Python process
+            (default). If False, submits the main job as a separate subprocess.
+            Note that this is Fujitsu-specific; main jobs should be run from
+            interactive jobs on compute nodes to avoid running heavy jobs on
+            shared login resources
+        """
+        if direct:
+            workflow = import_seisflows(workdir=workdir or self.path.workdir,
+                                        parameter_file=parameter_file)
+            workflow.check()
+            workflow.setup()
+            workflow.run()
+        else:
+            # e.g., submit -w ./ -p parameters.yaml
+            submit_call = " ".join([
+                f"{self.submit_call_header}",
+                f"{self.submit_workflow}",
+            ])
+
+            logger.debug(submit_call)
+            try:
+                subprocess.run(submit_call, shell=True)
+            except subprocess.CalledProcessError as e:
+                logger.critical(f"SeisFlows master job has failed with: {e}")
+                sys.exit(-1)
 
     @staticmethod
     def _stdout_to_job_id(stdout):
@@ -215,11 +224,8 @@ def run(self, funcs, single=False, **kwargs):
         cluster. Executes the list of functions (`funcs`) NTASK times with each
         task occupying NPROC cores.
 
-        .. warning::
-            This has not been tested generally on Fujitsu systems, see system
-            Wisteria for a working application of the Fujitsu module
-
         .. note::
+
             Completely overwrites the `Cluster.run()` command
 
         :type funcs: list of methods
@@ -244,15 +250,23 @@ def run(self, funcs, single=False, **kwargs):
                         f"system {self.ntask} times")
             _ntask = self.ntask
 
+        # If environs are given, prepend a comma so they can be appended
+        # directly to the comma-separated '-x' list built below
+        _environs = f",{self.environs}" if self.environs else ""
+
         # Default Fujitsu command line input, can be overloaded by subclasses
         # Copy-paste this default run_call and adjust accordingly for subclass
         job_ids = []
         for taskid in range(_ntask):
             run_call = " ".join([
                 f"{self.run_call_header}",
-                f"--funcs {funcs_fid}",
-                f"--kwargs {kwargs_fid}",
-                f"--environment SEISFLOWS_TASKID={{task_id}},{self.environs}"
+                # '-x' in 'pjsub' sets environment variables that are read in
+                # the run script; see the custom run scripts for an example.
+                # Ensure that these are comma-separated, not space-separated
+                f"-x SEISFLOWS_FUNCS={funcs_fid},"
+                f"SEISFLOWS_KWARGS={kwargs_fid},"
+                f"SEISFLOWS_TASKID={taskid}{_environs},"
+                f"GPU_MODE={int(bool(self.gpu))}",  # 0 if False, 1 if True
                 f"{self.run_functions}",
             ])
 
@@ -266,7 +280,7 @@
 
         # Monitor the job queue until all jobs have completed, or any one fails
         try:
-            status = self.check_job_status(job_id)
+            status = self.monitor_job_status(job_ids)
         except FileNotFoundError:
             logger.critical(f"cannot access job information through 'pjstat', "
                             f"waited 50s with no return, please check job "
@@ -351,7 +365,7 @@ def query_job_states(self, job_id, timeout_s=300, wait_time_s=30,
             if _recheck > (timeout_s // wait_time_s):
                 raise TimeoutError(f"cannot access job ID {job_id}")
             time.sleep(wait_time_s)
-            query_job_states(job_id, _recheck=_recheck)
+            self.query_job_states(job_id, _recheck=_recheck)
 
         return job_ids, job_states
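To make the new `-x` environment passing concrete, here is a minimal sketch of how a single task call might be assembled, mirroring the loop in `Fujitsu.run()` above. The header string, pickle paths, and extra environment variables are hypothetical placeholders; in practice they come from `run_call_header`, `pickle_function_list`, and the module's `environs` parameter:

```python
# Illustrative assembly of one task's 'pjsub' call (all values hypothetical)
run_call_header = "pjsub -N forward -j -L elapse=0:30:00 -L node=1 --mpi proc=48"
funcs_fid = "/scratch/run_forward_funcs.p"    # hypothetical pickled function list
kwargs_fid = "/scratch/run_forward_kwargs.p"  # hypothetical pickled kwargs
environs = ",OMP_NUM_THREADS=1"               # optional extras, with leading comma
taskid, gpu = 0, None

run_call = " ".join([
    run_call_header,
    # comma-separated '-x' list that the custom run script reads back in
    f"-x SEISFLOWS_FUNCS={funcs_fid},"
    f"SEISFLOWS_KWARGS={kwargs_fid},"
    f"SEISFLOWS_TASKID={taskid}{environs},"
    f"GPU_MODE={int(bool(gpu))}",
    "custom_run-wisteria",                    # i.e., self.run_functions
])
print(run_call)
```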
diff --git a/seisflows/system/runscripts/custom_run-wisteria b/seisflows/system/runscripts/custom_run-wisteria
index a2c0590f..c4518403 100755
--- a/seisflows/system/runscripts/custom_run-wisteria
+++ b/seisflows/system/runscripts/custom_run-wisteria
@@ -2,20 +2,41 @@
 # ==============================================================================
 # This is a Wisteria (UTokyo HPC) specific run script that is required
 # because the compute node does not inherit the login node's Conda environment.
-# Instead we need to load the module and environment manually, before run
+# Instead we need to load the module and environment manually, before run.
+#
+# User needs to set the following paths:
+# WORK_DIR: path to the directory where the Conda environment is stored, and
+#   where the SeisFlows repository has been cloned
+# CONDA_ENV: name of the Conda environment to be used
+# GPU_MODE: set by the calling function; if GPU_MODE==1, then the script
+#   will load the GPU-specific environment
 # ==============================================================================
+# Defines where our Conda environment is saved and what its name is
 WORK_DIR=/work/01/gr58/share/adjtomo
+CONDA_ENV=adjtomo
+echo "work environment set as: '$WORK_DIR'"
 
 # Load MPI and activate Conda environment
-module load intel
-module load impi
+if [ $GPU_MODE -eq 1 ]; then  # Use GPUs
+    echo "loading GPU modules on compute node"
+    module load cuda/12.2
+    module load gcc
+    module load ompi-cuda
+else
+    echo "loading CPU modules on compute node"
+    module load intel
+    module load impi
+fi
+
+# Conda will be common to GPU or CPU versions
+echo "loading Conda environment: $CONDA_ENV"
 module load miniconda/py38_4.9.2
 source $MINICONDA_DIR/etc/profile.d/conda.sh
-conda activate $WORK_DIR/conda/envs/adjtomo
+conda activate $WORK_DIR/conda/envs/$CONDA_ENV
 
-# Run Functions: ensure that we are using the correct Python version 
+# Run Functions: ensure that we are using the correct Python version
 # The following environment variables must be set by the '-x' flag in the
 # corresponding system.run() function:
 # ---
@@ -24,4 +45,5 @@ conda activate $WORK_DIR/conda/envs/adjtomo
 # SEISFLOWS_ENV: any additional environment variables
 # SEISFLOWS_TASKID: assigned processor number for given task
 # ---
-$WORK_DIR/conda/envs/adjtomo/bin/python $WORK_DIR/REPOSITORIES/seisflows/seisflows/system/runscripts/run --funcs $SEISFLOWS_FUNCS --kwargs $SEISFLOWS_KWARGS --environment SEISFLOWS_TASKID=$SEISFLOWS_TASKID,$SEISFLOWS_ENV
+$WORK_DIR/conda/envs/$CONDA_ENV/bin/python $WORK_DIR/REPOSITORIES/seisflows/seisflows/system/runscripts/run --funcs $SEISFLOWS_FUNCS --kwargs $SEISFLOWS_KWARGS --environment SEISFLOWS_TASKID=$SEISFLOWS_TASKID,$SEISFLOWS_ENV
+
diff --git a/seisflows/system/runscripts/custom_submit-wisteria b/seisflows/system/runscripts/custom_submit-wisteria
index 54de1c23..9fbfba35 100755
--- a/seisflows/system/runscripts/custom_submit-wisteria
+++ b/seisflows/system/runscripts/custom_submit-wisteria
@@ -5,6 +5,7 @@
 # Instead we need to load the module and environment manually, before submit
 # ==============================================================================
 
+# Defines where our Conda environment is stored
 WORK_DIR=/work/01/gr58/share/adjtomo
 
 # Load MPI and activate Conda environment
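The GPU_MODE handshake between `Fujitsu.run()` and the run script above can be summarized with a small sketch. The helper below is illustrative only (it is not part of the package); the module names are taken directly from the script:

```python
def select_modules(gpu):
    """Mirror the run script's if/else branch: GPU_MODE is exported as 0 or 1
    through 'pjsub -x GPU_MODE=...', and the script branches on that flag."""
    gpu_mode = int(bool(gpu))  # e.g., gpu=4 -> 1, gpu=None -> 0
    if gpu_mode == 1:
        return ["cuda/12.2", "gcc", "ompi-cuda"]  # GPU compute nodes
    return ["intel", "impi"]                      # CPU compute nodes

assert select_modules(gpu=4) == ["cuda/12.2", "gcc", "ompi-cuda"]
assert select_modules(gpu=None) == ["intel", "impi"]
```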
diff --git a/seisflows/system/wisteria.py b/seisflows/system/wisteria.py
index 7d2bf5a0..bec73491 100644
--- a/seisflows/system/wisteria.py
+++ b/seisflows/system/wisteria.py
@@ -9,6 +9,7 @@
 (data/learning nodes w/ GPU)
 - Odyssey has 7680 nodes with 48 cores/node
 - Aquarius has 45 nodes with 36 cores/node
+- Aquarius also contains 8x NVIDIA A100 GPUs
 
 .. note:: Wisteria Caveat 1
@@ -30,11 +31,7 @@
 environment variables. We use these in place of command line arguments
 """
 import os
-import subprocess
-import sys
-import time
-from seisflows import ROOT_DIR, logger
-from seisflows.tools.config import import_seisflows, pickle_function_list
+from seisflows import ROOT_DIR
 from seisflows.system.fujitsu import Fujitsu
 
@@ -63,6 +60,13 @@ class Wisteria(Fujitsu):
         - short-a: Aquarius short, 2 hr. max, [1, 2] nodes available
         - regular-a: Aquarius regular, 24-48 hr. max, [1, 8] nodes available
+        - share-debug: Aquarius GPU debug, 30 min max, 1, 2, 4 GPU available
+        - share-short: Aquarius GPU short queue, 2 hr. max, 1, 2, 4 GPU available
+    :type gpu: int
+    :param gpu: if not None, tells SeisFlows to use the GPU version of SPECFEM;
+        the integer value of `gpu` sets the number of GPUs requested for each
+        simulation on system (i.e., #PJM -L gpu=`gpu`)
+
 
     Paths
     -----
@@ -78,108 +82,34 @@
     run_functions = os.path.join(ROOT_DIR, "system", "runscripts",
                                  "custom_run-wisteria")
 
-    def __init__(self, user=None, group=None, rscgrp=None, **kwargs):
+    def __init__(self, group=None, rscgrp=None, gpu=None, **kwargs):
         """Wisteria init"""
         super().__init__(**kwargs)
 
         self.group = group
         self.rscgrp = rscgrp
+        self.gpu = gpu
 
         # Wisteria resource groups and their cores per node
         self._rscgrps = {
+            # Node-occupied resource allocation (Odyssey)
             "debug-o": 48, "short-o": 48, "regular-o": 48, "priority-o": 48,
-            "debug-a": 48, "short-a": 48, "regular-a": 48
+            # Node-occupied resource allocation (Aquarius)
+            "debug-a": 36, "short-a": 36, "regular-a": 36,
+            # GPU-exclusive resource allocation
+            "share-debug": 1, "share-short": 2, "share": 5
         }
 
-    def submit(self, workdir=None, parameter_file="parameters.yaml"):
-        """
-        Submits the main workflow job as a serial job submitted directly to
-        the system that is running the master job
-
-        :type workdir: str
-        :param workdir: path to the current working directory
-        :type parameter_file: str
-        :param parameter_file: parameter file name used to instantiate the
-            SeisFlows package
-        """
-        workflow = import_seisflows(workdir=workdir or self.path.workdir,
-                                    parameter_file=parameter_file)
-        workflow.check()
-        workflow.setup()
-        workflow.run()
-
-    def run(self, funcs, single=False, **kwargs):
-        """
-        Runs task multiple times in embarrassingly parallel fasion on a PJM
-        cluster. Executes the list of functions (`funcs`) NTASK times with each
-        task occupying NPROC cores.
-
-        .. note::
-            Completely overwrites the `Cluster.run()` command
-
-        :type funcs: list of methods
-        :param funcs: a list of functions that should be run in order. All
-            kwargs passed to run() will be passed into the functions.
-        :type single: bool
-        :param single: run a single-process, non-parallel task, such as
-            smoothing the gradient, which only needs to be run by once.
-            This will change how the job array and the number of tasks is
-            defined, such that the job is submitted as a single-core job to
-            the system.
-        """
-        funcs_fid, kwargs_fid = pickle_function_list(funcs,
-                                                     path=self.path.scratch,
-                                                     **kwargs)
-        if single:
-            logger.info(f"running functions {[_.__name__ for _ in funcs]} on "
-                        f"system 1 time")
-            _ntask = 1
-        else:
-            logger.info(f"running functions {[_.__name__ for _ in funcs]} on "
-                        f"system {self.ntask} times")
-            _ntask = self.ntask
-
-        # Default Fujitsu command line input, can be overloaded by subclasses
-        # Copy-paste this default run_call and adjust accordingly for subclass
-        job_ids = []
-        for taskid in range(_ntask):
-            run_call = " ".join([
-                f"{self.run_call_header}",
-                # -x in 'pjsub' sets environment variables which are distributed
-                # in the run script, see custom run scripts for example how
-                f"-x SEISFLOWS_FUNCS={funcs_fid},SEISFLOWS_KWARGS={kwargs_fid},"
-                f"SEISFLOWS_TASKID={taskid}",
-                f"{self.run_functions}",
-            ])
-
-            if taskid == 0:
-                logger.debug(run_call)
-
-            # Grab the job ids from each stdout message
-            stdout = subprocess.run(run_call, stdout=subprocess.PIPE,
-                                    text=True, shell=True).stdout
-            job_ids.append(self._stdout_to_job_id(stdout))
-
-        # Monitor the job queue until all jobs have completed, or any one fails
-        try:
-            status = self.check_job_status(job_ids)
-        except FileNotFoundError:
-            logger.critical(f"cannot access job information through 'pjstat', "
-                            f"waited 50s with no return, please check job "
-                            f"scheduler and log messages")
-            sys.exit(-1)
-
-        if status == -1:  # Failed job
-            logger.critical(
-                msg.cli(f"Stopping workflow. Please check logs for details.",
-                        items=[f"TASKS: {[_.__name__ for _ in funcs]}",
-                               f"PJSUB: {run_call}"],
-                        header="PJM run error", border="=")
-            )
-            sys.exit(-1)
+    @property
+    def run_call_header(self):
+        """Override the run call header to allow for GPU job requirements"""
+        if self.gpu:
+            _call = super().run_call_header.replace(
+                f"-L node={self.nodes}",
+                f"-L gpu={self.gpu}"
+            )
+            return _call
         else:
-            logger.info(f"{self.ntask} tasks finished successfully")
-            # Wait for all processes to finish and write to disk (if they do)
-            # Moving on too quickly may result in required files not being avail
-            time.sleep(5)
+            return super().run_call_header
+
diff --git a/seisflows/workflow/forward.py b/seisflows/workflow/forward.py
index b2692bf7..c4780c26 100644
--- a/seisflows/workflow/forward.py
+++ b/seisflows/workflow/forward.py
@@ -549,7 +549,7 @@ def run_forward_simulations(self, path_model, save_traces=None,
             export_traces = False
 
         assert(os.path.exists(path_model)), \
-            f"Model path for objective function does not exist"
+            f"Model path '{path_model}' for objective function does not exist"
 
         # We will run the forward simulation with the given input model
        self.solver.import_model(path_model=path_model)