Skip to content

Commit bc6cd7e

Browse files
ryanolsonnnshah1
authored andcommitted
feat: kvbm transfer context v2 (dis-598) (#2873)
Signed-off-by: Ryan Olson <ryanolson@users.noreply.github.com> Signed-off-by: nnshah1 <neelays@nvidia.com>
1 parent e15437f commit bc6cd7e

File tree

4 files changed

+449
-2
lines changed

4 files changed

+449
-2
lines changed

lib/llm/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ integration = ["dynamo-runtime/integration"]
4343
name = "tokenizer"
4444
harness = false
4545

46+
[[bench]]
47+
name = "transfer_context_v2"
48+
harness = false
49+
required-features = ["block-manager", "testing-cuda"]
4650
[dependencies]
4751
# repo
4852
dynamo-runtime = { workspace = true }
@@ -175,4 +179,4 @@ aligned-vec = "0.6.4"
175179
lazy_static = "1.4"
176180

177181
[build-dependencies]
178-
tonic-build = { version = "0.13.1"}
182+
tonic-build = { version = "0.13.1"}
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
#[cfg(feature = "testing-cuda")]
17+
mod benchmarks {
18+
use std::sync::Arc;
19+
20+
use criterion::{BenchmarkId, Criterion, criterion_group};
21+
use cudarc::driver::{CudaContext, CudaStream};
22+
use nixl_sys;
23+
use tokio::runtime::Runtime;
24+
use tokio_util::task::TaskTracker;
25+
26+
use dynamo_llm::block_manager::block::transfer::context;
27+
28+
struct BenchmarkRuntime {
29+
_runtime: Runtime,
30+
handle: tokio::runtime::Handle,
31+
stream: Arc<CudaStream>,
32+
nixl_agent: Arc<Option<nixl_sys::Agent>>,
33+
}
34+
35+
impl BenchmarkRuntime {
36+
fn new() -> Self {
37+
let runtime = Runtime::new().expect("Failed to create benchmark runtime");
38+
let handle = runtime.handle().clone();
39+
40+
let cuda_ctx = Arc::new(CudaContext::new(0).expect("Failed to create CUDA context"));
41+
let stream = cuda_ctx.default_stream();
42+
let nixl_agent = Arc::new(None);
43+
44+
Self {
45+
_runtime: runtime,
46+
handle,
47+
stream,
48+
nixl_agent,
49+
}
50+
}
51+
52+
fn create_transfer_context(&self) -> context::v2::TransferContext {
53+
context::v2::TransferContext::new(
54+
self.nixl_agent.clone(),
55+
self.stream.clone(),
56+
self.handle.clone(),
57+
)
58+
}
59+
}
60+
61+
/// Benchmark blocking synchronization in tight loop
62+
/// This measures the baseline performance of direct CUDA event sync
63+
fn bench_blocking(c: &mut Criterion) {
64+
let runtime = BenchmarkRuntime::new();
65+
let ctx = runtime.create_transfer_context();
66+
67+
let mut group = c.benchmark_group("blocking_sync");
68+
group.warm_up_time(std::time::Duration::from_millis(500));
69+
group.measurement_time(std::time::Duration::from_secs(3));
70+
71+
group.bench_function("sync", |b| {
72+
b.iter(|| {
73+
let event = ctx.record_event().unwrap();
74+
event.synchronize_blocking().unwrap();
75+
})
76+
});
77+
78+
group.finish();
79+
}
80+
81+
/// Benchmark single-threaded async synchronization
82+
/// This measures only the tokio spawn_blocking overhead vs direct blocking
83+
fn bench_async_single(c: &mut Criterion) {
84+
let runtime = BenchmarkRuntime::new();
85+
let ctx = runtime.create_transfer_context();
86+
87+
let mut group = c.benchmark_group("async_sync");
88+
group.warm_up_time(std::time::Duration::from_millis(500));
89+
group.measurement_time(std::time::Duration::from_secs(3));
90+
91+
group.bench_function("sync", |b| {
92+
b.iter(|| {
93+
runtime._runtime.block_on(async {
94+
let event = ctx.record_event().unwrap();
95+
event.synchronize().await.unwrap();
96+
})
97+
})
98+
});
99+
100+
group.finish();
101+
}
102+
103+
/// Benchmark concurrent async synchronization at different scales
104+
/// This shows where async becomes beneficial due to parallelism
105+
fn bench_concurrent_async(c: &mut Criterion) {
106+
let runtime = BenchmarkRuntime::new();
107+
let mut group = c.benchmark_group("concurrent_async");
108+
group.warm_up_time(std::time::Duration::from_millis(500));
109+
group.measurement_time(std::time::Duration::from_secs(3));
110+
111+
// Test different concurrency levels
112+
for concurrency in [1, 5, 10, 25, 50, 100].iter() {
113+
group.bench_with_input(
114+
BenchmarkId::new("concurrent", concurrency),
115+
concurrency,
116+
|b, &concurrency| {
117+
let ctx = runtime.create_transfer_context();
118+
b.iter(|| {
119+
runtime._runtime.block_on(async {
120+
// Spawn concurrent tasks using TaskTracker
121+
let tracker = TaskTracker::new();
122+
123+
for _ in 0..concurrency {
124+
let ctx_clone = ctx.clone();
125+
tracker.spawn(async move {
126+
let event = ctx_clone.record_event().unwrap();
127+
event.synchronize().await.unwrap();
128+
});
129+
}
130+
131+
// Wait for all tasks to complete
132+
tracker.close();
133+
tracker.wait().await;
134+
});
135+
});
136+
},
137+
);
138+
}
139+
140+
group.finish();
141+
}
142+
143+
/// Benchmark throughput: events per second at different concurrency levels
144+
fn bench_throughput(c: &mut Criterion) {
145+
let runtime = BenchmarkRuntime::new();
146+
let mut group = c.benchmark_group("throughput");
147+
group.sample_size(50); // Fewer samples for throughput tests
148+
group.warm_up_time(std::time::Duration::from_millis(500));
149+
group.measurement_time(std::time::Duration::from_secs(3));
150+
151+
for concurrency in [1, 10, 50].iter() {
152+
let events_per_task = 10; // Process multiple events per task
153+
154+
group.bench_with_input(
155+
BenchmarkId::new("events_per_sec", concurrency),
156+
concurrency,
157+
|b, &concurrency| {
158+
let ctx = runtime.create_transfer_context();
159+
b.iter(|| {
160+
runtime._runtime.block_on(async {
161+
let tracker = TaskTracker::new();
162+
163+
for _ in 0..concurrency {
164+
let ctx_clone = ctx.clone();
165+
tracker.spawn(async move {
166+
// Process multiple events per task
167+
for _ in 0..events_per_task {
168+
let event = ctx_clone.record_event().unwrap();
169+
event.synchronize().await.unwrap();
170+
}
171+
});
172+
}
173+
174+
tracker.close();
175+
tracker.wait().await;
176+
});
177+
});
178+
},
179+
);
180+
}
181+
182+
group.finish();
183+
}
184+
185+
criterion_group!(
186+
benches,
187+
// Core comparison benchmarks
188+
bench_blocking,
189+
bench_async_single,
190+
// Concurrency benchmarks
191+
bench_concurrent_async,
192+
bench_throughput
193+
);
194+
}
195+
196+
#[cfg(feature = "testing-cuda")]
197+
criterion::criterion_main!(benchmarks::benches);
198+
199+
#[cfg(not(feature = "testing-cuda"))]
200+
fn main() {
201+
println!(
202+
"Benchmarks require 'testing-cuda' feature. Run with: cargo bench --features testing-cuda"
203+
);
204+
}

lib/llm/src/block_manager/block/transfer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16-
mod context;
16+
pub mod context;
1717
mod cuda;
1818
mod memcpy;
1919
mod nixl;

0 commit comments

Comments
 (0)