Fixing bug where initial subscription failure causes shard consumer to get stuck. #562
Conversation
Left a few minor comments but mostly LGTM 👍
@@ -73,6 +73,7 @@
    void startSubscriptions() {
        synchronized (lockObject) {
            lastRequestTime = Instant.now();
- Can you add a comment explaining why we are setting this here?
- Also add a variable comment on lastRequestTime (the last time an upstream request(N) call was made to the service).
- Refactor the places where lastRequestTime = Instant.now() is followed by startSubscriptions().
- Check why we shouldn't set this variable on all request(N) calls made from this class.
Added comments. Other points as discussed offline.
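For illustration, a rough sketch of the shape this takes (lockObject, lastRequestTime, and startSubscriptions are the names from the diff; everything else below is simplified and not the exact committed code):

import java.time.Instant;

// Simplified sketch only, not the actual ShardConsumerSubscriber source.
class SubscriptionStarterSketch {
    private final Object lockObject = new Object();

    // Last time an attempt to request data from the upstream service was made,
    // including the first attempt to establish the subscription.
    private volatile Instant lastRequestTime;

    void startSubscriptions() {
        synchronized (lockObject) {
            // Record the attempt before subscribing so the health check can detect
            // a subscription that fails before any request(N) call succeeds.
            lastRequestTime = Instant.now();
            // ... build the subscriber and subscribe to the record publisher ...
        }
    }
}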
Line 139 needs to be removed.
@@ -311,6 +309,80 @@ public void restartAfterRequestTimerExpiresTest() throws Exception {
    }

    @Test
    public void restartAfterRequestTimerExpiresAfterInitialSubscriptionFailsTest() throws Exception {
The test case is hard to follow and has a lot of waits. We need to avoid sleeps; adding comments would be helpful. We can sync up offline to check if the test can be shortened.
I have refactored the unit test and added comments to make it easier to read. It takes 60 to 80 ms to run.
// Allow time for processing of the records to end in the executor thread, which calls notifyAll as it gets the
// terminal record.
synchronized (processedNotifier) {
    processedNotifier.wait(500);
Is 500 millis guaranteed to be enough time to reach the terminal record, even under conditions of high CPU usage or other potential slowdowns? We don't want a test failure based on internals we have no control over.
Would mocking your executorService and telling it explicitly what to do on each call let you avoid this wait?
The test here is actually about sending records from RecordPublishers to the Subscriber through the executor service. The average time taken to receive all the records from the record publisher is around 60-80 ms, so 500 ms seems to be a good general timeout. I can update it to a higher value, since this is a timeout and raising it should not affect our test run time.
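If the timing ever proves flaky, one option would be to wait on an explicit signal rather than a fixed window, e.g. a CountDownLatch with a generous timeout. A sketch under that assumption (the names below are illustrative, not the actual test code, which uses a plain Object with wait/notifyAll):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TerminalRecordWaitSketch {
    // Counted down by the executor thread once the terminal record is processed.
    private final CountDownLatch terminalRecordLatch = new CountDownLatch(1);

    // Called from the record-processing path when the terminal record is seen.
    void onTerminalRecord() {
        terminalRecordLatch.countDown();
    }

    // Called from the test thread: returns as soon as the terminal record arrives,
    // so the generous timeout only matters when something is genuinely broken.
    boolean awaitTerminalRecord() throws InterruptedException {
        return terminalRecordLatch.await(5, TimeUnit.SECONDS);
    }
}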
@@ -44,6 +44,7 @@
    @VisibleForTesting
    final Object lockObject = new Object();
    // This holds the last time a request to upstream service is made including the first try to establish subscription.
Actually, it is an attempt to request.
Modified the wording
// Mock record publisher which doesn't publish any records on first try which simulates any scenario which
// causes first subscription try to fail.
recordsPublisher = new RecordPublisherWithInitialFailureSubscription();
Test case looks fine. But can we simulate the failure by mocking the executor service rather than the FailingPublisher, as that more closely resembles the failure that we are actually trying to address? It should be a simple change though.
As discussed offline, added a new test case specifically for a RejectedExecutionException from the executor service.
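For reference, simulating that failure mode with a mocked executor could look roughly like this. It is a sketch only: whether the real code path goes through execute() or submit() is a detail of the actual ShardConsumer wiring, and the class and method names here are hypothetical.

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

class RejectingExecutorSketch {
    // Builds a mocked executor whose first execute() call throws, simulating the
    // initial subscription attempt being rejected at the executor layer; later
    // calls run the submitted task inline on the caller's thread.
    static ExecutorService rejectFirstThenRunInline() {
        ExecutorService executor = mock(ExecutorService.class);
        doThrow(new RejectedExecutionException("simulated initial failure"))
                .doAnswer(invocation -> {
                    ((Runnable) invocation.getArgument(0)).run();
                    return null;
                })
                .when(executor)
                .execute(any(Runnable.class));
        return executor;
    }
}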
Overall looks good. Minor refactoring of code and test required.
LGTM
LGTM
Fixing bug where initial subscription failure causes shard consumer to get stuck.
Issue #, if available:
Description of changes:
This change fixes the scenario where the initial SubscribeToShard call fails silently in the SDK layer during the state transition, so KCL is not able to set the variables that allow the health check to retry the failure. The shard consumer then holds the lease and gets stuck without making any progress.
The fix is to update the lastRequestTime variable when we start the subscription, so that the health check works even if the initial SubscribeToShard call fails and we don't make progress.
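As a simplified illustration of that interaction (not the actual ShardConsumerSubscriber code; the method shape, parameter names, and restart hook below are hypothetical):

import java.time.Duration;
import java.time.Instant;

class SubscriptionHealthCheckSketch {
    private final Object lockObject = new Object();
    private Instant lastRequestTime;

    // If a request attempt was made but no progress has followed within the allowed
    // window, restart the subscription so a silently failed initial SubscribeToShard
    // call cannot leave the consumer stuck while it holds the lease.
    void healthCheck(Duration maxTimeBetweenRequests, Runnable restartSubscription) {
        synchronized (lockObject) {
            if (lastRequestTime != null
                    && Duration.between(lastRequestTime, Instant.now())
                            .compareTo(maxTimeBetweenRequests) > 0) {
                lastRequestTime = Instant.now();
                restartSubscription.run();
            }
        }
    }
}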
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.