Skip to content

Commit 5aca526

Browse files
MengjinYanelliot-barn
authored andcommitted
[Core] Make Preserving Proto Naming During Event JSON Conversion Configurable (#57705)
Recently, when we ran performance tests with task event generation turned on. We saw some performance regression when the workloads ran on very small CPU machines. With further investigation, the overhead mainly comes from the name format convention when converting the proto message to JSON format payload in the aggregator agent. This PR adds an env var for the config to control the name conversion behavior and update the corresponding tests. Also note that, eventually we are planning to remove this config turn off the field name conversion by default after migrated all the current event usage. --------- Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com> Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
1 parent dcd4336 commit 5aca526

File tree

3 files changed

+474
-210
lines changed

3 files changed

+474
-210
lines changed

python/ray/dashboard/modules/aggregator/aggregator_agent.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@
6666
PUBLISH_EVENTS_TO_EXTERNAL_HTTP_SERVICE = ray_constants.env_bool(
6767
f"{env_var_prefix}_PUBLISH_EVENTS_TO_EXTERNAL_HTTP_SERVICE", True
6868
)
69+
# flag to control whether preserve the proto field name when converting the events to
70+
# JSON. If True, the proto field name will be preserved. If False, the proto field name
71+
# will be converted to camel case.
72+
PRESERVE_PROTO_FIELD_NAME = ray_constants.env_bool(
73+
f"{env_var_prefix}_PRESERVE_PROTO_FIELD_NAME", False
74+
)
6975

7076

7177
class AggregatorAgent(
@@ -124,6 +130,7 @@ def __init__(self, dashboard_agent) -> None:
124130
endpoint=self._events_export_addr,
125131
executor=self._executor,
126132
events_filter_fn=self._can_expose_event,
133+
preserve_proto_field_name=PRESERVE_PROTO_FIELD_NAME,
127134
),
128135
event_buffer=self._event_buffer,
129136
common_metric_tags=self._common_tags,

python/ray/dashboard/modules/aggregator/publisher/async_publisher_client.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,14 @@ def __init__(
6666
executor: ThreadPoolExecutor,
6767
events_filter_fn: Callable[[object], bool],
6868
timeout: float = PUBLISHER_TIMEOUT_SECONDS,
69+
preserve_proto_field_name: bool = False,
6970
) -> None:
7071
self._endpoint = endpoint
7172
self._executor = executor
7273
self._events_filter_fn = events_filter_fn
7374
self._timeout = aiohttp.ClientTimeout(total=timeout)
7475
self._session = None
76+
self._preserve_proto_field_name = preserve_proto_field_name
7577

7678
async def publish(self, batch: PublishBatch) -> PublishStats:
7779
events_batch: list[events_base_event_pb2.RayEvent] = batch.events
@@ -89,7 +91,11 @@ async def publish(self, batch: PublishBatch) -> PublishStats:
8991
self._executor,
9092
lambda: [
9193
json.loads(
92-
message_to_json(e, always_print_fields_with_no_presence=True)
94+
message_to_json(
95+
e,
96+
always_print_fields_with_no_presence=True,
97+
preserving_proto_field_name=self._preserve_proto_field_name,
98+
)
9399
)
94100
for e in filtered
95101
],

0 commit comments

Comments
 (0)