Skip to content

Commit

Permalink
[core][state] Adjust worker side reporting with batches && add debugs…
Browse files Browse the repository at this point in the history
…tring (remerging ray-project#31840) (ray-project#32057)

Remerging ray-project#31840

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
  • Loading branch information
rickyyx authored and edoakes committed Mar 22, 2023
1 parent b7f1eee commit 7dc926d
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 68 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,7 @@ cc_library(
":stats_lib",
":worker_rpc",
"//src/ray/protobuf:worker_cc_proto",
"@boost//:circular_buffer",
"@boost//:fiber",
"@com_google_absl//absl/container:btree",
"@com_google_absl//absl/container:flat_hash_map",
Expand Down
11 changes: 8 additions & 3 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,14 @@ RAY_CONFIG(int64_t, task_events_report_interval_ms, 1000)
RAY_CONFIG(int64_t, task_events_max_num_task_in_gcs, 100000)

/// Max number of task events stored in the buffer on workers. Any additional events
/// will be dropped.
/// Setting the value to -1 allows for unlimited task events buffered on workers.
RAY_CONFIG(int64_t, task_events_max_num_task_events_in_buffer, 10000)
/// will be dropped. This is set to a large value to avoid worker side data loss.
/// For now, avg size of task event is 200Bytes, 1M task events would incur 200MiB
/// overhead.
RAY_CONFIG(uint64_t, task_events_max_num_task_events_in_buffer, 1 * 1000 * 1000)

/// Max number of task events to be send in a single message to GCS. This caps both
/// the message size, and also the processing work on GCS.
RAY_CONFIG(uint64_t, task_events_send_batch_size, 10 * 1000)

/// Max number of profile events allowed for a single task when sent to GCS.
/// NOTE: this limit only applies to the profile events per task in a single
Expand Down
5 changes: 4 additions & 1 deletion src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,10 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
periodical_runner_.RunFnPeriodically(
[this] {
RAY_LOG(INFO) << "Event stats:\n\n"
<< io_service_.stats().StatsString() << "\n\n";
<< io_service_.stats().StatsString() << "\n\n"
<< "-----------------\n"
<< "Task Event stats:\n"
<< task_event_buffer_->DebugString() << "\n";
},
event_stats_print_interval_ms);
}
Expand Down
139 changes: 82 additions & 57 deletions src/ray/core_worker/task_event_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ namespace worker {
TaskEventBufferImpl::TaskEventBufferImpl(std::unique_ptr<gcs::GcsClient> gcs_client)
: work_guard_(boost::asio::make_work_guard(io_service_)),
periodical_runner_(io_service_),
gcs_client_(std::move(gcs_client)) {}
gcs_client_(std::move(gcs_client)),
buffer_() {}

Status TaskEventBufferImpl::Start(bool auto_flush) {
absl::MutexLock lock(&mutex_);
auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms();
RAY_CHECK(report_interval_ms > 0)
<< "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer.";

buffer_.reserve(RayConfig::instance().task_events_max_num_task_events_in_buffer());

buffer_.set_capacity(
{RayConfig::instance().task_events_max_num_task_events_in_buffer()});
// Reporting to GCS, set up gcs client and and events flushing.
auto status = gcs_client_->Connect(io_service_);
if (!status.ok()) {
Expand Down Expand Up @@ -100,17 +101,13 @@ void TaskEventBufferImpl::AddTaskEvent(rpc::TaskEvents task_events) {
absl::MutexLock lock(&mutex_);

auto limit = RayConfig::instance().task_events_max_num_task_events_in_buffer();
if (limit > 0 && buffer_.size() >= static_cast<size_t>(limit)) {
// Too many task events, start overriding older ones.
if (buffer_[next_idx_to_overwrite_].has_profile_events()) {
if (limit > 0 && buffer_.full()) {
const auto &to_evict = buffer_.front();
if (to_evict.has_profile_events()) {
num_profile_task_events_dropped_++;
} else {
num_status_task_events_dropped_++;
}

buffer_[next_idx_to_overwrite_] = std::move(task_events);
next_idx_to_overwrite_ = (next_idx_to_overwrite_ + 1) % limit;
return;
}
buffer_.push_back(std::move(task_events));
}
Expand All @@ -119,20 +116,13 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
if (!enabled_) {
return;
}
std::vector<rpc::TaskEvents> task_events;
size_t num_status_task_events_dropped = 0;
size_t num_profile_task_events_dropped = 0;
std::vector<rpc::TaskEvents> to_send;

{
absl::MutexLock lock(&mutex_);

RAY_LOG_EVERY_MS(INFO, 15000)
<< "Pushed task state events to GCS. [total_bytes="
<< (1.0 * total_events_bytes_) / 1024 / 1024
<< "MiB][total_count=" << total_num_events_
<< "][total_status_task_events_dropped=" << num_status_task_events_dropped_
<< "][total_profile_task_events_dropped=" << num_profile_task_events_dropped_
<< "][cur_buffer_size=" << buffer_.size() << "].";

// Skip if GCS hasn't finished processing the previous message.
if (grpc_in_progress_ && !forced) {
RAY_LOG_EVERY_N_OR_DEBUG(WARNING, 100)
Expand All @@ -143,72 +133,70 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
return;
}

if (buffer_.size() == 0) {
// No data to send.
if (buffer_.empty()) {
return;
}

task_events.reserve(
RayConfig::instance().task_events_max_num_task_events_in_buffer());
buffer_.swap(task_events);
next_idx_to_overwrite_ = 0;
size_t num_to_send =
std::min(static_cast<size_t>(RayConfig::instance().task_events_send_batch_size()),
static_cast<size_t>(buffer_.size()));
to_send.insert(to_send.end(),
std::make_move_iterator(buffer_.begin()),
std::make_move_iterator(buffer_.begin() + num_to_send));
buffer_.erase(buffer_.begin(), buffer_.begin() + num_to_send);

// Send and reset the counters
num_profile_task_events_dropped = num_profile_task_events_dropped_;
num_profile_task_events_dropped_ = 0;

num_status_task_events_dropped = num_status_task_events_dropped_;
num_status_task_events_dropped_ = 0;
}

// Merge multiple events from a single task attempt run into one task event.
absl::flat_hash_map<std::pair<std::string, int>, rpc::TaskEvents> task_events_map;
// Convert to rpc::TaskEventsData
auto data = std::make_unique<rpc::TaskEventData>();
data->set_num_profile_task_events_dropped(num_profile_task_events_dropped);
data->set_num_status_task_events_dropped(num_status_task_events_dropped);

size_t num_task_events = to_send.size();
size_t num_profile_event_to_send = 0;
size_t num_status_event_to_send = 0;
for (auto event : task_events) {
if (event.has_profile_events()) {
for (auto &task_event : to_send) {
auto events_by_task = data->add_events_by_task();
if (task_event.has_profile_events()) {
num_profile_event_to_send++;
} else {
num_status_event_to_send++;
}
auto &task_events_itr =
task_events_map[std::make_pair(event.task_id(), event.attempt_number())];
task_events_itr.MergeFrom(event);
}

// Convert to rpc::TaskEventsData
auto data = std::make_unique<rpc::TaskEventData>();
data->set_num_profile_task_events_dropped(num_profile_task_events_dropped);
data->set_num_status_task_events_dropped(num_status_task_events_dropped);

auto num_task_events = task_events_map.size();
for (auto itr : task_events_map) {
auto events_by_task = data->add_events_by_task();
events_by_task->Swap(&itr.second);
events_by_task->Swap(&task_event);
}

gcs::TaskInfoAccessor *task_accessor;
{
// Sending the protobuf to GCS.
absl::MutexLock lock(&mutex_);
// Some debug tracking.
total_num_events_ += num_task_events;
total_events_bytes_ += data->ByteSizeLong();

auto on_complete = [this, num_task_events](const Status &status) {
absl::MutexLock lock(&mutex_);
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to push " << num_task_events
<< " task state events to GCS. Data will be lost. [status="
<< status.ToString() << "]";
} else {
RAY_LOG(DEBUG) << "Push " << num_task_events << " task state events to GCS.";
}
grpc_in_progress_ = false;
};

// The flag should be unset when on_complete is invoked.
grpc_in_progress_ = true;
auto status =
gcs_client_->Tasks().AsyncAddTaskEventData(std::move(data), on_complete);
task_accessor = &gcs_client_->Tasks();
}

auto on_complete = [this, num_task_events](const Status &status) {
absl::MutexLock lock(&mutex_);
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to push " << num_task_events
<< " task state events to GCS. Data will be lost. [status="
<< status.ToString() << "]";
}
grpc_in_progress_ = false;
};

auto status = task_accessor->AsyncAddTaskEventData(std::move(data), on_complete);
{
absl::MutexLock lock(&mutex_);
if (!status.ok()) {
// If we couldn't even send the data by invoking client side callbacks, there's
// something seriously wrong, and losing data in this case should not be too
Expand All @@ -225,6 +213,43 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
}
}

const std::string TaskEventBufferImpl::DebugString() {
std::stringstream ss;

if (!Enabled()) {
ss << "Task Event Buffer is disabled.";
return ss.str();
}

bool grpc_in_progress;
size_t num_status_task_events_dropped, num_profile_task_events_dropped,
data_buffer_size;
uint64_t total_events_bytes, total_num_events;

{
absl::MutexLock lock(&mutex_);
grpc_in_progress = grpc_in_progress_;
num_status_task_events_dropped = num_status_task_events_dropped_;
num_profile_task_events_dropped = num_profile_task_events_dropped_;
total_events_bytes = total_events_bytes_;
total_num_events = total_num_events_;
data_buffer_size = buffer_.size();
}

ss << "\nIO Service Stats:\n";
ss << io_service_.stats().StatsString();
ss << "\nOther Stats:"
<< "\n\tgrpc_in_progress:" << grpc_in_progress
<< "\n\tcurrent number of task events in buffer: " << data_buffer_size
<< "\n\ttotal task events sent: " << 1.0 * total_events_bytes / 1024 / 1024 << " MiB"
<< "\n\ttotal number of task events sent: " << total_num_events
<< "\n\tnum status task events dropped: " << num_status_task_events_dropped
<< "\n\tnum profile task events dropped: " << num_profile_task_events_dropped
<< "\n";

return ss.str();
}

} // namespace worker

} // namespace core
Expand Down
16 changes: 10 additions & 6 deletions src/ray/core_worker/task_event_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <boost/circular_buffer.hpp>
#include <memory>
#include <string>

Expand Down Expand Up @@ -92,6 +93,9 @@ class TaskEventBuffer {
///
/// The TaskEventBuffer will be disabled if Start() returns not ok.
virtual bool Enabled() const = 0;

/// Return a string that describes the task event buffer stats.
virtual const std::string DebugString() = 0;
};

/// Implementation of TaskEventBuffer.
Expand All @@ -117,11 +121,13 @@ class TaskEventBufferImpl : public TaskEventBuffer {

bool Enabled() const override;

const std::string DebugString() LOCKS_EXCLUDED(mutex_) override;

private:
/// Test only functions.
std::vector<rpc::TaskEvents> GetAllTaskEvents() LOCKS_EXCLUDED(mutex_) {
absl::MutexLock lock(&mutex_);
std::vector<rpc::TaskEvents> copy(buffer_);
std::vector<rpc::TaskEvents> copy(buffer_.begin(), buffer_.end());
return copy;
}

Expand Down Expand Up @@ -164,11 +170,8 @@ class TaskEventBufferImpl : public TaskEventBuffer {
/// True if the TaskEventBuffer is enabled.
std::atomic<bool> enabled_ = false;

/// Buffered task events.
std::vector<rpc::TaskEvents> buffer_ GUARDED_BY(mutex_);

/// A iterator into buffer_ that determines which element to be overwritten.
size_t next_idx_to_overwrite_ GUARDED_BY(mutex_) = 0;
/// Circular buffered task events.
boost::circular_buffer_space_optimized<rpc::TaskEvents> buffer_ GUARDED_BY(mutex_);

/// Number of profile task events dropped since the last report flush.
size_t num_profile_task_events_dropped_ GUARDED_BY(mutex_) = 0;
Expand All @@ -188,6 +191,7 @@ class TaskEventBufferImpl : public TaskEventBuffer {
uint64_t total_num_events_ GUARDED_BY(mutex_) = 0;

FRIEND_TEST(TaskEventBufferTestManualStart, TestGcsClientFail);
FRIEND_TEST(TaskEventBufferTestBatchSend, TestBatchedSend);
FRIEND_TEST(TaskEventBufferTest, TestAddEvent);
FRIEND_TEST(TaskEventBufferTest, TestFlushEvents);
FRIEND_TEST(TaskEventBufferTest, TestFailedFlush);
Expand Down
58 changes: 57 additions & 1 deletion src/ray/core_worker/test/task_event_buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class TaskEventBufferTest : public ::testing::Test {
R"(
{
"task_events_report_interval_ms": 1000,
"task_events_max_num_task_events_in_buffer": 100
"task_events_max_num_task_events_in_buffer": 100,
"task_events_send_batch_size": 100
}
)");

Expand Down Expand Up @@ -91,6 +92,20 @@ class TaskEventBufferTestManualStart : public TaskEventBufferTest {
void SetUp() override {}
};

class TaskEventBufferTestBatchSend : public TaskEventBufferTest {
public:
TaskEventBufferTestBatchSend() : TaskEventBufferTest() {
RayConfig::instance().initialize(
R"(
{
"task_events_report_interval_ms": 1000,
"task_events_max_num_task_events_in_buffer": 100,
"task_events_send_batch_size": 10
}
)");
}
};

TEST_F(TaskEventBufferTestManualStart, TestGcsClientFail) {
ASSERT_NE(task_event_buffer_, nullptr);

Expand Down Expand Up @@ -270,6 +285,47 @@ TEST_F(TaskEventBufferTest, TestForcedFlush) {
task_event_buffer_->FlushEvents(true);
}

TEST_F(TaskEventBufferTestBatchSend, TestBatchedSend) {
size_t num_events = 100;
size_t batch_size = 10; // Sync with constructor.
std::vector<TaskID> task_ids;
// Adding some events
for (size_t i = 0; i < num_events; ++i) {
auto task_id = RandomTaskId();
task_ids.push_back(task_id);
task_event_buffer_->AddTaskEvent(GenStatusTaskEvents(task_id, 0));
}

auto task_gcs_accessor =
static_cast<ray::gcs::MockGcsClient *>(task_event_buffer_->GetGcsClient())
->mock_task_accessor;

size_t i = 0;
// With batch size = 10, there should be 10 flush calls
EXPECT_CALL(*task_gcs_accessor, AsyncAddTaskEventData)
.Times(num_events / batch_size)
.WillRepeatedly(
[&i, &batch_size, &task_ids](std::unique_ptr<rpc::TaskEventData> actual_data,
ray::gcs::StatusCallback callback) {
EXPECT_EQ(actual_data->events_by_task_size(), batch_size);
for (const auto &task : actual_data->events_by_task()) {
// Assert sent data in order.
EXPECT_EQ(task_ids[i++].Binary(), task.task_id());
}
callback(Status::OK());
return Status::OK();
});

for (int i = 0; i * batch_size < num_events; i++) {
task_event_buffer_->FlushEvents(false);
EXPECT_EQ(task_event_buffer_->GetAllTaskEvents().size(),
num_events - (i + 1) * batch_size);
}

// With last flush, there should be no more events in the buffer and as data.
EXPECT_EQ(task_event_buffer_->GetAllTaskEvents().size(), 0);
}

TEST_F(TaskEventBufferTest, TestBufferSizeLimit) {
size_t num_limit = 100; // Synced with test setup
size_t num_profile = 50;
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/test/task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class MockTaskEventBuffer : public worker::TaskEventBuffer {
MOCK_METHOD(void, Stop, (), (override));

MOCK_METHOD(bool, Enabled, (), (const, override));

MOCK_METHOD(const std::string, DebugString, (), (override));
};

class TaskManagerTest : public ::testing::Test {
Expand Down

0 comments on commit 7dc926d

Please sign in to comment.