Skip to content

Commit

Permalink
Add minFlaggedFraction option to flagstation
Browse files Browse the repository at this point in the history
  • Loading branch information
darafferty committed Oct 30, 2024
1 parent a1ee80f commit 06877e8
Showing 1 changed file with 53 additions and 20 deletions.
73 changes: 53 additions & 20 deletions losoto/operations/flagstation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

def _run_parser(soltab, parser, step):
mode = parser.getstr( step, 'mode') # no default
minFlaggedFraction = parser.getfloat( step, 'minFlaggedFraction', 0.0)
maxFlaggedFraction = parser.getfloat( step, 'maxFlaggedFraction', 0.5)
nSigma = parser.getfloat( step, 'nSigma', 5.0)
maxStddev = parser.getfloat( step, 'maxStddev', -1.0)
Expand All @@ -20,11 +21,11 @@ def _run_parser(soltab, parser, step):
soltabExport = parser.getstr( step, 'soltabExport', '' )
ncpu = parser.getint( '_global', 'ncpu', 0)

parser.checkSpelling( step, soltab, ['mode', 'maxFlaggedFraction', 'nSigma', 'maxStddev', 'ampRange', 'telescope', 'skipInternational', 'skipAnts', 'refAnt', 'soltabExport'])
return run( soltab, mode, maxFlaggedFraction, nSigma, maxStddev, ampRange, telescope, skipInternational, skipAnts, refAnt, soltabExport, ncpu )
parser.checkSpelling( step, soltab, ['mode', 'minFlaggedFraction', 'maxFlaggedFraction', 'nSigma', 'maxStddev', 'ampRange', 'telescope', 'skipInternational', 'skipAnts', 'refAnt', 'soltabExport'])
return run( soltab, mode, minFlaggedFraction, maxFlaggedFraction, nSigma, maxStddev, ampRange, telescope, skipInternational, skipAnts, refAnt, soltabExport, ncpu )


