diff --git a/crisprme.py b/crisprme.py index 97cdd34..d902067 100755 --- a/crisprme.py +++ b/crisprme.py @@ -1,16 +1,13 @@ #!/usr/bin/env python -import sys -import os +from Bio.Seq import Seq + import subprocess import itertools -from Bio.Seq import Seq +import sys +import os import re -import utils -# for getting lst of chr to know file extension and if enriched -from os.path import isfile, isdir, join -from os import listdir version = "2.1.3" # CRISPRme version; TODO: update when required __version__ = version @@ -91,8 +88,10 @@ def extractSequence(name, input_range, genome_selected): list_chr = [ f - for f in listdir(current_working_directory + "Genomes/" + genome_selected) - if isfile(join(current_working_directory + "Genomes/" + genome_selected, f)) + for f in os.listdir(current_working_directory + "Genomes/" + genome_selected) + if os.path.isfile( + os.path.join(current_working_directory + "Genomes/" + genome_selected, f) + ) and not f.endswith(".fai") ] add_ext = ".fa" @@ -867,7 +866,7 @@ def target_integration(): ) print( "\t--empirical_data, used to specify the file that contains empirical data provided by the user to assess in-silico targets" - ), + ) print("\t--output, used to specify the output folder for the results") exit(0) @@ -1136,52 +1135,40 @@ def crisprme_version(): def complete_test_crisprme(): - # check existence of directories - utils.check_directories("./") - - # create all necessary files to test the complete search with small chromosome - with open("PAMs/20bp-NGG-SpCas9.txt", "w") as pam_file: - pam_file.write("NNNNNNNNNNNNNNNNNNNNNGG 3\n") - with open("test_guide.txt", "w") as test_guide: - test_guide.write("CTAACAGTTGCTTTTATCACNNN\n") - with open("test_vcf_list.txt", "w") as test_vcf_list: - test_vcf_list.write("hg38_1000G/\n") - with open("test_samplesID_list.txt", "w") as test_output: - test_output.write("hg38_1000G.samplesID.txt\n") - utils.download_genome("chr22", "test") - utils.download_vcf("chr22", "1000G") - utils.download_samplesID() - utils.download_annotation() - - debug = False - if "--debug" in input_args: - debug = True - - # run complete search - if debug: - os.system( - "crisprme.py complete-search \ - --genome Genomes/test/ \ - --thread 4 \ - --bMax 1 --mm 4 --bDNA 1 --bRNA 1 --merge 3 \ - --pam PAMs/20bp-NGG-SpCas9.txt --guide test_guide.txt \ - --vcf test_vcf_list.txt --samplesID test_samplesID_list.txt \ - --annotation Annotations/encode+gencode.hg38.bed \ - --gene_annotation Annotations/gencode.protein_coding.bed \ - --output test_output \ - --debug" + if "--help" in input_args: + sys.stderr.write( + "Execute comprehensive testing for complete-search functionality " + "provided by CRISPRme\n" + "Options:\n" + "--chrom, test the complete-search functionality on the specified " + "chromosome (e.g., chr22). By default, the test is conducted on all " + "chromosomes\n" + "--vcf_dataset, VCFs dataset to be used during CRISPRme testing. " + "Available options include 1000 Genomes (1000G) and Human Genome " + "Diversity Project (HGDP). The default dataset is 1000 Genomes.\n" + "--debug, debug mode\n" ) - else: - os.system( - "crisprme.py complete-search \ - --genome Genomes/test/ \ - --thread 4 \ - --bMax 1 --mm 4 --bDNA 1 --bRNA 1 --merge 3 \ - --pam PAMs/20bp-NGG-SpCas9.txt --guide test_guide.txt \ - --vcf test_vcf_list.txt --samplesID test_samplesID_list.txt \ - --annotation Annotations/encode+gencode.hg38.bed \ - --gene_annotation Annotations/gencode.protein_coding.bed \ - --output test_output" + if "--chrom" in input_args: + try: + chrom = input_args[input_args.index("--chrom") + 1] + except IndexError: + sys.stderr.write("Please input some parameter for flag --chrom\n") + sys.exit(1) + if "--vcf" in input_args: + try: + vcf = input_args[input_args.index("--vcf") + 1] + except IndexError: + sys.stderr.write("Please input some parameter for flag --vcf\n") + sys.exit(1) + debug = "--debug" in input_args + # begin crisprme test + script_test = os.path.join( + script_path.replace("PostProcess", "src"), "crisprme_test.py" + ) + code = subprocess.call(f"python {script_test} {chrom} {vcf} {debug}", shell=True) + if code != 0: + raise OSError( + "\nCRISPRme test failed! See Results/crisprme-test-out/log_error.txt for details\n" ) @@ -1206,22 +1193,18 @@ def callHelp(): callHelp() elif sys.argv[1] == "complete-search": complete_search() -elif sys.argv[1] == "gnomAD-converter": - gnomAD_converter() -# elif sys.argv[1] == 'search-only': -# search_only() -# elif sys.argv[1] == 'post-analysis-only': -# post_analysis_only() +elif sys.argv[1] == "complete-test": + complete_test_crisprme() elif sys.argv[1] == "targets-integration": target_integration() -elif sys.argv[1] == "web-interface": - web_interface() +elif sys.argv[1] == "gnomAD-converter": + gnomAD_converter() elif sys.argv[1] == "generate-personal-card": personal_card() +elif sys.argv[1] == "web-interface": + web_interface() elif sys.argv[1] == "--version": crisprme_version() -elif sys.argv[1] == "complete-test": - complete_test_crisprme() else: sys.stderr.write('ERROR! "' + sys.argv[1] + '" is not an allowed!\n\n') callHelp() # print help when wrong input command diff --git a/src/crisprme_test.py b/src/crisprme_test.py new file mode 100644 index 0000000..668fcd0 --- /dev/null +++ b/src/crisprme_test.py @@ -0,0 +1,369 @@ +""" +""" + +from utils import ( + check_crisprme_directory_tree, + download, + gunzip, + untar, + rename, + CHROMS, + CRISPRME_DIRS, +) + +from typing import Tuple + +import subprocess +import sys +import os + +HG38URL = "https://hgdownload.soe.ucsc.edu/goldenPath/hg38" +VCF1000GPURL = ( + "ftp://ftp.1000genomes.ebi.ac.uk/vol1/ftp/data_collections/1000_genomes_project/" + "release/20190312_biallelic_SNV_and_INDEL/" + "ALL.{}.shapeit2_integrated_snvindels_v2a_27022019.GRCh38.phased.vcf.gz" +) +VCFHGDPURL = ( + "ftp://ngs.sanger.ac.uk:21/production/hgdp/hgdp_wgs.20190516/" + "hgdp_wgs.20190516.full.{}.vcf.gz" +) +SAMPLESIDURL = ( + "https://raw.githubusercontent.com/pinellolab/CRISPRme/test-function/download_data" +) +ANNOTATIONURL = "https://github.com/pinellolab/CRISPRme/raw/test-function/download_data" + + +def ensure_hg38_directory(dest: str) -> str: + """ + Ensure the existence of the 'hg38' directory within the specified destination + directory. + + Args: + dest (str): The destination directory where the 'hg38' directory should + be created. + + Returns: + str: The path to the 'hg38' directory. + """ + + hg38_dir = os.path.join(dest, "hg38") + if not os.path.exists(hg38_dir): # create hg38 directory within Genomes + os.mkdir(hg38_dir) + return hg38_dir + + +def download_genome_data(chrom: str, dest: str) -> None: + """ + Download genome data for a specified chromosome to the destination directory. + + Args: + chrom (str): The chromosome identifier in UCSC format. + dest (str): The destination directory to save the downloaded genome data. + + Returns: + None + + Raises: + ValueError: If the input chromosome is not valid. + FileExistsError: If the destination directory does not exist. + """ + + # assume chromosomes given in UCSC format (chr1, chr2, etc.) + if chrom not in CHROMS + ["all"]: + raise ValueError(f"Forbidden input chromosome ({chrom})") + if not os.path.isdir(dest): # check dest directory existence + raise FileExistsError(f"Unable to locate {dest}") + if chrom == "all": + chromstar = download(f"{HG38URL}/bigZips/hg38.chromFa.tar.gz", dest) + chromsdir = untar(chromstar, dest, "chroms") # decompress archive + # rename chroms dir to hg38 + chromsdir = rename(chromsdir, os.path.join(os.path.dirname(chromsdir), "hg38")) + assert os.path.isdir(chromsdir) + else: + chromgz = download(f"{HG38URL}/chromosomes/{chrom}.fa.gz", dest) + dest = ensure_hg38_directory(dest) # create hg38 directory + chromfa = gunzip( + chromgz, + os.path.join(dest, f"{os.path.splitext(os.path.basename(chromgz))[0]}"), + ) # decompress chrom FASTA + assert os.path.isfile(chromfa) + + +def ensure_vcf_dataset_directory(dest: str, dataset: str) -> str: + """ + Ensure the existence of a directory for a specific VCF dataset within the + specified destination directory. + + Args: + dest (str): The destination directory where the VCF dataset directory + should be created. + dataset (str): The name or identifier of the VCF dataset. + + Returns: + str: The path to the VCF dataset directory. + """ + + vcf_dataset_dir = os.path.join(dest, f"hg38_{dataset}") + if not os.path.exists(vcf_dataset_dir): + os.mkdir(vcf_dataset_dir) + return vcf_dataset_dir + + +def download_vcf_data(chrom: str, dest: str, dataset: str) -> None: + """ + Download VCF data for a specific chromosome and variant dataset to the destination + directory. + + Args: + chrom (str): The chromosome identifier in UCSC format. + dest (str): The destination directory to save the downloaded VCF data. + dataset (str): The name or identifier of the variant dataset (e.g., "1000G", + "HGDP"). + + Returns: + None + + Raises: + ValueError: If the input chromosome or variant dataset is invalid. + FileExistsError: If the destination directory does not exist. + """ + + # assume chromosomes given in UCSC format (chr1, chr2, etc.) + if chrom not in CHROMS + ["all"]: + raise ValueError(f"Forbidden input chromosome ({chrom})") + if not os.path.isdir(dest): # check dest directory existence + raise FileExistsError(f"Unable to locate {dest}") + # support for 1000 GP and HGDP datasets + if dataset not in ["1000G", "HGDP"]: + raise ValueError(f"Unknown variant dataset ({dataset})") + # create VCF dataset directory within VCFs folder + vcf_dataset_dir = ensure_vcf_dataset_directory(dest, dataset) + vcf_url = VCF1000GPURL if dataset == "1000G" else VCFHGDPURL + chroms = CHROMS if chrom == "all" else [chrom] + for c in chroms: + download(vcf_url.format(c), vcf_dataset_dir) + + +def ensure_samplesids_directory(dest: str) -> str: + """ + Ensure the existence of the 'samplesIDs' directory within the specified + destination directory. + + Args: + dest (str): The destination directory where the 'samplesIDs' directory + should be created. + + Returns: + str: The path to the 'samplesIDs' directory. + """ + + samplesids_dir = os.path.join(dest, CRISPRME_DIRS[6]) + if not os.path.exists(samplesids_dir): + os.mkdir(samplesids_dir) + return samplesids_dir + + +def download_samples_ids_data(dataset: str) -> None: + """ + Download samples IDs data for a specific variant dataset. + + Args: + dataset (str): The name or identifier of the variant dataset (e.g., + "1000G", "HGDP"). + + Returns: + None + + Raises: + ValueError: If the variant dataset is unknown. + """ + + if dataset not in ["1000G", "HGDP"]: + raise ValueError(f"Unknown variant dataset ({dataset})") + # samples ids folder must be located within current directory + # -- see check_crisprme_directory_tree() for details + samplesids_dir = ensure_samplesids_directory(os.getcwd()) + samplesid_fname = ( + "hg38_1000G.samplesID.txt" if dataset == "1000G" else "hg38_HGDP.samplesID.txt" + ) + download(f"{SAMPLESIDURL}/{samplesid_fname}", samplesids_dir) + + +def ensure_annotation_directory(dest: str) -> str: + """ + Ensure the existence of the 'annotation' directory within the specified + destination directory. + + Args: + dest (str): The destination directory where the 'annotation' directory + should be created. + + Returns: + str: The path to the 'annotation' directory. + """ + + annotation_dir = os.path.join(dest, CRISPRME_DIRS[4]) + if not os.path.exists(annotation_dir): + os.mkdir(annotation_dir) + return annotation_dir + + +def download_annotation_data() -> Tuple[str, str]: + """ + Download gencode and encode annotation data to the 'annotation' directory + within the current working directory. + + Returns: + Tuple[str, str]: Paths to the downloaded gencode and encode annotation + files. + """ + + annotation_dir = ensure_annotation_directory(os.getcwd()) + # download gencode annotation + gencodetar = download( + f"{ANNOTATIONURL}/gencode.protein_coding.tar.gz", annotation_dir + ) + gencode = os.path.join( + untar(gencodetar, annotation_dir), "gencode.protein_coding.bed" + ) + # download encode annotation + encodetar = download(f"{ANNOTATIONURL}/encode+gencode.hg38.tar.gz", annotation_dir) + encode = os.path.join(untar(encodetar, annotation_dir), "encode+gencode.hg38.bed") + return gencode, encode + + +def ensure_pams_directory(dest: str) -> str: + """ + Ensure the existence of the 'PAMs' directory within the specified destination + directory. + + Args: + dest (str): The destination directory where the 'PAMs' directory should + be created. + + Returns: + str: The path to the 'PAMs' directory. + """ + + pams_dir = os.path.join(dest, CRISPRME_DIRS[5]) + if not os.path.exists(pams_dir): + os.mkdir(pams_dir) + return pams_dir + + +def write_ngg_pamfile() -> str: + """ + Write a test PAM file containing the NGG sequence to the 'PAMs' directory + within the current working directory. + + Returns: + str: The path to the created test PAM file. + """ + + pams_dir = ensure_pams_directory( + os.getcwd() + ) # PAMs directory must be in current working dir + pamfile = os.path.join(pams_dir, "20bp-NGG-SpCas9.txt") + try: + with open(pamfile, mode="w") as outfile: + outfile.write("NNNNNNNNNNNNNNNNNNNNNGG 3\n") # 20 + 3 bp (NGG) + except IOError as e: + raise IOError("An error occurred while writing the test PAM file") from e + return pamfile + + +def write_sg1617_guidefile() -> str: + """ + Write a test guide file containing the sg1617 guide sequence. + + Returns: + str: The path to the created test guide file. + """ + + guidefile = "sg1617_test_guide.txt" + try: + with open(guidefile, mode="w") as outfile: + outfile.write("CTAACAGTTGCTTTTATCACNNN\n") # sg1617 guide + except IOError as e: + raise IOError("An error occerred while writing the test guide file") from e + return guidefile + + +def write_vcf_list(dataset: str) -> str: + """ + Write a test VCF list file for a specific variant dataset. + + Args: + dataset (str): The name or identifier of the variant dataset (e.g., "1000G", + "HGDP"). + + Returns: + str: The path to the created test VCF list file. + """ + + # support for 1000 GP and HGDP datasets + if dataset not in ["1000G", "HGDP"]: + raise ValueError(f"Unknown variant dataset ({dataset})") + # select the test vcf list + vcflist = f"hg38_{dataset}" + vcflistfile = "vcf_list_test.txt" + try: + with open(vcflistfile, mode="w") as outfile: + outfile.write(f"{vcflist}\n") + except IOError as e: + raise IOError("An error occurred while writing the test VCF list") from e + return vcflistfile + + +def write_samplesids_list(dataset: str) -> str: + """ + Write a test samples ID list file for a specific variant dataset. + + Args: + dataset (str): The name or identifier of the variant dataset (e.g., "1000G", + "HGDP"). + + Returns: + str: The path to the created test samples ID list file. + """ + + # support for 1000 GP and HGDP datasets + if dataset not in ["1000G", "HGDP"]: + raise ValueError(f"Unknown variant dataset ({dataset})") + # select the test vcf list + samplesidslist = ( + "hg38_1000G.samplesID.txt" if dataset == "1000G" else "hg38_HGDP.samplesID.txt" + ) + samplesidslistfile = "samplesID_list_test.txt" + try: + with open(samplesidslistfile, mode="w") as outfile: + outfile.write(f"{samplesidslist}\n") + except IOError as e: + raise IOError("An error occurred while writing the test VCF list") from e + return samplesidslistfile + + +def run_crisprme_test(chrom: str, dataset: str, debug: bool) -> None: + check_crisprme_directory_tree(os.getcwd()) # check crisprme directory tree + download_genome_data(chrom, CRISPRME_DIRS[0]) # download genome data + download_vcf_data(chrom, CRISPRME_DIRS[3], dataset) # download vcf data + download_samples_ids_data(dataset) # download vcf dataset samples ids + gencode, encode = ( + download_annotation_data() + ) # download gencode and encode annotation data + pam = write_ngg_pamfile() # write test NGG PAM file + guide = write_sg1617_guidefile() # write test sg1617 guide + vcf = write_vcf_list(dataset) # write test vcf list + samplesids = write_samplesids_list(dataset) # write test samples ids list + debug_arg = "--debug" if debug else "" + crisprme_cmd = ( + f"crisprme.py complete-search --genome {CRISPRME_DIRS[0]}/hg38 " + f"--thread 4 --bmax 1 --mm 4 --bDNA 1 --bRNA 1 --merge 3 --pam {pam} " + f"--guide {guide} --vcf {vcf} --samplesID {samplesids} --annotation {encode} " + f"--gene_annotation {gencode} --output crisprme-test-out {debug_arg}" + ) + subprocess.call(crisprme_cmd, shell=True) # run crisprme test + + +if __name__ == "__main__": + run_crisprme_test("chr22", "1000G", True) diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..18ff8c2 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,177 @@ +""" +""" + +from typing import Optional + +import subprocess +import tarfile +import gzip +import sys +import os + +CRISPRME_DIRS = [ + "Genomes", + "Results", + "Dictionaries", + "VCFs", + "Annotations", + "PAMs", + "samplesIDs", +] +CHROMS = [f"chr{i}" for i in list(range(1, 23)) + ["X"]] + + +def check_crisprme_directory_tree(basedir: str) -> None: + """ + Check if the CRISPRme directory tree exists in the specified base directory + and create it if not. + + Args: + basedir (str): The base directory path to check and create the CRISPRme + directory tree. + + Returns: + None + + Raises: + TypeError: If basedir is not a string. + FileExistsError: If basedir does not exist. + + Examples: + check_crisprme_directory_tree('/path/to/base/directory') + """ + + if not isinstance(basedir, str): + raise TypeError(f"Expected {str.__name__}, got {type(basedir).__name__}") + if not os.path.exists(basedir): + raise FileExistsError(f"Unable to locate {basedir}") + for d in CRISPRME_DIRS: # create CRISPRme directory tree + # if directory not found, create it + if not os.path.isdir(os.path.join(basedir, d)): + os.makedirs(os.path.join(basedir, d)) + + +def download(url: str, dest: str) -> str: + """ + Download a file from the specified URL to the destination directory. + + Args: + url (str): The URL of the file to download. + dest (str): The destination directory to save the downloaded file. + + Returns: + str: The file path of the downloaded file. + + Raises: + TypeError: If url is not a string. + subprocess.SubprocessError: If the download process fails. + + Examples: + download('http://example.com/file.txt', '/path/to/destination/directory') + """ + + if not isinstance(url, str): + raise TypeError(f"Expected {str.__name__}, got {type(url).__name__}") + fname = os.path.join(dest, os.path.basename(url)) + code = subprocess.call(f"wget {url} -O {fname}", shell=True) # download using wget + if code != 0: + raise subprocess.SubprocessError(f"Download from {url} failed") + assert os.path.isfile(fname) + return fname + + +def remove(fname: str) -> None: + """ + Remove a file or directory specified by the given path. + + Args: + fname (str): The path of the file or directory to be removed. + + Returns: + None + + Raises: + subprocess.SubprocessError: If an error occurs during the removal process. + """ + + code = subprocess.call(f"rm -rf {fname}", shell=True) + if code != 0: + raise subprocess.SubprocessError(f"An error occurred while removing {fname}") + + +def rename(orig: str, newname: str) -> str: + """ + Rename a file or directory from the original path to the new specified name. + + Args: + orig (str): The original path of the file or directory to be renamed. + newname (str): The new name or path to rename the file or directory to. + + Returns: + str: The new path or name after renaming. + + Raises: + subprocess.SubprocessError: If the renaming process fails. + """ + + code = subprocess.call(f"mv {orig} {newname}", shell=True) + if code != 0: + subprocess.SubprocessError(f"Renaming {orig} failed") + assert os.path.exists(newname) + return newname + + +def untar(fname_tar_gz: str, dest: str, outdir: Optional[str] = "") -> str: + """ + Decompress and extract the contents of a tar.gz file to the specified + destination directory. + + Args: + fname_tar_gz (str): The path to the tar.gz file to decompress and extract. + dest (str): The destination directory to extract the contents to. + outdir (Optional[str]): Optional subdirectory within the destination to + extract the contents to. + + Returns: + str: The path to the directory where the contents were extracted. + + Raises: + IOError: If an error occurs during the decompression process. + """ + + try: + with gzip.open(fname_tar_gz, mode="rb") as fin: + with tarfile.open(fileobj=fin, mode="r") as tar: + tar.extractall(dest) + except IOError as e: + raise IOError(f"An error occurred while decompressing {fname_tar_gz}") from e + outdir = os.path.join(dest, outdir) if outdir else dest + assert os.path.isdir(outdir) + remove(fname_tar_gz) # delete compressed archive + return outdir + + +def gunzip(fname_gz: str, fname_out: str) -> str: + """ + Decompress a gzip file to the specified output file. + + Args: + fname_gz (str): The path to the gzip file to decompress. + fname_out (str): The path to save the decompressed output. + + Returns: + str: The path to the decompressed output file. + + Raises: + IOError: If an error occurs during the decompression process. + """ + + try: + with gzip.open(fname_gz, mode="rb") as fin: + with open(fname_out, mode="wb") as fout: + fout.write(fin.read()) + except IOError as e: + raise IOError(f"An error occurred while decompressing {fname_gz}") from e + assert os.stat(fname_out).st_size > 0 + remove(fname_gz) # delete compressed archive + return fname_out