Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PyGRB now establishes the tc prior for injections at workflow time #4751

Merged
merged 5 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 77 additions & 48 deletions bin/pygrb/pycbc_make_offline_grb_workflow
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@ __version__ = pycbc.version.git_verbose_msg
__date__ = pycbc.version.date
__program__ = "pycbc_make_offline_grb_workflow"

import shutil
import sys
import os
import argparse
import logging
import pycbc.workflow as _workflow
from pycbc.workflow.core import configparser_value_to_file
from ligo.segments import segment, segmentlist, segmentlistdict
from ligo.segments import segment, segmentlistdict
import matplotlib
matplotlib.use('agg')
from pycbc.results.pygrb_plotting_utils import make_grb_segments_plot
Expand Down Expand Up @@ -79,18 +78,25 @@ wflow.cp = _workflow.set_grb_start_end(wflow.cp, start, end)
# Retrieve science segments
curr_dir = os.getcwd()
seg_dir = os.path.join(curr_dir, "segments")
sciSegsFile = _workflow.get_segments_file(wflow, 'science', 'segments-science', seg_dir)
sciSegsFile = _workflow.get_segments_file(wflow,
'science',
'segments-science',
seg_dir)

sciSegs = {}
for ifo in wflow.ifos:
sciSegs[ifo] = sciSegsFile.segment_dict[ifo+':science']

# This block of code and PyCBCMultiInspiralExecutable.get_valid_times()
# must be consistent
if wflow.cp.has_option("inspiral", "segment-start-pad"):
pad_data = int(wflow.cp.get("inspiral", "pad-data"))
start_pad = int(wflow.cp.get("inspiral", "segment-start-pad"))
end_pad = int(wflow.cp.get("inspiral", "segment-end-pad"))
wflow.cp.set("workflow-exttrig_segments", "min-before",str(start_pad+pad_data))
wflow.cp.set("workflow-exttrig_segments", "min-after",str(end_pad+pad_data))
wflow.cp.set("workflow-exttrig_segments", "min-before",
str(start_pad+pad_data))
wflow.cp.set("workflow-exttrig_segments", "min-after",
str(end_pad+pad_data))
elif wflow.cp.has_option("inspiral", "analyse-segment-end"):
safety = 1
deadtime = int(wflow.cp.get("inspiral", "segment-length")) / 2
Expand All @@ -108,31 +114,33 @@ else:
single_ifo = wflow.cp.has_option("workflow", "allow-single-ifo-search")
if len(sciSegs.keys()) == 0:
plot_met = make_grb_segments_plot(wflow, segmentlistdict(), triggertime,
triggername, seg_dir)
triggername, seg_dir)
logging.warning("No science segments available.")
sys.exit()
elif len(sciSegs.keys()) < 2 and not single_ifo:
plot_met = make_grb_segments_plot(wflow, segmentlistdict(sciSegs),
triggertime, triggername, seg_dir)
triggertime, triggername, seg_dir)
msg = "Science segments exist only for %s. " % tuple(sciSegs.keys())[0]
msg += "If you wish to enable single IFO running add the option "
msg += "'allow-single-ifo-search' to the [workflow] section of your "
msg += "configuration file."
logging.warning(msg)
sys.exit()
else:
onSrc, offSrc = _workflow.generate_triggered_segment(wflow, seg_dir,
sciSegs)
onSrc, offSrc, bufferSeg = _workflow.generate_triggered_segment(wflow,
seg_dir,
sciSegs)