def _flag_resid(vals, weights, soltype, nSigma, maxFlaggedFraction, maxStddev, ants, s, outQueue):
def _flag_resid(vals, weights, soltype, nSigma, minFlaggedFraction, maxFlaggedFraction, maxStddev, ants, s, outQueue):
"""
Flags bad residuals relative to mean by setting the corresponding weights to 0.0
Expand All @@ -42,6 +43,10 @@ def _flag_resid(vals, weights, soltype, nSigma, maxFlaggedFraction, maxStddev, a
nSigma : float
Number of sigma for flagging. vals outside of nSigma*stddev are flagged
minFlaggedFraction : float
Minimum allowable fraction of new flagged frequencies. Stations with
lower fractions will not have any new flags applied
maxFlaggedFraction : float
Maximum allowable fraction of flagged frequencies. Stations with higher fractions
will be completely flagged
Expand Down Expand Up @@ -118,11 +123,19 @@ def _flag_resid(vals, weights, soltype, nSigma, maxFlaggedFraction, maxStddev, a

# Check whether station is bad (high flagged fraction). If
# so, flag all frequencies and polarizations
if float(len(bad[0]))/float(nsols_unflagged) > maxFlaggedFraction:
fraction_flagged = float(len(bad[0]))/float(nsols_unflagged)
if fraction_flagged > maxFlaggedFraction:
# Station has high fraction of initially unflagged solutions that are now flagged
logging.info('Flagged {0} (pol {1}) due to high flagged fraction '
'({2:.2f})'.format(ants[s], pol, float(len(bad[0]))/float(nsols_unflagged)))
'({2:.2f})'.format(ants[s], pol, fraction_flagged))
weights[:, :, pol] = 0.0
elif fraction_flagged < minFlaggedFraction:
# Station has low fraction of initially unflagged solutions that are
# now flagged, so don't flag anything new
logging.info('Flagged 0.0% of solutions for {0} (pol {1}) due to '
'flagged fraction ({2:.3f}) < minFlaggedFraction '
'({3:.3f})'.format(ants[s], pol, fraction_flagged, minFlaggedFraction))
weights[:, :, pol] = weights_orig # restore original flags
else:
# Station is OK, flag bad points only
nflagged_orig = len(np.where(weights_orig == 0.0)[0])
Expand All @@ -134,7 +147,7 @@ def _flag_resid(vals, weights, soltype, nSigma, maxFlaggedFraction, maxStddev, a
outQueue.put([s, weights])


def _flag_bandpass(freqs, amps, weights, telescope, nSigma, ampRange, maxFlaggedFraction, maxStddev,
def _flag_bandpass(freqs, amps, weights, telescope, nSigma, ampRange, minFlaggedFraction, maxFlaggedFraction, maxStddev,
plot, ants, s, outQueue):
"""
Flags bad amplitude solutions relative to median bandpass (in log space) by setting
Expand All @@ -160,6 +173,10 @@ def _flag_bandpass(freqs, amps, weights, telescope, nSigma, ampRange, maxFlagged
nSigma : float
Number of sigma for flagging. Amplitudes outside of nSigma*stddev are flagged
minFlaggedFraction : float
Minimum allowable fraction of new flagged frequencies. Stations with
lower fractions will not have any new flags applied
maxFlaggedFraction : float
Maximum allowable fraction of flagged frequencies. Stations with higher fractions
will be completely flagged
Expand Down Expand Up @@ -459,7 +476,8 @@ def _fit_bandpass(freq, logamp, sigma, band, do_fit=True):
sigma_div[bad] = 1e8
niter += 1
flagged = np.where(sigma_div >= 1e8)
nflagged_orig = len(np.where(weights[:, :, pol] == 0.0)[0])
flagged_orig = np.where(weights[:, :, pol] == 0.0)
nflagged_orig = len(flagged_orig[0])
weights[:, flagged[0], pol] = 0.0
nflagged_new = len(np.where(weights[:, :, pol] == 0.0)[0])
fraction_flagged = float(nflagged_new - nflagged_orig) / float(np.product(weights.shape[:-1]))
Expand All @@ -479,11 +497,20 @@ def _fit_bandpass(freq, logamp, sigma, band, do_fit=True):
'({2})'.format(ants[s], pol, stdev_all))
weights[:, :, pol] = 0.0
elif fraction_flagged > maxFlaggedFraction:
# Station has high fraction of initially unflagged solutions that are now flagged
# Station has high fraction of solutions that are
# now flagged, so flag everything
logging.info('Flagged 100% of solutions for {0} (pol {1}) due to '
'flagged fraction ({2:.3f}) > maxFlaggedFraction '
'({3:.3f})'.format(ants[s], pol, fraction_flagged, maxFlaggedFraction))
weights[:, :, pol] = 0.0
elif fraction_flagged > 0 and fraction_flagged < minFlaggedFraction:
# Station has low fraction of solutions that are
# now flagged, so don't flag anything new
logging.info('Flagged 0.0% of solutions for {0} (pol {1}) due to '
'flagged fraction ({2:.3f}) < minFlaggedFraction '
'({3:.3f})'.format(ants[s], pol, fraction_flagged, minFlaggedFraction))
weights[:, flagged[0], pol] = 1.0 # unset new flags
weights[:, flagged_orig[0], pol] = 0.0 # restore original flags
else:
median_val = np.nanmedian(amps[np.where(weights[:, :, pol] > 0.0)])
if median_val < median_min or median_val > median_max:
Expand All @@ -492,13 +519,14 @@ def _fit_bandpass(freq, logamp, sigma, band, do_fit=True):
'({2})'.format(ants[s], pol, median_val))
weights[:, :, pol] = 0.0
else:
# Station is OK, flag bad points only
# Station is OK, keep new flags of bad points only
logging.info('Flagged {0:.1f}% of solutions for {1} (pol {2})'.format(fraction_flagged*100, ants[s], pol))

outQueue.put([s, weights])


