Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 48c674b
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Feb 7 09:41:05 2025 +0000

    Fix lint

commit 322862b
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Feb 7 09:39:31 2025 +0000

    Fix entrypoints unit test

commit 75af824
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Feb 7 08:07:26 2025 +0000

    Fix lint

commit 2818c8d
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Feb 7 08:06:08 2025 +0000

    Fix cr

commit a172468
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Feb 7 07:01:07 2025 +0000

    Fix lint

commit 3f863b2
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Feb 7 06:54:18 2025 +0000

    Add back timestamp

commit 2e53b24
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Feb 7 06:45:16 2025 +0000

    Fix lint

commit eea1a3a
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Feb 7 06:37:30 2025 +0000

    Add back timestamps

commit b4a45ef
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Feb 7 06:21:48 2025 +0000

    Remove old filter

commit f2df197
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Feb 7 06:12:53 2025 +0000

    Add _process_model_outputs back

commit a51cf25
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Feb 7 03:46:45 2025 +0000

    Fix abort

commit 1058ec0
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Feb 7 02:43:14 2025 +0000

    Remove blank todo

commit 670018e
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Feb 7 02:36:27 2025 +0000

    Filter out migrating request

commit fa2fc9c
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Jan 24 06:25:35 2025 +0000

    Remove process_model_outputs request timestamps

commit 2a980ca
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Jan 24 06:10:49 2025 +0000

    Fix linting

commit 78a1ab4
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Jan 24 05:30:15 2025 +0000

    Fix request leaking bug of migration

commit 774205b
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Jan 24 03:11:08 2025 +0000

    Fix

commit 814521e
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Jan 24 02:57:20 2025 +0000

    Minors

commit b3f0688
Author: s5u13b <sunbiao.sun@alibaba-inc.com>
Date:   Fri Jan 24 01:56:09 2025 +0000

    Change ci timeout-minutes
  • Loading branch information
s5u13b committed Feb 7, 2025
1 parent cc9565c commit d2894ca
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/correctness_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
correctness_tests:
needs: cancel_previous_workflows
runs-on: [self-hosted]
timeout-minutes: 30
timeout-minutes: 15
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/migration_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
migration_tests:
needs: cancel_previous_workflows
runs-on: [self-hosted]
timeout-minutes: 90
timeout-minutes: 60
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pylint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
pylint_test:
needs: cancel_previous_workflows
runs-on: [self-hosted]
timeout-minutes: 15
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
- name: Analysing the code with pylint
Expand Down
2 changes: 1 addition & 1 deletion docs/Arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,6 @@ usage: -m llumnix.entrypoints.vllm.api_server [-h]
- Llumnix does not support pipeline parallel currently.

`--num-schedule-steps`
- Llumnix does not support multi-step scheduling.
- Llumnix does not support multi-step scheduling currently.

Besides, Llumnix does not support sampling algorithms whose number of output sequences is greater than one (vllm.SamplingParams.n > 1), such as beam search.
2 changes: 1 addition & 1 deletion llumnix/backends/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async def put_nowait_to_servers(self,
server_info.request_output_queue_port))
req_outputs = list(server_request_outputs.values())[idx]
request_ids = [req_output.request_id for req_output in req_outputs]
self.engine_actor_handle.abort_request.remote(request_ids)
self.engine_actor_handle.abort.remote(request_ids)

