Skip to content

Commit a4261b5

Browse files
authored
[core][onevent] implement even merge logic at export time (#58070)
RayEvent provides a special API, merge, which allows multiple events to be combined into a single event. This reduces gRPC message size, network bandwidth usage, and is essential for scaling task event exports. This PR leverages that feature. Specifically, it clusters events into groups based on (i) entity ID and (ii) event type. Each group is merged into a single event, which is then added to the gRPC message body. The EntityId is a user-defined function, implemented by the event class creator, that determines which events can be safely merged. ``` Note: this is a redo of #56558 which gets converted because it randomize the order the events that get exported, lead to flaky tests etc. This attempt maintain the order even after merging. ``` Test: - CI Signed-off-by: Cuong Nguyen <can@anyscale.com>
1 parent e49c09b commit a4261b5

File tree

3 files changed

+49
-2
lines changed

3 files changed

+49
-2
lines changed

src/ray/observability/ray_event_recorder.cc

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,23 @@ void RayEventRecorder::ExportEvents() {
5656
}
5757
rpc::events::AddEventsRequest request;
5858
rpc::events::RayEventsData ray_event_data;
59-
// TODO(#56391): To further optimize the performance, we can merge multiple
60-
// events with the same resource ID into a single event.
59+
// group the event in the buffer_ by their entity id and type; then for each group,
60+
// merge the events into a single event. maintain the order of the events in the buffer.
61+
std::list<std::unique_ptr<RayEventInterface>> grouped_events;
62+
absl::flat_hash_map<RayEventKey,
63+
std::list<std::unique_ptr<RayEventInterface>>::iterator>
64+
event_key_to_iterator;
6165
for (auto &event : buffer_) {
66+
auto key = std::make_pair(event->GetEntityId(), event->GetEventType());
67+
auto [it, inserted] = event_key_to_iterator.try_emplace(key);
68+
if (inserted) {
69+
grouped_events.push_back(std::move(event));
70+
event_key_to_iterator[key] = std::prev(grouped_events.end());
71+
} else {
72+
(*it->second)->Merge(std::move(*event));
73+
}
74+
}
75+
for (auto &event : grouped_events) {
6276
rpc::events::RayEvent ray_event = std::move(*event).Serialize();
6377
*ray_event_data.mutable_events()->Add() = std::move(ray_event);
6478
}

src/ray/observability/ray_event_recorder.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ class RayEventRecorder : public RayEventRecorderInterface {
5353
void AddEvents(std::vector<std::unique_ptr<RayEventInterface>> &&data_list);
5454

5555
private:
56+
using RayEventKey = std::pair<std::string, rpc::events::RayEvent::EventType>;
57+
5658
rpc::EventAggregatorClient &event_aggregator_client_;
5759
std::shared_ptr<PeriodicalRunner> periodical_runner_;
5860
// Lock for thread safety when modifying the buffer.

src/ray/observability/tests/ray_event_recorder_test.cc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,37 @@ class RayEventRecorderTest : public ::testing::Test {
7878
size_t max_buffer_size_ = 5;
7979
};
8080

81+
TEST_F(RayEventRecorderTest, TestMergeEvents) {
82+
RayConfig::instance().initialize(
83+
R"(
84+
{
85+
"enable_ray_event": true
86+
}
87+
)");
88+
recorder_->StartExportingEvents();
89+
rpc::JobTableData data;
90+
data.set_job_id("test_job_id");
91+
92+
std::vector<std::unique_ptr<RayEventInterface>> events;
93+
events.push_back(std::make_unique<RayDriverJobLifecycleEvent>(
94+
data, rpc::events::DriverJobLifecycleEvent::CREATED, "test_session_name"));
95+
events.push_back(std::make_unique<RayDriverJobLifecycleEvent>(
96+
data, rpc::events::DriverJobLifecycleEvent::FINISHED, "test_session_name"));
97+
recorder_->AddEvents(std::move(events));
98+
io_service_.run_one();
99+
100+
std::vector<rpc::events::RayEvent> recorded_events = fake_client_->GetRecordedEvents();
101+
// Only one event should be recorded because the two events are merged into one.
102+
ASSERT_EQ(recorded_events.size(), 1);
103+
ASSERT_EQ(recorded_events[0].source_type(), rpc::events::RayEvent::GCS);
104+
ASSERT_EQ(recorded_events[0].session_name(), "test_session_name");
105+
auto state_transitions =
106+
recorded_events[0].driver_job_lifecycle_event().state_transitions();
107+
ASSERT_EQ(state_transitions.size(), 2);
108+
ASSERT_EQ(state_transitions[0].state(), rpc::events::DriverJobLifecycleEvent::CREATED);
109+
ASSERT_EQ(state_transitions[1].state(), rpc::events::DriverJobLifecycleEvent::FINISHED);
110+
}
111+
81112
TEST_F(RayEventRecorderTest, TestRecordEvents) {
82113
RayConfig::instance().initialize(
83114
R"(

0 commit comments

Comments
 (0)