Skip to content

Commit

Permalink
Fix combine CLI producing empty output with invalid data (#56)
Browse files Browse the repository at this point in the history
* fix variant model to support failed harmonisation

* combine: fix CLI not raising exceptions

* add test with variants that fail harmonisation

* bump patch version

* make sure an output file exists

* add test to make sure ValidationErrors are correctly raised

* add hm_rsID to field validators

* relax rsID format check

* disable rsid format check when variants are harmonised by liftover
  • Loading branch information
nebfield authored Oct 23, 2024
1 parent 4109030 commit 3b3bf34
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pgscatalog.core/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pgscatalog.core"
version = "0.3.1"
version = "0.3.2"
description = "Core tools for working with polygenic scores (PGS) and the PGS Catalog"
license = "Apache-2.0"
authors = ["Benjamin Wingfield <bwingfield@ebi.ac.uk>", "Samuel Lambert <sl925@medschl.cam.ac.uk>", "Laurent Gil <lg10@sanger.ac.uk>"]
Expand Down
46 changes: 28 additions & 18 deletions pgscatalog.core/src/pgscatalog/core/cli/combine_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import textwrap
from typing import Optional

import pydantic
from tqdm import tqdm

from pgscatalog.core.lib.models import ScoreLog, ScoreLogs, ScoreVariant, VariantLog
Expand Down Expand Up @@ -39,35 +40,44 @@ def _combine(
{"accession", "row_nr", "hm_source", "is_complex"}
)
# it's important to create the list here to raise EffectTypeErrors
# for the largest scoring files this can use quite a lot of memory (~16GB)
# for the largest scoring files this can use quite a lot of memory (~32GB)
# TODO: materialise and write these variants in batches to reduce memory usage
# don't forget to think about deleting a partially written file (i.e. bad batch?)
dumped_variants = list(x.model_dump(include=fields) for x in normalised_score)
logger.info(f"Finished processing {scorefile.pgs_id}")
except EffectTypeError:
logger.warning(
f"Unsupported non-additive effect types in {scorefile=}, skipping"
)
is_compatible = False
except pydantic.ValidationError:
logger.critical(
f"{scorefile.pgs_id} contains invalid data, stopping and exploding"
)
raise
else:
# no exceptions found, good to write out
logger.info("Writing variants to file")
writer = TextFileWriter(compress=compress_output, filename=out_path)
writer.write(dumped_variants)
logger.info("Finished writing")
finally:
variant_logs: Optional[list[VariantLog]] = None
if dumped_variants is not None:
variant_logs = [VariantLog(**x) for x in dumped_variants]

log: ScoreLog = ScoreLog(
header=scorefile.header,
variant_logs=variant_logs,
compatible_effect_type=is_compatible,

# (don't use finally clause here, it was hiding helpful exceptions)
variant_logs: Optional[list[VariantLog]] = None
if dumped_variants is not None:
variant_logs = [VariantLog(**x) for x in dumped_variants]

log: ScoreLog = ScoreLog(
header=scorefile.header,
variant_logs=variant_logs,
compatible_effect_type=is_compatible,
)
if log.variants_are_missing:
logger.warning(
f"{log.variant_count_difference} fewer variants in output compared to original file"
)
if log.variants_are_missing:
logger.warning(
f"{log.variant_count_difference} fewer variants in output compared to original file"
)
logger.info("Normalisation complete, returning score log")
return log
logger.info("Normalisation complete, returning score log")
return log


def run():
Expand All @@ -77,7 +87,7 @@ def run():
logger.setLevel(logging.DEBUG)
logger.debug("Verbose logging enabled")

out_path = pathlib.Path(args.outfile)
out_path = pathlib.Path(args.outfile).resolve()

if out_path.exists():
logger.critical(f"Output file already exists: {args.outfile}")
Expand Down Expand Up @@ -155,7 +165,7 @@ def run():
)
variant_log.append(log)

if n_finished == 0:
if n_finished == 0 or not out_path.exists():
raise ValueError(
"Couldn't process any scoring files. Did they all have non-additive weights?"
)
Expand Down
2 changes: 1 addition & 1 deletion pgscatalog.core/src/pgscatalog/core/lib/_normalise.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def remap_harmonised(variants, harmonised, target_build):
Perhaps authors submitted rsID and effect allele originally:
>>> from pgscatalog.core.lib.models import ScoreVariant
>>> variant = ScoreVariant(**{"chr_position": 1, "rsID": None, "chr_name": "2", "effect_allele": "A", "effect_weight": 5, "accession": "test", "hm_chr": "1", "hm_pos": 100, "hm_rsID": "testrsid", "hm_inferOtherAllele": "A", "row_nr": 0})
>>> variant = ScoreVariant(**{"chr_position": 1, "rsID": None, "chr_name": "2", "effect_allele": "A", "effect_weight": 5, "accession": "test", "hm_chr": "1", "hm_pos": 100, "hm_rsID": "rsTEST", "hm_inferOtherAllele": "A", "row_nr": 0})
>>> variant
ScoreVariant(..., effect_allele=Allele(allele='A', is_snp=True), other_allele=None, ...
Expand Down
74 changes: 60 additions & 14 deletions pgscatalog.core/src/pgscatalog/core/lib/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,12 @@ class CatalogScoreVariant(BaseModel):
]

# helpful class attributes (not used by pydantic to instantiate a class)
harmonised_columns: ClassVar[tuple[str, str, str]] = ("hm_rsID", "hm_chr", "hm_pos")
harmonised_columns: ClassVar[tuple[str, str, str, str]] = (
"hm_source",
"hm_rsID",
"hm_chr",
"hm_pos",
)
complex_columns: ClassVar[tuple[str, str, str]] = (
"is_haplotype",
"is_diplotype",
Expand Down Expand Up @@ -457,6 +462,14 @@ def is_harmonised(self) -> bool:
return True
return False

@cached_property
def is_hm_bad(self) -> bool:
"""Was harmonisation OK?"""
# not a computed field because never used in serialisation
if self.is_harmonised and self.hm_source == "Unknown":
return True
return False

@computed_field # type: ignore
@cached_property
def is_complex(self) -> bool:
Expand Down Expand Up @@ -518,6 +531,7 @@ def alleles_must_parse(cls, value: Any) -> Allele:
"chr_position",
"is_haplotype",
"is_diplotype",
"hm_rsID",
"hm_chr",
"hm_pos",
"hm_match_chr",
Expand All @@ -532,21 +546,32 @@ def empty_string_to_none(cls, v: Any) -> Optional[Any]:
return None
return v

@field_validator("rsID", mode="after")
@field_validator("rsID", "hm_rsID", mode="after")
@classmethod
def check_rsid_format(cls, rsid: Optional[str]) -> Optional[str]:
def set_missing_rsid(cls, rsid: Optional[str]) -> Optional[str]:
# rsID is a special case where . means missing
if rsid == ".":
rsid = None

if (
rsid is None
or rsid.startswith("rs")
or rsid.startswith("ss")
or rsid.startswith("HLA")
):
return rsid
else:
raise ValueError("rsid field must start with rs or ss or HLA")
return rsid

@model_validator(mode="after")
def check_rsid_format(self) -> Self:
if self.is_hm_bad or self.hm_source == "liftover":
# disable this check when harmonisation fails
# variants that have been harmonised by liftover will put coordinates in rsID column
return self

for x in (self.rsID, self.hm_rsID):
if not (
x is None
or x.startswith("rs")
or x.startswith("ss")
or x.startswith("HLA")
):
raise ValueError("rsid field must start with rs or ss or HLA")

return self

@field_validator(
"effect_weight",
Expand Down Expand Up @@ -617,7 +642,8 @@ def check_position(self) -> Self:
# mandatory rsid with optional coordinates
pass
case _:
if self.is_complex:
if self.is_complex or self.is_hm_bad:
# harmonisation may fail and create variants with bad coordinates
# complex variants can have odd non-standard positions
# e.g. e2 allele of APOE gene
pass
Expand Down Expand Up @@ -682,6 +708,23 @@ class ScoreVariant(CatalogScoreVariant):
>>> variant_complex = ScoreVariant(**{"rsID": None, "chr_name": "1", "chr_position": 1, "effect_allele": "A", "effect_weight": 0.5, "is_haplotype": True, "row_nr": 0, "accession": "test"})
>>> variant_complex.is_complex
True
The harmonisation process might fail, so variants can be missing mandatory fields.
This must be supported by the model:
>>> bad_hm_variant = ScoreVariant(**{"rsID": "a_weird_rsid", "chr_name": None, "chr_position": None, "effect_allele": "G", "effect_weight": 0.5, "hm_chr": None, "hm_pos": None, "hm_rsID": None, "hm_source": "Unknown", "row_nr": 0, "accession": "test"})
>>> bad_hm_variant.is_harmonised
True
>>> bad_hm_variant.is_hm_bad
True
>>> harmonised_variant.is_hm_bad
False
rsID format validation (i.e. starts with rs, ss...) is disabled when harmonisation fails:
>>> bad_hm_variant.rsID
'a_weird_rsid'
"""

model_config = ConfigDict(use_enum_values=True)
Expand Down Expand Up @@ -1043,7 +1086,10 @@ def variant_count_difference(self) -> Optional[int]:

@property
def variants_are_missing(self) -> bool:
return self.variant_count_difference != 0
return (
self.variant_count_difference is not None
and self.variant_count_difference != 0
)


class ScoreLogs(RootModel):
Expand Down
23 changes: 23 additions & 0 deletions pgscatalog.core/tests/data/bad_harmonised.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
###PGS CATALOG SCORING FILE - see https://www.pgscatalog.org/downloads/#dl_ftp_scoring for additional information
#format_version=2.0
##POLYGENIC SCORE (PGS) INFORMATION
#pgs_id=PGS003581
#pgs_name=MTAG.Envs.LifetimeMDD
#trait_reported=Major Depressive Disorder (Lifetime)
#trait_mapped=major depressive disorder
#trait_efo=MONDO_0002009
#genome_build=GRCh37
#variants_number=4861398
#weight_type=beta
##SOURCE INFORMATION
#pgp_id=PGP000461
#citation=Dahl A et al. bioRxiv (2022). doi:10.1101/2022.08.15.503980
##HARMONIZATION DETAILS
#HmPOS_build=GRCh38
#HmPOS_date=2023-04-12
#HmPOS_match_chr={"True": null, "False": null}
#HmPOS_match_pos={"True": null, "False": null}
rsID effect_allele other_allele effect_weight allelefrequency_effect variant_description hm_source hm_rsID hm_chr hm_pos hm_inferOtherAllele
. T C 0.004040743 0.329683 VARIANT_ID=1:2556125_C_T Unknown
. T C 0.004025092 0.329689 VARIANT_ID=1:2556548_C_T Unknown
. A G 0.004013246 0.329686 VARIANT_ID=1:2556709_G_A Unknown
18 changes: 18 additions & 0 deletions pgscatalog.core/tests/data/invalid_scorefile.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
###PGS CATALOG SCORING FILE - see https://www.pgscatalog.org/downloads/#dl_ftp_scoring for additional information
#format_version=2.0
##POLYGENIC SCORE (PGS) INFORMATION
#pgs_id=PGS003581
#pgs_name=MTAG.Envs.LifetimeMDD
#trait_reported=Major Depressive Disorder (Lifetime)
#trait_mapped=major depressive disorder
#trait_efo=MONDO_0002009
#genome_build=GRCh37
#variants_number=4861398
#weight_type=beta
##SOURCE INFORMATION
#pgp_id=PGP000461
#citation=Dahl A et al. bioRxiv (2022). doi:10.1101/2022.08.15.503980
rsID effect_allele other_allele effect_weight allelefrequency_effect variant_description
. T C 0.004040743 0.329683 VARIANT_ID=1:2556125_C_T
. T C 0.004025092 0.329689 VARIANT_ID=1:2556548_C_T
. A G 0.004013246 0.329686 VARIANT_ID=1:2556709_G_A
46 changes: 46 additions & 0 deletions pgscatalog.core/tests/test_combine_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import itertools
import json
from unittest.mock import patch

import pydantic
import pytest

from pgscatalog.core.cli.combine_cli import run
Expand Down Expand Up @@ -83,6 +85,50 @@ def lift_scorefiles(request):
)


@pytest.fixture(scope="package")
def fail_harmonised(request):
"""This scoring file only contains variants that failed harmonisation."""
return request.path.parent / "data" / "bad_harmonised.txt"


@pytest.fixture(scope="package")
def invalid_scorefile(request):
"""This scoring file contains invalid variants missing mandatory fields"""
return request.path.parent / "data" / "invalid_scorefile.txt"


def test_invalid_scorefile(tmp_path, invalid_scorefile):
"""There's nothing that can be done for this file, except explode loudly"""
out_path = tmp_path / "combined.txt"
path = [str(invalid_scorefile)]

args = [("pgscatalog-combine", "-s"), path, ("-o", str(out_path), "-t", "GRCh37")]
flargs = list(itertools.chain(*args))

# make sure the correct exception type is being raised by the CLI
with pytest.raises(pydantic.ValidationError):
with patch("sys.argv", flargs):
run()


def test_fail_harmonised(tmp_path, fail_harmonised):
"""Variants that have failed harmonisation will be missing mandatory fields, but we should accept them as a special case and write out like normal"""
out_path = tmp_path / "combined.txt"
path = [str(fail_harmonised)]

args = [("pgscatalog-combine", "-s"), path, ("-o", str(out_path), "-t", "GRCh38")]
flargs = list(itertools.chain(*args))

with patch("sys.argv", flargs):
run()

# https://github.com/PGScatalog/pygscatalog/issues/55
assert out_path.exists()
with open(out_path, mode="rt") as f:
x = list(csv.reader(f, delimiter="\t"))
assert len(x) == 4 # 3 variants + header


def test_combine_nonadditive(tmp_path, non_additive_scorefile_grch38):
"""Test normalising a single non-additive scoring file fails."""
out_path = tmp_path / "combined.txt.gz"
Expand Down

0 comments on commit 3b3bf34

Please sign in to comment.