From 6a2189fd5a0e1e754a6b982cae40786a3535893e Mon Sep 17 00:00:00 2001
From: Szymon Szyszkowski
Date: Tue, 1 Oct 2024 15:59:09 +0100
Subject: [PATCH] feat: allow force reinstalling dependencies on cluster set up

---
 src/ot_orchestration/utils/dataproc.py | 30 ++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git a/src/ot_orchestration/utils/dataproc.py b/src/ot_orchestration/utils/dataproc.py
index f376446..4930de2 100644
--- a/src/ot_orchestration/utils/dataproc.py
+++ b/src/ot_orchestration/utils/dataproc.py
@@ -22,6 +22,7 @@
     GCP_REGION,
     GCP_ZONE,
 )
+from ot_orchestration.utils.path import GCSPath
 
 
 def create_cluster(
@@ -273,3 +274,32 @@ def generate_dag(cluster_name: str, tasks: list[DataprocSubmitJobOperator]) -> A
         task.set_downstream(delete_cluster_task)
 
     return tasks
+
+
+def reinstall_dependencies(
+    cluster_name: str, cluster_init_script: str
+) -> DataprocSubmitJobOperator:
+    """Force reinstall of dependencies on a Dataproc cluster.
+
+    Args:
+        cluster_name (str): Name of the cluster.
+        cluster_init_script (str): GCS path to the script run on the cluster to reinstall the dependencies.
+
+    Returns:
+        DataprocSubmitJobOperator: Airflow task that reinstalls dependencies on the Dataproc cluster.
+    """
+    cluster_init_script_name = GCSPath(cluster_init_script).segments["filename"]
+    return submit_job(
+        cluster_name=cluster_name,
+        task_id="install_dependencies",
+        job_type="pig_job",
+        job_specification={
+            "jar_file_uris": [cluster_init_script],
+            "query_list": {
+                "queries": [
+                    f"sh chmod 750 $PWD/{cluster_init_script_name}",
+                    f"sh $PWD/{cluster_init_script_name}",
+                ]
+            },
+        },
+    )
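
A minimal usage sketch of the new helper, assuming Airflow 2.4+ and the submit_job and generate_dag helpers of src/ot_orchestration/utils/dataproc.py with the signatures visible in this patch; the DAG id, cluster name, GCS paths, and the PySpark step with its job_specification fields are illustrative placeholders, not values from this patch.

from datetime import datetime

from airflow import DAG

from ot_orchestration.utils.dataproc import (
    generate_dag,
    reinstall_dependencies,
    submit_job,
)

# Illustrative names and paths; replace with real project values.
CLUSTER_NAME = "example-etl-cluster"
CLUSTER_INIT_SCRIPT = "gs://example-bucket/initialisation/install_dependencies.sh"

with DAG(dag_id="example_force_reinstall", start_date=datetime(2024, 10, 1), schedule=None):
    # Force the dependency reinstall before any workload step runs on the cluster.
    reinstall_task = reinstall_dependencies(
        cluster_name=CLUSTER_NAME,
        cluster_init_script=CLUSTER_INIT_SCRIPT,
    )
    # Illustrative workload step submitted through the same submit_job helper.
    etl_task = submit_job(
        cluster_name=CLUSTER_NAME,
        task_id="example_step",
        job_type="pyspark_job",
        job_specification={"main_python_file_uri": "gs://example-bucket/example_step.py"},
    )
    reinstall_task >> etl_task
    # generate_dag wraps the tasks with cluster creation and deletion, as defined in this module.
    generate_dag(cluster_name=CLUSTER_NAME, tasks=[reinstall_task, etl_task])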