Revert "[Object manager] don't abort entire pull request on race cond…
Browse files Browse the repository at this point in the history
…ition in concurrent chunk receive (ray-project#18955)"

This reverts commit d12e35c.
rkooo567 committed Oct 4, 2021
1 parent 993834c commit 13ad971
Showing 4 changed files with 69 additions and 197 deletions.
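Background for the diff below: when an object is transferred chunk by chunk, several threads on the receiving node can race to create the same plasma buffer. The reverted change (#18955) serialized those racing creates behind a per-object condition variable; the code restored by this revert instead releases the pool lock around the blocking create and re-checks afterwards whether another thread won. A minimal, self-contained sketch of that restored pattern (hypothetical names and a toy "buffer" map, not code from this commit):

#include <iostream>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

std::mutex pool_mutex;
std::unordered_map<int, int> buffers;  // object id -> buffer contents (stand-in)

// Drop the lock around the blocking create, then re-check for a racing winner.
bool CreateChunk(int object_id) {
  std::unique_lock<std::mutex> lock(pool_mutex);
  if (buffers.count(object_id) == 0) {
    lock.unlock();              // the real code runs a blocking store create here;
    int data = object_id * 10;  // this stand-in is instantaneous
    lock.lock();
    if (buffers.count(object_id) == 0) {
      buffers.emplace(object_id, data);  // this thread won the race
      return true;
    }
    // Another thread created the buffer while the lock was released; fall through.
  }
  return false;
}

int main() {
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; i++) {
    threads.emplace_back([] { CreateChunk(1); });
  }
  for (auto &t : threads) t.join();
  std::cout << "buffers created: " << buffers.size() << "\n";  // always 1
}

Both versions end up with one buffer per object; they differ in how a thread that loses the create race proceeds: waiting on a per-object condition variable in the reverted code, versus rechecking under the lock as sketched here.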
46 changes: 0 additions & 46 deletions python/ray/tests/test_object_manager.py
@@ -605,52 +605,6 @@ def task(x):
     ray.get(t, timeout=10)
 
 
-@pytest.mark.parametrize(
-    "ray_start_cluster_head", [{
-        "num_cpus": 0,
-        "object_store_memory": 75 * 1024 * 1024,
-        "_system_config": {
-            "worker_lease_timeout_milliseconds": 0,
-            "object_manager_pull_timeout_ms": 20000,
-            "object_spilling_threshold": 1.0,
-        }
-    }],
-    indirect=True)
-def test_maximize_concurrent_pull_race_condition(ray_start_cluster_head):
-    # Test if https://github.com/ray-project/ray/issues/18062 is mitigated
-    cluster = ray_start_cluster_head
-    cluster.add_node(num_cpus=8, object_store_memory=75 * 1024 * 1024)
-
-    @ray.remote
-    class RemoteObjectCreator:
-        def put(self, i):
-            return np.random.rand(i * 1024 * 1024)  # 8 MB data
-
-        def idle(self):
-            pass
-
-    @ray.remote
-    def f(x):
-        print(f"timestamp={time.time()} pulled {len(x)*8} bytes")
-        time.sleep(1)
-        return
-
-    remote_obj_creator = RemoteObjectCreator.remote()
-    remote_refs = [remote_obj_creator.put.remote(1) for _ in range(7)]
-    print(remote_refs)
-    # Make sure all objects are created.
-    ray.get(remote_obj_creator.idle.remote())
-
-    local_refs = [ray.put(np.random.rand(1 * 1024 * 1024)) for _ in range(20)]
-    remote_tasks = [f.remote(x) for x in local_refs]
-
-    start = time.time()
-    ray.get(remote_tasks)
-    end = time.time()
-    assert end - start < 10, "Too much time spent in pulling objects, " \
-        "check the amount of time in retries"
-
-
 if __name__ == "__main__":
     import pytest
     import sys
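For context on the deleted test's 10-second assertion: the test pins object_manager_pull_timeout_ms at 20000, so (presumably) a chunk race that aborted an entire pull request would incur at least one 20-second retry cycle and blow the budget, while benign per-chunk retries finish well inside it.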
161 changes: 51 additions & 110 deletions src/ray/object_manager/object_buffer_pool.cc
@@ -14,57 +14,33 @@
 
 #include "ray/object_manager/object_buffer_pool.h"
 
-#include "absl/time/time.h"
 #include "ray/common/status.h"
 #include "ray/util/logging.h"
 
 namespace ray {
 
 ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name,
                                    uint64_t chunk_size)
-    : store_socket_name_(store_socket_name), default_chunk_size_(chunk_size) {
+    : default_chunk_size_(chunk_size) {
+  store_socket_name_ = store_socket_name;
   RAY_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), "", 0, 300));
 }
 
 ObjectBufferPool::~ObjectBufferPool() {
-  absl::MutexLock lock(&pool_mutex_);
-  auto inflight_ops = create_buffer_ops_;
-  pool_mutex_.Unlock();
-
-  for (const auto &[id, cond_var] : inflight_ops) {
-    cond_var->SignalAll();
-  }
-  auto no_inflight = [this]() {
-    pool_mutex_.AssertReaderHeld();
-    return create_buffer_ops_.empty();
-  };
-  // Assume no request would arrive, acquire pool_mutex_ when there is no inflight
-  // operation. Otherwise print an error.
-  if (!pool_mutex_.LockWhenWithTimeout(absl::Condition(&no_inflight), absl::Seconds(5))) {
-    RAY_LOG(ERROR)
-        << create_buffer_ops_.size() << " remaining inflight create buffer operations "
-        << "during ObjectBufferPool destruction. Either abort these operations before "
-        << "destroying ObjectBufferPool, or refactor ObjectBufferPool to make it "
-           "unnecessary to wait for the operations' completion.";
-  }
-
-  // Abort unfinished buffers in progress.
-  for (auto it = create_buffer_state_.begin(); it != create_buffer_state_.end(); it++) {
-    RAY_CHECK_OK(store_client_.Release(it->first));
-    RAY_CHECK_OK(store_client_.Abort(it->first));
-    create_buffer_state_.erase(it);
+  // Abort everything in progress.
+  auto create_buf_state_copy = create_buffer_state_;
+  for (const auto &pair : create_buf_state_copy) {
+    AbortCreate(pair.first);
   }
 
   RAY_CHECK(create_buffer_state_.empty());
   RAY_CHECK_OK(store_client_.Disconnect());
 }
 
-uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) const {
+uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) {
   return (data_size + default_chunk_size_ - 1) / default_chunk_size_;
 }
 
-uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index,
-                                           uint64_t data_size) const {
+uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_size) {
   return (chunk_index + 1) * default_chunk_size_ > data_size
              ? data_size % default_chunk_size_
              : default_chunk_size_;
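The two helpers above are ceiling division and last-chunk sizing: every chunk is default_chunk_size_ bytes except a shorter final chunk when the object size is not an exact multiple. A small worked example (illustrative values, not from this commit):

#include <cassert>
#include <cstdint>

uint64_t NumChunks(uint64_t data_size, uint64_t chunk) {
  return (data_size + chunk - 1) / chunk;  // ceiling division, as above
}

uint64_t BufferLength(uint64_t i, uint64_t data_size, uint64_t chunk) {
  // Full-sized chunks, except a shorter final chunk if any.
  return (i + 1) * chunk > data_size ? data_size % chunk : chunk;
}

int main() {
  const uint64_t chunk = 5 * 1024 * 1024;  // 5 MiB chunks (illustrative)
  const uint64_t data = 12 * 1024 * 1024;  // a 12 MiB object
  assert(NumChunks(data, chunk) == 3);                      // ceil(12 / 5) = 3
  assert(BufferLength(0, data, chunk) == chunk);            // full chunk
  assert(BufferLength(2, data, chunk) == 2 * 1024 * 1024);  // 12 - 2 * 5 = 2 MiB
}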
@@ -73,7 +49,7 @@ uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index,
 std::pair<std::shared_ptr<MemoryObjectReader>, ray::Status>
 ObjectBufferPool::CreateObjectReader(const ObjectID &object_id,
                                      rpc::Address owner_address) {
-  absl::MutexLock lock(&pool_mutex_);
+  std::lock_guard<std::mutex> lock(pool_mutex_);
 
   std::vector<ObjectID> object_ids{object_id};
   std::vector<plasma::ObjectBuffer> object_buffers(1);
@@ -100,21 +76,53 @@ ray::Status ObjectBufferPool::CreateChunk(const ObjectID &object_id,
                                           const rpc::Address &owner_address,
                                           uint64_t data_size, uint64_t metadata_size,
                                           uint64_t chunk_index) {
-  absl::MutexLock lock(&pool_mutex_);
-  RAY_RETURN_NOT_OK(EnsureBufferExists(object_id, owner_address, data_size, metadata_size,
-                                       chunk_index));
-  auto &state = create_buffer_state_.at(object_id);
-  if (state.chunk_state[chunk_index] != CreateChunkState::AVAILABLE) {
+  std::unique_lock<std::mutex> lock(pool_mutex_);
+  if (create_buffer_state_.count(object_id) == 0) {
+    int64_t object_size = data_size - metadata_size;
+    // Try to create shared buffer.
+    std::shared_ptr<Buffer> data;
+
+    // Release the buffer pool lock during the blocking create call.
+    lock.unlock();
+    Status s = store_client_.CreateAndSpillIfNeeded(
+        object_id, owner_address, object_size, NULL, metadata_size, &data,
+        plasma::flatbuf::ObjectSource::ReceivedFromRemoteRaylet);
+    lock.lock();
+
+    // Another thread may have succeeded in creating the chunk while the lock
+    // was released. In that case skip the remainder of the creation block.
+    if (create_buffer_state_.count(object_id) == 0) {
+      std::vector<boost::asio::mutable_buffer> buffer;
+      if (!s.ok()) {
+        // Create failed. The object may already exist locally. If something else went
+        // wrong, another chunk will succeed in creating the buffer, and this
+        // chunk will eventually make it here via pull requests.
+        return ray::Status::IOError(s.message());
+      }
+      // Read object into store.
+      uint8_t *mutable_data = data->Data();
+      uint64_t num_chunks = GetNumChunks(data_size);
+      create_buffer_state_.emplace(
+          std::piecewise_construct, std::forward_as_tuple(object_id),
+          std::forward_as_tuple(BuildChunks(object_id, mutable_data, data_size, data)));
+      RAY_LOG(DEBUG) << "Created object " << object_id
+                     << " in plasma store, number of chunks: " << num_chunks
+                     << ", chunk index: " << chunk_index;
+      RAY_CHECK(create_buffer_state_[object_id].chunk_info.size() == num_chunks);
+    }
+  }
+  if (create_buffer_state_[object_id].chunk_state[chunk_index] !=
+      CreateChunkState::AVAILABLE) {
     // There can be only one reference to this chunk at any given time.
     return ray::Status::IOError("Chunk already received by a different thread.");
   }
-  state.chunk_state[chunk_index] = CreateChunkState::REFERENCED;
+  create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::REFERENCED;
   return ray::Status::OK();
 }
 
 void ObjectBufferPool::WriteChunk(const ObjectID &object_id, const uint64_t chunk_index,
                                   const std::string &data) {
-  absl::MutexLock lock(&pool_mutex_);
+  std::lock_guard<std::mutex> lock(pool_mutex_);
   auto it = create_buffer_state_.find(object_id);
   if (it == create_buffer_state_.end() ||
       it->second.chunk_state.at(chunk_index) != CreateChunkState::REFERENCED) {
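Whichever thread loses the race above fails only for its own chunk: the chunk_state array lets each chunk be claimed exactly once while the other chunks proceed normally. A reduced model of that handshake (hypothetical types, not Ray code; the SEALED value models the post-write state, as an assumption):

#include <cassert>
#include <cstddef>
#include <vector>

// Assumed lifecycle: AVAILABLE -> REFERENCED (-> SEALED after a successful write).
enum class CreateChunkState { AVAILABLE, REFERENCED, SEALED };

struct BufferState {
  std::vector<CreateChunkState> chunk_state;
  explicit BufferState(size_t n) : chunk_state(n, CreateChunkState::AVAILABLE) {}
};

// Returns false if this chunk was already claimed by another receive.
bool ClaimChunk(BufferState &state, size_t i) {
  if (state.chunk_state[i] != CreateChunkState::AVAILABLE) {
    return false;  // mirrors the per-chunk "already received" IOError above
  }
  state.chunk_state[i] = CreateChunkState::REFERENCED;
  return true;
}

int main() {
  BufferState state(4);
  assert(ClaimChunk(state, 1));   // first receive of chunk 1 succeeds
  assert(!ClaimChunk(state, 1));  // a duplicate receive fails for chunk 1 only
  assert(ClaimChunk(state, 2));   // other chunks are unaffected
}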
@@ -140,7 +148,7 @@ void ObjectBufferPool::WriteChunk(const ObjectID &object_id, const uint64_t chunk_index,
 }
 
 void ObjectBufferPool::AbortCreate(const ObjectID &object_id) {
-  absl::MutexLock lock(&pool_mutex_);
+  std::lock_guard<std::mutex> lock(pool_mutex_);
   auto it = create_buffer_state_.find(object_id);
   if (it != create_buffer_state_.end()) {
     RAY_LOG(INFO) << "Not enough memory to create requested object " << object_id
@@ -171,80 +179,13 @@ std::vector<ObjectBufferPool::ChunkInfo> ObjectBufferPool::BuildChunks(
   return chunks;
 }
 
-ray::Status ObjectBufferPool::EnsureBufferExists(const ObjectID &object_id,
-                                                 const rpc::Address &owner_address,
-                                                 uint64_t data_size,
-                                                 uint64_t metadata_size,
-                                                 uint64_t chunk_index) {
-  // Buffer for object_id already exists.
-  if (create_buffer_state_.contains(object_id)) {
-    return ray::Status::OK();
-  }
-
-  auto it = create_buffer_ops_.find(object_id);
-  if (it != create_buffer_ops_.end()) {
-    auto cond_var = it->second;
-    // Release pool_mutex_ while waiting, until there is no inflight create buffer
-    // operation for the object.
-    while (create_buffer_ops_.contains(object_id)) {
-      cond_var->Wait(&pool_mutex_);
-    }
-    // Buffer already created.
-    if (create_buffer_state_.contains(object_id)) {
-      return ray::Status::OK();
-    }
-    // Otherwise, previous create operation failed.
-  }
-
-  // Indicate the inflight create buffer operation, by inserting into create_buffer_ops_.
-  create_buffer_ops_[object_id] = std::make_shared<absl::CondVar>();
-  const int64_t object_size = data_size - metadata_size;
-  std::shared_ptr<Buffer> data;
-
-  // Release pool_mutex_ during the blocking create call.
-  pool_mutex_.Unlock();
-  Status s = store_client_.CreateAndSpillIfNeeded(
-      object_id, owner_address, object_size, nullptr, metadata_size, &data,
-      plasma::flatbuf::ObjectSource::ReceivedFromRemoteRaylet);
-  pool_mutex_.Lock();
-
-  // No other thread could have created the buffer.
-  RAY_CHECK(!create_buffer_state_.contains(object_id));
-
-  // Indicate the create operation for create_buffer_ops_ has finished.
-  it = create_buffer_ops_.find(object_id);
-  auto cond_var = it->second;
-  create_buffer_ops_.erase(it);
-
-  if (!s.ok()) {
-    // Create failed. Buffer creation will be tried by another chunk.
-    // And this chunk will eventually make it here via retried pull requests.
-    cond_var->SignalAll();
-    return ray::Status::IOError(s.message());
-  }
-
-  // Read object into store.
-  uint8_t *mutable_data = data->Data();
-  uint64_t num_chunks = GetNumChunks(data_size);
-  create_buffer_state_.emplace(
-      std::piecewise_construct, std::forward_as_tuple(object_id),
-      std::forward_as_tuple(BuildChunks(object_id, mutable_data, data_size, data)));
-  RAY_CHECK(create_buffer_state_[object_id].chunk_info.size() == num_chunks);
-  RAY_LOG(DEBUG) << "Created object " << object_id
-                 << " in plasma store, number of chunks: " << num_chunks
-                 << ", chunk index: " << chunk_index;
-
-  cond_var->SignalAll();
-  return ray::Status::OK();
-}
-
 void ObjectBufferPool::FreeObjects(const std::vector<ObjectID> &object_ids) {
-  absl::MutexLock lock(&pool_mutex_);
+  std::lock_guard<std::mutex> lock(pool_mutex_);
   RAY_CHECK_OK(store_client_.Delete(object_ids));
 }
 
 std::string ObjectBufferPool::DebugString() const {
-  absl::MutexLock lock(&pool_mutex_);
+  std::lock_guard<std::mutex> lock(pool_mutex_);
   std::stringstream result;
   result << "BufferPool:";
   result << "\n- create buffer state map size: " << create_buffer_state_.size();
