Merge pull request #57 from AlecThomson/mfw
DR2 preparation improvements
AlecThomson authored Mar 20, 2024
2 parents e78c91a + d91f8eb commit 418ec25
Showing 7 changed files with 277 additions and 18 deletions.
8 changes: 3 additions & 5 deletions arrakis/cleanup.py
@@ -26,11 +26,9 @@ def cleanup(workdir: str, stokeslist: List[str]) -> None:
         return
     for stoke in stokeslist:
         # Clean up beam images
-        # old_files = glob(f"{workdir}/*.cutout.*.{stoke.lower()}.*beam[00-36]*.fits")
-        # for old in old_files:
-        #     os.remove(old)
-
-        ...
+        old_files = glob(f"{workdir}/*.cutout.*.{stoke.lower()}.*beam[00-36]*.fits")
+        for old in old_files:
+            os.remove(old)
 
 
 @flow(name="Cleanup")
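
For context, a self-contained sketch of what the re-enabled cleanup does, with an invented working directory and Stokes list; note the glob subtlety flagged in the comment:

```python
from glob import glob
import os

workdir = "cutouts/RACS_1234+56"  # hypothetical working directory

for stoke in ["I", "Q", "U"]:
    # In glob syntax, [00-36] is a character class matching one digit in
    # {0, 1, 2, 3, 6}, not the numeric range 00-36. Since ASKAP beams are
    # zero-padded (beam00-beam35), the first digit is always 0-3, so the
    # pattern still matches every beam image.
    old_files = glob(f"{workdir}/*.cutout.*.{stoke.lower()}.*beam[00-36]*.fits")
    for old in old_files:
        os.remove(old)
```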
8 changes: 4 additions & 4 deletions arrakis/configs/petrichor.yaml
@@ -1,13 +1,13 @@
 # Set up for Petrichor
 cluster_class: "dask_jobqueue.SLURMCluster"
 cluster_kwargs:
-  cores: 8
-  processes: 8
+  cores: 16
+  processes: 1
   name: 'spice-worker'
-  memory: "64GiB"
+  memory: "128GiB"
   account: 'OD-217087'
   #queue: 'workq'
-  walltime: '0-01:00:00'
+  walltime: '1-00:00:00'
   job_extra_directives: ['--qos express']
   # interface for the workers
   interface: "ib0"
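
For reference, a minimal sketch of how dask_jobqueue typically consumes a config like this; the loading code here is illustrative, not the repo's actual plumbing:

```python
import yaml
from dask_jobqueue import SLURMCluster

with open("arrakis/configs/petrichor.yaml") as f:
    config = yaml.safe_load(f)

# 16 cores in a single process now gives one 16-threaded worker per SLURM
# job, rather than eight single-threaded workers sharing 8 cores.
cluster = SLURMCluster(**config["cluster_kwargs"])
```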
26 changes: 26 additions & 0 deletions arrakis/configs/rm_petrichor.yaml
@@ -0,0 +1,26 @@
+# Set up for Petrichor
+cluster_class: "dask_jobqueue.SLURMCluster"
+cluster_kwargs:
+  cores: 4
+  processes: 4
+  name: 'spice-worker'
+  memory: "256GiB"
+  account: 'OD-217087'
+  walltime: '0-01:00:00'
+  job_extra_directives: ['--qos express']
+  # interface for the workers
+  interface: "ib0"
+  log_directory: 'spice_logs'
+  job_script_prologue: [
+    'module load singularity',
+    'unset SINGULARITY_BINDPATH',
+    'export OMP_NUM_THREADS=1'
+  ]
+  local_directory: $LOCALDIR
+  silence_logs: 'info'
+adapt_kwargs:
+  minimum_jobs: 36
+  maximum_jobs: 128
+  wait_count: 20
+  target_duration: "5s"
+  interval: "10s"
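
The adapt_kwargs block maps onto Dask's adaptive scaling. A hedged sketch of how these values would be applied, assuming a cluster built from cluster_kwargs as above:

```python
# Scale between 36 and 128 SLURM jobs with demand. wait_count=20 checks at
# interval="10s" make scale-down deliberately slow, so a briefly idle worker
# is not killed the moment the scheduler stops feeding it tasks.
cluster.adapt(
    minimum_jobs=36,
    maximum_jobs=128,
    wait_count=20,
    target_duration="5s",
    interval="10s",
)
```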
5 changes: 3 additions & 2 deletions arrakis/frion.py
@@ -239,10 +239,11 @@ def main(
     field_col = get_field_db(
         host=host, epoch=epoch, username=username, password=password
     )
-    query_3 = {"FIELD_NAME": f"{field}"}
+    # SELECT '1' is best field according to the database
+    query_3 = {"$and": [{"FIELD_NAME": f"{field}"}, {"SELECT": 1}]}
     logger.info(f"{query_3}")
 
-    # Get most recent SBID
+    # Get most recent SBID if more than one is 'SELECT'ed
     if field_col.count_documents(query_3) > 1:
         field_datas = list(field_col.find({"FIELD_NAME": f"{field}"}))
         sbids = [f["CAL_SBID"] for f in field_datas]
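
A hedged sketch of the tightened query semantics using pymongo-style calls; the connection details, database/collection names, and field name are invented, while the document layout follows the diff:

```python
from pymongo import MongoClient

# Hypothetical connection; the repo resolves host/credentials elsewhere.
field_col = MongoClient("localhost")["spice"]["fields"]

field = "RACS_1234+56"  # invented field name
# Only rows flagged SELECT == 1 (the database's "best field" marker) match.
query_3 = {"$and": [{"FIELD_NAME": field}, {"SELECT": 1}]}

if field_col.count_documents(query_3) > 1:
    # More than one row is still SELECT'ed: fall back to picking the most
    # recent calibration SBID among all rows with this field name.
    field_datas = list(field_col.find({"FIELD_NAME": field}))
    sbids = [f["CAL_SBID"] for f in field_datas]
```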
24 changes: 17 additions & 7 deletions arrakis/imager.py
@@ -132,7 +132,7 @@ def image_beam(
     simage: Path,
     pols: str = "IQU",
     nchan: int = 36,
-    scale: u.Quantity = 2.5 * u.arcsec,
+    scale: float = 2.5,
     npix: int = 4096,
     join_polarizations: bool = True,
     join_channels: bool = True,
@@ -155,6 +155,8 @@
     multiscale: bool = False,
     multiscale_scale_bias: Optional[float] = None,
     data_column: str = "CORRECTED_DATA",
+    no_mf_weighting: bool = False,
+    no_update_model_required: bool = True,
 ) -> ImageSet:
     """Image a single beam"""
     logger = get_run_logger()
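
If the command objects built here ultimately shell out to wsclean, the two new booleans presumably correspond to wsclean's `-no-mf-weighting` and `-no-update-model-required` options. A hedged, self-contained sketch of that mapping (the helper function is illustrative, not the repo's API):

```python
from typing import List


def build_extra_flags(
    no_mf_weighting: bool = False,
    no_update_model_required: bool = True,
) -> List[str]:
    """Illustrative only: turn boolean kwargs into wsclean CLI flags."""
    flags: List[str] = []
    if no_mf_weighting:
        # Weight each output channel independently instead of using
        # combined multi-frequency weighting.
        flags.append("-no-mf-weighting")
    if no_update_model_required:
        # Tell wsclean it need not write model visibilities back to the MS.
        flags.append("-no-update-model-required")
    return flags
```

Note that no_mf_weighting was previously hard-coded to True; it now defaults to False and is threaded through from the CLI (see the new --no_mf_weighting argument below).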
@@ -170,7 +172,7 @@
         pol="I",
         verbose=True,
         channels_out=nchan,
-        scale=f"{scale.to(u.arcsec).value}asec",
+        scale=f"{scale}asec",
         size=f"{npix} {npix}",
         join_polarizations=False, # Only do I
         join_channels=join_channels,
@@ -195,7 +197,8 @@
         multiscale_scale_bias=multiscale_scale_bias,
         multiscale=multiscale,
         data_column=data_column,
-        no_mf_weighting=True,
+        no_mf_weighting=no_mf_weighting,
+        no_update_model_required=no_update_model_required,
     )
     commands.append(command)
     pols = pols.replace("I", "")
@@ -222,7 +225,7 @@
         pol=pols,
         verbose=True,
         channels_out=nchan,
-        scale=f"{scale.to(u.arcsec).value}asec",
+        scale=f"{scale}asec",
         size=f"{npix} {npix}",
         join_polarizations=join_polarizations,
         join_channels=join_channels,
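
The scale change swaps the astropy Quantity for a plain float formatted straight into wsclean's angular-size syntax, where a unit suffix such as asec denotes arcseconds. A small check that the two formulations produce the same string:

```python
from astropy import units as u

old_scale = 2.5 * u.arcsec  # previous Quantity-based default
new_scale = 2.5             # new plain-float default

# Both render to the '2.5asec' form that wsclean's -scale option parses.
assert f"{old_scale.to(u.arcsec).value}asec" == f"{new_scale}asec" == "2.5asec"
```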
@@ -247,7 +250,8 @@
         multiscale=multiscale,
         multiscale_scale_bias=multiscale_scale_bias,
         data_column=data_column,
-        no_mf_weighting=True,
+        no_mf_weighting=no_mf_weighting,
+        no_update_model_required=no_update_model_required,
     )
     commands.append(command)
 
@@ -595,14 +599,13 @@ def main(
     ms_glob_pattern: str = "scienceData*_averaged_cal.leakage.ms",
     data_column: str = "CORRECTED_DATA",
     skip_fix_ms: bool = False,
+    no_mf_weighting: bool = False,
 ):
     simage = get_wsclean(wsclean=wsclean_path)
 
     logger.info(f"Searching {msdir} for MS matching {ms_glob_pattern}.")
     mslist = sorted(msdir.glob(ms_glob_pattern))
 
-    scale = scale * u.arcsecond
-
     assert (len(mslist) > 0) & (
         len(mslist) == 36
     ), f"Incorrect number of MS files found: {len(mslist)} / 36"
@@ -660,6 +663,7 @@
         multiscale_scale_bias=multiscale_scale_bias,
         absmem=absmem,
         data_column=data_column,
+        no_mf_weighting=no_mf_weighting,
     )
 
     # Compute the smallest beam that all images can be convolved to.
@@ -889,6 +893,11 @@ def imager_parser(parent_parser: bool = False) -> argparse.ArgumentParser:
         default="CORRECTED_DATA",
         help="Which column in the measurement set to image. ",
     )
+    parser.add_argument(
+        "--no_mf_weighting",
+        action="store_true",
+        help="Do not use multi-frequency weighting. ",
+    )
     parser.add_argument(
         "--skip_fix_ms",
         action="store_true",
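
The new flag follows the standard argparse store_true pattern; a minimal standalone sketch of its behaviour:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--no_mf_weighting",
    action="store_true",  # absent -> False, present -> True
    help="Do not use multi-frequency weighting. ",
)

args = parser.parse_args(["--no_mf_weighting"])
assert args.no_mf_weighting is True
assert parser.parse_args([]).no_mf_weighting is False
```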
@@ -943,6 +952,7 @@ def cli():
         ms_glob_pattern=args.ms_glob_pattern,
         data_column=args.data_column,
         skip_fix_ms=args.skip_fix_ms,
+        no_mf_weighting=args.no_mf_weighting,
     )
 
1 change: 1 addition & 0 deletions arrakis/process_spice.py
@@ -300,6 +300,7 @@ def main(args: configargparse.Namespace) -> None:
         ms_glob_pattern=args.ms_glob_pattern,
         data_column=args.data_column,
         skip_fix_ms=args.skip_fix_ms,
+        no_mf_weighting=args.no_mf_weighting,
     )
     client = dask_runner._client
     if client is not None: