
Conversation

@can-anyscale
Contributor

RayEvent provides a special API, Merge, which allows multiple events to be combined into a single event. This reduces gRPC message size and network bandwidth usage, and is essential for scaling task event exports. This PR leverages that feature.

Specifically, it clusters events into groups based on (i) entity ID and (ii) event type. Each group is merged into a single event, which is then added to the gRPC message body. The EntityId is a user-defined function, implemented by the event class creator, that determines which events can be safely merged.
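For reference, a minimal sketch of the event interface this relies on, reconstructed from the snippets discussed in the review below; the names come from those snippets, but the exact signatures are assumptions and the actual declarations in the Ray codebase may differ:

  // Sketch only: signatures are assumptions, not the real Ray declarations.
  class RayEventInterface {
   public:
    virtual ~RayEventInterface() = default;
    // Events sharing the same (entity id, event type) key may be merged.
    virtual std::string GetEntityId() const = 0;
    virtual rpc::events::RayEvent::EventType GetEventType() const = 0;
    // Fold `other` into this event; only called for events with an identical key.
    virtual void Merge(RayEventInterface &&other) = 0;
    // Convert the (possibly merged) event into the protobuf sent over gRPC.
    virtual rpc::events::RayEvent Serialize() && = 0;
  };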

Note: this is a redo of https://github.com/ray-project/ray/pull/56558, which was reverted because it randomized the order of the exported events, leading to flaky tests, etc. This attempt maintains the order even after merging.

Test:

  • CI

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces event merging logic at export time to optimize performance by reducing message size and network usage. Events are grouped by entity ID and type, and then merged. The implementation correctly maintains the order of events, addressing an issue from a previous attempt. I've added a couple of suggestions to further improve performance by reducing string copies during the event grouping process. The new test case for event merging is well-written and covers the new functionality appropriately.

Comment on lines 61 to 73
absl::flat_hash_map<std::pair<std::string, rpc::events::RayEvent::EventType>,
                    std::vector<std::unique_ptr<RayEventInterface>>>
    event_groups;
std::vector<std::pair<std::string, rpc::events::RayEvent::EventType>> group_order;

for (auto &event : buffer_) {
  rpc::events::RayEvent ray_event = std::move(*event).Serialize();
  auto key = std::make_pair(event->GetEntityId(), event->GetEventType());
  auto [it, inserted] = event_groups.try_emplace(key);
  if (inserted) {
    group_order.push_back(key);
  }
  it->second.push_back(std::move(event));
}
Contributor


medium

To improve performance and reduce string copies, you can change group_order to store pointers to the keys in event_groups instead of copying the keys. absl::flat_hash_map guarantees pointer stability on insertion, so this is safe. Additionally, using std::move with try_emplace is more efficient as it avoids copying the key if it's a new entry and does nothing if the key already exists.

Suggested change

// Before:
absl::flat_hash_map<std::pair<std::string, rpc::events::RayEvent::EventType>,
                    std::vector<std::unique_ptr<RayEventInterface>>>
    event_groups;
std::vector<std::pair<std::string, rpc::events::RayEvent::EventType>> group_order;

for (auto &event : buffer_) {
  rpc::events::RayEvent ray_event = std::move(*event).Serialize();
  auto key = std::make_pair(event->GetEntityId(), event->GetEventType());
  auto [it, inserted] = event_groups.try_emplace(key);
  if (inserted) {
    group_order.push_back(key);
  }
  it->second.push_back(std::move(event));
}

// After:
absl::flat_hash_map<std::pair<std::string, rpc::events::RayEvent::EventType>,
                    std::vector<std::unique_ptr<RayEventInterface>>>
    event_groups;
std::vector<const std::pair<std::string, rpc::events::RayEvent::EventType> *> group_order;

for (auto &event : buffer_) {
  auto key = std::make_pair(event->GetEntityId(), event->GetEventType());
  auto [it, inserted] = event_groups.try_emplace(std::move(key));
  if (inserted) {
    group_order.push_back(&it->first);
  }
  it->second.push_back(std::move(event));
}

Contributor Author


No, I tried this and it segfaults; pointers and iterators into an absl::flat_hash_map are not stable.
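For context, a standalone sketch (not from this PR) of why holding pointers into an absl::flat_hash_map can break, given that it is a flat, open-addressing table:

  #include <cstdio>
  #include <string>
  #include "absl/container/flat_hash_map.h"

  int main() {
    absl::flat_hash_map<std::string, int> table;
    table["first"] = 1;
    // Pointer into the table's own storage.
    const std::string *key_ptr = &table.find("first")->first;
    // Further insertions can trigger a rehash; a flat table then relocates every
    // element, so key_ptr (and any saved iterator) may now dangle.
    for (int i = 0; i < 1000; ++i) {
      table["key" + std::to_string(i)] = i;
    }
    std::printf("%s\n", key_ptr->c_str());  // undefined behavior after a rehash
    return 0;
  }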

Contributor

@sampan-s-nayak sampan-s-nayak Oct 29, 2025


What if we store the event order in a list (linked list) instead of a vector, and store iterators to the list items in the hash map?

Iterators to items in linked lists are always stable; iterators to items in a flat_hash_map can change, which is probably why you encountered the segmentation fault.

And could we maybe do the merge in the first loop itself? If an event_key already exists in event_groups, we just merge into the existing item instead of deferring it until later.

something like:

  absl::flat_hash_map<RayEventKey,
                      std::list<std::unique_ptr<RayEventInterface>>::iterator>
      event_groups;
  std::list<std::unique_ptr<RayEventInterface>> grouped_events;

  for (auto &event : buffer_) {
    auto event_key = RayEventKey(event->GetEntityId(), event->GetEventType());
    auto it = event_groups.find(event_key);
    if (it == event_groups.end()) {
      // First occurrence of this event key: keep it, preserving arrival order.
      grouped_events.emplace_back(std::move(event));
      // Store an iterator to the element just appended (list iterators stay stable).
      event_groups.emplace(std::move(event_key), std::prev(grouped_events.end()));
    } else {
      // An entry already exists: merge into it.
      (*it->second)->Merge(std::move(*event));
    }
  }

so grouped_events should have the merged events in the right order
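If that ordering holds, the export step could then just walk the list. A minimal sketch of that final step, where `request` and `add_events()` are hypothetical names rather than anything taken from this PR:

  // Drain the merged events, still in first-arrival order, into the outgoing message.
  for (auto &event : grouped_events) {
    *request.add_events() = std::move(*event).Serialize();
  }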

Contributor Author


oh that's neat, let me give it a try

Comment on lines 74 to 75
for (auto &key : group_order) {
  auto &events = event_groups[key];
Contributor


medium

To align with the change of group_order storing pointers, this loop should be updated to dereference the key pointers. Using at() is also slightly safer here than operator[] as it expresses the intent that the key must exist.

Suggested change

// Before:
for (auto &key : group_order) {
  auto &events = event_groups[key];

// After:
for (auto *key : group_order) {
  auto &events = event_groups.at(*key);

@can-anyscale can-anyscale added the go add ONLY when ready to merge, run all tests label Oct 24, 2025
@ray-gardener ray-gardener bot added core Issues that should be addressed in Ray Core observability Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling labels Oct 24, 2025
// events with the same resource ID into a single event.
// group the event in the buffer_ by their entity id and type; then for each group,
// merge the events into a single event. maintain the order of the events in the buffer.
absl::flat_hash_map<std::pair<std::string, rpc::events::RayEvent::EventType>,
Contributor

@sampan-s-nayak sampan-s-nayak Oct 29, 2025


nit: can we define a dataclass instead of using a pair here for better readability

struct RayEventKey {
  std::string entity_id;
  rpc::events::RayEvent::EventType event_type;
};

and then use that in

absl::flat_hash_map<RayEventKey, std::vector<std::unique_ptr<RayEventInterface>>> event_groups;
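One extra bit of boilerplate a struct key would need: absl::flat_hash_map requires equality and a hash for custom key types. A sketch, assuming Abseil's AbslHashValue extension point (not part of this PR):

  struct RayEventKey {
    std::string entity_id;
    rpc::events::RayEvent::EventType event_type;

    friend bool operator==(const RayEventKey &a, const RayEventKey &b) {
      return a.entity_id == b.entity_id && a.event_type == b.event_type;
    }
    // Hook into Abseil's hashing framework so the struct can serve as a map key.
    template <typename H>
    friend H AbslHashValue(H h, const RayEventKey &key) {
      return H::combine(std::move(h), key.entity_id, key.event_type);
    }
  };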

Contributor Author


totally

@sampan-s-nayak
Copy link
Contributor

Should we also consider merging on the aggregator side?
Because of the event recorders’ flush interval, we’re not guaranteed to merge all related events.
For example:

  • Suppose an actor lifecycle event with state DEPENDENCIES_UNREADY is received and flushed to the aggregator.
  • After that flush, the recorder receives additional events for states PENDING_CREATION, ALIVE, and DEAD, which do get merged and then sent to the aggregator.

In this case, the aggregator ends up with one event for DEPENDENCIES_UNREADY and another merged event for the remaining states.

To address this, we could consider performing merging on the aggregator side as well.
However, the same flush-interval issue also exists there, so merging at the aggregator alone may not fully solve the problem.

Contributor Author

@can-anyscale can-anyscale left a comment


Should we also consider merging on the aggregator side?

We could, yeah, though I think merging is only for perf (especially the part about converting protobuf to JSON on the aggregator side) and not a functional requirement, so it needs to be measured whether it's worth it.


Signed-off-by: Cuong Nguyen <can@anyscale.com>
@can-anyscale
Contributor Author

Addressed @sampan-s-nayak's comments.

Contributor

@sampan-s-nayak sampan-s-nayak left a comment


Thank you! changes LGTM!

@can-anyscale can-anyscale merged commit a4261b5 into master Oct 30, 2025
6 checks passed
@can-anyscale can-anyscale deleted the can-tbuff01 branch October 30, 2025 17:02
YoussefEssDS pushed a commit to YoussefEssDS/ray that referenced this pull request Nov 8, 2025
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025