Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For testing #9

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion python/ray/internal/internal_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ def global_gc():
worker.core_worker.global_gc()


def memory_summary(node_manager_address=None, node_manager_port=None):
def memory_summary(node_manager_address=None,
node_manager_port=None,
stats_only=False):
"""Returns a formatted string describing memory usage in the cluster."""

import grpc
Expand Down Expand Up @@ -63,6 +65,13 @@ def memory_summary(node_manager_address=None, node_manager_port=None):
reply.store_stats.restored_objects_total,
int(reply.store_stats.restored_bytes_total / (1024 * 1024) /
reply.store_stats.restore_time_total_s)))
if reply.store_stats.referenced_bytes > 0:
store_summary += (
"Total bytes of plasma objects referenced by ray tasks "
"or ray.get: {} MiB.".format(
int(reply.store_stats.referenced_bytes / (1024 * 1024))))
if stats_only:
return store_summary
return reply.memory_summary + "\n" + store_summary


Expand Down
11 changes: 9 additions & 2 deletions python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -1372,7 +1372,13 @@ def timeline(address):
type=str,
default=ray_constants.REDIS_DEFAULT_PASSWORD,
help="Connect to ray with redis_password.")
def memory(address, redis_password):
@click.option(
"--stats-only",
is_flag=True,
type=bool,
default=False,
    help="Display only the plasma store stats summary "
    "(skip per-object reference entries).")
def memory(address, redis_password, stats_only):
"""Print object references held in a Ray cluster."""
if not address:
address = services.get_ray_address_to_use_or_die()
Expand All @@ -1381,7 +1387,8 @@ def memory(address, redis_password):
raylet = state.node_table()[0]
print(
ray.internal.internal_api.memory_summary(raylet["NodeManagerAddress"],
raylet["NodeManagerPort"]))
raylet["NodeManagerPort"],
stats_only))


