Skip to content

Commit

Permalink
move collation into coadder class
Browse files Browse the repository at this point in the history
  • Loading branch information
arahlin committed Sep 6, 2023
1 parent 3961940 commit 69d28fe
Showing 1 changed file with 88 additions and 59 deletions.
147 changes: 88 additions & 59 deletions maps/python/map_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,68 +504,119 @@ def ReplicateMaps(frame, input_map_id, output_map_ids, copy_weights=False):
@core.indexmod
class CoaddMaps(object):
"""
Coadd maps and weights.
Coadd maps and weights, optionally collating by map Id. This class can be
used as an argument to ``pipe.Add()`` as a standard pipeline module, or
instantiated as a standalone instance. In the latter case, the object is
treated as a callable for input map frames, and the ``coadd_frame`` or
``coadd_frames`` attribute contains the running coadd(s).
Attributes
----------
coadd_frame : G3Frame
Output coadd map frame, also injected into the pipeline on
EndProcessing. This attribute is only populated if the ``collate``
option is set to False.
coadd_frames : dict of G3Frames
Output coadd map frames, keyed by input map Id. Each frame is also
injected into the pipeline on EndProcessing. This attribute is only
populated if the ``collate`` option is set to True.
Arguments
---------
map_ids : list of str
List of map Id's to include in the coadd. If None, any maps
List of map Id's to include in the coadd(s). If None, any maps
in the pipeline are included.
output_map_id : str
Id to assign to the output frame.
Id to assign to the output frame. If ``collate`` is True, this argument
is required and treated as a prefix to which each input map Id is
appended.
collate : bool
If True, coadd unique map Id's into separate output map frames.
ignore_missing_weights : bool
If False (default), a warning is issued when the frame contains weighted
Stokes maps without a weights map. Set this option to True when feeding
single bolometer map frames with common weights through a pipeline.
ensure_weighted_maps : bool
weighted : bool
If True (default), ensure that maps have had weights applied before
coadding. Otherwise, blindly coadds all input maps.
coadding. Otherwise, coadd maps without checking the weights.
drop_input_frames : bool
If True, drop input map frames from the pipeline.
"""

def __init__(
self,
map_ids=None,
output_map_id=None,
output_map_id="Coadd",
collate=False,
weighted=True,
ignore_missing_weights=False,
ensure_weighted_maps=True,
drop_input_frames=False,
):
self.coadd_frame = core.G3Frame(core.G3FrameType.Map)
if output_map_id is not None:
self.coadd_frame["Id"] = output_map_id
if isinstance(map_ids, str):
map_ids = [map_ids]
self.map_ids = map_ids
if map_ids or len(map_ids) == 1:
collate = False
self.collate = collate
if self.collate:
self.coadd_frames = dict()
self.output_map_id = output_map_id
else:
self.coadd_frame = core.G3Frame(core.G3FrameType.Map)
self.coadd_frame["Id"] = output_map_id
self.weighted = weighted
self.ignore_missing_weights = ignore_missing_weights
self.ensure_weighted_maps = ensure_weighted_maps
self.drop_input_frames = drop_input_frames

def __call__(self, frame):

if frame.type == core.G3FrameType.EndProcessing:
if self.collate:
return list(self.coadd_frames.values()) + [frame]
return [self.coadd_frame, frame]

if "Id" not in frame:
return

if self.map_ids is not None and frame["Id"] not in self.map_ids:
map_id = frame["Id"]
if self.map_ids is not None and map_id not in self.map_ids:
return

ValidateMaps(frame, ignore_missing_weights=self.ignore_missing_weights)
input_weighted = True
if not self.ensure_weighted_maps and not frame["T"].weighted:
input_weighted = False
if self.weighted:
ApplyWeights(frame)

if self.collate:
if map_id not in self.coadd_frames:
fr = core.G3Frame(core.G3FrameType.Map)
fr["Id"] = self.output_map_id + map_id
self.coadd_frames[map_id] = fr
cfr = self.coadd_frames[map_id]
else:
cfr = self.coadd_frame

if "InputMapIds" in cfr:
map_ids = list(cfr.pop("InputMapIds"))
else:
map_ids = []
if "InputMapIds" in frame:
# allow for recursive coadds
map_ids += list(frame["InputMapIds"])
elif map_id not in map_ids:
map_ids += [map_id]
cfr["InputMapIds"] = core.G3VectorString(map_ids)

for key in ["T", "Q", "U", "Wpol", "Wunpol"]:
if key not in frame:
continue
if key not in self.coadd_frame:
self.coadd_frame[key] = frame[key].clone(False)
m = self.coadd_frame.pop(key)
if key not in cfr:
cfr[key] = frame[key].clone(False)
m = cfr.pop(key)
m += frame[key]
self.coadd_frame[key] = m
cfr[key] = m

if not input_weighted:
RemoveWeights(frame)
if self.drop_input_frames:
return False


@core.usefulfunc
Expand Down Expand Up @@ -613,51 +664,29 @@ def coadd_map_files(
# drop metadata frames
pipe.Add(lambda fr: fr.type == core.G3FrameType.Map)

# build coadded map with consistently handled weights
if weighted:
pipe.Add(ApplyWeights)

if collate:
# coadd each map_id into a separate output frame
coadders = dict()
for mid in map_ids:
cid = output_map_id + mid
coadder = CoaddMaps(
map_ids=[mid],
output_map_id=cid,
ensure_weighted_maps=False,
)
coadder.coadd_frame["InputFiles"] = core.G3VectorString(input_files)
coadder.coadd_frame["InputMapIds"] = core.G3VectorString([mid])
coadders[cid] = coadder
pipe.Add(coadder)
pipe.Add(RecordInputs, coadd_id=cid, map_ids=[mid], input_files=input_files)

else:
# coadd all map_id's into one output frame
coadder = CoaddMaps(
map_ids=map_ids,
output_map_id=output_map_id,
ensure_weighted_maps=False,
)
coadder.coadd_frame["InputFiles"] = core.G3VectorString(input_files)
coadder.coadd_frame["InputMapIds"] = core.G3VectorString(map_ids)
pipe.Add(coadder)
pipe.Add(
RecordInputs,
coadd_id=output_map_id,
map_ids=map_ids,
input_files=input_files,
)
# build coadds
coadder = CoaddMaps(
map_ids=map_ids,
output_map_id=output_map_id,
collate=collate,
weighted=weighted,
drop_input_frames=True,
)
pipe.Add(coadder)

def RecordInputFiles(frame):
if frame.type != core.G3FrameType.Map:
return
frame["InputFiles"] = core.G3VectorString(input_files)

pipe.Add(lambda fr: "Id" not in fr or fr["Id"].startswith(output_map_id))
pipe.Add(RecordInputFiles)

if output_file:
pipe.Add(core.G3Writer, filename=output_file)
pipe.Run()

if collate:
return {k: v.coadd_frame for k, v in coadders.items()}
return coadder.coadd_frames
return coadder.coadd_frame


Expand Down

0 comments on commit 69d28fe

Please sign in to comment.