Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/bindings/python/rust/llm/block_manager/vllm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ fn _vllm_integration(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<BlockStates>()?;
m.add_class::<SlotUpdate>()?;

m.add_class::<connector::worker::KvConnectorWorker>()?;
m.add_class::<connector::worker::PyKvConnectorWorker>()?;
m.add_class::<connector::leader::PyKvConnectorLeader>()?;
m.add_class::<connector::SchedulerOutput>()?;
Ok(())
Expand Down
58 changes: 29 additions & 29 deletions lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use std::{
};
use tokio::sync::mpsc;
use tokio;
use pyo3_async_runtimes;

type VllmLocality = Logical<DistributedLeaderWorkerResources>;

Expand All @@ -41,6 +40,7 @@ impl From<SlotError> for PyErr {
}
use dynamo_llm::recorder::Recorder;
use tokio_util::sync::CancellationToken;
use anyhow;


pub trait Leader: Send + Sync + std::fmt::Debug {
Expand All @@ -49,29 +49,29 @@ pub trait Leader: Send + Sync + std::fmt::Debug {
request_id: String,
request_num_tokens: usize,
num_computed_tokens: usize,
) -> PyResult<(usize, bool)>;
) -> anyhow::Result<(usize, bool)>;

fn update_state_after_alloc(
&mut self,
request_id: String,
block_ids: Vec<BlockId>,
num_external_tokens: usize,
) -> PyResult<()>;
) -> anyhow::Result<()>;

fn build_connector_metadata(
&mut self,
scheduler_output: SchedulerOutput,
) -> PyResult<Vec<u8>>;
) -> anyhow::Result<Vec<u8>>;

fn request_finished(
&mut self,
request_id: String,
block_ids: Vec<BlockId>,
) -> PyResult<bool>;
) -> anyhow::Result<bool>;

fn has_slot(&self, request_id: String) -> bool;

fn create_slot(&mut self, request: KvbmRequest, tokens: Vec<u32>) -> PyResult<()>;
fn create_slot(&mut self, request: KvbmRequest, tokens: Vec<u32>) -> anyhow::Result<()>;
}

