Conversation

DarkLight1337 (Member) commented Oct 15, 2025

Purpose

Part of #26900

  • vllm.utils.AsyncMicrobatchTokenizer -> vllm.utils.async_utils.AsyncMicrobatchTokenizer
  • vllm.utils.cancel_task_threadsafe -> vllm.utils.async_utils.cancel_task_threadsafe
  • vllm.utils.func.make_async -> vllm.utils.async_utils.make_async
  • vllm.utils.run_in_loop -> vllm.utils.async_utils.run_in_loop
  • vllm.utils.in_loop -> vllm.utils.async_utils.in_loop
  • vllm.utils.merge_async_iterators -> vllm.utils.async_utils.merge_async_iterators
  • vllm.utils.collect_from_async_generator -> vllm.utils.async_utils.collect_from_async_generator
  • vllm.utils._run_task_with_lock -> vllm.executor.ray_distributed_executor._run_task_with_lock (V0 code to be removed)
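
For downstream code, the migration is a one-line import change. A minimal sketch, assuming the moved symbols keep their existing signatures:

    # Before this PR
    # from vllm.utils import merge_async_iterators, cancel_task_threadsafe
    # from vllm.utils.func import make_async

    # After this PR
    from vllm.utils.async_utils import (
        cancel_task_threadsafe,
        make_async,
        merge_async_iterators,
    )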

Test Plan

Test Result


Essential Elements of an Effective PR Description Checklist
  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing a test command.
  • The test results, such as pasting a before/after results comparison or e2e results.
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.
  • (Optional) Release notes update. If your change is user facing, please update the release notes draft in the Google Doc.

Signed-off-by: DarkLight1337 <tlleungac@connect.ust.hk>
DarkLight1337 added the ready label (ONLY add when PR is ready to merge/full CI is needed) on Oct 15, 2025
mergify bot added the frontend, performance (Performance-related issues), and v1 labels on Oct 15, 2025
gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request refactors asynchronous utility functions into a new vllm.utils.async_utils module. The changes primarily involve moving code and updating import paths, which has been done correctly across the codebase. I've identified a critical issue in the AsyncMicrobatchTokenizer class related to a memory leak caused by a reference cycle, and I have provided a detailed comment with a code suggestion to fix it.

Comment on lines +64 to +167
    def _get_queue(
        self, loop: asyncio.AbstractEventLoop, key: tuple
    ) -> asyncio.Queue[tuple[str, dict, Future] | tuple[list[int], Future]]:
        """Get the request queue for the given operation key, creating a new
        queue and batcher task if needed."""
        queue = self._queues.get(key)
        if queue is None:
            self._queues[key] = queue = asyncio.Queue()
            if key[0] == "encode":
                can_batch = key[1] != "other"
                coro = self._batch_encode_loop(queue, can_batch)
            else:
                assert key[0] == "decode", f"Unknown operation type: {key[0]}."
                coro = self._batch_decode_loop(queue)
            self._batcher_tasks.append(loop.create_task(coro))
        return queue

    async def _batch_encode_loop(self, queue: asyncio.Queue, can_batch: bool):
        """Batch incoming encode requests for efficiency."""
        while True:
            prompt, kwargs, result_future = await queue.get()
            prompts = [prompt]
            kwargs_list = [kwargs]
            result_futures = [result_future]
            deadline = self._loop.time() + self.batch_wait_timeout_s

            while len(prompts) < self.max_batch_size:
                timeout = deadline - self._loop.time()
                if timeout <= 0:
                    break
                try:
                    prompt, kwargs, result_future = await asyncio.wait_for(
                        queue.get(), timeout
                    )
                    prompts.append(prompt)
                    result_futures.append(result_future)
                    if not can_batch:
                        kwargs_list.append(kwargs)
                except asyncio.TimeoutError:
                    break

            try:
                # If every request uses identical kwargs we can run a single
                # batched tokenizer call for a big speed-up.
                if can_batch and len(prompts) > 1:
                    batch_encode_fn = partial(self.tokenizer, prompts, **kwargs)
                    results = await self._loop.run_in_executor(
                        self._executor, batch_encode_fn
                    )

                    for i, fut in enumerate(result_futures):
                        if not fut.done():
                            data = {k: v[i] for k, v in results.items()}
                            fut.set_result(BatchEncoding(data))
                else:
                    encode_fn = lambda prompts=prompts, kwargs=kwargs_list: [
                        self.tokenizer(p, **kw) for p, kw in zip(prompts, kwargs)
                    ]
                    results = await self._loop.run_in_executor(
                        self._executor, encode_fn
                    )

                    for fut, res in zip(result_futures, results):
                        if not fut.done():
                            fut.set_result(res)
            except Exception as e:
                for fut in result_futures:
                    if not fut.done():
                        fut.set_exception(e)

    async def _batch_decode_loop(self, queue: asyncio.Queue):
        """Batch incoming decode requests for efficiency."""
        while True:
            token_ids, result_future = await queue.get()
            token_ids_list = [token_ids]
            result_futures = [result_future]
            deadline = self._loop.time() + self.batch_wait_timeout_s

            while len(token_ids_list) < self.max_batch_size:
                timeout = deadline - self._loop.time()
                if timeout <= 0:
                    break
                try:
                    token_ids, result_future = await asyncio.wait_for(
                        queue.get(), timeout
                    )
                    token_ids_list.append(token_ids)
                    result_futures.append(result_future)
                except asyncio.TimeoutError:
                    break

            try:
                # Perform a single batched decode call for all requests
                results = await self._loop.run_in_executor(
                    self._executor, self.tokenizer.batch_decode, token_ids_list
                )
                for fut, res in zip(result_futures, results):
                    if not fut.done():
                        fut.set_result(res)
            except Exception as e:
                for fut in result_futures:
                    if not fut.done():
                        fut.set_exception(e)

gemini-code-assist bot (Contributor) commented:

critical

The AsyncMicrobatchTokenizer class creates background tasks that hold a reference to the instance, while the instance holds references to these tasks in self._batcher_tasks. This creates a reference cycle (self -> _batcher_tasks -> task -> coro -> self), which prevents the instance from being garbage collected. As a result, the __del__ method is never called, leading to a memory leak and orphaned background tasks that are never cancelled.

To resolve this, you can break the reference cycle by using a weakref to self when creating the coroutines for the background tasks. This requires making _batch_encode_loop and _batch_decode_loop static methods.

Please also add import weakref at the top of the file.

    def _get_queue(
        self, loop: asyncio.AbstractEventLoop, key: tuple
    ) -> asyncio.Queue[tuple[str, dict, Future] | tuple[list[int], Future]]:
        """Get the request queue for the given operation key, creating a new
        queue and batcher task if needed."""
        queue = self._queues.get(key)
        if queue is None:
            self._queues[key] = queue = asyncio.Queue()
            self_weak = weakref.ref(self)
            if key[0] == "encode":
                can_batch = key[1] != "other"
                coro = AsyncMicrobatchTokenizer._batch_encode_loop(
                    self_weak, queue, can_batch)
            else:
                assert key[0] == "decode", f"Unknown operation type: {key[0]}."
                coro = AsyncMicrobatchTokenizer._batch_decode_loop(self_weak,
                                                                    queue)
            self._batcher_tasks.append(loop.create_task(coro))
        return queue

    @staticmethod
    async def _batch_encode_loop(
            self_weak: "weakref.Reference['AsyncMicrobatchTokenizer']",
            queue: asyncio.Queue, can_batch: bool):
        """Batch incoming encode requests for efficiency."""
        self = self_weak()
        if not self:
            return

        while True:
            prompt, kwargs, result_future = await queue.get()
            prompts = [prompt]
            kwargs_list = [kwargs]
            result_futures = [result_future]
            deadline = self._loop.time() + self.batch_wait_timeout_s

            while len(prompts) < self.max_batch_size:
                timeout = deadline - self._loop.time()
                if timeout <= 0:
                    break
                try:
                    prompt, kwargs, result_future = await asyncio.wait_for(
                        queue.get(), timeout)
                    prompts.append(prompt)
                    result_futures.append(result_future)
                    if not can_batch:
                        kwargs_list.append(kwargs)
                except asyncio.TimeoutError:
                    break

            try:
                # If every request uses identical kwargs we can run a single
                # batched tokenizer call for a big speed-up.
                if can_batch and len(prompts) > 1:
                    batch_encode_fn = partial(self.tokenizer, prompts,
                                              **kwargs)
                    results = await self._loop.run_in_executor(
                        self._executor, batch_encode_fn)

                    for i, fut in enumerate(result_futures):
                        if not fut.done():
                            data = {k: v[i] for k, v in results.items()}
                            fut.set_result(BatchEncoding(data))
                else:
                    encode_fn = lambda prompts=prompts, kwargs=kwargs_list: [
                        self.tokenizer(p, **kw)
                        for p, kw in zip(prompts, kwargs)
                    ]
                    results = await self._loop.run_in_executor(
                        self._executor, encode_fn)

                    for fut, res in zip(result_futures, results):
                        if not fut.done():
                            fut.set_result(res)
            except Exception as e:
                for fut in result_futures:
                    if not fut.done():
                        fut.set_exception(e)

    @staticmethod
    async def _batch_decode_loop(
            self_weak: "weakref.Reference['AsyncMicrobatchTokenizer']",
            queue: asyncio.Queue):
        """Batch incoming decode requests for efficiency."""
        self = self_weak()
        if not self:
            return

        while True:
            token_ids, result_future = await queue.get()
            token_ids_list = [token_ids]
            result_futures = [result_future]
            deadline = self._loop.time() + self.batch_wait_timeout_s

            while len(token_ids_list) < self.max_batch_size:
                timeout = deadline - self._loop.time()
                if timeout <= 0:
                    break
                try:
                    token_ids, result_future = await asyncio.wait_for(
                        queue.get(), timeout)
                    token_ids_list.append(token_ids)
                    result_futures.append(result_future)
                except asyncio.TimeoutError:
                    break

            try:
                # Perform a single batched decode call for all requests
                results = await self._loop.run_in_executor(
                    self._executor, self.tokenizer.batch_decode,
                    token_ids_list)
                for fut, res in zip(result_futures, results):
                    if not fut.done():
                        fut.set_result(res)
            except Exception as e:
                for fut in result_futures:
                    if not fut.done():
                        fut.set_exception(e)
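
For intuition, here is a minimal, self-contained repro of the kind of leak described above. It is illustrative only; the Leaky class is hypothetical and not vLLM code:

    import asyncio
    import gc
    import weakref


    class Leaky:
        """self -> _task -> coroutine frame -> self: a reference cycle."""

        def __init__(self) -> None:
            self._task = asyncio.ensure_future(self._run())

        async def _run(self) -> None:
            while True:
                await asyncio.sleep(3600)

        def __del__(self) -> None:
            # Intended cleanup; never reached while the task is alive.
            self._task.cancel()


    async def main() -> None:
        obj = Leaky()
        ref = weakref.ref(obj)
        del obj
        gc.collect()
        # The pending task is rooted by the event loop, so the instance
        # stays alive and __del__ never fires.
        print("collected:", ref() is None)  # collected: False


    asyncio.run(main())

A common mitigation, besides the weakref indirection suggested above, is an explicit cleanup method that cancels the background tasks rather than relying on __del__.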

DarkLight1337 (Member, Author) replied:

@njhill is this valid?

gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request refactors asynchronous utility functions into a new vllm.utils.async_utils module. The changes involve moving several functions and the AsyncMicrobatchTokenizer class from vllm.utils and vllm.utils.func and updating their import paths across the codebase. A related test for merge_async_iterators has also been moved to its own file. The refactoring is well-executed, improving code organization without changing any logic. I've reviewed the changes and found no issues.
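
As background for the moved test, a merge_async_iterators-style helper fans in several async iterators. A simplified sketch, under the assumption that it yields (index, item) pairs (the actual vLLM implementation may differ, and error propagation is elided for brevity):

    import asyncio
    from collections.abc import AsyncIterator


    async def merge_async_iterators(*iterators: AsyncIterator):
        """Yield (index, item) pairs from several async iterators as items arrive."""
        queue: asyncio.Queue = asyncio.Queue()
        done = object()  # per-producer completion sentinel

        async def drain(i: int, it: AsyncIterator) -> None:
            try:
                async for item in it:
                    await queue.put((i, item))
            finally:
                await queue.put((i, done))

        tasks = [asyncio.create_task(drain(i, it)) for i, it in enumerate(iterators)]
        remaining = len(tasks)
        try:
            while remaining:
                i, item = await queue.get()
                if item is done:
                    remaining -= 1
                else:
                    yield i, item
        finally:
            # Stop any still-running producers if the consumer exits early.
            for t in tasks:
                t.cancel()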

chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.


Signed-off-by: DarkLight1337 <tlleungac@connect.ust.hk>
njhill (Member) commented Oct 15, 2025

@DarkLight1337 how about vllm.utils.async?

DarkLight1337 (Member, Author) replied:

async is a reserved keyword in Python, so I can't name it that
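
A quick way to check which names are valid as module path segments, for the record:

    import keyword

    print(keyword.iskeyword("async"))    # True  -> vllm.utils.async cannot be imported
    print(keyword.iskeyword("asyncio"))  # False -> vllm.utils.asyncio parses fine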

@DarkLight1337 DarkLight1337 enabled auto-merge (squash) October 15, 2025 15:29
njhill (Member) commented Oct 15, 2025

How about vllm.utils.asyncio?

@DarkLight1337 DarkLight1337 merged commit 828523a into vllm-project:main Oct 15, 2025
60 checks passed
@DarkLight1337 DarkLight1337 deleted the split-async-utils branch October 15, 2025 15:33
DarkLight1337 (Member, Author) commented:

Oops I missed that... maybe next PR then

DarkLight1337 (Member, Author) commented:

Opened #26920

This pull request was later referenced by commits in downstream forks: mandy-li and sducouedic (Oct 16), albertoperdomo2 (two commits, Oct 16), lywa1998 (Oct 20), alhridoy and xuebwang-amd (two commits, Oct 24), 0xrushi (two commits, Oct 26), rtourgeman (Nov 10), and Zhathw (Nov 12).

Labels

frontend, performance (Performance-related issues), ready (ONLY add when PR is ready to merge/full CI is needed), v1
