Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(efm): enhanced flexibility mode in genetics etl cluster #63

Merged
merged 1 commit into from
Oct 30, 2024

Conversation

project-defiant
Copy link
Collaborator

@project-defiant project-defiant commented Oct 30, 2024

Context

Introduction of the new SuSiE credible sets from gwas_catalog (gs://gwas_catalog_sumstats_susie/credible_set_clean) resulted in ColocStep failures. Job performed on otg-etl cluster took ~4h and did not finish in that time - see job.

The most of the error logs trace the fact that he executors got lost during the job execution.

java.io.IOException: Connecting to otg-etl-sw-0qf7.europe-west1-d.c.open-targets-genetics-dev.internal/10.132.0.54:41329 failed in the last 4750 ms, fail this connection directly
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214)
	at org.apache.spark.network.shuffle.ExternalBlockStoreClient.lambda$fetchBlocks$0(ExternalBlockStoreClient.java:132)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.lambda$initiateRetry$0(RetryingBlockTransferor.java:206)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)

The otg-etl cluster uses the following autoscaling policy

{
  "id": "otg-etl",
  "name": "projects/open-targets-genetics-dev/regions/europe-west1/autoscalingPolicies/otg-etl",
  "basicAlgorithm": {
    "yarnConfig": {
      "scaleUpFactor": 1,
      "scaleDownFactor": 1,
      "gracefulDecommissionTimeout": "10800s"
    },
    "cooldownPeriod": "120s"
  },
  "workerConfig": {
    "minInstances": 2,
    "maxInstances": 2,
    "weight": 1
  },
  "secondaryWorkerConfig": {
    "maxInstances": 100,
    "weight": 1
  }
}

Which specifies the ratio of secondaryWorkers to primaryWorkers to be max 100:2.

Caution

In case of the Coloc step, which requires many intensive shuffle operations, if the input dataset increases, the complexity of shuffling will also increase, thus resulting in time increase. This is a potential issue when we store the shuffle partitions on lost exectuors, as described in the EFM (Enhanced Flexibility Mode) description. Losing a worker will make the task restart, resulting in run time elongation.

Further more this can not be determined in advance due to the nature of preemptible (secondary) workers.

To accomodate for the lost shuffle partitions the EFM mode can be utilized.

The EFM mode will make the cluster to save the shuffle partitions only on primary workers. This will mean that we have to accomodate the disk size of the workers and effectively change the autoscaling policy, as EFM does not support the graceful decomissioning of workers.

Changes

The above tweaks were added to the existing dataproc cluster setup to accomodate the shuffling operations in Coloc step:

  • Enable the EFM mode with dataproc:efm.spark.shuffle=primary-workers property
  • Increase the number of primary workers on EFM mode from 2 to 10
  • Increase the ssd disk size on primary workers to 1TB when running on EFM mode
  • Create new autoscaling policy without graceful decomissioning - otg-efm
  • Allow for more then default concurrent threads to save the shuffle data into the primary workers (default is 16 cores * 2 which was adjusted to 50)
  • Allow for more peer connections (from default 1 to 5)
  • Tuning of shuffling with
yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

All of above comes from reading the documentation on EFM

  • Allow for 3 master nodes to run in parallel to decrese the risk of decomissioning the master node while the job tries to shuffle data to primary workers see High Availability mode
  • Adjustments to the create_cluster function to accomodate all parameters from ClusterGenerator

Additionally

  • Do not fail on parsing the dag topology when prerequisite task is not foud (allows easier config changes) - no need to comment out the prerequisites in the node configuration, if the tasks were commented out.

Note

The Coloc step succeded in 1h 20 min with the EFM mode enabled - see job - we still experienced executor loses

@project-defiant project-defiant added the enhancement New feature or request label Oct 30, 2024
@project-defiant project-defiant marked this pull request as ready for review October 30, 2024 14:31
Copy link
Contributor

@ireneisdoomed ireneisdoomed left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for fixing this!!
Let me see if I got this: with EFM workers are scaled down abruptly, so that shuffle only happens on the primary nodes to avoid data loss. And by saving shuffle partitions only on primary workers, we no longer rely on preemptible workers for shuffle storage, which was the source of the issue.

The new strategy makes sense to me. I think it is interesting that we only see the issue in eCAVIAR since I consider COLOC to be more optimised (although it is more complex too).

My only comment, correct me if I'm wrong, is that we are going to use the same cluster with the same EFM strategy for all ETL jobs, also for those where tasks don't involve heavy shuffling. Do you think removing graceful decommission from other steps will have a significant impact?

@project-defiant
Copy link
Collaborator Author

@ireneisdoomed you are right, we will be storing the shuffle partitions in the primary workers only in EFM mode, as far as the documentation is correct :)

The new strategy makes sense to me. I think it is interesting that we only see the issue in eCAVIAR since I consider COLOC to be more optimised (although it is more complex too).

This could have been non-deterministic. eCaviar took ~3h to complete, while coloc running at the same time failed to compute even after 4 hours. The initial issue, could have been that the fraction of executors running ecaviar were decomissioned in comparision to executors running coloc. It may be that one overlaps succeded, but the others failed. Either way we can not rely on the cluster in preemptible mode so easly, as any step that is running for a long time can be affected this way. Typically from my experiments, after ~30minutes the preemption can appears always, so any data that is not cached up until this stage, will get lost and has to be recomputed.

Another option to investigate is if we could actually retrieve the offending shuffle and cache or even checkpoint the data when before that stage. Then we would do what EFM tries to do, just manually from the code.

My only comment, correct me if I'm wrong, is that we are going to use the same cluster with the same EFM strategy for all ETL jobs, also for those where tasks don't involve heavy shuffling. Do you think removing graceful decommission from other steps will have a significant impact?

This is open to discussion. I am not sure if we will benefit anyhow on other tasks, since they are small and fast in comparission to colocalisation, but then we have to have two clusters, as EFM can be only set during the cluster setup, can not be updated.

Graceful decomissioning is there for downscaling the number of workers. As you might guess, the number of primary workers is fixed in the otg-efm to 10, so we always will have these workers available due to the fact we need more space for the shuffle partitions. This inclines that we can not downscale the cluster primary workers, otherwise we still lose the shuffle partitions that were there. I am not aware if we can distinguish between primary and secondary workers graceful decomissioning unfortunately.

@project-defiant project-defiant merged commit 7a3bd90 into dev Oct 30, 2024
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants