Skip to content

Commit

Permalink
feat: add gwas_catalog_preprocess dag (#291)
Browse files Browse the repository at this point in the history
* feat: gwas_catalog step stops at ingestion

* feat: gwas_catalog step stops at ingestion

* feat: add gwas_catalog_preprocess dag

* fix: change step_id to task_id as task_id

* feat: group gwas_catalog_preprocess tasks into sumstats and curation groups

* fix: add all dependencies when ld clumping

* fix: update gwas catalog docs

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* refactor(dag): extract releasebucket and sumstats paths as constants

* refactor: streamline study locus paths
  • Loading branch information
ireneisdoomed authored Dec 12, 2023
1 parent ee73572 commit 56ea0a9
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 23 deletions.
2 changes: 1 addition & 1 deletion config/datasets/gcp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ colocalisation: ${datasets.outputs}/colocalisation
v2g: ${datasets.outputs}/v2g
ld_index: ${datasets.outputs}/ld_index
catalog_study_index: ${datasets.outputs}/catalog_study_index
catalog_study_locus: ${datasets.study_locus}/catalog_study_locus
catalog_study_locus: ${datasets.study_locus}/catalog_curated
finngen_study_index: ${datasets.outputs}/finngen_study_index
finngen_summary_stats: ${datasets.outputs}/finngen_summary_stats
from_sumstats_study_locus: ${datasets.study_locus}/from_sumstats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@ catalog_ancestry_files: ${datasets.catalog_ancestries}
catalog_associations_file: ${datasets.catalog_associations}
catalog_sumstats_lut: ${datasets.catalog_sumstats_lut}
variant_annotation_path: ${datasets.variant_annotation}
ld_index_path: ${datasets.ld_index}
catalog_studies_out: ${datasets.catalog_study_index}
catalog_associations_out: ${datasets.catalog_study_locus}
5 changes: 0 additions & 5 deletions docs/python_api/step/gwas_catalog.md

This file was deleted.

5 changes: 5 additions & 0 deletions docs/python_api/step/gwas_catalog_ingestion.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
title: GWAS Catalog
---

::: otg.gwas_catalog_ingestion.GWASCatalogIngestionStep
2 changes: 1 addition & 1 deletion src/airflow/dags/common_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def submit_step(
task_id = step_id
return submit_pyspark_job(
cluster_name=cluster_name,
task_id=step_id,
task_id=task_id,
python_module_path=f"{INITIALISATION_BASE_PATH}/{PYTHON_CLI}",
trigger_rule=trigger_rule,
args=[f"step={step_id}"]
Expand Down
96 changes: 96 additions & 0 deletions src/airflow/dags/gwas_catalog_preprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""Airflow DAG for the preprocessing of GWAS Catalog's harmonised summary statistics and curated associations."""
from __future__ import annotations

from pathlib import Path

import common_airflow as common
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

CLUSTER_NAME = "otg-preprocess-gwascatalog"
AUTOSCALING = "otg-preprocess-gwascatalog"

SUMSTATS = "gs://open-targets-gwas-summary-stats/harmonised"
RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX"

with DAG(
dag_id=Path(__file__).stem,
description="Open Targets Genetics — GWAS Catalog preprocess",
default_args=common.shared_dag_args,
**common.shared_dag_kwargs,
):
with TaskGroup(group_id="summary_stats_preprocessing") as summary_stats_group:
summary_stats_window_clumping = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="clump",
task_id="catalog_sumstats_window_clumping",
other_args=[
f"step.input_path={SUMSTATS}",
f"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog",
],
)
summary_stats_ld_clumping = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="clump",
task_id="catalog_sumstats_ld_clumping",
other_args=[
f"step.input_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/catalog",
"step.ld_index_path={RELEASEBUCKET}/ld_index",
"step.study_index_path={RELEASEBUCKET}/study_index/catalog",
"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog",
],
trigger_rule=TriggerRule.ALL_DONE,
)
summary_stats_pics = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="pics",
task_id="catalog_sumstats_pics",
other_args=[
"step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/catalog",
"step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/from_sumstats/catalog",
],
trigger_rule=TriggerRule.ALL_DONE,
)
summary_stats_window_clumping >> summary_stats_ld_clumping >> summary_stats_pics

with TaskGroup(group_id="curation_preprocessing") as curation_group:
parse_study_and_curated_assocs = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="gwas_catalog_ingestion",
task_id="catalog_ingestion",
)

