From 56747c60c50d3c5d2d3b79887f855bb18e22f099 Mon Sep 17 00:00:00 2001 From: sampan Date: Fri, 24 Oct 2025 08:38:24 +0000 Subject: [PATCH 1/4] [Core] handle optional values in ray_event_converter Signed-off-by: sampan --- src/ray/gcs/gcs_ray_event_converter.cc | 17 +- .../gcs/tests/gcs_ray_event_converter_test.cc | 148 ++++++++++++++++++ 2 files changed, 161 insertions(+), 4 deletions(-) diff --git a/src/ray/gcs/gcs_ray_event_converter.cc b/src/ray/gcs/gcs_ray_event_converter.cc index 1eeaa90039b1..b8b7e4117d55 100644 --- a/src/ray/gcs/gcs_ray_event_converter.cc +++ b/src/ray/gcs/gcs_ray_event_converter.cc @@ -141,10 +141,19 @@ rpc::TaskEvents ConvertToTaskEvents(rpc::events::TaskLifecycleEvent &&event) { task_event.set_job_id(event.job_id()); rpc::TaskStateUpdate *task_state_update = task_event.mutable_state_updates(); - task_state_update->set_node_id(event.node_id()); - task_state_update->set_worker_id(event.worker_id()); - task_state_update->set_worker_pid(event.worker_pid()); - *task_state_update->mutable_error_info() = std::move(*event.mutable_ray_error_info()); + if (!event.node_id().empty()) { + task_state_update->set_node_id(event.node_id()); + } + if (!event.worker_id().empty()) { + task_state_update->set_worker_id(event.worker_id()); + } + // worker pid can never be 0 + if (event.worker_pid() != 0) { + task_state_update->set_worker_pid(event.worker_pid()); + } + if (event.has_ray_error_info()) { + *task_state_update->mutable_error_info() = std::move(*event.mutable_ray_error_info()); + } for (const auto &state_transition : event.state_transitions()) { int64_t ns = ProtoTimestampToAbslTimeNanos(state_transition.timestamp()); diff --git a/src/ray/gcs/tests/gcs_ray_event_converter_test.cc b/src/ray/gcs/tests/gcs_ray_event_converter_test.cc index 2656013071ef..34c82a47d6db 100644 --- a/src/ray/gcs/tests/gcs_ray_event_converter_test.cc +++ b/src/ray/gcs/tests/gcs_ray_event_converter_test.cc @@ -411,5 +411,153 @@ TEST(GcsRayEventConverterTest, TestConvertActorTaskDefinitionEvent) { EXPECT_EQ(task_info.required_resources().at("GPU"), 1.0); } +// Parameterized test for optional fields in TaskLifecycleEvent. +// Tests that optional fields are only set when they have non-empty values, +// preventing issues where explicitly set empty fields overwrite existing values +// during protobuf mergeFrom() operations. +struct OptionalFieldTestCase { + std::string test_name; + std::string node_id; + std::string worker_id; + int32_t worker_pid; + bool expect_node_id_set; + bool expect_worker_id_set; + bool expect_worker_pid_set; +}; + +class TaskLifecycleEventOptionalFieldsTest + : public ::testing::TestWithParam {}; + +TEST_P(TaskLifecycleEventOptionalFieldsTest, TestOptionalFieldPresence) { + const auto &test_case = GetParam(); + + rpc::events::AddEventsRequest request; + rpc::events::RayEvent &event = *request.mutable_events_data()->mutable_events()->Add(); + event.set_event_type(rpc::events::RayEvent::TASK_LIFECYCLE_EVENT); + rpc::events::TaskLifecycleEvent &lifecycle_event = + *event.mutable_task_lifecycle_event(); + + // Set basic required fields + lifecycle_event.set_task_id("test_task_id"); + lifecycle_event.set_task_attempt(1); + lifecycle_event.set_job_id("test_job_id"); + + // Set optional fields according to test case + lifecycle_event.set_node_id(test_case.node_id); + lifecycle_event.set_worker_id(test_case.worker_id); + lifecycle_event.set_worker_pid(test_case.worker_pid); + + // Call the converter + auto task_event_data_requests = ConvertToTaskEventDataRequests(std::move(request)); + ASSERT_EQ(task_event_data_requests.size(), 1); + const rpc::TaskEvents &task_event = + task_event_data_requests[0].data().events_by_task()[0]; + + // Verify that state_updates exists + ASSERT_TRUE(task_event.has_state_updates()); + const auto &state_updates = task_event.state_updates(); + + // Verify field presence matches expectations + EXPECT_EQ(state_updates.has_node_id(), test_case.expect_node_id_set) + << "node_id presence mismatch for test: " << test_case.test_name; + if (test_case.expect_node_id_set) { + EXPECT_EQ(state_updates.node_id(), test_case.node_id); + } + + EXPECT_EQ(state_updates.has_worker_id(), test_case.expect_worker_id_set) + << "worker_id presence mismatch for test: " << test_case.test_name; + if (test_case.expect_worker_id_set) { + EXPECT_EQ(state_updates.worker_id(), test_case.worker_id); + } + + EXPECT_EQ(state_updates.has_worker_pid(), test_case.expect_worker_pid_set) + << "worker_pid presence mismatch for test: " << test_case.test_name; + if (test_case.expect_worker_pid_set) { + EXPECT_EQ(state_updates.worker_pid(), test_case.worker_pid); + } +} + +INSTANTIATE_TEST_SUITE_P( + OptionalFields, + TaskLifecycleEventOptionalFieldsTest, + ::testing::Values( + // All fields empty - none should be set + OptionalFieldTestCase{ + "AllEmpty", "", "", 0, false, false, false}, + // All fields non-empty - all should be set + OptionalFieldTestCase{ + "AllNonEmpty", "test_node_id", "test_worker_id", 1234, true, true, true}, + // Mixed: node_id set, others empty + OptionalFieldTestCase{ + "OnlyNodeId", "test_node_id", "", 0, true, false, false}, + // Mixed: worker_id set, others empty + OptionalFieldTestCase{ + "OnlyWorkerId", "", "test_worker_id", 0, false, true, false}, + // Mixed: worker_pid set, others empty + OptionalFieldTestCase{"OnlyWorkerPid", "", "", 5678, false, false, true}, + // Mixed: node_id and worker_pid set, worker_id empty + OptionalFieldTestCase{ + "NodeIdAndWorkerPid", "test_node_id", "", 9999, true, false, true}, + // Mixed: worker_id and worker_pid set, node_id empty + OptionalFieldTestCase{ + "WorkerIdAndWorkerPid", "", "test_worker_id", 4321, false, true, true}), + [](const ::testing::TestParamInfo &info) { + return info.param.test_name; + }); + +// Test that ray_error_info is only set when it has actual content +TEST(GcsRayEventConverterTest, TestTaskLifecycleEventErrorInfoOnlySetWhenPresent) { + // Test case 1: Event without error info + { + rpc::events::AddEventsRequest request; + rpc::events::RayEvent &event = + *request.mutable_events_data()->mutable_events()->Add(); + event.set_event_type(rpc::events::RayEvent::TASK_LIFECYCLE_EVENT); + rpc::events::TaskLifecycleEvent &lifecycle_event = + *event.mutable_task_lifecycle_event(); + + lifecycle_event.set_task_id("test_task_id"); + lifecycle_event.set_task_attempt(1); + lifecycle_event.set_job_id("test_job_id"); + // Don't set ray_error_info + + auto task_event_data_requests = ConvertToTaskEventDataRequests(std::move(request)); + const rpc::TaskEvents &task_event = + task_event_data_requests[0].data().events_by_task()[0]; + + ASSERT_TRUE(task_event.has_state_updates()); + const auto &state_updates = task_event.state_updates(); + + // error_info should NOT be set when not present in the source event + EXPECT_FALSE(state_updates.has_error_info()); + } + + // Test case 2: Event with error info + { + rpc::events::AddEventsRequest request; + rpc::events::RayEvent &event = + *request.mutable_events_data()->mutable_events()->Add(); + event.set_event_type(rpc::events::RayEvent::TASK_LIFECYCLE_EVENT); + rpc::events::TaskLifecycleEvent &lifecycle_event = + *event.mutable_task_lifecycle_event(); + + lifecycle_event.set_task_id("test_task_id"); + lifecycle_event.set_task_attempt(1); + lifecycle_event.set_job_id("test_job_id"); + lifecycle_event.mutable_ray_error_info()->set_error_message("Test error"); + + auto task_event_data_requests = ConvertToTaskEventDataRequests(std::move(request)); + const rpc::TaskEvents &task_event = + task_event_data_requests[0].data().events_by_task()[0]; + + ASSERT_TRUE(task_event.has_state_updates()); + const auto &state_updates = task_event.state_updates(); + + // error_info should be set when present in the source event + EXPECT_TRUE(state_updates.has_error_info()); + EXPECT_EQ(state_updates.error_info().error_message(), "Test error"); + } +} + } // namespace gcs } // namespace ray From e2bf5ab73d378e7a1bda26aac4d81e92c55ab7a1 Mon Sep 17 00:00:00 2001 From: sampan Date: Fri, 24 Oct 2025 08:38:41 +0000 Subject: [PATCH 2/4] lint Signed-off-by: sampan --- src/ray/gcs/tests/gcs_ray_event_converter_test.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/ray/gcs/tests/gcs_ray_event_converter_test.cc b/src/ray/gcs/tests/gcs_ray_event_converter_test.cc index 34c82a47d6db..0567e122ade3 100644 --- a/src/ray/gcs/tests/gcs_ray_event_converter_test.cc +++ b/src/ray/gcs/tests/gcs_ray_event_converter_test.cc @@ -482,14 +482,12 @@ INSTANTIATE_TEST_SUITE_P( TaskLifecycleEventOptionalFieldsTest, ::testing::Values( // All fields empty - none should be set - OptionalFieldTestCase{ - "AllEmpty", "", "", 0, false, false, false}, + OptionalFieldTestCase{"AllEmpty", "", "", 0, false, false, false}, // All fields non-empty - all should be set OptionalFieldTestCase{ "AllNonEmpty", "test_node_id", "test_worker_id", 1234, true, true, true}, // Mixed: node_id set, others empty - OptionalFieldTestCase{ - "OnlyNodeId", "test_node_id", "", 0, true, false, false}, + OptionalFieldTestCase{"OnlyNodeId", "test_node_id", "", 0, true, false, false}, // Mixed: worker_id set, others empty OptionalFieldTestCase{ "OnlyWorkerId", "", "test_worker_id", 0, false, true, false}, From acc44b523ed0bb40c556278b4b925b9183c56d02 Mon Sep 17 00:00:00 2001 From: sampan Date: Fri, 24 Oct 2025 08:48:29 +0000 Subject: [PATCH 3/4] reduce duplication Signed-off-by: sampan --- .../gcs/tests/gcs_ray_event_converter_test.cc | 116 ++++++++---------- 1 file changed, 52 insertions(+), 64 deletions(-) diff --git a/src/ray/gcs/tests/gcs_ray_event_converter_test.cc b/src/ray/gcs/tests/gcs_ray_event_converter_test.cc index 0567e122ade3..454faa6dc91b 100644 --- a/src/ray/gcs/tests/gcs_ray_event_converter_test.cc +++ b/src/ray/gcs/tests/gcs_ray_event_converter_test.cc @@ -420,9 +420,11 @@ struct OptionalFieldTestCase { std::string node_id; std::string worker_id; int32_t worker_pid; + std::string error_message; // Empty string means no error_info should be set bool expect_node_id_set; bool expect_worker_id_set; bool expect_worker_pid_set; + bool expect_error_info_set; }; class TaskLifecycleEventOptionalFieldsTest @@ -447,6 +449,11 @@ TEST_P(TaskLifecycleEventOptionalFieldsTest, TestOptionalFieldPresence) { lifecycle_event.set_worker_id(test_case.worker_id); lifecycle_event.set_worker_pid(test_case.worker_pid); + // Set error_info if specified + if (!test_case.error_message.empty()) { + lifecycle_event.mutable_ray_error_info()->set_error_message(test_case.error_message); + } + // Call the converter auto task_event_data_requests = ConvertToTaskEventDataRequests(std::move(request)); ASSERT_EQ(task_event_data_requests.size(), 1); @@ -475,6 +482,12 @@ TEST_P(TaskLifecycleEventOptionalFieldsTest, TestOptionalFieldPresence) { if (test_case.expect_worker_pid_set) { EXPECT_EQ(state_updates.worker_pid(), test_case.worker_pid); } + + EXPECT_EQ(state_updates.has_error_info(), test_case.expect_error_info_set) + << "error_info presence mismatch for test: " << test_case.test_name; + if (test_case.expect_error_info_set) { + EXPECT_EQ(state_updates.error_info().error_message(), test_case.error_message); + } } INSTANTIATE_TEST_SUITE_P( @@ -482,80 +495,55 @@ INSTANTIATE_TEST_SUITE_P( TaskLifecycleEventOptionalFieldsTest, ::testing::Values( // All fields empty - none should be set - OptionalFieldTestCase{"AllEmpty", "", "", 0, false, false, false}, + OptionalFieldTestCase{"AllEmpty", "", "", 0, "", false, false, false, false}, // All fields non-empty - all should be set - OptionalFieldTestCase{ - "AllNonEmpty", "test_node_id", "test_worker_id", 1234, true, true, true}, + OptionalFieldTestCase{"AllNonEmpty", + "test_node_id", + "test_worker_id", + 1234, + "Test error", + true, + true, + true, + true}, // Mixed: node_id set, others empty - OptionalFieldTestCase{"OnlyNodeId", "test_node_id", "", 0, true, false, false}, + OptionalFieldTestCase{ + "OnlyNodeId", "test_node_id", "", 0, "", true, false, false, false}, // Mixed: worker_id set, others empty OptionalFieldTestCase{ - "OnlyWorkerId", "", "test_worker_id", 0, false, true, false}, + "OnlyWorkerId", "", "test_worker_id", 0, "", false, true, false, false}, // Mixed: worker_pid set, others empty - OptionalFieldTestCase{"OnlyWorkerPid", "", "", 5678, false, false, true}, - // Mixed: node_id and worker_pid set, worker_id empty OptionalFieldTestCase{ - "NodeIdAndWorkerPid", "test_node_id", "", 9999, true, false, true}, - // Mixed: worker_id and worker_pid set, node_id empty + "OnlyWorkerPid", "", "", 5678, "", false, false, true, false}, + // Only error_info set, others empty OptionalFieldTestCase{ - "WorkerIdAndWorkerPid", "", "test_worker_id", 4321, false, true, true}), + "OnlyErrorInfo", "", "", 0, "Test error", false, false, false, true}, + // Mixed: node_id and worker_pid set, worker_id and error_info empty + OptionalFieldTestCase{ + "NodeIdAndWorkerPid", "test_node_id", "", 9999, "", true, false, true, false}, + // Mixed: worker_id and worker_pid set, node_id and error_info empty + OptionalFieldTestCase{"WorkerIdAndWorkerPid", + "", + "test_worker_id", + 4321, + "", + false, + true, + true, + false}, + // Mixed: worker_id and error_info set, others empty + OptionalFieldTestCase{"WorkerIdAndErrorInfo", + "", + "test_worker_id", + 0, + "Worker error", + false, + true, + false, + true}), [](const ::testing::TestParamInfo &info) { return info.param.test_name; }); -// Test that ray_error_info is only set when it has actual content -TEST(GcsRayEventConverterTest, TestTaskLifecycleEventErrorInfoOnlySetWhenPresent) { - // Test case 1: Event without error info - { - rpc::events::AddEventsRequest request; - rpc::events::RayEvent &event = - *request.mutable_events_data()->mutable_events()->Add(); - event.set_event_type(rpc::events::RayEvent::TASK_LIFECYCLE_EVENT); - rpc::events::TaskLifecycleEvent &lifecycle_event = - *event.mutable_task_lifecycle_event(); - - lifecycle_event.set_task_id("test_task_id"); - lifecycle_event.set_task_attempt(1); - lifecycle_event.set_job_id("test_job_id"); - // Don't set ray_error_info - - auto task_event_data_requests = ConvertToTaskEventDataRequests(std::move(request)); - const rpc::TaskEvents &task_event = - task_event_data_requests[0].data().events_by_task()[0]; - - ASSERT_TRUE(task_event.has_state_updates()); - const auto &state_updates = task_event.state_updates(); - - // error_info should NOT be set when not present in the source event - EXPECT_FALSE(state_updates.has_error_info()); - } - - // Test case 2: Event with error info - { - rpc::events::AddEventsRequest request; - rpc::events::RayEvent &event = - *request.mutable_events_data()->mutable_events()->Add(); - event.set_event_type(rpc::events::RayEvent::TASK_LIFECYCLE_EVENT); - rpc::events::TaskLifecycleEvent &lifecycle_event = - *event.mutable_task_lifecycle_event(); - - lifecycle_event.set_task_id("test_task_id"); - lifecycle_event.set_task_attempt(1); - lifecycle_event.set_job_id("test_job_id"); - lifecycle_event.mutable_ray_error_info()->set_error_message("Test error"); - - auto task_event_data_requests = ConvertToTaskEventDataRequests(std::move(request)); - const rpc::TaskEvents &task_event = - task_event_data_requests[0].data().events_by_task()[0]; - - ASSERT_TRUE(task_event.has_state_updates()); - const auto &state_updates = task_event.state_updates(); - - // error_info should be set when present in the source event - EXPECT_TRUE(state_updates.has_error_info()); - EXPECT_EQ(state_updates.error_info().error_message(), "Test error"); - } -} - } // namespace gcs } // namespace ray From de23e19662cd60963cde1d17bad6900b60fae80d Mon Sep 17 00:00:00 2001 From: sampan Date: Sun, 26 Oct 2025 07:05:52 +0000 Subject: [PATCH 4/4] address comment Signed-off-by: sampan --- src/ray/gcs/gcs_ray_event_converter.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_ray_event_converter.cc b/src/ray/gcs/gcs_ray_event_converter.cc index b8b7e4117d55..2f8e90b4ae98 100644 --- a/src/ray/gcs/gcs_ray_event_converter.cc +++ b/src/ray/gcs/gcs_ray_event_converter.cc @@ -202,7 +202,9 @@ rpc::TaskEvents ConvertToTaskEvents(rpc::events::TaskProfileEvents &&event) { task_event.set_attempt_number(event.attempt_number()); task_event.set_job_id(event.job_id()); - *task_event.mutable_profile_events() = std::move(*event.mutable_profile_events()); + if (event.has_profile_events()) { + *task_event.mutable_profile_events() = std::move(*event.mutable_profile_events()); + } return task_event; }