[KV offload][5/N] Add CPUOffloadingSpec
#24251
Conversation
Code Review
This pull request introduces a significant new feature: CPU offloading for v1. The implementation is extensive, adding a new offloading framework with managers, specs, handlers, and a dedicated KV connector. The code is well-structured, with a clear separation of concerns between scheduler and worker logic. My review focuses on critical aspects of reliability and resource management, and I've identified a few high-impact issues that should be addressed to ensure the robustness of this new feature.
finished_recving = set()
for job_id, success in self.worker.get_finished():
    # we currently do not support job failures
    assert success
Using `assert success` is risky, as it will crash the worker process if an offloading transfer fails for any reason (e.g. I/O error, out of space). This can bring down the entire system. Failures should be handled more gracefully, for instance by logging a critical error and cleaning up the state for the failed job, without crashing the worker. While the comment indicates failures are not supported, an `assert` is not a robust way to enforce this in production code.
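For illustration, a minimal sketch of what a more graceful path could look like (the `cleanup_failed_job` callback is hypothetical and not part of this PR):

```python
import logging

logger = logging.getLogger(__name__)


def collect_finished_jobs(worker, cleanup_failed_job) -> set:
    """Collect IDs of successfully finished transfer jobs without crashing on failures.

    Assumes worker.get_finished() yields (job_id, success) pairs as in the
    snippet above; cleanup_failed_job is a hypothetical callback that releases
    any state held for the failed transfer.
    """
    finished = set()
    for job_id, success in worker.get_finished():
        if not success:
            # Log loudly and drop the job instead of asserting, so a single
            # failed offload cannot take down the whole worker process.
            logger.error("Offloading transfer failed for job %s; discarding it", job_id)
            cleanup_failed_job(job_id)
            continue
        finished.add(job_id)
    return finished
```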
if store_output is None:
    logger.warning("Cannot store %s blocks", num_new_blocks)
    break
Using `break` here will prematurely exit the loop that iterates over scheduled requests. If `prepare_store` fails for one request (by returning `None`), subsequent requests in the same scheduling step will not be considered for offloading. This could lead to offloading starvation for other requests. You should use `continue` to proceed to the next request in the loop.
Suggested change:
 if store_output is None:
     logger.warning("Cannot store %s blocks", num_new_blocks)
-    break
+    continue
So each PR actually introduces a single new commit.
LGTM .. needs the block hash type change of course (and I assume that affects the other PRs too...)
vllm/v1/offloading/cpu.py
attn_backend = get_attn_backend(
    self.vllm_config.model_config.get_head_size(),
    self.vllm_config.model_config.dtype,
    self.vllm_config.cache_config.cache_dtype,
    self.gpu_block_size,
    self.vllm_config.model_config.is_attention_free,
    use_mla=self.vllm_config.model_config.use_mla)
I feel like we should add a `get_attn_backend_from_config(VllmConfig)` and use that both here and in the `NixlConnector`.
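A rough sketch of what that helper could look like; `get_attn_backend_from_config` is only the reviewer's suggested name, it reuses the `get_attn_backend` already imported at the call site, and it assumes the GPU block size can be read from `cache_config.block_size`:

```python
def get_attn_backend_from_config(vllm_config):
    """Sketch: resolve the attention backend directly from a VllmConfig,
    so connectors don't have to repeat the argument plumbing above."""
    model_config = vllm_config.model_config
    return get_attn_backend(
        model_config.get_head_size(),
        model_config.dtype,
        vllm_config.cache_config.cache_dtype,
        vllm_config.cache_config.block_size,
        model_config.is_attention_free,
        use_mla=model_config.use_mla,
    )
```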
@orozery sorry, I think some of the comments here actually apply to an earlier commit.
vllm/v1/offloading/cpu.py
# allocate fresh blocks
blocks: list[BlockStatus] = []
for _ in range(num_fresh_blocks):
    blocks.append(CPUBlockStatus(self.num_allocated_blocks))
    self.num_allocated_blocks += 1
So there will only be "fresh" blocks temporarily until the cache is full?
Right
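In other words, new block IDs keep being minted only until the configured capacity is reached; after that, allocations come from reused (evicted) blocks. A toy sketch of that behaviour, not the PR's actual manager:

```python
from typing import Optional


class ToyCPUBlockPool:
    """Illustrative only: blocks are "fresh" until capacity is reached,
    after which allocation has to reuse blocks reclaimed by eviction."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.num_allocated_blocks = 0
        self.free_blocks: list[int] = []  # IDs returned by eviction

    def allocate(self) -> Optional[int]:
        if self.free_blocks:
            return self.free_blocks.pop()         # reuse an evicted block
        if self.num_allocated_blocks < self.capacity:
            block_id = self.num_allocated_blocks  # mint a fresh block
            self.num_allocated_blocks += 1
            return block_id
        return None                               # caller must evict first
```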
for blk_hash in request.block_hashes[self.block_size_factor - 1::self.block_size_factor]
use islice?
return 0, False

start_block_idx = num_computed_tokens // self.offloaded_block_size
hits = self.manager.lookup(block_hashes[start_block_idx:])
use islice?
block_hashes = [
    blk_hash.hash_value
    for blk_hash in request.block_hashes[self.block_size_factor - 1::self.block_size_factor]
]
assert len(block_hashes) >= num_blocks

block_hashes = block_hashes[start_block_idx:num_blocks]
Suggested change:
-block_hashes = [
-    blk_hash.hash_value
-    for blk_hash in request.block_hashes[self.block_size_factor - 1::self.block_size_factor]
-]
-assert len(block_hashes) >= num_blocks
-block_hashes = block_hashes[start_block_idx:num_blocks]
+step = self.block_size_factor
+block_hashes = [
+    blk_hash.hash_value for blk_hash in itertools.islice(
+        request.block_hashes,
+        (start_block_idx + 1) * step - 1,
+        (num_blocks + 1) * step - 1,
+        step)
+]
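For anyone checking the index arithmetic in the suggestion: element i of the strided slice [step - 1::step] sits at index (i + 1) * step - 1 of the original list, so the islice bounds above select exactly elements start_block_idx through num_blocks - 1. A quick self-contained check with dummy data:

```python
import itertools

step = 4                       # stands in for self.block_size_factor
hashes = list(range(40))       # stands in for request.block_hashes
start_block_idx, num_blocks = 2, 7

strided = hashes[step - 1::step][start_block_idx:num_blocks]
sliced = list(itertools.islice(hashes,
                               (start_block_idx + 1) * step - 1,
                               (num_blocks + 1) * step - 1,
                               step))
assert strided == sliced  # both select offloaded-block hashes 2..6
```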
src_specs = self.manager.prepare_load(block_hashes)
dst_specs = [
    GPULoadStoreSpec(gpu_block_id)
    for gpu_block_id in block_ids[num_computed_gpu_blocks:]
]
use islice?
I'm changing GPULoadStoreSpec to construct a tensor of block IDs. It needs a list as input, so cannot use islice.
I may be misunderstanding, but this comprehension is creating a list of `GPULoadStoreSpec` objects, which can't be used to create a tensor directly?
Oh sorry, I think I understand: you are saying this is n/a after the latest refactor.
I changed it to create a single GPULoadStoreSpec object, wrapping a tensor.
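For context, a minimal sketch of the shape being described, i.e. one spec wrapping a tensor of block IDs; the class and field names here are illustrative stand-ins, not necessarily the PR's final definitions:

```python
from dataclasses import dataclass

import torch


@dataclass
class GPULoadStoreSpecSketch:
    """Illustrative stand-in: a single spec holding all GPU block IDs as one tensor."""
    block_ids: torch.Tensor

    @classmethod
    def from_block_ids(cls, block_ids: list[int]) -> "GPULoadStoreSpecSketch":
        # torch.tensor needs a materialized sequence, which is why the call
        # site needs a real list rather than an islice iterator.
        return cls(torch.tensor(block_ids, dtype=torch.int64))
```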
Force-pushed from 0734b51 to 54f0400
@njhill I switched to using islice.
Thanks @orozery, I guess I don't follow the downside / extra complexity. E.g.

hits = self.manager.lookup(block_hashes[start_block_idx:])

just becomes

hits = self.manager.lookup(islice(block_hashes, start_block_idx, None))

If/where the sliced list is iterated over multiple times then I agree there may be more to consider.
For example, this: ... And: ...
Force-pushed from 54f0400 to 3c24ea9
Argh sorry these comments were sitting in pending, just realized I didn't submit it yesterday
Force-pushed from 3c24ea9 to 111fae9
This pull request has merge conflicts that must be resolved before it can be merged.
Force-pushed from 111fae9 to 7b178bb
This commit registers a new OffloadingSpec to add CPU offloading support to the OffloadingConnector. Signed-off-by: Or Ozeri <oro@il.ibm.com>
Force-pushed from 7b178bb to 6519656
@njhill I've added a small e2e test + small example in the docs.
@pytest.mark.parametrize("cpu_block_size", CPU_BLOCK_SIZES)
def test_cpu_offloading(cpu_block_size: int) -> None:
Is there a way to add some assertions to the test such that it will fail if the offload is not working? Should probably also verify correctness in conjunction with this.
There is a correctness unit test for the transfer function (test_cpu_gpu.py).
There is also a correctness unit test verifying that the offloading connector generates the correct transfer addresses for the GPU and the offloaded medium.
I don't know how we can test correctness e2e.
Currently the test here just checks that prompt generation does not crash when using CPU offloading; it does not verify that any offloading actually occurs.
One way we can verify this is by adding a kv_events_config (like in test_kv_cache_events) that checks for KVEvents with the CPU medium.
I actually started coding that but found it a bit cumbersome, so I decided to defer it and see if others think it's worthwhile.
Another option is to verify that latency decreases when we're supposed to hit the CPU cache (after resetting the GPU prefix cache).
We can reduce the variance by, say, repeating this 100 times and verifying that latency decreased in at least 70 of them.
This would actually be easy to implement (compared to the KVEvents test).
My concern is that even when repeating the test many (e.g. 100) times, it can still be flaky.
Your thoughts?
Thanks @orozery, yes I was thinking perhaps at least some kind of latency comparison, but I agree timing tests are fragile / generally not a good idea. If the magnitude of the difference is large enough, perhaps it wouldn't need so many attempts, maybe just a handful?
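A rough sketch of what such a latency check could look like; run_prompt and reset_gpu_prefix_cache are hypothetical callables standing in for whatever the test harness provides, and the attempt/threshold numbers are placeholders:

```python
import time


def _measure(run_prompt) -> float:
    start = time.perf_counter()
    run_prompt()
    return time.perf_counter() - start


def looks_like_cpu_cache_hit(run_prompt, reset_gpu_prefix_cache,
                             attempts: int = 5, required_wins: int = 4) -> bool:
    """Heuristic: after resetting the GPU prefix cache, re-running the same
    prompt should usually be faster than the first (cold) run if it is being
    served from the CPU cache."""
    cold = _measure(run_prompt)          # first run: no cache is populated yet
    wins = 0
    for _ in range(attempts):
        reset_gpu_prefix_cache()         # force a GPU prefix-cache miss
        warm = _measure(run_prompt)      # should now hit the CPU cache
        if warm < cold:
            wins += 1
    return wins >= required_wins
```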
Merging this to ensure that it makes the release but we might want to think a bit more about the e2e CI tests.
Thanks again for all of your hard work @orozery!
@orozery how can I debug if the offload connector is being used?

You can check the vLLM logs, which will include logs from the connector.
This is the final PR enabling CPU offloading in v1.
Concludes RFC #19854.
Depends on #20075, #21448, #22595.
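For readers looking for end-to-end usage, the docs example added by this PR is the authoritative reference; roughly, enabling the connector looks like the sketch below. KVTransferConfig and its kv_connector / kv_role / kv_connector_extra_config fields are existing vLLM configuration surface, but the specific extra-config key shown (num_cpu_blocks) is an assumption for illustration:

```python
from vllm import LLM
from vllm.config import KVTransferConfig

# Sketch only: the extra-config key name is assumed; see the docs added in
# this PR for the exact configuration of CPUOffloadingSpec.
llm = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    kv_transfer_config=KVTransferConfig(
        kv_connector="OffloadingConnector",
        kv_role="kv_both",
        kv_connector_extra_config={"num_cpu_blocks": 1000},
    ),
)
```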