curation_ld_clumping = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="clump",
task_id="catalog_curation_ld_clumping",
other_args=[
"step.input_path={RELEASEBUCKET}/study_locus/catalog_curated",
"step.ld_index_path={RELEASEBUCKET}/ld_index",
"step.study_index_path={RELEASEBUCKET}/study_index/catalog",
"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated",
],
trigger_rule=TriggerRule.ALL_DONE,
)

curation_pics = common.submit_step(
cluster_name=CLUSTER_NAME,
step_id="pics",
task_id="catalog_curation_pics",
other_args=[
"step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/catalog_curated",
"step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/catalog_curated",
],
trigger_rule=TriggerRule.ALL_DONE,
)
parse_study_and_curated_assocs >> curation_ld_clumping >> curation_pics

(
common.create_cluster(
CLUSTER_NAME, autoscaling_policy=AUTOSCALING, num_workers=5
)
>> common.install_dependencies(CLUSTER_NAME)
>> [summary_stats_group, curation_group]
>> common.delete_cluster(CLUSTER_NAME)
)
19 changes: 4 additions & 15 deletions src/otg/gwas_catalog.py → src/otg/gwas_catalog_ingestion.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
"""Step to process GWAS Catalog associations."""
"""Step to process GWAS Catalog associations and study table."""
from __future__ import annotations

from dataclasses import dataclass

from omegaconf import MISSING

from otg.common.session import Session
from otg.dataset.ld_index import LDIndex
from otg.dataset.variant_annotation import VariantAnnotation
from otg.datasource.gwas_catalog.associations import (
GWASCatalogCuratedAssociationsParser,
)
from otg.datasource.gwas_catalog.study_index import StudyIndexGWASCatalogParser
from otg.datasource.gwas_catalog.study_splitter import GWASCatalogStudySplitter
from otg.method.ld import LDAnnotator
from otg.method.pics import PICS


@dataclass
class GWASCatalogStep:
class GWASCatalogIngestionStep:
"""GWAS Catalog ingestion step to extract GWASCatalog Study and StudyLocus tables.
!!!note This step currently only processes the GWAS Catalog curated list of top hits.
Expand All @@ -31,7 +28,6 @@ class GWASCatalogStep:
catalog_associations_file (str): Raw GWAS catalog associations file.
variant_annotation_path (str): Input variant annotation path.
ld_populations (list): List of populations to include.
min_r2 (float): Minimum r2 to consider when considering variants within a window.
catalog_studies_out (str): Output GWAS catalog studies path.
catalog_associations_out (str): Output GWAS catalog associations path.
"""
Expand All @@ -42,12 +38,10 @@ class GWASCatalogStep:
catalog_sumstats_lut: str = MISSING
catalog_associations_file: str = MISSING
variant_annotation_path: str = MISSING
ld_index_path: str = MISSING
min_r2: float = 0.5
catalog_studies_out: str = MISSING
catalog_associations_out: str = MISSING

def __post_init__(self: GWASCatalogStep) -> None:
def __post_init__(self: GWASCatalogIngestionStep) -> None:
"""Run step."""
# Extract
va = VariantAnnotation.from_parquet(self.session, self.variant_annotation_path)
Expand All @@ -63,7 +57,6 @@ def __post_init__(self: GWASCatalogStep) -> None:
catalog_associations = self.session.spark.read.csv(
self.catalog_associations_file, sep="\t", header=True
).persist()
ld_index = LDIndex.from_parquet(self.session, self.ld_index_path)

# Transform
study_index, study_locus = GWASCatalogStudySplitter.split(
Expand All @@ -72,15 +65,11 @@ def __post_init__(self: GWASCatalogStep) -> None:
),
GWASCatalogCuratedAssociationsParser.from_source(catalog_associations, va),
)
study_locus_ld = LDAnnotator.ld_annotate(
study_locus, study_index, ld_index
).clump()
finemapped_study_locus = PICS.finemap(study_locus_ld).annotate_credible_sets()

# Load
study_index.df.write.mode(self.session.write_mode).parquet(
self.catalog_studies_out
)
finemapped_study_locus.df.write.mode(self.session.write_mode).parquet(
study_locus.df.write.mode(self.session.write_mode).parquet(
self.catalog_associations_out
)

0 comments on commit 56ea0a9

Please sign in to comment.