40
40
import java .util .Arrays ;
41
41
import java .util .List ;
42
42
import java .util .concurrent .TimeUnit ;
43
+ import java .util .concurrent .TimeoutException ;
43
44
import java .util .concurrent .locks .Condition ;
44
45
import java .util .concurrent .locks .Lock ;
45
46
import java .util .concurrent .locks .ReentrantLock ;
@@ -63,6 +64,7 @@ public class TestCommandListener implements CommandListener {
63
64
private final TestListener listener ;
64
65
private final Lock lock = new ReentrantLock ();
65
66
private final Condition commandCompletedCondition = lock .newCondition ();
67
+ private final Condition commandAnyEventCondition = lock .newCondition ();
66
68
private final boolean observeSensitiveCommands ;
67
69
private boolean ignoreNextSucceededOrFailedEvent ;
68
70
private static final CodecRegistry CODEC_REGISTRY_HACK ;
@@ -223,22 +225,12 @@ private <T extends CommandEvent> List<T> getEvents(final Class<T> type,
223
225
}
224
226
}
225
227
226
- public List <CommandStartedEvent > waitForStartedEvents (final int numEvents ) {
227
- lock .lock ();
228
- try {
229
- while (!hasCompletedEvents (numEvents )) {
230
- try {
231
- if (!commandCompletedCondition .await (TIMEOUT , TimeUnit .SECONDS )) {
232
- throw new MongoTimeoutException ("Timeout waiting for event" );
233
- }
234
- } catch (InterruptedException e ) {
235
- throw interruptAndCreateMongoInterruptedException ("Interrupted waiting for event" , e );
236
- }
237
- }
238
- return getEvents (CommandStartedEvent .class , numEvents );
239
- } finally {
240
- lock .unlock ();
241
- }
228
+ private <T extends CommandEvent > long getEventCount (final Class <T > eventClass , final Predicate <T > matcher ) {
229
+ return getEvents ().stream ()
230
+ .filter (eventClass ::isInstance )
231
+ .map (eventClass ::cast )
232
+ .filter (matcher )
233
+ .count ();
242
234
}
243
235
244
236
public void waitForFirstCommandCompletion () {
@@ -287,6 +279,7 @@ else if (!observeSensitiveCommands) {
287
279
addEvent (new CommandStartedEvent (event .getRequestContext (), event .getOperationId (), event .getRequestId (),
288
280
event .getConnectionDescription (), event .getDatabaseName (), event .getCommandName (),
289
281
event .getCommand () == null ? null : getWritableClone (event .getCommand ())));
282
+ commandAnyEventCondition .signal ();
290
283
} finally {
291
284
lock .unlock ();
292
285
}
@@ -312,6 +305,7 @@ else if (!observeSensitiveCommands) {
312
305
event .getResponse () == null ? null : event .getResponse ().clone (),
313
306
event .getElapsedTime (TimeUnit .NANOSECONDS )));
314
307
commandCompletedCondition .signal ();
308
+ commandAnyEventCondition .signal ();
315
309
} finally {
316
310
lock .unlock ();
317
311
}
@@ -334,6 +328,7 @@ else if (!observeSensitiveCommands) {
334
328
try {
335
329
addEvent (event );
336
330
commandCompletedCondition .signal ();
331
+ commandAnyEventCondition .signal ();
337
332
} finally {
338
333
lock .unlock ();
339
334
}
@@ -428,4 +423,22 @@ private void assertEquivalence(final CommandStartedEvent actual, final CommandSt
428
423
assertEquals (expected .getDatabaseName (), actual .getDatabaseName ());
429
424
assertEquals (expected .getCommand (), actual .getCommand ());
430
425
}
426
+
427
+ public <T extends CommandEvent > void waitForEvents (final Class <T > eventClass , final Predicate <T > matcher , final int count )
428
+ throws TimeoutException {
429
+ lock .lock ();
430
+ try {
431
+ while (getEventCount (eventClass , matcher ) < count ) {
432
+ try {
433
+ if (!commandAnyEventCondition .await (TIMEOUT , TimeUnit .SECONDS )) {
434
+ throw new MongoTimeoutException ("Timeout waiting for command event" );
435
+ }
436
+ } catch (InterruptedException e ) {
437
+ throw interruptAndCreateMongoInterruptedException ("Interrupted waiting for event" , e );
438
+ }
439
+ }
440
+ } finally {
441
+ lock .unlock ();
442
+ }
443
+ }
431
444
}
0 commit comments