
Fix to shut down PrefetchRecordsPublisher in a graceful manner #857

Merged

Changes from 2 commits
@@ -78,6 +78,9 @@
@KinesisClientInternalApi
public class PrefetchRecordsPublisher implements RecordsPublisher {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
// Since this package is being used by all KCL clients keeping the upper threshold of 60 seconds
private static final Duration AWAIT_TERMINATION_TIMEOUT = Duration.ofSeconds(60);
Contributor:
How did we get this value? Were any tests run?

Contributor Author:
Keeping this as the upper threshold for the prefetcher to end gracefully; otherwise we will forcefully terminate it.


private int maxPendingProcessRecordsInput;
private int maxByteSize;
private int maxRecordsCount;
@@ -260,7 +263,21 @@ private PrefetchRecordsRetrieved peekNextResult() {
@Override
public void shutdown() {
defaultGetRecordsCacheDaemon.isShutdown = true;
executorService.shutdownNow();
executorService.shutdown();
try {
if (!executorService.awaitTermination(AWAIT_TERMINATION_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
// Wait a while for tasks to respond to being cancelled
if (!executorService.awaitTermination(AWAIT_TERMINATION_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
log.error("Executor service didn't terminate");
}
}
} catch (InterruptedException e) {
// Preserve interrupt status
Thread.currentThread().interrupt();
Contributor:
Shouldn't we set the interrupt flag after shutting down the thread-pool in the following line?

Contributor Author (@madagascar22, Oct 7, 2021):
Oh yes, we need to shut down and then set the interrupt, or else shutdown will fail. I will update the same.

// (Re-)Cancel if current thread also interrupted
executorService.shutdownNow();
}
started = false;
}
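
Based on the author's reply above, the follow-up is expected to call shutdownNow() before restoring the interrupt flag. A minimal, self-contained sketch of that two-phase shutdown ordering, with the class and method names here being illustrative rather than KCL code:

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TwoPhaseShutdownSketch {
    private static final Duration AWAIT_TERMINATION_TIMEOUT = Duration.ofSeconds(60);

    static void shutdownGracefully(ExecutorService executorService) {
        executorService.shutdown(); // stop accepting new tasks; let queued/running ones finish
        try {
            if (!executorService.awaitTermination(AWAIT_TERMINATION_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
                executorService.shutdownNow(); // graceful window elapsed; interrupt running tasks
                if (!executorService.awaitTermination(AWAIT_TERMINATION_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
                    System.err.println("Executor service didn't terminate");
                }
            }
        } catch (InterruptedException e) {
            // Ordering discussed in the review: cancel outstanding tasks first,
            // then restore this thread's interrupt status.
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(() -> System.out.println("draining"));
        shutdownGracefully(pool);
    }
}
```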

@@ -409,12 +426,15 @@ public void run() {
break;
}

resetLock.readLock().lock();
try {
resetLock.readLock().lock();
makeRetrievalAttempt();
} catch(PositionResetException pre) {
log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId);
} catch (Throwable e) {
Contributor:
I'm not sure why we wrote this exception handling this way originally, but I feel like making this layer InterruptedException and making the final layer Throwable would be a bit more elegant

Contributor Author:
Since we are going to log the error message regardless of whether it is an InterruptedException or another Throwable, it's good to handle InterruptedException inside the Throwable catch block; otherwise we would just create a separate function for the log message.

if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
" Please search for the exception/error online to check what is going on. If the " +
"issue persists or is a recurring problem, feel free to open an issue on, " +
@@ -456,6 +476,7 @@ private void makeRetrievalAttempt() {
} catch (RetryableRetrievalException rre) {
log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", streamAndShardId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId);
} catch (ExpiredIteratorException e) {
log.info("{} : records threw ExpiredIteratorException - restarting"
@@ -482,6 +503,7 @@
try {
publisherSession.prefetchCounters().waitForConsumer();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.info("{} : Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started", streamAndShardId);
}
@@ -254,7 +254,7 @@ private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher,
public void shutdown() {
getRecordsCache.shutdown();
sleep(100L);
verify(executorService).shutdownNow();
verify(executorService).shutdown();
// verify(getRecordsRetrievalStrategy).shutdown();
}
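
Since shutdown() now blocks on awaitTermination, a unit test that mocks the executor will typically stub awaitTermination to return true so the graceful path completes without waiting out the timeout. A self-contained sketch of that idea; the publisher itself is replaced by inline calls here and all names are illustrative, not the actual KCL test:

```java
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownMockSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = mock(ExecutorService.class);
        // Pretend the pool drains inside the graceful window so the test doesn't block for 60s.
        when(executorService.awaitTermination(anyLong(), eq(TimeUnit.MILLISECONDS))).thenReturn(true);

        // Stand-in for the publisher's shutdown(): orderly shutdown, then wait.
        executorService.shutdown();
        if (!executorService.awaitTermination(60_000L, TimeUnit.MILLISECONDS)) {
            executorService.shutdownNow();
        }

        // Mirrors the updated test assertions: shutdown() is expected, shutdownNow() is not.
        verify(executorService).shutdown();
        verify(executorService, never()).shutdownNow();
    }
}
```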

@@ -770,7 +770,7 @@ public void onNext(RecordsRetrieved recordsRetrieved) {
@After
public void shutdown() {
getRecordsCache.shutdown();
verify(executorService).shutdownNow();
verify(executorService).shutdown();
}

private void sleep(long millis) {