Restart from #492

Merged: 13 commits merged into awslabs:master on Feb 7, 2019

Conversation

@pfifer (Contributor) commented Feb 6, 2019

Issue #, if available:

Description of changes:
If something goes wrong between the ShardConsumer and the RecordsPublisher, it's possible that the ShardConsumer may cancel the connection to the RecordsPublisher. RecordsPublishers are stateful, so the state of a RecordsPublisher may have advanced further through the shard than the ShardConsumer has actually processed. This could cause the ShardConsumer to skip records.

The ShardConsumer will now be able to ask the RecordsPublisher to restart from a specific location, instead of restarting from the last retrieved batch of records.
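A minimal sketch of that idea, using the JDK Flow API so it stands alone; the interface and method names below are illustrative stand-ins, not the actual KCL `RecordsPublisher` contract:

```java
import java.util.concurrent.Flow;

// Illustrative only -- not the actual KCL RecordsPublisher interface.
// The consumer can tell the publisher to rewind to the last position it
// actually processed, instead of resuming from the last batch the
// publisher happened to emit.
interface RestartableRecordsPublisher<T> extends Flow.Publisher<T> {
    /** Discard any advanced state and resume publishing from {@code lastProcessed}. */
    void restartFrom(T lastProcessed);
}
```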

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@pfifer added this to the 2.1.1 milestone Feb 6, 2019
@pfifer requested a review from sahilpalvia February 6, 2019 16:17
After a failure the KCL now restarts from what the ShardConsumer
reports it last processed.
Extracted the InternalSubscriber into ShardConsumerSubscriber to make
testing easier. Added tests for ShardConsumerSubscriber that verify
error handling and other components of the class.

Added tests that verify the restart-from behavior.
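A companion sketch of the subscriber side, reusing the hypothetical interface above (names are illustrative, not the actual ShardConsumerSubscriber): the subscriber remembers the last item it actually handed off and, on an error, asks the publisher to restart from that position rather than from whatever the publisher had already buffered.

```java
import java.util.concurrent.Flow;

// Illustrative sketch; not the real ShardConsumerSubscriber.
class RestartingSubscriber<T> implements Flow.Subscriber<T> {
    private final RestartableRecordsPublisher<T> publisher;
    private volatile T lastProcessed;        // last item actually handed to the consumer
    private Flow.Subscription subscription;

    RestartingSubscriber(RestartableRecordsPublisher<T> publisher) {
        this.publisher = publisher;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);
    }

    @Override public void onNext(T item) {
        lastProcessed = item;                // record progress before requesting more
        subscription.request(1);
    }

    @Override public void onError(Throwable t) {
        // Rewind the publisher to the consumer's actual position, then resubscribe,
        // rather than resuming from whatever the publisher had already queued up.
        publisher.restartFrom(lastProcessed);
        publisher.subscribe(this);
    }

    @Override public void onComplete() { }
}
```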
When the blocking queue is full, the enqueueing thread would normally
park indefinitely while continuing to hold the lock.

This changes the process to block for at most one second when
attempting to enqueue a response; if the attempt doesn't succeed, it
checks whether the publisher has been reset before trying again.
Changed the locking around the restart to use a reader/writer lock
instead of a single lock with a yield.
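A rough sketch of how the two changes above fit together, with a hypothetical buffer class (the real PrefetchRecordsPublisher differs in detail): enqueue attempts take the read lock and block for at most one second, so the write lock needed by a reset can always be acquired shortly after one is requested.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Rough illustration only; names and structure are hypothetical.
class PrefetchBuffer<T> {
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(3);
    private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock();
    private volatile boolean wasReset;

    /** Producer path: never park indefinitely while holding the lock. */
    void enqueue(T item) throws InterruptedException {
        while (true) {
            resetLock.readLock().lock();
            try {
                if (wasReset) {
                    // A restart happened; the real publisher surfaces this as a
                    // PositionResetException rather than silently dropping the item.
                    return;
                }
                // Block for at most one second so a pending reset's write lock
                // is never starved by a parked producer.
                if (queue.offer(item, 1, TimeUnit.SECONDS)) {
                    return;
                }
            } finally {
                resetLock.readLock().unlock();
            }
        }
    }

    /** Restart path: exclusive access while buffered state is discarded. */
    void reset() {
        resetLock.writeLock().lock();
        try {
            wasReset = true;   // cleared again once repositioning completes (omitted here)
            queue.clear();
        } finally {
            resetLock.writeLock().unlock();
        }
    }
}
```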

Changed how the fetcher is reset so that it no longer restarts via
advanceIteratorTo, which would retrieve a new shard iterator. Instead
the resetIterator method takes the iterator to start from, the last
accepted sequence number, and the initial position.
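A sketch of the shape that description implies; the types below are stand-ins rather than the exact KCL signature:

```java
// Illustrative only -- not the exact KCL method signature.
interface ResettableFetcher {
    /**
     * Reposition the fetcher without requesting a brand-new shard iterator:
     * resume from an iterator that is already known, anchored at the last
     * sequence number the consumer accepted.
     */
    void resetIterator(String shardIterator,
                       String lastAcceptedSequenceNumber,
                       Object initialPosition);   // stand-in for the KCL initial-position type
}
```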
Changed the test to wait for the queue to reach capacity before
restarting the PrefetchRecordsPublisher. This should mostly ensure
that calling restartFrom triggers a PositionResetException.

Added @VisibleForTesting on the queue since it was already being used in testing.
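With the queue reachable from tests, a polling helper along these lines (illustrative only; the class and method names are hypothetical) is enough to wait for the bounded queue to fill before calling restartFrom:

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

// Hypothetical test helper: spin until the publisher's bounded queue is full,
// so the producer thread is reliably parked in its enqueue attempt before the
// test triggers the restart.
final class QueueTestSupport {
    static void awaitQueueAtCapacity(BlockingQueue<?> queue, Duration timeout)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (queue.remainingCapacity() > 0) {
            if (System.nanoTime() > deadline) {
                throw new TimeoutException("queue never reached capacity");
            }
            Thread.sleep(10);
        }
    }
}
```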
In the test the TestPublisher is accessed from two threads: the test
thread and the dispatch thread. Both may call send() under certain
conditions. This changes it so that only one of the threads can
actively send data at a time.

TestPublisher#requested was changed to volatile to ensure that calling
cancel can correctly set it to zero.
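A minimal sketch of that arrangement; the class below is a hypothetical stand-in, not the actual TestPublisher from the KCL test suite:

```java
import java.util.concurrent.Flow;

// Illustrative sketch of the concurrency fix described above.
class TestPublisherSketch<T> {
    // volatile so cancel() can zero it from any thread and the change is
    // immediately visible to whichever thread is inside send().
    private volatile long requested;
    private final Object sendLock = new Object();

    void request(long n) {
        synchronized (sendLock) {
            requested += n;
        }
    }

    void cancel() {
        requested = 0;
    }

    /** Only one thread (test thread or dispatch thread) may actively send at a time. */
    void send(T item, Flow.Subscriber<? super T> subscriber) {
        synchronized (sendLock) {
            if (requested > 0) {
                requested--;
                subscriber.onNext(item);
            }
        }
    }
}
```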
This test is somewhat of an odd case, as it intends to test what
happens when nothing is dispatched to the ShardConsumerSubscriber for
some amount of time while data is queued for dispatch. To do this we
block the executor's single thread with a lock so that items pile up
in the queue; if the restart logic were incorrect, we would see lost
data.
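A sketch of that blocking pattern with illustrative names only; it simply demonstrates how holding a lock the single dispatch thread needs lets submitted work pile up in the executor's queue:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: block the single dispatch thread so queued items pile up;
// if the restart logic mishandled the backlog, the pile-up would surface as
// lost or skipped data once the lock is released.
class BlockedDispatchExample {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        ReentrantLock dispatchGate = new ReentrantLock();

        dispatchGate.lock();                   // test thread holds the gate
        try {
            for (int i = 0; i < 10; i++) {
                final int item = i;
                dispatcher.submit(() -> {
                    dispatchGate.lock();       // dispatch thread parks here
                    try {
                        System.out.println("dispatched " + item);
                    } finally {
                        dispatchGate.unlock();
                    }
                });
            }
            // ... trigger the restart here while dispatch is stalled ...
        } finally {
            dispatchGate.unlock();             // let the backlog drain
        }
        dispatcher.shutdown();
    }
}
```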
@pfifer modified the milestones: 2.1.1, 2.1.2 Feb 6, 2019
@sahilpalvia merged commit fd0cb8e into awslabs:master Feb 7, 2019
pfifer added a commit to pfifer/amazon-kinesis-client that referenced this pull request Feb 18, 2019
https://github.com/awslabs/amazon-kinesis-client/milestone/29
* Fixed handling of the progress detection in the `ShardConsumer` to restart from the last accepted record, instead of the last queued record.
  * awslabs#492
* Fixed handling of exceptions when using polling so that it will no longer treat `SdkException`s as unexpected exceptions.
  * awslabs#497
  * awslabs#502
* Fixed a case where lease loss would block the `Scheduler` while waiting for a record processor's `processRecords` method to complete.
  * awslabs#501
sahilpalvia pushed a commit that referenced this pull request Feb 18, 2019
https://github.com/awslabs/amazon-kinesis-client/milestone/29
* Fixed handling of the progress detection in the `ShardConsumer` to restart from the last accepted record, instead of the last queued record.
  * #492
* Fixed handling of exceptions when using polling so that it will no longer treat `SdkException`s as unexpected exceptions.
  * #497
  * #502
* Fixed a case where lease loss would block the `Scheduler` while waiting for a record processor's `processRecords` method to complete.
  * #501