Commit b35e67a
[Core] Exit the Core Worker Early When an Error Is Received from the Plasma Store (#53679)
What's the issue:
- During node shutdown, the raylet may be killed before its core workers. If a task on one of those core workers then reads or writes objects in the plasma store, the operation fails with a broken pipe error, the task fails with a Ray task error whose reason is the broken pipe, and the whole job fails.
- This is not the desired behavior: a task failure caused by node shutdown should be treated as a system failure, and the core worker should not keep executing tasks once the raylet is down.

This PR mitigates the issue:
- In the plasma store client, add logic to quick-exit the core worker when an error occurs while reading or writing a buffer and the plasma store client is running inside a core worker.

Tested the logic manually to verify the behavior:
- With the following test code:
```
import os
import signal
import time

import ray

ray.init()

@ray.remote(max_retries=2)
def test_task(obj_ref):
    time.sleep(1)
    raylet_pid = int(os.environ["RAY_RAYLET_PID"])
    os.kill(raylet_pid, signal.SIGKILL)
    ray.put(obj_ref)

a = ray.put([0] * 250000)
ray.get(test_task.remote(a))
```
- Without the change:
```
ray.exceptions.RayTaskError(OSError): ray::test_task() (pid=30681, ip=127.0.0.1)
  File "/Users/myan/ray-core-quickstart/test-tasks/test-tasks.py", line 18, in test_task
    ray.get(test_ref)
  File "python/ray/includes/common.pxi", line 93, in ray._raylet.check_status
    raise IOError(message)
OSError: Failed to read data from the socket: End of file
```
- With the change in the PR:
```
ray.exceptions.LocalRayletDiedError: The task's local raylet died. Check raylet.out for more information.
```

---------
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
Co-authored-by: Ibrahim Rabbani <israbbani@gmail.com>
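For callers, the practical difference is the exception type. Below is a minimal sketch (not part of this PR) of how application code could separate the system-level error from ordinary task errors; it assumes `test_task` and `a` are defined as in the repro script above:
```
import ray

try:
    result = ray.get(test_task.remote(a))
except ray.exceptions.LocalRayletDiedError:
    # System-level failure: the local raylet died (e.g. during node shutdown).
    # Treat this as an infrastructure problem rather than a bug in the task.
    print("raylet died; surface as a system failure or reschedule elsewhere")
except ray.exceptions.RayTaskError as e:
    # Application-level failure raised by the task's own code.
    print(f"task failed with an application error: {e}")
```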
1 parent 5fddebd commit b35e67a

7 files changed: +95 −6 lines changed


python/ray/tests/test_failure_3.py

Lines changed: 25 additions & 0 deletions
@@ -47,6 +47,31 @@ def get_raylet_pid(self):
     wait_for_pid_to_exit(worker_pid)


+def test_plasma_store_operation_after_raylet_dies(ray_start_cluster):
+    """
+    Test that the operation on the plasma store after the raylet dies will not fail the
+    task with an application level error (RayTaskError) but a system level error
+    (RayletDiedError).
+    """
+    cluster = ray_start_cluster
+    cluster.add_node(num_cpus=1)
+    cluster.wait_for_nodes()
+
+    ray.init(address=cluster.address)
+
+    @ray.remote
+    def get_after_raylet_dies():
+        raylet_pid = int(os.environ["RAY_RAYLET_PID"])
+        os.kill(raylet_pid, SIGKILL)
+        wait_for_pid_to_exit(raylet_pid)
+        ray.put([0] * 100000)
+
+    try:
+        ray.get(get_after_raylet_dies.remote(), timeout=10)
+    except Exception as e:
+        assert isinstance(e, ray.exceptions.LocalRayletDiedError)
+
+
 @pytest.mark.parametrize(
     "ray_start_cluster_head",
     [

src/ray/common/client_connection.h

Lines changed: 2 additions & 2 deletions
@@ -99,7 +99,7 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
   ///
   /// \param buffer The buffer.
   /// \return Status.
-  Status WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer);
+  virtual Status WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer);

   /// Write a buffer to this connection asynchronously.
   ///
@@ -113,7 +113,7 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
   ///
   /// \param buffer The buffer.
   /// \return Status.
-  Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer);
+  virtual Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer);

   /// Read a buffer from this connection asynchronously.
   ///

src/ray/core_worker/store_provider/plasma_store_provider.cc

Lines changed: 7 additions & 1 deletion
@@ -65,7 +65,13 @@ CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
     bool warmup,
     std::function<std::string()> get_current_call_site)
     : raylet_client_(raylet_client),
-      store_client_(std::make_shared<plasma::PlasmaClient>()),
+      // We can turn on exit_on_connection_failure on for the core worker plasma
+      // client to early exit core worker after the raylet's death because on the
+      // raylet side, we never proactively close the plasma store connection even
+      // during shutdown. So any error from the raylet side should be a sign of raylet
+      // death.
+      store_client_(
+          std::make_shared<plasma::PlasmaClient>(/*exit_on_connection_failure*/ true)),
       reference_counter_(reference_counter),
       check_signals_(std::move(check_signals)) {
   if (get_current_call_site != nullptr) {

src/ray/object_manager/plasma/client.cc

Lines changed: 12 additions & 2 deletions
@@ -97,6 +97,7 @@ struct ObjectInUseEntry {
 class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
  public:
   Impl();
+  explicit Impl(bool exit_on_connection_failure);
   ~Impl();

   // PlasmaClient method implementations
@@ -235,11 +236,17 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
   std::unordered_set<ObjectID> deletion_cache_;
   /// A mutex which protects this class.
   std::recursive_mutex client_mutex_;
+  /// Whether the current process should exit when read or write to the connection fails.
+  /// It should only be turned on when the plasma client is in a core worker.
+  bool exit_on_connection_failure_;
 };

 PlasmaBuffer::~PlasmaBuffer() { RAY_UNUSED(client_->Release(object_id_)); }

-PlasmaClient::Impl::Impl() : store_capacity_(0) {}
+PlasmaClient::Impl::Impl() : store_capacity_(0), exit_on_connection_failure_(false) {}
+
+PlasmaClient::Impl::Impl(bool exit_on_connection_failure)
+    : store_capacity_(0), exit_on_connection_failure_(exit_on_connection_failure) {}

 PlasmaClient::Impl::~Impl() {}

@@ -868,7 +875,7 @@ Status PlasmaClient::Impl::Connect(const std::string &store_socket_name,
   /// The local stream socket that connects to store.
   ray::local_stream_socket socket(main_service_);
   RAY_RETURN_NOT_OK(ray::ConnectSocketRetry(socket, store_socket_name));
-  store_conn_.reset(new StoreConn(std::move(socket)));
+  store_conn_.reset(new StoreConn(std::move(socket), exit_on_connection_failure_));
   // Send a ConnectRequest to the store to get its memory capacity.
   RAY_RETURN_NOT_OK(SendConnectRequest(store_conn_));
   std::vector<uint8_t> buffer;
@@ -912,6 +919,9 @@ std::string PlasmaClient::Impl::DebugString() {

 PlasmaClient::PlasmaClient() : impl_(std::make_shared<PlasmaClient::Impl>()) {}

+PlasmaClient::PlasmaClient(bool exit_on_connection_failure)
+    : impl_(std::make_shared<PlasmaClient::Impl>(exit_on_connection_failure)) {}
+
 Status PlasmaClient::Connect(const std::string &store_socket_name,
                              const std::string &manager_socket_name,
                              int num_retries) {

src/ray/object_manager/plasma/client.h

Lines changed: 2 additions & 0 deletions
@@ -242,6 +242,8 @@ class PlasmaClient : public PlasmaClientInterface {
  public:
   PlasmaClient();

+  explicit PlasmaClient(bool exit_on_connection_failure);
+
   Status Connect(const std::string &store_socket_name,
                  const std::string &manager_socket_name = "",
                  int num_retries = -1) override;

src/ray/object_manager/plasma/connection.cc

Lines changed: 29 additions & 1 deletion
@@ -170,7 +170,11 @@ Status Client::SendFd(MEMFD_TYPE fd) {
 }

 StoreConn::StoreConn(ray::local_stream_socket &&socket)
-    : ray::ServerConnection(std::move(socket)) {}
+    : ray::ServerConnection(std::move(socket)), exit_on_connection_failure_(false) {}
+
+StoreConn::StoreConn(ray::local_stream_socket &&socket, bool exit_on_connection_failure)
+    : ray::ServerConnection(std::move(socket)),
+      exit_on_connection_failure_(exit_on_connection_failure) {}

 Status StoreConn::RecvFd(MEMFD_TYPE_NON_UNIQUE *fd) {
 #ifdef _WIN32
@@ -192,4 +196,28 @@ Status StoreConn::RecvFd(MEMFD_TYPE_NON_UNIQUE *fd) {
   return Status::OK();
 }

+ray::Status StoreConn::WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer) {
+  auto status = ray::ServerConnection::WriteBuffer(buffer);
+  ExitIfErrorStatus(status);
+  return status;
+}
+
+ray::Status StoreConn::ReadBuffer(
+    const std::vector<boost::asio::mutable_buffer> &buffer) {
+  auto status = ray::ServerConnection::ReadBuffer(buffer);
+  ExitIfErrorStatus(status);
+  return status;
+}
+
+void StoreConn::ExitIfErrorStatus(const ray::Status &status) {
+  if (!status.ok() && exit_on_connection_failure_) {
+    RAY_LOG(WARNING) << "The connection to the plasma store is failed. Terminate the "
+                     << "process. Status: " << status;
+    ray::QuickExit();
+    RAY_LOG(FATAL)
+        << "Accessing unreachable code. This line should never be reached "
+        << "after quick process exit due to plasma store connection failure. Please "
+           "create a github issue at https://github.com/ray-project/ray.";
+  }
+}
 }  // namespace plasma

src/ray/object_manager/plasma/connection.h

Lines changed: 18 additions & 0 deletions
@@ -164,10 +164,28 @@ class StoreConn : public ray::ServerConnection {
  public:
   explicit StoreConn(ray::local_stream_socket &&socket);

+  explicit StoreConn(ray::local_stream_socket &&socket, bool exit_on_connection_failure);
+
   /// Receive a file descriptor for the store.
   ///
   /// \return A file descriptor.
   ray::Status RecvFd(MEMFD_TYPE_NON_UNIQUE *fd);
+
+  ray::Status WriteBuffer(const std::vector<boost::asio::const_buffer> &buffer) override;
+
+  ray::Status ReadBuffer(const std::vector<boost::asio::mutable_buffer> &buffer) override;
+
+ private:
+  // Whether the current process should exit when WriteBuffer or ReadBuffer fails.
+  // Currently it is only turned on when the plasma client is in a core worker.
+  // TODO(myan): The better way is to handle the failure outside of the plasma client
+  // and inside the core worker's logic and propogate the correct exception to the user.
+  bool exit_on_connection_failure_ = false;
+
+  // Shutdown the current process if the passed in status is not OK and the client is
+  // configured to exit on failure.
+  // @param status: The status to check.
+  void ExitIfErrorStatus(const ray::Status &status);
 };

 std::ostream &operator<<(std::ostream &os, const std::shared_ptr<StoreConn> &store_conn);
