-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathLOFAR_timesplit.py
executable file
·171 lines (142 loc) · 6.73 KB
/
LOFAR_timesplit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Data preparation for selfcal, apply cal solutions
# and split SB in time and concatenate in frequency.
import sys, os, glob, re
import numpy as np
from astropy.time import Time
import casacore.tables as pt
########################################################
from LiLF import lib_ms, lib_util, lib_log
logger_obj = lib_log.Logger('pipeline-timesplit.logger')
logger = lib_log.logger
s = lib_util.Scheduler(log_dir = logger_obj.log_dir, dry = False)
w = lib_util.Walker('pipeline-timesplit.walker')
# parse parset
parset = lib_util.getParset()
parset_dir = parset.get('LOFAR_timesplit','parset_dir')
data_dir = parset.get('LOFAR_timesplit','data_dir')
cal_dir = parset.get('LOFAR_timesplit','cal_dir')
ngroups = parset.getint('LOFAR_timesplit','ngroups')
initc = parset.getint('LOFAR_timesplit','initc') # initial tc num (useful for multiple observation of same target)
bl2flag = parset.get('flag','stations')
#################################################
# Clean
with w.if_todo('clean'):
logger.info('Cleaning...')
lib_util.check_rm('mss*')
### DONE
with w.if_todo('copy'):
MSs = lib_ms.AllMSs( glob.glob(data_dir+'/*MS'), s )
logger.info('Copy data...')
for MS in MSs.getListObj():
#if min(MS.getFreqs()) > 30.e6:
# overwrite=True to prevent updating the weights twice
MS.move(MS.nameMS+'.MS', keepOrig=True, overwrite=True)
### DONE
MSs = lib_ms.AllMSs( glob.glob('*MS'), s )
##################################################
# Find solutions to apply
if cal_dir == '':
obsid = MSs.getListObj()[0].getObsID()
# try standard location
cal_dir = glob.glob('../id%i_-_3[c|C]196' % obsid)+glob.glob('../id%i_-_3[c|C]295' % obsid)+glob.glob('../id%i_-_3[c|C]380' % obsid)
if len(cal_dir) > 0:
cal_dir = cal_dir[0]
else:
logger.error('Cannot find solutions.')
sys.exit()
else:
cal_dir = '../'+cal_dir
logger.info('Calibrator directory: %s' % cal_dir)
h5_pa = cal_dir+'/cal-pa.h5'
h5_amp = cal_dir+'/cal-amp.h5'
h5_iono = cal_dir+'/cal-iono.h5'
if not os.path.exists(h5_pa) or not os.path.exists(h5_amp) or not os.path.exists(h5_iono):
logger.error("Missing solutions in %s" % cal_dir)
sys.exit()
####################################################
# Correct fist for BP(diag)+TEC+Clock and then for beam
with w.if_todo('apply'):
# Apply cal sol - SB.MS:DATA -> SB.MS:CORRECTED_DATA (polalign corrected)
logger.info('Apply solutions (pa)...')
MSs.run('DP3 '+parset_dir+'/DP3-cor.parset msin=$pathMS \
cor.parmdb='+h5_pa+' cor.correction=polalign', log='$nameMS_cor1.log', commandType='DP3')
# Apply cal sol - SB.MS:CORRECTED_DATA -> SB.MS:CORRECTED_DATA (polalign corrected, calibrator corrected+reweight, beam corrected+reweight)
logger.info('Apply solutions (amp/ph)...')
MSs.run('DP3 '+parset_dir+'/DP3-cor.parset msin=$pathMS msin.datacolumn=CORRECTED_DATA cor.steps=[amp,ph] \
cor.amp.parmdb='+h5_amp+' cor.amp.correction=amplitudeSmooth cor.amp.updateweights=True\
cor.ph.parmdb='+h5_iono+' cor.ph.correction=phaseOrig000', log='$nameMS_cor2.log', commandType='DP3')
# Beam correction CORRECTED_DATA -> CORRECTED_DATA (polalign corrected, beam corrected+reweight)
logger.info('Beam correction...')
MSs.run('DP3 '+parset_dir+'/DP3-beam.parset msin=$pathMS corrbeam.updateweights=True', log='$nameMS_beam.log', commandType='DP3')
### DONE
###################################################################################################
# Create groups
groupnames = []
logger.info('Concatenating in frequency...')
for i, msg in enumerate(np.array_split(sorted(glob.glob('*MS')), ngroups)):
if ngroups == 1:
groupname = 'mss'
else:
groupname = 'mss-%02i' % i
groupnames.append(groupname)
# skip if already done
if not os.path.exists(groupname):
os.makedirs(groupname)
# add missing SB with a fake name not to leave frequency holes
min_nu = pt.table(MSs.getListStr()[0]).OBSERVATION[0]['LOFAR_OBSERVATION_FREQUENCY_MIN']
max_nu = pt.table(MSs.getListStr()[0]).OBSERVATION[0]['LOFAR_OBSERVATION_FREQUENCY_MAX']
num_init = lib_util.lofar_nu2num(min_nu)+1 # +1 because FREQ_MIN/MAX somewhat have the lowest edge of the SB freq
num_fin = lib_util.lofar_nu2num(max_nu)+1
ms_name_init = msg[0]
prefix = re.sub('SB[0-9]*.MS','',msg[0])
msg = []
for j in range(num_init, num_fin+1):
msg.append(prefix+'SB%03i.MS' % j)
# prepare concatenated mss - SB.MS:CORRECTED_DATA -> group#.MS:DATA (cal corr data, beam corrected)
s.add('DP3 '+parset_dir+'/DP3-concat.parset msin="['+','.join(msg)+']" msout='+groupname+'/'+groupname+'.MS', \
log=groupname+'_DP3_concat.log', commandType='DP3')
s.run(check=True)
MSs = lib_ms.AllMSs( glob.glob('mss*/*MS'), s )
#############################################################
# Flagging on concatenated dataset - also flag low-elevation
with w.if_todo('flag'):
logger.info('Flagging...')
MSs.run('DP3 '+parset_dir+'/DP3-flag.parset msin=$pathMS ant.baseline=\"' + bl2flag + '\" \
aoflagger.strategy='+parset_dir+'/LBAdefaultwideband.lua',
log='$nameMS_DP3_flag.log', commandType='DP3')
logger.info('Remove bad timestamps...')
MSs.run( 'flagonmindata.py -f 0.5 $pathMS', log='$nameMS_flagonmindata.log', commandType='python')
logger.info('Plot weights...')
MSs.run('reweight.py $pathMS -v -p -a CS001LBA', log='$nameMS_weights.log', commandType='python')
lib_util.check_rm('plots-weights')
os.system('mkdir plots-weights; mv *png plots-weights')
### DONE
#####################################
# Create time-chunks
with w.if_todo('timesplit'):
logger.info('Splitting in time...')
tc = initc
for groupname in groupnames:
ms = groupname+'/'+groupname+'.MS'
if not os.path.exists(ms): continue
t = pt.table(ms, ack=False)
starttime = t[0]['TIME']
endtime = t[t.nrows()-1]['TIME']
hours = (endtime-starttime)/3600.
logger.debug(ms+' has length of '+str(hours)+' h.')
for timerange in np.array_split(sorted(set(t.getcol('TIME'))), round(hours)):
logger.info('%02i - Splitting timerange %f %f' % (tc, timerange[0], timerange[-1]))
t1 = t.query('TIME >= ' + str(timerange[0]) + ' && TIME <= ' + str(timerange[-1]), sortlist='TIME,ANTENNA1,ANTENNA2')
splitms = groupname+'/TC%02i.MS' % tc
lib_util.check_rm(splitms)
t1.copy(splitms, True)
t1.close()
tc += 1
t.close()
lib_util.check_rm(ms) # remove not-timesplitted file
### DONE
logger.info('Cleaning up...')
os.system('rm -r *MS')
logger.info("Done.")