Skip to content

Commit

Permalink
big wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kedhammar committed Sep 12, 2024
1 parent 221803a commit fc9f6d1
Showing 1 changed file with 70 additions and 46 deletions.
116 changes: 70 additions & 46 deletions scripts/generate_aviti_run_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def idxs_from_label(label: str) -> list[str | tuple[str, str]]:


def get_flowcell_id(process: Process) -> str:
"""Get the Element flowcell ID from the process."""
flowcell_ids = [
op.container.name for op in process.all_outputs() if op.type == "Analyte"
]
Expand All @@ -134,7 +135,7 @@ def get_flowcell_id(process: Process) -> str:
return flowcell_id


def get_runValues_section(process: Process, file_name: str) -> str:
def get_runValues_section(process: Process, manifest_name: str) -> str:
"""Generate the [RUNVALUES] section of the AVITI run manifest and return it as a string."""

read_recipe = "-".join(
Expand All @@ -151,7 +152,7 @@ def get_runValues_section(process: Process, file_name: str) -> str:
"[RUNVALUES]",
"KeyName, Value",
f"lims_step_name, {sanitize(process.type.name)}",
f"file_name, {sanitize(file_name)}",
f"manifest_name, {sanitize(manifest_name)}",
f"read_recipe, {read_recipe}",
]
)
Expand All @@ -171,41 +172,42 @@ def get_settings_section() -> str:
return settings_section


def get_samples_section(process: Process) -> str:
"""Generate the [SAMPLES] section of the AVITI run manifest and return it as a string."""
def get_samples_dfs(process: Process) -> list[pd.DataFrame]:
"""Generate dataframes of samples with the same index duplicity and length, adding PhiX controls as needed."""

# Assert output analytes loaded on flowcell
arts_out = [op for op in process.all_outputs() if op.type == "Analyte"]
assert (
len(arts_out) == 1 or len(arts_out) == 2
), "Expected one or two output analytes."

lanes = [art_out.location[1].split(":")[0] for art_out in arts_out]
assert set(lanes) == {"1"} or set(lanes) == {
"1",
"2",
}, "Expected a single-lane or dual-lane flowcell."

# Iterate over pools
all_rows = []
# Iterate over pool / lane
sample_rows = []
for art_out, lane in zip(arts_out, lanes):
lane_rows = []
assert (
"AVITI Flow Cell" in art_out.container.type.name
), f"Unsupported container type {art_out.container.type.name}."
assert (
len(art_out.samples) > 1 and len(art_out.reagent_labels) > 1
), "Not a pool."
assert len(art_out.samples) == len(
art_out.reagent_labels
), "Unequal number of samples and reagent labels."

# Get sample-label linkage via database
sample2label: dict[str, str] = get_pool_sample_label_mapping(art_out)
assert len(set(art_out.reagent_labels)) == len(
art_out.reagent_labels
), "Detected non-unique reagent labels."

# Record PhiX UDFs for each output artifact
phix_loaded: bool = art_out.udf["% phiX"] != 0
phix_set_name = art_out.udf.get("Element PhiX Set", None)
if phix_loaded:
assert (
phix_set_name is not None
), "PhiX controls loaded but no kit specified."
else:
assert phix_set_name is None, "PhiX controls specified but not loaded."

# Collect rows for each sample
samples = art_out.samples
# Iterate over samples
for sample in samples:
# Project name and sequencing setup
if sample.project:
Expand All @@ -228,41 +230,65 @@ def get_samples_section(process: Process) -> str:
row["Lane"] = lane
row["Project"] = project
row["Recipe"] = seq_setup
row["phix_loaded"] = phix_loaded
row["phix_set_name"] = phix_set_name

lane_rows.append(row)
sample_rows.append(row)

# Add PhiX controls if added:
phix_loaded: bool = art_out.udf["% phiX"] != 0
phix_set_name = art_out.udf.get("Element PhiX Set", None)
# Get master dataframe
df_samples = pd.DataFrame(sample_rows)

if phix_loaded:
assert (
phix_set_name is not None
), "PhiX controls loaded but no kit specified."
# Calculate index lengths for grouping
df_samples["len_idx1"] = df_samples["Index1"].apply(len)
df_samples["len_idx2"] = df_samples["Index2"].apply(len)

