Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/ray/gcs/gcs_ray_event_converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,19 @@ rpc::TaskEvents ConvertToTaskEvents(rpc::events::TaskLifecycleEvent &&event) {
task_event.set_job_id(event.job_id());

rpc::TaskStateUpdate *task_state_update = task_event.mutable_state_updates();
task_state_update->set_node_id(event.node_id());
task_state_update->set_worker_id(event.worker_id());
task_state_update->set_worker_pid(event.worker_pid());
*task_state_update->mutable_error_info() = std::move(*event.mutable_ray_error_info());
if (!event.node_id().empty()) {
task_state_update->set_node_id(event.node_id());
}
if (!event.worker_id().empty()) {
task_state_update->set_worker_id(event.worker_id());
}
// worker pid can never be 0
if (event.worker_pid() != 0) {
task_state_update->set_worker_pid(event.worker_pid());
}
if (event.has_ray_error_info()) {
*task_state_update->mutable_error_info() = std::move(*event.mutable_ray_error_info());
}

for (const auto &state_transition : event.state_transitions()) {
int64_t ns = ProtoTimestampToAbslTimeNanos(state_transition.timestamp());
Expand Down Expand Up @@ -193,7 +202,9 @@ rpc::TaskEvents ConvertToTaskEvents(rpc::events::TaskProfileEvents &&event) {
task_event.set_attempt_number(event.attempt_number());
task_event.set_job_id(event.job_id());

*task_event.mutable_profile_events() = std::move(*event.mutable_profile_events());
if (event.has_profile_events()) {
*task_event.mutable_profile_events() = std::move(*event.mutable_profile_events());
}
return task_event;
}

Expand Down
134 changes: 134 additions & 0 deletions src/ray/gcs/tests/gcs_ray_event_converter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -411,5 +411,139 @@ TEST(GcsRayEventConverterTest, TestConvertActorTaskDefinitionEvent) {
EXPECT_EQ(task_info.required_resources().at("GPU"), 1.0);
}

// Parameterized test for optional fields in TaskLifecycleEvent.
// Tests that optional fields are only set when they have non-empty values,
// preventing issues where explicitly set empty fields overwrite existing values
// during protobuf mergeFrom() operations.
struct OptionalFieldTestCase {
std::string test_name;
std::string node_id;
std::string worker_id;
int32_t worker_pid;
std::string error_message; // Empty string means no error_info should be set
bool expect_node_id_set;
bool expect_worker_id_set;
bool expect_worker_pid_set;
bool expect_error_info_set;
};

class TaskLifecycleEventOptionalFieldsTest
: public ::testing::TestWithParam<OptionalFieldTestCase> {};

TEST_P(TaskLifecycleEventOptionalFieldsTest, TestOptionalFieldPresence) {
const auto &test_case = GetParam();

rpc::events::AddEventsRequest request;
rpc::events::RayEvent &event = *request.mutable_events_data()->mutable_events()->Add();
event.set_event_type(rpc::events::RayEvent::TASK_LIFECYCLE_EVENT);
rpc::events::TaskLifecycleEvent &lifecycle_event =
*event.mutable_task_lifecycle_event();

// Set basic required fields
lifecycle_event.set_task_id("test_task_id");
lifecycle_event.set_task_attempt(1);
lifecycle_event.set_job_id("test_job_id");

// Set optional fields according to test case
lifecycle_event.set_node_id(test_case.node_id);
lifecycle_event.set_worker_id(test_case.worker_id);
lifecycle_event.set_worker_pid(test_case.worker_pid);

// Set error_info if specified
if (!test_case.error_message.empty()) {
lifecycle_event.mutable_ray_error_info()->set_error_message(test_case.error_message);
}

// Call the converter
auto task_event_data_requests = ConvertToTaskEventDataRequests(std::move(request));
ASSERT_EQ(task_event_data_requests.size(), 1);
const rpc::TaskEvents &task_event =
task_event_data_requests[0].data().events_by_task()[0];

// Verify that state_updates exists
ASSERT_TRUE(task_event.has_state_updates());
const auto &state_updates = task_event.state_updates();

// Verify field presence matches expectations
EXPECT_EQ(state_updates.has_node_id(), test_case.expect_node_id_set)
<< "node_id presence mismatch for test: " << test_case.test_name;
if (test_case.expect_node_id_set) {
EXPECT_EQ(state_updates.node_id(), test_case.node_id);
}

EXPECT_EQ(state_updates.has_worker_id(), test_case.expect_worker_id_set)
<< "worker_id presence mismatch for test: " << test_case.test_name;
if (test_case.expect_worker_id_set) {
EXPECT_EQ(state_updates.worker_id(), test_case.worker_id);
}

EXPECT_EQ(state_updates.has_worker_pid(), test_case.expect_worker_pid_set)
<< "worker_pid presence mismatch for test: " << test_case.test_name;
if (test_case.expect_worker_pid_set) {
EXPECT_EQ(state_updates.worker_pid(), test_case.worker_pid);
}

EXPECT_EQ(state_updates.has_error_info(), test_case.expect_error_info_set)
<< "error_info presence mismatch for test: " << test_case.test_name;
if (test_case.expect_error_info_set) {
EXPECT_EQ(state_updates.error_info().error_message(), test_case.error_message);
}
}

INSTANTIATE_TEST_SUITE_P(
OptionalFields,
TaskLifecycleEventOptionalFieldsTest,
::testing::Values(
// All fields empty - none should be set
OptionalFieldTestCase{"AllEmpty", "", "", 0, "", false, false, false, false},
// All fields non-empty - all should be set
OptionalFieldTestCase{"AllNonEmpty",
"test_node_id",
"test_worker_id",
1234,
"Test error",
true,
true,
true,
true},
// Mixed: node_id set, others empty
OptionalFieldTestCase{
"OnlyNodeId", "test_node_id", "", 0, "", true, false, false, false},
// Mixed: worker_id set, others empty
OptionalFieldTestCase{
"OnlyWorkerId", "", "test_worker_id", 0, "", false, true, false, false},
// Mixed: worker_pid set, others empty
OptionalFieldTestCase{
"OnlyWorkerPid", "", "", 5678, "", false, false, true, false},
// Only error_info set, others empty
OptionalFieldTestCase{
"OnlyErrorInfo", "", "", 0, "Test error", false, false, false, true},
// Mixed: node_id and worker_pid set, worker_id and error_info empty
OptionalFieldTestCase{
"NodeIdAndWorkerPid", "test_node_id", "", 9999, "", true, false, true, false},
// Mixed: worker_id and worker_pid set, node_id and error_info empty
OptionalFieldTestCase{"WorkerIdAndWorkerPid",
"",
"test_worker_id",
4321,
"",
false,
true,
true,
false},
// Mixed: worker_id and error_info set, others empty
OptionalFieldTestCase{"WorkerIdAndErrorInfo",
"",
"test_worker_id",
0,
"Worker error",
false,
true,
false,
true}),
[](const ::testing::TestParamInfo<OptionalFieldTestCase> &info) {
return info.param.test_name;
});

} // namespace gcs
} // namespace ray