From c3975c2adbd4a6baf18ce1efe94e16973f6abbdf Mon Sep 17 00:00:00 2001
From: GarethCabournDavies
Date: Fri, 3 May 2024 05:14:31 -0700
Subject: [PATCH] start work on fit_by, fit_over and DQ trigger supervision

---
 .../pycbc_live_supervise_collated_triggers    | 481 ++++++++++++++++++
 pycbc/live/supervision.py                     |   9 +-
 2 files changed, 487 insertions(+), 3 deletions(-)
 create mode 100755 bin/live/pycbc_live_supervise_collated_triggers

diff --git a/bin/live/pycbc_live_supervise_collated_triggers b/bin/live/pycbc_live_supervise_collated_triggers
new file mode 100755
index 00000000000..7ecf4624dec
--- /dev/null
+++ b/bin/live/pycbc_live_supervise_collated_triggers
@@ -0,0 +1,481 @@
+#!/usr/bin/env python
+
+"""
+Supervise the periodic collection of triggers from pycbc_live,
+using them to get template fits, smoothing those fits,
+generating data quality files and the associated plots.
+"""
+
+import re
+import logging
+import argparse
+from datetime import datetime, timedelta
+from dateutil.relativedelta import relativedelta
+import os
+import shutil
+import subprocess
+import numpy as np
+import h5py
+
+import lal
+from lal import gpstime
+
+import pycbc
+from pycbc.live import supervision as sv
+
+
+# These are the options used to control the supervision; they will not be
+# passed to the subprocesses
+control_options = [
+    "ifos",
+    "mail-volunteers-file",
+    "output-directory",
+    "output-id-str",
+    "public-dir",
+    "replay-duration",
+    "replay-start-time",
+    "submit-dir",
+    "true-start-time",
+    "fit-by-template-format",
+    "fit-over-format",
+    "fit-over-combined-days",
+    "plot-fits-format",
+    "variable-fit-over-param",
+    "variable-dq-file",
+    'collated-triggers-format',
+    'top-directory',
+]
+
+# These options are for the collection of triggers into a merged file
+trigger_collation_options = [
+    'ifos',
+    'file-identifier',
+    'verbose',
+]
+
+# These options are for the fit_by_template code
+fit_by_template_options = [
+    'bank-file',
+    'gating-veto-windows',
+    'fit-function',
+    'stat-threshold',
+    'prune-param',
+    'prune-bins',
+    'prune-number',
+    'log-prune-param',
+    'f-lower',
+    'min-duration',
+    'approximant',
+    'sngl-ranking',
+    'statistic-keywords',
+    'verbose',
+]
+
+# These options are for the fit_over_multiparam code
+fit_over_multiparam_options = [
+    'bank-file',
+    'fit-param',
+    'approximant',
+    'f-lower',
+    'min-duration',
+    'log-param',
+    'smoothing-width',
+    'smoothing-method',
+    'smoothing-keywords',
+    'output-fits-by-template',
+    'verbose',
+]
+
+# These options are for the plot_template_bank_corner code
+plot_bank_corner_options = [
+    'bank-file',
+    'parameters',
+    'mins',
+    'maxs',
+    'color-parameter',
+    'dpi',
+    'mpl-style',
+    'verbose',
+]
+
+# These options are for the bin_trigger_rates_dq code
+#bin_dq_options = [
+#    'flag-file',
+#    'flag-name',
+#    'stat-threshold',
+#    'sngl-ranking',
+#    'verbose',
+#]
+
+# These options are for the plot_dq_flag_likelihood code
+#plot_dq_options = [
+#    'dq-file',
+#    'dq-label',
+#    'verbose',
+#]
+
+all_options = control_options + trigger_collation_options \
+    + fit_by_template_options + fit_over_multiparam_options \
+    + plot_bank_corner_options #+ bin_dq_options \
+#    + plot_dq_options
+
+all_options = set(all_options)
+
+def read_options(args):
+    """
+    Read the options from the config file into a dictionary
+    """
+    option_values = {opt: None for opt in all_options}
+
+    with open(args.config_file, 'r') as conf_file:
+        all_lines = conf_file.readlines()
+
+    for line in all_lines:
+        # Ignore whitespace and comments
+        line = line.strip()
+        if not line:
+            continue
+        if line.startswith(';'):
+            continue
+
+        # Split on the first '=' only, so values may themselves contain '='
+        option, value = line.split('=', 1)
+
+        option = option.strip()
+        if option not in all_options:
+            logging.warning("Option %s unrecognised, ignoring", option)
+            continue
+
+        if value.strip() == '':
+            # Option is a flag - store as True
+            value = True
+        elif len(value.split()) > 1:
+            # Option is a list of values
+            value = value.split()
+        else:
+            # Option is a single value
+            value = value.strip()
+
+        option_values[option] = value
+
+    return option_values
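+
+# For reference, read_options() parses a flat "option = value" file rather
+# than a full ConfigParser-style ini. A minimal illustrative config (all
+# values hypothetical) could look like:
+#
+#   ; lines starting with ';' are comments
+#   ifos = H1 L1
+#   top-directory = /data/pycbc_live/triggers
+#   output-directory = /data/pycbc_live/collated
+#   collated-triggers-format = {date}-LIVE_TRIGGERS.hdf
+#   fit-by-template-format = {ifo}-{date}-FIT_BY_TEMPLATE.hdf
+#   fit-over-combined-days = 30
+#   verbose =
+#
+# A bare "option =" is stored as a boolean flag (True), and space-separated
+# values are stored as lists.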
+
+def ensure_directories(option_values, day_str):
+    """
+    Make sure the output directories for the given day exist
+    """
+    output_dir = os.path.join(option_values['output-directory'], day_str)
+    sv.run_and_error(['mkdir', '-p', output_dir], option_values)
+    if option_values['public-dir'] is not None:
+        # The public directory will be in subdirectories for the year,
+        # month, day, e.g. 2024_04_12 will be in 2024/04/12.
+        public_dir = os.path.join(
+            option_values['public-dir'],
+            *day_str.split('_')
+        )
+        sv.run_and_error(['mkdir', '-p', public_dir], option_values)
+
+
+def collate_triggers(args, day_dt, day_str, option_values):
+    """
+    Collate the day's triggers into a single merged HDF file
+    """
+    # The main output directory will have a date subdirectory which we
+    # put the output into
+    output_dir = os.path.join(option_values['output-directory'], day_str)
+    trig_merge_file = os.path.join(
+        output_dir,
+        option_values['collated-triggers-format'].format(date=day_str)
+    )
+    # Collate the triggers into a single hdf file for input into
+    # fit_by_template and bin_dq
+    collate_args = [
+        './pycbc_live_collate_triggers', #FIXME - remove the ./ once this script is merged
+    ]
+    collate_args += sv.dict_to_args(
+        {opt: v for opt, v in option_values.items()
+         if opt in trigger_collation_options}
+    )
+    # FIXME: The following will depend on how the trigger finding
+    # actually happens
+    gps_start = gpstime.utc_to_gps(day_dt).gpsSeconds
+    gps_end = gpstime.utc_to_gps(day_dt + timedelta(days=1)).gpsSeconds
+    collate_args += [
+        '--trigger-dir', option_values['top-directory'],
+        '--trigger-file-method', 'start-num-days',
+        '--start-date', day_str,
+        '--num-days', '1',
+        '--output-file-name',
+        option_values['collated-triggers-format'].format(date=day_str),
+        '--output-dir', output_dir
+    ]
+
+    logging.info(collate_args)
+    # NOTE: collation can be expensive - when testing, run this once and
+    # then comment it out
+    sv.run_and_error(collate_args, option_values)
+
+    return trig_merge_file
+
+
+def fits_plot(fits_file, day_str, option_values, ifo):
+    """
+    Plot the given fits file with pycbc_plot_bank_corner, linking the
+    plot into the public directory if one is given
+    """
+    fits_plot_output = fits_file[:-3] + 'png'
+    logging.info("Plotting fits %s to %s", fits_file, fits_plot_output)
+    fits_plot_arguments = [
+        'pycbc_plot_bank_corner',
+        '--fits-file', fits_file,
+        '--output-plot-file', fits_plot_output,
+    ]
+    fits_plot_arguments += sv.dict_to_args(
+        {opt: v for opt, v in option_values.items()
+         if opt in plot_bank_corner_options}
+    )
+    logging.info(fits_plot_arguments)
+    sv.run_and_error(fits_plot_arguments, option_values)
+    if option_values['public-dir'] is not None:
+        public_dir = os.path.abspath(os.path.join(
+            option_values['public-dir'],
+            *day_str.split('_')
+        ))
+        sv.symlink(fits_plot_output, public_dir)
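+
+# The *-format control options are Python format strings filled in with the
+# date and, where relevant, the ifo. For example, with the (hypothetical)
+# config value fit-by-template-format = {ifo}-{date}-FIT_BY_TEMPLATE.hdf,
+# fit_by_template() below would write H1-2024_04_12-FIT_BY_TEMPLATE.hdf for
+# H1 on 2024-04-12.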
+
+def fit_by_template(trigger_merge_file, day_str, option_values, ifo):
+    """
+    Run pycbc_fit_sngls_by_template on the merged trigger file
+    """
+    fbt_out_fname = option_values['fit-by-template-format'].format(
+        date=day_str,
+        ifo=ifo,
+    )
+    output_dir = os.path.abspath(option_values['output-directory'])
+    fbt_out_full = os.path.join(output_dir, fbt_out_fname)
+    fit_by_args = ['pycbc_fit_sngls_by_template']
+    fit_by_args += ['--trigger-file', trigger_merge_file]
+    fit_by_args += sv.dict_to_args(
+        {opt: v for opt, v in option_values.items()
+         if opt in fit_by_template_options}
+    )
+    fit_by_args += ['--output', fbt_out_full, '--ifo', ifo]
+    logging.info(fit_by_args)
+    sv.run_and_error(fit_by_args, option_values)
+
+    return fbt_out_full
+
+
+def fit_over_multiparam(option_values, ifo, day_dt, day_str):
+    """
+    Smooth the last fit-over-combined-days days of single-day fits using
+    pycbc_fit_sngls_over_multiparam
+    """
+    combined_days = int(option_values['fit-over-combined-days'])
+    if option_values['replay-start-time'] is not None:
+        replay_start_time = int(option_values['replay-start-time'])
+        true_start_time = int(option_values['true-start-time'])
+        replay_duration = int(option_values['replay-duration'])
+        rep_start_utc = lal.GPSToUTC(replay_start_time)[0:6]
+
+        dt_replay_start = datetime(
+            year=rep_start_utc[0],
+            month=rep_start_utc[1],
+            day=rep_start_utc[2],
+            hour=rep_start_utc[3],
+            minute=rep_start_utc[4],
+            second=rep_start_utc[5]
+        )
+
+        td = (day_dt - dt_replay_start).total_seconds()
+
+        # Time since the start of this replay
+        time_since_replay = np.remainder(td, replay_duration)
+
+        # Add this on to the original start time to get the current time of
+        # the replay data
+        true_utc = lal.GPSToUTC(true_start_time)[0:6]
+        dt_true_start = datetime(
+            year=true_utc[0],
+            month=true_utc[1],
+            day=true_utc[2],
+            hour=true_utc[3],
+            minute=true_utc[4],
+            second=true_utc[5]
+        )
+        # Original time of the data being replayed right now
+        current_date = dt_true_start + timedelta(seconds=time_since_replay)
+    else:
+        current_date = day_dt
+
+    date_test = current_date + timedelta(days=1)
+
+    logging.info("Finding fit_by_template files for combination")
+
+    trfits_files = []
+    missed_files = 0
+    found_files = 0
+    while found_files < combined_days and missed_files < 10:
+        # Loop through the possible file locations and see if the file exists
+        date_test -= timedelta(days=1)
+        date_out = date_test.strftime("%Y_%m_%d")
+        fbt_out_fname = option_values['fit-by-template-format'].format(
+            date=date_out,
+            ifo=ifo,
+        )
+        output_dir = os.path.abspath(option_values['output-directory'])
+        fbt_out_full = os.path.join(output_dir, fbt_out_fname)
+        # Check that the file exists:
+        if not os.path.exists(fbt_out_full):
+            missed_files += 1
+            logging.info("File %s does not exist - skipping", fbt_out_full)
+            continue
+        if not len(trfits_files):
+            # The first file found is the most recent, so marks the end
+            # of the combination
+            end_date = date_out
+        # This is now the oldest file found so far
+        first_date = date_out
+        # Reset the consecutive "missed files" counter, and add to the
+        # "found files" count
+        missed_files = 0
+        found_files += 1
+        trfits_files.append(fbt_out_full)
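+
+    # At this point either found_files == combined_days, or the loop gave
+    # up after 10 consecutive missing days. For example (illustrative
+    # numbers only), with fit-over-combined-days = 30 and two isolated gap
+    # days, 32 dates are inspected and 30 files are combined.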
+
+    if missed_files == 10:
+        # If there are more than 10 days between files, something is wrong
+        # with the analysis. Warn and use fewer files - 10 is chosen as an
+        # unusual amount of time for the analysis to be down in standard
+        # operation
+        logging.warning('More than 10 days between files, only using '
+                        f'{found_files} files for combination!')
+
+    if not trfits_files:
+        raise ValueError("No files meet the criteria")
+
+    file_id_str = f'{first_date}-{end_date}'
+    out_fname = f'{file_id_str}-{ifo}-FIT_OVER_MULTIPARAM'
+
+    fit_over_args = ['pycbc_fit_sngls_over_multiparam', '--template-fit-file']
+    fit_over_args += trfits_files
+    fit_over_args += sv.dict_to_args(
+        {opt: v for opt, v in option_values.items()
+         if opt in fit_over_multiparam_options}
+    )
+    output_dir = os.path.abspath(os.path.join(
+        option_values['output-directory'],
+        day_str,
+    ))
+    fom_out_full = os.path.join(output_dir, out_fname + '.hdf')
+    fit_over_args += ['--output', fom_out_full]
+    logging.info(fit_over_args)
+    sv.run_and_error(fit_over_args, option_values)
+
+    return fom_out_full
+
+#    combined_options['trfits-files'] = ' '.join(trfits_files)
+#
+#    combined_args = ['pycbc_live_combine_single_significance_fits']
+#    combined_args += sv.dict_to_args(combined_options)
+#
+#    sv.run_and_error(combined_args, option_values)
+#
+#    logging.info('Copying combined fits file to local filesystem')
+#    try:
+#        shutil.copyfile(
+#            combined_options['output'],
+#            option_values['variable-trigger-fits']
+#        )
+#    except Exception as e:
+#        sv.mail_volunteers_error(
+#            option_values,
+#            [str(e)],
+#            "PyCBC live could not copy to variable trigger fits file"
+#        )
+#        raise e
+#
+#    logging.info(
+#        "%s updated to link to %s",
+#        option_values['variable-trigger-fits'],
+#        combined_options['output']
+#    )
+#
+#    logging.info("Plotting combined fits")
+#    # Add plotting for combined fits, and linking to the public directory
+#    combined_plot_output = os.path.join(output_dir,
+#                                        f"{{ifo}}-{out_fname}-{{type}}.png")
+#    combined_plot_arguments = [
+#        'pycbc_live_plot_combined_single_significance_fits',
+#        '--combined-fits-file',
+#        combined_options['output'],
+#        '--output-plot-name-format',
+#        combined_plot_output,
+#        '--log-colormap'
+#    ]
+#
+#    print(combined_plot_arguments)
+#
+#    combined_plot_outputs = [
+#        combined_plot_output.format(ifo=ifo, type='fit_coeffs') for ifo in
+#        combined_options['ifos'].split()
+#    ]
+#    combined_plot_outputs += [
+#        combined_plot_output.format(ifo=ifo, type='counts') for ifo in
+#        combined_options['ifos'].split()
+#    ]
+#
+#    # Link the plots to the public-dir if wanted
+#    if 'public-dir' in option_values:
+#        logging.info("Linking combined fits")
+#        for cpo in combined_plot_outputs:
+#            sv.symlink(cpo, public_dir)
+
+
+def do_collation_fits_dq(args, option_values, day_dt, day_str):
+    """
+    Run the collation, fitting and plotting stages for the given day
+    """
+    ensure_directories(option_values, day_str)
+    merged_triggers = collate_triggers(args, day_dt, day_str, option_values)
+    # A single-ifo config gives a string rather than a list; make sure we
+    # iterate over ifo names, not characters
+    ifos = option_values['ifos']
+    if isinstance(ifos, str):
+        ifos = [ifos]
+    for ifo in ifos:
+        if args.fit_by_template:
+            fbt_file = fit_by_template(
+                merged_triggers, day_str, option_values, ifo
+            )
+            fits_plot(fbt_file, day_str, option_values, ifo)
+        if args.fit_over_multiparam:
+            fom_file = fit_over_multiparam(option_values, ifo, day_dt, day_str)
+            fits_plot(fom_file, day_str, option_values, ifo)
+    logging.info('Done')
+
+
+parser = argparse.ArgumentParser(description=__doc__)
+parser.add_argument('--config-file', required=True)
+parser.add_argument(
+    '--date',
+    help='Date to analyse, if not given, will analyse yesterday (UTC). '
+         'Format YYYY_MM_DD. Do not use if using --run-daily-at.'
+)
+parser.add_argument(
+    '--fit-by-template',
+    action='store_true',
+    help="Perform template fits calculation."
+)
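+# NOTE: --fit-over-multiparam (below) smooths the single-day fits produced
+# by --fit-by-template, so a typical supervision run enables both flags,
+# e.g. (hypothetical invocation):
+#   pycbc_live_supervise_collated_triggers --config-file supervise.ini \
+#       --fit-by-template --fit-over-multiparam --run-daily-at 08:00:00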
+parser.add_argument(
+    '--fit-over-multiparam',
+    action='store_true',
+    help="Perform template fits smoothing."
+)
+parser.add_argument(
+    '--run-daily-at',
+    metavar='HH:MM:SS',
+    help='Stay running and repeat the fitting daily at the given UTC hour.'
+)
+args = parser.parse_args()
+
+pycbc.init_logging(True)
+
+if args.run_daily_at is not None and args.date is not None:
+    parser.error('Cannot use --run-daily-at and --date at the same time')
+
+option_values = read_options(args)
+
+if args.run_daily_at is not None:
+    # Keep running and repeat the fitting every day at the given hour
+    if not re.match('[0-9][0-9]:[0-9][0-9]:[0-9][0-9]', args.run_daily_at):
+        parser.error('--run-daily-at takes a UTC time in the format HH:MM:SS')
+    logging.info('Starting in daily run mode')
+    while True:
+        sv.wait_for_utc_time(args.run_daily_at)
+        logging.info('==== Time to update the single fits, waking up ====')
+        # Get the date string for yesterday's triggers
+        day_dt = datetime.utcnow() - timedelta(days=1)
+        day_str = day_dt.strftime('%Y_%m_%d')
+        do_collation_fits_dq(args, option_values, day_dt, day_str)
+else:
+    # Run just once
+    if args.date:
+        day_str = args.date
+        day_dt = datetime.strptime(args.date, '%Y_%m_%d')
+    else:
+        # Get the date string for yesterday's triggers
+        day_dt = datetime.utcnow() - timedelta(days=1)
+        day_str = day_dt.strftime('%Y_%m_%d')
+    do_collation_fits_dq(args, option_values, day_dt, day_str)

diff --git a/pycbc/live/supervision.py b/pycbc/live/supervision.py
index c9b44996025..47e9a48bce2 100644
--- a/pycbc/live/supervision.py
+++ b/pycbc/live/supervision.py
@@ -93,7 +93,7 @@ def mail_volunteers_error(controls, mail_body_lines, subject):
     mail_body = '\n'.join(mail_body_lines)
     subprocess.run(mail_command, input=mail_body, text=True)
 
-def run_and_error(command_arguments):
+def run_and_error(command_arguments, controls):
     """
     Wrapper around subprocess.run to catch errors and send emails if required
     """
@@ -102,9 +102,12 @@ def run_and_error(command_arguments, controls):
     if command_output.returncode:
         error_contents = [' '.join(command_arguments),
                           command_output.stderr.decode()]
-        mail_volunteers_error(controls, error_contents,
-                              f"PyCBC live could not run {command_arguments[0]}")
+        if controls.get('mail-volunteers-file') is not None:
+            mail_volunteers_error(
+                controls, error_contents,
+                f"PyCBC live could not run {command_arguments[0]}")
         err_msg = f"Could not run {command_arguments[0]}"
+        err_msg += ':\n' + '\n'.join(error_contents)
         raise subprocess.SubprocessError(err_msg)