🐍🐎 Large data mode #207

Merged · 14 commits · Jan 24, 2022

Changes from all commits
857 changes: 857 additions & 0 deletions autometa/binning/large_data_mode.py

Large diffs are not rendered by default.

544 changes: 544 additions & 0 deletions autometa/binning/large_data_mode_loginfo.py

Large diffs are not rendered by default.

493 changes: 193 additions & 300 deletions autometa/binning/recursive_dbscan.py

Large diffs are not rendered by default.

176 changes: 119 additions & 57 deletions autometa/binning/summary.py
@@ -9,6 +9,7 @@
 
 import logging
 import os
+from typing import Union
 
 import pandas as pd
 import numpy as np
@@ -17,7 +18,7 @@
 
 from autometa.taxonomy.ncbi import NCBI
 from autometa.taxonomy import majority_vote
-from autometa.common import markers
+from autometa.common.markers import load as load_markers
 
 
 logger = logging.getLogger(__name__)
@@ -51,7 +52,8 @@ def write_cluster_records(
 ):
     contigs = set(dff.index)
     records = [rec for rec in mgrecords if rec.id in contigs]
-    outfpath = os.path.join(outdir, f"{cluster}.fna")
+    file_extension = "fasta" if cluster == "unclustered" else "fna"
+    outfpath = os.path.join(outdir, f"{cluster}.{file_extension}")
     SeqIO.write(records, outfpath, "fasta")
     return
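With this change, unclustered contigs are written with a `.fasta` extension so they are not mistaken for a genome bin, while clustered records keep `.fna`. A minimal sketch of the resulting paths (the `metabins` directory and cluster names here are hypothetical, not part of the diff):

```python
import os

# Hypothetical cluster names illustrating the extension rule in this hunk
for cluster in ["bin_0001", "unclustered"]:
    file_extension = "fasta" if cluster == "unclustered" else "fna"
    print(os.path.join("metabins", f"{cluster}.{file_extension}"))
# metabins/bin_0001.fna
# metabins/unclustered.fasta
```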

@@ -87,79 +89,139 @@ def fragmentation_metric(df: pd.DataFrame, quality_measure: float = 0.50) -> int
     return length
 
 
+def get_agg_stats(
+    cluster_groups: pd.core.groupby.generic.DataFrameGroupBy, stat_col: str
+) -> pd.DataFrame:
+    """Compute min, max, std, median and length-weighted mean from provided `stat_col`
+
+    Parameters
+    ----------
+    cluster_groups : pd.core.groupby.generic.DataFrameGroupBy
+        pandas DataFrame grouped by cluster
+    stat_col : str
+        column on which to compute min, max, std, median and length-weighted mean
+
+    Returns
+    -------
+    pd.DataFrame
+        index=cluster, columns=[min_{stat_col}, max_{stat_col}, std_{stat_col}, median_{stat_col}, length_weighted_mean_{stat_col}]
+    """
+
+    stats = (
+        cluster_groups[stat_col]
+        .agg(["min", "max", "std", "median"])
+        .rename(
+            columns={
+                "min": f"min_{stat_col}",
+                "max": f"max_{stat_col}",
+                "std": f"std_{stat_col}",
+                "median": f"median_{stat_col}",
+            }
+        )
+    )
+
+    def weighted_average(df, values, weights):
+        return np.average(a=df[values], weights=df[weights])
+
+    length_weighted_avg = cluster_groups.apply(
+        weighted_average, values=stat_col, weights="length"
+    )
+    length_weighted_avg.name = f"length_weighted_mean_{stat_col}"
+    return pd.concat([stats, length_weighted_avg], axis=1)
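The added `get_agg_stats` helper computes per-cluster min/max/std/median plus a length-weighted mean in one pass over the grouped table. A toy sketch of how it behaves (contig lengths and coverages are invented; the import becomes valid once this PR is merged):

```python
import pandas as pd

from autometa.binning.summary import get_agg_stats  # available after this PR

# Toy binning table: two clusters, three contigs (all values invented)
df = pd.DataFrame(
    {
        "cluster": ["bin_0001", "bin_0001", "bin_0002"],
        "length": [10_000, 5_000, 8_000],
        "coverage": [20.0, 35.0, 12.0],
    },
    index=["contig_1", "contig_2", "contig_3"],
)
cluster_groups = df.groupby("cluster")

# For bin_0001 the length-weighted mean coverage is
# (10_000 * 20.0 + 5_000 * 35.0) / 15_000 = 25.0,
# while the unweighted median is 27.5
print(get_agg_stats(cluster_groups, "coverage"))
```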


 def get_metabin_stats(
-    bin_df: pd.DataFrame, markers_fpath: str, cluster_col: str = "cluster"
+    bin_df: pd.DataFrame,
+    markers: Union[str, pd.DataFrame],
+    cluster_col: str = "cluster",
 ) -> pd.DataFrame:
     """Retrieve statistics for all clusters recovered from Autometa binning.
 
     Parameters
     ----------
     bin_df : pd.DataFrame
-        Autometa binning table. index=contig, cols=['cluster', 'length', 'GC', 'coverage', ...]
-    markers_fpath : str
-        Path to autometa annotated and filtered markers table respective to kingdom binned.
+        Autometa binning table. index=contig, cols=['cluster', 'length', 'gc_content', 'coverage', ...]
+    markers : str, pd.DataFrame
+        Path to or pd.DataFrame of markers table corresponding to contigs in `bin_df`
     cluster_col : str, optional
         Clustering column by which to group metabins
 
     Returns
     -------
     pd.DataFrame
         dataframe consisting of various metagenome-assembled genome statistics indexed by cluster.
 
+    Raises
+    ------
+    TypeError
+        markers should be a path to or pd.DataFrame of a markers table corresponding to contigs in `bin_df`
+
+    ValueError
+        One of the required columns (`cluster_col`, coverage, length, gc_content) was not found in `bin_df`
     """
     logger.info(f"Retrieving metabins' stats for {cluster_col}")
-    stats = []
-    markers_df = markers.load(markers_fpath)
-    for cluster, dff in bin_df.fillna(value={cluster_col: "unclustered"}).groupby(
-        cluster_col
-    ):
-        length_weighted_coverage = np.average(
-            a=dff.coverage, weights=dff.length / dff.length.sum()
-        )
-        length_weighted_gc = np.average(
-            a=dff.gc_content, weights=dff.length / dff.length.sum()
-        )
-        num_expected_markers = markers_df.shape[1]
-        cluster_pfams = markers_df[markers_df.index.isin(dff.index)]
-        if cluster_pfams.empty:
-            total_markers = 0
-            num_single_copy_markers = 0
-            num_markers_present = 0
-            completeness = pd.NA
-            purity = pd.NA
-        else:
-            pfam_counts = cluster_pfams.sum()
-            total_markers = pfam_counts.sum()
-            num_single_copy_markers = pfam_counts[pfam_counts == 1].count()
-            num_markers_present = pfam_counts[pfam_counts >= 1].count()
-            completeness = num_markers_present / num_expected_markers * 100
-            purity = num_single_copy_markers / num_markers_present * 100
-
-        stats.append(
-            {
-                cluster_col: cluster,
-                "nseqs": dff.shape[0],
-                "seqs_pct": dff.shape[0] / bin_df.shape[0] * 100,
-                "size (bp)": dff.length.sum(),
-                "size_pct": dff.length.sum() / bin_df.length.sum() * 100,
-                "N90": fragmentation_metric(dff, quality_measure=0.9),
-                "N50": fragmentation_metric(dff, quality_measure=0.5),
-                "N10": fragmentation_metric(dff, quality_measure=0.1),
-                "length_weighted_gc_content": length_weighted_gc,
-                "min_gc_content": dff.gc_content.min(),
-                "max_gc_content": dff.gc_content.max(),
-                "std_gc_content": dff.gc_content.std(),
-                "length_weighted_coverage": length_weighted_coverage,
-                "min_coverage": dff.coverage.min(),
-                "max_coverage": dff.coverage.max(),
-                "std_coverage": dff.coverage.std(),
-                "completeness": completeness,
-                "purity": purity,
-                "num_total_markers": total_markers,
-                f"num_unique_markers (expected {num_expected_markers})": num_markers_present,
-                "num_single_copy_markers": num_single_copy_markers,
-            }
-        )
-    return pd.DataFrame(stats).set_index(cluster_col).convert_dtypes()
+    if isinstance(markers, str):
+        markers_df = load_markers(markers)
+    elif isinstance(markers, pd.DataFrame):
+        markers_df = markers
+    else:
+        raise TypeError(
+            f"`markers` should be a path to or pd.DataFrame of a markers table corresponding to contigs in `bin_df`. Provided: {markers}"
+        )
+
+    metabin_stat_cols = [cluster_col, "coverage", "length", "gc_content"]
+    for col in metabin_stat_cols:
+        if col not in bin_df.columns:
+            raise ValueError(
+                f"Required column ({col}) not in bin_df columns: {bin_df.columns}"
+            )
+    # If the indices do not match, marker calculations will fail
+    if bin_df.index.name != "contig":
+        raise ValueError(
+            f"binning dataframe must be indexed by contig. given: {bin_df.index.name}."
+            "\n\tTry:"
+            "\n\t\tbin_df.set_index('contig', inplace=True)"
+        )
+
+    df = bin_df[metabin_stat_cols].fillna(value={cluster_col: "unclustered"}).copy()
+
+    clusters = df.join(markers_df, how="outer").groupby("cluster")
+
+    percent_metagenome_size = clusters.length.sum() / df.length.sum() * 100
+    percent_metagenome_seqs = clusters.size() / df.shape[0] * 100
+    marker_counts = clusters[markers_df.columns].sum()
+    cluster_marker_sum = marker_counts.sum(axis=1)
+    redundant_marker_count = marker_counts.gt(1).sum(axis=1)
+    single_copy_marker_count = marker_counts.eq(1).sum(axis=1)
+    unique_marker_count = marker_counts.ge(1).sum(axis=1)
+    expected_unique_marker_count = markers_df.shape[1]
+    completeness = unique_marker_count / expected_unique_marker_count * 100
+    purity = single_copy_marker_count / unique_marker_count * 100
+    stats_df = pd.DataFrame(
+        {
+            "nseqs": clusters.size(),
+            "size (bp)": clusters.length.sum(),
+            "completeness": completeness,
+            "purity": purity,
+            "marker_sum": cluster_marker_sum,
+            "unique_marker_count": unique_marker_count,
+            "single_copy_marker_count": single_copy_marker_count,
+            "redundant_marker_count": redundant_marker_count,
+            "expected_unique_marker_count": expected_unique_marker_count,
+            "percent_of_metagenome_seqs": percent_metagenome_seqs,
+            "percent_of_metagenome_size": percent_metagenome_size,
+            "N90": clusters.apply(fragmentation_metric, quality_measure=0.9),
+            "N50": clusters.apply(fragmentation_metric, quality_measure=0.5),
+            "N10": clusters.apply(fragmentation_metric, quality_measure=0.1),
+        }
+    )
+    coverage_stats = get_agg_stats(clusters, "coverage")
+    gc_content_stats = get_agg_stats(clusters, "gc_content")
+    return (
+        pd.concat([stats_df, coverage_stats, gc_content_stats], axis=1)
+        .round(2)
+        .convert_dtypes()
+    )
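For reference, the rewritten function computes completeness as (markers present at least once / expected marker set) × 100 and purity as (markers present exactly once / markers present at least once) × 100, using the `.ge(1)`/`.eq(1)` counting above. A toy worked example of that arithmetic (marker names and copy counts are invented):

```python
import pandas as pd

# Toy wide-format markers table: rows=contigs, columns=marker PFAMs,
# values=copy counts per contig (all invented for illustration)
markers_df = pd.DataFrame(
    {"PF00001": [1, 0], "PF00002": [1, 1], "PF00003": [0, 0]},
    index=pd.Index(["contig_1", "contig_2"], name="contig"),
)

marker_counts = markers_df.sum()          # one cluster holding both contigs
unique = (marker_counts >= 1).sum()       # 2 markers present at least once
single_copy = (marker_counts == 1).sum()  # 1 marker present exactly once
expected = markers_df.shape[1]            # 3 expected markers
completeness = unique / expected * 100    # ~66.67
purity = single_copy / unique * 100       # 50.0
print(round(completeness, 2), purity)
```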


def get_metabin_taxonomies(