Skip to content

Commit

Permalink
added test for alignment_info.py, removed unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
cat-bug authored and andrewprzh committed Aug 26, 2024
1 parent ede4443 commit aa62894
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 164 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -564,13 +564,13 @@ We recommend _not_ to modify these options unless you are clearly aware of their
but may improve running time when disk I/O is relatively slow.

`--min_mapq`
Filers out all alignments with MAPQ less than this value (will also filter all secondary alignments, as they typically have MAPQ = 0).
Filters out all alignments with MAPQ less than this value (will also filter all secondary alignments, as they typically have MAPQ = 0).

`--inconsistent_mapq_cutoff`
Filers out inconsistent alignments with MAPQ less than this value (works when the reference annotation is provided, default is 5).
Filters out inconsistent alignments with MAPQ less than this value (works when the reference annotation is provided, default is 5).

`--simple_alignments_mapq_cutoff`
Filers out alignments with 1 or 2 exons and MAPQ less than this value (works only in annotation-free mode, default is 1).
Filters out alignments with 1 or 2 exons and MAPQ less than this value (works only in annotation-free mode, default is 1).

`--normalization_method`
Method for normalizing non-grouped counts into TPMs:
Expand Down
14 changes: 8 additions & 6 deletions src/alignment_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@


class AlignmentInfo:
REGION_TO_CHECK_LEN = 12

def __init__(self, alignment):
self.alignment = alignment
# concat indels
Expand All @@ -22,7 +24,6 @@ def __init__(self, alignment):
self.aligned_pairs = None
self.aligned_pairs_start_index = None
self.aligned_pairs_end_index = None
self.region_to_check = 12
if not self.read_exons:
return
self.read_start = self.read_exons[0][0]
Expand Down Expand Up @@ -60,14 +61,15 @@ def get_error_count(self, ref_start, ref_end, intron_index=None, left_site=True,
if intron_index is None:
selected_pairs = self.aligned_pairs
elif left_site:
# find alinned pairs near intron start (previous exon end)
selected_pairs = self.aligned_pairs[max(0, self.aligned_pairs_end_index[intron_index] - self.region_to_check):
self.aligned_pairs_end_index[intron_index] + self.region_to_check]
# find aligned pairs near intron start (previous exon end)
selected_pairs = self.aligned_pairs[
max(0, self.aligned_pairs_end_index[intron_index] - AlignmentInfo.REGION_TO_CHECK_LEN):
self.aligned_pairs_end_index[intron_index] + AlignmentInfo.REGION_TO_CHECK_LEN]
else:
# find aligned pairs near intron end (next exon start)
selected_pairs = self.aligned_pairs[
max(0, self.aligned_pairs_start_index[intron_index+1] - self.region_to_check):
self.aligned_pairs_start_index[intron_index+1] + self.region_to_check]
max(0, self.aligned_pairs_start_index[intron_index + 1] - AlignmentInfo.REGION_TO_CHECK_LEN):
self.aligned_pairs_start_index[intron_index + 1] + AlignmentInfo.REGION_TO_CHECK_LEN]

indel_count = 0
mismatch_count = 0
Expand Down
86 changes: 0 additions & 86 deletions src/alignment_refiner.py

This file was deleted.

53 changes: 37 additions & 16 deletions src/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import threading
import math
from collections import defaultdict
from enum import Enum

logger = logging.getLogger('IsoQuant')

Expand All @@ -27,6 +28,26 @@ def increment(self):
return self.value


class CigarEvent(Enum):
match = 0
insertion = 1
deletion = 2
skipped = 3
soft_clipping = 4
hard_clipping = 5
padding = 6
seq_match = 7
seq_mismatch = 8

@classmethod
def get_match_events(cls):
return {cls.match, cls.seq_match, cls.seq_mismatch}

@classmethod
def get_ins_del_match_events(cls):
return {cls.match, cls.insertion, cls.deletion, cls.seq_match, cls.seq_mismatch}


# key, value
def get_first_best_from_sorted(sorted_list_of_pairs):
if not sorted_list_of_pairs:
Expand Down Expand Up @@ -460,25 +481,25 @@ def concat_gapless_blocks(blocks, cigar_tuples):

