diff --git a/src/test/java/io/nats/client/impl/ReconnectTests.java b/src/test/java/io/nats/client/impl/ReconnectTests.java index 4f14650b5..7ae89ef17 100644 --- a/src/test/java/io/nats/client/impl/ReconnectTests.java +++ b/src/test/java/io/nats/client/impl/ReconnectTests.java @@ -765,8 +765,8 @@ public boolean includeAllServers() { @Test public void testForceReconnectQueueBehaviorCheck() throws Exception { runInJsCluster((nc0, nc1, nc2) -> { - int pubCount = 1000000; - int subscribeTime = 4000; + int pubCount = 100_000; + int subscribeTime = 5000; int flushWait = 2500; int port = nc0.getServerInfo().getPort(); @@ -803,8 +803,11 @@ private static void _testForceReconnectQueueCheck(String subject, int pubCount, froBuilder.forceClose(); } + ReconnectQueueCheckConnectionListener listener = new ReconnectQueueCheckConnectionListener(); + Options options = Options.builder() .server(getNatsLocalhostUri(port)) + .connectionListener(listener) .dataPortType(ForceReconnectQueueCheckDataPort.class.getCanonicalName()) .build(); @@ -815,6 +818,8 @@ private static void _testForceReconnectQueueCheck(String subject, int pubCount, nc.forceReconnect(froBuilder.build()); + assertTrue(listener.latch.await(subscribeTime, TimeUnit.MILLISECONDS)); + long maxTime = subscribeTime; while (!subscriber.subscriberDone.get() && maxTime > 0) { //noinspection BusyWait @@ -834,6 +839,17 @@ private static void _testForceReconnectQueueCheck(String subject, int pubCount, } } + static class ReconnectQueueCheckConnectionListener implements ConnectionListener { + public CountDownLatch latch = new CountDownLatch(1); + + @Override + public void connectionEvent(Connection conn, Events type) { + if (type == Events.RECONNECTED) { + latch.countDown(); + } + } + } + static class ReconnectQueueCheckSubscriber implements Runnable { final AtomicBoolean subscriberDone; final String subject;