Skip to content

Commit d04d5ee

Browse files
committed
Various fixes to integration tests. Namespace collision. Fix
dynamo_system_ namespace.
1 parent 47de232 commit d04d5ee

File tree

5 files changed

+103
-56
lines changed

5 files changed

+103
-56
lines changed

lib/runtime/src/component/component.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,24 +89,26 @@ mod tests {
8989
async fn test_publish() {
9090
let rt = Runtime::from_current().unwrap();
9191
let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
92-
let ns = dtr.namespace("test".to_string()).unwrap();
93-
let cp = ns.component("component".to_string()).unwrap();
94-
cp.publish("test", &"test".to_string()).await.unwrap();
92+
let ns = dtr.namespace("test_component_publish".to_string()).unwrap();
93+
let cp = ns.component("test_component".to_string()).unwrap();
94+
cp.publish("test_event", &"test".to_string()).await.unwrap();
9595
rt.shutdown();
9696
}
9797

9898
#[tokio::test]
9999
async fn test_subscribe() {
100100
let rt = Runtime::from_current().unwrap();
101101
let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
102-
let ns = dtr.namespace("test".to_string()).unwrap();
103-
let cp = ns.component("component".to_string()).unwrap();
102+
let ns = dtr
103+
.namespace("test_component_subscribe".to_string())
104+
.unwrap();
105+
let cp = ns.component("test_component".to_string()).unwrap();
104106

105-
// Create a subscriber
106-
let mut subscriber = ns.subscribe("test").await.unwrap();
107+
// Create a subscriber on the component (not namespace)
108+
let mut subscriber = cp.subscribe("test_event").await.unwrap();
107109

108-
// Publish a message
109-
cp.publish("test", &"test_message".to_string())
110+
// Publish a message from the component
111+
cp.publish("test_event", &"test_message".to_string())
110112
.await
111113
.unwrap();
112114

lib/runtime/src/component/namespace.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,22 +99,24 @@ mod tests {
9999
async fn test_publish() {
100100
let rt = Runtime::from_current().unwrap();
101101
let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
102-
let ns = dtr.namespace("test".to_string()).unwrap();
103-
ns.publish("test", &"test".to_string()).await.unwrap();
102+
let ns = dtr.namespace("test_namespace_publish".to_string()).unwrap();
103+
ns.publish("test_event", &"test".to_string()).await.unwrap();
104104
rt.shutdown();
105105
}
106106

107107
#[tokio::test]
108108
async fn test_subscribe() {
109109
let rt = Runtime::from_current().unwrap();
110110
let dtr = DistributedRuntime::from_settings(rt.clone()).await.unwrap();
111-
let ns = dtr.namespace("test".to_string()).unwrap();
111+
let ns = dtr
112+
.namespace("test_namespace_subscribe".to_string())
113+
.unwrap();
112114

113115
// Create a subscriber
114-
let mut subscriber = ns.subscribe("test").await.unwrap();
116+
let mut subscriber = ns.subscribe("test_event").await.unwrap();
115117

116118
// Publish a message
117-
ns.publish("test", &"test_message".to_string())
119+
ns.publish("test_event", &"test_message".to_string())
118120
.await
119121
.unwrap();
120122

lib/runtime/src/http_server.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ impl crate::traits::DistributedRuntimeProvider for HttpMetricsRegistry {
7575

7676
impl MetricsRegistry for HttpMetricsRegistry {
7777
fn basename(&self) -> String {
78-
"http_server".to_string()
78+
"dynamo_system_server".to_string()
7979
}
8080

8181
fn parent_hierarchy(&self) -> Vec<String> {
@@ -98,7 +98,7 @@ impl HttpServerState {
9898
// Note: This metric is created at the DRT level (no namespace), so we manually add "dynamo_" prefix
9999
// to maintain consistency with the project's metric naming convention
100100
let uptime_gauge = http_metrics_registry.as_ref().create_gauge(
101-
"dynamo_uptime_seconds",
101+
"uptime_seconds",
102102
"Total uptime of the DistributedRuntime in seconds",
103103
&[],
104104
)?;
@@ -332,9 +332,9 @@ mod tests {
332332
println!("Full metrics response:\n{}", response);
333333

334334
let expected = "\
335-
# HELP dynamo_uptime_seconds Total uptime of the DistributedRuntime in seconds
336-
# TYPE dynamo_uptime_seconds gauge
337-
dynamo_uptime_seconds{namespace=\"http_server\"} 42
335+
# HELP dynamo_system_server_uptime_seconds Total uptime of the DistributedRuntime in seconds
336+
# TYPE dynamo_system_server_uptime_seconds gauge
337+
dynamo_system_server_uptime_seconds{namespace=\"dynamo_system_server\"} 42
338338
";
339339
assert_eq!(response, expected);
340340
}

lib/runtime/src/metrics.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,7 @@ mod test_prefixes {
797797
println!("\n=== Testing Invalid Namespace Behavior ===");
798798

799799
// Create a namespace with invalid name (contains hyphen)
800-
let invalid_namespace = drt.namespace("test-namespace").unwrap();
800+
let invalid_namespace = drt.namespace("@@123").unwrap();
801801

802802
// Debug: Let's see what the hierarchy looks like
803803
println!(
@@ -810,15 +810,15 @@ mod test_prefixes {
810810
);
811811
println!("Invalid namespace prefix: '{}'", invalid_namespace.prefix());
812812

813-
// Try to create a metric - this should fail because the namespace name will be used in the metric name
813+
// Try to create a metric - this should fail because "@@123" gets stripped to "" which is invalid
814814
let result = invalid_namespace.create_counter("test_counter", "A test counter", &[]);
815-
println!("Result with invalid namespace 'test-namespace':");
815+
println!("Result with invalid namespace '@@123':");
816816
println!("{:?}", result);
817817

818-
// The result should be an error from Prometheus
818+
// The result should be an error because empty metric names are invalid
819819
assert!(
820820
result.is_err(),
821-
"Creating metric with invalid namespace should fail"
821+
"Creating metric with namespace '@@123' should fail because it gets stripped to empty string"
822822
);
823823

824824
// For comparison, show a valid namespace works
@@ -926,15 +926,15 @@ testnamespace_testgauge{{component="testcomponent",namespace="testnamespace"}} 5
926926
println!("{}", namespace_output);
927927

928928
let expected_namespace_output = format!(
929-
r#"# HELP testintcounter A test int counter
930-
# TYPE testintcounter counter
931-
testintcounter{{namespace="testnamespace"}} 12345
932-
# HELP testnamespace_testcounter A test counter
929+
r#"# HELP testnamespace_testcounter A test counter
933930
# TYPE testnamespace_testcounter counter
934931
testnamespace_testcounter{{component="testcomponent",endpoint="testendpoint",namespace="testnamespace"}} 123.456789
935932
# HELP testnamespace_testgauge A test gauge
936933
# TYPE testnamespace_testgauge gauge
937934
testnamespace_testgauge{{component="testcomponent",namespace="testnamespace"}} 50000
935+
# HELP testnamespace_testintcounter A test int counter
936+
# TYPE testnamespace_testintcounter counter
937+
testnamespace_testintcounter{{namespace="testnamespace"}} 12345
938938
"#
939939
);
940940

@@ -1015,9 +1015,6 @@ testhistogram_bucket{{le="10"}} 3
10151015
testhistogram_bucket{{le="+Inf"}} 3
10161016
testhistogram_sum 7.5
10171017
testhistogram_count 3
1018-
# HELP testintcounter A test int counter
1019-
# TYPE testintcounter counter
1020-
testintcounter{{namespace="testnamespace"}} 12345
10211018
# HELP testintgauge A test int gauge
10221019
# TYPE testintgauge gauge
10231020
testintgauge 42
@@ -1031,6 +1028,9 @@ testnamespace_testcounter{{component="testcomponent",endpoint="testendpoint",nam
10311028
# HELP testnamespace_testgauge A test gauge
10321029
# TYPE testnamespace_testgauge gauge
10331030
testnamespace_testgauge{{component="testcomponent",namespace="testnamespace"}} 50000
1031+
# HELP testnamespace_testintcounter A test int counter
1032+
# TYPE testnamespace_testintcounter counter
1033+
testnamespace_testintcounter{{namespace="testnamespace"}} 12345
10341034
"#
10351035
);
10361036

lib/runtime/tests/soak.rs

Lines changed: 68 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16+
// cargo test --test soak integration::main --features integration
17+
//!
18+
//! It will send a batch of requests to the runtime and measure the throughput.
19+
//!
20+
//! It will also measure the latency of the requests.
1621
#[cfg(feature = "integration")]
1722
mod integration {
1823

@@ -22,13 +27,17 @@ mod integration {
2227
logging,
2328
pipeline::{
2429
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
25-
ResponseStream, SingleIn,
30+
PushRouter, ResponseStream, SingleIn,
2631
},
2732
protocols::annotated::Annotated,
28-
DistributedRuntime, ErrorContext, Result, Runtime, Worker,
33+
stream, DistributedRuntime, ErrorContext, Result, Runtime, Worker,
2934
};
3035
use futures::StreamExt;
31-
use std::{sync::Arc, time::Duration};
36+
use std::{
37+
sync::atomic::{AtomicU64, Ordering},
38+
sync::Arc,
39+
time::Duration,
40+
};
3241
use tokio::time::Instant;
3342

3443
#[test]
@@ -45,16 +54,29 @@ mod integration {
4554

4655
client.await??;
4756
distributed.shutdown();
48-
server.await??;
57+
let handler = server.await??;
58+
59+
// Print final backend counter value
60+
let final_count = handler.backend_counter.load(Ordering::Relaxed);
61+
println!(
62+
"Final RequestHandler backend_counter: {} requests processed",
63+
final_count
64+
);
4965

5066
Ok(())
5167
}
5268

53-
struct RequestHandler {}
69+
struct RequestHandler {
70+
backend_counter: Arc<AtomicU64>,
71+
normal_processing: bool,
72+
}
5473

5574
impl RequestHandler {
56-
fn new() -> Arc<Self> {
57-
Arc::new(Self {})
75+
fn new(normal_processing: bool) -> Arc<Self> {
76+
Arc::new(Self {
77+
backend_counter: Arc::new(AtomicU64::new(0)),
78+
normal_processing,
79+
})
5880
}
5981
}
6082

@@ -63,25 +85,39 @@ mod integration {
6385
async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
6486
let (data, ctx) = input.into_parts();
6587

88+
// Increment backend counter
89+
self.backend_counter.fetch_add(1, Ordering::Relaxed);
90+
6691
let chars = data
6792
.chars()
6893
.map(|c| Annotated::from_data(c.to_string()))
6994
.collect::<Vec<_>>();
7095

71-
let stream = async_stream::stream! {
72-
for c in chars {
73-
yield c;
74-
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
75-
}
76-
};
77-
78-
Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
96+
if self.normal_processing {
97+
let iter_stream = stream::iter(chars);
98+
Ok(ResponseStream::new(Box::pin(iter_stream), ctx.context()))
99+
} else {
100+
// delayed processing, just to saturate the queue
101+
let async_stream = async_stream::stream! {
102+
for c in chars {
103+
yield c;
104+
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
105+
}
106+
};
107+
Ok(ResponseStream::new(Box::pin(async_stream), ctx.context()))
108+
}
79109
}
80110
}
81111

82-
async fn backend(runtime: DistributedRuntime) -> Result<()> {
112+
async fn backend(runtime: DistributedRuntime) -> Result<Arc<RequestHandler>> {
113+
// get the normal processing setting from env (not delayed)
114+
let normal_processing =
115+
std::env::var("DYN_NORMAL_PROCESSING").unwrap_or("true".to_string());
116+
let normal_processing: bool = normal_processing.parse().unwrap_or(true);
117+
83118
// attach an ingress to an engine
84-
let ingress = Ingress::for_engine(RequestHandler::new())?;
119+
let handler = RequestHandler::new(normal_processing);
120+
let ingress = Ingress::for_engine(handler.clone())?;
85121

86122
// // make the ingress discoverable via a component service
87123
// // we must first create a service, then we can attach one more more endpoints
@@ -95,7 +131,9 @@ mod integration {
95131
.endpoint_builder()
96132
.handler(ingress)
97133
.start()
98-
.await
134+
.await?;
135+
136+
Ok(handler)
99137
}
100138

101139
async fn client(runtime: DistributedRuntime) -> Result<()> {
@@ -105,29 +143,32 @@ mod integration {
105143
humantime::parse_duration(&run_duration).unwrap_or(Duration::from_secs(60));
106144

107145
let batch_load = std::env::var("DYN_SOAK_BATCH_LOAD").unwrap_or("10000".to_string());
108-
let batch_load: usize = batch_load.parse().unwrap_or(10000);
146+
let batch_load: usize = batch_load.parse().unwrap_or(100);
109147

110148
let client = runtime
111149
.namespace(DEFAULT_NAMESPACE)?
112150
.component("backend")?
113151
.endpoint("generate")
114-
.client::<String, Annotated<String>>()
152+
.client()
115153
.await?;
116154

117155
client.wait_for_instances().await?;
118-
let client = Arc::new(client);
156+
let router =
157+
PushRouter::<String, Annotated<String>>::from_client(client, Default::default())
158+
.await?;
159+
let router = Arc::new(router);
119160

120161
let start = Instant::now();
121162
let mut count = 0;
122163

123164
loop {
124165
let mut tasks = Vec::new();
125166
for _ in 0..batch_load {
126-
let client = client.clone();
167+
let router = router.clone();
127168
tasks.push(tokio::spawn(async move {
128169
let mut stream = tokio::time::timeout(
129-
Duration::from_secs(30),
130-
client.random("hello world".to_string().into()),
170+
Duration::from_secs(5),
171+
router.random("hello world".to_string().into()),
131172
)
132173
.await
133174
.context("request timed out")??;
@@ -147,7 +188,9 @@ mod integration {
147188

148189
let elapsed = start.elapsed();
149190
count += batch_load;
150-
println!("elapsed: {:?}; count: {}", elapsed, count);
191+
if count % 1000 == 0 {
192+
println!("elapsed: {:?}; count: {}", elapsed, count);
193+
}
151194

152195
if elapsed > run_duration {
153196
println!("done");

0 commit comments

Comments
 (0)