Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor parsing fixes + new HPC #1

Merged
merged 6 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@
build/
dist/
venv/
.vscode/settings.json
processMeerKAT/workspace.code-workspace
17 changes: 17 additions & 0 deletions processMeerKAT/known_hpc.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,20 @@
path_binding = '--bind /share:/share '
# Must use \n escapes (not literal line breaks) for #SBATCH lines! Otherwise python reads them as comments, and ignores them.
submission_file_base = "#!/bin/bash{array}{exclude}{reservation}\n#SBATCH --nodes={nodes}\n#SBATCH --ntasks-per-node={tasks}\n#SBATCH --cpus-per-task={cpus}\n#SBATCH --mem={mem}GB\n#SBATCH --job-name={runname}{name}\n#SBATCH --distribution=plane={plane}\n#SBATCH --output={LOG_DIR}/%%x-{ID}.out\n#SBATCH --error={LOG_DIR}/%%x-{ID}.err\n#SBATCH --time={time}"

[petrichor]
# Specify differences to ilifu
TOTAL_NODES_LIMIT = 110
CPUS_PER_NODE_LIMIT = 64
NTASKS_PER_NODE_LIMIT = %(CPUS_PER_NODE_LIMIT)s
MEM_PER_NODE_GB_LIMIT = 512 # GB
MEM_PER_NODE_GB_LIMIT_HIGHMEM = 1000 # GB
ACCOUNTS = [''] # List of allowed accounts; not currently used: see submission_file_base. TODO: update me!
CONTAINER = '/scratch1/tho822/containers/casa-pipeline/casa-6.1.2.7-modular.simg' #
PARTITION = 'defq'
QOS = 'express'
MPI_WRAPPER = 'mpirun'
MODULES = ['singularity','openmpi']
path_binding = '--bind /scratch1:/scratch1,/scratch2:/scratch2 '
# Must use \n escapes (not literal line breaks) for #SBATCH lines! Otherwise python reads them as comments, and ignores them.
submission_file_base = "#!/bin/bash{array}{exclude}{reservation}\n#SBATCH --nodes={nodes}\n#SBATCH --ntasks-per-node={tasks}\n#SBATCH --cpus-per-task={cpus}\n#SBATCH --mem={mem}GB\n#SBATCH --job-name={runname}{name}\n#SBATCH --distribution=plane={plane}\n#SBATCH --output={LOG_DIR}/%%x-{ID}.out\n#SBATCH --error={LOG_DIR}/%%x-{ID}.err\n#SBATCH --time={time}"
34 changes: 20 additions & 14 deletions processMeerKAT/processMeerKAT.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,15 @@ def parse_scripts(val):
DEFAULTS_CONFIG_PATH = "known_hpc.cfg"
known_hpc_path = "{0}/{1}".format(SCRIPT_DIR, DEFAULTS_CONFIG_PATH)

# Begin parsing
parser = argparse.ArgumentParser(prog=THIS_PROG,description='Process MeerKAT data via CASA MeasurementSet. Version: {0}'.format(__version__))

if os.path.isfile(known_hpc_path):
KNOWN_HPCS, HPC_CONFIG = config_parser.parse_config(known_hpc_path)
else:
parser.error("Known HPC config file ({0}) not found.".format(known_hpc_path))

# Begin parsing
parser = argparse.ArgumentParser(prog=THIS_PROG,description='Process MeerKAT data via CASA MeasurementSet. Version: {0}'.format(__version__))


parser.add_argument("--hpc",metavar='name', required=False, type=str, default="ilifu", help="Name of hpc facility being used. If not known to processMeerKAT/known_hpc.cfg slurm limits are functionally removed [default: ilifu].")
# Read in parser default values according to --hpc parameter
Expand All @@ -154,12 +156,13 @@ def parse_scripts(val):
HPC_DEFAULTS = KNOWN_HPCS[HPC_NAME]

parser.add_argument("-M","--MS",metavar="path", required=False, type=str, help="Path to MeasurementSet.")
parser.add_argument("-C","--config",metavar="path", default=HPC_DEFAULTS['CONFIG'.lower()], required=False, type=str, help="Relative (not absolute) path to config file.")
parser.add_argument("-C","--config",metavar="config", default=HPC_DEFAULTS['CONFIG'.lower()], required=False, type=str, help="Relative (not absolute) path to config file.")

