[Object manager] don't abort entire pull request on race condition from concurrent chunk receive - #2 #19216

Merged
merged 3 commits on Oct 8, 2021

Changes from all commits
48 changes: 46 additions & 2 deletions python/ray/tests/test_object_manager.py
@@ -296,8 +296,6 @@ def driver():
ray.get(driver.remote())


# TODO(ekl) this sometimes takes much longer (10+s) due to a higher level
# pull retry. We should try to resolve these hangs in the chunk transfer logic.
def test_pull_bundles_admission_control(shutdown_only):
cluster = Cluster()
object_size = int(6e6)
@@ -605,6 +603,52 @@ def task(x):
ray.get(t, timeout=10)


@pytest.mark.parametrize(
    "ray_start_cluster_head", [{
        "num_cpus": 0,
        "object_store_memory": 75 * 1024 * 1024,
        "_system_config": {
            "worker_lease_timeout_milliseconds": 0,
            "object_manager_pull_timeout_ms": 20000,
            "object_spilling_threshold": 1.0,
        }
    }],
    indirect=True)
def test_maximize_concurrent_pull_race_condition(ray_start_cluster_head):
    # Test if https://github.com/ray-project/ray/issues/18062 is mitigated
    cluster = ray_start_cluster_head
    cluster.add_node(num_cpus=8, object_store_memory=75 * 1024 * 1024)

    @ray.remote
    class RemoteObjectCreator:
        def put(self, i):
            return np.random.rand(i * 1024 * 1024)  # 8 MB data

        def idle(self):
            pass

    @ray.remote
    def f(x):
        print(f"timestamp={time.time()} pulled {len(x)*8} bytes")
        time.sleep(1)
        return

    remote_obj_creator = RemoteObjectCreator.remote()
    remote_refs = [remote_obj_creator.put.remote(1) for _ in range(7)]
    print(remote_refs)
    # Make sure all objects are created.
    ray.get(remote_obj_creator.idle.remote())

    local_refs = [ray.put(np.random.rand(1 * 1024 * 1024)) for _ in range(20)]
    remote_tasks = [f.remote(x) for x in local_refs]

    start = time.time()
    ray.get(remote_tasks)
    end = time.time()
    assert end - start < 20, "Too much time spent in pulling objects, " \
        "check the amount of time in retries"


if __name__ == "__main__":
import pytest
import sys
3 changes: 1 addition & 2 deletions src/ray/common/ray_config_def.h
@@ -183,8 +183,7 @@ RAY_CONFIG(int64_t, worker_register_timeout_seconds, 30)
RAY_CONFIG(int64_t, redis_db_connect_retries, 50)
RAY_CONFIG(int64_t, redis_db_connect_wait_milliseconds, 100)

/// Timeout, in milliseconds, to wait before retrying a failed pull in the
/// ObjectManager.
/// The object manager's global timer interval in milliseconds.
RAY_CONFIG(int, object_manager_timer_freq_ms, 100)

/// Timeout, in milliseconds, to wait before retrying a failed pull in the
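Side note, not part of this diff: values declared with RAY_CONFIG are normally read elsewhere in the codebase through the RayConfig singleton. A minimal sketch of how the timer interval documented above would typically be consumed follows; the accessor name is assumed to be generated from the config name.

// Illustrative sketch only; assumes RAY_CONFIG(int, object_manager_timer_freq_ms, 100)
// generates an accessor with the same name on the RayConfig singleton.
#include "ray/common/ray_config.h"

int ObjectManagerTimerIntervalMs() {
  // Interval, in milliseconds, at which the object manager's global timer fires.
  return RayConfig::instance().object_manager_timer_freq_ms();
}
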
165 changes: 114 additions & 51 deletions src/ray/object_manager/object_buffer_pool.cc
@@ -14,33 +14,57 @@

#include "ray/object_manager/object_buffer_pool.h"

#include "absl/time/time.h"
#include "ray/common/status.h"
#include "ray/util/logging.h"

namespace ray {

ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name,
uint64_t chunk_size)
: default_chunk_size_(chunk_size) {
store_socket_name_ = store_socket_name;
: store_socket_name_(store_socket_name), default_chunk_size_(chunk_size) {
RAY_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), "", 0, 300));
}

ObjectBufferPool::~ObjectBufferPool() {
// Abort everything in progress.
auto create_buf_state_copy = create_buffer_state_;
for (const auto &pair : create_buf_state_copy) {
AbortCreate(pair.first);
absl::MutexLock lock(&pool_mutex_);
auto inflight_ops = create_buffer_ops_;
pool_mutex_.Unlock();

for (const auto &[id, cond_var] : inflight_ops) {
cond_var->SignalAll();
}
auto no_inflight = [this]() {
pool_mutex_.AssertReaderHeld();
return create_buffer_ops_.empty();
};
// Assume no request would arrive, acquire pool_mutex_ when there is no inflight
// operation. Otherwise print an error.
if (!pool_mutex_.LockWhenWithTimeout(absl::Condition(&no_inflight), absl::Seconds(5))) {
RAY_LOG(ERROR)
<< create_buffer_ops_.size() << " remaining inflight create buffer operations "
<< "during ObjectBufferPool destruction. Either abort these operations before "
<< "destroying ObjectBufferPool, or refactor ObjectBufferPool to make it "
"unnecessary to wait for the operations' completion.";
}

// Abort unfinished buffers in progress.
for (auto it = create_buffer_state_.begin(); it != create_buffer_state_.end(); it++) {
RAY_CHECK_OK(store_client_.Release(it->first));
RAY_CHECK_OK(store_client_.Abort(it->first));
create_buffer_state_.erase(it);
}

RAY_CHECK(create_buffer_state_.empty());
RAY_CHECK_OK(store_client_.Disconnect());
}

uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) {
uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) const {
return (data_size + default_chunk_size_ - 1) / default_chunk_size_;
}

uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_size) {
uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index,
uint64_t data_size) const {
return (chunk_index + 1) * default_chunk_size_ > data_size
? data_size % default_chunk_size_
: default_chunk_size_;
@@ -49,7 +73,7 @@ uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_s
std::pair<std::shared_ptr<MemoryObjectReader>, ray::Status>
ObjectBufferPool::CreateObjectReader(const ObjectID &object_id,
rpc::Address owner_address) {
std::lock_guard<std::mutex> lock(pool_mutex_);
absl::MutexLock lock(&pool_mutex_);

std::vector<ObjectID> object_ids{object_id};
std::vector<plasma::ObjectBuffer> object_buffers(1);
@@ -76,53 +100,21 @@ ray::Status ObjectBufferPool::CreateChunk(const ObjectID &object_id,
const rpc::Address &owner_address,
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index) {
std::unique_lock<std::mutex> lock(pool_mutex_);
if (create_buffer_state_.count(object_id) == 0) {
int64_t object_size = data_size - metadata_size;
// Try to create shared buffer.
std::shared_ptr<Buffer> data;

// Release the buffer pool lock during the blocking create call.
lock.unlock();
Status s = store_client_.CreateAndSpillIfNeeded(
object_id, owner_address, object_size, nullptr, metadata_size, &data,
plasma::flatbuf::ObjectSource::ReceivedFromRemoteRaylet);
lock.lock();

// Another thread may have succeeded in creating the chunk while the lock
// was released. In that case skip the remainder of the creation block.
if (create_buffer_state_.count(object_id) == 0) {
std::vector<boost::asio::mutable_buffer> buffer;
if (!s.ok()) {
// Create failed. The object may already exist locally. If something else went
// wrong, another chunk will succeed in creating the buffer, and this
// chunk will eventually make it here via pull requests.
return ray::Status::IOError(s.message());
}
// Read object into store.
uint8_t *mutable_data = data->Data();
uint64_t num_chunks = GetNumChunks(data_size);
create_buffer_state_.emplace(
std::piecewise_construct, std::forward_as_tuple(object_id),
std::forward_as_tuple(BuildChunks(object_id, mutable_data, data_size, data)));
RAY_LOG(DEBUG) << "Created object " << object_id
<< " in plasma store, number of chunks: " << num_chunks
<< ", chunk index: " << chunk_index;
RAY_CHECK(create_buffer_state_[object_id].chunk_info.size() == num_chunks);
}
}
if (create_buffer_state_[object_id].chunk_state[chunk_index] !=
CreateChunkState::AVAILABLE) {
absl::MutexLock lock(&pool_mutex_);
RAY_RETURN_NOT_OK(EnsureBufferExists(object_id, owner_address, data_size, metadata_size,
chunk_index));
auto &state = create_buffer_state_.at(object_id);
if (state.chunk_state[chunk_index] != CreateChunkState::AVAILABLE) {
// There can be only one reference to this chunk at any given time.
return ray::Status::IOError("Chunk already received by a different thread.");
}
create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::REFERENCED;
state.chunk_state[chunk_index] = CreateChunkState::REFERENCED;
return ray::Status::OK();
}

void ObjectBufferPool::WriteChunk(const ObjectID &object_id, const uint64_t chunk_index,
const std::string &data) {
std::lock_guard<std::mutex> lock(pool_mutex_);
absl::MutexLock lock(&pool_mutex_);
auto it = create_buffer_state_.find(object_id);
if (it == create_buffer_state_.end() ||
it->second.chunk_state.at(chunk_index) != CreateChunkState::REFERENCED) {
@@ -148,7 +140,7 @@ void ObjectBufferPool::WriteChunk(const ObjectID &object_id, const uint64_t chun
}

void ObjectBufferPool::AbortCreate(const ObjectID &object_id) {
std::lock_guard<std::mutex> lock(pool_mutex_);
absl::MutexLock lock(&pool_mutex_);
auto it = create_buffer_state_.find(object_id);
if (it != create_buffer_state_.end()) {
RAY_LOG(INFO) << "Not enough memory to create requested object " << object_id
@@ -179,13 +171,84 @@ std::vector<ObjectBufferPool::ChunkInfo> ObjectBufferPool::BuildChunks(
return chunks;
}

ray::Status ObjectBufferPool::EnsureBufferExists(const ObjectID &object_id,
const rpc::Address &owner_address,
uint64_t data_size,
uint64_t metadata_size,
uint64_t chunk_index) {
while (true) {
// Buffer for object_id already exists.
if (create_buffer_state_.contains(object_id)) {
return ray::Status::OK();
}

auto it = create_buffer_ops_.find(object_id);
if (it == create_buffer_ops_.end()) {
// No inflight create buffer operation, proceed to start one.
break;
}

auto cond_var = it->second;
// Release pool_mutex_ while waiting, until the current inflight create buffer
// operation finishes.
cond_var->Wait(&pool_mutex_);
}

// Indicate that there is an inflight create buffer operation, by inserting into
// create_buffer_ops_.
RAY_CHECK(
create_buffer_ops_.insert({object_id, std::make_shared<absl::CondVar>()}).second);
const int64_t object_size =
static_cast<int64_t>(data_size) - static_cast<int64_t>(metadata_size);
std::shared_ptr<Buffer> data;

// Release pool_mutex_ during the blocking create call.
pool_mutex_.Unlock();
Status s = store_client_.CreateAndSpillIfNeeded(
object_id, owner_address, static_cast<int64_t>(object_size), nullptr,
static_cast<int64_t>(metadata_size), &data,
plasma::flatbuf::ObjectSource::ReceivedFromRemoteRaylet);
pool_mutex_.Lock();

// No other thread could have created the buffer.
RAY_CHECK(!create_buffer_state_.contains(object_id));

// Remove object_id from create_buffer_ops_ to indicate to the waiting ops that the
// inflight operation has finished. Wake up waiters so they can either start another
// create buffer op, or proceed after the buffer has been created.
{
auto it = create_buffer_ops_.find(object_id);
it->second->SignalAll();
create_buffer_ops_.erase(it);
}

if (!s.ok()) {
// Create failed. Buffer creation will be tried by another chunk.
// And this chunk will eventually make it here via retried pull requests.
return ray::Status::IOError(s.message());
}

// Read object into store.
uint8_t *mutable_data = data->Data();
uint64_t num_chunks = GetNumChunks(data_size);
create_buffer_state_.emplace(
std::piecewise_construct, std::forward_as_tuple(object_id),
std::forward_as_tuple(BuildChunks(object_id, mutable_data, data_size, data)));
RAY_CHECK(create_buffer_state_[object_id].chunk_info.size() == num_chunks);
RAY_LOG(DEBUG) << "Created object " << object_id
<< " in plasma store, number of chunks: " << num_chunks
<< ", chunk index: " << chunk_index;

return ray::Status::OK();
}

void ObjectBufferPool::FreeObjects(const std::vector<ObjectID> &object_ids) {
std::lock_guard<std::mutex> lock(pool_mutex_);
absl::MutexLock lock(&pool_mutex_);
RAY_CHECK_OK(store_client_.Delete(object_ids));
}

std::string ObjectBufferPool::DebugString() const {
std::lock_guard<std::mutex> lock(pool_mutex_);
absl::MutexLock lock(&pool_mutex_);
std::stringstream result;
result << "BufferPool:";
result << "\n- create buffer state map size: " << create_buffer_state_.size();
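Note, not part of this PR's diff: the coordination pattern introduced by EnsureBufferExists can be summarized in a small self-contained sketch. Exactly one thread becomes the creator for a given object and runs the blocking create with the pool mutex released, while concurrent chunk receivers wait on a per-object condition variable and re-check, instead of aborting the whole pull. All names below (KeyedCreatePool, EnsureCreated, DoBlockingCreate) are hypothetical, and std::mutex/std::condition_variable stand in for the absl primitives used in the PR.

// Standalone sketch of the "single in-flight create, others wait and re-check" pattern.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>

class KeyedCreatePool {
 public:
  // Returns true once the resource for `key` exists. Only one caller at a time
  // runs the blocking creation; the others wait and then observe the result.
  bool EnsureCreated(const std::string &key) {
    std::unique_lock<std::mutex> lock(mu_);
    while (true) {
      if (created_.count(key) > 0) {
        return true;  // Already created by another thread.
      }
      auto it = inflight_.find(key);
      if (it == inflight_.end()) {
        break;  // No in-flight creation; this thread becomes the creator.
      }
      // Wait for the current creator to finish, then re-check.
      auto cv = it->second;
      cv->wait(lock);
    }

    // Mark an in-flight creation for this key.
    auto cv = std::make_shared<std::condition_variable>();
    inflight_.emplace(key, cv);

    // Drop the lock around the slow, blocking create call.
    lock.unlock();
    const bool ok = DoBlockingCreate(key);
    lock.lock();

    // Remove the in-flight marker and wake waiters even on failure, so a
    // waiting thread can retry the creation instead of hanging.
    inflight_.erase(key);
    cv->notify_all();
    if (ok) {
      created_.insert(key);
    }
    return ok;
  }

 private:
  bool DoBlockingCreate(const std::string & /*key*/) {
    // Stand-in for a blocking call such as plasma's CreateAndSpillIfNeeded().
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    return true;
  }

  std::mutex mu_;
  std::unordered_set<std::string> created_;
  std::unordered_map<std::string, std::shared_ptr<std::condition_variable>> inflight_;
};

int main() {
  KeyedCreatePool pool;
  std::vector<std::thread> receivers;
  for (int i = 0; i < 8; i++) {
    receivers.emplace_back([&pool] { pool.EnsureCreated("object-1"); });
  }
  for (auto &t : receivers) {
    t.join();
  }
  std::cout << "all concurrent chunk receivers proceeded without aborting the pull\n";
  return 0;
}

As in the PR, the in-flight marker is erased and waiters are signaled even when the create fails, so a waiting chunk receiver can retry the creation rather than hang or abort the pull request.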