def init_backend_engine(instance_id: str,
placement_group: PlacementGroup,
Expand Down
76 changes: 63 additions & 13 deletions llumnix/backends/vllm/llm_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from vllm.engine.arg_utils import EngineArgs
from vllm.utils import Counter
from vllm.usage.usage_lib import UsageContext
from vllm.engine.llm_engine import SchedulerContext

from llumnix.logging.logger import init_logger
from llumnix.instance_info import InstanceInfo
Expand Down Expand Up @@ -55,11 +56,10 @@ def create(seq_group: SequenceGroupLlumnix, use_cache: bool = False):
if hasattr(seq_group,
'embeddings') and seq_group.embeddings is not None:
return EmbeddingRequestOutput.from_seq_group(seq_group), seq_group.server_info
if RequestStatus.is_migrating(seq_group.status):
return None
# pylint: disable=too-many-function-args
return RequestOutput.from_seq_group(seq_group, use_cache), seq_group.server_info


class LLMEngineLlumnix(_AsyncLLMEngine):
def __init__(self,
instance_id: str,
Expand Down Expand Up @@ -130,9 +130,64 @@ def from_engine_args(
)
return engine

# pylint: disable=inconsistent-return-statements
def _process_model_outputs(self,
ctx: SchedulerContext,
request_id: Optional[str] = None) -> None:
if len(ctx.output_queue) == 0:
return None

if request_id:
(outputs, seq_group_metadata_list, scheduler_outputs, is_async,
is_last_step, is_first_step_output, skip) = ctx.output_queue[0]
else:
(outputs, seq_group_metadata_list, scheduler_outputs, is_async,
is_last_step, is_first_step_output,
skip) = ctx.output_queue.popleft()

# Filter out outputs of migrating requests.
server_infos = []
if outputs:
new_outputs = []
new_scheduled_seq_groups = []
new_seq_group_metadata_list = []
for scheduled_seq_group, seq_group_meta, seq_group_output in \
zip(scheduler_outputs.scheduled_seq_groups, seq_group_metadata_list, outputs[0].outputs):
seq_group = scheduled_seq_group.seq_group
if seq_group.get_seqs(SequenceStatus.RUNNING):
new_scheduled_seq_groups.append(scheduled_seq_group)
new_seq_group_metadata_list.append(seq_group_meta)
new_outputs.append(seq_group_output)
server_infos.append(seq_group.server_info)
scheduler_outputs.scheduled_seq_groups = new_scheduled_seq_groups
outputs[0].outputs = new_outputs
seq_group_metadata_list = new_seq_group_metadata_list

if request_id:
ctx.output_queue[0] = (outputs, seq_group_metadata_list, scheduler_outputs, is_async,
is_last_step, is_first_step_output, skip)
else:
ctx.output_queue.appendleft((outputs, seq_group_metadata_list, scheduler_outputs, is_async,
is_last_step, is_first_step_output, skip))

for server_info in server_infos:
if hasattr(server_info, 'request_timestamps'):
server_info.request_timestamps.engine_process_model_outputs_timestamp_begin = time.time()

super()._process_model_outputs(ctx, request_id)

if ctx.request_outputs:
request_outputs, server_infos = zip(*ctx.request_outputs)
for request_output, server_info in zip(request_outputs, server_infos):
if hasattr(server_info, 'request_timestamps'):
request_output.request_timestamps = server_info.request_timestamps
request_output.request_timestamps.engine_process_model_outputs_timestamp_end = time.time()

return

def _process_request_outputs(
self,
outputs: List[Tuple[RequestOutput,ServerInfo]],
outputs: List[Tuple[RequestOutput, ServerInfo]],
step_begin_time: float
) -> Tuple[List[RequestOutput], List[ServerInfo]]:
request_outputs = []
Expand All @@ -141,20 +196,15 @@ def _process_request_outputs(
request_outputs, server_infos = zip(*outputs)
request_outputs = list(request_outputs)
server_infos = list(server_infos)
for request_output, server_info in zip(request_outputs, server_infos):
# Assign request_timestamps from server_infos to request_outputs.
if hasattr(server_info, 'request_timestamps'):
request_output.request_timestamps = server_info.request_timestamps
if request_output.finished:
logger.info("engine finished request {}".format(request_output.request_id))
for server_info in server_infos:
if hasattr(server_info, 'request_timestamps'):
server_info.request_timestamps.engine_process_model_outputs_timestamp_begin = time.time()
for request_output in request_outputs:
if hasattr(request_output, 'request_timestamps'):
request_output.request_timestamps.engine_step_timestamp_begin = step_begin_time
request_output.request_timestamps.engine_step_timestamp_end = time.time()

for request_output in request_outputs:
if request_output.finished:
logger.info("engine finished request {}".format(request_output.request_id))

instance_info: InstanceInfo = self.instance_info
instance_info.instance_id = self.instance_id
instance_info.step_id = next(self.step_counter)
Expand Down Expand Up @@ -348,7 +398,7 @@ def get_waiting_queue(self) -> Deque[SequenceGroupLlumnix]:
def get_request_incremental_blocks(self, *args, **kwargs) -> Tuple[List[int], List[int]]:
return self.engine.scheduler[0].get_request_incremental_blocks(*args, **kwargs)

def remove_running_request(self, *args, **kwargs) -> None:
def remove_running_request(self, *args, **kwargs) -> bool:
return self.engine.scheduler[0].remove_running_request(*args, **kwargs)

def remove_waiting_request(self, *args, **kwargs) -> bool:
Expand Down
2 changes: 2 additions & 0 deletions llumnix/backends/vllm/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def _set_status(self,

def free_dst_pre_alloc_cache(self, request_id: str = None) -> None:
if request_id:
logger.info("free request {} pre_alloc_cache".format(request_id))
block_table = self.pre_alloc_cache_dict.pop(request_id, None)
if block_table:
block_table.free()
Expand All @@ -199,6 +200,7 @@ def free_dst_pre_alloc_cache(self, request_id: str = None) -> None:
# Clear all pre-allocated cache of dst instance when src instance encounters exception.
request_ids = list(self.pre_alloc_cache_dict.keys())
for req_id in request_ids:
logger.info("free request {} pre_alloc_cache".format(req_id))
block_table = self.pre_alloc_cache_dict.pop(req_id, None)
if block_table:
block_table.free()
Expand Down
2 changes: 1 addition & 1 deletion llumnix/backends/vllm/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def migrate_cache(self, src_worker_handle_list, src_blocks: List[int], dst_block
total_kv_cache_size = len(src_blocks) * CacheEngine.get_cache_block_size(
self.cache_config, self.model_config, self.parallel_config)
speed = total_kv_cache_size/GiB_bytes/(end_time - start_time)
logger.info("[migration_cache] blocks_num: {}, total_kv_cache_size: {}, time: {}s, speed: {}GB/s."
logger.info("Migrate kv cache done, blocks_num: {}, total_kv_cache_size: {}, time: {}s, speed: {}GB/s."
.format(len(src_blocks), convert_bytes(total_kv_cache_size), end_time-start_time, speed))

def do_recv(self, *args, **kwargs):
Expand Down
8 changes: 6 additions & 2 deletions llumnix/entrypoints/vllm/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import time
import asyncio
from typing import Dict
from functools import partial

import ray

Expand Down Expand Up @@ -45,8 +44,9 @@ async def generate(self,
**kwargs) -> AsyncStream:
if sampling_params.n > 1:
raise ValueError("Unsupported feature: multiple sequence decoding")
logger.info("entrypoints receive request {}".format(request_id))
# pylint: disable=unexpected-keyword-arg
results_generator = AsyncStream(request_id, cancel=partial(self.abort, verbose=False))
results_generator = AsyncStream(request_id, cancel=self.abort_request)
self.request_streams[request_id] = results_generator
server_info_copy = copy.deepcopy(self.server_info)

Expand Down Expand Up @@ -115,6 +115,10 @@ async def abort(self, request_id: str) -> None:
except ray.exceptions.RayActorError:
logger.warning("Manager is unavailable.")

def abort_request(self, request_id: str) -> None:
logger.info("Abort request: {}.".format(request_id))
self.manager.abort.remote(request_id)

async def is_ready(self) -> bool:
ready_status = await self.manager.is_ready.remote()
return ready_status
Expand Down
1 change: 1 addition & 0 deletions llumnix/llumlet/llumlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ async def _check_engine_state_loop(self):

async def migrate_out(self, dst_instance_name: str) -> List[str]:
migrate_out_requests = self.migration_scheduler.get_migrate_out_requests()

if len(migrate_out_requests) == 0:
return []

Expand Down
2 changes: 2 additions & 0 deletions llumnix/llumlet/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class RequestInferenceType(str, Enum):
PREFILL = "prefill"
DECODE = "decode"


class RequestStatus(str, Enum):
RUNNING = "running"
WAITING = "waiting"
Expand All @@ -32,6 +33,7 @@ class RequestStatus(str, Enum):
def is_migrating(status) -> bool:
return status in [RequestStatus.RUNNING_MIGRATING, RequestStatus.WAITING_MIGRATING]


class LlumnixRequest:
def __init__(self, request_id: int, server_info: ServerInfo, expected_steps: int) -> None:
self.request_id = request_id
Expand Down

0 comments on commit d2894ca

Please sign in to comment.