From 9fc1f05409f90e676c3fc1ed8879370ae6ed1cac Mon Sep 17 00:00:00 2001 From: Szymon Szyszkowski Date: Tue, 29 Oct 2024 13:10:55 +0000 Subject: [PATCH] refactor: reorder tasks in genetics_etl --- .../dags/config/genetics_etl.yaml | 23 ++- src/ot_orchestration/dags/genetics_etl.py | 183 +++++++++--------- 2 files changed, 99 insertions(+), 107 deletions(-) diff --git a/src/ot_orchestration/dags/config/genetics_etl.yaml b/src/ot_orchestration/dags/config/genetics_etl.yaml index c8ce3c2..86240a7 100644 --- a/src/ot_orchestration/dags/config/genetics_etl.yaml +++ b/src/ot_orchestration/dags/config/genetics_etl.yaml @@ -1,4 +1,3 @@ -gwas_catalog_manifests_path: gs://gwas_catalog_data/manifests l2g_gold_standard_path: gs://genetics_etl_python_playground/input/l2g/gold_standard/curation.json release_dir: gs://ot_orchestration/releases/24.10_freeze5 dataproc: @@ -34,7 +33,7 @@ nodes: - gs://finngen_data/r11/study_index step.target_index_path: gs://ot_orchestration/releases/24.10_freeze5/gene_index step.disease_index_path: gs://open-targets-pre-data-releases/24.06/output/etl/parquet/diseases - step.valid_study_index_path: &valid_study_index gs://ot_orchestration/releases/24.10_freeze5/study_index + step.valid_study_index_path: gs://ot_orchestration/releases/24.10_freeze5/study_index step.invalid_study_index_path: gs://ot_orchestration/releases/24.10_freeze5/invalid_study_index step.biosample_index_path: gs://ot_orchestration/releases/24.10_freeze5/biosample_index step.invalid_qc_reasons: @@ -51,7 +50,7 @@ nodes: - study_validation params: step: credible_set_validation - step.study_index_path: *valid_study_index + step.study_index_path: gs://ot_orchestration/releases/24.10_freeze5/study_index step.study_locus_path: - gs://gwas_catalog_top_hits/credible_sets/ - gs://gwas_catalog_sumstats_pics/credible_sets/ @@ -59,7 +58,7 @@ nodes: - gs://eqtl_catalogue_data/credible_set_datasets/eqtl_catalogue_susie/ - gs://ukb_ppp_eur_data/credible_set_clean/ - gs://finngen_data/r11/credible_set_datasets/susie/ - step.valid_study_locus_path: &valid_credible_set gs://ot_orchestration/releases/24.10_freeze5/credible_set + step.valid_study_locus_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set step.invalid_study_locus_path: gs://ot_orchestration/releases/24.10_freeze5/invalid_credible_set step.invalid_qc_reasons: - DUPLICATED_STUDYLOCUS_ID @@ -171,7 +170,7 @@ nodes: command: gs://genetics_etl_python_playground/initialisation/0.0.0/cli.py params: step: colocalisation - step.credible_set_path: *valid_credible_set + step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set step.coloc_path: gs://ot_orchestration/releases/24.10_freeze5/colocalisation step.colocalisation_method: ECaviar step.session.spark_uri: yarn @@ -181,7 +180,7 @@ nodes: command: gs://genetics_etl_python_playground/initialisation/0.0.0/cli.py params: step: colocalisation - step.credible_set_path: *valid_credible_set + step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set step.coloc_path: gs://ot_orchestration/releases/24.10_freeze5/colocalisation/ step.colocalisation_method: Coloc step.session.spark_uri: yarn @@ -192,10 +191,10 @@ nodes: command: gs://genetics_etl_python_playground/initialisation/0.0.0/cli.py params: step: locus_to_gene_feature_matrix - step.credible_set_path: *valid_credible_set + step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set step.variant_index_path: gs://ot_orchestration/releases/24.10_freeze5/variant_index step.colocalisation_path: gs://ot_orchestration/releases/24.10_freeze5/colocalisation - step.study_index_path: *valid_study_index + step.study_index_path: gs://ot_orchestration/releases/24.10_freeze5/study_index step.gene_index_path: gs://ot_orchestration/releases/24.10_freeze5/gene_index step.feature_matrix_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_feature_matrix +step.session.extended_spark_conf: "{spark.sql.autoBroadcastJoinThreshold:'-1'}" @@ -213,7 +212,7 @@ nodes: step.wandb_run_name: "24.10" step.hf_hub_repo_id: opentargets/locus_to_gene step.model_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_model/classifier.skops - step.credible_set_path: *valid_credible_set + step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set step.variant_index_path: gs://ot_orchestration/releases/24.10_freeze5/variant_index step.feature_matrix_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_feature_matrix step.gold_standard_curation_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_gold_standard.json @@ -233,7 +232,7 @@ nodes: step.run_mode: predict step.predictions_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_predictions step.feature_matrix_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_feature_matrix - step.credible_set_path: *valid_credible_set + step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set step.session.spark_uri: yarn prerequisites: - l2g_train @@ -243,8 +242,8 @@ nodes: step: locus_to_gene_evidence step.evidence_output_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_evidence step.locus_to_gene_predictions_path: gs://ot_orchestration/releases/24.10_freeze5/locus_to_gene_predictions - step.credible_set_path: *valid_credible_set - step.study_index_path: *valid_study_index + step.credible_set_path: gs://ot_orchestration/releases/24.10_freeze5/credible_set + step.study_index_path: gs://ot_orchestration/releases/24.10_freeze5/study_index step.locus_to_gene_threshold: 0.05 step.session.spark_uri: yarn prerequisites: diff --git a/src/ot_orchestration/dags/genetics_etl.py b/src/ot_orchestration/dags/genetics_etl.py index ae65849..96c64d0 100644 --- a/src/ot_orchestration/dags/genetics_etl.py +++ b/src/ot_orchestration/dags/genetics_etl.py @@ -5,7 +5,7 @@ import time from pathlib import Path -from airflow.decorators import task, task_group +from airflow.decorators import task from airflow.models.baseoperator import chain from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.cloud_batch import ( @@ -44,103 +44,13 @@ SOURCE_CONFIG_FILE_PATH = Path(__file__).parent / "config" / "genetics_etl.yaml" config = read_yaml_config(SOURCE_CONFIG_FILE_PATH) nodes = config["nodes"] -gwas_catalog_manifests_path = GCSPath(config["gwas_catalog_manifests_path"]) -l2g_gold_standard_path = GCSPath(config["l2g_gold_standard_path"]) -release_dir = GCSPath(config["release_dir"]) -variant_annotation_task_group_config = find_node_in_config(nodes, "variant_annotation") +node_map = {} # FIXME: eventually this task group should have 2 steps only # - 1. transform variant sources to vcf files, collect and partition them by chunk size - should be done by a single gentropy step rather then # multiple tasks in the DAG (pending) # - 2. list new chunk vcf files and annotate them - batch job -@task_group(group_id="variant_annotation") -def variant_annotation(): - """Variant annotation task group.""" - task_config = find_node_in_config( - variant_annotation_task_group_config["nodes"], "variant_to_vcf" - ) - - google_batch_config = task_config["google_batch"] - - commands = create_task_commands( - commands=google_batch_config["commands"], - params=task_config["params"], - ) - - task = create_task_spec( - image=google_batch_config["image"], - commands=commands, - resource_specs=google_batch_config["resource_specs"], - task_specs=google_batch_config["task_specs"], - entrypoint=google_batch_config["entrypoint"], - ) - - environment = google_batch_config["environment"] - batch_job = create_batch_job( - task=task, - task_env=create_task_env(environment), - policy_specs=google_batch_config["policy_specs"], - ) - variant_to_vcf = CloudBatchSubmitJobOperator( - task_id="variant_to_vcf", - project_id=GCP_PROJECT_GENETICS, - region=GCP_REGION, - job_name=f"variant-to-vcf-job-{time.strftime('%Y%m%d-%H%M%S')}", - job=batch_job, - deferrable=False, - ) - - task_config = find_node_in_config( - config=variant_annotation_task_group_config["nodes"], - node_id="list_nonannotated_vcfs", - )["params"] - - merged_vcfs = ConvertVariantsToVcfOperator( - task_id="list_nonannotated_vcfs", - tsv_files_glob=task_config["input_vcf_glob"], - output_path=task_config["output_path"], - chunk_size=task_config["chunk_size"], - ) - - task_config = find_node_in_config( - variant_annotation_task_group_config["nodes"], "vep_annotation" - ) - task_config_params = task_config["params"] - vep_annotation = VepAnnotateOperator( - task_id=task_config["id"], - vcf_input_path=task_config_params["vcf_input_path"], - vep_output_path=task_config_params["vep_output_path"], - vep_cache_path=task_config_params["vep_cache_path"], - google_batch=task_config["google_batch"], - ) - chain_dependencies( - nodes=variant_annotation_task_group_config["nodes"], - tasks_or_task_groups={ - "variant_to_vcf": variant_to_vcf, - "list_nonannotated_vcfs": merged_vcfs, - "vep_annotation": vep_annotation, - }, - ) - - -# Files to move: -DATA_TO_MOVE = { - # GWAS Catalog manifest files: - "gwas_catalog_manifests": { - "source_bucket": gwas_catalog_manifests_path.bucket, - "source_object": gwas_catalog_manifests_path.path, - "destination_bucket": release_dir.bucket, - "destination_object": f"{release_dir.path}/manifests/", - }, - # L2G gold standard: - "l2g_gold_standard": { - "source_bucket": l2g_gold_standard_path.bucket, - "source_object": l2g_gold_standard_path.path, - "destination_bucket": release_dir.bucket, - "destination_object": f"{release_dir.path}/locus_to_gene_gold_standard.json", - }, -} # This operator meant to fail the DAG if the release folder exists: @@ -153,6 +63,18 @@ def variant_annotation(): # Compiling tasks for moving data to the right place: with TaskGroup(group_id="data_transfer") as data_transfer: # Defining the tasks to execute in the task group: + l2g_gold_standard_path = GCSPath(config["l2g_gold_standard_path"]) + release_dir = GCSPath(config["release_dir"]) + # Files to move: + DATA_TO_MOVE = { + # L2G gold standard: + "l2g_gold_standard": { + "source_bucket": l2g_gold_standard_path.bucket, + "source_object": l2g_gold_standard_path.path, + "destination_bucket": release_dir.bucket, + "destination_object": f"{release_dir.path}/locus_to_gene_gold_standard.json", + }, + } [ GCSToGCSOperator( task_id=f"move_{data_name}", @@ -165,9 +87,80 @@ def variant_annotation(): ] with TaskGroup(group_id="genetics_etl") as genetics_etl: - # Register all TaskGroups as nodes - node_map = {"variant_annotation": variant_annotation()} - # Register all standalone dataproc tasks + # ===== Variant annotation task group ====== + with TaskGroup(group_id="variant_annotation") as variant_annotation: + variant_annotation_task_group_config = find_node_in_config( + nodes, "variant_annotation" + ) + + # ===== Convert variant datasource to vcf file step ====== + task_config = find_node_in_config( + variant_annotation_task_group_config["nodes"], + node_id="variant_to_vcf", + ) + google_batch_config = task_config["google_batch"] + commands = create_task_commands( + commands=google_batch_config["commands"], + params=task_config["params"], + ) + task = create_task_spec( + image=google_batch_config["image"], + commands=commands, + resource_specs=google_batch_config["resource_specs"], + task_specs=google_batch_config["task_specs"], + entrypoint=google_batch_config["entrypoint"], + ) + environment = google_batch_config["environment"] + batch_job = create_batch_job( + task=task, + task_env=create_task_env(environment), + policy_specs=google_batch_config["policy_specs"], + ) + variant_to_vcf = CloudBatchSubmitJobOperator( + task_id="variant_to_vcf", + project_id=GCP_PROJECT_GENETICS, + region=GCP_REGION, + job_name=f"variant-to-vcf-job-{time.strftime('%Y%m%d-%H%M%S')}", + job=batch_job, + deferrable=False, + ) + + # ===== Partition vcf files into chunks step ====== + task_config = find_node_in_config( + config=variant_annotation_task_group_config["nodes"], + node_id="list_nonannotated_vcfs", + ) + merged_vcfs = ConvertVariantsToVcfOperator( + task_id="list_nonannotated_vcfs", + tsv_files_glob=task_config["params"]["input_vcf_glob"], + output_path=task_config["params"]["output_path"], + chunk_size=task_config["params"]["chunk_size"], + ) + # ===== Perform VEP annotations on chunks step ===== + task_config = find_node_in_config( + variant_annotation_task_group_config["nodes"], + node_id="vep_annotation", + ) + task_config_params = task_config["params"] + vep_annotation = VepAnnotateOperator( + task_id=task_config["id"], + vcf_input_path=task_config_params["vcf_input_path"], + vep_output_path=task_config_params["vep_output_path"], + vep_cache_path=task_config_params["vep_cache_path"], + google_batch=task_config["google_batch"], + ) + + chain_dependencies( + nodes=variant_annotation_task_group_config["nodes"], + tasks_or_task_groups={ + "variant_to_vcf": variant_to_vcf, + "list_nonannotated_vcfs": merged_vcfs, + "vep_annotation": vep_annotation, + }, + ) + node_map["variant_annotation"] = variant_annotation + # ===== END Variant annotation task group ====== + tasks = [node for node in nodes if node.get("kind", "Task") == "Task"] # Build individual tasks and register them as nodes. for task in tasks: