
Fixing bug where initial subscription failure causes shard consumer to get stuck. #562

Merged · 3 commits · Jul 11, 2019
@@ -44,6 +44,7 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {

@VisibleForTesting
final Object lockObject = new Object();
// This holds the last time a request to the upstream service was made, including the first try to establish the subscription.
Contributor:
Actually, it is an attempt to request.

Contributor Author:
Modified the wording

private Instant lastRequestTime = null;
private RecordsRetrieved lastAccepted = null;

@@ -73,6 +74,9 @@ class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {

void startSubscriptions() {
synchronized (lockObject) {
// Setting the lastRequestTime to allow health checks to restart subscriptions if they failed during
// the initial try.
lastRequestTime = Instant.now();
Contributor @ashwing (Jul 3, 2019):
  1. Can you add a comment on why we are setting this here?
  2. Also add a variable comment on lastRequestTime (the last time an upstream request(N) call was made to the service).
  3. Refactor the places where lastRequestTime = Instant.now() is followed by startSubscriptions().
  4. Check why we shouldn't set this variable on all request(N) calls made from this class.

Contributor Author:
Added comments. Other points as discussed offline.

Contributor:
Line 139 needs to be removed.

if (lastAccepted != null) {
recordsPublisher.restartFrom(lastAccepted);
}
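The review thread above captures the intent of the fix: stamping lastRequestTime at the top of startSubscriptions() means that, even if the very first subscribe never delivers anything, a later healthCheck() can notice the silence and restart the subscription. Below is a minimal, self-contained sketch of that interplay; it is not the actual ShardConsumerSubscriber or healthCheck implementation, and the class, field, and parameter names (SubscriberHealthSketch, restartAction, maxTimeBetweenRequestsMillis) are illustrative assumptions.

import java.time.Duration;
import java.time.Instant;

// Simplified model of the fix; not the real KCL class.
class SubscriberHealthSketch {
    private final Object lockObject = new Object();
    private Instant lastRequestTime;        // set up front so a failed first subscribe is still detectable
    private final Runnable restartAction;   // stands in for "cancel and re-subscribe to the records publisher"

    SubscriberHealthSketch(Runnable restartAction) {
        this.restartAction = restartAction;
    }

    void startSubscriptions() {
        synchronized (lockObject) {
            // The fix: record the attempt time even if the subscribe call below never succeeds,
            // so a later healthCheck() notices the silence instead of waiting forever.
            lastRequestTime = Instant.now();
            // ... subscribe to the records publisher here ...
        }
    }

    Throwable healthCheck(long maxTimeBetweenRequestsMillis) {
        synchronized (lockObject) {
            if (lastRequestTime != null
                    && Duration.between(lastRequestTime, Instant.now()).toMillis() > maxTimeBetweenRequestsMillis) {
                lastRequestTime = null;
                restartAction.run();   // before the fix, this branch was unreachable after a failed first subscribe
            }
            return null;               // null means healthy here, matching the test's nullValue() assertion
        }
    }
}

The new test below exercises exactly this path: it starts a subscription that delivers nothing, then calls healthCheck(1) to force the restart.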
@@ -22,6 +22,7 @@
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -47,7 +48,6 @@
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
@@ -64,7 +64,6 @@
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

@Slf4j
@@ -311,6 +310,64 @@ public void restartAfterRequestTimerExpiresTest() throws Exception {

}

@Test
public void restartAfterRequestTimerExpiresAfterInitialSubscriptionFailsTest() throws Exception {
Contributor:
The test case is hard to follow and has a lot of waits. We need to avoid sleeps; adding comments would be helpful. We can sync up offline to check if the test can be shortened.

Contributor Author:
I have refactored the unit test and added comments to make it easier to read. It takes 60 to 80 ms to run.


executorService = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("test-" + testName.getMethodName() + "-%04d").setDaemon(true).build());

// Mock record publisher that doesn't publish any records on the first try, simulating any scenario that
// causes the first subscription attempt to fail.
recordsPublisher = new RecordPublisherWithInitialFailureSubscription();
Contributor:
Test case looks fine. But can we simulate the failure by mocking the executor service rather than the FailingPublisher, as that more closely resembles the failure that we are actually trying to address? It should be a simple change though.

Contributor Author:
As discussed offline, added a new test case specifically for a RejectedExecutionException from the executor service.

subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, bufferSize, shardConsumer, 0);
addUniqueItem(1);

List<ProcessRecordsInput> received = new ArrayList<>();
doAnswer(a -> {
ProcessRecordsInput input = a.getArgumentAt(0, ProcessRecordsInput.class);
received.add(input);
if (input.records().stream().anyMatch(r -> StringUtils.startsWith(r.partitionKey(), TERMINAL_MARKER))) {
synchronized (processedNotifier) {
processedNotifier.notifyAll();
}
}
return null;
}).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class));

// First try to start subscriptions.
synchronized (processedNotifier) {
subscriber.startSubscriptions();
}

// Verify that there are no interactions with the shardConsumer mock, indicating no records were sent back and
// the subscription did not start correctly.
verify(shardConsumer, never()).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)),
any(Subscription.class));

Stream.iterate(2, i -> i + 1).limit(98).forEach(this::addUniqueItem);

addTerminalMarker(2);

// Doing the health check to allow the subscription to restart.
assertThat(subscriber.healthCheck(1), nullValue());

// Allow time for processing of the records to finish in the executor thread, which calls notifyAll when it gets the
// terminal record.
synchronized (processedNotifier) {
processedNotifier.wait(500);
Contributor:
Is 500 millis guaranteed to be enough time to reach the terminal record, even under conditions of high CPU usage or other potential slowdowns? We don't want a test failure based on internals we have no control over.

Would mocking your executorService and telling it explicitly what to do on each call let you avoid this wait?

Contributor Author:
The test here is actually about sending records from the RecordsPublisher to the Subscriber through the executor service. The average time taken to receive all the records from the record publisher is around 60-80 ms, so 500 ms seems to be a good general timeout. I can update it to a higher value since this is a timeout, and having it higher should not affect our test run time.

}

// Verify that the shardConsumer mock was called 100 times and all 100 input records were processed.
verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)),
any(Subscription.class));

// Verify that received records in the subscriber are equal to the ones sent by the record publisher.
assertThat(received.size(), equalTo(recordsPublisher.responses.size()));
Stream.iterate(0, i -> i + 1).limit(received.size()).forEach(i -> assertThat(received.get(i),
eqProcessRecordsInput(recordsPublisher.responses.get(i).recordsRetrieved.processRecordsInput())));

}
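The reviewers above suggest simulating the failure by mocking the executor service itself, which they also raise as a way to avoid the fixed 500 ms wait; per the author, that variant was added as a separate RejectedExecutionException test case that is not part of this two-commit view. Below is a hedged sketch of how such a mock could look, using the Mockito 1.x calls already used in this file; the helper name rejectFirstTaskExecutor and the delegating single-thread executor are illustrative, and whether this reproduces the exact rejection path depends on how ShardConsumerSubscriber hands work to its executor.

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: an ExecutorService mock that rejects the first task it receives and
// delegates every later one to a real single-thread executor, so only the initial
// subscription attempt fails the way a saturated executor would.
private static ExecutorService rejectFirstTaskExecutor() {
    ExecutorService delegate = Executors.newSingleThreadExecutor();
    AtomicBoolean firstTask = new AtomicBoolean(true);
    ExecutorService executor = mock(ExecutorService.class);
    doAnswer(invocation -> {
        if (firstTask.getAndSet(false)) {
            throw new RejectedExecutionException("simulated rejection of the initial subscription task");
        }
        delegate.execute(invocation.getArgumentAt(0, Runnable.class));
        return null;
    }).when(executor).execute(any(Runnable.class));
    return executor;
}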

private void addUniqueItem(int id) {
RecordsRetrieved r = mock(RecordsRetrieved.class, "Record-" + id);
ProcessRecordsInput input = ProcessRecordsInput.builder().cacheEntryTime(Instant.now())
@@ -373,9 +430,9 @@ public ResponseItem(@NonNull Throwable throwable) {
private class TestPublisher implements RecordsPublisher {

private final LinkedList<ResponseItem> responses = new LinkedList<>();
private volatile long requested = 0;
protected volatile long requested = 0;
private int currentIndex = 0;
private Subscriber<? super RecordsRetrieved> subscriber;
protected Subscriber<? super RecordsRetrieved> subscriber;
private RecordsRetrieved restartedFrom;

void add(ResponseItem... toAdd) {
@@ -448,6 +505,29 @@ public void cancel() {
}
}

// Test publisher that simulates a failed initial subscription: on the first subscribe,
// request(n) is a no-op so no records are ever delivered; on later subscribes, request(n)
// delegates to send(n) and records flow as in TestPublisher.
private class RecordPublisherWithInitialFailureSubscription extends TestPublisher {
private int subscriptionTryCount = 0;

@Override
public void subscribe(Subscriber<? super RecordsRetrieved> s) {
subscriber = s;
++subscriptionTryCount;
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
if (subscriptionTryCount != 1) {
send(n);
}
}

@Override
public void cancel() {
requested = 0;
}
});
}
}

class TestShardConsumerSubscriber extends ShardConsumerSubscriber {

private int genericWarningLogged = 0;
@@ -685,4 +765,4 @@ private void mimicException(Exception exceptionToThrow, TestShardConsumerSubscri
consumer.startSubscriptions();
}

}
}