From 9b1666ea51ac2b62454f8e2e9603c5e7b0920536 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Irene=20L=C3=B3pez?= Date: Tue, 19 Dec 2023 10:58:58 +0000 Subject: [PATCH 1/5] feat(eqtl): add preprocessing dag --- src/airflow/dags/eqtl_catalogue_preprocess.py | 70 +++++++++++++++++++ ...talogue.py => eqtl_catalogue_ingestion.py} | 0 2 files changed, 70 insertions(+) create mode 100644 src/airflow/dags/eqtl_catalogue_preprocess.py rename src/otg/{eqtl_catalogue.py => eqtl_catalogue_ingestion.py} (100%) diff --git a/src/airflow/dags/eqtl_catalogue_preprocess.py b/src/airflow/dags/eqtl_catalogue_preprocess.py new file mode 100644 index 000000000..efadcc45e --- /dev/null +++ b/src/airflow/dags/eqtl_catalogue_preprocess.py @@ -0,0 +1,70 @@ +"""Airflow DAG for the preprocessing of eQTL Catalogue's harmonised summary statistics and study table.""" +from __future__ import annotations + +from pathlib import Path + +import common_airflow as common +from airflow.models.dag import DAG +from airflow.utils.trigger_rule import TriggerRule + +CLUSTER_NAME = "otg-preprocess-eqtl-catalogue" +AUTOSCALING = "otg-etl" + +SUMSTATS = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX/preprocess/eqtl_catalogue/summary_stats" +RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX" + +with DAG( + dag_id=Path(__file__).stem, + description="Open Targets Genetics — eQTL Catalogue preprocess", + default_args=common.shared_dag_args, + **common.shared_dag_kwargs, +): + parse_study_and_sumstats = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="eqtl_catalogue_ingestion", + task_id="eqtl_catalogue_ingestion", + ) + summary_stats_window_clumping = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="clump", + task_id="sumstats_window_clumping", + other_args=[ + f"step.input_path={SUMSTATS}", + f"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/eqtl_catalogue", + ], + ) + ld_clumping = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="clump", + task_id="ld_clumping", + other_args=[ + "step.input_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/eqtl_catalogue", + "step.ld_index_path={RELEASEBUCKET}/ld_index", + "step.study_index_path={RELEASEBUCKET}/study_index/eqtl_catalogue", + "step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/eqtl_catalogue", + ], + trigger_rule=TriggerRule.ALL_DONE, + ) + + pics = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="pics", + task_id="pics", + other_args=[ + "step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/eqtl_catalogue", + "step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/from_sumstats_study_locus/eqtl_catalogue", + ], + trigger_rule=TriggerRule.ALL_DONE, + ) + + ( + common.create_cluster( + CLUSTER_NAME, autoscaling_policy=AUTOSCALING, num_workers=5 + ) + >> common.install_dependencies(CLUSTER_NAME) + >> parse_study_and_sumstats + >> summary_stats_window_clumping + >> ld_clumping + >> pics + >> common.delete_cluster(CLUSTER_NAME) + ) diff --git a/src/otg/eqtl_catalogue.py b/src/otg/eqtl_catalogue_ingestion.py similarity index 100% rename from src/otg/eqtl_catalogue.py rename to src/otg/eqtl_catalogue_ingestion.py From 5dbebce1c7c3e33cb3c02e1b69c07c3d9411723b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Irene=20L=C3=B3pez?= Date: Tue, 19 Dec 2023 12:31:06 +0000 Subject: [PATCH 2/5] docs: propagate changes to docs --- docs/python_api/step/eqtl_catalogue.md | 5 ----- docs/python_api/step/eqtl_catalogue_ingestion.md | 5 +++++ src/otg/eqtl_catalogue_ingestion.py | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) delete mode 100644 docs/python_api/step/eqtl_catalogue.md create mode 100644 docs/python_api/step/eqtl_catalogue_ingestion.md diff --git a/docs/python_api/step/eqtl_catalogue.md b/docs/python_api/step/eqtl_catalogue.md deleted file mode 100644 index e152c8ac8..000000000 --- a/docs/python_api/step/eqtl_catalogue.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -title: eQTL Catalogue ---- - -::: otg.eqtl_catalogue.EqtlCatalogueStep diff --git a/docs/python_api/step/eqtl_catalogue_ingestion.md b/docs/python_api/step/eqtl_catalogue_ingestion.md new file mode 100644 index 000000000..dfd6e3568 --- /dev/null +++ b/docs/python_api/step/eqtl_catalogue_ingestion.md @@ -0,0 +1,5 @@ +--- +title: eQTL Catalogue +--- + +::: otg.eqtl_catalogue_ingestion.EqtlCatalogueIngestionStep diff --git a/src/otg/eqtl_catalogue_ingestion.py b/src/otg/eqtl_catalogue_ingestion.py index c57f95ed3..e03255676 100644 --- a/src/otg/eqtl_catalogue_ingestion.py +++ b/src/otg/eqtl_catalogue_ingestion.py @@ -12,7 +12,7 @@ @dataclass -class EqtlCatalogueStep: +class EqtlCatalogueIngestionStep: """eQTL Catalogue ingestion step. Attributes: @@ -28,7 +28,7 @@ class EqtlCatalogueStep: eqtl_catalogue_study_index_out: str = MISSING eqtl_catalogue_summary_stats_out: str = MISSING - def __post_init__(self: EqtlCatalogueStep) -> None: + def __post_init__(self: EqtlCatalogueIngestionStep) -> None: """Run step.""" # Fetch study index. df = self.session.spark.read.option("delimiter", "\t").csv( From 84510ac11977050d9f2dfc8d99b54f54f332bae8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Irene=20L=C3=B3pez?= Date: Tue, 19 Dec 2023 12:40:50 +0000 Subject: [PATCH 3/5] chore: add eqtl configs --- config/datasets/gcp.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/config/datasets/gcp.yaml b/config/datasets/gcp.yaml index c198b4d96..f62329110 100644 --- a/config/datasets/gcp.yaml +++ b/config/datasets/gcp.yaml @@ -46,5 +46,7 @@ finngen_summary_stats: ${datasets.summary_statistics}/finngen from_sumstats_study_locus: ${datasets.study_locus}/from_sumstats from_sumstats_pics: ${datasets.credible_set}/from_sumstats ukbiobank_study_index: ${datasets.study_index}/ukbiobank +eqtl_catalogue_study_index_out: ${datasets.study_index}/eqtl_catalogue +eqtl_catalogue_summary_stats_out: ${datasets.outputs}/preprocess/eqtl_catalogue/summary_stats l2g_model: ${datasets.outputs}/l2g_model l2g_predictions: ${datasets.outputs}/l2g_predictions From f1b5cd59e60a9065c5f96fe11f5c305821f59af2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Irene=20L=C3=B3pez?= Date: Wed, 20 Dec 2023 11:36:24 +0100 Subject: [PATCH 4/5] chore(airflow): update `otg_version` --- src/airflow/dags/common_airflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/airflow/dags/common_airflow.py b/src/airflow/dags/common_airflow.py index 476487e5e..3e42a4906 100644 --- a/src/airflow/dags/common_airflow.py +++ b/src/airflow/dags/common_airflow.py @@ -19,7 +19,7 @@ from pathlib import Path # Code version. It has to be repeated here as well as in `pyproject.toml`, because Airflow isn't able to look at files outside of its `dags/` directory. -OTG_VERSION = "1.0.0" +OTG_VERSION = "0.0.0" # Cloud configuration. From 66ac91cd08e5051fab61d8e5dcbaa699aaaf52e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Irene=20L=C3=B3pez?= Date: Wed, 20 Dec 2023 12:47:55 +0100 Subject: [PATCH 5/5] chore: remove `eqtl_catalogue` from `dag_preprocess` --- .../step/{eqtl_catalogue.yaml => eqtl_catalogue_ingestion.yaml} | 2 +- src/airflow/dags/dag_preprocess.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) rename config/step/{eqtl_catalogue.yaml => eqtl_catalogue_ingestion.yaml} (80%) diff --git a/config/step/eqtl_catalogue.yaml b/config/step/eqtl_catalogue_ingestion.yaml similarity index 80% rename from config/step/eqtl_catalogue.yaml rename to config/step/eqtl_catalogue_ingestion.yaml index 04a958993..c36b9e286 100644 --- a/config/step/eqtl_catalogue.yaml +++ b/config/step/eqtl_catalogue_ingestion.yaml @@ -1,4 +1,4 @@ -_target_: otg.eqtl_catalogue.EqtlCatalogueStep +_target_: otg.eqtl_catalogue.EqtlCatalogueIngestionStep eqtl_catalogue_paths_imported: ${datasets.eqtl_catalogue_paths_imported} eqtl_catalogue_study_index_out: ${datasets.eqtl_catalogue_study_index_out} eqtl_catalogue_summary_stats_out: ${datasets.eqtl_catalogue_summary_stats_out} diff --git a/src/airflow/dags/dag_preprocess.py b/src/airflow/dags/dag_preprocess.py index f0dde085e..691a207df 100644 --- a/src/airflow/dags/dag_preprocess.py +++ b/src/airflow/dags/dag_preprocess.py @@ -10,7 +10,6 @@ ALL_STEPS = [ "finngen", - "eqtl_catalogue", "ld_index", "variant_annotation", "ukbiobank",