Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreaGuarracino committed Dec 23, 2024
1 parent 2ee3240 commit 4854c05
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 16 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ rayon = "1.10.0"
serde = { version = "1.0.216", features = ["derive"] }
noodles = { version = "0.87.0", features = ["bgzf"] }
regex = "1.11.1"
log = "0.4.22"
env_logger = "0.11.5"
207 changes: 191 additions & 16 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use clap::Parser;
use std::fs::File;
use std::io::{self, BufReader, BufWriter};
use std::io::{self, BufReader, BufWriter, Write};
use std::num::NonZeroUsize;
use noodles::bgzf;
use impg::impg::{Impg, SerializableImpg, AdjustedInterval};
use coitrees::IntervalTree;
use impg::paf;
use rayon::ThreadPoolBuilder;
use std::io::BufRead;
use log::{debug, info, warn, error};
use std::collections::HashSet;

/// Common options shared between all commands
#[derive(Parser, Debug)]
Expand All @@ -23,6 +25,10 @@ struct CommonOpts {
/// Number of threads for parallel processing.
#[clap(short = 't', long, value_parser, default_value_t = NonZeroUsize::new(1).unwrap())]
num_threads: NonZeroUsize,

/// Verbosity level (0 = error, 1 = info, 2 = debug)
#[clap(short, long, default_value = "0")]
verbose: u8,
}

