Skip to content

Commit f05f762

Browse files
authored
chore: Make nats_client private at crate level, various tidy up (#4513)
Signed-off-by: Graham King <grahamk@nvidia.com>
1 parent 2790453 commit f05f762

File tree

22 files changed

+75
-1141
lines changed

22 files changed

+75
-1141
lines changed

lib/llm/src/audit/bus.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,22 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use super::handle::AuditRecord;
5-
use std::sync::{Arc, OnceLock};
5+
use std::sync::OnceLock;
66
use tokio::sync::broadcast;
77

8-
static BUS: OnceLock<broadcast::Sender<Arc<AuditRecord>>> = OnceLock::new();
8+
static BUS: OnceLock<broadcast::Sender<AuditRecord>> = OnceLock::new();
99

1010
pub fn init(capacity: usize) {
11-
let (tx, _rx) = broadcast::channel::<Arc<AuditRecord>>(capacity);
11+
let (tx, _rx) = broadcast::channel::<AuditRecord>(capacity);
1212
let _ = BUS.set(tx);
1313
}
1414

15-
pub fn subscribe() -> broadcast::Receiver<Arc<AuditRecord>> {
15+
pub fn subscribe() -> broadcast::Receiver<AuditRecord> {
1616
BUS.get().expect("audit bus not initialized").subscribe()
1717
}
1818

1919
pub fn publish(rec: AuditRecord) {
2020
if let Some(tx) = BUS.get() {
21-
let _ = tx.send(Arc::new(rec));
21+
let _ = tx.send(rec);
2222
}
2323
}

lib/llm/src/audit/config.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ pub struct AuditPolicy {
1010

1111
static POLICY: OnceLock<AuditPolicy> = OnceLock::new();
1212

13+
/// Audit is enabled if we have at least one sink
1314
pub fn init_from_env() -> AuditPolicy {
14-
let enabled = std::env::var("DYN_AUDIT_ENABLED")
15-
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
16-
.unwrap_or(false);
17-
AuditPolicy { enabled }
15+
AuditPolicy {
16+
enabled: std::env::var("DYN_AUDIT_SINKS").is_ok(),
17+
}
1818
}
1919

2020
pub fn policy() -> AuditPolicy {

lib/llm/src/audit/sink.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use anyhow::Context as _;
45
use async_nats::jetstream;
56
use async_trait::async_trait;
7+
use dynamo_runtime::transports::nats;
68
use std::sync::Arc;
79
use tokio::sync::broadcast;
810

@@ -36,7 +38,7 @@ pub struct NatsSink {
3638
}
3739

3840
impl NatsSink {
39-
pub fn new(nats_client: &dynamo_runtime::transports::nats::Client) -> Self {
41+
pub fn new(nats_client: dynamo_runtime::transports::nats::Client) -> Self {
4042
let subject = std::env::var("DYN_AUDIT_NATS_SUBJECT")
4143
.unwrap_or_else(|_| "dynamo.audit.v1".to_string());
4244
Self {
@@ -64,37 +66,32 @@ impl AuditSink for NatsSink {
6466
}
6567
}
6668

67-
fn parse_sinks_from_env(
68-
nats_client: Option<&dynamo_runtime::transports::nats::Client>,
69-
) -> Vec<Arc<dyn AuditSink>> {
69+
async fn parse_sinks_from_env() -> anyhow::Result<Vec<Arc<dyn AuditSink>>> {
7070
let cfg = std::env::var("DYN_AUDIT_SINKS").unwrap_or_else(|_| "stderr".into());
7171
let mut out: Vec<Arc<dyn AuditSink>> = Vec::new();
7272
for name in cfg.split(',').map(|s| s.trim().to_lowercase()) {
7373
match name.as_str() {
7474
"stderr" | "" => out.push(Arc::new(StderrSink)),
7575
"nats" => {
76-
if let Some(client) = nats_client {
77-
out.push(Arc::new(NatsSink::new(client)));
78-
} else {
79-
tracing::warn!(
80-
"NATS sink requested but no DistributedRuntime NATS client available; skipping"
81-
);
82-
}
76+
let nats_client = nats::ClientOptions::default()
77+
.connect()
78+
.await
79+
.context("Attempting to connect NATS sink from env var DYN_AUDIT_SINKS")?;
80+
out.push(Arc::new(NatsSink::new(nats_client)));
8381
}
8482
// "pg" => out.push(Arc::new(PostgresSink::from_env())),
8583
other => tracing::warn!(%other, "audit: unknown sink ignored"),
8684
}
8785
}
88-
out
86+
Ok(out)
8987
}
9088

9189
/// spawn one worker per sink; each subscribes to the bus (off hot path)
92-
pub fn spawn_workers_from_env(drt: &dynamo_runtime::DistributedRuntime) {
93-
let nats_client = drt.nats_client();
94-
let sinks = parse_sinks_from_env(nats_client);
90+
pub async fn spawn_workers_from_env() -> anyhow::Result<()> {
91+
let sinks = parse_sinks_from_env().await?;
9592
for sink in sinks {
9693
let name = sink.name();
97-
let mut rx: broadcast::Receiver<Arc<AuditRecord>> = bus::subscribe();
94+
let mut rx: broadcast::Receiver<AuditRecord> = bus::subscribe();
9895
tokio::spawn(async move {
9996
loop {
10097
match rx.recv().await {
@@ -110,4 +107,5 @@ pub fn spawn_workers_from_env(drt: &dynamo_runtime::DistributedRuntime) {
110107
});
111108
}
112109
tracing::info!("Audit sinks ready.");
110+
Ok(())
113111
}

lib/llm/src/block_manager/controller.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,8 @@ pub struct Controller<Locality: LocalityProvider, Metadata: BlockMetadata> {
3939
impl<Locality: LocalityProvider, Metadata: BlockMetadata> Controller<Locality, Metadata> {
4040
pub async fn new(
4141
block_manager: KvBlockManager<Locality, Metadata>,
42-
mut component: dynamo_runtime::component::Component,
42+
component: dynamo_runtime::component::Component,
4343
) -> anyhow::Result<Self> {
44-
component.add_stats_service().await?;
45-
4644
let handler = ControllerHandler::new(block_manager.clone());
4745
let engine = Ingress::for_engine(handler.clone())?;
4846

lib/llm/src/entrypoint/input.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ pub async fn run_input(
117117
.and_then(|v| v.parse().ok())
118118
.unwrap_or(1024);
119119
crate::audit::bus::init(cap);
120-
crate::audit::sink::spawn_workers_from_env(&drt);
120+
crate::audit::sink::spawn_workers_from_env().await?;
121121
tracing::info!(cap, "Audit initialized");
122122
}
123123

lib/llm/src/entrypoint/input/endpoint.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,9 @@ pub async fn run(
3232
let cancel_token = distributed_runtime.primary_token().clone();
3333
let endpoint_id: EndpointId = path.parse()?;
3434

35-
let mut component = distributed_runtime
35+
let component = distributed_runtime
3636
.namespace(&endpoint_id.namespace)?
3737
.component(&endpoint_id.component)?;
38-
39-
// We can only make the NATS service if we have NATS
40-
if distributed_runtime.nats_client().is_some() {
41-
component.add_stats_service().await?;
42-
}
4338
let endpoint = component.endpoint(&endpoint_id.name);
4439

4540
let rt_fut: Pin<Box<dyn Future<Output = _> + Send + 'static>> = match engine_config {

lib/llm/src/kv_router/sequence.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,8 +1002,7 @@ mod tests {
10021002

10031003
// Create namespace and shared component for both seq_managers
10041004
let namespace = distributed.namespace("test_cross_instance_sync")?;
1005-
let mut component = namespace.component("sequences")?;
1006-
component.add_stats_service().await?;
1005+
let component = namespace.component("sequences")?;
10071006

10081007
// Create multi-worker sequence managers with:
10091008
// - Worker 0 with dp_size=2 (dp_ranks 0 and 1)
@@ -1168,8 +1167,7 @@ mod tests {
11681167

11691168
// Create namespace and shared component for both seq_managers
11701169
let namespace = distributed.namespace("test_no_token_seq_sync")?;
1171-
let mut component = namespace.component("sequences")?;
1172-
component.add_stats_service().await?;
1170+
let component = namespace.component("sequences")?;
11731171

11741172
// Create multi-worker sequence managers with ALL workers [0, 1, 2]
11751173
// Both use the same component to ensure event synchronization works

lib/llm/src/mocker/engine.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,8 +463,7 @@ mod integration_tests {
463463
tracing::info!("✓ Runtime and distributed runtime created");
464464

465465
// Create component for MockVllmEngine (needed for publishers)
466-
let mut test_component = distributed.namespace("test")?.component(MOCKER_COMPONENT)?;
467-
test_component.add_stats_service().await?;
466+
let test_component = distributed.namespace("test")?.component(MOCKER_COMPONENT)?;
468467
tracing::info!("✓ Test component created");
469468

470469
// Create MockVllmEngine WITH component (enables publishers)

lib/llm/src/preprocessor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,7 @@ impl
856856
let request_id = context.id().to_string();
857857
let original_stream_flag = request.inner.stream.unwrap_or(false);
858858

859-
// Build audit handle (None if DYN_AUDIT_ENABLED=0)
859+
// Build audit handle (None if no DYN_AUDIT_SINKS)
860860
let mut audit_handle = crate::audit::handle::create_handle(&request, &request_id);
861861

862862
if let Some(ref mut h) = audit_handle {

lib/llm/tests/audit_nats_integration.rs

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ mod tests {
2727
use dynamo_llm::protocols::openai::chat_completions::{
2828
NvCreateChatCompletionRequest, NvCreateChatCompletionResponse,
2929
};
30-
use dynamo_runtime::Runtime;
3130
use dynamo_runtime::transports::nats;
3231
use futures::StreamExt;
3332
use serde_json::Value;
@@ -48,15 +47,6 @@ mod tests {
4847
.expect("Failed to connect to NATS server")
4948
}
5049

51-
/// Helper to create a test DistributedRuntime with NATS
52-
async fn create_test_drt() -> dynamo_runtime::DistributedRuntime {
53-
let rt = Runtime::from_current().unwrap();
54-
let config = dynamo_runtime::distributed::DistributedConfig::from_settings();
55-
dynamo_runtime::DistributedRuntime::new(rt, config)
56-
.await
57-
.expect("Failed to create DistributedRuntime")
58-
}
59-
6050
/// Helper to create a minimal test request
6151
fn create_test_request(model: &str, store: bool) -> NvCreateChatCompletionRequest {
6252
let json = serde_json::json!({
@@ -155,7 +145,6 @@ mod tests {
155145
// Core test: audit records are published to NATS with correct structure
156146
async_with_vars(
157147
[
158-
("DYN_AUDIT_ENABLED", Some("1")),
159148
("DYN_AUDIT_SINKS", Some("nats")),
160149
("DYN_AUDIT_NATS_SUBJECT", Some(TEST_SUBJECT)),
161150
],
@@ -166,8 +155,7 @@ mod tests {
166155
setup_test_stream(&client, &stream_name, TEST_SUBJECT).await;
167156

168157
bus::init(100);
169-
let drt = create_test_drt().await;
170-
sink::spawn_workers_from_env(&drt);
158+
sink::spawn_workers_from_env().await.unwrap();
171159
time::sleep(Duration::from_millis(100)).await;
172160

173161
// Emit audit record
@@ -212,7 +200,6 @@ mod tests {
212200

213201
async_with_vars(
214202
[
215-
("DYN_AUDIT_ENABLED", Some("1")),
216203
("DYN_AUDIT_SINKS", Some("nats")),
217204
("DYN_AUDIT_NATS_SUBJECT", Some(TEST_SUBJECT)),
218205
],
@@ -223,8 +210,7 @@ mod tests {
223210
setup_test_stream(&client, &stream_name, TEST_SUBJECT).await;
224211

225212
bus::init(100);
226-
let drt = create_test_drt().await;
227-
sink::spawn_workers_from_env(&drt);
213+
sink::spawn_workers_from_env().await.unwrap();
228214
time::sleep(Duration::from_millis(100)).await;
229215

230216
// Request with store=true (should be audited)

0 commit comments

Comments
 (0)