Skip to content

Commit

Permalink
Issue 7402: Cherry-pick PR pravega#7371 to r0.14 (pravega#7403)
Browse files Browse the repository at this point in the history
Fixed flaky test io.pravega.test.integration.ReadTest.testReceivingReadCall 

Signed-off-by: dada-dell-emc <dadasaheb.patil@dell.com>
  • Loading branch information
dada-dell-emc authored Feb 21, 2024
1 parent e26174b commit c392c2e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -251,15 +252,15 @@ static Reply sendRequest(EmbeddedChannel channel, Request request) throws Except
return (Reply) decoded;
}

static EmbeddedChannel createChannel(StreamSegmentStore store) {
static EmbeddedChannel createChannel(StreamSegmentStore store, ScheduledExecutorService executorService) {
ServerConnectionInboundHandler lsh = new ServerConnectionInboundHandler();
EmbeddedChannel channel = new EmbeddedChannel(new ExceptionLoggingHandler(""),
new CommandEncoder(null, MetricNotifier.NO_OP_METRIC_NOTIFIER),
new LengthFieldBasedFrameDecoder(MAX_WIRECOMMAND_SIZE, 4, 4),
new CommandDecoder(),
new AppendDecoder(),
lsh);
IndexAppendProcessor indexAppendProcessor = new IndexAppendProcessor(EXECUTOR, store);
IndexAppendProcessor indexAppendProcessor = new IndexAppendProcessor(executorService, store);
lsh.setRequestProcessor(AppendProcessor.defaultBuilder(indexAppendProcessor)
.store(store)
.connection(new TrackedConnection(lsh))
Expand All @@ -269,6 +270,10 @@ static EmbeddedChannel createChannel(StreamSegmentStore store) {
return channel;
}

static EmbeddedChannel createChannel(StreamSegmentStore store) {
return createChannel(store, EXECUTOR);
}

@Test(timeout = 10000)
public void appendThroughSegmentClient() throws Exception {
String endpoint = "localhost";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import io.pravega.shared.protocol.netty.WireCommands;
import io.pravega.shared.protocol.netty.WireCommands.ReadSegment;
import io.pravega.shared.protocol.netty.WireCommands.SegmentRead;
import io.pravega.test.common.InlineExecutor;
import io.pravega.test.common.LeakDetectorTestSuite;
import io.pravega.test.common.NoOpScheduledExecutor;
import io.pravega.test.common.TestUtils;
Expand Down Expand Up @@ -105,6 +106,7 @@ public class ReadTest extends LeakDetectorTestSuite {

private static final int TIMEOUT_MILLIS = 60000;
private static final ServiceBuilder SERVICE_BUILDER = ServiceBuilder.newInMemoryBuilder(ServiceBuilderConfig.getDefaultConfig());
private static final InlineExecutor EXECUTOR = new InlineExecutor();
private final Consumer<Segment> segmentSealedCallback = segment -> { };
@Rule
public Timeout globalTimeout = Timeout.millis(TIMEOUT_MILLIS);
Expand All @@ -115,8 +117,9 @@ public static void setup() throws Exception {
}

@AfterClass
public static void teardown() {
public static void tearDown() {
SERVICE_BUILDER.close();
EXECUTOR.shutdownNow();
}

@Test(timeout = 10000)
Expand Down Expand Up @@ -163,7 +166,7 @@ public void testReceivingReadCall() throws Exception {
// fill segment store with 10 entries; the total data size is 100 bytes.
fillStoreForSegment(segmentName, data, entries, segmentStore);
@Cleanup
EmbeddedChannel channel = AppendTest.createChannel(segmentStore);
EmbeddedChannel channel = AppendTest.createChannel(segmentStore, EXECUTOR);

ByteBuf actual = Unpooled.buffer(entries * data.length);
while (actual.writerIndex() < actual.capacity()) {
Expand Down

0 comments on commit c392c2e

Please sign in to comment.