diff --git a/scripts/generate_aviti_run_manifest.py b/scripts/generate_aviti_run_manifest.py
index c644d4ee..a9a69dde 100644
--- a/scripts/generate_aviti_run_manifest.py
+++ b/scripts/generate_aviti_run_manifest.py
@@ -119,6 +119,7 @@ def idxs_from_label(label: str) -> list[str | tuple[str, str]]:
 
 
 def get_flowcell_id(process: Process) -> str:
+    """Get the Element flowcell ID from the process."""
     flowcell_ids = [
         op.container.name for op in process.all_outputs() if op.type == "Analyte"
     ]
@@ -134,7 +135,7 @@ def get_flowcell_id(process: Process) -> str:
     return flowcell_id
 
 
-def get_runValues_section(process: Process, file_name: str) -> str:
+def get_runValues_section(process: Process, manifest_name: str) -> str:
     """Generate the [RUNVALUES] section of the AVITI run manifest and return it as a string."""
 
     read_recipe = "-".join(
@@ -151,7 +152,7 @@ def get_runValues_section(process: Process, file_name: str) -> str:
             "[RUNVALUES]",
             "KeyName, Value",
             f"lims_step_name, {sanitize(process.type.name)}",
-            f"file_name, {sanitize(file_name)}",
+            f"manifest_name, {sanitize(manifest_name)}",
             f"read_recipe, {read_recipe}",
         ]
     )
@@ -171,41 +172,42 @@ def get_settings_section() -> str:
     return settings_section
 
 
-def get_samples_section(process: Process) -> str:
-    """Generate the [SAMPLES] section of the AVITI run manifest and return it as a string."""
+def get_samples_dfs(process: Process) -> list[pd.DataFrame]:
+    """Generate one dataframe per group of samples sharing the same index lengths, adding PhiX controls as needed."""
 
     # Assert output analytes loaded on flowcell
     arts_out = [op for op in process.all_outputs() if op.type == "Analyte"]
     assert (
         len(arts_out) == 1 or len(arts_out) == 2
     ), "Expected one or two output analytes."
+
     lanes = [art_out.location[1].split(":")[0] for art_out in arts_out]
     assert set(lanes) == {"1"} or set(lanes) == {
         "1",
         "2",
     }, "Expected a single-lane or dual-lane flowcell."
 
-    # Iterate over pools
-    all_rows = []
+    # Iterate over pool / lane
+    sample_rows = []
     for art_out, lane in zip(arts_out, lanes):
-        lane_rows = []
-        assert (
-            "AVITI Flow Cell" in art_out.container.type.name
-        ), f"Unsupported container type {art_out.container.type.name}."
-        assert (
-            len(art_out.samples) > 1 and len(art_out.reagent_labels) > 1
-        ), "Not a pool."
-        assert len(art_out.samples) == len(
-            art_out.reagent_labels
-        ), "Unequal number of samples and reagent labels."
-
+        # Get sample-label linkage via database
         sample2label: dict[str, str] = get_pool_sample_label_mapping(art_out)
         assert len(set(art_out.reagent_labels)) == len(
             art_out.reagent_labels
         ), "Detected non-unique reagent labels."
 
+        # Record PhiX UDFs for each output artifact
+        phix_loaded: bool = art_out.udf["% phiX"] != 0
+        phix_set_name = art_out.udf.get("Element PhiX Set", None)
+        if phix_loaded:
+            assert (
+                phix_set_name is not None
+            ), "PhiX controls loaded but no kit specified."
+        else:
+            assert phix_set_name is None, "PhiX controls specified but not loaded."
+
+        # Collect rows for each sample
         samples = art_out.samples
-        # Iterate over samples
         for sample in samples:
             # Project name and sequencing setup
             if sample.project:
@@ -228,41 +230,65 @@ def get_samples_section(process: Process) -> str:
             row["Lane"] = lane
             row["Project"] = project
             row["Recipe"] = seq_setup
+            row["phix_loaded"] = phix_loaded
+            row["phix_set_name"] = phix_set_name
 
-            lane_rows.append(row)
+            sample_rows.append(row)
 
-        # Add PhiX controls if added:
-        phix_loaded: bool = art_out.udf["% phiX"] != 0
-        phix_set_name = art_out.udf.get("Element PhiX Set", None)
+    # Get master dataframe
+    df_samples = pd.DataFrame(sample_rows)
 
-        if phix_loaded:
-            assert (
-                phix_set_name is not None
-            ), "PhiX controls loaded but no kit specified."
+    # Calculate index lengths for grouping
+    df_samples["len_idx1"] = df_samples["Index1"].apply(len)
+    df_samples["len_idx2"] = df_samples["Index2"].apply(len)
 
+    # Group into composite dataframes and add PhiX controls with correct length
+    dfs_samples_and_controls = []
+    for (len_idx1, len_idx2), group in df_samples.groupby(["len_idx1", "len_idx2"]):
+        # Add PhiX if needed
+        if group["phix_loaded"].any():
+            phix_set_name = group["phix_set_name"].iloc[0]
             phix_set = PHIX_SETS[phix_set_name]
 
             for phix_idx_pair in phix_set["indices"]:
                 row = {}
                 row["SampleName"] = phix_set["nickname"]
-                row["Index1"] = phix_idx_pair[0]
-                row["Index2"] = phix_idx_pair[1]
-                row["Lane"] = lane
+                row["Index1"] = fit_seq(phix_idx_pair[0], len_idx1)
+                row["Index2"] = fit_seq(phix_idx_pair[1], len_idx2)
+                row["Lane"] = group["Lane"].iloc[0]
                 row["Project"] = "Control"
                 row["Recipe"] = "0-0"
 
-                lane_rows.append(row)
-        else:
-            assert phix_set is None, "PhiX controls specified but not loaded."
-        # Check for index collision within lane, across samples and PhiX
-        check_distances(lane_rows)
-        all_rows.extend(lane_rows)
+                # Add PhiX row to group
+                group = pd.concat([group, pd.DataFrame([row])], ignore_index=True)
+
+        dfs_samples_and_controls.append(group)
 
-    df = pd.DataFrame(all_rows)
+    df_samples_and_controls = pd.concat(dfs_samples_and_controls, ignore_index=True)
 
-    samples_section = f"[SAMPLES]\n{df.to_csv(index=None, header=True)}"
+    # Check for index collision per lane, across samples and PhiX
+    for lane, group in df_samples_and_controls.groupby("Lane"):
+        rows_to_check = group.to_dict(orient="records")
+        check_distances(rows_to_check)
 
-    return samples_section
+    return dfs_samples_and_controls
+
+
+def fit_seq(seq: str, length: int, extend: str | None = None) -> str:
+    """Fit a sequence to a given length by extending or truncating."""
+    if len(seq) == length:
+        return seq
+    elif len(seq) > length:
+        return seq[:length]
+    else:
+        if extend is None:
+            raise AssertionError("Can't extend sequence without extension string.")
+        else:
+            if length - len(seq) > len(extend):
+                raise AssertionError(
+                    "Extension string too short to fit sequence to desired length."
+                )
+            return seq + extend[: length - len(seq)]
 
 
 def check_distances(rows: list[dict], dist_warning_threshold=3) -> None:
@@ -374,18 +400,15 @@ def main(args: Namespace):
     lims = Lims(BASEURI, USERNAME, PASSWORD)
     process = Process(lims, id=args.pid)
 
-    # Name manifest file
+    # Name manifest
    flowcell_id = get_flowcell_id(process)
-    file_name = f"AVITI_run_manifest_{flowcell_id}_{process.id}_{TIMESTAMP}_{process.technician.name.replace(' ','')}.csv"
+    manifest_name = f"AVITI_run_manifest_{flowcell_id}_{process.id}_{TIMESTAMP}_{process.technician.name.replace(' ','')}"
 
-    # Build manifest
+    samples_dfs = get_samples_dfs(process)
 
-    runValues_section = get_runValues_section(process, file_name)
-    settings_section = get_settings_section()
-    samples_section = get_samples_section(process)
-
-    manifest = "\n\n".join([runValues_section, settings_section, samples_section])
+    # TODO zip
 
+    """
     # Write manifest
     with open(file_name, "w") as f:
         f.write(manifest)
@@ -410,6 +433,7 @@ def main(args: Namespace):
        logging.error("Failed to move run manifest to ngi-nas-ns.", exc_info=True)
    else:
        logging.info("Run manifest moved to ngi-nas-ns.")
+    """
 
 
 if __name__ == "__main__":
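
Reviewer note: below is a minimal, standalone sketch of how the new fit_seq helper is intended to behave when PhiX index sequences are fitted to a sample group's index lengths. It re-implements the helper for illustration only; the index strings and the extension string are made-up placeholders, not values from PHIX_SETS.

def fit_seq(seq: str, length: int, extend: str | None = None) -> str:
    """Fit a sequence to a given length by extending or truncating."""
    if len(seq) == length:
        return seq
    elif len(seq) > length:
        # Longer than requested: truncate to the target length
        return seq[:length]
    else:
        # Shorter than requested: pad with the extension string, if one is given
        if extend is None:
            raise AssertionError("Can't extend sequence without extension string.")
        if length - len(seq) > len(extend):
            raise AssertionError(
                "Extension string too short to fit sequence to desired length."
            )
        return seq + extend[: length - len(seq)]


if __name__ == "__main__":
    # A 10 bp PhiX-style index truncated to match an 8 bp sample index length
    assert fit_seq("ACGTACGTAC", 8) == "ACGTACGT"
    # The same index padded to 12 bp with a hypothetical extension string
    assert fit_seq("ACGTACGTAC", 12, extend="ATGC") == "ACGTACGTACAT"
    # Extending without an extension string is rejected
    try:
        fit_seq("ACGTACGTAC", 12)
    except AssertionError:
        pass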