sciSegs = segmentlistdict(sciSegs)
if onSrc is None:
plot_met = make_grb_segments_plot(wflow, sciSegs, triggertime, triggername,
seg_dir, fail_criterion=offSrc)
seg_dir, fail_criterion=offSrc)
logging.info("Making segment plot and exiting.")
sys.exit()
else:
plot_met = make_grb_segments_plot(wflow, sciSegs, triggertime, triggername,
seg_dir, coherent_seg=offSrc[tuple(offSrc.keys())[0]][0])
plot_met = make_grb_segments_plot(
wflow, sciSegs, triggertime, triggername, seg_dir,
coherent_seg=offSrc[tuple(offSrc.keys())[0]][0])
segs_plot = _workflow.File(plot_met[0], plot_met[1], plot_met[2],
file_url=plot_met[3])
segs_plot.add_pfn(segs_plot.cache_entry.path, site="local")
Expand All @@ -158,15 +166,15 @@ if wflow.cp.has_option("workflow-condition_strain", "do-gating"):
if wflow.cp.has_option("inspiral", "segment-start-pad"):
start_pad = int(wflow.cp.get("inspiral", "segment-start-pad"))
end_pad = int(wflow.cp.get("inspiral", "segment-end-pad"))
wflow.analysis_time = segment(int(sciSegs[ifo][0][0]) + \
start_pad + padding,
int(sciSegs[ifo][0][1]) - \
padding - end_pad)
wflow.analysis_time = segment(int(sciSegs[ifo][0][0]) +
start_pad + padding,
int(sciSegs[ifo][0][1]) -
padding - end_pad)
elif wflow.cp.has_option("inspiral", "analyse-segment-end"):
wflow.analysis_time = segment(int(sciSegs[ifo][0][0]) + deadtime - \
spec_len + padding - safety,
int(sciSegs[ifo][0][1]) - spec_len - \
padding - safety)
wflow.analysis_time = segment(int(sciSegs[ifo][0][0]) + deadtime -
spec_len + padding - safety,
int(sciSegs[ifo][0][1]) - spec_len -
padding - safety)
else:
wflow.analysis_time = segment(int(sciSegs[ifo][0][0]) + deadtime + padding,
int(sciSegs[ifo][0][1]) - deadtime - padding)
Expand All @@ -176,7 +184,9 @@ ext_file = None
# DATAFIND
df_dir = os.path.join(curr_dir, "datafind")
datafind_files, _, sciSegs, _ = _workflow.setup_datafind_workflow(wflow,
sciSegs, df_dir, sciSegsFile)
sciSegs,
df_dir,
sciSegsFile)
if wflow.cp.has_option("workflow-condition_strain", "do-gating"):
new_seg = segment(sciSegs[ifo][0][0] + gate_pad,
sciSegs[ifo][0][1] - gate_pad)
Expand All @@ -196,7 +206,8 @@ if wflow.cp.has_option("workflow-condition_strain", "do-gating"):
wflow.cp = _workflow.set_grb_start_end(wflow.cp, int(sciSegs[ifo][0][0]),
int(sciSegs[ifo][0][1]))
gating_nodes, gated_files = _workflow.make_gating_node(wflow,
datafind_files, outdir=df_dir)
datafind_files,
outdir=df_dir)
gating_method = wflow.cp.get("workflow-condition_strain",
"gating-method")
for gating_node in gating_nodes:
Expand All @@ -207,21 +218,23 @@ if wflow.cp.has_option("workflow-condition_strain", "do-gating"):
wflow.execute_node(gating_node)
else:
msg = "[workflow-condition_strain] option 'gating-method' can "
msg += "only have one of the values 'IN_WORKFLOW' or 'AT_RUNTIME'. "
msg += "You have provided the value %s." % gating_method
msg += "only have one of the values 'IN_WORKFLOW' or 'AT_RUNTIME'."
msg += " You have provided the value %s." % gating_method
logging.error(msg)
sys.exit()
datafind_files = _workflow.FileList([])
for ifo in ifos:
gated_frames = _workflow.FileList([gated_frame for gated_frame in \
gated_files if gated_frame.ifo == ifo])
gated_frames = _workflow.FileList([gated_frame for gated_frame in
gated_files
if gated_frame.ifo == ifo])
# TODO: Remove .lcf cache here
gated_cache = _workflow.File(ifo, "gated",
gated_cache = _workflow.File(
ifo, "gated",
segment(int(wflow.cp.get("workflow", "start-time")),
int(wflow.cp.get("workflow", "end-time"))),
extension="lcf", directory=df_dir)
gated_cache.add_pfn(gated_cache.cache_entry.path, site="local")
gated_frames.convert_to_lal_cache().tofile(\
gated_frames.convert_to_lal_cache().tofile(
open(gated_cache.storage_path, "w"))
datafind_files.append(gated_cache)

Expand All @@ -235,7 +248,7 @@ wflow.ifos = ifos
if wflow.cp.has_option("workflow-inspiral", "ipn-search-points") \
and wflow.cp.has_option("workflow-injections", "ipn-sim-points"):
wflow.cp.set("injections", "ipn-gps-time",
wflow.cp.get("workflow", "trigger-time"))
wflow.cp.get("workflow", "trigger-time"))
IPN = True
elif wflow.cp.has_option("workflow-inspiral", "ipn-search-points") \
or wflow.cp.has_option("workflow-injections", "ipn-sim-points"):
Expand Down Expand Up @@ -278,7 +291,9 @@ if len(bank_files) > 1:
raise NotImplementedError("Multiple banks not supported")
full_bank_file = bank_files[0]
# Note: setup_splittable_workflow requires a FileList as input
splitbank_files = _workflow.setup_splittable_workflow(wflow, bank_files, df_dir,
splitbank_files = _workflow.setup_splittable_workflow(wflow,
bank_files,
df_dir,
tags=["inspiral"])
all_files.append(full_bank_file)
all_files.extend(splitbank_files)
Expand All @@ -295,6 +310,16 @@ if wflow.cp.has_section("workflow-injections"):
inj_caches = _workflow.FileList([])
inj_insp_caches = _workflow.FileList([])

# Given the stretch of time this workflow will analyse, and the onsource
# window with its buffer, generate the configuration file with the prior
# for the injections times and add it to the config parser
inj_method = wflow.cp.get("workflow-injections",
"injections-method")
if inj_method == "IN_WORKFLOW" and \
wflow.cp.has_option("workflow-injections", "tc-prior-at-runtime"):
tc_path = os.path.join(base_dir, "tc_prior.ini")
_workflow.generate_tc_prior(wflow, tc_path, bufferSeg)

# Generate injection files
if IPN:
# TODO: we used to pass this file to setup_injection_workflow as
Expand All @@ -320,13 +345,14 @@ if wflow.cp.has_section("workflow-injections"):
# Either split template bank for injections jobs or use same split banks
# as for standard matched filter jobs
if wflow.cp.has_section("workflow-splittable-injections"):
inj_splitbank_files = _workflow.setup_splittable_workflow(wflow,
bank_files, inj_dir, tags=["injections"])
inj_splitbank_files = _workflow.setup_splittable_workflow(
wflow, bank_files, inj_dir, tags=["injections"])
for inj_split in inj_splitbank_files:
split_str = [s for s in inj_split.tagged_description.split("_") \
split_str = [s for s in inj_split.tagged_description.split("_")
if ("BANK" in s and s[-1].isdigit())]
if len(split_str) != 0:
inj_split.tagged_description += "%s_%d" % (inj_split.tag_str,
inj_split.tagged_description += "%s_%d" % (
inj_split.tag_str,
int(split_str[0].replace("BANK", "")))
all_files.extend(inj_splitbank_files)
else:
Expand All @@ -338,34 +364,35 @@ if wflow.cp.has_section("workflow-injections"):
inj_split_files = _workflow.FileList([])
for inj_file, inj_tag in zip(inj_files, inj_tags):
file = _workflow.FileList([inj_file])
inj_splits = _workflow.setup_splittable_workflow(wflow, file,
inj_dir, tags=["split_inspinj", inj_tag])
inj_splits = _workflow.setup_splittable_workflow(
wflow, file, inj_dir, tags=["split_inspinj", inj_tag])
for inj_split in inj_splits:
split_str = [s for s in \
inj_split.tagged_description.split("_") \
split_str = [s for s in
inj_split.tagged_description.split("_")
if ("SPLIT" in s and s[-1].isdigit())]
if len(split_str) != 0:
new = inj_split.tagged_description.replace(split_str[0],
new = inj_split.tagged_description.replace(
split_str[0],
"SPLIT_%s" % split_str[0].replace("SPLIT", ""))
inj_split.tagged_description = new
inj_split_files.extend(inj_splits)
all_files.extend(inj_split_files)
injs = inj_split_files

# Generate injection matched filter workflow
inj_insp_files = _workflow.setup_matchedfltr_workflow(wflow, sciSegs,
datafind_veto_files, inj_splitbank_files, inj_dir, injs,
tags=[mf_tag + "_injections"])
inj_insp_files = _workflow.setup_matchedfltr_workflow(
wflow, sciSegs, datafind_veto_files, inj_splitbank_files,
inj_dir, injs, tags=[mf_tag + "_injections"])
for inj_insp_file in inj_insp_files:
split_str = [s for s in inj_insp_file.name.split("_") \
split_str = [s for s in inj_insp_file.name.split("_")
if ("SPLIT" in s and s[-1].isdigit())]
if len(split_str) != 0:
num = split_str[0].replace("SPLIT", "_")
inj_insp_file.tagged_description += num

# Make cache files (needed for post-processing)
for inj_tag in inj_tags:
files = _workflow.FileList([file for file in injs \
files = _workflow.FileList([file for file in injs
if inj_tag in file.tag_str])
inj_cache = _workflow.File(ifos, "injections", sciSegs[ifo][0],
extension="lcf", directory=inj_dir,
Expand All @@ -375,7 +402,7 @@ if wflow.cp.has_section("workflow-injections"):
inj_cache_entries = files.convert_to_lal_cache()
inj_cache_entries.tofile(open(inj_cache.storage_path, "w"))

files = _workflow.FileList([file for file in inj_insp_files \
files = _workflow.FileList([file for file in inj_insp_files
if inj_tag in file.tag_str])
inj_insp_cache = _workflow.File(ifos, "inspiral_injections",
sciSegs[ifo][0], extension="lcf",
Expand All @@ -391,7 +418,8 @@ if wflow.cp.has_section("workflow-injections"):

# MAIN MATCHED FILTERING
insp_dir = os.path.join(curr_dir, "inspiral")
inspiral_files = _workflow.setup_matchedfltr_workflow(wflow, sciSegs,
inspiral_files = _workflow.setup_matchedfltr_workflow(
wflow, sciSegs,
datafind_veto_files, splitbank_files, insp_dir,
tags=[mf_tag + "_no_injections"])
all_files.extend(inspiral_files)
Expand All @@ -415,8 +443,9 @@ results_files = _workflow.FileList([])
if post_proc_method == "PYGRB_OFFLINE":
trig_comb_files, clustered_files, inj_find_files, inj_comb_files =\
_workflow.setup_pygrb_pp_workflow(wflow, pp_dir, seg_dir,
sciSegs[ifo][0], full_bank_file, inspiral_files,
injs, inj_insp_files, inj_tags)
sciSegs[ifo][0], full_bank_file,
inspiral_files, injs,
inj_insp_files, inj_tags)
sec_name = 'workflow-pygrb_pp_workflow'
if not wflow.cp.has_section(sec_name):
msg = 'No {0} section found in configuration file.'.format(sec_name)
Expand Down
44 changes: 43 additions & 1 deletion pycbc/workflow/grb_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

from pycbc import makedir
from pycbc.workflow.core import \
File, FileList, configparser_value_to_file, resolve_url_to_file,\
File, FileList, configparser_value_to_file, resolve_url_to_file, \
Executable, Node
from pycbc.workflow.jobsetup import select_generic_executable
from pycbc.workflow.pegasus_workflow import SubWorkflow
Expand Down Expand Up @@ -239,6 +239,48 @@ def get_sky_grid_scale(
return out


def generate_tc_prior(wflow, tc_path, buffer_seg):
"""
Generate the configuration file for the prior on the coalescence
time of injections, ensuring that these times fall in the analysis
time and avoid the onsource and its buffer.

Parameters
----------
tc_path : str
Path where the configuration file for the prior needs to be written.
buffer_seg : segmentlist
Start and end times of the buffer segment encapsulating the onsource.
"""

# Write the tc-prior configuration file if it does not exist
if os.path.exists(tc_path):
raise ValueError("Refusing to overwrite %s." % tc_path)
tc_file = open(tc_path, "w")
tc_file.write("[prior-tc]\n")
tc_file.write("name = uniform\n")
tc_file.write("min-tc = %s\n" % wflow.analysis_time[0])
tc_file.write("max-tc = %s\n\n" % wflow.analysis_time[1])
tc_file.write("[constraint-tc]\n")
tc_file.write("name = custom\n")
tc_file.write("constraint_arg = (tc < %s) | (tc > %s)\n" %
(buffer_seg[0], buffer_seg[1]))
tc_file.close()

# Add the tc-prior configuration file url to wflow.cp if necessary
tc_file_path = "file://"+tc_path
for inj_sec in wflow.cp.get_subsections("injections"):
config_urls = wflow.cp.get("workflow-injections",
inj_sec+"-config-files")
config_urls = [url.strip() for url in config_urls.split(",")]
if tc_file_path not in config_urls:
config_urls += [tc_file_path]
config_urls = ', '.join([str(item) for item in config_urls])
wflow.cp.set("workflow-injections",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a general comment, I'm not sure I like the method of "generate new file and then copy it around by editing the ConfigParser object". In my vision for the workflow module the ConfigParser object should be immutable (after directives and shortcuts have been resolved when it's read in).

I would prefer to pass any new files around.

Copy link
Contributor Author

@pannarale pannarale May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm not fond of that either but I can't think of another solution because create_injections does not admit specifying a prior like that from command line, as far as I understand. PyGRB has already been forced to edit the ConfigParser for years here, by the way :-/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy using a File to store the prior, but I think it should be passed around the workflow as a File object, not by editing the ini file.

However, that's just my opinion, and I'm happy for you to set standards in the grb_utils module, so approval is given if you want to merge as is.

inj_sec+"-config-files",
config_urls)


def setup_pygrb_pp_workflow(wf, pp_dir, seg_dir, segment, bank_file,
insp_files, inj_files, inj_insp_files, inj_tags):
"""
Expand Down
6 changes: 3 additions & 3 deletions pycbc/workflow/segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,15 +381,15 @@ def generate_triggered_segment(workflow, out_dir, sciencesegs):
segmentsUtils.tosegwizard(open(bufferSegfile, "w"),
segments.segmentlist([bufferSegment]))

return onsource[best_comb], offsource[best_comb]
return onsource[best_comb], offsource[best_comb], bufferSegment

num_ifos -= 1

logger.warning("No suitable science segments available.")
try:
return None, offsource[best_comb]
return None, offsource[best_comb], None
except UnboundLocalError:
return None, min_seg
return None, min_seg, None

def get_flag_segments_file(workflow, name, option_name, out_dir, tags=None):
"""Get segments from option name syntax for each ifo for indivudal flags.
Expand Down
Loading