Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] add an option to only process specific lanes #36

Merged
merged 5 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 42 additions & 22 deletions src/lib/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,12 @@ pub struct Opts {
/// Prepend this prefix to all output metric file names.
#[clap(long, display_order = 31)]
pub metric_prefix: Option<String>,

/// Select a subset of lanes to demultiplex. Will cause only samples and input FASTQs with
/// the given `Lane`(s) to be demultiplexed. Samples without a lane will be ignored, and
/// FASTQs without lane information will be ignored.
#[clap(long, short = 'l', required = false, multiple = true, display_order = 31)]
pub lane: Vec<usize>,
}

impl Opts {
Expand All @@ -261,10 +267,15 @@ impl Opts {
///
/// If the input FASTQS are a mix of files and path prefixes, this function will return an
/// error.
///
/// If lanes is non-empty, then the input FASTQs must be path prefixes so that their lane can
/// be inferred from the FASTQ file name, and only those FASTQs from the given lane(s) will be
/// returned.
pub fn from(
fastqs: Vec<PathBuf>,
read_structures: Vec<ReadStructure>,
fastqs: &[PathBuf],
read_structures: &[ReadStructure],
sample_barcode_in_fastq_header: bool,
lanes: &[usize],
) -> Result<Vec<FastqsAndReadStructure>> {
// must have FASTQs
ensure!(!fastqs.is_empty(), "No FASTQs or path prefixes found with --fastq");
Expand All @@ -285,14 +296,16 @@ impl Opts {
"Read structures may not contain sample barcode segment(s) when extracting sample barcodes from the FASTQ header");
}

FastqsAndReadStructure::zip(&fastqs, &read_structures)
ensure!(lanes.is_empty(), "Cannot specify lanes when explicit FASTQ paths are used");

FastqsAndReadStructure::zip(fastqs, read_structures)
} else if fastqs.iter().all(|f| !f.is_file()) {
ensure!(
read_structures.is_empty(),
"Read Structure must not be given when the input FASTQs are a path prefix."
);

let input_fastq_group = FastqsAndReadStructure::from_prefixes(&fastqs);
let input_fastq_group = FastqsAndReadStructure::from_prefixes(fastqs, lanes);

// Ensure that if the sample barcodes are to be extracted from the FASTQ header then
// no index reads (sample barcode reads) should be found.
Expand Down Expand Up @@ -332,7 +345,8 @@ impl Opts {
}

// Ensure that all FASTQs have the same read name in the header. This checks only the
// first read in each FASTQ
// first read in each FASTQ, and only up to the first colon in the read name. The latter
// is required in case we have reads across lanes.
let mut first_head: Vec<u8> = vec![];
let mut end_index: usize = 0;
let mut first_file: String = String::from("");
Expand All @@ -351,12 +365,12 @@ impl Opts {
Some(Ok(record)) => {
if first_head.is_empty() {
first_head = record.head().to_vec();
end_index = first_head.find_byte(b' ').unwrap_or(first_head.len());
end_index = first_head.find_byte(b':').unwrap_or(first_head.len());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was a bug, since the lane is in the read name prior to the first space.

first_file = file.to_string_lossy().to_string();
} else {
let head = record.head();
let ok = head.len() == end_index
|| (head.len() > end_index && head[end_index] == b' ');
|| (head.len() > end_index && head[end_index] == b':');
let ok = ok && first_head[0..end_index] == head[0..end_index];
ensure!(
ok,
Expand Down Expand Up @@ -519,6 +533,7 @@ impl Default for Opts {
skip_read_name_check: false,
sample_barcode_in_fastq_header: false,
metric_prefix: None,
lane: vec![],
}
}
}
Expand Down Expand Up @@ -566,10 +581,15 @@ impl FastqsAndReadStructure {
/// FASTQS with the same kind and kind number are read in serially, (2) the FASTQS are
/// synchronized across groups (e.g. the same lane/prefix across kind/kind-number will be read
/// at the same time), and (3) we emit the read pairs in the correct order (e.g. R1/R2).
pub fn from_prefixes(prefixes: &[PathBuf]) -> Vec<FastqsAndReadStructure> {
///
/// If lanes is non-empty, then only those FASTQs from the given lane(s) will be returned.
pub fn from_prefixes(prefixes: &[PathBuf], lanes: &[usize]) -> Vec<FastqsAndReadStructure> {
// glob all the FASTQs
let input_fastqs: Vec<InputFastq> =
prefixes.iter().flat_map(|prefix| InputFastq::glob(&prefix).unwrap()).collect();
let input_fastqs: Vec<InputFastq> = prefixes
.iter()
.flat_map(|prefix| InputFastq::glob(&prefix).unwrap())
.filter(|fq| lanes.is_empty() || lanes.contains(&fq.lane))
.collect();

// Collect all FASTQs by kind and kind number
let mut unsorted_groups: HashMap<(SegmentType, u32), Vec<InputFastq>> = HashMap::new();
Expand Down Expand Up @@ -821,7 +841,7 @@ mod test {
..Opts::default()
};

let result = Opts::from(opts.fastqs, opts.read_structures, false);
let result = Opts::from(&opts.fastqs, &opts.read_structures, false, &vec![]);
assert!(result.is_err());
if let Err(error) = result {
assert!(error.to_string().contains("must either all be files or all path prefixes"));
Expand All @@ -836,7 +856,7 @@ mod test {
..Opts::default()
};

let result = Opts::from(opts.fastqs, opts.read_structures, false);
let result = Opts::from(&opts.fastqs, &opts.read_structures, false, &vec![]);
assert!(result.is_err());
if let Err(error) = result {
assert!(error.to_string().contains("No FASTQs or path prefixes found"));
Expand All @@ -860,7 +880,7 @@ mod test {
..Opts::default()
};

let result = Opts::from(opts.fastqs, opts.read_structures, false);
let result = Opts::from(&opts.fastqs, &opts.read_structures, false, &vec![]);
assert!(result.is_err());
if let Err(error) = result {
assert!(error
Expand All @@ -879,7 +899,7 @@ mod test {
..Opts::default()
};

let result = Opts::from(opts.fastqs, opts.read_structures, false);
let result = Opts::from(&opts.fastqs, &opts.read_structures, false, &vec![]);
assert!(result.is_err());
if let Err(error) = result {
assert!(error.to_string().contains("Read Structure must not be given"));
Expand All @@ -892,7 +912,7 @@ mod test {
let prefix = dir.path().join("prefix");
let opts = Opts { read_structures: vec![], fastqs: vec![prefix], ..Opts::default() };

let result = Opts::from(opts.fastqs, opts.read_structures, false);
let result = Opts::from(&opts.fastqs, &opts.read_structures, false, &vec![]);
assert!(result.is_err());
if let Err(error) = result {
assert!(error.to_string().contains("No FASTQS found for prefix"));
Expand Down Expand Up @@ -920,7 +940,7 @@ mod test {
let opts =
Opts { read_structures: vec![], fastqs: vec![prefix.clone()], ..Opts::default() };

let result = Opts::from(opts.fastqs, opts.read_structures, false);
let result = Opts::from(&opts.fastqs, &opts.read_structures, false, &vec![]);
assert!(result.is_err());
if let Err(error) = result {
assert!(error.to_string().contains("Different # of FASTQs per group"));
Expand Down Expand Up @@ -966,7 +986,7 @@ mod test {

let opts = Opts { read_structures: vec![], fastqs: prefixes, ..Opts::default() };

let results = Opts::from(opts.fastqs, opts.read_structures, false).unwrap();
let results = Opts::from(&opts.fastqs, &opts.read_structures, false, &vec![]).unwrap();
assert_eq!(results.len(), 4);

// I1
Expand Down Expand Up @@ -1026,7 +1046,7 @@ mod test {
..Opts::default()
};

let result = Opts::from(opts.fastqs, opts.read_structures, false);
let result = Opts::from(&opts.fastqs, &opts.read_structures, false, &vec![]);

assert_eq!(result.is_ok(), ok);
if let Err(error) = result {
Expand All @@ -1047,25 +1067,25 @@ mod test {
// FASTQ file
// Read structure has a sample barcode, but extracting sample barcode from FASTQ header,
// so should fail
let result = Opts::from(vec![fastq.clone()], vec![read_structure.clone()], true);
let result = Opts::from(&vec![fastq.clone()], &vec![read_structure.clone()], true, &vec![]);
assert!(result.is_err());

// FASTQ file
// Read structure has a sample barcode and not extracting sample barcode from FASTQ header,
// so should be ok
let result = Opts::from(vec![fastq], vec![read_structure], false);
let result = Opts::from(&vec![fastq], &vec![read_structure], false, &vec![]);
assert!(result.is_ok());

// Path prefix
// Index FASTQ found, but extracting sample barcode from FASTQ header,
// so should fail
let result = Opts::from(vec![prefix.clone()], vec![], true);
let result = Opts::from(&vec![prefix.clone()], &vec![], true, &vec![]);
assert!(result.is_err());

// Path prefix
//Index FASTQ found and not extracting sample barcode from FASTQ header,
// so should be ok
let result = Opts::from(vec![prefix], vec![], false);
let result = Opts::from(&vec![prefix], &vec![], false, &vec![]);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love to have defaults in rust

assert!(result.is_ok());
}
}
17 changes: 12 additions & 5 deletions src/lib/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ pub fn run(opts: Opts) -> Result<()> {
// kind number to support serially processing multiple lanes.
//
// IMPORTANT: opts.fastqs should no longer be used past this point
let input_fastq_groups: Vec<FastqsAndReadStructure> =
Opts::from(opts.fastqs, opts.read_structures, opts.sample_barcode_in_fastq_header)?;
let input_fastq_groups: Vec<FastqsAndReadStructure> = Opts::from(
&opts.fastqs,
&opts.read_structures,
opts.sample_barcode_in_fastq_header,
&opts.lane,
)?;
opts.read_structures = input_fastq_groups.iter().map(|g| g.read_structure.clone()).collect();
opts.fastqs = vec![]; // so we don't use it later incorrectly

Expand Down Expand Up @@ -119,7 +123,10 @@ pub fn run(opts: Opts) -> Result<()> {
check_bgzf(fastq)?;
}

info!("Creating writer threads");
info!(
"Creating writer threads (threads for writing/compressing: {}/{})",
opts.writer_threads, opts.compressor_threads
);
let writers: Result<Vec<_>> = samples
.iter()
.sorted_by(|a, b| a.ordinal.cmp(&b.ordinal))
Expand Down Expand Up @@ -161,7 +168,7 @@ pub fn run(opts: Opts) -> Result<()> {
)?)));
}

info!("Creating reader threads");
info!("Creating reader threads ({} threads per reader)", opts.decompression_threads_per_reader);
let mut readers = input_fastq_groups
.iter()
.map(|g| {
Expand All @@ -185,7 +192,7 @@ pub fn run(opts: Opts) -> Result<()> {
None
};

info!("Processing data");
info!("Processing data ({} demuxing threads)", opts.demux_threads);
let demuxer: Box<dyn Demultiplex> = if samples[0].barcode.len() <= 12
|| opts.override_matcher.map_or(false, |m| m == MatcherKind::PreCompute)
{
Expand Down
44 changes: 43 additions & 1 deletion src/lib/sample_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#![forbid(unsafe_code)]
#![allow(clippy::must_use_candidate)]

use std::{collections::HashSet, path::Path};
use std::{
collections::{HashMap, HashSet},
path::Path,
};

use bstr::{BStr, BString};
use itertools::Itertools;
Expand Down Expand Up @@ -314,6 +317,45 @@ pub fn validate_samples(
Ok(samples)
}

/// Subset the samples to those with the specific set of lanes. If no lanes are provided, all
/// samples are considered. Next, aggregate samples with the same Sample_ID and barcode
/// combination.
pub fn coelesce_samples(samples: Vec<SampleMetadata>, lanes: &[usize]) -> Vec<SampleMetadata> {
// subset to just the samples for the given lane
let samples = if lanes.is_empty() {
samples
} else {
// subset to just the samples for the given lane
samples
.iter()
.filter(|sample| sample.lane.filter(|lane| lanes.contains(lane)).is_some())
.cloned()
.collect()
};

// Aggregate samples with the same Sample_ID and barcode
let mut sample_groups: HashMap<(String, BString), Vec<SampleMetadata>> = HashMap::new();
for sample in &samples {
let key = (sample.sample_id.clone(), sample.barcode.clone());
sample_groups.entry(key).or_insert_with(|| Vec::with_capacity(1)).push(sample.clone());
}

// Create a new sample per group (per unique Sample_ID/barcode combination)
let mut samples: Vec<SampleMetadata> = sample_groups
.into_iter()
.map(|(_, group)| {
let sample = group[0].clone();
SampleMetadata { lane: None, ..sample }
})
.collect();

// Sort by line number to keep the order of samples in the outputs (e.g. metrics) the same
// as the order in the input (e.g. sample sheet).
samples.sort_by_key(|sample| sample.line_number);

samples
}

/// Serialize a collection of [`SampleMetadata`] into a file.
///
/// # Errors
Expand Down
11 changes: 7 additions & 4 deletions src/lib/sample_sheet.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::opts::Opts;
use crate::opts::TOOL_NAME;
use crate::sample_metadata::coelesce_samples;
use crate::sample_metadata::{validate_samples, SampleMetadata};
use clap::Parser;
use csv::{ReaderBuilder, StringRecord, Trim};
Expand Down Expand Up @@ -225,6 +226,7 @@ impl SampleSheet {
samples.push(record);
}

let samples = coelesce_samples(samples, &opts.lane);
let samples = validate_samples(
samples,
Some(opts.allowed_mismatches),
Expand Down Expand Up @@ -342,7 +344,7 @@ impl SampleSheet {
let header = &records[0];

// parse the samples
let mut ordinal = 1;
let mut ordinal = 0;
let mut samples: Vec<SampleMetadata> = vec![];
for record in &records[1..] {
// allow an empty line
Expand Down Expand Up @@ -407,6 +409,7 @@ impl SampleSheet {
};

let samples = SampleSheet::slurp_samples(&records[start..=end], start)?;
let samples = coelesce_samples(samples, &opts.lane);

// Validate the samples
let samples = validate_samples(
Expand All @@ -426,7 +429,7 @@ mod test {
use crate::opts::Opts;
use crate::sample_sheet::{SampleSheet, SampleSheetError};
use bstr::BString;
use clap::error::ErrorKind::{MissingRequiredArgument, UnknownArgument};
use clap::error::ErrorKind::UnknownArgument;
use csv::StringRecord;
use itertools::Itertools;
use matches::assert_matches;
Expand Down Expand Up @@ -595,11 +598,11 @@ mod test {
assert_eq!(samples[0].sample_id, "S1");
assert_eq!(samples[0].index1, Some(BString::from("AAAA")));
assert_eq!(samples[0].index2, Some(BString::from("CCCC")));
assert_eq!(samples[0].ordinal, 1);
assert_eq!(samples[0].ordinal, 0);
assert_eq!(samples[1].sample_id, "S2");
assert_eq!(samples[1].index1, Some(BString::from("GGGG")));
assert_eq!(samples[1].index2, Some(BString::from("TTTT")));
assert_eq!(samples[1].ordinal, 2);
assert_eq!(samples[1].ordinal, 1);
}

#[test]
Expand Down