Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing args and merge #2

Merged
merged 22 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions processMeerKAT/aux_scripts/concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def do_concat(visname, fields, dirs='*MHz'):
fname = msmd.namesforfields(int(fname))[0]

#Concat tt0 images (into continuum cube)
suffix = 'images/*{0}*image.tt0'.format(fname)
suffix = 'images/*.{0}*image.tt0'.format(fname)
files,pattern = get_infiles(dirs,suffix)
out = '{0}.{1}.contcube'.format(filebase,fname)
images = check_output(fname,files,pattern,out,job='imageconcat',filetype='image')
Expand All @@ -104,7 +104,7 @@ def do_concat(visname, fields, dirs='*MHz'):
logger.error("Output image '{0}' attempted to write but was not written.".format(out))

#Concat images (into continuum cube)
suffix = 'images/*{0}*image'.format(fname)
suffix = 'images/*.{0}*image'.format(fname)
files,pattern = get_infiles(dirs,suffix)
out = '{0}.{1}.contcube'.format(filebase,fname)
images = check_output(fname,files,pattern,out,job='imageconcat',filetype='image')
Expand All @@ -121,7 +121,7 @@ def do_concat(visname, fields, dirs='*MHz'):
logger.error("Output image '{0}' attempted to write but was not written.".format(out))

#Concat MSs
suffix = '*{0}*.ms'.format(fname)
suffix = '*.{0}*.ms'.format(fname)
files,pattern = get_infiles(dirs,suffix)
out = '{0}.{1}.ms'.format(filebase,fname)
MSs = check_output(fname,files,pattern,out,job='concat',filetype='MS')
Expand All @@ -137,7 +137,7 @@ def do_concat(visname, fields, dirs='*MHz'):
logger.error("Output MS '{0}' attempted to write but was not written.".format(out))

#Concat MMSs
suffix = '*{0}*.mms'.format(fname)
suffix = '*.{0}*.mms'.format(fname)
files,pattern = get_infiles(dirs,suffix)
out = '{0}.{1}.mms'.format(filebase,fname)
MMSs = check_output(fname,files,pattern,out,job='virtualconcat',filetype='MMS')
Expand Down
2 changes: 1 addition & 1 deletion processMeerKAT/aux_scripts/show_ant_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
if len(args) > 1:
if sys.argv[1] == '-h':
print('Usage: {0} filename bins'.format(os.path.split(sys.argv[0])[1]))
else:
elif sys.argv[1] != '--config':
fname = sys.argv[1]
bins = int(sys.argv[2])

Expand Down
73 changes: 67 additions & 6 deletions processMeerKAT/bookkeeping.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from collections import namedtuple
import os
import glob
import re

import logging
from time import gmtime
Expand Down Expand Up @@ -85,6 +86,8 @@ def polfield_name(visname):
polfield = list(set(["3C138", "0518+165", "0521+166", "J0521+1638"]).intersection(set(fieldnames)))[0]
elif any([ff in ["3C48", "0134+329", "0137+331", "J0137+3309"] for ff in fieldnames]):
polfield = list(set(["3C48", "0134+329", "0137+331", "J0137+3309"]).intersection(set(fieldnames)))[0]
elif "J1130-1449" in fieldnames:
polfield = "J1130-1449"
else:
logger.warning("No valid polarization field found. Defaulting to use the phase calibrator to solve for XY phase.")
logger.warning("The polarization solutions found will likely be wrong. Please check the results carefully.")
Expand Down Expand Up @@ -125,7 +128,7 @@ def get_selfcal_params():
if params['dopol'] and 'G' in params['gaintype']:
logger.warning("dopol is True, but gaintype includes 'G'. Use gaintype='T' for polarisation on linear feeds (e.g. MeerKAT).")

single_args = ['nloops','loop','discard_nloops','outlier_threshold'] #need to be 1 long (i.e. not a list)
single_args = ['nloops','loop','discard_nloops','outlier_threshold','outlier_radius'] #need to be 1 long (i.e. not a list)
gaincal_args = ['solint','calmode','gaintype','flag'] #need to be nloops long
list_args = ['imsize'] #allowed to be lists of lists

Expand Down Expand Up @@ -177,10 +180,41 @@ def get_selfcal_params():

return args,params

def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,threshold,step):
def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,outlier_threshold,outlier_radius,threshold,step):

from casatools import msmetadata,quanta
from read_ms import check_spw
msmd = msmetadata()
qa = quanta()

if os.path.exists('{0}/SUBMSS'.format(vis)):
tmpvis = glob.glob('{0}/SUBMSS/*'.format(vis))[0]
else:
tmpvis = vis

msmd.open(tmpvis)

