diff --git a/lib/bindings/python/rust/llm/block_manager/vllm.rs b/lib/bindings/python/rust/llm/block_manager/vllm.rs
index 12fa8b53c6..c41680eaef 100644
--- a/lib/bindings/python/rust/llm/block_manager/vllm.rs
+++ b/lib/bindings/python/rust/llm/block_manager/vllm.rs
@@ -47,7 +47,7 @@ fn _vllm_integration(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::()?;
     m.add_class::()?;
-    m.add_class::<KvConnectorWorker>()?;
+    m.add_class::<PyKvConnectorWorker>()?;
     m.add_class::()?;
     m.add_class::()?;
     Ok(())
diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs
index d58f123248..5e8ac592b6 100644
--- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs
+++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs
@@ -30,7 +30,6 @@ use std::{
 };
 use tokio::sync::mpsc;
 use tokio;
-use pyo3_async_runtimes;
 
 type VllmLocality = Logical;
 
@@ -41,6 +40,7 @@ impl From for PyErr {
 }
 
 use dynamo_llm::recorder::Recorder;
 use tokio_util::sync::CancellationToken;
+use anyhow;
 
 pub trait Leader: Send + Sync + std::fmt::Debug {
@@ -49,29 +49,29 @@ pub trait Leader: Send + Sync + std::fmt::Debug {
         request_id: String,
         request_num_tokens: usize,
         num_computed_tokens: usize,
-    ) -> PyResult<(usize, bool)>;
+    ) -> anyhow::Result<(usize, bool)>;
 
     fn update_state_after_alloc(
         &mut self,
         request_id: String,
         block_ids: Vec,
         num_external_tokens: usize,
-    ) -> PyResult<()>;
+    ) -> anyhow::Result<()>;
 
     fn build_connector_metadata(
         &mut self,
         scheduler_output: SchedulerOutput,
-    ) -> PyResult<Vec<u8>>;
+    ) -> anyhow::Result<Vec<u8>>;
 
     fn request_finished(
         &mut self,
         request_id: String,
         block_ids: Vec,
-    ) -> PyResult<bool>;
+    ) -> anyhow::Result<bool>;
 
     fn has_slot(&self, request_id: String) -> bool;
 
-    fn create_slot(&mut self, request: KvbmRequest, tokens: Vec) -> PyResult<()>;
+    fn create_slot(&mut self, request: KvbmRequest, tokens: Vec) -> anyhow::Result<()>;
 }
 
 #[derive(Debug)]
@@ -128,7 +128,7 @@ impl Leader for KvConnectorLeader {
         request_id: String,
         request_num_tokens: usize,
         num_computed_tokens: usize,
-    ) -> PyResult<(usize, bool)> {
+    ) -> anyhow::Result<(usize, bool)> {
         tracing::debug!(
             "request_num_tokens: {request_num_tokens}; num_computed_tokens: {num_computed_tokens}"
         );
@@ -136,8 +136,8 @@ impl Leader for KvConnectorLeader {
         // the number of device matched tokens should be less than or equal to the number of tokens in the request
         debug_assert!(num_computed_tokens % self.block_size == 0);
 
-        let shared_slot = self.slot_manager.get_slot(&request_id).map_err(to_pyerr)?;
-        let mut slot = shared_slot.lock().map_err(to_pyerr)?;
+        let shared_slot = self.slot_manager.get_slot(&request_id)?;
+        let mut slot = shared_slot.lock().map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
 
         // early exit if we cannot match full block
         if (slot.sequence().total_tokens() - num_computed_tokens) < self.block_size {
@@ -171,7 +171,7 @@ impl Leader for KvConnectorLeader {
         request_id: String,
         block_ids: Vec,
         num_external_tokens: usize,
-    ) -> PyResult<()> {
+    ) -> anyhow::Result<()> {
         tracing::debug!(
             request_id,
             "num_device_blocks: {}; num_external_tokens: {}",
@@ -179,8 +179,8 @@ impl Leader for KvConnectorLeader {
             num_external_tokens
         );
 
-        let shared_slot = self.slot_manager.get_slot(&request_id).map_err(to_pyerr)?;
-        let mut slot = shared_slot.lock().map_err(to_pyerr)?;
+        let shared_slot = self.slot_manager.get_slot(&request_id)?;
+        let mut slot = shared_slot.lock().map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
 
         // we have not yet advanced the computed position, but now we can, since we have an indication that we have
         // necessary gpu blocks into which we will load the external tokens.
@@ -210,7 +210,7 @@ impl Leader for KvConnectorLeader {
     pub fn build_connector_metadata(
         &mut self,
         scheduler_output: SchedulerOutput,
-    ) -> PyResult<Vec<u8>> {
+    ) -> anyhow::Result<Vec<u8>> {
         // the iteration counter is used to track the number of times we have built the connector metadata
         // all connetor operations have the iteration counter at which they were issued.
         // this allows operations to be lazily enqueued to the transfer engine
@@ -233,8 +233,8 @@ impl Leader for KvConnectorLeader {
         // This is kind of a nice abstraction as it keeps the events simplier; however, we now create the request-slot
         // once for onboarding (this loop), then again for prefill/decode (new_requests loop).
         for request_id in onboarding_slots.iter() {
-            let shared_slot = self.slot_manager.get_slot(request_id).map_err(to_pyerr)?;
-            let mut slot = shared_slot.lock().map_err(to_pyerr)?;
+            let shared_slot = self.slot_manager.get_slot(request_id)?;
+            let mut slot = shared_slot.lock().map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
 
             md.create_slot(request_id.clone());
 
@@ -255,8 +255,8 @@ impl Leader for KvConnectorLeader {
                 "request_id {request_id} not found in inflight_requests: "
             );
 
-            let shared_slot = self.slot_manager.get_slot(request_id).map_err(to_pyerr)?;
-            let mut slot = shared_slot.lock().map_err(to_pyerr)?;
+            let shared_slot = self.slot_manager.get_slot(request_id)?;
+            let mut slot = shared_slot.lock().map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
 
             // inform the worker that a new request-slot should be created
             md.create_slot(new_req.request_id.clone());
@@ -296,8 +296,8 @@ impl Leader for KvConnectorLeader {
                 "request_id {request_id} not found in inflight_requests: "
            );
 
-            let shared_slot = self.slot_manager.get_slot(request_id).map_err(to_pyerr)?;
-            let mut slot = shared_slot.lock().map_err(to_pyerr)?;
+            let shared_slot = self.slot_manager.get_slot(request_id)?;
+            let mut slot = shared_slot.lock().map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
 
             let scheduled_tokens = *scheduler_output
                 .num_scheduled_tokens
@@ -322,16 +322,16 @@ impl Leader for KvConnectorLeader {
         }
         tracing::debug!("metadata: {md:#?}");
-        serde_json::to_vec(&md).map_err(to_pyerr)
+        serde_json::to_vec(&md).map_err(|e| anyhow::anyhow!("Failed to serialize connector metadata: {}", e))
     }
 
-    fn request_finished(&mut self, request_id: String, block_ids: Vec) -> PyResult<bool> {
+    fn request_finished(&mut self, request_id: String, block_ids: Vec) -> anyhow::Result<bool> {
         tracing::debug!("Request finished: {request_id}; block_ids: {block_ids:?}");
 
         // grab the slot
-        let shared_slot = self.slot_manager.get_slot(&request_id).map_err(to_pyerr)?;
+        let shared_slot = self.slot_manager.get_slot(&request_id)?;
 
         // mark the slot as finished
-        let mut slot = shared_slot.lock().map_err(to_pyerr)?;
+        let mut slot = shared_slot.lock().map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
         slot.mark_as_finished(self.iteration_counter)?;
 
         // todo: allow the request to resolve when it should exit
@@ -360,7 +360,7 @@ impl Leader for KvConnectorLeader {
 
     /// Create a new slot for the given request ID.
     /// This is used to create a new slot for the request.
-    fn create_slot(&mut self, request: KvbmRequest, tokens: Vec) -> PyResult<()> {
+    fn create_slot(&mut self, request: KvbmRequest, tokens: Vec) -> anyhow::Result<()> {
         self.slot_manager
             .create_slot(&request.request_id, tokens, request.salt_hash)?;
 
@@ -413,7 +413,7 @@ impl PyKvConnectorLeader {
         request_num_tokens: usize,
         num_computed_tokens: usize,
     ) -> PyResult<(usize, bool)> {
-        self.connector_leader.get_num_new_matched_tokens(request_id, request_num_tokens, num_computed_tokens)
+        self.connector_leader.get_num_new_matched_tokens(request_id, request_num_tokens, num_computed_tokens).map_err(to_pyerr)
     }
 
     fn update_state_after_alloc(
@@ -422,18 +422,18 @@ impl PyKvConnectorLeader {
         request_id: String,
         block_ids: Vec,
         num_external_tokens: usize,
     ) -> PyResult<()> {
-        self.connector_leader.update_state_after_alloc(request_id, block_ids, num_external_tokens)
+        self.connector_leader.update_state_after_alloc(request_id, block_ids, num_external_tokens).map_err(to_pyerr)
     }
 
     fn build_connector_metadata(
         &mut self,
         scheduler_output: SchedulerOutput,
     ) -> PyResult<Vec<u8>> {
-        self.connector_leader.build_connector_metadata(scheduler_output)
+        self.connector_leader.build_connector_metadata(scheduler_output).map_err(to_pyerr)
     }
 
     fn request_finished(&mut self, request_id: &str, block_ids: Vec) -> PyResult<bool> {
-        self.connector_leader.request_finished(request_id.to_string(), block_ids)
+        self.connector_leader.request_finished(request_id.to_string(), block_ids).map_err(to_pyerr)
     }
 
     fn has_slot(&self, request_id: &str) -> bool {
@@ -441,6 +441,6 @@ impl PyKvConnectorLeader {
     }
 
     fn create_slot(&mut self, request: KvbmRequest, tokens: Vec) -> PyResult<()> {
-        self.connector_leader.create_slot(request, tokens)
+        self.connector_leader.create_slot(request, tokens).map_err(to_pyerr)
     }
 }
diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs
index 8f1df72633..53bb737dd3 100644
--- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs
+++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs
@@ -1,4 +1,5 @@
 use super::*;
+use anyhow;
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -108,9 +109,7 @@ impl KvConnectorLeaderRecorder {
         let output_path = "/tmp/records.jsonl";
         tracing::info!("recording events to {}", output_path);
 
-        // Create recorder synchronously using pyo3 async runtime
-        let runtime = pyo3_async_runtimes::tokio::get_runtime();
-        let recorder = runtime.block_on(async {
+        let recorder = drt.runtime().primary().block_on(async {
             Recorder::new(token, &output_path, None, None, None).await
         }).unwrap();
 
@@ -125,7 +124,7 @@ impl KvConnectorLeaderRecorder {
         let (unbounded_tx, unbounded_rx) = mpsc::unbounded_channel();
         let recorder_tx = recorder.event_sender();
 
-        let _ = runtime.spawn(Self::forward_unbounded_to_sender(unbounded_rx, recorder_tx));
+        let _ = drt.runtime().primary().spawn(Self::forward_unbounded_to_sender(unbounded_rx, recorder_tx));
 
         Self {
             _recorder: recorder,
@@ -158,7 +157,7 @@ impl Leader for KvConnectorLeaderRecorder {
         request_id: String,
         request_num_tokens: usize,
         num_computed_tokens: usize,
-    ) -> PyResult<(usize, bool)> {
+    ) -> anyhow::Result<(usize, bool)> {
         let input_copy = GetNumNewMatchedTokensInput {
             request_id: request_id.clone(),
             request_num_tokens: request_num_tokens.clone(),
@@ -183,7 +182,7 @@ impl Leader for KvConnectorLeaderRecorder {
         request_id: String,
         block_ids: Vec,
         num_external_tokens: usize,
-    ) -> PyResult<()> {
+    ) -> anyhow::Result<()> {
         let input_copy = UpdateStateAfterAllocInput {
             request_id: request_id.clone(),
             block_ids: block_ids.clone(),
@@ -197,7 +196,7 @@ impl Leader for KvConnectorLeaderRecorder {
     fn build_connector_metadata(
         &mut self,
         scheduler_output: SchedulerOutput,
-    ) -> PyResult<Vec<u8>> {
+    ) -> anyhow::Result<Vec<u8>> {
         let input_copy = BuildConnectorMetaInput {
             scheduler_output: scheduler_output.clone(),
         };
@@ -210,7 +209,7 @@ impl Leader for KvConnectorLeaderRecorder {
         output
     }
 
-    fn request_finished(&mut self, request_id: String, block_ids: Vec) -> PyResult<bool> {
+    fn request_finished(&mut self, request_id: String, block_ids: Vec) -> anyhow::Result<bool> {
         let input_copy = RequestFinishedInput {
             request_id: request_id.clone(),
             block_ids: block_ids.clone(),
@@ -239,7 +238,7 @@ impl Leader for KvConnectorLeaderRecorder {
 
     /// Create a new slot for the given request ID.
     /// This is used to create a new slot for the request.
-    fn create_slot(&mut self, request: KvbmRequest, tokens: Vec) -> PyResult<()> {
+    fn create_slot(&mut self, request: KvbmRequest, tokens: Vec) -> anyhow::Result<()> {
         let input_copy = CreateSlotInput {
             request: request.clone(),
             tokens: tokens.clone(),
diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs
index 6bd58ad511..aba68bb599 100644
--- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs
+++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs
@@ -20,8 +20,30 @@ use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig};
 use dynamo_llm::block_manager::storage::torch::TorchTensor;
 use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
 use dynamo_runtime::DistributedRuntime;
+use anyhow;
+
+pub trait Worker: Send + Sync {
+    fn register_kv_caches(
+        &mut self,
+        num_device_blocks: usize,
+        page_size: usize,
+        device_id: usize,
+        dtype_width_bytes: usize,
+        kv_caches: Vec<(String, Arc<VllmTensor>)>,
+    ) -> anyhow::Result<()>;
+
+    fn bind_connector_metadata(&mut self, metadata: Vec<u8>) -> anyhow::Result<()>;
+
+    fn clear_connector_metadata(&mut self);
+
+    fn save_kv_layer(&mut self, layer_name: String) -> anyhow::Result<()>;
+
+    fn get_finished(
+        &mut self,
+        finished_requests: HashSet<String>,
+    ) -> (HashSet<String>, HashSet<String>);
+}
 
-#[pyclass]
 pub struct KvConnectorWorker {
     drt: DistributedRuntime,
     kvbm_worker: OnceLock<KvbmWorker>,
@@ -47,10 +69,8 @@ pub struct KvConnectorWorker {
     layer_events: Vec,
 }
 
-#[pymethods]
 impl KvConnectorWorker {
-    #[new]
-    fn new(py_drt: PyDistributedRuntime, vllm_worker_id: String) -> PyResult<Self> {
+    fn new(py_drt: PyDistributedRuntime, vllm_worker_id: String) -> anyhow::Result<Self> {
         let drt = py_drt.inner.clone();
         let runtime = drt.runtime().primary();
 
@@ -64,8 +84,7 @@ impl KvConnectorWorker {
             drt.primary_token(),
             "kv-connector-scheduler-task",
             &runtime,
-        )
-        .map_err(to_pyerr)?
+        )?
         .detach();
 
         tracing::info!(
@@ -88,34 +107,36 @@ impl KvConnectorWorker {
             layer_events: Vec::new(),
         })
     }
+}
 
+impl Worker for KvConnectorWorker {
     /// Registers the KV caches with the KVBM worker.
     ///
     /// The Dynamo KVBM worker is lazily initialized when the first KV cache is registered.
     /// This process establishes a connection between all KVBM workers and the leader.
-    pub fn register_kv_caches(
+    fn register_kv_caches(
         &mut self,
         num_device_blocks: usize,
         page_size: usize,
         device_id: usize,
         dtype_width_bytes: usize,
-        kv_caches: Vec<(String, Py)>,
+        kv_caches: Vec<(String, Arc<VllmTensor>)>,
         raw_event_handles: Vec,
-    ) -> PyResult<()> {
+    ) -> anyhow::Result<()> {
         if self.kvbm_worker.get().is_some() {
             tracing::warn!("kvbm worker already registered");
-            return Err(to_pyerr(anyhow::anyhow!("kvbm worker already registered")));
+            return Err(anyhow::anyhow!("kvbm worker already registered"));
         }
 
         // Process kv_caches in layer execution order (already sorted by layer index)
         let mut vllm_tensors = Vec::new();
-        let mut kv_cache_names = Vec::new();
 
-        for (layer_name, torch_tensor) in kv_caches {
-            let vllm_tensor = Arc::new(VllmTensor::new(torch_tensor).map_err(to_pyerr)?);
-            tracing::debug!("Registering KV cache layer: {layer_name}, tensor: {vllm_tensor:?}");
+        for (layer_name, vllm_tensor) in kv_caches {
+            tracing::trace!("Registering KV cache layer: {layer_name}, tensor: {vllm_tensor:?}");
+
+            // Store for later lookup by name
+            self.kv_caches.insert(layer_name, vllm_tensor.clone());
 
             // Build ordered tensor list for worker config
-            kv_cache_names.push((layer_name, vllm_tensor.clone()));
             vllm_tensors.push(vllm_tensor as Arc<dyn TorchTensor>);
         }
 
@@ -132,8 +153,7 @@ impl KvConnectorWorker {
             .dtype_width_bytes(dtype_width_bytes)
             .barrier_id(get_barrier_id())
             .scheduler_client(Some(self.transfer_client.clone()))
-            .build()
-            .map_err(to_pyerr)?;
+            .build()?;
 
         let worker = self
             .drt
@@ -142,12 +162,11 @@ impl KvConnectorWorker {
             .block_on(async move {
                 let worker = KvbmWorker::new(config).await?;
                 anyhow::Ok(worker)
-            })
-            .map_err(to_pyerr)?;
+            })?;
 
         self.kvbm_worker
             .set(worker)
-            .map_err(|_| to_pyerr(anyhow::anyhow!("failed to set kvbm worker")))?;
+            .map_err(|_| anyhow::anyhow!("failed to set kvbm worker"))?;
 
         Ok(())
     }
@@ -155,9 +174,9 @@ impl KvConnectorWorker {
     /// Loads the metadata from the leader.
     /// This action translates the metadata into a set of actions that the worker will perform.
     /// All actions much be assigned to a slot before [`KvConnectorWorker::clear_metadata`] is called.
-    pub fn bind_connector_metadata(&mut self, metadata: Vec<u8>) -> PyResult<()> {
-        // debug_assert!(!self.bound, "connector metadata already bound");
-        let metadata: ConnectorMetadata = serde_json::from_slice(&metadata).map_err(to_pyerr)?;
+    fn bind_connector_metadata(&mut self, metadata: Vec<u8>) -> anyhow::Result<()> {
+        debug_assert!(!self.bound, "connector metadata already bound");
+        let metadata: ConnectorMetadata = serde_json::from_slice(&metadata)?;
         self.bound = true;
         self.iteration = metadata.iteration;
         self.layers_complete = 0;
@@ -166,7 +185,7 @@ impl KvConnectorWorker {
             "bound new metadata: {metadata:#?}"
         );
 
-        self.connector.start_next_iteration().map_err(to_pyerr)?;
+        self.connector.start_next_iteration()?;
 
         debug_assert_eq!(
             self.connector.iteration(),
@@ -185,7 +204,7 @@ impl KvConnectorWorker {
 
         for slot in metadata.new_slots {
             debug_assert!(!self.connector.has_slot(&slot), "slot already exists");
-            self.connector.create_slot(slot).map_err(to_pyerr)?;
+            self.connector.create_slot(slot)?;
         }
 
         let mut onboarding_operations = Vec::new();
@@ -221,7 +240,7 @@ impl KvConnectorWorker {
     }
 
     /// Clears the connector metadata and marks the iteration as complete.
-    pub fn clear_connector_metadata(&mut self) {
+    fn clear_connector_metadata(&mut self) {
         tracing::debug!(iteration = self.iteration, "clearing connector metadata");
         debug_assert!(self.bound, "connector metadata not bound");
         self.bound = false;
@@ -234,7 +253,7 @@ impl KvConnectorWorker {
 
     /// Trigger layer-wise completion signals.
     /// Trigger block-wise completion signals afer last layer.
-    pub fn save_kv_layer(&mut self, _layer_name: String, _kv_layer: Py) -> PyResult<()> {
+    fn save_kv_layer(&mut self, _layer_name: String) -> anyhow::Result<()> {
         self.layers_complete += 1;
         if self.layers_complete == self.kv_cache_names.len() {
             let offloading_operations = std::mem::take(&mut self.offloading_operations);
@@ -250,7 +269,7 @@ impl KvConnectorWorker {
         Ok(())
     }
 
-    pub fn get_finished(
+    fn get_finished(
         &mut self,
         finished_requests: HashSet<String>,
     ) -> (HashSet<String>, HashSet<String>) {
@@ -334,6 +353,61 @@ impl KvConnectorWorker {
     }
 }
 
+#[pyclass]
+pub struct PyKvConnectorWorker {
+    connector_worker: Box<dyn Worker>,
+}
+
+#[pymethods]
+impl PyKvConnectorWorker {
+    #[new]
+    #[pyo3(signature = (py_drt, vllm_worker_id))]
+    pub fn new(py_drt: PyDistributedRuntime, vllm_worker_id: String) -> PyResult<Self> {
+        let connector_worker: Box<dyn Worker> = Box::new(KvConnectorWorker::new(py_drt, vllm_worker_id).map_err(to_pyerr)?);
+        Ok(Self { connector_worker })
+    }
+
+    pub fn register_kv_caches(
+        &mut self,
+        num_device_blocks: usize,
+        page_size: usize,
+        device_id: usize,
+        dtype_width_bytes: usize,
+        kv_caches: Vec<(String, Py)>,
+    ) -> PyResult<()> {
+        // Convert Python tensors to Rust VllmTensor objects
+        let mut rust_kv_caches = Vec::new();
+        for (layer_name, py_tensor) in kv_caches {
+            let vllm_tensor = Arc::new(VllmTensor::new(py_tensor).map_err(to_pyerr)?);
+            rust_kv_caches.push((layer_name, vllm_tensor));
+        }
+
+        self.connector_worker
+            .register_kv_caches(num_device_blocks, page_size, device_id, dtype_width_bytes, rust_kv_caches)
+            .map_err(to_pyerr)
+    }
+
+    pub fn bind_connector_metadata(&mut self, metadata: Vec<u8>) -> PyResult<()> {
+        self.connector_worker.bind_connector_metadata(metadata).map_err(to_pyerr)
+    }
+
+    pub fn clear_connector_metadata(&mut self) {
+        self.connector_worker.clear_connector_metadata()
+    }
+
+    pub fn save_kv_layer(&mut self, layer_name: String, _kv_layer: Py) -> PyResult<()> {
+        // Note: kv_layer is not used in the current implementation
+        self.connector_worker.save_kv_layer(layer_name).map_err(to_pyerr)
+    }
+
+    pub fn get_finished(
+        &mut self,
+        finished_requests: HashSet<String>,
+    ) -> (HashSet<String>, HashSet<String>) {
+        self.connector_worker.get_finished(finished_requests)
+    }
+}
+
 use cudarc::driver::sys::{
     cuCtxGetCurrent, cuEventSynchronize, cudaError_enum, CUcontext, CUevent,
 };
diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/rust.py b/lib/bindings/python/src/dynamo/llm/vllm_integration/rust.py
index 3210dcaab0..2b985399de 100644
--- a/lib/bindings/python/src/dynamo/llm/vllm_integration/rust.py
+++ b/lib/bindings/python/src/dynamo/llm/vllm_integration/rust.py
@@ -16,7 +16,7 @@
 BlockStates = getattr(_vllm_integration, "BlockStates")
 SlotUpdate = getattr(_vllm_integration, "SlotUpdate")
 
-KvConnectorWorker = getattr(_vllm_integration, "KvConnectorWorker")
+KvConnectorWorker = getattr(_vllm_integration, "PyKvConnectorWorker")
 KvConnectorLeader = getattr(_vllm_integration, "PyKvConnectorLeader")
 SchedulerOutput = getattr(_vllm_integration, "SchedulerOutput")
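
Below the patch, a minimal usage sketch (not part of the diff) of how the renamed binding is expected to be driven from Python. Only the `KvConnectorWorker` alias in rust.py, the `(py_drt, vllm_worker_id)` constructor signature, and the method names come from the patch; the runtime handle, tensor list, and numeric values are illustrative placeholders.

# Illustrative sketch: exercising PyKvConnectorWorker through the rust.py alias.
from dynamo.llm.vllm_integration.rust import KvConnectorWorker  # resolves to PyKvConnectorWorker

drt = ...          # assumption: a dynamo DistributedRuntime handle from the application's runtime setup
kv_caches = [...]  # list of (layer_name, torch tensor) pairs, in layer execution order

worker = KvConnectorWorker(drt, "worker-0")
worker.register_kv_caches(1024, 16, 0, 2, kv_caches)  # num_device_blocks, page_size, device_id, dtype_width_bytes

# Per scheduler iteration: bind the metadata bytes built by the leader, signal each
# saved layer, collect finished transfers, then clear the metadata for the next step.
worker.bind_connector_metadata(metadata_bytes)
worker.save_kv_layer(layer_name, kv_layer)  # kv_layer is accepted but unused by the binding
sending, receiving = worker.get_finished(set())
worker.clear_connector_metadata()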