feat: DIS-373 dynamo KVBM connector API integration with TRTLLM #2440
Conversation
… ryan/connector-dev
Signed-off-by: Ryan Olson <ryanolson@users.noreply.github.com> Co-authored-by: Olga Andreeva <124622579+oandreeva-nv@users.noreply.github.com>
…unexpected call to get_num_new_matched_tokens
fix fix fix
Force-pushed from 721abbe to d192dfc
.drt(drt.inner().clone())
.host_blocks_config(get_blocks_config(CPU_CACHE, CPU_CACHE_OVERRIDE))
.disk_blocks_config(get_blocks_config(DISK_CACHE, DISK_CACHE_OVERRIDE))
.bytes_per_block_overriden(bytes_per_block)
Supporting two separate code paths here seems a bit ugly. Might be simpler to remove the bytes_per_block arg in the bindings and do the double-barrier thing in both vLLM and TRTLLM
use std::collections::HashSet;
use anyhow;

pub trait Leader: Send + Sync + std::fmt::Debug {
Why have a trait here? AFAIK, we'd only have 1 implementation of this
I will import from leader.rs
Actually, I can't import it yet: there are some differences between the two traits, and some arguments and argument types don't quite match.
);

// the number of device matched tokens should be less than or equal to the number of tokens in the request
debug_assert!(num_computed_tokens % self.block_size == 0);
The TRTLLM KV cache connector can match partial blocks, so this assertion won't always work. See https://github.com/NVIDIA/TensorRT-LLM/blob/69574ad73078656ad0530559888552e3a0cd51e2/cpp/tensorrt_llm/batch_manager/kvCacheManager.cpp#L395
To simplify things, we could just immediately return `(0, false)` if it's not divisible.
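The early-return suggested above could look like this. A minimal sketch, assuming a Python-level signature; the function and parameter names are illustrative, not the PR's actual API:

```python
def get_num_new_matched_tokens(num_computed_tokens: int,
                               block_size: int,
                               num_matched_tokens: int) -> tuple[int, bool]:
    # TRTLLM's KV cache connector can match partial blocks, so
    # num_computed_tokens is not guaranteed to be block-aligned.
    # Rather than asserting divisibility, report no new matched
    # tokens and skip async onboarding for that request.
    if num_computed_tokens % block_size != 0:
        return 0, False
    return num_matched_tokens, True
```

This keeps the fast path unchanged while avoiding the `debug_assert!` failure on partial-block matches.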
// return the number of external tokens that are ready for onboarding
// we always return true here as we always asynchronously onboard matched blocks
if let SlotState::OnboardStaged(num_external_tokens) = slot.state() {
Not specific to this implementation, but always onboarding asynchronously may not be worthwhile. Not something we need to worry about now, but certainly need to keep that in mind longer term
}
}

/// Note: TRTLLM will not provide any scheduler output data for requests that are onboarding. it is entirely
Yes. With my current trtllm implementation, we only include a request in the scheduler output if at least 1 token is scheduled. So if we onboard with async=False, it will be included, but with async=True, it won't be included.
if let Some(&num_external_tokens) = self.inflight_request_to_num_external_tokens.get(&request_id) {
    if num_external_tokens > 0 {
        let num_computed_tokens = block_ids.len() * self.block_size - num_external_tokens;
This doesn't seem right. Instead of block_ids.len() * block_size we should be accessing the context_current_position field on the LlmRequest object. block_ids is a list of all blocks allocated for the entire prefill, and is independent of the amount of device or connector cache hits.
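The fix the comment suggests, sketched in Python (the helper name is hypothetical; `context_current_position` is the `LlmRequest` field the reviewer points to):

```python
def compute_num_computed_tokens(context_current_position: int,
                                num_external_tokens: int) -> int:
    # block_ids covers every block allocated for the whole prefill,
    # so len(block_ids) * block_size overcounts whenever allocation
    # runs ahead of the device/connector cache hits. The request's
    # current context position already reflects how far prefill has
    # actually progressed, so subtract the externally matched tokens
    # from that instead.
    return context_current_position - num_external_tokens
```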
.get(request_id)
.unwrap_or(&0);

slot.apply_scheduler_output(&[], &[], new_req.num_computed_tokens, scheduled_tokens)?;
Why are we ignoring the new token and block ids here?
raw_event_handles,
)

def bind_connector_meta(self, metadata: object):
Probably not great to be overriding bind_connector_meta, given that it isn't an abstract method to be overridden.
.map_err(to_pyerr)
}

pub fn build_connector_meta(&mut self, metadata: Vec<u8>) -> PyResult<()> {
Is this a typo? Should this be bind_connector_meta?
class DynamoKVBMConnectorWorker(KvCacheConnectorWorker):
    def __init__(self, executor_config: ExecutorConfig, **kwargs):
TRTLLM will never instantiate this with kwargs.
}

fn compute_num_blocks(num_blocks_config: &KvbmLeaderNumBlocksConfig, bytes_per_block: usize) -> usize {
    if num_blocks_config.is_overriden {
is_overriden should be a computed property that checks if num_blocks_overriden is 0. Alternatively, you could remove is_overriden entirely and just have a get_num_blocks method.
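A Python sketch of the suggested refactor (field names are assumptions based on the Rust config above, not the PR's actual definition):

```python
from dataclasses import dataclass


@dataclass
class KvbmLeaderNumBlocksConfig:
    # 0 means "not overridden": derive the block count from the
    # configured cache size instead of using an explicit override.
    num_blocks_overriden: int = 0
    cache_size_in_bytes: int = 0

    @property
    def is_overriden(self) -> bool:
        # Computed from the override value, so the two can never
        # disagree (the bug the reviewer is guarding against).
        return self.num_blocks_overriden != 0

    def get_num_blocks(self, bytes_per_block: int) -> int:
        # Alternative from the comment: fold the branch into a single
        # accessor so callers never inspect is_overriden themselves.
        if self.is_overriden:
            return self.num_blocks_overriden
        return self.cache_size_in_bytes // bytes_per_block
```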
.unwrap();

tracing::info!("Leader barrier synced with {} workers", config.world_size);
let mut bytes_per_block = worker_data
We should be summing this across all workers and using that sum directly as our bytes-per-block value. With TP this approach would happen to work, but we shouldn't assume every worker reports the same size.
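The aggregation the comment asks for, as a sketch (the function name is illustrative; `worker_data` is the per-worker values gathered at the leader barrier):

```python
def total_bytes_per_block(worker_data: list[int]) -> int:
    # With tensor parallelism each worker holds only its shard of a
    # block, so the leader's logical bytes-per-block is the sum of the
    # per-worker sizes, not any single worker's value. Summing also
    # stays correct if shards are unevenly sized, which taking
    # worker_data[0] (or asserting they're all equal) would not.
    return sum(worker_data)
```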
Force-pushed from d5bc4e5 to 0aad185
Overview:
dynamo KVBM connector API integration with TRTLLM
Details:
The PR is based on the ongoing changes from NVIDIA/TensorRT-LLM#6488, which added TRT-LLM connector API compatibility.
Changes in this PR:
Worker → Leader: send bytes_per_block.
Leader → Worker: send num_host_blocks and num_disk_blocks.
The only big change is that when the leader calls update_state_after_alloc, no num_external_tokens is passed to the function. For now, a HashMap is used to track each request’s num_external_tokens when calling get_num_new_matched_tokens.
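The HashMap workaround described above can be sketched in Python (class and method names are illustrative, not from the PR):

```python
class ExternalTokenTracker:
    """Record num_external_tokens when get_num_new_matched_tokens runs,
    so update_state_after_alloc can recover it later; TRTLLM does not
    pass it to that call."""

    def __init__(self) -> None:
        self._by_request: dict[str, int] = {}

    def record(self, request_id: str, num_external_tokens: int) -> None:
        self._by_request[request_id] = num_external_tokens

    def take(self, request_id: str) -> int:
        # Pop rather than get, so finished requests don't leak entries;
        # a request with no recorded match simply yields 0.
        return self._by_request.pop(request_id, 0)
```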
Issues:
Where should the reviewer start?
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)