diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index 431b134f6..15a564dfc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -176,7 +176,7 @@ public void cancel() { }); } - private synchronized void addArrivedRecordsInput(ProcessRecordsInput processRecordsInput) throws InterruptedException { + private void addArrivedRecordsInput(ProcessRecordsInput processRecordsInput) throws InterruptedException { getRecordsResultQueue.put(processRecordsInput); prefetchCounters.added(processRecordsInput); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index dd4b96acf..7fb82ea6b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -15,38 +15,48 @@ package software.amazon.kinesis.retrieval.polling; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; -import lombok.extern.slf4j.Slf4j; import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import io.reactivex.Flowable; +import io.reactivex.schedulers.Schedulers; +import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; @@ -222,10 +232,11 @@ public void testCallAfterShutdown() { @Test public void testExpiredIteratorException() { log.info("Starting tests"); - getRecordsCache.start(sequenceNumber, initialPosition); - when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)).thenThrow(ExpiredIteratorException.class) .thenReturn(getRecordsResponse); + + getRecordsCache.start(sequenceNumber, initialPosition); + doNothing().when(dataFetcher).restartIterator(); getRecordsCache.getNextResult(); @@ -235,6 +246,85 @@ public void testExpiredIteratorException() { verify(dataFetcher).restartIterator(); } + @Test(timeout = 1000L) + public void testNoDeadlockOnFullQueue() { + // + // Fixes https://github.com/awslabs/amazon-kinesis-client/issues/448 + // + // This test is to verify that the drain of a blocked queue no longer deadlocks. + // If the test times out before starting the subscriber it means something went wrong while filling the queue. + // After the subscriber is started one of the things that can trigger a timeout is a deadlock. + // + GetRecordsResponse response = GetRecordsResponse.builder().records( + Record.builder().data(SdkBytes.fromByteArray(new byte[] { 1, 2, 3 })).sequenceNumber("123").build()) + .build(); + when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response); + + getRecordsCache.start(sequenceNumber, initialPosition); + + // + // Wait for the queue to fill up, and the publisher to block on adding items to the queue. + // + log.info("Waiting for queue to fill up"); + while (getRecordsCache.getRecordsResultQueue.size() < MAX_SIZE) { + Thread.yield(); + } + + log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size()); + AtomicInteger receivedItems = new AtomicInteger(0); + final int expectedItems = MAX_SIZE * 3; + + Object lock = new Object(); + + Subscriber subscriber = new Subscriber() { + Subscription sub; + + @Override + public void onSubscribe(Subscription s) { + sub = s; + s.request(1); + } + + @Override + public void onNext(ProcessRecordsInput processRecordsInput) { + receivedItems.incrementAndGet(); + if (receivedItems.get() >= expectedItems) { + synchronized (lock) { + log.info("Notifying waiters"); + lock.notifyAll(); + } + sub.cancel(); + } else { + sub.request(1); + } + } + + @Override + public void onError(Throwable t) { + log.error("Caught error", t); + throw new RuntimeException(t); + } + + @Override + public void onComplete() { + fail("onComplete not expected in this test"); + } + }; + + synchronized (lock) { + log.info("Awaiting notification"); + Flowable.fromPublisher(getRecordsCache).subscribeOn(Schedulers.computation()) + .observeOn(Schedulers.computation(), true, 8).subscribe(subscriber); + try { + lock.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + verify(getRecordsRetrievalStrategy, atLeast(expectedItems)).getRecords(anyInt()); + assertThat(receivedItems.get(), equalTo(expectedItems)); + } + @After public void shutdown() { getRecordsCache.shutdown();