Skip to content

Commit 17dcffe

Browse files
authored
chore(runtime): Make nats_client private, refactor NATS stats scraping (#4591)
Signed-off-by: Graham King <grahamk@nvidia.com>
1 parent c90e3df commit 17dcffe

File tree

6 files changed

+181
-185
lines changed

6 files changed

+181
-185
lines changed

lib/runtime/src/component.rs

Lines changed: 3 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -269,131 +269,6 @@ impl Component {
269269
instances.sort();
270270
Ok(instances)
271271
}
272-
273-
/// Scrape ServiceSet, which contains NATS stats as well as user defined stats
274-
/// embedded in data field of ServiceInfo.
275-
async fn scrape_stats(&self, timeout: Duration) -> anyhow::Result<ServiceSet> {
276-
// Debug: scraping stats for component
277-
let service_name = self.service_name();
278-
let Some(service_client) = self
279-
.drt()
280-
.nats_client()
281-
.map(|nc| ServiceClient::new(nc.clone()))
282-
else {
283-
anyhow::bail!("ServiceSet is gathered via NATS, do not call this in non-NATS setups.");
284-
};
285-
service_client
286-
.collect_services(&service_name, timeout)
287-
.await
288-
}
289-
290-
/// Add Prometheus metrics for this component's NATS service stats.
291-
///
292-
/// Starts a background task that periodically requests service statistics from NATS
293-
/// and updates the corresponding Prometheus metrics. The first scrape happens immediately,
294-
/// then subsequent scrapes occur at a fixed interval of 9.8 seconds (MAX_WAIT_MS),
295-
/// which should be near or smaller than typical Prometheus scraping intervals to ensure
296-
/// metrics are fresh when Prometheus collects them.
297-
fn start_scraping_nats_service_component_metrics(&self) -> anyhow::Result<()> {
298-
const MAX_WAIT_MS: std::time::Duration = std::time::Duration::from_millis(9800); // Should be <= Prometheus scrape interval
299-
300-
// If there is another component with the same service name, this will fail.
301-
let component_metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
302-
303-
let component_clone = self.clone();
304-
305-
// Start a background task that scrapes stats every MAX_WAIT_MS (9.8 seconds)
306-
let m = component_metrics.clone();
307-
let c = component_clone.clone();
308-
309-
// Use the DRT's runtime handle to spawn the background task.
310-
// We cannot use regular `tokio::spawn` here because:
311-
// 1. This method may be called from contexts without an active Tokio runtime
312-
// (e.g., tests that create a DRT in a blocking context)
313-
// 2. Tests often create a temporary runtime just to build the DRT, then drop it
314-
// 3. `tokio::spawn` requires being called from within a runtime context
315-
// By using the DRT's own runtime handle, we ensure the task runs in the
316-
// correct runtime that will persist for the lifetime of the component.
317-
c.drt().runtime().secondary().spawn(async move {
318-
let timeout = std::time::Duration::from_millis(500);
319-
let mut interval = tokio::time::interval(MAX_WAIT_MS);
320-
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
321-
322-
loop {
323-
match c.scrape_stats(timeout).await {
324-
Ok(service_set) => {
325-
m.update_from_service_set(&service_set);
326-
}
327-
Err(err) => {
328-
tracing::error!(
329-
"Background scrape failed for {}: {}",
330-
c.service_name(),
331-
err
332-
);
333-
m.reset_to_zeros();
334-
}
335-
}
336-
337-
interval.tick().await;
338-
}
339-
});
340-
341-
Ok(())
342-
}
343-
344-
// Gather NATS metrics
345-
async fn add_stats_service(&mut self) -> anyhow::Result<()> {
346-
let service_name = self.service_name();
347-
348-
// Pre-check to save cost of creating the service, but don't hold the lock
349-
if self
350-
.drt
351-
.component_registry()
352-
.inner
353-
.lock()
354-
.await
355-
.services
356-
.contains_key(&service_name)
357-
{
358-
// The NATS service is per component, but it is called from `serve_endpoint`, and there
359-
// are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
360-
tracing::trace!("Service {service_name} already exists");
361-
return Ok(());
362-
}
363-
364-
let Some(nats_client) = self.drt.nats_client() else {
365-
anyhow::bail!("Cannot create NATS service without NATS.");
366-
};
367-
let description = None;
368-
let (nats_service, stats_reg) =
369-
service::build_nats_service(nats_client, self, description).await?;
370-
371-
let mut guard = self.drt.component_registry().inner.lock().await;
372-
if !guard.services.contains_key(&service_name) {
373-
// Normal case
374-
guard.services.insert(service_name.clone(), nats_service);
375-
guard.stats_handlers.insert(service_name.clone(), stats_reg);
376-
377-
tracing::info!("Added NATS / stats service {service_name}");
378-
379-
drop(guard);
380-
} else {
381-
drop(guard);
382-
let _ = nats_service.stop().await;
383-
// The NATS service is per component, but it is called from `serve_endpoint`, and there
384-
// are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
385-
return Ok(());
386-
}
387-
388-
if let Err(err) = self.start_scraping_nats_service_component_metrics() {
389-
tracing::debug!(
390-
"Metrics registration failed for '{}': {}",
391-
self.service_name(),
392-
err
393-
);
394-
}
395-
Ok(())
396-
}
397272
}
398273