/// Command-line tool for querying overlaps in PAF files.
Expand All @@ -34,17 +40,13 @@ enum Args {
#[clap(flatten)]
common: CommonOpts,

/// Path to the FASTA index file (.fai).
#[clap(short = 'f', long, value_parser)]
fasta_index: String,

/// Window size for partitioning.
#[clap(short = 'w', long, value_parser)]
window_size: usize,

/// Sample/contig name to process.
/// Sequence name prefix to start - all sequences starting with this prefix will be included
#[clap(short = 's', long, value_parser)]
sample_name: String,
sequence_prefix: String,

/// Minimum length for intervals (default: 5000).
#[clap(long, value_parser, default_value_t = 5000)]
Expand Down Expand Up @@ -88,13 +90,12 @@ fn main() -> io::Result<()> {
match args {
Args::Partition {
common,
fasta_index,
window_size,
sample_name,
sequence_prefix,
min_length,
} => {
let impg = initialize_impg(&common)?;
partition_alignments(&impg, &fasta_index, window_size, &sample_name, min_length)?;
partition_alignments(&impg, window_size, &sequence_prefix, min_length, common.verbose > 1)?;
},
Args::Query {
common,
Expand All @@ -113,11 +114,12 @@ fn main() -> io::Result<()> {
let invalid_cigars = impg::impg::check_intervals(&impg, &results);
if !invalid_cigars.is_empty() {
for (row, error_reason) in invalid_cigars {
eprintln!("{}; {}", error_reason, row);
error!("{}; {}", error_reason, row);
}
panic!("Invalid intervals encountered.");
}
}

if output_paf {
// Skip the first element (the input range) for PAF
output_results_paf(&impg, results.into_iter().skip(1), &target_name, None);
Expand All @@ -132,7 +134,7 @@ fn main() -> io::Result<()> {
let invalid_cigars = impg::impg::check_intervals(&impg, &results);
if !invalid_cigars.is_empty() {
for (row, error_reason) in invalid_cigars {
eprintln!("{}; {}", error_reason, row);
error!("{}; {}", error_reason, row);
}
panic!("Invalid intervals encountered.");
}
Expand Down Expand Up @@ -165,6 +167,15 @@ fn main() -> io::Result<()> {

/// Initialize thread pool and load/generate index based on common options
fn initialize_impg(common: &CommonOpts) -> io::Result<Impg> {
// Initialize logger based on verbosity
env_logger::Builder::new()
.filter_level(match common.verbose {
0 => log::LevelFilter::Error,
1 => log::LevelFilter::Info,
_ => log::LevelFilter::Debug,
})
.init();

// Configure thread pool
ThreadPoolBuilder::new()
.num_threads(common.num_threads.into())
Expand Down Expand Up @@ -196,10 +207,10 @@ fn load_index(paf_file: &str) -> io::Result<Impg> {
if let (Ok(paf_file_ts), Ok(index_file_ts)) = (paf_file_metadata.modified(), index_file_metadata.modified()) {
if paf_file_ts > index_file_ts
{
eprintln!("WARNING:\tPAF file has been modified since impg index creation.");
warn!("WARNING:\tPAF file has been modified since impg index creation.");
}
} else {
eprintln!("WARNING:\tUnable to compare timestamps of PAF file and impg index file. PAF file may have been modified since impg index creation.");
warn!("WARNING:\tUnable to compare timestamps of PAF file and impg index file. PAF file may have been modified since impg index creation.");
}

let file = File::open(index_file)?;
Expand Down Expand Up @@ -230,12 +241,176 @@ fn generate_index(paf_file: &str, num_threads: NonZeroUsize) -> io::Result<Impg>

fn partition_alignments(
impg: &Impg,
fasta_index: &str,
window_size: usize,
sample_name: &str,
sequence_prefix: &str,
min_length: usize,
debug: bool,
) -> io::Result<()> {
// Get all sequences with the given prefix
let mut sample_regions = Vec::new();
for seq_id in 0..impg.seq_index.len() as u32 {
let seq_name = impg.seq_index.get_name(seq_id).unwrap();
if seq_name.starts_with(sequence_prefix) {
let seq_length = impg.seq_index.get_len_from_id(seq_id).unwrap();
sample_regions.push((seq_name.to_string(), 0, seq_length));
}
}
if sample_regions.is_empty() {
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!("No sequences with prefix {} found in sequence index", sequence_prefix),
));
}

if debug {
debug!("Starting sequences:");
for (chrom, start, end) in &sample_regions {
debug!(" Region: {}:{}-{}", chrom, start, end);
}
}

// Create windows from sample regions
let mut windows = Vec::new();
for (chrom, start, end) in sample_regions {
let mut pos = start;
while pos < end {
let window_end = std::cmp::min(pos + window_size, end);
windows.push((chrom.clone(), pos, window_end));
pos = window_end;
}
}

if debug {
debug!("Starting windows:");
for (chrom, start, end) in &windows {
debug!(" Window: {}:{}-{}", chrom, start, end);
}
}

// Initialize masked regions
let mut masked_regions = HashSet::new();

// Initialize missing regions from sequence index
let mut missing_regions: HashSet<_> = (0..impg.seq_index.len() as u32)
.map(|id| {
let name = impg.seq_index.get_name(id).unwrap();
let len = impg.seq_index.get_len_from_id(id).unwrap();
(name.to_string(), 0, len)
})
.collect();

let mut partition_num = 0;

while !windows.is_empty() {
debug!("Processing new window set");

if debug {
debug!(" Missing regions:");
for (chrom, start, end) in &missing_regions {
debug!(" Region: {}:{}-{}", chrom, start, end);
}
}

let mut new_windows = Vec::new();

for (chrom, start, end) in windows.iter() {
let region = format!("{}:{}-{}", chrom, start, end);
debug!(" Querying region {}", region);

// Query overlaps for current window
let overlaps = impg.query_transitive(
impg.seq_index.get_id(chrom).unwrap(),
*start as i32,
*end as i32
);

// TODO: Merge overlaps that are close to each other
// bedtools sort | bedtools merge -d 10000

// Apply mask
// bedtools subtract -a "partition$num.tmp.bed" -b "$MASK_BED"
// Convert overlaps to regions, excluding masked areas
let mut partition_regions = Vec::new();
for (interval, _, _) in overlaps.into_iter().skip(1) { // Skip input interval
let name = impg.seq_index.get_name(interval.metadata).unwrap().to_string();
let region = (name, interval.first as usize, interval.last as usize);
if !masked_regions.contains(&region) {
partition_regions.push(region);
}
}

if debug {
debug!(" Collected {} overlaps for region {}:", partition_regions.len(), region);
// for (chrom, start, end) in &partition_regions {
// debug!(" Region: {}:{}-{}", chrom, start, end);
// }
}

if !partition_regions.is_empty() {
info!(" Processing partition {}", partition_num);

// Write partition to file
let mut partition_file = File::create(format!("partition{}.bed", partition_num))?;
for (name, start, end) in &partition_regions {
writeln!(partition_file, "{}\t{}\t{}", name, start, end)?;
}

// Update masked regions and missing regions
for region in partition_regions {
masked_regions.insert(region.clone());
missing_regions.remove(&region);
}

// Handle short intervals
let mut short_regions = Vec::new();
let mut long_regions = Vec::new();

for region in &masked_regions {
let len = region.2 - region.1;
if len < min_length {
short_regions.push(region.clone());
} else {
long_regions.push(region.clone());
}
}

// Extend short intervals
for region in short_regions {
if let Some(seq_id) = impg.seq_index.get_id(&region.0) {
if let Some(seq_len) = impg.seq_index.get_len_from_id(seq_id) {
let new_start = if region.1 >= min_length { region.1 - min_length } else { 0 };
let new_end = std::cmp::min(region.2 + min_length, seq_len);
long_regions.push((region.0, new_start, new_end));
}
}
}

partition_num += 1;
} else {
panic!("No overlaps found for region {}", region);
}
}

// If no missing regions remain, we're done
if missing_regions.is_empty() {
break;
}

// Find longest remaining region for new windows
if let Some(longest_region) = missing_regions.iter()
.max_by_key(|r| r.2 - r.1) {
let (chrom, start, end) = longest_region;
let mut pos = *start;
while pos < *end {
let window_end = std::cmp::min(pos + window_size, *end);
new_windows.push((chrom.clone(), pos, window_end));
pos = window_end;
}
}

windows = new_windows;
}

Ok(())
}

Expand Down

0 comments on commit 4854c05

Please sign in to comment.