Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle No Lease, and Lease Loss in requestShutdown. #139

Merged
merged 3 commits into from
Feb 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ static class ShutdownNotificationState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(),
consumer.getShutdownNotification());
consumer.getShutdownNotification(), consumer.getShardInfo());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ class ShutdownNotificationTask implements ITask {
private final IRecordProcessor recordProcessor;
private final IRecordProcessorCheckpointer recordProcessorCheckpointer;
private final ShutdownNotification shutdownNotification;
private final ShardInfo shardInfo;

ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification) {
ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) {
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.shutdownNotification = shutdownNotification;
this.shardInfo = shardInfo;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,15 +538,18 @@ private List<ShardInfo> getShardInfoForAssignments() {
*/
public Future<Void> requestShutdown() {

leaseCoordinator.stopLeaseTaker();
//
// Stop accepting new leases
// Stop accepting new leases. Once we do this we can be sure that
// no more leases will be acquired.
//
leaseCoordinator.stopLeaseTaker();

Collection<KinesisClientLease> leases = leaseCoordinator.getAssignments();
if (leases == null || leases.isEmpty()) {
//
// If there are no leases shutdown is already completed.
// If there are no leases notification is already completed, but we still need to shutdown the worker.
//
this.shutdown();
return Futures.immediateFuture(null);
}
CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size());
Expand All @@ -555,7 +558,18 @@ public Future<Void> requestShutdown() {
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease,
notificationCompleteLatch, shutdownCompleteLatch);
ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
shardInfoShardConsumerMap.get(shardInfo).notifyShutdownRequested(shutdownNotification);
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
if (consumer != null) {
consumer.notifyShutdownRequested(shutdownNotification);
} else {
//
// There is a race condition between retrieving the current assignments, and creating the
// notification. If the a lease is lost in between these two points, we explicitly decrement the
// notification latches to clear the shutdown.
//
notificationCompleteLatch.countDown();
shutdownCompleteLatch.countDown();
}
}

return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this);
Expand Down Expand Up @@ -622,9 +636,11 @@ private void finalShutdown() {
/**
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
*
* @return Whether worker should shutdown immediately.
*/
private boolean shouldShutdown() {
@VisibleForTesting
boolean shouldShutdown() {
if (executorService.isShutdown()) {
LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown.");
return true;
Expand Down
Loading