
Commit

improve and clean annotation parquet #227 clean SonarLint issues #226
antonylebechec committed Apr 21, 2024
1 parent d3b4ab5 commit ebeffd1
Showing 4 changed files with 206 additions and 375 deletions.
126 changes: 27 additions & 99 deletions howard/functions/from_annovar.py
@@ -2,32 +2,21 @@

import csv
import gc
import io
import multiprocessing
import sys
import os
import re
import subprocess
from tempfile import NamedTemporaryFile
import tempfile
import duckdb
import json
import argparse

import Bio.bgzf as bgzf
import pandas as pd
import vcf
import logging as log
import sys
from functools import partial
import itertools

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv as pcsv


from howard.objects.variants import Variants
from howard.objects.database import Database
-from howard.functions.commons import *
+from howard.functions.commons import full_path, remove_if_exists


# Dictionary of types
@@ -118,7 +107,7 @@ def from_annovar(args: argparse) -> None:

# Output
output_dirname = os.path.dirname(output_file)
-output_file_name, output_file_ext = os.path.splitext(os.path.basename(output_file))
+_, output_file_ext = os.path.splitext(os.path.basename(output_file))
if output_file_ext not in [".gz"]:
log.error(f"Output file '{output_file}' without compress '.gz' extension")
raise ValueError(
@@ -134,7 +123,7 @@ def from_annovar(args: argparse) -> None:
# To Parquet
if output_file_parquet:
output_parquet_dirname = os.path.dirname(output_file_parquet)
-output_file_parquet_name, output_file_parquet_ext = os.path.splitext(
+_, output_file_parquet_ext = os.path.splitext(
os.path.basename(output_file_parquet)
)
if output_file_parquet_ext not in [".parquet"]:
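Note: both extension checks rely on `os.path.splitext` keeping only the last suffix, so a double extension such as `.vcf.gz` still yields `".gz"`. A standalone illustration (not part of the commit):

```python
# os.path.splitext strips only the final suffix, which is exactly what
# the ".gz" and ".parquet" checks above depend on.
import os

for name in ["anno.vcf.gz", "anno.parquet", "anno.tsv"]:
    _, ext = os.path.splitext(os.path.basename(name))
    print(name, "->", ext)  # -> .gz / .parquet / .tsv
```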
@@ -181,7 +170,7 @@ def from_annovar(args: argparse) -> None:
# first VCF

# Annovar to VCF
log.info(f"Annovar to VCF and Parquet...")
log.info("Annovar to VCF and Parquet...")
annovar_to_vcf(
input_file=input_file,
output_file=output_file,
Expand All @@ -199,7 +188,7 @@ def from_annovar(args: argparse) -> None:
)

# Header VCF hdr
log.info(f"VCF Extract header hdr for VCF...")
log.info("VCF Extract header hdr for VCF...")
command = f"""{bcftools} view -h {output_file} 1>{output_file}.hdr """
log.debug("bcftools command: " + command)
subprocess.run(command, shell=True)
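Several hunks in this commit simply drop the `f` prefix from literals that contain no replacement field; linters such as SonarLint (and pylint's `f-string-without-interpolation` check) flag the prefix as useless. An illustration, not part of the diff:

```python
import logging as log

log.basicConfig(level=log.INFO)
log.info(f"Annovar to VCF and Parquet...")  # flagged: f-string with no placeholder
log.info("Annovar to VCF and Parquet...")   # equivalent plain literal
```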
@@ -208,7 +197,7 @@ def from_annovar(args: argparse) -> None:
if output_file_parquet:
# File already generated
# Header VCF hdr
log.info(f"VCF Extract header hdr for Parquet...")
log.info("VCF Extract header hdr for Parquet...")
command = f"""{bcftools} view -h {output_file} 1>{output_file_parquet}.hdr """
log.debug("bcftools command: " + command)
subprocess.run(command, shell=True)
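The header-extraction calls keep `shell=True` with an interpolated command string. If one wanted to avoid the shell (a common SonarLint recommendation), the same redirect can be expressed with an argument list and an explicit stdout handle. A hypothetical sketch, not code from this commit:

```python
import subprocess

def extract_vcf_header(bcftools: str, vcf_path: str, hdr_path: str) -> None:
    # Equivalent to: {bcftools} view -h {vcf_path} 1>{hdr_path}
    with open(hdr_path, "w") as hdr:
        subprocess.run([bcftools, "view", "-h", vcf_path], stdout=hdr, check=True)
```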
@@ -323,14 +312,15 @@ def annovar_to_vcf(
"#Chr",
"#chr",
"Chr",
"chr" "chromosome",
"chr",
"chromosome",
]
for chr in chrom_list_for_header:
if chr in header:
header[header.index(chr)] = "#CHROM"
log.debug(f"found '{chr}' as chromosome column")
if "#CHROM" not in header:
log.debug(f"default chromosome column is 0")
log.debug("default chromosome column is 0")
header[0] = "#CHROM"
if "POS" not in header:
pos_list_for_header = ["Pos", "pos", "Start", "START"]
Expand All @@ -339,7 +329,7 @@ def annovar_to_vcf(
header[header.index(pos)] = "POS"
log.debug(f"found '{pos}' as position column")
if "POS" not in header:
log.debug(f"default position column is 1")
log.debug("default position column is 1")
header[1] = "POS"
if "REF" not in header:
ref_list_for_header = ["Ref", "ref"]
Expand All @@ -348,7 +338,7 @@ def annovar_to_vcf(
header[header.index(ref)] = "REF"
log.debug(f"found '{ref}' as reference column")
if "REF" not in header:
log.debug(f"default reference column is 3")
log.debug("default reference column is 3")
header[3] = "REF"
if "ALT" not in header:
alt_list_for_header = ["Alt", "alt"]
Expand All @@ -357,7 +347,7 @@ def annovar_to_vcf(
header[header.index(alt)] = "ALT"
log.debug(f"found '{alt}' as alternative column")
if "ALT" not in header:
log.debug(f"default alternative column is 4")
log.debug("default alternative column is 4")
header[4] = "ALT"
break
else:
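The alias-list change in the hunk above is a genuine bug fix: the old list contained `"chr" "chromosome"`, which Python's adjacent-literal concatenation collapses into the single entry `"chrchromosome"`, so neither alias ever matched. The new list separates them. A minimal demonstration:

```python
# Adjacent string literals are concatenated at parse time.
broken = ["Chr", "chr" "chromosome"]   # one bogus entry
fixed = ["Chr", "chr", "chromosome"]   # two real aliases
print(broken)           # ['Chr', 'chrchromosome']
print("chr" in broken)  # False
print("chr" in fixed)   # True
```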
@@ -706,7 +696,7 @@ def annovar_to_vcf(
if reduce_memory and multi_variant:

# Log
log.debug(f"Create View From TSV to Parquet")
log.debug("Create View From TSV to Parquet")

# Create Parquet file from TSV
tsv_to_parquet(
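`tsv_to_parquet`'s body is collapsed in this view. Given the `pyarrow.csv`/`pyarrow.parquet` imports at the top of the file, a conversion along these lines is plausible; this is a hedged sketch, not HOWARD's actual implementation:

```python
import pyarrow.csv as pcsv
import pyarrow.parquet as pq

def tsv_to_parquet_sketch(tsv_path: str, parquet_path: str) -> None:
    # Read the tab-separated annotation file into an Arrow table...
    table = pcsv.read_csv(tsv_path, parse_options=pcsv.ParseOptions(delimiter="\t"))
    # ...and write it back out as compressed Parquet.
    pq.write_table(table, parquet_path, compression="snappy")
```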
@@ -998,81 +988,19 @@ def parquet_info_explode(
config_ro["memory_limit"] = memory
config_threads["memory_limit"] = memory

-# Reduce memory
-if reduce_memory:
-
-# List of exploded parquet files
-list_of_exploded_files = []
-
-# Create
-parquet_input = Variants(input=input_file, config=config_ro)
-parquet_chromosomes = parquet_input.get_query_to_df(
-query=f""" SELECT "#CHROM" FROM read_csv('{input_file}',AUTO_DETECT=TRUE) WHERE "#CHROM" NOT NULL GROUP BY "#CHROM" """
-)
-
-for chrom in parquet_chromosomes["#CHROM"]:
-
-# Split Chrom
-query_chrom = f""" SELECT * FROM read_csv('{input_file}',AUTO_DETECT=TRUE) WHERE "#CHROM"='{chrom}' """
-query_chrom_output = f"{output_file}.{chrom}.parquet"
-query_chrom_explode_output = f"{output_file}.{chrom}.explode.parquet"
-
-# Log
-log.info(f"Explode infos chromosome {chrom}")
-
-# Extract
-log.debug(f"Extract chromosome {chrom}: {query_chrom_explode_output}")
-remove_if_exists([query_chrom_output, query_chrom_output + ".hdr"])
-parquet_input.export_output(
-output_file=query_chrom_output, query=query_chrom, export_header=True
-)
-
-# Explode
-log.debug(
-f"Explode infos for chromosome {chrom}: {query_chrom_explode_output}"
-)
-list_of_exploded_files.append(query_chrom_explode_output)
-parquet_explode = Variants(
-input=query_chrom_output,
-output=query_chrom_explode_output,
-config=config_threads,
-param=param_explode,
-)
-parquet_explode.load_data()
-remove_if_exists(
-[query_chrom_explode_output, query_chrom_explode_output + ".hdr"]
-)
-parquet_explode.export_output()
-remove_if_exists([query_chrom_output, query_chrom_output + ".hdr"])
-
-# list_of_exploded_files
-log.info(f"Merge explode infos")
-log.debug(f"Merge explode infos files: {list_of_exploded_files}")
-query_explode = f""" SELECT * FROM read_parquet({list_of_exploded_files}) """
-query_explode_output = output_file
-remove_if_exists([query_explode_output, query_explode_output + ".hdr"])
-parquet_explode.export_output(
-output_file=query_explode_output, query=query_explode, export_header=True
-)
-
-# Clear
-for file in list_of_exploded_files:
-remove_if_exists([file, file + ".hdr"])
-
-else:
-
-# Create
-parquet_explode = Variants(
-input=input_file,
-output=output_file,
-config=config_threads,
-param=param_explode,
-)
-parquet_explode.load_data()
-parquet_explode.export_output()
-
-# Check
-# df = parquet_input.get_query_to_df(f"SELECT * FROM '{query_explode_output}' LIMIT 10 ")
-# log.debug(df)
param_explode["chunk_size"] = 100000

# Create
parquet_explode = Variants(
input=input_file,
output=output_file,
config=config_threads,
param=param_explode,
)
parquet_explode.load_data()
parquet_explode.export_output()
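The rewrite drops the whole per-chromosome split/explode/merge branch in favor of a single pass with a `chunk_size` of 100000, presumably letting the `Variants` loader bound memory per chunk instead of per chromosome. The general streaming pattern such a parameter suggests (illustration only, not the `Variants` internals):

```python
import pyarrow as pa
import pyarrow.parquet as pq

def write_in_chunks(batches, out_path: str, schema: pa.Schema) -> None:
    # Stream RecordBatches (e.g. ~100000 rows each) into one Parquet file,
    # holding only a single chunk in memory at a time.
    with pq.ParquetWriter(out_path, schema) as writer:
        for batch in batches:
            writer.write_batch(batch)
```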


def tsv_to_parquet(
