Fix to shut down PrefetchRecordsPublisher in a graceful manner #857

Merged
PrefetchRecordsPublisher.java
@@ -78,6 +78,9 @@
@KinesisClientInternalApi
public class PrefetchRecordsPublisher implements RecordsPublisher {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
// Since this package is used by all KCL clients, keep the upper threshold at 60 seconds
private static final long DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS = 60_000L;

private int maxPendingProcessRecordsInput;
private int maxByteSize;
private int maxRecordsCount;
@@ -93,6 +96,7 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final String operation;
private final StreamIdentifier streamId;
private final String streamAndShardId;
private final long awaitTerminationTimeoutMillis;
private Subscriber<? super RecordsRetrieved> subscriber;
@VisibleForTesting @Getter
private final PublisherSession publisherSession;
@@ -201,6 +205,7 @@ private void updateDemandTrackersOnPublish(PrefetchRecordsRetrieved result) {
* @param getRecordsRetrievalStrategy Retrieval strategy for the get records call
* @param executorService Executor service for the cache
* @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call
* @param awaitTerminationTimeoutMillis maximum time to wait for graceful shutdown of executorService
*/
public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount,
final int maxRecordsPerCall,
@@ -209,7 +214,8 @@ public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final i
final long idleMillisBetweenCalls,
@NonNull final MetricsFactory metricsFactory,
@NonNull final String operation,
@NonNull final String shardId) {
@NonNull final String shardId,
final long awaitTerminationTimeoutMillis) {
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.maxRecordsPerCall = maxRecordsPerCall;
this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
@@ -225,6 +231,36 @@ public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final i
this.operation = operation;
this.streamId = this.getRecordsRetrievalStrategy.dataFetcher().getStreamIdentifier();
this.streamAndShardId = this.streamId.serialize() + ":" + shardId;
this.awaitTerminationTimeoutMillis = awaitTerminationTimeoutMillis;
}

/**
* Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
* LinkedBlockingQueue.
*
* @see PrefetchRecordsPublisher
*
* @param maxPendingProcessRecordsInput Max number of ProcessRecordsInput that can be held in the cache before
* blocking
* @param maxByteSize Max byte size of the queue before blocking next get records call
* @param maxRecordsCount Max number of records in the queue across all ProcessRecordInput objects
* @param maxRecordsPerCall Max records to be returned per call
* @param getRecordsRetrievalStrategy Retrieval strategy for the get records call
* @param executorService Executor service for the cache
* @param idleMillisBetweenCalls maximum time to wait before dispatching the next get records call
*/
public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final int maxByteSize, final int maxRecordsCount,
final int maxRecordsPerCall,
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
final ExecutorService executorService,
final long idleMillisBetweenCalls,
final MetricsFactory metricsFactory,
final String operation,
final String shardId) {
this(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecordsPerCall,
getRecordsRetrievalStrategy, executorService, idleMillisBetweenCalls,
metricsFactory, operation, shardId,
DEFAULT_AWAIT_TERMINATION_TIMEOUT_MILLIS);
}

@Override
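For orientation, here is a minimal sketch (not part of this diff) of how the new eleven-argument overload is called. The argument values, the "ProcessTask" operation name, and the package imports are assumptions based on the KCL 2.x source layout, not taken from this PR:

import java.util.concurrent.ExecutorService;

import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher;

class PublisherConstructionSketch {
    // The caller is assumed to supply a fully wired retrieval strategy (its dataFetcher()
    // is consulted during construction) and the executor that runs the prefetch daemon.
    static PrefetchRecordsPublisher newPublisher(GetRecordsRetrievalStrategy strategy,
                                                 ExecutorService executorService,
                                                 MetricsFactory metricsFactory,
                                                 String shardId) {
        return new PrefetchRecordsPublisher(
                3,                 // maxPendingProcessRecordsInput
                8 * 1024 * 1024,   // maxByteSize
                30_000,            // maxRecordsCount
                10_000,            // maxRecordsPerCall
                strategy,
                executorService,
                500L,              // idleMillisBetweenCalls
                metricsFactory,
                "ProcessTask",     // operation (illustrative value)
                shardId,
                5_000L);           // awaitTerminationTimeoutMillis: wait up to 5s on shutdown
        // Dropping the last argument selects the pre-existing ten-argument overload,
        // which now delegates here with the 60-second default timeout.
    }
}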
@@ -260,7 +296,21 @@ private PrefetchRecordsRetrieved peekNextResult() {
@Override
public void shutdown() {
defaultGetRecordsCacheDaemon.isShutdown = true;
executorService.shutdownNow();
executorService.shutdown();
try {
if (!executorService.awaitTermination(awaitTerminationTimeoutMillis, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
// Wait a while for tasks to respond to being cancelled
if (!executorService.awaitTermination(awaitTerminationTimeoutMillis, TimeUnit.MILLISECONDS)) {
log.error("Executor service didn't terminate");
}
}
} catch (InterruptedException e) {
// (Re-)Cancel if current thread also interrupted
executorService.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
started = false;
}

@@ -409,12 +459,15 @@ public void run() {
break;
}

resetLock.readLock().lock();
try {
resetLock.readLock().lock();
makeRetrievalAttempt();
} catch(PositionResetException pre) {
log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId);
} catch (Throwable e) {
Contributor: I'm not sure why we wrote this exception handling this way originally, but I feel like making this layer InterruptedException and making the final layer Throwable would be a bit more elegant.

Contributor Author: Since we log the error message regardless of whether it is an InterruptedException or another Throwable, it is fine to handle the InterruptedException inside the Throwable catch block; otherwise we would just create a separate function for the log message.
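For reference, a compilable sketch of the layering suggested above. It is not the merged code, and attemptRetrieval is a hypothetical stand-in for makeRetrievalAttempt, which would have to declare InterruptedException for the dedicated catch to compile:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RetrievalLoopSketch {
    private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock();

    // Stand-in for makeRetrievalAttempt(); it must declare the checked exception
    // for the dedicated catch block below to compile.
    private void attemptRetrieval() throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(1);
    }

    void runOnce() {
        resetLock.readLock().lock();
        try {
            attemptRetrieval();
        } catch (InterruptedException ie) {
            // Restore the flag so the enclosing loop can observe the shutdown request.
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            // Final layer: anything truly unexpected still gets logged.
            System.err.println("Unexpected exception: " + t);
        } finally {
            resetLock.readLock().unlock();
        }
    }
}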

if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
" Please search for the exception/error online to check what is going on. If the " +
"issue persists or is a recurring problem, feel free to open an issue on, " +
@@ -456,6 +509,7 @@ private void makeRetrievalAttempt() {
} catch (RetryableRetrievalException rre) {
log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", streamAndShardId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", streamAndShardId);
} catch (ExpiredIteratorException e) {
log.info("{} : records threw ExpiredIteratorException - restarting"
@@ -482,6 +536,7 @@ private void makeRetrievalAttempt() {
try {
publisherSession.prefetchCounters().waitForConsumer();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.info("{} : Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started", streamAndShardId);
}
PrefetchRecordsPublisherIntegrationTest.java
@@ -81,6 +81,7 @@ public class PrefetchRecordsPublisherIntegrationTest {
private static final int MAX_RECORDS_COUNT = 30_000;
private static final int MAX_RECORDS_PER_CALL = 10_000;
private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L;
private static final long AWAIT_TERMINATION_TIMEOUT = 1L;
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();

private PrefetchRecordsPublisher getRecordsCache;
@@ -121,7 +122,8 @@ public void setup() throws Exception {
IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(),
operation,
"test-shard");
"test-shard",
AWAIT_TERMINATION_TIMEOUT);
}

@Test
@@ -174,7 +176,8 @@ public void testDifferentShardCaches() {
IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(),
operation,
"test-shard-2");
"test-shard-2",
AWAIT_TERMINATION_TIMEOUT);

getRecordsCache.start(extendedSequenceNumber, initialPosition);
sleep(IDLE_MILLIS_BETWEEN_CALLS);
@@ -254,7 +257,7 @@ private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher,
public void shutdown() {
getRecordsCache.shutdown();
sleep(100L);
verify(executorService).shutdownNow();
verify(executorService).shutdown();
// verify(getRecordsRetrievalStrategy).shutdown();
}

PrefetchRecordsPublisherTest.java
@@ -108,6 +108,7 @@ public class PrefetchRecordsPublisherTest {
private static final int MAX_SIZE = 5;
private static final int MAX_RECORDS_COUNT = 15000;
private static final long IDLE_MILLIS_BETWEEN_CALLS = 0L;
private static final long AWAIT_TERMINATION_TIMEOUT = 1L;
private static final String NEXT_SHARD_ITERATOR = "testNextShardIterator";

@Mock
@@ -143,7 +144,8 @@ public void setup() {
IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(),
operation,
"shardId");
"shardId",
AWAIT_TERMINATION_TIMEOUT);
spyQueue = spy(getRecordsCache.getPublisherSession().prefetchRecordsQueue());
records = spy(new ArrayList<>());
getRecordsResponse = GetRecordsResponse.builder().records(records).nextShardIterator(NEXT_SHARD_ITERATOR).childShards(new ArrayList<>()).build();
@@ -224,7 +226,8 @@ public void testGetRecordsWithInitialFailures_LessThanRequiredWait_Throws() {
1000,
new NullMetricsFactory(),
operation,
"shardId");
"shardId",
AWAIT_TERMINATION_TIMEOUT);
// Setup the retrieval strategy to fail initial calls before succeeding
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenThrow(new
RetryableRetrievalException("Timed out")).thenThrow(new
@@ -258,7 +261,8 @@ public void testGetRecordsWithInitialFailures_AdequateWait_Success() {
1000,
new NullMetricsFactory(),
operation,
"shardId");
"shardId",
AWAIT_TERMINATION_TIMEOUT);
// Setup the retrieval strategy to fail initial calls before succeeding
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_CALL))).thenThrow(new
RetryableRetrievalException("Timed out")).thenThrow(new
@@ -770,7 +774,7 @@ public void onNext(RecordsRetrieved recordsRetrieved) {
@After
public void shutdown() {
getRecordsCache.shutdown();
verify(executorService).shutdownNow();
verify(executorService).shutdown();
}

private void sleep(long millis) {