WIP: Multiturn Benchmarking Support #211

Draft · wants to merge 3 commits into base: main
68 changes: 68 additions & 0 deletions src/guidellm/request/session.py
@@ -0,0 +1,68 @@
import itertools
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

from guidellm.backend.response import ResponseSummary
from guidellm.request.request import GenerationRequest

__all__ = ["GenerativeRequestSession", "RequestSession"]

# TODO: Replace with specific types that implement needed features
RequestT = TypeVar("RequestT")
ResponseT = TypeVar("ResponseT")


class RequestSession(ABC, Generic[RequestT, ResponseT]):
    @abstractmethod
    def __len__(self) -> int:
        """Total number of turns in the session (the scheduler calls len(session))."""
        ...

    @abstractmethod
    def get_next_request(self) -> RequestT: ...

    @abstractmethod
    def get_next_delay(self) -> float: ...

    @abstractmethod
    def push_response(self, response: ResponseT) -> None: ...

    @property
    @abstractmethod
    def complete(self) -> bool: ...


# FIXME: Bad implementation. Can only handle string requests
class GenerativeRequestSession(RequestSession[GenerationRequest, ResponseSummary]):
    def __init__(self, prompts: list[GenerationRequest]) -> None:
        if not prompts:
            raise ValueError("Prompts cannot be empty")

        self.prompts = prompts
        self.responses: list[str] = []

    def __len__(self) -> int:
        return len(self.prompts)

    def get_next_request(self) -> GenerationRequest:
        completed_responses = len(self.responses)
        base_request = self.prompts[completed_responses].model_copy()
        # Interleave prior prompts with their responses and append the prompt
        # for the current turn (the trailing "" pads the zip to include it).
        base_request.content = "".join(
            itertools.chain.from_iterable(
                zip((x.content for x in self.prompts), self.responses + [""])
            )
        )
        base_request.stats["prompt_tokens"] = sum(
            x.stats["prompt_tokens"] for x in self.prompts[: completed_responses + 1]
        )
        base_request.constraints["output_tokens"] = sum(
            x.constraints["output_tokens"] for x in self.prompts[:completed_responses]
        )

        return base_request

    def get_next_delay(self) -> float:
        # TODO: Model inter-turn delay; turns are currently eligible immediately.
        return 0.0

    def push_response(self, response: ResponseSummary) -> None:
        if len(self.responses) >= len(self.prompts):
            raise ValueError("Response list full")

        if response.response_output_tokens is not None:
            self.prompts[len(self.responses)].constraints["output_tokens"] = (
                response.response_output_tokens
            )
        self.responses.append(response.value)

    @property
    def complete(self) -> bool:
        return len(self.responses) >= len(self.prompts)
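
For reviewers, a minimal sketch of the intended session lifecycle (the GenerationRequest constructor arguments and the send_to_backend helper are illustrative assumptions, not part of this PR):

session = GenerativeRequestSession(
    prompts=[
        GenerationRequest(
            content="What is speculative decoding?",
            stats={"prompt_tokens": 6},
            constraints={"output_tokens": 128},
        ),
        GenerationRequest(
            content="\nSummarize that in one sentence.",
            stats={"prompt_tokens": 8},
            constraints={"output_tokens": 32},
        ),
    ]
)

while not session.complete:
    request = session.get_next_request()  # prior turns folded into request.content
    response = send_to_backend(request)   # hypothetical stand-in for the worker call
    session.push_response(response)       # appends response.value to the history

Each turn's request replays the conversation so far, which is why stats["prompt_tokens"] accumulates over all prompts up to and including the current turn.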
119 changes: 57 additions & 62 deletions src/guidellm/scheduler/scheduler.py
@@ -1,10 +1,10 @@
import asyncio
import math
import multiprocessing
import multiprocessing.queues
import time
from collections.abc import AsyncGenerator, Iterable, Iterator
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager, Queue
from queue import Empty as QueueEmpty
from typing import (
    Any,
    Generic,
@@ -15,17 +15,22 @@
from loguru import logger

from guidellm.config import settings
from guidellm.request.session import RequestSession
from guidellm.scheduler.result import (
    SchedulerRequestResult,
    SchedulerResult,
    SchedulerRunInfo,
)
from guidellm.scheduler.strategy import SchedulingStrategy
from guidellm.scheduler.types import RequestT, ResponseT
from guidellm.scheduler.types import (
    MPQueues,
    RequestT,
    ResponseT,
    WorkerProcessRequestTime,
    WorkerProcessResult,
)
from guidellm.scheduler.worker import (
    RequestsWorker,
    WorkerProcessRequest,
    WorkerProcessResult,
)

__all__ = ["Scheduler"]
@@ -114,13 +119,13 @@ async def run(
raise ValueError(f"Invalid max_duration: {max_duration}")

with (
multiprocessing.Manager() as manager,
Manager() as manager,
ProcessPoolExecutor(
max_workers=scheduling_strategy.processes_limit
) as executor,
):
requests_iter: Optional[Iterator[Any]] = None
futures, requests_queue, responses_queue = await self._start_processes(
futures, queues = await self._start_processes(
manager, executor, scheduling_strategy
)
run_info, requests_iter, times_iter = self._run_setup(
@@ -149,13 +154,14 @@
requests_iter = self._add_requests(
requests_iter,
times_iter,
requests_queue,
queues.requests,
queues.times,
run_info,
)
await asyncio.sleep(0) # enable requests to start

iter_result = self._check_result_ready(
responses_queue,
queues.responses,
run_info,
)
if iter_result is not None:
@@ -171,7 +177,7 @@
run_info=run_info,
)

await self._stop_processes(futures, requests_queue)
await self._stop_processes(futures, queues.requests)

async def _start_processes(
self,
@@ -180,14 +186,16 @@
        scheduling_strategy: SchedulingStrategy,
    ) -> tuple[
        list[asyncio.Future],
        multiprocessing.Queue,
        multiprocessing.Queue,
        MPQueues[RequestT, ResponseT],
    ]:
        await self.worker.prepare_multiprocessing()
        requests_queue = manager.Queue(
            maxsize=scheduling_strategy.queued_requests_limit
        queues: MPQueues[RequestT, ResponseT] = MPQueues(
            requests=manager.Queue(
                maxsize=scheduling_strategy.processing_requests_limit
            ),
            times=manager.Queue(maxsize=scheduling_strategy.processing_requests_limit),
            responses=manager.Queue(),
        )
        responses_queue = manager.Queue()

        num_processes = min(
            scheduling_strategy.processes_limit,
@@ -212,36 +220,20 @@
futures = []
loop = asyncio.get_event_loop()
for id_, requests_limit in zip(process_ids, process_requests_limits):
if scheduling_strategy.processing_mode == "sync":
futures.append(
loop.run_in_executor(
executor,
self.worker.process_loop_synchronous,
requests_queue,
responses_queue,
id_,
)
)
elif scheduling_strategy.processing_mode == "async":
futures.append(
loop.run_in_executor(
executor,
self.worker.process_loop_asynchronous,
requests_queue,
responses_queue,
requests_limit,
id_,
)
)
else:
raise ValueError(
f"Invalid processing mode: {scheduling_strategy.processing_mode} "
f"for strategy: {scheduling_strategy}"
futures.append(
loop.run_in_executor(
executor,
self.worker.process_loop_asynchronous,
queues,
False, # TODO: Make configurable
requests_limit,
id_,
)
)

await asyncio.sleep(0.1) # give time for processes to start

return futures, requests_queue, responses_queue
return futures, queues

def _run_setup(
self,
@@ -284,7 +276,8 @@ def _add_requests(
        self,
        requests_iter: Optional[Iterator[Any]],
        times_iter: Iterator[float],
        requests_queue: multiprocessing.Queue,
        requests_queue: "Queue[RequestSession[RequestT, ResponseT]]",
        times_queue: "Queue[WorkerProcessRequestTime]",
        run_info: SchedulerRunInfo,
    ) -> Optional[Iterator[Any]]:
        if requests_iter is not None:
@@ -298,23 +291,24 @@
if run_info.created_requests >= run_info.end_number:
raise StopIteration

if (
request_time := next(times_iter)
) >= run_info.end_time or time.time() >= run_info.end_time:
raise StopIteration

request = next(requests_iter)
work_req: WorkerProcessRequest[RequestT] = WorkerProcessRequest(
request=request,
start_time=request_time,
timeout_time=run_info.end_time,
queued_time=time.time(),
)
requests_queue.put(work_req)

run_info.created_requests += 1
run_info.queued_requests += 1
added_count += 1
session = next(requests_iter)
requests_queue.put(session)
for _ in range(len(session)):
if (
request_time := next(times_iter)
) >= run_info.end_time or time.time() >= run_info.end_time:
raise StopIteration

work_req = WorkerProcessRequestTime(
start_time=request_time,
timeout_time=run_info.end_time,
queued_time=time.time(),
)
times_queue.put(work_req)

run_info.created_requests += 1
run_info.queued_requests += 1
added_count += 1
except StopIteration:
# we've reached the limit number, limit time, or exhausted the requests
# set to None to stop adding more and tell the loop no more requests
@@ -324,14 +318,14 @@

    def _check_result_ready(
        self,
        responses_queue: multiprocessing.Queue,
        responses_queue: "Queue[WorkerProcessResult[RequestT, ResponseT]]",
        run_info: SchedulerRunInfo,
    ) -> Optional[SchedulerRequestResult[RequestT, ResponseT]]:
        try:
            process_response: WorkerProcessResult[RequestT, ResponseT] = (
                responses_queue.get_nowait()
            )
        except multiprocessing.queues.Empty:  # type: ignore[attr-defined]
        except QueueEmpty:
            return None

if process_response.type_ == "request_scheduled":
@@ -374,8 +368,9 @@ def _check_result_ready(
    async def _stop_processes(
        self,
        futures: list[asyncio.Future],
        requests_queue: multiprocessing.Queue,
        requests_queue: "Queue[RequestSession[RequestT, ResponseT]]",
    ):
        # FIXME: Need new method for stopping workers
        for _ in futures:
            requests_queue.put(None)

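Since the requests queue now carries whole sessions while per-turn timings travel separately on the times queue, a rough sketch of the pairing a worker process would do (worker.resolve is a hypothetical stand-in; the actual consumer changes live in worker.py and are not shown in this diff):

def run_session(queues, worker):
    # A None on the requests queue is the shutdown signal (see _stop_processes).
    session = queues.requests.get()
    if session is None:
        return
    # Every turn consumes exactly one timing entry, mirroring _add_requests,
    # which enqueues one WorkerProcessRequestTime per turn of the session.
    while not session.complete:
        timing = queues.times.get()
        request = session.get_next_request()
        response = worker.resolve(request, timing.start_time, timing.timeout_time)
        session.push_response(response)
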
4 changes: 3 additions & 1 deletion src/guidellm/scheduler/strategy.py
@@ -226,7 +226,9 @@ def processes_limit(self) -> int:
:return: {self.streams} for the concurrent scheduling strategy to limit
the worker processes to the number of streams.
"""
return self.streams
cpu_cores = os.cpu_count() or 1

return min(max(1, cpu_cores - 1), self.streams)

@property
def queued_requests_limit(self) -> int:
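The new cap keeps one core free for the coordinating process instead of always spawning one worker per stream (note this change relies on os being imported in strategy.py). A worked example, assuming an 8-core host:

cpu_cores = 8  # what os.cpu_count() would return on the assumed host
streams = 16
limit = min(max(1, cpu_cores - 1), streams)
print(limit)  # 7: worker processes are capped below the stream count
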
40 changes: 38 additions & 2 deletions src/guidellm/scheduler/types.py
@@ -1,7 +1,43 @@
from typing import TypeVar
from dataclasses import dataclass
from multiprocessing import Queue
from typing import Generic, Literal, Optional, TypeVar

__all__ = ["RequestT", "ResponseT"]
from guidellm.request.session import RequestSession
from guidellm.scheduler.result import SchedulerRequestInfo

__all__ = [
"MPQueues",
"RequestT",
"ResponseT",
"WorkerProcessRequestTime",
"WorkerProcessResult",
]


RequestT = TypeVar("RequestT")
ResponseT = TypeVar("ResponseT")


# TODO: Move dataclasses somewhere else


@dataclass
class WorkerProcessRequestTime:
    start_time: float
    timeout_time: float
    queued_time: float


@dataclass
class WorkerProcessResult(Generic[RequestT, ResponseT]):
    type_: Literal["request_scheduled", "request_start", "request_complete"]
    request: RequestT
    response: Optional[ResponseT]
    info: SchedulerRequestInfo


@dataclass
class MPQueues(Generic[RequestT, ResponseT]):
    # Annotations are quoted: multiprocessing.Queue is a factory function,
    # not a class, so subscripting it at runtime raises a TypeError.
    requests: "Queue[RequestSession[RequestT, ResponseT]]"
    times: "Queue[WorkerProcessRequestTime]"
    responses: "Queue[WorkerProcessResult[RequestT, ResponseT]]"
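
A short sketch of how the scheduler side wires these queues up, mirroring _start_processes in scheduler.py above (the maxsize value is illustrative):

from multiprocessing import Manager

with Manager() as manager:
    queues: MPQueues = MPQueues(
        requests=manager.Queue(maxsize=100),
        times=manager.Queue(maxsize=100),
        responses=manager.Queue(),
    )
    # _stop_processes puts one None per worker on the requests queue as a
    # shutdown signal, so consumers must treat None as end-of-work.
    queues.requests.put(None)
    assert queues.requests.get() is None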