From 3e8e6ce817358bf9d6c2e94bfa2b87398de73b4f Mon Sep 17 00:00:00 2001 From: Nicolai-vKuegelgen Date: Mon, 8 Apr 2024 12:07:13 +0200 Subject: [PATCH] Address code review --- cubi_tk/irods_common.py | 13 ++++++------ cubi_tk/sodar/pull_data_collection.py | 30 +++++++++++++-------------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/cubi_tk/irods_common.py b/cubi_tk/irods_common.py index 157ce35..a05cea8 100644 --- a/cubi_tk/irods_common.py +++ b/cubi_tk/irods_common.py @@ -3,7 +3,7 @@ import os.path from pathlib import Path import re -from typing import Dict, Iterable, List, Union +from typing import Iterable, Union import attrs from irods.collection import iRODSCollection @@ -299,7 +299,7 @@ def __init__( super().__init__(ask, irods_env_path) self.hash_scheme = hash_scheme - def retrieve_irods_data_objects(self, irods_path: str) -> Dict[str, List[iRODSDataObject]]: + def retrieve_irods_data_objects(self, irods_path: str) -> dict[str, list[iRODSDataObject]]: """Retrieve data objects from iRODS. :param irods_path: iRODS path. @@ -331,7 +331,7 @@ def _irods_query( self, session: iRODSSession, root_coll: iRODSCollection, - ) -> Dict[str, Union[Dict[str, iRODSDataObject], List[iRODSDataObject]]]: + ) -> dict[str, Union[dict[str, iRODSDataObject], list[iRODSDataObject]]]: """Get data objects recursively under the given iRODS path.""" ignore_schemes = [k.lower() for k in HASH_SCHEMES if k != self.hash_scheme.upper()] @@ -344,8 +344,9 @@ def _irods_query( for res in query: # If the 'res' dict is not split into Colllection&Object the resulting iRODSDataObject is not fully functional, # likely because a name/path/... attribute is overwritten somewhere - coll_res = {k: v for k, v in res.items() if k.icat_id >= 500} - obj_res = {k: v for k, v in res.items() if k.icat_id < 500} + magic_icat_id_separator = 500 + coll_res = {k: v for k, v in res.items() if k.icat_id >= magic_icat_id_separator} + obj_res = {k: v for k, v in res.items() if k.icat_id < magic_icat_id_separator} coll = iRODSCollection(root_coll.manager, coll_res) obj = iRODSDataObject(session.data_objects, parent=coll, results=[obj_res]) @@ -357,7 +358,7 @@ def _irods_query( return data_objs @staticmethod - def parse_irods_collection(irods_data_objs) -> Dict[str, List[iRODSDataObject]]: + def parse_irods_collection(irods_data_objs) -> dict[str, list[iRODSDataObject]]: """Parse iRODS collection :param irods_data_objs: iRODS collection. diff --git a/cubi_tk/sodar/pull_data_collection.py b/cubi_tk/sodar/pull_data_collection.py index 0d711e5..326d209 100644 --- a/cubi_tk/sodar/pull_data_collection.py +++ b/cubi_tk/sodar/pull_data_collection.py @@ -55,7 +55,7 @@ def __init__(self, args): @classmethod def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: - """Setup arguments for ``check-remote`` command.""" + """Setup arguments for ``pull-data-collection`` command.""" parser.add_argument( "--hidden-cmd", dest="sodar_cmd", default=cls.run, help=argparse.SUPPRESS ) @@ -157,11 +157,11 @@ def setup_argparse(cls, parser: argparse.ArgumentParser) -> None: @classmethod def run( cls, args, _parser: argparse.ArgumentParser, _subparser: argparse.ArgumentParser - ) -> typing.Optional[int]: + ) -> int | None: """Entry point into the command.""" return cls(args).execute() - def check_args(self, args): + def check_args(self, args) -> int: """Called for checking arguments.""" res = 0 @@ -190,18 +190,18 @@ def check_args(self, args): return res - def execute(self) -> typing.Optional[int]: + def execute(self) -> int | None: """Execute the transfer.""" res = self.check_args(self.args) if res: # pragma: nocover return res - logger.info("Starting cubi-tk snappy pull-processed-data") + logger.info("Starting cubi-tk sodar pull-data-collection") logger.info(" args: %s", self.args) # Get list of sample ids if self.args.sample_list: - samples = self.args.sample_list + samples = set(self.args.sample_list) elif self.args.biomedsheet: samples = self.parse_sample_tsv(self.args.biomedsheet, sample_col=2, skip_rows=12) elif self.args.tsv: @@ -250,11 +250,11 @@ def execute(self) -> typing.Optional[int]: return 0 @staticmethod - def parse_sample_tsv(tsv_path, sample_col=1, skip_rows=0, skip_comments=True) -> List[str]: + def parse_sample_tsv(tsv_path, sample_col=1, skip_rows=0, skip_comments=True) -> set[str]: extra_args = {"comment": "#"} if skip_comments else {} df = pd.read_csv(tsv_path, sep="\t", skiprows=skip_rows, **extra_args) try: - samples = list(df.iloc[:, sample_col - 1]) + samples = set(df.iloc[:, sample_col - 1]) except IndexError: logger.error( f"Error extracting column no. {sample_col} from {tsv_path}, only {len(df.columns)} where detected." @@ -265,12 +265,12 @@ def parse_sample_tsv(tsv_path, sample_col=1, skip_rows=0, skip_comments=True) -> @staticmethod def filter_irods_file_list( - remote_files_dict: Dict[str, List[iRODSDataObject]], + remote_files_dict: dict[str, list[iRODSDataObject]], common_assay_path: str, - file_patterns: List[str], - samples: List[str], + file_patterns: list[str], + samples: set[str], substring_match: bool = False, - ) -> Dict[str, List[iRODSDataObject]]: + ) -> Dict[str, list[iRODSDataObject]]: """Filter iRODS collection based on identifiers (sample id or library name) and file type/extension. :param remote_files_dict: Dictionary with iRODS collection information. Key: file name as string (e.g., @@ -304,9 +304,9 @@ def filter_irods_file_list( # Check if collection (=1st element of striped path) matches any of the samples if samples and not substring_match: - sample_match = any([s == collection for s in samples]) + sample_match = any(s == collection for s in samples) elif samples: - sample_match = any([s in collection for s in samples]) + sample_match = any(s in collection for s in samples) else: sample_match = True @@ -314,7 +314,7 @@ def filter_irods_file_list( continue if file_patterns: - file_pattern_match = any([p for p in file_patterns if path.match(p)]) + file_pattern_match = any(p for p in file_patterns if path.match(p)) else: file_pattern_match = True