Skip to content

Commit

Permalink
better parallel liftover
Browse files Browse the repository at this point in the history
  • Loading branch information
mikolmogorov committed Mar 4, 2022
1 parent df1ee97 commit 0909330
Showing 1 changed file with 33 additions and 45 deletions.
78 changes: 33 additions & 45 deletions hapdup/bed_liftover.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,68 +66,56 @@ def project_flank(bam_file, ref_seq, ref_pos, flank):
return last_aln.query_name, last_qry_pos, 1 if not last_aln.is_reverse else 0


def bed_liftover(bed_file, bam_file, output_failed, ctg_id):
out_strings = []
#print("Processing", ctg_id)
def bed_liftover(bam_file, output_failed, line):
fields = line.split("\t")
chr_id, chr_start, chr_end = fields[0], int(fields[1]), int(fields[2])

for line in open(bed_file, "r"):
line = line.strip()
if line.startswith("#"):
#out_strings.append(line)
continue
#MIN_BLOCK = 50
#if chr_end - chr_start < MIN_BLOCK:
# continue

fields = line.split("\t")
chr_id, chr_start, chr_end = fields[0], int(fields[1]), int(fields[2])
proj_start_chr, proj_start_pos, proj_start_sign = project(bam_file, chr_id, chr_start)
proj_end_chr, proj_end_pos, proj_end_sign = project(bam_file, chr_id, chr_end)

if chr_id != ctg_id:
continue

#MIN_BLOCK = 50
#if chr_end - chr_start < MIN_BLOCK:
# continue
if (not proj_start_chr or not proj_end_chr or
proj_end_chr != proj_start_chr or
proj_start_sign != proj_end_sign):
if output_failed:
print("#Failed: " + line)
return None

proj_start_chr, proj_start_pos, proj_start_sign = project(bam_file, chr_id, chr_start)
proj_end_chr, proj_end_pos, proj_end_sign = project(bam_file, chr_id, chr_end)

if (not proj_start_chr or not proj_end_chr or
proj_end_chr != proj_start_chr or
proj_start_sign != proj_end_sign):
if output_failed:
out_strings.append("#Failed: " + line)
#print("#Failed:", line)
continue
if proj_start_sign < 0:
proj_start_pos, proj_end_pos = proj_end_pos, proj_start_pos
#if proj_end_pos < proj_start_pos:
# print(proj_start_pos, proj_start_sign, proj_end_pos, proj_end_sign)
# raise Exception("Negative length interval")

if proj_start_sign < 0:
proj_start_pos, proj_end_pos = proj_end_pos, proj_start_pos
#if proj_end_pos < proj_start_pos:
# print(proj_start_pos, proj_start_sign, proj_end_pos, proj_end_sign)
# raise Exception("Negative length interval")

fields[0], fields[1], fields[2] = proj_start_chr, str(proj_start_pos), str(proj_end_pos)
out_strings.append("\t".join(fields))

return ctg_id, out_strings
fields[0], fields[1], fields[2] = proj_start_chr, str(proj_start_pos), str(proj_end_pos)
return fields


def _unpacker(args):
return bed_liftover(*args)


def liftover_parallel(bed_file, bam_file, out_stream, output_failed, num_threads):
all_reference_ids = [r for r in pysam.AlignmentFile(bam_file, "rb").references]
random.shuffle(all_reference_ids)
tasks = [(bed_file, bam_file, output_failed, r)
for r in all_reference_ids]
tasks = []
for line in open(bed_file, "r"):
line = line.strip()
if line.startswith("#"):
out_stream.write(line + "\n")
continue

tasks.append((bam_file, output_failed, line))

thread_outputs = None
with Pool(num_threads) as p:
thread_outputs = p.map(_unpacker, tasks)

thread_outputs.sort(key=lambda o: o[0])
thread_outputs = [x[1] for x in thread_outputs if len(x[1]) > 0]
bed_strings = sum(thread_outputs, [])
for s in bed_strings:
out_stream.write(s + "\n")
thread_outputs = [x for x in thread_outputs if x is not None]
thread_outputs.sort(key=lambda o: (o[0], o[1], o[2]))
for s in thread_outputs:
out_stream.write("\t".join(s) + "\n")


def main():
Expand Down

0 comments on commit 0909330

Please sign in to comment.