Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract position metadata #37

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
5 changes: 5 additions & 0 deletions micov/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
COLUMN_COVERED_DTYPE = pl.UInt32
COLUMN_PERCENT_COVERED = "percent_covered"
COLUMN_PERCENT_COVERED_DTYPE = float
COLUMN_NAME = "name"
PRESENT = "present"
ABSENT = "absent"
NOT_APPLICABLE = "not applicable"
COLUMN_REGION_ID = "region_id"

### should really probably just use a dataclass, and type annotations?

Expand Down
30 changes: 30 additions & 0 deletions micov/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
COLUMN_CIGAR,
COLUMN_GENOME_ID,
COLUMN_LENGTH,
COLUMN_NAME,
COLUMN_SAMPLE_ID,
COLUMN_START,
COLUMN_START_DTYPE,
Expand Down Expand Up @@ -455,6 +456,35 @@ def parse_features_to_keep(path):
return df.rename({df.columns[0]: COLUMN_GENOME_ID})


def parse_feature_names(path):
    """Load a feature ID to feature name table from a TSV file.

    The file is expected to carry a header row and at least two columns:
    the first is taken as the feature ID and the second as the name of
    the feature. Any additional columns are dropped.

    A name that looks like a lineage (it contains "; ") is reduced to its
    final "; "-delimited entry. Afterwards, every space, "[" and "]" in the
    name is replaced with "_".

    Parameters
    ----------
    path : str or None
        Passed directly to ``pl.read_csv``. When None, nothing is parsed.

    Returns
    -------
    pl.DataFrame or None
        A frame holding only COLUMN_GENOME_ID and COLUMN_NAME, or None
        when no path was given.
    """
    if path is None:
        return None

    raw = pl.read_csv(path, separator="\t")
    id_column, name_column = raw.columns[0], raw.columns[1]

    name = pl.col(COLUMN_NAME)

    # keep only the terminal entry of a "; "-delimited lineage; names
    # without the delimiter pass through untouched
    last_lineage_entry = name.str.split("; ").list.get(-1)
    short_name = (
        pl.when(name.str.contains("; "))
        .then(last_lineage_entry)
        .otherwise(name)
        .alias(COLUMN_NAME)
    )

    # spaces and square brackets are swapped for underscores
    sanitized = name.str.replace_all(r" |\[|\]", "_")

    renamed = raw.lazy().rename(
        {id_column: COLUMN_GENOME_ID, name_column: COLUMN_NAME}
    )
    return (
        renamed.with_columns(short_name)
        .with_columns(sanitized)
        .select([COLUMN_GENOME_ID, COLUMN_NAME])
        .collect()
    )


def parse_sample_metadata(path):
"""Naively parse sample metadata, do not infer types."""
df = pl.read_csv(path, separator="\t", infer_schema_length=0)
Expand Down
36 changes: 19 additions & 17 deletions micov/_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,11 @@


def per_sample_plots(
all_coverage,
all_covered_positions,
metadata,
feature_metadata,
view,
sample_metadata_column,
output,
monte,
monte_iters,
target_lookup,
):
"""Construct plots for all genomes.

Expand All @@ -39,14 +35,8 @@ def per_sample_plots(

Parameters
----------
all_coverage : pl.DataFrame
The total coverage per sample per genome
all_covered_positions : pl.DataFrame
The exact covered regions per sample per genome
metadata : pl.DataFrame
The sample metadata
feature_metadata : pl.DataFrame
The feature metadata
view : View
The current View of the coverage data
sample_metadata_column : str
The specific column to stratify when plotting. Note it is assumed
this column is categorical.
Expand All @@ -57,12 +47,24 @@ def per_sample_plots(
One of (None, 'focused', 'unfocused'). See "add_monte" for more detail.
monte_iters : int
The number of Monte Carlo iterations to perform.
target_lookup : dict
A mapping of a genome ID to a name

"""
all_covered_positions = view.positions().pl()
all_coverage = view.coverages().pl()
metadata = view.metadata().pl()
feature_metadata = view.feature_metadata().pl()
feature_names = view.feature_names().pl()
target_lookup = dict(feature_names.iter_rows())

if view.constrain_positions:
n_genomes = len(feature_metadata[COLUMN_GENOME_ID].unique())
n_regions = len(feature_metadata)

if n_genomes != n_regions:
raise ValueError(
"Plotting does not yet support desribing multiple regions."
)

for genome in all_coverage[COLUMN_GENOME_ID].unique():
# TODO: move this into feature_metadata
target_name = target_lookup[genome]
ymin, ymax = (
feature_metadata.filter(pl.col(COLUMN_GENOME_ID) == genome)
Expand Down
Loading
Loading