From 9c49a62e545b761438a895611d6e290e2a049fb4 Mon Sep 17 00:00:00 2001
From: Silas Kieser
Date: Fri, 24 Mar 2023 08:44:52 +0100
Subject: [PATCH] Use hdf for gene catalogs (#621)

* use hdf for genecatalog
* add conda env for hdf
* add psutil and python 3.11 for hdf
* polish logs
* improve logging for genecatalog
* add code for loading genecatalog
---
 atlas/workflow/Snakefile                       |   2 +-
 atlas/workflow/envs/hdf.yaml                   |  10 ++
 atlas/workflow/rules/genecatalog.smk           |   8 +-
 .../scripts/combine_gene_coverages.py          | 130 +++++++++++-------
 docs/usage/output.rst                          |  50 +++++++
 5 files changed, 151 insertions(+), 49 deletions(-)
 create mode 100644 atlas/workflow/envs/hdf.yaml

diff --git a/atlas/workflow/Snakefile b/atlas/workflow/Snakefile
index af8cf678..68c2ef49 100644
--- a/atlas/workflow/Snakefile
+++ b/atlas/workflow/Snakefile
@@ -268,7 +268,7 @@ def get_gene_catalog_input():
     annotations_requested = config.get("gene_annotations", [])

     try:
-        annotations_files = ["Genecatalog/counts/median_coverage.parquet"] + [
+        annotations_files = ["Genecatalog/counts/median_coverage.h5"] + [
             annotation_file_names[key] for key in annotations_requested
         ]
diff --git a/atlas/workflow/envs/hdf.yaml b/atlas/workflow/envs/hdf.yaml
new file mode 100644
index 00000000..09a039e8
--- /dev/null
+++ b/atlas/workflow/envs/hdf.yaml
@@ -0,0 +1,10 @@
+channels:
+- conda-forge
+- bioconda
+- defaults
+dependencies:
+- python=3.10
+- pandas=1.5
+- h5py=3.8
+- psutil=5.9
+- biom-format >=2.1.14, <2.2
diff --git a/atlas/workflow/rules/genecatalog.smk b/atlas/workflow/rules/genecatalog.smk
index 6dc5e9d1..2480bb6f 100644
--- a/atlas/workflow/rules/genecatalog.smk
+++ b/atlas/workflow/rules/genecatalog.smk
@@ -300,13 +300,17 @@ rule pileup_Genecatalog:
 rule combine_gene_coverages:
     input:
         covstats=expand("Genecatalog/alignments/{sample}_coverage.tsv", sample=SAMPLES),
+        info="Genecatalog/counts/sequence_infos.tsv",
     output:
-        "Genecatalog/counts/median_coverage.parquet",
-        "Genecatalog/counts/Nmapped_reads.parquet",
+        cov="Genecatalog/counts/median_coverage.h5",
+        counts="Genecatalog/counts/Nmapped_reads.h5",
+        summary="Genecatalog/counts/gene_coverage_stats.tsv",
     log:
         "logs/Genecatalog/counts/combine_gene_coverages.log",
     params:
         samples=SAMPLES,
+    conda:
+        "../envs/hdf.yaml"
     threads: 1
     resources:
         mem=config["large_mem"],
diff --git a/atlas/workflow/scripts/combine_gene_coverages.py b/atlas/workflow/scripts/combine_gene_coverages.py
index 70ad8926..f18510ac 100644
--- a/atlas/workflow/scripts/combine_gene_coverages.py
+++ b/atlas/workflow/scripts/combine_gene_coverages.py
@@ -29,14 +29,14 @@ def handle_exception(exc_type, exc_value, exc_traceback):
 sys.excepthook = handle_exception

 #### Begining of script
-
+import numpy as np
 import pandas as pd
 import gc, os
 from utils.parsers_bbmap import read_pileup_coverage

-import psutil
-
+import h5py
+import psutil

 def measure_memory(write_log_entry=True):
     mem_uage = psutil.Process().memory_info().rss / (1024 * 1024)
@@ -49,71 +49,109 @@
 logging.info("Start")
 measure_memory()

-# read gene info
+N_samples = len(snakemake.input.covstats)

-# gene_info= pd.read_table(input.info, index_col=0)
-# gene_info.sort_index(inplace=True)
-# gene_list= gene_info.index
+logging.info("Read gene info")

-# N_genes= gene_info.shape[0]
+gene_info = pd.read_table(snakemake.input.info, index_col=0)
+gene_info.sort_index(inplace=True)
+N_genes = gene_info.shape[0]
+# gene_list = gene_info.index

-N_samples = len(snakemake.input.covstats)
+del gene_info
+gc.collect()

-# prepare snakemake.output tables
-combined_cov = {}
-combined_N_reads = {}
+measure_memory()

-for i, sample in enumerate(snakemake.params.samples):
-    cov_file = snakemake.input.covstats[i]
+logging.info("Open hdf files for writing")

-    data = read_pileup_coverage(cov_file, coverage_measure="Median_fold")
+gene_matrix_shape = (N_samples, N_genes)
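+# Pre-allocating one compressed (samples x genes) dataset per output file lets
+# each sample be written as a single row, so the full matrix never has to be
+# held in memory.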

-    # transform index to int this should drastrically redruce memory
-    data.index = data.index.str[len("Gene") :].astype(int)
+with h5py.File(snakemake.output.cov, "w") as hdf_cov_file, h5py.File(snakemake.output.counts, "w") as hdf_counts_file:

-    # genes are not sorted
-    # data.sort_index(inplace=True)
-    combined_cov[sample] = pd.to_numeric(data.Median_fold, downcast="float")
-    combined_N_reads[sample] = pd.to_numeric(data.Reads, downcast="integer")
+    combined_cov = hdf_cov_file.create_dataset("data", shape=gene_matrix_shape, fillvalue=0, compression="gzip")
+    combined_counts = hdf_counts_file.create_dataset("data", shape=gene_matrix_shape, fillvalue=0, compression="gzip")

-    # delete interminate data and release mem
-    del data
-    gc.collect()
+    # add sample names as attribute; h5py stores strings as fixed-width bytes
+    sample_names = np.array(list(snakemake.params.samples)).astype("S")
+    combined_cov.attrs["sample_names"] = sample_names
+    combined_counts.attrs["sample_names"] = sample_names
+
-    logging.info(f"Read coverage file for sample {i+1}: {sample}")
-    current_mem_uage = measure_memory()
-    estimated_max_mem = current_mem_uage / (i + 1) * (N_samples + 1) / 1024
+    Summary = {}

-    logging.info(f"Estimated max mem is {estimated_max_mem:5.0f} GB")
+    logging.info("Start reading files")
+    initial_mem_usage = measure_memory()

-# merge N reads
-logging.info("Concatenate raw reads")
-combined_N_reads = pd.concat(combined_N_reads, axis=1, sort=True, copy=False).fillna(0)
-# give index nice name
-combined_N_reads.index.name = "GeneNr"
-measure_memory()
+    for i, sample in enumerate(snakemake.params.samples):
+        sample_cov_file = snakemake.input.covstats[i]

-# Store as parquet
-# add index so that it can be read in R
+        data = read_pileup_coverage(sample_cov_file, coverage_measure="Median_fold")

-logging.info("Write first table")
-combined_N_reads.reset_index().to_parquet(snakemake.output[1])
-del combined_N_reads
-gc.collect()
-measure_memory()
+        assert data.shape[0] == N_genes, f"Expected {N_genes} genes but found {data.shape[0]} in file {sample_cov_file}"
+
+        # genes are not sorted, sort them by index
+        data.sort_index(inplace=True)
+
+        # downcast data to save memory
+        Median_fold = pd.to_numeric(data.Median_fold, downcast="float")
+        Reads = pd.to_numeric(data.Reads, downcast="integer")

-logging.info("Concatenate coverage data")
-combined_cov = pd.concat(combined_cov, axis=1, sort=True, copy=False).fillna(0)
-combined_cov.index.name = "GeneNr"
+        # delete intermediate data and release memory
+        del data
+        gc.collect()

-combined_cov.reset_index().to_parquet(snakemake.output[0])
-del combined_cov
+        # get summary statistics
+        logging.debug("Extract summary statistics")
+        non_zero_coverage = Median_fold.loc[Median_fold > 0]
+
+        Summary[sample] = {
+            "Sum_coverage": Median_fold.sum(),
+            "Total_counts": Reads.sum(),
+            "Non_zero_counts": (Reads > 0).sum(),
+            "Non_zero_coverage": non_zero_coverage.shape[0],
+            "Max_coverage": Median_fold.max(),
+            "Average_nonzero_coverage": non_zero_coverage.mean(),
+            "Q1_nonzero_coverage": non_zero_coverage.quantile(0.25),
+            "Median_nonzero_coverage": non_zero_coverage.quantile(0.5),
+            "Q3_nonzero_coverage": non_zero_coverage.quantile(0.75),
+        }
+        del non_zero_coverage
+
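+        # write this sample's values as row i of the pre-allocated matrices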
+        combined_cov[i, :] = Median_fold.values
+        combined_counts[i, :] = Reads.values
+
+        del Median_fold, Reads
+        gc.collect()
+
+        logging.info(f"Read coverage file for sample {i + 1} / {N_samples}")
+
+        current_mem_usage = measure_memory()
+        if i > 5:
+            estimated_max_mem = (current_mem_usage - initial_mem_usage) / (i + 1) * (N_samples + 1) + initial_mem_usage
+            logging.info(f"Estimated max mem is {estimated_max_mem:7.0f} MB")
+
+logging.info("All samples processed")
+gc.collect()
+
+logging.info("Save summary")
+pd.DataFrame(Summary).to_csv(snakemake.output.summary, sep="\t")
\ No newline at end of file
diff --git a/docs/usage/output.rst b/docs/usage/output.rst
index 1cb8048d..3cf62b21 100644
--- a/docs/usage/output.rst
+++ b/docs/usage/output.rst
@@ -183,6 +183,56 @@ This rule produces the following output file for the whole dataset.


+Since version 2.15, the counts are stored in an HDF5 file.
+
+You can open the HDF5 file in Python or R as follows:
+
+.. code-block:: python
+
+    import h5py
+
+    filename = "path/to/atlas_dir/Genecatalog/counts/median_coverage.h5"
+
+    with h5py.File(filename, "r") as hdf_file:
+        data_matrix = hdf_file["data"][:]
+        sample_names = hdf_file["data"].attrs["sample_names"].astype(str)
+
+.. code-block:: r
+
+    library(rhdf5)
+
+    filename <- "path/to/atlas_dir/Genecatalog/counts/median_coverage.h5"
+
+    data <- h5read(filename, "data")
+    attributes <- h5readAttributes(filename, "data")
+    colnames(data) <- attributes$sample_names
+
+You don't need to load the full matrix; you can select only a subset of genes,
+e.g. the genes with annotations.
+You can use the file ``Genecatalog/counts/gene_coverage_stats.tsv`` to normalize the counts.
+
+.. seealso:: See the Atlas Tutorial.
+
+Before version 2.15, the counts were stored in a parquet file, which can be
+opened easily with ``pandas.read_parquet`` or ``arrow::read_parquet``.


 All
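A minimal sketch (not part of the patch itself) of how the new outputs fit together: load the counts matrix from the HDF5 file and normalize each sample by its ``Total_counts`` row from ``gene_coverage_stats.tsv``. The paths, dataset name, and statistic name follow the script above; the counts-per-million scaling is just one illustrative choice.

.. code-block:: python

    import h5py
    import pandas as pd

    counts_file = "Genecatalog/counts/Nmapped_reads.h5"
    stats_file = "Genecatalog/counts/gene_coverage_stats.tsv"

    with h5py.File(counts_file, "r") as hdf_file:
        counts = hdf_file["data"][:]  # shape: (N_samples, N_genes)
        sample_names = hdf_file["data"].attrs["sample_names"].astype(str)

    # summary table: one column per sample, one statistic per row
    stats = pd.read_table(stats_file, index_col=0)
    total_counts = stats.loc["Total_counts", sample_names]

    # counts per million: scale each sample (row) by its total mapped reads
    cpm = counts / total_counts.values[:, None] * 1e6
    cpm_df = pd.DataFrame(cpm, index=sample_names)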