args, unknown = parser.parse_known_args()
# Extract hpc name used during build and warn if not the same as CLI hpc
config_dict, config = config_parser.parse_config(args.config)
if config.has_option('run', 'hpc'):
config_hpc_name = config['run']['hpc']
config_hpc_name = config['run']['hpc'].strip("'")
else:
config_hpc_name = HPC_NAME
if HPC_NAME != config_hpc_name:
Expand Down Expand Up @@ -398,7 +401,7 @@ def write_command(script,args,mpi_wrapper,container,name='job',casa_script=False
if idx+1 < len(argv):
argv[idx+1] += "_${SLURM_ARRAY_JOB_ID}"
# Remove config name. Config is passed into `args` parameter.
elif element in ["-c", "--config"]:
elif element in ["-C", "--config"]:
argv[idx] = ""
if idx+1 < len(argv):
argv[idx+1] = ""
Expand Down Expand Up @@ -629,14 +632,14 @@ def write_spw_master(filename,config,args,SPWs,precal_scripts,postcal_scripts,su
if idx+1 < len(argv):
argv[idx+1] += "_${SLURM_ARRAY_JOB_ID}"
# Remove config name. Config is passed into `args` parameter.
elif element in ["-c", "--config"]:
elif element in ["-C", "--config"]:
argv[idx] = ""
if idx+1 < len(argv):
argv[idx+1] = ""
else:
pass
argument_calls = " ".join(argv)
if ("-v" or "--verbose") not in argument_calls:
if not any((arg in argument_calls for arg in ("-v", "--verbose"))):
argument_calls += " --quiet"

for i,spw in enumerate(SPWs.split(',')):
Expand Down Expand Up @@ -728,14 +731,14 @@ def write_spw_master(filename,config,args,SPWs,precal_scripts,postcal_scripts,su
print(element, arguments[idx:])
if idx+1 < len(arguments):
arguments[idx+1] += "_$f"
elif element in ["-c", "--config"]:
elif element in ["-C", "--config"]:
if idx+1 < len(arguments):
arguments[idx+1] = ".config.tmp"
else:
pass

argument_calls = " ".join(arguments)
if ("-v" or "--verbose") not in argument_calls:
if not any((arg in argument_calls for arg in ("-v", "--verbose"))):
argument_calls += " --quiet"

# Create script to start processMeerKAT.py for each SPW whilst maintaining args.
Expand Down Expand Up @@ -1135,7 +1138,7 @@ def default_config(arg_dict):
mpi_wrapper = srun(arg_dict)

#Write and submit srun command to extract fields, and insert them into config file under section [fields]
params = '-B -M {MS} -C {config} -N {nodes} -t {ntasks_per_node}'.format(**arg_dict)
params = '-B -M {MS} -C {config} -N {nodes} -t {ntasks_per_node} --hpc {hpc}'.format(**arg_dict)
if arg_dict['dopol']:
params += ' -P'
if arg_dict['verbose']:
Expand Down Expand Up @@ -1262,7 +1265,7 @@ def format_args(config,submit,quiet,dependencies,justrun):

#Check selfcal params
if config_parser.has_section(config,'selfcal'):
selfcal_kwargs = get_config_kwargs(config, 'selfcal', SELFCAL_CONFIG_KEYS)
selfcal_kwargs = get_config_kwargs(config, 'selfcal', HPC_DEFAULTS['SELFCAL_CONFIG_KEYS'.lower()])
params = bookkeeping.get_selfcal_params()
if selfcal_kwargs['loop'] > 0:
logger.warning("Starting with loop={0}, which is only valid if previous loops were successfully run in this directory.".format(selfcal_kwargs['loop']))
Expand All @@ -1278,7 +1281,7 @@ def format_args(config,submit,quiet,dependencies,justrun):
os.system(command)

if config_parser.has_section(config,'image'):
imaging_kwargs = get_config_kwargs(config, 'image', IMAGING_CONFIG_KEYS)
imaging_kwargs = get_config_kwargs(config, 'image', HPC_DEFAULTS['IMAGING_CONFIG_KEYS'.lower()])

#If nspw = 1 and precal or postcal scripts present, overwrite config and reload
if nspw == 1:
Expand Down Expand Up @@ -1372,8 +1375,11 @@ def format_args(config,submit,quiet,dependencies,justrun):
#sys.exit(1)

#If everything up until here has passed, we can copy config file to TMP_CONFIG (in case user runs sbatch manually) and inform user
logger.debug("Copying '{0}' to '{1}', and using this to run pipeline.".format(config, HPC_DEFAULTS['TMP_CONFIG'.lower()]))
copyfile(config, HPC_DEFAULTS['TMP_CONFIG'.lower()])
# Skip if config is temporary
if config == HPC_DEFAULTS['TMP_CONFIG'.lower()]:
logger.debug("Not copying '{0}' to '{1}'. They're the same file.".format(config, HPC_DEFAULTS['TMP_CONFIG'.lower()]))
else:
logger.debug("Copying '{0}' to '{1}', and using this to run pipeline.".format(config, HPC_DEFAULTS['TMP_CONFIG'.lower()]))
if not quiet:
logger.warning("Changing [slurm] section in your config will have no effect unless you [-R --run] again.")

Expand Down