File tree Expand file tree Collapse file tree 1 file changed +5
-3
lines changed
src/test/java/com/rabbitmq/stream/impl Expand file tree Collapse file tree 1 file changed +5
-3
lines changed Original file line number Diff line number Diff line change @@ -243,9 +243,9 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
243243 void asynchronousProcessingWithFlowControl () {
244244 int messageCount = 100_000 ;
245245 publishAndWaitForConfirms (cf , messageCount , stream );
246-
247- try ( ExecutorService executorService =
248- Executors . newFixedThreadPool ( getRuntime (). availableProcessors ())) {
246+ ExecutorService executorService =
247+ Executors . newFixedThreadPool ( getRuntime (). availableProcessors ());
248+ try {
249249 CountDownLatch latch = new CountDownLatch (messageCount );
250250 environment .consumerBuilder ().stream (stream )
251251 .offset (OffsetSpecification .first ())
@@ -261,6 +261,8 @@ void asynchronousProcessingWithFlowControl() {
261261 }))
262262 .build ();
263263 assertThat (latch ).is (completed ());
264+ } finally {
265+ executorService .shutdownNow ();
264266 }
265267 }
266268
You can’t perform that action at this time.
0 commit comments