From af733ad429baf64c8ab03e1ba3a72ad6cbdf4518 Mon Sep 17 00:00:00 2001 From: Kai Zhang Date: Fri, 27 Sep 2024 21:51:26 +0800 Subject: [PATCH] upgrade seqspec to 0.3.0 --- precellar/Cargo.toml | 3 +- precellar/src/align.rs | 197 +++++------ precellar/src/seqspec.rs | 699 +++++++++++++++++++++++---------------- python/src/lib.rs | 6 +- 4 files changed, 511 insertions(+), 394 deletions(-) diff --git a/precellar/Cargo.toml b/precellar/Cargo.toml index 3b4e778..4424351 100644 --- a/precellar/Cargo.toml +++ b/precellar/Cargo.toml @@ -20,7 +20,8 @@ multi_reader = "0.1" noodles = { version = "0.80", features = ["core", "fastq", "bam", "sam", "async"] } kdam = "0.5.2" rayon = "1.10" +smallvec = "1.13" serde = "1.0" +serde_yaml = "0.9" regex = "1.6" -yaml-rust = "0.4" zstd = { version = "0.13", features = ["zstdmt"] } \ No newline at end of file diff --git a/precellar/src/align.rs b/precellar/src/align.rs index 81ef1af..46b79ab 100644 --- a/precellar/src/align.rs +++ b/precellar/src/align.rs @@ -1,13 +1,15 @@ use crate::barcode::{BarcodeCorrector, Whitelist}; -use crate::seqspec::{Modality, RegionType, SeqSpec}; +use crate::seqspec::{Assay, Modality, Read, Region, RegionType, SequenceType}; use crate::qc::{AlignQC, Metrics}; use bstr::BString; use kdam::{tqdm, BarExt}; +use std::ops::Range; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::collections::{HashMap, HashSet}; use std::io::BufRead; +use smallvec::SmallVec; use anyhow::{bail, Result}; use noodles::{bam, sam, fastq}; use noodles::sam::alignment::{ @@ -51,9 +53,9 @@ impl Alinger for BurrowsWheelerAligner { pub struct FastqProcessor { base_dir: PathBuf, - seqspec: SeqSpec, + assay: Assay, aligner: A, - current_modality: Option, + current_modality: Option, mito_dna: HashSet, metrics: HashMap, align_qc: HashMap>>, @@ -63,9 +65,9 @@ pub struct FastqProcessor { } impl FastqProcessor { - pub fn new(seqspec: SeqSpec, aligner: A) -> Self { + pub fn new(assay: Assay, aligner: A) -> Self { Self { - seqspec, aligner, current_modality: None, metrics: HashMap::new(), + assay, aligner, current_modality: None, metrics: HashMap::new(), align_qc: HashMap::new(), mito_dna: HashSet::new(), base_dir: PathBuf::from("./"), barcode_correct_prob: 0.975, } @@ -81,8 +83,8 @@ impl FastqProcessor { self } - pub fn modality(&self) -> &str { - self.current_modality.as_ref().expect("modality not set, please call set_modality first") + pub fn modality(&self) -> Modality { + self.current_modality.expect("modality not set, please call set_modality first") } pub fn add_mito_dna(&mut self, mito_dna: &str) { @@ -90,14 +92,14 @@ impl FastqProcessor { .map(|x| self.mito_dna.insert(x)); } - pub fn with_modality(mut self, modality: &str) -> Self { + pub fn with_modality(mut self, modality: Modality) -> Self { self.current_modality = Some(modality.into()); self } pub fn get_report(&self) -> Metrics { - let mut metrics = self.metrics.get(self.modality()).map_or(Metrics::default(), |x| x.clone()); - if let Some(align_qc) = self.align_qc.get(self.modality()) { + let mut metrics = self.metrics.get(&self.modality()).map_or(Metrics::default(), |x| x.clone()); + if let Some(align_qc) = self.align_qc.get(&self.modality()) { align_qc.lock().unwrap().report(&mut metrics); } metrics @@ -116,13 +118,13 @@ impl FastqProcessor { let corrector = BarcodeCorrector::default().with_bc_confidence_threshold(self.barcode_correct_prob); let header = self.aligner.header(); self.align_qc.insert( - self.modality().into(), + self.modality(), Arc::new(Mutex::new(AlignQC { mito_dna: self.mito_dna.clone(), ..AlignQC::default() })) ); - let align_qc = self.align_qc.get(self.modality()).unwrap().clone(); + let align_qc = self.align_qc.get(&self.modality()).unwrap().clone(); let mut progress_bar = tqdm!(total = whitelist.total_count); fq_records.chunk(self.aligner.chunk_size()).map(move |data| if is_paired { @@ -156,7 +158,7 @@ impl FastqProcessor { } (ali1_, ali2_) }).collect::>(); - progress_bar.update(results.len()); + progress_bar.update(results.len()).unwrap(); Either::Right(results) } else { let (barcodes, mut reads): (Vec<_>, Vec<_>) = data.into_iter() @@ -179,7 +181,7 @@ impl FastqProcessor { { align_qc.lock().unwrap().update(&ali, &header); } ali }).collect::>(); - progress_bar.update(results.len()); + progress_bar.update(results.len()).unwrap(); Either::Left(results) }) } @@ -202,111 +204,88 @@ impl FastqProcessor { }) } - pub fn gen_raw_fastq_records(&self) -> FastqRecords { + pub fn gen_raw_fastq_records(&self) -> FastqRecords { let modality = self.modality(); - let fq_list: HashMap<_, _> = self.seqspec.modality(modality).unwrap() - .iter_regions() - .filter(|region| region.region_type == RegionType::Fastq && - region.iter_regions().any(|r| - r.region_type == RegionType::GDNA || - r.region_type == RegionType::CDNA || - r.region_type == RegionType::Barcode || - r.region_type == RegionType::UMI - ) - ).map(|fq| (&fq.id, fq)).collect(); - let mut read_list = HashMap::new(); - self.seqspec.sequence_spec.get(modality).unwrap().iter() - .for_each(|read| if fq_list.contains_key(&read.primer_id) { - read_list.insert(&read.primer_id, read); - }); - let data = read_list.into_iter().map(|(id, read)| { - let is_reverse = read.is_reverse(); - let fq = fq_list.get(id).unwrap(); - let regions = if is_reverse { - fq.subregion_range_rev() - } else { - fq.subregion_range() - }; - (fq.id.clone(), is_reverse, regions, read.read_fastq(self.base_dir.clone())) - }); + let data = self.assay.get_index_of(modality) + .map(|(read, regions)| (read, regions, read.read_fastq(self.base_dir.clone()))); FastqRecords::new(data) } fn count_barcodes(&mut self) -> Result { let modality = self.modality(); let mut whitelist = self.get_whitelist()?; - let region_with_barcode = self.seqspec.modality(self.modality()).unwrap() - .iter_regions().find(|r| - r.region_type == RegionType::Fastq && r.iter_regions().any(|x| x.region_type == RegionType::Barcode) - ).unwrap(); - - let sequence_read = self.seqspec.get_read_by_primer_id(modality, ®ion_with_barcode.id).unwrap(); - let subregions = if sequence_read.is_reverse() { - region_with_barcode.subregion_range_rev() - } else { - region_with_barcode.subregion_range() - }; - let range = subregions.into_iter().find(|x| x.0 == RegionType::Barcode).unwrap().1; - - sequence_read.read_fastq(&self.base_dir).records().for_each(|record| { + + let (read, index) = self.assay.get_index_of(modality).into_iter() + .find(|(_, index)| index.into_iter().any(|x| x.0.region_type.is_barcode())).unwrap(); + let range = index.into_iter().find(|x| x.0.region_type.is_barcode()).unwrap().1; + + read.read_fastq(&self.base_dir).records().for_each(|record| { let mut record = record.unwrap(); - let n = record.sequence().len(); - record = slice_fastq_record(&record, range.start, range.end.unwrap_or(n)); - if sequence_read.is_reverse() { + record = slice_fastq_record(&record, range.start as usize, range.end as usize); + if read.is_reverse() { record = rev_compl_fastq_record(record); } whitelist.count_barcode(std::str::from_utf8(record.sequence()).unwrap(), record.quality_scores()); }); - self.metrics.entry(modality.to_string()).or_insert_with(Metrics::default) + self.metrics.entry(modality).or_insert_with(Metrics::default) .insert("frac_q30_bases_barcode".to_string(), whitelist.frac_q30_bases()); Ok(whitelist) } fn get_whitelist(&self) -> Result { - let regions: Vec<_> = self.seqspec.modality(self.modality()).unwrap() - .iter_regions().filter(|r| r.region_type == RegionType::Barcode).collect(); + let regions: Vec<_> = self.assay.get_region_by_modality(self.modality()).unwrap() + .iter_regions().filter(|r| r.region_type.is_barcode()).collect(); if regions.len() != 1 { bail!("Expecting exactly one barcode region, found {}", regions.len()); } let region = regions[0]; - if region.sequence_type.as_str() == "onlist" { - Ok(Whitelist::new(region.sequence_type.fetch_onlist()?)) + if region.sequence_type == SequenceType::Onlist { + Ok(Whitelist::new(region.onlist.as_ref().unwrap().read()?)) } else { Ok(Whitelist::empty()) } } } -pub struct FastqRecords { - ids: Vec, - is_reverse: Vec, - subregions: Vec>, - readers: Vec>, +pub struct FastqRecords(Vec>); + +struct FastqRecord { + id: String, + is_reverse: bool, + subregion: Vec<(RegionType, Range)>, + reader: fastq::Reader, + min_len: usize, + max_len: usize, } pub type Barcode = fastq::Record; pub type UMI = fastq::Record; impl FastqRecords { - fn new(iter: I) -> Self + fn new<'a, I>(iter: I) -> Self where - I: Iterator, fastq::Reader)>, + I: Iterator)>, fastq::Reader)>, { - let mut ids = Vec::new(); - let mut is_reverse = Vec::new(); - let mut subregions = Vec::new(); - let mut readers = Vec::new(); - iter.for_each(|(f, s, sr, r)| { - ids.push(f); - is_reverse.push(s); - subregions.push(sr.into_iter().filter(|x| - x.0 == RegionType::Barcode || x.0 == RegionType::CDNA || x.0 == RegionType::GDNA - ).collect()); - readers.push(r); - }); - Self { ids, is_reverse, subregions, readers } + let records = iter.map(|(read, regions, reader)| + FastqRecord { + id: read.id().to_string(), + is_reverse: read.is_reverse(), + subregion: regions.into_iter().filter_map(|x| { + let region_type = x.0.region_type; + if region_type.is_barcode() || region_type.is_dna() { + Some((region_type, x.1)) + } else { + None + } + }).collect(), + reader, + min_len: read.min_len as usize, + max_len: read.max_len as usize, + } + ).collect(); + Self(records) } fn chunk(self, chunk_size: usize) -> FastqRecordChunk { @@ -316,17 +295,12 @@ impl FastqRecords { fn is_paired_end(&self) -> bool { let mut read1 = false; let mut read2 = false; - self.subregions.iter().enumerate().for_each(|(i, sr)| { - sr.iter().for_each(|(region_type, _)| { - match region_type { - RegionType::CDNA | RegionType::GDNA => { - if self.is_reverse[i] { - read1 = true; - } else { - read2 = true; - } - }, - _ => (), + self.0.iter().for_each(|x| { + x.subregion.iter().for_each(|(region_type, _)| if region_type.is_dna() { + if x.is_reverse { + read1 = true; + } else { + read2 = true; } }); }); @@ -339,16 +313,19 @@ impl Iterator for FastqRecords { fn next(&mut self) -> Option { let mut id_without_record = Vec::new(); - let records = self.readers.iter_mut().enumerate().map(|(i, reader)| { + let records: SmallVec<[_; 4]> = self.0.iter_mut().map(|x| { let mut record = fastq::Record::default(); - let s = reader.read_record(&mut record).expect("error reading fastq record"); - if s == 0 { - id_without_record.push(self.ids[i].as_str()); + if x.reader.read_record(&mut record).expect("error reading fastq record") == 0 { + id_without_record.push(x.id.as_str()); None } else { + let n = record.sequence().len(); + if n < x.min_len || n > x.max_len { + panic!("Read length ({}) out of range: {}-{}", n, x.min_len, x.max_len); + } Some(record) } - }).collect::>(); + }).collect(); if id_without_record.len() == records.len() { return None; } else if id_without_record.len() > 0 { @@ -360,21 +337,21 @@ impl Iterator for FastqRecords { let mut read2 = None; records.iter().enumerate().for_each(|(i, r)| { let record = r.as_ref().unwrap(); - self.subregions[i].iter().for_each(|(region_type, range)| { - let fq = slice_fastq_record(record, range.start, range.end.unwrap_or(record.sequence().len())); - let is_reverse = self.is_reverse[i]; - match region_type { - RegionType::Barcode => if is_reverse { + self.0[i].subregion.iter().for_each(|(region_type, range)| { + let fq = slice_fastq_record(record, range.start as usize, range.end as usize); + let is_reverse = self.0[i].is_reverse; + if region_type.is_barcode() { + if is_reverse { barcode = Some(rev_compl_fastq_record(fq)); } else { barcode = Some(fq); - }, - RegionType::CDNA | RegionType::GDNA => if is_reverse { + } + } else if region_type.is_dna() { + if is_reverse { read1 = Some(fq); } else { read2 = Some(fq); - }, - _ => (), + } } }); }); @@ -500,17 +477,17 @@ mod tests { #[test] fn test_seqspec_io() { - let spec = SeqSpec::from_path("tests/data/spec.yaml").unwrap(); + let spec = Assay::from_path("tests/data/spec.yaml").unwrap(); let aligner = BurrowsWheelerAligner::new( FMIndex::read("tests/data/hg38").unwrap(), AlignerOpts::default(), PairedEndStats::default() ); let mut processor = FastqProcessor::new(spec, aligner) - .with_modality("atac"); + .with_modality(Modality::Atac); processor.gen_barcoded_alignments().take(6).for_each(|x| { - () + println!("{:?}", x); }); println!("{}", processor.get_report()); diff --git a/precellar/src/seqspec.rs b/precellar/src/seqspec.rs index 5cb834c..3e84d89 100644 --- a/precellar/src/seqspec.rs +++ b/precellar/src/seqspec.rs @@ -1,348 +1,483 @@ -use anyhow::{bail, Context, Result}; +use log::warn; +use serde::{Deserialize, Deserializer}; +use serde_yaml::{self, Value}; +use std::{fs, ops::Range, str::FromStr}; +use anyhow::{bail, Result}; use noodles::fastq; -use std::{collections::HashMap, io::{BufRead, BufReader}, path::{Path, PathBuf}}; -use yaml_rust::{Yaml, YamlLoader}; +use std::{io::{BufRead, BufReader}, path::{Path, PathBuf}}; use cached_path::Cache; -use itertools::Itertools; use crate::io::open_file_for_read; -pub type Modality = String; - -#[derive(Debug)] -pub struct SeqSpec { - pub version: String, - pub id: String, +#[derive(Deserialize, Debug)] +pub struct Assay { + pub seqspec_version: String, + pub assay_id: String, pub name: String, - pub doi: Option, - pub date: Option, - pub description: Option, - pub lib_struct: Option, - pub library_protocol: Option, - pub library_kit: Option, - pub sequence_protocol: String, - pub sequence_kit: Option, - pub library_spec: HashMap, - pub sequence_spec: HashMap>, + pub doi: String, + pub date: String, + pub description: String, + pub modalities: Vec, + pub lib_struct: String, + pub library_protocol: LibraryProtocol, + pub library_kit: LibraryKit, + pub sequence_protocol: SequenceProtocol, + pub sequence_kit: SequenceKit, + pub sequence_spec: Option>, + pub library_spec: Option>, } -impl TryFrom<&Yaml> for SeqSpec { - type Error = anyhow::Error; - - fn try_from(yaml: &Yaml) -> Result { - let version = yaml["seqspec_version"].as_str().unwrap().to_string(); - let id = yaml["assay_id"].as_str().unwrap().to_string(); - let name = yaml["name"].as_str().unwrap().to_string(); - let doi = yaml["doi"].as_str().map(|x| x.to_string()); - let date = yaml["date"].as_str().map(|x| x.to_string()); - let description = yaml["description"].as_str().map(|x| x.to_string()); - let lib_struct = yaml["lib_struct"].as_str().map(|x| x.to_string()); - let library_protocol = yaml["library_protocol"].as_str().map(|x| x.to_string()); - let library_kit = yaml["library_kit"].as_str().map(|x| x.to_string()); - let sequence_protocol = yaml["sequence_protocol"].as_str().unwrap().to_string(); - let sequence_kit = yaml["sequence_kit"].as_str().map(|x| x.to_string()); - let library_spec = yaml["library_spec"].clone().into_iter().map(|region| - (region["region_type"].as_str().unwrap().to_string(), Region::try_from(region.clone()).unwrap()) - ).collect(); - let sequence_spec = yaml["sequence_spec"].clone().into_iter() - .map(|read| (read["modality"].as_str().unwrap().to_string(), Read::try_from(&read).unwrap())) - .sorted_by(|a, b| a.0.cmp(&b.0)) - .chunk_by(|x| x.0.clone()) - .into_iter() - .map(|(k, g)| (k, g.map(|x| x.1).collect())) - .collect(); - Ok(Self { - version, id, name, doi, date, description, lib_struct, library_protocol, - library_kit, sequence_protocol, sequence_kit, library_spec, sequence_spec, +impl Assay { + pub fn from_path>(path: P) -> Result { + let yaml_str = fs::read_to_string(path)?; + Ok(serde_yaml::from_str(&yaml_str)?) + } + + /// Get the index of atomic regions of each read in the sequence spec. + pub fn get_index_of(&self, modality: Modality) -> impl Iterator)>)> { + self.sequence_spec.as_ref().expect("sequence_spec is empty") + .iter().filter_map(move |read| if read.modality == modality { + let region = self.get_region_by_modality(modality)?; + let index = read.get_index(region).expect("primer not found"); + Some((read, index)) + } else { + None + }) + } + + pub fn iter_reads(&self, modality: Modality) -> impl Iterator { + self.sequence_spec.as_ref().expect("sequence_spec is empty").iter() + .filter(move |read| read.modality == modality) + } + + /// Get the top-level region for a given modality, i.e., the region that contains all other regions. + pub fn get_region_by_modality(&self, modality: Modality) -> Option<&Region> { + self.library_spec.as_ref()?.iter().find(|region| { + region.sequence_type == SequenceType::Joined && + region.region_type == RegionType::Modality(modality) }) } } -impl TryFrom for SeqSpec { - type Error = anyhow::Error; +#[derive(Deserialize, Debug, Copy, Clone, PartialEq, Eq, Hash)] +#[serde(rename_all = "lowercase")] +pub enum Modality { + Dna, + Rna, + Tag, + Protein, + Atac, + Crispr, +} - fn try_from(yaml: Yaml) -> Result { - Self::try_from(&yaml) +impl FromStr for Modality { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s { + "dna" => Ok(Modality::Dna), + "rna" => Ok(Modality::Rna), + "tag" => Ok(Modality::Tag), + "protein" => Ok(Modality::Protein), + "atac" => Ok(Modality::Atac), + "crispr" => Ok(Modality::Crispr), + _ => bail!("Invalid modality: {}", s), + } } } -impl SeqSpec { - pub fn from_path>(path: P) -> Result { - let text = std::fs::read_to_string(&path)?; - YamlLoader::load_from_str(&text)? - .into_iter().next().with_context(|| format!("Empty YAML file: {:?}", path.as_ref()))? - .try_into() +#[derive(Debug)] +pub enum LibraryProtocol { + Standard(String), + Custom(Vec), +} + +#[derive(Deserialize, Debug)] +pub struct ProtocolItem { + pub protocol_id: String, + pub name: Option, + pub modality: Modality, +} + +impl<'de> Deserialize<'de> for LibraryProtocol { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = Value::deserialize(deserializer)?; + match value { + Value::String(s) => Ok(LibraryProtocol::Standard(s)), + Value::Sequence(seq) => { + let items = seq.into_iter().map(|item| serde_yaml::from_value(item).unwrap()) + .collect::>(); + Ok(LibraryProtocol::Custom(items)) + } + _ => Err(serde::de::Error::custom("invalid value")), + } } +} + +#[derive(Debug)] +pub enum LibraryKit { + Standard(String), + Custom(Vec), +} - pub fn modality(&self, name: &str) -> Option<&Region> { - self.library_spec.get(name) +#[derive(Deserialize, Debug)] +pub struct KitItem { + pub kit_id: String, + pub name: Option, + pub modality: Modality, +} + +impl<'de> Deserialize<'de> for LibraryKit { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = Value::deserialize(deserializer)?; + match value { + Value::String(s) => Ok(LibraryKit::Standard(s)), + Value::Sequence(seq) => { + let items = seq.into_iter().map(|item| serde_yaml::from_value(item).unwrap()) + .collect::>(); + Ok(LibraryKit::Custom(items)) + } + _ => Err(serde::de::Error::custom("invalid value")), + } } +} - /// Return an iterator over all regions in the region tree. - pub fn iter_regions(&self) -> impl Iterator { - self.library_spec.values().flat_map(|x| { - Box::new(std::iter::once(x).chain(x.iter_regions())) as Box> - }) +#[derive(Debug)] +pub enum SequenceProtocol { + Custom(Vec), + Standard(String), +} + +impl <'de> Deserialize<'de> for SequenceProtocol { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = Value::deserialize(deserializer)?; + match value { + Value::String(s) => Ok(SequenceProtocol::Standard(s)), + Value::Sequence(seq) => { + let items = seq.into_iter().map(|item| serde_yaml::from_value(item).unwrap()) + .collect::>(); + Ok(SequenceProtocol::Custom(items)) + } + _ => Err(serde::de::Error::custom("invalid value")), + } } +} - pub fn get_read_by_primer_id(&self, modality: &str, primer_id: &str) -> Option<&Read> { - self.sequence_spec.get(modality).unwrap().iter().find(|x| x.primer_id == primer_id) +#[derive(Debug)] +pub enum SequenceKit { + Standard(String), + Custom(Vec), +} + +impl<'de> Deserialize<'de> for SequenceKit { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = Value::deserialize(deserializer)?; + match value { + Value::String(s) => Ok(SequenceKit::Standard(s)), + Value::Sequence(seq) => { + let items = seq.into_iter().map(|item| serde_yaml::from_value(item).unwrap()) + .collect::>(); + Ok(SequenceKit::Custom(items)) + } + _ => Err(serde::de::Error::custom("invalid value")), + } } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Deserialize, Debug)] pub struct Read { - pub id: Vec, // The file paths of the sequencing reads from multiple lanes. - pub name: String, + read_id: String, + pub name: Option, pub modality: Modality, - pub primer_id: String, // the primer_id should maps to the correct region id off - // of which the sequencing read is generated in the library_spec. - pub length: Length, - strand: bool, + pub primer_id: String, + pub min_len: u32, + pub max_len: u32, + pub strand: Strand, + pub files: Option>, } impl Read { + pub fn id(&self) -> &str { + &self.read_id + } + pub fn read_fastq>(&self, base_dir: P) -> fastq::Reader { + let base_dir = base_dir.as_ref().to_path_buf(); let reader = multi_reader::MultiReader::new( - self.id.clone().into_iter().map(move |x| { - let mut path = PathBuf::from(x); - path = if path.is_absolute() { - path - } else { - base_dir.as_ref().join(path) - }; - open_file_for_read(path) - }) + self.files.clone().unwrap().into_iter().map(move |file| file.open(&base_dir)) ); fastq::Reader::new(BufReader::new(reader)) } - pub fn is_reverse(&self) -> bool { - !self.strand - } -} + fn get_index<'a>(&'a self, region: &'a Region) -> Option)>> { + if region.sequence_type != SequenceType::Joined { + return None; + } -impl TryFrom<&Yaml> for Read { - type Error = anyhow::Error; - - fn try_from(yaml: &Yaml) -> Result { - let id = yaml["read_id"].as_str().unwrap().split(',').map(|x| x.trim().to_string()).collect(); - let name = yaml["name"].as_str().unwrap().to_string(); - let modality = yaml["modality"].as_str().unwrap().to_string(); - let primer_id = yaml["primer_id"].as_str().unwrap().to_string(); - let min_length = yaml["min_len"].as_i64().unwrap() as usize; - let max_length = yaml["max_len"].as_i64().unwrap() as usize; - let length = if min_length == max_length { - Length::Fixed(min_length) + let mut found_primer = false; + + let result = if self.is_reverse() { + self.components( + region.regions.as_ref().unwrap().iter().rev() + .skip_while(|region| { + let found = !(region.region_type.is_sequencing_primer() && region.region_id == self.primer_id); + if found { + found_primer = true; + } + found + }).skip(1) + ) } else { - Length::Range(min_length, max_length) + self.components( + region.regions.as_ref().unwrap().iter() + .skip_while(|region| { + let found = !(region.region_type.is_sequencing_primer() && region.region_id == self.primer_id); + if found { + found_primer = true; + } + found + }).skip(1) + ) }; - let strand = match yaml["strand"].as_str().unwrap() { - "pos" => true, - "neg" => false, - x => bail!("Invalid strand: {:?}", x), - }; - Ok(Self { id, name, modality, primer_id, length, strand }) + + if found_primer { + Some(result) + } else { + None + } } -} -#[derive(Debug, Clone)] -pub enum SequenceType { - Fixed(String), - OnList { - filepath: String, - md5: Option, - local: bool, - }, - Random, - Joined, -} - -impl SequenceType { - pub fn as_str(&self) -> &'static str { - match self { - Self::Fixed(_) => "fixed", - Self::OnList { .. } => "onlist", - Self::Random => "random", - Self::Joined => "joined", + fn components<'a, I: Iterator>(&self, regions: I) -> Vec<(&'a Region, Range)> { + let mut result = Vec::new(); + let read_len = self.max_len; + let mut cur_pos = 0; + for region in regions { + if region.min_len == region.max_len { + let end = (cur_pos + region.min_len).min(read_len); + result.push((region, cur_pos..end)); + if end == read_len { + break; + } + cur_pos = end; + } else if cur_pos + region.min_len >= read_len { + result.push((region, cur_pos..read_len)); + break; + } else if cur_pos + region.max_len < read_len { + warn!("Read ({}) length exceeds maximum length of the variable-length region (insertion), \ + truncating the reads to the maximum length of the region. \ + If this is not the desired behavior, please adjust the region lengths.", self.read_id); + result.push((region, cur_pos..cur_pos + region.max_len)); + break; + } else { + warn!("Reads ({}) may contain additional bases downstream of the variable-length region, e.g., adapter sequences.", self.read_id); + result.push((region, cur_pos..read_len)); + break; + } } + result } - pub(crate) fn fetch_onlist(&self) -> Result> { - match self { - Self::OnList { filepath, .. } => { - let cache = Cache::new()?; - let file = cache.cached_path(&filepath)?; - let reader = std::io::BufReader::new(open_file_for_read(file)); - Ok(reader.lines().map(|x| x.unwrap()).collect()) - } - _ => panic!("Not an onlist sequence type"), + pub fn is_reverse(&self) -> bool { + match self.strand { + Strand::Neg => true, + Strand::Pos => false, } } } -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum RegionType { - Fastq, - Barcode, - UMI, - CDNA, - GDNA, - Other(String), +#[derive(Deserialize, Debug)] +#[serde(rename_all = "lowercase")] +pub enum Strand { + Pos, + Neg, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct File { + pub file_id: String, + pub filename: String, + pub filetype: String, + pub filesize: u64, + pub url: String, + pub urltype: UrlType, + pub md5: String, } -impl From<&str> for RegionType { - fn from(s: &str) -> Self { - match s.trim().to_lowercase().as_str() { - "fastq" => Self::Fastq, - "barcode" => Self::Barcode, - "umi" => Self::UMI, - "cdna" => Self::CDNA, - "gdna" => Self::GDNA, - _ => Self::Other(s.to_string()), +impl File { + pub fn open>(&self, base_dir: P) -> Box { + match self.urltype { + UrlType::Local => { + let mut path = PathBuf::from(&self.url); + path = if path.is_absolute() { + path + } else { + base_dir.as_ref().join(path) + }; + Box::new(open_file_for_read(path)) + } + _ => { + let cache = Cache::new().unwrap(); + let file = cache.cached_path(&self.url).unwrap(); + Box::new(open_file_for_read(file)) + } } } } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum Length { - Fixed(usize), - Range(usize, usize), +#[derive(Deserialize, Debug, Copy, Clone)] +#[serde(rename_all = "lowercase")] +pub enum UrlType { + Local, + Ftp, + Http, + Https, } -#[derive(Debug, Clone)] +#[derive(Deserialize, Debug, Clone)] pub struct Region { - pub id: String, + pub region_id: String, pub region_type: RegionType, - pub name: String, pub sequence_type: SequenceType, - pub length: Length, - pub sub_regions: Vec, + pub sequence: String, + pub min_len: u32, + pub max_len: u32, + pub onlist: Option, + pub regions: Option>, } -impl TryFrom for Region { - type Error = anyhow::Error; - - fn try_from(yaml: Yaml) -> Result { - let id = yaml["region_id"].as_str().unwrap().to_string(); - let region_type = yaml["region_type"].as_str().unwrap().into(); - let name = yaml["name"].as_str().unwrap().to_string(); - let sequence_type = match yaml["sequence_type"].as_str().unwrap() { - "fixed" => SequenceType::Fixed(yaml["sequence"].as_str().unwrap().to_string()), - "onlist" => { - let md5 = yaml["onlist"].as_hash().unwrap().get(&Yaml::String("md5".to_string())) - .and_then(|x| x.as_str()).map(|x| x.to_string()); - let filepath = yaml["onlist"]["filename"].as_str().unwrap().to_string(); - let local = match yaml["onlist"]["location"].as_str().unwrap() { - "local" => true, - "remote" => false, - x => bail!("Invalid location: {:?}", x), - }; - SequenceType::OnList { filepath, md5, local } - }, - "random" => SequenceType::Random, - "joined" => SequenceType::Joined, - _ => bail!("Invalid sequence type: {:?}", yaml["sequence_type"]), - }; - let min_length = yaml["min_len"].as_i64().unwrap() as usize; - let max_length = yaml["max_len"].as_i64().unwrap() as usize; - let length = if min_length == max_length { - Length::Fixed(min_length) - } else { - Length::Range(min_length, max_length) - }; - let sub_regions = yaml["regions"].clone().into_iter().map(Region::try_from).collect::>>()?; - Ok(Self { id, region_type, name, sequence_type, length, sub_regions }) +impl Region { + /// Return an iterator over all regions in the region tree. + pub fn iter_regions(&self) -> impl Iterator { + self.regions.as_ref().unwrap().iter() } } -#[derive(Debug, Clone, Copy)] -pub struct Range { - pub start: usize, - pub end: Option, +#[derive(Deserialize, Debug, Copy, Clone, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum RegionType { + Barcode, + Cdna, + #[serde(rename = "custom_primer")] + CustomPrimer, + Fastq, + FastqLink, + Gdna, + Hic, + #[serde(rename = "illumina_p5")] + IlluminaP5, + #[serde(rename = "illumina_p7")] + IlluminaP7, + Index5, + Index7, + Linker, + Me1, + Me2, + Methyl, + Named, + #[serde(rename = "nextera_read1")] + NexteraRead1, + #[serde(rename = "nextera_read2")] + NexteraRead2, + #[serde(rename = "poly_a")] + PolyA, + #[serde(rename = "poly_g")] + PolyG, + #[serde(rename = "poly_t")] + PolyT, + #[serde(rename = "poly_c")] + PolyC, + S5, + S7, + #[serde(rename = "truseq_read1")] + TruseqRead1, + #[serde(rename = "truseq_read2")] + TruseqRead2, + Umi, + #[serde(untagged)] + Modality(Modality), } -impl Region { - /// Return the list of fastq regions in the region tree. - pub fn fastqs(&self) -> Vec<&Region> { - let mut result = Vec::new(); - for x in &self.sub_regions { - if x.region_type == RegionType::Fastq { - result.push(x); - } else { - let remains = x.fastqs(); - result.extend(remains); - } +impl RegionType { + pub fn is_barcode(&self) -> bool { + match self { + RegionType::Barcode => true, + _ => false, } - result } - /// Return an iterator over all regions in the region tree. - pub fn iter_regions(&self) -> impl Iterator { - self.sub_regions.iter().flat_map(|x| { - Box::new(std::iter::once(x).chain(x.iter_regions())) as Box> - }) - } - - /// Return the start and end position of each subregion in a fastq region. - pub fn subregion_range(&self) -> Vec<(RegionType, Range)> { - if self.region_type != RegionType::Fastq { - panic!("must be called on a fastq region"); + pub fn is_umi(&self) -> bool { + match self { + RegionType::Umi => true, + _ => false, } - - get_region_range(&self.sub_regions).collect() } - /// Return the start and end position of each subregion in a fastq region. - /// Make adjustments as if the fastq read is reverse complemented. - pub fn subregion_range_rev(&self) -> Vec<(RegionType, Range)> { - if self.region_type != RegionType::Fastq { - panic!("must be called on a fastq region"); + /// Check if the region contains genomic sequences. + pub fn is_dna(&self) -> bool { + match self { + RegionType::Gdna | RegionType::Cdna => true, + _ => false, } + } - let ranges = get_region_range(&self.sub_regions).collect::>(); - if ranges.len() <=1 { - ranges - } else { - let total_len = ranges.last().unwrap().1.end.unwrap(); - ranges.into_iter().map(move |(region_type, range)| { - let start = total_len - range.end.unwrap(); - let end = start + (range.end.unwrap() - range.start); - (region_type, Range {start, end: Some(end)}) - }).collect() + pub fn is_sequencing_primer(&self) -> bool { + match self { + RegionType::CustomPrimer | + RegionType::TruseqRead1 | RegionType::TruseqRead2 | + RegionType::NexteraRead1 | RegionType::NexteraRead2 | + RegionType::IlluminaP5 | RegionType::IlluminaP7 => true, + _ => false, } } } -/// Return the start and end position of each subregion in a fastq region. -fn get_region_range(regions: &[Region]) -> impl Iterator + '_ { - let len = regions.len(); - let mut current_pos = 0; - let mut i = 0; +#[derive(Deserialize, Debug, Copy, Clone, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum SequenceType { + Fixed, // sequence string is known and fixed in length and nucleotide composition + Random, // the sequence is not known a-priori + Onlist, // the sequence is derived from an onlist + Joined, // the sequence is created from nested regions and the regions property must contain Regions +} - std::iter::from_fn(move || { - if i >= len { - return None; - } +#[derive(Deserialize, Debug, Clone)] +pub struct Onlist { + pub file_id: String, + pub filename: String, + pub filetype: String, + pub filesize: u64, + url: String, + pub urltype: UrlType, + pub location: Option, + pub md5: String, +} - let region = ®ions[i]; - let range = match region.length { - Length::Fixed(n) => { - let r = Range {start: current_pos, end: Some(current_pos + n)}; - current_pos += n; - r - } - Length::Range(_, _) => { - if i != len - 1 { - panic!("Variable length region must be the last region in a region list"); - } else { - Range {start: current_pos, end: None} - } - } - }; - i += 1; - Some((region.region_type.clone(), range)) - }) +impl Onlist { + pub fn read(&self) -> Result> { + let cache = Cache::new()?; + let file = cache.cached_path(&self.url)?; + let reader = std::io::BufReader::new(open_file_for_read(file)); + Ok(reader.lines().map(|x| x.unwrap()).collect()) + } +} + +#[derive(Deserialize, Debug, Clone, Copy)] +#[serde(rename_all = "lowercase")] +pub enum Location { + Local, + Remote, } #[cfg(test)] @@ -350,24 +485,26 @@ mod tests { use super::*; #[test] - fn test_seqspec_io() { - let spec = SeqSpec::from_path("tests/data/spec.yaml").unwrap(); - - for fq in spec.modality("rna").unwrap().fastqs() { - println!("{:?}", fq.subregion_range()); - } + fn test_parse() { + let protocol: LibraryProtocol = serde_yaml::from_str("10X").unwrap(); + println!("{:?}", protocol); + + let protocol: LibraryProtocol = serde_yaml::from_str( + "- !LibProtocol + protocol_id: CG000338 Chromium Next GEM Multiome ATAC + Gene Expression Rev. D protocol (10x Genomics) + name: DogmaSeq-DIG + modality: rna" + ).unwrap(); + println!("{:?}", protocol); } #[test] - fn test_onlist() { - let spec = SeqSpec::from_path("tests/data/spec.yaml").unwrap(); + fn test_parse_yaml() { + let yaml_str = fs::read_to_string("tests/data/spec.yaml").expect("Failed to read file"); - spec.iter_regions().for_each(|region| { - if let SequenceType::OnList { filepath, .. } = ®ion.sequence_type { - if region.sequence_type.fetch_onlist().is_err() { - panic!("Failed to fetch onlist: {:?}", region.region_type); - } - } - }); + let assay: Assay = serde_yaml::from_str(&yaml_str).expect("Failed to parse YAML"); + for x in assay.get_index_of(Modality::Protein) { + println!("{:?}", x); + } } -} +} \ No newline at end of file diff --git a/python/src/lib.rs b/python/src/lib.rs index d0e0b38..be3e1d9 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -13,7 +13,7 @@ use ::precellar::{ align::{Alinger, FastqProcessor, NameCollatedRecords}, fragment::FragmentGenerator, io::{open_file_for_write, Compression}, - qc::{FragmentQC, Metrics, AlignQC}, seqspec::SeqSpec, + qc::{FragmentQC, Metrics, AlignQC}, seqspec::{Assay, Modality}, }; #[cfg(not(target_env = "msvc"))] @@ -90,9 +90,11 @@ fn align( temp_dir: Option, num_threads: u32, ) -> Result> { + let modality = Modality::from_str(modality).unwrap(); + assert!(output_bam.is_some() || output_fragment.is_some(), "either output_bam or output_fragment must be provided"); - let spec = SeqSpec::from_path(&seqspec).unwrap(); + let spec = Assay::from_path(&seqspec).unwrap(); let aligner = BurrowsWheelerAligner::new( FMIndex::read(genome_index).unwrap(), AlignerOpts::default().with_n_threads(num_threads as usize),