Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Doc] Add tips of writing fault tolerant Ray applications #32191

Merged
merged 5 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions doc/source/ray-core/doc_code/anti_pattern_return_ray_put.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,6 @@ def task_with_dynamic_returns_bad(n):
return return_value_refs


# Note: currently actor tasks don't support dynamic returns
# so the previous approach needs to be used if you want to
# return dynamic number of objects from an actor task.
@ray.remote(num_returns="dynamic")
def task_with_dynamic_returns_good(n):
for i in range(n):
Expand Down
88 changes: 88 additions & 0 deletions doc/source/ray-core/doc_code/fault_tolerance_tips.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# __return_ray_put_start__
import ray


# Non-fault tolerant version:
@ray.remote
def a():
x_ref = ray.put(1)
return x_ref


x_ref = ray.get(a.remote())
# Object x outlives its owner task A.
try:
# If owner of x (i.e. the worker process running task A) dies,
# the application can no longer get value of x.
print(ray.get(x_ref))
except ray.exceptions.OwnerDiedError:
pass
# __return_ray_put_end__


# __return_directly_start__
# Fault tolerant version:
jjyao marked this conversation as resolved.
Show resolved Hide resolved
@ray.remote
def a():
# Here we return the value directly instead of calling ray.put() first.
return 1
jjyao marked this conversation as resolved.
Show resolved Hide resolved


# The owner of x is the driver
# so x is accessible and can be auto recovered
# during the entire lifetime of the driver.
x_ref = a.remote()
print(ray.get(x_ref))
# __return_directly_end__


# __node_ip_resource_start__
@ray.remote
def b():
return 1


# If the node with ip 127.0.0.3 fails while task b is running,
# Ray cannot retry the task on other nodes.
b.options(resources={"node:127.0.0.3": 1}).remote()
# __node_ip_resource_end__

# __node_affinity_scheduling_strategy_start__
# Prefer running on the particular node specified by node id
# but can also run on other nodes if the target node fails.
b.options(
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get_runtime_context().get_node_id(), soft=True
)
).remote()
# __node_affinity_scheduling_strategy_end__


# __manual_retry_start__
@ray.remote
class Actor:
def read_only(self):
import sys
import random

rand = random.random()
if rand < 0.2:
return 2 / 0
elif rand < 0.3:
sys.exit(1)

return 2


actor = Actor.remote()
# Manually retry the actor task.
while True:
try:
print(ray.get(actor.read_only.remote()))
break
except ZeroDivisionError:
pass
except ray.exceptions.RayActorError:
# Manually restart the actor
actor = Actor.remote()
# __manual_retry_end__
57 changes: 57 additions & 0 deletions doc/source/ray-core/fault-tolerance.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,63 @@ tolerance for more information on these mechanisms.
Ray also provides several mechanisms to automatically recover from internal system-level failures. In particular, Ray can automatically recover from some failures in the :ref:`distributed object store <fault-tolerance-objects>`.


How to Write Fault Tolerant Ray Applications
--------------------------------------------

There are several recommendations to make Ray applications fault tolerant:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anything else you'd like to add @stephanie-wang ?


First, if the fault tolerance mechanisms provided by Ray don't work for you,
you can always catch :ref:`exceptions <ray-core-exceptions>` caused by failures and recover manually.

.. literalinclude:: doc_code/fault_tolerance_tips.py
:language: python
:start-after: __manual_retry_start__
:end-before: __manual_retry_end__

Second, avoid letting an ``ObjectRef`` outlive its :ref:`owner <fault-tolerance-objects>` task or actor
(the task or actor that creates the initial ``ObjectRef`` by calling :meth:`ray.put() <ray.put>` or ``foo.remote()``).
As long as there are still references to an object,
the owner worker of the object keeps running even after the corresponding task or actor finishes.
If the owner worker fails, Ray :ref:`cannot recover <fault-tolerance-ownership>` the object automatically for those who try to access the object.
One example of creating such outlived objects is returning ``ObjectRef`` created by ``ray.put()`` from a task:

.. literalinclude:: doc_code/fault_tolerance_tips.py
:language: python
:start-after: __return_ray_put_start__
:end-before: __return_ray_put_end__

In the above example, object ``x`` outlives its owner task ``a``.
If the worker process running task ``a`` fails, calling ``ray.get`` on ``x_ref`` afterwards will result in an ``OwnerDiedError`` exception.

A fault tolerant version is returning ``x`` directly so that it is owned by the driver and it's only accessed within the lifetime of the driver.
If ``x`` is lost, Ray can automatically recover it via :ref:`lineage reconstruction <fault-tolerance-objects-reconstruction>`.
See :doc:`/ray-core/patterns/return-ray-put` for more details.

.. literalinclude:: doc_code/fault_tolerance_tips.py
:language: python
:start-after: __return_directly_start__
:end-before: __return_directly_end__

Third, avoid using :ref:`custom resource requirements <custom-resources>` that can only be satisfied by a particular node.
If that particular node fails, the running tasks or actors cannot be retried.

.. literalinclude:: doc_code/fault_tolerance_tips.py
:language: python
:start-after: __node_ip_resource_start__
:end-before: __node_ip_resource_end__

If you prefer running a task on a particular node, you can use the :class:`NodeAffinitySchedulingStrategy <ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy>`.
It allows you to specify the affinity as a soft constraint so even if the target node fails, the task can still be retried on other nodes.

.. literalinclude:: doc_code/fault_tolerance_tips.py
:language: python
:start-after: _node_affinity_scheduling_strategy_start__
:end-before: __node_affinity_scheduling_strategy_end__


More about Ray Fault Tolerance
------------------------------

.. toctree::
:maxdepth: 1

Expand Down
6 changes: 1 addition & 5 deletions doc/source/ray-core/patterns/return-ray-put.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Returning ray.put() ObjectRefs are considered anti-patterns for the following re
- It disallows inlining small return values: Ray has a performance optimization to return small (<= 100KB) values inline directly to the caller, avoiding going through the distributed object store.
On the other hand, ``ray.put()`` will unconditionally store the value to the object store which makes the optimization for small return values impossible.
- Returning ObjectRefs involves extra distributed reference counting protocol which is slower than returning the values directly.
- It's less fault tolerant: the worker process that calls ``ray.put()`` is the "owner" of the returned ``ObjectRef`` and the return value fate shares with the owner. If the worker process dies, the return value is lost.
- It's less :ref:`fault tolerant <fault-tolerance>`: the worker process that calls ``ray.put()`` is the "owner" of the returned ``ObjectRef`` and the return value fate shares with the owner. If the worker process dies, the return value is lost.
In contrast, the caller process (often the driver) is the owner of the return value if it's returned directly.

Code example
Expand All @@ -35,7 +35,3 @@ If you don't know the number of returns before calling the task, you should use
:language: python
:start-after: __return_dynamic_multi_values_start__
:end-before: __return_dynamic_multi_values_end__

.. note::

Currently actor tasks don't support dynamic returns so you have to use ``ray.put()`` to store each return value and return a list of ObjectRefs in this case.