diff --git a/.gitignore b/.gitignore index 7ff73c3..3994c5e 100644 --- a/.gitignore +++ b/.gitignore @@ -107,3 +107,5 @@ venv.bak/ *~ *.orig archive + +docker/data* diff --git a/demo/vcf_demo.ipynb b/demo/vcf_demo.ipynb new file mode 100644 index 0000000..973d987 --- /dev/null +++ b/demo/vcf_demo.ipynb @@ -0,0 +1,409 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "a8cfd2b7-ede8-4980-b885-42a8c9aa46f5", + "metadata": {}, + "source": [ + "## AnyVar VCF processing and annotation\n", + "\n", + "### Setup\n", + "\n", + "First, we'll initialize AnyVar (we already have some required services running in the background) and the VCF registrar object" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "149106da-a4f7-4a7e-a2c1-154805bef2d2", + "metadata": { + "collapsed": true, + "jupyter": { + "outputs_hidden": true + }, + "tags": [] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Removing allOf attribute from CopyNumber to avoid python-jsonschema-objects error.\n", + "Removing allOf attribute from SequenceInterval to avoid python-jsonschema-objects error.\n", + "Removing allOf attribute from RepeatedSequenceExpression to avoid python-jsonschema-objects error.\n", + "/Users/jss009/code/anyvar/venv/lib/python3.8/site-packages/python_jsonschema_objects/__init__.py:49: UserWarning: Schema version http://json-schema.org/draft-07/schema not recognized. Some keywords and features may not be supported.\n", + " warnings.warn(\n" + ] + } + ], + "source": [ + "from pathlib import Path\n", + "from timeit import default_timer as timer\n", + "\n", + "from anyvar.anyvar import create_storage, create_translator, AnyVar\n", + "from anyvar.storage.postgres import PostgresBatchManager\n", + "from anyvar.extras.vcf import VcfRegistrar" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "4dc892b7-9ba9-4ea0-9af0-8aea6114bc8b", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "av = AnyVar(\n", + " create_translator(\"http://localhost:7999/variation/\"),\n", + " create_storage(\"postgresql://postgres@localhost:5432/anyvar\")\n", + ")\n", + "vcf_registrar = VcfRegistrar(av)" + ] + }, + { + "cell_type": "markdown", + "id": "d2cd1685-5c37-4464-8908-32244972b059", + "metadata": {}, + "source": [ + "### Input" + ] + }, + { + "cell_type": "markdown", + "id": "f5333db2-dd61-46e3-be0a-5f4ae493f728", + "metadata": { + "tags": [] + }, + "source": [ + "We have a sample file `vcf-100k-no-added-errors-01-20-23.vcf`, with about 100,000 rows comprised of simple SNPs and indels:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "5a8366c6-d7e6-4cd0-8c52-918cb0b1c44f", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " 107139 ../vcf-100k-no-added-errors-01-20-23.vcf\n" + ] + } + ], + "source": [ + "!wc -l ../vcf-100k-no-added-errors-01-20-23.vcf" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "a8df0639-37e9-460d-8485-850ee8878858", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[38;5;238m───────┬────────────────────────────────────────────────────────────────────────\u001b[0m\n", + " \u001b[38;5;238m│ \u001b[0mFile: \u001b[1m../vcf-100k-no-added-errors-01-20-23.vcf\u001b[0m\n", + "\u001b[38;5;238m───────┼────────────────────────────────────────────────────────────────────────\u001b[0m\n", + "\u001b[38;5;238m4000\u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mchr1 18476814 . ATTCATCTCTCC A . PASS QUALapprox=1784\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m;SB=29,17,26,20;MQ=60.0000;MQRankSum=1.04600;VarDP=92;AS_ReadPosRankSum\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m=-0.371000;AS_pab_max=0.867939;AS_QD=19.3913;AS_MQ=60.0000;QD=19.3913;A\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mS_MQRankSum=1.04600;FS=1.73310;AS_FS=1.73310;ReadPosRankSum=-0.371000;A\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mS_QUALapprox=1784;AS_SB_TABLE=29,17,26,20;AS_VarDP=92;AS_SOR=0.466938;S\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mOR=0.467000;AS_VQSLOD=2.65400;InbreedingCoeff=-1.31130e-05\u001b[0m\n", + "\u001b[38;5;238m4001\u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mchr1 18478618 . GAGGAAAGAAGGGAGGGAGGGAGGAAGGAAGGAAGGAAGGA G \u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m . PASS QUALapprox=472443;SB=33801,6409,10273,221;MQ=55.3025;MQRan\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mkSum=-1.06100;VarDP=51308;AS_ReadPosRankSum=1.66600;AS_pab_max=0.359283\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m;AS_QD=13.5263;AS_MQ=55.8653;QD=9.20798;AS_MQRankSum=-0.380000;FS=25.15\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m05;AS_FS=.;ReadPosRankSum=0.736000;AS_QUALapprox=257;AS_SB_TABLE=33801,\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m6409,5,2;AS_VarDP=19;AS_SOR=0.134394;SOR=4.35700;singleton;AS_VQSLOD=15\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m.0936;InbreedingCoeff=-9.29832e-06\u001b[0m\n", + "\u001b[38;5;238m4002\u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mchr1 18480705 . TGGCTGATCACAAGCTCAGCCCCTGG T . PASS QUA\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mLapprox=7722;SB=130,91,113,90;MQ=60.0000;MQRankSum=-0.207000;VarDP=424;\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mAS_ReadPosRankSum=0.0170000;AS_pab_max=0.874629;AS_QD=18.2123;AS_MQ=60.\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m0000;QD=18.2123;AS_MQRankSum=-0.207000;FS=1.75729;AS_FS=1.75729;ReadPos\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mRankSum=0.0170000;AS_QUALapprox=7722;AS_SB_TABLE=130,91,113,90;AS_VarDP\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m=424;AS_SOR=0.573256;SOR=0.573000;AS_VQSLOD=2.69030;InbreedingCoeff=-2.\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m63453e-05\u001b[0m\n", + "\u001b[38;5;238m4003\u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mchr1 18500005 . T C . PASS QUALapprox=511;SB=40,33,16,\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m9;MQ=59.9617;MQRankSum=0.00000;VarDP=98;AS_ReadPosRankSum=0.481000;AS_p\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mab_max=0.229523;AS_QD=18.8000;AS_MQ=60.0000;QD=5.21429;AS_MQRankSum=-0.\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m368000;FS=3.11919;AS_FS=1.04367;ReadPosRankSum=-0.175000;AS_QUALapprox=\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m470;AS_SB_TABLE=40,33,8,8;AS_VarDP=25;AS_SOR=0.523358;SOR=1.09400;singl\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245meton;AS_VQSLOD=4.37060;InbreedingCoeff=-6.55651e-06\u001b[0m\n", + "\u001b[38;5;238m───────┴────────────────────────────────────────────────────────────────────────\u001b[0m\n" + ] + } + ], + "source": [ + "!bat --line-range=4000:4003 ../vcf-100k-no-added-errors-01-20-23.vcf # for example" + ] + }, + { + "cell_type": "markdown", + "id": "5800eeb2-b035-406a-9ed8-0e35e2aeabba", + "metadata": {}, + "source": [ + "### Ingestion and annotation" + ] + }, + { + "cell_type": "markdown", + "id": "e731ade9-6b00-4b74-b2f3-be6a91f8a394", + "metadata": {}, + "source": [ + "We'll run the `annotate()` method and track wall clock time:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "71f0a884-287a-40a0-b736-e85933a8570b", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "processed all VCF rows in 1964.3764593320002 seconds\n" + ] + } + ], + "source": [ + "start = timer()\n", + "vcf_registrar.annotate(\n", + " \"../vcf-100k-no-added-errors-01-20-23.vcf\", \n", + " vcf_out=\"out.vcf\"\n", + ")\n", + "end = timer()\n", + "print(f\"processed all VCF rows in {end - start} seconds\")" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "e52a6bcf-57f7-4127-b316-10740c111613", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Between references and alternates, this registers 198098 alleles.\n" + ] + } + ], + "source": [ + "allele_count = av.object_store.get_variation_count('all')\n", + "print(f\"Between references and alternates, this registers {allele_count} alleles.\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d0bc8505-9d12-44fa-9ede-64ccea2c98b2", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "markdown", + "id": "2c4e72b6-5f11-48a3-a55b-ac63a6693785", + "metadata": {}, + "source": [ + "### Output" + ] + }, + { + "cell_type": "markdown", + "id": "6b8371e5-e1df-4cd7-8f33-1fe56230e2dc", + "metadata": {}, + "source": [ + "This process adds VRS allele IDs to the VCF's INFO field:" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "79198412-b69b-4677-86aa-e542cb564674", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[38;5;238m───────┬────────────────────────────────────────────────────────────────────────\u001b[0m\n", + " \u001b[38;5;238m│ \u001b[0mFile: \u001b[1mout.vcf\u001b[0m\n", + "\u001b[38;5;238m───────┼────────────────────────────────────────────────────────────────────────\u001b[0m\n", + "\u001b[38;5;238m4000\u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mchr1 18357472 . GGGATGAGGTGGGGATGGGGATGGGAATGAAGTGGA G . \u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m AS_VQSR QUALapprox=1111;SB=377,138,62,9;MQ=59.9291;MQRankSum=0.48;VarD\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mP=586;AS_ReadPosRankSum=-1.271;AS_pab_max=0.375;AS_QD=1.89811;AS_MQ=59.\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m9236;QD=1.8959;AS_MQRankSum=0.48;FS=6.83343;AS_FS=15.2899;ReadPosRankSu\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mm=-1.231;AS_QUALapprox=1006;AS_SB_TABLE=377,138,58,5;AS_VarDP=530;AS_SO\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mR=2.64441;SOR=1.851;AS_VQSLOD=-2.4957;InbreedingCoeff=-0.000289321;VRS_\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mAllele=ga4gh:VA.7WnGU91csokVUIk06Qgbte8vCd1gqUsY,ga4gh:VA.8grD8SPIVB8MQ\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mq7E_OuTvZbgbAIys2Il\u001b[0m\n", + "\u001b[38;5;238m4001\u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mchr1 18467245 . G C . PASS QUALapprox=1466;SB=41,21,28\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m,30;MQ=60;MQRankSum=0.879;VarDP=120;AS_ReadPosRankSum=-0.189;AS_pab_max\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m=1;AS_QD=12.2167;AS_MQ=60;QD=12.2167;AS_MQRankSum=0.879;FS=11.8989;AS_F\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mS=11.8989;ReadPosRankSum=-0.189;AS_QUALapprox=1466;AS_SB_TABLE=41,21,28\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m,30;AS_VarDP=120;AS_SOR=0.348587;SOR=0.349;AS_VQSLOD=4.4446;InbreedingC\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245moeff=-1.96695e-05;VRS_Allele=ga4gh:VA.uQ3gn707Jt4RveHLGeKGQpqzzUWtYaWw,\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mga4gh:VA.4DKgte-DjFZ71nmAw9Vv52snNVKoxg5z\u001b[0m\n", + "\u001b[38;5;238m4002\u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mchr1 18476814 . ATTCATCTCTCC A . PASS QUALapprox=1784\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m;SB=29,17,26,20;MQ=60;MQRankSum=1.046;VarDP=92;AS_ReadPosRankSum=-0.371\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m;AS_pab_max=0.867939;AS_QD=19.3913;AS_MQ=60;QD=19.3913;AS_MQRankSum=1.0\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m46;FS=1.7331;AS_FS=1.7331;ReadPosRankSum=-0.371;AS_QUALapprox=1784;AS_S\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mB_TABLE=29,17,26,20;AS_VarDP=92;AS_SOR=0.466938;SOR=0.467;AS_VQSLOD=2.6\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m54;InbreedingCoeff=-1.3113e-05;VRS_Allele=ga4gh:VA.tT2-U2WwLDM0r77vQwCu\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m3amz8fCkuVw_,ga4gh:VA.tRZlSaiaekR-f6PfTaQMeUQmv1uvJRoa\u001b[0m\n", + "\u001b[38;5;238m4003\u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mchr1 18478618 . GAGGAAAGAAGGGAGGGAGGGAGGAAGGAAGGAAGGAAGGA G \u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m . PASS QUALapprox=472443;SB=33801,6409,10273,221;MQ=55.3025;MQRan\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mkSum=-1.061;VarDP=51308;AS_ReadPosRankSum=1.666;AS_pab_max=0.359283;AS_\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mQD=13.5263;AS_MQ=55.8653;QD=9.20798;AS_MQRankSum=-0.38;FS=25.1505;AS_FS\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m=.;ReadPosRankSum=0.736;AS_QUALapprox=257;AS_SB_TABLE=33801,6409,5,2;AS\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245m_VarDP=19;AS_SOR=0.134394;SOR=4.357;singleton;AS_VQSLOD=15.0936;Inbreed\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mingCoeff=-9.29832e-06;VRS_Allele=ga4gh:VA.53I40AWtmxlEbsaQwpYZ4s_if0qVY\u001b[0m\n", + "\u001b[38;5;238m \u001b[0m \u001b[38;5;238m│\u001b[0m \u001b[38;2;192;202;245mjOU,ga4gh:VA.Kne5ouJJ-vPwJthJa0EK00zOUt9drP7u\u001b[0m\n", + "\u001b[38;5;238m───────┴────────────────────────────────────────────────────────────────────────\u001b[0m\n" + ] + } + ], + "source": [ + "!bat --line-range=4000:4003 out.vcf" + ] + }, + { + "cell_type": "markdown", + "id": "b11fdf60-2db4-44eb-b2bc-1c3a424c3443", + "metadata": {}, + "source": [ + "We can dereference those IDs to retrieve the complete VRS allele:" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "ecd9ecc1-9dbb-49ed-b081-2e84c9a0a2e9", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "data": { + "text/plain": [ + "{'_id': 'ga4gh:VA.tT2-U2WwLDM0r77vQwCu3amz8fCkuVw_',\n", + " 'type': 'Allele',\n", + " 'location': {'type': 'SequenceLocation',\n", + " 'sequence_id': 'ga4gh:SQ.Ya6Rs7DHhDeg7YaOSg1EoNi3U_nQ9SvO',\n", + " 'interval': {'type': 'SequenceInterval',\n", + " 'start': {'type': 'Number', 'value': 18476813},\n", + " 'end': {'type': 'Number', 'value': 18476825}}},\n", + " 'state': {'type': 'LiteralSequenceExpression', 'sequence': 'ATTCATCTCTCC'}}" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "av.get_object(\"ga4gh:VA.tT2-U2WwLDM0r77vQwCu3amz8fCkuVw_\", True).as_dict()" + ] + }, + { + "cell_type": "markdown", + "id": "a4da9841-3b50-41f4-a2e5-843e10b41909", + "metadata": {}, + "source": [ + "### Search\n", + "\n", + "Currently, we support basic genomic region searches:" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "d4e51daf-3354-4187-b63d-4aa7e299f02f", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[{'_id': 'ga4gh:VA.Q19O8HhV1UnaYYAmcgmpcy1UDHkU4mdD',\n", + " 'type': 'Allele',\n", + " 'state': {'type': 'LiteralSequenceExpression', 'sequence': 'C'},\n", + " 'location': 'ga4gh:VSL.tWfR6n2aEy6patCt2DcWa7mf4UD6poT_'},\n", + " {'_id': 'ga4gh:VA.SwdQzWZyRDzJSVDKZCaa1BDX-zjCP8GJ',\n", + " 'type': 'Allele',\n", + " 'state': {'type': 'LiteralSequenceExpression',\n", + " 'sequence': 'TTTTTTTTTCTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT'},\n", + " 'location': 'ga4gh:VSL.PpwQTE2qCqjlCILek9uZTnfXycki__tX'},\n", + " {'_id': 'ga4gh:VA.qihaf7S9gRb2fxvOA1OJ6ghcfr7OudaS',\n", + " 'type': 'Allele',\n", + " 'state': {'type': 'LiteralSequenceExpression', 'sequence': 'T'},\n", + " 'location': 'ga4gh:VSL.iJCaR2HHgifLaqbyK3CYik4XRKJUvwL8'},\n", + " {'_id': 'ga4gh:VA.Kqa1gjWWWfiuc54Ze2J170k9t0WPCUQN',\n", + " 'type': 'Allele',\n", + " 'state': {'type': 'LiteralSequenceExpression', 'sequence': 'C'},\n", + " 'location': 'ga4gh:VSL.iJCaR2HHgifLaqbyK3CYik4XRKJUvwL8'}]" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "chr4 = av.translator.get_sequence_id(\"NCBI:NC_000004.12\")\n", + "av.object_store.search_variations(chr4, 400000, 500000)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6eee0c2a-9007-47fd-a4e0-fe3892931add", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "anyvar", + "language": "python", + "name": "anyvar" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/anyvar/extras/vcf.py b/src/anyvar/extras/vcf.py new file mode 100644 index 0000000..5edb3f9 --- /dev/null +++ b/src/anyvar/extras/vcf.py @@ -0,0 +1,80 @@ +"""Support processing and manipulation of VCF objects.""" +import logging +from typing import Dict, List, Optional + +from ga4gh.vrs.extras.vcf_annotation import VCFAnnotator + +from anyvar.anyvar import AnyVar +from anyvar.translate.translate import TranslatorConnectionException + +_logger = logging.getLogger(__name__) + + +class VcfRegistrar(VCFAnnotator): + """Custom implementation of annotator class from VRS-Python. Rewrite some methods + and values in order to enable use of existing AnyVar translator. + """ + + def __init__(self, av: AnyVar) -> None: + """Initialize VCF processor. + + :param av: complete AnyVar instance + """ + self.av = av + + def annotate( + self, vcf_in: str, vcf_out: Optional[str] = None, + vrs_pickle_out: Optional[str] = None, assembly: str = "GRCh38", + ) -> None: + """Annotates an input VCF file with VRS Allele IDs & creates a pickle file + containing the vrs object information. + + :param vcf_in: The path for the input VCF file to annotate + :param vcf_out: The path for the output VCF file + :param vrs_pickle_out: The path for the output VCF pickle file + :param assembly: The assembly used in `vcf_in` data + """ + if self.av.object_store.batch_manager: + storage = self.av.object_store + with storage.batch_manager(storage): # type: ignore + return super().annotate(vcf_in, vcf_out, vrs_pickle_out, assembly) + else: + super().annotate(vcf_in, vcf_out, vrs_pickle_out, assembly) + + def _get_vrs_object( + self, vcf_coords: str, vrs_data: Dict, vrs_allele_ids: List[str], assembly: str, + vrs_data_key: Optional[str] = None, output_pickle: bool = True, + output_vcf: bool = False + ) -> None: + """Get VRS Object given `vcf_coords`. `vrs_data` and `vrs_allele_ids` will + be mutated. Generally, we expect AnyVar to use the output_vcf option rather than + the pickle file. + + :param vcf_coords: Allele to get VRS object for. Format is chr-pos-ref-alt + :param vrs_data: Dictionary containing the VRS object information for the VCF + :param vrs_allele_ids: List containing the VRS Allele IDs + :param assembly: The assembly used in `vcf_coords`. Not used by this + implementation -- GRCh38 is assumed. + :param vrs_data_key: The key to update in `vrs_data`. If not provided, will use + `vcf_coords` as the key. + :param output_pickle: `True` if VRS pickle file will be output. `False` + otherwise. + :param output_vcf: `True` if annotated VCF file will be output. `False` + otherwise. + :return: nothing, but registers VRS objects with AnyVar storage and stashes IDs + """ + try: + vrs_object = self.av.translator.translate_vcf_row(vcf_coords) + except (TranslatorConnectionException, NotImplementedError): + pass + else: + if vrs_object: + self.av.put_object(vrs_object) + if output_pickle: + key = vrs_data_key if vrs_data_key else vcf_coords + vrs_data[key] = str(vrs_object.as_dict()) + + if output_vcf: + vrs_allele_ids.append(vrs_object._id._value) + else: + _logger.error(f"Translation failed: {vcf_coords}") diff --git a/src/anyvar/storage/__init__.py b/src/anyvar/storage/__init__.py index 741fb34..4a7c676 100644 --- a/src/anyvar/storage/__init__.py +++ b/src/anyvar/storage/__init__.py @@ -2,6 +2,7 @@ from abc import abstractmethod from collections.abc import MutableMapping +from contextlib import AbstractContextManager from anyvar.restapi.schema import VariationStatisticType @@ -9,6 +10,8 @@ class _Storage(MutableMapping): """Define base storage backend class.""" + batch_manager = None + @abstractmethod def search_variations(self, ga4gh_accession_id: str, start: int, stop: int): """Find all registered variations in a provided genomic region @@ -31,3 +34,20 @@ def get_variation_count(self, variation_type: VariationStatisticType) -> int: @abstractmethod def wipe_db(self): """Empty database of all stored records.""" + + +class _BatchManager(AbstractContextManager): + """Base context management class for batch writing. + + Theoretically we could write batch methods without a context manager -- some DB + implementations have them natively -- but this makes it easier for us to ensure + transactions are properly handled. + """ + + @abstractmethod + def __init__(self, storage: _Storage): + """Initialize context manager. + + :param storage: Storage instance to manage. Should be taken from the active + AnyVar instance -- otherwise it won't be able to delay insertions. + """ diff --git a/src/anyvar/storage/postgres.py b/src/anyvar/storage/postgres.py index 6effa0f..92fbc1d 100644 --- a/src/anyvar/storage/postgres.py +++ b/src/anyvar/storage/postgres.py @@ -1,4 +1,5 @@ import json +import logging from typing import Any, Optional import psycopg @@ -8,24 +9,33 @@ from anyvar.restapi.schema import VariationStatisticType -from . import _Storage +from . import _BatchManager, _Storage silos = "locations alleles haplotypes genotypes variationsets relations texts".split() +_logger = logging.getLogger(__name__) + + class PostgresObjectStore(_Storage): """PostgreSQL storage backend. Currently, this is our recommended storage approach. """ - def __init__(self, db_url: str): + def __init__(self, db_url: str, batch_limit: int = 65536): """Initialize PostgreSQL DB handler. :param db_url: libpq connection info URL + :param batch_limit: max size of batch insert queue """ self.conn = psycopg.connect(db_url, autocommit=True) self.ensure_schema_exists() + self.batch_manager = PostgresBatchManager + self.batch_mode = False + self.batch_insert_values = [] + self.batch_limit = batch_limit + def _create_schema(self): """Add DB schema.""" create_statement = """ @@ -50,14 +60,23 @@ def __repr__(self): return str(self.conn) def __setitem__(self, name: str, value: Any): + """Add item to database. If batch mode is on, add item to queue and write only + if queue size is exceeded. + + :param name: value for `vrs_id` field + :param value: value for `vrs_object` field + """ assert is_pjs_instance(value), "ga4gh.vrs object value required" name = str(name) # in case str-like value_json = json.dumps(value.as_dict()) - with self.conn.cursor() as cur: - cur.execute( - "INSERT INTO vrs_objects (vrs_id, vrs_object) VALUES (%s, %s) ON CONFLICT DO NOTHING;", # noqa: E501 - [name, value_json] - ) + if self.batch_mode: + self.batch_insert_values.append((name, value_json)) + if len(self.batch_insert_values) > self.batch_limit: + self.copy_insert() + else: + insert_query = "INSERT INTO vrs_objects (vrs_id, vrs_object) VALUES (%s, %s) ON CONFLICT DO NOTHING;" # noqa: E501 + with self.conn.cursor() as cur: + cur.execute(insert_query, [name, value_json]) def __getitem__(self, name: str) -> Optional[Any]: """Fetch item from DB given key. @@ -208,9 +227,7 @@ def _insertion_count(self): def __iter__(self): with self.conn.cursor() as cur: - return cur.stream( - "SELECT * FROM vrs_objects;" - ) + return cur.stream("SELECT * FROM vrs_objects;") def keys(self): with self.conn.cursor() as cur: @@ -249,3 +266,75 @@ def wipe_db(self): """Remove all stored records from vrs_objects table.""" with self.conn.cursor() as cur: cur.execute("DELETE FROM vrs_objects;") + + def copy_insert(self): + """Perform copy-based insert, enabling much faster writes for large, repeated + insert statements, using insert parameters stored in `self.batch_insert_values`. + + Because we may be writing repeated records, we need to handle conflicts, which + isn't available for COPY. The workaround (https://stackoverflow.com/a/49836011) + is to make a temporary table for each COPY statement, and then handle + conflicts when moving data over from that table to vrs_objects. + """ + tmp_statement = "CREATE TEMP TABLE tmp_table (LIKE vrs_objects INCLUDING DEFAULTS);" # noqa: E501 + copy_statement = "COPY tmp_table (vrs_id, vrs_object) FROM STDIN;" + insert_statement = "INSERT INTO vrs_objects SELECT * FROM tmp_table ON CONFLICT DO NOTHING;" # noqa: E501 + drop_statement = "DROP TABLE tmp_table;" + with self.conn.cursor() as cur: + cur.execute(tmp_statement) + with cur.copy(copy_statement) as copy: + for row in self.batch_insert_values: + copy.write_row(row) + cur.execute(insert_statement) + cur.execute(drop_statement) + self.conn.commit() + self.batch_insert_values = [] + + +class PostgresBatchManager(_BatchManager): + """Context manager enabling batch insertion statements via Postgres COPY command. + + Use in cases like VCF ingest when intaking large amounts of data at once. + """ + + def __init__(self, storage: PostgresObjectStore): + """Initialize context manager. + + :param storage: Postgres instance to manage. Should be taken from the active + AnyVar instance -- otherwise it won't be able to delay insertions. + :raise ValueError: if `storage` param is not a `PostgresObjectStore` instance + """ + if not isinstance(storage, PostgresObjectStore): + raise ValueError( + "PostgresBatchManager requires a PostgresObjectStore instance" + ) + self._storage = storage + + def __enter__(self): + """Enter managed context.""" + self._storage.batch_insert_values = [] + self._storage.batch_mode = True + + def __exit__( + self, exc_type: Optional[type], exc_value: Optional[BaseException], + traceback: Optional[Any] + ) -> bool: + """Handle exit from context management. This method is responsible for + committing or rolling back any staged inserts. + + :param exc_type: type of exception encountered, if any + :param exc_value: exception value + :param traceback: traceback for context of exception + :return: True if no exceptions encountered, False otherwise + """ + if exc_type is not None: + self._storage.conn.rollback() + self._storage.batch_insert_values = [] + self._storage.batch_mode = False + _logger.error( + f"Postgres batch manager encountered exception {exc_type}: {exc_value}" + ) + return False + self._storage.copy_insert() + self._storage.batch_mode = False + return True