diff --git a/src/ray/common/BUILD.bazel b/src/ray/common/BUILD.bazel index 7f7464582b06..f36c260daf62 100644 --- a/src/ray/common/BUILD.bazel +++ b/src/ray/common/BUILD.bazel @@ -282,6 +282,7 @@ ray_cc_library( ], deps = [ ":event_stats", + ":metrics", ":ray_config", "//src/ray/util:array", "@boost//:asio", @@ -311,6 +312,7 @@ ray_cc_library( "event_stats.h", ], deps = [ + ":metrics", ":ray_config", "//src/ray/stats:stats_metric", "//src/ray/util:time", diff --git a/src/ray/common/asio/instrumented_io_context.cc b/src/ray/common/asio/instrumented_io_context.cc index 9147452b6c7f..df66b018d1ee 100644 --- a/src/ray/common/asio/instrumented_io_context.cc +++ b/src/ray/common/asio/instrumented_io_context.cc @@ -20,7 +20,6 @@ #include "ray/common/asio/asio_chaos.h" #include "ray/common/asio/asio_util.h" #include "ray/stats/metric.h" -#include "ray/stats/metric_defs.h" namespace { @@ -35,7 +34,7 @@ void LagProbeLoop(instrumented_io_context &io_context, auto end = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(end - begin); - ray::stats::STATS_io_context_event_loop_lag_ms.Record( + io_context.io_context_event_loop_lag_ms_gauge_metric.Record( duration.count(), { {"Name", context_name.value_or(GetThreadName())}, @@ -106,8 +105,9 @@ void instrumented_io_context::post(std::function handler, auto stats_handle = event_stats_->RecordStart(std::move(name), emit_metrics_, 0, context_name_); handler = [handler = std::move(handler), + event_stats = event_stats_, stats_handle = std::move(stats_handle)]() mutable { - EventTracker::RecordExecution(handler, std::move(stats_handle)); + event_stats->RecordExecution(handler, std::move(stats_handle)); }; } @@ -129,9 +129,10 @@ void instrumented_io_context::dispatch(std::function handler, std::strin // GuardedHandlerStats synchronizes internal access, we can concurrently write to the // handler stats it->second from multiple threads without acquiring a table-level // readers lock in the callback. - boost::asio::dispatch( - *this, - [handler = std::move(handler), stats_handle = std::move(stats_handle)]() mutable { - EventTracker::RecordExecution(handler, std::move(stats_handle)); - }); + boost::asio::dispatch(*this, + [event_stats = event_stats_, + handler = std::move(handler), + stats_handle = std::move(stats_handle)]() mutable { + event_stats->RecordExecution(handler, std::move(stats_handle)); + }); } diff --git a/src/ray/common/asio/instrumented_io_context.h b/src/ray/common/asio/instrumented_io_context.h index 33778bffc80a..0d59c6f5da46 100644 --- a/src/ray/common/asio/instrumented_io_context.h +++ b/src/ray/common/asio/instrumented_io_context.h @@ -19,6 +19,7 @@ #include #include "ray/common/event_stats.h" +#include "ray/common/metrics.h" #include "ray/common/ray_config.h" #include "ray/util/logging.h" @@ -56,7 +57,10 @@ class instrumented_io_context : public boost::asio::io_context { /// for the provided handler. void dispatch(std::function handler, std::string name); - EventTracker &stats() const { return *event_stats_; }; + std::shared_ptr stats() const { return event_stats_; }; + + ray::stats::Gauge io_context_event_loop_lag_ms_gauge_metric{ + ray::GetIoContextEventLoopLagMsGaugeMetric()}; private: /// The event stats tracker to use to record asio handler stats to. diff --git a/src/ray/common/asio/periodical_runner.cc b/src/ray/common/asio/periodical_runner.cc index 8b1b0f7d9c2e..d5f23ad03b10 100644 --- a/src/ray/common/asio/periodical_runner.cc +++ b/src/ray/common/asio/periodical_runner.cc @@ -106,7 +106,7 @@ void PeriodicalRunner::DoRunFnPeriodicallyInstrumented( // which the handler was elgible to execute on the event loop but was queued by the // event loop. auto stats_handle = - io_service_.stats().RecordStart(name, false, period.total_nanoseconds()); + io_service_.stats()->RecordStart(name, false, period.total_nanoseconds()); timer->async_wait( [weak_self = weak_from_this(), fn = std::move(fn), @@ -115,7 +115,7 @@ void PeriodicalRunner::DoRunFnPeriodicallyInstrumented( stats_handle = std::move(stats_handle), name = std::move(name)](const boost::system::error_code &error) mutable { if (auto self = weak_self.lock(); self) { - self->io_service_.stats().RecordExecution( + self->io_service_.stats()->RecordExecution( [self, fn = std::move(fn), error, diff --git a/src/ray/common/event_stats.cc b/src/ray/common/event_stats.cc index ee0e7002e431..b52cd082dd52 100644 --- a/src/ray/common/event_stats.cc +++ b/src/ray/common/event_stats.cc @@ -21,7 +21,6 @@ #include #include "ray/stats/metric.h" -#include "ray/stats/metric_defs.h" #include "ray/util/time.h" namespace { @@ -66,9 +65,9 @@ std::shared_ptr EventTracker::RecordStart( } if (emit_metrics) { - ray::stats::STATS_operation_count.Record(1, event_context_name.value_or(name)); - ray::stats::STATS_operation_active_count.Record(curr_count, - event_context_name.value_or(name)); + operation_count_metric_.Record(1, {{"Name", event_context_name.value_or(name)}}); + operation_active_gauge_metric_.Record(curr_count, + {{"Name", event_context_name.value_or(name)}}); } return std::make_shared( @@ -89,10 +88,11 @@ void EventTracker::RecordEnd(std::shared_ptr handle) { if (handle->emit_stats) { // Update event-specific stats. - ray::stats::STATS_operation_run_time_ms.Record( - execution_time_ns / 1000000, handle->context_name.value_or(handle->event_name)); - ray::stats::STATS_operation_active_count.Record( - curr_count, handle->context_name.value_or(handle->event_name)); + operation_run_time_ms_histogram_metric_.Record( + execution_time_ns / 1000000, + {{"Name", handle->context_name.value_or(handle->event_name)}}); + operation_active_gauge_metric_.Record( + curr_count, {{"Name", handle->context_name.value_or(handle->event_name)}}); } handle->end_or_execution_recorded = true; @@ -135,13 +135,15 @@ void EventTracker::RecordExecution(const std::function &fn, if (handle->emit_stats) { // Update event-specific stats. - ray::stats::STATS_operation_run_time_ms.Record( - execution_time_ns / 1000000, handle->context_name.value_or(handle->event_name)); - ray::stats::STATS_operation_active_count.Record( - curr_count, handle->context_name.value_or(handle->event_name)); + operation_run_time_ms_histogram_metric_.Record( + execution_time_ns / 1000000, + {{"Name", handle->context_name.value_or(handle->event_name)}}); + operation_active_gauge_metric_.Record( + curr_count, {{"Name", handle->context_name.value_or(handle->event_name)}}); // Update global stats. - ray::stats::STATS_operation_queue_time_ms.Record( - queue_time_ns / 1000000, handle->context_name.value_or(handle->event_name)); + operation_queue_time_ms_histogram_metric_.Record( + queue_time_ns / 1000000, + {{"Name", handle->context_name.value_or(handle->event_name)}}); } { diff --git a/src/ray/common/event_stats.h b/src/ray/common/event_stats.h index d687d06de141..25f45c8cb017 100644 --- a/src/ray/common/event_stats.h +++ b/src/ray/common/event_stats.h @@ -18,6 +18,7 @@ #include "absl/container/flat_hash_map.h" #include "absl/synchronization/mutex.h" +#include "ray/common/metrics.h" #include "ray/common/ray_config.h" #include "ray/util/logging.h" @@ -132,14 +133,14 @@ class EventTracker { /// /// \param fn The function to execute and instrument. /// \param handle An opaque stats handle returned by RecordStart(). - static void RecordExecution(const std::function &fn, - std::shared_ptr handle); + void RecordExecution(const std::function &fn, + std::shared_ptr handle); /// Records the end of an event. This is used in conjunction /// with RecordStart() to manually instrument an event. /// /// \param handle An opaque stats handle returned by RecordStart(). - static void RecordEnd(std::shared_ptr handle); + void RecordEnd(std::shared_ptr handle); /// Returns a snapshot view of the global count, queueing, and execution statistics /// across all handlers. @@ -188,4 +189,12 @@ class EventTracker { /// Protects access to the per-handler post stats table. mutable absl::Mutex mutex_; + + ray::stats::Count operation_count_metric_{ray::GetOperationCountCounterMetric()}; + ray::stats::Gauge operation_active_gauge_metric_{ + ray::GetOperationActiveCountGaugeMetric()}; + ray::stats::Histogram operation_run_time_ms_histogram_metric_{ + ray::GetOperationRunTimeMsHistogramMetric()}; + ray::stats::Histogram operation_queue_time_ms_histogram_metric_{ + ray::GetOperationQueueTimeMsHistogramMetric()}; }; diff --git a/src/ray/common/metrics.h b/src/ray/common/metrics.h index 912dd3be1570..b0b8c789ff0d 100644 --- a/src/ray/common/metrics.h +++ b/src/ray/common/metrics.h @@ -86,4 +86,51 @@ inline ray::stats::Histogram GetSchedulerPlacementTimeMsHistogramMetric() { }; } +inline ray::stats::Gauge GetIoContextEventLoopLagMsGaugeMetric() { + return ray::stats::Gauge{ + /*name=*/"io_context_event_loop_lag_ms", + /*description=*/"The latency of a task from post to execution", + /*unit=*/"ms", + /*tag_keys=*/{"Name"}, + }; +} + +inline ray::stats::Count GetOperationCountCounterMetric() { + return ray::stats::Count{ + /*name=*/"operation_count", + /*description=*/"operation count", + /*unit=*/"", + /*tag_keys=*/{"Name"}, + }; +} + +inline ray::stats::Histogram GetOperationRunTimeMsHistogramMetric() { + return ray::stats::Histogram{ + /*name=*/"operation_run_time_ms", + /*description=*/"operation execution time", + /*unit=*/"ms", + /*boundaries=*/{1, 10, 100, 1000, 10000}, + /*tag_keys=*/{"Name"}, + }; +} + +inline ray::stats::Histogram GetOperationQueueTimeMsHistogramMetric() { + return ray::stats::Histogram{ + /*name=*/"operation_queue_time_ms", + /*description=*/"operation queuing time", + /*unit=*/"ms", + /*boundaries=*/{1, 10, 100, 1000, 10000}, + /*tag_keys=*/{"Name"}, + }; +} + +inline ray::stats::Gauge GetOperationActiveCountGaugeMetric() { + return ray::stats::Gauge{ + /*name=*/"operation_active_count", + /*description=*/"active operation number", + /*unit=*/"", + /*tag_keys=*/{"Name"}, + }; +} + } // namespace ray diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index dd7ce698533e..4c8484826211 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -453,10 +453,10 @@ CoreWorker::CoreWorker( periodical_runner_->RunFnPeriodically( [this] { RAY_LOG(INFO) << "Event stats:\n\n" - << io_service_.stats().StatsString() << "\n\n" + << io_service_.stats()->StatsString() << "\n\n" << "-----------------\n" << "Task execution event stats:\n" - << task_execution_service_.stats().StatsString() << "\n\n" + << task_execution_service_.stats()->StatsString() << "\n\n" << "-----------------\n" << "Task Event stats:\n" << task_event_buffer_->DebugString() << "\n"; diff --git a/src/ray/core_worker/task_event_buffer.cc b/src/ray/core_worker/task_event_buffer.cc index dc0d4d14a85c..1f6b6795326b 100644 --- a/src/ray/core_worker/task_event_buffer.cc +++ b/src/ray/core_worker/task_event_buffer.cc @@ -1012,7 +1012,7 @@ std::string TaskEventBufferImpl::DebugString() { auto stats = stats_counter_.GetAll(); ss << "\nIO Service Stats:\n"; - ss << io_service_.stats().StatsString(); + ss << io_service_.stats()->StatsString(); ss << "\nOther Stats:" << "\n\tgcs_grpc_in_progress:" << gcs_grpc_in_progress_ << "\n\tevent_aggregator_grpc_in_progress:" << event_aggregator_grpc_in_progress_ diff --git a/src/ray/gcs/gcs_server.cc b/src/ray/gcs/gcs_server.cc index d1b7cf0a4410..ecb81ebc95fe 100644 --- a/src/ray/gcs/gcs_server.cc +++ b/src/ray/gcs/gcs_server.cc @@ -952,11 +952,11 @@ void GcsServer::PrintDebugState() const { RayConfig::instance().event_stats_print_interval_ms(); if (event_stats_print_interval_ms != -1 && RayConfig::instance().event_stats()) { RAY_LOG(INFO) << "Main service Event stats:\n\n" - << io_context_provider_.GetDefaultIOContext().stats().StatsString() + << io_context_provider_.GetDefaultIOContext().stats()->StatsString() << "\n\n"; for (const auto &io_context : io_context_provider_.GetAllDedicatedIOContexts()) { RAY_LOG(INFO) << io_context->GetName() << " Event stats:\n\n" - << io_context->GetIoService().stats().StatsString() << "\n\n"; + << io_context->GetIoService().stats()->StatsString() << "\n\n"; } } } diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 646198cce3f8..94b63e5cbedc 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -756,7 +756,7 @@ std::string ObjectManager::DebugString() const { << num_chunks_received_cancelled_; result << "\n- num chunks received failed / plasma error: " << num_chunks_received_failed_due_to_plasma_; - result << "\nEvent stats:" << rpc_service_.stats().StatsString(); + result << "\nEvent stats:" << rpc_service_.stats()->StatsString(); result << "\n" << push_manager_->DebugString(); result << "\n" << object_directory_->DebugString(); result << "\n" << buffer_pool_.DebugString(); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e68bdb3dda7e..6eff80a18bde 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2568,7 +2568,7 @@ std::string NodeManager::DebugString() const { } // Event stats. - result << "\nEvent stats:" << io_service_.stats().StatsString(); + result << "\nEvent stats:" << io_service_.stats()->StatsString(); result << "\nDebugString() time ms: " << (current_time_ms() - now_ms); return result.str(); diff --git a/src/ray/raylet_ipc_client/client_connection.cc b/src/ray/raylet_ipc_client/client_connection.cc index 5626d8822e80..9049ed32826b 100644 --- a/src/ray/raylet_ipc_client/client_connection.cc +++ b/src/ray/raylet_ipc_client/client_connection.cc @@ -274,8 +274,9 @@ void ServerConnection::DoAsyncWrites() { if (RayConfig::instance().event_stats()) { auto &io_context = static_cast(socket_.get_executor().context()); + auto event_stats = io_context.stats(); const auto stats_handle = - io_context.stats().RecordStart("ClientConnection.async_write.DoAsyncWrites"); + event_stats->RecordStart("ClientConnection.async_write.DoAsyncWrites"); boost::asio::async_write( socket_, message_buffers, @@ -285,24 +286,27 @@ void ServerConnection::DoAsyncWrites() { call_handlers, stats_handle = std::move(stats_handle)](const boost::system::error_code &error, size_t bytes_transferred) { - EventTracker::RecordExecution( - [this, this_ptr, num_messages, call_handlers, error]() { - ray::Status status = boost_to_ray_status(error); - if (error.value() == boost::system::errc::errc_t::broken_pipe) { - RAY_LOG(ERROR) << "Broken Pipe happened during calling " - << "ServerConnection::DoAsyncWrites."; - // From now on, calling DoAsyncWrites will directly call the handler - // with this broken-pipe status. - async_write_broken_pipe_ = true; - } else if (!status.ok()) { - RAY_LOG(ERROR) - << "Error encountered during calling " - << "ServerConnection::DoAsyncWrites, message: " << status.message() - << ", error code: " << static_cast(error.value()); - } - call_handlers(status, num_messages); - }, - std::move(stats_handle)); + static_cast(socket_.get_executor().context()) + .stats() + ->RecordExecution( + [this, this_ptr, num_messages, call_handlers, error]() { + ray::Status status = boost_to_ray_status(error); + if (error.value() == boost::system::errc::errc_t::broken_pipe) { + RAY_LOG(ERROR) << "Broken Pipe happened during calling " + << "ServerConnection::DoAsyncWrites."; + // From now on, calling DoAsyncWrites will directly call the handler + // with this broken-pipe status. + async_write_broken_pipe_ = true; + } else if (!status.ok()) { + RAY_LOG(ERROR) + << "Error encountered during calling " + << "ServerConnection::DoAsyncWrites, message: " + << status.message() + << ", error code: " << static_cast(error.value()); + } + call_handlers(status, num_messages); + }, + std::move(stats_handle)); }); } else { boost::asio::async_write( @@ -378,16 +382,18 @@ void ClientConnection::ProcessMessages() { auto this_ptr = shared_ClientConnection_from_this(); auto &io_context = static_cast( ServerConnection::socket_.get_executor().context()); - auto stats_handle = io_context.stats().RecordStart( - "ClientConnection.async_read.ProcessMessageHeader"); + auto event_stats = io_context.stats(); + auto stats_handle = + event_stats->RecordStart("ClientConnection.async_read.ProcessMessageHeader"); boost::asio::async_read( ServerConnection::socket_, header, [this, this_ptr, stats_handle = std::move(stats_handle)]( const boost::system::error_code &ec, size_t bytes_transferred) mutable { - EventTracker::RecordExecution( - [this, this_ptr, ec]() { ProcessMessageHeader(ec); }, - std::move(stats_handle)); + static_cast(socket_.get_executor().context()) + .stats() + ->RecordExecution([this, this_ptr, ec]() { ProcessMessageHeader(ec); }, + std::move(stats_handle)); }); } else { boost::asio::async_read(ServerConnection::socket_, @@ -426,15 +432,16 @@ void ClientConnection::ProcessMessageHeader(const boost::system::error_code &err auto this_ptr = shared_ClientConnection_from_this(); auto &io_context = static_cast( ServerConnection::socket_.get_executor().context()); + auto event_stats = io_context.stats(); auto stats_handle = - io_context.stats().RecordStart("ClientConnection.async_read.ProcessMessage"); + event_stats->RecordStart("ClientConnection.async_read.ProcessMessage"); boost::asio::async_read( ServerConnection::socket_, boost::asio::buffer(read_message_), - [this, this_ptr, stats_handle = std::move(stats_handle)]( + [this, this_ptr, event_stats, stats_handle = std::move(stats_handle)]( const boost::system::error_code &ec, size_t bytes_transferred) mutable { - EventTracker::RecordExecution([this, this_ptr, ec]() { ProcessMessage(ec); }, - std::move(stats_handle)); + event_stats->RecordExecution([this, this_ptr, ec]() { ProcessMessage(ec); }, + std::move(stats_handle)); }); } else { boost::asio::async_read(ServerConnection::socket_, diff --git a/src/ray/rpc/client_call.h b/src/ray/rpc/client_call.h index 0c890ee080cb..c823ca903055 100644 --- a/src/ray/rpc/client_call.h +++ b/src/ray/rpc/client_call.h @@ -271,7 +271,7 @@ class ClientCallManager { const ClientCallback &callback, std::string call_name, int64_t method_timeout_ms = -1) { - auto stats_handle = main_service_.stats().RecordStart(std::move(call_name)); + auto stats_handle = main_service_.stats()->RecordStart(std::move(call_name)); if (method_timeout_ms == -1) { method_timeout_ms = call_timeout_ms_; } @@ -349,7 +349,7 @@ class ClientCallManager { // Implement the delay of the rpc client call as the // delay of OnReplyReceived(). ray::asio::testing::GetDelayUs(stats_handle->event_name)); - EventTracker::RecordEnd(std::move(stats_handle)); + main_service_.stats()->RecordEnd(std::move(stats_handle)); } else { delete tag; } diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index 1ce071f383e7..1728ffff81a6 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -207,7 +207,7 @@ class ServerCallImpl : public ServerCall { ServerCallState GetState() const override { return state_; } void HandleRequest() override { - stats_handle_ = io_service_.stats().RecordStart(call_name_); + stats_handle_ = io_service_.stats()->RecordStart(call_name_); bool auth_success = true; bool token_auth_failed = false; bool cluster_id_auth_failed = false; @@ -373,7 +373,7 @@ class ServerCallImpl : public ServerCall { /// Log the duration this query used void LogProcessTime() { - EventTracker::RecordEnd(std::move(stats_handle_)); + io_service_.stats()->RecordEnd(std::move(stats_handle_)); auto end_time = absl::GetCurrentTimeNanos(); if (record_metrics_) { grpc_server_req_process_time_ms_histogram_.Record( diff --git a/src/ray/stats/metric_defs.cc b/src/ray/stats/metric_defs.cc index faab7b4fea9c..a4482165fcc0 100644 --- a/src/ray/stats/metric_defs.cc +++ b/src/ray/stats/metric_defs.cc @@ -74,27 +74,6 @@ DEFINE_stats( /// ===================== INTERNAL SYSTEM METRICS ================================= /// =============================================================================== -DEFINE_stats(io_context_event_loop_lag_ms, - "Latency of a task from post to execution", - ("Name"), // Name of the instrumented_io_context. - (), - ray::stats::GAUGE); - -/// Event stats -DEFINE_stats(operation_count, "operation count", ("Name"), (), ray::stats::COUNT); -DEFINE_stats(operation_run_time_ms, - "operation execution time", - ("Name"), - ({1, 10, 100, 1000, 10000}), - ray::stats::HISTOGRAM); -DEFINE_stats(operation_queue_time_ms, - "operation queuing time", - ("Name"), - ({1, 10, 100, 1000, 10000}), - ray::stats::HISTOGRAM); -DEFINE_stats( - operation_active_count, "active operation number", ("Name"), (), ray::stats::GAUGE); - /// Scheduler DEFINE_stats( scheduler_tasks, diff --git a/src/ray/stats/metric_defs.h b/src/ray/stats/metric_defs.h index bb4a75ecc223..7d97ed329302 100644 --- a/src/ray/stats/metric_defs.h +++ b/src/ray/stats/metric_defs.h @@ -42,15 +42,6 @@ namespace stats { /// ray_[component]_[metrics_name]_total (e.g., ray_pull_manager_total) /// -/// ASIO stats -DECLARE_stats(io_context_event_loop_lag_ms); - -/// Event stats -DECLARE_stats(operation_count); -DECLARE_stats(operation_run_time_ms); -DECLARE_stats(operation_queue_time_ms); -DECLARE_stats(operation_active_count); - /// Scheduler DECLARE_stats(scheduler_failed_worker_startup_total); DECLARE_stats(scheduler_tasks);