From 10571c414dfd87d57f056a6dd9cfc9899c011d5d Mon Sep 17 00:00:00 2001
From: Evan Rees <25933122+WiscEvan@users.noreply.github.com>
Date: Fri, 19 Jun 2020 16:26:18 -0500
Subject: [PATCH] Recursive DBSCAN (#84)

* :art::memo::racehorse: Update docstrings
* Update median completeness calculation to not incorporate all contigs but rather just cluster values.
* :art: Change RecursiveDBSCANError exception to BinningError
* :bug: Update metabin import in user.py from mag.py
* :art::fire: Remove default 'z' column in run_dbscan function.
* :art: add_metrics func now returns 2-tuple with cluster_metrics dataframe for median completeness lookup.
* :art: Change default domain marker count lookup in add_metrics to raise error if domain does not match.
* :art::racehorse: Add naive HDBSCAN implementation as clustering method. :memo: Add comments for break conditions within get_clusters function.
* :art::memo::racehorse: hdbscan implementation now scans across min_samples and min_cluster_size.
---
 autometa/binning/recursive_dbscan.py | 417 +++++++++++++++++++--------
 autometa/common/exceptions.py        |   4 +-
 autometa/config/user.py              |   2 +-
 3 files changed, 305 insertions(+), 118 deletions(-)

diff --git a/autometa/binning/recursive_dbscan.py b/autometa/binning/recursive_dbscan.py
index 8cccd3fdf..8b28fb908 100644
--- a/autometa/binning/recursive_dbscan.py
+++ b/autometa/binning/recursive_dbscan.py
@@ -26,8 +26,11 @@
 import logging
 import os
+import shutil
+import tempfile

 import pandas as pd
+import numpy as np

 from Bio import SeqIO
 from sklearn.cluster import DBSCAN
@@ -38,7 +41,7 @@
 # TODO: This should be from autometa.common.kmers import Kmers
 # So later we can simply/and more clearly do Kmers.load(kmers_fpath).embed(method)
-from autometa.common.exceptions import RecursiveDBSCANError
+from autometa.common.exceptions import BinningError
 from autometa.taxonomy.ncbi import NCBI

 pd.set_option("mode.chained_assignment", None)
@@ -47,14 +50,114 @@
 logger = logging.getLogger(__name__)


+def add_metrics(df, markers_df, domain="bacteria"):
+    """Adds the completeness and purity metrics to each respective contig in df.
+
+    Parameters
+    ----------
+    df : pd.DataFrame
+        index='contig' cols=['x','y','coverage','cluster']
+    markers_df : pd.DataFrame
+        wide format, i.e. index=contig cols=[marker,marker,...]
+    domain : str, optional
+        Kingdom to determine metrics (the default is 'bacteria').
+        choices=['bacteria','archaea']
+
+    Returns
+    -------
+    2-tuple
+        `df` with added cols=['completeness', 'purity']
+        pd.DataFrame(index=clusters, cols=['completeness', 'purity'])
+
+    Raises
+    -------
+    KeyError
+        `domain` is not "bacteria" or "archaea"
+
+    """
+    domain = domain.lower()
+    marker_sets = {"bacteria": 139.0, "archaea": 162.0}
+    if domain not in marker_sets:
+        raise KeyError(f"{domain} is not bacteria or archaea!")
+    expected_number = marker_sets[domain]
+    clusters = dict(list(df.groupby("cluster")))
+    metrics = {"purity": {}, "completeness": {}}
+    for cluster, dff in clusters.items():
+        contigs = dff.index.tolist()
+        summed_markers = markers_df[markers_df.index.isin(contigs)].sum()
+        is_present = summed_markers >= 1
+        is_single_copy = summed_markers == 1
+        nunique_markers = summed_markers[is_present].index.nunique()
+        num_single_copy_markers = summed_markers[is_single_copy].index.nunique()
+        completeness = nunique_markers / expected_number * 100
+        # Protect from divide by zero
+        if nunique_markers == 0:
+            purity = pd.NA
+        else:
+            purity = num_single_copy_markers / nunique_markers * 100
+        metrics["completeness"].update({cluster: completeness})
+        metrics["purity"].update({cluster: purity})
+    metrics_df = pd.DataFrame(metrics, index=clusters.keys())
+    merged_df = pd.merge(df, metrics_df, left_on="cluster", right_index=True)
+    return merged_df, metrics_df
+
+
+def run_dbscan(df, eps, dropcols=["cluster", "purity", "completeness"]):
+    """Run clustering on `df` at provided `eps`.
+
+    Notes
+    -----
+
+    * documentation for sklearn `DBSCAN `_
+    * documentation for `HDBSCAN `_
+
+    Parameters
+    ----------
+    df : pd.DataFrame
+        Contigs with embedded k-mer frequencies as ['x','y'] columns and optionally 'coverage' column
+    eps : float
+        The maximum distance between two samples for one to be considered
+        as in the neighborhood of the other. This is not a maximum bound
+        on the distances of points within a cluster. This is the most
+        important DBSCAN parameter to choose appropriately for your data set
+        and distance function. See `DBSCAN docs `_ for more details.
+    dropcols : list, optional
+        Drop columns in list from `df`
+        (the default is ['cluster','purity','completeness']).
+
+    Returns
+    -------
+    pd.DataFrame
+        `df` with 'cluster' column added
+
+    Raises
+    -------
+    ValueError
+        sets `usecols` and `dropcols` may not share elements
+
+    """
+    for col in dropcols:
+        if col in df.columns:
+            df.drop(columns=col, inplace=True)
+    n_samples = df.shape[0]
+    if n_samples == 1:
+        clusters = pd.Series([pd.NA], index=df.index, name="cluster")
+        return pd.merge(df, clusters, how="left", left_index=True, right_index=True)
+    cols = ["x", "y"]
+    if "coverage" in df.columns:
+        cols.append("coverage")
+    if np.any(df.isnull()):
+        raise BinningError(
+            f"df is missing {df.isnull().sum().sum()} kmer/coverage annotations"
+        )
+    X = df.loc[:, cols].to_numpy()
+    clusterer = DBSCAN(eps=eps, min_samples=1, n_jobs=-1).fit(X)
+    clusters = pd.Series(clusterer.labels_, index=df.index, name="cluster")
+    return pd.merge(df, clusters, how="left", left_index=True, right_index=True)
+
+
 def recursive_dbscan(
-    table,
-    markers_df,
-    domain,
-    completeness_cutoff,
-    purity_cutoff,
-    verbose=False,
-    method="DBSCAN",
+    table, markers_df, domain, completeness_cutoff, purity_cutoff, verbose=False,
 ):
     """Carry out DBSCAN, starting at eps=0.3 and continuing until there is
     just one group.
@@ -83,9 +186,6 @@ def recursive_dbscan(
         `purity_cutoff` threshold to retain cluster (the default is 90.0).
     verbose : bool
         log stats for each recursive_dbscan clustering iteration.
-    method : str
-        clustering `method` to perform. (the default is 'DBSCAN').
-        Choices=['DBSCAN','HDBSCAN'] NOTE: 'HDBSCAN' is still under development.

     Returns
     -------
@@ -105,11 +205,11 @@
     best_median = float("-inf")
     best_df = pd.DataFrame()
     while n_clusters > 1:
-        binned_df = run_dbscan(table, eps, method=method)
-        df = add_metrics(df=binned_df, markers_df=markers_df, domain=domain)
-        completess_filter = df["completeness"] >= completeness_cutoff
-        purity_filter = df["purity"] >= purity_cutoff
-        median_completeness = df[completess_filter & purity_filter][
+        binned_df = run_dbscan(table, eps)
+        df, metrics_df = add_metrics(df=binned_df, markers_df=markers_df, domain=domain)
+        completeness_filter = metrics_df["completeness"] >= completeness_cutoff
+        purity_filter = metrics_df["purity"] >= purity_cutoff
+        median_completeness = metrics_df[completeness_filter & purity_filter][
             "completeness"
         ].median()
         if median_completeness >= best_median:
@@ -139,9 +239,9 @@
         logger.debug("No complete or pure clusters found")
         return pd.DataFrame(), table

-    completess_filter = best_df["completeness"] >= completeness_cutoff
+    completeness_filter = best_df["completeness"] >= completeness_cutoff
     purity_filter = best_df["purity"] >= purity_cutoff
-    complete_and_pure_df = best_df.loc[completess_filter & purity_filter]
+    complete_and_pure_df = best_df.loc[completeness_filter & purity_filter]
     unclustered_df = best_df.loc[~best_df.index.isin(complete_and_pure_df.index)]
     if verbose:
         logger.debug(f"Best completeness median: {best_median:4.2f}")
@@ -151,30 +251,36 @@
     return complete_and_pure_df, unclustered_df


-def run_dbscan(
+def run_hdbscan(
     df,
-    eps,
+    min_cluster_size,
+    min_samples,
+    cache_dir=None,
     dropcols=["cluster", "purity", "completeness"],
-    usecols=["x", "y"],
-    method="DBSCAN",
 ):
-    """Run clustering via `method`.
+    """Run clustering on `df` at provided `min_cluster_size`.
+
+    Notes
+    -----
+
+    * reasoning for parameter: `cluster_selection_method `_
+    * reasoning for parameters: `min_cluster_size and min_samples `_
+    * documentation for `HDBSCAN `_

     Parameters
     ----------
     df : pd.DataFrame
-        Description of parameter `df`.
-    eps : float
-        Description of parameter `eps`.
-    dropcols : list
-        Drop columns in list from `df` (the default is ['cluster','purity','completeness']).
-    usecols : list
-        Use columns in list for `df`.
-        The default is ['x','y','coverage'] if 'coverage' exists in df.columns.
-        else ['x','y','z'].
-    method : str
-        clustering `method` to perform. (the default is 'DBSCAN').
-        Choices=['DBSCAN','HDBSCAN'] NOTE: 'HDBSCAN' is still under development
+        Contigs with embedded k-mer frequencies as ['x','y'] columns and optionally 'coverage' column
+    min_cluster_size : int
+        The minimum size of clusters; single linkage splits that contain
+        fewer points than this will be considered points "falling out" of a
+        cluster rather than a cluster splitting into two new clusters.
+    min_samples : int
+        The number of samples in a neighborhood for a point to be
+        considered a core point.
+    dropcols : list, optional
+        Drop columns in list from `df`
+        (the default is ['cluster','purity','completeness']).
@@ -185,74 +291,133 @@ def run_dbscan(
     Returns
     -------
     pd.DataFrame
         `df` with 'cluster' column added

     Raises
     -------
     ValueError
         sets `usecols` and `dropcols` may not share elements
-    ValueError
-        Method is not an available choice: choices=['DBSCAN','HDBSCAN']
+
     """
+    for col in dropcols:
+        if col in df.columns:
+            df.drop(columns=col, inplace=True)
     n_samples = df.shape[0]
     if n_samples == 1:
         clusters = pd.Series([pd.NA], index=df.index, name="cluster")
         return pd.merge(df, clusters, how="left", left_index=True, right_index=True)
-    for col in dropcols:
-        if col in df.columns:
-            df.drop(columns=col, inplace=True)
     cols = ["x", "y"]
-    cols.append("coverage") if "coverage" in df.columns else cols.append("z")
+    if "coverage" in df.columns:
+        cols.append("coverage")
+    if np.any(df.isnull()):
+        raise BinningError(
+            f"df is missing {df.isnull().sum().sum()} kmer/coverage annotations"
+        )
     X = df.loc[:, cols].to_numpy()
-    if method == "DBSCAN":
-        clustering = DBSCAN(eps=eps, min_samples=1, n_jobs=-1).fit(X)
-    elif method == "HDBSCAN":
-        clustering = HDBSCAN(
-            cluster_selection_epsilon=eps,
-            min_cluster_size=2,
-            min_samples=1,
-            allow_single_cluster=True,
-            gen_min_span_tree=True,
-        ).fit(X)
-    else:
-        raise ValueError(f"Method: {method} not a choice. choose b/w DBSCAN & HDBSCAN")
-    clusters = pd.Series(clustering.labels_, index=df.index, name="cluster")
+    clusterer = HDBSCAN(
+        min_cluster_size=min_cluster_size,
+        min_samples=min_samples,
+        cluster_selection_method="leaf",
+        allow_single_cluster=True,
+        memory=cache_dir,
+    ).fit(X)
+    clusters = pd.Series(clusterer.labels_, index=df.index, name="cluster")
     return pd.merge(df, clusters, how="left", left_index=True, right_index=True)


-def add_metrics(df, markers_df, domain="bacteria"):
-    """Adds the completeness and purity metrics to each respective contig in df.
+def recursive_hdbscan(
+    table, markers_df, domain, completeness_cutoff, purity_cutoff, verbose=False,
+):
+    """Recursively run HDBSCAN starting with defaults and iterating the
+    min_samples and min_cluster_size until only 1 cluster is recovered.

     Parameters
     ----------
-    df : pd.DataFrame
-        Description of parameter `df`.
+    table : pd.DataFrame
+        Contigs with embedded k-mer frequencies as ['x','y','z'] columns and
+        optionally 'coverage' column
     markers_df : pd.DataFrame
         wide format, i.e. index=contig cols=[marker,marker,...]
     domain : str
         Kingdom to determine metrics (the default is 'bacteria').
         choices=['bacteria','archaea']
+    completeness_cutoff : float
+        `completeness_cutoff` threshold to retain cluster (the default is 20.0).
+    purity_cutoff : float
+        `purity_cutoff` threshold to retain cluster (the default is 90.0).
+    verbose : bool
+        log stats for each recursive_dbscan clustering iteration.

     Returns
     -------
-    pd.DataFrame
-        DataFrame with added columns
-        'completeness' and 'purity'
+    2-tuple
+        (pd.DataFrame(), pd.DataFrame())
+        DataFrames consisting of contigs that passed/failed clustering
+        cutoffs, respectively.
+
+        DataFrame:
+            index = contig
+            columns = ['x','y','z','coverage','cluster','purity','completeness']

     """
-    marker_sets = {"bacteria": 139.0, "archaea": 162.0}
-    expected_number = marker_sets.get(domain.lower(), 139)
-    clusters = dict(list(df.groupby("cluster")))
-    metrics = {"purity": {}, "completeness": {}}
-    for cluster, dff in clusters.items():
-        contigs = dff.index.tolist()
-        summed_markers = markers_df[markers_df.index.isin(contigs)].sum()
-        is_present = summed_markers >= 1
-        is_single_copy = summed_markers == 1
-        nunique_markers = summed_markers[is_present].index.nunique()
-        num_single_copy_markers = summed_markers[is_single_copy].index.nunique()
-        completeness = nunique_markers / expected_number * 100
-        # Protect from divide by zero
-        if nunique_markers == 0:
-            purity = pd.NA
+    max_min_cluster_size = 10000
+    max_min_samples = 10
+    min_cluster_size = 2
+    min_samples = 1
+    n_clusters = float("inf")
+    best_median = float("-inf")
+    best_df = pd.DataFrame()
+    cache_dir = tempfile.mkdtemp()
+    while n_clusters > 1:
+        binned_df = run_hdbscan(
+            table,
+            min_cluster_size=min_cluster_size,
+            min_samples=min_samples,
+            cache_dir=cache_dir,
+        )
+        df, metrics_df = add_metrics(df=binned_df, markers_df=markers_df, domain=domain)
+
+        completeness_filter = metrics_df["completeness"] >= completeness_cutoff
+        purity_filter = metrics_df["purity"] >= purity_cutoff
+        median_completeness = metrics_df[completeness_filter & purity_filter][
+            "completeness"
+        ].median()
+        if median_completeness >= best_median:
+            best_median = median_completeness
+            best_df = df
+
+        n_clusters = df["cluster"].nunique()
+
+        if verbose:
+            logger.debug(
+                f"(min_samples, min_cluster_size): ({min_samples}, {min_cluster_size}) clusters: {n_clusters}"
+                f" median completeness (current, best): ({median_completeness:4.2f}, {best_median:4.2f})"
+            )
+
+        if min_cluster_size >= max_min_cluster_size:
+            shutil.rmtree(cache_dir)
+            cache_dir = tempfile.mkdtemp()
+            min_samples += 1
+            min_cluster_size = 2
         else:
-            purity = num_single_copy_markers / nunique_markers * 100
-        metrics["completeness"].update({cluster: completeness})
-        metrics["purity"].update({cluster: purity})
-    metrics_df = pd.DataFrame(metrics, index=clusters.keys())
-    return pd.merge(df, metrics_df, left_on="cluster", right_index=True)
+            min_cluster_size += 10
+
+        if metrics_df[completeness_filter & purity_filter].empty:
+            min_cluster_size += 100
+
+        if min_samples >= max_min_samples:
+            max_min_cluster_size *= 2
+
+    shutil.rmtree(cache_dir)
+
+    if best_df.empty:
+        if verbose:
+            logger.debug("No complete or pure clusters found")
+        return pd.DataFrame(), table
+
+    completeness_filter = best_df["completeness"] >= completeness_cutoff
+    purity_filter = best_df["purity"] >= purity_cutoff
+    complete_and_pure_df = best_df.loc[completeness_filter & purity_filter]
+    unclustered_df = best_df.loc[~best_df.index.isin(complete_and_pure_df.index)]
+    if verbose:
+        logger.debug(f"Best completeness median: {best_median:4.2f}")
+        logger.debug(
+            f"clustered: {len(complete_and_pure_df)} unclustered: {len(unclustered_df)}"
+        )
+    return complete_and_pure_df, unclustered_df


 def get_clusters(
@@ -261,7 +426,7 @@ def get_clusters(
     domain="bacteria",
     completeness=20.0,
     purity=90.0,
-    method="DBSCAN",
+    method="dbscan",
     verbose=False,
 ):
     """Find best clusters retained after applying `completeness` and `purity` filters.
@@ -281,8 +446,8 @@ def get_clusters(
     purity : float
         `purity` threshold to retain cluster (the default is 90.).
     method : str
-        Description of parameter `method` (the default is 'DBSCAN').
-        choices = ['DBSCAN','HDBSCAN']
+        Description of parameter `method` (the default is 'dbscan').
+        choices = ['dbscan','hdbscan']
     verbose : bool
         log stats for each recursive_dbscan clustering iteration

@@ -293,15 +458,16 @@ def get_clusters(
     """
     num_clusters = 0
     clusters = []
+    recursive_clusterers = {"dbscan": recursive_dbscan, "hdbscan": recursive_hdbscan}
+    if method not in recursive_clusterers:
+        raise ValueError(f"Method: {method} not a choice. choose b/w dbscan & hdbscan")
+    clusterer = recursive_clusterers[method]
+
+    # Continue while unclustered are remaining
+    # break when either clustered_df or unclustered_df is empty
     while True:
-        clustered_df, unclustered_df = recursive_dbscan(
-            master_df,
-            markers_df,
-            domain,
-            completeness,
-            purity,
-            method=method,
-            verbose=verbose,
+        clustered_df, unclustered_df = clusterer(
+            master_df, markers_df, domain, completeness, purity, verbose=verbose,
         )
         # No contigs can be clustered, label as unclustered and add the final df
         # of (unclustered) contigs
@@ -314,7 +480,10 @@ def get_clusters(
             c: f"bin_{1+i+num_clusters:04d}"
             for i, c in enumerate(clustered_df.cluster.unique())
         }
-        rename_cluster = lambda c: translation[c]
+
+        def rename_cluster(c):
+            return translation[c]
+
         clustered_df.cluster = clustered_df.cluster.map(rename_cluster)

     # All contigs have now been clustered, add the final df of (clustered) contigs
@@ -336,7 +505,7 @@ def binning(
     completeness=20.0,
     purity=90.0,
     taxonomy=True,
-    method="DBSCAN",
+    method="dbscan",
     reverse=True,
     verbose=False,
 ):
@@ -364,8 +533,8 @@ def binning(
         Split canonical ranks and subset based on rank then attempt to find clusters (the default is True).
         taxonomic_levels = [superkingdom,phylum,class,order,family,genus,species]
     method : str
-        Clustering `method` (the default is 'DBSCAN').
-        choices = ['DBSCAN','HDBSCAN']
+        Clustering `method` (the default is 'dbscan').
+        choices = ['dbscan','hdbscan']
     reverse : bool
         True - [superkingdom,phylum,class,order,family,genus,species]
         False - [species,genus,family,order,class,phylum,superkingdom]
@@ -379,17 +548,25 @@ def binning(

     Raises
     -------
-    RecursiveDBSCANError
+    BinningError
         No marker information is available for contigs to be binned.

     """
     # First check needs to ensure we have markers available to check binning quality...
     if master.loc[master.index.isin(markers.index)].empty:
-        err = "No markers for contigs in table. Unable to assess binning quality"
-        raise RecursiveDBSCANError(err)
+        raise BinningError(
+            "No markers for contigs in table. Unable to assess binning quality"
+        )
+    logger.info(f"Using {method} clustering method")
     if not taxonomy:
         return get_clusters(
-            master, markers, domain, completeness, purity, method, verbose
+            master_df=master,
+            markers_df=markers,
+            domain=domain,
+            completeness=completeness,
+            purity=purity,
+            method=method,
+            verbose=verbose,
         )

     # Use taxonomy method
@@ -406,17 +583,15 @@ def binning(
     for rank in ranks:
         # TODO: We should account for novel taxa here instead of removing 'unclassified'
         unclassified_filter = master[rank] != "unclassified"
-        n_contigs_in_taxa = (
-            master.loc[unclassified_filter].groupby(rank)[rank].count().sum()
-        )
-        n_taxa = (
-            master.loc[unclassified_filter].groupby(rank)[rank].count().index.nunique()
-        )
+        master_grouped_by_rank = master.loc[unclassified_filter].groupby(rank)
+        taxa_counts = master_grouped_by_rank[rank].count()
+        n_contigs_in_taxa = taxa_counts.sum()
+        n_taxa = taxa_counts.index.nunique()
         logger.info(
             f"Examining {rank}: {n_taxa:,} unique taxa ({n_contigs_in_taxa:,} contigs)"
         )
         # Group contigs by rank and find best clusters within subset
-        for rank_name_txt, dff in master.loc[unclassified_filter].groupby(rank):
+        for rank_name_txt, dff in master_grouped_by_rank:
             if dff.empty:
                 continue
             # Only cluster contigs that have not already been assigned a bin.
@@ -428,14 +603,20 @@ def binning(
             if clustered_contigs:
                 rank_df = rank_df.loc[~rank_df.index.isin(clustered_contigs)]
             # After all of the filters, are there multiple contigs to cluster?
-            if len(rank_df) <= 1:
+            if rank_df.empty:
                 continue
             # Find best clusters
             logger.debug(
                 f"Examining taxonomy: {rank} : {rank_name_txt} : {rank_df.shape}"
             )
             clusters_df = get_clusters(
-                rank_df, markers, domain, completeness, purity, method
+                master_df=rank_df,
+                markers_df=markers,
+                domain=domain,
+                completeness=completeness,
+                purity=purity,
+                method=method,
+                verbose=verbose,
             )
             # Store clustered contigs
             is_clustered = clusters_df["cluster"].notnull()
@@ -447,7 +628,10 @@ def binning(
                 c: f"bin_{1+i+num_clusters:04d}"
                 for i, c in enumerate(clustered.cluster.unique())
             }
-            rename_cluster = lambda c: translation[c]
+
+            def rename_cluster(c):
+                return translation[c]
+
             clustered.cluster = clustered.cluster.map(rename_cluster)
             num_clusters += clustered.cluster.nunique()
             clusters.append(clustered)
@@ -467,7 +651,10 @@ def main():
         level=logger.DEBUG,
     )
     parser = argparse.ArgumentParser(
-        description="Perform decomposition/embedding/clustering via PCA/[TSNE|UMAP]/DBSCAN."
+        description="Perform marker gene guided binning of "
+        "metagenome contigs using annotations (when available) of sequence "
+        "composition, coverage and homology.",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
     )
     parser.add_argument("kmers", help="")
     parser.add_argument("coverage", help="")
@@ -477,14 +664,14 @@ def main():
     parser.add_argument(
         "--embedding-method",
         help="Embedding method to use",
-        choices=["TSNE", "UMAP"],
-        default="TSNE",
+        choices=["bhsne", "sksne", "umap"],
+        default="bhsne",
     )
     parser.add_argument(
         "--clustering-method",
         help="Clustering method to use",
-        choices=["DBSCAN", "HDBSCAN"],
-        default="DBSCAN",
+        choices=["dbscan", "hdbscan"],
+        default="dbscan",
     )
     parser.add_argument(
         "--completeness", help="", default=20.0, type=float
     )
@@ -508,7 +695,7 @@ def main():
     cov_df = pd.read_csv(args.coverage, sep="\t", index_col="contig")

     master_df = pd.merge(
-        kmers_df, cov_df[["coverage"]], how="left", left_index=True, right_index=True
+        kmers_df, cov_df[["coverage"]], how="left", left_index=True, right_index=True,
     )

     markers_df = Markers.load(args.markers)
diff --git a/autometa/common/exceptions.py b/autometa/common/exceptions.py
index a6fa86a8f..0e53579a3 100644
--- a/autometa/common/exceptions.py
+++ b/autometa/common/exceptions.py
@@ -67,8 +67,8 @@ def __str__(self):
         return self.value


-class RecursiveDBSCANError(AutometaException):
-    """RecursiveDBSCANError exception class."""
+class BinningError(AutometaException):
+    """BinningError exception class."""

     def __init__(self, value):
         self.value = value
diff --git a/autometa/config/user.py b/autometa/config/user.py
index 77483f72d..caa599333 100644
--- a/autometa/config/user.py
+++ b/autometa/config/user.py
@@ -34,7 +34,7 @@
 from autometa.common import utilities
 from autometa.common.metagenome import Metagenome
-from autometa.common.mag import MAG
+from autometa.common.metabin import MetaBin
 from autometa.config.databases import Databases
 from autometa.config.project import Project
 from autometa.common.utilities import timeit
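
Editor's note (appended; not part of the commit): the completeness and purity metrics that add_metrics attaches to each cluster reduce to two ratios over the cluster's summed marker counts — completeness = (unique markers present / expected single-copy marker count for the domain, 139.0 for bacteria or 162.0 for archaea) * 100, and purity = (markers seen exactly once / unique markers present) * 100. A minimal runnable sketch of that arithmetic follows; the contig and marker names are toy data invented for illustration.

    import pandas as pd

    # Hypothetical wide-format marker table: index=contig, one column per
    # marker, values are copy counts within each contig.
    markers_df = pd.DataFrame(
        {"PF00001": [1, 0, 2], "PF00002": [1, 1, 0], "PF00003": [0, 1, 1]},
        index=["contig_1", "contig_2", "contig_3"],
    )

    expected_number = 139.0  # bacteria; archaea would use 162.0
    contigs = ["contig_1", "contig_2"]  # members of one candidate cluster

    # Sum each marker's copies across the cluster's contigs, mirroring add_metrics.
    summed_markers = markers_df[markers_df.index.isin(contigs)].sum()
    nunique_markers = summed_markers[summed_markers >= 1].index.nunique()
    num_single_copy = summed_markers[summed_markers == 1].index.nunique()

    # add_metrics additionally guards division by zero by assigning pd.NA.
    completeness = nunique_markers / expected_number * 100
    purity = num_single_copy / nunique_markers * 100
    print(f"completeness={completeness:.2f}% purity={purity:.2f}%")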
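A second sketch, also not part of the commit, showing how the patched entry point might be invoked from Python rather than via main(). The keyword arguments mirror the binning() signature introduced above; the input file paths are hypothetical, and the Markers import path is an assumption inferred from the Markers.load() call in main().

    import pandas as pd

    from autometa.binning.recursive_dbscan import binning
    from autometa.common.markers import Markers  # assumed import path

    # Hypothetical inputs; the column layout mirrors main() in the patched module.
    kmers_df = pd.read_csv("kmers.embedded.tsv", sep="\t", index_col="contig")
    cov_df = pd.read_csv("coverage.tsv", sep="\t", index_col="contig")
    master_df = pd.merge(
        kmers_df, cov_df[["coverage"]], how="left", left_index=True, right_index=True,
    )
    markers_df = Markers.load("markers.tsv")

    # method may be "dbscan" (default) or "hdbscan"; an unrecognized method
    # raises ValueError inside get_clusters.
    bins_df = binning(
        master=master_df,
        markers=markers_df,
        domain="bacteria",
        completeness=20.0,
        purity=90.0,
        taxonomy=False,
        method="hdbscan",
        verbose=True,
    )
    # Contigs failing the completeness/purity cutoffs are labeled "unclustered".
    print(bins_df["cluster"].value_counts())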