diff --git a/config/datasets/gcp.yaml b/config/datasets/gcp.yaml index c198b4d96..f62329110 100644 --- a/config/datasets/gcp.yaml +++ b/config/datasets/gcp.yaml @@ -46,5 +46,7 @@ finngen_summary_stats: ${datasets.summary_statistics}/finngen from_sumstats_study_locus: ${datasets.study_locus}/from_sumstats from_sumstats_pics: ${datasets.credible_set}/from_sumstats ukbiobank_study_index: ${datasets.study_index}/ukbiobank +eqtl_catalogue_study_index_out: ${datasets.study_index}/eqtl_catalogue +eqtl_catalogue_summary_stats_out: ${datasets.outputs}/preprocess/eqtl_catalogue/summary_stats l2g_model: ${datasets.outputs}/l2g_model l2g_predictions: ${datasets.outputs}/l2g_predictions diff --git a/config/step/eqtl_catalogue.yaml b/config/step/eqtl_catalogue_ingestion.yaml similarity index 80% rename from config/step/eqtl_catalogue.yaml rename to config/step/eqtl_catalogue_ingestion.yaml index 04a958993..c36b9e286 100644 --- a/config/step/eqtl_catalogue.yaml +++ b/config/step/eqtl_catalogue_ingestion.yaml @@ -1,4 +1,4 @@ -_target_: otg.eqtl_catalogue.EqtlCatalogueStep +_target_: otg.eqtl_catalogue.EqtlCatalogueIngestionStep eqtl_catalogue_paths_imported: ${datasets.eqtl_catalogue_paths_imported} eqtl_catalogue_study_index_out: ${datasets.eqtl_catalogue_study_index_out} eqtl_catalogue_summary_stats_out: ${datasets.eqtl_catalogue_summary_stats_out} diff --git a/docs/python_api/step/eqtl_catalogue.md b/docs/python_api/step/eqtl_catalogue.md deleted file mode 100644 index e152c8ac8..000000000 --- a/docs/python_api/step/eqtl_catalogue.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -title: eQTL Catalogue ---- - -::: otg.eqtl_catalogue.EqtlCatalogueStep diff --git a/docs/python_api/step/eqtl_catalogue_ingestion.md b/docs/python_api/step/eqtl_catalogue_ingestion.md new file mode 100644 index 000000000..dfd6e3568 --- /dev/null +++ b/docs/python_api/step/eqtl_catalogue_ingestion.md @@ -0,0 +1,5 @@ +--- +title: eQTL Catalogue +--- + +::: otg.eqtl_catalogue_ingestion.EqtlCatalogueIngestionStep diff --git a/src/airflow/dags/common_airflow.py b/src/airflow/dags/common_airflow.py index 476487e5e..3e42a4906 100644 --- a/src/airflow/dags/common_airflow.py +++ b/src/airflow/dags/common_airflow.py @@ -19,7 +19,7 @@ from pathlib import Path # Code version. It has to be repeated here as well as in `pyproject.toml`, because Airflow isn't able to look at files outside of its `dags/` directory. -OTG_VERSION = "1.0.0" +OTG_VERSION = "0.0.0" # Cloud configuration. diff --git a/src/airflow/dags/dag_preprocess.py b/src/airflow/dags/dag_preprocess.py index f0dde085e..691a207df 100644 --- a/src/airflow/dags/dag_preprocess.py +++ b/src/airflow/dags/dag_preprocess.py @@ -10,7 +10,6 @@ ALL_STEPS = [ "finngen", - "eqtl_catalogue", "ld_index", "variant_annotation", "ukbiobank", diff --git a/src/airflow/dags/eqtl_catalogue_preprocess.py b/src/airflow/dags/eqtl_catalogue_preprocess.py new file mode 100644 index 000000000..efadcc45e --- /dev/null +++ b/src/airflow/dags/eqtl_catalogue_preprocess.py @@ -0,0 +1,70 @@ +"""Airflow DAG for the preprocessing of eQTL Catalogue's harmonised summary statistics and study table.""" +from __future__ import annotations + +from pathlib import Path + +import common_airflow as common +from airflow.models.dag import DAG +from airflow.utils.trigger_rule import TriggerRule + +CLUSTER_NAME = "otg-preprocess-eqtl-catalogue" +AUTOSCALING = "otg-etl" + +SUMSTATS = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX/preprocess/eqtl_catalogue/summary_stats" +RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX" + +with DAG( + dag_id=Path(__file__).stem, + description="Open Targets Genetics — eQTL Catalogue preprocess", + default_args=common.shared_dag_args, + **common.shared_dag_kwargs, +): + parse_study_and_sumstats = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="eqtl_catalogue_ingestion", + task_id="eqtl_catalogue_ingestion", + ) + summary_stats_window_clumping = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="clump", + task_id="sumstats_window_clumping", + other_args=[ + f"step.input_path={SUMSTATS}", + f"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/eqtl_catalogue", + ], + ) + ld_clumping = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="clump", + task_id="ld_clumping", + other_args=[ + "step.input_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/eqtl_catalogue", + "step.ld_index_path={RELEASEBUCKET}/ld_index", + "step.study_index_path={RELEASEBUCKET}/study_index/eqtl_catalogue", + "step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/eqtl_catalogue", + ], + trigger_rule=TriggerRule.ALL_DONE, + ) + + pics = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="pics", + task_id="pics", + other_args=[ + "step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/eqtl_catalogue", + "step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/from_sumstats_study_locus/eqtl_catalogue", + ], + trigger_rule=TriggerRule.ALL_DONE, + ) + + ( + common.create_cluster( + CLUSTER_NAME, autoscaling_policy=AUTOSCALING, num_workers=5 + ) + >> common.install_dependencies(CLUSTER_NAME) + >> parse_study_and_sumstats + >> summary_stats_window_clumping + >> ld_clumping + >> pics + >> common.delete_cluster(CLUSTER_NAME) + ) diff --git a/src/otg/eqtl_catalogue.py b/src/otg/eqtl_catalogue_ingestion.py similarity index 96% rename from src/otg/eqtl_catalogue.py rename to src/otg/eqtl_catalogue_ingestion.py index c57f95ed3..e03255676 100644 --- a/src/otg/eqtl_catalogue.py +++ b/src/otg/eqtl_catalogue_ingestion.py @@ -12,7 +12,7 @@ @dataclass -class EqtlCatalogueStep: +class EqtlCatalogueIngestionStep: """eQTL Catalogue ingestion step. Attributes: @@ -28,7 +28,7 @@ class EqtlCatalogueStep: eqtl_catalogue_study_index_out: str = MISSING eqtl_catalogue_summary_stats_out: str = MISSING - def __post_init__(self: EqtlCatalogueStep) -> None: + def __post_init__(self: EqtlCatalogueIngestionStep) -> None: """Run step.""" # Fetch study index. df = self.session.spark.read.option("delimiter", "\t").csv(