[spark] Fix Gloo detecting incorrect Interfaces on DBR (ray-project#42202)

When running distributed PyTorch without GPUs, PyTorch selects a localhost interface for Gloo (i.e. 127.0.0.1:XXX), breaking distributed training. PyTorch's interface lookup can yield the incorrect interface when a) the hostname resolves locally to the loopback address or b) hostname lookups fail.
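For illustration, here is a minimal Python sketch of both failure modes described above (this harness is an assumption for this write-up, not PyTorch's actual lookup code):

import socket

# Sketch of the failure mode described above. If the node's hostname
# resolves to a loopback address -- or does not resolve at all -- any
# interface derived from this lookup is unreachable from peer nodes.
hostname = socket.gethostname()
try:
    resolved = socket.gethostbyname(hostname)  # case a): may be "127.0.0.1"
except socket.gaierror:
    resolved = None  # case b): hostname lookup fails outright

if resolved is None or resolved.startswith("127."):
    print(f"{hostname!r} -> {resolved!r}: Gloo would fall back to loopback")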

This is scoped to DBR specifically because eth0 is guaranteed to exist there. PyTorch+Gloo does not support deny-listing like NCCL (as we do in ray-project#31824) because PyTorch directly uses the environment variable GLOO_SOCKET_IFNAME as the interface to use: https://github.com/pytorch/pytorch/blob/7956ca16e649d86cbf11b6e122090fa05678fac3/torch/csrc/distributed/c10d/init.cpp#L2243.
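As a hedged sketch of the workaround this commit automates, pinning the interface amounts to exporting the variable before the process group is created (the rendezvous values below are illustrative single-process defaults; Ray on Spark supplies its own):

import os
import torch.distributed as dist

# GLOO_SOCKET_IFNAME is read verbatim by PyTorch (see the init.cpp link
# above), so it must be exported before init_process_group() runs.
# "eth0" is safe on DBR because the interface is guaranteed to exist;
# on other machines, substitute a real interface name or Gloo will raise.
os.environ["GLOO_SOCKET_IFNAME"] = "eth0"

# Single-process smoke test; real clusters get these from the launcher.
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")
os.environ.setdefault("RANK", "0")
os.environ.setdefault("WORLD_SIZE", "1")
dist.init_process_group(backend="gloo", init_method="env://")
dist.destroy_process_group()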

Signed-off-by: Ian Rodney <ian.rodney@gmail.com>
ijrsvt authored and vickytsang committed Jan 12, 2024
1 parent 0f435d9 commit e243ed2
Showing 3 changed files with 20 additions and 0 deletions.
7 changes: 7 additions & 0 deletions python/ray/util/spark/cluster_init.py
@@ -703,6 +703,7 @@ def _setup_ray_cluster(
         extra_env={
             RAY_ON_SPARK_COLLECT_LOG_TO_PATH: collect_log_to_path or "",
             RAY_ON_SPARK_START_RAY_PARENT_PID: str(os.getpid()),
+            **start_hook.custom_environment_variables(),
         },
     )
     spark_job_server = None
@@ -1511,10 +1512,13 @@ def ray_cluster_job_mapper(_):
         if ray_temp_dir is not None:
             ray_worker_node_cmd.append(f"--temp-dir={ray_temp_dir}")
 
+        hook_entry = _create_hook_entry(is_global=(ray_temp_dir is None))
+
         ray_worker_node_extra_envs = {
             RAY_ON_SPARK_COLLECT_LOG_TO_PATH: collect_log_to_path or "",
             RAY_ON_SPARK_START_RAY_PARENT_PID: str(os.getpid()),
             "RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER": "1",
+            **hook_entry.custom_environment_variables(),
         }
 
         if num_gpus_per_node > 0:
@@ -1780,10 +1784,13 @@ def start(
         )
         ray_head_node_cmd.extend(_convert_ray_node_options(head_node_options))
 
+        hook_entry = _create_hook_entry(is_global=(ray_temp_dir is None))
+
         extra_env = {
             "AUTOSCALER_UPDATE_INTERVAL_S": "1",
             RAY_ON_SPARK_COLLECT_LOG_TO_PATH: collect_log_to_path or "",
             RAY_ON_SPARK_START_RAY_PARENT_PID: str(os.getpid()),
+            **hook_entry.custom_environment_variables(),
         }
 
         self.ray_head_node_cmd = ray_head_node_cmd
10 changes: 10 additions & 0 deletions python/ray/util/spark/databricks_hook.py
@@ -167,3 +167,13 @@ def auto_shutdown_watcher():
     def on_spark_job_created(self, job_group_id):
         db_api_entry = get_db_entry_point()
         db_api_entry.registerBackgroundSparkJobGroup(job_group_id)
+
+    def custom_environment_variables(self):
+        """Hardcode `GLOO_SOCKET_IFNAME` to `eth0` for Databricks runtime.
+
+        Torch on DBR does not reliably detect the correct interface to use,
+        and ends up selecting the loopback interface, breaking cross-node
+        communication."""
+        return {
+            "GLOO_SOCKET_IFNAME": "eth0",
+        }
3 changes: 3 additions & 0 deletions python/ray/util/spark/start_hook_base.py
@@ -13,3 +13,6 @@ def on_cluster_created(self, ray_cluster_handler):
 
     def on_spark_job_created(self, job_group_id):
         pass
+
+    def custom_environment_variables(self):
+        return {}
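Taken together, the two files form a simple hook pattern: the base class returns an empty mapping and the Databricks hook overrides it. Below is a standalone sketch of how the launcher merges the result into each node's environment; the hook class name and harness are illustrative, while the merge expression mirrors cluster_init.py:

import os

class StartHookBase:
    def custom_environment_variables(self):
        return {}

class DatabricksStartHook(StartHookBase):  # hypothetical name for this sketch
    def custom_environment_variables(self):
        # Pin Gloo to eth0, which is guaranteed to exist on DBR.
        return {"GLOO_SOCKET_IFNAME": "eth0"}

hook_entry = DatabricksStartHook()
extra_env = {
    "RAY_ON_SPARK_START_RAY_PARENT_PID": str(os.getpid()),
    **hook_entry.custom_environment_variables(),  # same merge as in the diff
}
print(extra_env)  # includes GLOO_SOCKET_IFNAME only when the DBR hook is active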
