Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/ray/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ ray_cc_library(
],
deps = [
":event_stats",
":metrics",
":ray_config",
"//src/ray/util:array",
"@boost//:asio",
Expand Down Expand Up @@ -311,6 +312,7 @@ ray_cc_library(
"event_stats.h",
],
deps = [
":metrics",
":ray_config",
"//src/ray/stats:stats_metric",
"//src/ray/util:time",
Expand Down
17 changes: 9 additions & 8 deletions src/ray/common/asio/instrumented_io_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include "ray/common/asio/asio_chaos.h"
#include "ray/common/asio/asio_util.h"
#include "ray/stats/metric.h"
#include "ray/stats/metric_defs.h"

namespace {

Expand All @@ -35,7 +34,7 @@ void LagProbeLoop(instrumented_io_context &io_context,
auto end = std::chrono::steady_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(end - begin);
ray::stats::STATS_io_context_event_loop_lag_ms.Record(
io_context.io_context_event_loop_lag_ms_gauge_metric.Record(
duration.count(),
{
{"Name", context_name.value_or(GetThreadName())},
Expand Down Expand Up @@ -106,8 +105,9 @@ void instrumented_io_context::post(std::function<void()> handler,
auto stats_handle =
event_stats_->RecordStart(std::move(name), emit_metrics_, 0, context_name_);
handler = [handler = std::move(handler),
event_stats = event_stats_,
stats_handle = std::move(stats_handle)]() mutable {
EventTracker::RecordExecution(handler, std::move(stats_handle));
event_stats->RecordExecution(handler, std::move(stats_handle));
};
}

Expand All @@ -129,9 +129,10 @@ void instrumented_io_context::dispatch(std::function<void()> handler, std::strin
// GuardedHandlerStats synchronizes internal access, we can concurrently write to the
// handler stats it->second from multiple threads without acquiring a table-level
// readers lock in the callback.
boost::asio::dispatch(
*this,
[handler = std::move(handler), stats_handle = std::move(stats_handle)]() mutable {
EventTracker::RecordExecution(handler, std::move(stats_handle));
});
boost::asio::dispatch(*this,
[event_stats = event_stats_,
handler = std::move(handler),
stats_handle = std::move(stats_handle)]() mutable {
event_stats->RecordExecution(handler, std::move(stats_handle));
});
}
6 changes: 5 additions & 1 deletion src/ray/common/asio/instrumented_io_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <string>

#include "ray/common/event_stats.h"
#include "ray/common/metrics.h"
#include "ray/common/ray_config.h"
#include "ray/util/logging.h"

Expand Down Expand Up @@ -56,7 +57,10 @@ class instrumented_io_context : public boost::asio::io_context {
/// for the provided handler.
void dispatch(std::function<void()> handler, std::string name);

EventTracker &stats() const { return *event_stats_; };
std::shared_ptr<EventTracker> stats() const { return event_stats_; };

ray::stats::Gauge io_context_event_loop_lag_ms_gauge_metric{
ray::GetIoContextEventLoopLagMsGaugeMetric()};

private:
/// The event stats tracker to use to record asio handler stats to.
Expand Down
4 changes: 2 additions & 2 deletions src/ray/common/asio/periodical_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void PeriodicalRunner::DoRunFnPeriodicallyInstrumented(
// which the handler was elgible to execute on the event loop but was queued by the
// event loop.
auto stats_handle =
io_service_.stats().RecordStart(name, false, period.total_nanoseconds());
io_service_.stats()->RecordStart(name, false, period.total_nanoseconds());
timer->async_wait(
[weak_self = weak_from_this(),
fn = std::move(fn),
Expand All @@ -115,7 +115,7 @@ void PeriodicalRunner::DoRunFnPeriodicallyInstrumented(
stats_handle = std::move(stats_handle),
name = std::move(name)](const boost::system::error_code &error) mutable {
if (auto self = weak_self.lock(); self) {
self->io_service_.stats().RecordExecution(
self->io_service_.stats()->RecordExecution(
[self,
fn = std::move(fn),
error,
Expand Down
30 changes: 16 additions & 14 deletions src/ray/common/event_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <utility>

#include "ray/stats/metric.h"
#include "ray/stats/metric_defs.h"
#include "ray/util/time.h"

namespace {
Expand Down Expand Up @@ -66,9 +65,9 @@ std::shared_ptr<StatsHandle> EventTracker::RecordStart(
}

if (emit_metrics) {
ray::stats::STATS_operation_count.Record(1, event_context_name.value_or(name));
ray::stats::STATS_operation_active_count.Record(curr_count,
event_context_name.value_or(name));
operation_count_metric_.Record(1, {{"Name", event_context_name.value_or(name)}});
operation_active_gauge_metric_.Record(curr_count,
{{"Name", event_context_name.value_or(name)}});
}

return std::make_shared<StatsHandle>(
Expand All @@ -89,10 +88,11 @@ void EventTracker::RecordEnd(std::shared_ptr<StatsHandle> handle) {

if (handle->emit_stats) {
// Update event-specific stats.
ray::stats::STATS_operation_run_time_ms.Record(
execution_time_ns / 1000000, handle->context_name.value_or(handle->event_name));
ray::stats::STATS_operation_active_count.Record(
curr_count, handle->context_name.value_or(handle->event_name));
operation_run_time_ms_histogram_metric_.Record(
execution_time_ns / 1000000,
{{"Name", handle->context_name.value_or(handle->event_name)}});
operation_active_gauge_metric_.Record(
curr_count, {{"Name", handle->context_name.value_or(handle->event_name)}});
}

handle->end_or_execution_recorded = true;
Expand Down Expand Up @@ -135,13 +135,15 @@ void EventTracker::RecordExecution(const std::function<void()> &fn,

if (handle->emit_stats) {
// Update event-specific stats.
ray::stats::STATS_operation_run_time_ms.Record(
execution_time_ns / 1000000, handle->context_name.value_or(handle->event_name));
ray::stats::STATS_operation_active_count.Record(
curr_count, handle->context_name.value_or(handle->event_name));
operation_run_time_ms_histogram_metric_.Record(
execution_time_ns / 1000000,
{{"Name", handle->context_name.value_or(handle->event_name)}});
operation_active_gauge_metric_.Record(
curr_count, {{"Name", handle->context_name.value_or(handle->event_name)}});
// Update global stats.
ray::stats::STATS_operation_queue_time_ms.Record(
queue_time_ns / 1000000, handle->context_name.value_or(handle->event_name));
operation_queue_time_ms_histogram_metric_.Record(
queue_time_ns / 1000000,
{{"Name", handle->context_name.value_or(handle->event_name)}});
}

{
Expand Down
15 changes: 12 additions & 3 deletions src/ray/common/event_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/mutex.h"
#include "ray/common/metrics.h"
#include "ray/common/ray_config.h"
#include "ray/util/logging.h"

Expand Down Expand Up @@ -132,14 +133,14 @@ class EventTracker {
///
/// \param fn The function to execute and instrument.
/// \param handle An opaque stats handle returned by RecordStart().
static void RecordExecution(const std::function<void()> &fn,
std::shared_ptr<StatsHandle> handle);
void RecordExecution(const std::function<void()> &fn,
std::shared_ptr<StatsHandle> handle);

/// Records the end of an event. This is used in conjunction
/// with RecordStart() to manually instrument an event.
///
/// \param handle An opaque stats handle returned by RecordStart().
static void RecordEnd(std::shared_ptr<StatsHandle> handle);
void RecordEnd(std::shared_ptr<StatsHandle> handle);

/// Returns a snapshot view of the global count, queueing, and execution statistics
/// across all handlers.
Expand Down Expand Up @@ -188,4 +189,12 @@ class EventTracker {

/// Protects access to the per-handler post stats table.
mutable absl::Mutex mutex_;

ray::stats::Count operation_count_metric_{ray::GetOperationCountCounterMetric()};
ray::stats::Gauge operation_active_gauge_metric_{
ray::GetOperationActiveCountGaugeMetric()};
ray::stats::Histogram operation_run_time_ms_histogram_metric_{
ray::GetOperationRunTimeMsHistogramMetric()};
ray::stats::Histogram operation_queue_time_ms_histogram_metric_{
ray::GetOperationQueueTimeMsHistogramMetric()};
};
47 changes: 47 additions & 0 deletions src/ray/common/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,51 @@ inline ray::stats::Histogram GetSchedulerPlacementTimeMsHistogramMetric() {
};
}

inline ray::stats::Gauge GetIoContextEventLoopLagMsGaugeMetric() {
return ray::stats::Gauge{
/*name=*/"io_context_event_loop_lag_ms",
/*description=*/"The latency of a task from post to execution",
/*unit=*/"ms",
/*tag_keys=*/{"Name"},
};
}

inline ray::stats::Count GetOperationCountCounterMetric() {
return ray::stats::Count{
/*name=*/"operation_count",
/*description=*/"operation count",
/*unit=*/"",
/*tag_keys=*/{"Name"},
};
}

inline ray::stats::Histogram GetOperationRunTimeMsHistogramMetric() {
return ray::stats::Histogram{
/*name=*/"operation_run_time_ms",
/*description=*/"operation execution time",
/*unit=*/"ms",
/*boundaries=*/{1, 10, 100, 1000, 10000},
/*tag_keys=*/{"Name"},
};
}

inline ray::stats::Histogram GetOperationQueueTimeMsHistogramMetric() {
return ray::stats::Histogram{
/*name=*/"operation_queue_time_ms",
/*description=*/"operation queuing time",
/*unit=*/"ms",
/*boundaries=*/{1, 10, 100, 1000, 10000},
/*tag_keys=*/{"Name"},
};
}

inline ray::stats::Gauge GetOperationActiveCountGaugeMetric() {
return ray::stats::Gauge{
/*name=*/"operation_active_count",
/*description=*/"active operation number",
/*unit=*/"",
/*tag_keys=*/{"Name"},
};
}

} // namespace ray
4 changes: 2 additions & 2 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -453,10 +453,10 @@ CoreWorker::CoreWorker(
periodical_runner_->RunFnPeriodically(
[this] {
RAY_LOG(INFO) << "Event stats:\n\n"
<< io_service_.stats().StatsString() << "\n\n"
<< io_service_.stats()->StatsString() << "\n\n"
<< "-----------------\n"
<< "Task execution event stats:\n"
<< task_execution_service_.stats().StatsString() << "\n\n"
<< task_execution_service_.stats()->StatsString() << "\n\n"
<< "-----------------\n"
<< "Task Event stats:\n"
<< task_event_buffer_->DebugString() << "\n";
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/task_event_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ std::string TaskEventBufferImpl::DebugString() {

auto stats = stats_counter_.GetAll();
ss << "\nIO Service Stats:\n";
ss << io_service_.stats().StatsString();
ss << io_service_.stats()->StatsString();
ss << "\nOther Stats:"
<< "\n\tgcs_grpc_in_progress:" << gcs_grpc_in_progress_
<< "\n\tevent_aggregator_grpc_in_progress:" << event_aggregator_grpc_in_progress_
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -952,11 +952,11 @@ void GcsServer::PrintDebugState() const {
RayConfig::instance().event_stats_print_interval_ms();
if (event_stats_print_interval_ms != -1 && RayConfig::instance().event_stats()) {
RAY_LOG(INFO) << "Main service Event stats:\n\n"
<< io_context_provider_.GetDefaultIOContext().stats().StatsString()
<< io_context_provider_.GetDefaultIOContext().stats()->StatsString()
<< "\n\n";
for (const auto &io_context : io_context_provider_.GetAllDedicatedIOContexts()) {
RAY_LOG(INFO) << io_context->GetName() << " Event stats:\n\n"
<< io_context->GetIoService().stats().StatsString() << "\n\n";
<< io_context->GetIoService().stats()->StatsString() << "\n\n";
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ std::string ObjectManager::DebugString() const {
<< num_chunks_received_cancelled_;
result << "\n- num chunks received failed / plasma error: "
<< num_chunks_received_failed_due_to_plasma_;
result << "\nEvent stats:" << rpc_service_.stats().StatsString();
result << "\nEvent stats:" << rpc_service_.stats()->StatsString();
result << "\n" << push_manager_->DebugString();
result << "\n" << object_directory_->DebugString();
result << "\n" << buffer_pool_.DebugString();
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2568,7 +2568,7 @@ std::string NodeManager::DebugString() const {
}

// Event stats.
result << "\nEvent stats:" << io_service_.stats().StatsString();
result << "\nEvent stats:" << io_service_.stats()->StatsString();

result << "\nDebugString() time ms: " << (current_time_ms() - now_ms);
return result.str();
Expand Down
Loading