Skip to content

Commit 4b0a268

Browse files
sampan-s-nayaksampanjjyao
authored
[Core] handle optional values in ray_event_converter (#58083)
## Description in the original taskEvent proto, worker_id is marked as optional https://github.com/ray-project/ray/blob/830a456b9b558028853423c9042f7e2763ec5283/src/ray/protobuf/gcs.proto#L201 but in ray event it is not https://github.com/ray-project/ray/blob/f635de7c86d0d0f813a305a9fd5e864a64257894/src/ray/protobuf/public/events_task_lifecycle_event.proto#L42 in the converter we always set the worker_id field even if its an empty string https://github.com/ray-project/ray/blob/master/src/ray/gcs/gcs_ray_event_converter.cc#L145. if an optional field is set, even if it is empty(default proto value) it is considered as having a value, and during mergeFrom() calls the value is considered and overwrites the destination objects existing value. source:https://protobuf.dev/programming-guides/field_presence/ Explicitly set fields – including default values – are merged-from this pr fixes this gap in the conversion logic --------- Signed-off-by: sampan <sampan@anyscale.com> Co-authored-by: sampan <sampan@anyscale.com> Co-authored-by: Jiajun Yao <jeromeyjj@gmail.com>
1 parent e34b1de commit 4b0a268

File tree

2 files changed

+150
-5
lines changed

2 files changed

+150
-5
lines changed

src/ray/gcs/gcs_ray_event_converter.cc

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,19 @@ rpc::TaskEvents ConvertToTaskEvents(rpc::events::TaskLifecycleEvent &&event) {
141141
task_event.set_job_id(event.job_id());
142142

143143
rpc::TaskStateUpdate *task_state_update = task_event.mutable_state_updates();
144-
task_state_update->set_node_id(event.node_id());
145-
task_state_update->set_worker_id(event.worker_id());
146-
task_state_update->set_worker_pid(event.worker_pid());
147-
*task_state_update->mutable_error_info() = std::move(*event.mutable_ray_error_info());
144+
if (!event.node_id().empty()) {
145+
task_state_update->set_node_id(event.node_id());
146+
}
147+
if (!event.worker_id().empty()) {
148+
task_state_update->set_worker_id(event.worker_id());
149+
}
150+
// worker pid can never be 0
151+
if (event.worker_pid() != 0) {
152+
task_state_update->set_worker_pid(event.worker_pid());
153+
}
154+
if (event.has_ray_error_info()) {
155+
*task_state_update->mutable_error_info() = std::move(*event.mutable_ray_error_info());
156+
}
148157

149158
for (const auto &state_transition : event.state_transitions()) {
150159
int64_t ns = ProtoTimestampToAbslTimeNanos(state_transition.timestamp());
@@ -193,7 +202,9 @@ rpc::TaskEvents ConvertToTaskEvents(rpc::events::TaskProfileEvents &&event) {
193202
task_event.set_attempt_number(event.attempt_number());
194203
task_event.set_job_id(event.job_id());
195204

196-
*task_event.mutable_profile_events() = std::move(*event.mutable_profile_events());
205+
if (event.has_profile_events()) {
206+
*task_event.mutable_profile_events() = std::move(*event.mutable_profile_events());
207+
}
197208
return task_event;
198209
}
199210

src/ray/gcs/tests/gcs_ray_event_converter_test.cc

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,5 +411,139 @@ TEST(GcsRayEventConverterTest, TestConvertActorTaskDefinitionEvent) {
411411
EXPECT_EQ(task_info.required_resources().at("GPU"), 1.0);
412412
}
413413

414+
// Parameterized test for optional fields in TaskLifecycleEvent.
415+
// Tests that optional fields are only set when they have non-empty values,
416+
// preventing issues where explicitly set empty fields overwrite existing values
417+
// during protobuf mergeFrom() operations.
418+
struct OptionalFieldTestCase {
419+
std::string test_name;
420+
std::string node_id;
421+
std::string worker_id;
422+
int32_t worker_pid;
423+
std::string error_message; // Empty string means no error_info should be set
424+
bool expect_node_id_set;
425+
bool expect_worker_id_set;
426+
bool expect_worker_pid_set;
427+
bool expect_error_info_set;
428+
};
429+
430+
class TaskLifecycleEventOptionalFieldsTest
431+
: public ::testing::TestWithParam<OptionalFieldTestCase> {};
432+
433+
TEST_P(TaskLifecycleEventOptionalFieldsTest, TestOptionalFieldPresence) {
434+
const auto &test_case = GetParam();
435+
436+
rpc::events::AddEventsRequest request;
437+
rpc::events::RayEvent &event = *request.mutable_events_data()->mutable_events()->Add();
438+
event.set_event_type(rpc::events::RayEvent::TASK_LIFECYCLE_EVENT);
439+
rpc::events::TaskLifecycleEvent &lifecycle_event =
440+
*event.mutable_task_lifecycle_event();
441+
442+
// Set basic required fields
443+
lifecycle_event.set_task_id("test_task_id");
444+
lifecycle_event.set_task_attempt(1);
445+
lifecycle_event.set_job_id("test_job_id");
446+
447+
// Set optional fields according to test case
448+
lifecycle_event.set_node_id(test_case.node_id);
449+
lifecycle_event.set_worker_id(test_case.worker_id);
450+
lifecycle_event.set_worker_pid(test_case.worker_pid);
451+
452+
// Set error_info if specified
453+
if (!test_case.error_message.empty()) {
454+
lifecycle_event.mutable_ray_error_info()->set_error_message(test_case.error_message);
455+
}
456+
457+
// Call the converter
458+
auto task_event_data_requests = ConvertToTaskEventDataRequests(std::move(request));
459+
ASSERT_EQ(task_event_data_requests.size(), 1);
460+
const rpc::TaskEvents &task_event =
461+
task_event_data_requests[0].data().events_by_task()[0];
462+
463+
// Verify that state_updates exists
464+
ASSERT_TRUE(task_event.has_state_updates());
465+
const auto &state_updates = task_event.state_updates();
466+
467+
// Verify field presence matches expectations
468+
EXPECT_EQ(state_updates.has_node_id(), test_case.expect_node_id_set)
469+
<< "node_id presence mismatch for test: " << test_case.test_name;
470+
if (test_case.expect_node_id_set) {
471+
EXPECT_EQ(state_updates.node_id(), test_case.node_id);
472+
}
473+
474+
EXPECT_EQ(state_updates.has_worker_id(), test_case.expect_worker_id_set)
475+
<< "worker_id presence mismatch for test: " << test_case.test_name;
476+
if (test_case.expect_worker_id_set) {
477+
EXPECT_EQ(state_updates.worker_id(), test_case.worker_id);
478+
}
479+
480+
EXPECT_EQ(state_updates.has_worker_pid(), test_case.expect_worker_pid_set)
481+
<< "worker_pid presence mismatch for test: " << test_case.test_name;
482+
if (test_case.expect_worker_pid_set) {
483+
EXPECT_EQ(state_updates.worker_pid(), test_case.worker_pid);
484+
}
485+
486+
EXPECT_EQ(state_updates.has_error_info(), test_case.expect_error_info_set)
487+
<< "error_info presence mismatch for test: " << test_case.test_name;
488+
if (test_case.expect_error_info_set) {
489+
EXPECT_EQ(state_updates.error_info().error_message(), test_case.error_message);
490+
}
491+
}
492+
493+
INSTANTIATE_TEST_SUITE_P(
494+
OptionalFields,
495+
TaskLifecycleEventOptionalFieldsTest,
496+
::testing::Values(
497+
// All fields empty - none should be set
498+
OptionalFieldTestCase{"AllEmpty", "", "", 0, "", false, false, false, false},
499+
// All fields non-empty - all should be set
500+
OptionalFieldTestCase{"AllNonEmpty",
501+
"test_node_id",
502+
"test_worker_id",
503+
1234,
504+
"Test error",
505+
true,
506+
true,
507+
true,
508+
true},
509+
// Mixed: node_id set, others empty
510+
OptionalFieldTestCase{
511+
"OnlyNodeId", "test_node_id", "", 0, "", true, false, false, false},
512+
// Mixed: worker_id set, others empty
513+
OptionalFieldTestCase{
514+
"OnlyWorkerId", "", "test_worker_id", 0, "", false, true, false, false},
515+
// Mixed: worker_pid set, others empty
516+
OptionalFieldTestCase{
517+
"OnlyWorkerPid", "", "", 5678, "", false, false, true, false},
518+
// Only error_info set, others empty
519+
OptionalFieldTestCase{
520+
"OnlyErrorInfo", "", "", 0, "Test error", false, false, false, true},
521+
// Mixed: node_id and worker_pid set, worker_id and error_info empty
522+
OptionalFieldTestCase{
523+
"NodeIdAndWorkerPid", "test_node_id", "", 9999, "", true, false, true, false},
524+
// Mixed: worker_id and worker_pid set, node_id and error_info empty
525+
OptionalFieldTestCase{"WorkerIdAndWorkerPid",
526+
"",
527+
"test_worker_id",
528+
4321,
529+
"",
530+
false,
531+
true,
532+
true,
533+
false},
534+
// Mixed: worker_id and error_info set, others empty
535+
OptionalFieldTestCase{"WorkerIdAndErrorInfo",
536+
"",
537+
"test_worker_id",
538+
0,
539+
"Worker error",
540+
false,
541+
true,
542+
false,
543+
true}),
544+
[](const ::testing::TestParamInfo<OptionalFieldTestCase> &info) {
545+
return info.param.test_name;
546+
});
547+
414548
} // namespace gcs
415549
} // namespace ray

0 commit comments

Comments
 (0)