399274
impl ComponentBuilder {
@@ -404,14 +279,9 @@ impl ComponentBuilder {
404279
pub fn build(self) -> Result<Component, anyhow::Error> {
405280
let component = self.build_internal()?;
406281
// If this component is using NATS, gather its metrics
407-
if component.drt().request_plane().is_nats() {
408-
let mut c = component.clone();
409-
// Start in the background to isolate the async, and because we don't need it yet
410-
component.drt().runtime().secondary().spawn(async move {
411-
if let Err(err) = c.add_stats_service().await {
412-
tracing::error!(error = %err, component = c.service_name(), "Failed starting stats service");
413-
}
414-
});
282+
let drt = component.drt();
283+
if drt.request_plane().is_nats() {
284+
drt.start_stats_service(component.clone());
415285
}
416286
Ok(component)
417287
}

lib/runtime/src/component/component.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,9 @@ impl EventPublisher for Component {
3232
bytes: Vec<u8>,
3333
) -> Result<()> {
3434
let subject = format!("{}.{}", self.subject(), event_name.as_ref());
35-
let Some(nats_client) = self.drt().nats_client() else {
36-
anyhow::bail!("KV router's EventPublisher requires NATS");
37-
};
38-
nats_client.client().publish(subject, bytes.into()).await?;
35+
self.drt()
36+
.kv_router_nats_publish(subject, bytes.into())
37+
.await?;
3938
Ok(())
4039
}
4140
}
@@ -47,10 +46,7 @@ impl EventSubscriber for Component {
4746
event_name: impl AsRef<str> + Send + Sync,
4847
) -> Result<async_nats::Subscriber> {
4948
let subject = format!("{}.{}", self.subject(), event_name.as_ref());
50-
let Some(nats_client) = self.drt().nats_client() else {
51-
anyhow::bail!("KV router's EventSubscriber requires NATS");
52-
};
53-
Ok(nats_client.client().subscribe(subject).await?)
49+
Ok(self.drt().kv_router_nats_subscribe(subject).await?)
5450
}
5551

5652
async fn subscribe_with_type<T: for<'de> Deserialize<'de> + Send + 'static>(

lib/runtime/src/component/namespace.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,9 @@ impl EventPublisher for Namespace {
3535
bytes: Vec<u8>,
3636
) -> Result<()> {
3737
let subject = format!("{}.{}", self.subject(), event_name.as_ref());
38-
let Some(nats_client) = self.drt().nats_client() else {
39-
anyhow::bail!("KV router's Namespace EventPublisher requires NATS");
40-
};
41-
nats_client.client().publish(subject, bytes.into()).await?;
38+
self.drt()
39+
.kv_router_nats_publish(subject, bytes.into())
40+
.await?;
4241
Ok(())
4342
}
4443
}
@@ -50,10 +49,7 @@ impl EventSubscriber for Namespace {
5049
event_name: impl AsRef<str> + Send + Sync,
5150
) -> Result<async_nats::Subscriber> {
5251
let subject = format!("{}.{}", self.subject(), event_name.as_ref());
53-
let Some(nats_client) = self.drt().nats_client() else {
54-
anyhow::bail!("KV router's Namespace EventSubscriber requires NATS");
55-
};
56-
Ok(nats_client.client().subscribe(subject).await?)
52+
Ok(self.drt().kv_router_nats_subscribe(subject).await?)
5753
}
5854

5955
async fn subscribe_with_type<T: for<'de> Deserialize<'de> + Send + 'static>(

0 commit comments

Comments
 (0)