15
15
runtest , INTERRUPTED , CHILD_ERROR , PROGRESS_MIN_TIME ,
16
16
format_test_result , TestResult , is_failed , TIMEOUT )
17
17
from test .libregrtest .setup import setup_tests
18
- from test .libregrtest .utils import format_duration
18
+ from test .libregrtest .utils import format_duration , print_warning
19
19
20
20
21
21
# Display the running tests if nothing happened last N seconds
@@ -103,9 +103,10 @@ class ExitThread(Exception):
103
103
pass
104
104
105
105
106
- class MultiprocessThread (threading .Thread ):
107
- def __init__ (self , pending , output , ns , timeout ):
106
+ class TestWorkerProcess (threading .Thread ):
107
+ def __init__ (self , worker_id , pending , output , ns , timeout ):
108
108
super ().__init__ ()
109
+ self .worker_id = worker_id
109
110
self .pending = pending
110
111
self .output = output
111
112
self .ns = ns
@@ -114,12 +115,16 @@ def __init__(self, pending, output, ns, timeout):
114
115
self .start_time = None
115
116
self ._popen = None
116
117
self ._killed = False
118
+ self ._stopped = False
117
119
118
120
def __repr__ (self ):
119
- info = ['MultiprocessThread' ]
120
- test = self .current_test_name
121
+ info = [f'TestWorkerProcess #{ self .worker_id } ' ]
121
122
if self .is_alive ():
122
- info .append ('alive' )
123
+ dt = time .monotonic () - self .start_time
124
+ info .append ("running for %s" % format_duration (dt ))
125
+ else :
126
+ info .append ('stopped' )
127
+ test = self .current_test_name
123
128
if test :
124
129
info .append (f'test={ test } ' )
125
130
popen = self ._popen
@@ -128,53 +133,24 @@ def __repr__(self):
128
133
return '<%s>' % ' ' .join (info )
129
134
130
135
def _kill (self ):
131
- dt = time .monotonic () - self .start_time
136
+ if self ._killed :
137
+ return
138
+ self ._killed = True
132
139
133
140
popen = self ._popen
134
- pid = popen .pid
135
- print ("Kill worker process %s running for %.1f sec" % (pid , dt ),
136
- file = sys .stderr , flush = True )
141
+ if popen is None :
142
+ return
137
143
144
+ print (f"Kill { self } " , file = sys .stderr , flush = True )
138
145
try :
139
146
popen .kill ()
140
- return True
141
147
except OSError as exc :
142
- print ("WARNING: Failed to kill worker process %s: %r" % (pid , exc ),
143
- file = sys .stderr , flush = True )
144
- return False
145
-
146
- def _close_wait (self ):
147
- popen = self ._popen
148
-
149
- # stdout and stderr must be closed to ensure that communicate()
150
- # does not hang
151
- popen .stdout .close ()
152
- popen .stderr .close ()
153
-
154
- try :
155
- popen .wait (JOIN_TIMEOUT )
156
- except (subprocess .TimeoutExpired , OSError ) as exc :
157
- print ("WARNING: Failed to wait for worker process %s "
158
- "completion (timeout=%.1f sec): %r"
159
- % (popen .pid , JOIN_TIMEOUT , exc ),
160
- file = sys .stderr , flush = True )
161
-
162
- def kill (self ):
163
- """
164
- Kill the current process (if any).
165
-
166
- This method can be called by the thread running the process,
167
- or by another thread.
168
- """
169
- self ._killed = True
170
-
171
- if self ._popen is None :
172
- return
173
-
174
- if not self ._kill ():
175
- return
148
+ print_warning (f"Failed to kill { self } : { exc !r} " )
176
149
177
- self ._close_wait ()
150
+ def stop (self ):
151
+ # Method called from a different thread to stop this thread
152
+ self ._stopped = True
153
+ self ._kill ()
178
154
179
155
def mp_result_error (self , test_name , error_type , stdout = '' , stderr = '' ,
180
156
err_msg = None ):
@@ -190,59 +166,69 @@ def _timedout(self, test_name):
190
166
try :
191
167
stdout , stderr = popen .communicate (timeout = JOIN_TIMEOUT )
192
168
except (subprocess .TimeoutExpired , OSError ) as exc :
193
- print ("WARNING: Failed to read worker process %s output "
194
- "(timeout=%.1f sec): %r"
195
- % (popen .pid , JOIN_TIMEOUT , exc ),
196
- file = sys .stderr , flush = True )
197
-
198
- self ._close_wait ()
169
+ print_warning (f"Failed to read { self } output "
170
+ f"(timeout={ format_duration (JOIN_TIMEOUT )} ): "
171
+ f"{ exc !r} " )
199
172
200
173
return self .mp_result_error (test_name , TIMEOUT , stdout , stderr )
201
174
202
- def _runtest (self , test_name ):
203
- try :
204
- self .start_time = time .monotonic ()
205
- self .current_test_name = test_name
175
+ def _run_process (self , test_name ):
176
+ self .start_time = time .monotonic ()
206
177
178
+ self .current_test_name = test_name
179
+ try :
180
+ self ._killed = False
207
181
self ._popen = run_test_in_subprocess (test_name , self .ns )
208
182
popen = self ._popen
183
+ except :
184
+ self .current_test_name = None
185
+ raise
186
+
187
+ try :
188
+ if self ._stopped :
189
+ # If kill() has been called before self._popen is set,
190
+ # self._popen is still running. Call again kill()
191
+ # to ensure that the process is killed.
192
+ self ._kill ()
193
+ raise ExitThread
194
+
209
195
try :
210
- try :
211
- if self ._killed :
212
- # If kill() has been called before self._popen is set,
213
- # self._popen is still running. Call again kill()
214
- # to ensure that the process is killed.
215
- self .kill ()
216
- raise ExitThread
217
-
218
- try :
219
- stdout , stderr = popen .communicate (timeout = self .timeout )
220
- except subprocess .TimeoutExpired :
221
- if self ._killed :
222
- # kill() has been called: communicate() fails
223
- # on reading closed stdout/stderr
224
- raise ExitThread
225
-
226
- return self ._timedout (test_name )
227
- except OSError :
228
- if self ._killed :
229
- # kill() has been called: communicate() fails
230
- # on reading closed stdout/stderr
231
- raise ExitThread
232
- raise
233
- except :
234
- self .kill ()
235
- raise
236
- finally :
237
- self ._close_wait ()
196
+ stdout , stderr = popen .communicate (timeout = self .timeout )
197
+ except subprocess .TimeoutExpired :
198
+ if self ._stopped :
199
+ # kill() has been called: communicate() fails
200
+ # on reading closed stdout/stderr
201
+ raise ExitThread
202
+
203
+ return self ._timedout (test_name )
204
+ except OSError :
205
+ if self ._stopped :
206
+ # kill() has been called: communicate() fails
207
+ # on reading closed stdout/stderr
208
+ raise ExitThread
209
+ raise
238
210
239
211
retcode = popen .returncode
212
+ stdout = stdout .strip ()
213
+ stderr = stderr .rstrip ()
214
+
215
+ return (retcode , stdout , stderr )
216
+ except :
217
+ self ._kill ()
218
+ raise
240
219
finally :
241
- self .current_test_name = None
220
+ self ._wait_completed ()
242
221
self ._popen = None
222
+ self .current_test_name = None
223
+
224
+ def _runtest (self , test_name ):
225
+ result = self ._run_process (test_name )
243
226
244
- stdout = stdout .strip ()
245
- stderr = stderr .rstrip ()
227
+ if isinstance (result , MultiprocessResult ):
228
+ # _timedout() case
229
+ return result
230
+
231
+ retcode , stdout , stderr = result
246
232
247
233
err_msg = None
248
234
if retcode != 0 :
@@ -266,7 +252,7 @@ def _runtest(self, test_name):
266
252
return MultiprocessResult (result , stdout , stderr , err_msg )
267
253
268
254
def run (self ):
269
- while not self ._killed :
255
+ while not self ._stopped :
270
256
try :
271
257
try :
272
258
test_name = next (self .pending )
@@ -284,6 +270,33 @@ def run(self):
284
270
self .output .put ((True , traceback .format_exc ()))
285
271
break
286
272
273
+ def _wait_completed (self ):
274
+ popen = self ._popen
275
+
276
+ # stdout and stderr must be closed to ensure that communicate()
277
+ # does not hang
278
+ popen .stdout .close ()
279
+ popen .stderr .close ()
280
+
281
+ try :
282
+ popen .wait (JOIN_TIMEOUT )
283
+ except (subprocess .TimeoutExpired , OSError ) as exc :
284
+ print_warning (f"Failed to wait for { self } completion "
285
+ f"(timeout={ format_duration (JOIN_TIMEOUT )} ): "
286
+ f"{ exc !r} " )
287
+
288
+ def wait_stopped (self , start_time ):
289
+ while True :
290
+ # Write a message every second
291
+ self .join (1.0 )
292
+ if not self .is_alive ():
293
+ break
294
+ dt = time .monotonic () - start_time
295
+ print (f"Waiting for { self } thread for { format_duration (dt )} " , flush = True )
296
+ if dt > JOIN_TIMEOUT :
297
+ print_warning (f"Failed to join { self } in { format_duration (dt )} " )
298
+ break
299
+
287
300
288
301
def get_running (workers ):
289
302
running = []
@@ -298,7 +311,7 @@ def get_running(workers):
298
311
return running
299
312
300
313
301
- class MultiprocessRunner :
314
+ class MultiprocessTestRunner :
302
315
def __init__ (self , regrtest ):
303
316
self .regrtest = regrtest
304
317
self .ns = regrtest .ns
@@ -311,30 +324,20 @@ def __init__(self, regrtest):
311
324
self .workers = None
312
325
313
326
def start_workers (self ):
314
- self .workers = [MultiprocessThread ( self .pending , self .output ,
315
- self .ns , self .worker_timeout )
316
- for _ in range (self .ns .use_mp )]
327
+ self .workers = [TestWorkerProcess ( index , self .pending , self .output ,
328
+ self .ns , self .worker_timeout )
329
+ for index in range (1 , self .ns .use_mp + 1 )]
317
330
print ("Run tests in parallel using %s child processes"
318
331
% len (self .workers ))
319
332
for worker in self .workers :
320
333
worker .start ()
321
334
322
- def wait_workers (self ):
335
+ def stop_workers (self ):
323
336
start_time = time .monotonic ()
324
337
for worker in self .workers :
325
- worker .kill ()
338
+ worker .stop ()
326
339
for worker in self .workers :
327
- while True :
328
- worker .join (1.0 )
329
- if not worker .is_alive ():
330
- break
331
- dt = time .monotonic () - start_time
332
- print ("Wait for regrtest worker %r for %.1f sec" % (worker , dt ),
333
- flush = True )
334
- if dt > JOIN_TIMEOUT :
335
- print ("Warning -- failed to join a regrtest worker %s"
336
- % worker , flush = True )
337
- break
340
+ worker .wait_stopped (start_time )
338
341
339
342
def _get_result (self ):
340
343
if not any (worker .is_alive () for worker in self .workers ):
@@ -418,10 +421,11 @@ def run_tests(self):
418
421
if self .ns .timeout is not None :
419
422
faulthandler .cancel_dump_traceback_later ()
420
423
421
- # a test failed (and --failfast is set) or all tests completed
422
- self .pending .stop ()
423
- self .wait_workers ()
424
+ # Always ensure that all worker processes are no longer
425
+ # worker when we exit this function
426
+ self .pending .stop ()
427
+ self .stop_workers ()
424
428
425
429
426
430
def run_tests_multiprocess (regrtest ):
427
- MultiprocessRunner (regrtest ).run_tests ()
431
+ MultiprocessTestRunner (regrtest ).run_tests ()
0 commit comments