Skip to content

Commit

Permalink
[Core] Remove dead actor checkpoint code (#32045)
Browse files Browse the repository at this point in the history
Checkpointable actor is already removed in #10333
  • Loading branch information
jjyao authored Jan 30, 2023
1 parent e331f6e commit 907e968
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 169 deletions.
2 changes: 1 addition & 1 deletion python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,7 @@ def __getattr__(self, item):
raise AttributeError(
f"'{type(self).__name__}' object has " f"no attribute '{item}'"
)
if item in ["__ray_terminate__", "__ray_checkpoint__"]:
if item in ["__ray_terminate__"]:

class FakeActorMethod(object):
def __call__(self, *args, **kwargs):
Expand Down
168 changes: 0 additions & 168 deletions python/ray/tests/test_actor_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from ray._private.test_utils import (
SignalActor,
convert_actor_state,
get_non_head_nodes,
kill_actor_and_wait_for_failure,
make_global_state_accessor,
run_string_as_driver,
Expand Down Expand Up @@ -268,173 +267,6 @@ def inc(actor_handle):
results = ray.get(results)


def setup_counter_actor(
test_checkpoint=False, save_exception=False, resume_exception=False
):
# Only set the checkpoint interval if we're testing with checkpointing.
checkpoint_interval = -1
if test_checkpoint:
checkpoint_interval = 5

@ray.remote(checkpoint_interval=checkpoint_interval)
class Counter:
_resume_exception = resume_exception

def __init__(self, save_exception):
self.x = 0
self.num_inc_calls = 0
self.save_exception = save_exception
self.restored = False

def node_id(self):
return ray._private.worker.global_worker.node.unique_id

def inc(self, *xs):
self.x += 1
self.num_inc_calls += 1
return self.x

def get_num_inc_calls(self):
return self.num_inc_calls

def test_restore(self):
# This method will only return True if __ray_restore__ has been
# called.
return self.restored

def __ray_save__(self):
if self.save_exception:
raise Exception("Exception raised in checkpoint save")
return self.x, self.save_exception

def __ray_restore__(self, checkpoint):
if self._resume_exception:
raise Exception("Exception raised in checkpoint resume")
self.x, self.save_exception = checkpoint
self.num_inc_calls = 0
self.restored = True

node_id = ray._private.worker.global_worker.node.unique_id

# Create an actor that is not on the raylet.
actor = Counter.remote(save_exception)
while ray.get(actor.node_id.remote()) == node_id:
actor = Counter.remote(save_exception)

args = [ray.put(0) for _ in range(100)]
ids = [actor.inc.remote(*args[i:]) for i in range(100)]

return actor, ids


@pytest.mark.skip("Fork/join consistency not yet implemented.")
def test_distributed_handle(ray_start_cluster_2_nodes):
cluster = ray_start_cluster_2_nodes
counter, ids = setup_counter_actor(test_checkpoint=False)

@ray.remote
def fork_many_incs(counter, num_incs):
x = None
for _ in range(num_incs):
x = counter.inc.remote()
# Only call ray.get() on the last task submitted.
return ray.get(x)

# Fork num_iters times.
count = ray.get(ids[-1])
num_incs = 100
num_iters = 10
forks = [fork_many_incs.remote(counter, num_incs) for _ in range(num_iters)]
ray.wait(forks, num_returns=len(forks))
count += num_incs * num_iters

# Kill the second plasma store to get rid of the cached objects and
# trigger the corresponding raylet to exit.
# TODO: kill raylet instead once this test is not skipped.
get_non_head_nodes(cluster)[0].kill_plasma_store(wait=True)

# Check that the actor did not restore from a checkpoint.
assert not ray.get(counter.test_restore.remote())
# Check that we can submit another call on the actor and get the
# correct counter result.
x = ray.get(counter.inc.remote())
assert x == count + 1


@pytest.mark.skip("This test does not work yet.")
def test_remote_checkpoint_distributed_handle(ray_start_cluster_2_nodes):
cluster = ray_start_cluster_2_nodes
counter, ids = setup_counter_actor(test_checkpoint=True)

@ray.remote
def fork_many_incs(counter, num_incs):
x = None
for _ in range(num_incs):
x = counter.inc.remote()
# Only call ray.get() on the last task submitted.
return ray.get(x)

# Fork num_iters times.
count = ray.get(ids[-1])
num_incs = 100
num_iters = 10
forks = [fork_many_incs.remote(counter, num_incs) for _ in range(num_iters)]
ray.wait(forks, num_returns=len(forks))
ray.wait([counter.__ray_checkpoint__.remote()])
count += num_incs * num_iters

# Kill the second plasma store to get rid of the cached objects and
# trigger the corresponding raylet to exit.
# TODO: kill raylet instead once this test is not skipped.
get_non_head_nodes(cluster)[0].kill_plasma_store(wait=True)

# Check that the actor restored from a checkpoint.
assert ray.get(counter.test_restore.remote())
# Check that the number of inc calls since actor initialization is
# exactly zero, since there could not have been another inc call since
# the remote checkpoint.
num_inc_calls = ray.get(counter.get_num_inc_calls.remote())
assert num_inc_calls == 0
# Check that we can submit another call on the actor and get the
# correct counter result.
x = ray.get(counter.inc.remote())
assert x == count + 1


@pytest.mark.skip("Fork/join consistency not yet implemented.")
def test_checkpoint_distributed_handle(ray_start_cluster_2_nodes):
cluster = ray_start_cluster_2_nodes
counter, ids = setup_counter_actor(test_checkpoint=True)

@ray.remote
def fork_many_incs(counter, num_incs):
x = None
for _ in range(num_incs):
x = counter.inc.remote()
# Only call ray.get() on the last task submitted.
return ray.get(x)

# Fork num_iters times.
count = ray.get(ids[-1])
num_incs = 100
num_iters = 10
forks = [fork_many_incs.remote(counter, num_incs) for _ in range(num_iters)]
ray.wait(forks, num_returns=len(forks))
count += num_incs * num_iters

# Kill the second plasma store to get rid of the cached objects and
# trigger the corresponding raylet to exit.
# TODO: kill raylet instead once this test is not skipped.
get_non_head_nodes(cluster)[0].kill_plasma_store(wait=True)

# Check that the actor restored from a checkpoint.
assert ray.get(counter.test_restore.remote())
# Check that we can submit another call on the actor and get the
# correct counter result.
x = ray.get(counter.inc.remote())
assert x == count + 1


@pytest.fixture
def setup_queue_actor():
ray.init(num_cpus=1, object_store_memory=int(150 * 1024 * 1024))
Expand Down

0 comments on commit 907e968

Please sign in to comment.