Commit ce437fc

[core] Give io context concurrency hint (#53642)
Most of our io_contexts only ever run on a single thread. asio has a concurrency-hint option to improve performance in this case, so I'm adding an optional constructor parameter for when the io_context is guaranteed to run on a single thread. The parameter also serves as a source of truth for whether an io_context runs on a single thread or on multiple threads. This is for sure a premature optimization, but it shouldn't hurt either way.

What the boost io_context documentation says about the `1` option (we're on 1.81):
https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/overview/core/concurrency_hint.html

> The implementation assumes that the io_context will be run from a single thread, and applies several optimisations based on this assumption. For example, when a handler is posted from within another handler, the new handler is added to a fast thread-local queue (with the consequence that the new handler is held back until the currently executing handler finishes). The io_context still provides full thread safety, and distinct I/O objects may be used from any thread.

---------

Signed-off-by: dayshah <dhyey2019@gmail.com>
1 parent d57f4a0 commit ce437fc
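
For a concrete feel for what the hint does, here is a minimal standalone sketch (plain Boost.Asio, not Ray code; the names are illustrative). The printed order follows from the documentation quoted above: a handler posted from within another handler is held back until the current one finishes.

```cpp
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <iostream>

int main() {
  // Hint 1: the caller promises that run() is only ever invoked from one
  // thread, letting asio use a fast thread-local handler queue internally.
  boost::asio::io_context io(/*concurrency_hint=*/1);

  boost::asio::post(io, [&io] {
    // Posted from within a handler: with hint 1 this can go to the
    // thread-local queue and is held back until the current handler returns.
    boost::asio::post(io, [] { std::cout << "inner\n"; });
    std::cout << "outer\n";
  });

  io.run();  // run() called from a single thread, as promised
  return 0;
}
```

Note that the example calls `io.run()` from exactly one thread, which is the guarantee the new `running_on_single_thread` parameter encodes.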

File tree

14 files changed, +39 -24 lines changed

python/ray/includes/global_state_accessor.pxd

Lines changed: 1 addition & 1 deletion
@@ -100,7 +100,7 @@ cdef extern from * namespace "ray::gcs" nogil:
     RAY_CHECK(absl::Base64Unescape(config, &config_list));
     RayConfig::instance().initialize(config_list);

-    instrumented_io_context io_service;
+    instrumented_io_context io_service{/*enable_lag_probe=*/false, /*running_on_single_thread=*/true};

     auto redis_client = std::make_shared<RedisClient>(options);
     auto status = redis_client->Connect(io_service);

src/ray/common/asio/asio_util.h

Lines changed: 3 additions & 2 deletions
@@ -61,7 +61,7 @@ class InstrumentedIOContextWithThread {
    */
   explicit InstrumentedIOContextWithThread(const std::string &thread_name,
                                            bool enable_lag_probe = false)
-      : io_service_(enable_lag_probe),
+      : io_service_(enable_lag_probe, /*running_on_single_thread=*/true),
         work_(io_service_.get_executor()),
         thread_name_(thread_name) {
     io_thread_ = std::thread([this] {
@@ -91,7 +91,8 @@ class InstrumentedIOContextWithThread {
   }

  private:
-  instrumented_io_context io_service_;
+  instrumented_io_context io_service_{/*enable_lag_probe=*/false,
+                                      /*running_on_single_thread=*/true};
   boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
       work_;  // to keep io_service_ running
   std::thread io_thread_;
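
For readers new to the pattern, this class pairs an io_context with one dedicated thread and a work guard. A stripped-down standalone sketch of the same idea (illustrative, not the Ray wrapper; error handling and naming elided):

```cpp
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <thread>

// One io_context, one owning thread, and a work guard so run() keeps waiting
// instead of returning when the handler queue is empty. Hint 1 is safe here
// because only io_thread_ ever calls run().
class IOContextWithThread {
 public:
  IOContextWithThread()
      : io_(/*concurrency_hint=*/1),
        work_(io_.get_executor()),
        io_thread_([this] { io_.run(); }) {}

  ~IOContextWithThread() {
    io_.stop();  // unblock run()
    if (io_thread_.joinable()) io_thread_.join();
  }

  boost::asio::io_context &get() { return io_; }

 private:
  boost::asio::io_context io_;
  boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_;
  std::thread io_thread_;
};
```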

src/ray/common/asio/instrumented_io_context.cc

Lines changed: 9 additions & 5 deletions
@@ -73,8 +73,11 @@ void ScheduleLagProbe(instrumented_io_context &io_context) {
 }
 }  // namespace

-instrumented_io_context::instrumented_io_context(bool enable_lag_probe)
-    : event_stats_(std::make_shared<EventTracker>()) {
+instrumented_io_context::instrumented_io_context(bool enable_lag_probe,
+                                                 bool running_on_single_thread)
+    : boost::asio::io_context(
+          running_on_single_thread ? 1 : BOOST_ASIO_CONCURRENCY_HINT_DEFAULT),
+      event_stats_(std::make_shared<EventTracker>()) {
   if (enable_lag_probe) {
     ScheduleLagProbe(*this);
   }
@@ -98,23 +101,24 @@ void instrumented_io_context::post(std::function<void()> handler,
   }

   if (delay_us == 0) {
-    boost::asio::io_context::post(std::move(handler));
+    boost::asio::post(*this, std::move(handler));
   } else {
     execute_after(*this, std::move(handler), std::chrono::microseconds(delay_us));
   }
 }

 void instrumented_io_context::dispatch(std::function<void()> handler, std::string name) {
   if (!RayConfig::instance().event_stats()) {
-    return boost::asio::io_context::post(std::move(handler));
+    return boost::asio::post(*this, std::move(handler));
   }
   auto stats_handle = event_stats_->RecordStart(std::move(name));
   // References are only invalidated upon deletion of the corresponding item from the
   // table, which we won't do until this io_context is deleted. Provided that
   // GuardedHandlerStats synchronizes internal access, we can concurrently write to the
   // handler stats it->second from multiple threads without acquiring a table-level
   // readers lock in the callback.
-  boost::asio::io_context::dispatch(
+  boost::asio::dispatch(
+      *this,
       [handler = std::move(handler), stats_handle = std::move(stats_handle)]() mutable {
         EventTracker::RecordExecution(handler, std::move(stats_handle));
       });
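
A related cleanup rides along in this file: the deprecated `io_context` member functions `post()` and `dispatch()` are replaced with the `boost::asio::post`/`boost::asio::dispatch` free functions, which take the execution context as their first argument. A standalone sketch of how the two free functions differ (plain Boost.Asio, not Ray code):

```cpp
#include <boost/asio/dispatch.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <iostream>

int main() {
  boost::asio::io_context io(/*concurrency_hint=*/1);

  boost::asio::post(io, [&io] {
    // dispatch may run its handler inline, because this thread is already
    // running the io_context.
    boost::asio::dispatch(io, [] { std::cout << "dispatched\n"; });
    // post always defers: the handler is queued and runs after we return.
    boost::asio::post(io, [] { std::cout << "posted\n"; });
    std::cout << "outer done\n";
  });

  io.run();  // prints: dispatched, outer done, posted
  return 0;
}
```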

src/ray/common/asio/instrumented_io_context.h

Lines changed: 2 additions & 1 deletion
@@ -34,7 +34,8 @@ class instrumented_io_context : public boost::asio::io_context {
   /// \param enable_lag_probe If true, and if related Ray configs are set, schedule a
   /// probe to measure the event loop lag. After a probe is done, it schedules another one
   /// so a io_context.run() call will never return.
-  explicit instrumented_io_context(bool enable_lag_probe = false);
+  explicit instrumented_io_context(bool enable_lag_probe = false,
+                                   bool running_on_single_thread = false);

   /// A proxy post function that collects count, queueing, and execution statistics for
   /// the given handler.

src/ray/common/asio/io_service_pool.cc

Lines changed: 2 additions & 1 deletion
@@ -24,7 +24,8 @@ IOServicePool::~IOServicePool() {}

 void IOServicePool::Run() {
   for (size_t i = 0; i < io_service_num_; ++i) {
-    io_services_.emplace_back(std::make_unique<instrumented_io_context>());
+    io_services_.emplace_back(std::make_unique<instrumented_io_context>(
+        /*enable_lag_probe=*/false, /*running_on_single_thread=*/true));
     instrumented_io_context &io_service = *io_services_[i];
     threads_.emplace_back([&io_service] {
       boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work(

src/ray/common/client_connection.cc

Lines changed: 6 additions & 6 deletions
@@ -193,13 +193,13 @@ void ServerConnection::ReadBufferAsync(
   if (RayConfig::instance().event_stats()) {
     auto &io_context =
         static_cast<instrumented_io_context &>(socket_.get_executor().context());
-    const auto stats_handle =
+    auto stats_handle =
         io_context.stats().RecordStart("ServerConnection.async_read.ReadBufferAsync");
     boost::asio::async_read(
         socket_,
         buffer,
         [handler, stats_handle = std::move(stats_handle)](
-            const boost::system::error_code &ec, size_t bytes_transferred) {
+            const boost::system::error_code &ec, size_t bytes_transferred) mutable {
           EventTracker::RecordExecution(
               [handler, ec]() { handler(boost_to_ray_status(ec)); },
               std::move(stats_handle));
@@ -427,13 +427,13 @@ void ClientConnection::ProcessMessages() {
   auto this_ptr = shared_ClientConnection_from_this();
   auto &io_context = static_cast<instrumented_io_context &>(
       ServerConnection::socket_.get_executor().context());
-  const auto stats_handle = io_context.stats().RecordStart(
+  auto stats_handle = io_context.stats().RecordStart(
       "ClientConnection.async_read.ProcessMessageHeader");
   boost::asio::async_read(
       ServerConnection::socket_,
       header,
       [this, this_ptr, stats_handle = std::move(stats_handle)](
-          const boost::system::error_code &ec, size_t bytes_transferred) {
+          const boost::system::error_code &ec, size_t bytes_transferred) mutable {
         EventTracker::RecordExecution(
             [this, this_ptr, ec]() { ProcessMessageHeader(ec); },
             std::move(stats_handle));
@@ -468,13 +468,13 @@ void ClientConnection::ProcessMessageHeader(const boost::system::error_code &err
   auto this_ptr = shared_ClientConnection_from_this();
   auto &io_context = static_cast<instrumented_io_context &>(
       ServerConnection::socket_.get_executor().context());
-  const auto stats_handle =
+  auto stats_handle =
       io_context.stats().RecordStart("ClientConnection.async_read.ProcessMessage");
   boost::asio::async_read(
       ServerConnection::socket_,
       boost::asio::buffer(read_message_),
       [this, this_ptr, stats_handle = std::move(stats_handle)](
-          const boost::system::error_code &ec, size_t bytes_transferred) {
+          const boost::system::error_code &ec, size_t bytes_transferred) mutable {
         EventTracker::RecordExecution([this, this_ptr, ec]() { ProcessMessage(ec); },
                                       std::move(stats_handle));
       });
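
The `const auto` → `auto` and `mutable` changes in this file go together: `stats_handle` is moved into the lambda capture and then moved out again inside the body. Moving from a `const` object silently copies (or fails to compile for move-only types), and without `mutable` the closure's captures are `const` inside `operator()`. A minimal standalone illustration, with `std::unique_ptr` standing in for the handle type:

```cpp
#include <memory>
#include <utility>

int main() {
  auto handle = std::make_unique<int>(42);  // move-only stand-in

  // `mutable` lets the body modify (here: move from) the by-move capture;
  // without it, `h` is const inside operator() and std::move(h) would not
  // bind to unique_ptr's move constructor.
  auto cb = [h = std::move(handle)]() mutable {
    auto taken = std::move(h);  // transfer ownership out of the closure
    return *taken;
  };

  return cb() == 42 ? 0 : 1;
}
```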

src/ray/common/file_system_monitor.h

Lines changed: 2 additions & 1 deletion
@@ -68,7 +68,8 @@ class FileSystemMonitor {
   const std::vector<std::string> paths_;
   const double capacity_threshold_;
   std::atomic<bool> over_capacity_;
-  instrumented_io_context io_context_;
+  instrumented_io_context io_context_{/*enable_lag_probe=*/false,
+                                      /*running_on_single_thread=*/true};
   std::thread monitor_thread_;
   std::shared_ptr<PeriodicalRunner> runner_;
 };

src/ray/core_worker/core_worker.h

Lines changed: 2 additions & 1 deletion
@@ -1745,7 +1745,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   bool initialized_ ABSL_GUARDED_BY(initialize_mutex_) = false;

   /// Event loop where the IO events are handled. e.g. async GCS operations.
-  instrumented_io_context io_service_;
+  instrumented_io_context io_service_{/*enable_lag_probe=*/false,
+                                      /*running_on_single_thread=*/true};

   /// Keeps the io_service_ alive.
   boost::asio::executor_work_guard<boost::asio::io_context::executor_type> io_work_;

src/ray/core_worker/core_worker_process.cc

Lines changed: 2 additions & 1 deletion
@@ -264,7 +264,8 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() {
   // the system config in the constructor of `CoreWorkerProcessImpl`.
   std::promise<std::string> promise;
   std::thread thread([&] {
-    instrumented_io_context io_service;
+    instrumented_io_context io_service{/*enable_lag_probe=*/false,
+                                       /*running_on_single_thread=*/true};
     boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work(
         io_service.get_executor());
     rpc::ClientCallManager client_call_manager(io_service, /*record_stats=*/false);

src/ray/core_worker/task_event_buffer.h

Lines changed: 2 additions & 1 deletion
@@ -436,7 +436,8 @@ class TaskEventBufferImpl : public TaskEventBuffer {
   absl::Mutex profile_mutex_;

   /// IO service event loop owned by TaskEventBuffer.
-  instrumented_io_context io_service_;
+  instrumented_io_context io_service_{/*enable_lag_probe=*/false,
+                                      /*running_on_single_thread=*/true};

   /// Work guard to prevent the io_context from exiting when no work.
   boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard_;