while cigar_index < len(cigar_tuples) and block_index < len(blocks):
# init new block
cigar_event = cigar_tuples[cigar_index][0]
cigar_event = CigarEvent(cigar_tuples[cigar_index][0])
if current_block is None:
# init new block from match
if cigar_event in [0, 7, 8]:
if cigar_event in CigarEvent.get_match_events():
current_block = (blocks[block_index][0] - deletions_before_block, blocks[block_index][1])
deletions_before_block = 0
block_index += 1
# keep track of deletions before matched block
elif cigar_event == 2:
elif cigar_event == CigarEvent.deletion:
deletions_before_block = cigar_tuples[cigar_index][1]
# found intron, add current block
elif cigar_event == 3:
elif cigar_event == CigarEvent.skipped:
resulting_blocks.append(current_block)
current_block = None
# add deletion to block
elif cigar_event == 2:
elif cigar_event == CigarEvent.deletion:
current_block = (current_block[0], current_block[1] + cigar_tuples[cigar_index][1])
# found match - merge blocks
elif cigar_event in [0, 7, 8]:
elif cigar_event in CigarEvent.get_match_events():
# if abs(current_block[1] - blocks[block_index][0]) > 1:
# logger.debug("Distant blocks")
# logger.debug(current_block, blocks[block_index])
Expand All @@ -500,38 +521,38 @@ def get_read_blocks(ref_start, cigar_tuples):
current_ref_block_start = None
current_read_block_start = None
current_cigar_block_start = None
has_match=False
has_match = False
ref_blocks = []
cigar_blocks = []
read_blocks = []

while cigar_index < len(cigar_tuples):
cigar_event = cigar_tuples[cigar_index][0]
cigar_event = CigarEvent(cigar_tuples[cigar_index][0])
event_len = cigar_tuples[cigar_index][1]

if current_ref_block_start is None and cigar_event in [0, 1, 2, 7, 8]:
if current_ref_block_start is None and cigar_event in CigarEvent.get_ins_del_match_events():
# init new block from match
current_ref_block_start = ref_pos
current_read_block_start = read_pos
current_cigar_block_start = cigar_index
if cigar_event == 1:
if cigar_event == CigarEvent.insertion:
read_pos += event_len
elif cigar_event == 2:
elif cigar_event == CigarEvent.deletion:
ref_pos += event_len
else:
read_pos += event_len
ref_pos += event_len
has_match = True
# found intron, add current block
elif cigar_event in [0, 7, 8]:
elif cigar_event in CigarEvent.get_match_events():
read_pos += event_len
ref_pos += event_len
has_match = True
elif cigar_event == 1:
elif cigar_event == CigarEvent.insertion:
read_pos += event_len
elif cigar_event == 2:
elif cigar_event == CigarEvent.deletion:
ref_pos += event_len
elif cigar_event == 3:
elif cigar_event == CigarEvent.skipped:
if current_ref_block_start:
if has_match:
ref_blocks.append((current_ref_block_start, ref_pos - 1))
Expand All @@ -542,7 +563,7 @@ def get_read_blocks(ref_start, cigar_tuples):
current_read_block_start = None
current_cigar_block_start = None
ref_pos += event_len
elif cigar_event == 4:
elif cigar_event == CigarEvent.soft_clipping:
if current_ref_block_start:
if has_match:
ref_blocks.append((current_ref_block_start, ref_pos - 1))
Expand Down
106 changes: 106 additions & 0 deletions tests/test_alignment_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import pytest

from src.alignment_info import AlignmentInfo
from src.polya_finder import PolyAFinder
from src.polya_verification import PolyAFixer

SEQ1 = 'CTCAAGACCAAGAAGGACGACATGACCATGGCTTAAAAGAGTCTGCTCCCCACAGCCCCCTGCGAT' \
'GGATGGACGGAGGAACCAGGGTCGGACGACCTCCGATGCTAAGAGCACTCCAACTGCTGCAAACCG' \
'AAGAAGCAGGCATCGGACDCCTTTTTTTTTTAGGCGCCAAAAAAAAAAAACCAAAAAAAAAAAAA'
TUPLES1 = [(0, 167), (4, 30)]
TUPLES2 = [(0, 167), (4, 30), (5, 5), (4, 16), (0, 30), (5, 70), (0, 201)]
PAIRS = [(0, 75897), (1, 76063), (2, 75995)]


