Skip to content

Commit

Permalink
refactor: reorder tasks in genetics_etl (#62)
Browse files Browse the repository at this point in the history
Co-authored-by: Szymon Szyszkowski <ss60@mib117351s.internal.sanger.ac.uk>
  • Loading branch information
project-defiant and Szymon Szyszkowski authored Oct 29, 2024
1 parent a8f1221 commit 41c73b6
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 107 deletions.
23 changes: 11 additions & 12 deletions src/ot_orchestration/dags/config/genetics_etl.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
gwas_catalog_manifests_path: gs://gwas_catalog_data/manifests
l2g_gold_standard_path: gs://genetics_etl_python_playground/input/l2g/gold_standard/curation.json
release_dir: gs://ot_orchestration/releases/24.10_freeze5
dataproc:
Expand Down Expand Up @@ -34,7 +33,7 @@ nodes:
- gs://finngen_data/r11/study_index
step.target_index_path: gs://ot_orchestration/releases/24.10_freeze5/gene_index
step.disease_index_path: gs://open-targets-pre-data-releases/24.06/output/etl/parquet/diseases
step.valid_study_index_path: &valid_study_index gs://ot_orchestration/releases/24.10_freeze5/study_index
step.valid_study_index_path: gs://ot_orchestration/releases/24.10_freeze5/study_index
step.invalid_study_index_path: gs://ot_orchestration/releases/24.10_freeze5/invalid_study_index
step.biosample_index_path: gs://ot_orchestration/releases/24.10_freeze5/biosample_index
step.invalid_qc_reasons:
Expand All @@ -51,15 +50,15 @@ nodes:
- study_validation
params:
step: credible_set_validation
step.study_index_path: *valid_study_index
step.study_index_path: gs://ot_orchestration/releases/24.10_freeze5/study_index
step.study_locus_path:
- gs://gwas_catalog_top_hits/credible_sets/
- gs://gwas_catalog_sumstats_pics/credible_sets/
- gs://gwas_catalog_sumstats_susie/credible_set_clean/
- gs://eqtl_catalogue_data/credible_set_datasets/eqtl_catalogue_susie/
- gs://ukb_ppp_eur_data/credible_set_clean/
- gs://finngen_data/r11/credible_set_datasets/susie/
step.valid_study_locus_path: &valid_credible_set gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.valid_study_locus_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.invalid_study_locus_path: gs://ot_orchestration/releases/24.10_freeze5/invalid_credible_set
step.invalid_qc_reasons:
- DUPLICATED_STUDYLOCUS_ID
Expand Down Expand Up @@ -171,7 +170,7 @@ nodes:
command: gs://genetics_etl_python_playground/initialisation/0.0.0/cli.py
params:
step: colocalisation
step.credible_set_path: *valid_credible_set
step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.coloc_path: gs://ot_orchestration/releases/24.10_freeze5/colocalisation
step.colocalisation_method: ECaviar
step.session.spark_uri: yarn
Expand All @@ -181,7 +180,7 @@ nodes:
command: gs://genetics_etl_python_playground/initialisation/0.0.0/cli.py
params:
step: colocalisation
step.credible_set_path: *valid_credible_set
step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.coloc_path: gs://ot_orchestration/releases/24.10_freeze5/colocalisation/
step.colocalisation_method: Coloc
step.session.spark_uri: yarn
Expand All @@ -192,10 +191,10 @@ nodes:
command: gs://genetics_etl_python_playground/initialisation/0.0.0/cli.py
params:
step: locus_to_gene_feature_matrix
step.credible_set_path: *valid_credible_set
step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.variant_index_path: gs://ot_orchestration/releases/24.10_freeze5/variant_index
step.colocalisation_path: gs://ot_orchestration/releases/24.10_freeze5/colocalisation
step.study_index_path: *valid_study_index
step.study_index_path: gs://ot_orchestration/releases/24.10_freeze5/study_index
step.gene_index_path: gs://ot_orchestration/releases/24.10_freeze5/gene_index
step.feature_matrix_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_feature_matrix
+step.session.extended_spark_conf: "{spark.sql.autoBroadcastJoinThreshold:'-1'}"
Expand All @@ -213,7 +212,7 @@ nodes:
step.wandb_run_name: "24.10"
step.hf_hub_repo_id: opentargets/locus_to_gene
step.model_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_model/classifier.skops
step.credible_set_path: *valid_credible_set
step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.variant_index_path: gs://ot_orchestration/releases/24.10_freeze5/variant_index
step.feature_matrix_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_feature_matrix
step.gold_standard_curation_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_gold_standard.json
Expand All @@ -233,7 +232,7 @@ nodes:
step.run_mode: predict
step.predictions_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_predictions
step.feature_matrix_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_feature_matrix
step.credible_set_path: *valid_credible_set
step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.session.spark_uri: yarn
prerequisites:
- l2g_train
Expand All @@ -243,8 +242,8 @@ nodes:
step: locus_to_gene_evidence
step.evidence_output_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_evidence
step.locus_to_gene_predictions_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_predictions
step.credible_set_path: *valid_credible_set
step.study_index_path: *valid_study_index
step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set
step.study_index_path: gs://ot_orchestration/releases/24.10_freeze5/study_index
step.locus_to_gene_threshold: 0.05
step.session.spark_uri: yarn
prerequisites:
Expand Down
183 changes: 88 additions & 95 deletions src/ot_orchestration/dags/genetics_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time
from pathlib import Path

from airflow.decorators import task, task_group
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_batch import (
Expand Down Expand Up @@ -44,103 +44,13 @@
SOURCE_CONFIG_FILE_PATH = Path(__file__).parent / "config" / "genetics_etl.yaml"
config = read_yaml_config(SOURCE_CONFIG_FILE_PATH)
nodes = config["nodes"]
gwas_catalog_manifests_path = GCSPath(config["gwas_catalog_manifests_path"])
l2g_gold_standard_path = GCSPath(config["l2g_gold_standard_path"])
release_dir = GCSPath(config["release_dir"])
variant_annotation_task_group_config = find_node_in_config(nodes, "variant_annotation")
node_map = {}


# FIXME: eventually this task group should have 2 steps only
# - 1. transform variant sources to vcf files, collect and partition them by chunk size - should be done by a single gentropy step rather then
# multiple tasks in the DAG (pending)
# - 2. list new chunk vcf files and annotate them - batch job
# FIXME: eventually this task group should have 2 steps only
# - 1. transform variant sources to vcf files, collect and partition them by chunk size - should be done by a single gentropy step rather then
# multiple tasks in the DAG (pending)
# - 2. list new chunk vcf files and annotate them - batch job
@task_group(group_id="variant_annotation")
def variant_annotation():
    """Variant annotation task group.

    Builds three tasks from the ``variant_annotation`` node of the YAML config:

    1. ``variant_to_vcf`` -- Google Batch job converting variant sources to VCF.
    2. ``list_nonannotated_vcfs`` -- collects VCF files and partitions them by chunk size.
    3. ``vep_annotation`` -- VEP annotation of the chunked VCF files (Google Batch).

    Task ordering is wired by ``chain_dependencies`` from the config node list.
    """
    # --- 1. variant_to_vcf: build the Google Batch job spec from config. ---
    vcf_node_config = find_node_in_config(
        variant_annotation_task_group_config["nodes"], "variant_to_vcf"
    )
    google_batch_config = vcf_node_config["google_batch"]

    commands = create_task_commands(
        commands=google_batch_config["commands"],
        params=vcf_node_config["params"],
    )

    # NOTE: named `task_spec` (not `task`) to avoid shadowing the airflow
    # `task` decorator imported at module level.
    task_spec = create_task_spec(
        image=google_batch_config["image"],
        commands=commands,
        resource_specs=google_batch_config["resource_specs"],
        task_specs=google_batch_config["task_specs"],
        entrypoint=google_batch_config["entrypoint"],
    )

    batch_job = create_batch_job(
        task=task_spec,
        task_env=create_task_env(google_batch_config["environment"]),
        policy_specs=google_batch_config["policy_specs"],
    )
    variant_to_vcf = CloudBatchSubmitJobOperator(
        task_id="variant_to_vcf",
        project_id=GCP_PROJECT_GENETICS,
        region=GCP_REGION,
        # Timestamp suffix keeps Batch job names unique across DAG runs.
        job_name=f"variant-to-vcf-job-{time.strftime('%Y%m%d-%H%M%S')}",
        job=batch_job,
        deferrable=False,
    )

    # --- 2. list_nonannotated_vcfs: partition VCF files into chunks. ---
    merge_params = find_node_in_config(
        config=variant_annotation_task_group_config["nodes"],
        node_id="list_nonannotated_vcfs",
    )["params"]

    merged_vcfs = ConvertVariantsToVcfOperator(
        task_id="list_nonannotated_vcfs",
        tsv_files_glob=merge_params["input_vcf_glob"],
        output_path=merge_params["output_path"],
        chunk_size=merge_params["chunk_size"],
    )

    # --- 3. vep_annotation: annotate the chunked VCF files with VEP. ---
    vep_node_config = find_node_in_config(
        variant_annotation_task_group_config["nodes"], "vep_annotation"
    )
    vep_params = vep_node_config["params"]
    vep_annotation = VepAnnotateOperator(
        task_id=vep_node_config["id"],
        vcf_input_path=vep_params["vcf_input_path"],
        vep_output_path=vep_params["vep_output_path"],
        vep_cache_path=vep_params["vep_cache_path"],
        google_batch=vep_node_config["google_batch"],
    )

    # Wire task ordering; presumably derived from the config nodes'
    # dependency metadata -- confirm in chain_dependencies.
    chain_dependencies(
        nodes=variant_annotation_task_group_config["nodes"],
        tasks_or_task_groups={
            "variant_to_vcf": variant_to_vcf,
            "list_nonannotated_vcfs": merged_vcfs,
            "vep_annotation": vep_annotation,
        },
    )


# Files to move:
# Maps a short dataset name to source/destination GCS bucket+object pairs;
# presumably consumed by per-dataset copy tasks (e.g. GCSToGCSOperator) --
# confirm against the data_transfer task group.
DATA_TO_MOVE = {
    # GWAS Catalog manifest files: copied under <release_dir>/manifests/.
    "gwas_catalog_manifests": {
        "source_bucket": gwas_catalog_manifests_path.bucket,
        "source_object": gwas_catalog_manifests_path.path,
        "destination_bucket": release_dir.bucket,
        "destination_object": f"{release_dir.path}/manifests/",
    },
    # L2G gold standard: single curation JSON copied into the release dir.
    "l2g_gold_standard": {
        "source_bucket": l2g_gold_standard_path.bucket,
        "source_object": l2g_gold_standard_path.path,
        "destination_bucket": release_dir.bucket,
        "destination_object": f"{release_dir.path}/locus_to_gene_gold_standard.json",
    },
}


# This operator meant to fail the DAG if the release folder exists:
Expand All @@ -153,6 +63,18 @@ def variant_annotation():
# Compiling tasks for moving data to the right place:
with TaskGroup(group_id="data_transfer") as data_transfer:
# Defining the tasks to execute in the task group:
l2g_gold_standard_path = GCSPath(config["l2g_gold_standard_path"])
release_dir = GCSPath(config["release_dir"])
# Files to move:
DATA_TO_MOVE = {
# L2G gold standard:
"l2g_gold_standard": {
"source_bucket": l2g_gold_standard_path.bucket,
"source_object": l2g_gold_standard_path.path,
"destination_bucket": release_dir.bucket,
"destination_object": f"{release_dir.path}/locus_to_gene_gold_standard.json",
},
}
[
GCSToGCSOperator(
task_id=f"move_{data_name}",
Expand All @@ -165,9 +87,80 @@ def variant_annotation():
]

with TaskGroup(group_id="genetics_etl") as genetics_etl:
# Register all TaskGroups as nodes
node_map = {"variant_annotation": variant_annotation()}
# Register all standalone dataproc tasks
# ===== Variant annotation task group ======
with TaskGroup(group_id="variant_annotation") as variant_annotation:
variant_annotation_task_group_config = find_node_in_config(
nodes, "variant_annotation"
)

# ===== Convert variant datasource to vcf file step ======
task_config = find_node_in_config(
variant_annotation_task_group_config["nodes"],
node_id="variant_to_vcf",
)
google_batch_config = task_config["google_batch"]
commands = create_task_commands(
commands=google_batch_config["commands"],
params=task_config["params"],
)
task = create_task_spec(
image=google_batch_config["image"],
commands=commands,
resource_specs=google_batch_config["resource_specs"],
task_specs=google_batch_config["task_specs"],
entrypoint=google_batch_config["entrypoint"],
)
environment = google_batch_config["environment"]
batch_job = create_batch_job(
task=task,
task_env=create_task_env(environment),
policy_specs=google_batch_config["policy_specs"],
)
variant_to_vcf = CloudBatchSubmitJobOperator(
task_id="variant_to_vcf",
project_id=GCP_PROJECT_GENETICS,
region=GCP_REGION,
job_name=f"variant-to-vcf-job-{time.strftime('%Y%m%d-%H%M%S')}",
job=batch_job,
deferrable=False,
)

# ===== Partition vcf files into chunks step ======
task_config = find_node_in_config(
config=variant_annotation_task_group_config["nodes"],
node_id="list_nonannotated_vcfs",
)
merged_vcfs = ConvertVariantsToVcfOperator(
task_id="list_nonannotated_vcfs",
tsv_files_glob=task_config["params"]["input_vcf_glob"],
output_path=task_config["params"]["output_path"],
chunk_size=task_config["params"]["chunk_size"],
)
# ===== Perform VEP annotations on chunks step =====
task_config = find_node_in_config(
variant_annotation_task_group_config["nodes"],
node_id="vep_annotation",
)
task_config_params = task_config["params"]
vep_annotation = VepAnnotateOperator(
task_id=task_config["id"],
vcf_input_path=task_config_params["vcf_input_path"],
vep_output_path=task_config_params["vep_output_path"],
vep_cache_path=task_config_params["vep_cache_path"],
google_batch=task_config["google_batch"],
)

chain_dependencies(
nodes=variant_annotation_task_group_config["nodes"],
tasks_or_task_groups={
"variant_to_vcf": variant_to_vcf,
"list_nonannotated_vcfs": merged_vcfs,
"vep_annotation": vep_annotation,
},
)
node_map["variant_annotation"] = variant_annotation
# ===== END Variant annotation task group ======

tasks = [node for node in nodes if node.get("kind", "Task") == "Task"]
# Build individual tasks and register them as nodes.
for task in tasks:
Expand Down

0 comments on commit 41c73b6

Please sign in to comment.