Skip to content

Commit

Permalink
Don't Sleep for During Retrieval for the BlockingGetRecordsCache
Browse files Browse the repository at this point in the history
The BlockingGetRecordsCache shouldn't sleep when retrieving records as backoff is provided in other parts of the ShardConumer.
  • Loading branch information
sahilpalvia authored and pfifer committed Oct 24, 2017
1 parent cc7e329 commit 73426bd
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,11 @@
public class BlockingGetRecordsCache implements GetRecordsCache {
private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final long idleMillisBetweenCalls;
private Instant lastSuccessfulCall;

public BlockingGetRecordsCache(final int maxRecordsPerCall,
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
final long idleMillisBetweenCalls) {
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
this.maxRecordsPerCall = maxRecordsPerCall;
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
}

@Override
Expand All @@ -51,33 +47,12 @@ public void start() {

@Override
public ProcessRecordsInput getNextResult() {
sleepBeforeNextCall();
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
lastSuccessfulCall = Instant.now();
ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
return new ProcessRecordsInput()
.withRecords(getRecordsResult.getRecords())
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
return processRecordsInput;
}

private void sleepBeforeNextCall() {
if (!Thread.interrupted()) {
if (lastSuccessfulCall == null) {
return;
}
long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
if (timeSinceLastCall < idleMillisBetweenCalls) {
try {
Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall);
} catch (InterruptedException e) {
log.info("Thread was interrupted, indicating that shutdown was called.");
}
}
} else {
log.info("Thread has been interrupted, indicating that it is in the shutdown phase.");
}
}

@Override
public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() {
return getRecordsRetrievalStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
private int maxRecordsCount = 30000;
private long idleMillisBetweenCalls = 1500L;
private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT;
private IMetricsFactory metricsFactory;

public SimpleRecordsFetcherFactory(int maxRecords) {
this.maxRecords = maxRecords;
Expand All @@ -37,7 +36,7 @@ public SimpleRecordsFetcherFactory(int maxRecords) {
@Override
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) {
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls);
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy);
} else {
return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords,
getRecordsRetrievalStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
@RunWith(MockitoJUnitRunner.class)
public class BlockingGetRecordsCacheTest {
private static final int MAX_RECORDS_PER_COUNT = 10_000;
private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L;

@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
Expand All @@ -53,7 +52,7 @@ public class BlockingGetRecordsCacheTest {
@Before
public void setup() {
records = new ArrayList<>();
blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy, IDLE_MILLIS_BETWEEN_CALLS);
blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy);

when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult);
when(getRecordsResult.getRecords()).thenReturn(records);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,7 @@ public final void testConsumeShard() throws Exception {
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);

getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
0L));
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);

ShardConsumer consumer =
Expand Down Expand Up @@ -469,8 +468,7 @@ public final void testConsumeShardWithInitialPositionAtTimestamp() throws Except
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);

getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
0L));
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);

ShardConsumer consumer =
Expand Down

0 comments on commit 73426bd

Please sign in to comment.