diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 360f9b09..f43e2ddc 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,20 +1,20 @@ repos: -- repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.6.0 - hooks: - - id: trailing-whitespace - - id: end-of-file-fixer - - id: check-yaml - - id: check-added-large-files -- repo: https://github.com/psf/black - rev: 24.4.0 - hooks: - - id: black -- repo: https://github.com/PyCQA/isort - rev: 5.13.2 - hooks: - - id: isort - args: ["--profile=black"] +- repo: https://github.com/astral-sh/ruff-pre-commit + # Ruff version. + rev: v0.4.1 + hooks: + # Run the linter. + - id: ruff + args: [ --fix ] + # Run the formatter. + - id: ruff-format +# - repo: https://github.com/pre-commit/pre-commit-hooks +# rev: v4.6.0 +# hooks: +# - id: trailing-whitespace +# - id: end-of-file-fixer +# - id: check-yaml +# - id: check-added-large-files ci: autofix_commit_msg: | diff --git a/arrakis/cleanup.py b/arrakis/cleanup.py index cb409d3e..57a056d1 100644 --- a/arrakis/cleanup.py +++ b/arrakis/cleanup.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """DANGER ZONE: Purge directories of un-needed FITS files.""" + import argparse import logging import shutil diff --git a/arrakis/configs/petrichor.yaml b/arrakis/configs/petrichor.yaml index 60b51807..f0e4499e 100644 --- a/arrakis/configs/petrichor.yaml +++ b/arrakis/configs/petrichor.yaml @@ -1,10 +1,10 @@ # Set up for Petrichor cluster_class: "dask_jobqueue.SLURMCluster" cluster_kwargs: - cores: 16 + cores: 20 processes: 1 name: 'spice-worker' - memory: "128GiB" + memory: "160GiB" account: 'OD-217087' #queue: 'workq' walltime: '0-12:00:00' diff --git a/arrakis/cutout.py b/arrakis/cutout.py index 35f15dcf..be4df1b4 100644 --- a/arrakis/cutout.py +++ b/arrakis/cutout.py @@ -1,18 +1,15 @@ #!/usr/bin/env python """Produce cutouts from RACS cubes""" + import argparse import logging -import os -import pickle import warnings from concurrent.futures import ThreadPoolExecutor -from glob import glob from pathlib import Path from pprint import pformat from shutil import copyfile -from typing import Dict, List +from typing import List, Optional, Set, TypeVar from typing import NamedTuple as Struct -from typing import Optional, Set, TypeVar, Union import astropy.units as u import numpy as np @@ -36,7 +33,6 @@ validate_sbid_field_pair, ) from arrakis.utils.fitsutils import fix_header -from arrakis.utils.io import try_mkdir from arrakis.utils.pipeline import generic_parser, logo_str, workdir_arg_parser iers.conf.auto_download = False @@ -79,7 +75,6 @@ def cutout_weight( beam_num: int, dryrun=False, ) -> pymongo.UpdateOne: - # Update database myquery = {"Source_ID": source_id} @@ -337,7 +332,6 @@ def big_cutout( password: Optional[str] = None, limit: Optional[int] = None, ) -> List[pymongo.UpdateOne]: - wild = f"image.restored.{stoke.lower()}*contcube*beam{beam_num:02}.conv.fits" images = list(datadir.glob(wild)) if len(images) == 0: diff --git a/arrakis/frion.py b/arrakis/frion.py index cac0d7e4..6a01d794 100644 --- a/arrakis/frion.py +++ b/arrakis/frion.py @@ -1,24 +1,23 @@ #!/usr/bin/env python3 """Correct for the ionosphere in parallel""" + import argparse import logging import os -from glob import glob from pathlib import Path from pprint import pformat -from shutil import copyfile -from typing import Callable, Dict, List +from typing import Callable, Dict, List, Optional, Union from typing import NamedTuple as Struct -from typing import Optional, Union import astropy.units as u import numpy as np import pymongo from astropy.time import Time, TimeDelta from FRion import correct, predict -from prefect import flow, task, unmapped +from prefect import flow, task +from tqdm.auto import tqdm -from arrakis.logger import UltimateHelpFormatter, logger +from arrakis.logger import TqdmToLogger, UltimateHelpFormatter, logger from arrakis.utils.database import ( get_db, get_field_db, @@ -26,10 +25,10 @@ validate_sbid_field_pair, ) from arrakis.utils.fitsutils import getfreq -from arrakis.utils.io import try_mkdir -from arrakis.utils.pipeline import generic_parser, logo_str +from arrakis.utils.pipeline import generic_parser, logo_str, workdir_arg_parser logger.setLevel(logging.INFO) +TQDM_OUT = TqdmToLogger(logger, level=logging.INFO) class Prediction(Struct): @@ -89,8 +88,8 @@ def predict_worker( start_time: Time, end_time: Time, freq: np.ndarray, - cutdir: str, - plotdir: str, + cutdir: Path, + plotdir: Path, server: str = "ftp://ftp.aiub.unibe.ch/CODE/", prefix: str = "", formatter: Optional[Union[str, Callable]] = None, @@ -114,8 +113,8 @@ def predict_worker( """ logger.setLevel(logging.INFO) - ifile = os.path.join(cutdir, beam["beams"][field]["i_file"]) - i_dir = os.path.dirname(ifile) + ifile: Path = cutdir / beam["beams"][field]["i_file"] + i_dir = ifile.parent iname = island["Source_ID"] ra = island["RA"] dec = island["Dec"] @@ -137,7 +136,7 @@ def predict_worker( ra=ra, dec=dec, timestep=300.0, - ionexPath=os.path.join(os.path.dirname(cutdir), "IONEXdata"), + ionexPath=cutdir.parent / "IONEXdata", server=server, proxy_server=proxy_server, use_proxy=True, # Always use proxy - forces urllib @@ -146,23 +145,10 @@ def predict_worker( pre_download=pre_download, **proxy_args, ) + logger.info(f"Predicted modulation for {iname}.") predict_file = os.path.join(i_dir, f"{iname}_ion.txt") predict.write_modulation(freq_array=freq, theta=theta, filename=predict_file) - - plot_file = os.path.join(i_dir, f"{iname}_ion.png") - try: - predict.generate_plots( - times, RMs, theta, freq, position=[ra, dec], savename=plot_file - ) - except Exception as e: - logger.error(f"Failed to generate plot: {e}") - - plot_files = glob(os.path.join(i_dir, "*ion.png")) - logger.info(f"Plotting files: {plot_files=}") - for src in plot_files: - base = os.path.basename(src) - dst = os.path.join(plotdir, base) - copyfile(src, dst) + logger.info(f"Prediction file: {predict_file}") myquery = {"Source_ID": iname} @@ -324,29 +310,40 @@ def main( formatter=ionex_formatter, pre_download=ionex_predownload, ) - predictions = predict_worker.map( - island=islands, - field=unmapped(field), - beam=beams_cor, - start_time=unmapped(start_time), - end_time=unmapped(end_time), - freq=unmapped(freq.to(u.Hz).value), - cutdir=unmapped(cutdir), - plotdir=unmapped(plotdir), - server=unmapped(ionex_server), - prefix=unmapped(ionex_prefix), - proxy_server=unmapped(ionex_proxy_server), - formatter=unmapped(ionex_formatter), - pre_download=unmapped(ionex_predownload), - ) - corrections = correct_worker.map( - beam=beams_cor, - outdir=unmapped(cutdir), - field=unmapped(field), - prediction=predictions, - island=islands, - ) + predictions = [] + corrections = [] + assert len(islands) == len(beams_cor), "Islands and beams must be the same length" + for island, beam in tqdm( + zip(islands, beams_cor), + desc="Submitting tasks", + file=TQDM_OUT, + total=len(islands), + ): + prediction = predict_worker.submit( + island=island, + field=field, + beam=beam, + start_time=start_time, + end_time=end_time, + freq=freq.to(u.Hz).value, + cutdir=cutdir, + plotdir=plotdir, + server=ionex_server, + prefix=ionex_prefix, + proxy_server=ionex_proxy_server, + formatter=ionex_formatter, + pre_download=ionex_predownload, + ) + predictions.append(prediction) + correction = correct_worker.submit( + beam=beam, + outdir=cutdir, + field=field, + prediction=prediction, + island=island, + ) + corrections.append(correction) updates_arrays = [p.result().update for p in predictions] updates = [c.result() for c in corrections] @@ -426,9 +423,10 @@ def cli(): warnings.simplefilter("ignore", category=RuntimeWarning) gen_parser = generic_parser(parent_parser=True) + work_parser = workdir_arg_parser(parent_parser=True) f_parser = frion_parser(parent_parser=True) parser = argparse.ArgumentParser( - parents=[gen_parser, f_parser], + parents=[gen_parser, work_parser, f_parser], formatter_class=UltimateHelpFormatter, description=f_parser.description, ) diff --git a/arrakis/imager.py b/arrakis/imager.py index b737978c..4453df6e 100644 --- a/arrakis/imager.py +++ b/arrakis/imager.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Arrkis imager""" + import argparse import hashlib import logging @@ -9,9 +10,8 @@ from glob import glob from pathlib import Path from subprocess import CalledProcessError -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional, Tuple, Union from typing import NamedTuple as Struct -from typing import Optional, Tuple, Union import numpy as np from astropy.io import fits @@ -692,8 +692,8 @@ def main( logger.info(f"Searching {msdir} for MS matching {ms_glob_pattern}.") mslist = sorted(msdir.glob(ms_glob_pattern)) - assert (len(mslist) > 0) & ( - len(mslist) == num_beams + assert ( + (len(mslist) > 0) & (len(mslist) == num_beams) ), f"Incorrect number of MS files found: {len(mslist)} / {num_beams} - glob pattern: {ms_glob_pattern}" logger.info(f"Will image {len(mslist)} MS files in {msdir} to {out_dir}") diff --git a/arrakis/init_database.py b/arrakis/init_database.py index 0ae3ea66..85c4787b 100644 --- a/arrakis/init_database.py +++ b/arrakis/init_database.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Create the Arrakis database""" + import json import logging import time diff --git a/arrakis/linmos.py b/arrakis/linmos.py index d7140273..176754c5 100644 --- a/arrakis/linmos.py +++ b/arrakis/linmos.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Run LINMOS on cutouts in parallel""" + import argparse import logging import os @@ -8,19 +9,19 @@ from glob import glob from pathlib import Path from pprint import pformat -from typing import Dict, List +from typing import Dict, List, Optional, Tuple from typing import NamedTuple as Struct -from typing import Optional, Tuple import pandas as pd import pymongo from astropy.utils.exceptions import AstropyWarning -from prefect import flow, task, unmapped +from prefect import flow, task from racs_tools import beamcon_3D from spectral_cube.utils import SpectralCubeWarning from spython.main import Client as sclient +from tqdm.auto import tqdm -from arrakis.logger import UltimateHelpFormatter, logger +from arrakis.logger import TqdmToLogger, UltimateHelpFormatter, logger from arrakis.utils.database import get_db, test_db from arrakis.utils.pipeline import generic_parser, logo_str, workdir_arg_parser @@ -31,6 +32,8 @@ logger.setLevel(logging.INFO) +TQDM_OUT = TqdmToLogger(logger, level=logging.INFO) + class ImagePaths(Struct): """Class to hold image paths""" @@ -338,28 +341,33 @@ def main( logger.info(f"Running LINMOS on {len(big_beams)} islands") - all_parfiles = [] - for stoke in stokeslist: - image_paths = find_images.map( - field=unmapped(field), - beams_row=big_beams.iterrows(), - stoke=unmapped(stoke.capitalize()), - datadir=unmapped(cutdir), - ) - parfiles = genparset.map( - image_paths=image_paths, - stoke=unmapped(stoke.capitalize()), - datadir=unmapped(cutdir), - holofile=unmapped(holofile), - ) - all_parfiles.extend(parfiles) - - results = linmos.map( - all_parfiles, - unmapped(field), - unmapped(str(image)), - unmapped(holofile), - ) + results = [] + for beams_row in tqdm( + big_beams.iterrows(), + total=len(big_beams), + desc="Submitting tasks for LINMOS", + file=TQDM_OUT, + ): + for stoke in stokeslist: + image_path = find_images.submit( + field=field, + beams_row=beams_row, + stoke=stoke.capitalize(), + datadir=cutdir, + ) + parset = genparset.submit( + image_paths=image_path, + stoke=stoke.capitalize(), + datadir=cutdir, + holofile=holofile, + ) + result = linmos.submit( + parset=parset, + fieldname=field, + image=str(image), + holofile=holofile, + ) + results.append(result) updates = [f.result() for f in results] updates = [u for u in updates if u is not None] diff --git a/arrakis/makecat.py b/arrakis/makecat.py index 8d6085da..d6bdcfea 100644 --- a/arrakis/makecat.py +++ b/arrakis/makecat.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Make an Arrakis catalogue""" + import argparse import logging import os @@ -15,6 +16,7 @@ import pandas as pd from astropy.coordinates import SkyCoord from astropy.io import votable as vot +from astropy.io.votable.tree import VOTableFile from astropy.stats import sigma_clip from astropy.table import Column, Table from dask.diagnostics import ProgressBar @@ -513,9 +515,9 @@ def masker(x): df_out["local_rm_flag"] = df_out["local_rm_flag"].astype(bool) cat_out = RMTable.from_pandas(df_out.reset_index()) cat_out["local_rm_flag"].meta["ucd"] = "meta.code" - cat_out["local_rm_flag"].description = ( - "RM is statistically different from nearby RMs" - ) + cat_out[ + "local_rm_flag" + ].description = "RM is statistically different from nearby RMs" # Bring back the units for col in cat_out.colnames: @@ -685,7 +687,7 @@ def get_integration_time( return tints -def add_metadata(vo_table: vot.tree.Table, filename: str): +def add_metadata(vo_table: VOTableFile, filename: str): """Add metadata to VO Table for CASDA Args: @@ -696,9 +698,14 @@ def add_metadata(vo_table: vot.tree.Table, filename: str): """ # Add extra metadata for col_name, meta in columns_possum.extra_column_descriptions.items(): - col = vo_table.get_first_table().get_field_by_id(col_name) - col.description = meta["description"] - col.ucd = meta["ucd"] + try: + col = vo_table.get_first_table().get_field_by_id(col_name) + col.description = meta["description"] + col.ucd = meta["ucd"] + except KeyError as e: + logger.error(e) + logger.warning(f"Column {col_name} not found in table") + continue # Add params for CASDA if len(vo_table.params) > 0: diff --git a/arrakis/merge_fields.py b/arrakis/merge_fields.py index 46635a8c..ce49d445 100644 --- a/arrakis/merge_fields.py +++ b/arrakis/merge_fields.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Merge multiple RACS fields""" + import argparse import os from pprint import pformat @@ -7,7 +8,7 @@ from typing import Dict, List, Optional import pymongo -from prefect import flow, task, unmapped +from prefect import flow, task from arrakis.linmos import get_yanda, linmos, linmos_parser from arrakis.logger import UltimateHelpFormatter, logger @@ -126,12 +127,15 @@ def copy_singletons( big_beams = list( beams_col.find({"Source_ID": {"$in": island_ids}}).sort("Source_ID") ) - updates = copy_singleton.map( - beam=big_beams, - field_dict=unmapped(field_dict), - merge_name=unmapped(merge_name), - data_dir=unmapped(data_dir), - ) + updates = [] + for beam in big_beams: + update = copy_singleton.submit( + beam=beam, + field_dict=field_dict, + merge_name=merge_name, + data_dir=data_dir, + ) + updates.append(update) return updates @@ -256,14 +260,16 @@ def merge_multiple_fields( big_beams = list( beams_col.find({"Source_ID": {"$in": island_ids}}).sort("Source_ID") ) - - updates = merge_multiple_field.map( - beam=big_beams, - field_dict=unmapped(field_dict), - merge_name=unmapped(merge_name), - data_dir=unmapped(data_dir), - image=unmapped(image), - ) + updates = [] + for beam in big_beams: + update = merge_multiple_field.submit( + beam=beam, + field_dict=field_dict, + merge_name=merge_name, + data_dir=data_dir, + image=image, + ) + updates.append(update) return updates @@ -282,8 +288,8 @@ def main( ) -> str: logger.debug(f"{fields=}") - assert len(fields) == len( - field_dirs + assert ( + len(fields) == len(field_dirs) ), f"List of fields must be the same length as length of field dirs. {len(fields)=},{len(field_dirs)=}" field_dict = { @@ -410,7 +416,6 @@ def cli(): ) args = parser.parse_args() - verbose = args.verbose test_db( host=args.host, username=args.username, diff --git a/arrakis/process_region.py b/arrakis/process_region.py index 134f7b65..343768a1 100644 --- a/arrakis/process_region.py +++ b/arrakis/process_region.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Arrakis multi-field pipeline""" + import argparse import logging import os diff --git a/arrakis/process_spice.py b/arrakis/process_spice.py index 5f53ae60..d50b0a7b 100644 --- a/arrakis/process_spice.py +++ b/arrakis/process_spice.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Arrakis single-field pipeline""" + import argparse import logging import os diff --git a/arrakis/rmclean_oncuts.py b/arrakis/rmclean_oncuts.py index aaa5048d..9634fcb1 100644 --- a/arrakis/rmclean_oncuts.py +++ b/arrakis/rmclean_oncuts.py @@ -1,25 +1,23 @@ #!/usr/bin/env python3 """Run RM-synthesis on cutouts in parallel""" + import argparse import logging import os -import sys import warnings -from glob import glob from pathlib import Path from pprint import pformat -from shutil import copyfile from typing import Optional -import matplotlib.pyplot as plt import numpy as np import pymongo -from prefect import flow, task, unmapped +from prefect import flow, task from RMtools_1D import do_RMclean_1D from RMtools_3D import do_RMclean_3D +from tqdm.auto import tqdm from arrakis import rmsynth_oncuts -from arrakis.logger import UltimateHelpFormatter, logger +from arrakis.logger import TqdmToLogger, UltimateHelpFormatter, logger from arrakis.utils.database import ( get_db, get_field_db, @@ -28,6 +26,9 @@ ) from arrakis.utils.pipeline import generic_parser, logo_str +logger.setLevel(logging.INFO) +TQDM_OUT = TqdmToLogger(logger, level=logging.INFO) + @task(name="1D RM-CLEAN") def rmclean1d( @@ -59,7 +60,6 @@ def rmclean1d( Returns: pymongo.UpdateOne: MongoDB update query. """ - iname = comp["Source_ID"] cname = comp["Gaussian_ID"] logger.debug(f"Working on {comp}") save_name = field if sbid is None else f"{field}_{sbid}" @@ -92,10 +92,10 @@ def rmclean1d( maxIter=maxIter, gain=gain, nBits=nBits, - showPlots=showPlots, + showPlots=False, verbose=rm_verbose, prefixOut=prefix, - saveFigures=savePlots, + saveFigures=False, window=window, ) # Ensure JSON serializable @@ -113,12 +113,12 @@ def rmclean1d( # Save output do_RMclean_1D.saveOutput(outdict, arrdict, prefixOut=prefix, verbose=rm_verbose) - if savePlots: - plt.close("all") - plotdir = outdir / "plots" - plot_files = list(fdfFile.parent.glob("*.pdf")) - for plot_file in plot_files: - copyfile(plot_file, plotdir / plot_file.name) + # if savePlots: + # plt.close("all") + # plotdir = outdir / "plots" + # plot_files = list(fdfFile.parent.glob("*.pdf")) + # for plot_file in plot_files: + # copyfile(plot_file, plotdir / plot_file.name) # Load into Mongo myquery = {"Gaussian_ID": cname} @@ -344,35 +344,43 @@ def main( count = limit n_comp = count n_island = count + components = components[:count] + islands = islands[:count] if dimension == "1d": + outputs = [] logger.info(f"Running RM-CLEAN on {n_comp} components") - outputs = rmclean1d.map( - comp=components, - field=unmapped(field), - sbid=unmapped(sbid), - outdir=unmapped(outdir), - cutoff=unmapped(cutoff), - maxIter=unmapped(maxIter), - gain=unmapped(gain), - showPlots=unmapped(showPlots), - savePlots=unmapped(savePlots), - rm_verbose=unmapped(rm_verbose), - window=unmapped(window), - ) + for comp in tqdm(components, total=n_comp, desc="RM-CLEAN 1D", file=TQDM_OUT): + output = rmclean1d.submit( + comp=comp, + field=field, + sbid=sbid, + outdir=outdir, + cutoff=cutoff, + maxIter=maxIter, + gain=gain, + showPlots=showPlots, + savePlots=savePlots, + rm_verbose=rm_verbose, + window=window, + ) + outputs.append(output) + elif dimension == "3d": logger.info(f"Running RM-CLEAN on {n_island} islands") - - outputs = rmclean3d.map( - field=unmapped(field), - island=islands, - sbid=unmapped(sbid), - outdir=unmapped(outdir), - cutoff=unmapped(cutoff), - maxIter=unmapped(maxIter), - gain=unmapped(gain), - rm_verbose=unmapped(rm_verbose), - ) + outputs = [] + for island in tqdm(islands, total=n_island, desc="RM-CLEAN 3D", file=TQDM_OUT): + output = rmclean3d.submit( + field=field, + island=island, + sbid=sbid, + outdir=outdir, + cutoff=cutoff, + maxIter=maxIter, + gain=gain, + rm_verbose=rm_verbose, + ) + outputs.append(output) else: raise ValueError(f"Dimension {dimension} not supported.") diff --git a/arrakis/rmsynth_oncuts.py b/arrakis/rmsynth_oncuts.py index 357964a3..b37fbde8 100644 --- a/arrakis/rmsynth_oncuts.py +++ b/arrakis/rmsynth_oncuts.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Run RM-CLEAN on cutouts in parallel""" + import argparse import logging import os @@ -8,11 +9,11 @@ from pathlib import Path from pprint import pformat from shutil import copyfile -from typing import List +from typing import List, Optional, Tuple, Union from typing import NamedTuple as Struct -from typing import Optional, Tuple, Union import astropy.units as u +import matplotlib import matplotlib.pyplot as plt import numpy as np import pandas as pd @@ -23,14 +24,15 @@ from astropy.stats import mad_std, sigma_clip from astropy.wcs import WCS from astropy.wcs.utils import proj_plane_pixel_scales -from prefect import flow, task, unmapped +from prefect import flow, task from radio_beam import Beam from RMtools_1D import do_RMsynth_1D from RMtools_3D import do_RMsynth_3D from RMutils.util_misc import create_frac_spectra from scipy.stats import norm +from tqdm.auto import tqdm -from arrakis.logger import UltimateHelpFormatter, logger +from arrakis.logger import TqdmToLogger, UltimateHelpFormatter, logger from arrakis.utils.database import ( get_db, get_field_db, @@ -41,7 +43,9 @@ from arrakis.utils.fitting import fit_pl, fitted_mean, fitted_std from arrakis.utils.pipeline import generic_parser, logo_str, workdir_arg_parser +matplotlib.use("Agg") logger.setLevel(logging.INFO) +TQDM_OUT = TqdmToLogger(logger, level=logging.INFO) class Spectrum(Struct): @@ -1077,49 +1081,65 @@ def main( elif dimension == "1d": logger.info(f"Running RMsynth on {n_comp} components") - outputs = rmsynthoncut1d.map( - comp_tuple=components.iterrows(), - beam_tuple=beams.loc[components.Source_ID].iterrows(), - outdir=unmapped(outdir), - freq=unmapped(freq), - field=unmapped(field), - sbid=unmapped(sbid), - polyOrd=unmapped(polyOrd), - phiMax_radm2=unmapped(phiMax_radm2), - dPhi_radm2=unmapped(dPhi_radm2), - nSamples=unmapped(nSamples), - weightType=unmapped(weightType), - fitRMSF=unmapped(fitRMSF), - noStokesI=unmapped(noStokesI), - showPlots=unmapped(showPlots), - savePlots=unmapped(savePlots), - debug=unmapped(debug), - rm_verbose=unmapped(rm_verbose), - fit_function=unmapped(fit_function), - tt0=unmapped(tt0), - tt1=unmapped(tt1), - ion=unmapped(ion), - do_own_fit=unmapped(do_own_fit), - ) + outputs = [] + for comp_tuple, beam_tuple in tqdm( + zip(components.iterrows(), beams.loc[components.Source_ID].iterrows()), + total=n_comp, + desc="Submitting RMsynth 1D jobs", + file=TQDM_OUT, + ): + output = rmsynthoncut1d.submit( + comp_tuple=comp_tuple, + beam_tuple=beam_tuple, + outdir=outdir, + freq=freq, + field=field, + sbid=sbid, + polyOrd=polyOrd, + phiMax_radm2=phiMax_radm2, + dPhi_radm2=dPhi_radm2, + nSamples=nSamples, + weightType=weightType, + fitRMSF=fitRMSF, + noStokesI=noStokesI, + showPlots=showPlots, + savePlots=savePlots, + debug=debug, + rm_verbose=rm_verbose, + fit_function=fit_function, + tt0=tt0, + tt1=tt1, + ion=ion, + do_own_fit=do_own_fit, + ) + outputs.append(output) elif dimension == "3d": logger.info(f"Running RMsynth on {n_island} islands") - outputs = rmsynthoncut3d.map( - island_id=island_ids, - beam_tuple=beams.loc[island_ids].iterrows(), - outdir=unmapped(outdir), - freq=unmapped(freq), - field=unmapped(field), - sbid=unmapped(sbid), - phiMax_radm2=unmapped(phiMax_radm2), - dPhi_radm2=unmapped(dPhi_radm2), - nSamples=unmapped(nSamples), - weightType=unmapped(weightType), - fitRMSF=unmapped(fitRMSF), - not_RMSF=unmapped(not_RMSF), - rm_verbose=unmapped(rm_verbose), - ion=unmapped(ion), - ) + outputs = [] + for island_id, beam_tuple in tqdm( + zip(island_ids, beams.loc[island_ids].iterrows()), + total=n_island, + desc="Submitting RMsynth 3D jobs", + file=TQDM_OUT, + ): + output = rmsynthoncut3d.submit( + island_id=island_id, + beam_tuple=beam_tuple, + outdir=outdir, + freq=freq, + field=field, + sbid=sbid, + phiMax_radm2=phiMax_radm2, + dPhi_radm2=dPhi_radm2, + nSamples=nSamples, + weightType=weightType, + fitRMSF=fitRMSF, + not_RMSF=not_RMSF, + rm_verbose=rm_verbose, + ion=ion, + ) + outputs.append(output) else: raise ValueError("An incorrect RMSynth mode has been configured. ") diff --git a/arrakis/utils/fitting.py b/arrakis/utils/fitting.py index 75dc1527..e3b558a8 100644 --- a/arrakis/utils/fitting.py +++ b/arrakis/utils/fitting.py @@ -166,9 +166,7 @@ def fit_pl( } # Initialise the save dict - save_dict = { - n: {} for n in range(nterms + 1) - } # type: Dict[int, Dict[str, Any]] + save_dict = {n: {} for n in range(nterms + 1)} # type: Dict[int, Dict[str, Any]] for n in range(nterms + 1): p0 = p0_long[: n + 1] save_dict[n]["aics"] = np.nan diff --git a/arrakis/utils/msutils.py b/arrakis/utils/msutils.py index 844d5d28..49539232 100644 --- a/arrakis/utils/msutils.py +++ b/arrakis/utils/msutils.py @@ -718,7 +718,7 @@ def wsclean( logger.warning("CAUTION - square channel joining and multiscale is unstable!") for key, value in arguments.items(): - if type(value) is bool: + if isinstance(value, bool): if value: command += f" -{key.replace('_', '-')}" elif value: diff --git a/arrakis/utils/pipeline.py b/arrakis/utils/pipeline.py index b4ffce88..e5ce0fc2 100644 --- a/arrakis/utils/pipeline.py +++ b/arrakis/utils/pipeline.py @@ -8,7 +8,7 @@ import time import warnings from pathlib import Path -from typing import List, Optional, Tuple, Union +from typing import List, Tuple, Union import astropy.units as u import dask.array as da @@ -214,7 +214,7 @@ def __exit__(self, exc_type, exc_value, traceback): def inspect_client( - client: Union[distributed.Client, None] = None + client: Union[distributed.Client, None] = None, ) -> Tuple[str, int, int, u.Quantity, int, u.Quantity]: """_summary_ diff --git a/scripts/casda_prepare.py b/scripts/casda_prepare.py index 9e21444d..eb52532a 100755 --- a/scripts/casda_prepare.py +++ b/scripts/casda_prepare.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Prepare files for CASDA upload""" + import argparse import hashlib import logging @@ -262,9 +263,9 @@ def convert_spectra( hdul[0].header["CRPIX2"] = 1 hdul[0].header["CUNTI1"] = "deg" hdul[0].header["CUNTI2"] = "deg" - hdul[0].header[ - "comment" - ] = "Dummy image to indicate the pixel size and position" + hdul[0].header["comment"] = ( + "Dummy image to indicate the pixel size and position" + ) # Add dummy data to make it a valid FITS file hdul[0].data = np.zeros((1, 1)) hdul.flush() @@ -565,8 +566,8 @@ def main( with open(outf, "w") as f: for rid in rem_ids: f.write(f"{rid}\n") - assert len(cubes) == len( - set(polcat["source_id"]) + assert ( + len(cubes) == len(set(polcat["source_id"])) ), f"Number of cubes does not match number of sources -- {len(cubes)=} and {len(set(polcat['source_id']))=}" unique_ids, unique_idx = np.unique(polcat["source_id"], return_index=True) diff --git a/scripts/compare_leakage.py b/scripts/compare_leakage.py index 1a24b5de..53ba69e7 100644 --- a/scripts/compare_leakage.py +++ b/scripts/compare_leakage.py @@ -83,7 +83,8 @@ def interpolate(field, comp, beams, cutdir, septab, holofile, verbose=True): imfile = glob( os.path.join(cutdir, f"{comp['Source_ID']}*beam{bm:02d}.conv.fits") )[0] - except: + except Exception as e: + logger.error(e) logger.critical(f"No image file for source {comp['Source_ID']} beam {bm}") return diff --git a/scripts/fix_dr1_cat.py b/scripts/fix_dr1_cat.py index 32398b6b..0f11365c 100755 --- a/scripts/fix_dr1_cat.py +++ b/scripts/fix_dr1_cat.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Post process DR1 catalog""" + import logging import os import pickle diff --git a/scripts/fix_src_cat.py b/scripts/fix_src_cat.py index a0817fd2..26af1342 100644 --- a/scripts/fix_src_cat.py +++ b/scripts/fix_src_cat.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """Post process DR1 source catalog""" + import logging import os from pathlib import Path diff --git a/scripts/spica.py b/scripts/spica.py index e791b8d1..8d5ef92f 100755 --- a/scripts/spica.py +++ b/scripts/spica.py @@ -55,7 +55,8 @@ def mslist(cal_sb, name): try: ms = glob(f"{racs_area}/{cal_sb}/RACS_test4_1.05_{name}/*beam00_*.ms")[0] - except: + except Exception as e: + logger.error(e) raise Exception( f"Can't find '{racs_area}/{cal_sb}/RACS_test4_1.05_{name}/*beam00_*.ms'" )