visbase = os.path.split(vis.rstrip('/ '))[1] # Get only vis name, not entire path
basename = visbase.replace('.mms', '')
targetfields = config_parser.get_key(config_parser.parse_args()['config'], 'fields', 'targetfields')

#Force taking first target field (relevant for writing outliers.txt at beginning of pipeline)
if type(targetfields) is str and ',' in targetfields:
targetfield = targetfields.split(',')[0]
msg = 'Multiple target fields input ("{0}"), but only one position can be used to identify outliers (for outlier imaging). Using "{1}".'
logger.warning(msg.format(targetfields,targetfield))
else:
targetfield = targetfields
#Make sure it's an integer
try:
targetfield = int(targetfield)
except ValueError: # It's not an int, but a str
targetfield = msmd.fieldsforname(targetfield)[0]

if '.ms' in visbase and str(targetfield) not in visbase:
basename = visbase.replace('.ms','.{0}'.format(msmd.namesforfields(targetfield)[0]))
else:
basename = visbase.replace('.mms', '')

imbase = basename + '_im_%d' # Images will be produced in $CWD
imagename = imbase % loop
outimage = imagename + '.image'
Expand Down Expand Up @@ -218,8 +252,23 @@ def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,o
outlierfile = 'outliers_loop{0}.txt'.format(loop)
else:
outlierfile = 'outliers_loop{0}.txt'.format(loop+1)

#Derive sky model radius for outliers, assuming channel 0 (of SPW 0) is lowest frequency and therefore largest FWHM
if outlier_radius == 0.0 or outlier_radius == '':
SPW = check_spw(config_parser.parse_args()['config'],msmd)
low_freq = float(SPW.replace('*:','').split('~')[0]) * 1e6 #MHz to Hz
rads=1.025*qa.constants(v='c')['value']/low_freq/ msmd.antennadiameter()['0']['value']
FWHM=qa.convert(qa.quantity(rads,'rad'),'deg')['value']
sky_model_radius = 1.5*FWHM #degrees
logger.warning('Using calculated search radius of {0:.1f} degrees.'.format(sky_model_radius))
else:
logger.info('Using preset search radius of {0} degrees'.format(outlier_radius))
sky_model_radius = outlier_radius
else:
outlierfile = ''
sky_model_radius = 0.0

msmd.done()

