refactor: reorder tasks in genetics_etl #62

Merged: 1 commit (Oct 29, 2024)
src/ot_orchestration/dags/config/genetics_etl.yaml (23 changes: 11 additions & 12 deletions)
@@ -1,4 +1,3 @@
-gwas_catalog_manifests_path: gs://gwas_catalog_data/manifests
l2g_gold_standard_path: gs://genetics_etl_python_playground/input/l2g/gold_standard/curation.json
release_dir: gs://ot_orchestration/releases/24.10_freeze5
dataproc:
@@ -34,7 +33,7 @@ nodes:
- gs://finngen_data/r11/study_index
step.target_index_path: gs://ot_orchestration/releases/24.10_freeze5/gene_index
step.disease_index_path: gs://open-targets-pre-data-releases/24.06/output/etl/parquet/diseases
-step.valid_study_index_path: &valid_study_index gs://ot_orchestration/releases/24.10_freeze5/study_index
+step.valid_study_index_path: gs://ot_orchestration/releases/24.10_freeze5/study_index
step.invalid_study_index_path: gs://ot_orchestration/releases/24.10_freeze5/invalid_study_index
step.biosample_index_path: gs://ot_orchestration/releases/24.10_freeze5/biosample_index
step.invalid_qc_reasons:
@@ -51,15 +50,15 @@
- study_validation
params:
step: credible_set_validation
-step.study_index_path: *valid_study_index
+step.study_index_path: gs://ot_orchestration/releases/24.10_freeze5/study_index
step.study_locus_path:
- gs://gwas_catalog_top_hits/credible_sets/
- gs://gwas_catalog_sumstats_pics/credible_sets/
- gs://gwas_catalog_sumstats_susie/credible_set_clean/
- gs://eqtl_catalogue_data/credible_set_datasets/eqtl_catalogue_susie/
- gs://ukb_ppp_eur_data/credible_set_clean/
- gs://finngen_data/r11/credible_set_datasets/susie/
-step.valid_study_locus_path: &valid_credible_set gs://ot_orchestration/releases/24.10_freeze5/credible_set
+step.valid_study_locus_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.invalid_study_locus_path: gs://ot_orchestration/releases/24.10_freeze5/invalid_credible_set
step.invalid_qc_reasons:
- DUPLICATED_STUDYLOCUS_ID
@@ -171,7 +170,7 @@ nodes:
command: gs://genetics_etl_python_playground/initialisation/0.0.0/cli.py
params:
step: colocalisation
-step.credible_set_path: *valid_credible_set
+step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.coloc_path: gs://ot_orchestration/releases/24.10_freeze5/colocalisation
step.colocalisation_method: ECaviar
step.session.spark_uri: yarn
@@ -181,7 +180,7 @@
command: gs://genetics_etl_python_playground/initialisation/0.0.0/cli.py
params:
step: colocalisation
-step.credible_set_path: *valid_credible_set
+step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.coloc_path: gs://ot_orchestration/releases/24.10_freeze5/colocalisation/
step.colocalisation_method: Coloc
step.session.spark_uri: yarn
@@ -192,10 +191,10 @@
command: gs://genetics_etl_python_playground/initialisation/0.0.0/cli.py
params:
step: locus_to_gene_feature_matrix
-step.credible_set_path: *valid_credible_set
+step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.variant_index_path: gs://ot_orchestration/releases/24.10_freeze5/variant_index
step.colocalisation_path: gs://ot_orchestration/releases/24.10_freeze5/colocalisation
-step.study_index_path: *valid_study_index
+step.study_index_path: gs://ot_orchestration/releases/24.10_freeze5/study_index
step.gene_index_path: gs://ot_orchestration/releases/24.10_freeze5/gene_index
step.feature_matrix_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_feature_matrix
+step.session.extended_spark_conf: "{spark.sql.autoBroadcastJoinThreshold:'-1'}"
@@ -213,7 +212,7 @@
step.wandb_run_name: "24.10"
step.hf_hub_repo_id: opentargets/locus_to_gene
step.model_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_model/classifier.skops
-step.credible_set_path: *valid_credible_set
+step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.variant_index_path: gs://ot_orchestration/releases/24.10_freeze5/variant_index
step.feature_matrix_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_feature_matrix
step.gold_standard_curation_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_gold_standard.json
@@ -233,7 +232,7 @@
step.run_mode: predict
step.predictions_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_predictions
step.feature_matrix_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_feature_matrix
-step.credible_set_path: *valid_credible_set
+step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.session.spark_uri: yarn
prerequisites:
- l2g_train
@@ -243,8 +242,8 @@
step: locus_to_gene_evidence
step.evidence_output_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_evidence
step.locus_to_gene_predictions_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_predictions
-step.credible_set_path: *valid_credible_set
-step.study_index_path: *valid_study_index
+step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
+step.study_index_path: gs://ot_orchestration/releases/24.10_freeze5/study_index
step.locus_to_gene_threshold: 0.05
step.session.spark_uri: yarn
prerequisites:
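The config change above is almost entirely mechanical: the `&valid_study_index` and `&valid_credible_set` anchors and their `*` aliases are replaced with the literal `gs://` release paths at every use site. YAML aliases are resolved when the document is parsed, so the loaded config is identical; the inlined form just makes each node's params readable in isolation, at the cost of repetition. A minimal sketch of the anchor semantics, assuming PyYAML (the loader behind `read_yaml_config` is not shown in this PR):

```python
# Minimal sketch, assuming PyYAML: `&name` anchors a value, `*name` aliases
# it, and aliases are resolved at parse time, so spelling out the literal
# path at every use site (as this PR does) yields an identical parsed config.
import yaml

doc = """
step.valid_study_index_path: &valid_study_index gs://ot_orchestration/releases/24.10_freeze5/study_index
step.study_index_path: *valid_study_index
"""
parsed = yaml.safe_load(doc)
assert (
    parsed["step.study_index_path"]
    == "gs://ot_orchestration/releases/24.10_freeze5/study_index"
)
```

The one functional addition in this file is `step.session.extended_spark_conf: "{spark.sql.autoBroadcastJoinThreshold:'-1'}"` on the feature-matrix step: in Spark, setting `spark.sql.autoBroadcastJoinThreshold` to `-1` disables automatic broadcast joins, which avoids out-of-memory failures when a join side is too large to broadcast.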
src/ot_orchestration/dags/genetics_etl.py (183 changes: 88 additions & 95 deletions)
@@ -5,7 +5,7 @@
import time
from pathlib import Path

-from airflow.decorators import task, task_group
+from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_batch import (
@@ -44,103 +44,13 @@
SOURCE_CONFIG_FILE_PATH = Path(__file__).parent / "config" / "genetics_etl.yaml"
config = read_yaml_config(SOURCE_CONFIG_FILE_PATH)
nodes = config["nodes"]
-gwas_catalog_manifests_path = GCSPath(config["gwas_catalog_manifests_path"])
-l2g_gold_standard_path = GCSPath(config["l2g_gold_standard_path"])
-release_dir = GCSPath(config["release_dir"])
-variant_annotation_task_group_config = find_node_in_config(nodes, "variant_annotation")
node_map = {}


-# FIXME: eventually this task group should have 2 steps only
-# - 1. transform variant sources to vcf files, collect and partition them by chunk size - should be done by a single gentropy step rather than
-# multiple tasks in the DAG (pending)
-# - 2. list new chunk vcf files and annotate them - batch job
-@task_group(group_id="variant_annotation")
-def variant_annotation():
-    """Variant annotation task group."""
-    task_config = find_node_in_config(
-        variant_annotation_task_group_config["nodes"], "variant_to_vcf"
-    )
-
-    google_batch_config = task_config["google_batch"]
-
-    commands = create_task_commands(
-        commands=google_batch_config["commands"],
-        params=task_config["params"],
-    )
-
-    task = create_task_spec(
-        image=google_batch_config["image"],
-        commands=commands,
-        resource_specs=google_batch_config["resource_specs"],
-        task_specs=google_batch_config["task_specs"],
-        entrypoint=google_batch_config["entrypoint"],
-    )
-
-    environment = google_batch_config["environment"]
-    batch_job = create_batch_job(
-        task=task,
-        task_env=create_task_env(environment),
-        policy_specs=google_batch_config["policy_specs"],
-    )
-    variant_to_vcf = CloudBatchSubmitJobOperator(
-        task_id="variant_to_vcf",
-        project_id=GCP_PROJECT_GENETICS,
-        region=GCP_REGION,
-        job_name=f"variant-to-vcf-job-{time.strftime('%Y%m%d-%H%M%S')}",
-        job=batch_job,
-        deferrable=False,
-    )
-
-    task_config = find_node_in_config(
-        config=variant_annotation_task_group_config["nodes"],
-        node_id="list_nonannotated_vcfs",
-    )["params"]
-
-    merged_vcfs = ConvertVariantsToVcfOperator(
-        task_id="list_nonannotated_vcfs",
-        tsv_files_glob=task_config["input_vcf_glob"],
-        output_path=task_config["output_path"],
-        chunk_size=task_config["chunk_size"],
-    )
-
-    task_config = find_node_in_config(
-        variant_annotation_task_group_config["nodes"], "vep_annotation"
-    )
-    task_config_params = task_config["params"]
-    vep_annotation = VepAnnotateOperator(
-        task_id=task_config["id"],
-        vcf_input_path=task_config_params["vcf_input_path"],
-        vep_output_path=task_config_params["vep_output_path"],
-        vep_cache_path=task_config_params["vep_cache_path"],
-        google_batch=task_config["google_batch"],
-    )
-    chain_dependencies(
-        nodes=variant_annotation_task_group_config["nodes"],
-        tasks_or_task_groups={
-            "variant_to_vcf": variant_to_vcf,
-            "list_nonannotated_vcfs": merged_vcfs,
-            "vep_annotation": vep_annotation,
-        },
-    )


-# Files to move:
-DATA_TO_MOVE = {
-    # GWAS Catalog manifest files:
-    "gwas_catalog_manifests": {
-        "source_bucket": gwas_catalog_manifests_path.bucket,
-        "source_object": gwas_catalog_manifests_path.path,
-        "destination_bucket": release_dir.bucket,
-        "destination_object": f"{release_dir.path}/manifests/",
-    },
-    # L2G gold standard:
-    "l2g_gold_standard": {
-        "source_bucket": l2g_gold_standard_path.bucket,
-        "source_object": l2g_gold_standard_path.path,
-        "destination_bucket": release_dir.bucket,
-        "destination_object": f"{release_dir.path}/locus_to_gene_gold_standard.json",
-    },
-}


# This operator is meant to fail the DAG if the release folder exists:
@@ -153,6 +63,18 @@ def variant_annotation():
# Compiling tasks for moving data to the right place:
with TaskGroup(group_id="data_transfer") as data_transfer:
    # Defining the tasks to execute in the task group:
+    l2g_gold_standard_path = GCSPath(config["l2g_gold_standard_path"])
+    release_dir = GCSPath(config["release_dir"])
+    # Files to move:
+    DATA_TO_MOVE = {
+        # L2G gold standard:
+        "l2g_gold_standard": {
+            "source_bucket": l2g_gold_standard_path.bucket,
+            "source_object": l2g_gold_standard_path.path,
+            "destination_bucket": release_dir.bucket,
+            "destination_object": f"{release_dir.path}/locus_to_gene_gold_standard.json",
+        },
+    }
    [
        GCSToGCSOperator(
            task_id=f"move_{data_name}",
@@ -165,9 +87,80 @@
    ]

with TaskGroup(group_id="genetics_etl") as genetics_etl:
-    # Register all TaskGroups as nodes
-    node_map = {"variant_annotation": variant_annotation()}
-    # Register all standalone dataproc tasks
+    # ===== Variant annotation task group ======
+    with TaskGroup(group_id="variant_annotation") as variant_annotation:
+        variant_annotation_task_group_config = find_node_in_config(
+            nodes, "variant_annotation"
+        )
+
+        # ===== Convert variant datasource to vcf file step ======
+        task_config = find_node_in_config(
+            variant_annotation_task_group_config["nodes"],
+            node_id="variant_to_vcf",
+        )
+        google_batch_config = task_config["google_batch"]
+        commands = create_task_commands(
+            commands=google_batch_config["commands"],
+            params=task_config["params"],
+        )
+        task = create_task_spec(
+            image=google_batch_config["image"],
+            commands=commands,
+            resource_specs=google_batch_config["resource_specs"],
+            task_specs=google_batch_config["task_specs"],
+            entrypoint=google_batch_config["entrypoint"],
+        )
+        environment = google_batch_config["environment"]
+        batch_job = create_batch_job(
+            task=task,
+            task_env=create_task_env(environment),
+            policy_specs=google_batch_config["policy_specs"],
+        )
+        variant_to_vcf = CloudBatchSubmitJobOperator(
+            task_id="variant_to_vcf",
+            project_id=GCP_PROJECT_GENETICS,
+            region=GCP_REGION,
+            job_name=f"variant-to-vcf-job-{time.strftime('%Y%m%d-%H%M%S')}",
+            job=batch_job,
+            deferrable=False,
+        )
+
+        # ===== Partition vcf files into chunks step ======
+        task_config = find_node_in_config(
+            config=variant_annotation_task_group_config["nodes"],
+            node_id="list_nonannotated_vcfs",
+        )
+        merged_vcfs = ConvertVariantsToVcfOperator(
+            task_id="list_nonannotated_vcfs",
+            tsv_files_glob=task_config["params"]["input_vcf_glob"],
+            output_path=task_config["params"]["output_path"],
+            chunk_size=task_config["params"]["chunk_size"],
+        )
+        # ===== Perform VEP annotations on chunks step =====
+        task_config = find_node_in_config(
+            variant_annotation_task_group_config["nodes"],
+            node_id="vep_annotation",
+        )
+        task_config_params = task_config["params"]
+        vep_annotation = VepAnnotateOperator(
+            task_id=task_config["id"],
+            vcf_input_path=task_config_params["vcf_input_path"],
+            vep_output_path=task_config_params["vep_output_path"],
+            vep_cache_path=task_config_params["vep_cache_path"],
+            google_batch=task_config["google_batch"],
+        )
+
+        chain_dependencies(
+            nodes=variant_annotation_task_group_config["nodes"],
+            tasks_or_task_groups={
+                "variant_to_vcf": variant_to_vcf,
+                "list_nonannotated_vcfs": merged_vcfs,
+                "vep_annotation": vep_annotation,
+            },
+        )
+    node_map["variant_annotation"] = variant_annotation
+    # ===== END Variant annotation task group ======

    tasks = [node for node in nodes if node.get("kind", "Task") == "Task"]
    # Build individual tasks and register them as nodes.
    for task in tasks:
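On the Python side, the refactor dissolves the module-level `@task_group`-decorated `variant_annotation()` factory into a `with TaskGroup(...)` block nested directly inside the `genetics_etl` group, so the variant-annotation tasks are now defined inline, in the same order in which the DAG wires them. A minimal sketch of the two equivalent Airflow styles (the DAG id, schedule, and `EmptyOperator` tasks are illustrative, not from this repo):

```python
# Minimal sketch of Airflow's two task-group styles; only the group and task
# ids mirror this DAG, everything else is illustrative.
import pendulum
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id="task_group_styles",
    start_date=pendulum.datetime(2024, 10, 29),
    schedule=None,
):
    # Old style (removed by this PR): a decorated factory defined at module
    # level and instantiated with a call, e.g.
    #
    #     @task_group(group_id="variant_annotation")
    #     def variant_annotation(): ...
    #
    #     node_map = {"variant_annotation": variant_annotation()}
    #
    # New style (added by this PR): the group is built inline, so its tasks
    # live in the same lexical scope and file order as the surrounding DAG.
    with TaskGroup(group_id="genetics_etl") as genetics_etl:
        with TaskGroup(group_id="variant_annotation") as variant_annotation:
            variant_to_vcf = EmptyOperator(task_id="variant_to_vcf")
            vep_annotation = EmptyOperator(task_id="vep_annotation")
            variant_to_vcf >> vep_annotation
```

Both styles render the same nested group in the Airflow UI; the inline form is presumably the "reorder" of the PR title, since group construction now happens at the point in the file where the group is chained to the rest of the pipeline.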