class Alignment:
def __init__(self, query_name, cigartuples, seq, reference_start, reference_end, reference_name, reference_id,
aligned_pairs):
self.query_name = query_name
self.cigartuples = cigartuples
self.seq = seq
self.reference_start = reference_start
self.reference_end = reference_end
self.reference_name = reference_name
self.reference_id = reference_id
self.aligned_pairs = aligned_pairs

def get_aligned_pairs(self):
return self.aligned_pairs


class TestAlignmentInfo:

@pytest.mark.parametrize("read_exons, read_blocks, cigar_blocks",
[
([(75897, 76063), (76064, 76294)], [(0, 166), (213, 443)], [(0, 0), (4, 6)])
])
def test_basic(self, read_exons, read_blocks, cigar_blocks):
alignment = Alignment('query', TUPLES2, SEQ1, 75896, 113577, 'reference', 789, PAIRS)
alignment_info = AlignmentInfo(alignment)

assert alignment_info.alignment == alignment
assert alignment_info.read_start == read_exons[0][0]
assert alignment_info.read_end == read_exons[-1][-1]
assert alignment_info.read_exons == read_exons
assert alignment_info.read_blocks == read_blocks
assert alignment_info.cigar_blocks == cigar_blocks

assert alignment_info.aligned_pairs is None
assert alignment_info.aligned_pairs_start_index is None
assert alignment_info.aligned_pairs_end_index is None
assert alignment_info.polya_info is None
assert alignment_info.combined_profile is None
assert alignment_info.exons_changed is False
assert len(alignment_info.cage_hits) == 0

def test_alignment_pairs(self):
alignment = Alignment('query', TUPLES1, SEQ1, 75896, 113577, 'reference', 789, PAIRS)
alignment_info = AlignmentInfo(alignment)
alignment_info.set_aligned_pairs()

assert len(PAIRS) in alignment_info.aligned_pairs_start_index
assert len(alignment_info.aligned_pairs_start_index) == 3

@pytest.mark.parametrize("ref_start, ref_end, intron_index, left_site, chr_record, expected",
[
(75999, 81000, None, None, None, (0, 0)),
(75999, 81000, 0, False, None, (0, 0)),
(75999, 81000, 0, True, None, (0, 0))
])
def test_get_error_count(self, ref_start, ref_end, intron_index, left_site, chr_record, expected):
alignment = Alignment('query', TUPLES2, SEQ1, 75896, 113577, 'reference', 789, PAIRS)
alignment_info = AlignmentInfo(alignment)

indel_count, mismatch_count = alignment_info.get_error_count(ref_start, ref_end, intron_index, left_site,
chr_record)
assert (indel_count, mismatch_count) == expected

@pytest.mark.parametrize("read_start, read_end",
[
(51055, 51221)
])
def test_add_polya_info(self, read_start, read_end):
alignment = Alignment('query', TUPLES2, SEQ1, 51054, 35000, 'reference', 789, PAIRS)
alignment_info = AlignmentInfo(alignment)
polya_finder = PolyAFinder()
polya_fixer = PolyAFixer(1)

alignment_info.add_polya_info(polya_finder, polya_fixer)

assert alignment_info.exons_changed is True
assert alignment_info.polya_info.internal_polya_pos == read_end
assert alignment_info.polya_info.internal_polyt_pos == -1
assert alignment_info.polya_info.external_polya_pos == -1
assert alignment_info.polya_info.external_polyt_pos == -1
assert alignment_info.read_exons == [(read_start, read_end)]
assert alignment_info.read_blocks == [(0, (read_end - read_start))]
assert alignment_info.cigar_blocks == [(0, 0)]
assert alignment_info.read_start == read_start
assert alignment_info.read_end == read_end

def test_get_seq(self):
alignment = Alignment('query', TUPLES1, SEQ1, 75896, 113577, 'reference', 789, PAIRS)
alignment_info = AlignmentInfo(alignment)

with pytest.raises(AssertionError):
alignment_info.get_seq(76000, 81000)
Loading

0 comments on commit aa62894

Please sign in to comment.