3838POLLING_TIMEOUT_MS = 5000
3939POLLING_TIMEOUT_S = POLLING_TIMEOUT_MS // 1000
4040
41- EXECUTE_MODEL_TIMEOUT_S = 30
41+ EXECUTE_MODEL_TIMEOUT_S = 40
4242
4343
4444class MultiprocExecutor (Executor ):
@@ -151,16 +151,16 @@ def execute_model(
151151
152152 def collective_rpc (self ,
153153 method : Union [str , Callable ],
154- timeout : Optional [float ] = 180.0 ,
154+ timeout : Optional [float ] = None ,
155155 args : tuple = (),
156156 kwargs : Optional [dict ] = None ,
157157 rank0_reply_only : bool = False ) -> list [Any ]:
158- start_time = time .monotonic ()
159- kwargs = kwargs or {}
160-
161158 if self .is_failed :
162159 raise RuntimeError ("Executor failed." )
163160
161+ deadline = None if timeout is None else time .monotonic () + timeout
162+ kwargs = kwargs or {}
163+
164164 # NOTE: If the args are heterogeneous, then we pack them into a list,
165165 # and unpack them in the method of every worker, because every worker
166166 # knows their own rank.
@@ -176,8 +176,8 @@ def collective_rpc(self,
176176 workers = (self .workers [0 ], ) if rank0_reply_only else self .workers
177177 responses = [None ] * len (workers )
178178 for w in workers :
179- dequeue_timeout = timeout - ( time . monotonic () - start_time
180- ) if timeout is not None else None
179+ dequeue_timeout = None if deadline is None else (
180+ deadline - time . monotonic ())
181181 status , result = w .worker_response_mq .dequeue (
182182 timeout = dequeue_timeout , cancel = self .shutdown_event )
183183
0 commit comments