def run( soltab, mode, maxFlaggedFraction=0.5, nSigma=5.0, maxStddev=None, ampRange=None, telescope='lofar',
def run( soltab, mode, minFlaggedFraction=0.0, maxFlaggedFraction=0.5, nSigma=5.0,
maxStddev=None, ampRange=None, telescope='lofar',
skipInternational=False, skipAnts=[], refAnt='', soltabExport='', ncpu=0 ):
"""
This operation for LoSoTo implements a station-flagging procedure. Flags are time-independent.
Expand All @@ -511,9 +539,13 @@ def run( soltab, mode, maxFlaggedFraction=0.5, nSigma=5.0, maxStddev=None, ampRa
relative to a model bandpass (only LOFAR is currently supported). Resid
mode clips residual phases or log(amplitudes).
minFlaggedFraction : float, optional
Minimum allowable fraction of new flagged frequencies. Stations with
lower fractions will not have any new flags applied
maxFlaggedFraction : float, optional
This sets the maximum allowable fraction of flagged solutions above
which the entire station is flagged.
This sets the maximum allowable fraction of newly flagged solutions
above which the entire station is flagged.
nSigma : float, optional
This sets the number of standard deviations considered when outlier
Expand Down Expand Up @@ -622,7 +654,7 @@ def run( soltab, mode, maxFlaggedFraction=0.5, nSigma=5.0, maxStddev=None, ampRa
soltab.ant[s] in skipAnts):
continue
mpm.put([soltab.freq[:], vals_arraytmp[:, s, :, :, d], weights_arraytmp[:, s, :, :, d],
telescope, nSigma, ampRange, maxFlaggedFraction, maxStddev, False, soltab.ant[:], s])
telescope, nSigma, ampRange, minFlaggedFraction, maxFlaggedFraction, maxStddev, False, soltab.ant[:], s])
mpm.wait()
for (s, w) in mpm.get():
weights_arraytmp[:, s, :, :, d] = w
Expand All @@ -634,7 +666,7 @@ def run( soltab, mode, maxFlaggedFraction=0.5, nSigma=5.0, maxStddev=None, ampRa
soltab.ant[s] in skipAnts):
continue
mpm.put([soltab.freq[:], vals_arraytmp[:, s, :, :], weights_arraytmp[:, s, :, :],
telescope, nSigma, ampRange, maxFlaggedFraction, maxStddev, False, soltab.ant[:], s])
telescope, nSigma, ampRange, minFlaggedFraction, maxFlaggedFraction, maxStddev, False, soltab.ant[:], s])
mpm.wait()
for (s, w) in mpm.get():
weights_arraytmp[:, s, :, :] = w
Expand All @@ -652,8 +684,9 @@ def run( soltab, mode, maxFlaggedFraction=0.5, nSigma=5.0, maxStddev=None, ampRa
else:
weights_array = weights_arraytmp.transpose([time_ind, ant_ind, freq_ind, pol_ind])
soltab.setValues(weights_array, weight=True)
soltab.addHistory('FLAGSTATION (mode=bandpass, telescope={0}, maxFlaggedFraction={1}, '
'nSigma={2})'.format(telescope, maxFlaggedFraction, nSigma))
soltab.addHistory('FLAGSTATION (mode=bandpass, telescope={0}, minFlaggedFraction={1}, '
'maxFlaggedFraction={2}, nSigma={3})'.format(telescope, minFlaggedFraction,
maxFlaggedFraction, nSigma))
else:
if solType not in ['phase', 'amplitude']:
logging.error("Soltab must be of type phase or amplitude for resid mode.")
Expand Down Expand Up @@ -701,15 +734,15 @@ def run( soltab, mode, maxFlaggedFraction=0.5, nSigma=5.0, maxStddev=None, ampRa
mpm = multiprocManager(ncpu, _flag_resid)
for s in range(len(soltab.ant)):
mpm.put([vals_arraytmp[:, s, :, :, d], weights_arraytmp[:, s, :, :, d],
solType, nSigma, maxFlaggedFraction, maxStddev, soltab.ant[:], s])
solType, nSigma, minFlaggedFraction, maxFlaggedFraction, maxStddev, soltab.ant[:], s])
mpm.wait()
for (s, w) in mpm.get():
weights_arraytmp[:, s, :, :, d] = w
else:
mpm = multiprocManager(ncpu, _flag_resid)
for s in range(len(soltab.ant)):
mpm.put([vals_arraytmp[:, s, :, :], weights_arraytmp[:, s, :, :], solType,
nSigma, maxFlaggedFraction, maxStddev, soltab.ant[:], s])
nSigma, minFlaggedFraction, maxFlaggedFraction, maxStddev, soltab.ant[:], s])
mpm.wait()
for (s, w) in mpm.get():
weights_arraytmp[:, s, :, :] = w
Expand Down Expand Up @@ -737,8 +770,8 @@ def run( soltab, mode, maxFlaggedFraction=0.5, nSigma=5.0, maxStddev=None, ampRa
else:
weights_array = weights_arraytmp[:, :, :, 0].transpose([time_ind, ant_ind, freq_ind])
soltab.setValues(weights_array, weight=True)
soltab.addHistory('FLAGSTATION (mode=resid, maxFlaggedFraction={0}, '
'nSigma={1})'.format(maxFlaggedFraction, nSigma))
soltab.addHistory('FLAGSTATION (mode=resid, minFlaggedFraction={0}, maxFlaggedFraction={1}, '
'nSigma={2})'.format(minFlaggedFraction, maxFlaggedFraction, nSigma))

if soltabExport is not None:
# Transfer station flags to soltabExport
Expand Down

0 comments on commit 06877e8

Please sign in to comment.