diff --git a/VERSIONLOG.md b/VERSIONLOG.md
index 27eeb720..dd89d91f 100644
--- a/VERSIONLOG.md
+++ b/VERSIONLOG.md
@@ -1,9 +1,17 @@
 # Scilifelab_epps Version Log
 
-## 20250122.1
+## 20250123.1
 
 Shorten MinKNOW samplesheet name, so the timestamp is visible in the MinKNOW file explorer. Also add happy-new-year dir creation.
 
+## 20250122.2
+
+Rebuild EPP to fetch last recorded derived sample UDF.
+
+## 20250122.1
+
+Create yearly dir for AVITI run manifests.
+
 ## 20250116.1
 
 Ruff 0.9.2 formatting.
diff --git a/scilifelab_epps/calc_from_args/udf_arg_methods.py b/scilifelab_epps/calc_from_args/udf_arg_methods.py
index 1040c91d..3321f3b1 100644
--- a/scilifelab_epps/calc_from_args/udf_arg_methods.py
+++ b/scilifelab_epps/calc_from_args/udf_arg_methods.py
@@ -2,7 +2,6 @@
 import logging
 from typing import Any
 
-import yaml
 from genologics.entities import Artifact, Process
 
 from scilifelab_epps.utils import udf_tools
@@ -23,7 +22,6 @@ def fetch_from_arg(
 
     """
 
-    history: str | None = None
     source: Artifact | Process
     source_name: str
 
@@ -47,19 +45,11 @@ def fetch_from_arg(
         value = process.udf[arg_dict["udf"]]
     else:
         if arg_dict["recursive"]:
-            # Fetch UDF recursively, back-tracking the input-output tuple
-            if arg_dict["source"] == "input":
-                use_current = False
-            else:
-                assert arg_dict["source"] == "output"
-                use_current = True
-
-            value, history = udf_tools.fetch_last(
-                currentStep=process,
-                art_tuple=art_tuple,
+            # Fetch UDF recursively
+
+            value = udf_tools.fetch_last(
+                target_art=source,
                 target_udfs=arg_dict["udf"],
-                use_current=use_current,
-                print_history=True,
             )
         else:
             # Fetch UDF from input or output artifact
@@ -78,15 +68,11 @@ def fetch_from_arg(
     else:
         return on_fail
 
-    # Log what has been done
-    log_str = f"Fetched UDF '{arg_dict['udf']}': {value} from {arg_dict['source']} '{source_name}'."
-
-    if history:
-        history_yaml = yaml.load(history, Loader=yaml.FullLoader)
-        last_step_name = history_yaml[-1]["Step name"]
-        last_step_id = history_yaml[-1]["Step ID"]
-        log_str += f"\n\tUDF recusively fetched from step: '{last_step_name}' (ID: '{last_step_id}')"
-
+    log_str = (
+        f"Fetched UDF '{arg_dict['udf']}': {value}"
+        + f"{' (recursive)' if arg_dict['recursive'] else ''}"
+        + f" from {arg_dict['source']} '{source_name}'."
+    )
     logging.info(log_str)
 
     return value
diff --git a/scilifelab_epps/utils/udf_tools.py b/scilifelab_epps/utils/udf_tools.py
index 043a680d..fec53764 100644
--- a/scilifelab_epps/utils/udf_tools.py
+++ b/scilifelab_epps/utils/udf_tools.py
@@ -1,5 +1,7 @@
 import json
-from typing import Union
+import logging
+import xml.etree.ElementTree as ET
+from typing import Any, Union
 
 from genologics.entities import Artifact, Process
 from requests.exceptions import HTTPError
@@ -9,6 +11,32 @@
 """
 
 
+def process_has_udfs(process: Process, target_udfs: list[str]) -> list[str]:
+    """Check whether any target UDFs are present in the sample fields of the process's associated type.
+
+    This function is necessary because a non-required sample UDF left blank will not be detected in the artifact object.
+
+    Returns a list of found UDFs, or an empty list if none were found.
+    """
+
+    # Get the raw XML of the process's associated type
+    raw_xml = process.type.xml()
+
+    # Parse as tree object
+    root = ET.fromstring(raw_xml)
+
+    # Instantiate return object
+    target_udfs_found = []
+
+    # Check whether the target UDF is present in the sample fields
+    for sample_field in root.iter("sample-field"):
+        for target_udf in target_udfs:
+            if sample_field.attrib["name"] == target_udf:
+                target_udfs_found.append(target_udf)
+
+    return target_udfs_found
+
+
 def put(target: Artifact | Process, target_udf: str, val, on_fail=AssertionError):
     """Try to put UDF on artifact or process, optionally without causing fatal error.
     Evaluates true on success and error (default) or on_fail param on failure.
@@ -39,22 +67,6 @@ def is_filled(art: Artifact, target_udf: str) -> bool:
         return False
 
 
-def no_outputs(currentStep: Process) -> bool:
-    """Check whether step has outputs or not"""
-
-    art_tuples = get_art_tuples(currentStep)
-
-    if art_tuples:
-        none_outputs = [t[1] is None for t in art_tuples]
-
-        if all(none_outputs):
-            return True
-        else:
-            return False
-    else:
-        return True
-
-
 def get_art_tuples(currentStep: Process) -> list:
     """Return I/O tuples whose elements are either
     1) both analytes
@@ -135,125 +147,145 @@ def list_udfs(art: Artifact) -> list:
 
 
 def fetch_last(
-    currentStep: Process,
-    art_tuple: tuple,
+    target_art: Artifact,
     target_udfs: str | list,
-    use_current=True,
-    print_history=False,
+    include_current=False,
+    log_traceback=False,
+    return_traceback=False,
     on_fail=AssertionError,
-):
+) -> Any | tuple[Any, list]:
     """Recursively look for target UDF.
 
-    Target UDF can be supplied as a string, or as a prioritized list of strings.
+    Arguments:
+
+        target_art          Artifact to trace back from.
 
-    If "print_history" == True, will return both the target metric and the lookup history as a string.
-    """
+        target_udfs         The UDF(s) to look for. Can be supplied as a string, or as a prioritized
+                            list of strings.
+
+        include_current     If True, will pull target UDFs if found in the target artifact.
+
+        log_traceback       If True, will log the full traceback.
+
+        return_traceback    If True, will additionally return the traceback as a list of dicts.
+
+        on_fail             If this is a subclass of Exception, will raise this exception on failure.
+                            If not, will return this value on failure instead of the UDF value.
""" # Convert to list, to enable iteration if isinstance(target_udfs, str): target_udfs = [target_udfs] - history = [] - - while True: - history.append({"Step name": currentStep.type.name, "Step ID": currentStep.id}) - - # Try to grab input and output articles, if possible - try: - input_art = art_tuple[0]["uri"] - except: - input_art = None - try: - output_art = art_tuple[1]["uri"] - except: - output_art = None + # Instantiate traceback + traceback = [] + steps_visited = [] - if len(history) == 1 and use_current is not True: - # If we are in the original step and "use_current" is false, skip - pass - else: - # Look trough outputs - if output_art: - history[-1].update( - { - "Derived sample ID": output_art.id, - "Derived sample UDFs": dict(output_art.udf.items()), + # Instantiate recursion variables + current_art = target_art + n = 1 + try: + # Start recursive search + while True: + # Dynamically reassign parent process + pp = current_art.parent_process + + # Keep track of visited parent processes + if pp is not None: + steps_visited.append(f"'{pp.type.name}' ({pp.id})") + target_udfs_in_parent_process = process_has_udfs(pp, target_udfs) + + traceback.append( + { + "Artifact": { + "Name": current_art.name, + "ID": current_art.id, + "UDFs": dict(current_art.udf.items()), + "Parent Step": { + "Name": pp.type.name if pp else None, + "ID": pp.id if pp else None, + }, } - ) + } + ) - for target_udf in target_udfs: - if target_udf in list_udfs(output_art): - if print_history is True: - return output_art.udf[target_udf], json.dumps( - history, indent=2 + # Search for correct UDF + for target_udf in target_udfs: + if target_udf in list_udfs(current_art): + if include_current is not True and n == 1: + logging.info( + "Target UDF was found in specified artifact, but include_current is set to False. Skipping." + ) + else: + if log_traceback is True: + logging.info( + f"Traceback:\n{json.dumps(traceback, indent=2)}" ) + logging.info( + f"Found target UDF '{target_udf}'" + + f" with value '{current_art.udf[target_udf]}'" + + f" in process {steps_visited[-1]}" + + f" {'output' if pp else 'input'}" + + f" artifact '{current_art.name}' ({current_art.id})" + ) + + if return_traceback: + return current_art.udf[target_udf], traceback else: - return output_art.udf[target_udf] - - # Look through inputs - if input_art: - if input_art.parent_process: - history[-1].update( - { - "Input sample parent step name": input_art.parent_process.type.name, - "Input sample parent step ID": input_art.parent_process.id, - } - ) - history[-1].update( - { - "Input sample ID": input_art.id, - "Input sample UDFs": dict(input_art.udf.items()), - } + return current_art.udf[target_udf] + + # Address the case that no target UDFs were found on the artifact, even though they were present in the parent process + if pp is not None and target_udfs_in_parent_process != []: + logging.warning( + f"Parent process '{pp.type.name}' ({pp.id})" + + f" has target UDF(s) {target_udfs_in_parent_process}," + + f" but it's not filled in for artifact '{current_art.name}' ({current_art.id})." + + " Please double check that you haven't missed filling it in." 
+                )
 
-                for target_udf in target_udfs:
-                    if target_udf in list_udfs(input_art):
-                        if print_history is True:
-                            return input_art.udf[target_udf], json.dumps(
-                                history, indent=2
-                            )
-                        else:
-                            return input_art.udf[target_udf]
-
-        # Cycle to previous step, if possible
-        try:
-            pp = input_art.parent_process
-            assert pp is not None
-
-            pp_tuples = get_art_tuples(pp)
-            matching_tuples = []
-            for pp_tuple in pp_tuples:
-                try:
-                    pp_input = pp_tuple[0]["uri"]
-                except:
-                    pp_input = None
-                try:
-                    pp_output = pp_tuple[1]["uri"]
-                except:
-                    pp_output = None
-
-                if (pp_input and pp_input.id == input_art.id) or (
-                    pp_output and pp_output.id == input_art.id
-                ):
-                    matching_tuples.append(pp_tuple)
-
-            assert len(matching_tuples) == 1, (
-                "Target artifact matches multiple inputs/outputs in previous step."
-            )
+            # Stop traceback if no parent process is found
+            if pp is None:
+                raise AssertionError(
+                    f"Artifact '{current_art.name}' ({current_art.id}) has no parent process linked and can't be traced back further."
+                )
+
+            pp_art_tuples = get_art_tuples(pp)
 
-            # Back-tracking successful, re-assign variables to represent previous step
-            currentStep = pp
-            art_tuple = matching_tuples[0]
+            # If parent process has valid input-output tuples, use for linkage
+            linked_input_arts = []
+            if pp_art_tuples != []:
+                for pp_tuple in pp_art_tuples:
+                    if pp_tuple[1]["uri"].id == current_art.id:
+                        linked_input_arts.append(pp_tuple[0]["uri"])
+            else:
+                raise NotImplementedError(
+                    "Parent process has no valid input-output links, traceback can't continue."
                 )
 
-        except AssertionError:
-            if isinstance(on_fail, type) and issubclass(on_fail, Exception):
-                if print_history is True:
-                    print(json.dumps(history, indent=2))
-                raise on_fail(
-                    f"Could not find matching UDF(s) [{', '.join(target_udfs)}] for artifact tuple {art_tuple}"
+            if len(linked_input_arts) == 1:
+                # Dynamically reassign current artifact
+                current_art = linked_input_arts[0]
+            elif len(linked_input_arts) > 1:
+                raise AssertionError(
+                    "Parent process has multiple input artifacts linked to the same output artifact, can't trace back."
                 )
             else:
-                if print_history is True:
-                    print(json.dumps(history, indent=2))
-                    return on_fail, json.dumps(history, indent=2)
-                else:
-                    return on_fail
+                raise AssertionError(
+                    "Parent process has no input artifacts linked to the output artifact, can't trace back."
+                )
+
+            n += 1
+
+    except AssertionError:
+        if isinstance(on_fail, type) and issubclass(on_fail, Exception):
+            raise on_fail(
+                f"Could not find matching UDF(s) [{', '.join(target_udfs)}] for artifact '{target_art.name}' ({target_art.id})"
+            )
+        else:
+            logging.warning(
+                f"Failed traceback for artifact '{target_art.name}' ({target_art.id}), falling back to value '{on_fail}'"
+            )
+            if return_traceback:
+                return on_fail, traceback
+            else:
+                return on_fail
diff --git a/scilifelab_epps/zika/utils.py b/scilifelab_epps/zika/utils.py
index 26b0b528..41cbfeee 100644
--- a/scilifelab_epps/zika/utils.py
+++ b/scilifelab_epps/zika/utils.py
@@ -114,7 +114,11 @@ def fetch_sample_data(currentStep: Process, to_fetch: dict) -> pd.DataFrame:
             except KeyError:
                 row[col_name] = None
         else:
-            row[col_name] = fetch_last(currentStep, art_tuple, udf_query)
+            row[col_name] = fetch_last(
+                target_art=art_tuple[0]["uri"],
+                target_udfs=udf_query,
+                include_current=True,
+            )
         rows.append(row)
 
     # Transform to dataframe
diff --git a/scripts/fetch_last_known_field.py b/scripts/fetch_last_known_field.py
new file mode 100644
index 00000000..536afd8b
--- /dev/null
+++ b/scripts/fetch_last_known_field.py
@@ -0,0 +1,121 @@
+#!/usr/bin/env python
+import logging
+from argparse import ArgumentParser
+from datetime import datetime as dt
+
+from genologics.config import BASEURI, PASSWORD, USERNAME
+from genologics.entities import Artifact, Process
+from genologics.lims import Lims
+
+from scilifelab_epps.utils import udf_tools
+from scilifelab_epps.wrapper import epp_decorator
+
+TIMESTAMP = dt.now().strftime("%y%m%d_%H%M%S")
+
+
+@epp_decorator(script_path=__file__, timestamp=TIMESTAMP)
+def main(args):
+    """This script will get the name of an artifact UDF from a master step field,
+    and for every sample artifact in the current step:
+
+    - Use API calls to recursively back-trace the sample history using
+      input-output links until it finds an artifact with the specified UDF
+    - Copy the value of the specified UDF from the found artifact to the
+      artifact of the current step
+
+    Example use-case:
+    - For Nanopore libraries in the Aggregate QC step of the Library Validation protocol,
+      fetch the last recorded artifact UDF "Size (bp)" from the library prep for all samples.
+    """
+    lims = Lims(BASEURI, USERNAME, PASSWORD)
+    process = Process(lims, id=args.pid)
+
+    # Get the name of the target UDF from the step field
+    target_udf = process.udf.get(args.step_udf, None)
+    assert target_udf is not None and target_udf != "None", (
+        f"No target UDF supplied from step field '{args.step_udf}'"
+    )
+
+    # Check whether process has output artifacts, not the case for e.g. QC steps
+    no_outputs: bool = udf_tools.get_art_tuples(process) == []
+
+    # Load input artifacts
+    arts_in: list[Artifact] = [
+        art for art in process.all_inputs() if art.type == "Analyte"
+    ]
+
+    # Find target output artifacts, if any
+    if no_outputs:
+        logging.info("Step has no output artifacts. Assigning to input artifacts.")
+    else:
+        art_tuples: list[tuple[dict, dict]] = process.input_output_maps
+        art_in2out: dict[str, Artifact] = {
+            i["uri"].id: o["uri"]
+            for i, o in art_tuples
+            if i["uri"].type == "Analyte" and o["uri"].type == "Analyte"
+        }
+
+    steps_used = []
+    for art_in in arts_in:
+        if no_outputs:
+            target_artifact = art_in
+        else:
+            target_artifact = art_in2out[art_in.id]
+        logging.info(
+            f"Looking for last recorded UDF '{target_udf}' of {'input' if no_outputs else 'output'} artifact '{target_artifact.name}'..."
+ ) + udf_value, traceback = udf_tools.fetch_last( + target_art=target_artifact, + target_udfs=target_udf, + log_traceback=True, + return_traceback=True, + on_fail=None, + ) + + steps_used.append( + f"'{traceback[-1]['Artifact']['Parent Step']['Name']}'" + + f" ({traceback[-1]['Artifact']['Parent Step']['ID']})" + ) + + if udf_value is not None: + target_artifact.udf[target_udf] = udf_value + target_artifact.put() + logging.info( + f"Updated UDF '{target_udf}' for {'input' if no_outputs else 'output'} artifact '{target_artifact.name}' to '{udf_value}'" + ) + else: + logging.warning( + f"Could not traceback UDF '{target_udf}' for {'input' if no_outputs else 'output'} artifact '{target_artifact.name}'" + ) + logging.info(f"Traceback:\n{traceback}") + + # Write to stdout for the green banner + print( + f"UDF '{target_udf}' pulled from steps: {' ,'.join(set(steps_used))}. Please double check the values." + ) + + +if __name__ == "__main__": + # Parse args + parser = ArgumentParser() + parser.add_argument( + "--pid", + required=True, + type=str, + help="Lims ID for current Process.", + ) + parser.add_argument( + "--log", + required=True, + type=str, + help="Which file slot to use for the script log.", + ) + parser.add_argument( + "--step_udf", + required=True, + type=str, + help="The name of the step UDF listing the target artifact UDF.", + ) + args = parser.parse_args() + + main(args) diff --git a/scripts/generate_aviti_run_manifest.py b/scripts/generate_aviti_run_manifest.py index 3226269c..1bc0b968 100644 --- a/scripts/generate_aviti_run_manifest.py +++ b/scripts/generate_aviti_run_manifest.py @@ -485,9 +485,13 @@ def main(args: Namespace): # Move manifest(s) logging.info("Moving run manifest to ngi-nas-ns...") try: + dst = f"/srv/ngi-nas-ns/samplesheets/Aviti/{dt.now().year}" + if not os.path.exists(dst): + logging.info(f"Happy new year! 
+            os.mkdir(dst)
         shutil.copyfile(
             zip_file,
-            f"/srv/ngi-nas-ns/samplesheets/Aviti/{dt.now().year}/{zip_file}",
+            f"{dst}/{zip_file}",
         )
         os.remove(zip_file)
     except:
diff --git a/scripts/log_udfs.py b/scripts/log_udfs.py
index e42d6af0..a8a202ed 100644
--- a/scripts/log_udfs.py
+++ b/scripts/log_udfs.py
@@ -37,7 +37,7 @@ def main(lims, args):
             file_str = None
 
     # Parse outputs and their UDFs
-    if udf_tools.no_outputs(currentStep):
+    if udf_tools.get_art_tuples(currentStep) == []:
         arts = [art for art in currentStep.all_inputs() if art.type == "Analyte"]
     else:
         arts = [art for art in currentStep.all_outputs() if art.type == "Analyte"]
diff --git a/scripts/ont_calc_volumes.py b/scripts/ont_calc_volumes.py
index 9cbd7519..7e4e069d 100644
--- a/scripts/ont_calc_volumes.py
+++ b/scripts/ont_calc_volumes.py
@@ -45,7 +45,11 @@ def main(lims, args):
 
         # Get last known length
         size_bp, size_bp_history = udf_tools.fetch_last(
-            currentStep, art_tuple, "Size (bp)", on_fail=None, print_history=True
+            target_art=art_out,
+            target_udfs="Size (bp)",
+            log_traceback=True,
+            return_traceback=True,
+            on_fail=None,
         )
         log.append(f"'Size (bp)': {size_bp}\n{size_bp_history}")
 
diff --git a/scripts/ont_update_amount.py b/scripts/ont_update_amount.py
index f3c6fbfc..37e430af 100644
--- a/scripts/ont_update_amount.py
+++ b/scripts/ont_update_amount.py
@@ -44,10 +44,10 @@ def main(lims, args):
             or "ONT Barcoding" in currentStep.type.name
         ):
             size_bp, size_bp_history = udf_tools.fetch_last(
-                currentStep=currentStep,
-                art_tuple=art_tuple,
+                target_art=art_out,
                 target_udfs="Size (bp)",
-                print_history=True,
+                log_traceback=True,
+                return_traceback=True,
                 on_fail=None,
             )
             log.append(f"'Size (bp)': {size_bp}\n{size_bp_history}")
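
Usage note (illustrative sketch, not part of the diff): the snippet below shows how a caller might invoke the reworked udf_tools.fetch_last() with its new keyword arguments. The step ID and target UDF name are hypothetical placeholders; only the fetch_last() signature and return behavior are taken from the diff above.

#!/usr/bin/env python
# Minimal sketch, assuming a reachable LIMS and valid credentials in genologics.config.
from genologics.config import BASEURI, PASSWORD, USERNAME
from genologics.entities import Process
from genologics.lims import Lims

from scilifelab_epps.utils import udf_tools

lims = Lims(BASEURI, USERNAME, PASSWORD)
process = Process(lims, id="24-000000")  # hypothetical step ID

for art_out in [a for a in process.all_outputs() if a.type == "Analyte"]:
    # Trace the artifact's history back through input-output links and fetch the
    # last recorded "Size (bp)"; on failure, fall back to None instead of raising.
    size_bp, traceback = udf_tools.fetch_last(
        target_art=art_out,
        target_udfs="Size (bp)",
        include_current=False,
        return_traceback=True,
        on_fail=None,
    )
    print(art_out.name, size_bp)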