Submit #66
Merged: 8 commits, Apr 30, 2024
32 changes: 16 additions & 16 deletions .pre-commit-config.yaml
@@ -1,20 +1,20 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/psf/black
rev: 24.4.0
hooks:
- id: black
- repo: https://github.com/PyCQA/isort
rev: 5.13.2
hooks:
- id: isort
args: ["--profile=black"]
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.4.1
hooks:
# Run the linter.
- id: ruff
args: [ --fix ]
# Run the formatter.
- id: ruff-format
# - repo: https://github.com/pre-commit/pre-commit-hooks
# rev: v4.6.0
# hooks:
# - id: trailing-whitespace
# - id: end-of-file-fixer
# - id: check-yaml
# - id: check-added-large-files

ci:
autofix_commit_msg: |
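
The configuration shown above runs Ruff as both linter (with --fix) and formatter alongside the standard pre-commit hooks. A minimal sketch of exercising the configured hooks locally, assuming the pre-commit CLI is installed in the environment; the helper name is hypothetical:

# Hypothetical helper: run every hook in .pre-commit-config.yaml against the repo.
# Assumes the pre-commit CLI is on PATH and the working directory is a git checkout.
import subprocess


def run_hooks() -> int:
    """Run all configured pre-commit hooks against all tracked files."""
    result = subprocess.run(["pre-commit", "run", "--all-files"], check=False)
    return result.returncode


if __name__ == "__main__":
    raise SystemExit(run_hooks())
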
1 change: 1 addition & 0 deletions arrakis/cleanup.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
"""DANGER ZONE: Purge directories of un-needed FITS files."""

import argparse
import logging
import shutil
4 changes: 2 additions & 2 deletions arrakis/configs/petrichor.yaml
@@ -1,10 +1,10 @@
# Set up for Petrichor
cluster_class: "dask_jobqueue.SLURMCluster"
cluster_kwargs:
cores: 16
cores: 20
processes: 1
name: 'spice-worker'
memory: "128GiB"
memory: "160GiB"
account: 'OD-217087'
#queue: 'workq'
walltime: '0-12:00:00'
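
The Petrichor profile now requests 20 cores and 160GiB of memory per worker through dask_jobqueue.SLURMCluster. A minimal sketch of how such a profile maps onto a cluster object; this is illustrative only, the pipeline's own config loader may differ, and the scale call below is an assumed example rather than part of the change:

# Illustrative only: build a SLURM-backed Dask cluster from the profile's kwargs.
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster_kwargs = dict(
    cores=20,            # matches the updated profile
    processes=1,
    name="spice-worker",
    memory="160GiB",     # matches the updated profile
    account="OD-217087",
    walltime="0-12:00:00",
)

cluster = SLURMCluster(**cluster_kwargs)  # each Dask worker becomes a SLURM job
cluster.scale(jobs=4)                     # assumed example: request four worker jobs
client = Client(cluster)                  # attach a Dask client to the cluster
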
10 changes: 2 additions & 8 deletions arrakis/cutout.py
@@ -1,18 +1,15 @@
#!/usr/bin/env python
"""Produce cutouts from RACS cubes"""

import argparse
import logging
import os
import pickle
import warnings
from concurrent.futures import ThreadPoolExecutor
from glob import glob
from pathlib import Path
from pprint import pformat
from shutil import copyfile
from typing import Dict, List
from typing import List, Optional, Set, TypeVar
from typing import NamedTuple as Struct
from typing import Optional, Set, TypeVar, Union

import astropy.units as u
import numpy as np
@@ -36,7 +33,6 @@
validate_sbid_field_pair,
)
from arrakis.utils.fitsutils import fix_header
from arrakis.utils.io import try_mkdir
from arrakis.utils.pipeline import generic_parser, logo_str, workdir_arg_parser

iers.conf.auto_download = False
@@ -79,7 +75,6 @@ def cutout_weight(
beam_num: int,
dryrun=False,
) -> pymongo.UpdateOne:

# Update database
myquery = {"Source_ID": source_id}

@@ -337,7 +332,6 @@ def big_cutout(
password: Optional[str] = None,
limit: Optional[int] = None,
) -> List[pymongo.UpdateOne]:

wild = f"image.restored.{stoke.lower()}*contcube*beam{beam_num:02}.conv.fits"
images = list(datadir.glob(wild))
if len(images) == 0:
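
The cutout changes above drop the os.path, glob, and try_mkdir helpers in favour of pathlib and trim the unused typing imports. A minimal sketch of the pathlib-based beam-image lookup from the big_cutout hunk; find_beam_images is a hypothetical wrapper and the example values are invented:

from pathlib import Path
from typing import List


def find_beam_images(datadir: Path, stoke: str, beam_num: int) -> List[Path]:
    """Glob the restored continuum-cube images for one beam and Stokes parameter."""
    wild = f"image.restored.{stoke.lower()}*contcube*beam{beam_num:02}.conv.fits"
    images = sorted(datadir.glob(wild))
    if not images:
        raise FileNotFoundError(f"No images matching {wild} in {datadir}")
    return images


# Example usage with hypothetical values:
# find_beam_images(Path("/data/cutouts"), stoke="I", beam_num=3)
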
100 changes: 49 additions & 51 deletions arrakis/frion.py
@@ -1,35 +1,34 @@
#!/usr/bin/env python3
"""Correct for the ionosphere in parallel"""

import argparse
import logging
import os
from glob import glob
from pathlib import Path
from pprint import pformat
from shutil import copyfile
from typing import Callable, Dict, List
from typing import Callable, Dict, List, Optional, Union
from typing import NamedTuple as Struct
from typing import Optional, Union

import astropy.units as u
import numpy as np
import pymongo
from astropy.time import Time, TimeDelta
from FRion import correct, predict
from prefect import flow, task, unmapped
from prefect import flow, task
from tqdm.auto import tqdm

from arrakis.logger import UltimateHelpFormatter, logger
from arrakis.logger import TqdmToLogger, UltimateHelpFormatter, logger
from arrakis.utils.database import (
get_db,
get_field_db,
test_db,
validate_sbid_field_pair,
)
from arrakis.utils.fitsutils import getfreq
from arrakis.utils.io import try_mkdir
from arrakis.utils.pipeline import generic_parser, logo_str
from arrakis.utils.pipeline import generic_parser, logo_str, workdir_arg_parser

logger.setLevel(logging.INFO)
TQDM_OUT = TqdmToLogger(logger, level=logging.INFO)


class Prediction(Struct):
@@ -89,8 +88,8 @@ def predict_worker(
start_time: Time,
end_time: Time,
freq: np.ndarray,
cutdir: str,
plotdir: str,
cutdir: Path,
plotdir: Path,
server: str = "ftp://ftp.aiub.unibe.ch/CODE/",
prefix: str = "",
formatter: Optional[Union[str, Callable]] = None,
@@ -114,8 +113,8 @@
"""
logger.setLevel(logging.INFO)

ifile = os.path.join(cutdir, beam["beams"][field]["i_file"])
i_dir = os.path.dirname(ifile)
ifile: Path = cutdir / beam["beams"][field]["i_file"]
i_dir = ifile.parent
iname = island["Source_ID"]
ra = island["RA"]
dec = island["Dec"]
@@ -137,7 +136,7 @@
ra=ra,
dec=dec,
timestep=300.0,
ionexPath=os.path.join(os.path.dirname(cutdir), "IONEXdata"),
ionexPath=cutdir.parent / "IONEXdata",
server=server,
proxy_server=proxy_server,
use_proxy=True, # Always use proxy - forces urllib
@@ -146,23 +145,10 @@
pre_download=pre_download,
**proxy_args,
)
logger.info(f"Predicted modulation for {iname}.")
predict_file = os.path.join(i_dir, f"{iname}_ion.txt")
predict.write_modulation(freq_array=freq, theta=theta, filename=predict_file)

plot_file = os.path.join(i_dir, f"{iname}_ion.png")
try:
predict.generate_plots(
times, RMs, theta, freq, position=[ra, dec], savename=plot_file
)
except Exception as e:
logger.error(f"Failed to generate plot: {e}")

plot_files = glob(os.path.join(i_dir, "*ion.png"))
logger.info(f"Plotting files: {plot_files=}")
for src in plot_files:
base = os.path.basename(src)
dst = os.path.join(plotdir, base)
copyfile(src, dst)
logger.info(f"Prediction file: {predict_file}")

myquery = {"Source_ID": iname}

@@ -324,29 +310,40 @@ def main(
formatter=ionex_formatter,
pre_download=ionex_predownload,
)
predictions = predict_worker.map(
island=islands,
field=unmapped(field),
beam=beams_cor,
start_time=unmapped(start_time),
end_time=unmapped(end_time),
freq=unmapped(freq.to(u.Hz).value),
cutdir=unmapped(cutdir),
plotdir=unmapped(plotdir),
server=unmapped(ionex_server),
prefix=unmapped(ionex_prefix),
proxy_server=unmapped(ionex_proxy_server),
formatter=unmapped(ionex_formatter),
pre_download=unmapped(ionex_predownload),
)

corrections = correct_worker.map(
beam=beams_cor,
outdir=unmapped(cutdir),
field=unmapped(field),
prediction=predictions,
island=islands,
)
predictions = []
corrections = []
assert len(islands) == len(beams_cor), "Islands and beams must be the same length"
for island, beam in tqdm(
zip(islands, beams_cor),
desc="Submitting tasks",
file=TQDM_OUT,
total=len(islands),
):
prediction = predict_worker.submit(
island=island,
field=field,
beam=beam,
start_time=start_time,
end_time=end_time,
freq=freq.to(u.Hz).value,
cutdir=cutdir,
plotdir=plotdir,
server=ionex_server,
prefix=ionex_prefix,
proxy_server=ionex_proxy_server,
formatter=ionex_formatter,
pre_download=ionex_predownload,
)
predictions.append(prediction)
correction = correct_worker.submit(
beam=beam,
outdir=cutdir,
field=field,
prediction=prediction,
island=island,
)
corrections.append(correction)

updates_arrays = [p.result().update for p in predictions]
updates = [c.result() for c in corrections]
@@ -426,9 +423,10 @@ def cli():
warnings.simplefilter("ignore", category=RuntimeWarning)

gen_parser = generic_parser(parent_parser=True)
work_parser = workdir_arg_parser(parent_parser=True)
f_parser = frion_parser(parent_parser=True)
parser = argparse.ArgumentParser(
parents=[gen_parser, f_parser],
parents=[gen_parser, work_parser, f_parser],
formatter_class=UltimateHelpFormatter,
description=f_parser.description,
)
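
The main change in frion.py replaces the Prefect .map(...unmapped(...)) calls with an explicit loop that submits one prediction task and one correction task per island, passing the prediction future straight into the correction so Prefect resolves the dependency. A minimal sketch of that submit-and-chain pattern; the task bodies below are stand-ins, not the real workers:

from typing import List

from prefect import flow, task


@task
def predict_worker(island: str) -> str:
    # Stand-in for the real ionospheric prediction task.
    return f"prediction-for-{island}"


@task
def correct_worker(island: str, prediction: str) -> str:
    # Stand-in for the real correction task; receives the resolved prediction.
    return f"corrected-{island}-using-{prediction}"


@flow
def frion_flow(islands: List[str]) -> List[str]:
    predictions = []
    corrections = []
    for island in islands:
        # submit() returns a future immediately; passing it to the next submit()
        # lets Prefect wire the dependency and resolve it before the task runs.
        prediction = predict_worker.submit(island=island)
        predictions.append(prediction)
        corrections.append(correct_worker.submit(island=island, prediction=prediction))
    # Block on the results, mirroring the .result() calls in the diff above.
    return [c.result() for c in corrections]


if __name__ == "__main__":
    print(frion_flow(["source_a", "source_b"]))
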
8 changes: 4 additions & 4 deletions arrakis/imager.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
"""Arrkis imager"""

import argparse
import hashlib
import logging
@@ -9,9 +10,8 @@
from glob import glob
from pathlib import Path
from subprocess import CalledProcessError
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import NamedTuple as Struct
from typing import Optional, Tuple, Union

import numpy as np
from astropy.io import fits
@@ -692,8 +692,8 @@ def main(
logger.info(f"Searching {msdir} for MS matching {ms_glob_pattern}.")
mslist = sorted(msdir.glob(ms_glob_pattern))

assert (len(mslist) > 0) & (
len(mslist) == num_beams
assert (
(len(mslist) > 0) & (len(mslist) == num_beams)
), f"Incorrect number of MS files found: {len(mslist)} / {num_beams} - glob pattern: {ms_glob_pattern}"

logger.info(f"Will image {len(mslist)} MS files in {msdir} to {out_dir}")
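
The imager hunk only reformats the assertion guarding measurement-set discovery; the condition and message are unchanged. A minimal sketch of that check as a standalone helper; find_ms is a hypothetical name and the arguments are assumed to carry the values used in the surrounding main():

from pathlib import Path
from typing import List


def find_ms(msdir: Path, ms_glob_pattern: str, num_beams: int) -> List[Path]:
    """Find the measurement sets and verify exactly one was found per beam."""
    mslist = sorted(msdir.glob(ms_glob_pattern))
    assert (len(mslist) > 0) & (len(mslist) == num_beams), (
        f"Incorrect number of MS files found: {len(mslist)} / {num_beams} "
        f"- glob pattern: {ms_glob_pattern}"
    )
    return mslist
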
1 change: 1 addition & 0 deletions arrakis/init_database.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
"""Create the Arrakis database"""

import json
import logging
import time