diff --git a/.gitignore b/.gitignore index b1d66dc..16180c9 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ build/ dist/ venv/ +.vscode/settings.json +processMeerKAT/workspace.code-workspace diff --git a/processMeerKAT/known_hpc.cfg b/processMeerKAT/known_hpc.cfg index 06c3b53..114a2a6 100644 --- a/processMeerKAT/known_hpc.cfg +++ b/processMeerKAT/known_hpc.cfg @@ -105,3 +105,20 @@ path_binding = '--bind /share:/share ' # Must use linebreaks for #SBATCH lines! Otherwise python reads them as comments, and ignores them. submission_file_base = "#!/bin/bash{array}{exclude}{reservation}\n#SBATCH --nodes={nodes}\n#SBATCH --ntasks-per-node={tasks}\n#SBATCH --cpus-per-task={cpus}\n#SBATCH --mem={mem}GB\n#SBATCH --job-name={runname}{name}\n#SBATCH --distribution=plane={plane}\n#SBATCH --output={LOG_DIR}/%%x-{ID}.out\n#SBATCH --error={LOG_DIR}/%%x-{ID}.err\n#SBATCH --time={time}" + +[petrichor] + # Specify differences to ilifu + TOTAL_NODES_LIMIT = 110 + CPUS_PER_NODE_LIMIT = 64 + NTASKS_PER_NODE_LIMIT = %(CPUS_PER_NODE_LIMIT)s + MEM_PER_NODE_GB_LIMIT = 512 # GB + MEM_PER_NODE_GB_LIMIT_HIGHMEM = 1000 # + ACCOUNTS = [''] # List of allowed accounts; Not currently used: see sbatch_file_base ### update me!!! + CONTAINER = '/scratch1/tho822/containers/casa-pipeline/casa-6.1.2.7-modular.simg' # + PARTITION = 'defq' + QOS = 'express' + MPI_WRAPPER = 'mpirun' + MODULES = ['singularity','openmpi'] + path_binding = '--bind /scratch1:/scratch1,/scratch2:/scratch2 ' + # Must use linebreaks for #SBATCH lines! Otherwise python reads them as comments, and ignores them. 
+ submission_file_base = "#!/bin/bash{array}{exclude}{reservation}\n#SBATCH --nodes={nodes}\n#SBATCH --ntasks-per-node={tasks}\n#SBATCH --cpus-per-task={cpus}\n#SBATCH --mem={mem}GB\n#SBATCH --job-name={runname}{name}\n#SBATCH --distribution=plane={plane}\n#SBATCH --output={LOG_DIR}/%%x-{ID}.out\n#SBATCH --error={LOG_DIR}/%%x-{ID}.err\n#SBATCH --time={time}" diff --git a/processMeerKAT/processMeerKAT.py b/processMeerKAT/processMeerKAT.py index da43c89..fc090f2 100755 --- a/processMeerKAT/processMeerKAT.py +++ b/processMeerKAT/processMeerKAT.py @@ -139,13 +139,15 @@ def parse_scripts(val): DEFAULTS_CONFIG_PATH = "known_hpc.cfg" known_hpc_path = "{0}/{1}".format(SCRIPT_DIR, DEFAULTS_CONFIG_PATH) + # Begin parsing + parser = argparse.ArgumentParser(prog=THIS_PROG,description='Process MeerKAT data via CASA MeasurementSet. Version: {0}'.format(__version__)) + if os.path.isfile(known_hpc_path): KNOWN_HPCS, HPC_CONFIG = config_parser.parse_config(known_hpc_path) else: parser.error("Known HPC config file ({0}) not found.".format(known_hpc_path)) - # Begin parsing - parser = argparse.ArgumentParser(prog=THIS_PROG,description='Process MeerKAT data via CASA MeasurementSet. Version: {0}'.format(__version__)) + parser.add_argument("--hpc",metavar='name', required=False, type=str, default="ilifu", help="Name of hpc facility being used. 
If not known to processMeerKAT/known_hpc.cfg slurm limits are functionally removed [default: ilifu].") # Read in parser default values according to --cluster parameter @@ -154,12 +156,13 @@ def parse_scripts(val): HPC_DEFAULTS = KNOWN_HPCS[HPC_NAME] parser.add_argument("-M","--MS",metavar="path", required=False, type=str, help="Path to MeasurementSet.") - parser.add_argument("-C","--config",metavar="path", default=HPC_DEFAULTS['CONFIG'.lower()], required=False, type=str, help="Relative (not absolute) path to config file.") + parser.add_argument("-C","--config",metavar="config", default=HPC_DEFAULTS['CONFIG'.lower()], required=False, type=str, help="Relative (not absolute) path to config file.") + args, unknown = parser.parse_known_args() # Extract hpc name used during build and warn if not the same as CLI hpc config_dict, config = config_parser.parse_config(args.config) if config.has_option('run', 'hpc'): - config_hpc_name = config['run']['hpc'] + config_hpc_name = config['run']['hpc'].strip("'") else: config_hpc_name = HPC_NAME if HPC_NAME != config_hpc_name: @@ -398,7 +401,7 @@ def write_command(script,args,mpi_wrapper,container,name='job',casa_script=False if idx+1 < len(argv): argv[idx+1] += "_${SLURM_ARRAY_JOB_ID}" # Remove config name. Config is passed into `args` parameter. - elif element in ["-c", "--config"]: + elif element in ["-C", "--config"]: argv[idx] = "" if idx+1 < len(argv): argv[idx+1] = "" @@ -629,14 +632,14 @@ def write_spw_master(filename,config,args,SPWs,precal_scripts,postcal_scripts,su if idx+1 < len(argv): argv[idx+1] += "_${SLURM_ARRAY_JOB_ID}" # Remove config name. Config is passed into `args` parameter. 
- elif element in ["-c", "--config"]: + elif element in ["-C", "--config"]: argv[idx] = "" if idx+1 < len(argv): argv[idx+1] = "" else: pass argument_calls = " ".join(argv) - if ("-v" or "--verbose") not in argument_calls: + if not any((arg in argument_calls for arg in ("-v", "--verbose"))): argument_calls += " --quiet" for i,spw in enumerate(SPWs.split(',')): @@ -728,14 +731,14 @@ def write_spw_master(filename,config,args,SPWs,precal_scripts,postcal_scripts,su print(element, arguments[idx:]) if idx+1 < len(arguments): arguments[idx+1] += "_$f" - elif element in ["-c", "--config"]: + elif element in ["-C", "--config"]: if idx+1 < len(arguments): arguments[idx+1] = ".config.tmp" else: pass argument_calls = " ".join(arguments) - if ("-v" or "--verbose") not in argument_calls: + if not any((arg in argument_calls for arg in ("-v", "--verbose"))): argument_calls += " --quiet" # Create script to start processMeerKAT.py for each SPW whilst maintaining args. @@ -1135,7 +1138,7 @@ def default_config(arg_dict): mpi_wrapper = srun(arg_dict) #Write and submit srun command to extract fields, and insert them into config file under section [fields] - params = '-B -M {MS} -C {config} -N {nodes} -t {ntasks_per_node}'.format(**arg_dict) + params = '-B -M {MS} -C {config} -N {nodes} -t {ntasks_per_node} --hpc {hpc}'.format(**arg_dict) if arg_dict['dopol']: params += ' -P' if arg_dict['verbose']: @@ -1262,7 +1265,7 @@ def format_args(config,submit,quiet,dependencies,justrun): #Check selfcal params if config_parser.has_section(config,'selfcal'): - selfcal_kwargs = get_config_kwargs(config, 'selfcal', SELFCAL_CONFIG_KEYS) + selfcal_kwargs = get_config_kwargs(config, 'selfcal', HPC_DEFAULTS['SELFCAL_CONFIG_KEYS'.lower()]) params = bookkeeping.get_selfcal_params() if selfcal_kwargs['loop'] > 0: logger.warning("Starting with loop={0}, which is only valid if previous loops were successfully run in this directory.".format(selfcal_kwargs['loop'])) @@ -1278,7 +1281,7 @@ def 
format_args(config,submit,quiet,dependencies,justrun): os.system(command) if config_parser.has_section(config,'image'): - imaging_kwargs = get_config_kwargs(config, 'image', IMAGING_CONFIG_KEYS) + imaging_kwargs = get_config_kwargs(config, 'image', HPC_DEFAULTS['IMAGING_CONFIG_KEYS'.lower()]) #If nspw = 1 and precal or postcal scripts present, overwrite config and reload if nspw == 1: @@ -1372,8 +1375,12 @@ def format_args(config,submit,quiet,dependencies,justrun): #sys.exit(1) #If everything up until here has passed, we can copy config file to TMP_CONFIG (in case user runs sbatch manually) and inform user - logger.debug("Copying '{0}' to '{1}', and using this to run pipeline.".format(config, HPC_DEFAULTS['TMP_CONFIG'.lower()])) - copyfile(config, HPC_DEFAULTS['TMP_CONFIG'.lower()]) + # Skip the copy only when config already is TMP_CONFIG; otherwise TMP_CONFIG must still be written + if config == HPC_DEFAULTS['TMP_CONFIG'.lower()]: + logger.debug("Not copying '{0}' to '{1}'. They're the same file.".format(config, HPC_DEFAULTS['TMP_CONFIG'.lower()])) + else: + logger.debug("Copying '{0}' to '{1}', and using this to run pipeline.".format(config, HPC_DEFAULTS['TMP_CONFIG'.lower()])) + copyfile(config, HPC_DEFAULTS['TMP_CONFIG'.lower()]) if not quiet: logger.warning("Changing [slurm] section in your config will have no effect unless you [-R --run] again.")