Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove a possible deadlock on polling queue fill #462

Merged
merged 4 commits into from
Nov 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void cancel() {
});
}

private synchronized void addArrivedRecordsInput(ProcessRecordsInput processRecordsInput) throws InterruptedException {
private void addArrivedRecordsInput(ProcessRecordsInput processRecordsInput) throws InterruptedException {
getRecordsResultQueue.put(processRecordsInput);
prefetchCounters.added(processRecordsInput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,48 @@

package software.amazon.kinesis.retrieval.polling;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
Expand Down Expand Up @@ -222,10 +232,11 @@ public void testCallAfterShutdown() {
@Test
public void testExpiredIteratorException() {
log.info("Starting tests");
getRecordsCache.start(sequenceNumber, initialPosition);

when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)).thenThrow(ExpiredIteratorException.class)
.thenReturn(getRecordsResponse);

getRecordsCache.start(sequenceNumber, initialPosition);

doNothing().when(dataFetcher).restartIterator();

getRecordsCache.getNextResult();
Expand All @@ -235,6 +246,85 @@ public void testExpiredIteratorException() {
verify(dataFetcher).restartIterator();
}

@Test(timeout = 1000L)
public void testNoDeadlockOnFullQueue() {
//
// Fixes https://github.com/awslabs/amazon-kinesis-client/issues/448
//
// This test is to verify that the drain of a blocked queue no longer deadlocks.
// If the test times out before starting the subscriber it means something went wrong while filling the queue.
// After the subscriber is started one of the things that can trigger a timeout is a deadlock.
//
GetRecordsResponse response = GetRecordsResponse.builder().records(
Record.builder().data(SdkBytes.fromByteArray(new byte[] { 1, 2, 3 })).sequenceNumber("123").build())
.build();
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response);

getRecordsCache.start(sequenceNumber, initialPosition);

//
// Wait for the queue to fill up, and the publisher to block on adding items to the queue.
//
log.info("Waiting for queue to fill up");
while (getRecordsCache.getRecordsResultQueue.size() < MAX_SIZE) {
Thread.yield();
}

log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size());
AtomicInteger receivedItems = new AtomicInteger(0);
final int expectedItems = MAX_SIZE * 3;

Object lock = new Object();

Subscriber<ProcessRecordsInput> subscriber = new Subscriber<ProcessRecordsInput>() {
Subscription sub;

@Override
public void onSubscribe(Subscription s) {
sub = s;
s.request(1);
}

@Override
public void onNext(ProcessRecordsInput processRecordsInput) {
receivedItems.incrementAndGet();
if (receivedItems.get() >= expectedItems) {
synchronized (lock) {
log.info("Notifying waiters");
lock.notifyAll();
}
sub.cancel();
} else {
sub.request(1);
}
}

@Override
public void onError(Throwable t) {
log.error("Caught error", t);
throw new RuntimeException(t);
}

@Override
public void onComplete() {
fail("onComplete not expected in this test");
}
};

synchronized (lock) {
log.info("Awaiting notification");
Flowable.fromPublisher(getRecordsCache).subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation(), true, 8).subscribe(subscriber);
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
verify(getRecordsRetrievalStrategy, atLeast(expectedItems)).getRecords(anyInt());
assertThat(receivedItems.get(), equalTo(expectedItems));
}

@After
public void shutdown() {
getRecordsCache.shutdown();
Expand Down