# Group into composite dataframes and add PhiX controls w. correct length
dfs_samples_and_controls = []
for (len_idx1, len_idx2), group in df_samples.groupby(["len_idx1", "len_idx2"]):
# Add PhiX if needed
if group["phix_loaded"].any():
phix_set_name = group["phix_set_name"].iloc[0]
phix_set = PHIX_SETS[phix_set_name]

for phix_idx_pair in phix_set["indices"]:
row = {}
row["SampleName"] = phix_set["nickname"]
row["Index1"] = phix_idx_pair[0]
row["Index2"] = phix_idx_pair[1]
row["Lane"] = lane
row["Index1"] = fit_seq(phix_idx_pair[0], len_idx1)
row["Index2"] = fit_seq(phix_idx_pair[1], len_idx2)
row["Lane"] = group["Lane"].iloc[0]
row["Project"] = "Control"
row["Recipe"] = "0-0"
lane_rows.append(row)
else:
assert phix_set is None, "PhiX controls specified but not loaded."

# Check for index collision within lane, across samples and PhiX
check_distances(lane_rows)
all_rows.extend(lane_rows)
# Add PhiX row to group
group = pd.concat([group, pd.DataFrame([row])], ignore_index=True)

dfs_samples_and_controls.append(group)

df = pd.DataFrame(all_rows)
df_samples_and_controls = pd.concat(dfs_samples_and_controls, ignore_index=True)

samples_section = f"[SAMPLES]\n{df.to_csv(index=None, header=True)}"
# Check for index collision per lane, across samples and PhiX
for lane, group in df_samples_and_controls.groupby("Lane"):
rows_to_check = group.to_dict(orient="records")
check_distances(rows_to_check)

return samples_section
#


def fit_seq(seq: str, length: int, extend: str | None = None) -> str:
    """Fit a sequence to a given length by truncating or extending it.

    Args:
        seq: Sequence to fit.
        length: Desired length of the returned sequence; must not be negative.
        extend: Characters to append, taken from its start, when ``seq`` is
            shorter than ``length``.

    Returns:
        ``seq`` unchanged if it already has the desired length, its first
        ``length`` characters if it is longer, otherwise ``seq`` followed by
        as many leading characters of ``extend`` as are needed.

    Raises:
        AssertionError: If ``length`` is negative, or if ``seq`` is too short
            and ``extend`` is missing or too short to make up the difference.
    """
    if length < 0:
        # A negative length would make the slice below count from the end of
        # the sequence and silently return a wrongly sized result.
        raise AssertionError("Can't fit sequence to a negative length.")

    missing = length - len(seq)
    if missing <= 0:
        # Already the right length (slice is a no-op) or too long (truncate).
        return seq[:length]

    if extend is None:
        raise AssertionError("Can't extend sequence without extension string.")
    if missing > len(extend):
        raise AssertionError(
            "Extension string too short to fit sequence to desired length."
        )
    return seq + extend[:missing]


def check_distances(rows: list[dict], dist_warning_threshold=3) -> None:
Expand Down Expand Up @@ -374,18 +400,15 @@ def main(args: Namespace):
lims = Lims(BASEURI, USERNAME, PASSWORD)
process = Process(lims, id=args.pid)

# Name manifest file
# Name manifest
flowcell_id = get_flowcell_id(process)
file_name = f"AVITI_run_manifest_{flowcell_id}_{process.id}_{TIMESTAMP}_{process.technician.name.replace(' ','')}.csv"
manifest_name = f"AVITI_run_manifest_{flowcell_id}_{process.id}_{TIMESTAMP}_{process.technician.name.replace(' ','')}"

# Build manifest
samples_dfs = get_samples_dfs(process)

runValues_section = get_runValues_section(process, file_name)
settings_section = get_settings_section()
samples_section = get_samples_section(process)

manifest = "\n\n".join([runValues_section, settings_section, samples_section])
# TODO zip

"""
# Write manifest
with open(file_name, "w") as f:
f.write(manifest)
Expand All @@ -410,6 +433,7 @@ def main(args: Namespace):
logging.error("Failed to move run manifest to ngi-nas-ns.", exc_info=True)
else:
logging.info("Run manifest moved to ngi-nas-ns.")
"""


if __name__ == "__main__":
Expand Down

0 comments on commit fc9f6d1

Please sign in to comment.