diff --git a/howard/functions/from_annovar.py b/howard/functions/from_annovar.py
index d965e5f..a1d1af8 100644
--- a/howard/functions/from_annovar.py
+++ b/howard/functions/from_annovar.py
@@ -2,32 +2,21 @@
 
 import csv
 import gc
-import io
-import multiprocessing
+import sys
 import os
-import re
 import subprocess
-from tempfile import NamedTemporaryFile
-import tempfile
 import duckdb
-import json
 import argparse
+
 import Bio.bgzf as bgzf
 import pandas as pd
-import vcf
 import logging as log
-import sys
-from functools import partial
-import itertools
-
 import pyarrow as pa
 import pyarrow.parquet as pq
 import pyarrow.csv as pcsv
-
 from howard.objects.variants import Variants
-from howard.objects.database import Database
-from howard.functions.commons import *
+from howard.functions.commons import full_path, remove_if_exists
 
 
 # Dictionnaire des types
@@ -118,7 +107,7 @@ def from_annovar(args: argparse) -> None:
 
     # Output
     output_dirname = os.path.dirname(output_file)
-    output_file_name, output_file_ext = os.path.splitext(os.path.basename(output_file))
+    _, output_file_ext = os.path.splitext(os.path.basename(output_file))
     if output_file_ext not in [".gz"]:
         log.error(f"Output file '{output_file}' without compress '.gz' extension")
         raise ValueError(
@@ -134,7 +123,7 @@ def from_annovar(args: argparse) -> None:
     # To Parquet
     if output_file_parquet:
         output_parquet_dirname = os.path.dirname(output_file_parquet)
-        output_file_parquet_name, output_file_parquet_ext = os.path.splitext(
+        _, output_file_parquet_ext = os.path.splitext(
             os.path.basename(output_file_parquet)
         )
         if output_file_parquet_ext not in [".parquet"]:
@@ -181,7 +170,7 @@ def from_annovar(args: argparse) -> None:
         # first VCF
 
         # Annovar to VCF
-        log.info(f"Annovar to VCF and Parquet...")
+        log.info("Annovar to VCF and Parquet...")
         annovar_to_vcf(
             input_file=input_file,
             output_file=output_file,
@@ -199,7 +188,7 @@ def from_annovar(args: argparse) -> None:
         )
 
         # Header VCF hdr
-        log.info(f"VCF Extract header hdr for VCF...")
+        log.info("VCF Extract header hdr for VCF...")
         command = f"""{bcftools} view -h {output_file} 1>{output_file}.hdr """
         log.debug("bcftools command: " + command)
         subprocess.run(command, shell=True)
@@ -208,7 +197,7 @@ def from_annovar(args: argparse) -> None:
     if output_file_parquet:
         # File already generated
         # Header VCF hdr
-        log.info(f"VCF Extract header hdr for Parquet...")
+        log.info("VCF Extract header hdr for Parquet...")
         command = f"""{bcftools} view -h {output_file} 1>{output_file_parquet}.hdr """
         log.debug("bcftools command: " + command)
         subprocess.run(command, shell=True)
@@ -323,14 +312,15 @@ def annovar_to_vcf(
                     "#Chr",
                     "#chr",
                     "Chr",
-                    "chr" "chromosome",
+                    "chr",
+                    "chromosome",
                 ]
                 for chr in chrom_list_for_header:
                     if chr in header:
                         header[header.index(chr)] = "#CHROM"
                         log.debug(f"found '{chr}' as chromosome column")
                 if "#CHROM" not in header:
-                    log.debug(f"default chromosome column is 0")
+                    log.debug("default chromosome column is 0")
                     header[0] = "#CHROM"
             if "POS" not in header:
                 pos_list_for_header = ["Pos", "pos", "Start", "START"]
@@ -339,7 +329,7 @@ def annovar_to_vcf(
                         header[header.index(pos)] = "POS"
                         log.debug(f"found '{pos}' as position column")
                 if "POS" not in header:
-                    log.debug(f"default position column is 1")
+                    log.debug("default position column is 1")
                     header[1] = "POS"
             if "REF" not in header:
                 ref_list_for_header = ["Ref", "ref"]
@@ -348,7 +338,7 @@ def annovar_to_vcf(
                         header[header.index(ref)] = "REF"
                         log.debug(f"found '{ref}' as reference column")
                 if "REF" not in header:
-                    log.debug(f"default reference column is 3")
+                    log.debug("default reference column is 3")
                     header[3] = "REF"
             if "ALT" not in header:
                 alt_list_for_header = ["Alt", "alt"]
@@ -357,7 +347,7 @@ def annovar_to_vcf(
                         header[header.index(alt)] = "ALT"
                         log.debug(f"found '{alt}' as alternative column")
                 if "ALT" not in header:
-                    log.debug(f"default alternative column is 4")
+                    log.debug("default alternative column is 4")
                     header[4] = "ALT"
             break
         else:
@@ -706,7 +696,7 @@ def annovar_to_vcf(
     if reduce_memory and multi_variant:
 
         # Log
-        log.debug(f"Create View From TSV to Parquet")
+        log.debug("Create View From TSV to Parquet")
 
         # Create Parquet file from TSV
         tsv_to_parquet(
@@ -998,81 +988,19 @@ def parquet_info_explode(
     config_ro["memory_limit"] = memory
     config_threads["memory_limit"] = memory
 
+    # Reduce memory
     if reduce_memory:
-
-        # List of exploded parquet files
-        list_of_exploded_files = []
-
-        # Create
-        parquet_input = Variants(input=input_file, config=config_ro)
-        parquet_chromosomes = parquet_input.get_query_to_df(
-            query=f""" SELECT "#CHROM" FROM read_csv('{input_file}',AUTO_DETECT=TRUE) WHERE "#CHROM" NOT NULL GROUP BY "#CHROM" """
-        )
-
-        for chrom in parquet_chromosomes["#CHROM"]:
-
-            # Split Chrom
-            query_chrom = f""" SELECT * FROM read_csv('{input_file}',AUTO_DETECT=TRUE) WHERE "#CHROM"='{chrom}' """
-            query_chrom_output = f"{output_file}.{chrom}.parquet"
-            query_chrom_explode_output = f"{output_file}.{chrom}.explode.parquet"
-
-            # Log
-            log.info(f"Explode infos chromosome {chrom}")
-
-            # Extract
-            log.debug(f"Extract chromosome {chrom}: {query_chrom_explode_output}")
-            remove_if_exists([query_chrom_output, query_chrom_output + ".hdr"])
-            parquet_input.export_output(
-                output_file=query_chrom_output, query=query_chrom, export_header=True
-            )
-
-            # Explode
-            log.debug(
-                f"Explode infos for chromosome {chrom}: {query_chrom_explode_output}"
-            )
-            list_of_exploded_files.append(query_chrom_explode_output)
-            parquet_explode = Variants(
-                input=query_chrom_output,
-                output=query_chrom_explode_output,
-                config=config_threads,
-                param=param_explode,
-            )
-            parquet_explode.load_data()
-            remove_if_exists(
-                [query_chrom_explode_output, query_chrom_explode_output + ".hdr"]
-            )
-            parquet_explode.export_output()
-            remove_if_exists([query_chrom_output, query_chrom_output + ".hdr"])
-
-        # list_of_exploded_files
-        log.info(f"Merge explode infos")
-        log.debug(f"Merge explode infos files: {list_of_exploded_files}")
-        query_explode = f""" SELECT * FROM read_parquet({list_of_exploded_files}) """
-        query_explode_output = output_file
-        remove_if_exists([query_explode_output, query_explode_output + ".hdr"])
-        parquet_explode.export_output(
-            output_file=query_explode_output, query=query_explode, export_header=True
-        )
-
-        # Clear
-        for file in list_of_exploded_files:
-            remove_if_exists([file, file + ".hdr"])
-
-    else:
-
-        # Create
-        parquet_explode = Variants(
-            input=input_file,
-            output=output_file,
-            config=config_threads,
-            param=param_explode,
-        )
-        parquet_explode.load_data()
-        parquet_explode.export_output()
-
-    # Check
-    # df = parquet_input.get_query_to_df(f"SELECT * FROM '{query_explode_output}' LIMIT 10 ")
-    # log.debug(df)
+        param_explode["chunk_size"] = 100000
+
+    # Create
+    parquet_explode = Variants(
+        input=input_file,
+        output=output_file,
+        config=config_threads,
+        param=param_explode,
+    )
+    parquet_explode.load_data()
+    parquet_explode.export_output()
 
 
 def tsv_to_parquet(
diff --git a/howard/objects/database.py b/howard/objects/database.py
index a03cc2f..0e9182c 100644
--- a/howard/objects/database.py
+++ b/howard/objects/database.py
@@ -1,22 +1,34 @@
-# import pyarrow as pa
-# import pyarrow.csv as csv
-import fileinput
 import hashlib
-import random
-from shutil import copyfileobj
-import string
-import polars as pl
-import pandas as pd
 import duckdb
 import sqlite3
 import vcf
+import glob
+import os
+import io
+import pgzip
+import shutil
 
-# from Bio import bgzf
+import polars as pl
+import pandas as pd
 import Bio.bgzf as bgzf
-
-from howard.functions.commons import *
-from howard.functions.databases import *
-
+import pyarrow.parquet as pq
+import pyarrow as pa
+import logging as log
+
+from tempfile import TemporaryDirectory
+from typing import Optional, Union
+
+from howard.functions.commons import (
+    load_duckdb_extension,
+    get_file_format,
+    full_path,
+    get_file_compressed,
+    get_compression_type,
+    remove_if_exists,
+    concat_and_compress_files,
+    DTYPE_LIMIT_AUTO,
+    CODE_TYPE_MAP,
+)
 
 SEP_TYPE = {
     "vcf": "\t",
@@ -208,17 +220,17 @@ def set_database(
         elif self.find_database(
             database=database,
             databases_folders=databases_folders,
-            format=format,
+            database_format=format,
             assembly=assembly,
         ):
             self.database = self.find_database(
                 database=database,
                 databases_folders=databases_folders,
-                format=format,
+                database_format=format,
                 assembly=assembly,
             )
 
-    def set_databases_folders(self, databases_folders: list = ["."]) -> None:
+    def set_databases_folders(self, databases_folders: list = None) -> None:
         """
         This function sets the list of folders where databases are located as an attribute of an
         object.
@@ -228,6 +240,9 @@ def set_databases_folders(self, databases_folders: list = ["."]) -> None:
         :type databases_folders: list
         """
 
+        if databases_folders is None:
+            databases_folders = ["."]
+
         self.databases_folders = databases_folders
 
     def get_database_folders(self) -> list:
@@ -367,7 +382,7 @@ def get_header_from_list(self, header_list: list = None) -> vcf:
                 log.error(inst)
                 raise ValueError(inst)
         else:
-            log.warning(f"Warning in VCF header")
+            log.warning("Warning in VCF header")
             log.debug(f"header_list={header_list}")
             return None
 
@@ -390,7 +405,7 @@ def get_header_from_file(self, header_file: str) -> vcf:
 
         return self.get_header_from_list(header_list)
 
-    def find_header_file(self, database: str = None) -> str:
+    def find_header_file(self, database: str = None) -> Optional[str]:
         """
         This function finds the header file for a given database in various formats.
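The `set_databases_folders` change above replaces a mutable default argument (`databases_folders: list = ["."]`) with a `None` guard. Python evaluates default values once, at function definition time, so a mutable default is shared across every call. A minimal sketch of the pitfall this avoids (illustrative names, not howard code):

```python
def collect_bad(item, bucket=[]):
    # The default list is created once and reused on every call.
    bucket.append(item)
    return bucket

def collect_good(item, bucket=None):
    # The None guard builds a fresh list per call, as the patch now does.
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket

print(collect_bad("a"))   # ['a']
print(collect_bad("b"))   # ['a', 'b']  <- state leaked from the first call
print(collect_good("a"))  # ['a']
print(collect_good("b"))  # ['b']
```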
@@ -533,19 +548,16 @@ def get_header_from_columns(
 
         # Attach if need
         if self.get_sql_database_attach(database=database, output="attach"):
             self.query(
-                database=database,
                 query=self.get_sql_database_attach(database=database, output="attach"),
             )
 
         # database columns
         database_query_columns = None
         if sql_query:
-            database_query_columns = self.query(sql_query)
+            database_query_columns = self.query(query=sql_query)
         if not database_query_columns:
             database_query_columns_sql = f""" SELECT * FROM {self.get_sql_database_link(database=database)} LIMIT {DTYPE_LIMIT_AUTO} """
-            database_query_columns = self.query(
-                database=database, query=database_query_columns_sql
-            )
+            database_query_columns = self.query(query=database_query_columns_sql)
 
         # Remove specific VCF column if is a VCF type
         if (
@@ -591,20 +603,16 @@ def get_header_from_columns(
 
         # Detach if need
         if self.get_sql_database_attach(database=database, output="detach"):
             self.query(
-                database=database,
                 query=self.get_sql_database_attach(database=database, output="detach"),
             )
 
         return database_header
 
-    def query(self, database: str = None, query: str = None) -> object:
+    def query(self, query: str = None) -> object:
         """
-        This is a Python function that takes in a database and query string as parameters and
-        returns the result of the query on the database.
+        This function executes a SQL query string on the object's own connection and
+        returns the result.
 
-        :param database: A string representing the name of the database to query. If no database is
-        provided, the method will attempt to retrieve the default database from the connection object
-        :type database: str
         :param query: The query parameter is a string that represents the SQL query that needs to be
         executed on the database. It can be any valid SQL statement such as SELECT, INSERT, UPDATE,
         DELETE, etc
@@ -613,11 +621,8 @@ def query(self, database: str = None, query: str = None) -> object:
         database. If no query is provided, the method returns None.
         """
 
-        if not database:
-            database = self.get_database()
-
         if query:
-            return self.conn.query(query)
+            return self.get_conn().query(query)
         else:
             return None
 
@@ -704,7 +709,7 @@ def set_header_file(self, header_file: str = None) -> None:
 
         self.header_file = header_file
 
-    def get_header_columns_from_database(self, database: str = None) -> list:
+    def get_header_columns_from_database(self, database: str = None) -> Optional[list]:
         """
         The function `get_header_columns_from_database` retrieves the column names from a specified
         database table.
@@ -723,9 +728,8 @@ def get_header_columns_from_database(self, database: str = None) -> list:
         if sql_from:
             sql_query = f"SELECT * FROM {sql_from} LIMIT 0"
             try:
-                # columns_list = list(self.conn.query(sql_query).df().columns)
                 columns_list = list(self.conn.query(sql_query).columns)
-            except:
+            except ValueError:
                 columns_list = None
             return columns_list
         else:
@@ -845,9 +849,9 @@ def find_database(
         self,
         database: str = None,
         databases_folders: list = None,
-        format: str = None,
+        database_format: str = None,
         assembly: str = None,
-    ) -> str:
+    ) -> Optional[str]:
         """
         This function finds a database file in a specified folder or the current directory.
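With the `database` parameter removed, `Database.query` always runs against the object's own connection (via `get_conn()`). A hedged usage sketch of the new call shape (the file path is hypothetical; the `variants` view matches the tests further down):

```python
from howard.objects.database import Database

db = Database(database="tests/databases/example.parquet")  # hypothetical path
results = db.query(query="SELECT POS, QUAL, ALT FROM variants LIMIT 5")
if results is not None:  # query() returns None when called without a query
    print(results.df())
```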
@@ -873,8 +877,8 @@ def find_database(
 
         if not database:
             database = self.get_database()
-        if not format:
-            format = self.get_format()
+        if not database_format:
+            database_format = self.get_format()
         if not assembly:
             assembly = self.get_assembly()
@@ -892,7 +896,11 @@ def find_database(
 
         database_file = None
 
-        for format_extension in ["", f".{format}", f".{format}.gz"]:
+        for format_extension in [
+            "",
+            f".{database_format}",
+            f".{database_format}.gz",
+        ]:
 
             # find in subfolder assemby
             if assembly:
@@ -950,7 +958,7 @@ def get_database(self) -> str:
 
         return self.database
 
-    def get_database_basename(self, database: str = None) -> str:
+    def get_database_basename(self, database: str = None) -> Optional[str]:
         """
         This function returns the basename of a database file.
@@ -974,7 +982,7 @@ def get_database_basename(self, database: str = None) -> str:
         else:
             return None
 
-    def get_database_dirname(self, database: str = None) -> str:
+    def get_database_dirname(self, database: str = None) -> Optional[str]:
         """
         This function returns the directory name of a given database or the current database if none
         is specified.
@@ -1064,7 +1072,7 @@ def get_format(self, database: str = None) -> str:
         else:
             return get_file_format(database)
 
-    def get_type(self, database: str = None, sql_query: str = None) -> str:
+    def get_type(self, database: str = None, sql_query: str = None) -> Optional[str]:
         """
         The `get_type` function determines the type of a database (variants VCF-like or regions
         BED-like) based on its columns and format.
@@ -1105,7 +1113,7 @@ def get_type(self, database: str = None, sql_query: str = None) -> str:
         else:
             return None
 
-    def get_database_tables(self, database: str = None) -> str:
+    def get_database_tables(self, database: str = None) -> Union[str, list, None]:
         """
         This function retrieves a list of tables in a specified database using the DuckDB format.
@@ -1119,19 +1127,19 @@ def get_database_tables(self, database: str = None) -> str:
         if not database:
             database = self.get_database()
         if database and self.exists(database):
-            format = self.get_format(database)
-            if format in ["conn"]:
+            database_format = self.get_format(database)
+            if database_format in ["conn"]:
                 database_conn = database
                 database_tables = list(database_conn.query("SHOW TABLES;").df()["name"])
                 return database_tables
-            elif format in ["duckdb"]:
+            elif database_format in ["duckdb"]:
                 database_conn = duckdb.connect(database)
                 database_tables = list(database_conn.query("SHOW TABLES;").df()["name"])
                 database_conn.close()
                 return database_tables
-            elif format in ["sqlite"]:
+            elif database_format in ["sqlite"]:
                 database_conn = sqlite3.connect(database)
-                sql_query = f"SELECT name FROM sqlite_master WHERE type='table'"
+                sql_query = "SELECT name FROM sqlite_master WHERE type='table'"
                 database_tables = list(
                     pd.read_sql_query(sql_query, database_conn)["name"]
                 )
@@ -1141,7 +1149,7 @@ def get_database_tables(self, database: str = None) -> str:
         else:
             return None
 
-    def get_database_table(self, database: str = None) -> str:
+    def get_database_table(self, database: str = None) -> Optional[str]:
         """
         This function returns the name of a table in a specified database if it exists and is in a
         supported format.
@@ -1177,7 +1185,7 @@ def get_database_table(self, database: str = None) -> str:
 
     def get_type_from_columns(
         self, database_columns: list = [], check_database_type: str = None
-    ) -> str:
+    ) -> Optional[str]:
         """
         This function returns the type of a database based on the provided list of columns.
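The `format` → `database_format` renames in `find_database` and `get_database_tables` stop the parameter from shadowing Python's built-in `format`. A small sketch of the problem being avoided (illustrative, not howard code):

```python
def describe(format=None):
    # Inside this body, `format` refers to the parameter, so calling
    # format(0.5, ".1%") here would invoke the argument, not the built-in.
    return format

def describe_fixed(database_format=None):
    # With the renamed parameter, the built-in stays reachable.
    return format(0.5, ".1%")

print(describe_fixed())  # '50.0%'
```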
@@ -1343,7 +1351,7 @@ def get_sql_from(
             nb_columns_detected_by_duckdb = len(
                 self.conn.query(query_nb_columns_detected_by_duckdb).columns
             )
-        except:
+        except ValueError:
             nb_columns_detected_by_duckdb = 0
 
         # Check table columns
@@ -1399,7 +1407,7 @@ def get_sql_database_attach(
         self,
         database: str = None,
         output: str = "query",
-    ) -> str:
+    ) -> Optional[str]:
         """
         This function returns a SQL query to attach or detach a database based on the specified
         format and output.
@@ -1480,7 +1488,9 @@ def get_sql_database_link(self, database: str = None) -> str:
 
         return sql_database_link
 
-    def create_view(self, database: str = None, view_name: str = "variants") -> str:
+    def create_view(
+        self, database: str = None, view_name: str = "variants"
+    ) -> Optional[str]:
         """
         The `create_view` function creates a view in a specified database or the default database,
         using a SQL database link.
@@ -1511,13 +1521,13 @@ def create_view(self, database: str = None, view_name: str = "variants") -> str:
                 )
                 self.view_name = view_name
                 return view_name
-            except:
+            except ValueError:
                 return None
         else:
             self.view_name = None
             return None
 
-    def get_view(self, database: str = None, create_view: str = None) -> str:
+    def get_view(self, database: str = None, create_view: str = None) -> Optional[str]:
         """
         The `get_view` function returns the name of a view in a database, or creates a new view if
         specified.
@@ -1749,7 +1759,6 @@ def get_columns(
             table = self.get_database_table(database=database)
 
         if sql_query:
-            # columns_list = list(database.query(sql_query).df().columns)
             columns_list = list(database.query(sql_query).columns)
             return columns_list
@@ -1760,14 +1769,12 @@ def get_columns(
             if table:
                 database_conn = database
                 sql_query = f"SELECT * FROM {table} LIMIT 0"
-                # columns_list = list(database_conn.query(sql_query).df().columns)
                 columns_list = list(database_conn.query(sql_query).columns)
                 return columns_list
         elif database_format in ["duckdb"]:
             if table:
                 database_conn = duckdb.connect(database)
                 sql_query = f"SELECT * FROM {table} LIMIT 0"
-                # columns_list = list(database_conn.query(sql_query).df().columns)
                 columns_list = list(database_conn.query(sql_query).columns)
                 database_conn.close()
                 return columns_list
@@ -1792,9 +1799,8 @@ def get_columns(
                 database=database, header_file=header_file
             )
             sql_query = f"SELECT * FROM {sql_from} LIMIT 0"
-            # return list(self.conn.query(sql_query).df().columns)
             return list(self.conn.query(sql_query).columns)
-        except:
+        except ValueError:
             return []
 
         return []
@@ -1867,7 +1873,7 @@ def get_table_columns_from_file(
         # Try from database file
         try:
             table_header = self.read_header_file(database)
-        except:
+        except ValueError:
             table_header = None
 
         if table_header:
@@ -1877,7 +1883,7 @@ def get_table_columns_from_file(
                     .strip()
                     .split(delimiter)
                 )
-            except:
+            except IndexError:
                 table_columns = None
         else:
             table_columns = None
@@ -1886,7 +1892,7 @@ def get_table_columns_from_file(
         # Try from header file
         try:
             table_header = self.read_header_file(header_file)
-        except:
+        except ValueError:
             table_header = None
 
         if table_header:
@@ -1896,7 +1902,7 @@ def get_table_columns_from_file(
                     .strip()
                     .split(delimiter)
                 )
-            except:
+            except IndexError:
                 table_columns = None
         else:
             table_columns = None
@@ -2082,7 +2088,7 @@ def is_genotype_column(
                 SELECT *
                 FROM df_downsampling
                 WHERE (
-                    regexp_matches("{column}", '^[0-9\.]([/|][0-9\.])+')
+                    regexp_matches("{column}", '^[0-9.]([/|][0-9.])+')
                     {query_format}
                 )
                 """
@@ -2357,7 +2363,7 @@ def export(
 
         # JSON
         elif output_type in ["json"]:
-            query_export_format = f"FORMAT JSON, ARRAY TRUE"
+            query_export_format = "FORMAT JSON, ARRAY TRUE"
             include_header = False
             post_process = True
             if order_by_clean:
@@ -2373,7 +2379,7 @@ def export(
 
         # Parquet
         elif output_type in ["parquet"]:
-            query_export_format = f"FORMAT PARQUET"
+            query_export_format = "FORMAT PARQUET"
             # Export options
             export_options = {
                 "format": "PARQUET",
@@ -2752,7 +2758,7 @@ def export(
                     """
 
                     # Export with duckdb
-                    self.query(database=database, query=query_copy)
+                    self.query(query=query_copy)
 
                 # Export mode unknown
                 else:
@@ -2763,7 +2769,7 @@ def export(
         if post_process:
 
             # Log - number of records
-            log.debug(f"Post processing...")
+            log.debug("Post processing...")
 
             # Input files
             input_files = []
@@ -2823,7 +2829,7 @@ def export(
             if output_header and export_header:
 
                 # Log - Generate header
-                log.debug(f"Generate header...")
+                log.debug("Generate header...")
 
                 # Create database
                 database_for_header = Database(database=output_database)
@@ -2850,6 +2856,6 @@ def export(
 
         # Clean tmp files (deprecated)
         remove_if_exists(tmp_files)
-
+
         # Return if file exists
         return os.path.exists(output_database) and self.get_type(output_database)
diff --git a/howard/objects/variants.py b/howard/objects/variants.py
index ae89f49..9f2b6d9 100644
--- a/howard/objects/variants.py
+++ b/howard/objects/variants.py
@@ -2055,7 +2055,7 @@ def export_output(
             header_in_output=header_in_output,
             order_by=order_by,
             query=query,
-            export_header=export_header
+            export_header=export_header,
         )
 
         # Remove
@@ -4922,9 +4922,6 @@ def annotation_parquet(self, threads: int = None) -> None:
                 f"Existing annotations in VCF: {vcf_annotation} [{vcf_annotation_line}]"
             )
 
-        # prefix
-        prefix = self.get_explode_infos_prefix()
-
         # Added columns
         added_columns = []
 
@@ -4976,7 +4973,6 @@ def annotation_parquet(self, threads: int = None) -> None:
                 # Find files
                 parquet_file = database.get_database()
                 parquet_hdr_file = database.get_header_file()
-                parquet_format = database.get_format()
                 parquet_type = database.get_type()
 
                 # Check if files exists
@@ -5045,9 +5041,9 @@ def annotation_parquet(self, threads: int = None) -> None:
                     )
 
                     # For all fields in database
-                    annotation_fields_ALL = False
+                    annotation_fields_all = False
                     if "ALL" in annotation_fields or "INFO" in annotation_fields:
-                        annotation_fields_ALL = True
+                        annotation_fields_all = True
                         annotation_fields = {
                             key: key for key in parquet_hdr_vcf_header_infos
                         }
@@ -5242,7 +5238,7 @@ def annotation_parquet(self, threads: int = None) -> None:
                     if (
                         allow_annotation_full_info
                         and nb_annotation_field == len(annotation_fields)
-                        and annotation_fields_ALL
+                        and annotation_fields_all
                         and (
                             "INFO" in parquet_hdr_vcf_header_columns
                             and "INFO" in database.get_extra_columns()
@@ -5264,199 +5260,112 @@ def annotation_parquet(self, threads: int = None) -> None:
                         sql_query_annotation_update_info_sets
                     )
 
-                    # Check chromosomes list (and variant max position)
-                    sql_query_chromosomes_max_pos = f""" SELECT table_variants."#CHROM" as CHROM, MAX(table_variants."POS") as MAX_POS, MIN(table_variants."POS")-1 as MIN_POS FROM {table_variants} as table_variants GROUP BY table_variants."#CHROM" """
-                    sql_query_chromosomes_max_pos_df = self.conn.execute(
-                        sql_query_chromosomes_max_pos
+                    # Check chromosomes list (and variants infos)
+                    sql_query_chromosomes = f"""
+                        SELECT table_variants."#CHROM" as CHROM, count(*) AS count_variants, min(POS) AS min_variants, MAX(POS) AS max_variants
+                        FROM {table_variants} as table_variants
+                        GROUP BY table_variants."#CHROM"
+                        ORDER BY table_variants."#CHROM"
+                    """
+                    sql_query_chromosomes_df = self.conn.execute(
+                        sql_query_chromosomes
                     ).df()
+                    sql_query_chromosomes_dict = {
+                        entry["CHROM"]: {
+                            "count": entry["count_variants"],
+                            "min": entry["min_variants"],
+                            "max": entry["max_variants"],
+                        }
+                        for index, entry in sql_query_chromosomes_df.iterrows()
+                    }
 
-                    # Create dictionnary with chromosomes (and max position)
-                    sql_query_chromosomes_max_pos_dictionary = (
-                        sql_query_chromosomes_max_pos_df.groupby("CHROM")
-                        .apply(
-                            lambda x: {
-                                "max_pos": x["MAX_POS"].max(),
-                                "min_pos": x["MIN_POS"].min(),
-                            }
-                        )
-                        .to_dict()
-                    )
-
-                    # Affichage du dictionnaire
-                    log.debug(
-                        "Chromosomes max pos found: "
-                        + str(sql_query_chromosomes_max_pos_dictionary)
-                    )
-
-                    # nb_of_variant_annotated
+                    # Init
                     nb_of_query = 0
                     nb_of_variant_annotated = 0
-                    # query_dict = {}
                     query_dict = query_dict_remove
 
-                    for chrom in sql_query_chromosomes_max_pos_dictionary:
+                    # for chrom in sql_query_chromosomes_df["CHROM"]:
+                    for chrom in sql_query_chromosomes_dict:
 
-                        # nb_of_variant_annotated_by_chrom
-                        nb_of_variant_annotated_by_chrom = 0
+                        # Number of variant by chromosome
+                        nb_of_variant_by_chrom = sql_query_chromosomes_dict.get(
+                            chrom, {}
+                        ).get("count", 0)
 
-                        # Get position of the farthest variant (max position) in the chromosome
-                        sql_query_chromosomes_max_pos_dictionary_max_pos = (
-                            sql_query_chromosomes_max_pos_dictionary.get(
-                                chrom, {}
-                            ).get("max_pos")
-                        )
-                        sql_query_chromosomes_max_pos_dictionary_min_pos = (
-                            sql_query_chromosomes_max_pos_dictionary.get(
-                                chrom, {}
-                            ).get("min_pos")
-                        )
-
-                        # Autodetect range of bases to split/chunk
                         log.debug(
-                            f"Annotation '{annotation_name}' - Chromosome '{chrom}' - Start Autodetection Intervals..."
-                        )
-
-                        batch_annotation_databases_step = None
-                        batch_annotation_databases_ncuts = 1
-
-                        # Create intervals from 0 to max position variant, with the batch window previously defined
-                        sql_query_intervals = split_interval(
-                            sql_query_chromosomes_max_pos_dictionary_min_pos,
-                            sql_query_chromosomes_max_pos_dictionary_max_pos,
-                            step=batch_annotation_databases_step,
-                            ncuts=batch_annotation_databases_ncuts,
+                            f"Annotation '{annotation_name}' - Chromosome '{chrom}' [{nb_of_variant_by_chrom} variants]..."
                         )
 
-                        log.debug(
-                            f"Annotation '{annotation_name}' - Chromosome '{chrom}' - Stop Autodetection Intervals"
-                        )
-
-                        # Interval Start/Stop
-                        sql_query_interval_start = sql_query_intervals[0]
-
-                        # For each interval
-                        for i in sql_query_intervals[1:]:
-
-                            # Interval Start/Stop
-                            sql_query_interval_stop = i
-
-                            log.debug(
-                                f"Annotation '{annotation_name}' - Chromosome '{chrom}' - Interval [{sql_query_interval_start}-{sql_query_interval_stop}] ..."
-                            )
-
-                            log.debug(
-                                f"Annotation '{annotation_name}' - Chromosome '{chrom}' - Interval [{sql_query_interval_start}-{sql_query_interval_stop}] - Start detecting regions..."
-                            )
-
-                            regions = [
-                                (
-                                    chrom,
-                                    sql_query_interval_start,
-                                    sql_query_interval_stop,
-                                )
-                            ]
-
-                            log.debug(
-                                f"Annotation '{annotation_name}' - Chromosome '{chrom}' - Interval [{sql_query_interval_start}-{sql_query_interval_stop}] - Stop detecting regions"
-                            )
-
-                            # Fusion des régions chevauchantes
-                            if regions:
-
-                                # Number of regions
-                                nb_regions = len(regions)
-
-                                # create where caluse on regions
-                                clause_where_regions_variants = create_where_clause(
-                                    regions, table="table_variants"
-                                )
-
-                                log.debug(
-                                    f"Annotation '{annotation_name}' - Chromosome '{chrom}' - Interval [{sql_query_interval_start}-{sql_query_interval_stop}] - {nb_regions} regions..."
-                                )
-
-                                # Annotation with regions database
-                                if parquet_type in ["regions"]:
-                                    sql_query_annotation_from_clause = f"""
-                                        FROM (
-                                            SELECT
-                                                '{chrom}' AS \"#CHROM\",
-                                                table_variants_from.\"POS\" AS \"POS\",
-                                                {",".join(sql_query_annotation_to_agregate)}
-                                            FROM {table_variants} as table_variants_from
-                                            LEFT JOIN {parquet_file_link} as table_parquet_from ON (
-                                                table_parquet_from.\"#CHROM\" in ('{chrom}')
-                                                AND table_variants_from.\"POS\" <= table_parquet_from.\"END\"
-                                                AND (table_variants_from.\"POS\" >= (table_parquet_from.\"START\"+1)
-                                                    OR table_variants_from.\"POS\" + (len(table_variants_from.\"REF\")-1) >= (table_parquet_from.\"START\"+1)
+                        # Annotation with regions database
+                        if parquet_type in ["regions"]:
+                            sql_query_annotation_from_clause = f"""
+                                FROM (
+                                    SELECT
+                                        '{chrom}' AS \"#CHROM\",
+                                        table_variants_from.\"POS\" AS \"POS\",
+                                        {",".join(sql_query_annotation_to_agregate)}
+                                    FROM {table_variants} as table_variants_from
+                                    LEFT JOIN {parquet_file_link} as table_parquet_from ON (
+                                        table_parquet_from."#CHROM" = '{chrom}'
+                                        AND table_variants_from.\"POS\" <= table_parquet_from.\"END\"
+                                        AND (table_variants_from.\"POS\" >= (table_parquet_from.\"START\"+1)
+                                            OR table_variants_from.\"POS\" + (len(table_variants_from.\"REF\")-1) >= (table_parquet_from.\"START\"+1)
                                         )
-                                        WHERE table_variants_from.\"#CHROM\" in ('{chrom}')
-                                        GROUP BY table_variants_from.\"POS\"
-                                    )
-                                    as table_parquet
-                                    """
-
-                                    sql_query_annotation_where_clause = """
-                                        table_parquet.\"#CHROM\" = table_variants.\"#CHROM\"
-                                        AND table_parquet.\"POS\" = table_variants.\"POS\"
-                                    """
-
-                                # Annotation with variants database
-                                else:
-                                    sql_query_annotation_from_clause = f"""
-                                        FROM {parquet_file_link} as table_parquet
-                                    """
-                                    sql_query_annotation_where_clause = f"""
-                                        table_parquet.\"#CHROM\" in ('{chrom}')
-                                        AND ( {clause_where_regions_variants} )
-                                        AND table_parquet.\"#CHROM\" = table_variants.\"#CHROM\"
-                                        AND table_parquet.\"POS\" = table_variants.\"POS\"
-                                        AND table_parquet.\"ALT\" = table_variants.\"ALT\"
-                                        AND table_parquet.\"REF\" = table_variants.\"REF\"
-                                    """
-
-                                # Create update query
-                                sql_query_annotation_chrom_interval_pos = f"""
-                                    UPDATE {table_variants} as table_variants
-                                    SET INFO =
-                                        concat(
-                                            CASE WHEN table_variants.INFO NOT IN ('','.')
-                                                THEN table_variants.INFO
-                                                ELSE ''
-                                            END
-                                            ,
-                                            CASE WHEN table_variants.INFO NOT IN ('','.')
-                                                AND (
-                                                    concat({sql_query_annotation_update_info_sets_sql})
-                                                )
-                                                NOT IN ('','.')
-                                                THEN ';'
-                                                ELSE ''
-                                            END
-                                            ,
-                                            {sql_query_annotation_update_info_sets_sql}
-                                        )
-                                    {sql_query_annotation_from_clause}
-                                    WHERE {sql_query_annotation_where_clause}
-                                    ;
-                                    """
+                                    )
+                                    WHERE table_variants_from.\"#CHROM\" in ('{chrom}')
+                                    GROUP BY table_variants_from.\"POS\"
+                                )
+                                as table_parquet
+                            """
 
-                                # Add update query to dict
-                                query_dict[
-                                    f"{chrom}:{sql_query_interval_start}-{sql_query_interval_stop}"
-                                ] = sql_query_annotation_chrom_interval_pos
+                            sql_query_annotation_where_clause = """
+                                table_parquet.\"#CHROM\" = table_variants.\"#CHROM\"
+                                AND table_parquet.\"POS\" = table_variants.\"POS\"
+                            """
 
-                                log.debug(
-                                    "Create SQL query: "
-                                    + str(sql_query_annotation_chrom_interval_pos)
-                                )
+                        # Annotation with variants database
+                        else:
+                            sql_query_annotation_from_clause = f"""
+                                FROM {parquet_file_link} as table_parquet
+                            """
+                            sql_query_annotation_where_clause = f"""
+                                table_variants."#CHROM" = '{chrom}'
+                                AND table_parquet.\"#CHROM\" = table_variants.\"#CHROM\"
+                                AND table_parquet.\"POS\" = table_variants.\"POS\"
+                                AND table_parquet.\"ALT\" = table_variants.\"ALT\"
+                                AND table_parquet.\"REF\" = table_variants.\"REF\"
+                            """
 
-                            # Interval Start/Stop
-                            sql_query_interval_start = sql_query_interval_stop
+                        # Create update query
+                        sql_query_annotation_chrom_interval_pos = f"""
+                            UPDATE {table_variants} as table_variants
+                            SET INFO =
+                                concat(
+                                    CASE WHEN table_variants.INFO NOT IN ('','.')
+                                        THEN table_variants.INFO
+                                        ELSE ''
+                                    END
+                                    ,
+                                    CASE WHEN table_variants.INFO NOT IN ('','.')
+                                        AND (
+                                            concat({sql_query_annotation_update_info_sets_sql})
+                                        )
+                                        NOT IN ('','.')
+                                        THEN ';'
+                                        ELSE ''
+                                    END
+                                    ,
+                                    {sql_query_annotation_update_info_sets_sql}
+                                )
+                            {sql_query_annotation_from_clause}
+                            WHERE {sql_query_annotation_where_clause}
+                            ;
+                            """
 
-                        # nb_of_variant_annotated
-                        nb_of_variant_annotated += nb_of_variant_annotated_by_chrom
+                        # Add update query to dict
+                        query_dict[
+                            f"{chrom} [{nb_of_variant_by_chrom} variants]"
+                        ] = sql_query_annotation_chrom_interval_pos
 
                     nb_of_query = len(query_dict)
                     num_query = 0
diff --git a/tests/test_objects_database.py b/tests/test_objects_database.py
index a2cc0a2..4989d42 100644
--- a/tests/test_objects_database.py
+++ b/tests/test_objects_database.py
@@ -38,9 +38,7 @@ def test_export_order_by():
     database = Database(database=input_database)
     database.export(output_database, order_by="")
     database = Database(database=output_database)
-    results = database.query(
-        database=output_database, query=f"""SELECT POS, QUAL, ALT FROM variants"""
-    )
+    results = database.query(query=f"""SELECT POS, QUAL, ALT FROM variants""")
     df_first_pos = results.df()["POS"][0]
     df_first_qual = results.df()["QUAL"][0]
     df_first_alt = results.df()["ALT"][0]
@@ -50,9 +48,7 @@ def test_export_order_by():
     database = Database(database=input_database)
     database.export(output_database, order_by="QUAL DESC")
     database = Database(database=output_database)
-    results = database.query(
-        database=output_database, query=f"""SELECT POS, QUAL, ALT FROM variants"""
-    )
+    results = database.query(query=f"""SELECT POS, QUAL, ALT FROM variants""")
     df_first_pos = results.df()["POS"][0]
     df_first_qual = results.df()["QUAL"][0]
     df_first_alt = results.df()["ALT"][0]
@@ -62,9 +58,7 @@ def test_export_order_by():
     database = Database(database=input_database)
     database.export(output_database, order_by="ALT DESC, POS ASC")
     database = Database(database=output_database)
-    results = database.query(
-        database=output_database, query=f"""SELECT POS, QUAL, ALT FROM variants"""
-    )
+    results = database.query(query=f"""SELECT POS, QUAL, ALT FROM variants""")
     df_first_pos = results.df()["POS"][0]
     df_first_qual = results.df()["QUAL"][0]
     df_first_alt = results.df()["ALT"][0]
@@ -74,9 +68,7 @@ def test_export_order_by():
     database = Database(database=input_database)
     database.export(output_database, order_by="ALT DESC, POS")
     database = Database(database=output_database)
-    results = database.query(
-        database=output_database, query=f"""SELECT POS, QUAL, ALT FROM variants"""
-    )
+    results = database.query(query=f"""SELECT POS, QUAL, ALT FROM variants""")
     df_first_pos = results.df()["POS"][0]
     df_first_qual = results.df()["QUAL"][0]
     df_first_alt = results.df()["ALT"][0]
@@ -86,9 +78,7 @@ def test_export_order_by():
     database = Database(database=input_database)
     database.export(output_database, order_by="ALT DESC, POS ASC, BEDCOLUMN")
     database = Database(database=output_database)
-    results = database.query(
-        database=output_database, query=f"""SELECT POS, QUAL, ALT FROM variants"""
-    )
+    results = database.query(query=f"""SELECT POS, QUAL, ALT FROM variants""")
     df_first_pos = results.df()["POS"][0]
     df_first_qual = results.df()["QUAL"][0]
     df_first_alt = results.df()["ALT"][0]
@@ -145,7 +135,6 @@ def test_database_as_conn():
     try:
         assert database.export(output_database=output_database)
         assert database.query(
-            database=output_database,
             query=f"""{database.get_sql_database_link(database=output_database)}""",
         )
     except:
@@ -661,7 +650,7 @@ def test_find_database():
         database.find_database(
             database=database_file_basename_without_extention_format,
             databases_folders=databases_folders,
-            format=database_format,
+            database_format=database_format,
         )
         == database_file
     )
@@ -2098,11 +2087,9 @@ def test_export():
         )
         if database.get_sql_database_attach(database=output_database):
             database.query(
-                database=output_database,
                 query=f"""{database.get_sql_database_attach(database=output_database)}""",
             )
         assert database.query(
-            database=output_database,
             query=f"""{database.get_sql_database_link(database=output_database)}""",
         )
     except:
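A recurring pattern in this patch is narrowing bare `except:` clauses to concrete exception types (`ValueError`, `IndexError`). A minimal sketch of why the narrow form is safer; which exception class DuckDB actually raises for a missing table is an assumption here:

```python
import duckdb

conn = duckdb.connect()
try:
    columns = list(conn.query("SELECT * FROM missing_table LIMIT 0").columns)
except duckdb.Error:  # assumed DB-API base class; a bare except: would also
    columns = None    # swallow KeyboardInterrupt and SystemExit
print(columns)  # None
```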