[Core] Encoder separation for Encode-Prefill-Decode Disaggregation #21740
Conversation
This pull request has merge conflicts that must be resolved before it can be merged.
Code Review
This pull request introduces a significant architectural change for disaggregating the vision encoder from the main model execution, which is a great step towards improving performance for multimodal models. The implementation is comprehensive, touching on scheduling, model execution, and communication between new services. My review focuses on ensuring the robustness and correctness of the new components, particularly around error handling, concurrency, and resource management. I've identified a few critical issues related to hardcoded values and error handling that could impact stability and correctness in a production environment. Addressing these will make the new architecture more resilient.
examples/online_serving/encoder_separation/parallel_separated_encoder_request.py
The _recv_encoder_cache method currently converts the received numpy array to a torch tensor and moves it to a hardcoded device cuda:0. This is not flexible and will fail in multi-GPU or non-standard device configurations. This logic should be moved to the caller, which is aware of the correct device. The connector should be responsible only for data transfer, not device placement.
```python
req_id, input_id, pos_info_dict, encoder_cache_numpy = \
    pickle.loads(pickled_data)
pos_info = dict_to_pos_info(pos_info_dict)
callback(req_id, input_id, pos_info, encoder_cache_numpy)
```
👋 Hi! Thank you for contributing to the vLLM project. 💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels. Just a reminder: PRs would not trigger a full CI run by default. Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.
This pull request has merge conflicts that must be resolved before it can be merged.
Code Review
This pull request introduces a significant architectural change to disaggregate the MLLM encoder, which is a crucial step for flexible and efficient distributed inference. The implementation is comprehensive, with changes to the scheduler, model runner, and new components like connectors and wrappers. The documentation is thorough. My review has identified several high-severity issues that could affect the functionality and stability of the new feature. These include bugs in the example launch scripts, a fragile data parsing method in the Redis connector, a potential memory leak in the preallocator, and a concurrency issue in the scheduler's preallocation logic. Addressing these points will improve the robustness of this new architecture.
```bash
until curl -s localhost:$port/v1/chat/completions > /dev/null; do
    sleep 1
done" && return 0 || return 1
```
The wait_for_server function attempts to check server readiness by sending a GET request to /v1/chat/completions. However, this endpoint in api_server_1e1pd.py only supports POST requests. A GET request from curl will result in a 405 Method Not Allowed error, causing the script to wait for the full timeout period before failing. You should use an endpoint that supports GET requests, such as /health, for the health check. This same issue exists in launch_epd_serve.sh and launch_epd_serve_separated.sh.
```diff
-until curl -s localhost:$port/v1/chat/completions > /dev/null; do
+until curl -s localhost:$port/health > /dev/null; do
     sleep 1
 done" && return 0 || return 1
```
After the server starts, the client will receive a 405 response, and the script will run properly
```bash
timeout 12000 bash -c "
    until curl -s localhost:$port/v1/chat/completions > /dev/null; do
        sleep 1
```
The wait_for_server function attempts to check server readiness by sending a GET request to /v1/chat/completions. However, this endpoint in the vLLM OpenAI-compatible server only supports POST requests. A GET request from curl will result in a 405 Method Not Allowed error, causing the script to wait for the full timeout period before failing. You should use an endpoint that supports GET requests, such as /health, for the health check.
```diff
 timeout 12000 bash -c "
-    until curl -s localhost:$port/v1/chat/completions > /dev/null; do
+    until curl -s localhost:$port/health > /dev/null; do
         sleep 1
```
same
```bash
timeout 12000 bash -c "
    until curl -s localhost:$port/v1/chat/completions > /dev/null; do
        sleep 1
```
The wait_for_server function attempts to check server readiness by sending a GET request to /v1/chat/completions. However, this endpoint in the vLLM OpenAI-compatible server only supports POST requests. A GET request from curl will result in a 405 Method Not Allowed error, causing the script to wait for the full timeout period before failing. You should use an endpoint that supports GET requests, such as /health, for the health check.
```diff
 timeout 12000 bash -c "
-    until curl -s localhost:$port/v1/chat/completions > /dev/null; do
+    until curl -s localhost:$port/health > /dev/null; do
         sleep 1
```
same
```python
def _get_request_ranks(self, request_id: str):
    # request_id format: $ACTUAL_REQUEST_ID|$E_RANK|$PD_RANK
    result = request_id.split("|")
    return int(result[1]), int(result[2])
```
The _get_request_ranks method parses the request ID by splitting it with "|". This approach is fragile because if the original request_id contains a | character, the parsing will fail and lead to incorrect behavior or errors. For a core component like this, a more robust serialization method, such as JSON, should be used to encapsulate the request ID and rank information to avoid such parsing issues. For example, the payload could be a JSON string like {"req_id": "...", "e_rank": 0, "pd_rank": 1}.
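For illustration, the JSON-based payload suggested above could look like the following sketch (the helper names are hypothetical and not part of this PR):

```python
import json

def encode_request_ranks(request_id: str, e_rank: int, pd_rank: int) -> str:
    # Encapsulate the id and ranks so a "|" inside request_id cannot break parsing.
    return json.dumps({"req_id": request_id, "e_rank": e_rank, "pd_rank": pd_rank})

def decode_request_ranks(payload: str) -> tuple[str, int, int]:
    data = json.loads(payload)
    return data["req_id"], int(data["e_rank"]), int(data["pd_rank"])
```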
The approach is similar to the xPyD disagg code, where the ZMQ addresses are passed via the request ID. Since the request ID is generated on the provided proxy, the code will function properly.
```python
for _ in range(self.mm_inputs_total[request.request_id]):
    # Clean ignored_preallocs later, currently we assume that
    # all mm_inputs will come to the instance at some moment
    if (request.request_id, _) in self.received_metas_reqs:
        self.received_metas_reqs.remove((request.request_id, _))
        continue
    self.ignored_preallocs.add((request.request_id, _))
```
The comment on line 87, "Clean ignored_preallocs later...", indicates a known issue where ignored_preallocs are not properly cleaned up if a request is finished before all its multimodal inputs are processed. This can lead to a memory leak, as the ignored_preallocs set will grow indefinitely in a long-running server, consuming more and more memory. This should be addressed to ensure the stability of the service.
Currently, the Encoder Scheduler schedules all of a request's multimodal inputs together. As long as the request is scheduled on the E instance, we will clean the ignored_preallocs on the PD instance properly.
The only concern arises when the request is aborted on the E instance before being scheduled but is completed on the PD instance through the cache; we need to handle this.
Fixed, if a request is aborted before being scheduled, we will not send the request to the PD instance from our proxy
vllm/v1/core/sched/scheduler.py
```python
if self.mutex.locked():
    return
with self.mutex:
    while not self.ec_preallocator.is_empty():
        prealloc, candidate = self.ec_preallocator.get_prealloc_candidate(
            self.encoder_cache_manager.num_free_slots, fill_next = True)
        if not prealloc:  # can't preallocate
            return
        if candidate is not None:
            self.encoder_cache_manager.preallocate(*candidate)
    # last element
    prealloc, candidate = self.ec_preallocator.get_prealloc_candidate(
        self.encoder_cache_manager.num_free_slots, fill_next = False)
    if (candidate is not None):
        self.encoder_cache_manager.preallocate(*candidate)
```
The _perform_preallocations method starts with if self.mutex.locked(): return. This non-blocking check can cause preallocations to be skipped if the lock is contended, potentially leading to performance degradation or starvation of preallocation tasks. Since the scheduler's main loop is single-threaded, this lock is likely intended to protect against concurrent access from background threads (e.g., connector callbacks). A standard blocking with self.mutex: at the beginning of the method would be safer and prevent re-entrancy without the risk of starvation.
```diff
-if self.mutex.locked():
-    return
 with self.mutex:
     while not self.ec_preallocator.is_empty():
         prealloc, candidate = self.ec_preallocator.get_prealloc_candidate(
             self.encoder_cache_manager.num_free_slots, fill_next = True)
         if not prealloc:  # can't preallocate
             return
         if candidate is not None:
             self.encoder_cache_manager.preallocate(*candidate)
     # last element
     prealloc, candidate = self.ec_preallocator.get_prealloc_candidate(
         self.encoder_cache_manager.num_free_slots, fill_next = False)
     if (candidate is not None):
         self.encoder_cache_manager.preallocate(*candidate)
```
Currently, we use only a synchronized encoder cache preallocator, so the lock here has no effect.
In the future, in the asynchronous scenario, there is no need to run the function if it is already being executed in some thread, since the function obtains preallocations from the "global" queue. Therefore, we can safely skip it without waiting for the lock if the lock is already held.
Maybe it will be useful to add a limit on the number of preallocations per perform_preallocations call, because currently this function executes until there are either no new preallocations or no available slots in the encoder cache manager.
This pull request has merge conflicts that must be resolved before it can be merged.
Force-pushed from 61a60a7 to ed61cc7.
/gemini review
Code Review
This pull request introduces a significant architectural change to support Encode-Prefill-Decode (EPD) disaggregation for multimodal models. The changes are extensive and well-structured, introducing encoder separation, inter-instance communication for cache transfer, and modifications to the scheduler and model runner. The implementation correctly uses wrappers to minimize core logic changes, which is great for maintainability. I've found one critical issue related to a potential deadlock in the scheduling logic that needs to be addressed. Otherwise, this is a very impressive and well-documented contribution.
```python
def get_prealloc_candidate(
    self, free_space: int, fill_next: bool
) -> tuple[bool, tuple[str, int, int, str] | None]:
    """Validate the preallocation candidate, fill the next preallocation
    candidate.

    Validate the current preallocation candidate and retrieve the next
    preallocation request from the queue. Skips ignored preallocations
    and checks whether preallocated data will fit in space constraints.

    Args:
        free_space: Available cache space in encoder tokens.
        fill_next: Whether to fetch the next candidate after processing.

    Returns:
        Tuple of (should_continue, candidate_data) where:
        - should_continue: True if caller should continue preallocations,
          False if caller should stop.
        - candidate_data: None or tuple of (request_id, input_id,
          num_encoder_tokens, mm_hash)
    """
    with self.scheduling_lock:
        if self.prealloc_candidate is None:
            if fill_next is True:
                self.prealloc_candidate = self.preallocs_queue.get()
            return (True, None)  # No candidate, just get next candidate

        (request_id, input_id, num_encoder_tokens, mm_hash) = \
            self.prealloc_candidate
        if num_encoder_tokens > free_space:
            return (False, None)

        if fill_next is True:
            self.prealloc_candidate = self.preallocs_queue.get()
        else:
            self.prealloc_candidate = None

        if (request_id, input_id) in self.ignored_preallocs:
            self.ignored_preallocs.remove((request_id, input_id))
            return (True, None)  # Skip and get next

        self.pending_preallocs[request_id].remove(input_id)

        return (True, (request_id, input_id, num_encoder_tokens, mm_hash))
```
The use of self.preallocs_queue.get() in this function is blocking. If the queue becomes empty while the scheduler's _perform_preallocations loop is running, this call will block indefinitely, causing the scheduler to hang. The while not self.ec_preallocator.is_empty() check in the scheduler is not sufficient to prevent this due to the pre-fetching logic within this function.
To prevent this potential deadlock, you should use a non-blocking get_nowait() and handle the queue.Empty exception.
```python
def get_prealloc_candidate(
    self, free_space: int, fill_next: bool
) -> tuple[bool, tuple[str, int, int, str] | None]:
    """Validate the preallocation candidate, fill the next preallocation
    candidate.

    Validate the current preallocation candidate and retrieve the next
    preallocation request from the queue. Skips ignored preallocations
    and checks whether preallocated data will fit in space constraints.

    Args:
        free_space: Available cache space in encoder tokens.
        fill_next: Whether to fetch the next candidate after processing.

    Returns:
        Tuple of (should_continue, candidate_data) where:
        - should_continue: True if caller should continue preallocations,
          False if caller should stop.
        - candidate_data: None or tuple of (request_id, input_id,
          num_encoder_tokens, mm_hash)
    """
    with self.scheduling_lock:
        if self.prealloc_candidate is None:
            if fill_next is True:
                try:
                    self.prealloc_candidate = self.preallocs_queue.get_nowait()
                except queue.Empty:
                    return (True, None)  # No candidate, queue is empty
            else:
                return (True, None)  # No candidate, not filling

        (request_id, input_id, num_encoder_tokens, mm_hash) = \
            self.prealloc_candidate
        if num_encoder_tokens > free_space:
            return (False, None)

        if fill_next is True:
            try:
                self.prealloc_candidate = self.preallocs_queue.get_nowait()
            except queue.Empty:
                self.prealloc_candidate = None
        else:
            self.prealloc_candidate = None

        if (request_id, input_id) in self.ignored_preallocs:
            self.ignored_preallocs.remove((request_id, input_id))
            return (True, None)  # Skip and get next

        self.pending_preallocs[request_id].remove(input_id)

        return (True, (request_id, input_id, num_encoder_tokens, mm_hash))
```
> The while not self.ec_preallocator.is_empty() check in the scheduler is not sufficient to prevent this due to the pre-fetching logic within this function.

fill_next is true if and only if the queue is not empty; if the queue is empty, we pass fill_next = False, which prevents the queue.get() call from executing.
```diff
 # Cached but currently not referenced by any request
-if not self.cached[mm_hash]:
+# If mm_hash is in preallocated then it will not be in freeable
+if not self.cached[mm_hash] and (not mm_hash in self.preallocated):
```
```diff
-if not self.cached[mm_hash] and (not mm_hash in self.preallocated):
+if not self.cached[mm_hash] and mm_hash not in self.preallocated:
```
```python
# mm_hash of mm_data => ids of requests that reference the mm_data
self.cached: dict[str, set[str]] = {}

# Blockign
```
```diff
-# Blockign
+# Blocking
```
```diff
     return
 self.cached[mm_hash].discard(req_id)
-if not self.cached[mm_hash]:
+if (not self.cached[mm_hash]) and (mm_hash not in self.preallocated):
```
Same
```diff
 # for these models.
 self.encoder_cache_manager = EncoderCacheManager(
-    cache_size=encoder_cache_size)
+    cache_size=encoder_cache_size*10)
```
This seems a bit arbitrary
It's a temporary value to indicate that we need to add a method for determining an appropriate encoder cache size. I think we can replace this change with a comment.
Signed-off-by: LastZhabka <sakhmoldin.mukhammadarif@gmail.com>
Signed-off-by: LastZhabka <sakhmoldin.mukhammadarif@gmail.com>
[Misc] Change GPUModelRunnerWrapper names [Misc] Fix semaphore related risks [Core] Rollback to common encoder cache lifecycle on E instance scheduler Signed-off-by: LastZhabka <sakhmoldin.mukhammadarif@gmail.com>
…he manager [Docs] Docstrings for all new methods provided by EPD disaggregation update [Docs] Update EPD disaggregation README.md [Feat] EPD disaggregation multiple instance [Feat] RedisECConnector, Redis Server configuration through env vars Signed-off-by: LastZhabka <sakhmoldin.mukhammadarif@gmail.com>
Force-pushed from ed61cc7 to f7ee59c.
/gemini review
Code Review
This is an impressive and significant contribution that introduces a disaggregated architecture for MLLM inference by separating the visual encoder. The design is well-documented with clear diagrams and explanations, which is crucial for such a complex change. The implementation appears robust, with careful consideration for concurrency and state management across distributed instances. I've found one critical issue related to potential deadlocking in the threading logic of the ECConnectorTemplate. Addressing this will be important for the stability of the new system. Overall, this is a very well-executed piece of work that lays a strong foundation for more flexible and efficient MLLM serving.
```python
def _limiting_wrapper(self, callback: Callable, arg: Callable):
    """Wrapper function to limit the number of workers """
    with self.limiting_semaphore:
        callback(arg)

def _recv_event_loop(self, ):
    """Run receive event loop

    This method runs event loop for receive tasks and ensures that
    the number of requested parallel receives is limited by
    $max_connector_workers.
    """
    try:
        if self.target_recv_callback[0] is None:
            return
        while True:
            callback, arg = self.target_recv_callback
            with self.limiting_semaphore:
                self.recv_executors.submit(self._limiting_wrapper,
                                           callback, arg)
    except Exception as e:
        raise ConnectionError("Error during recv event loop") from e
```
The current implementation of _recv_event_loop and _limiting_wrapper can lead to a deadlock. The _recv_event_loop acquires the semaphore and then submits a task (_limiting_wrapper) which also attempts to acquire the same semaphore. Since threading.Semaphore is not re-entrant, if the semaphore's value reaches zero, both the main loop thread and the worker threads will block indefinitely, waiting for each other to release the semaphore.
To fix this, I suggest refactoring the logic to a more standard producer-consumer pattern with a semaphore. The main loop should acquire the semaphore to block when all workers are busy, and each worker task should release the semaphore upon completion. This ensures that the number of concurrent receive operations is correctly limited without causing a deadlock.
```python
def _worker_task(self, callback: Callable, arg: Callable):
    """Wrapper function to release semaphore after task completion."""
    try:
        callback(arg)
    finally:
        self.limiting_semaphore.release()

def _recv_event_loop(self, ):
    """Run receive event loop

    This method runs event loop for receive tasks and ensures that
    the number of requested parallel receives is limited by
    $max_connector_workers.
    """
    try:
        if self.target_recv_callback[0] is None:
            return
        while True:
            callback, arg = self.target_recv_callback
            self.limiting_semaphore.acquire()
            self.recv_executors.submit(self._worker_task,
                                       callback, arg)
    except Exception as e:
        raise ConnectionError("Error during recv event loop") from e
```
Hey, thanks for the mega job here @fake0fan ! I have a few questions/suggestions since I believe this PR might be going a little bit in an orthogonal direction wrt work on other connectors. First of all, if Redis is to be introduced in the picture as a core component, why shouldn't we change the EncoderCache to be an actual distributed cache, and use that to store/read tensors? Some other design points in no particular order:
Hi @NickLucche,
During implementation, I wasn't sure if all models were initialized in the same way; both the PD and E instances are currently loading both the encoder and the language model.
I designed the system to align more with the current PD disaggregation implementation in vLLM, to support P2P and direct transfer using the same tech stack (ZMQ + NCCL). For the distributed cache, we can reuse the same communication model (with some parts skipped) and use the server with the cache instead of the E instance (?)
Hi, @NickLucche, Thank you for your valuable feedback.
We have also noticed the issues you mentioned, so we plan to reference the current kv connector design overall and refactor a similar ec_connector design to reduce the corresponding complexity in scheduling logic and support for different storage and transmission methods.
We will also consider support for different platforms. Our current idea is to first support a distributed storage on host memory, which can be relatively independent from support for different computing hardware (GPU, TPU, NPU). Of course, we will also decide whether to implement some hardware-specific optimizations (such as P2P direct transfer, etc.) based on performance results later on.
Agreed. So we might still want to keep the scheduler integrated rather than separating it out, similar to kv, mainly adding related logic to the scheduler to support EPD.
The current implementation indeed hasn't implemented this optimization. We will implement the Encoder loading only the encoder model later on, but for PD I'm still considering whether, if preemption occurs, the PD instance might still need the Encoder model to perform recomputation, or whether it could directly read the corresponding Encoder embedding data through a distributed storage. This may require further discussion. Overall, this is roughly our current situation. I'm wondering if we need to open a Google doc to introduce more discussion? cc @ywang96
Hey @fake0fan , apologies for the delay, I was off the last few days.
I also believe we could benefit from the "RFC path", opening up a doc where we can discuss different design options (example https://docs.google.com/document/d/10jhCNxJYvsUhtMtiMAaW2MxU5LU8HVje2pGDnj49gH4/edit?tab=t.0#heading=h.wsk0hlrf3cp2).
Right, that sounds great. I've opened a Google Doc and briefly described some of our current design ideas. We can also discuss more design choices for EPD in it later.
Thanks for doing this! Let me share it with more interested people too. I am confident we can iterate quickly.
MLLM Encode separation and E-P Encoder Cache Transfer
Encode-Prefill-Decode (EPD) disaggregation provides greater flexibility in distributed MLLM inference, enables better resource utilization under fixed TTFT and TPOT Service Level Objectives, and allows for the application of stage-level optimizations. To implement full EPD disaggregation, we need to move the visual encoder to a separate instance.
This update introduces the implementation of MLLM visual encoder separation, an abstraction for inter-instance (E-P) communication for encoder cache transfer, and a concrete example implementation of disaggregated E+PD serving.
Motivation
Encoder separation is a critical part of EPD disaggregation, as it allows the visual encoder to be decoupled from the Prefill and Decode stages. To implement and use EPD disaggregation in the future, we need visual encoder separation.
Better serving for MLLM. Consider a continuous-serving scenario with mixed inputs: every 10th request includes a large multimodal input, while all others are text-only. In the current vLLM, all requests in a batch wait during model execution for the multimodal input embedding generation to complete. When a new request with an image arrives, all requests again wait for the multimodal (MM) encoder to run, causing significant performance degradation. By separating the encoding stage, we can avoid this bottleneck. This scenario is used only to highlight the bottleneck; the same bottleneck appears in other serving scenarios and can be removed by a separated encoder.
```mermaid
flowchart LR
    subgraph "Current vLLM Architecture"
        direction LR
        subgraph I2 ["Iteration 2"]
            direction LR
            R2["Requests 11-19: Text<br/>🖼️ Request 20: MM + Text"]
            R3["Running Batch 1-10"]
            S2[Scheduler]
            B2["Batch 1-20"]
            MR2[ModelRunner]
            MM2["execute_mm_encoder<br/>ALL requests blocked AGAIN"]
            LM2["Language Model<br/>1 generation step"]
            R2 --> S2
            R3 <--> S2
            S2 --> B2 --> MR2 --> MM2 --> LM2
        end
        subgraph I1 ["Iteration 1"]
            direction LR
            R1["Requests 1-9: Text<br/>Request 10: MM + Text"]
            S1[Scheduler]
            B1["Batch 1-10"]
            MR1[ModelRunner]
            MM1["execute_mm_encoder<br/>ALL requests blocked"]
            LM1["Language Model<br/>1 generation step"]
            R1 --> S1 --> B1 --> MR1 --> MM1 --> LM1
        end
    end
```

Overall Process
The overall separated-encode process in the 1E1PD with proxy scenario:
```mermaid
sequenceDiagram
    participant P as Proxy
    participant ES as Encode Scheduler
    participant EVW as DisaggVModelGPURunnerWrapper
    participant Redis as Redis
    participant PDW as DisaggLModelGPURunnerWrapper
    participant PDS as Prefill+Decode Scheduler
    P->>ES: Request
    ES->>ES: Schedule request
    ES->>Redis: Send encoder cache metadata
    ES->>EVW: SchedulerOutput
    EVW->>EVW: _execute_mm_encoder() - Process vision/MM inputs
    Redis->>PDS: Receive metadata
    EVW->>EVW: Store in encoder_cache temporarily
    EVW->>ES: ModelRunnerOutput
    ES->>P: Empty response
    P->>PDS: Request
    alt Multimodal input tokens KV values are not obtained from KV cache
        PDS->>PDS: Preallocate encoder cache space
    else
    end
    PDS->>Redis: Send preallocation response
    Redis->>EVW: Receive preallocation
    alt Multimodal input tokens KV values are not obtained from KV cache
        EVW->>Redis: Send actual encoder cache data
        Redis->>PDW: Receive encoder cache
        PDW->>PDW: Inject into encoder_cache dict
    else Multimodal input tokens KV values are obtained from KV cache
    end
    EVW->>ES: ModelRunnerOutput
    ES->>ES: Free slots in Encoder Cache Manager
    PDW->>PDS: ModelRunnerOutput
    alt Multimodal input tokens KV values are not obtained from KV cache
        PDS->>PDS: Finalize allocation
    else Multimodal input tokens KV values are obtained from KV cache
        PDS->>PDS: Free preallocated encoder cache space
    end
    PDS->>PDW: SchedulerOutput
    Note over EVW,PDW: Phase 3: Normal LM Inference
    PDW->>PDW: Execute prefill + decode
    PDW->>PDW: Generate text output
    PDW->>PDS: ModelRunnerOutput
    PDS->>P: Response
```

Implementation
The implementation avoids large changes. Modifications to the model runner are minimal, and the main functionality is provided through model runner wrapper classes, which are only instantiated during execution with the separated encoder. The same applies to the scheduler: all scheduler changes are guarded by the `if self.separated_encode` condition, ensuring that default vLLM serving remains unaffected. These changes only take effect during execution with the separated multimodal encoder.
vLLM minor changes
EPD Disaggregation Configuration
Files: `vllm/config.py`, `vllm/core/arg_utils.py`

Added a new configuration class for EPD disaggregation. It currently supports configuration of the instance type, the instance's EPD rank, and the number of connector workers.
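As a rough sketch, such a configuration could be expressed as a dataclass like the one below (field names are illustrative, not the exact ones added in `vllm/config.py`):

```python
from dataclasses import dataclass

@dataclass
class EPDDisaggConfigSketch:
    # Role of this instance: "encode", "prefill", or "prefill+decode".
    instance_type: str = "prefill+decode"
    # EPD rank of this instance within the deployment.
    epd_rank: int = 0
    # Number of worker threads used by the encoder cache connector.
    max_connector_workers: int = 4
```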
Additional ModelRunnerOutput Data Fields
Files: `vllm/v1/outputs.py`

The model runner output now includes two additional data fields: `transfered_mm_data` and `injected_mm_data`.

The `transfered_mm_data` field passes a list of transferred encoder cache input IDs from the model runner to the scheduler on the encode instance. After receiving the transferred data IDs, the scheduler frees space in the encoder cache manager.

The `injected_mm_data` field passes a list of injected encoder cache input IDs from the model runner to the scheduler on the prefill instance. After receiving the injected data IDs, the scheduler frees space in the encoder cache manager.
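Conceptually, the extension boils down to two extra lists on the output object; a simplified sketch (not the literal definition in `vllm/v1/outputs.py`):

```python
from dataclasses import dataclass, field

@dataclass
class EPDModelRunnerOutputExtras:
    # IDs of encoder cache entries transferred out of the encode instance;
    # the encode scheduler uses them to free encoder cache manager slots.
    transfered_mm_data: list = field(default_factory=list)
    # IDs of encoder cache entries injected on the prefill instance;
    # the scheduler uses them to finalize preallocations.
    injected_mm_data: list = field(default_factory=list)
```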
Model Runner Wrapper Integration in GPUWorker

Files: `vllm/v1/worker/gpu_worker.py`

When EPD disaggregation is enabled, the system uses wrapper classes of the GPUModelRunner class.
GPU Model Runner sanity check in encoder execution
Files: `vllm/v1/worker/gpu_model_runner.py`

If EPD disaggregated serving is enabled, an additional attribute is added to indicate whether encoder execution is allowed. This attribute is used to perform a sanity check on each execution of the encoder.
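A minimal illustration of the kind of guard this enables (attribute and method names here are assumptions, not the actual code):

```python
def _execute_mm_encoder_checked(self, scheduler_output):
    # On non-encode instances the encoder must never run when EPD
    # disaggregation is enabled, so fail loudly if it is attempted.
    if self.epd_disagg_enabled:
        assert self.encoder_execution_allowed, (
            "Multimodal encoder execution attempted on a non-encode instance")
    return self._execute_mm_encoder(scheduler_output)
```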
Major Changes
EncoderCacheManager new Allocation Logic
Files: `vllm/v1/core/encoder_cache_manager.py`

The EncoderCacheManager now contains methods for a two-step allocation process for remote encoder cache injection. This change introduces 4 new methods designed around the concept of preallocation, which reserves cache space for a request without immediately adding it to the cached dictionary.
The allocation process operates in two steps. In preallocation, the system reserves the required space by deducting it from the available encoder cache size. In allocation finalization, the request completes its allocation in the update_from_output scheduler function after successful encoder cache injection in the model runner.
To implement this approach, we need the following functions:

- `can_preallocate(cache_size)` - checks if sufficient space exists for preallocation using only the request's encoder cache size data.
- `preallocate(req_id, input_id, cache_size)` - takes cache_size slots in the encoder cache manager and records that (req_id, input_id) is preallocated.
- `depreallocate(req_id, input_id)` - rolls back the preallocate action by freeing the allocated slots associated with (req_id, input_id). Used when we skip the multimodal input via prefix cache usage.
- `finalize_allocation(req_id, input_id)` - finalizes the allocation process by adding (req_id, input_id) to the cached mapping; called after successful encoder cache injection to complete the allocation that was started with preallocate().

The two-step allocation ensures that sufficient space will exist in the encoder cache manager for incoming requests after the preallocation notification is sent. It also prevents direct allocation in the encoder cache manager until the actual cache injection occurs.
Since the complete request is not transferred from the encode instance to the prefill instance, these methods operate using only the request's metadata.
The manager also includes a `deallocate(self, req_id: str, input_id, encoder_cache_size)` method for the encode instance to release cache space in the encoder scheduler; we need this method because the encode instance finishes a request before the encoder cache transfer completes.
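A simplified, self-contained sketch of the two-step bookkeeping described above (slot accounting only; the real manager in `vllm/v1/core/encoder_cache_manager.py` tracks more state):

```python
class TwoStepAllocationSketch:
    def __init__(self, cache_size: int):
        self.num_free_slots = cache_size
        self.preallocated: dict[tuple[str, int], int] = {}
        self.cached: set[tuple[str, int]] = set()

    def can_preallocate(self, cache_size: int) -> bool:
        return cache_size <= self.num_free_slots

    def preallocate(self, req_id: str, input_id: int, cache_size: int) -> None:
        # Reserve space before the encoder cache actually arrives.
        self.num_free_slots -= cache_size
        self.preallocated[(req_id, input_id)] = cache_size

    def depreallocate(self, req_id: str, input_id: int) -> None:
        # Roll back a reservation, e.g. when prefix caching covers the input.
        self.num_free_slots += self.preallocated.pop((req_id, input_id))

    def finalize_allocation(self, req_id: str, input_id: int) -> None:
        # Called after successful injection; the reservation becomes a real entry
        # and the reserved slots stay occupied.
        self.preallocated.pop((req_id, input_id), None)
        self.cached.add((req_id, input_id))
```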
EncoderCachePreallocator

Files: `vllm/separated_encode/sched/encoder_cache_preallocator.py`

The EncoderCachePreallocator system introduces classes for managing encoder cache preallocation across distributed instances. This implementation provides two distinct strategies for handling encoder cache metadata and coordinating allocation decisions between encoder and scheduler instances.
The implementation provides an abstract base class EncoderCachePreallocatorTemplate that defines the core interface for encoder cache preallocation management. This template defines abstract methods for request lifecycle management and creates an encoder cache connector object in its constructor for receiving encoder cache metadata.
The preallocator asynchronously receives encoder cache metadata through the `receive_encoder_cache_metadata()` callback. The detailed preallocator behaviour is determined by the concrete implementation, but in general it always tries to schedule preallocation of (req_id, input_id); if (req_id, input_id) is schedulable, it is added to the preallocation queue, which is later consumed by `get_prealloc_candidate`.

On each request addition and finish, the preallocator's corresponding function is called to handle initialization and cleanup.
As the scheduler processes tokens, it continuously updates the multimodal input completion status through the preallocator's `update_mm_inputs_done()` method. This method determines which multimodal inputs have been fully processed based on their position in the token sequence and the number of computed tokens, which allows us to track which encoder caches are no longer required.

Two concrete implementations provide different allocation strategies:
AsyncEncoderCachePreallocator provides an asynchronous approach that immediately triggers preallocation callbacks upon receiving encoder cache metadata. This implementation maintains minimal state and always accepts the encoder cache, which avoids additional request state tracking and the synchronous approach.
SyncEncoderCachePreallocator implements a synchronous approach with state tracking. It tracks active requests, pending preallocation requests, waiting preallocation metadata, and ignored preallocation entries, in order to decide whether the instance needs to accept the encoder cache or can reject it and use data from the KV cache.
Both implementations track multimodal input progress through `mm_inputs_done` and `mm_inputs_total` counters, updating completion status as tokens are computed, in order to clear the encoder cache after injection or to avoid accepting encoder cache for mm input tokens that are covered by the prefix cache. The `get_prealloc_candidate` method provides an interface for retrieving the next preallocation candidate based on available cache space, with an option to immediately fill the next candidate. If the instance can't get a candidate (either no candidates or no free slots), it returns False.
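The completion bookkeeping can be illustrated roughly as follows, assuming each multimodal input is described by its (offset, length) placeholder position in the prompt (a sketch, not the actual implementation):

```python
def count_mm_inputs_done(mm_positions: list[tuple[int, int]],
                         num_computed_tokens: int) -> int:
    """Count how many multimodal inputs are fully covered by computed tokens."""
    done = 0
    for offset, length in mm_positions:
        # An input is done once all of its placeholder tokens have been computed.
        if offset + length <= num_computed_tokens:
            done += 1
    return done
```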
The preallocation system coordinates with encoder cache connectors to send preallocation notifications, enabling distributed coordination between instances that generate cache data and instances that manage allocation decisions.
```mermaid
sequenceDiagram
    participant E as Encode Instance
    participant PD as Prefill+Decode Instance
    E->>PD: "I have 2048 tokens of encoder cache to send"
    Note over PD: Check available memory
    PD->>PD: encoder_cache_manager.can_preallocate(2048 tokens)
    alt Multimodal input tokens KV values are not obtained from KV cache
        PD->>PD: encoder_cache_manager.preallocate(req_id, input_id, 2048 tokens)
        PD->>E: "I reserved 2048 tokens for you"
        Note over E: Now safe to send
        E->>PD: Send actual 2048 tokens data
        Note over PD: After successful injection
        PD->>PD: encoder_cache_manager.finalize_allocation(req_id, input_id)
    else Multimodal input tokens KV values are obtained from KV cache
        PD->>PD: encoder_cache_manager.depreallocate(req_id, input_id)
        PD->>E: "No need to send"
    end
```

Encoder Scheduler
Encoder Scheduler (encode)
Files: `vllm/separated_encode/sched/encoder_scheduler.py`

A separate EncoderScheduler class implementation is provided for encode instance scheduling, while prefill and prefill+decode instances continue to use the main Scheduler class.
The EncoderScheduler is a specialized scheduler for encode instances that focuses on multimodal input processing. It maintains an _allocated dictionary to track allocated encoder cache entries and their sizes; this dictionary allows us to finish an encode request before all related transfers are completed.
The encode scheduler schedules all multimodal inputs for a request at once in the schedule() method. It checks whether there is sufficient encoder cache space and budget before allocating all inputs together. A request on the encode instance is considered finished when all its multimodal embeddings have been computed, so all requests finish one iteration after scheduling. The transfer is handled separately in the encoder cache connectors, and the space allocated for the encoder cache is deallocated only after successful transfers, not after the request finishes.
In the update_from_output() method, the scheduler goes through the transferred multimodal data IDs and deallocates the corresponding encoder cache entries.
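A sketch of that deallocation step, assuming `_allocated` maps (req_id, input_id) to the allocated size (names follow the description above, not necessarily the exact code):

```python
def _free_transferred_entries(self, transfered_mm_data):
    # Called from update_from_output() on the encode instance.
    for req_id, input_id in transfered_mm_data:
        size = self._allocated.pop((req_id, input_id))
        self.encoder_cache_manager.deallocate(req_id, input_id, size)
```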
Main Scheduler (prefill and prefill+decode instances)
Files: `vllm/v1/core/sched/scheduler.py`

For prefill and prefill+decode instances, the main scheduler is changed to support multimodal encode separation; the instance's `max_num_encoder_input_tokens` value is set to 0 to avoid executing the multimodal encoder.

If the current instance is a prefill (P) or prefill+decode (PD) instance, we instantiate a preallocator object in the scheduler; this preallocator manages communication and preallocation. We also set `max_num_encoder_input_tokens` to 0 to avoid using the multimodal data encoder on the P or PD instance.

The main scheduler has two changes: integration of the encoder cache preallocator and `_perform_preallocations()` into the request lifecycle, and injected data handling.

The integration of `ec_preallocator` is described in the corresponding part of the documentation. The `_perform_preallocations()` function connects the `ec_preallocator`, which decides which requests will be preallocated, with the encoder cache manager, which actually performs preallocations. This function keeps performing preallocations until there are no longer enough slots in the encoder cache manager or no more preallocation requests. By default, `_perform_preallocations()` is called twice: in `update_after_schedule()` after freeing some encoder inputs, and in `update_from_output` after handling injected data.

The injected data handling uses `injected_mm_data` obtained from `ModelRunnerOutput`; the scheduler goes through the injected data and decides whether the allocation is finalized or the obtained data is no longer needed and can be `depreallocate`d.

Such an implementation is designed to avoid redundant processing or injection, ensure that the scheduler has enough slots when the encoder cache arrives, and ensure the deletion of encoder caches that cannot be used due to prefix caching or early request abortion.
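A sketch of the injected-data handling on the P/PD scheduler, under the assumption that entries that are no longer needed (e.g. covered by the prefix cache or belonging to an aborted request) are simply rolled back (`_is_still_needed` is a hypothetical helper):

```python
def _handle_injected_mm_data(self, injected_mm_data):
    # Called from update_from_output() with the data reported by the model runner.
    for req_id, input_id in injected_mm_data:
        if self._is_still_needed(req_id, input_id):  # hypothetical check
            self.encoder_cache_manager.finalize_allocation(req_id, input_id)
        else:
            self.encoder_cache_manager.depreallocate(req_id, input_id)
    # Freed or finalized slots may unblock further preallocations.
    self._perform_preallocations()
```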
Instance-Specific Model Runner Wrappers
Files: `vllm/separated_encode/worker/gpu_epd_lm_wrapper.py`, `vllm/separated_encode/worker/gpu_epd_vm_wrapper.py`

The implementation introduces specialized GPU model runner wrappers for the disaggregated architecture, with distinct roles for multimodal encoding and text generation. These wrappers are built on top of GPUModelRunner for better compatibility with future changes in GPUModelRunner. As long as the v1 interface for the GPU Model Runner remains unchanged, the wrappers do not require updates; a wrapper simply calls the original methods, instantiates the encoder cache connector, tracks information, and modifies the model runner output with EPD-related information.
DisaggVModelGPURunnerWrapper (Encode Instance)
This wrapper runs on encode instances and processes multimodal inputs. It executes encoder models and sends the results to other instances through the encoder cache connector.
The encode instance doesn't need a KV cache since it only runs the vision part of the MLLM. The wrapper overrides `initialize_kv_cache_tensors` and `initialize_kv_cache` to return empty results, freeing up GPU memory for larger encoder cache storage.

During execution, the wrapper runs encoding for the scheduled multimodal inputs, converts the outputs to numpy arrays, and inserts the encoder cache into the encoder cache connector, which stores it until transfer. Since no text generation happens here, it returns a mostly empty ModelRunnerOutput with additional transfer status information; this information is used by the encoder scheduler to free space in the encoder cache manager.
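In outline, the encode-side wrapper behaves roughly like the sketch below; `_execute_mm_encoder`, `make_empty_output`, and the connector API are used loosely here and do not mirror the exact wrapper code:

```python
class EncodeRunnerWrapperSketch:
    """Illustrative wrapper around a GPUModelRunner-like object on the E instance."""

    def __init__(self, runner, ec_connector):
        self.runner = runner
        self.ec_connector = ec_connector

    def initialize_kv_cache(self, kv_cache_config):
        # The encode instance never runs the language model, so skip KV cache.
        return None

    def execute_model(self, scheduler_output):
        # Run only the multimodal encoder and hand the results to the connector.
        encoder_outputs = self.runner._execute_mm_encoder(scheduler_output) or {}
        for (req_id, input_id), tensor in encoder_outputs.items():
            self.ec_connector.add_encoder_cache(
                req_id, input_id, tensor.cpu().numpy())
        output = self.runner.make_empty_output()  # hypothetical helper
        output.transfered_mm_data = self.ec_connector.get_transferred_ids()
        return output
```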
DisaggLModelGPURunnerWrapper (Prefill/(Prefill+Decode) Instance)
This wrapper runs on prefill or prefill+decode instances, where the language model is executed. It receives encoder caches from encode instances and injects them into the normal inference pipeline.
The wrapper uses a callback function `receive_encoder_cache` to handle incoming encoder data. This callback converts numpy arrays back to GPU tensors with the correct device and dtype, then stores them in the standard encoder_cache dictionary.
During `execute_model`, the wrapper simply calls the original `execute_model` and also tracks which encoder caches were injected. It reports successful injections back to the scheduler through the model output, allowing the scheduler to finalize allocations of preallocated inputs.
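A sketch of such an injection callback, assuming the cache arrives as a numpy array and the runner exposes its device, dtype, and an `encoder_cache` dict keyed by request and input id (the exact keying is an assumption):

```python
import torch

def receive_encoder_cache_sketch(runner, req_id: str, input_id: int,
                                 encoder_cache_numpy) -> None:
    # Convert the transferred numpy array back into a GPU tensor and store it
    # where the normal inference path expects to find encoder outputs.
    tensor = torch.from_numpy(encoder_cache_numpy).to(
        device=runner.device, dtype=runner.dtype)
    runner.encoder_cache.setdefault(req_id, {})[input_id] = tensor
    # Remember the injection so it can be reported back to the scheduler.
    runner.injected_mm_data.append((req_id, input_id))
```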
Encoder Cache Connector

Files: `vllm/separated_encode/ec_transfer/connector/template.py`, `vllm/separated_encode/ec_transfer/connector/redis.py`

The Encoder Cache Connector provides an abstraction layer for transferring encoder caches between encode and prefill instances in disaggregated vLLM deployments. The abstract base class ECConnectorTemplate defines the communication logic.
The connector operates using a thread-based architecture with separate send and receive event loops. Communication is handled asynchronously through configurable worker pools. It maintains separate queues for send and receive operations, with each operation executed by dedicated worker threads.
The encoder connector operates in four distinct states based on instance type and its component:
State for Encode Scheduler - Pure sender functionality that handles encoder cache metadata transfer. When multimodal input is scheduled, metadata sending tasks are added to the send queue for processing by the send event loop.
State for Prefill Scheduler - Receives encoder cache metadata from encode instances and manages preallocation through scheduler callbacks. The preallocation logic is described in the scheduler updates. After successful preallocation, it sends completion notifications back to the encode instances from which it received the metadata.
State for Encode Model Runner - Manages cache storage, transfer, and lifecycle. It maintains:
When encoder output is generated, `add_encoder_cache()` either stores the cache locally or immediately schedules a transfer if a successful preallocation notification was already received. Upon receiving successful preallocation notifications via `_maybe_send_encoder_cache()`, it either sends the cache immediately or adds the request to the pending set. It can also receive a failed preallocation notification, which means that we don't need to send the encoder cache to this instance and can delete the encoder cache for this (req_id, input_id) from the encode instance.

State for Prefill Model Runner - Receive-only state that accepts encoder cache data and calls injection callbacks to add the cache into the model runner's encoder cache dictionary.
The communication flow follows this sequence:
The preallocation step is implemented to avoid OOM problems on the prefill instance.
The `add_encoder_cache` method also looks up pending send queries. If a cache transfer has already been requested (i.e., it exists in the `cache_to_send` set), it immediately schedules the send operation rather than storing the cache locally. This optimization minimizes latency by sending caches as soon as both prerequisites are met. Failed preallocations are tracked in `cache_to_avoid` to prevent unnecessary transfers.

Transfer completion tracking is built into the class. Through the connector's `get_transferred_ids` method, the model runner can determine which request data has already been received.

```mermaid
graph LR
    subgraph "Encode Instance"
        ES[Encode Scheduler]
        EMR[Encode Model Runner]
    end
    subgraph "Prefill Instance"
        PS[Prefill Scheduler]
        PMR[Prefill Model Runner]
    end
    ES -.->|metadata| PS
    EMR -.->|cache data| PMR
    PS -.->|notification| EMR
    PMR -.->|injected IDs| PS
    EMR -.->|transfered IDs| ES
```

Extension Example
The included `RedisECConnector` demonstrates a concrete implementation using Redis as the communication backend. To use other communication backends, implement the abstract methods `_send_prealloc_notification`, `_send_encoder_cache_metas`, `_send_encoder_cache`, `_recv_prealloc_notification`, `_recv_encoder_cache_metas`, and `_recv_encoder_cache` according to your chosen transport mechanism. This connector extension supports multiple E instances and multiple PD or P instances.
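A skeleton for plugging in another backend might look like this; only the abstract method names come from the template, while the constructor and payload shapes are assumptions:

```python
class MyECConnector(ECConnectorTemplate):
    """Hypothetical connector built on an arbitrary transport."""

    def _send_prealloc_notification(self, payload):
        raise NotImplementedError  # push a (req_id, input_id, success) notification

    def _send_encoder_cache_metas(self, payload):
        raise NotImplementedError  # push metadata to the P/PD scheduler

    def _send_encoder_cache(self, payload):
        raise NotImplementedError  # push the serialized encoder cache

    def _recv_prealloc_notification(self, callback):
        raise NotImplementedError  # pull a notification and invoke callback

    def _recv_encoder_cache_metas(self, callback):
        raise NotImplementedError  # pull metadata and invoke callback

    def _recv_encoder_cache(self, callback):
        raise NotImplementedError  # pull cache data and invoke callback
```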
Usage Instructions

The system offers three different deployment options through separate scripts. The first two scripts, `launch_epd_api_server.sh` and `launch_epd_serve.sh`, are designed to run 1E1PD disaggregation on a single device and are used in the benchmark below to illustrate the performance of the encoder separation. The third script, `launch_epd_serve_separated.sh`, enables 1E1PD disaggregation by deploying EPD across two devices.

To start the EPD instances and proxy server, select one of the provided scripts and modify the arguments as needed before execution. You can run the deployment using any of these commands:
After the server starts running, you can interact with it using OpenAI-compatible API requests to send queries and receive responses. Sample Python code for sending requests is available in the examples/online_serving/separated_encode/ directory, providing practical examples for integration.
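For illustration, a request against the running deployment could look like the snippet below; the port, model name, and image URL are placeholders, and the actual examples live in `examples/online_serving/separated_encode/`:

```python
from openai import OpenAI

client = OpenAI(base_url="http://localhost:10001/v1", api_key="EMPTY")
response = client.chat.completions.create(
    model="Qwen/Qwen2.5-VL-3B-Instruct",
    messages=[{
        "role": "user",
        "content": [
            {"type": "image_url",
             "image_url": {"url": "https://example.com/sample.jpg"}},
            {"type": "text", "text": "Describe this image."},
        ],
    }],
)
print(response.choices[0].message.content)
```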
The distinction between `launch_epd_api_server.sh` and `launch_epd_serve.sh`: communication between the proxy and the vLLM API introduces significant delays, likely due to the proxy implementation. For benchmarking purposes, `launch_epd_api_server.sh` was preferred, as it uses a custom API server that instantiates AsyncLLMs directly, bypassing the proxy overhead. Both the proxy and API server implementations can be found in their respective directories: `examples/online_serving/separated_encode/api_server` and `examples/online_serving/separated_encode/proxy`.

The API server implementation allows sending both requests simultaneously, without waiting for the response from instance E, and enables a shared tokenization process.
Benchmark
Performance evaluation was conducted using Qwen2.5-VL-3B-Instruct on an NVIDIA A100-SXM4-80GB GPU, comparing the default `vllm serve` against the 1E1PD with API server approach (1 GPU is used). Testing utilized the lmarena-ai/VisionArena-Chat dataset with varying prompt loads from 100 to 1000 requests to assess scalability characteristics.
Each approach was benchmarked exactly three times for 4 workloads. The detailed individual run results are provided at the end of this document. The following table presents the averaged values across the three runs:
As mentioned in the motivation section, the results demonstrate significant improvements in TPOT; for example, the median TPOT improves by at least 30% across all workloads. Due to the improvement in TPOT, we also observe improvements in ITL, especially in cases where the number of inputs is significantly larger than one batch size. However, increased TTFT is also observed in some cases.
The elevated TTFT primarily stems from two factors. First, image preprocessing occurs twice in the current implementation, creating redundant computational overhead. While this duplication introduces latency, the impact is partially mitigated through concurrent request processing on both the encoder and prefill-decode instances. Second, the encoder cache transfer mechanism introduces delay in the current two-step preallocation approach, where a preallocation notification is sent, followed by one iteration of model execution for the PD instance model runner, which receives the encoder cache and returns the injected IDs.
In the future, the image preprocessing on the PD or P instance can be removed in the separated-encode scenario. The ECConnector could also be extended to utilize an intermediate caching server that stores encoder outputs and enables immediate cache delivery, eliminating the synchronization wait period without changing the ECConnector architecture. Additionally, adopting the AsyncScheduler feature could provide asynchronous coordination between the encoder and prefill-decode stages, reducing the communication overhead.
Other factors can contribute to this TTFT increase, but most likely they can be handled by proper configuration changes without architectural changes.
We can also note general throughput improvements, with EPD achieving ~5% higher request throughput at the 1000-prompt workload (10.12 vs 9.58 req/s) and higher output token throughput (1078 vs 1023 tok/s). By resolving the TTFT-related issues and moving the vLLM architecture further towards disaggregated serving, we can continue to obtain better results in distributed settings, as the current implementation already demonstrates the benefits while leaving clear optimization opportunities.