From 7dc926d567e6d0acb546c1d83a33f73c6e65812b Mon Sep 17 00:00:00 2001
From: Ricky Xu
Date: Tue, 31 Jan 2023 21:15:03 -0500
Subject: [PATCH] [core][state] Adjust worker side reporting with batches && add debugstring (remerging #31840) (#32057)

Remerging #31840

Signed-off-by: Edward Oakes
---
 BUILD.bazel                                   |   1 +
 src/ray/common/ray_config_def.h               |  11 +-
 src/ray/core_worker/core_worker.cc            |   5 +-
 src/ray/core_worker/task_event_buffer.cc      | 139 +++++++++++-------
 src/ray/core_worker/task_event_buffer.h       |  16 +-
 .../test/task_event_buffer_test.cc            |  58 +++++++-
 src/ray/core_worker/test/task_manager_test.cc |   2 +
 7 files changed, 164 insertions(+), 68 deletions(-)

diff --git a/BUILD.bazel b/BUILD.bazel
index 39c9ddd285da..20759e151419 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -833,6 +833,7 @@ cc_library(
         ":stats_lib",
         ":worker_rpc",
         "//src/ray/protobuf:worker_cc_proto",
+        "@boost//:circular_buffer",
        "@boost//:fiber",
         "@com_google_absl//absl/container:btree",
         "@com_google_absl//absl/container:flat_hash_map",
diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index e0714fdeac75..3a133ea6d510 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -457,9 +457,14 @@ RAY_CONFIG(int64_t, task_events_report_interval_ms, 1000)
 RAY_CONFIG(int64_t, task_events_max_num_task_in_gcs, 100000)
 
 /// Max number of task events stored in the buffer on workers. Any additional events
-/// will be dropped.
-/// Setting the value to -1 allows for unlimited task events buffered on workers.
-RAY_CONFIG(int64_t, task_events_max_num_task_events_in_buffer, 10000)
+/// will be dropped. This is set to a large value to avoid worker-side data loss.
+/// For now, the average task event is about 200 bytes, so 1M buffered task events
+/// incur roughly 200MiB of overhead.
+RAY_CONFIG(uint64_t, task_events_max_num_task_events_in_buffer, 1 * 1000 * 1000)
+
+/// Max number of task events to be sent in a single message to GCS. This caps both
+/// the message size and the processing work on the GCS.
+RAY_CONFIG(uint64_t, task_events_send_batch_size, 10 * 1000)
 
 /// Max number of profile events allowed for a single task when sent to GCS.
 /// NOTE: this limit only applies to the profile events per task in a single
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index c3ea072b52db..b58aa8a44b73 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -531,7 +531,10 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
     periodical_runner_.RunFnPeriodically(
         [this] {
           RAY_LOG(INFO) << "Event stats:\n\n"
-                        << io_service_.stats().StatsString() << "\n\n";
+                        << io_service_.stats().StatsString() << "\n\n"
+                        << "-----------------\n"
+                        << "Task Event stats:\n"
+                        << task_event_buffer_->DebugString() << "\n";
         },
         event_stats_print_interval_ms);
   }
diff --git a/src/ray/core_worker/task_event_buffer.cc b/src/ray/core_worker/task_event_buffer.cc
index 10bc43028b94..03cb3a4fbda0 100644
--- a/src/ray/core_worker/task_event_buffer.cc
+++ b/src/ray/core_worker/task_event_buffer.cc
@@ -22,7 +22,8 @@ namespace worker {
 TaskEventBufferImpl::TaskEventBufferImpl(std::unique_ptr<gcs::GcsClient> gcs_client)
     : work_guard_(boost::asio::make_work_guard(io_service_)),
       periodical_runner_(io_service_),
-      gcs_client_(std::move(gcs_client)) {}
+      gcs_client_(std::move(gcs_client)),
+      buffer_() {}
 
 Status TaskEventBufferImpl::Start(bool auto_flush) {
   absl::MutexLock lock(&mutex_);
@@ -30,8 +31,8 @@ Status TaskEventBufferImpl::Start(bool auto_flush) {
   RAY_CHECK(report_interval_ms > 0)
       << "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer.";
 
-  buffer_.reserve(RayConfig::instance().task_events_max_num_task_events_in_buffer());
-
+  buffer_.set_capacity(
+      {RayConfig::instance().task_events_max_num_task_events_in_buffer()});
   // Reporting to GCS, set up gcs client and and events flushing.
   auto status = gcs_client_->Connect(io_service_);
   if (!status.ok()) {
@@ -100,17 +101,13 @@ void TaskEventBufferImpl::AddTaskEvent(rpc::TaskEvents task_events) {
   absl::MutexLock lock(&mutex_);
 
   auto limit = RayConfig::instance().task_events_max_num_task_events_in_buffer();
-  if (limit > 0 && buffer_.size() >= static_cast<size_t>(limit)) {
-    // Too many task events, start overriding older ones.
-    if (buffer_[next_idx_to_overwrite_].has_profile_events()) {
+  if (limit > 0 && buffer_.full()) {
+    const auto &to_evict = buffer_.front();
+    if (to_evict.has_profile_events()) {
       num_profile_task_events_dropped_++;
     } else {
       num_status_task_events_dropped_++;
     }
-
-    buffer_[next_idx_to_overwrite_] = std::move(task_events);
-    next_idx_to_overwrite_ = (next_idx_to_overwrite_ + 1) % limit;
-    return;
   }
   buffer_.push_back(std::move(task_events));
 }
@@ -119,20 +116,13 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
   if (!enabled_) {
     return;
   }
 
-  std::vector<rpc::TaskEvents> task_events;
   size_t num_status_task_events_dropped = 0;
   size_t num_profile_task_events_dropped = 0;
+  std::vector<rpc::TaskEvents> to_send;
+
   {
     absl::MutexLock lock(&mutex_);
-    RAY_LOG_EVERY_MS(INFO, 15000)
-        << "Pushed task state events to GCS. [total_bytes="
-        << (1.0 * total_events_bytes_) / 1024 / 1024
-        << "MiB][total_count=" << total_num_events_
-        << "][total_status_task_events_dropped=" << num_status_task_events_dropped_
-        << "][total_profile_task_events_dropped=" << num_profile_task_events_dropped_
-        << "][cur_buffer_size=" << buffer_.size() << "].";
-
     // Skip if GCS hasn't finished processing the previous message.
     if (grpc_in_progress_ && !forced) {
       RAY_LOG_EVERY_N_OR_DEBUG(WARNING, 100)
@@ -143,15 +133,20 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
       return;
     }
 
-    if (buffer_.size() == 0) {
+    // No data to send.
+    if (buffer_.empty()) {
       return;
     }
 
-    task_events.reserve(
-        RayConfig::instance().task_events_max_num_task_events_in_buffer());
-    buffer_.swap(task_events);
-    next_idx_to_overwrite_ = 0;
+    size_t num_to_send =
+        std::min(static_cast<size_t>(RayConfig::instance().task_events_send_batch_size()),
+                 static_cast<size_t>(buffer_.size()));
+    to_send.insert(to_send.end(),
+                   std::make_move_iterator(buffer_.begin()),
+                   std::make_move_iterator(buffer_.begin() + num_to_send));
+    buffer_.erase(buffer_.begin(), buffer_.begin() + num_to_send);
+
     // Send and reset the counters
     num_profile_task_events_dropped = num_profile_task_events_dropped_;
     num_profile_task_events_dropped_ = 0;
@@ -159,56 +154,49 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
     num_status_task_events_dropped_ = 0;
   }
 
-  // Merge multiple events from a single task attempt run into one task event.
-  absl::flat_hash_map<std::pair<std::string, int32_t>, rpc::TaskEvents> task_events_map;
+  // Convert to rpc::TaskEventsData
+  auto data = std::make_unique<rpc::TaskEventData>();
+  data->set_num_profile_task_events_dropped(num_profile_task_events_dropped);
+  data->set_num_status_task_events_dropped(num_status_task_events_dropped);
+
+  size_t num_task_events = to_send.size();
   size_t num_profile_event_to_send = 0;
   size_t num_status_event_to_send = 0;
-  for (auto event : task_events) {
-    if (event.has_profile_events()) {
+  for (auto &task_event : to_send) {
+    auto events_by_task = data->add_events_by_task();
+    if (task_event.has_profile_events()) {
       num_profile_event_to_send++;
     } else {
       num_status_event_to_send++;
     }
-    auto &task_events_itr =
-        task_events_map[std::make_pair(event.task_id(), event.attempt_number())];
-    task_events_itr.MergeFrom(event);
-  }
-
-  // Convert to rpc::TaskEventsData
-  auto data = std::make_unique<rpc::TaskEventData>();
-  data->set_num_profile_task_events_dropped(num_profile_task_events_dropped);
-  data->set_num_status_task_events_dropped(num_status_task_events_dropped);
-
-  auto num_task_events = task_events_map.size();
-  for (auto itr : task_events_map) {
-    auto events_by_task = data->add_events_by_task();
-    events_by_task->Swap(&itr.second);
+    events_by_task->Swap(&task_event);
   }
 
+  gcs::TaskInfoAccessor *task_accessor;
   {
     // Sending the protobuf to GCS.
     absl::MutexLock lock(&mutex_);
     // Some debug tracking.
     total_num_events_ += num_task_events;
     total_events_bytes_ += data->ByteSizeLong();
-
-    auto on_complete = [this, num_task_events](const Status &status) {
-      absl::MutexLock lock(&mutex_);
-      if (!status.ok()) {
-        RAY_LOG(WARNING) << "Failed to push " << num_task_events
-                         << " task state events to GCS. Data will be lost. [status="
-                         << status.ToString() << "]";
-      } else {
-        RAY_LOG(DEBUG) << "Push " << num_task_events << " task state events to GCS.";
-      }
-      grpc_in_progress_ = false;
-    };
-
     // The flag should be unset when on_complete is invoked.
     grpc_in_progress_ = true;
-    auto status =
-        gcs_client_->Tasks().AsyncAddTaskEventData(std::move(data), on_complete);
+    task_accessor = &gcs_client_->Tasks();
+  }
+
+  auto on_complete = [this, num_task_events](const Status &status) {
+    absl::MutexLock lock(&mutex_);
+    if (!status.ok()) {
+      RAY_LOG(WARNING) << "Failed to push " << num_task_events
+                       << " task state events to GCS. Data will be lost. [status="
+                       << status.ToString() << "]";
+    }
+    grpc_in_progress_ = false;
+  };
+
+  auto status = task_accessor->AsyncAddTaskEventData(std::move(data), on_complete);
+  {
+    absl::MutexLock lock(&mutex_);
     if (!status.ok()) {
       // If we couldn't even send the data by invoking client side callbacks, there's
       // something seriously wrong, and losing data in this case should not be too
@@ -225,6 +213,43 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
   }
 }
 
+const std::string TaskEventBufferImpl::DebugString() {
+  std::stringstream ss;
+
+  if (!Enabled()) {
+    ss << "Task Event Buffer is disabled.";
+    return ss.str();
+  }
+
+  bool grpc_in_progress;
+  size_t num_status_task_events_dropped, num_profile_task_events_dropped,
+      data_buffer_size;
+  uint64_t total_events_bytes, total_num_events;
+
+  {
+    absl::MutexLock lock(&mutex_);
+    grpc_in_progress = grpc_in_progress_;
+    num_status_task_events_dropped = num_status_task_events_dropped_;
+    num_profile_task_events_dropped = num_profile_task_events_dropped_;
+    total_events_bytes = total_events_bytes_;
+    total_num_events = total_num_events_;
+    data_buffer_size = buffer_.size();
+  }
+
+  ss << "\nIO Service Stats:\n";
+  ss << io_service_.stats().StatsString();
+  ss << "\nOther Stats:"
+     << "\n\tgrpc_in_progress:" << grpc_in_progress
+     << "\n\tcurrent number of task events in buffer: " << data_buffer_size
+     << "\n\ttotal task event bytes sent: " << 1.0 * total_events_bytes / 1024 / 1024 << " MiB"
+     << "\n\ttotal number of task events sent: " << total_num_events
+     << "\n\tnum status task events dropped: " << num_status_task_events_dropped
+     << "\n\tnum profile task events dropped: " << num_profile_task_events_dropped
+     << "\n";
+
+  return ss.str();
+}
+
 }  // namespace worker
 
 }  // namespace core
diff --git a/src/ray/core_worker/task_event_buffer.h b/src/ray/core_worker/task_event_buffer.h
index 145b8908e225..7deee0c2e3b7 100644
--- a/src/ray/core_worker/task_event_buffer.h
+++ b/src/ray/core_worker/task_event_buffer.h
@@ -14,6 +14,7 @@
 #pragma once
 
+#include <boost/circular_buffer/space_optimized.hpp>
 #include <memory>
 #include <string>
 
@@ -92,6 +93,9 @@ class TaskEventBuffer {
   ///
   /// The TaskEventBuffer will be disabled if Start() returns not ok.
   virtual bool Enabled() const = 0;
+
+  /// Return a string that describes the task event buffer stats.
+  virtual const std::string DebugString() = 0;
 };
 
 /// Implementation of TaskEventBuffer.
@@ -117,11 +121,13 @@ class TaskEventBufferImpl : public TaskEventBuffer {
 
   bool Enabled() const override;
 
+  const std::string DebugString() LOCKS_EXCLUDED(mutex_) override;
+
  private:
   /// Test only functions.
   std::vector<rpc::TaskEvents> GetAllTaskEvents() LOCKS_EXCLUDED(mutex_) {
     absl::MutexLock lock(&mutex_);
-    std::vector<rpc::TaskEvents> copy(buffer_);
+    std::vector<rpc::TaskEvents> copy(buffer_.begin(), buffer_.end());
     return copy;
   }
 
@@ -164,11 +170,8 @@ class TaskEventBufferImpl : public TaskEventBuffer {
   /// True if the TaskEventBuffer is enabled.
   std::atomic<bool> enabled_ = false;
 
-  /// Buffered task events.
-  std::vector<rpc::TaskEvents> buffer_ GUARDED_BY(mutex_);
-
-  /// A iterator into buffer_ that determines which element to be overwritten.
-  size_t next_idx_to_overwrite_ GUARDED_BY(mutex_) = 0;
+  /// Circular buffered task events.
+  boost::circular_buffer_space_optimized<rpc::TaskEvents> buffer_ GUARDED_BY(mutex_);
 
   /// Number of profile task events dropped since the last report flush.
   size_t num_profile_task_events_dropped_ GUARDED_BY(mutex_) = 0;
 
@@ -188,6 +191,7 @@ class TaskEventBufferImpl : public TaskEventBuffer {
   uint64_t total_num_events_ GUARDED_BY(mutex_) = 0;
 
   FRIEND_TEST(TaskEventBufferTestManualStart, TestGcsClientFail);
+  FRIEND_TEST(TaskEventBufferTestBatchSend, TestBatchedSend);
   FRIEND_TEST(TaskEventBufferTest, TestAddEvent);
   FRIEND_TEST(TaskEventBufferTest, TestFlushEvents);
   FRIEND_TEST(TaskEventBufferTest, TestFailedFlush);
diff --git a/src/ray/core_worker/test/task_event_buffer_test.cc b/src/ray/core_worker/test/task_event_buffer_test.cc
index 3a4d116daaec..5f8088cb0261 100644
--- a/src/ray/core_worker/test/task_event_buffer_test.cc
+++ b/src/ray/core_worker/test/task_event_buffer_test.cc
@@ -38,7 +38,8 @@ class TaskEventBufferTest : public ::testing::Test {
         R"(
 {
   "task_events_report_interval_ms": 1000,
-  "task_events_max_num_task_events_in_buffer": 100
+  "task_events_max_num_task_events_in_buffer": 100,
+  "task_events_send_batch_size": 100
 }
   )");
 
@@ -91,6 +92,20 @@ class TaskEventBufferTestManualStart : public TaskEventBufferTest {
   void SetUp() override {}
 };
 
+class TaskEventBufferTestBatchSend : public TaskEventBufferTest {
+ public:
+  TaskEventBufferTestBatchSend() : TaskEventBufferTest() {
+    RayConfig::instance().initialize(
+        R"(
+{
+  "task_events_report_interval_ms": 1000,
+  "task_events_max_num_task_events_in_buffer": 100,
+  "task_events_send_batch_size": 10
+}
+  )");
+  }
+};
+
 TEST_F(TaskEventBufferTestManualStart, TestGcsClientFail) {
   ASSERT_NE(task_event_buffer_, nullptr);
 
@@ -270,6 +285,47 @@ TEST_F(TaskEventBufferTest, TestForcedFlush) {
 
   task_event_buffer_->FlushEvents(true);
 }
 
+TEST_F(TaskEventBufferTestBatchSend, TestBatchedSend) {
+  size_t num_events = 100;
+  size_t batch_size = 10;  // Sync with constructor.
+  std::vector<TaskID> task_ids;
+  // Adding some events
+  for (size_t i = 0; i < num_events; ++i) {
+    auto task_id = RandomTaskId();
+    task_ids.push_back(task_id);
+    task_event_buffer_->AddTaskEvent(GenStatusTaskEvents(task_id, 0));
+  }
+
+  auto task_gcs_accessor =
+      static_cast<ray::gcs::MockGcsClient *>(task_event_buffer_->GetGcsClient())
+          ->mock_task_accessor;
+
+  size_t i = 0;
+  // With batch size = 10, there should be 10 flush calls.
+  EXPECT_CALL(*task_gcs_accessor, AsyncAddTaskEventData)
+      .Times(num_events / batch_size)
+      .WillRepeatedly(
+          [&i, &batch_size, &task_ids](std::unique_ptr<rpc::TaskEventData> actual_data,
+                                       ray::gcs::StatusCallback callback) {
+            EXPECT_EQ(actual_data->events_by_task_size(), batch_size);
+            for (const auto &task : actual_data->events_by_task()) {
+              // Assert sent data in order.
+              EXPECT_EQ(task_ids[i++].Binary(), task.task_id());
+            }
+            callback(Status::OK());
+            return Status::OK();
+          });
+
+  for (int i = 0; i * batch_size < num_events; i++) {
+    task_event_buffer_->FlushEvents(false);
+    EXPECT_EQ(task_event_buffer_->GetAllTaskEvents().size(),
+              num_events - (i + 1) * batch_size);
+  }
+
+  // After the last flush, there should be no more events left in the buffer.
+  EXPECT_EQ(task_event_buffer_->GetAllTaskEvents().size(), 0);
+}
+
 TEST_F(TaskEventBufferTest, TestBufferSizeLimit) {
   size_t num_limit = 100;  // Synced with test setup
   size_t num_profile = 50;
diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc
index 3bdb156b3eae..1cd73cda00a6 100644
--- a/src/ray/core_worker/test/task_manager_test.cc
+++ b/src/ray/core_worker/test/task_manager_test.cc
@@ -62,6 +62,8 @@ class MockTaskEventBuffer : public worker::TaskEventBuffer {
   MOCK_METHOD(void, Stop, (), (override));
 
   MOCK_METHOD(bool, Enabled, (), (const, override));
+
+  MOCK_METHOD(const std::string, DebugString, (), (override));
 };
 
 class TaskManagerTest : public ::testing::Test {
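The following standalone program is an editorial illustration appended after the patch, not part of the Ray sources: a minimal sketch of the buffering policy the patch adopts, i.e. a bounded boost::circular_buffer_space_optimized that evicts (and counts) the oldest event when full, plus a flush that drains at most one batch per call, mirroring the logic of TaskEventBufferImpl::AddTaskEvent and FlushEvents above. The Event struct and the kMaxBuffered/kBatchSize constants are invented stand-ins for rpc::TaskEvents and the task_events_max_num_task_events_in_buffer / task_events_send_batch_size config values.

// batching_sketch.cc -- illustrative only, not Ray code.
#include <boost/circular_buffer.hpp>

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <iterator>
#include <vector>

// Stand-in for rpc::TaskEvents.
struct Event {
  int id = 0;
  bool is_profile = false;
};

// Stand-ins for the two config values introduced by the patch.
constexpr std::size_t kMaxBuffered = 8;  // task_events_max_num_task_events_in_buffer
constexpr std::size_t kBatchSize = 3;    // task_events_send_batch_size

int main() {
  boost::circular_buffer_space_optimized<Event> buffer;
  buffer.set_capacity({kMaxBuffered});

  std::size_t dropped_profile = 0;
  std::size_t dropped_status = 0;

  // Add path: when the buffer is full, the oldest event is about to be evicted,
  // so count it as dropped before push_back overwrites it (cf. AddTaskEvent).
  for (int i = 0; i < 12; i++) {
    if (buffer.full()) {
      const Event &evicted = buffer.front();
      if (evicted.is_profile) {
        dropped_profile++;
      } else {
        dropped_status++;
      }
    }
    buffer.push_back(Event{i, i % 2 == 0});
  }

  // Flush path: drain at most kBatchSize events per call, oldest first
  // (cf. the batched send in FlushEvents).
  while (!buffer.empty()) {
    std::size_t num_to_send = std::min(kBatchSize, buffer.size());
    std::vector<Event> to_send(std::make_move_iterator(buffer.begin()),
                               std::make_move_iterator(buffer.begin() + num_to_send));
    buffer.erase(buffer.begin(), buffer.begin() + num_to_send);
    std::cout << "sending batch of " << to_send.size() << ", first id "
              << to_send.front().id << ", " << buffer.size() << " left\n";
  }

  std::cout << "dropped: status=" << dropped_status
            << " profile=" << dropped_profile << "\n";
  return 0;
}

The sketch compiles with any C++14 compiler that has the Boost headers on the include path (e.g. g++ -std=c++14 batching_sketch.cc); capping each drained batch bounds both the GCS message size and the per-message processing cost, which is the rationale given for task_events_send_batch_size.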