Feature/bam-reading #33

Open
wants to merge 5 commits into base: develop
51 changes: 30 additions & 21 deletions minFQ/fastq_handler.py
@@ -18,7 +18,7 @@
unseen_files_in_watch_folder_dict,
_prepare_toml,
OpenLine,
create_run_collection, average_quality, check_is_pass,
create_run_collection, average_quality, check_is_pass, ReadBam,
)
from watchdog.events import FileSystemEventHandler

@@ -233,7 +233,14 @@ def parse_fastq_file(
}
handle = open_handle_dict.get(fastq_path[-3:], open)
with handle(fastq_path, "rt") as fh:
for desc, name, seq, qual in readfq(fh):
# If the flag is_bam is used, BAM files will be read and parsed
if args.is_bam:
read_file = ReadBam(bam_file=fh).read_bam()
# Otherwise, assume it's a fastq file
else:
read_file = readfq(fh)
for desc, name, seq, qual in read_file:
# Edited to handle parsing of BAM descriptions
description_dict = parse_fastq_description(desc)
counter += 1
sequencing_stats.reads_seen += 1
@@ -255,6 +262,7 @@
EndPoint.FASTQ_FILE, json=payload, base_id=run_id
)
run_dict[run_id].get_readnames_by_run(fastq_file["id"])

parse_fastq_record(
name,
seq,
@@ -274,24 +282,25 @@
log.error("This gzipped file failed to upload - {}.".format(fastq_path))
raise Exception
# continue
# This chunk of code will mean we commit reads every time we get a new file?
for runs in run_dict:
run_dict[runs].read_names = []
run_dict[runs].commit_reads()
try:
# Update the fastq file entry on the server, providing the file size for the database record
payload = {
"file_name": str(check_fastq_path(fastq_path)),
"run_id": run_id,
"md5": get_file_size(fastq_path),
"run": run_dict[run_id].run,
}
fastq_file = minotour_api.put(EndPoint.FASTQ_FILE, json=payload, base_id=run_id)
except Exception as err:
log.error("Problem with uploading file {}".format(err))
sequencing_stats.time_per_file = time.time()
sequencing_stats.fastq_info[run_id]["files_processed"] += 1
return counter

# This chunk of code will mean we commit reads every time we get a new file?
for runs in run_dict:
run_dict[runs].read_names = []
run_dict[runs].commit_reads()
try:
# Update the fastq file entry on the server, providing the file size for the database record
payload = {
"file_name": str(check_fastq_path(fastq_path)),
"run_id": run_id,
"md5": get_file_size(fastq_path),
"run": run_dict[run_id].run,
}
fastq_file = minotour_api.put(EndPoint.FASTQ_FILE, json=payload, base_id=run_id)
except Exception as err:
log.error("Problem with uploading file {}".format(err))
sequencing_stats.time_per_file = time.time()
sequencing_stats.fastq_info[run_id]["files_processed"] += 1
return counter


class FastqHandler(FileSystemEventHandler):
@@ -366,7 +375,7 @@ def process_files(self):
# remove the file from fastq files to create as we are doing that now
if create_time > time.time() - 5:
time.sleep(5)
# file created 5 sec ago, so should be complete. For simulations we make the time longer.
# file created 5 sec ago, so should be complete. For simulations, we make the time longer.
del self.fastq_files_to_create[fastqfile]
parse_fastq_file(
fastqfile,
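
The hunk above routes both input formats through the same loop: ReadBam.read_bam() and readfq() each yield (desc, name, seq, qual) tuples, so the downstream parsing is unchanged. A minimal sketch of that pattern; the helper name is illustrative and the FASTQ reader is passed in only because readfq's import path is not shown in this diff.

from minFQ.fastq_handler_utils import ReadBam

def iter_reads(handle, is_bam, fastq_reader):
    """Yield (desc, name, seq, qual) tuples from either a BAM or a FASTQ handle.

    fastq_reader is expected to be minFQ's readfq generator; it is a parameter
    here only to keep this sketch self-contained.
    """
    if is_bam:
        # ReadBam wraps pysam.AlignmentFile and yields FASTQ-style records
        return ReadBam(bam_file=handle).read_bam()
    return fastq_reader(handle)
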
205 changes: 176 additions & 29 deletions minFQ/fastq_handler_utils.py
@@ -4,12 +4,109 @@
from pathlib import Path

import numpy as np
import pysam
import click

from minFQ.endpoints import EndPoint
from minFQ.run_data_tracker import RunDataTracker

log = logging.getLogger(__name__)


class ReadBam:
"""
:param bam_file: A bam format alignment file.
:return:
name: sequence name
seq: sequence
qual: quality of reads
start_time_pr: start time of run (per read)
read_basecall_id: read ID and basecall ID
channel: channel ID
read_number: read number
seq_to_signal: sequence to signal move table
rg_id : read group identifier
dt: time of run
ds: run ID and basecall model
bm: basecall model
ri: run ID
lb: sample ID
pl: read platform (e.g. ONT, PacBio)
pm: device position
pu: flow cell ID
al: unknown (outputs unclassified)
"""

def __init__(self, bam_file):
"""Initializes bam_file and tags as instances"""
# Defines self.bam_file as an AlignmentFile object for reading input BAM file
self.bam_file = pysam.AlignmentFile(bam_file, "rb", check_sq=False)
self.tags = self.get_rg_tags(self.bam_file)

def get_rg_tags(self, bam_file):
"""Detects and retrieves @RG header tags from a BAM file and assigns each one to a dictionary"""
# Gets @RG from header
rg_tags = bam_file.header.get("RG", [])
# Exits script if no @RG field present
if not rg_tags:
print("This file does not contain an @RG field")
exit()
# Only return the first RG tag for ease
rg_tag = rg_tags[0]
# Creates dictionary of @RG header tags and their values
rg_tags_dict = {
"rg_id": rg_tag.get("ID", None),
"start_time": rg_tag.get("DT", None),
"basecall_model_version_id": rg_tag.get("DS", "")
.split(" ")[1]
.replace("basecall_model=", ""),
"run_id": rg_tag.get("DS", "").split(" ")[0].replace("runid=", ""),
"sample_id": rg_tag.get("LB", None),
"platform": rg_tag.get("PL", None),
"position": rg_tag.get("PM", None),
"flow_cell_id": rg_tag.get("PU", None),
"al": rg_tag.get("al", None),
}
return rg_tags_dict

def read_bam(self):
"""Extracts read data from a BAM file iteratively and yields in a FASTQ friendly format"""
for read in self.bam_file:
name = read.query_name
seq = read.seq
qual = read.qual
start_time_pr = read.get_tag("st")
channel = read.get_tag("ch")
read_number = read.get_tag("rn")
read_basecall_id = read.get_tag("RG")

# Combines read and @RG metrics to create description variable, using the same format as FASTQ files
desc = (
"@"
+ str(name)
+ " runid="
+ str(self.tags["run_id"])
+ " read="
+ str(read_number)
+ " ch="
+ str(channel)
+ " start_time="
+ str(start_time_pr)
+ " flow_cell_id="
+ str(self.tags["flow_cell_id"])
+ " protocol_group_id="
+ "UNKNOWN"
+ " sample_id="
+ str(self.tags["sample_id"])
+ " parent_read_id="
+ str(name)
+ " basecall_model_version_id="
+ str(self.tags["basecall_model_version_id"])
)
# Yields FASTQ metrics
yield desc, name, seq, qual
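
A hypothetical standalone use of the class above, assuming a BAM that carries the @RG fields and per-read tags ReadBam expects; the file name is illustrative and not part of this PR.

reader = ReadBam(bam_file="example_reads.bam")  # illustrative path
for desc, name, seq, qual in reader.read_bam():
    # desc mirrors a MinKNOW FASTQ header line; name/seq/qual come from the BAM record
    print(desc)
    print(name, len(seq) if seq else 0)
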


class OpenLine:
def __init__(self, fp, start=1, number=-1, f=open, f_kwds=None):
"""
@@ -31,7 +128,7 @@ def __init__(self, fp, start=1, number=-1, f=open, f_kwds=None):
"""
self.fp = fp
# seeking lines is zero indexed
self.start = max(start - 1 , 0)
self.start = max(start - 1, 0)
self.number = number
self.open_func = f
self.current_line = start
@@ -101,7 +198,9 @@ def check_fastq_path(path):
if folders_in_path[-2] in ("pass", "fail"):
return "{}_{}".format(folders_in_path[-2], folders_in_path[-1])
elif folders_in_path[-3] in ("pass", "fail"):
return "{}_{}_{}".format(folders_in_path[-3], folders_in_path[-2], folders_in_path[-1])
return "{}_{}_{}".format(
folders_in_path[-3], folders_in_path[-2], folders_in_path[-1]
)
else:
return "{}".format(folders_in_path[-1])
except Exception as e:
@@ -199,29 +298,56 @@ def average_quality(quality):
)


def get_runid(fastq):
def get_runid(file_path):
"""
Open a fastq file, read the first line and parse out the Run ID
:param fastq: path to the fastq file to be parsed
:type fastq: str
:return runid: The run ID of this fastq file as a string
Open a file, read the first line and parse out the Run ID
:param file_path: path to the file to be parsed
:type file_path: str
:return runid: The run ID of this file as a string
"""
runid = ""
if fastq.endswith(".gz"):
with gzip.open(fastq, "rt") as file:

runid = "" # EDITED TO INCLUDE BAM FILES
file_suffix = Path(file_path).suffix

if file_suffix == ".bam":
with pysam.AlignmentFile(file_path, "rb", check_sq=False) as file_path:
rg_tags = file_path.header.get("RG", [])
rg_tag = rg_tags[0]
line = rg_tag.get("DS", "").split(" ")[0]
elif file_suffix == ".gz":
with gzip.open(file_path, "rt") as file_path:
for _ in range(1):
line = file.readline()
else:
with open(fastq, "r") as file:
line = file_path.readline()
else: # this only works for others
with open(file_path, "r") as file_path:
for _ in range(1):
line = file.readline()
line = file_path.readline()
for _ in line.split():
if _.startswith("runid"):
runid = _.split("=")[1]
return runid
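
Both get_rg_tags above and the BAM branch of get_runid rely on the @RG DS field being a space-separated string of the form "runid=<run id> basecall_model=<model>". A worked example with an illustrative value:

ds = "runid=8c2cd305 basecall_model=dna_r10.4.1_e8.2_400bps_hac@v4.1.0"  # illustrative value
run_id = ds.split(" ")[0].replace("runid=", "")
# -> "8c2cd305"
basecall_model = ds.split(" ")[1].replace("basecall_model=", "")
# -> "dna_r10.4.1_e8.2_400bps_hac@v4.1.0"
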


def unseen_files_in_watch_folder_dict(path, ignore_existing, minotour_api, fastq_dict, sequencing_statistics):
# if file_path.endswith(".bam"):
# with pysam.AlignmentFile(file_path, 'rb', check_sq=False) as file_path:
# rg_tags = file_path.header.get('RG', [])
# rg_tag = rg_tags[0]
# line = rg_tag.get('DS', '').split(' ')[0]
# elif file_path.endswith(".gz"):
# with gzip.open(file_path, "rt") as file_path:
# for _ in range(1):
# line = file_path.readline()
# else:
# with open(file_path, "r") as file_path:
# for _ in range(1):
# line = file_path.readline()
# for _ in line.split():
# if _.startswith("runid"):
# runid = _.split("=")[1]
# return runid


def unseen_files_in_watch_folder_dict(
path, ignore_existing, minotour_api, fastq_dict, sequencing_statistics
):
"""
Iterate fastq files in watch directory and see if we have seen them before by checking against server.
Parameters
Expand Down Expand Up @@ -251,11 +377,21 @@ def unseen_files_in_watch_folder_dict(path, ignore_existing, minotour_api, fastq
# if directory
if os.path.isdir(path):
log.info("caching existing fastq files in: {}".format(path))
sequencing_statistics.fastq_message = "caching existing fastq files in: {}".format(path)
sequencing_statistics.fastq_message = (
"caching existing fastq files in: {}".format(path)
)
## ToDo Consider moving these to top level
novel_run_set = set()
seen_file_tracker = {}
file_endings = {".fq", ".fastq", ".fq.gz", ".fastq.gz", ".fq.xz", ".fastq.xz"}
file_endings = {
".fq",
".fastq",
".fq.gz",
".fastq.gz",
".fq.xz",
".fastq.xz",
".bam",
}
# have a rummage around the watch directory
for path, dirs, files in os.walk(path):
# iterate fastq files in the watchdir
@@ -273,22 +409,26 @@
sequencing_statistics.fastq_info[run_id]["files_seen"] += 1
sequencing_statistics.fastq_info[run_id]["run_id"] = run_id
if "directory" not in sequencing_statistics.fastq_info[run_id]:
sequencing_statistics.fastq_info[run_id]["directory"] = store_path
sequencing_statistics.fastq_info[run_id][
"directory"
] = store_path
if (
run_id not in novel_run_set
and run_id not in seen_file_tracker.keys()
):
# get all files for this run
result = minotour_api.get_json(EndPoint.FASTQ_FILE, base_id=run_id)
result = minotour_api.get_json(
EndPoint.FASTQ_FILE, base_id=run_id
)
#### Parse through the results and store them in a dictionary so we can track what has already been processed
# We are parsing through the fastq files we have seen for this run so we don't reprocess them
if result is not None:
for entry in result:
if entry["runid"] not in seen_file_tracker.keys():
seen_file_tracker[entry["runid"]] = {}
seen_file_tracker[entry["runid"]][
entry["name"]
] = entry["md5"]
seen_file_tracker[entry["runid"]][entry["name"]] = (
entry["md5"]
)
# add the file path
filepath = os.path.join(path, f)
check_file_path = check_fastq_path(filepath)
@@ -307,10 +447,14 @@
if int(file_byte_size) == int(
seen_file_tracker[run_id][check_file_path]
):
sequencing_statistics.fastq_info[run_id]["files_skipped"] += 1
sequencing_statistics.fastq_info[run_id][
"files_skipped"
] += 1
sequencing_statistics.files_skipped += 1
else:
new_fastq_file_dict[filepath] = os.stat(filepath).st_mtime
new_fastq_file_dict[filepath] = os.stat(
filepath
).st_mtime
novel_run_set.add(run_id)

if not seen_file:
@@ -325,11 +469,15 @@ def unseen_files_in_watch_folder_dict(path, ignore_existing, minotour_api, fastq
)
)
else:
sequencing_statistics.fastq_message = "Ignoring existing fastq files in: {}".format(path)
sequencing_statistics.fastq_message = (
"Ignoring existing fastq files in: {}".format(path)
)
return new_fastq_file_dict


def create_run_collection(run_id, run_dict, args, header, description_dict, sequencing_statistics):
def create_run_collection(
run_id, run_dict, args, header, description_dict, sequencing_statistics
):
"""
Create run collection for this run id if we don't already have one, store in run_dict
Parameters
Expand All @@ -352,4 +500,3 @@ def create_run_collection(run_id, run_dict, args, header, description_dict, sequ
"""
run_dict[run_id] = RunDataTracker(args, header, sequencing_statistics)
run_dict[run_id].add_run(description_dict, args)
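
The description string assembled in ReadBam.read_bam uses the same key=value layout as a MinKNOW FASTQ header, which is what lets parse_fastq_description (not shown in this diff) handle BAM-derived reads unchanged. A minimal sketch of that kind of parsing, written as an assumption about its behaviour rather than a copy of the real function:

def parse_description_sketch(desc):
    """Split '@name key1=value1 key2=value2 ...' into a dict of fields (hypothetical helper)."""
    fields = {}
    for token in desc.split(" ")[1:]:  # skip the leading '@<read name>'
        key, _, value = token.partition("=")
        if value:
            fields[key] = value
    return fields

# e.g. parse_description_sketch("@read1 runid=abc123 read=42 ch=212 flow_cell_id=FAL12345")
# -> {"runid": "abc123", "read": "42", "ch": "212", "flow_cell_id": "FAL12345"}
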
