From 9453c1741cdbacd8af0e525a845f68bd8e339185 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Tue, 23 May 2023 22:57:16 -0700 Subject: [PATCH] [Core] Guarantee the ordering of put ActorTaskSpecTable and ActorTable (#35683) In order to guarantee that we put ActorTaskSpecTable before ActorTable, we should put ActorTable inside the ActorTaskSpecTable put callback. Otherwise, Redis may receive ActorTable put before ActorTaskSpecTable put. If we crash in the middle, we may end up with actor data inside ActorTable but not ActorTaskSpecTable. Signed-off-by: Jiajun Yao --- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 65 +++++++++++---------- 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index ee328510ea82f..dad8d4ecdf6e7 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -591,38 +591,41 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ // The backend storage is supposed to be reliable, so the status must be ok. RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Put( - actor_id, request.task_spec(), [](const Status &status) {})); - RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( - actor->GetActorID(), - *actor->GetMutableActorTableData(), - [this, actor](const Status &status) { - // The backend storage is supposed to be reliable, so the status must be ok. - RAY_CHECK_OK(status); - // If a creator dies before this callback is called, the actor could have been - // already destroyed. It is okay not to invoke a callback because we don't need - // to reply to the creator as it is already dead. - auto registered_actor_it = registered_actors_.find(actor->GetActorID()); - if (registered_actor_it == registered_actors_.end()) { - // NOTE(sang): This logic assumes that the ordering of backend call is - // guaranteed. It is currently true because we use a single TCP socket to call - // the default Redis backend. If ordering is not guaranteed, we should overwrite - // the actor state to DEAD to avoid race condition. - return; - } - RAY_CHECK_OK(gcs_publisher_->PublishActor( - actor->GetActorID(), actor->GetActorTableData(), nullptr)); - // Invoke all callbacks for all registration requests of this actor (duplicated - // requests are included) and remove all of them from - // actor_to_register_callbacks_. - // Reply to the owner to indicate that the actor has been registered. - auto iter = actor_to_register_callbacks_.find(actor->GetActorID()); - RAY_CHECK(iter != actor_to_register_callbacks_.end() && !iter->second.empty()); - auto callbacks = std::move(iter->second); - actor_to_register_callbacks_.erase(iter); - for (auto &callback : callbacks) { - callback(actor); - } + actor_id, request.task_spec(), [this, actor](const Status &status) { + RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( + actor->GetActorID(), + *actor->GetMutableActorTableData(), + [this, actor](const Status &status) { + // The backend storage is supposed to be reliable, so the status must be ok. + RAY_CHECK_OK(status); + // If a creator dies before this callback is called, the actor could have + // been already destroyed. It is okay not to invoke a callback because we + // don't need to reply to the creator as it is already dead. + auto registered_actor_it = registered_actors_.find(actor->GetActorID()); + if (registered_actor_it == registered_actors_.end()) { + // NOTE(sang): This logic assumes that the ordering of backend call is + // guaranteed. It is currently true because we use a single TCP socket to + // call the default Redis backend. If ordering is not guaranteed, we + // should overwrite the actor state to DEAD to avoid race condition. + return; + } + RAY_CHECK_OK(gcs_publisher_->PublishActor( + actor->GetActorID(), actor->GetActorTableData(), nullptr)); + // Invoke all callbacks for all registration requests of this actor + // (duplicated requests are included) and remove all of them from + // actor_to_register_callbacks_. + // Reply to the owner to indicate that the actor has been registered. + auto iter = actor_to_register_callbacks_.find(actor->GetActorID()); + RAY_CHECK(iter != actor_to_register_callbacks_.end() && + !iter->second.empty()); + auto callbacks = std::move(iter->second); + actor_to_register_callbacks_.erase(iter); + for (auto &callback : callbacks) { + callback(actor); + } + })); })); + return Status::OK(); }