[spark] ray on spark creates spark job using stage scheduling #31397

Merged 37 commits on Jan 24, 2023.

Commits (37)
c382364  init (WeichenXu123, Dec 20, 2022)
b86444f  update (WeichenXu123, Dec 26, 2022)
82ae131  update (WeichenXu123, Jan 2, 2023)
86e6acd  update (WeichenXu123, Jan 3, 2023)
d1f1f35  update (WeichenXu123, Jan 3, 2023)
b344320  update (WeichenXu123, Jan 3, 2023)
eb00816  update (WeichenXu123, Jan 3, 2023)
55b4fc1  update (WeichenXu123, Jan 3, 2023)
422473e  fix lint (WeichenXu123, Jan 3, 2023)
9de7cdc  update tests (WeichenXu123, Jan 5, 2023)
36ae2ef  fix tests (WeichenXu123, Jan 5, 2023)
2ec6fbf  update tests (WeichenXu123, Jan 9, 2023)
3f4a648  update test-req (WeichenXu123, Jan 9, 2023)
caafcdd  set object mem mimimum (WeichenXu123, Jan 11, 2023)
1b9e481  update tests (WeichenXu123, Jan 11, 2023)
ef535e2  improve hook (WeichenXu123, Jan 11, 2023)
ece7321  fix get_max_num_concurrent_tasks (WeichenXu123, Jan 12, 2023)
98e7ff5  update test (WeichenXu123, Jan 12, 2023)
01a78fc  add mem calc warning (WeichenXu123, Jan 12, 2023)
549b999  update (WeichenXu123, Jan 12, 2023)
9ea2986  fix (WeichenXu123, Jan 12, 2023)
ea029dc  update (WeichenXu123, Jan 12, 2023)
017fcde  update api (WeichenXu123, Jan 13, 2023)
c287a3b  update tests (WeichenXu123, Jan 13, 2023)
9a73a0a  address comments (WeichenXu123, Jan 16, 2023)
1936024  Merge branch 'master' into stage-scheduling (WeichenXu123, Jan 16, 2023)
e73d36d  add node_options valiation (WeichenXu123, Jan 16, 2023)
109ae7e  update (WeichenXu123, Jan 16, 2023)
d1d4bda  address comments (WeichenXu123, Jan 17, 2023)
5182dd6  nit update (WeichenXu123, Jan 17, 2023)
bfa8f51  fix lint (WeichenXu123, Jan 17, 2023)
4c6e301  fix lint (WeichenXu123, Jan 17, 2023)
5463e05  address comments (WeichenXu123, Jan 18, 2023)
4fcb2ff  nit updates (WeichenXu123, Jan 18, 2023)
300fb2b  nit update (WeichenXu123, Jan 18, 2023)
20df1f3  fix lint (WeichenXu123, Jan 18, 2023)
985cf97  Merge branch 'master' into stage-scheduling (WeichenXu123, Jan 24, 2023)
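
For context on the "stage scheduling" in the PR title: Spark 3.1+ supports stage-level scheduling through resource profiles, which let an individual job request different per-task resources than the application-wide defaults. The snippet below is a minimal, generic PySpark sketch of that mechanism, not this PR's implementation; the resource amounts are placeholders, and running it requires a Spark deployment that supports stage-level scheduling.

from pyspark.sql import SparkSession
from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests

spark = SparkSession.builder.getOrCreate()

# Request 2 CPUs per task for this job only, instead of the application-wide
# spark.task.cpus default. A .resource("gpu", 1) request could be chained
# here as well when GPU discovery is configured.
task_reqs = TaskResourceRequests().cpus(2)
profile = ResourceProfileBuilder().require(task_reqs).build

# Only jobs run on this RDD use the profile; other jobs keep the defaults.
rdd = spark.sparkContext.parallelize(range(8), 8).withResources(profile)
print(rdd.mapPartitions(lambda it: [sum(it)]).collect())
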
31 changes: 21 additions & 10 deletions doc/source/cluster/vms/user-guides/community/spark.rst
@@ -17,29 +17,31 @@ Assuming the python file name is 'ray-on-spark-example1.py'.
.. code-block:: python

from pyspark.sql import SparkSession
from ray.util.spark import init_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Ray on spark example 1") \
.config("spark.task.cpus", "4") \
.getOrCreate()

# initiate a ray cluster on this spark application, it creates a background
# Set up a ray cluster on this spark application. It creates a background
# spark job in which each spark task launches one ray worker node.
# The ray head node is launched on the spark application driver side.
# Resources (CPU / GPU / memory) allocated to each ray worker node are equal
# to the resources allocated to the corresponding spark task.
init_ray_cluster(num_worker_nodes=MAX_NUM_WORKER_NODES)
setup_ray_cluster(num_worker_nodes=MAX_NUM_WORKER_NODES)

# You can run any ray application code here; the ray application will be
# executed on the ray cluster set up above.
# Note that you don't need to call `ray.init`.
# You don't need to set an address for `ray.init`;
# it will connect to the cluster created above automatically.
ray.init()
...

# Terminate ray cluster explicitly.
# If you don't call it, when spark application is terminated, the ray cluster will
# also be terminated.
# If you don't call it, the ray cluster will also be terminated when the
# spark application terminates.
shutdown_ray_cluster()

2) Submit the spark application above to the spark standalone cluster.
@@ -64,7 +66,7 @@ Assuming the python file name is 'long-running-ray-cluster-on-spark.py'.

from pyspark.sql import SparkSession
import time
from ray.util.spark import init_ray_cluster, MAX_NUM_WORKER_NODES
from ray.util.spark import setup_ray_cluster, MAX_NUM_WORKER_NODES

if __name__ == "__main__":
spark = SparkSession \
@@ -73,9 +75,11 @@ Assuming the python file name is 'long-running-ray-cluster-on-spark.py'.
.config("spark.task.cpus", "4") \
.getOrCreate()

cluster_address = init_ray_cluster(num_worker_nodes=MAX_NUM_WORKER_NODES)
print("Ray cluster is initiated, you can connect to this ray cluster via address "
f"ray://{cluster_address}")
cluster_address = setup_ray_cluster(
num_worker_nodes=MAX_NUM_WORKER_NODES
)
print("Ray cluster is set up, you can connect to this ray cluster "
f"via address ray://{cluster_address}")

# Sleep forever until the spark application is terminated,
# at that time, the ray cluster will also be terminated.
@@ -90,3 +94,10 @@ Assuming the python file name is 'long-running-ray-cluster-on-spark.py'.
spark-submit \
--master spark://{spark_master_IP}:{spark_master_port} \
path/to/long-running-ray-cluster-on-spark.py

Ray on Spark APIs
-----------------

.. autofunction:: ray.util.spark.setup_ray_cluster

.. autofunction:: ray.util.spark.shutdown_ray_cluster
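
The tests that follow exercise new per-node sizing arguments of `setup_ray_cluster`. A minimal usage sketch is shown below, assuming the keyword arguments that appear in those tests (`num_cpus_per_node`, `num_gpus_per_node`, `head_node_options`) behave as their names suggest; the concrete values are placeholders.

import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

# Start a Ray cluster backed by a background Spark job; each Ray worker node
# gets the per-node resources requested here (placeholder values).
setup_ray_cluster(
    num_worker_nodes=2,
    num_cpus_per_node=4,
    num_gpus_per_node=0,
    head_node_options={"include_dashboard": False},
)

ray.init()  # No address needed; it connects to the cluster set up above.
# ... run Ray application code here ...
shutdown_ray_cluster()
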
79 changes: 52 additions & 27 deletions python/ray/tests/spark/test_GPU.py
@@ -5,10 +5,9 @@
import functools
from abc import ABC
from pyspark.sql import SparkSession
from ray.tests.spark.test_basic import RayOnSparkCPUClusterTestBase
from ray.tests.spark.test_basic import RayOnSparkCPUClusterTestBase, _setup_ray_cluster

import ray
from ray.util.spark.cluster_init import _init_ray_cluster

pytestmark = pytest.mark.skipif(
not sys.platform.startswith("linux"),
@@ -22,40 +21,64 @@ class RayOnSparkGPUClusterTestBase(RayOnSparkCPUClusterTestBase, ABC):
num_gpus_per_spark_task = None

def test_gpu_allocation(self):

for num_spark_tasks in [self.max_spark_tasks // 2, self.max_spark_tasks]:
with _init_ray_cluster(num_worker_nodes=num_spark_tasks, safe_mode=False):
for num_worker_nodes, num_cpus_per_node, num_gpus_per_node in [
(
self.max_spark_tasks // 2,
self.num_cpus_per_spark_task,
self.num_gpus_per_spark_task,
),
(
self.max_spark_tasks,
self.num_cpus_per_spark_task,
self.num_gpus_per_spark_task,
),
(
self.max_spark_tasks // 2,
self.num_cpus_per_spark_task * 2,
self.num_gpus_per_spark_task * 2,
),
(
self.max_spark_tasks // 2,
self.num_cpus_per_spark_task,
self.num_gpus_per_spark_task * 2,
),
]:
with _setup_ray_cluster(
num_worker_nodes=num_worker_nodes,
num_cpus_per_node=num_cpus_per_node,
num_gpus_per_node=num_gpus_per_node,
head_node_options={"include_dashboard": False},
):
ray.init()
worker_res_list = self.get_ray_worker_resources_list()
assert len(worker_res_list) == num_spark_tasks
assert len(worker_res_list) == num_worker_nodes
for worker_res in worker_res_list:
assert worker_res["GPU"] == self.num_gpus_per_spark_task

def test_basic_ray_app_using_gpu(self):

with _init_ray_cluster(num_worker_nodes=self.max_spark_tasks, safe_mode=False):
assert worker_res["CPU"] == num_cpus_per_node
assert worker_res["GPU"] == num_gpus_per_node

@ray.remote(num_cpus=1, num_gpus=1)
def f(_):
# Add a sleep to avoid the task finishing too fast,
# so that it can make all ray tasks concurrently running in all idle
# task slots.
time.sleep(5)
return [
int(gpu_id)
for gpu_id in os.environ["CUDA_VISIBLE_DEVICES"].split(",")
]
@ray.remote(num_cpus=num_cpus_per_node, num_gpus=num_gpus_per_node)
def f(_):
# Sleep so the task doesn't finish too fast; this lets all ray tasks
# run concurrently in all idle task slots.
time.sleep(5)
return [
int(gpu_id)
for gpu_id in os.environ["CUDA_VISIBLE_DEVICES"].split(",")
]

futures = [f.remote(i) for i in range(self.num_total_gpus)]
results = ray.get(futures)
merged_results = functools.reduce(lambda x, y: x + y, results)
# Test all ray tasks are assigned with different GPUs.
assert sorted(merged_results) == list(range(self.num_total_gpus))
futures = [f.remote(i) for i in range(num_worker_nodes)]
results = ray.get(futures)
merged_results = functools.reduce(lambda x, y: x + y, results)
# Check that all ray tasks are assigned different GPUs.
assert sorted(merged_results) == list(
range(num_gpus_per_node * num_worker_nodes)
)


class TestBasicSparkGPUCluster(RayOnSparkGPUClusterTestBase):
@classmethod
def setup_class(cls):
super().setup_class()
cls.num_total_cpus = 2
cls.num_total_gpus = 2
cls.num_cpus_per_spark_task = 1
@@ -76,6 +99,8 @@ def setup_class(cls):
.config(
"spark.worker.resource.gpu.discoveryScript", gpu_discovery_script_path
)
.config("spark.executorEnv.RAY_ON_SPARK_WORKER_CPU_CORES", "2")
.config("spark.executorEnv.RAY_ON_SPARK_WORKER_GPU_NUM", "2")
.getOrCreate()
)

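
The GPU tests above configure `spark.worker.resource.gpu.discoveryScript` via `gpu_discovery_script_path`. For reference, a Spark resource discovery script is simply an executable that prints a JSON description of the resource; a minimal, hypothetical Python version is sketched below (a real script would enumerate the actual devices, for example via nvidia-smi).

#!/usr/bin/env python
# Minimal GPU discovery script: Spark runs it on each worker and expects a
# single JSON object on stdout naming the resource and its addresses.
import json

# Hypothetical fixed addresses; a real script would enumerate actual GPUs.
print(json.dumps({"name": "gpu", "addresses": ["0", "1"]}))
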
63 changes: 43 additions & 20 deletions python/ray/tests/spark/test_basic.py
@@ -10,12 +10,23 @@
import ray

import ray.util.spark.cluster_init
from ray.util.spark import init_ray_cluster, shutdown_ray_cluster
from ray.util.spark.cluster_init import _init_ray_cluster
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES
from ray.util.spark.utils import check_port_open
from pyspark.sql import SparkSession
import time
import logging
from contextlib import contextmanager


@contextmanager
def _setup_ray_cluster(*args, **kwds):
# Set up the ray cluster and always shut it down when the test body exits.
setup_ray_cluster(*args, **kwds)
try:
yield ray.util.spark.cluster_init._active_ray_cluster
finally:
shutdown_ray_cluster()


pytestmark = pytest.mark.skipif(
not sys.platform.startswith("linux"),
@@ -32,10 +43,6 @@ class RayOnSparkCPUClusterTestBase(ABC):
num_cpus_per_spark_task = None
max_spark_tasks = None

@classmethod
def setup_class(cls):
pass

@classmethod
def teardown_class(cls):
time.sleep(10) # Wait all background spark job canceled.
@@ -51,23 +58,41 @@ def get_ray_worker_resources_list():
return wr_list

def test_cpu_allocation(self):
for num_spark_tasks in [self.max_spark_tasks // 2, self.max_spark_tasks]:
with _init_ray_cluster(num_worker_nodes=num_spark_tasks, safe_mode=False):
for num_worker_nodes, num_cpus_per_node, num_worker_nodes_arg in [
(
self.max_spark_tasks // 2,
self.num_cpus_per_spark_task,
self.max_spark_tasks // 2,
),
(self.max_spark_tasks, self.num_cpus_per_spark_task, MAX_NUM_WORKER_NODES),
(
self.max_spark_tasks // 2,
self.num_cpus_per_spark_task * 2,
MAX_NUM_WORKER_NODES,
),
]:
with _setup_ray_cluster(
num_worker_nodes=num_worker_nodes_arg,
num_cpus_per_node=num_cpus_per_node,
head_node_options={"include_dashboard": False},
):
ray.init()
worker_res_list = self.get_ray_worker_resources_list()
assert len(worker_res_list) == num_spark_tasks
assert len(worker_res_list) == num_worker_nodes
for worker_res in worker_res_list:
assert worker_res["CPU"] == self.num_cpus_per_spark_task
assert worker_res["CPU"] == num_cpus_per_node

def test_public_api(self):
try:
ray_temp_root_dir = tempfile.mkdtemp()
collect_log_to_path = tempfile.mkdtemp()
init_ray_cluster(
num_worker_nodes=self.max_spark_tasks,
safe_mode=False,
setup_ray_cluster(
num_worker_nodes=MAX_NUM_WORKER_NODES,
collect_log_to_path=collect_log_to_path,
ray_temp_root_dir=ray_temp_root_dir,
head_node_options={"include_dashboard": True},
)
ray.init()

@ray.remote
def f(x):
@@ -102,9 +127,8 @@ def f(x):
shutil.rmtree(collect_log_to_path, ignore_errors=True)

def test_ray_cluster_shutdown(self):
with _init_ray_cluster(
num_worker_nodes=self.max_spark_tasks, safe_mode=False
) as cluster:
with _setup_ray_cluster(num_worker_nodes=self.max_spark_tasks) as cluster:
ray.init()
assert len(self.get_ray_worker_resources_list()) == self.max_spark_tasks

# Test: canceling the background spark job causes all ray worker nodes to exit.
@@ -119,9 +143,8 @@ def test_ray_cluster_shutdown(self):
assert not check_port_open(hostname, int(port))

def test_background_spark_job_exit_trigger_ray_head_exit(self):
with _init_ray_cluster(
num_worker_nodes=self.max_spark_tasks, safe_mode=False
) as cluster:
with _setup_ray_cluster(num_worker_nodes=self.max_spark_tasks) as cluster:
ray.init()
# Mimic the case where the job fails unexpectedly.
cluster._cancel_background_spark_job()
cluster.spark_job_is_canceled = False
@@ -135,7 +158,6 @@ def test_background_spark_job_exit_trigger_ray_head_exit(self):
class TestBasicSparkCluster(RayOnSparkCPUClusterTestBase):
@classmethod
def setup_class(cls):
super().setup_class()
cls.num_total_cpus = 2
cls.num_total_gpus = 0
cls.num_cpus_per_spark_task = 1
@@ -146,6 +168,7 @@ def setup_class(cls):
SparkSession.builder.master("local-cluster[1, 2, 1024]")
.config("spark.task.cpus", "1")
.config("spark.task.maxFailures", "1")
.config("spark.executorEnv.RAY_ON_SPARK_WORKER_CPU_CORES", "2")
.getOrCreate()
)

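
The shutdown tests above use `check_port_open` from `ray.util.spark.utils` to confirm that Ray node ports are closed after teardown. A small, hypothetical standalone sketch of the same pattern follows; it assumes the cluster address returned by `setup_ray_cluster` has the `host:port` form shown in the documentation example, and that a short wait is enough for the ports to close.

import time

import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
from ray.util.spark.utils import check_port_open

# Assumption: setup_ray_cluster returns a "host:port" cluster address, as in
# the documentation example above.
address = setup_ray_cluster(num_worker_nodes=2)
ray.init()
# ... run a workload ...
ray.shutdown()
shutdown_ray_cluster()

# After shutdown the head node port should stop accepting connections;
# the short wait avoids racing the teardown.
time.sleep(5)
hostname, port = address.split(":")
assert not check_port_open(hostname, int(port))
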
3 changes: 2 additions & 1 deletion python/ray/tests/spark/test_multicores_per_task.py
@@ -13,7 +13,6 @@
class TestMultiCoresPerTaskCluster(RayOnSparkGPUClusterTestBase):
@classmethod
def setup_class(cls):
super().setup_class()
cls.num_total_cpus = 4
cls.num_total_gpus = 4
cls.num_cpus_per_spark_task = 2
@@ -34,6 +33,8 @@ def setup_class(cls):
.config(
"spark.worker.resource.gpu.discoveryScript", gpu_discovery_script_path
)
.config("spark.executorEnv.RAY_ON_SPARK_WORKER_CPU_CORES", "4")
.config("spark.executorEnv.RAY_ON_SPARK_WORKER_GPU_NUM", "4")
.getOrCreate()
)

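
The test Spark sessions in these files use `local-cluster[workers, cores_per_worker, memory_per_worker_mb]`, which emulates a standalone cluster on a single host. The sketch below restates that setup with the knobs spelled out; the `RAY_ON_SPARK_WORKER_CPU_CORES` executor env var mirrors the one added in these tests and is assumed to tell Ray on Spark how many CPU cores each emulated worker should report, since local-cluster workers share the host's real hardware.

from pyspark.sql import SparkSession

# local-cluster[1, 2, 1024]: 1 worker, 2 cores per worker, 1024 MB per worker.
spark = (
    SparkSession.builder.master("local-cluster[1, 2, 1024]")
    .config("spark.task.cpus", "1")
    # Assumption: tells Ray on Spark how many CPU cores this emulated worker
    # should report, overriding detection of the host's real core count.
    .config("spark.executorEnv.RAY_ON_SPARK_WORKER_CPU_CORES", "2")
    .getOrCreate()
)
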