Merged
1 change: 1 addition & 0 deletions deploy/metrics/prometheus.yml
@@ -42,6 +42,7 @@ scrape_configs:
- targets: ['host.docker.internal:8080'] # on the "monitoring" network

# Launch via: DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 dynamo.<backend> ...
+  # If you want to update the scrape_interval, you may want to also update component.rs's MAX_DELAY
- job_name: 'dynamo-backend'
scrape_interval: 6s
static_configs:
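The comment added above ties Prometheus's scrape_interval to MAX_DELAY in component.rs. A minimal sketch of that timing budget using the values in this PR — the constant names mirror component.rs, but the program itself is illustrative and not part of the change:

use std::time::Duration;

// Values from this PR: Prometheus scrapes every 6s (prometheus.yml), while the
// backend refreshes its NATS stats every 4.7s with a 300ms scrape timeout.
const PROMETHEUS_SCRAPE_INTERVAL: Duration = Duration::from_secs(6);
const NATS_TIMEOUT_AND_INITIAL_DELAY_MS: Duration = Duration::from_millis(300);
const MAX_DELAY_MS: Duration = Duration::from_millis(4700);

fn main() {
    // Worst-case refresh cadence (delay plus timeout) should stay below the
    // Prometheus scrape interval so every scrape sees freshly updated metrics.
    assert!(MAX_DELAY_MS + NATS_TIMEOUT_AND_INITIAL_DELAY_MS < PROMETHEUS_SCRAPE_INTERVAL);
    println!("refresh cadence fits inside the Prometheus scrape interval");
}

In other words, lowering scrape_interval below roughly 5s would require shrinking MAX_DELAY (and possibly the 300ms timeout) so the backend still refreshes between polls.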
6 changes: 3 additions & 3 deletions lib/bindings/python/Cargo.lock

Some generated files are not rendered by default.

8 changes: 4 additions & 4 deletions lib/runtime/examples/Cargo.lock

Some generated files are not rendered by default.

@@ -43,7 +43,7 @@ async fn test_backend_with_metrics() -> Result<()> {
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;

// Get the System status server info to find the actual port
-        let system_status_info = distributed.system_status_info();
+        let system_status_info = distributed.system_status_server_info();
let system_status_port = match system_status_info {
Some(info) => {
println!("System status server running on: {}", info.address());
96 changes: 42 additions & 54 deletions lib/runtime/src/component.rs
@@ -259,6 +259,7 @@ impl Component {
/// Scrape ServiceSet, which contains NATS stats as well as user defined stats
/// embedded in data field of ServiceInfo.
pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
+        // Debug: scraping stats for component
let service_name = self.service_name();
let service_client = self.drt().service_client();
service_client
@@ -268,9 +269,15 @@

/// Add Prometheus metrics for this component's service stats.
///
-    /// Uses a channel to synchronize with the spawned async task, ensuring
-    /// metrics are updated before the callback returns.
-    pub fn add_metrics_callback(&self) -> Result<()> {
+    /// Starts a background task that scrapes stats every ~4.7s and updates metrics.
+    /// The interval is kept slightly shorter than the Prometheus scrape interval (currently 6s)
+    /// so that every Prometheus poll sees fresh data: roughly 4.7s between scrapes, plus the
+    /// 0.3s scrape timeout when a scrape times out.
+    pub fn start_scraping_metrics(&self) -> Result<()> {
+        const NATS_TIMEOUT_AND_INITIAL_DELAY_MS: std::time::Duration =
+            std::time::Duration::from_millis(300);
+        const MAX_DELAY_MS: std::time::Duration = std::time::Duration::from_millis(4700);

let component_metrics = ComponentNatsPrometheusMetrics::new(self)?;

let component_clone = self.clone();
@@ -281,60 +288,41 @@
self.service_name()
); // it happens that in component, hierarchy and service name are the same

-        // Register a metrics callback that scrapes component statistics
-        let metrics_callback = Arc::new(move || {
-            // Timeout for scraping metrics from components (in milliseconds)
-            // This value is also used by KV Router metrics aggregator (300ms) and other components
-            const METRICS_SCRAPE_TIMEOUT_MS: u64 = 300;
-
-            // Get the current Tokio runtime handle
-            let handle = tokio::runtime::Handle::try_current()
-                .map_err(|err| anyhow::anyhow!("No Tokio runtime handle available: {}", err))?;
-
-            let m = component_metrics.clone();
-            let c = component_clone.clone();
-
-            // Create a channel to synchronize with the spawned task
-            let (tx, rx) = std::sync::mpsc::channel::<anyhow::Result<()>>();
-
-            let timeout = std::time::Duration::from_millis(METRICS_SCRAPE_TIMEOUT_MS);
-            handle.spawn(async move {
-                let result = match c.scrape_stats(timeout).await {
-                    Ok(service_set) => {
-                        m.update_from_service_set(&service_set);
-                        Ok(())
-                    }
-                    Err(err) => {
-                        // Reset metrics on failure
-                        m.reset_to_zeros();
-                        Err(anyhow::anyhow!("Failed to scrape stats: {}", err))
-                    }
-                };
-
-                // Send the result back to the waiting thread
-                // If send fails, the receiver has already given up waiting
-                let _ = tx.send(result);
-            });
-
-            // Wait for the spawned task to complete (with a timeout to prevent hanging)
-            // Add 100ms buffer to the scrape timeout to account for processing overhead
-            let recv_timeout = std::time::Duration::from_millis(METRICS_SCRAPE_TIMEOUT_MS + 100);
-            match rx.recv_timeout(recv_timeout) {
-                Ok(result) => result, // Return the actual result from scraping
-                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
-                    component_metrics.reset_to_zeros();
-                    Err(anyhow::anyhow!("Metrics collection timed out"))
-                }
-                Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
-                    component_metrics.reset_to_zeros();
-                    Err(anyhow::anyhow!("Metrics collection task failed"))
-                }
-            }
-        });
-
-        self.drt()
-            .register_metrics_callback(hierarchies, metrics_callback);
+        // Start a background task that scrapes stats every 5 seconds
+        let m = component_metrics.clone();
+        let c = component_clone.clone();
+
+        // Use std::thread for the background task to avoid runtime context issues
+        std::thread::spawn(move || {
+            // Use the existing secondary runtime from drt for background metrics scraping
+            let rt = c.drt().runtime().secondary();
+
+            // Run the background scraping loop
+            rt.block_on(async {
+                let timeout = NATS_TIMEOUT_AND_INITIAL_DELAY_MS;
+                let mut delay = NATS_TIMEOUT_AND_INITIAL_DELAY_MS;
+
+                loop {
+                    match c.scrape_stats(timeout).await {
+                        Ok(service_set) => {
+                            m.update_from_service_set(&service_set);
+                        }
+                        Err(err) => {
+                            tracing::error!(
+                                "Background scrape failed for {}: {}",
+                                c.service_name(),
+                                err
+                            );
+                            m.reset_to_zeros();
+                            // Double delay on failure, capped at MAX_DELAY
+                            delay = std::cmp::min(delay * 2, MAX_DELAY_MS);
+                        }
+                    }
+
+                    tokio::time::sleep(delay).await;
+                }
+            });
+        });

        Ok(())
    }

@@ -587,7 +575,7 @@ impl Namespace {
// Register the metrics callback for this component.
// If registration fails, log a warning but do not propagate the error,
// as metrics are not mission critical and should not block component creation.
-        if let Err(err) = component.add_metrics_callback() {
+        if let Err(err) = component.start_scraping_metrics() {
tracing::warn!(
"Failed to add metrics callback for component '{}': {}",
component.service_name(),
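To make the new component.rs flow easier to follow outside the diff, here is a self-contained sketch of the same scrape-with-backoff loop. It assumes only tokio; scrape_once, published_value, and the four-iteration bound are illustrative stand-ins rather than Dynamo APIs.

use std::time::Duration;

// Stand-in for Component::scrape_stats: pretend the first two scrapes fail.
async fn scrape_once(attempt: u32) -> Result<u64, String> {
    if attempt < 2 {
        Err(format!("scrape {attempt} failed"))
    } else {
        Ok(u64::from(attempt) * 10)
    }
}

#[tokio::main]
async fn main() {
    const INITIAL_DELAY: Duration = Duration::from_millis(300);
    const MAX_DELAY: Duration = Duration::from_millis(4700);

    let mut delay = INITIAL_DELAY;
    let mut published_value = 0u64;

    for attempt in 0..4 {
        match scrape_once(attempt).await {
            Ok(value) => {
                // Update the exported value from the fresh stats.
                published_value = value;
            }
            Err(err) => {
                eprintln!("background scrape failed: {err}");
                // Reset so stale values are not exported, then back off.
                published_value = 0;
                delay = std::cmp::min(delay * 2, MAX_DELAY);
            }
        }
        println!("published = {published_value}, next scrape in {delay:?}");
        tokio::time::sleep(delay).await;
    }
}

The design point carried over from the PR: failures back off exponentially up to the 4.7s cap so a flapping NATS service is not polled every 300ms, while successes keep exporting fresh values in time for the next Prometheus scrape.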
13 changes: 2 additions & 11 deletions lib/runtime/src/distributed.rs
@@ -247,7 +247,8 @@ impl DistributedRuntime {
self.instance_sources.clone()
}

-    /// Add a Prometheus metric to a specific hierarchy's registry
+    /// Add a Prometheus metric to a specific hierarchy's registry. Note that it is possible
+    /// to register the same metric name multiple times, as long as the labels are different.
pub fn add_prometheus_metric(
&self,
hierarchy: &str,
@@ -257,16 +258,6 @@
let mut registries = self.hierarchy_to_metricsregistry.write().unwrap();
let entry = registries.entry(hierarchy.to_string()).or_default();

-        // If a metric with this name already exists for the hierarchy, warn and skip registration
-        if entry.has_metric_named(metric_name) {
-            tracing::warn!(
-                hierarchy = ?hierarchy,
-                metric_name = ?metric_name,
-                "Metric already exists in registry; skipping registration"
-            );
-            return Ok(());
-        }

// Try to register the metric and provide better error information
match entry.prometheus_registry.register(prometheus_metric) {
Ok(_) => Ok(()),
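The removed block guarded against re-registering a metric name; the updated doc comment instead relies on the registry's own collision rules. A hedged sketch of that behaviour with the prometheus crate — the metric and label names here are invented for illustration:

use prometheus::{IntGauge, Opts, Registry};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = Registry::new();

    let backend_a = IntGauge::with_opts(
        Opts::new("dynamo_component_up", "example gauge").const_label("component", "backend_a"),
    )?;
    let backend_b = IntGauge::with_opts(
        Opts::new("dynamo_component_up", "example gauge").const_label("component", "backend_b"),
    )?;

    // Same metric name, different label values: both registrations succeed.
    registry.register(Box::new(backend_a.clone()))?;
    registry.register(Box::new(backend_b.clone()))?;

    // Same name *and* same label values: the registry rejects the duplicate.
    assert!(registry.register(Box::new(backend_a.clone())).is_err());

    Ok(())
}

Exact duplicates still fail at register() in the prometheus crate, so dropping the pre-check does not permit true double registration; it only stops rejecting legitimate same-name, different-label metrics.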