From b066627539d84e20e224a9a6ba227ff233700e58 Mon Sep 17 00:00:00 2001 From: mwtian <81660174+mwtian@users.noreply.github.com> Date: Fri, 8 Oct 2021 12:58:18 -0700 Subject: [PATCH] [Object manager] don't abort entire pull request on race condition from concurrent chunk receive - #2 (#19216) --- python/ray/tests/test_object_manager.py | 48 +++++- src/ray/common/ray_config_def.h | 3 +- src/ray/object_manager/object_buffer_pool.cc | 165 +++++++++++++------ src/ray/object_manager/object_buffer_pool.h | 57 +++++-- src/ray/object_manager/object_manager.h | 2 +- 5 files changed, 202 insertions(+), 73 deletions(-) diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index 1f2c5e5dc4944..e44bf22e83187 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -296,8 +296,6 @@ def driver(): ray.get(driver.remote()) -# TODO(ekl) this sometimes takes much longer (10+s) due to a higher level -# pull retry. We should try to resolve these hangs in the chunk transfer logic. def test_pull_bundles_admission_control(shutdown_only): cluster = Cluster() object_size = int(6e6) @@ -605,6 +603,52 @@ def task(x): ray.get(t, timeout=10) +@pytest.mark.parametrize( + "ray_start_cluster_head", [{ + "num_cpus": 0, + "object_store_memory": 75 * 1024 * 1024, + "_system_config": { + "worker_lease_timeout_milliseconds": 0, + "object_manager_pull_timeout_ms": 20000, + "object_spilling_threshold": 1.0, + } + }], + indirect=True) +def test_maximize_concurrent_pull_race_condition(ray_start_cluster_head): + # Test if https://github.com/ray-project/ray/issues/18062 is mitigated + cluster = ray_start_cluster_head + cluster.add_node(num_cpus=8, object_store_memory=75 * 1024 * 1024) + + @ray.remote + class RemoteObjectCreator: + def put(self, i): + return np.random.rand(i * 1024 * 1024) # 8 MB data + + def idle(self): + pass + + @ray.remote + def f(x): + print(f"timestamp={time.time()} pulled {len(x)*8} bytes") + time.sleep(1) + return + + remote_obj_creator = RemoteObjectCreator.remote() + remote_refs = [remote_obj_creator.put.remote(1) for _ in range(7)] + print(remote_refs) + # Make sure all objects are created. + ray.get(remote_obj_creator.idle.remote()) + + local_refs = [ray.put(np.random.rand(1 * 1024 * 1024)) for _ in range(20)] + remote_tasks = [f.remote(x) for x in local_refs] + + start = time.time() + ray.get(remote_tasks) + end = time.time() + assert end - start < 20, "Too much time spent in pulling objects, " \ + "check the amount of time in retries" + + if __name__ == "__main__": import pytest import sys diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index b22c695b55faf..f4ba9d03b76a8 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -183,8 +183,7 @@ RAY_CONFIG(int64_t, worker_register_timeout_seconds, 30) RAY_CONFIG(int64_t, redis_db_connect_retries, 50) RAY_CONFIG(int64_t, redis_db_connect_wait_milliseconds, 100) -/// Timeout, in milliseconds, to wait before retrying a failed pull in the -/// ObjectManager. +/// The object manager's global timer interval in milliseconds. 
 RAY_CONFIG(int, object_manager_timer_freq_ms, 100)
 
 /// Timeout, in milliseconds, to wait before retrying a failed pull
diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc
index fc013b1996cc1..b25439cd7203c 100644
--- a/src/ray/object_manager/object_buffer_pool.cc
+++ b/src/ray/object_manager/object_buffer_pool.cc
@@ -14,6 +14,7 @@
 
 #include "ray/object_manager/object_buffer_pool.h"
 
+#include "absl/time/time.h"
 #include "ray/common/status.h"
 #include "ray/util/logging.h"
 
@@ -21,26 +22,49 @@ namespace ray {
 
 ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name,
                                    uint64_t chunk_size)
-    : default_chunk_size_(chunk_size) {
-  store_socket_name_ = store_socket_name;
+    : store_socket_name_(store_socket_name), default_chunk_size_(chunk_size) {
   RAY_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), "", 0, 300));
 }
 
 ObjectBufferPool::~ObjectBufferPool() {
-  // Abort everything in progress.
-  auto create_buf_state_copy = create_buffer_state_;
-  for (const auto &pair : create_buf_state_copy) {
-    AbortCreate(pair.first);
+  absl::MutexLock lock(&pool_mutex_);
+  auto inflight_ops = create_buffer_ops_;
+  pool_mutex_.Unlock();
+
+  for (const auto &[id, cond_var] : inflight_ops) {
+    cond_var->SignalAll();
+  }
+  auto no_inflight = [this]() {
+    pool_mutex_.AssertReaderHeld();
+    return create_buffer_ops_.empty();
+  };
+  // Assuming no new requests arrive, reacquire pool_mutex_ once there are no
+  // inflight operations; otherwise log an error after the timeout.
+  if (!pool_mutex_.LockWhenWithTimeout(absl::Condition(&no_inflight), absl::Seconds(5))) {
+    RAY_LOG(ERROR)
+        << create_buffer_ops_.size() << " remaining inflight create buffer operations "
+        << "during ObjectBufferPool destruction. Either abort these operations before "
+        << "destroying ObjectBufferPool, or refactor ObjectBufferPool to make it "
+           "unnecessary to wait for the operations' completion.";
   }
+
+  // Abort unfinished buffers in progress.
+  for (auto it = create_buffer_state_.begin(); it != create_buffer_state_.end();
+       it = create_buffer_state_.erase(it)) {
+    RAY_CHECK_OK(store_client_.Release(it->first));
+    RAY_CHECK_OK(store_client_.Abort(it->first));
+  }
+  RAY_CHECK(create_buffer_state_.empty());
+
   RAY_CHECK_OK(store_client_.Disconnect());
 }
 
-uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) {
+uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) const {
   return (data_size + default_chunk_size_ - 1) / default_chunk_size_;
 }
 
-uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_size) {
+uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index,
+                                           uint64_t data_size) const {
   return (chunk_index + 1) * default_chunk_size_ > data_size
              ? data_size % default_chunk_size_
              : default_chunk_size_;
@@ -49,7 +73,7 @@ uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_s
 
 std::pair<std::shared_ptr<MemoryObjectReader>, ray::Status>
 ObjectBufferPool::CreateObjectReader(const ObjectID &object_id,
                                      rpc::Address owner_address) {
-  std::lock_guard<std::mutex> lock(pool_mutex_);
+  absl::MutexLock lock(&pool_mutex_);
 
   std::vector<ObjectID> object_ids{object_id};
   std::vector<plasma::ObjectBuffer> object_buffers(1);
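The destructor drains inflight create-buffer operations with absl::Mutex::LockWhenWithTimeout, which blocks until the predicate holds or the deadline passes and returns with the lock held either way. A minimal sketch of that drain pattern, assuming nothing beyond Abseil's synchronization headers (the inflight counter and the 5-second deadline are illustrative, not part of this patch):

    #include <iostream>

    #include "absl/synchronization/mutex.h"
    #include "absl/time/time.h"

    int main() {
      absl::Mutex mu;
      int inflight = 0;  // stand-in for create_buffer_ops_.size()

      // Worker threads would decrement `inflight` under `mu` as they finish.
      auto drained = [&inflight]() { return inflight == 0; };

      // Block until `drained` is true or 5 seconds elapse. The mutex is held on
      // return in both cases, so the counter can be inspected safely.
      if (!mu.LockWhenWithTimeout(absl::Condition(&drained), absl::Seconds(5))) {
        std::cout << inflight << " operations still inflight at shutdown\n";
      }
      mu.Unlock();
      return 0;
    }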
@@ -76,53 +100,21 @@ ray::Status ObjectBufferPool::CreateChunk(const ObjectID &object_id,
                                           const rpc::Address &owner_address,
                                           uint64_t data_size, uint64_t metadata_size,
                                           uint64_t chunk_index) {
-  std::unique_lock<std::mutex> lock(pool_mutex_);
-  if (create_buffer_state_.count(object_id) == 0) {
-    int64_t object_size = data_size - metadata_size;
-    // Try to create shared buffer.
-    std::shared_ptr<Buffer> data;
-
-    // Release the buffer pool lock during the blocking create call.
-    lock.unlock();
-    Status s = store_client_.CreateAndSpillIfNeeded(
-        object_id, owner_address, object_size, nullptr, metadata_size, &data,
-        plasma::flatbuf::ObjectSource::ReceivedFromRemoteRaylet);
-    lock.lock();
-
-    // Another thread may have succeeded in creating the chunk while the lock
-    // was released. In that case skip the remainder of the creation block.
-    if (create_buffer_state_.count(object_id) == 0) {
-      std::vector buffer;
-      if (!s.ok()) {
-        // Create failed. The object may already exist locally. If something else went
-        // wrong, another chunk will succeed in creating the buffer, and this
-        // chunk will eventually make it here via pull requests.
-        return ray::Status::IOError(s.message());
-      }
-      // Read object into store.
-      uint8_t *mutable_data = data->Data();
-      uint64_t num_chunks = GetNumChunks(data_size);
-      create_buffer_state_.emplace(
-          std::piecewise_construct, std::forward_as_tuple(object_id),
-          std::forward_as_tuple(BuildChunks(object_id, mutable_data, data_size, data)));
-      RAY_LOG(DEBUG) << "Created object " << object_id
-                     << " in plasma store, number of chunks: " << num_chunks
-                     << ", chunk index: " << chunk_index;
-      RAY_CHECK(create_buffer_state_[object_id].chunk_info.size() == num_chunks);
-    }
-  }
-  if (create_buffer_state_[object_id].chunk_state[chunk_index] !=
-      CreateChunkState::AVAILABLE) {
+  absl::MutexLock lock(&pool_mutex_);
+  RAY_RETURN_NOT_OK(EnsureBufferExists(object_id, owner_address, data_size, metadata_size,
+                                       chunk_index));
+  auto &state = create_buffer_state_.at(object_id);
+  if (state.chunk_state[chunk_index] != CreateChunkState::AVAILABLE) {
     // There can be only one reference to this chunk at any given time.
     return ray::Status::IOError("Chunk already received by a different thread.");
   }
-  create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::REFERENCED;
+  state.chunk_state[chunk_index] = CreateChunkState::REFERENCED;
   return ray::Status::OK();
 }
 
 void ObjectBufferPool::WriteChunk(const ObjectID &object_id, const uint64_t chunk_index,
                                   const std::string &data) {
-  std::lock_guard<std::mutex> lock(pool_mutex_);
+  absl::MutexLock lock(&pool_mutex_);
   auto it = create_buffer_state_.find(object_id);
   if (it == create_buffer_state_.end() ||
       it->second.chunk_state.at(chunk_index) != CreateChunkState::REFERENCED) {
@@ -148,7 +140,7 @@ void ObjectBufferPool::WriteChunk(const ObjectID &object_id, const uint64_t chun
 }
 
 void ObjectBufferPool::AbortCreate(const ObjectID &object_id) {
-  std::lock_guard<std::mutex> lock(pool_mutex_);
+  absl::MutexLock lock(&pool_mutex_);
   auto it = create_buffer_state_.find(object_id);
   if (it != create_buffer_state_.end()) {
     RAY_LOG(INFO) << "Not enough memory to create requested object " << object_id
@@ -179,13 +171,84 @@ std::vector<ObjectBufferPool::ChunkInfo> ObjectBufferPool::BuildChunks(
   return chunks;
 }
 
+ray::Status ObjectBufferPool::EnsureBufferExists(const ObjectID &object_id,
+                                                 const rpc::Address &owner_address,
+                                                 uint64_t data_size,
+                                                 uint64_t metadata_size,
+                                                 uint64_t chunk_index) {
+  while (true) {
+    // Buffer for object_id already exists.
+    if (create_buffer_state_.contains(object_id)) {
+      return ray::Status::OK();
+    }
+
+    auto it = create_buffer_ops_.find(object_id);
+    if (it == create_buffer_ops_.end()) {
+      // No inflight create buffer operation, proceed to start one.
+      break;
+    }
+
+    auto cond_var = it->second;
+    // Release pool_mutex_ while waiting, until the current inflight create buffer
+    // operation finishes.
+    cond_var->Wait(&pool_mutex_);
+  }
+
+  // Indicate that there is an inflight create buffer operation, by inserting into
+  // create_buffer_ops_.
+  RAY_CHECK(
+      create_buffer_ops_.insert({object_id, std::make_shared<absl::CondVar>()}).second);
+  const int64_t object_size =
+      static_cast<int64_t>(data_size) - static_cast<int64_t>(metadata_size);
+  std::shared_ptr<Buffer> data;
+
+  // Release pool_mutex_ during the blocking create call.
+  pool_mutex_.Unlock();
+  Status s = store_client_.CreateAndSpillIfNeeded(
+      object_id, owner_address, static_cast<int64_t>(object_size), nullptr,
+      static_cast<int64_t>(metadata_size), &data,
+      plasma::flatbuf::ObjectSource::ReceivedFromRemoteRaylet);
+  pool_mutex_.Lock();
+
+  // No other thread could have created the buffer.
+  RAY_CHECK(!create_buffer_state_.contains(object_id));
+
+  // Remove object_id from create_buffer_ops_ to indicate to the waiting ops that the
+  // inflight operation has finished. Wake up waiters so they can either start another
+  // create buffer op, or proceed after the buffer has been created.
+  {
+    auto it = create_buffer_ops_.find(object_id);
+    it->second->SignalAll();
+    create_buffer_ops_.erase(it);
+  }
+
+  if (!s.ok()) {
+    // Create failed. Buffer creation will be retried by another chunk,
+    // and this chunk will eventually make it here via retried pull requests.
+    return ray::Status::IOError(s.message());
+  }
+
+  // Read object into store.
+  uint8_t *mutable_data = data->Data();
+  uint64_t num_chunks = GetNumChunks(data_size);
+  create_buffer_state_.emplace(
+      std::piecewise_construct, std::forward_as_tuple(object_id),
+      std::forward_as_tuple(BuildChunks(object_id, mutable_data, data_size, data)));
+  RAY_CHECK(create_buffer_state_[object_id].chunk_info.size() == num_chunks);
+  RAY_LOG(DEBUG) << "Created object " << object_id
+                 << " in plasma store, number of chunks: " << num_chunks
+                 << ", chunk index: " << chunk_index;
+
+  return ray::Status::OK();
+}
+
 void ObjectBufferPool::FreeObjects(const std::vector<ObjectID> &object_ids) {
-  std::lock_guard<std::mutex> lock(pool_mutex_);
+  absl::MutexLock lock(&pool_mutex_);
   RAY_CHECK_OK(store_client_.Delete(object_ids));
 }
 
 std::string ObjectBufferPool::DebugString() const {
-  std::lock_guard<std::mutex> lock(pool_mutex_);
+  absl::MutexLock lock(&pool_mutex_);
   std::stringstream result;
   result << "BufferPool:";
   result << "\n- create buffer state map size: " << create_buffer_state_.size();
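EnsureBufferExists above is a per-object single-flight: the first thread to ask for an object starts the blocking plasma create with pool_mutex_ released, while later threads wait on the shared absl::CondVar and re-check both maps after every wakeup. A stripped-down sketch of the same pattern, assuming only Abseil; SingleFlight, SlowCreate, and the member names are illustrative, not part of this patch:

    #include <memory>
    #include <string>

    #include "absl/container/flat_hash_map.h"
    #include "absl/synchronization/mutex.h"

    class SingleFlight {
     public:
      // Returns the value for `key`, computing it at most once even when many
      // threads ask concurrently. Mirrors the structure of EnsureBufferExists.
      std::string Get(const std::string &key) {
        mu_.Lock();
        while (true) {
          if (auto it = done_.find(key); it != done_.end()) {
            std::string value = it->second;  // already created
            mu_.Unlock();
            return value;
          }
          auto op = ops_.find(key);
          if (op == ops_.end()) {
            break;  // no inflight op; this thread starts one
          }
          auto cv = op->second;
          cv->Wait(&mu_);  // releases mu_ while blocked, reacquires on wakeup
        }
        auto cv = std::make_shared<absl::CondVar>();
        ops_.insert({key, cv});

        mu_.Unlock();                         // drop the lock for the slow call
        std::string value = SlowCreate(key);  // stand-in for the blocking create
        mu_.Lock();

        done_[key] = value;
        ops_.erase(key);
        cv->SignalAll();  // wake all waiters; they re-check the maps above
        mu_.Unlock();
        return value;
      }

     private:
      std::string SlowCreate(const std::string &key) { return key + "!"; }

      absl::Mutex mu_;
      absl::flat_hash_map<std::string, std::shared_ptr<absl::CondVar>> ops_;
      absl::flat_hash_map<std::string, std::string> done_;
    };

As in the patch, the condition variable is held through a shared_ptr, so it stays alive for late waiters even after its entry is erased from the inflight map.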
diff --git a/src/ray/object_manager/object_buffer_pool.h b/src/ray/object_manager/object_buffer_pool.h
index 05c51e5e00117..33d665898f7b4 100644
--- a/src/ray/object_manager/object_buffer_pool.h
+++ b/src/ray/object_manager/object_buffer_pool.h
@@ -19,9 +19,11 @@
 #include <list>
 #include <memory>
 #include <mutex>
-#include <unordered_map>
 #include <vector>
 
+#include "absl/base/thread_annotations.h"
+#include "absl/container/flat_hash_map.h"
+#include "absl/synchronization/mutex.h"
 #include "ray/common/id.h"
 #include "ray/common/status.h"
 #include "ray/object_manager/memory_object_reader.h"
@@ -68,14 +70,14 @@ class ObjectBufferPool {
   ///
   /// \param data_size The size of the object + metadata.
   /// \return The number of chunks into which the object will be split.
-  uint64_t GetNumChunks(uint64_t data_size);
+  uint64_t GetNumChunks(uint64_t data_size) const;
 
   /// Computes the buffer length of a chunk of an object.
   ///
   /// \param chunk_index The chunk index for which to obtain the buffer length.
   /// \param data_size The size of the object + metadata.
   /// \return The buffer length of the chunk at chunk_index.
-  uint64_t GetBufferLength(uint64_t chunk_index, uint64_t data_size);
+  uint64_t GetBufferLength(uint64_t chunk_index, uint64_t data_size) const;
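GetNumChunks is plain ceil division, and GetBufferLength gives every chunk the full default size except a shorter final remainder. A worked example with an illustrative 100-byte chunk size (the real pool uses its configured default_chunk_size_):

    #include <cassert>
    #include <cstdint>

    int main() {
      const uint64_t chunk_size = 100;
      const uint64_t data_size = 250;  // object + metadata

      // Ceil division: (250 + 99) / 100 = 3 chunks.
      const uint64_t num_chunks = (data_size + chunk_size - 1) / chunk_size;
      assert(num_chunks == 3);

      // Chunks 0 and 1 are full; chunk 2 carries the 50-byte remainder.
      for (uint64_t i = 0; i < num_chunks; ++i) {
        const uint64_t len =
            (i + 1) * chunk_size > data_size ? data_size % chunk_size : chunk_size;
        assert(len == (i == 2 ? 50 : 100));
      }
      return 0;
    }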
 
   /// Returns an object reader for read.
   ///
@@ -85,7 +87,7 @@
   /// this method. An IOError status is returned if the Get call on the plasma store
   /// fails, and the MemoryObjectReader will be empty.
   std::pair<std::shared_ptr<MemoryObjectReader>, ray::Status> CreateObjectReader(
-      const ObjectID &object_id, rpc::Address owner_address);
+      const ObjectID &object_id, rpc::Address owner_address) LOCKS_EXCLUDED(pool_mutex_);
 
   /// Returns a chunk of an empty object at the given chunk_index. The object chunk
   /// serves as the buffer that is to be written to by a connection receiving an
@@ -106,7 +108,7 @@
   /// (with no intermediate AbortCreateChunk).
   ray::Status CreateChunk(const ObjectID &object_id, const rpc::Address &owner_address,
                           uint64_t data_size, uint64_t metadata_size,
-                          uint64_t chunk_index);
+                          uint64_t chunk_index) LOCKS_EXCLUDED(pool_mutex_);
 
   /// Write to a chunk of an object. Once all chunks of an object are written,
   /// the object is sealed.
@@ -119,34 +121,44 @@
   /// \param chunk_index The index of the chunk.
   /// \param data The data to write into the chunk.
   void WriteChunk(const ObjectID &object_id, uint64_t chunk_index,
-                  const std::string &data);
+                  const std::string &data) LOCKS_EXCLUDED(pool_mutex_);
 
   /// Free a list of objects from object store.
   ///
   /// \param object_ids The list of ObjectIDs to be deleted.
   /// \return Void.
-  void FreeObjects(const std::vector<ObjectID> &object_ids);
+  void FreeObjects(const std::vector<ObjectID> &object_ids) LOCKS_EXCLUDED(pool_mutex_);
 
   /// Abort the create operation associated with an object. This destroys the buffer
   /// state, including create operations in progress for all chunks of the object.
-  void AbortCreate(const ObjectID &object_id);
+  void AbortCreate(const ObjectID &object_id) LOCKS_EXCLUDED(pool_mutex_);
 
   /// Returns debug string for class.
   ///
   /// \return string.
-  std::string DebugString() const;
+  std::string DebugString() const LOCKS_EXCLUDED(pool_mutex_);
 
  private:
   /// Splits an object into ceil(data_size/chunk_size) chunks, which will
   /// either be read or written to in parallel.
   std::vector<ChunkInfo> BuildChunks(const ObjectID &object_id,
                                      uint8_t *data, uint64_t data_size,
-                                     std::shared_ptr<Buffer> buffer_ref);
+                                     std::shared_ptr<Buffer> buffer_ref)
+      EXCLUSIVE_LOCKS_REQUIRED(pool_mutex_);
+
+  /// Ensures the buffer for the object exists, creating it if needed.
+  /// Returns OK if the buffer exists.
+  /// Must hold pool_mutex_ when calling this function. pool_mutex_ can be released
+  /// during the call.
+  ray::Status EnsureBufferExists(const ObjectID &object_id,
+                                 const rpc::Address &owner_address, uint64_t data_size,
+                                 uint64_t metadata_size, uint64_t chunk_index)
+      EXCLUSIVE_LOCKS_REQUIRED(pool_mutex_);
 
   /// The state of a chunk associated with a create operation.
   enum class CreateChunkState : unsigned int { AVAILABLE = 0, REFERENCED, SEALED };
 
-  /// Holds the state of a create buffer.
+  /// Holds the state of creating chunks. Members are protected by pool_mutex_.
   struct CreateBufferState {
     CreateBufferState() {}
    CreateBufferState(std::vector<ChunkInfo> chunk_info)
@@ -166,18 +178,29 @@
   /// Returned when GetChunk or CreateChunk fails.
   const ChunkInfo errored_chunk_ = {0, nullptr, 0, nullptr};
 
-  /// Mutex on public methods for thread-safe operations on
-  /// get_buffer_state_, create_buffer_state_, and store_client_.
-  mutable std::mutex pool_mutex_;
+  /// Socket name of plasma store.
+  const std::string store_socket_name_;
+
   /// Determines the maximum chunk size to be transferred by a single thread.
   const uint64_t default_chunk_size_;
+
+  /// Mutex to protect create_buffer_ops_, create_buffer_state_ and the following
+  /// invariants:
+  /// - create_buffer_ops_ contains an object_id iff there is an inflight operation to
+  ///   create the buffer for the object.
+  /// - An object_id cannot appear in both create_buffer_ops_ and create_buffer_state_.
+  mutable absl::Mutex pool_mutex_;
+  /// Makes sure each object has at most one inflight create buffer operation.
+  /// Other operations can wait on the absl::CondVar for the operation
+  /// to complete. If successful, the corresponding entry in create_buffer_state_
+  /// will be created.
+  absl::flat_hash_map<ObjectID, std::shared_ptr<absl::CondVar>> create_buffer_ops_
+      GUARDED_BY(pool_mutex_);
   /// The state of a buffer that's currently being used.
-  std::unordered_map<ObjectID, CreateBufferState> create_buffer_state_;
+  absl::flat_hash_map<ObjectID, CreateBufferState> create_buffer_state_
+      GUARDED_BY(pool_mutex_);
   /// Plasma client pool.
   plasma::PlasmaClient store_client_;
-  /// Socket name of plasma store.
-  std::string store_socket_name_;
 };
 
 }  // namespace ray
diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h
index 688c22ef04ea5..38152dd258060 100644
--- a/src/ray/object_manager/object_manager.h
+++ b/src/ray/object_manager/object_manager.h
@@ -58,7 +58,7 @@ struct ObjectManagerConfig {
   /// The object manager's global timer frequency.
   unsigned int timer_freq_ms;
   /// The time in milliseconds to wait before retrying a pull
-  /// that fails due to node id lookup.
+  /// that failed.
   unsigned int pull_timeout_ms;
   /// Object chunk size, in bytes
   uint64_t object_chunk_size;
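The annotations added in this header come from Clang's thread-safety analysis: GUARDED_BY ties a member to the mutex that protects it, EXCLUSIVE_LOCKS_REQUIRED marks helpers that expect the caller to hold the lock, and LOCKS_EXCLUDED marks entry points that acquire it themselves, so building with -Wthread-safety flags locking mistakes at compile time. A minimal sketch of the same discipline, written with the current ABSL_-prefixed spellings of these macros (the patch uses the older unprefixed aliases); the Counter class is illustrative, not Ray code:

    #include <cstdint>

    #include "absl/base/thread_annotations.h"
    #include "absl/synchronization/mutex.h"

    class Counter {
     public:
      // Entry point: acquires mu_ itself, so callers must not already hold it.
      void Increment() ABSL_LOCKS_EXCLUDED(mu_) {
        absl::MutexLock lock(&mu_);
        IncrementLocked();
      }

     private:
      // Helper that assumes the caller already holds mu_.
      void IncrementLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { value_++; }

      absl::Mutex mu_;
      int64_t value_ ABSL_GUARDED_BY(mu_) = 0;  // only touched with mu_ held
    };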