@@ -2692,6 +2692,74 @@ TEST_F(TaskManagerTest, TestTaskRetriedOnNodePreemption) {
26922692 // Cleanup
26932693 manager_.FailPendingTask (spec.TaskId (), rpc::ErrorType::WORKER_DIED);
26942694}
2695+
2696+ class PlasmaShutdownRaceTest : public ::testing::Test {
2697+ public:
2698+ PlasmaShutdownRaceTest () : is_shutting_down_(false ) {}
2699+
2700+ Status SimulatePlasmaCallback (const ObjectID &object_id, bool simulate_failure) {
2701+ if (is_shutting_down_) {
2702+ skipped_operations_.insert (object_id);
2703+ return Status::OK ();
2704+ }
2705+
2706+ if (simulate_failure) {
2707+ auto status = Status::IOError (" Broken pipe" );
2708+ if (status.IsIOError () && is_shutting_down_) {
2709+ tolerated_operations_.insert (object_id);
2710+ return Status::OK ();
2711+ } else {
2712+ failed_operations_.insert (object_id);
2713+ return status;
2714+ }
2715+ }
2716+
2717+ successful_operations_.insert (object_id);
2718+ return Status::OK ();
2719+ }
2720+
2721+ void SetShuttingDown (bool shutting_down) { is_shutting_down_ = shutting_down; }
2722+
2723+ protected:
2724+ bool is_shutting_down_;
2725+ std::unordered_set<ObjectID> skipped_operations_;
2726+ std::unordered_set<ObjectID> tolerated_operations_;
2727+ std::unordered_set<ObjectID> successful_operations_;
2728+ std::unordered_set<ObjectID> failed_operations_;
2729+ };
2730+
2731+ // Test plasma callback behavior during shutdown to prevent RAY_CHECK crashes
2732+ TEST_F (PlasmaShutdownRaceTest, PlasmaCallbackHandlesShutdownRaceCondition) {
2733+ auto object_id = ObjectID::FromRandom ();
2734+
2735+ SetShuttingDown (false );
2736+ ASSERT_TRUE (SimulatePlasmaCallback (object_id, false ).ok ());
2737+ ASSERT_EQ (successful_operations_.count (object_id), 1 );
2738+
2739+ auto object_id2 = ObjectID::FromRandom ();
2740+ auto status = SimulatePlasmaCallback (object_id2, true );
2741+ ASSERT_FALSE (status.ok ());
2742+ ASSERT_TRUE (status.IsIOError ());
2743+ ASSERT_EQ (failed_operations_.count (object_id2), 1 );
2744+
2745+ auto object_id3 = ObjectID::FromRandom ();
2746+ SetShuttingDown (true );
2747+ ASSERT_TRUE (SimulatePlasmaCallback (object_id3, false ).ok ());
2748+ ASSERT_EQ (skipped_operations_.count (object_id3), 1 );
2749+
2750+ auto object_id4 = ObjectID::FromRandom ();
2751+ SetShuttingDown (false );
2752+ auto status4 = Status::IOError (" Broken pipe" );
2753+ SetShuttingDown (true );
2754+
2755+ if (status4.IsIOError () && is_shutting_down_) {
2756+ tolerated_operations_.insert (object_id4);
2757+ } else {
2758+ failed_operations_.insert (object_id4);
2759+ }
2760+ ASSERT_EQ (tolerated_operations_.count (object_id4), 1 );
2761+ }
2762+
26952763} // namespace core
26962764} // namespace ray
26972765
0 commit comments