Skip to content

Commit c1329f6

Browse files
dayshahelliot-barn
authored andcommitted
[core] Don't log actor restart warning if arg is detached actor (#57931)
Signed-off-by: dayshah <dhyey2019@gmail.com> Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
1 parent 09ef6e7 commit c1329f6

File tree

6 files changed

+55
-6
lines changed

6 files changed

+55
-6
lines changed

src/ray/core_worker/actor_handle.cc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ rpc::ActorHandle CreateInnerActorHandle(
3737
bool allow_out_of_order_execution,
3838
bool enable_tensor_transport,
3939
std::optional<bool> enable_task_events,
40-
const std::unordered_map<std::string, std::string> &labels) {
40+
const std::unordered_map<std::string, std::string> &labels,
41+
bool is_detached) {
4142
rpc::ActorHandle inner;
4243
inner.set_actor_id(actor_id.Data(), actor_id.Size());
4344
inner.set_owner_id(owner_id.Binary());
@@ -56,6 +57,7 @@ rpc::ActorHandle CreateInnerActorHandle(
5657
inner.set_enable_tensor_transport(enable_tensor_transport);
5758
inner.set_enable_task_events(enable_task_events.value_or(kDefaultTaskEventEnabled));
5859
inner.mutable_labels()->insert(labels.begin(), labels.end());
60+
inner.set_is_detached(is_detached);
5961
return inner;
6062
}
6163

@@ -89,6 +91,7 @@ rpc::ActorHandle CreateInnerActorHandleFromActorData(
8991
task_spec.actor_creation_task_spec().allow_out_of_order_execution());
9092
inner.set_max_pending_calls(task_spec.actor_creation_task_spec().max_pending_calls());
9193
inner.mutable_labels()->insert(task_spec.labels().begin(), task_spec.labels().end());
94+
inner.set_is_detached(task_spec.actor_creation_task_spec().is_detached());
9295
return inner;
9396
}
9497
} // namespace
@@ -109,7 +112,8 @@ ActorHandle::ActorHandle(
109112
bool allow_out_of_order_execution,
110113
bool enable_tensor_transport,
111114
std::optional<bool> enable_task_events,
112-
const std::unordered_map<std::string, std::string> &labels)
115+
const std::unordered_map<std::string, std::string> &labels,
116+
bool is_detached)
113117
: ActorHandle(CreateInnerActorHandle(actor_id,
114118
owner_id,
115119
owner_address,
@@ -125,7 +129,8 @@ ActorHandle::ActorHandle(
125129
allow_out_of_order_execution,
126130
enable_tensor_transport,
127131
enable_task_events,
128-
labels)) {}
132+
labels,
133+
is_detached)) {}
129134

130135
ActorHandle::ActorHandle(const std::string &serialized)
131136
: ActorHandle(CreateInnerActorHandleFromString(serialized)) {}

src/ray/core_worker/actor_handle.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ class ActorHandle {
5151
bool allow_out_of_order_execution = false,
5252
bool enable_tensor_transport = false,
5353
std::optional<bool> enable_task_events = absl::nullopt,
54-
const std::unordered_map<std::string, std::string> &labels = {});
54+
const std::unordered_map<std::string, std::string> &labels = {},
55+
bool is_detached = false);
5556

5657
/// Constructs an ActorHandle from a serialized string.
5758
explicit ActorHandle(const std::string &serialized);
@@ -113,6 +114,8 @@ class ActorHandle {
113114

114115
bool EnableTensorTransport() const { return inner_.enable_tensor_transport(); }
115116

117+
bool IsDetached() const { return inner_.is_detached(); }
118+
116119
const ::google::protobuf::Map<std::string, std::string> &GetLabels() const {
117120
return inner_.labels();
118121
}

src/ray/core_worker/actor_manager.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,16 @@ bool ActorManager::CheckActorHandleExists(const ActorID &actor_id) {
119119
return actor_handles_.find(actor_id) != actor_handles_.end();
120120
}
121121

122+
std::shared_ptr<ActorHandle> ActorManager::GetActorHandleIfExists(
123+
const ActorID &actor_id) {
124+
absl::MutexLock lock(&mutex_);
125+
auto it = actor_handles_.find(actor_id);
126+
if (it != actor_handles_.end()) {
127+
return it->second;
128+
}
129+
return nullptr;
130+
}
131+
122132
bool ActorManager::AddNewActorHandle(std::unique_ptr<ActorHandle> actor_handle,
123133
const std::string &call_site,
124134
const rpc::Address &caller_address,

src/ray/core_worker/actor_manager.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,9 @@ class ActorManager {
143143
/// \param actor_id ID of the actor to be subscribed.
144144
void SubscribeActorState(const ActorID &actor_id);
145145

146+
/// Returns the actor handle if it exists, nullptr otherwise.
147+
std::shared_ptr<ActorHandle> GetActorHandleIfExists(const ActorID &actor_id);
148+
146149
private:
147150
/// Give this worker a handle to an actor.
148151
///

src/ray/core_worker/core_worker.cc

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2115,7 +2115,8 @@ Status CoreWorker::CreateActor(const RayFunction &function,
21152115
actor_creation_options.allow_out_of_order_execution,
21162116
actor_creation_options.enable_tensor_transport,
21172117
actor_creation_options.enable_task_events,
2118-
actor_creation_options.labels);
2118+
actor_creation_options.labels,
2119+
is_detached);
21192120
std::string serialized_actor_handle;
21202121
actor_handle->Serialize(&serialized_actor_handle);
21212122
ActorID root_detached_actor_id;
@@ -2162,13 +2163,38 @@ Status CoreWorker::CreateActor(const RayFunction &function,
21622163
return Status::OK();
21632164
}
21642165

2166+
auto ref_is_detached_actor = [this](const std::string &object_id) {
2167+
auto ref_object_id = ObjectID::FromBinary(object_id);
2168+
if (ObjectID::IsActorID(ref_object_id)) {
2169+
auto ref_actor_id = ObjectID::ToActorID(ref_object_id);
2170+
if (auto ref_actor_handle = actor_manager_->GetActorHandleIfExists(ref_actor_id)) {
2171+
if (ref_actor_handle->IsDetached()) {
2172+
return true;
2173+
}
2174+
}
2175+
}
2176+
return false;
2177+
};
21652178
if (task_spec.MaxActorRestarts() != 0) {
21662179
bool actor_restart_warning = false;
21672180
for (size_t i = 0; i < task_spec.NumArgs(); i++) {
2168-
if (task_spec.ArgByRef(i) || !task_spec.ArgInlinedRefs(i).empty()) {
2181+
if (task_spec.ArgByRef(i)) {
21692182
actor_restart_warning = true;
21702183
break;
21712184
}
2185+
if (!task_spec.ArgInlinedRefs(i).empty()) {
2186+
for (const auto &ref : task_spec.ArgInlinedRefs(i)) {
2187+
if (!ref_is_detached_actor(ref.object_id())) {
2188+
// There's an inlined ref that's not a detached actor, so we want to
2189+
// show the warning.
2190+
actor_restart_warning = true;
2191+
break;
2192+
}
2193+
}
2194+
}
2195+
if (actor_restart_warning) {
2196+
break;
2197+
}
21722198
}
21732199
if (actor_restart_warning) {
21742200
RAY_LOG_ONCE_PER_PROCESS(ERROR)

src/ray/protobuf/core_worker.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ message ActorHandle {
7474
map<string, string> labels = 15;
7575

7676
bool enable_tensor_transport = 16;
77+
78+
bool is_detached = 17;
7779
}
7880

7981
message PushTaskRequest {

0 commit comments

Comments
 (0)