feat(eqtl): add preprocessing dag #366

Closed · wants to merge 6 commits
2 changes: 2 additions & 0 deletions config/datasets/gcp.yaml
@@ -46,5 +46,7 @@ finngen_summary_stats: ${datasets.summary_statistics}/finngen
from_sumstats_study_locus: ${datasets.study_locus}/from_sumstats
from_sumstats_pics: ${datasets.credible_set}/from_sumstats
ukbiobank_study_index: ${datasets.study_index}/ukbiobank
eqtl_catalogue_study_index_out: ${datasets.study_index}/eqtl_catalogue
eqtl_catalogue_summary_stats_out: ${datasets.outputs}/preprocess/eqtl_catalogue/summary_stats
l2g_model: ${datasets.outputs}/l2g_model
l2g_predictions: ${datasets.outputs}/l2g_predictions
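The two new entries use the same OmegaConf-style interpolation as the surrounding keys. A minimal sketch of how they resolve, assuming `datasets.outputs` and `datasets.study_index` are defined elsewhere in gcp.yaml (the bucket values below are placeholders, not taken from the config):

```python
# Minimal sketch of how the new entries resolve; the parent values are
# illustrative placeholders, not the repository's real configuration.
from omegaconf import OmegaConf

cfg = OmegaConf.create(
    {
        "datasets": {
            "outputs": "gs://bucket/outputs",
            "study_index": "gs://bucket/outputs/study_index",
            "eqtl_catalogue_study_index_out": "${datasets.study_index}/eqtl_catalogue",
            "eqtl_catalogue_summary_stats_out": "${datasets.outputs}/preprocess/eqtl_catalogue/summary_stats",
        }
    }
)

# Interpolations resolve on access.
print(cfg.datasets.eqtl_catalogue_study_index_out)
# gs://bucket/outputs/study_index/eqtl_catalogue
print(cfg.datasets.eqtl_catalogue_summary_stats_out)
# gs://bucket/outputs/preprocess/eqtl_catalogue/summary_stats
```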
@@ -1,4 +1,4 @@
_target_: otg.eqtl_catalogue.EqtlCatalogueStep
_target_: otg.eqtl_catalogue.EqtlCatalogueIngestionStep
eqtl_catalogue_paths_imported: ${datasets.eqtl_catalogue_paths_imported}
eqtl_catalogue_study_index_out: ${datasets.eqtl_catalogue_study_index_out}
eqtl_catalogue_summary_stats_out: ${datasets.eqtl_catalogue_summary_stats_out}
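Because the step is wired through Hydra (`_target_` plus `MISSING` defaults), renaming the class means the config has to point at the new class path. A hedged sketch of how such a config is typically instantiated; the GCS paths are placeholders, and the real step also expects its Spark `session`:

```python
# Hedged sketch, not part of this PR: turning a `_target_` config like the
# one above into the step object with Hydra. Paths are placeholders.
from hydra.utils import instantiate
from omegaconf import OmegaConf

step_cfg = OmegaConf.create(
    {
        "_target_": "otg.eqtl_catalogue.EqtlCatalogueIngestionStep",
        "eqtl_catalogue_paths_imported": "gs://bucket/input/eqtl_catalogue_paths.tsv",
        "eqtl_catalogue_study_index_out": "gs://bucket/study_index/eqtl_catalogue",
        "eqtl_catalogue_summary_stats_out": "gs://bucket/preprocess/eqtl_catalogue/summary_stats",
    }
)

# `instantiate` imports the class named in `_target_` and calls it with the
# remaining keys as keyword arguments; since these dataclass steps run their
# logic in `__post_init__`, this call executes the ingestion.
step = instantiate(step_cfg)
```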
5 changes: 0 additions & 5 deletions docs/python_api/step/eqtl_catalogue.md

This file was deleted.

5 changes: 5 additions & 0 deletions docs/python_api/step/eqtl_catalogue_ingestion.md
@@ -0,0 +1,5 @@
---
title: eQTL Catalogue
---

::: otg.eqtl_catalogue_ingestion.EqtlCatalogueIngestionStep
2 changes: 1 addition & 1 deletion src/airflow/dags/common_airflow.py
@@ -19,7 +19,7 @@
from pathlib import Path

# Code version. It has to be repeated here as well as in `pyproject.toml`, because Airflow isn't able to look at files outside of its `dags/` directory.
OTG_VERSION = "1.0.0"
OTG_VERSION = "0.0.0"


# Cloud configuration.
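The comment above explains why the version string is duplicated: Airflow only parses files under `dags/`, so it cannot read pyproject.toml. A hypothetical guard, not included in this PR, could keep the two copies in sync; it assumes the dags directory is importable and that the version lives under `[tool.poetry]`:

```python
# Hypothetical sync check, not part of this PR. Assumes `src/airflow/dags`
# is on PYTHONPATH and that pyproject.toml declares the version under
# [tool.poetry]; adjust the key if the project uses [project] metadata.
import tomllib  # Python 3.11+; use the `tomli` backport on older versions
from pathlib import Path

from common_airflow import OTG_VERSION


def test_otg_version_matches_pyproject() -> None:
    """The DAG-side OTG_VERSION must match pyproject.toml."""
    pyproject = tomllib.loads(Path("pyproject.toml").read_text())
    assert pyproject["tool"]["poetry"]["version"] == OTG_VERSION
```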
1 change: 0 additions & 1 deletion src/airflow/dags/dag_preprocess.py
@@ -10,7 +10,6 @@

ALL_STEPS = [
    "finngen",
    "eqtl_catalogue",
    "ld_index",
    "variant_annotation",
    "ukbiobank",
70 changes: 70 additions & 0 deletions src/airflow/dags/eqtl_catalogue_preprocess.py
@@ -0,0 +1,70 @@
"""Airflow DAG for the preprocessing of eQTL Catalogue's harmonised summary statistics and study table."""
from __future__ import annotations

from pathlib import Path

import common_airflow as common
from airflow.models.dag import DAG
from airflow.utils.trigger_rule import TriggerRule

CLUSTER_NAME = "otg-preprocess-eqtl-catalogue"
AUTOSCALING = "otg-etl"

SUMSTATS = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX/preprocess/eqtl_catalogue/summary_stats"
RELEASEBUCKET = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX"

with DAG(
    dag_id=Path(__file__).stem,
    description="Open Targets Genetics — eQTL Catalogue preprocess",
    default_args=common.shared_dag_args,
    **common.shared_dag_kwargs,
):
    # Parse the eQTL Catalogue study table and harmonised summary statistics.
    parse_study_and_sumstats = common.submit_step(
        cluster_name=CLUSTER_NAME,
        step_id="eqtl_catalogue_ingestion",
        task_id="eqtl_catalogue_ingestion",
    )
    # Window-based clumping of the ingested summary statistics.
    summary_stats_window_clumping = common.submit_step(
        cluster_name=CLUSTER_NAME,
        step_id="clump",
        task_id="sumstats_window_clumping",
        other_args=[
            f"step.input_path={SUMSTATS}",
            f"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/eqtl_catalogue",
        ],
    )
    # LD-based clumping of the window-clumped study loci.
    ld_clumping = common.submit_step(
        cluster_name=CLUSTER_NAME,
        step_id="clump",
        task_id="ld_clumping",
        other_args=[
            f"step.input_path={RELEASEBUCKET}/study_locus/window_clumped/from_sumstats/eqtl_catalogue",
            f"step.ld_index_path={RELEASEBUCKET}/ld_index",
            f"step.study_index_path={RELEASEBUCKET}/study_index/eqtl_catalogue",
            f"step.clumped_study_locus_path={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/eqtl_catalogue",
        ],
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # PICS fine-mapping of the LD-clumped loci into credible sets.
    pics = common.submit_step(
        cluster_name=CLUSTER_NAME,
        step_id="pics",
        task_id="pics",
        other_args=[
            f"step.study_locus_ld_annotated_in={RELEASEBUCKET}/study_locus/ld_clumped/from_sumstats/eqtl_catalogue",
            f"step.picsed_study_locus_out={RELEASEBUCKET}/credible_set/from_sumstats_study_locus/eqtl_catalogue",
        ],
        trigger_rule=TriggerRule.ALL_DONE,
    )

    (
        common.create_cluster(
            CLUSTER_NAME, autoscaling_policy=AUTOSCALING, num_workers=5
        )
        >> common.install_dependencies(CLUSTER_NAME)
        >> parse_study_and_sumstats
        >> summary_stats_window_clumping
        >> ld_clumping
        >> pics
        >> common.delete_cluster(CLUSTER_NAME)
    )
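A quick, hedged way to sanity-check the wiring above before deploying: load the DAG from the dags folder, confirm the file imports cleanly, and verify the four tasks defined here are present (the cluster-management tasks come from common_airflow):

```python
# Hedged sketch, not part of this PR: load the new DAG locally and inspect
# its task graph. The dag_id is the file stem, eqtl_catalogue_preprocess.
from airflow.models import DagBag

dag_bag = DagBag(dag_folder="src/airflow/dags", include_examples=False)
assert not dag_bag.import_errors, dag_bag.import_errors

dag = dag_bag.get_dag("eqtl_catalogue_preprocess")
for task_id in ("eqtl_catalogue_ingestion", "sumstats_window_clumping", "ld_clumping", "pics"):
    assert task_id in dag.task_dict

# Print each task with its downstream dependencies to verify the chain.
for task in dag.tasks:
    print(task.task_id, "->", sorted(task.downstream_task_ids))
```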
@@ -12,7 +12,7 @@


@dataclass
class EqtlCatalogueStep:
class EqtlCatalogueIngestionStep:
"""eQTL Catalogue ingestion step.

Attributes:
@@ -28,7 +28,7 @@ class EqtlCatalogueStep:
    eqtl_catalogue_study_index_out: str = MISSING
    eqtl_catalogue_summary_stats_out: str = MISSING

    def __post_init__(self: EqtlCatalogueStep) -> None:
    def __post_init__(self: EqtlCatalogueIngestionStep) -> None:
        """Run step."""
        # Fetch study index.
        df = self.session.spark.read.option("delimiter", "\t").csv(
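The fragment above reads the study table with Spark's CSV reader and a tab delimiter. A small, self-contained sketch of the same read pattern, using an illustrative local path rather than the real input:

```python
# Self-contained sketch of the tab-separated read pattern used by the step.
# The file path and schema are illustrative only.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("tsv-read-demo").getOrCreate()

df = (
    spark.read.option("delimiter", "\t")
    .option("header", "true")
    .csv("/tmp/eqtl_catalogue_paths.tsv")
)
df.show(5, truncate=False)
```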