diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index 38ff6ff3c3d1a..1f2c5e5dc4944 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -605,52 +605,6 @@ def task(x): ray.get(t, timeout=10) -@pytest.mark.parametrize( - "ray_start_cluster_head", [{ - "num_cpus": 0, - "object_store_memory": 75 * 1024 * 1024, - "_system_config": { - "worker_lease_timeout_milliseconds": 0, - "object_manager_pull_timeout_ms": 20000, - "object_spilling_threshold": 1.0, - } - }], - indirect=True) -def test_maximize_concurrent_pull_race_condition(ray_start_cluster_head): - # Test if https://github.com/ray-project/ray/issues/18062 is mitigated - cluster = ray_start_cluster_head - cluster.add_node(num_cpus=8, object_store_memory=75 * 1024 * 1024) - - @ray.remote - class RemoteObjectCreator: - def put(self, i): - return np.random.rand(i * 1024 * 1024) # 8 MB data - - def idle(self): - pass - - @ray.remote - def f(x): - print(f"timestamp={time.time()} pulled {len(x)*8} bytes") - time.sleep(1) - return - - remote_obj_creator = RemoteObjectCreator.remote() - remote_refs = [remote_obj_creator.put.remote(1) for _ in range(7)] - print(remote_refs) - # Make sure all objects are created. - ray.get(remote_obj_creator.idle.remote()) - - local_refs = [ray.put(np.random.rand(1 * 1024 * 1024)) for _ in range(20)] - remote_tasks = [f.remote(x) for x in local_refs] - - start = time.time() - ray.get(remote_tasks) - end = time.time() - assert end - start < 10, "Too much time spent in pulling objects, " \ - "check the amount of time in retries" - - if __name__ == "__main__": import pytest import sys diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index d9c12dfc5add3..e6e214b3062f2 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -14,7 +14,6 @@ #include "ray/object_manager/object_buffer_pool.h" -#include "absl/time/time.h" #include "ray/common/status.h" #include "ray/util/logging.h" @@ -22,49 +21,26 @@ namespace ray { ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name, uint64_t chunk_size) - : store_socket_name_(store_socket_name), default_chunk_size_(chunk_size) { + : default_chunk_size_(chunk_size) { + store_socket_name_ = store_socket_name; RAY_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), "", 0, 300)); } ObjectBufferPool::~ObjectBufferPool() { - absl::MutexLock lock(&pool_mutex_); - auto inflight_ops = create_buffer_ops_; - pool_mutex_.Unlock(); - - for (const auto &[id, cond_var] : inflight_ops) { - cond_var->SignalAll(); - } - auto no_inflight = [this]() { - pool_mutex_.AssertReaderHeld(); - return create_buffer_ops_.empty(); - }; - // Assume no request would arrive, acquire pool_mutex_ when there is no inflight - // operation. Otherwise print an error. - if (!pool_mutex_.LockWhenWithTimeout(absl::Condition(&no_inflight), absl::Seconds(5))) { - RAY_LOG(ERROR) - << create_buffer_ops_.size() << " remaining inflight create buffer operations " - << "during ObjectBufferPool destruction. Either abort these operations before " - << "destroying ObjectBufferPool, or refactor ObjectBufferPool to make it " - "unnecessary to wait for the operations' completion."; - } - - // Abort unfinished buffers in progress. - for (auto it = create_buffer_state_.begin(); it != create_buffer_state_.end(); it++) { - RAY_CHECK_OK(store_client_.Release(it->first)); - RAY_CHECK_OK(store_client_.Abort(it->first)); - create_buffer_state_.erase(it); + // Abort everything in progress. + auto create_buf_state_copy = create_buffer_state_; + for (const auto &pair : create_buf_state_copy) { + AbortCreate(pair.first); } - RAY_CHECK(create_buffer_state_.empty()); RAY_CHECK_OK(store_client_.Disconnect()); } -uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) const { +uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) { return (data_size + default_chunk_size_ - 1) / default_chunk_size_; } -uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, - uint64_t data_size) const { +uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_size) { return (chunk_index + 1) * default_chunk_size_ > data_size ? data_size % default_chunk_size_ : default_chunk_size_; @@ -73,7 +49,7 @@ uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, std::pair, ray::Status> ObjectBufferPool::CreateObjectReader(const ObjectID &object_id, rpc::Address owner_address) { - absl::MutexLock lock(&pool_mutex_); + std::lock_guard lock(pool_mutex_); std::vector object_ids{object_id}; std::vector object_buffers(1); @@ -100,21 +76,53 @@ ray::Status ObjectBufferPool::CreateChunk(const ObjectID &object_id, const rpc::Address &owner_address, uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index) { - absl::MutexLock lock(&pool_mutex_); - RAY_RETURN_NOT_OK(EnsureBufferExists(object_id, owner_address, data_size, metadata_size, - chunk_index)); - auto &state = create_buffer_state_.at(object_id); - if (state.chunk_state[chunk_index] != CreateChunkState::AVAILABLE) { + std::unique_lock lock(pool_mutex_); + if (create_buffer_state_.count(object_id) == 0) { + int64_t object_size = data_size - metadata_size; + // Try to create shared buffer. + std::shared_ptr data; + + // Release the buffer pool lock during the blocking create call. + lock.unlock(); + Status s = store_client_.CreateAndSpillIfNeeded( + object_id, owner_address, object_size, NULL, metadata_size, &data, + plasma::flatbuf::ObjectSource::ReceivedFromRemoteRaylet); + lock.lock(); + + // Another thread may have succeeded in creating the chunk while the lock + // was released. In that case skip the remainder of the creation block. + if (create_buffer_state_.count(object_id) == 0) { + std::vector buffer; + if (!s.ok()) { + // Create failed. The object may already exist locally. If something else went + // wrong, another chunk will succeed in creating the buffer, and this + // chunk will eventually make it here via pull requests. + return ray::Status::IOError(s.message()); + } + // Read object into store. + uint8_t *mutable_data = data->Data(); + uint64_t num_chunks = GetNumChunks(data_size); + create_buffer_state_.emplace( + std::piecewise_construct, std::forward_as_tuple(object_id), + std::forward_as_tuple(BuildChunks(object_id, mutable_data, data_size, data))); + RAY_LOG(DEBUG) << "Created object " << object_id + << " in plasma store, number of chunks: " << num_chunks + << ", chunk index: " << chunk_index; + RAY_CHECK(create_buffer_state_[object_id].chunk_info.size() == num_chunks); + } + } + if (create_buffer_state_[object_id].chunk_state[chunk_index] != + CreateChunkState::AVAILABLE) { // There can be only one reference to this chunk at any given time. return ray::Status::IOError("Chunk already received by a different thread."); } - state.chunk_state[chunk_index] = CreateChunkState::REFERENCED; + create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::REFERENCED; return ray::Status::OK(); } void ObjectBufferPool::WriteChunk(const ObjectID &object_id, const uint64_t chunk_index, const std::string &data) { - absl::MutexLock lock(&pool_mutex_); + std::lock_guard lock(pool_mutex_); auto it = create_buffer_state_.find(object_id); if (it == create_buffer_state_.end() || it->second.chunk_state.at(chunk_index) != CreateChunkState::REFERENCED) { @@ -140,7 +148,7 @@ void ObjectBufferPool::WriteChunk(const ObjectID &object_id, const uint64_t chun } void ObjectBufferPool::AbortCreate(const ObjectID &object_id) { - absl::MutexLock lock(&pool_mutex_); + std::lock_guard lock(pool_mutex_); auto it = create_buffer_state_.find(object_id); if (it != create_buffer_state_.end()) { RAY_LOG(INFO) << "Not enough memory to create requested object " << object_id @@ -171,80 +179,13 @@ std::vector ObjectBufferPool::BuildChunks( return chunks; } -ray::Status ObjectBufferPool::EnsureBufferExists(const ObjectID &object_id, - const rpc::Address &owner_address, - uint64_t data_size, - uint64_t metadata_size, - uint64_t chunk_index) { - // Buffer for object_id already exists. - if (create_buffer_state_.contains(object_id)) { - return ray::Status::OK(); - } - - auto it = create_buffer_ops_.find(object_id); - if (it != create_buffer_ops_.end()) { - auto cond_var = it->second; - // Release pool_mutex_ while waiting, until there is no inflight create buffer - // operation for the object. - while (create_buffer_ops_.contains(object_id)) { - cond_var->Wait(&pool_mutex_); - } - // Buffer already created. - if (create_buffer_state_.contains(object_id)) { - return ray::Status::OK(); - } - // Otherwise, previous create operation failed. - } - - // Indicate the inflight create buffer operation, by inserting into create_buffer_ops_. - create_buffer_ops_[object_id] = std::make_shared(); - const int64_t object_size = data_size - metadata_size; - std::shared_ptr data; - - // Release pool_mutex_ during the blocking create call. - pool_mutex_.Unlock(); - Status s = store_client_.CreateAndSpillIfNeeded( - object_id, owner_address, object_size, nullptr, metadata_size, &data, - plasma::flatbuf::ObjectSource::ReceivedFromRemoteRaylet); - pool_mutex_.Lock(); - - // No other thread could have created the buffer. - RAY_CHECK(!create_buffer_state_.contains(object_id)); - - // Indicate the create operation for create_buffer_ops_ has finished. - it = create_buffer_ops_.find(object_id); - auto cond_var = it->second; - create_buffer_ops_.erase(it); - - if (!s.ok()) { - // Create failed. Buffer creation will be tried by another chunk. - // And this chunk will eventually make it here via retried pull requests. - cond_var->SignalAll(); - return ray::Status::IOError(s.message()); - } - - // Read object into store. - uint8_t *mutable_data = data->Data(); - uint64_t num_chunks = GetNumChunks(data_size); - create_buffer_state_.emplace( - std::piecewise_construct, std::forward_as_tuple(object_id), - std::forward_as_tuple(BuildChunks(object_id, mutable_data, data_size, data))); - RAY_CHECK(create_buffer_state_[object_id].chunk_info.size() == num_chunks); - RAY_LOG(DEBUG) << "Created object " << object_id - << " in plasma store, number of chunks: " << num_chunks - << ", chunk index: " << chunk_index; - - cond_var->SignalAll(); - return ray::Status::OK(); -} - void ObjectBufferPool::FreeObjects(const std::vector &object_ids) { - absl::MutexLock lock(&pool_mutex_); + std::lock_guard lock(pool_mutex_); RAY_CHECK_OK(store_client_.Delete(object_ids)); } std::string ObjectBufferPool::DebugString() const { - absl::MutexLock lock(&pool_mutex_); + std::lock_guard lock(pool_mutex_); std::stringstream result; result << "BufferPool:"; result << "\n- create buffer state map size: " << create_buffer_state_.size(); diff --git a/src/ray/object_manager/object_buffer_pool.h b/src/ray/object_manager/object_buffer_pool.h index 33d665898f7b4..05c51e5e00117 100644 --- a/src/ray/object_manager/object_buffer_pool.h +++ b/src/ray/object_manager/object_buffer_pool.h @@ -19,11 +19,9 @@ #include #include #include +#include #include -#include "absl/base/thread_annotations.h" -#include "absl/container/flat_hash_map.h" -#include "absl/synchronization/mutex.h" #include "ray/common/id.h" #include "ray/common/status.h" #include "ray/object_manager/memory_object_reader.h" @@ -70,14 +68,14 @@ class ObjectBufferPool { /// /// \param data_size The size of the object + metadata. /// \return The number of chunks into which the object will be split. - uint64_t GetNumChunks(uint64_t data_size) const; + uint64_t GetNumChunks(uint64_t data_size); /// Computes the buffer length of a chunk of an object. /// /// \param chunk_index The chunk index for which to obtain the buffer length. /// \param data_size The size of the object + metadata. /// \return The buffer length of the chunk at chunk_index. - uint64_t GetBufferLength(uint64_t chunk_index, uint64_t data_size) const; + uint64_t GetBufferLength(uint64_t chunk_index, uint64_t data_size); /// Returns an object reader for read. /// @@ -87,7 +85,7 @@ class ObjectBufferPool { /// this method. An IOError status is returned if the Get call on the plasma store /// fails, and the MemoryObjectReader will be empty. std::pair, ray::Status> CreateObjectReader( - const ObjectID &object_id, rpc::Address owner_address) LOCKS_EXCLUDED(pool_mutex_); + const ObjectID &object_id, rpc::Address owner_address); /// Returns a chunk of an empty object at the given chunk_index. The object chunk /// serves as the buffer that is to be written to by a connection receiving an @@ -108,7 +106,7 @@ class ObjectBufferPool { /// (with no intermediate AbortCreateChunk). ray::Status CreateChunk(const ObjectID &object_id, const rpc::Address &owner_address, uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index) LOCKS_EXCLUDED(pool_mutex_); + uint64_t chunk_index); /// Write to a Chunk of an object. If all chunks of an object is written, /// it seals the object. @@ -121,44 +119,34 @@ class ObjectBufferPool { /// \param chunk_index The index of the chunk. /// \param data The data to write into the chunk. void WriteChunk(const ObjectID &object_id, uint64_t chunk_index, - const std::string &data) LOCKS_EXCLUDED(pool_mutex_); + const std::string &data); /// Free a list of objects from object store. /// /// \param object_ids the The list of ObjectIDs to be deleted. /// \return Void. - void FreeObjects(const std::vector &object_ids) LOCKS_EXCLUDED(pool_mutex_); + void FreeObjects(const std::vector &object_ids); /// Abort the create operation associated with an object. This destroys the buffer /// state, including create operations in progress for all chunks of the object. - void AbortCreate(const ObjectID &object_id) LOCKS_EXCLUDED(pool_mutex_); + void AbortCreate(const ObjectID &object_id); /// Returns debug string for class. /// /// \return string. - std::string DebugString() const LOCKS_EXCLUDED(pool_mutex_); + std::string DebugString() const; private: /// Splits an object into ceil(data_size/chunk_size) chunks, which will /// either be read or written to in parallel. std::vector BuildChunks(const ObjectID &object_id, uint8_t *data, uint64_t data_size, - std::shared_ptr buffer_ref) - EXCLUSIVE_LOCKS_REQUIRED(pool_mutex_); - - /// Ensures buffer for the object exists, and creates the buffer if needed. - /// Returns OK if buffer exists. - /// Must hold pool_mutex_ when calling this function. pool_mutex_ can be released - /// during the call. - ray::Status EnsureBufferExists(const ObjectID &object_id, - const rpc::Address &owner_address, uint64_t data_size, - uint64_t metadata_size, uint64_t chunk_index) - EXCLUSIVE_LOCKS_REQUIRED(pool_mutex_); + std::shared_ptr buffer_ref); /// The state of a chunk associated with a create operation. enum class CreateChunkState : unsigned int { AVAILABLE = 0, REFERENCED, SEALED }; - /// Holds the state of creating chunks. Members are protected by pool_mutex_. + /// Holds the state of a create buffer. struct CreateBufferState { CreateBufferState() {} CreateBufferState(std::vector chunk_info) @@ -178,29 +166,18 @@ class ObjectBufferPool { /// Returned when GetChunk or CreateChunk fails. const ChunkInfo errored_chunk_ = {0, nullptr, 0, nullptr}; - /// Socket name of plasma store. - const std::string store_socket_name_; - + /// Mutex on public methods for thread-safe operations on + /// get_buffer_state_, create_buffer_state_, and store_client_. + mutable std::mutex pool_mutex_; /// Determines the maximum chunk size to be transferred by a single thread. const uint64_t default_chunk_size_; - - /// Mutex to protect create_buffer_ops_, create_buffer_state_ and following invariants: - /// - create_buffer_ops_ contains an object_id iff there is an inflight operation to - /// create the buffer for the object. - /// - An object_id cannot appear in both create_buffer_ops_ and create_buffer_state_. - mutable absl::Mutex pool_mutex_; - /// Makes sure each object has at most one inflight create buffer operation. - /// Other operations can wait on the std::condition_variable for the operation - /// to complete. If successful, the corresponding entry in create_buffer_state_ - /// will be created. - absl::flat_hash_map> create_buffer_ops_ - GUARDED_BY(pool_mutex_); /// The state of a buffer that's currently being used. - absl::flat_hash_map create_buffer_state_ - GUARDED_BY(pool_mutex_); + std::unordered_map create_buffer_state_; /// Plasma client pool. plasma::PlasmaClient store_client_; + /// Socket name of plasma store. + std::string store_socket_name_; }; } // namespace ray diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 38152dd258060..688c22ef04ea5 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -58,7 +58,7 @@ struct ObjectManagerConfig { /// The object manager's global timer frequency. unsigned int timer_freq_ms; /// The time in milliseconds to wait before retrying a pull - /// that failed. + /// that fails due to node id lookup. unsigned int pull_timeout_ms; /// Object chunk size, in bytes uint64_t object_chunk_size;