diff --git a/test/integration/src/test/java/io/pravega/test/integration/AppendTest.java b/test/integration/src/test/java/io/pravega/test/integration/AppendTest.java index 6b85a0ea46..16e68c5350 100644 --- a/test/integration/src/test/java/io/pravega/test/integration/AppendTest.java +++ b/test/integration/src/test/java/io/pravega/test/integration/AppendTest.java @@ -90,6 +90,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @@ -251,7 +252,7 @@ static Reply sendRequest(EmbeddedChannel channel, Request request) throws Except return (Reply) decoded; } - static EmbeddedChannel createChannel(StreamSegmentStore store) { + static EmbeddedChannel createChannel(StreamSegmentStore store, ScheduledExecutorService executorService) { ServerConnectionInboundHandler lsh = new ServerConnectionInboundHandler(); EmbeddedChannel channel = new EmbeddedChannel(new ExceptionLoggingHandler(""), new CommandEncoder(null, MetricNotifier.NO_OP_METRIC_NOTIFIER), @@ -259,7 +260,7 @@ static EmbeddedChannel createChannel(StreamSegmentStore store) { new CommandDecoder(), new AppendDecoder(), lsh); - IndexAppendProcessor indexAppendProcessor = new IndexAppendProcessor(EXECUTOR, store); + IndexAppendProcessor indexAppendProcessor = new IndexAppendProcessor(executorService, store); lsh.setRequestProcessor(AppendProcessor.defaultBuilder(indexAppendProcessor) .store(store) .connection(new TrackedConnection(lsh)) @@ -269,6 +270,10 @@ static EmbeddedChannel createChannel(StreamSegmentStore store) { return channel; } + static EmbeddedChannel createChannel(StreamSegmentStore store) { + return createChannel(store, EXECUTOR); + } + @Test(timeout = 10000) public void appendThroughSegmentClient() throws Exception { String endpoint = "localhost"; diff --git a/test/integration/src/test/java/io/pravega/test/integration/ReadTest.java b/test/integration/src/test/java/io/pravega/test/integration/ReadTest.java index 0f7617b2bb..d8e3da55bd 100644 --- a/test/integration/src/test/java/io/pravega/test/integration/ReadTest.java +++ b/test/integration/src/test/java/io/pravega/test/integration/ReadTest.java @@ -69,6 +69,7 @@ import io.pravega.shared.protocol.netty.WireCommands; import io.pravega.shared.protocol.netty.WireCommands.ReadSegment; import io.pravega.shared.protocol.netty.WireCommands.SegmentRead; +import io.pravega.test.common.InlineExecutor; import io.pravega.test.common.LeakDetectorTestSuite; import io.pravega.test.common.NoOpScheduledExecutor; import io.pravega.test.common.TestUtils; @@ -105,6 +106,7 @@ public class ReadTest extends LeakDetectorTestSuite { private static final int TIMEOUT_MILLIS = 60000; private static final ServiceBuilder SERVICE_BUILDER = ServiceBuilder.newInMemoryBuilder(ServiceBuilderConfig.getDefaultConfig()); + private static final InlineExecutor EXECUTOR = new InlineExecutor(); private final Consumer segmentSealedCallback = segment -> { }; @Rule public Timeout globalTimeout = Timeout.millis(TIMEOUT_MILLIS); @@ -115,8 +117,9 @@ public static void setup() throws Exception { } @AfterClass - public static void teardown() { + public static void tearDown() { SERVICE_BUILDER.close(); + EXECUTOR.shutdownNow(); } @Test(timeout = 10000) @@ -163,7 +166,7 @@ public void testReceivingReadCall() throws Exception { // fill segment store with 10 entries; the total data size is 100 bytes. fillStoreForSegment(segmentName, data, entries, segmentStore); @Cleanup - EmbeddedChannel channel = AppendTest.createChannel(segmentStore); + EmbeddedChannel channel = AppendTest.createChannel(segmentStore, EXECUTOR); ByteBuf actual = Unpooled.buffer(entries * data.length); while (actual.writerIndex() < actual.capacity()) {