#[derive(Debug)]
Expand Down Expand Up @@ -128,16 +128,16 @@ impl Leader for KvConnectorLeader {
request_id: String,
request_num_tokens: usize,
num_computed_tokens: usize,
) -> PyResult<(usize, bool)> {
) -> anyhow::Result<(usize, bool)> {
tracing::debug!(
"request_num_tokens: {request_num_tokens}; num_computed_tokens: {num_computed_tokens}"
);

// the number of device matched tokens should be less than or equal to the number of tokens in the request
debug_assert!(num_computed_tokens % self.block_size == 0);

let shared_slot = self.slot_manager.get_slot(&request_id).map_err(to_pyerr)?;
let mut slot = shared_slot.lock().map_err(to_pyerr)?;
let shared_slot = self.slot_manager.get_slot(&request_id)?;
let mut slot = shared_slot.lock().map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;

// early exit if we cannot match full block
if (slot.sequence().total_tokens() - num_computed_tokens) < self.block_size {
Expand Down Expand Up @@ -171,16 +171,16 @@ impl Leader for KvConnectorLeader {
request_id: String,
block_ids: Vec<BlockId>,
num_external_tokens: usize,
) -> PyResult<()> {
) -> anyhow::Result<()> {
tracing::debug!(
request_id,
"num_device_blocks: {}; num_external_tokens: {}",
block_ids.len(),
num_external_tokens
);

let shared_slot = self.slot_manager.get_slot(&request_id).map_err(to_pyerr)?;
let mut slot = shared_slot.lock().map_err(to_pyerr)?;
let shared_slot = self.slot_manager.get_slot(&request_id)?;
let mut slot = shared_slot.lock().map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;

// we have not yet advanced the computed position, but now we can, since we have an indication that we have
// necessary gpu blocks into which we will load the external tokens.
Expand Down Expand Up @@ -210,7 +210,7 @@ impl Leader for KvConnectorLeader {
pub fn build_connector_metadata(
&mut self,
scheduler_output: SchedulerOutput,
) -> PyResult<Vec<u8>> {
) -> anyhow::Result<Vec<u8>> {
// the iteration counter is used to track the number of times we have built the connector metadata
// all connector operations have the iteration counter at which they were issued.
// this allows operations to be lazily enqueued to the transfer engine
Expand All @@ -233,8 +233,8 @@ impl Leader for KvConnectorLeader {
// This is kind of a nice abstraction as it keeps the events simpler; however, we now create the request-slot
// once for onboarding (this loop), then again for prefill/decode (new_requests loop).
for request_id in onboarding_slots.iter() {
let shared_slot = self.slot_manager.get_slot(request_id).map_err(to_pyerr)?;
let mut slot = shared_slot.lock().map_err(to_pyerr)?;
let shared_slot = self.slot_manager.get_slot(request_id)?;
let mut slot = shared_slot.lock().map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;

md.create_slot(request_id.clone());

Expand All @@ -255,8 +255,8 @@ impl Leader for KvConnectorLeader {
"request_id {request_id} not found in inflight_requests: "
);

let shared_slot = self.slot_manager.get_slot(request_id).map_err(to_pyerr)?;
let mut slot = shared_slot.lock().map_err(to_pyerr)?;
let shared_slot = self.slot_manager.get_slot(request_id)?;
let mut slot = shared_slot.lock().map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;

// inform the worker that a new request-slot should be created
md.create_slot(new_req.request_id.clone());
Expand Down Expand Up @@ -296,8 +296,8 @@ impl Leader for KvConnectorLeader {
"request_id {request_id} not found in inflight_requests: "
);

let shared_slot = self.slot_manager.get_slot(request_id).map_err(to_pyerr)?;
let mut slot = shared_slot.lock().map_err(to_pyerr)?;
let shared_slot = self.slot_manager.get_slot(request_id)?;
let mut slot = shared_slot.lock().map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;

let scheduled_tokens = *scheduler_output
.num_scheduled_tokens
Expand All @@ -322,16 +322,16 @@ impl Leader for KvConnectorLeader {
}

tracing::debug!("metadata: {md:#?}");
serde_json::to_vec(&md).map_err(to_pyerr)
serde_json::to_vec(&md).map_err(|e| anyhow::anyhow!("Failed to serialize connector metadata: {}", e))
}

fn request_finished(&mut self, request_id: String, block_ids: Vec<BlockId>) -> PyResult<bool> {
fn request_finished(&mut self, request_id: String, block_ids: Vec<BlockId>) -> anyhow::Result<bool> {
tracing::debug!("Request finished: {request_id}; block_ids: {block_ids:?}");
// grab the slot
let shared_slot = self.slot_manager.get_slot(&request_id).map_err(to_pyerr)?;
let shared_slot = self.slot_manager.get_slot(&request_id)?;

// mark the slot as finished
let mut slot = shared_slot.lock().map_err(to_pyerr)?;
let mut slot = shared_slot.lock().map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
slot.mark_as_finished(self.iteration_counter)?;

// todo: allow the request to resolve when it should exit
Expand Down Expand Up @@ -360,7 +360,7 @@ impl Leader for KvConnectorLeader {

/// Create a new slot for the given request ID.
/// This is used to create a new slot for the request.
fn create_slot(&mut self, request: KvbmRequest, tokens: Vec<u32>) -> PyResult<()> {
fn create_slot(&mut self, request: KvbmRequest, tokens: Vec<u32>) -> anyhow::Result<()> {
self.slot_manager
.create_slot(&request.request_id, tokens, request.salt_hash)?;

Expand Down Expand Up @@ -413,7 +413,7 @@ impl PyKvConnectorLeader {
request_num_tokens: usize,
num_computed_tokens: usize,
) -> PyResult<(usize, bool)> {
self.connector_leader.get_num_new_matched_tokens(request_id, request_num_tokens, num_computed_tokens)
self.connector_leader.get_num_new_matched_tokens(request_id, request_num_tokens, num_computed_tokens).map_err(to_pyerr)
}

fn update_state_after_alloc(
Expand All @@ -422,25 +422,25 @@ impl PyKvConnectorLeader {
block_ids: Vec<BlockId>,
num_external_tokens: usize,
) -> PyResult<()> {
self.connector_leader.update_state_after_alloc(request_id, block_ids, num_external_tokens)
self.connector_leader.update_state_after_alloc(request_id, block_ids, num_external_tokens).map_err(to_pyerr)
}

fn build_connector_metadata(
&mut self,
scheduler_output: SchedulerOutput,
) -> PyResult<Vec<u8>> {
self.connector_leader.build_connector_metadata(scheduler_output)
self.connector_leader.build_connector_metadata(scheduler_output).map_err(to_pyerr)
}

fn request_finished(&mut self, request_id: &str, block_ids: Vec<BlockId>) -> PyResult<bool> {
self.connector_leader.request_finished(request_id.to_string(), block_ids)
self.connector_leader.request_finished(request_id.to_string(), block_ids).map_err(to_pyerr)
}

fn has_slot(&self, request_id: &str) -> bool {
self.connector_leader.has_slot(request_id.to_string())
}

fn create_slot(&mut self, request: KvbmRequest, tokens: Vec<u32>) -> PyResult<()> {
self.connector_leader.create_slot(request, tokens)
self.connector_leader.create_slot(request, tokens).map_err(to_pyerr)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use anyhow;


#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -108,9 +109,7 @@ impl KvConnectorLeaderRecorder {
let output_path = "/tmp/records.jsonl";
tracing::info!("recording events to {}", output_path);

// Create recorder synchronously using pyo3 async runtime
let runtime = pyo3_async_runtimes::tokio::get_runtime();
let recorder = runtime.block_on(async {
let recorder = drt.runtime().primary().block_on(async {
Recorder::new(token, &output_path, None, None, None).await
}).unwrap();

Expand All @@ -125,7 +124,7 @@ impl KvConnectorLeaderRecorder {
let (unbounded_tx, unbounded_rx) = mpsc::unbounded_channel();
let recorder_tx = recorder.event_sender();

let _ = runtime.spawn(Self::forward_unbounded_to_sender(unbounded_rx, recorder_tx));
let _ = drt.runtime().primary().spawn(Self::forward_unbounded_to_sender(unbounded_rx, recorder_tx));

Self {
_recorder: recorder,
Expand Down Expand Up @@ -158,7 +157,7 @@ impl Leader for KvConnectorLeaderRecorder {
request_id: String,
request_num_tokens: usize,
num_computed_tokens: usize,
) -> PyResult<(usize, bool)> {
) -> anyhow::Result<(usize, bool)> {
let input_copy = GetNumNewMatchedTokensInput {
request_id: request_id.clone(),
request_num_tokens: request_num_tokens.clone(),
Expand All @@ -183,7 +182,7 @@ impl Leader for KvConnectorLeaderRecorder {
request_id: String,
block_ids: Vec<BlockId>,
num_external_tokens: usize,
) -> PyResult<()> {
) -> anyhow::Result<()> {
let input_copy = UpdateStateAfterAllocInput {
request_id: request_id.clone(),
block_ids: block_ids.clone(),
Expand All @@ -197,7 +196,7 @@ impl Leader for KvConnectorLeaderRecorder {
fn build_connector_metadata(
&mut self,
scheduler_output: SchedulerOutput,
) -> PyResult<Vec<u8>> {
) -> anyhow::Result<Vec<u8>> {
let input_copy = BuildConnectorMetaInput {
scheduler_output: scheduler_output.clone(),
};
Expand All @@ -210,7 +209,7 @@ impl Leader for KvConnectorLeaderRecorder {
output
}

fn request_finished(&mut self, request_id: String, block_ids: Vec<BlockId>) -> PyResult<bool> {
fn request_finished(&mut self, request_id: String, block_ids: Vec<BlockId>) -> anyhow::Result<bool> {
let input_copy = RequestFinishedInput {
request_id: request_id.clone(),
block_ids: block_ids.clone(),
Expand Down Expand Up @@ -239,7 +238,7 @@ impl Leader for KvConnectorLeaderRecorder {

/// Create a new slot for the given request ID.
/// This is used to create a new slot for the request.
fn create_slot(&mut self, request: KvbmRequest, tokens: Vec<u32>) -> PyResult<()> {
fn create_slot(&mut self, request: KvbmRequest, tokens: Vec<u32>) -> anyhow::Result<()> {
let input_copy = CreateSlotInput {
request: request.clone(),
tokens: tokens.clone(),
Expand Down
Loading
Loading