perf(efm): enhanced flexibility mode in genetics etl cluster #63

Merged 1 commit on Oct 30, 2024.

src/ot_orchestration/dags/config/genetics_etl.yaml (3 additions, 1 deletion)

@@ -6,7 +6,9 @@ dataproc:
   PACKAGE: gs://genetics_etl_python_playground/initialisation/gentropy/dev/gentropy-0.0.0-py3-none-any.whl
   cluster_init_script: gs://genetics_etl_python_playground/initialisation/gentropy/dev/install_dependencies_on_cluster.sh
   cluster_name: otg-etl
-  autoscaling_policy: otg-etl
+  autoscaling_policy: otg-efm
+  allow_efm: true
+  num_workers: 10
 
 nodes:
   - id: biosample_index
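
For context: the `otg-efm` policy referenced here is a pre-existing autoscaling policy, not something this PR creates, and per the NOTE added to dataproc.py below it must not use graceful decommissioning for primary workers, because EFM stores shuffle data on them. A hedged sketch of how that invariant could be verified with the google-cloud-dataproc client (the project id is hypothetical; the policy path format matches get_autoscaling_policy below):

from google.cloud import dataproc_v1

# Hypothetical project id; GCP_PROJECT_GENETICS in this repo holds the real one.
project, region = "my-gcp-project", "europe-west1"

client = dataproc_v1.AutoscalingPolicyServiceClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)
policy = client.get_autoscaling_policy(
    name=f"projects/{project}/regions/{region}/autoscalingPolicies/otg-efm"
)

# EFM keeps shuffle data on primary workers, so gracefully decommissioning them
# would still lose shuffle files; the policy should not rely on it.
assert policy.basic_algorithm.yarn_config.graceful_decommission_timeout.total_seconds() == 0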

src/ot_orchestration/dags/genetics_etl.py (1 addition, 3 deletions)

@@ -176,10 +176,8 @@
     # chain prerequisites
     chain_dependencies(nodes=config["nodes"], tasks_or_task_groups=node_map)
     generate_dataproc_task_chain(
-        cluster_name=dataproc_specs["cluster_name"],
-        cluster_metadata=dataproc_specs["cluster_metadata"],
-        cluster_init_script=dataproc_specs["cluster_init_script"],
         tasks=list(node_map.values()),  # type: ignore
+        **config["dataproc"],
     )
 
     # DAG description:
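
This is the key simplification: instead of plucking individual keys out of `dataproc_specs`, the whole `dataproc` mapping from the yaml config is unpacked into keyword arguments, so a new cluster option such as `allow_efm` needs no DAG changes. A minimal sketch of the mechanism (assuming PyYAML; the inline yaml and the stub function are illustrative stand-ins, not the real code):

import yaml

# Illustrative stand-in for the `dataproc` block in genetics_etl.yaml.
raw_config = """
dataproc:
  cluster_name: otg-etl
  autoscaling_policy: otg-efm
  allow_efm: true
  num_workers: 10
"""
config = yaml.safe_load(raw_config)


def create_cluster(cluster_name, autoscaling_policy, allow_efm=False, num_workers=1, **kwargs):
    """Hypothetical stand-in for ot_orchestration.utils.dataproc.create_cluster."""
    return f"{cluster_name}: policy={autoscaling_policy}, efm={allow_efm}, workers={num_workers}"


# `**config["dataproc"]` turns every yaml key into a keyword argument, so a new
# option only needs a yaml entry and a matching function parameter; the DAG code
# calling generate_dataproc_task_chain stays unchanged.
print(create_cluster(**config["dataproc"]))
# -> otg-etl: policy=otg-efm, efm=True, workers=10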

src/ot_orchestration/utils/common.py (1 addition)

@@ -16,6 +16,7 @@
 GCP_ZONE = "europe-west1-d"
 GCP_DATAPROC_IMAGE = "2.1"
 GCP_AUTOSCALING_POLICY = "otg-etl"
+GCP_EFM_AUTOSCALING_POLICY = "otg-efm"
 
 # Image configuration.
 GENTROPY_DOCKER_IMAGE = (

src/ot_orchestration/utils/dataproc.py (48 additions, 15 deletions)

@@ -18,6 +18,7 @@
 from ot_orchestration.utils.common import (
     GCP_AUTOSCALING_POLICY,
     GCP_DATAPROC_IMAGE,
+    GCP_EFM_AUTOSCALING_POLICY,
     GCP_PROJECT_GENETICS,
     GCP_REGION,
     GCP_ZONE,
@@ -29,13 +30,15 @@ def create_cluster(
     cluster_name: str,
     master_machine_type: str = "n1-highmem-16",
     worker_machine_type: str = "n1-standard-16",
-    num_workers: int = 2,
+    num_workers: int = 1,
     num_preemptible_workers: int = 0,
     num_local_ssds: int = 1,
     autoscaling_policy: str = GCP_AUTOSCALING_POLICY,
     master_disk_size: int = 500,
     cluster_init_script: str | None = None,
     cluster_metadata: dict[str, str] | None = None,
+    allow_efm: bool = False,
+    **kwargs,
 ) -> DataprocCreateClusterOperator:
     """Generate an Airflow task to create a Dataproc cluster. Common parameters are reused, and varying parameters can be specified as needed.
 
@@ -50,26 +53,51 @@
         master_disk_size (int): Size of the master node's boot disk in GB. Defaults to 500.
         cluster_init_script (str | None): Cluster initialization scripts.
         cluster_metadata (str | None): Cluster METADATA.
+        allow_efm (bool): Whether to enable Enhanced Flexibility Mode (EFM) in the Spark cluster, storing shuffle partitions on the primary workers only.
+        **kwargs (Any): Other parameters passed to the ClusterGenerator.
+
+    NOTE: When `allow_efm` is enabled, the autoscaling policy cannot use graceful decommissioning for primary workers!
+    NOTE: When `allow_efm` is enabled, the ratio of primary to secondary workers should be at least 1:10.
+    NOTE: When `allow_efm` is enabled, the primary workers' disk size should be increased and the disk type set to pd-ssd.
+        For more about EFM, see https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/enhanced-flexibility-mode
 
     Returns:
         DataprocCreateClusterOperator: Airflow task to create a Dataproc cluster.
     """
     # Create base cluster configuration.
+    properties = None
+    if allow_efm:
+        properties = {
+            "dataproc:efm.spark.shuffle": "primary-worker",
+            "spark:spark.sql.files.maxPartitionBytes": "1073741824",  # 1 GiB, the value proposed by the Dataproc documentation; see the EFM link in the docstring.
+            "spark:spark.sql.shuffle.partitions": "100",
+            "yarn:spark.shuffle.io.serverThreads": "50",  # allow more shuffle server threads to write; the default for n1-standard-16 is 2 * 16 cores.
+            "spark:spark.shuffle.io.numConnectionsPerPeer": "5",
+            "spark:spark.stage.maxConsecutiveAttempts": "10",  # defaults to 4; increased in case the master is lost.
+            "spark:spark.task.maxFailures": "10",
+        }
+
     cluster_config = ClusterGenerator(
+        num_masters=3
+        if allow_efm
+        else 1,  # allows running the Dataproc cluster in HA mode.
         project_id=GCP_PROJECT_GENETICS,
         zone=GCP_ZONE,
         master_machine_type=master_machine_type,
         worker_machine_type=worker_machine_type,
+        worker_disk_type="pd-ssd" if allow_efm else "pd-standard",
         master_disk_size=master_disk_size,
-        worker_disk_size=500,
+        worker_disk_size=1024 if allow_efm else 500,
         num_preemptible_workers=num_preemptible_workers,
         num_workers=num_workers,
         image_version=GCP_DATAPROC_IMAGE,
         enable_component_gateway=True,
         metadata=cluster_metadata,
         idle_delete_ttl=30 * 60,  # In seconds.
         init_actions_uris=[cluster_init_script] if cluster_init_script else None,
-        autoscaling_policy=f"projects/{GCP_PROJECT_GENETICS}/regions/{GCP_REGION}/autoscalingPolicies/{autoscaling_policy}",
+        autoscaling_policy=get_autoscaling_policy(policy_name=autoscaling_policy),
+        properties=properties,
+        **kwargs,
     ).make()
 
     # If specified, amend the configuration to include local SSDs for worker nodes.
@@ -93,6 +121,19 @@ def create_cluster(
     )
 
 
+def get_autoscaling_policy(
+    *,
+    policy_name: str = GCP_AUTOSCALING_POLICY,
+    region: str = GCP_REGION,
+    project: str = GCP_PROJECT_GENETICS,
+    allow_efm: bool = False,
+) -> str:
+    """Get the autoscaling policy full path."""
+    if allow_efm and policy_name == GCP_AUTOSCALING_POLICY:
+        policy_name = GCP_EFM_AUTOSCALING_POLICY
+    return f"projects/{project}/regions/{region}/autoscalingPolicies/{policy_name}"
+
+
 def submit_gentropy_step(
     cluster_name: str,
     step_name: str,
@@ -221,31 +262,23 @@ def delete_cluster(cluster_name: str) -> DataprocDeleteClusterOperator:


 def generate_dataproc_task_chain(
-    cluster_name: str,
-    cluster_init_script: str,
-    cluster_metadata: dict[str, str],
     tasks: list[DataprocSubmitJobOperator],
+    **kwargs,
 ) -> Any:
     """For a list of Dataproc tasks, generate a complete chain of tasks.
 
     This function adds create_cluster and install_dependencies upstream of the task without any upstream tasks (the first one in the DAG)
     and adds a delete_cluster task downstream of the task without any downstream tasks (the last one in the DAG).
 
     Args:
-        cluster_name (str): Name of the cluster.
-        cluster_init_script (str): URI to the cluster initialization script.
-        cluster_metadata: (dict[str, str]): METADATA to fill into the cluster during initialization.
         tasks (list[DataprocSubmitJobOperator]): List of tasks to execute.
+        **kwargs (Any): Keyword arguments passed to `create_cluster`.
 
     Returns:
         list[DataprocSubmitJobOperator]: List of input tasks with the mutated chain.
     """
-    create_cluster_task = create_cluster(
-        cluster_name,
-        cluster_metadata=cluster_metadata,
-        cluster_init_script=cluster_init_script,
-    )
-    delete_cluster_task = delete_cluster(cluster_name)
+    create_cluster_task = create_cluster(**kwargs)
+    delete_cluster_task = delete_cluster(kwargs["cluster_name"])
     for task in tasks:
         if not task.get_direct_relatives(upstream=True):
             task.set_upstream(create_cluster_task)
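
The net effect: generate_dataproc_task_chain now forwards everything to create_cluster, and create_cluster resolves the policy path through get_autoscaling_policy. Note that create_cluster calls get_autoscaling_policy(policy_name=autoscaling_policy) without forwarding allow_efm, so the EFM policy is used here only because genetics_etl.yaml pins autoscaling_policy: otg-efm; the allow_efm fallback inside get_autoscaling_policy applies only to direct callers. A hedged usage sketch (illustrative values mirroring the yaml block, not code from this PR):

from ot_orchestration.utils.dataproc import create_cluster, get_autoscaling_policy

# Policy resolution: the default policy stays otg-etl...
assert get_autoscaling_policy().endswith("/autoscalingPolicies/otg-etl")
# ...and is swapped for the EFM policy only when allow_efm=True is passed
# while the policy name is left at its default...
assert get_autoscaling_policy(allow_efm=True).endswith("/autoscalingPolicies/otg-efm")
# ...whereas an explicitly named policy always wins, EFM or not.
assert get_autoscaling_policy(policy_name="custom", allow_efm=True).endswith("/autoscalingPolicies/custom")

# Inside a DAG definition, an EFM cluster task mirrors the yaml config:
create_cluster_task = create_cluster(
    cluster_name="otg-etl",
    autoscaling_policy="otg-efm",
    allow_efm=True,  # 3 masters (HA), 1024 GB pd-ssd worker disks, EFM Spark properties
    num_workers=10,  # under EFM, primary workers hold all the shuffle data
)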

src/ot_orchestration/utils/utils.py (2 additions, 1 deletion)

@@ -134,7 +134,8 @@ def chain_dependencies(nodes: list[ConfigNode], tasks_or_task_groups: dict[str,
     for label, node in tasks_or_task_groups.items():
         print(node_dependencies)
         for dependency in node_dependencies[label]:
-            node.set_upstream(tasks_or_task_groups[dependency])
+            if dependency in tasks_or_task_groups:
+                node.set_upstream(tasks_or_task_groups[dependency])
 
 
 def convert_params_to_hydra_positional_arg(
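
The added membership guard makes chain_dependencies tolerant of dependencies that were never materialised as tasks or task groups: they are now skipped instead of raising a KeyError. A minimal illustration (hypothetical node ids, with print standing in for Airflow's set_upstream):

# Dependencies as they might be declared in the DAG config (hypothetical ids).
node_dependencies = {"variant_index": [], "l2g": ["variant_index", "gwas_catalog"]}

# Only two nodes were materialised as tasks; "gwas_catalog" is not in this run.
tasks_or_task_groups = {"variant_index": object(), "l2g": object()}

for label in tasks_or_task_groups:
    for dependency in node_dependencies[label]:
        if dependency in tasks_or_task_groups:  # the new guard
            print(f"wired {dependency} -> {label}")
        else:
            print(f"skipped missing dependency {dependency!r}")  # previously a KeyError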