if not (type(threshold[loop]) is str and 'Jy' in threshold[loop]) and threshold[loop] > 1:
if step in ['tclean','predict']:
Expand All @@ -233,7 +282,7 @@ def get_selfcal_args(vis,loop,nloops,nterms,deconvolver,discard_nloops,calmode,o
elif step == 'bdsf':
thresh = threshold[loop]

return imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile
return imbase,imagename,outimage,pixmask,rmsfile,caltable,prev_caltables,threshold,outlierfile,cfcache,thresh,maskfile,targetfield,sky_model_radius

def rename_logs(logfile=''):

Expand All @@ -256,6 +305,18 @@ def get_imaging_params():
taskvals, config = config_parser.parse_config(args['config'])
params = taskvals['image']
params['vis'] = taskvals['data']['vis']
params['keepmms'] = taskvals['crosscal']['keepmms']

#Rename the masks that were already used
if params['outlierfile'] != '' and os.path.exists(params['outlierfile']):
outliers=open(params['outlierfile']).read()
outlier_bases = re.findall(r'imagename=(.*)\n',outliers)
for name in outlier_bases:
mask = '{0}.mask'.format(name)
if os.path.exists(mask):
newname = '{0}.old'.format(mask)
logger.info('Re-using old mask for "{0}". Renaming "{1}" to "{2}" to avoid mask conflict.'.format(name,mask,newname))
os.rename(mask,newname)

return args,params

Expand All @@ -274,14 +335,14 @@ def run_script(func,logfile=''):
if continue_run:
try:
func(args,taskvals)
#rename_logs(logfile)
rename_logs(logfile)
except Exception as err:
logger.error('Exception found in the pipeline of type {0}: {1}'.format(type(err),err))
logger.error(traceback.format_exc())
config_parser.overwrite_config(args['config'], conf_dict={'continue' : False}, conf_sec='run', sec_comment='# Internal variables for pipeline execution')
if nspw > 1:
for SPW in spw.split(','):
spw_config = '{0}/{1}'.format(SPW.replace('0:',''),args['config'])
spw_config = '{0}/{1}'.format(SPW.replace('*:',''),args['config'])
config_parser.overwrite_config(spw_config, conf_dict={'continue' : False}, conf_sec='run', sec_comment='# Internal variables for pipeline execution')
rename_logs(logfile)
sys.exit(1)
Expand Down
2 changes: 1 addition & 1 deletion processMeerKAT/config_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def parse_spw(filename):
lowest = min(low)
highest = max(high)

# Uncomment to use e.g. '*MHz'
# Uncomment to simply use e.g. '*MHz'
# if all([i == unit[0] for i in unit]):
# unit = unit[0]
# dirs = '*{0}'.format(unit)
Expand Down
10 changes: 7 additions & 3 deletions processMeerKAT/crosscal_scripts/calc_refant.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
from casatasks import *
logfile=casalog.logfile()
casalog.setlogfile('logs/{SLURM_JOB_NAME}-{SLURM_JOB_ID}.casa'.format(**os.environ))
from casatools import msmetadata
from casatools import msmetadata,table
msmd = msmetadata()
tb = table()

import logging
from time import gmtime
Expand All @@ -30,6 +31,8 @@
def get_ref_ant(visname, fluxfield):

msmd.open(visname)
if type(fluxfield) is str:
fluxfield = msmd.fieldsforname(fluxfield)[0]
fluxscans = msmd.scansforfield(int(fluxfield))
logger.info("Flux field scan no: %d" % fluxscans[0])
antennas = msmd.antennasforscan(fluxscans[0])
Expand All @@ -41,7 +44,7 @@ def get_ref_ant(visname, fluxfield):
tb.open(visname)

fptr = open('ant_stats.txt', 'w')
fptr.write(header)
fptr.write(header + '\n')

antflags = []
for ant in antennas:
Expand All @@ -50,12 +53,13 @@ def get_ref_ant(visname, fluxfield):
if antdat.size == 0:
flags = 1
fptr.write('{0: <3} {1:.4f}\n'.format(ant, np.nan))
logger.info('{0: <3} {1:.4f}'.format(ant, np.nan))
antflags.append(flags)
continue

flags = np.count_nonzero(antdat)/float(antdat.size)

fptr.write('\n{0: <3} {1:.4f}'.format(ant, flags))
fptr.write('{0: <3} {1:.4f}\n'.format(ant, flags))
logger.info('{0: <3} {1:.4f}'.format(ant, flags))
antflags.append(flags)

Expand Down
2 changes: 1 addition & 1 deletion processMeerKAT/crosscal_scripts/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def main(args,taskvals):
low,high,unit,dirs = config_parser.parse_spw(args['config'])
spwname = '{0:.0f}~{1:.0f}MHz'.format(min(low),max(high))
else:
spwname = spw.replace('0:','')
spwname = spw.replace('*:','')

msmd.open(visname)
npol = msmd.ncorrforpol()[0]
Expand Down
2 changes: 1 addition & 1 deletion processMeerKAT/crosscal_scripts/plotcal_spw.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import matplotlib
# Agg doesn't need X - matplotlib doesn't work with xvfb
matplotlib.use('Agg', warn=False)
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import numpy as np

Expand Down
8 changes: 5 additions & 3 deletions processMeerKAT/crosscal_scripts/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import casampi
msmd = msmetadata()

def split_vis(visname, spw, fields, specavg, timeavg, keepmms):
def split_vis(visname, spw, fields, specavg, timeavg, keepmms, badants):

outputbase = os.path.splitext(os.path.split(visname)[1])[0]
extn = 'mms' if keepmms else 'ms'
newvis = visname
antenna = '!{0}'.format(','.join(map(str,badants))) if len(badants) > 0 else ''

for field in fields:
if field != '':
Expand All @@ -36,7 +37,7 @@ def split_vis(visname, spw, fields, specavg, timeavg, keepmms):

split(vis=visname, outputvis=outname, datacolumn='corrected',
field=fname, spw=spw, keepflags=True, keepmms=keepmms,
width=specavg, timebin=timeavg)
width=specavg, timebin=timeavg, antenna=antenna)

if fname == fields.targetfield.split(',')[0]:
newvis = outname
Expand All @@ -51,13 +52,14 @@ def main(args,taskvals):
fields = bookkeeping.get_field_ids(taskvals['fields'])

spw = va(taskvals, 'crosscal', 'spw', str, default='')
badants = taskvals['crosscal'].pop('badants')

specavg = va(taskvals, 'crosscal', 'width', int, default=1)
timeavg = va(taskvals, 'crosscal', 'timeavg', str, default='8s')
keepmms = va(taskvals, 'crosscal', 'keepmms', bool)

msmd.open(visname)
newvis = split_vis(visname, spw, fields, specavg, timeavg, keepmms)
newvis = split_vis(visname, spw, fields, specavg, timeavg, keepmms, badants)

