
Commit

Revert function 'parse_realigned_bam' to version in commit 8446269
This commit reverts the changes made to the function 'parse_realigned_bam' in the commits after 8446269. Those changes were intended to speed up quantification, but the multiprocessing structure they introduced did not handle multimapping reads well. Note: the changed functions were not deleted; they remain in the script but are no longer called anywhere.
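Why per-reference parallelism mishandles multimapping reads: picking the best alignment of a read requires seeing all of its alignments, but a worker that processes only one transcript sees at most one of them. A minimal sketch with made-up data (read names, transcript IDs and scores are illustrative only):

# Single-threaded pass: all alignments of read_1 are visible, so the best
# hit can be chosen by alignment score.
alignments = [
    ("read_1", "ENST_A", 60),  # (read, transcript, alignment score)
    ("read_1", "ENST_B", 55),  # same read, weaker hit on another transcript
]
best = {}
for read, tr, score in alignments:
    if read not in best or score > best[read][1]:
        best[read] = (tr, score)
print(best)  # {'read_1': ('ENST_A', 60)}

# Per-reference workers: the ENST_A worker sees only the first record and the
# ENST_B worker only the second, so neither can compare them, and read_1 can
# end up counted towards both transcripts.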
youyupei committed Jul 27, 2023
1 parent 621a567 commit 3275249
Showing 3 changed files with 141 additions and 45 deletions.
1 change: 0 additions & 1 deletion DESCRIPTION
@@ -27,7 +27,6 @@ Imports:
     arrangements,
     basilisk,
     bambu,
-    Rsubread,
     Biostrings,
     BiocGenerics,
     circlize,
182 changes: 139 additions & 43 deletions inst/python/count_tr.py
@@ -82,7 +82,8 @@ def wrt_tr_to_csv(bc_tr_count_dict, transcript_dict, csv_f, transcript_dict_ref=
     f.close()
     if print_saturation and has_UMI:
         helper.green_msg(f"The isoform quantification result generated: {csv_f}.")
-        helper.green_msg(f"The estimated saturation is {1-len(dup_count)/sum(dup_count)}")
+        if sum(dup_count):
+            helper.green_msg(f"The estimated saturation is {1-len(dup_count)/sum(dup_count)}")
     return tr_cnt
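The new if sum(dup_count): guard avoids a ZeroDivisionError when no UMIs were recorded. On one reading of the formula (hedged; dup_count holding one duplicate count per unique UMI), the arithmetic looks like this:

dup_count = [3, 1, 2]   # three unique UMIs observed 3 + 1 + 2 = 6 times
saturation = 1 - len(dup_count) / sum(dup_count)
print(saturation)       # 0.5: half of the reads were duplicates

dup_count = []          # nothing counted
if sum(dup_count):      # sum([]) == 0, so the message is simply skipped
    print(1 - len(dup_count) / sum(dup_count))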


@@ -131,12 +132,51 @@ def query_len(cigar_string, hard_clipping=False):
     return result
 
 
+def parse_realigned_bam_multiprocss(bam_in, fa_idx_f, min_sup_reads, min_tr_coverage,
+                                    min_read_coverage, bc_file=False,
+                                    n_process=mp.cpu_count()-4):
+    """
+    Read each alignment from the read-to-transcript realignment BAM file.
+    Current approach:
+        Alignments are processed per reference (transcript), one worker each.
+    Returns:
+        Per-transcript read count.
+    Note: This function is not currently used, as multi-mapping reads are not handled properly.
+    """
+    with ps.AlignmentFile(bam_in, "rb") as bam_file:
+        # Get the list of reference names
+        reference_names = bam_file.references
+
+    rst_futures = helper.multiprocessing_submit(process_trans, iter(reference_names),
+                                                n_process=n_process, pbar=False,
+                                                bam_in=bam_in,
+                                                fa_idx_f=fa_idx_f, min_sup_reads=min_sup_reads,
+                                                min_tr_coverage=min_tr_coverage,
+                                                min_read_coverage=min_read_coverage, bc_file=bc_file)
+
+    bc_tr_count_dict_mp_rst = {}
+    bc_tr_badcov_count_dict_mp_rst = {}
+    tr_kept_rst_mp_rst = None  # this doesn't seem to be used downstream
+
+    for idx, f in enumerate(rst_futures):
+        # print(f"collecting result of batch {idx} " + datetime.now().strftime("%Y-%m-%d %H:%M:%S"), flush=True)
+
+        d1, d2, _ = f.result()
+        for k1 in d1.keys():
+            for k2 in d1[k1].keys():
+                bc_tr_count_dict_mp_rst.setdefault(k1, {}).setdefault(k2, []).extend(d1[k1][k2])
+        for k1 in d2.keys():
+            for k2 in d2[k1].keys():
+                bc_tr_badcov_count_dict_mp_rst.setdefault(k1, {}).setdefault(k2, []).extend(d2[k1][k2])
+
+    # tr_kept_rst_mp_rst.update(p3)  # this doesn't seem to be used downstream
+    print("All reads processed", flush=True)
+    return bc_tr_count_dict_mp_rst, bc_tr_badcov_count_dict_mp_rst, tr_kept_rst_mp_rst
 
-def process_trans(batch_id, total_batches, bam_in, fa_idx_f, min_sup_reads, min_tr_coverage,
+def process_trans(chr_name, bam_in, fa_idx_f, min_sup_reads, min_tr_coverage,
                   min_read_coverage, bc_file):
     """Main function for processing a single batch of alignments
-    batch_id: 0-based integer
-    total_batches: total number of batches
+    chr_name: chromosome name
     alignment_batch: a list of pysam AlignedSegment objects
     """
     bamfile = ps.AlignmentFile(bam_in, "rb")
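For reference, the result-merging idiom above (chained setdefault) combines per-worker dictionaries shaped as {barcode: {transcript: [UMIs]}}. A tiny self-contained demo with made-up values:

merged = {}
worker_results = [
    {"BC1": {"ENST_A": ["u1"]}},
    {"BC1": {"ENST_A": ["u2"], "ENST_B": ["u3"]}},
]
for d in worker_results:
    for k1 in d:
        for k2 in d[k1]:
            merged.setdefault(k1, {}).setdefault(k2, []).extend(d[k1][k2])
print(merged)  # {'BC1': {'ENST_A': ['u1', 'u2'], 'ENST_B': ['u3']}}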
@@ -151,10 +191,8 @@ def process_trans(batch_id, total_batches, bam_in, fa_idx_f, min_sup_reads, min_
 
     if bc_file:
         bc_dict = make_bc_dict(bc_file)  # not sure yet what this is doing
-
-    for idx, rec in enumerate(bamfile.fetch()):
-        if idx % total_batches != batch_id:
-            continue
+    print(chr_name)
+    for idx, rec in enumerate(bamfile.fetch(chr_name)):
         if rec.is_unmapped:
             cnt_stat["unmapped"] += 1
             continue
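This hunk swaps the work-splitting strategy: instead of every worker scanning the whole BAM and keeping records by index modulo, each worker fetches the alignments of a single reference. A side-by-side sketch of the two schemes (illustrative only, not the repository's code):

# Old scheme: batch by record index. The alignments of one multimapping
# read can land in different batches, so no single worker sees them all.
def old_batch(records, batch_id, total_batches):
    return [rec for idx, rec in enumerate(records) if idx % total_batches == batch_id]

# New scheme: one worker per reference. bamfile.fetch(chr_name) needs a
# BAM index, and alignments of the same read to different transcripts
# still end up in different workers, the multimapping problem that
# motivated this revert.
def new_batch(bamfile, chr_name):
    return list(bamfile.fetch(chr_name))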
@@ -188,7 +226,6 @@ def process_trans(batch_id, total_batches, bam_in, fa_idx_f, min_
         [it for it in tr_cov_dict[tr] if it > 0.9]) > min_sup_reads)
 
 
-
     #unique_tr_count = Counter(read_dict[r][0][0]
     #                          for r in read_dict if read_dict[r][0][2] > 0.9)
 
@@ -242,44 +279,103 @@ def process_trans(batch_id, total_batches, bam_in, fa_idx_f, min_
     # print("process end time:" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
     return bc_tr_count_dict, bc_tr_badcov_count_dict, tr_kept
 
 
-def parse_realigned_bam(bam_in, fa_idx_f, min_sup_reads, min_tr_coverage,
-                        min_read_coverage, bc_file = False,
-                        n_process=mp.cpu_count()-4):
+def parse_realigned_bam(bam_in, fa_idx_f, min_sup_reads, min_tr_coverage, min_read_coverage, bc_file = False):
     """
     Read each alignment from the read-to-transcript realignment BAM file.
     Current approach:
         Single-threaded loop through all alignments.
     Returns:
         Per-transcript read count.
     """
-    rst_futures = helper.multiprocessing_submit(process_trans,
-        (x for x in range(n_process)), total_batches=n_process, n_process=n_process, pbar=False,
-        pbar_func=lambda x: 1,
-        pbar_unit='Batch',
-        bam_in=bam_in,
-        fa_idx_f=fa_idx_f, min_sup_reads=min_sup_reads,
-        min_tr_coverage=min_tr_coverage,
-        min_read_coverage=min_read_coverage, bc_file=bc_file)
-
-    bc_tr_count_dict_mp_rst = {}
-    bc_tr_badcov_count_dict_mp_rst = {}
-    tr_kept_rst_mp_rst = None  # this doesn't seem to be used downstream
+    fa_idx = dict((it.strip().split()[0], int(
+        it.strip().split()[1])) for it in open(fa_idx_f))
+    bc_tr_count_dict = {}
+    bc_tr_badcov_count_dict = {}
+    tr_cov_dict = {}
+    read_dict = {}
+    cnt_stat = Counter()
+    bamfile = ps.AlignmentFile(bam_in, "rb")
+    gene_names = bamfile.references
+
+    if bc_file:
+        bc_dict = make_bc_dict(bc_file)
+    for rec in bamfile.fetch(until_eof=True):
+        if rec.is_unmapped:
+            cnt_stat["unmapped"] += 1
+            continue
+        map_st = rec.reference_start
+        map_en = rec.reference_end
+        tr = rec.reference_name
+        tr_cov = float(map_en-map_st)/fa_idx[tr]
+        tr_cov_dict.setdefault(tr, []).append(tr_cov)
+        if rec.query_name not in read_dict:
+            read_dict.setdefault(rec.query_name, []).append((tr, rec.get_tag("AS"), tr_cov, float(
+                rec.query_alignment_length)/rec.infer_read_length(), rec.mapping_quality))
+        else:
+            if rec.get_tag("AS") > read_dict[rec.query_name][0][1]:
+                read_dict[rec.query_name].insert(0, (tr, rec.get_tag("AS"), tr_cov, float(
+                    rec.query_alignment_length)/rec.infer_read_length(), rec.mapping_quality))
+            # same aligned sequence
+            elif rec.get_tag("AS") == read_dict[rec.query_name][0][1] and float(rec.query_alignment_length)/rec.infer_read_length() == read_dict[rec.query_name][0][3]:
+                # choose the one with higher transcript coverage, might be internal TSS
+                if tr_cov > read_dict[rec.query_name][0][2]:
+                    read_dict[rec.query_name].insert(0, (tr, rec.get_tag("AS"), tr_cov, float(
+                        rec.query_alignment_length)/rec.infer_read_length(), rec.mapping_quality))
+            else:
+                read_dict[rec.query_name].append((tr, rec.get_tag("AS"), tr_cov, float(
+                    rec.query_alignment_length)/rec.infer_read_length(), rec.mapping_quality))
+        if tr not in fa_idx:
+            cnt_stat["not_in_annotation"] += 1
+            print("\t" + str(tr), "not in annotation ???")
+    tr_kept = dict((tr, tr) for tr in tr_cov_dict if len(
+        [it for it in tr_cov_dict[tr] if it > 0.9]) > min_sup_reads)
 
-    for idx, f in enumerate(rst_futures):
-        # print(f"collecting result of batch {idx} " + datetime.now().strftime("%Y-%m-%d %H:%M:%S"), flush=True)
-
-        d1, d2, _ = f.result()
-        for k1 in d1.keys():
-            for k2 in d1[k1].keys():
-                bc_tr_count_dict_mp_rst.setdefault(k1, {}).setdefault(k2, []).extend(d1[k1][k2])
-        for k1 in d2.keys():
-            for k2 in d2[k1].keys():
-                bc_tr_badcov_count_dict_mp_rst.setdefault(k1, {}).setdefault(k2, []).extend(d2[k1][k2])
     #unique_tr_count = Counter(read_dict[r][0][0]
     #                          for r in read_dict if read_dict[r][0][2] > 0.9)
 
-    # tr_kept_rst_mp_rst.update(p3)  # this doesn't seem to be used downstream
-    print("All reads processed", flush=True)
-    return bc_tr_count_dict_mp_rst, bc_tr_badcov_count_dict_mp_rst, tr_kept_rst_mp_rst
+    for r in read_dict:
+        tmp = read_dict[r]
+        tmp = [it for it in tmp if it[0] in tr_kept]
+        if len(tmp) > 0:
+            hit = tmp[0]  # transcript_id, pct_ref, pct_reads
+        else:
+            cnt_stat["no_good_match"] += 1
+            continue
+        # the line below creates an issue when the header line has more than one "_";
+        # the UMI is assumed to be delimited from the barcode by the last "_"
+        # bc, umi = r.split("#")[0].split("_")  # assume cleaned barcode
+        try:
+            bc, umi = r.split("#")[0].split("_")  # assume cleaned barcode
+        except ValueError as ve:
+            print(ve, ": ", ve.args, ".")
+            raise ValueError(
+                "Please check if barcode and UMI are delimited by \"_\"")
+
+        if bc_file:
+            bc = bc_dict[bc]
+        if len(tmp) == 1 and tmp[0][4] > 0:
+            if bc not in bc_tr_count_dict:
+                bc_tr_count_dict[bc] = {}
+            bc_tr_count_dict[bc].setdefault(hit[0], []).append(umi)
+            cnt_stat["counted_reads"] += 1
+        elif len(tmp) > 1 and tmp[0][1] == tmp[1][1] and tmp[0][3] == tmp[1][3]:
+            if hit[1] > 0.8:
+                if bc not in bc_tr_count_dict:
+                    bc_tr_count_dict[bc] = {}
+                bc_tr_count_dict[bc].setdefault(hit[0], []).append(umi)
+                cnt_stat["counted_reads"] += 1
+            else:
+                cnt_stat["ambigious_reads"] += 1
+                if bc not in bc_tr_badcov_count_dict:
+                    bc_tr_badcov_count_dict[bc] = {}
+                bc_tr_badcov_count_dict[bc].setdefault(hit[0], []).append(umi)
+        elif hit[2] < min_tr_coverage or hit[3] < min_read_coverage:
+            cnt_stat["not_enough_coverage"] += 1
+            if bc not in bc_tr_badcov_count_dict:
+                bc_tr_badcov_count_dict[bc] = {}
+            bc_tr_badcov_count_dict[bc].setdefault(hit[0], []).append(umi)
+        else:
+            if bc not in bc_tr_count_dict:
+                bc_tr_count_dict[bc] = {}
+            bc_tr_count_dict[bc].setdefault(hit[0], []).append(umi)
+            cnt_stat["counted_reads"] += 1
+    print(("\t" + str(cnt_stat)))
+    return bc_tr_count_dict, bc_tr_badcov_count_dict, tr_kept
 
 def realigment_min_sup_reads_filter(bam_list, fa_idx_f, min_sup_reads):
     fa_idx = dict((it.strip().split()[0], int(
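To make the restored counting rules easier to follow, here is a hedged walk-through with toy values (barcodes, UMIs, transcript IDs and thresholds are invented; each hit tuple is (transcript, AS, tr_cov, read_cov, mapq), and tmp[0] is the best hit):

# Read names are expected to look like "<barcode>_<UMI>#<rest>":
r = "ACGTACGTACGTACGT_TTGCATGC#cell42"
bc, umi = r.split("#")[0].split("_")   # ('ACGTACGTACGTACGT', 'TTGCATGC')

# 1) A single surviving hit with mapping quality > 0 is counted directly.
tmp = [("ENST_A", 60, 0.95, 0.92, 30)]

# 2) If the top two hits tie on alignment score AND read coverage, the code
#    takes the tie branch and tests hit[1] > 0.8 before counting; otherwise
#    the read goes to the "ambigious_reads" bucket.
tmp = [("ENST_A", 60, 0.95, 0.92, 0),
       ("ENST_B", 60, 0.70, 0.92, 0)]

# 3) A hit below min_tr_coverage or min_read_coverage lands in the
#    bad-coverage dictionary instead of the main count.
tmp = [("ENST_A", 60, 0.20, 0.30, 30)]   # e.g. with min_tr_coverage = 0.4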
3 changes: 2 additions & 1 deletion inst/python/helper.py
@@ -117,7 +117,8 @@ def multiprocessing_submit(func, iterator, n_process=mp.cpu_count()-1 ,pbar = Tr
         else:
             n_job_in_queue -= 1
             # update progress bar based on batch size
-            _pbar.update(pbar_update)
+            if pbar:
+                _pbar.update(pbar_update)
             yield job
             del futures[job]
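The fix above guards the progress-bar update so that calls with pbar=False (as in parse_realigned_bam_multiprocss) no longer touch a bar that was never created. A minimal compatible sketch of the submit-and-yield pattern (a simplification under stated assumptions, not the actual helper.py, which also bounds the number of in-flight jobs):

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed

from tqdm import tqdm

def multiprocessing_submit_sketch(func, iterator, n_process=mp.cpu_count()-1,
                                  pbar=True, pbar_update=1, **kwargs):
    _pbar = tqdm(unit="job") if pbar else None  # no bar when pbar=False
    with ProcessPoolExecutor(n_process) as executor:
        futures = [executor.submit(func, item, **kwargs) for item in iterator]
        for job in as_completed(futures):
            if pbar:  # the guard added by this commit
                _pbar.update(pbar_update)
            yield job  # caller retrieves job.result()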