@cli.command()
Expand Down
86 changes: 63 additions & 23 deletions python/ray/tests/test_object_spilling.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,27 @@ def is_dir_empty(temp_folder,
return num_files == 0


def assert_no_thrashing(address):
    """Assert the cluster is not thrashing on object spilling.

    Pulls the plasma-store stats summary from the first raylet and checks
    that the MiB restored from spill storage never exceeds the MiB still
    referenced by ray tasks / ``ray.get`` — restoring more than is
    referenced would indicate repeated spill/restore churn (thrashing).

    Args:
        address: Redis address of the running cluster to inspect.
    """
    global_state = ray.state.GlobalState()
    global_state._initialize_global_state(
        address, ray.ray_constants.REDIS_DEFAULT_PASSWORD)
    node = global_state.node_table()[0]
    summary = ray.internal.internal_api.memory_summary(
        node["NodeManagerAddress"],
        node["NodeManagerPort"],
        stats_only=True)

    restored = 0
    referenced = 0
    for row in summary.split("\n"):
        # Line shape: "Restored <N> MiB, <k> objects, ..." — second
        # whitespace token is the MiB count.
        if "Restored" in row:
            restored = int(row.split(" ")[1])
        # Line shape: "... referenced by ray tasks or ray.get: <N> MiB."
        # — the token just before the trailing "MiB." is the count.
        if "referenced" in row:
            referenced = int(row.split(" ")[-2])

    assert referenced >= restored, (
        f"referenced: {referenced}, restored: {restored}")


def test_invalid_config_raises_exception(shutdown_only):
# Make sure ray.init raises an exception before
# it starts processes when invalid object spilling
Expand Down Expand Up @@ -187,7 +208,7 @@ def test_spilling_not_done_for_pinned_object(object_spilling_config,
shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config
ray.init(
address = ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
Expand All @@ -203,6 +224,7 @@ def test_spilling_not_done_for_pinned_object(object_spilling_config,
ref2 = ray.put(arr) # noqa

wait_for_condition(lambda: is_dir_empty(temp_folder))
assert_no_thrashing(address["redis_address"])


@pytest.mark.skipif(
Expand Down Expand Up @@ -249,14 +271,15 @@ def depends(arg):

# Test passing the spilled object as an arg to another task.
ray.get(depends.remote(ref))
assert_no_thrashing(cluster.address)


@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_objects_automatically(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, _ = object_spilling_config
ray.init(
address = ray.init(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
_system_config={
Expand Down Expand Up @@ -287,14 +310,15 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only):
solution = solution_buffer[index]
sample = ray.get(ref, timeout=0)
assert np.array_equal(sample, solution)
assert_no_thrashing(address["redis_address"])


@pytest.mark.skipif(
platform.system() in ["Darwin", "Windows"], reason="Failing on Windows.")
platform.system() in ["Windows", "Darwin"], reason="Failing on Windows.")
def test_spill_stats(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, _ = object_spilling_config
ray.init(
address = ray.init(
num_cpus=1,
object_store_memory=100 * 1024 * 1024,
_system_config={
Expand All @@ -319,17 +343,31 @@ def f():

x_id = f.remote() # noqa
ray.get(x_id)
s = memory_summary()
s = memory_summary(stats_only=True)
assert "Plasma memory usage 50 MiB, 1 objects, 50.0% full" in s, s
assert "Spilled 200 MiB, 4 objects" in s, s
assert "Restored 150 MiB, 3 objects" in s, s

# Test if referenced bytes are correctly calculated.
obj = ray.put(np.zeros(30 * 1024 * 1024, dtype=np.uint8))

@ray.remote
def func_with_ref(obj):
return True

ray.get(func_with_ref.remote(obj))

s = memory_summary(stats_only=True)
# 50MB * 5 references + 30MB used for task execution.
assert "ray.get: 280 MiB." in s, s
assert_no_thrashing(address["redis_address"])


@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_during_get(object_spilling_config, shutdown_only):
object_spilling_config, _ = object_spilling_config
ray.init(
address = ray.init(
num_cpus=4,
object_store_memory=100 * 1024 * 1024,
_system_config={
Expand All @@ -355,14 +393,15 @@ def f():
# objects are being created.
for x in ids:
print(ray.get(x).shape)
assert_no_thrashing(address["redis_address"])


@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_deadlock(object_spilling_config, shutdown_only):
object_spilling_config, _ = object_spilling_config
# Limit our object store to 75 MiB of memory.
ray.init(
address = ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 1,
Expand All @@ -386,6 +425,7 @@ def test_spill_deadlock(object_spilling_config, shutdown_only):
ref = random.choice(replay_buffer)
sample = ray.get(ref, timeout=0)
assert np.array_equal(sample, arr)
assert_no_thrashing(address["redis_address"])


@pytest.mark.skipif(
Expand All @@ -394,7 +434,7 @@ def test_delete_objects(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config

ray.init(
address = ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 1,
Expand All @@ -417,6 +457,7 @@ def test_delete_objects(object_spilling_config, shutdown_only):
del replay_buffer
del ref
wait_for_condition(lambda: is_dir_empty(temp_folder))
assert_no_thrashing(address["redis_address"])


@pytest.mark.skipif(
Expand All @@ -426,7 +467,7 @@ def test_delete_objects_delete_while_creating(object_spilling_config,
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config

ray.init(
address = ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
Expand Down Expand Up @@ -457,6 +498,7 @@ def test_delete_objects_delete_while_creating(object_spilling_config,
del replay_buffer
del ref
wait_for_condition(lambda: is_dir_empty(temp_folder))
assert_no_thrashing(address["redis_address"])


@pytest.mark.skipif(
Expand All @@ -466,7 +508,7 @@ def test_delete_objects_on_worker_failure(object_spilling_config,
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config

ray.init(
address = ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 4,
Expand Down Expand Up @@ -518,6 +560,7 @@ def wait_until_actor_dead():

# After all, make sure all objects are deleted upon worker failures.
wait_for_condition(lambda: is_dir_empty(temp_folder))
assert_no_thrashing(address["redis_address"])


@pytest.mark.skipif(
Expand All @@ -539,10 +582,11 @@ def test_delete_objects_multi_node(multi_node_object_spilling_config,
"object_store_full_delay_ms": 100,
"object_spilling_config": object_spilling_config,
})
ray.init(address=cluster.address)
# Add 2 worker nodes.
for _ in range(2):
cluster.add_node(num_cpus=1, object_store_memory=75 * 1024 * 1024)
ray.init(address=cluster.address)
cluster.wait_for_nodes()

arr = np.random.rand(1024 * 1024) # 8 MB data

Expand All @@ -565,9 +609,9 @@ def create_objects(self):
self.replay_buffer.pop()

# Do random sampling.
for _ in range(200):
for _ in range(50):
ref = random.choice(self.replay_buffer)
sample = ray.get(ref, timeout=0)
sample = ray.get(ref, timeout=10)
assert np.array_equal(sample, arr)

actors = [Actor.remote() for _ in range(3)]
Expand All @@ -586,14 +630,15 @@ def wait_until_actor_dead(actor):
wait_for_condition(lambda: wait_until_actor_dead(actor))
# The multi node deletion should work.
wait_for_condition(lambda: is_dir_empty(temp_folder))
assert_no_thrashing(cluster.address)


@pytest.mark.skipif(platform.system() == "Windows", reason="Flaky on Windows.")
def test_fusion_objects(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config
min_spilling_size = 10 * 1024 * 1024
ray.init(
address = ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
"max_io_workers": 3,
Expand Down Expand Up @@ -637,12 +682,13 @@ def test_fusion_objects(object_spilling_config, shutdown_only):
if file_size >= min_spilling_size:
is_test_passing = True
assert is_test_passing
assert_no_thrashing(address["redis_address"])


# https://github.com/ray-project/ray/issues/12912
def do_test_release_resource(object_spilling_config, expect_released):
object_spilling_config, temp_folder = object_spilling_config
ray.init(
address = ray.init(
num_cpus=1,
object_store_memory=75 * 1024 * 1024,
_system_config={
Expand Down Expand Up @@ -674,6 +720,7 @@ def f(dep):
assert ready
else:
assert not ready
assert_no_thrashing(address["redis_address"])


@pytest.mark.skipif(
Expand Down Expand Up @@ -745,6 +792,7 @@ def allocate(*args):
# spilling.
tasks = [foo.remote(*task_args) for task_args in args]
ray.get(tasks)
assert_no_thrashing(cluster.address)


@pytest.mark.skipif(
Expand Down Expand Up @@ -801,14 +849,6 @@ def test_file_deleted_when_driver_exits(tmp_path, shutdown_only):
driver.format(temp_dir=str(temp_folder), signum=2)))
wait_for_condition(lambda: is_dir_empty(temp_folder, append_path=""))

# Q: Looks like Sigterm doesn't work with Ray?
# print("Sending sigterm...")
# # Run a driver with sigterm.
# with pytest.raises(subprocess.CalledProcessError):
# print(run_string_as_driver(
# driver.format(temp_dir=str(temp_folder), signum=15)))
# wait_for_condition(is_dir_empty, timeout=1000)


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
2 changes: 2 additions & 0 deletions python/ray/util/dask/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ def _rayify_task(
# Ray properly tracks the object dependencies between Ray tasks.
object_refs, repack = unpack_object_refs(args, deps)
# Submit the task using a wrapper function.
for ref in object_refs:
print(f"[GET] object_id:{ref.hex()} size:0 func:{key}")
object_ref = dask_task_wrapper.options(name=f"dask:{key!s}").remote(
func, repack, key, ray_pretask_cbs, ray_posttask_cbs, *object_refs)

Expand Down
3 changes: 3 additions & 0 deletions release/data_processing_tests/workloads/output.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
system,operation,num_nodes,nbytes,npartitions,duration
ray,sort,1,1000000000,50,38.00257587432861
ray,sort,1,1000000000,100,65.93781399726868
Loading