config_parser.overwrite_config(args['config'], conf_dict={'vis' : "'{0}'".format(newvis)}, conf_sec='data')
config_parser.overwrite_config(args['config'], conf_dict={'crosscal_vis': "'{0}'".format(visname)}, conf_sec='run', sec_comment='# Internal variables for pipeline execution')
Expand Down
6 changes: 5 additions & 1 deletion processMeerKAT/crosscal_scripts/xy_yx_solve.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def qu_polfield(polfield, visname):
"""

msmd.open(visname)
meanfreq = msmd.meanfreq(0, unit='GHz')
meanfreq = msmd.meanfreq(0, unit='MHz')
msmd.done()

if polfield in ["3C286", "1328+307", "1331+305", "J1331+3030"]:
Expand All @@ -51,6 +51,10 @@ def qu_polfield(polfield, visname):
perley_frac = np.array([0.003, 0.005, 0.007])
perley_f = np.array([1050,1450,1640])
pa_polcal = np.array([25, 140, -5])
elif polfield == "J1130-1449": #Manual model from Russ Taylor, taken from MeerKAT polarisation calibrator project
perley_frac = np.array([0.038, 0.050, 0.056])
perley_f = np.array([1050, 1450, 1640])
pa_polcal = np.array([145, 66, 45])
else:
# This should never happen.
raise ValueError("Invalid polarization field. Exiting.")
Expand Down
16 changes: 9 additions & 7 deletions processMeerKAT/default_config.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ partition = 'Main' # SLURM partition to use
exclude = '' # SLURM nodes to exclude
time = '12:00:00'
submit = False
container = '/idia/software/containers/casa-6-2021-10-12.simg'
container = '/idia/software/containers/casa-6.5.0-modular.sif'
mpi_wrapper = 'mpirun'
name = ''
dependencies = ''
account = 'b03-idia-ag'
reservation = ''
modules = ['openmpi/2.1.1']
modules = ['openmpi/4.0.3']
verbose = False
precal_scripts = [('calc_refant.py',False,''), ('partition.py',True,'')]
postcal_scripts = [('concat.py',False,''), ('plotcal_spw.py', False, ''), ('selfcal_part1.py',True,''), ('selfcal_part2.py',False,''), ('science_image.py', True, '')]
Expand All @@ -46,7 +46,7 @@ width = 1 # Number of channels to (further) average afte
timeavg = '8s' # Time interval to average after calibration (during split)
createmms = True # Create MMS (True) or MS (False) for cross-calibration during partition
keepmms = True # Output MMS (True) or MS (False) during split
spw = '0:880~933MHz,0:960~1010MHz,0:1010~1060MHz,0:1060~1110MHz,0:1110~1163MHz,0:1299~1350MHz,0:1350~1400MHz,0:1400~1450MHz,0:1450~1500MHz,0:1500~1524MHz,0:1630~1680MHz' # Spectral window / frequencies to extract for MMS
spw = '*:880~933MHz,*:960~1010MHz,*:1010~1060MHz,*:1060~1110MHz,*:1110~1163MHz,*:1299~1350MHz,*:1350~1400MHz,*:1400~1450MHz,*:1450~1500MHz,*:1500~1524MHz,*:1630~1680MHz' # Spectral window / frequencies to extract for MMS
nspw = 11 # Number of spectral windows to split into
calcrefant = False # Calculate reference antenna in program (overwrites 'refant')
refant = 'm059' # Reference antenna name / number
Expand Down Expand Up @@ -75,21 +75,23 @@ flag = True # Flag residual column after selfcal?
gaintype = 'G' # Use 'T' for polarisation on linear feeds (e.g. MeerKAT)
discard_nloops = 0 # Discard this many selfcal solutions (e.g. from quick and dirty image) during subsequent loops (only considers when calmode !='')
outlier_threshold = 0.0 # S/N values if >= 1.0, otherwise Jy
outlier_radius = 0.0 # Radius in degrees for identifying outliers in RACS

[image]
cell = '1.5arcsec'
robust = -0.5
imsize = [4096, 4096]
wprojplanes = 256
niter = 100000
threshold = 10e-6 # S/N value if >= 1.0 and rmsmap != '', otherwise Jy
imsize = [6144, 6144]
wprojplanes = 512
niter = 50000
threshold = 10 # S/N value if >= 1.0 and rmsmap != '', otherwise Jy
multiscale = [0, 5, 10, 15]
nterms = 2 # Number of taylor terms
gridder = 'wproject'
deconvolver = 'mtmfs'
restoringbeam = ''
specmode = 'mfs'
stokes = 'I'
pbthreshold = 0.1 # Threshold below which to mask the PB for PB correction
mask = ''
rmsmap = ''
outlierfile = ''
Expand Down
1 change: 1 addition & 0 deletions processMeerKAT/known_hpc.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
CONFIG = 'default_config.txt'
TMP_CONFIG = '.config.tmp'
MASTER_SCRIPT = 'submit_pipeline.sh'
SPW_PREFIX = '*:'
PARTITION = 'Main'
MODULES = ['openmpi/2.1.1']
QOS = 'qos-interactive'
Expand Down
Loading