PyGRB now establishes the tc prior for injections at workflow time (#4751)

* PyGRB now establishes the tc prior for injections at workflow time

* Makes sure PyGRB injections are only in the offsource

* Created the method generate_tc_prior and cleaned up indentations in pycbc_make_offline_grb_workflow

* Consistent returns from generate_triggered_segment
pannarale authored Jun 13, 2024
1 parent d486ec2 commit db6f79b
Showing 3 changed files with 123 additions and 52 deletions.
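As a rough illustration of the second bullet point in the commit message, here is a minimal toy check (plain NumPy, not PyCBC code; the GPS times are made-up placeholders) of how a uniform tc prior combined with the buffer-segment constraint keeps injection coalescence times out of the onsource:

    import numpy as np

    rng = np.random.default_rng(0)
    t0, t1 = 1126250000.0, 1126256000.0    # assumed analysis segment bounds
    b0, b1 = 1126252990.0, 1126253010.0    # assumed onsource buffer bounds
    tc = rng.uniform(t0, t1, size=10000)   # uniform tc prior over the analysis time
    tc = tc[(tc < b0) | (tc > b1)]         # the constraint written by generate_tc_prior
    assert not ((tc >= b0) & (tc <= b1)).any()   # nothing lands in the onsource buffer
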
125 changes: 77 additions & 48 deletions bin/pygrb/pycbc_make_offline_grb_workflow
@@ -27,14 +27,13 @@ __version__ = pycbc.version.git_verbose_msg
__date__ = pycbc.version.date
__program__ = "pycbc_make_offline_grb_workflow"

import shutil
import sys
import os
import argparse
import logging
import pycbc.workflow as _workflow
from pycbc.workflow.core import configparser_value_to_file
from ligo.segments import segment, segmentlist, segmentlistdict
from ligo.segments import segment, segmentlistdict
import matplotlib
matplotlib.use('agg')
from pycbc.results.pygrb_plotting_utils import make_grb_segments_plot
@@ -79,18 +78,25 @@ wflow.cp = _workflow.set_grb_start_end(wflow.cp, start, end)
# Retrieve science segments
curr_dir = os.getcwd()
seg_dir = os.path.join(curr_dir, "segments")
sciSegsFile = _workflow.get_segments_file(wflow, 'science', 'segments-science', seg_dir)
sciSegsFile = _workflow.get_segments_file(wflow,
'science',
'segments-science',
seg_dir)

sciSegs = {}
for ifo in wflow.ifos:
sciSegs[ifo] = sciSegsFile.segment_dict[ifo+':science']

# This block of code and PyCBCMultiInspiralExecutable.get_valid_times()
# must be consistent
if wflow.cp.has_option("inspiral", "segment-start-pad"):
pad_data = int(wflow.cp.get("inspiral", "pad-data"))
start_pad = int(wflow.cp.get("inspiral", "segment-start-pad"))
end_pad = int(wflow.cp.get("inspiral", "segment-end-pad"))
wflow.cp.set("workflow-exttrig_segments", "min-before",str(start_pad+pad_data))
wflow.cp.set("workflow-exttrig_segments", "min-after",str(end_pad+pad_data))
wflow.cp.set("workflow-exttrig_segments", "min-before",
str(start_pad+pad_data))
wflow.cp.set("workflow-exttrig_segments", "min-after",
str(end_pad+pad_data))
elif wflow.cp.has_option("inspiral", "analyse-segment-end"):
safety = 1
deadtime = int(wflow.cp.get("inspiral", "segment-length")) / 2
@@ -108,31 +114,33 @@ else:
single_ifo = wflow.cp.has_option("workflow", "allow-single-ifo-search")
if len(sciSegs.keys()) == 0:
plot_met = make_grb_segments_plot(wflow, segmentlistdict(), triggertime,
triggername, seg_dir)
triggername, seg_dir)
logging.warning("No science segments available.")
sys.exit()
elif len(sciSegs.keys()) < 2 and not single_ifo:
plot_met = make_grb_segments_plot(wflow, segmentlistdict(sciSegs),
triggertime, triggername, seg_dir)
triggertime, triggername, seg_dir)
msg = "Science segments exist only for %s. " % tuple(sciSegs.keys())[0]
msg += "If you wish to enable single IFO running add the option "
msg += "'allow-single-ifo-search' to the [workflow] section of your "
msg += "configuration file."
logging.warning(msg)
sys.exit()
else:
onSrc, offSrc = _workflow.generate_triggered_segment(wflow, seg_dir,
sciSegs)
onSrc, offSrc, bufferSeg = _workflow.generate_triggered_segment(wflow,
seg_dir,
sciSegs)

sciSegs = segmentlistdict(sciSegs)
if onSrc is None:
plot_met = make_grb_segments_plot(wflow, sciSegs, triggertime, triggername,
seg_dir, fail_criterion=offSrc)
seg_dir, fail_criterion=offSrc)
logging.info("Making segment plot and exiting.")
sys.exit()
else:
plot_met = make_grb_segments_plot(wflow, sciSegs, triggertime, triggername,
seg_dir, coherent_seg=offSrc[tuple(offSrc.keys())[0]][0])
plot_met = make_grb_segments_plot(
wflow, sciSegs, triggertime, triggername, seg_dir,
coherent_seg=offSrc[tuple(offSrc.keys())[0]][0])
segs_plot = _workflow.File(plot_met[0], plot_met[1], plot_met[2],
file_url=plot_met[3])
segs_plot.add_pfn(segs_plot.cache_entry.path, site="local")
@@ -158,15 +166,15 @@ if wflow.cp.has_option("workflow-condition_strain", "do-gating"):
if wflow.cp.has_option("inspiral", "segment-start-pad"):
start_pad = int(wflow.cp.get("inspiral", "segment-start-pad"))
end_pad = int(wflow.cp.get("inspiral", "segment-end-pad"))
wflow.analysis_time = segment(int(sciSegs[ifo][0][0]) + \
start_pad + padding,
int(sciSegs[ifo][0][1]) - \
padding - end_pad)
wflow.analysis_time = segment(int(sciSegs[ifo][0][0]) +
start_pad + padding,
int(sciSegs[ifo][0][1]) -
padding - end_pad)
elif wflow.cp.has_option("inspiral", "analyse-segment-end"):
wflow.analysis_time = segment(int(sciSegs[ifo][0][0]) + deadtime - \
spec_len + padding - safety,
int(sciSegs[ifo][0][1]) - spec_len - \
padding - safety)
wflow.analysis_time = segment(int(sciSegs[ifo][0][0]) + deadtime -
spec_len + padding - safety,
int(sciSegs[ifo][0][1]) - spec_len -
padding - safety)
else:
wflow.analysis_time = segment(int(sciSegs[ifo][0][0]) + deadtime + padding,
int(sciSegs[ifo][0][1]) - deadtime - padding)
@@ -176,7 +184,9 @@ ext_file = None
# DATAFIND
df_dir = os.path.join(curr_dir, "datafind")
datafind_files, _, sciSegs, _ = _workflow.setup_datafind_workflow(wflow,
sciSegs, df_dir, sciSegsFile)
sciSegs,
df_dir,
sciSegsFile)
if wflow.cp.has_option("workflow-condition_strain", "do-gating"):
new_seg = segment(sciSegs[ifo][0][0] + gate_pad,
sciSegs[ifo][0][1] - gate_pad)
@@ -196,7 +206,8 @@ if wflow.cp.has_option("workflow-condition_strain", "do-gating"):
wflow.cp = _workflow.set_grb_start_end(wflow.cp, int(sciSegs[ifo][0][0]),
int(sciSegs[ifo][0][1]))
gating_nodes, gated_files = _workflow.make_gating_node(wflow,
datafind_files, outdir=df_dir)
datafind_files,
outdir=df_dir)
gating_method = wflow.cp.get("workflow-condition_strain",
"gating-method")
for gating_node in gating_nodes:
@@ -207,21 +218,23 @@ if wflow.cp.has_option("workflow-condition_strain", "do-gating"):
wflow.execute_node(gating_node)
else:
msg = "[workflow-condition_strain] option 'gating-method' can "
msg += "only have one of the values 'IN_WORKFLOW' or 'AT_RUNTIME'. "
msg += "You have provided the value %s." % gating_method
msg += "only have one of the values 'IN_WORKFLOW' or 'AT_RUNTIME'."
msg += " You have provided the value %s." % gating_method
logging.error(msg)
sys.exit()
datafind_files = _workflow.FileList([])
for ifo in ifos:
gated_frames = _workflow.FileList([gated_frame for gated_frame in \
gated_files if gated_frame.ifo == ifo])
gated_frames = _workflow.FileList([gated_frame for gated_frame in
gated_files
if gated_frame.ifo == ifo])
# TODO: Remove .lcf cache here
gated_cache = _workflow.File(ifo, "gated",
gated_cache = _workflow.File(
ifo, "gated",
segment(int(wflow.cp.get("workflow", "start-time")),
int(wflow.cp.get("workflow", "end-time"))),
extension="lcf", directory=df_dir)
gated_cache.add_pfn(gated_cache.cache_entry.path, site="local")
gated_frames.convert_to_lal_cache().tofile(\
gated_frames.convert_to_lal_cache().tofile(
open(gated_cache.storage_path, "w"))
datafind_files.append(gated_cache)

@@ -235,7 +248,7 @@ wflow.ifos = ifos
if wflow.cp.has_option("workflow-inspiral", "ipn-search-points") \
and wflow.cp.has_option("workflow-injections", "ipn-sim-points"):
wflow.cp.set("injections", "ipn-gps-time",
wflow.cp.get("workflow", "trigger-time"))
wflow.cp.get("workflow", "trigger-time"))
IPN = True
elif wflow.cp.has_option("workflow-inspiral", "ipn-search-points") \
or wflow.cp.has_option("workflow-injections", "ipn-sim-points"):
@@ -278,7 +291,9 @@ if len(bank_files) > 1:
raise NotImplementedError("Multiple banks not supported")
full_bank_file = bank_files[0]
# Note: setup_splittable_workflow requires a FileList as input
splitbank_files = _workflow.setup_splittable_workflow(wflow, bank_files, df_dir,
splitbank_files = _workflow.setup_splittable_workflow(wflow,
bank_files,
df_dir,
tags=["inspiral"])
all_files.append(full_bank_file)
all_files.extend(splitbank_files)
@@ -295,6 +310,16 @@ if wflow.cp.has_section("workflow-injections"):
inj_caches = _workflow.FileList([])
inj_insp_caches = _workflow.FileList([])

# Given the stretch of time this workflow will analyse, and the onsource
# window with its buffer, generate the configuration file with the prior
# for the injections times and add it to the config parser
inj_method = wflow.cp.get("workflow-injections",
"injections-method")
if inj_method == "IN_WORKFLOW" and \
wflow.cp.has_option("workflow-injections", "tc-prior-at-runtime"):
tc_path = os.path.join(base_dir, "tc_prior.ini")
_workflow.generate_tc_prior(wflow, tc_path, bufferSeg)

# Generate injection files
if IPN:
# TODO: we used to pass this file to setup_injection_workflow as
@@ -320,13 +345,14 @@ if wflow.cp.has_section("workflow-injections"):
# Either split template bank for injections jobs or use same split banks
# as for standard matched filter jobs
if wflow.cp.has_section("workflow-splittable-injections"):
inj_splitbank_files = _workflow.setup_splittable_workflow(wflow,
bank_files, inj_dir, tags=["injections"])
inj_splitbank_files = _workflow.setup_splittable_workflow(
wflow, bank_files, inj_dir, tags=["injections"])
for inj_split in inj_splitbank_files:
split_str = [s for s in inj_split.tagged_description.split("_") \
split_str = [s for s in inj_split.tagged_description.split("_")
if ("BANK" in s and s[-1].isdigit())]
if len(split_str) != 0:
inj_split.tagged_description += "%s_%d" % (inj_split.tag_str,
inj_split.tagged_description += "%s_%d" % (
inj_split.tag_str,
int(split_str[0].replace("BANK", "")))
all_files.extend(inj_splitbank_files)
else:
@@ -338,34 +364,35 @@ if wflow.cp.has_section("workflow-injections"):
inj_split_files = _workflow.FileList([])
for inj_file, inj_tag in zip(inj_files, inj_tags):
file = _workflow.FileList([inj_file])
inj_splits = _workflow.setup_splittable_workflow(wflow, file,
inj_dir, tags=["split_inspinj", inj_tag])
inj_splits = _workflow.setup_splittable_workflow(
wflow, file, inj_dir, tags=["split_inspinj", inj_tag])
for inj_split in inj_splits:
split_str = [s for s in \
inj_split.tagged_description.split("_") \
split_str = [s for s in
inj_split.tagged_description.split("_")
if ("SPLIT" in s and s[-1].isdigit())]
if len(split_str) != 0:
new = inj_split.tagged_description.replace(split_str[0],
new = inj_split.tagged_description.replace(
split_str[0],
"SPLIT_%s" % split_str[0].replace("SPLIT", ""))
inj_split.tagged_description = new
inj_split_files.extend(inj_splits)
all_files.extend(inj_split_files)
injs = inj_split_files

# Generate injection matched filter workflow
inj_insp_files = _workflow.setup_matchedfltr_workflow(wflow, sciSegs,
datafind_veto_files, inj_splitbank_files, inj_dir, injs,
tags=[mf_tag + "_injections"])
inj_insp_files = _workflow.setup_matchedfltr_workflow(
wflow, sciSegs, datafind_veto_files, inj_splitbank_files,
inj_dir, injs, tags=[mf_tag + "_injections"])
for inj_insp_file in inj_insp_files:
split_str = [s for s in inj_insp_file.name.split("_") \
split_str = [s for s in inj_insp_file.name.split("_")
if ("SPLIT" in s and s[-1].isdigit())]
if len(split_str) != 0:
num = split_str[0].replace("SPLIT", "_")
inj_insp_file.tagged_description += num

# Make cache files (needed for post-processing)
for inj_tag in inj_tags:
files = _workflow.FileList([file for file in injs \
files = _workflow.FileList([file for file in injs
if inj_tag in file.tag_str])
inj_cache = _workflow.File(ifos, "injections", sciSegs[ifo][0],
extension="lcf", directory=inj_dir,
@@ -375,7 +402,7 @@ if wflow.cp.has_section("workflow-injections"):
inj_cache_entries = files.convert_to_lal_cache()
inj_cache_entries.tofile(open(inj_cache.storage_path, "w"))

files = _workflow.FileList([file for file in inj_insp_files \
files = _workflow.FileList([file for file in inj_insp_files
if inj_tag in file.tag_str])
inj_insp_cache = _workflow.File(ifos, "inspiral_injections",
sciSegs[ifo][0], extension="lcf",
@@ -391,7 +418,8 @@

# MAIN MATCHED FILTERING
insp_dir = os.path.join(curr_dir, "inspiral")
inspiral_files = _workflow.setup_matchedfltr_workflow(wflow, sciSegs,
inspiral_files = _workflow.setup_matchedfltr_workflow(
wflow, sciSegs,
datafind_veto_files, splitbank_files, insp_dir,
tags=[mf_tag + "_no_injections"])
all_files.extend(inspiral_files)
@@ -415,8 +443,9 @@ results_files = _workflow.FileList([])
if post_proc_method == "PYGRB_OFFLINE":
trig_comb_files, clustered_files, inj_find_files =\
_workflow.setup_pygrb_pp_workflow(wflow, pp_dir, seg_dir,
sciSegs[ifo][0], full_bank_file, inspiral_files,
injs, inj_insp_files, inj_tags)
sciSegs[ifo][0], full_bank_file,
inspiral_files, injs,
inj_insp_files, inj_tags)
sec_name = 'workflow-pygrb_pp_workflow'
if not wflow.cp.has_section(sec_name):
msg = 'No {0} section found in configuration file.'.format(sec_name)
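For orientation, a standalone sketch (plain ConfigParser, not the workflow's configuration object) of the condition that now gates the tc-prior generation in the script above; treating tc-prior-at-runtime as a value-less flag is an assumption here:

    from configparser import ConfigParser

    cp = ConfigParser(allow_no_value=True)
    cp.add_section("workflow-injections")
    cp.set("workflow-injections", "injections-method", "IN_WORKFLOW")
    cp.set("workflow-injections", "tc-prior-at-runtime", None)   # flag-style option
    inj_method = cp.get("workflow-injections", "injections-method")
    if inj_method == "IN_WORKFLOW" and \
            cp.has_option("workflow-injections", "tc-prior-at-runtime"):
        # This is the point at which the real workflow calls
        # _workflow.generate_tc_prior(wflow, tc_path, bufferSeg).
        print("tc prior would be generated at workflow time")
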
44 changes: 43 additions & 1 deletion pycbc/workflow/grb_utils.py
@@ -37,7 +37,7 @@

from pycbc import makedir
from pycbc.workflow.core import \
File, FileList, configparser_value_to_file, resolve_url_to_file,\
File, FileList, configparser_value_to_file, resolve_url_to_file, \
Executable, Node
from pycbc.workflow.jobsetup import select_generic_executable
from pycbc.workflow.pegasus_workflow import SubWorkflow
@@ -238,6 +238,48 @@ def get_sky_grid_scale(
return out


def generate_tc_prior(wflow, tc_path, buffer_seg):
"""
Generate the configuration file for the prior on the coalescence
time of injections, ensuring that these times fall in the analysis
time and avoid the onsource and its buffer.
Parameters
----------
tc_path : str
Path where the configuration file for the prior needs to be written.
buffer_seg : segmentlist
Start and end times of the buffer segment encapsulating the onsource.
"""

# Write the tc-prior configuration file if it does not exist
if os.path.exists(tc_path):
raise ValueError("Refusing to overwrite %s." % tc_path)
tc_file = open(tc_path, "w")
tc_file.write("[prior-tc]\n")
tc_file.write("name = uniform\n")
tc_file.write("min-tc = %s\n" % wflow.analysis_time[0])
tc_file.write("max-tc = %s\n\n" % wflow.analysis_time[1])
tc_file.write("[constraint-tc]\n")
tc_file.write("name = custom\n")
tc_file.write("constraint_arg = (tc < %s) | (tc > %s)\n" %
(buffer_seg[0], buffer_seg[1]))
tc_file.close()

# Add the tc-prior configuration file url to wflow.cp if necessary
tc_file_path = "file://"+tc_path
for inj_sec in wflow.cp.get_subsections("injections"):
config_urls = wflow.cp.get("workflow-injections",
inj_sec+"-config-files")
config_urls = [url.strip() for url in config_urls.split(",")]
if tc_file_path not in config_urls:
config_urls += [tc_file_path]
config_urls = ', '.join([str(item) for item in config_urls])
wflow.cp.set("workflow-injections",
inj_sec+"-config-files",
config_urls)


def setup_pygrb_pp_workflow(wf, pp_dir, seg_dir, segment, bank_file,
insp_files, inj_files, inj_insp_files, inj_tags):
"""
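A standalone sketch of what the new generate_tc_prior does, mirroring the write and config-update logic above; the GPS times, the relative path, and the bns injection-set name are hypothetical placeholders, and the real function works on wflow.analysis_time and wflow.cp rather than local variables:

    from configparser import ConfigParser

    analysis_time = (1126250000, 1126256000)   # stands in for wflow.analysis_time
    buffer_seg = (1126252990, 1126253010)      # buffer segment around the onsource
    tc_path = "tc_prior.ini"                   # the real workflow uses an absolute path

    # Write the two-section prior/constraint file.
    with open(tc_path, "w") as tc_file:
        tc_file.write("[prior-tc]\n")
        tc_file.write("name = uniform\n")
        tc_file.write("min-tc = %s\n" % analysis_time[0])
        tc_file.write("max-tc = %s\n\n" % analysis_time[1])
        tc_file.write("[constraint-tc]\n")
        tc_file.write("name = custom\n")
        tc_file.write("constraint_arg = (tc < %s) | (tc > %s)\n"
                      % (buffer_seg[0], buffer_seg[1]))

    # Append the file URL to each injection set's config-files option.
    cp = ConfigParser()
    cp.add_section("workflow-injections")
    cp.set("workflow-injections", "bns-config-files", "file:///cfg/bns_inj.ini")
    tc_file_path = "file://" + tc_path
    urls = [u.strip() for u in
            cp.get("workflow-injections", "bns-config-files").split(",")]
    if tc_file_path not in urls:
        urls.append(tc_file_path)
    cp.set("workflow-injections", "bns-config-files", ", ".join(urls))
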
6 changes: 3 additions & 3 deletions pycbc/workflow/segment.py
@@ -381,15 +381,15 @@ def generate_triggered_segment(workflow, out_dir, sciencesegs):
segmentsUtils.tosegwizard(open(bufferSegfile, "w"),
segments.segmentlist([bufferSegment]))

return onsource[best_comb], offsource[best_comb]
return onsource[best_comb], offsource[best_comb], bufferSegment

num_ifos -= 1

logger.warning("No suitable science segments available.")
try:
return None, offsource[best_comb]
return None, offsource[best_comb], None
except UnboundLocalError:
return None, min_seg
return None, min_seg, None

def get_flag_segments_file(workflow, name, option_name, out_dir, tags=None):
"""Get segments from option name syntax for each ifo for indivudal flags.
