Use hdfs for gene catalogs (#621)
* use hdf for genecatalog

* add conda env for hdf

* add psutil and python3.11 for hdf

* perfecting logs

* improve logging for genecatalog

* add code for loading genecatalog
SilasK authored Mar 24, 2023
1 parent 7b88913 commit 9c49a62
Showing 5 changed files with 151 additions and 49 deletions.
2 changes: 1 addition & 1 deletion atlas/workflow/Snakefile
@@ -268,7 +268,7 @@ def get_gene_catalog_input():
annotations_requested = config.get("gene_annotations", [])

try:
annotations_files = ["Genecatalog/counts/median_coverage.parquet"] + [
annotations_files = ["Genecatalog/counts/median_coverage.h5"] + [
annotation_file_names[key] for key in annotations_requested
]

10 changes: 10 additions & 0 deletions atlas/workflow/envs/hdf.yaml
@@ -0,0 +1,10 @@
channels:
- conda-forge
- bioconda
- defaults
dependencies:
- python=3.10
- pandas=1.5
- h5py=3.8
- psutil=5.9
- biom-format >=2.1.14, <2.2
8 changes: 6 additions & 2 deletions atlas/workflow/rules/genecatalog.smk
@@ -300,13 +300,17 @@ rule pileup_Genecatalog:
rule combine_gene_coverages:
input:
covstats=expand("Genecatalog/alignments/{sample}_coverage.tsv", sample=SAMPLES),
info = "Genecatalog/counts/sequence_infos.tsv"
output:
"Genecatalog/counts/median_coverage.parquet",
"Genecatalog/counts/Nmapped_reads.parquet",
cov="Genecatalog/counts/median_coverage.h5",
counts="Genecatalog/counts/Nmapped_reads.h5",
summary="Genecatalog/counts/gene_coverage_stats.tsv"
log:
"logs/Genecatalog/counts/combine_gene_coverages.log",
params:
samples=SAMPLES,
conda:
"../envs/hdf.yaml"
threads: 1
resources:
mem=config["large_mem"],
130 changes: 84 additions & 46 deletions atlas/workflow/scripts/combine_gene_coverages.py
@@ -29,14 +29,14 @@ def handle_exception(exc_type, exc_value, exc_traceback):
sys.excepthook = handle_exception

#### Beginning of script

import numpy as np
import pandas as pd
import gc, os
from utils.parsers_bbmap import read_pileup_coverage

import psutil

import h5py

import psutil
def measure_memory(write_log_entry=True):
mem_uage = psutil.Process().memory_info().rss / (1024 * 1024)

@@ -49,71 +49,109 @@ def measure_memory(write_log_entry=True):
logging.info("Start")
measure_memory()

# read gene info
N_samples = len(snakemake.input.covstats)

# gene_info= pd.read_table(input.info, index_col=0)
# gene_info.sort_index(inplace=True)
# gene_list= gene_info.index
logging.info("Read gene info")

# N_genes= gene_info.shape[0]
gene_info= pd.read_table(snakemake.input.info, index_col=0)
gene_info.sort_index(inplace=True)
N_genes= gene_info.shape[0]
#gene_list= gene_info.index

N_samples = len(snakemake.input.covstats)
del gene_info
gc.collect()

# prepare snakemake.output tables
combined_cov = {}
combined_N_reads = {}
measure_memory()


for i, sample in enumerate(snakemake.params.samples):

cov_file = snakemake.input.covstats[i]
logging.info("Open hdf files for writing")

data = read_pileup_coverage(cov_file, coverage_measure="Median_fold")
gene_matrix_shape = (N_samples,N_genes)

    # transform index to int; this should drastically reduce memory
data.index = data.index.str[len("Gene") :].astype(int)
with h5py.File(snakemake.output.cov, 'w') as hdf_cov_file, h5py.File(snakemake.output.counts, 'w') as hdf_counts_file:

# genes are not sorted
# data.sort_index(inplace=True)

combined_cov[sample] = pd.to_numeric(data.Median_fold, downcast="float")
combined_N_reads[sample] = pd.to_numeric(data.Reads, downcast="integer")
combined_cov = hdf_cov_file.create_dataset('data', shape= gene_matrix_shape ,fillvalue=0, compression="gzip")
combined_counts = hdf_counts_file.create_dataset('data', shape= gene_matrix_shape ,fillvalue=0, compression="gzip")

    # delete intermediate data and release mem
del data
gc.collect()
    # add sample names attribute
sample_names = np.array(list(snakemake.params.samples)).astype("S")
combined_cov.attrs['sample_names'] = sample_names
combined_counts.attrs['sample_names'] = sample_names


logging.info(f"Read coverage file for sample {i+1}: {sample}")
current_mem_uage = measure_memory()
estimated_max_mem = current_mem_uage / (i + 1) * (N_samples + 1) / 1024
Summary= {}

logging.info(f"Estimated max mem is {estimated_max_mem:5.0f} GB")

logging.info("Start reading files")
initial_mem_uage = measure_memory()

# merge N reads
logging.info("Concatenate raw reads")
combined_N_reads = pd.concat(combined_N_reads, axis=1, sort=True, copy=False).fillna(0)
# give index nice name
combined_N_reads.index.name = "GeneNr"

measure_memory()
for i,sample in enumerate(snakemake.params.samples):

sample_cov_file = snakemake.input.covstats[i]

# Store as parquet
# add index so that it can be read in R
data = read_pileup_coverage(sample_cov_file, coverage_measure="Median_fold")

logging.info("Write first table")

combined_N_reads.reset_index().to_parquet(snakemake.output[1])
del combined_N_reads
gc.collect()
measure_memory()

assert data.shape[0] == N_genes, f"I only have {data.shape[0]} /{N_genes} in the file {sample_cov_file}"

# genes are not sorted :-()
data.sort_index(inplace=True)



# downcast data
Median_fold = pd.to_numeric(data.Median_fold, downcast="float")
Reads= pd.to_numeric(data.Reads, downcast="integer")


logging.info("Concatenate coverage data")
combined_cov = pd.concat(combined_cov, axis=1, sort=True, copy=False).fillna(0)
combined_cov.index.name = "GeneNr"
        # delete intermediate data and release mem
del data
gc.collect()

combined_cov.reset_index().to_parquet(snakemake.output[0])

del combined_cov
# get summary statistics
logging.debug("Extract Summary statistics")
non_zero_coverage= Median_fold.loc[Median_fold>0]

Summary[sample] = {"Sum_coverage" : Median_fold.sum(),
"Total_counts" : Reads.sum(),
"Non_zero_counts" : (Reads>0).sum(),
"Non_zero_coverage" : non_zero_coverage.shape[0],
"Max_coverage" : Median_fold.max(),
"Average_nonzero_coverage" : non_zero_coverage.mean(),
"Q1_nonzero_coverage" : non_zero_coverage.quantile(0.25),
"Median_nonzero_coverage" : non_zero_coverage.quantile(0.5),
"Q3_nonzero_coverage" : non_zero_coverage.quantile(0.75),
}
del non_zero_coverage

combined_cov[i,:] = Median_fold.values
combined_counts[i,:] = Reads.values


del Median_fold,Reads
gc.collect()


logging.info(f"Read coverage file for sample {i+1} / {N_samples}")

current_mem_uage = measure_memory()
if i>5:

estimated_max_mem = (current_mem_uage- initial_mem_uage) /(i + 1) * (N_samples + 1) +initial_mem_uage

logging.info(f"Estimated max mem is {estimated_max_mem:7.0f} MB ")





logging.info("All samples processed")
gc.collect()

logging.info("Save Summary")
pd.DataFrame(Summary).to_csv(snakemake.output.summary,sep='\t')
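
In short, the script allocates one fixed-size, gzip-compressed matrix per output file (one row per sample, one column per gene), records the sample order in a sample_names attribute, and then fills the matrix one row at a time so that only a single sample's coverage vector is held in memory. A self-contained toy sketch of this writing pattern (file name, dimensions, and values are made up for illustration):

import h5py
import numpy as np

# toy dimensions, made up for illustration; the real script uses
# (number of samples, number of genes in the catalog)
samples = ["sample1", "sample2", "sample3"]
n_genes = 10

with h5py.File("toy_coverage.h5", "w") as hdf_file:
    # one fixed-size matrix: rows = samples, columns = genes
    dset = hdf_file.create_dataset(
        "data", shape=(len(samples), n_genes), fillvalue=0, compression="gzip"
    )
    # sample order is stored as an attribute, as in the script above
    dset.attrs["sample_names"] = np.array(samples).astype("S")

    # each sample is written as one row, so only one coverage vector
    # needs to be in memory at a time
    for i, sample in enumerate(samples):
        coverage = np.random.rand(n_genes)  # placeholder for parsed coverage
        dset[i, :] = coverage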
50 changes: 50 additions & 0 deletions docs/usage/output.rst
@@ -183,6 +183,56 @@ This rule produces the following output file for the whole dataset.



Since version 2.15, the counts are stored in an HDF5 file.

You can open the HDF5 file in Python or R as follows:


.. code-block:: python

    import h5py

    filename = "path/to/atlas_dir/Genecatalog/counts/median_coverage.h5"

    with h5py.File(filename, 'r') as hdf_file:
        data_matrix = hdf_file['data'][:]
        sample_names = hdf_file['data'].attrs['sample_names'].astype(str)


.. code-block:: r

    library(rhdf5)

    filename <- "path/to/atlas_dir/Genecatalog/counts/median_coverage.h5"

    data <- h5read(filename, "data")
    attributes <- h5readAttributes(filename, "data")

    colnames(data) <- attributes$sample_names


You don't need to load the full data; you can, for example, select only the genes that have annotations.
The file ``Genecatalog/counts/gene_coverage_stats.tsv`` can be used to normalize the counts.
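
For example, a minimal sketch of loading only a few genes and normalizing them by the per-sample totals could look as follows (paths and gene column indices are placeholders; the ``Total_counts`` row and the one-column-per-sample layout of the stats table follow from the combine script above):

.. code-block:: python

    import h5py
    import pandas as pd

    counts_file = "path/to/atlas_dir/Genecatalog/counts/Nmapped_reads.h5"
    stats_file = "path/to/atlas_dir/Genecatalog/counts/gene_coverage_stats.tsv"

    # placeholder column indices of the genes you are interested in
    genes_of_interest = [0, 5, 42]

    with h5py.File(counts_file, "r") as hdf_file:
        sample_names = hdf_file["data"].attrs["sample_names"].astype(str)
        # read only the selected gene columns instead of the full matrix
        counts = hdf_file["data"][:, genes_of_interest]

    counts = pd.DataFrame(counts, index=sample_names, columns=genes_of_interest)

    # stats table: one row per statistic, one column per sample
    stats = pd.read_csv(stats_file, sep="\t", index_col=0)
    total_counts = stats.loc["Total_counts", sample_names]

    # counts-per-million style normalization
    cpm = counts.div(total_counts.values, axis=0) * 1e6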


.. seealso:: See the Atlas Tutorial.


Before version 2.15, the counts were stored in a parquet file. The parquet file can be opened easily with ``pandas.read_parquet`` or ``arrow::read_parquet``.
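
For example, with pandas (the path is illustrative; the old parquet files stored the ``GeneNr`` index as a regular column):

.. code-block:: python

    import pandas as pd

    cov = pd.read_parquet("path/to/atlas_dir/Genecatalog/counts/median_coverage.parquet")
    cov = cov.set_index("GeneNr")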












