11# SPDX-License-Identifier: Apache-2.0
22# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
33import multiprocessing
4- import os
54import pickle
65import queue
76import signal
@@ -507,6 +506,7 @@ def wait_for_ready(
507506 return cast (list [WorkerProcHandle ], ready_proc_handles )
508507
509508 def shutdown (self ):
509+ self .worker .shutdown ()
510510 self .rpc_broadcast_mq = None
511511 self .worker_response_mq = None
512512 destroy_model_parallel ()
@@ -536,7 +536,7 @@ def signal_handler(signum, frame):
536536 # tuple[Connection, Connection]
537537 reader , ready_writer = kwargs .pop ("ready_pipe" )
538538 death_pipe = kwargs .pop ("death_pipe" , None )
539-
539+ shutdown_event = threading . Event ()
540540 # Start death monitoring thread if death_pipe is provided
541541 if death_pipe is not None :
542542
@@ -548,7 +548,7 @@ def monitor_parent_death():
548548 # Parent process has exited, terminate this worker
549549 logger .info ("Parent process exited, terminating worker" )
550550 # Set the shutdown event to trigger a clean shutdown
551- os . kill ( os . getpid (), signal . SIGTERM )
551+ shutdown_event . set ( )
552552 except Exception as e :
553553 logger .warning ("Death monitoring error: %s" , e )
554554
@@ -576,7 +576,7 @@ def monitor_parent_death():
576576 ready_writer .close ()
577577 ready_writer = None
578578
579- worker .worker_busy_loop ()
579+ worker .worker_busy_loop (cancel = shutdown_event )
580580
581581 except Exception :
582582 # NOTE: if an Exception arises in busy_loop, we send
@@ -586,6 +586,8 @@ def monitor_parent_death():
586586
587587 if ready_writer is not None :
588588 logger .exception ("WorkerProc failed to start." )
589+ elif shutdown_event .is_set ():
590+ logger .info ("WorkerProc shutting down." )
589591 else :
590592 logger .exception ("WorkerProc failed." )
591593
@@ -637,11 +639,11 @@ def async_output_busy_loop(self):
637639 output = self .async_output_queue .get ()
638640 self .enqueue_output (output )
639641
640- def worker_busy_loop (self ):
642+ def worker_busy_loop (self , cancel : Optional [ threading . Event ] = None ):
641643 """Main busy loop for Multiprocessing Workers"""
642644 while True :
643- method , args , kwargs , output_rank = self .rpc_broadcast_mq .dequeue ()
644-
645+ method , args , kwargs , output_rank = self .rpc_broadcast_mq .dequeue (
646+ cancel = cancel )
645647 try :
646648 if isinstance (method , str ):
647649 func = getattr (self .worker , method )
0 commit comments