Skip to content

Commit

Permalink
Changed test to ensure that PositionResetException is thrown
Browse files Browse the repository at this point in the history
Changed the test to wait for the queue to reach capacity before
restarting the PrefetchRecordsPublisher.  This should mostly ensure
that calling restartFrom will trigger a throw of a
PositionResetException.

Added @VisibleFortest on the queue since it was already being used in testing.
  • Loading branch information
pfifer committed Feb 6, 2019
1 parent 63f30b3 commit 3e30709
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -63,6 +64,7 @@
@KinesisClientInternalApi
public class PrefetchRecordsPublisher implements RecordsPublisher {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
@VisibleForTesting
LinkedBlockingQueue<PrefetchRecordsRetrieved> getRecordsResultQueue;
private int maxPendingProcessRecordsInput;
private int maxByteSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ public void testResetClearsRemainingData() {

verify(getRecordsRetrievalStrategy, atLeast(2)).getRecords(anyInt());

while(getRecordsCache.getRecordsResultQueue.remainingCapacity() > 0) {
Thread.yield();
}

getRecordsCache.restartFrom(lastProcessed);
RecordsRetrieved postRestart = getRecordsCache.getNextResult();

Expand Down

0 comments on commit 3e30709

Please sign in to comment.