diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py
index cff6181fa3ad..cb125bf4bf17 100644
--- a/vllm/v1/executor/multiproc_executor.py
+++ b/vllm/v1/executor/multiproc_executor.py
@@ -38,7 +38,7 @@
 POLLING_TIMEOUT_MS = 5000
 POLLING_TIMEOUT_S = POLLING_TIMEOUT_MS // 1000
 
-EXECUTE_MODEL_TIMEOUT_S = 30
+EXECUTE_MODEL_TIMEOUT_S = 40
 
 
 class MultiprocExecutor(Executor):
@@ -151,16 +151,16 @@ def execute_model(
 
     def collective_rpc(self,
                        method: Union[str, Callable],
-                       timeout: Optional[float] = 180.0,
+                       timeout: Optional[float] = None,
                        args: tuple = (),
                        kwargs: Optional[dict] = None,
                        rank0_reply_only: bool = False) -> list[Any]:
-        start_time = time.monotonic()
-        kwargs = kwargs or {}
-
         if self.is_failed:
             raise RuntimeError("Executor failed.")
 
+        deadline = None if timeout is None else time.monotonic() + timeout
+        kwargs = kwargs or {}
+
         # NOTE: If the args are heterogeneous, then we pack them into a list,
         # and unpack them in the method of every worker, because every worker
         # knows their own rank.
@@ -176,8 +176,8 @@ def collective_rpc(self,
             workers = (self.workers[0], ) if rank0_reply_only else self.workers
             responses = [None] * len(workers)
             for w in workers:
-                dequeue_timeout = timeout - (time.monotonic() - start_time
-                                             ) if timeout is not None else None
+                dequeue_timeout = None if deadline is None else (
+                    deadline - time.monotonic())
                 status, result = w.worker_response_mq.dequeue(
                     timeout=dequeue_timeout, cancel=self.shutdown_event)
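
For context, the change drops the fixed 180 s default (so `timeout=None` now means "wait indefinitely") and replaces the per-dequeue elapsed-time subtraction with a single absolute deadline computed up front; each worker dequeue then receives whatever budget remains. A minimal standalone sketch of that deadline pattern, assuming plain `queue.Queue` objects in place of vLLM's message queues (the `collect_responses` helper and its names are illustrative, not vLLM's API):

```python
import time
from typing import Optional


def collect_responses(worker_queues, timeout: Optional[float] = None):
    """Drain one response per worker, bounded by a single overall deadline.

    With timeout=None the wait is unbounded; otherwise every dequeue gets
    only the time remaining from the original budget, so the total wait
    never exceeds `timeout` no matter how many workers there are.
    """
    # Compute the deadline once; derive per-dequeue timeouts from it.
    deadline = None if timeout is None else time.monotonic() + timeout
    responses = []
    for q in worker_queues:
        remaining = None if deadline is None else deadline - time.monotonic()
        if remaining is not None and remaining <= 0:
            raise TimeoutError("deadline exceeded before all workers replied")
        # queue.Queue.get raises queue.Empty if `remaining` elapses.
        responses.append(q.get(timeout=remaining))
    return responses
```

Computing the deadline once keeps the bookkeeping in one place (no `start_time` to thread through) and makes the `None` case fall out naturally, matching the new `timeout: Optional[float] = None` default.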