diff --git a/.gitignore b/.gitignore index c75c9061..e2e15a10 100644 --- a/.gitignore +++ b/.gitignore @@ -119,3 +119,4 @@ test-probes.txt z.test.pncA_T2ATAC.txt .idea +.mongodb \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 718b5161..54912c50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Default kmer size (21) made consistent across all subcommands [[#141][141]] - Nanopore preset `--ont` defaults to an expected error rate of 0.08 and ploidy 'haploid' [[#134][134]] +- VCF interface from pyvcf to cyvcf2 [[#142][142]] ### Fixed - Improved error messaging when an X amino acid resolves to a mutation already present @@ -68,6 +69,7 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm [134]: https://github.com/Mykrobe-tools/mykrobe/issues/134 [140]: https://github.com/Mykrobe-tools/mykrobe/issues/140 [141]: https://github.com/Mykrobe-tools/mykrobe/issues/141 +[142]: https://github.com/Mykrobe-tools/mykrobe/issues/142 [Unreleased]: https://github.com/Mykrobe-tools/mykrobe/compare/v0.10.0...HEAD [mkdtemp]: https://docs.python.org/3.6/library/tempfile.html#tempfile.mkdtemp diff --git a/requirements.txt b/requirements.txt index 8d561697..3e169bcf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ anytree Biopython -pyvcf +cyvcf2 requests mongoengine cython diff --git a/setup.py b/setup.py index c1d4c26a..cd2c231d 100644 --- a/setup.py +++ b/setup.py @@ -2,27 +2,25 @@ # -*- encoding: utf-8 -*- from __future__ import absolute_import from __future__ import print_function -import logging + +import io import os -import requests import shutil import subprocess import tarfile -import io -import re from glob import glob from os.path import basename from os.path import dirname from os.path import join from os.path import splitext +import requests from setuptools import find_packages from setuptools import setup from setuptools.command.install import install as DistutilsInstall class MyInstall(DistutilsInstall): - def run(self): self._install_mccortex() @@ -37,7 +35,7 @@ def _get_mykrobe_data(self): extracted_name = "mykrobe-data" tarball_filename = "mykrobe_data.tar.gz" request = requests.get(data_tarball_url, allow_redirects=True) - with open(tarball_filename, 'wb') as t: + with open(tarball_filename, "wb") as t: t.write(request.content) if os.path.exists(extracted_name): shutil.rmtree(extracted_name) @@ -63,16 +61,26 @@ def _install_mccortex(self): if not os.path.exists(mccortex_git_dir): subprocess.call( - ["git", "clone", "--recursive", "-b", "geno_kmer_count", "https://github.com/Mykrobe-tools/mccortex", mccortex_git_dir], cwd=dir_of_this_file) + [ + "git", + "clone", + "--recursive", + "-b", + "geno_kmer_count", + "https://github.com/Mykrobe-tools/mccortex", + mccortex_git_dir, + ], + cwd=dir_of_this_file, + ) mccortex_build_binary = os.path.join(mccortex_git_dir, "bin", "mccortex31") if not os.path.exists(mccortex_build_binary): - subprocess.call( - ["make", "clean"], cwd=mccortex_git_dir) - subprocess.call( - ["make"], cwd=mccortex_git_dir) + subprocess.call(["make", "clean"], cwd=mccortex_git_dir) + subprocess.call(["make"], cwd=mccortex_git_dir) - mccortex_install_dir = os.path.join(dir_of_this_file, "src", "mykrobe", "cortex") + mccortex_install_dir = os.path.join( + dir_of_this_file, "src", "mykrobe", "cortex" + ) mccortex_install_binary = os.path.join(mccortex_install_dir, "mccortex31") assert os.path.exists(mccortex_install_dir) shutil.copy(mccortex_build_binary, mccortex_install_binary) @@ -82,65 +90,52 @@ def _install_mccortex(self): def read(*names, **kwargs): return io.open( - join(dirname(__file__), *names), - encoding=kwargs.get('encoding', 'utf8') + join(dirname(__file__), *names), encoding=kwargs.get("encoding", "utf8") ).read() setup( - name='mykrobe', - version='0.10.0', - license='MIT', - description='Antibiotic resistance prediction in minutes', - # long_description='%s\n%s' % ( - # re.compile('^.. start-badges.*^.. end-badges', - # re.M | re.S).sub('', read('README.md')), - # re.sub(':[a-z]+:`~?(.*?)`', r'``\1``', read('CHANGELOG.rst')) - # ), - author='Phelim Bradley', - author_email='wave@phel.im', - url='https://github.com/Mykrobe-tools/mykrobe', - packages=find_packages('src'), - package_dir={'': 'src'}, - py_modules=[splitext(basename(path))[0] for path in glob( - 'src/*.py')]+[splitext(basename(path))[0] for path in glob('src/*/*.py')]+[splitext(basename(path))[0] for path in glob('src/*/*/*.py')], + name="mykrobe", + version="0.10.0", + license="MIT", + description="Antibiotic resistance prediction in minutes", + author="Phelim Bradley", + author_email="wave@phel.im", + url="https://github.com/Mykrobe-tools/mykrobe", + packages=find_packages("src"), + package_dir={"": "src"}, + py_modules=[splitext(basename(path))[0] for path in glob("src/*.py")] + + [splitext(basename(path))[0] for path in glob("src/*/*.py")] + + [splitext(basename(path))[0] for path in glob("src/*/*/*.py")], include_package_data=True, package_data={"mykrobe": ["cortex/mccortex31"]}, zip_safe=False, classifiers=[ - 'Intended Audience :: Developers', - 'Operating System :: Unix', - 'Operating System :: POSIX', - 'Programming Language :: Python', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.3', - 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: Implementation :: CPython', - 'Programming Language :: Python :: Implementation :: PyPy', - 'Topic :: Utilities', - ], - keywords=[ - # eg: 'keyword1', 'keyword2', 'keyword3', + "Intended Audience :: Developers", + "Operating System :: Unix", + "Operating System :: POSIX", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.3", + "Programming Language :: Python :: 3.4", + "Programming Language :: Python :: 3.5", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", + "Topic :: Utilities", ], install_requires=[ - 'anytree', - 'Biopython', - "pyvcf", - 'mongoengine', - 'requests', - # eg: 'aspectlib==1.1.1', 'six>=1.7', + "anytree", + "Biopython", + "cyvcf2", + "mongoengine", + "requests", + "numpy", ], - extras_require={ - # eg: - # 'rst': ['docutils>=0.11'], - # ':python_version=="2.6"': ['argparse'], - }, entry_points={ - 'console_scripts': [ - 'mykrobe = mykrobe.cli:main', + "console_scripts": [ + "mykrobe = mykrobe.cli:main", ] }, - cmdclass={'install': MyInstall} + cmdclass={"install": MyInstall}, ) diff --git a/src/mykrobe/_vcf/__init__.py b/src/mykrobe/_vcf/__init__.py index 7dcf079e..8debc218 100644 --- a/src/mykrobe/_vcf/__init__.py +++ b/src/mykrobe/_vcf/__init__.py @@ -1 +1 @@ -from mykrobe._vcf.models import VCF +from mykrobe._vcf.models import VCF, Genotype diff --git a/src/mykrobe/_vcf/models.py b/src/mykrobe/_vcf/models.py index bfaf7918..b0f1c3c0 100644 --- a/src/mykrobe/_vcf/models.py +++ b/src/mykrobe/_vcf/models.py @@ -1,36 +1,35 @@ from __future__ import print_function -import vcf + import os.path +import re +from collections import OrderedDict +from typing import Dict, Union, List, Tuple + +import cyvcf2 from mongoengine import DoesNotExist from mongoengine import NotUniqueError -from mykrobe.variants.schema.models import VariantCallSet + +from mykrobe import NON_METADATA_KEYS, SINGULAR_METADATA +from mykrobe.utils import make_var_hash +from mykrobe.variants.schema.models import ReferenceSet from mykrobe.variants.schema.models import Variant +from mykrobe.variants.schema.models import VariantCall +from mykrobe.variants.schema.models import VariantCallSet from mykrobe.variants.schema.models import VariantSet from mykrobe.variants.schema.models import VariantSetMetadata -from mykrobe.variants.schema.models import VariantCall -from mykrobe.variants.schema.models import Reference -from mykrobe.variants.schema.models import ReferenceSet -from mykrobe.utils import make_var_hash GLOBAL_VARIANT_SET_NAME = "global_atlas" -def split_GT(GT): - if "|" in GT: - return GT.split("|") - else: - return GT.split("/") - - class VCF(object): - def __init__( - self, - f, - reference_set_id, - method="NotSpecified", - force=False, - append_to_global_variant_set=True): + self, + f, + reference_set_id, + method="NotSpecified", + force=False, + append_to_global_variant_set=True, + ): self.f = f self.reference_set = ReferenceSet.objects.get(id=reference_set_id) self.references = self._create_reference_lookup() @@ -50,55 +49,58 @@ def _create_reference_lookup(self): return refs def add_to_database(self): - with open(self.f, 'r') as infile: - self.vcf_reader = vcf.Reader(infile) - self._create_new_variant_set() - self._create_variant_set_meta_data() - self._create_call_sets() - self._create_variants_and_calls() + self.vcf_reader = cyvcf2.VCF(self.f, gts012=True) + self._create_new_variant_set() + self._create_variant_set_meta_data() + self._create_call_sets() + self._create_variants_and_calls() + self.vcf_reader.close() def _create_variants_and_calls(self): for record in self.vcf_reader: if not record.FILTER and self._is_record_valid(record): v = self._get_or_create_variant(record) - for call in record.samples: - genotype_likelihoods = self._get_genotype_likelihoods(call) + for sample_idx in range(len(record.genotypes)): + gt_likelihoods = self._get_genotype_likelihoods(record, sample_idx) c = VariantCall.create( variant=v, - call_set=self.call_sets[call.sample], - genotype=call['GT'], - genotype_likelihoods=genotype_likelihoods) + call_set=self.call_sets[self.vcf_reader.samples[sample_idx]], + genotype=str(Genotype(record.genotypes[sample_idx])), + genotype_likelihoods=gt_likelihoods, + ) self.calls.append(c) VariantCall.objects.insert(self.calls) def _get_or_create_variant(self, record): try: - var_hash = make_var_hash(record.REF, record.POS, [ - str(a) for a in record.ALT]) + var_hash = make_var_hash( + record.REF, record.POS, [str(a) for a in record.ALT] + ) v = Variant.objects.get(var_hash=var_hash) v.add_to_variant_set(self.vcf_variant_set) except DoesNotExist: try: reference = self.references[record.CHROM] - except KeyError as e: + except KeyError: raise KeyError( - "Reference %s cannot be found in reference set %s (%s). Please add it to the database." % - (record.CHROM, self.reference_set.id, self.reference_set.name)) + "Reference %s cannot be found in reference set %s (%s). Please add it to the database." + % (record.CHROM, self.reference_set.id, self.reference_set.name) + ) v = Variant.create_and_save( variant_sets=self.variant_sets, start=record.POS, reference_bases=record.REF, - alternate_bases=[ - str(a) for a in record.ALT], + alternate_bases=[str(a) for a in record.ALT], reference=reference, - names=[record.ID]) + names=[record.ID], + ) return v def _remove_variant_set(self, variant_set_name): vs = VariantSet.objects.get( - name=variant_set_name, - reference_set=self.reference_set) + name=variant_set_name, reference_set=self.reference_set + ) for call_set in VariantCallSet.objects(variant_sets=vs): call_set.variant_sets.remove(vs) call_set.save() @@ -112,37 +114,33 @@ def _remove_variant_set(self, variant_set_name): vs.delete() def _create_new_variant_set(self): - variant_set_name = os.path.basename( - self.f) - if VariantSet.objects( - name=variant_set_name, - reference_set=self.reference_set): + variant_set_name = os.path.basename(self.f) + if VariantSet.objects(name=variant_set_name, reference_set=self.reference_set): if not self.force: raise NotUniqueError( - "VariantSet %s already exists. Rerun with -f to recreate." % - variant_set_name) + "VariantSet %s already exists. Rerun with -f to recreate." + % variant_set_name + ) else: self._remove_variant_set(variant_set_name) self.vcf_variant_set = VariantSet.create_and_save( - name=variant_set_name, - reference_set=self.reference_set) + name=variant_set_name, reference_set=self.reference_set + ) def _create_call_sets(self): for sample in self.vcf_reader.samples: try: cs = VariantCallSet.create_and_save( - name="_".join( - [ - sample, - self.method]), + name="_".join([sample, self.method]), variant_sets=self.variant_sets, sample_id=sample, - info={ - "variant_caller": self.method}) + info={"variant_caller": self.method}, + ) except NotUniqueError: raise ValueError( - "There is already a call set for sample %s with method %s " % - (sample, self.method)) + "There is already a call set for sample %s with method %s " + % (sample, self.method) + ) else: self.call_sets[sample] = cs @@ -159,79 +157,189 @@ def global_variant_set(self): vs = VariantSet.objects.get(name=GLOBAL_VARIANT_SET_NAME) except: vs = VariantSet.create_and_save( - name=GLOBAL_VARIANT_SET_NAME, - reference_set=self.reference_set) + name=GLOBAL_VARIANT_SET_NAME, reference_set=self.reference_set + ) return vs def _create_variant_set_meta_data(self): for variant_set in self.variant_sets: - for k, v in self.vcf_reader.metadata.items(): - if not VariantSetMetadata.objects( - key=k, - variant_set=variant_set): + for k, v in self._metadata_from_header(self.vcf_reader.raw_header).items(): + if not VariantSetMetadata.objects(key=k, variant_set=variant_set): vsm = VariantSetMetadata.create_and_save( key=k, value="metadata", type="metadata", - variant_set=variant_set) - for k, v in self.vcf_reader.infos.items(): - if not VariantSetMetadata.objects( - key=k, - variant_set=variant_set): - vsm = VariantSetMetadata.create_and_save( - key=k, - value="infos", - type=v.type, variant_set=variant_set, - number=int( - v.num), - description=v.desc) - for k, v in self.vcf_reader.filters.items(): - if not VariantSetMetadata.objects( - key=k, - variant_set=variant_set): - vsm = VariantSetMetadata.create_and_save( - key=k, - value="filters", - type="filters", - variant_set=variant_set, - description=v.desc) - for k, v in self.vcf_reader.formats.items(): - if not VariantSetMetadata.objects( - key=k, - variant_set=variant_set): - vsm = VariantSetMetadata.create_and_save( - key=k, - value="formats", - type=v.type, - variant_set=variant_set, - number=int( - v.num), - description=v.desc) + ) + for hrec in self.vcf_reader.header_iter(): + try: + hrec_id = hrec["ID"] + hdr_type = hrec["HeaderType"].upper() + except KeyError: # cyvcf2's HREC doesn't have a `get` method + continue - def _is_record_valid(self, record): + if not VariantSetMetadata.objects(key=hrec_id, variant_set=variant_set): + try: + desc = hrec["Description"] + except KeyError: + desc = "" + + params = dict( + key=hrec_id, variant_set=variant_set, description=desc + ) + if hdr_type in ("INFO", "FORMAT"): + params["value"] = hdr_type.lower() + "s" + for field in ("type", "number"): + try: + params[field] = hrec[field.capitalize()] + except KeyError: + raise KeyError( + "{} ID {} does not have field '{}'".format( + hdr_type, hrec_id, field.capitalize() + ) + ) + elif hdr_type == "FILTER": + params["value"] = "filters" + params["type"] = "filters" + else: + continue + + vsm = VariantSetMetadata.create_and_save(**params) + + @staticmethod + def _read_meta_hash(meta_string: str) -> Tuple[str, Dict[str, str]]: + """Taken from pyvcf + https://github.com/jamescasbon/PyVCF/blob/476169cd457ba0caa6b998b301a4d91e975251d9/vcf/parser.py#L184-L220 + """ + items = meta_string.split("=", 1) + # Removing initial hash marks + key = items[0].lstrip("#") + # N.B., items can have quoted values, so cannot just split on comma + val = OrderedDict() + state = 0 + k = "" + v = "" + for c in items[1].strip("[<>]"): + + if state == 0: # reading item key + if c == "=": + state = 1 # end of key, start reading value + else: + k += c # extend key + elif state == 1: # reading item value + if v == "" and c == '"': + v += c # include quote mark in value + state = 2 # start reading quoted value + elif c == ",": + val[k] = v # store parsed item + state = 0 # read next key + k = "" + v = "" + else: + v += c + elif state == 2: # reading quoted item value + if c == '"': + v += c # include quote mark in value + state = 1 # end quoting + else: + v += c + if k != "": + val[k] = v + return key, val + + def _read_meta(self, meta_string: str) -> Tuple[str, Union[str, Dict[str, str]]]: + """Taken from pyvcf + https://github.com/jamescasbon/PyVCF/blob/476169cd457ba0caa6b998b301a4d91e975251d9/vcf/parser.py#L222-L231 + """ + if re.match("##.+=<", meta_string): + return self._read_meta_hash(meta_string) + meta_pattern = re.compile(r"""##(?P.+?)=(?P.+)""") + match = meta_pattern.match(meta_string) + if not match: + # Spec only allows key=value, but we try to be liberal and + # interpret anything else as key=none (and all values are parsed + # as strings). + return meta_string.lstrip("#"), "none" + return match.group("key"), match.group("val") + + def _metadata_from_header( + self, raw_header: str + ) -> Dict[str, Union[str, List[str]]]: + metadata = dict() + for line in raw_header.splitlines(): + key, val = self._read_meta(line) + if key in NON_METADATA_KEYS: + continue + if key in SINGULAR_METADATA: + metadata[key] = val + else: + if key not in metadata: + metadata[key] = [] + metadata[key].append(val) + + return metadata + + @staticmethod + def _is_record_valid(record: cyvcf2.Variant) -> bool: valid = True - for sample in record.samples: - if sample["GT"] is None: + for i, gt in enumerate(map(Genotype, record.genotypes)): + if not gt.is_hom_alt(): valid = False - else: - try: - if sum([int(i) for i in split_GT(sample['GT'])]) < 2: - valid = False - except ValueError: - valid = False - try: - if sample["GT_CONF"] <= 1: - valid = False - except AttributeError: - pass + break + + gt_conf = record.format("GT_CONF")[i][0] + if gt_conf <= 1: # todo: magic number. + valid = False + break + return valid - def _get_genotype_likelihoods(self, sample): + @staticmethod + def _get_genotype_likelihoods( + record: cyvcf2.Variant, sample_idx: int + ) -> List[float]: try: - genotype_likelihoods = [float(i) for i in sample['GL']] - except: + genotype_likelihoods = record.format("GL")[sample_idx] + except KeyError: + gt_conf = record.format("GT_CONF")[sample_idx][0] genotype_likelihoods = [0, 0, 0] - genotype_likelihoods[ - sum([int(i) for i in sample['GT'].split('/')])] = sample["GT_CONF"] + genotype_likelihoods[sum(record.genotypes[sample_idx][:-1])] = gt_conf + return genotype_likelihoods + + +class Genotype(object): + """This small class makes it easier to go from a cyvcf2 genotype array to a string""" + + __slots__ = ("alleles", "phased") + + def __init__(self, genotype): + """genotype is a numpy array whose first element is an int, and last element + is a bool. There are a variable number of ints. The actual type is + List[int, ..., bool], but this doesn't seem to be supported + https://stackoverflow.com/q/71042017/5299417 + """ + self.alleles = genotype[:-1] + self.phased = genotype[-1] + + def __str__(self): + sep = "/|"[int(self.phased)] + # return sep.join(["." if a == -1 else str(a) for a in self.alleles]) + return sep.join(map(self._allele_to_str, self.alleles)) + # return sep.join("0123."[a] for a in self.alleles) + + @staticmethod + def _allele_to_str(a: int) -> str: + return "." if a == -1 else str(a) + + def is_het(self) -> bool: + return len(set(self.alleles)) > 1 + + def is_null(self) -> bool: + return sum(self.alleles) < 0 + + def is_hom_alt(self) -> bool: + """Is genotype homozygous alternate?""" + return not self.is_het() and self.alleles[0] > 0 + + __repr__ = __str__ diff --git a/src/mykrobe/cmds/amr.py b/src/mykrobe/cmds/amr.py index f4682ff2..732fb518 100755 --- a/src/mykrobe/cmds/amr.py +++ b/src/mykrobe/cmds/amr.py @@ -268,6 +268,12 @@ def run(parser, args): if args.tmp is None: args.tmp = tempfile.mkdtemp() + "/" + if args.ont: + args.expected_error_rate = ONT_E_RATE + logger.info(f"Set expected error rate to {args.expected_error_rate} because --ont flag was used") + args.ploidy = ONT_PLOIDY + logger.info(f"Set ploidy to {args.ploidy} because --ont flag used") + # Run Cortex cp = CoverageParser( sample=args.sample, @@ -338,13 +344,11 @@ def run(parser, args): ) if args.guess_sequence_method and kmer_count_error_rate > 0.001: logger.warning("Guess sequence method is on, and we've guessed ONT") - args.ont = True - - if args.ont: + # this is duplicated from the if args.ont statement above args.expected_error_rate = ONT_E_RATE - logger.info(f"Set expected error rate to {args.expected_error_rate} because --ont flag was used") + logger.info(f"Set expected error rate to {args.expected_error_rate}") args.ploidy = ONT_PLOIDY - logger.info(f"Set ploidy to {args.ploidy} because --ont flag used") + logger.info(f"Set ploidy to {args.ploidy}") # conf_percent_cutoff == 100 means that we want to keep all variant calls, # in which case there is no need to run the simulations diff --git a/src/mykrobe/constants.py b/src/mykrobe/constants.py index 789f80fe..db2aa77d 100644 --- a/src/mykrobe/constants.py +++ b/src/mykrobe/constants.py @@ -6,3 +6,7 @@ ONT_E_RATE = 0.08 ONT_PLOIDY = "haploid" ILLUMINA_E_RATE = 0.05 +NON_METADATA_KEYS = {"INFO", "FILTER", "ALT", "FORMAT", "contig"} +# Spec is a bit weak on which metadata lines are singular, like fileformat +# and which can have repeats, like contig +SINGULAR_METADATA = {'fileformat', 'fileDate', 'reference'} diff --git a/src/mykrobe/cortex/mccortex31 b/src/mykrobe/cortex/mccortex31 new file mode 100755 index 00000000..c9120e13 Binary files /dev/null and b/src/mykrobe/cortex/mccortex31 differ diff --git a/tests/vcf_tests/test_adding_vcf_to_db.py b/tests/vcf_tests/test_adding_vcf_to_db.py index 0c3003bc..7cf087a4 100644 --- a/tests/vcf_tests/test_adding_vcf_to_db.py +++ b/tests/vcf_tests/test_adding_vcf_to_db.py @@ -1,39 +1,38 @@ import datetime -from mykrobe._vcf import VCF -from mykrobe.variants.schema.models import VariantSet -from mykrobe.variants.schema.models import VariantSetMetadata -from mykrobe.variants.schema.models import Variant -from mykrobe.variants.schema.models import VariantCallSet -from mykrobe.variants.schema.models import VariantCall + from mongoengine import connect -from mykrobe.variants.schema.models import ReferenceSet -from mykrobe.variants.schema.models import Reference -DB = connect('mykrobe-test') +from mykrobe._vcf import VCF, Genotype +from mykrobe.variants.schema.models import Reference +from mykrobe.variants.schema.models import ReferenceSet +from mykrobe.variants.schema.models import Variant +from mykrobe.variants.schema.models import VariantCall +from mykrobe.variants.schema.models import VariantCallSet +from mykrobe.variants.schema.models import VariantSet +from mykrobe.variants.schema.models import VariantSetMetadata +DB = connect("mykrobe-test") -class BaseTest(): +class BaseTest: def setup(self): - DB.drop_database('mykrobe-test') + DB.drop_database("mykrobe-test") self.reference_set = ReferenceSet().create_and_save(name="ref_set") self.reference = Reference().create_and_save( - name="NC_000962.3", - md5checksum="sre", - reference_sets=[ - self.reference_set]) + name="NC_000962.3", md5checksum="sre", reference_sets=[self.reference_set] + ) def teardown(self): - DB.drop_database('mykrobe-test') + DB.drop_database("mykrobe-test") class TestAddNewVariantSet(BaseTest): - def test_add_new_vcf_variant_set(self): vcf = VCF( f="tests/vcf_tests/test.vcf", reference_set_id=self.reference_set.id, - method="CORTEX") + method="CORTEX", + ) vcf.add_to_database() # We create a global variant set as well as one for the individual VCF assert VariantSet.objects().count() == 2 @@ -43,59 +42,56 @@ def test_add_new_vcf_variant_set(self): class TestAddNewVariantSetMetaData(BaseTest): - def test_add_new_vcf_variant_set(self): vcf = VCF( f="tests/vcf_tests/test.vcf", reference_set_id=self.reference_set.id, - method="CORTEX") + method="CORTEX", + ) vcf.add_to_database() assert VariantSetMetadata.objects().count() >= 2 assert VariantSetMetadata.objects(key="KMER").count() == 2 class TestAddNewCallSet(BaseTest): - def test_add_new_call_set(self): vcf = VCF( f="tests/vcf_tests/test.vcf", reference_set_id=self.reference_set.id, - method="CORTEX") + method="CORTEX", + ) vcf.add_to_database() # Only one callset but the callset should belong to multiple variant # sets assert VariantCallSet.objects().count() == 1 - assert VariantCallSet.objects()[ - 0].created_at <= datetime.datetime.now() + assert VariantCallSet.objects()[0].created_at <= datetime.datetime.now() assert len(VariantCallSet.objects()[0].variant_sets) == 2 class TestVariantsAndCalls(BaseTest): - def test_add_add_variants_and_calls(self): vcf = VCF( f="tests/vcf_tests/test.vcf", reference_set_id=self.reference_set.id, - method="CORTEX") + method="CORTEX", + ) vcf.add_to_database() assert VariantCall.objects().count() == 21 assert Variant.objects().count() == 21 class TestAddSecondVCF(BaseTest): - def setup(self): - DB.drop_database('mykrobe-test') + DB.drop_database("mykrobe-test") self.reference_set = ReferenceSet().create_and_save(name="ref_set") self.reference = Reference().create_and_save( - name="NC_000962.3", - md5checksum="sre", - reference_sets=[ - self.reference_set]) + name="NC_000962.3", md5checksum="sre", reference_sets=[self.reference_set] + ) vcf = VCF( f="tests/vcf_tests/test.vcf", reference_set_id=self.reference_set.id, - method="CORTEX") + method="CORTEX", + ) vcf.add_to_database() def test_add_second_vcf_variant_set(self): @@ -103,26 +99,25 @@ def test_add_second_vcf_variant_set(self): vcf = VCF( f="tests/vcf_tests/test2.vcf", reference_set_id=self.reference_set.id, - method="CORTEX") + method="CORTEX", + ) vcf.add_to_database() assert VariantSet.objects().count() == 3 assert VariantCallSet.objects().count() == 2 assert VariantCall.objects().count() == 42 assert Variant.objects().count() == 22 assert len(Variant.objects()[0].variant_sets) == 3 - assert len( - Variant.objects.get( - names="UNION_BC_k31_var_147").variant_sets) == 3 + assert len(Variant.objects.get(names="UNION_BC_k31_var_147").variant_sets) == 3 class TestAddVCFwithIndels(BaseTest): - def test_add_second_vcf_variant_set(self): # This VCF only has one Variant which is not in the first VCF vcf = VCF( f="tests/vcf_tests/test3.vcf", reference_set_id=self.reference_set.id, - method="CORTEX") + method="CORTEX", + ) vcf.add_to_database() assert VariantSet.objects().count() == 2 assert VariantCallSet.objects().count() == 1 @@ -133,3 +128,50 @@ def test_add_second_vcf_variant_set(self): assert Variant.insertions().count() == 8 assert Variant.deletions().count() == 8 assert Variant.ph_snps.count() == 1 + + +class TestGenotype: + def test_str_unphased(self): + gt = Genotype([-1, 0, 2, False]) + + actual = str(gt) + expected = "./0/2" + + assert actual == expected + + def test_str_phased(self): + gt = Genotype([-1, 0, 2, True]) + + actual = str(gt) + expected = ".|0|2" + + assert actual == expected + + def test_str_haploid(self): + gt = Genotype([0, False]) + + actual = str(gt) + expected = "0" + + assert actual == expected + + def test_is_het(self): + assert Genotype([0, 1, False]).is_het() + assert Genotype([2, 1, True]).is_het() + assert Genotype([0, -1, False]).is_het() + assert Genotype([0, 0, -1, False]).is_het() + assert not Genotype([0, 0, False]).is_het() + assert not Genotype([1, False]).is_het() + + def test_is_null(self): + assert Genotype([-1, False]).is_null() + assert Genotype([-1, -1, False]).is_null() + assert not Genotype([1, -1, False]).is_null() + + def test_is_hom_alt(self): + assert Genotype([1, 1, True]).is_hom_alt() + assert Genotype([3, 3, True]).is_hom_alt() + assert not Genotype([0, 0, True]).is_hom_alt() + assert not Genotype([-1, -1, True]).is_hom_alt() + assert not Genotype([-1, False]).is_hom_alt() + assert Genotype([1, False]).is_hom_alt()