From a62de6f140ccb5962ed80b633696008323f15d2f Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Wed, 8 Oct 2025 12:48:25 +0530 Subject: [PATCH 1/6] [core] Make FreeObjects non-fatal and retryable Signed-off-by: Sagar Sumit --- src/ray/object_manager/object_buffer_pool.cc | 25 ++++++++++++++-- .../tests/object_manager_test.cc | 29 +++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 00dcbe2b8564..2d9082288682 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -327,8 +327,29 @@ ray::Status ObjectBufferPool::EnsureBufferExists(const ObjectID &object_id, } void ObjectBufferPool::FreeObjects(const std::vector &object_ids) { - absl::MutexLock lock(&pool_mutex_); - RAY_CHECK_OK(store_client_->Delete(object_ids)); + constexpr int kMaxAttempts = 3; + absl::Duration backoff = absl::Milliseconds(1); + for (int attempt = 1; attempt <= kMaxAttempts; ++attempt) { + Status s; + { + absl::MutexLock lock(&pool_mutex_); + s = store_client_->Delete(object_ids); + } + if (s.ok()) { + return; + } + if (!s.IsIOError()) { + RAY_LOG(WARNING) << "Plasma delete failed (non-IOError), not retrying: " << s; + return; + } + RAY_LOG(WARNING) << "Plasma delete I/O error (attempt " << attempt << " of " + << kMaxAttempts << "): " << s; + if (attempt < kMaxAttempts) { + absl::SleepFor(backoff); + backoff = backoff * 2; + } + } + RAY_LOG(WARNING) << "Plasma delete failed after retries (non-fatal)."; } std::string ObjectBufferPool::DebugString() const { diff --git a/src/ray/object_manager/tests/object_manager_test.cc b/src/ray/object_manager/tests/object_manager_test.cc index d133d291183f..7203695c5e12 100644 --- a/src/ray/object_manager/tests/object_manager_test.cc +++ b/src/ray/object_manager/tests/object_manager_test.cc @@ -29,6 +29,7 @@ #include "ray/common/ray_object.h" #include "ray/common/status.h" #include "ray/object_manager/common.h" +#include "ray/object_manager/object_buffer_pool.h" #include "ray/object_manager/plasma/fake_plasma_client.h" #include "ray/object_manager_rpc_client/fake_object_manager_client.h" @@ -142,4 +143,32 @@ TEST_F(ObjectManagerTest, TestFreeObjectsLocalOnlyFalse) { ASSERT_EQ(NumRemoteFreeObjectsRequests(*object_manager_), 1); } +// A plasma client that simulates a transient I/O error on Delete once, then succeeds. +class FlakyDeletePlasmaClient : public ::plasma::FakePlasmaClient { + public: + Status Delete(const std::vector &object_ids) override { + attempts++; + if (attempts == 1) { + return ray::Status::IOError("No buffer space available"); + } + return ::plasma::FakePlasmaClient::Delete(object_ids); + } + + int attempts = 0; +}; + +TEST_F(ObjectManagerTest, TestObjectBufferPoolFreeObjectsRetryOnIOErrorThenSucceeds) { + auto flaky_client = std::make_shared(); + auto object_id = ObjectID::FromRandom(); + flaky_client->objects_in_plasma_[object_id] = + std::make_pair(std::vector(1), std::vector(1)); + + ObjectBufferPool pool(flaky_client, /*chunk_size=*/1024); + pool.FreeObjects({object_id}); + + // First attempt fails with IOError, second attempt succeeds. + ASSERT_GE(flaky_client->attempts, 2); + ASSERT_TRUE(!flaky_client->objects_in_plasma_.contains(object_id)); +} + } // namespace ray From 9fff4c1ad75b7de3191596123707e8d9b77058f9 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Wed, 8 Oct 2025 14:10:54 +0530 Subject: [PATCH 2/6] improve test and increase backoff duration Signed-off-by: Sagar Sumit --- src/ray/object_manager/object_buffer_pool.cc | 6 ++++-- src/ray/object_manager/tests/object_manager_test.cc | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 2d9082288682..5f67523db474 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -328,8 +328,9 @@ ray::Status ObjectBufferPool::EnsureBufferExists(const ObjectID &object_id, void ObjectBufferPool::FreeObjects(const std::vector &object_ids) { constexpr int kMaxAttempts = 3; - absl::Duration backoff = absl::Milliseconds(1); - for (int attempt = 1; attempt <= kMaxAttempts; ++attempt) { + absl::Duration backoff = absl::Milliseconds(5); + int attempt = 1; + while (attempt <= kMaxAttempts) { Status s; { absl::MutexLock lock(&pool_mutex_); @@ -348,6 +349,7 @@ void ObjectBufferPool::FreeObjects(const std::vector &object_ids) { absl::SleepFor(backoff); backoff = backoff * 2; } + attempt++; } RAY_LOG(WARNING) << "Plasma delete failed after retries (non-fatal)."; } diff --git a/src/ray/object_manager/tests/object_manager_test.cc b/src/ray/object_manager/tests/object_manager_test.cc index 7203695c5e12..33b4578bd0d3 100644 --- a/src/ray/object_manager/tests/object_manager_test.cc +++ b/src/ray/object_manager/tests/object_manager_test.cc @@ -167,7 +167,7 @@ TEST_F(ObjectManagerTest, TestObjectBufferPoolFreeObjectsRetryOnIOErrorThenSucce pool.FreeObjects({object_id}); // First attempt fails with IOError, second attempt succeeds. - ASSERT_GE(flaky_client->attempts, 2); + ASSERT_EQ(flaky_client->attempts, 2); ASSERT_TRUE(!flaky_client->objects_in_plasma_.contains(object_id)); } From f804f91f366289a66e2b189b6030dabf3b9506a8 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 17 Oct 2025 09:41:13 +0530 Subject: [PATCH 3/6] remove the retry and just log err Signed-off-by: Sagar Sumit --- src/ray/object_manager/object_buffer_pool.cc | 28 +++---------------- .../tests/object_manager_test.cc | 27 +++++++----------- 2 files changed, 14 insertions(+), 41 deletions(-) diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 5e8d41ee93a6..8474f0020427 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -326,31 +326,11 @@ Status ObjectBufferPool::EnsureBufferExists(const ObjectID &object_id, } void ObjectBufferPool::FreeObjects(const std::vector &object_ids) { - constexpr int kMaxAttempts = 3; - absl::Duration backoff = absl::Milliseconds(5); - int attempt = 1; - while (attempt <= kMaxAttempts) { - Status s; - { - absl::MutexLock lock(&pool_mutex_); - s = store_client_->Delete(object_ids); - } - if (s.ok()) { - return; - } - if (!s.IsIOError()) { - RAY_LOG(WARNING) << "Plasma delete failed (non-IOError), not retrying: " << s; - return; - } - RAY_LOG(WARNING) << "Plasma delete I/O error (attempt " << attempt << " of " - << kMaxAttempts << "): " << s; - if (attempt < kMaxAttempts) { - absl::SleepFor(backoff); - backoff = backoff * 2; - } - attempt++; + absl::MutexLock lock(&pool_mutex_); + Status s = store_client_->Delete(object_ids); + if (!s.ok()) { + RAY_LOG(ERROR) << "Failed to delete objects from plasma store (non-fatal): " << s; } - RAY_LOG(WARNING) << "Plasma delete failed after retries (non-fatal)."; } std::string ObjectBufferPool::DebugString() const { diff --git a/src/ray/object_manager/tests/object_manager_test.cc b/src/ray/object_manager/tests/object_manager_test.cc index a59c0107ceac..caf2ecf8da29 100644 --- a/src/ray/object_manager/tests/object_manager_test.cc +++ b/src/ray/object_manager/tests/object_manager_test.cc @@ -144,32 +144,25 @@ TEST_F(ObjectManagerTest, TestFreeObjectsLocalOnlyFalse) { ASSERT_EQ(NumRemoteFreeObjectsRequests(*object_manager_), 1); } -// A plasma client that simulates a transient I/O error on Delete once, then succeeds. -class FlakyDeletePlasmaClient : public ::plasma::FakePlasmaClient { +// A plasma client that always returns an error on Delete. +class FailingDeletePlasmaClient : public ::plasma::FakePlasmaClient { public: Status Delete(const std::vector &object_ids) override { - attempts++; - if (attempts == 1) { - return ray::Status::IOError("No buffer space available"); - } - return ::plasma::FakePlasmaClient::Delete(object_ids); + delete_called = true; + return ray::Status::IOError("No buffer space available"); } - - int attempts = 0; + bool delete_called = false; }; -TEST_F(ObjectManagerTest, TestObjectBufferPoolFreeObjectsRetryOnIOErrorThenSucceeds) { - auto flaky_client = std::make_shared(); +TEST_F(ObjectManagerTest, TestObjectBufferPoolFreeObjectsNonFatalOnError) { + auto failing_client = std::make_shared(); auto object_id = ObjectID::FromRandom(); - flaky_client->objects_in_plasma_[object_id] = - std::make_pair(std::vector(1), std::vector(1)); - ObjectBufferPool pool(flaky_client, /*chunk_size=*/1024); + ObjectBufferPool pool(failing_client, /*chunk_size=*/1024); + // Should not crash even when Delete fails. pool.FreeObjects({object_id}); - // First attempt fails with IOError, second attempt succeeds. - ASSERT_EQ(flaky_client->attempts, 2); - ASSERT_TRUE(!flaky_client->objects_in_plasma_.contains(object_id)); + ASSERT_TRUE(failing_client->delete_called); } } // namespace ray From 508e35e3541e969af3d93513f7c77f2fe398860c Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 17 Oct 2025 13:23:58 +0530 Subject: [PATCH 4/6] Update log ObjectBufferPool::FreeObjects Co-authored-by: Dhyey Shah Signed-off-by: Sagar Sumit --- src/ray/object_manager/object_buffer_pool.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 8474f0020427..34a11d885d33 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -329,7 +329,7 @@ void ObjectBufferPool::FreeObjects(const std::vector &object_ids) { absl::MutexLock lock(&pool_mutex_); Status s = store_client_->Delete(object_ids); if (!s.ok()) { - RAY_LOG(ERROR) << "Failed to delete objects from plasma store (non-fatal): " << s; + RAY_LOG(ERROR) << "Failed to delete objects from plasma store: " << s; } } From d3e0e7085d740c5ab2a3a8362e63534ced14fd58 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 17 Oct 2025 13:27:32 +0530 Subject: [PATCH 5/6] remove new test; not much value Signed-off-by: Sagar Sumit --- .../tests/object_manager_test.cc | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/src/ray/object_manager/tests/object_manager_test.cc b/src/ray/object_manager/tests/object_manager_test.cc index caf2ecf8da29..c983adfa0bbd 100644 --- a/src/ray/object_manager/tests/object_manager_test.cc +++ b/src/ray/object_manager/tests/object_manager_test.cc @@ -29,7 +29,6 @@ #include "ray/common/ray_object.h" #include "ray/common/status.h" #include "ray/object_manager/common.h" -#include "ray/object_manager/object_buffer_pool.h" #include "ray/object_manager/plasma/fake_plasma_client.h" #include "ray/object_manager_rpc_client/fake_object_manager_client.h" @@ -144,25 +143,4 @@ TEST_F(ObjectManagerTest, TestFreeObjectsLocalOnlyFalse) { ASSERT_EQ(NumRemoteFreeObjectsRequests(*object_manager_), 1); } -// A plasma client that always returns an error on Delete. -class FailingDeletePlasmaClient : public ::plasma::FakePlasmaClient { - public: - Status Delete(const std::vector &object_ids) override { - delete_called = true; - return ray::Status::IOError("No buffer space available"); - } - bool delete_called = false; -}; - -TEST_F(ObjectManagerTest, TestObjectBufferPoolFreeObjectsNonFatalOnError) { - auto failing_client = std::make_shared(); - auto object_id = ObjectID::FromRandom(); - - ObjectBufferPool pool(failing_client, /*chunk_size=*/1024); - // Should not crash even when Delete fails. - pool.FreeObjects({object_id}); - - ASSERT_TRUE(failing_client->delete_called); -} - } // namespace ray From 9aacf3e2fa580bba6bc30121e5ae89fa80f7a598 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Tue, 21 Oct 2025 10:14:29 -0700 Subject: [PATCH 6/6] Update src/ray/object_manager/object_buffer_pool.cc Signed-off-by: Jiajun Yao --- src/ray/object_manager/object_buffer_pool.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 34a11d885d33..c95a685f4284 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -329,7 +329,7 @@ void ObjectBufferPool::FreeObjects(const std::vector &object_ids) { absl::MutexLock lock(&pool_mutex_); Status s = store_client_->Delete(object_ids); if (!s.ok()) { - RAY_LOG(ERROR) << "Failed to delete objects from plasma store: " << s; + RAY_LOG(WARNING) << "Failed to delete objects from plasma store: " << s; } }