From 179d7074e98edd371e07f172f354c229a0656324 Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Fri, 10 Dec 2021 15:48:19 +0200 Subject: [PATCH 01/21] Match TTs, enable MPI, shared processing CASA 6.4 and naming bugfix Various changes: - Match taylor terms of outliers to main image - Run science_image.py with MPI - Make bash scripts executable for all, so other users that have permission can use them - e.g. to check progress on pipeline run - Update default container to CASA 6.4 - Fix outlier naming bug that increased run-time --- processMeerKAT/bookkeeping.py | 11 ++++++++++- processMeerKAT/default_config.txt | 2 +- processMeerKAT/processMeerKAT.py | 4 ++-- processMeerKAT/science_image.py | 1 + processMeerKAT/selfcal_scripts/selfcal_part2.py | 11 +++++++---- processMeerKAT/selfcal_scripts/set_sky_model.py | 1 + 6 files changed, 22 insertions(+), 8 deletions(-) diff --git a/processMeerKAT/bookkeeping.py b/processMeerKAT/bookkeeping.py index 6bcaded..d4d8a85 100644 --- a/processMeerKAT/bookkeeping.py +++ b/processMeerKAT/bookkeeping.py @@ -180,7 +180,16 @@ def get_selfcal_params(): def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step): visbase = os.path.split(vis.rstrip('/ '))[1] # Get only vis name, not entire path - basename = visbase.replace('.mms', '') + #Assume first target will be in the visname later (relevant for writing outliers.txt at beginning of pipeline) + if '.ms' in visbase: + from casatools import msmetadata + msmd = msmetadata() + msmd.open(vis) + basename = visbase.replace('.ms','.{0}'.format(msmd.namesforfields(msmd.fieldsforintent('TARGET'))[0])) + msmd.done() + else: + basename = visbase.replace('.mms', '') + imbase = basename + '_im_%d' # Images will be produced in $CWD imagename = imbase % loop outimage = imagename + '.image' diff --git a/processMeerKAT/default_config.txt b/processMeerKAT/default_config.txt index 0a21c22..64c4f54 100644 --- a/processMeerKAT/default_config.txt +++ b/processMeerKAT/default_config.txt @@ -17,7 +17,7 @@ partition = 'Main' # SLURM partition to use exclude = '' # SLURM nodes to exclude time = '12:00:00' submit = False -container = '/idia/software/containers/casa-6-2021-10-12.simg' +container = '/idia/software/containers/casa-6.4.simg' mpi_wrapper = 'mpirun' name = '' dependencies = '' diff --git a/processMeerKAT/processMeerKAT.py b/processMeerKAT/processMeerKAT.py index 73d54d5..d8cd241 100755 --- a/processMeerKAT/processMeerKAT.py +++ b/processMeerKAT/processMeerKAT.py @@ -61,7 +61,7 @@ IMAGING_CONFIG_KEYS = ['cell', 'robust', 'imsize', 'wprojplanes', 'niter', 'threshold', 'multiscale', 'nterms', 'gridder', 'deconvolver', 'restoringbeam', 'specmode', 'stokes', 'mask', 'rmsmap'] SLURM_CONFIG_STR_KEYS = ['container','mpi_wrapper','partition','time','name','dependencies','exclude','account','reservation'] SLURM_CONFIG_KEYS = ['nodes','ntasks_per_node','mem','plane','submit','precal_scripts','postcal_scripts','scripts','verbose','modules'] + SLURM_CONFIG_STR_KEYS -CONTAINER = '/idia/software/containers/casa-6-2021-10-12.simg' +CONTAINER = '/idia/software/containers/casa-6.4.simg' MPI_WRAPPER = 'mpirun' PRECAL_SCRIPTS = [('calc_refant.py',False,''),('partition.py',True,'')] #Scripts run before calibration at top level directory when nspw > 1 POSTCAL_SCRIPTS = [('concat.py',False,''),('plotcal_spw.py', False, ''),('selfcal_part1.py',True,''),('selfcal_part2.py',False,''),('science_image.py', True, '')] #Scripts run after calibration at top level directory when nspw > 1 @@ -888,7 +888,7 @@ def write_bash_job_script(master,filename,extn,do,purpose,dir='jobScripts',echo= master.write('\n#Create {0}.sh file, make executable and symlink to current version\n'.format(filename)) master.write('echo "#!/bin/bash" > {0}\n'.format(fname)) master.write('{0}{1}>> {2}\n'.format(do,do2,fname)) - master.write('chmod u+x {0}\n'.format(fname)) + master.write('chmod +x {0}\n'.format(fname)) master.write('ln -f -s {0} {1}.sh\n'.format(fname,filename)) if echo: master.write('echo Run ./{0}.sh to {1}.\n'.format(filename,purpose)) diff --git a/processMeerKAT/science_image.py b/processMeerKAT/science_image.py index 6ead3db..28403c5 100644 --- a/processMeerKAT/science_image.py +++ b/processMeerKAT/science_image.py @@ -9,6 +9,7 @@ from casatasks import * logfile=casalog.logfile() casalog.setlogfile('logs/{SLURM_JOB_NAME}-{SLURM_JOB_ID}.casa'.format(**os.environ)) +import casampi def science_image(vis, cell, robust, imsize, wprojplanes, niter, threshold, multiscale, nterms, gridder, deconvolver, restoringbeam, specmode, stokes, mask, rmsmap, outlierfile): diff --git a/processMeerKAT/selfcal_scripts/selfcal_part2.py b/processMeerKAT/selfcal_scripts/selfcal_part2.py index dcbb476..a3becf1 100644 --- a/processMeerKAT/selfcal_scripts/selfcal_part2.py +++ b/processMeerKAT/selfcal_scripts/selfcal_part2.py @@ -190,9 +190,9 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp imsize=[{2},{2}] cell=[1.0arcsec,1.0arcsec] phasecenter={3} - nterms=3 + nterms={4} gridder=standard - {4}\n""".format(imbase%(index),i,outlier_imsize,position,mask)) + {5}\n""".format(imbase%(index),i,outlier_imsize,position,nterms[loop],mask)) out.close() @@ -203,6 +203,9 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp sys.exit(1) #Write outlier file specific to this loop + if not os.path.exists(outlierfile_all) and os.path.exists('../{0}'.format(outlierfile_all)): + logger.warning('Using outliers from ../{0}'.format(outlierfile_all)) + outlierfile_all = '../{0}'.format(outlierfile_all) outliers=open(outlierfile_all).read() out = open(outlierfile,'w') @@ -309,9 +312,9 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp imsize=[{2},{2}] cell=[1.0arcsec,1.0arcsec] phasecenter={3} - nterms=3 + nterms={4} gridder=standard - {4}\n""".format(imbase%(index),i,outlier_imsize,phasecenter,mask)) + {5}\n""".format(imbase%(index),i,outlier_imsize,phasecenter,nterms[loop],mask)) else: logger.info('Excluding "{0}", as it lies within the image footprint.'.format(outlier_bases[i])) diff --git a/processMeerKAT/selfcal_scripts/set_sky_model.py b/processMeerKAT/selfcal_scripts/set_sky_model.py index 3c0bc6c..64ca540 100644 --- a/processMeerKAT/selfcal_scripts/set_sky_model.py +++ b/processMeerKAT/selfcal_scripts/set_sky_model.py @@ -1,6 +1,7 @@ #Copyright (C) 2020 Inter-University Institute for Data Intensive Astronomy #See processMeerKAT.py for license details. +#!/usr/bin/env python3 import os import bookkeeping from selfcal_scripts.selfcal_part2 import find_outliers From eea9f88ecef420af01e3d57282bc28e580a02923 Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Wed, 22 Dec 2021 14:16:35 +0200 Subject: [PATCH 02/21] Temporary workaround for Singularity >= 3.6 --- setup.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.sh b/setup.sh index bd056b3..a955586 100644 --- a/setup.sh +++ b/setup.sh @@ -1,3 +1,4 @@ dir=$(dirname $BASH_SOURCE)/processMeerKAT export PATH=$PATH:$dir export PYTHONPATH=$PYTHONPATH:$dir +export SINGULARITYENV_PYTHONPATH=$PYTHONPATH:/opt/casaaddons From b89993ce85c13f9963c7330d48ece45d8e80f4d9 Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Wed, 22 Dec 2021 20:35:58 +0200 Subject: [PATCH 03/21] Slightly better workaround Slightly improved workaround for Singularity >= 3.6 (sensitive to different $PYTHONPATH within container), as suggest here: https://github.com/sylabs/singularity/issues/490#issuecomment-999628892 --- setup.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.sh b/setup.sh index a955586..a1637f3 100644 --- a/setup.sh +++ b/setup.sh @@ -1,4 +1,4 @@ dir=$(dirname $BASH_SOURCE)/processMeerKAT export PATH=$PATH:$dir export PYTHONPATH=$PYTHONPATH:$dir -export SINGULARITYENV_PYTHONPATH=$PYTHONPATH:/opt/casaaddons +export SINGULARITYENV_PYTHONPATH="$PYTHONPATH:\$PYTHONPATH" From 3ee57b4011cf8bcee637ec11efdf4f54f6509e53 Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Wed, 9 Mar 2022 10:20:49 +0200 Subject: [PATCH 04/21] Revert log renaming, update science image defaults, include additional outliers - Revert to renaming the CASA 6 logs after script is run - Update science imaging config defaults: 6k, 512 planes, 5k iterations, S/N=10 threshold - Include any outliers above threshold within edge 1% of main image --- processMeerKAT/bookkeeping.py | 2 +- processMeerKAT/default_config.txt | 8 ++++---- processMeerKAT/processMeerKAT.py | 2 +- processMeerKAT/selfcal_scripts/selfcal_part2.py | 6 +++++- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/processMeerKAT/bookkeeping.py b/processMeerKAT/bookkeeping.py index d4d8a85..f58f97d 100644 --- a/processMeerKAT/bookkeeping.py +++ b/processMeerKAT/bookkeeping.py @@ -283,7 +283,7 @@ def run_script(func,logfile=''): if continue_run: try: func(args,taskvals) - #rename_logs(logfile) + rename_logs(logfile) except Exception as err: logger.error('Exception found in the pipeline of type {0}: {1}'.format(type(err),err)) logger.error(traceback.format_exc()) diff --git a/processMeerKAT/default_config.txt b/processMeerKAT/default_config.txt index 64c4f54..6306576 100644 --- a/processMeerKAT/default_config.txt +++ b/processMeerKAT/default_config.txt @@ -79,10 +79,10 @@ outlier_threshold = 0.0 # S/N values if >= 1.0, otherwise Jy [image] cell = '1.5arcsec' robust = -0.5 -imsize = [4096, 4096] -wprojplanes = 256 -niter = 100000 -threshold = 10e-6 # S/N value if >= 1.0 and rmsmap != '', otherwise Jy +imsize = [6144, 6144] +wprojplanes = 512 +niter = 50000 +threshold = 10 # S/N value if >= 1.0 and rmsmap != '', otherwise Jy multiscale = [0, 5, 10, 15] nterms = 2 # Number of taylor terms gridder = 'wproject' diff --git a/processMeerKAT/processMeerKAT.py b/processMeerKAT/processMeerKAT.py index d8cd241..b9ff6e6 100755 --- a/processMeerKAT/processMeerKAT.py +++ b/processMeerKAT/processMeerKAT.py @@ -58,7 +58,7 @@ FIELDS_CONFIG_KEYS = ['fluxfield','bpassfield','phasecalfield','targetfields','extrafields'] CROSSCAL_CONFIG_KEYS = ['minbaselines','chanbin','width','timeavg','createmms','keepmms','spw','nspw','calcrefant','refant','standard','badants','badfreqranges'] SELFCAL_CONFIG_KEYS = ['nloops','loop','cell','robust','imsize','wprojplanes','niter','threshold','uvrange','nterms','gridder','deconvolver','solint','calmode','discard_nloops','gaintype','outlier_threshold','flag'] -IMAGING_CONFIG_KEYS = ['cell', 'robust', 'imsize', 'wprojplanes', 'niter', 'threshold', 'multiscale', 'nterms', 'gridder', 'deconvolver', 'restoringbeam', 'specmode', 'stokes', 'mask', 'rmsmap'] +IMAGING_CONFIG_KEYS = ['cell', 'robust', 'imsize', 'wprojplanes', 'niter', 'threshold', 'multiscale', 'nterms', 'gridder', 'deconvolver', 'restoringbeam', 'specmode', 'stokes', 'mask', 'rmsmap','outlierfile'] SLURM_CONFIG_STR_KEYS = ['container','mpi_wrapper','partition','time','name','dependencies','exclude','account','reservation'] SLURM_CONFIG_KEYS = ['nodes','ntasks_per_node','mem','plane','submit','precal_scripts','postcal_scripts','scripts','verbose','modules'] + SLURM_CONFIG_STR_KEYS CONTAINER = '/idia/software/containers/casa-6.4.simg' diff --git a/processMeerKAT/selfcal_scripts/selfcal_part2.py b/processMeerKAT/selfcal_scripts/selfcal_part2.py index a3becf1..9371401 100644 --- a/processMeerKAT/selfcal_scripts/selfcal_part2.py +++ b/processMeerKAT/selfcal_scripts/selfcal_part2.py @@ -202,7 +202,7 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp logger.error("Outlier file '{0}' doesn't exist, so sky model build went wrong. Will terminate process.".format(outlierfile_all)) sys.exit(1) - #Write outlier file specific to this loop + #Write outlier file specific to this loop, looking up from SPW directory where relevant if not os.path.exists(outlierfile_all) and os.path.exists('../{0}'.format(outlierfile_all)): logger.warning('Using outliers from ../{0}'.format(outlierfile_all)) outlierfile_all = '../{0}'.format(outlierfile_all) @@ -254,6 +254,10 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp if axis in head.keys(): head.pop(head.index(axis)) + #Shrink image by 1% to include outliers on edge of main images + head['NAXIS1'] = int(head['NAXIS1']*0.99) + head['NAXIS2'] = int(head['NAXIS2']*0.99) + w = WCS(head) r=re.compile(r'phasecenter=J2000 (?P.*?) (?P.*?)\n') positions=[m.groupdict() for m in r.finditer(outliers)] From a33b522ba00858d27e806d6d8ff0f092160e8af0 Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Wed, 30 Mar 2022 12:01:57 +0200 Subject: [PATCH 05/21] Add manual model for J1130-1449 Add manual model from Russ Taylor for polarisation calibrator J1130-1449, taken from observations under the MeerKAT calibrator project. --- processMeerKAT/bookkeeping.py | 2 ++ processMeerKAT/crosscal_scripts/xy_yx_solve.py | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/processMeerKAT/bookkeeping.py b/processMeerKAT/bookkeeping.py index f58f97d..ab1754b 100644 --- a/processMeerKAT/bookkeeping.py +++ b/processMeerKAT/bookkeeping.py @@ -85,6 +85,8 @@ def polfield_name(visname): polfield = list(set(["3C138", "0518+165", "0521+166", "J0521+1638"]).intersection(set(fieldnames)))[0] elif any([ff in ["3C48", "0134+329", "0137+331", "J0137+3309"] for ff in fieldnames]): polfield = list(set(["3C48", "0134+329", "0137+331", "J0137+3309"]).intersection(set(fieldnames)))[0] + elif "J1130-1449" in fieldnames: + polfield = "J1130-1449" else: logger.warning("No valid polarization field found. Defaulting to use the phase calibrator to solve for XY phase.") logger.warning("The polarization solutions found will likely be wrong. Please check the results carefully.") diff --git a/processMeerKAT/crosscal_scripts/xy_yx_solve.py b/processMeerKAT/crosscal_scripts/xy_yx_solve.py index 780aea1..94d2df1 100644 --- a/processMeerKAT/crosscal_scripts/xy_yx_solve.py +++ b/processMeerKAT/crosscal_scripts/xy_yx_solve.py @@ -47,6 +47,10 @@ def qu_polfield(polfield, visname): perley_frac = np.array([0.003, 0.005, 0.007]) perley_f = np.array([1050,1450,1640]) pa_polcal = np.array([25, 140, -5]) + elif polfield == "J1130-1449": #Manual model from Russ Taylor, taken from MeerKAT polarisation calibrator project + perley_frac = np.array([0.038, 0.050, 0.056]) + perley_f = np.array(1050, 1450, 1640]) + pa_polcal = np.array([145, 66, 45]) else: # This should never happen. raise ValueError("Invalid polarization field. Exiting.") From 750160418216e43b7ba455cc0febd4bba90bfd1e Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Wed, 30 Mar 2022 15:32:27 +0200 Subject: [PATCH 06/21] Typo! --- processMeerKAT/crosscal_scripts/xy_yx_solve.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processMeerKAT/crosscal_scripts/xy_yx_solve.py b/processMeerKAT/crosscal_scripts/xy_yx_solve.py index 94d2df1..fa46890 100644 --- a/processMeerKAT/crosscal_scripts/xy_yx_solve.py +++ b/processMeerKAT/crosscal_scripts/xy_yx_solve.py @@ -49,7 +49,7 @@ def qu_polfield(polfield, visname): pa_polcal = np.array([25, 140, -5]) elif polfield == "J1130-1449": #Manual model from Russ Taylor, taken from MeerKAT polarisation calibrator project perley_frac = np.array([0.038, 0.050, 0.056]) - perley_f = np.array(1050, 1450, 1640]) + perley_f = np.array([1050, 1450, 1640]) pa_polcal = np.array([145, 66, 45]) else: # This should never happen. From 0f9527cf6721199eabc74286ef36a712d90b2370 Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Wed, 30 Mar 2022 16:47:49 +0200 Subject: [PATCH 07/21] Centre outliers on brightest gaussian component - When running PyBDSF in selfcal_part2, write out the gaussian catalogue instead of the source catalogue - After this, use the position of the brightest gaussian for outliers, instead of the position (centroid) of the closest source --- processMeerKAT/selfcal_scripts/selfcal_part2.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/processMeerKAT/selfcal_scripts/selfcal_part2.py b/processMeerKAT/selfcal_scripts/selfcal_part2.py index 9371401..45d5f49 100644 --- a/processMeerKAT/selfcal_scripts/selfcal_part2.py +++ b/processMeerKAT/selfcal_scripts/selfcal_part2.py @@ -6,6 +6,7 @@ import shutil import os import re +import numpy as np import config_parser from config_parser import validate_args as va @@ -79,7 +80,7 @@ def pybdsf(imbase,rmsfile,imagename,outimage,thresh,maskfile,cat,trim_box=None,w # Write out island mask and FITS catalog img.export_image(outfile=maskfile, img_type='island_mask', img_format='casa', clobber=True) - img.write_catalog(outfile=cat, format='fits', clobber=True, catalog_type='srl') + img.write_catalog(outfile=cat, format='fits', clobber=True, catalog_type='gaul') if write_all: regionfile = imbase % loop + ".casabox" @@ -87,7 +88,7 @@ def pybdsf(imbase,rmsfile,imagename,outimage,thresh,maskfile,cat,trim_box=None,w #Write out catalogs img.write_catalog(outfile=regionfile, format='casabox', clobber=True, catalog_type='srl') - img.write_catalog(outfile=ascii, format='ascii', clobber=True, catalog_type='srl') + img.write_catalog(outfile=ascii, format='ascii', clobber=True, catalog_type='gaul') # Write out RMS image img.export_image(outfile=rmsfile, img_type='rms', img_format='casa', clobber=True) @@ -137,7 +138,7 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp efluxcol = 'e_total_flux_source' racol = 'ra' deccol = 'dec' - outlier_threshold *= 1e3 #convert from mJy to Jy + outlier_threshold *= 1e3 #convert from Jy to mJy #Extract RACS positions within sky_model_radius try: @@ -299,13 +300,17 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp mask = 'mask={0}'.format(outlier_pixmask) - #If catalog written, take new PyBDSF position closest to previous position + #If catalog written, take new PyBDSF position as brightest Gaussian component, or otherwise, closest to previous position + brightest = True if os.path.exists(outlier_cat): tab=fits.open(outlier_cat) data = tab[1].data tab.close() cat_positions = SkyCoord(ra=data['RA'],dec=data['Dec'],unit='deg,deg') - row,_,_ = pos.match_to_catalog_sky(cat_positions) + if brightest: + row = np.where(data['Total_flux'] == np.max(data['Total_flux']))[0][0] + else: + row,_,_ = pos.match_to_catalog_sky(cat_positions) phasecenter = 'J2000 {0}'.format(cat_positions[row].to_string('hmsdms')) else: logger.warning("PyBDSF catalogue '{0}' not created. Using old position and mask.".format(outlier_cat)) From 600625b2ade233d89361691a7b3934516aa317b3 Mon Sep 17 00:00:00 2001 From: Srikrishna Sekhar Date: Mon, 25 Apr 2022 11:18:05 +0200 Subject: [PATCH 08/21] Fixed typo in xy_yx_solve meanfreq was using Ghz when it should have been using MHz. --- processMeerKAT/crosscal_scripts/xy_yx_solve.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processMeerKAT/crosscal_scripts/xy_yx_solve.py b/processMeerKAT/crosscal_scripts/xy_yx_solve.py index fa46890..f7478dd 100644 --- a/processMeerKAT/crosscal_scripts/xy_yx_solve.py +++ b/processMeerKAT/crosscal_scripts/xy_yx_solve.py @@ -30,7 +30,7 @@ def qu_polfield(polfield, visname): """ msmd.open(visname) - meanfreq = msmd.meanfreq(0, unit='GHz') + meanfreq = msmd.meanfreq(0, unit='MHz') msmd.done() if polfield in ["3C286", "1328+307", "1331+305", "J1331+3030"]: From 0d3686131dbdf95454317422fb9497e466028013 Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Fri, 6 May 2022 21:05:02 +0200 Subject: [PATCH 09/21] CASA v6.4.4 and refant patches - Update to CASA 6.4.4 (and OpenMPI 4.0.3) - Bugfixes for calc_refant.py since we moved to CASA 6 and field names instead of IDs - Enable show_ant_stats.py to work out of the box --- processMeerKAT/aux_scripts/show_ant_stats.py | 2 +- processMeerKAT/crosscal_scripts/calc_refant.py | 10 +++++++--- processMeerKAT/default_config.txt | 4 ++-- processMeerKAT/processMeerKAT.py | 4 ++-- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/processMeerKAT/aux_scripts/show_ant_stats.py b/processMeerKAT/aux_scripts/show_ant_stats.py index 42d70c5..0cdd4d3 100644 --- a/processMeerKAT/aux_scripts/show_ant_stats.py +++ b/processMeerKAT/aux_scripts/show_ant_stats.py @@ -14,7 +14,7 @@ if len(args) > 1: if sys.argv[1] == '-h': print('Usage: {0} filename bins'.format(os.path.split(sys.argv[0])[1])) - else: + elif sys.argv[1] != '--config': fname = sys.argv[1] bins = int(sys.argv[2]) diff --git a/processMeerKAT/crosscal_scripts/calc_refant.py b/processMeerKAT/crosscal_scripts/calc_refant.py index 3957142..17987f3 100644 --- a/processMeerKAT/crosscal_scripts/calc_refant.py +++ b/processMeerKAT/crosscal_scripts/calc_refant.py @@ -14,8 +14,9 @@ from casatasks import * logfile=casalog.logfile() casalog.setlogfile('logs/{SLURM_JOB_NAME}-{SLURM_JOB_ID}.casa'.format(**os.environ)) -from casatools import msmetadata +from casatools import msmetadata,table msmd = msmetadata() +tb = table() import logging from time import gmtime @@ -26,6 +27,8 @@ def get_ref_ant(visname, fluxfield): msmd.open(visname) + if type(fluxfield) is str: + fluxfield = msmd.fieldsforname(fluxfield)[0] fluxscans = msmd.scansforfield(int(fluxfield)) logger.info("Flux field scan no: %d" % fluxscans[0]) antennas = msmd.antennasforscan(fluxscans[0]) @@ -37,7 +40,7 @@ def get_ref_ant(visname, fluxfield): tb.open(visname) fptr = open('ant_stats.txt', 'w') - fptr.write(header) + fptr.write(header + '\n') antflags = [] for ant in antennas: @@ -46,12 +49,13 @@ def get_ref_ant(visname, fluxfield): if antdat.size == 0: flags = 1 fptr.write('{0: <3} {1:.4f}\n'.format(ant, np.nan)) + logger.info('{0: <3} {1:.4f}'.format(ant, np.nan)) antflags.append(flags) continue flags = np.count_nonzero(antdat)/float(antdat.size) - fptr.write('\n{0: <3} {1:.4f}'.format(ant, flags)) + fptr.write('{0: <3} {1:.4f}\n'.format(ant, flags)) logger.info('{0: <3} {1:.4f}'.format(ant, flags)) antflags.append(flags) diff --git a/processMeerKAT/default_config.txt b/processMeerKAT/default_config.txt index 6306576..5505f0f 100644 --- a/processMeerKAT/default_config.txt +++ b/processMeerKAT/default_config.txt @@ -17,13 +17,13 @@ partition = 'Main' # SLURM partition to use exclude = '' # SLURM nodes to exclude time = '12:00:00' submit = False -container = '/idia/software/containers/casa-6.4.simg' +container = '/idia/software/containers/casa-6.4.4-modular.simg' mpi_wrapper = 'mpirun' name = '' dependencies = '' account = 'b03-idia-ag' reservation = '' -modules = ['openmpi/2.1.1'] +modules = ['openmpi/4.0.3'] verbose = False precal_scripts = [('calc_refant.py',False,''), ('partition.py',True,'')] postcal_scripts = [('concat.py',False,''), ('plotcal_spw.py', False, ''), ('selfcal_part1.py',True,''), ('selfcal_part2.py',False,''), ('science_image.py', True, '')] diff --git a/processMeerKAT/processMeerKAT.py b/processMeerKAT/processMeerKAT.py index b9ff6e6..d623b42 100755 --- a/processMeerKAT/processMeerKAT.py +++ b/processMeerKAT/processMeerKAT.py @@ -61,7 +61,7 @@ IMAGING_CONFIG_KEYS = ['cell', 'robust', 'imsize', 'wprojplanes', 'niter', 'threshold', 'multiscale', 'nterms', 'gridder', 'deconvolver', 'restoringbeam', 'specmode', 'stokes', 'mask', 'rmsmap','outlierfile'] SLURM_CONFIG_STR_KEYS = ['container','mpi_wrapper','partition','time','name','dependencies','exclude','account','reservation'] SLURM_CONFIG_KEYS = ['nodes','ntasks_per_node','mem','plane','submit','precal_scripts','postcal_scripts','scripts','verbose','modules'] + SLURM_CONFIG_STR_KEYS -CONTAINER = '/idia/software/containers/casa-6.4.simg' +CONTAINER = '/idia/software/containers/casa-6.4.4-modular.simg' MPI_WRAPPER = 'mpirun' PRECAL_SCRIPTS = [('calc_refant.py',False,''),('partition.py',True,'')] #Scripts run before calibration at top level directory when nspw > 1 POSTCAL_SCRIPTS = [('concat.py',False,''),('plotcal_spw.py', False, ''),('selfcal_part1.py',True,''),('selfcal_part2.py',False,''),('science_image.py', True, '')] #Scripts run after calibration at top level directory when nspw > 1 @@ -188,7 +188,7 @@ def parse_scripts(val): help="Run pipeline with these scripts, in this order, using these containers (3rd value - empty string to default to [-c --container]). Is it threadsafe (2nd value)?") parser.add_argument("-b","--precal_scripts", action='append', nargs=3, metavar=('script','threadsafe','container'), required=False, type=parse_scripts, default=PRECAL_SCRIPTS, help="Same as [-S --scripts], but run before calibration.") parser.add_argument("-a","--postcal_scripts", action='append', nargs=3, metavar=('script','threadsafe','container'), required=False, type=parse_scripts, default=POSTCAL_SCRIPTS, help="Same as [-S --scripts], but run after calibration.") - parser.add_argument("--modules", nargs='*', metavar='module', required=False, default=['openmpi/2.1.1'], help="Load these modules within each sbatch script.") + parser.add_argument("--modules", nargs='*', metavar='module', required=False, default=['openmpi/4.0.3'], help="Load these modules within each sbatch script.") parser.add_argument("-w","--mpi_wrapper", metavar="path", required=False, type=str, default=MPI_WRAPPER, help="Use this mpi wrapper when calling threadsafe scripts [default: '{0}'].".format(MPI_WRAPPER)) parser.add_argument("-c","--container", metavar="path", required=False, type=str, default=CONTAINER, help="Use this container when calling scripts [default: '{0}'].".format(CONTAINER)) From 46e9525324c491269551fea9c5a5db2b938f37de Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Mon, 9 May 2022 16:56:21 +0200 Subject: [PATCH 10/21] Don't split out bad antennas If badants isn't an empty list, exclude those antennas during the split step at the end of the pipeline. --- processMeerKAT/crosscal_scripts/split.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/processMeerKAT/crosscal_scripts/split.py b/processMeerKAT/crosscal_scripts/split.py index ba0c706..8d90aa0 100644 --- a/processMeerKAT/crosscal_scripts/split.py +++ b/processMeerKAT/crosscal_scripts/split.py @@ -15,11 +15,12 @@ import casampi msmd = msmetadata() -def split_vis(visname, spw, fields, specavg, timeavg, keepmms): +def split_vis(visname, spw, fields, specavg, timeavg, keepmms, badants): outputbase = os.path.splitext(os.path.split(visname)[1])[0] extn = 'mms' if keepmms else 'ms' newvis = visname + antenna = '!{0}'.format(','.join(map(str,badants))) if len(badants) > 0 else '' for field in fields: if field != '': @@ -32,7 +33,7 @@ def split_vis(visname, spw, fields, specavg, timeavg, keepmms): split(vis=visname, outputvis=outname, datacolumn='corrected', field=fname, spw=spw, keepflags=True, keepmms=keepmms, - width=specavg, timebin=timeavg) + width=specavg, timebin=timeavg, antenna=antenna) if fname == fields.targetfield.split(',')[0]: newvis = outname @@ -47,13 +48,14 @@ def main(args,taskvals): fields = bookkeeping.get_field_ids(taskvals['fields']) spw = va(taskvals, 'crosscal', 'spw', str, default='') + badants = taskvals['crosscal'].pop('badants') specavg = va(taskvals, 'crosscal', 'width', int, default=1) timeavg = va(taskvals, 'crosscal', 'timeavg', str, default='8s') keepmms = va(taskvals, 'crosscal', 'keepmms', bool) msmd.open(visname) - newvis = split_vis(visname, spw, fields, specavg, timeavg, keepmms) + newvis = split_vis(visname, spw, fields, specavg, timeavg, keepmms, badants) config_parser.overwrite_config(args['config'], conf_dict={'vis' : "'{0}'".format(newvis)}, conf_sec='data') config_parser.overwrite_config(args['config'], conf_dict={'crosscal_vis': "'{0}'".format(visname)}, conf_sec='run', sec_comment='# Internal variables for pipeline execution') From c5523c6830929387618dca0f0d5edd55744ac03e Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Thu, 26 May 2022 14:26:11 +0200 Subject: [PATCH 11/21] Use config target field ID to generate outliers Use the `targetfields` parameter from the config to identify phase centre of target for selecting outliers from within 2 degrees. --- .../selfcal_scripts/selfcal_part2.py | 20 ++++++++++++++++--- .../selfcal_scripts/set_sky_model.py | 4 +++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/processMeerKAT/selfcal_scripts/selfcal_part2.py b/processMeerKAT/selfcal_scripts/selfcal_part2.py index 45d5f49..f6b698e 100644 --- a/processMeerKAT/selfcal_scripts/selfcal_part2.py +++ b/processMeerKAT/selfcal_scripts/selfcal_part2.py @@ -93,8 +93,8 @@ def pybdsf(imbase,rmsfile,imagename,outimage,thresh,maskfile,cat,trim_box=None,w # Write out RMS image img.export_image(outfile=rmsfile, img_type='rms', img_format='casa', clobber=True) -def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojplanes, niter, threshold, uvrange, - nterms, gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, flag, step): +def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojplanes, niter, threshold, uvrange, nterms, + gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, flag, step, targetfields=None): local = locals() local.pop('step') @@ -125,7 +125,21 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp else: tmpvis = vis msmd.open(tmpvis) - dir=msmd.sourcedirs()[str(msmd.fieldsforintent('TARGET')[0])] + + #Force taking first target field + if type(targetfields) is str and ',' in targetfields: + targetfield = targetfields.split(',')[0] + msg = 'Multiple target fields input ("{0}"), but only one position can be used to identify outliers (for outlier imaging). Using "{1}".' + logger.warning(msg.format(targetfields,targetfield)) + else: + targetfield = targetfields + #Make sure it's an integer + try: + targetfield = int(targetfield) + except ValueError: # It's not an int, but a str + targetfield = msmd.fieldsforname(targetfield)[0] + + dir=msmd.sourcedirs()[str(targetfield)] ra=qa.convert(dir['m0'],'deg')['value'] dec=qa.convert(dir['m1'],'deg')['value'] if ra < 0: diff --git a/processMeerKAT/selfcal_scripts/set_sky_model.py b/processMeerKAT/selfcal_scripts/set_sky_model.py index 64ca540..063ad05 100644 --- a/processMeerKAT/selfcal_scripts/set_sky_model.py +++ b/processMeerKAT/selfcal_scripts/set_sky_model.py @@ -4,6 +4,7 @@ #!/usr/bin/env python3 import os import bookkeeping +import config_parser from selfcal_scripts.selfcal_part2 import find_outliers from casatasks import casalog logfile=casalog.logfile() @@ -12,5 +13,6 @@ if __name__ == '__main__': args,params = bookkeeping.get_selfcal_params() - find_outliers(**params,step='sky') + targetfields = config_parser.parse_config(args['config'])[0]['fields']['targetfields'] + find_outliers(**params,step='sky',targetfields=targetfields) bookkeeping.rename_logs(logfile) From e0ddc981a86923fdbd2284cfc07d1a2704b0adc1 Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Thu, 26 May 2022 16:49:01 +0200 Subject: [PATCH 12/21] Assume `TARGET` intent doesn't exist elsewhere When outlier imaging is triggered at the beginning of the pipeline, it assumes a naming convention for the outliers, based on the name of the first target field. This is now retrieved from the config instead of being based on the MS intents (for non-compliant MSs). --- processMeerKAT/bookkeeping.py | 30 ++++++++++++++----- .../selfcal_scripts/selfcal_part2.py | 22 +++----------- .../selfcal_scripts/set_sky_model.py | 3 +- 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/processMeerKAT/bookkeeping.py b/processMeerKAT/bookkeeping.py index ab1754b..3e34708 100644 --- a/processMeerKAT/bookkeeping.py +++ b/processMeerKAT/bookkeeping.py @@ -181,17 +181,33 @@ def get_selfcal_params(): def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step): + from casatools import msmetadata + msmd = msmetadata() + msmd.open(vis) + visbase = os.path.split(vis.rstrip('/ '))[1] # Get only vis name, not entire path - #Assume first target will be in the visname later (relevant for writing outliers.txt at beginning of pipeline) + targetfields = config_parser.get_key(config_parser.parse_args()['config'], 'fields', 'targetfields') + + #Force taking first target field (relevant for writing outliers.txt at beginning of pipeline) + if type(targetfields) is str and ',' in targetfields: + targetfield = targetfields.split(',')[0] + msg = 'Multiple target fields input ("{0}"), but only one position can be used to identify outliers (for outlier imaging). Using "{1}".' + logger.warning(msg.format(targetfields,targetfield)) + else: + targetfield = targetfields + #Make sure it's an integer + try: + targetfield = int(targetfield) + except ValueError: # It's not an int, but a str + targetfield = msmd.fieldsforname(targetfield)[0] + if '.ms' in visbase: - from casatools import msmetadata - msmd = msmetadata() - msmd.open(vis) - basename = visbase.replace('.ms','.{0}'.format(msmd.namesforfields(msmd.fieldsforintent('TARGET'))[0])) - msmd.done() + basename = visbase.replace('.ms','.{0}'.format(msmd.namesforfields(targetfield)[0])) else: basename = visbase.replace('.mms', '') + msmd.done() + imbase = basename + '_im_%d' # Images will be produced in $CWD imagename = imbase % loop outimage = imagename + '.image' @@ -244,7 +260,7 @@ def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,o elif step == 'bdsf': thresh = threshold[loop] - return imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile + return imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile,targetfield def rename_logs(logfile=''): diff --git a/processMeerKAT/selfcal_scripts/selfcal_part2.py b/processMeerKAT/selfcal_scripts/selfcal_part2.py index f6b698e..a12667d 100644 --- a/processMeerKAT/selfcal_scripts/selfcal_part2.py +++ b/processMeerKAT/selfcal_scripts/selfcal_part2.py @@ -36,7 +36,7 @@ def selfcal_part2(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojplanes, niter, threshold, uvrange, nterms, gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, flag): - imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,_,_ = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step='predict') + imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,_,_,_ = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step='predict') if calmode[loop] != '': if os.path.exists(caltable): @@ -94,11 +94,11 @@ def pybdsf(imbase,rmsfile,imagename,outimage,thresh,maskfile,cat,trim_box=None,w img.export_image(outfile=rmsfile, img_type='rms', img_format='casa', clobber=True) def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojplanes, niter, threshold, uvrange, nterms, - gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, flag, step, targetfields=None): + gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, flag, step): local = locals() local.pop('step') - imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step=step) + imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile,targetfield = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step) cat = imagename + ".catalog.fits" outlierfile_all = 'outliers.txt' fitsname = imagename + '.fits' @@ -125,20 +125,6 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp else: tmpvis = vis msmd.open(tmpvis) - - #Force taking first target field - if type(targetfields) is str and ',' in targetfields: - targetfield = targetfields.split(',')[0] - msg = 'Multiple target fields input ("{0}"), but only one position can be used to identify outliers (for outlier imaging). Using "{1}".' - logger.warning(msg.format(targetfields,targetfield)) - else: - targetfield = targetfields - #Make sure it's an integer - try: - targetfield = int(targetfield) - except ValueError: # It's not an int, but a str - targetfield = msmd.fieldsforname(targetfield)[0] - dir=msmd.sourcedirs()[str(targetfield)] ra=qa.convert(dir['m0'],'deg')['value'] dec=qa.convert(dir['m1'],'deg')['value'] @@ -358,7 +344,7 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp def mask_image(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojplanes, niter, threshold, uvrange, nterms, gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, flag, outlier_base='', outlier_image=''): - imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step='mask') + imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile,_ = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step='mask') if outlier_base != '': maskfile = outlier_base + '.islmask' diff --git a/processMeerKAT/selfcal_scripts/set_sky_model.py b/processMeerKAT/selfcal_scripts/set_sky_model.py index 063ad05..3d2f758 100644 --- a/processMeerKAT/selfcal_scripts/set_sky_model.py +++ b/processMeerKAT/selfcal_scripts/set_sky_model.py @@ -13,6 +13,5 @@ if __name__ == '__main__': args,params = bookkeeping.get_selfcal_params() - targetfields = config_parser.parse_config(args['config'])[0]['fields']['targetfields'] - find_outliers(**params,step='sky',targetfields=targetfields) + find_outliers(**params,step='sky') bookkeeping.rename_logs(logfile) From 43eb73dfc559aac8b4ffa528d339f939e8b19280 Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Thu, 26 May 2022 21:54:05 +0200 Subject: [PATCH 13/21] Bugfix --- processMeerKAT/selfcal_scripts/selfcal_part1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processMeerKAT/selfcal_scripts/selfcal_part1.py b/processMeerKAT/selfcal_scripts/selfcal_part1.py index 5360c59..a667941 100644 --- a/processMeerKAT/selfcal_scripts/selfcal_part1.py +++ b/processMeerKAT/selfcal_scripts/selfcal_part1.py @@ -44,7 +44,7 @@ def symlink_psf(imagenames,loop): def selfcal_part1(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojplanes, niter, threshold, uvrange, nterms, gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, flag): - imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,_,_ = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step='tclean') + imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,_,_,_ = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step='tclean') calcpsf = True #Add model column with MPI rather than in selfcal_part2 without MPI. From ade4ddee11d08e83afd047f581f974cf6104ff34 Mon Sep 17 00:00:00 2001 From: Srikrishna Sekhar Date: Tue, 14 Jun 2022 20:31:46 +0200 Subject: [PATCH 14/21] Added PB correction to science_image.py science_image.py now produces an additional PB corrected image (using katbeam) at the end of imaging. The cutoff for the PB can be specified by the pbthreshold parameter in the imaging section of the config file. The default container has been updated to use the new CASA 6.5 release, the container also contains katbeam as a dependency. --- processMeerKAT/default_config.txt | 3 +- processMeerKAT/processMeerKAT.py | 4 +- processMeerKAT/science_image.py | 92 ++++++++++++++++++++++++++++++- 3 files changed, 95 insertions(+), 4 deletions(-) diff --git a/processMeerKAT/default_config.txt b/processMeerKAT/default_config.txt index 5505f0f..996a0a9 100644 --- a/processMeerKAT/default_config.txt +++ b/processMeerKAT/default_config.txt @@ -17,7 +17,7 @@ partition = 'Main' # SLURM partition to use exclude = '' # SLURM nodes to exclude time = '12:00:00' submit = False -container = '/idia/software/containers/casa-6.4.4-modular.simg' +container = '/idia/software/containers/casa-6.5.0-modular.sif' mpi_wrapper = 'mpirun' name = '' dependencies = '' @@ -90,6 +90,7 @@ deconvolver = 'mtmfs' restoringbeam = '' specmode = 'mfs' stokes = 'I' +pbthreshold = 0.1 # Threshold below which to mask the PB for PB correction mask = '' rmsmap = '' outlierfile = '' diff --git a/processMeerKAT/processMeerKAT.py b/processMeerKAT/processMeerKAT.py index d623b42..0282e6c 100755 --- a/processMeerKAT/processMeerKAT.py +++ b/processMeerKAT/processMeerKAT.py @@ -58,10 +58,10 @@ FIELDS_CONFIG_KEYS = ['fluxfield','bpassfield','phasecalfield','targetfields','extrafields'] CROSSCAL_CONFIG_KEYS = ['minbaselines','chanbin','width','timeavg','createmms','keepmms','spw','nspw','calcrefant','refant','standard','badants','badfreqranges'] SELFCAL_CONFIG_KEYS = ['nloops','loop','cell','robust','imsize','wprojplanes','niter','threshold','uvrange','nterms','gridder','deconvolver','solint','calmode','discard_nloops','gaintype','outlier_threshold','flag'] -IMAGING_CONFIG_KEYS = ['cell', 'robust', 'imsize', 'wprojplanes', 'niter', 'threshold', 'multiscale', 'nterms', 'gridder', 'deconvolver', 'restoringbeam', 'specmode', 'stokes', 'mask', 'rmsmap','outlierfile'] +IMAGING_CONFIG_KEYS = ['cell', 'robust', 'imsize', 'wprojplanes', 'niter', 'threshold', 'multiscale', 'nterms', 'gridder', 'deconvolver', 'restoringbeam', 'specmode', 'stokes', 'mask', 'rmsmap','outlierfile', 'pbthreshold'] SLURM_CONFIG_STR_KEYS = ['container','mpi_wrapper','partition','time','name','dependencies','exclude','account','reservation'] SLURM_CONFIG_KEYS = ['nodes','ntasks_per_node','mem','plane','submit','precal_scripts','postcal_scripts','scripts','verbose','modules'] + SLURM_CONFIG_STR_KEYS -CONTAINER = '/idia/software/containers/casa-6.4.4-modular.simg' +CONTAINER = '/idia/software/containers/casa-6.5.0-modular.sif' MPI_WRAPPER = 'mpirun' PRECAL_SCRIPTS = [('calc_refant.py',False,''),('partition.py',True,'')] #Scripts run before calibration at top level directory when nspw > 1 POSTCAL_SCRIPTS = [('concat.py',False,''),('plotcal_spw.py', False, ''),('selfcal_part1.py',True,''),('selfcal_part2.py',False,''),('science_image.py', True, '')] #Scripts run after calibration at top level directory when nspw > 1 diff --git a/processMeerKAT/science_image.py b/processMeerKAT/science_image.py index 28403c5..0f43ccf 100644 --- a/processMeerKAT/science_image.py +++ b/processMeerKAT/science_image.py @@ -2,6 +2,7 @@ #See processMeerKAT.py for license details. import os +import numpy as np import config_parser import bookkeeping @@ -11,7 +12,89 @@ casalog.setlogfile('logs/{SLURM_JOB_NAME}-{SLURM_JOB_ID}.casa'.format(**os.environ)) import casampi -def science_image(vis, cell, robust, imsize, wprojplanes, niter, threshold, multiscale, nterms, gridder, deconvolver, restoringbeam, specmode, stokes, mask, rmsmap, outlierfile): +import shutil +from katbeam import JimBeam +from casatools import image +ia = image() + + +def do_pb_corr(inpimage, pbthreshold=0): + """ + Given the input CASA image, outputs a katbeam corrected image, optionally + cutoff at a specified threshold. + + Inputs: + inpimage Input CASA image name, str + threshold Cutoff threshold to mask the PB, float + + Outputs: + None + """ + + pbcorimage = inpimage.replace('.image', '.katbeam_pbcor.image') + pbimage = inpimage.replace('.image', '.katbeam.pb') + + LBandBeam = JimBeam('MKAT-AA-L-JIM-2020') + + ia.open(inpimage) + csys = ia.coordsys().torecord() + imgdata = ia.getchunk() + shape = ia.shape() + ia.close() + + cx, cy = shape[0]//2, shape[1]//2 + + # Size of each pixel + cdelt = np.abs(csys['direction0']['cdelt'][0]) + unit = csys['direction0']['units'][0] + + if unit == 'rad': + cdelt = np.rad2deg(cdelt) + elif unit == "'": #arcmin + cdelt /= 60. + + # Frequency of image, convert from Hz to MHz + try: + freq = csys['spectral1']['wcs']['crval']/1e6 + except KeyError: + freq = csys['spectral2']['wcs']['crval']/1e6 + + + x = np.linspace(-cx, cx+1, shape[0]) + y = np.linspace(-cy, cy+1, shape[1]) + + xx, yy = np.meshgrid(x, y) + + # Convert pixels into separation in degrees + xx *= cdelt + yy *= cdelt + + # Generate the 2D PB image + beam_I = LBandBeam.I(xx, yy, freq) + + # Match shape with image data for PB correction + if len(shape) == 4: + beam_I = beam_I[:, :, None, None] + + pbcor_imgdata = imgdata/beam_I + + # Mask below the threshold + if pbthreshold > 0: + pbcor_imgdata[beam_I < pbthreshold] = np.nan + beam_I[beam_I < pbthreshold] = np.nan + + shutil.copytree(inpimage, pbimage) + ia.open(pbimage) + ia.putchunk(beam_I) + ia.close() + + shutil.copytree(inpimage, pbcorimage) + ia.open(pbcorimage) + ia.putchunk(pbcor_imgdata) + ia.close() + + +def science_image(vis, cell, robust, imsize, wprojplanes, niter, threshold, multiscale, nterms, gridder, deconvolver, restoringbeam, specmode, stokes, mask, rmsmap, outlierfile, pbthreshold): visbase = os.path.split(vis.rstrip('/ '))[1] # Get only vis name, not entire path imagename = visbase.replace('.mms', '.science_image') # Images will be produced in $CWD @@ -27,6 +110,13 @@ def science_image(vis, cell, robust, imsize, wprojplanes, niter, threshold, mult threshold=threshold, nterms=nterms, calcpsf=True, mask=mask, outlierfile=outlierfile, pblimit=-1, restoringbeam=restoringbeam, parallel = True) + if nterms > 1: + imname = imagename + '.image.tt0' + elif nterms == 1: + imname = imagename + '.image' + + do_pb_corr(imname, pbthreshold) + if __name__ == '__main__': args,params = bookkeeping.get_imaging_params() From 2cbd59543f0142ca58dc83082685a8e08a561263 Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Thu, 16 Jun 2022 11:09:11 +0200 Subject: [PATCH 15/21] Various v2.0 changes, primarily to outlier imaging Outlier imaging: - Improvement to run-time for calculating outliers, by using local copy of RACS catalogue instead of new tap query, and reading first sub-MS instead of large MMS (if present) - Config parameter for search radius for outliers from RACS - If 0.0 or '' used, calculate search radius by reading input MS - If whole catalogued flux of an outlier is < 1 mJy, or no source is catalogued, discard it General: - Ensure MHz unit when checking validity of SPW selection - Select SPWs with '*' instead of '0' (e.g. when having already processed with the pipeline but wanting a different SPW selection) - Cosmetic changes and minor bug fixes --- processMeerKAT/bookkeeping.py | 51 ++++++++++++--- processMeerKAT/config_parser.py | 2 +- .../crosscal_scripts/plotcal_spw.py | 2 +- processMeerKAT/default_config.txt | 3 +- processMeerKAT/processMeerKAT.py | 56 +++++++++-------- processMeerKAT/read_ms.py | 44 ++++++++----- processMeerKAT/science_image.py | 5 +- .../selfcal_scripts/selfcal_part1.py | 6 +- .../selfcal_scripts/selfcal_part2.py | 62 ++++++++++--------- 9 files changed, 147 insertions(+), 84 deletions(-) diff --git a/processMeerKAT/bookkeeping.py b/processMeerKAT/bookkeeping.py index 3e34708..cb3b2c1 100644 --- a/processMeerKAT/bookkeeping.py +++ b/processMeerKAT/bookkeeping.py @@ -10,6 +10,7 @@ from collections import namedtuple import os import glob +import re import logging from time import gmtime @@ -127,7 +128,7 @@ def get_selfcal_params(): if params['dopol'] and 'G' in params['gaintype']: logger.warning("dopol is True, but gaintype includes 'G'. Use gaintype='T' for polarisation on linear feeds (e.g. MeerKAT).") - single_args = ['nloops','loop','discard_nloops','outlier_threshold'] #need to be 1 long (i.e. not a list) + single_args = ['nloops','loop','discard_nloops','outlier_threshold','outlier_radius'] #need to be 1 long (i.e. not a list) gaincal_args = ['solint','calmode','gaintype','flag'] #need to be nloops long list_args = ['imsize'] #allowed to be lists of lists @@ -179,11 +180,19 @@ def get_selfcal_params(): return args,params -def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step): +def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,outlier_radius,threshold,step): - from casatools import msmetadata + from casatools import msmetadata,quanta + from read_ms import check_spw msmd = msmetadata() - msmd.open(vis) + qa = quanta() + + if os.path.exists('{0}/SUBMSS'.format(vis)): + tmpvis = glob.glob('{0}/SUBMSS/*'.format(vis))[0] + else: + tmpvis = vis + + msmd.open(tmpvis) visbase = os.path.split(vis.rstrip('/ '))[1] # Get only vis name, not entire path targetfields = config_parser.get_key(config_parser.parse_args()['config'], 'fields', 'targetfields') @@ -201,13 +210,11 @@ def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,o except ValueError: # It's not an int, but a str targetfield = msmd.fieldsforname(targetfield)[0] - if '.ms' in visbase: + if '.ms' in visbase and str(targetfield) not in visbase: basename = visbase.replace('.ms','.{0}'.format(msmd.namesforfields(targetfield)[0])) else: basename = visbase.replace('.mms', '') - msmd.done() - imbase = basename + '_im_%d' # Images will be produced in $CWD imagename = imbase % loop outimage = imagename + '.image' @@ -245,9 +252,23 @@ def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,o outlierfile = 'outliers_loop{0}.txt'.format(loop) else: outlierfile = 'outliers_loop{0}.txt'.format(loop+1) + + #Derive sky model radius for outliers, assuming channel 0 (of SPW 0) is lowest frequency and therefore largest FWHM + if outlier_radius == 0.0 or outlier_radius == '': + SPW = check_spw(config_parser.parse_args()['config'],msmd) + low_freq = float(SPW.replace('*:','').split('~')[0]) * 1e6 #MHz to Hz + rads=1.025*qa.constants(v='c')['value']/low_freq/ msmd.antennadiameter()['0']['value'] + FWHM=qa.convert(qa.quantity(rads,'rad'),'deg')['value'] + sky_model_radius = 1.5*FWHM #degrees + logger.warning('Using calculated search radius of {0:.1f} degrees.'.format(sky_model_radius)) + else: + logger.info('Using preset search radius of {0} degrees'.format(outlier_radius)) + sky_model_radius = outlier_radius else: outlierfile = '' + msmd.done() + if not (type(threshold[loop]) is str and 'Jy' in threshold[loop]) and threshold[loop] > 1: if step in ['tclean','predict']: if os.path.exists(rmsfile): @@ -260,7 +281,7 @@ def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,o elif step == 'bdsf': thresh = threshold[loop] - return imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile,targetfield + return imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile,targetfield,sky_model_radius def rename_logs(logfile=''): @@ -283,6 +304,18 @@ def get_imaging_params(): taskvals, config = config_parser.parse_config(args['config']) params = taskvals['image'] params['vis'] = taskvals['data']['vis'] + params['keepmms'] = taskvals['crosscal']['keepmms'] + + #Rename the masks that were already used + if params['outlierfile'] != '' and os.path.exists(params['outlierfile']): + outliers=open(params['outlierfile']).read() + outlier_bases = re.findall(r'imagename=(.*)\n',outliers) + for name in outlier_bases: + mask = '{0}.mask'.format(name) + if os.path.exists(mask): + newname = '{0}.old'.format(mask) + logger.info('Re-using old mask for "{0}". Renaming "{1}" to "{2}" to avoid mask conflict.'.format(name,mask,newname)) + os.rename(mask,newname) return args,params @@ -308,7 +341,7 @@ def run_script(func,logfile=''): config_parser.overwrite_config(args['config'], conf_dict={'continue' : False}, conf_sec='run', sec_comment='# Internal variables for pipeline execution') if nspw > 1: for SPW in spw.split(','): - spw_config = '{0}/{1}'.format(SPW.replace('0:',''),args['config']) + spw_config = '{0}/{1}'.format(SPW.replace('*:',''),args['config']) config_parser.overwrite_config(spw_config, conf_dict={'continue' : False}, conf_sec='run', sec_comment='# Internal variables for pipeline execution') rename_logs(logfile) sys.exit(1) diff --git a/processMeerKAT/config_parser.py b/processMeerKAT/config_parser.py index ba61291..b01d42b 100755 --- a/processMeerKAT/config_parser.py +++ b/processMeerKAT/config_parser.py @@ -109,7 +109,7 @@ def parse_spw(filename): lowest = min(low) highest = max(high) - # Uncomment to use e.g. '*MHz' + # Uncomment to simply use e.g. '*MHz' # if all([i == unit[0] for i in unit]): # unit = unit[0] # dirs = '*{0}'.format(unit) diff --git a/processMeerKAT/crosscal_scripts/plotcal_spw.py b/processMeerKAT/crosscal_scripts/plotcal_spw.py index 7541e84..8c73297 100644 --- a/processMeerKAT/crosscal_scripts/plotcal_spw.py +++ b/processMeerKAT/crosscal_scripts/plotcal_spw.py @@ -7,7 +7,7 @@ import matplotlib # Agg doesn't need X - matplotlib doesn't work with xvfb -matplotlib.use('Agg', warn=False) +matplotlib.use('Agg') import matplotlib.pyplot as plt import numpy as np diff --git a/processMeerKAT/default_config.txt b/processMeerKAT/default_config.txt index 996a0a9..196af19 100644 --- a/processMeerKAT/default_config.txt +++ b/processMeerKAT/default_config.txt @@ -46,7 +46,7 @@ width = 1 # Number of channels to (further) average afte timeavg = '8s' # Time interval to average after calibration (during split) createmms = True # Create MMS (True) or MS (False) for cross-calibration during partition keepmms = True # Output MMS (True) or MS (False) during split -spw = '0:880~933MHz,0:960~1010MHz,0:1010~1060MHz,0:1060~1110MHz,0:1110~1163MHz,0:1299~1350MHz,0:1350~1400MHz,0:1400~1450MHz,0:1450~1500MHz,0:1500~1524MHz,0:1630~1680MHz' # Spectral window / frequencies to extract for MMS +spw = '*:880~933MHz,*:960~1010MHz,*:1010~1060MHz,*:1060~1110MHz,*:1110~1163MHz,*:1299~1350MHz,*:1350~1400MHz,*:1400~1450MHz,*:1450~1500MHz,*:1500~1524MHz,*:1630~1680MHz' # Spectral window / frequencies to extract for MMS nspw = 11 # Number of spectral windows to split into calcrefant = False # Calculate reference antenna in program (overwrites 'refant') refant = 'm059' # Reference antenna name / number @@ -75,6 +75,7 @@ flag = True # Flag residual column after selfcal? gaintype = 'G' # Use 'T' for polarisation on linear feeds (e.g. MeerKAT) discard_nloops = 0 # Discard this many selfcal solutions (e.g. from quick and dirty image) during subsequent loops (only considers when calmode !='') outlier_threshold = 0.0 # S/N values if >= 1.0, otherwise Jy +outlier_radius = 0.0 # Radius in degrees for identifying outliers in RACS [image] cell = '1.5arcsec' diff --git a/processMeerKAT/processMeerKAT.py b/processMeerKAT/processMeerKAT.py index 0282e6c..c168137 100755 --- a/processMeerKAT/processMeerKAT.py +++ b/processMeerKAT/processMeerKAT.py @@ -53,11 +53,12 @@ CONFIG = 'default_config.txt' TMP_CONFIG = '.config.tmp' MASTER_SCRIPT = 'submit_pipeline.sh' +SPW_PREFIX = '*:' #Set global values for field, crosscal and SLURM arguments copied to config file, and some of their default values FIELDS_CONFIG_KEYS = ['fluxfield','bpassfield','phasecalfield','targetfields','extrafields'] CROSSCAL_CONFIG_KEYS = ['minbaselines','chanbin','width','timeavg','createmms','keepmms','spw','nspw','calcrefant','refant','standard','badants','badfreqranges'] -SELFCAL_CONFIG_KEYS = ['nloops','loop','cell','robust','imsize','wprojplanes','niter','threshold','uvrange','nterms','gridder','deconvolver','solint','calmode','discard_nloops','gaintype','outlier_threshold','flag'] +SELFCAL_CONFIG_KEYS = ['nloops','loop','cell','robust','imsize','wprojplanes','niter','threshold','uvrange','nterms','gridder','deconvolver','solint','calmode','discard_nloops','gaintype','outlier_threshold','flag','outlier_radius'] IMAGING_CONFIG_KEYS = ['cell', 'robust', 'imsize', 'wprojplanes', 'niter', 'threshold', 'multiscale', 'nterms', 'gridder', 'deconvolver', 'restoringbeam', 'specmode', 'stokes', 'mask', 'rmsmap','outlierfile', 'pbthreshold'] SLURM_CONFIG_STR_KEYS = ['container','mpi_wrapper','partition','time','name','dependencies','exclude','account','reservation'] SLURM_CONFIG_KEYS = ['nodes','ntasks_per_node','mem','plane','submit','precal_scripts','postcal_scripts','scripts','verbose','modules'] + SLURM_CONFIG_STR_KEYS @@ -195,7 +196,7 @@ def parse_scripts(val): parser.add_argument("-n","--name", metavar="unique", required=False, type=str, default='', help="Unique name to give this pipeline run (e.g. 'run1_'), appended to the start of all job names. [default: ''].") parser.add_argument("-d","--dependencies", metavar="list", required=False, type=str, default='', help="Comma-separated list (without spaces) of SLURM job dependencies (only used when nspw=1). [default: ''].") parser.add_argument("-e","--exclude", metavar="nodes", required=False, type=str, default='', help="SLURM worker nodes to exclude [default: ''].") - parser.add_argument("-A","--account", metavar="group", required=False, type=str, default='b03-idia-ag', help="SLURM accounting group to use (e.g. 'b05-pipelines-ag' - check 'sacctmgr show user $USER cluster=ilifu-slurm20 -s format=account%%30,cluster%%15') [default: 'b03-idia-ag'].") + parser.add_argument("-A","--account", metavar="group", required=False, type=str, default='b03-idia-ag', help="SLURM accounting group to use (e.g. 'b05-pipelines-ag' - check 'sacctmgr show user $USER cluster=ilifu-slurm20 -s format=account%%30') [default: 'b03-idia-ag'].") parser.add_argument("-r","--reservation", metavar="name", required=False, type=str, default='', help="SLURM reservation to use. [default: ''].") parser.add_argument("-l","--local", action="store_true", required=False, default=False, help="Build config file locally (i.e. without calling srun) [default: False].") @@ -395,7 +396,7 @@ def write_command(script,args,name='job',mpi_wrapper=MPI_WRAPPER,container=CONTA arr=($SPWs) cd ${arr[SLURM_ARRAY_TASK_ID]} - """ % SPWs.replace(',',' ').replace('0:','') + """ % SPWs.replace(',',' ').replace(SPW_PREFIX,'') command += "{mpi_wrapper} singularity exec {container} {plot_call} {casa_call} {script} {args}".format(**params) @@ -576,7 +577,7 @@ def write_spw_master(filename,config,SPWs,precal_scripts,postcal_scripts,submit, master = open(filename,'w') master.write('#!/bin/bash\n') - SPWs = SPWs.replace('0:','') + SPWs = SPWs.replace(SPW_PREFIX,'') toplevel = len(precal_scripts + postcal_scripts) > 0 scripts = precal_scripts[:] @@ -618,7 +619,7 @@ def write_spw_master(filename,config,SPWs,precal_scripts,postcal_scripts,submit, extn = '_$DATE.sh' for i,spw in enumerate(SPWs.split(',')): - master.write('echo Running pipeline in directory "{0}" for spectral window 0:{0}\n'.format(spw)) + master.write('echo Running pipeline in directory "{1}" for spectral window {0}{1}\n'.format(SPW_PREFIX, spw)) master.write('cd {0}\n'.format(spw)) master.write('output=$({0} --config ./{1} --run --submit --quiet --justrun'.format(os.path.split(THIS_PROG)[1],config)) if partition: @@ -1164,7 +1165,7 @@ def format_args(config,submit,quiet,dependencies,justrun): #Ensure all keys exist in these sections kwargs = get_config_kwargs(config,'slurm',SLURM_CONFIG_KEYS) data_kwargs = get_config_kwargs(config,'data',['vis']) - get_config_kwargs(config, 'fields', FIELDS_CONFIG_KEYS) + field_kwargs = get_config_kwargs(config, 'fields', FIELDS_CONFIG_KEYS) crosscal_kwargs = get_config_kwargs(config, 'crosscal', CROSSCAL_CONFIG_KEYS) #Force submit=True if user has requested it during [-R --run] @@ -1193,8 +1194,12 @@ def format_args(config,submit,quiet,dependencies,justrun): logger.warning("Starting with loop={0}, which is only valid if previous loops were successfully run in this directory.".format(selfcal_kwargs['loop'])) #Find RACS outliers elif ((nspw > 1 and 'selfcal_part1.py' in [i[0] for i in kwargs['postcal_scripts']]) or (nspw == 1 and 'selfcal_part1.py' in [i[0] for i in kwargs['scripts']])) and selfcal_kwargs['outlier_threshold'] != 0 and selfcal_kwargs['outlier_threshold'] != '': + if selfcal_kwargs['outlier_radius'] != '' and selfcal_kwargs['outlier_radius'] != 0.0: + txt = 'within {0} degrees'.format(selfcal_kwargs['outlier_radius']) + else: + txt = 'within calculated search radius' logger.info('Populating sky model for selfcal using outlier_threshold={0}'.format(selfcal_kwargs['outlier_threshold'])) - logger.info('Querying Rapid ASAKP Continuum Survey (RACS) catalog within 2 degrees of target phase centre. Please allow a moment for this.') + logger.info('Querying Rapid ASAKP Continuum Survey (RACS) catalog around the target phase centre to identify outliers {0}. Please allow a moment for this.'.format(txt)) sky_model_kwargs = deepcopy(kwargs) sky_model_kwargs['partition'] = 'Devel' mpi_wrapper = srun(sky_model_kwargs, qos=True, time=2, mem=0) @@ -1268,7 +1273,7 @@ def format_args(config,submit,quiet,dependencies,justrun): #Write timestamp to this pipeline run kwargs['timestamp'] = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") config_parser.overwrite_config(config, conf_dict={'timestamp' : "'{0}'".format(kwargs['timestamp'])}, conf_sec='run', sec_comment='# Internal variables for pipeline execution') - nspw = spw_split(spw, nspw, config, mem, crosscal_kwargs['badfreqranges'],kwargs['MS'],includes_partition, createmms = crosscal_kwargs['createmms']) + nspw = spw_split(spw, nspw, config, mem, crosscal_kwargs['badfreqranges'],kwargs['MS'],includes_partition, createmms = crosscal_kwargs['createmms'], fields=field_kwargs) config_parser.overwrite_config(config, conf_dict={'nspw' : "{0}".format(nspw)}, conf_sec='crosscal') #Pop script to calculate reference antenna if calcrefant=False. Assume it won't be in postcal scripts @@ -1339,21 +1344,13 @@ def get_spw_bounds(spw): if unit != 'MHz': logger.warning('Please use SPW unit "MHz", to ensure the best performance (e.g. not processing entirely flagged frequency ranges).') - # Can only do when using CASA - # if unit == '': - # msmd.open(MS) - # low_MHz = msmd.chanfreqs(0)[low] / 1e6 - # high_MHz = msmd.chanfreqs(0)[high] / 1e6 - # msmd.done() - # else: - # low_MHz=qa.convertfreq('{0}{1}'.format(low,unit),'MHz')['value'] - # high_MHz=qa.convertfreq('{0}{1}'.format(high,unit),'MHz')['value'] + else: return None return low,high,unit,func -def spw_split(spw,nspw,config,mem,badfreqranges,MS,partition,createmms=True,remove=True): +def spw_split(spw,nspw,config,mem,badfreqranges,MS,partition,createmms=True,remove=True,fields={}): """Split into N SPWs, placing an instance of the pipeline into N directories, each with 1 Nth of the bandwidth. @@ -1377,6 +1374,8 @@ def spw_split(spw,nspw,config,mem,badfreqranges,MS,partition,createmms=True,remo Create MMS as output? remove : bool, optional Remove SPWs completely encompassed by bad frequency ranges? + fields : dict, optional + Field names, so we can do some visname renaming hackery! Returns: -------- @@ -1393,7 +1392,7 @@ def spw_split(spw,nspw,config,mem,badfreqranges,MS,partition,createmms=True,remo #Remove SPWs entirely encompassed by bad frequency ranges (only for MHz unit) for i in range(len(lo)): - SPWs.append('0:{0}~{1}{2}'.format(func(lo[i]),func(hi[i]),unit)) + SPWs.append('{0}{1}~{2}{3}'.format(SPW_PREFIX, func(lo[i]),func(hi[i]),unit)) elif ',' in spw: SPWs = spw.split(',') @@ -1412,9 +1411,9 @@ def spw_split(spw,nspw,config,mem,badfreqranges,MS,partition,createmms=True,remo low,high = get_spw_bounds(SPWs[i])[0:2] if unit == 'MHz' and remove: for freq in badfreqranges: - bad_low,bad_high = get_spw_bounds('0:{0}'.format(freq))[0:2] + bad_low,bad_high = get_spw_bounds('{0}{1}'.format(SPW_PREFIX,freq))[0:2] if low >= bad_low and high <= bad_high: - logger.info("Won't process spw '0:{0}~{1}{2}', since it's completely encompassed by bad frequency range '{3}'.".format(low,high,unit,freq)) + logger.info("Won't process spw '{0}{1}~{2}{3}', since it's completely encompassed by bad frequency range '{3}'.".format(SPW_PREFIX,low,high,unit,freq)) badfreq = True break if badfreq: @@ -1429,9 +1428,9 @@ def spw_split(spw,nspw,config,mem,badfreqranges,MS,partition,createmms=True,remo #Create each spw as directory and place config in there logger.info("Making {0} directories for SPWs ({1}) and copying '{2}' to each of them.".format(nspw,SPWs,config)) for spw in SPWs: - spw_config = '{0}/{1}'.format(spw.replace('0:',''),config) - if not os.path.exists(spw.replace('0:','')): - os.mkdir(spw.replace('0:','')) + spw_config = '{0}/{1}'.format(spw.replace(SPW_PREFIX,''),config) + if not os.path.exists(spw.replace(SPW_PREFIX,'')): + os.mkdir(spw.replace(SPW_PREFIX,'')) copyfile(config, spw_config) config_parser.overwrite_config(spw_config, conf_dict={'spw' : "'{0}'".format(spw)}, conf_sec='crosscal') config_parser.overwrite_config(spw_config, conf_dict={'nspw' : 1}, conf_sec='crosscal') @@ -1446,7 +1445,14 @@ def spw_split(spw,nspw,config,mem,badfreqranges,MS,partition,createmms=True,remo basename, ext = os.path.splitext(MS.rstrip('/ ')) filebase = os.path.split(basename)[1] extn = 'mms' if createmms else 'ms' - vis = '{0}.{1}.{2}'.format(filebase,spw.replace('0:',''),extn) + + #Hack to rename vis if setting as specific field (e.g. as target field when running selfcal) + prefix,suffix = os.path.splitext(filebase) + if suffix[1:] != '' and suffix[1:] in fields.values(): + extn = '{0}.{1}'.format(suffix[1:],extn) + filebase = prefix + + vis = '{0}.{1}.{2}'.format(filebase,spw.replace(SPW_PREFIX,''),extn) logger.warning("Since script with 'partition' in its name isn't present in '{0}', assuming partition has already been done, and setting vis='{1}' in '{2}'. If '{1}' doesn't exist, please update '{2}', as the pipeline will not launch successfully.".format(config,vis,spw_config)) orig_vis = config_parser.get_key(spw_config, 'data', 'vis') config_parser.overwrite_config(spw_config, conf_dict={'orig_vis' : "'{0}'".format(orig_vis)}, conf_sec='run', sec_comment='# Internal variables for pipeline execution') diff --git a/processMeerKAT/read_ms.py b/processMeerKAT/read_ms.py index 080f89e..489df8c 100755 --- a/processMeerKAT/read_ms.py +++ b/processMeerKAT/read_ms.py @@ -10,7 +10,7 @@ import config_parser from casatasks import * -from casatools import msmetadata,table,measures +from casatools import msmetadata,table,measures,quanta logger = processMeerKAT.logger @@ -18,6 +18,7 @@ msmd = msmetadata() tb = table() me = measures() +qa = quanta() def get_fields(MS): @@ -232,7 +233,7 @@ def check_scans(MS,nodes,tasks,dopol): threads = {'nodes' : nodes, 'ntasks_per_node' : tasks} return threads -def check_spw(config): +def check_spw(config,msmd): """Check SPW bounds are within the SPW bounds of the MS. If not, output a warning and update the SPW. @@ -247,26 +248,41 @@ def check_spw(config): update = False low,high,unit,dirs = config_parser.parse_spw(config) - if type(low) is list: - low = low[0] - if type(high) is list: - high = high[-1] + lowest = np.min(low) + highest = np.max(high) nspw = msmd.nspw() - if nspw > 1: - logger.warning("Expected 1 SPW but found nspw={0}. Please manually edit 'spw' in '{1}'.".format(nspw,config)) + # if nspw > 1: + # logger.warning("Expected 1 SPW but found nspw={0}. Please manually edit 'spw' in '{1}'.".format(nspw,config)) + #Check SPW bounds overlap with MS, and convert to MHz explicitly, assuming first and last SPW contain lowest and highest frequencies ms_low = msmd.chanfreqs(0)[0] / 1e6 ms_high = msmd.chanfreqs(nspw-1)[-1] / 1e6 - if low < ms_low - 1: - low = int(round(ms_low+0.5)) + if type(unit) is list: + low_unit = unit[low.index(lowest)] + high_unit = unit[high.index(highest)] + else: + low_unit = unit + high_unit = unit + + #i.e. channel number + if low_unit == '': + low_MHz = msmd.chanfreqs(0)[low] / 1e6 + if high_unit == '': + high_MHz = msmd.chanfreqs(nspw-1)[high] / 1e6 + else: + low_MHz=qa.convertfreq('{0}{1}'.format(lowest,low_unit),'MHz')['value'] + high_MHz=qa.convertfreq('{0}{1}'.format(highest,high_unit),'MHz')['value'] + + if low_MHz < ms_low - 1: + low_MHz = int(ms_low) update = True - if high > ms_high + 1: - high = int(round(ms_high-0.5)) + if high_MHz > ms_high + 1: + high_MHz = int(round(ms_high+0.5)) update = True - SPW = '0:{0}~{1}MHz'.format(low,high) + SPW = '*:{0}~{1}MHz'.format(low_MHz,high_MHz) if update: logger.warning('Default SPW outside SPW of input MS ({0}~{1}MHz). Forcing SPW={2}'.format(ms_low,ms_high,SPW)) @@ -392,7 +408,7 @@ def main(): check_refant(args.MS, refant, args.config, warn=True) threads = check_scans(args.MS,args.nodes,args.ntasks_per_node,dopol) - SPW = check_spw(args.config) + SPW = check_spw(args.config,msmd) config_parser.overwrite_config(args.config, conf_dict={'dopol' : dopol}, conf_sec='run', sec_comment='# Internal variables for pipeline execution') config_parser.overwrite_config(args.config, conf_dict=threads, conf_sec='slurm') diff --git a/processMeerKAT/science_image.py b/processMeerKAT/science_image.py index 0f43ccf..e592d04 100644 --- a/processMeerKAT/science_image.py +++ b/processMeerKAT/science_image.py @@ -94,10 +94,11 @@ def do_pb_corr(inpimage, pbthreshold=0): ia.close() -def science_image(vis, cell, robust, imsize, wprojplanes, niter, threshold, multiscale, nterms, gridder, deconvolver, restoringbeam, specmode, stokes, mask, rmsmap, outlierfile, pbthreshold): +def science_image(vis, cell, robust, imsize, wprojplanes, niter, threshold, multiscale, nterms, gridder, deconvolver, restoringbeam, specmode, stokes, mask, rmsmap, outlierfile, keepmms, pbthreshold): visbase = os.path.split(vis.rstrip('/ '))[1] # Get only vis name, not entire path - imagename = visbase.replace('.mms', '.science_image') # Images will be produced in $CWD + extn = '.ms' if keepmms==False else '.mms' + imagename = visbase.replace(extn, '.science_image') # Images will be produced in $CWD if not (type(threshold) is str and 'Jy' in threshold) and threshold > 1 and os.path.exists(rmsmap): stats = imstat(imagename=rmsmap) diff --git a/processMeerKAT/selfcal_scripts/selfcal_part1.py b/processMeerKAT/selfcal_scripts/selfcal_part1.py index a667941..cd57a69 100644 --- a/processMeerKAT/selfcal_scripts/selfcal_part1.py +++ b/processMeerKAT/selfcal_scripts/selfcal_part1.py @@ -41,10 +41,10 @@ def symlink_psf(imagenames,loop): return False -def selfcal_part1(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojplanes, niter, threshold, - uvrange, nterms, gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, flag): +def selfcal_part1(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojplanes, niter, threshold, uvrange, nterms, + gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, outlier_radius, flag): - imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,_,_,_ = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step='tclean') + imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,_,_,_,_ = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,outlier_radius,threshold,step='tclean') calcpsf = True #Add model column with MPI rather than in selfcal_part2 without MPI. diff --git a/processMeerKAT/selfcal_scripts/selfcal_part2.py b/processMeerKAT/selfcal_scripts/selfcal_part2.py index a12667d..b3da864 100644 --- a/processMeerKAT/selfcal_scripts/selfcal_part2.py +++ b/processMeerKAT/selfcal_scripts/selfcal_part2.py @@ -34,9 +34,9 @@ logging.basicConfig(format="%(asctime)-15s %(levelname)s: %(message)s", level=logging.INFO) def selfcal_part2(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojplanes, niter, threshold, uvrange, - nterms, gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, flag): + nterms, gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, outlier_radius, flag): - imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,_,_,_ = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step='predict') + imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,_,_,_,_ = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,outlier_radius,threshold,step='predict') if calmode[loop] != '': if os.path.exists(caltable): @@ -94,17 +94,17 @@ def pybdsf(imbase,rmsfile,imagename,outimage,thresh,maskfile,cat,trim_box=None,w img.export_image(outfile=rmsfile, img_type='rms', img_format='casa', clobber=True) def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojplanes, niter, threshold, uvrange, nterms, - gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, flag, step): + gridder, deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, outlier_radius, flag, step): local = locals() local.pop('step') - imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile,targetfield = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step) + imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile,targetfield,sky_model_radius = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,outlier_radius,threshold,step) cat = imagename + ".catalog.fits" outlierfile_all = 'outliers.txt' fitsname = imagename + '.fits' outlier_imsize = 128 outlier_snr = 5 - sky_model_radius = 2 #degrees + outlier_min_flux = 1e-3 # 1 mJy if step != 'sky': pybdsf(imbase,rmsfile,imagename,outimage,thresh,maskfile,cat) @@ -116,14 +116,15 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp #Take sky positions from RACS if step == 'sky': - from astroquery.utils.tap.core import TapPlus - from astropy.table import vstack + #from astroquery.utils.tap.core import TapPlus + #from astropy.table import vstack #Open MS and extract first target centre; use first sub-MS for speed if MMS if os.path.exists('{0}/SUBMSS'.format(vis)): tmpvis = glob.glob('{0}/SUBMSS/*'.format(vis))[0] else: tmpvis = vis + msmd.open(tmpvis) dir=msmd.sourcedirs()[str(targetfield)] ra=qa.convert(dir['m0'],'deg')['value'] @@ -141,23 +142,23 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp outlier_threshold *= 1e3 #convert from Jy to mJy #Extract RACS positions within sky_model_radius - try: - casdatap = TapPlus(url="https://casda.csiro.au/casda_vo_tools/tap") - query = "SELECT * FROM AS110.racs_dr1_sources_galacticcut_v2021_08_v01 where 1=CONTAINS(POINT('ICRS', ra, dec),CIRCLE('ICRS',{0},{1},{2}))".format(ra,dec,sky_model_radius) - job = casdatap.launch_job_async(query) - galcut = job.get_results() - job = casdatap.launch_job_async(query.replace('cut','region')) - galregion = job.get_results() - RACS = vstack([galcut,galregion],join_type='exact') - RACS.write(cat,overwrite=True) - except: - RACS = '{0}/RACS.fits.gz'.format(processMeerKAT.SCRIPT_DIR) - tmp = fits.open(RACS) - all_positions = SkyCoord(ra=tmp[1].data[racol],dec=tmp[1].data[deccol],unit='deg,deg') - tmp[1].data = tmp[1].data[phasecenter.separation(all_positions) < Quantity(sky_model_radius,'deg')] - tmp.writeto(cat,overwrite=True) - tmp.close() - del all_positions #Don't store millions of positions beyond here + # try: + # casdatap = TapPlus(url="https://casda.csiro.au/casda_vo_tools/tap") + # query = "SELECT * FROM AS110.racs_dr1_sources_galacticcut_v2021_08_v01 where 1=CONTAINS(POINT('ICRS', ra, dec),CIRCLE('ICRS',{0},{1},{2}))".format(ra,dec,sky_model_radius) + # job = casdatap.launch_job_async(query) + # galcut = job.get_results() + # job = casdatap.launch_job_async(query.replace('cut','region')) + # galregion = job.get_results() + # RACS = vstack([galcut,galregion],join_type='exact') + # RACS.write(cat,overwrite=True) + # except: + RACS = '{0}/RACS.fits.gz'.format(processMeerKAT.SCRIPT_DIR) + tmp = fits.open(RACS) + all_positions = SkyCoord(ra=tmp[1].data[racol],dec=tmp[1].data[deccol],unit='deg,deg') + tmp[1].data = tmp[1].data[phasecenter.separation(all_positions) < Quantity(sky_model_radius,'deg')] + tmp.writeto(cat,overwrite=True) + tmp.close() + del all_positions #Don't store millions of positions beyond here index = loop @@ -306,6 +307,9 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp tab=fits.open(outlier_cat) data = tab[1].data tab.close() + if np.sum(data['Total_flux']) < outlier_min_flux: + logger.warning('Total flux in catalogue for "{0}_outlier{1}" < {2} Jy. Excluding outlier.'.format(imbase%(index),i,outlier_min_flux)) + continue cat_positions = SkyCoord(ra=data['RA'],dec=data['Dec'],unit='deg,deg') if brightest: row = np.where(data['Total_flux'] == np.max(data['Total_flux']))[0][0] @@ -313,8 +317,10 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp row,_,_ = pos.match_to_catalog_sky(cat_positions) phasecenter = 'J2000 {0}'.format(cat_positions[row].to_string('hmsdms')) else: - logger.warning("PyBDSF catalogue '{0}' not created. Using old position and mask.".format(outlier_cat)) - mask = 'mask={0}'.format(pixmask) + logger.warning("PyBDSF catalogue '{0}' not created. Excluding outlier.".format(outlier_cat)) + continue + #Using old position and mask.".format(outlier_cat)) + #mask = 'mask={0}'.format(pixmask) out.write(""" imagename={0}_outlier{1} @@ -342,9 +348,9 @@ def find_outliers(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojp return rmsfile,outlierfile def mask_image(vis, refant, dopol, nloops, loop, cell, robust, imsize, wprojplanes, niter, threshold, uvrange, nterms, gridder, - deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, flag, outlier_base='', outlier_image=''): + deconvolver, solint, calmode, discard_nloops, gaintype, outlier_threshold, outlier_radius, flag, outlier_base='', outlier_image=''): - imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile,_ = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step='mask') + imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile,_,_ = bookkeeping.get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,outlier_radius,threshold,step='mask') if outlier_base != '': maskfile = outlier_base + '.islmask' From a47888511881584cb340fd571a58dbc8345b5ca6 Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Fri, 17 Jun 2022 09:08:50 +0200 Subject: [PATCH 16/21] Bugfix for `*` SPW selection --- processMeerKAT/crosscal_scripts/partition.py | 2 +- processMeerKAT/processMeerKAT.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/processMeerKAT/crosscal_scripts/partition.py b/processMeerKAT/crosscal_scripts/partition.py index df1d2c3..bda4675 100644 --- a/processMeerKAT/crosscal_scripts/partition.py +++ b/processMeerKAT/crosscal_scripts/partition.py @@ -57,7 +57,7 @@ def main(args,taskvals): low,high,unit,dirs = config_parser.parse_spw(args['config']) spwname = '{0:.0f}~{1:.0f}MHz'.format(min(low),max(high)) else: - spwname = spw.replace('0:','') + spwname = spw.replace('*:','') msmd.open(visname) npol = msmd.ncorrforpol()[0] diff --git a/processMeerKAT/processMeerKAT.py b/processMeerKAT/processMeerKAT.py index c168137..0b85bae 100755 --- a/processMeerKAT/processMeerKAT.py +++ b/processMeerKAT/processMeerKAT.py @@ -712,7 +712,7 @@ def write_spw_master(filename,config,SPWs,precal_scripts,postcal_scripts,submit, logger.info('Running master script "{0}"'.format(filename)) os.system('./{0}'.format(filename)) else: - logger.info('Master script "{0}" written, but will not run.'.format(filename)) + logger.info('Master script "{0}" written in CWD, but will not run.'.format(filename)) def write_master(filename,config,scripts=[],submit=False,dir='jobScripts',pad_length=5,verbose=False, echo=True, dependencies='',slurm_kwargs={}): From 30f4e3a2c75c6650169e15c0f9f0df7d7c2eea0c Mon Sep 17 00:00:00 2001 From: Srikrishna Sekhar Date: Mon, 27 Jun 2022 15:15:20 +0200 Subject: [PATCH 17/21] Bugfix to selfcal_part_2 --- processMeerKAT/bookkeeping.py | 1 + 1 file changed, 1 insertion(+) diff --git a/processMeerKAT/bookkeeping.py b/processMeerKAT/bookkeeping.py index cb3b2c1..1fd3e09 100644 --- a/processMeerKAT/bookkeeping.py +++ b/processMeerKAT/bookkeeping.py @@ -266,6 +266,7 @@ def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,o sky_model_radius = outlier_radius else: outlierfile = '' + sky_model_radius = 0.0 msmd.done() From deedd9f0eee34fa4c55ff37ef8e329aed1da9072 Mon Sep 17 00:00:00 2001 From: Jordan Collier Date: Tue, 28 Jun 2022 16:47:26 +0200 Subject: [PATCH 18/21] Bugfix for when target field in basename --- processMeerKAT/aux_scripts/concat.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/processMeerKAT/aux_scripts/concat.py b/processMeerKAT/aux_scripts/concat.py index 231c3ed..3073582 100644 --- a/processMeerKAT/aux_scripts/concat.py +++ b/processMeerKAT/aux_scripts/concat.py @@ -83,7 +83,7 @@ def do_concat(visname, fields, dirs='*MHz'): fname = msmd.namesforfields(int(fname))[0] #Concat tt0 images (into continuum cube) - suffix = 'images/*{0}*image.tt0'.format(fname) + suffix = 'images/*.{0}*image.tt0'.format(fname) files,pattern = get_infiles(dirs,suffix) out = '{0}.{1}.contcube'.format(filebase,fname) images = check_output(fname,files,pattern,out,job='imageconcat',filetype='image') @@ -100,7 +100,7 @@ def do_concat(visname, fields, dirs='*MHz'): logger.error("Output image '{0}' attempted to write but was not written.".format(out)) #Concat images (into continuum cube) - suffix = 'images/*{0}*image'.format(fname) + suffix = 'images/*.{0}*image'.format(fname) files,pattern = get_infiles(dirs,suffix) out = '{0}.{1}.contcube'.format(filebase,fname) images = check_output(fname,files,pattern,out,job='imageconcat',filetype='image') @@ -117,7 +117,7 @@ def do_concat(visname, fields, dirs='*MHz'): logger.error("Output image '{0}' attempted to write but was not written.".format(out)) #Concat MSs - suffix = '*{0}*.ms'.format(fname) + suffix = '*.{0}*.ms'.format(fname) files,pattern = get_infiles(dirs,suffix) out = '{0}.{1}.ms'.format(filebase,fname) MSs = check_output(fname,files,pattern,out,job='concat',filetype='MS') @@ -133,7 +133,7 @@ def do_concat(visname, fields, dirs='*MHz'): logger.error("Output MS '{0}' attempted to write but was not written.".format(out)) #Concat MMSs - suffix = '*{0}*.mms'.format(fname) + suffix = '*.{0}*.mms'.format(fname) files,pattern = get_infiles(dirs,suffix) out = '{0}.{1}.mms'.format(filebase,fname) MMSs = check_output(fname,files,pattern,out,job='virtualconcat',filetype='MMS') From 23cbf8b2fb704060a4601be2a27958fe99dfb1a2 Mon Sep 17 00:00:00 2001 From: "Thomson, Alec (CASS, Kensington)" Date: Sat, 2 Jul 2022 16:19:19 +1000 Subject: [PATCH 19/21] Add spw prefix --- processMeerKAT/known_hpc.cfg | 1 + 1 file changed, 1 insertion(+) diff --git a/processMeerKAT/known_hpc.cfg b/processMeerKAT/known_hpc.cfg index 114a2a6..574a856 100644 --- a/processMeerKAT/known_hpc.cfg +++ b/processMeerKAT/known_hpc.cfg @@ -15,6 +15,7 @@ CONFIG = 'default_config.txt' TMP_CONFIG = '.config.tmp' MASTER_SCRIPT = 'submit_pipeline.sh' + SPW_PREFIX = '*:' PARTITION = 'Main' MODULES = ['openmpi/2.1.1'] QOS = 'qos-interactive' From 767e6a76bbef44e88f5483fac87a43a1c88041ed Mon Sep 17 00:00:00 2001 From: "Thomson, Alec (CASS, Kensington)" Date: Sat, 2 Jul 2022 16:26:22 +1000 Subject: [PATCH 20/21] Fix overdetermined args --- processMeerKAT/processMeerKAT.py | 103 ++++++++++++++++++------------- 1 file changed, 60 insertions(+), 43 deletions(-) diff --git a/processMeerKAT/processMeerKAT.py b/processMeerKAT/processMeerKAT.py index 37d3910..cbc73f9 100755 --- a/processMeerKAT/processMeerKAT.py +++ b/processMeerKAT/processMeerKAT.py @@ -149,49 +149,35 @@ def parse_scripts(val): - parser.add_argument("--hpc",metavar='name', required=False, type=str, default="ilifu", help="Name of hpc facility being used. If not known to processMeerKAT/known_hpc.cfg slurm limits are functionally removed [default: ilifu].") - # Read in parser default values according to --cluster parameter - args, unknown = parser.parse_known_args() - HPC_NAME = args.hpc.lower() if args.hpc in KNOWN_HPCS.keys() else "unknown" - HPC_DEFAULTS = KNOWN_HPCS[HPC_NAME] + parser.add_argument("--hpc",metavar='name', required=False, type=str, default="unkown", help="Name of hpc facility being used. If not known to processMeerKAT/known_hpc.cfg slurm limits are functionally removed [default: ilifu].") - parser.add_argument("-M","--MS",metavar="path", required=False, type=str, help="Path to MeasurementSet.") - parser.add_argument("-C","--config",metavar="config", default=HPC_DEFAULTS['CONFIG'.lower()], required=False, type=str, help="Relative (not absolute) path to config file.") - args, unknown = parser.parse_known_args() - # Extract hpc name used during build and warn if not the same as CLI hpc - config_dict, config = config_parser.parse_config(args.config) - if config.has_option('run', 'hpc'): - config_hpc_name = config['run']['hpc'].strip("'") - else: - config_hpc_name = HPC_NAME - if HPC_NAME != config_hpc_name: - msg = "Entered hpc ({HPC_NAME}) is not the same as was used to build this pipeline instance! '{config_hpc_name}' was used. Pipeline may not function as expected. Please consider rebuilding pipeline with correct hpc selection." - logger.warning(msg.format(HPC_NAME=HPC_NAME, config_hpc_name=config_hpc_name)) + parser.add_argument("-M","--MS",metavar="path", required=False, type=str, help="Path to MeasurementSet.") + parser.add_argument("-C","--config",metavar="config", default=None, required=False, type=str, help="Relative (not absolute) path to config file.") # Parse in remaining arguments - parser.add_argument("-N","--nodes",metavar="num", required=False, type=int, default=1, - help="Use this number of nodes [default: 1; max: {0}].".format(HPC_DEFAULTS['TOTAL_NODES_LIMIT'.lower()])) - parser.add_argument("-t","--ntasks-per-node", metavar="num", required=False, type=int, default=8, - help="Use this number of tasks (per node) [default: 16; max: {0}].".format(HPC_DEFAULTS['NTASKS_PER_NODE_LIMIT'.lower()])) + parser.add_argument("-N","--nodes",metavar="num", required=False, type=int, default=None, + help="Use this number of nodes [default: 1].") + parser.add_argument("-t","--ntasks-per-node", metavar="num", required=False, type=int, default=None, + help="Use this number of tasks (per node) [default: 8]") parser.add_argument("-D","--plane", metavar="num", required=False, type=int, default=1, help="Distribute tasks of this block size before moving onto next node [default: 1; max: ntasks-per-node].") - parser.add_argument("-m","--mem", metavar="num", required=False, type=int, default=HPC_DEFAULTS['MEM_PER_NODE_GB_LIMIT'.lower()], - help="Use this many GB of memory (per node) for threadsafe scripts [default: {0}; max: {0}].".format(HPC_DEFAULTS['MEM_PER_NODE_GB_LIMIT'.lower()])) - parser.add_argument("-p","--partition", metavar="name", required=False, type=str, default=HPC_DEFAULTS['PARTITION'.lower()], help="SLURM partition to use [default: 'Main'].") + parser.add_argument("-m","--mem", metavar="num", required=False, type=int, default=None, + help="Use this many GB of memory (per node) for threadsafe scripts [default: {0}; max: {0}].") + parser.add_argument("-p","--partition", metavar="name", required=False, type=str, default=None, help="SLURM partition to use [default: 'Main'].") parser.add_argument("-T","--time", metavar="time", required=False, type=str, default="12:00:00", help="Time limit to use for all jobs, in the form d-hh:mm:ss [default: '12:00:00'].") - parser.add_argument("-S","--scripts", action='append', nargs=3, metavar=('script','threadsafe','container'), required=False, type=parse_scripts, default=HPC_DEFAULTS['SCRIPTS'.lower()], + parser.add_argument("-S","--scripts", action='append', nargs=3, metavar=('script','threadsafe','container'), required=False, type=parse_scripts, default=None, help="Run pipeline with these scripts, in this order, using these containers (3rd value - empty string to default to [-c --container]). Is it threadsafe (2nd value)?") - parser.add_argument("-b","--precal_scripts", action='append', nargs=3, metavar=('script','threadsafe','container'), required=False, type=parse_scripts, default=HPC_DEFAULTS['PRECAL_SCRIPTS'.lower()], help="Same as [-S --scripts], but run before calibration.") - parser.add_argument("-a","--postcal_scripts", action='append', nargs=3, metavar=('script','threadsafe','container'), required=False, type=parse_scripts, default=HPC_DEFAULTS['POSTCAL_SCRIPTS'.lower()], help="Same as [-S --scripts], but run after calibration.") - parser.add_argument("--modules", nargs='*', metavar='module', required=False, default=HPC_DEFAULTS['MODULES'.lower()], help="Load these modules within each sbatch script.") - parser.add_argument("-w","--mpi_wrapper", metavar="path", required=False, type=str, default=HPC_DEFAULTS['MPI_WRAPPER'.lower()], - help="Use this mpi wrapper when calling threadsafe scripts [default: '{0}'].".format(HPC_DEFAULTS['MPI_WRAPPER'.lower()])) - parser.add_argument("-c","--container", metavar="path", required=False, type=str, default=HPC_DEFAULTS['CONTAINER'.lower()], help="Use this container when calling scripts [default: '{0}'].".format(HPC_DEFAULTS['CONTAINER'.lower()])) + parser.add_argument("-b","--precal_scripts", action='append', nargs=3, metavar=('script','threadsafe','container'), required=False, type=parse_scripts, default=None, help="Same as [-S --scripts], but run before calibration.") + parser.add_argument("-a","--postcal_scripts", action='append', nargs=3, metavar=('script','threadsafe','container'), required=False, type=parse_scripts, default=None, help="Same as [-S --scripts], but run after calibration.") + parser.add_argument("--modules", nargs='*', metavar='module', required=False, default=None, help="Load these modules within each sbatch script.") + parser.add_argument("-w","--mpi_wrapper", metavar="path", required=False, type=str, default=None, + help="Use this mpi wrapper when calling threadsafe scripts [default: ].") + parser.add_argument("-c","--container", metavar="path", required=False, type=str, default=None, help="Use this container when calling scripts [default:].") parser.add_argument("-n","--name", metavar="unique", required=False, type=str, default='', help="Unique name to give this pipeline run (e.g. 'run1_'), appended to the start of all job names. [default: ''].") parser.add_argument("-d","--dependencies", metavar="list", required=False, type=str, default='', help="Comma-separated list (without spaces) of SLURM job dependencies (only used when nspw=1). [default: ''].") parser.add_argument("-e","--exclude", metavar="nodes", required=False, type=str, default='', help="SLURM worker nodes to exclude [default: ''].") - parser.add_argument("-A","--account", metavar="group", required=False, type=str, default=HPC_DEFAULTS['ACCOUNTS'.lower()][0], help="SLURM accounting group to use (e.g. 'b05-pipelines-ag' - check 'sacctmgr show user $USER cluster=ilifu-slurm20 -s format=account%%30,cluster%%15') [default: 'b03-idia-ag'].") + parser.add_argument("-A","--account", metavar="group", required=False, type=str, default=None, help="SLURM accounting group to use (e.g. 'b05-pipelines-ag' - check 'sacctmgr show user $USER cluster=ilifu-slurm20 -s format=account%%30,cluster%%15') [default: 'b03-idia-ag'].") parser.add_argument("-r","--reservation", metavar="name", required=False, type=str, default='', help="SLURM reservation to use. [default: ''].") parser.add_argument("-l","--local", action="store_true", required=False, default=False, help="Build config file locally (i.e. without calling srun) [default: False].") @@ -213,6 +199,37 @@ def parse_scripts(val): args, unknown = parser.parse_known_args() + HPC_NAME = args.hpc.lower() if args.hpc in KNOWN_HPCS.keys() else "unknown" + logger.info(HPC_NAME) + HPC_DEFAULTS = KNOWN_HPCS[HPC_NAME] + # Hash table to lookup in config file + arg_hash = { + "nodes": "total_nodes_limit", + "ntasks_per_node": "ntasks_per_node_limit", + "mem": "mem_per_node_gb_limit", + "account": "accounts" + } + # Set arg values from config + for arg, val in vars(args).items(): + if val is None: + if arg in HPC_DEFAULTS.keys(): + hpc_val = HPC_DEFAULTS[arg] + logger.info(f"Setting option '{arg}' to '{hpc_val}' from HPC '{HPC_NAME}' config.") + vars(args)[arg] = hpc_val + elif arg in arg_hash.keys(): + hpc_val = HPC_DEFAULTS[arg_hash[arg]][0] if arg=="account" else HPC_DEFAULTS[arg_hash[arg]] + logger.info(f"Setting option '{arg}' to '{hpc_val}' from HPC '{HPC_NAME}' config.") + vars(args)[arg] = hpc_val + # Extract hpc name used during build and warn if not the same as CLI hpc + config_dict, config = config_parser.parse_config(args.config) + if config.has_option('run', 'hpc'): + config_hpc_name = config['run']['hpc'].strip("'") + else: + config_hpc_name = HPC_NAME + if HPC_NAME != config_hpc_name: + msg = "Entered hpc ({HPC_NAME}) is not the same as was used to build this pipeline instance! '{config_hpc_name}' was used. Pipeline may not function as expected. Please consider rebuilding pipeline with correct hpc selection." + logger.warning(msg.format(HPC_NAME=HPC_NAME, config_hpc_name=config_hpc_name)) + if len(unknown) > 0: parser.error('Unknown input argument(s) present - {0}'.format(unknown)) @@ -414,7 +431,7 @@ def write_command(script,args,mpi_wrapper,container,name='job',casa_script=False arr=($SPWs) cd ${arr[SLURM_ARRAY_TASK_ID]} - """ % SPWs.replace(',',' ').replace(SPW_PREFIX,'') + """ % SPWs.replace(',',' ').replace(HPC_DEFAULTS["SPW_PREFIX".lower()],'') command += "{mpi_wrapper} singularity exec {path_binding}{container} {plot_call} {casa_call} {script} {args}{argument_calls}".format(**params) @@ -582,7 +599,7 @@ def write_spw_master(filename,config,args,SPWs,precal_scripts,postcal_scripts,su master = open(filename,'w') master.write('#!/bin/bash\n') - SPWs = SPWs.replace(SPW_PREFIX,'') + SPWs = SPWs.replace(HPC_DEFAULTS["SPW_PREFIX".lower()],'') toplevel = len(precal_scripts + postcal_scripts) > 0 scripts = precal_scripts[:] @@ -643,7 +660,7 @@ def write_spw_master(filename,config,args,SPWs,precal_scripts,postcal_scripts,su argument_calls += " --quiet" for i,spw in enumerate(SPWs.split(',')): - master.write('echo Running pipeline in directory "{1}" for spectral window {0}{1}\n'.format(SPW_PREFIX, spw)) + master.write('echo Running pipeline in directory "{1}" for spectral window {0}{1}\n'.format(HPC_DEFAULTS["SPW_PREFIX".lower()], spw)) master.write('cd {0}\n'.format(spw)) master.write('output=$({0} --config ./{config} --run --submit --justrun {argument_calls}'.format(os.path.split(THIS_PROG)[1], config=config, argument_calls=argument_calls)) if partition: @@ -1242,7 +1259,7 @@ def format_args(config,submit,quiet,dependencies,justrun): SLURM_CONFIG_KEYS = HPC_DEFAULTS['SLURM_CONFIG_KEYS_BASE'.lower()]+HPC_DEFAULTS['SLURM_CONFIG_STR_KEYS'.lower()] kwargs = get_config_kwargs(config,'slurm',SLURM_CONFIG_KEYS) data_kwargs = get_config_kwargs(config,'data',['vis']) - get_config_kwargs(config, 'fields', HPC_DEFAULTS['FIELDS_CONFIG_KEYS'.lower()]) + field_kwargs = get_config_kwargs(config, 'fields', HPC_DEFAULTS['FIELDS_CONFIG_KEYS'.lower()]) crosscal_kwargs = get_config_kwargs(config, 'crosscal', HPC_DEFAULTS['CROSSCAL_CONFIG_KEYS'.lower()]) #Force submit=True if user has requested it during [-R --run] @@ -1473,7 +1490,7 @@ def spw_split(spw,nspw,config,mem,badfreqranges,MS,partition,createmms=True,remo #Remove SPWs entirely encompassed by bad frequency ranges (only for MHz unit) for i in range(len(lo)): - SPWs.append('{0}{1}~{2}{3}'.format(SPW_PREFIX, func(lo[i]),func(hi[i]),unit)) + SPWs.append('{0}{1}~{2}{3}'.format(HPC_DEFAULTS["SPW_PREFIX".lower()], func(lo[i]),func(hi[i]),unit)) elif ',' in spw: SPWs = spw.split(',') @@ -1492,9 +1509,9 @@ def spw_split(spw,nspw,config,mem,badfreqranges,MS,partition,createmms=True,remo low,high = get_spw_bounds(SPWs[i])[0:2] if unit == 'MHz' and remove: for freq in badfreqranges: - bad_low,bad_high = get_spw_bounds('{0}{1}'.format(SPW_PREFIX,freq))[0:2] + bad_low,bad_high = get_spw_bounds('{0}{1}'.format(HPC_DEFAULTS["SPW_PREFIX".lower()],freq))[0:2] if low >= bad_low and high <= bad_high: - logger.info("Won't process spw '{0}{1}~{2}{3}', since it's completely encompassed by bad frequency range '{3}'.".format(SPW_PREFIX,low,high,unit,freq)) + logger.info("Won't process spw '{0}{1}~{2}{3}', since it's completely encompassed by bad frequency range '{3}'.".format(HPC_DEFAULTS["SPW_PREFIX".lower()],low,high,unit,freq)) badfreq = True break if badfreq: @@ -1509,9 +1526,9 @@ def spw_split(spw,nspw,config,mem,badfreqranges,MS,partition,createmms=True,remo #Create each spw as directory and place config in there logger.info("Making {0} directories for SPWs ({1}) and copying '{2}' to each of them.".format(nspw,SPWs,config)) for spw in SPWs: - spw_config = '{0}/{1}'.format(spw.replace(SPW_PREFIX,''),config) - if not os.path.exists(spw.replace(SPW_PREFIX,'')): - os.mkdir(spw.replace(SPW_PREFIX,'')) + spw_config = '{0}/{1}'.format(spw.replace(HPC_DEFAULTS["SPW_PREFIX".lower()],''),config) + if not os.path.exists(spw.replace(HPC_DEFAULTS["SPW_PREFIX".lower()],'')): + os.mkdir(spw.replace(HPC_DEFAULTS["SPW_PREFIX".lower()],'')) copyfile(config, spw_config) config_parser.overwrite_config(spw_config, conf_dict={'spw' : "'{0}'".format(spw)}, conf_sec='crosscal') config_parser.overwrite_config(spw_config, conf_dict={'nspw' : 1}, conf_sec='crosscal') @@ -1533,7 +1550,7 @@ def spw_split(spw,nspw,config,mem,badfreqranges,MS,partition,createmms=True,remo extn = '{0}.{1}'.format(suffix[1:],extn) filebase = prefix - vis = '{0}.{1}.{2}'.format(filebase,spw.replace(SPW_PREFIX,''),extn) + vis = '{0}.{1}.{2}'.format(filebase,spw.replace(HPC_DEFAULTS["SPW_PREFIX".lower()],''),extn) logger.warning("Since script with 'partition' in its name isn't present in '{0}', assuming partition has already been done, and setting vis='{1}' in '{2}'. If '{1}' doesn't exist, please update '{2}', as the pipeline will not launch successfully.".format(config,vis,spw_config)) orig_vis = config_parser.get_key(spw_config, 'data', 'vis') config_parser.overwrite_config(spw_config, conf_dict={'orig_vis' : "'{0}'".format(orig_vis)}, conf_sec='run', sec_comment='# Internal variables for pipeline execution') From eb4aacde2823641dc3ca872af09d941c8151154e Mon Sep 17 00:00:00 2001 From: "Thomson, Alec (CASS, Kensington)" Date: Sat, 2 Jul 2022 18:03:32 +1000 Subject: [PATCH 21/21] Typo --- processMeerKAT/processMeerKAT.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processMeerKAT/processMeerKAT.py b/processMeerKAT/processMeerKAT.py index cbc73f9..cf27929 100755 --- a/processMeerKAT/processMeerKAT.py +++ b/processMeerKAT/processMeerKAT.py @@ -149,7 +149,7 @@ def parse_scripts(val): - parser.add_argument("--hpc",metavar='name', required=False, type=str, default="unkown", help="Name of hpc facility being used. If not known to processMeerKAT/known_hpc.cfg slurm limits are functionally removed [default: ilifu].") + parser.add_argument("--hpc",metavar='name', required=False, type=str, default="unknown", help="Name of hpc facility being used. If not known to processMeerKAT/known_hpc.cfg slurm limits are functionally removed [default: ilifu].") parser.add_argument("-M","--MS",metavar="path", required=False, type=str, help="Path to MeasurementSet.")