Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure subscription startup for restartAfterRequestTimerExpires tests #1084

Merged
merged 6 commits into from
Mar 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public class ShardConsumerSubscriberTest {

private static final String TERMINAL_MARKER = "Terminal";

private static final long DEFAULT_NOTIFIER_TIMEOUT = 5000L;

private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();

@Mock
Expand Down Expand Up @@ -125,10 +127,7 @@ public void singleItemTest() throws Exception {

setupNotifierAnswer(1);

synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait();

verify(shardConsumer).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
}
Expand All @@ -139,10 +138,7 @@ public void multipleItemTest() throws Exception {

setupNotifierAnswer(recordsPublisher.responses.size());

synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait();

verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
}
Expand Down Expand Up @@ -171,10 +167,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
}
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));

synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait();

assertThat(subscriber.getAndResetDispatchFailure(), equalTo(testException));
assertThat(subscriber.getAndResetDispatchFailure(), nullValue());
Expand All @@ -192,10 +185,7 @@ public void onErrorStopsProcessingTest() throws Exception {

setupNotifierAnswer(10);

synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait();

for (int attempts = 0; attempts < 10; attempts++) {
if (subscriber.retrievalFailure() != null) {
Expand All @@ -220,10 +210,7 @@ public void restartAfterErrorTest() throws Exception {

setupNotifierAnswer(10);

synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait();

for (int attempts = 0; attempts < 10; attempts++) {
if (subscriber.retrievalFailure() != null) {
Expand All @@ -236,7 +223,7 @@ public void restartAfterErrorTest() throws Exception {

synchronized (processedNotifier) {
assertThat(subscriber.healthCheck(100000), equalTo(expected));
processedNotifier.wait(5000);
processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT);
}

assertThat(recordsPublisher.restartedFrom, equalTo(edgeRecord));
Expand Down Expand Up @@ -267,10 +254,7 @@ public void restartAfterRequestTimerExpiresTest() throws Exception {
return null;
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));

synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(5000);
}
startSubscriptionsAndWait();

synchronized (processedNotifier) {
executorService.execute(() -> {
Expand All @@ -290,7 +274,7 @@ public void restartAfterRequestTimerExpiresTest() throws Exception {
//
// Wait for our blocking thread to control the thread in the executor.
//
processedNotifier.wait(5000);
processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT);
}

Stream.iterate(2, i -> i + 1).limit(97).forEach(this::addUniqueItem);
Expand All @@ -301,7 +285,7 @@ public void restartAfterRequestTimerExpiresTest() throws Exception {
assertThat(subscriber.healthCheck(1), nullValue());
barrier.await(500, TimeUnit.MILLISECONDS);

processedNotifier.wait(5000);
processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT);
}

verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class));
Expand Down Expand Up @@ -337,9 +321,7 @@ public void restartAfterRequestTimerExpiresWhenNotGettingRecordsAfterInitializat
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));

// First try to start subscriptions.
synchronized (processedNotifier) {
subscriber.startSubscriptions();
}
startSubscriptionsAndWait(100);

// Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and
// subscription has not started correctly.
Expand Down Expand Up @@ -397,9 +379,7 @@ public void restartAfterRequestTimerExpiresWhenInitialTaskExecutionIsRejected()
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));

// First try to start subscriptions.
synchronized (processedNotifier) {
subscriber.startSubscriptions();
}
startSubscriptionsAndWait(100);

// Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and
// subscription has not started correctly.
Expand Down Expand Up @@ -478,6 +458,17 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));
}

private void startSubscriptionsAndWait() throws InterruptedException {
startSubscriptionsAndWait(DEFAULT_NOTIFIER_TIMEOUT);
}

private void startSubscriptionsAndWait(long timeout) throws InterruptedException {
synchronized (processedNotifier) {
subscriber.startSubscriptions();
processedNotifier.wait(timeout);
}
}

private class ResponseItem {
private final RecordsRetrieved recordsRetrieved;
private final Throwable throwable;
Expand Down