diff --git a/config/datasets/gcp.yaml b/config/datasets/gcp.yaml index 00af8d8e8..9955ea0d2 100644 --- a/config/datasets/gcp.yaml +++ b/config/datasets/gcp.yaml @@ -37,7 +37,7 @@ colocalisation: ${datasets.outputs}/colocalisation v2g: ${datasets.outputs}/v2g ld_index: ${datasets.outputs}/ld_index catalog_study_index: ${datasets.outputs}/catalog_study_index -catalog_study_locus: ${datasets.study_locus}/catalog_study_locus +catalog_study_locus: ${datasets.study_locus}/catalog_curated finngen_study_index: ${datasets.outputs}/finngen_study_index finngen_summary_stats: ${datasets.outputs}/finngen_summary_stats from_sumstats_study_locus: ${datasets.study_locus}/from_sumstats diff --git a/config/step/gwas_catalog.yaml b/config/step/gwas_catalog_ingestion.yaml similarity index 92% rename from config/step/gwas_catalog.yaml rename to config/step/gwas_catalog_ingestion.yaml index ea357eb72..863c75ac2 100644 --- a/config/step/gwas_catalog.yaml +++ b/config/step/gwas_catalog_ingestion.yaml @@ -4,6 +4,5 @@ catalog_ancestry_files: ${datasets.catalog_ancestries} catalog_associations_file: ${datasets.catalog_associations} catalog_sumstats_lut: ${datasets.catalog_sumstats_lut} variant_annotation_path: ${datasets.variant_annotation} -ld_index_path: ${datasets.ld_index} catalog_studies_out: ${datasets.catalog_study_index} catalog_associations_out: ${datasets.catalog_study_locus} diff --git a/docs/python_api/step/gwas_catalog.md b/docs/python_api/step/gwas_catalog.md deleted file mode 100644 index aa20adfaa..000000000 --- a/docs/python_api/step/gwas_catalog.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -title: GWAS Catalog ---- - -::: otg.gwas_catalog.GWASCatalogStep diff --git a/docs/python_api/step/gwas_catalog_ingestion.md b/docs/python_api/step/gwas_catalog_ingestion.md new file mode 100644 index 000000000..ecac99f8c --- /dev/null +++ b/docs/python_api/step/gwas_catalog_ingestion.md @@ -0,0 +1,5 @@ +--- +title: GWAS Catalog +--- + +::: otg.gwas_catalog_ingestion.GWASCatalogIngestionStep diff --git a/src/airflow/dags/common_airflow.py b/src/airflow/dags/common_airflow.py index 7fde7ebe4..4b5250450 100644 --- a/src/airflow/dags/common_airflow.py +++ b/src/airflow/dags/common_airflow.py @@ -219,7 +219,7 @@ def submit_step( task_id = step_id return submit_pyspark_job( cluster_name=cluster_name, - task_id=step_id, + task_id=task_id, python_module_path=f"{INITIALISATION_BASE_PATH}/{PYTHON_CLI}", trigger_rule=trigger_rule, args=[f"step={step_id}"] diff --git a/src/airflow/dags/gwas_catalog_preprocess.py b/src/airflow/dags/gwas_catalog_preprocess.py new file mode 100644 index 000000000..e21dcd9bd --- /dev/null +++ b/src/airflow/dags/gwas_catalog_preprocess.py @@ -0,0 +1,96 @@ +"""Airflow DAG for the preprocessing of GWAS Catalog's harmonised summary statistics and curated associations.""" +from __future__ import annotations + +from pathlib import Path + +import common_airflow as common +from airflow.models.dag import DAG +from airflow.utils.task_group import TaskGroup +from airflow.utils.trigger_rule import TriggerRule + +CLUSTER_NAME = "otg-preprocess-gwascatalog" +AUTOSCALING = "otg-preprocess-gwascatalog" + +SUMSTATS = "gs://open-targets-gwas-summary-stats/harmonised" +RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX" + +with DAG( + dag_id=Path(__file__).stem, + description="Open Targets Genetics — GWAS Catalog preprocess", + default_args=common.shared_dag_args, + **common.shared_dag_kwargs, +): + with TaskGroup(group_id="summary_stats_preprocessing") as summary_stats_group: + summary_stats_window_clumping = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="clump", + task_id="catalog_sumstats_window_clumping", + other_args=[ + f"step.input_path={SUMSTATS}", + f"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog", + ], + ) + summary_stats_ld_clumping = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="clump", + task_id="catalog_sumstats_ld_clumping", + other_args=[ + f"step.input_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog", + "step.ld_index_path={RELEASEBUCKET}/ld_index", + "step.study_index_path={RELEASEBUCKET}/study_index/catalog", + "step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog", + ], + trigger_rule=TriggerRule.ALL_DONE, + ) + summary_stats_pics = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="pics", + task_id="catalog_sumstats_pics", + other_args=[ + "step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog", + "step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/from_sumstats/catalog", + ], + trigger_rule=TriggerRule.ALL_DONE, + ) + summary_stats_window_clumping >> summary_stats_ld_clumping >> summary_stats_pics + + with TaskGroup(group_id="curation_preprocessing") as curation_group: + parse_study_and_curated_assocs = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="gwas_catalog_ingestion", + task_id="catalog_ingestion", + ) + + curation_ld_clumping = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="clump", + task_id="catalog_curation_ld_clumping", + other_args=[ + "step.input_path={RELEASEBUCKET}/study_locus/catalog_curated", + "step.ld_index_path={RELEASEBUCKET}/ld_index", + "step.study_index_path={RELEASEBUCKET}/study_index/catalog", + "step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated", + ], + trigger_rule=TriggerRule.ALL_DONE, + ) + + curation_pics = common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="pics", + task_id="catalog_curation_pics", + other_args=[ + "step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated", + "step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/catalog_curated", + ], + trigger_rule=TriggerRule.ALL_DONE, + ) + parse_study_and_curated_assocs >> curation_ld_clumping >> curation_pics + + ( + common.create_cluster( + CLUSTER_NAME, autoscaling_policy=AUTOSCALING, num_workers=5 + ) + >> common.install_dependencies(CLUSTER_NAME) + >> [summary_stats_group, curation_group] + >> common.delete_cluster(CLUSTER_NAME) + ) diff --git a/src/otg/gwas_catalog.py b/src/otg/gwas_catalog_ingestion.py similarity index 79% rename from src/otg/gwas_catalog.py rename to src/otg/gwas_catalog_ingestion.py index 0422e3613..bea0bbed2 100644 --- a/src/otg/gwas_catalog.py +++ b/src/otg/gwas_catalog_ingestion.py @@ -1,4 +1,4 @@ -"""Step to process GWAS Catalog associations.""" +"""Step to process GWAS Catalog associations and study table.""" from __future__ import annotations from dataclasses import dataclass @@ -6,19 +6,16 @@ from omegaconf import MISSING from otg.common.session import Session -from otg.dataset.ld_index import LDIndex from otg.dataset.variant_annotation import VariantAnnotation from otg.datasource.gwas_catalog.associations import ( GWASCatalogCuratedAssociationsParser, ) from otg.datasource.gwas_catalog.study_index import StudyIndexGWASCatalogParser from otg.datasource.gwas_catalog.study_splitter import GWASCatalogStudySplitter -from otg.method.ld import LDAnnotator -from otg.method.pics import PICS @dataclass -class GWASCatalogStep: +class GWASCatalogIngestionStep: """GWAS Catalog ingestion step to extract GWASCatalog Study and StudyLocus tables. !!!note This step currently only processes the GWAS Catalog curated list of top hits. @@ -31,7 +28,6 @@ class GWASCatalogStep: catalog_associations_file (str): Raw GWAS catalog associations file. variant_annotation_path (str): Input variant annotation path. ld_populations (list): List of populations to include. - min_r2 (float): Minimum r2 to consider when considering variants within a window. catalog_studies_out (str): Output GWAS catalog studies path. catalog_associations_out (str): Output GWAS catalog associations path. """ @@ -42,12 +38,10 @@ class GWASCatalogStep: catalog_sumstats_lut: str = MISSING catalog_associations_file: str = MISSING variant_annotation_path: str = MISSING - ld_index_path: str = MISSING - min_r2: float = 0.5 catalog_studies_out: str = MISSING catalog_associations_out: str = MISSING - def __post_init__(self: GWASCatalogStep) -> None: + def __post_init__(self: GWASCatalogIngestionStep) -> None: """Run step.""" # Extract va = VariantAnnotation.from_parquet(self.session, self.variant_annotation_path) @@ -63,7 +57,6 @@ def __post_init__(self: GWASCatalogStep) -> None: catalog_associations = self.session.spark.read.csv( self.catalog_associations_file, sep="\t", header=True ).persist() - ld_index = LDIndex.from_parquet(self.session, self.ld_index_path) # Transform study_index, study_locus = GWASCatalogStudySplitter.split( @@ -72,15 +65,11 @@ def __post_init__(self: GWASCatalogStep) -> None: ), GWASCatalogCuratedAssociationsParser.from_source(catalog_associations, va), ) - study_locus_ld = LDAnnotator.ld_annotate( - study_locus, study_index, ld_index - ).clump() - finemapped_study_locus = PICS.finemap(study_locus_ld).annotate_credible_sets() # Load study_index.df.write.mode(self.session.write_mode).parquet( self.catalog_studies_out ) - finemapped_study_locus.df.write.mode(self.session.write_mode).parquet( + study_locus.df.write.mode(self.session.write_mode).parquet( self.catalog_associations_out )