diff --git a/doc/source/ray-core/direct-transport.rst b/doc/source/ray-core/direct-transport.rst
index c9d4838d7b44..8f79f72fbade 100644
--- a/doc/source/ray-core/direct-transport.rst
+++ b/doc/source/ray-core/direct-transport.rst
@@ -298,6 +298,13 @@ For collective-based tensor transports (Gloo and NCCL):
 
 * Any unexpected system bugs
 
+Due to a known issue, RDT currently does not support storing the same tensor (or tensors that share the same memory) in more than one object at a time. To reuse a tensor across objects, ensure that the first object is freed before storing the tensor in a second object.
+
+.. literalinclude:: doc_code/direct_transport_nixl.py
+    :language: python
+    :start-after: __nixl_limitations_start__
+    :end-before: __nixl_limitations_end__
+
 Advanced: RDT Internals
 =======================
 
diff --git a/doc/source/ray-core/doc_code/direct_transport_nixl.py b/doc/source/ray-core/doc_code/direct_transport_nixl.py
index 2acf084cf195..6267b8c24fc2 100644
--- a/doc/source/ray-core/doc_code/direct_transport_nixl.py
+++ b/doc/source/ray-core/doc_code/direct_transport_nixl.py
@@ -56,3 +56,36 @@ def consume_with_nixl(self, refs):
 ref1 = receiver.consume_with_nixl.remote(refs)
 print(ray.get(ref1))
 # __nixl_put__and_get_end__
+
+
+# __nixl_limitations_start__
+@ray.remote(num_gpus=1)
+class Actor:
+    def __init__(self):
+        self.tensor1 = torch.tensor([1, 2, 3])
+        self.tensor2 = torch.tensor([4, 5, 6])
+        self.tensor3 = torch.tensor([7, 8, 9])
+
+    @ray.method(tensor_transport="nixl")
+    def send_dict1(self):
+        return {"round1-1": self.tensor1, "round1-2": self.tensor2}
+
+    @ray.method(tensor_transport="nixl")
+    def send_dict2(self):
+        return {"round2-1": self.tensor1, "round2-3": self.tensor3}
+
+    def sum_dict(self, tensor_dict):
+        return sum(v.sum().item() for v in tensor_dict.values())
+
+
+sender, receiver = Actor.remote(), Actor.remote()
+ref1 = sender.send_dict1.remote()
+result1 = receiver.sum_dict.remote(ref1)
+print(ray.get(result1))
+ref2 = sender.send_dict2.remote()
+result2 = receiver.sum_dict.remote(ref2)
+try:
+    print(ray.get(result2))
+except ValueError as e:
+    print("Error caught:", e)
+# __nixl_limitations_end__
diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_store.py b/python/ray/experimental/gpu_object_manager/gpu_object_store.py
index d52bec00c157..78c2c2f38606 100644
--- a/python/ray/experimental/gpu_object_manager/gpu_object_store.py
+++ b/python/ray/experimental/gpu_object_manager/gpu_object_store.py
@@ -202,6 +202,11 @@ def add_object(
             is_primary: Whether the GPU object is the primary copy.
         """
         with self._object_present_cv:
+            for tensor in gpu_object:
+                if tensor in self._tensor_to_object_ids:
+                    raise ValueError(
+                        f"Tensor already exists in the RDT object store. Free all references to ObjectRef({obj_id}) before storing the tensor again."
+                    )
             for tensor in gpu_object:
                 self._tensor_to_object_ids[tensor].add(obj_id)
                 # Append to the queue instead of overwriting
diff --git a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py
index b98cd9077124..ddecf58adfef 100644
--- a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py
+++ b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py
@@ -809,6 +809,9 @@ def gc():
     assert not gpu_object_store.has_object(obj_id)
 
 
+@pytest.mark.skip(
+    reason="RDT currently doesn't support multiple objects containing the same tensor"
+)
 def test_wait_tensor_freed_double_tensor(ray_start_regular):
     """Unit test for ray.experimental.wait_tensor_freed when multiple objects
     contain the same tensor."""
@@ -848,6 +851,9 @@ def gc(obj_id):
     assert not gpu_object_store.has_object(obj_id2)
 
 
+@pytest.mark.skip(
+    reason="RDT currently doesn't support multiple objects containing the same tensor"
+)
 def test_send_back_and_dst_warning(ray_start_regular):
     # Test warning when object is sent back to the src actor and to dst actors
     world_size = 2
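
The documentation example in the diff shows the failure mode; the sketch below illustrates the documented workaround of freeing the first object before storing the shared tensor in a second one. It is a minimal sketch, not part of the diff: the `Sender`/`Receiver` actors are hypothetical stand-ins for the doc example's `Actor`, and it assumes that dropping every reference to the first `ObjectRef` lets the RDT object store release the tensor. Because freeing is asynchronous, real code may additionally need to block until the tensor is actually released; the skipped tests reference `ray.experimental.wait_tensor_freed` for this, but its exact signature is not shown in the diff, so it appears below only as a commented placeholder.

import ray
import torch


# Hypothetical actors for the workaround sketch; they mirror the doc example.
@ray.remote(num_gpus=1)
class Sender:
    def __init__(self):
        # The same tensor is stored in two RDT objects, one after the other.
        self.tensor = torch.tensor([1, 2, 3])

    @ray.method(tensor_transport="nixl")
    def send_first(self):
        return {"round1": self.tensor}

    @ray.method(tensor_transport="nixl")
    def send_second(self):
        # Only safe once every reference to the first object has been freed.
        return {"round2": self.tensor}


@ray.remote(num_gpus=1)
class Receiver:
    def sum_dict(self, tensor_dict):
        return sum(v.sum().item() for v in tensor_dict.values())


sender, receiver = Sender.remote(), Receiver.remote()

ref1 = sender.send_first.remote()
print(ray.get(receiver.sum_dict.remote(ref1)))

# Drop every reference to the first object so that the RDT object store can
# release the tensor. Freeing happens asynchronously, so a real application
# may also need to wait until the tensor is actually released before
# triggering the second transfer.
del ref1
# ray.experimental.wait_tensor_freed(...)  # assumed API, see note above

ref2 = sender.send_second.remote()
print(ray.get(receiver.sum_dict.remote(ref2)))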