From 73426bd73347ecae0e19626d79fa014d3dc225e7 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Tue, 24 Oct 2017 09:13:19 -0700 Subject: [PATCH] Don't Sleep for During Retrieval for the BlockingGetRecordsCache The BlockingGetRecordsCache shouldn't sleep when retrieving records as backoff is provided in other parts of the ShardConumer. --- .../lib/worker/BlockingGetRecordsCache.java | 29 ++----------------- .../worker/SimpleRecordsFetcherFactory.java | 3 +- .../worker/BlockingGetRecordsCacheTest.java | 3 +- .../lib/worker/ShardConsumerTest.java | 6 ++-- 4 files changed, 6 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java index d9fc011e9..021d886b6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -31,15 +31,11 @@ public class BlockingGetRecordsCache implements GetRecordsCache { private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; - private final long idleMillisBetweenCalls; - private Instant lastSuccessfulCall; public BlockingGetRecordsCache(final int maxRecordsPerCall, - final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, - final long idleMillisBetweenCalls) { + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { this.maxRecordsPerCall = maxRecordsPerCall; this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; - this.idleMillisBetweenCalls = idleMillisBetweenCalls; } @Override @@ -51,33 +47,12 @@ public void start() { @Override public ProcessRecordsInput getNextResult() { - sleepBeforeNextCall(); GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); - lastSuccessfulCall = Instant.now(); - ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() + return new ProcessRecordsInput() .withRecords(getRecordsResult.getRecords()) .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); - return processRecordsInput; } - private void sleepBeforeNextCall() { - if (!Thread.interrupted()) { - if (lastSuccessfulCall == null) { - return; - } - long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis(); - if (timeSinceLastCall < idleMillisBetweenCalls) { - try { - Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall); - } catch (InterruptedException e) { - log.info("Thread was interrupted, indicating that shutdown was called."); - } - } - } else { - log.info("Thread has been interrupted, indicating that it is in the shutdown phase."); - } - } - @Override public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() { return getRecordsRetrievalStrategy; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index e6c9f3b0e..44c93e7b6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -28,7 +28,6 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { private int maxRecordsCount = 30000; private long idleMillisBetweenCalls = 1500L; private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; - private IMetricsFactory metricsFactory; public SimpleRecordsFetcherFactory(int maxRecords) { this.maxRecords = maxRecords; @@ -37,7 +36,7 @@ public SimpleRecordsFetcherFactory(int maxRecords) { @Override public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { - return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); + return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy); } else { return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords, getRecordsRetrievalStrategy, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java index 731c46531..0636baea6 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java @@ -40,7 +40,6 @@ @RunWith(MockitoJUnitRunner.class) public class BlockingGetRecordsCacheTest { private static final int MAX_RECORDS_PER_COUNT = 10_000; - private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L; @Mock private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @@ -53,7 +52,7 @@ public class BlockingGetRecordsCacheTest { @Before public void setup() { records = new ArrayList<>(); - blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy, IDLE_MILLIS_BETWEEN_CALLS); + blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy); when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult); when(getRecordsResult.getRecords()).thenReturn(records); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 0bd2f31a8..efb9d43c8 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -339,8 +339,7 @@ public final void testConsumeShard() throws Exception { KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, - new SynchronousGetRecordsRetrievalStrategy(dataFetcher), - 0L)); + new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); ShardConsumer consumer = @@ -469,8 +468,7 @@ public final void testConsumeShardWithInitialPositionAtTimestamp() throws Except KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, - new SynchronousGetRecordsRetrievalStrategy(dataFetcher), - 0L)); + new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); ShardConsumer consumer =