Skip to content

Commit

Permalink
Added acquire timeout message, and a test. (#514)
Browse files Browse the repository at this point in the history
The test doesn't verify the message, but does verify that an acquire
timeout triggers the FanOutRecordsPublisher to call logAcquireTimeoutMessage.
  • Loading branch information
pfifer authored and sahilpalvia committed Mar 6, 2019
1 parent 610295e commit 6685a92
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,21 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) {
String logMessage = String.format(
"%s: [SubscriptionLifetime] - (FanOutRecordsPublisher#errorOccurred) @ %s id: %s -- %s",
shardId, flow.connectionStartedAt, flow.subscribeToShardId, category.throwableTypeString);
if (category.throwableType.equals(ThrowableType.READ_TIMEOUT)) {
switch (category.throwableType) {
case READ_TIMEOUT:
log.debug(logMessage, propagationThrowable);
propagationThrowable = new RetryableRetrievalException(category.throwableTypeString,
(Exception) propagationThrowable.getCause());
} else {
break;
case ACQUIRE_TIMEOUT:
logAcquireTimeoutMessage(t);
//
// Fall through is intentional here as we still want to log the details of the exception
//
default:
log.warn(logMessage, propagationThrowable);
}

}
flow.cancel();
}
log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace);
Expand All @@ -190,15 +197,21 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) {
}
}

/**
 * Logs guidance for acquire timeouts, which usually indicate that the
 * supplied KinesisAsyncClient has a low maximum streams limit.
 * <p>
 * Overridable so tests can detect that an acquire timeout was classified.
 *
 * @param t the acquire-timeout throwable; passed to the logger so the
 *          underlying stack trace is not lost (previously it was dropped)
 */
protected void logAcquireTimeoutMessage(Throwable t) {
    log.error("An acquire timeout occurred which usually indicates that the KinesisAsyncClient supplied has a " +
            "low maximum streams limit. " +
            "Please use the software.amazon.kinesis.common.KinesisClientUtil to setup the client, " +
            "or refer to the class to setup the client manually.", t);
}

private void handleFlowError(Throwable t) {
if (t.getCause() instanceof ResourceNotFoundException) {
log.debug(
"{}: Could not call SubscribeToShard successfully because shard no longer exists. Marking shard for completion.",
shardId);
FanoutRecordsRetrieved response = new FanoutRecordsRetrieved(
ProcessRecordsInput.builder().records(Collections.emptyList()).isAtShardEnd(true).build(), null);
subscriber
.onNext(response);
subscriber.onNext(response);
subscriber.onComplete();
} else {
subscriber.onError(t);
Expand Down Expand Up @@ -280,7 +293,8 @@ private void recordsReceived(RecordFlow triggeringFlow, SubscribeToShardEvent re
ProcessRecordsInput input = ProcessRecordsInput.builder().cacheEntryTime(Instant.now())
.millisBehindLatest(recordBatchEvent.millisBehindLatest())
.isAtShardEnd(recordBatchEvent.continuationSequenceNumber() == null).records(records).build();
FanoutRecordsRetrieved recordsRetrieved = new FanoutRecordsRetrieved(input, recordBatchEvent.continuationSequenceNumber());
FanoutRecordsRetrieved recordsRetrieved = new FanoutRecordsRetrieved(input,
recordBatchEvent.continuationSequenceNumber());

try {
subscriber.onNext(recordsRetrieved);
Expand Down Expand Up @@ -416,7 +430,8 @@ public void cancel() {
}
subscriber = null;
if (flow != null) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
log.debug(
"{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}",
shardId, flow.connectionStartedAt, flow.subscribeToShardId);
flow.cancel();
availableQueueSpace = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.netty.handler.timeout.ReadTimeoutException;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
Expand All @@ -30,9 +33,12 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.netty.handler.timeout.ReadTimeoutException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
Expand Down Expand Up @@ -71,13 +77,15 @@ public class FanOutRecordsPublisherTest {
public void simpleTest() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);

ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);

doNothing().when(publisher).subscribe(captor.capture());

source.start(ExtendedSequenceNumber.LATEST, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));

List<ProcessRecordsInput> receivedInput = new ArrayList<>();

Expand Down Expand Up @@ -138,13 +146,15 @@ public void onComplete() {
public void largeRequestTest() throws Exception {
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN);

ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);

doNothing().when(publisher).subscribe(captor.capture());

source.start(ExtendedSequenceNumber.LATEST, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));

List<ProcessRecordsInput> receivedInput = new ArrayList<>();

Expand Down Expand Up @@ -288,9 +298,8 @@ public void testContinuesAfterSequence() {
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> nextFlowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);


SubscribeToShardRequest nextExpected = SubscribeToShardRequest.builder().consumerARN(CONSUMER_ARN).shardId(SHARD_ID)
.startingPosition(StartingPosition.builder().sequenceNumber("3")
SubscribeToShardRequest nextExpected = SubscribeToShardRequest.builder().consumerARN(CONSUMER_ARN)
.shardId(SHARD_ID).startingPosition(StartingPosition.builder().sequenceNumber("3")
.type(ShardIteratorType.AFTER_SEQUENCE_NUMBER).build())
.build();

Expand All @@ -301,7 +310,6 @@ public void testContinuesAfterSequence() {
nextFlowCaptor.getValue().onEventStream(publisher);
nextSubscribeCaptor.getValue().onSubscribe(subscription);


List<Record> nextRecords = Stream.of(4, 5, 6).map(this::makeRecord).collect(Collectors.toList());
List<KinesisClientRecordMatcher> nextMatchers = nextRecords.stream().map(KinesisClientRecordMatcher::new)
.collect(Collectors.toList());
Expand All @@ -319,13 +327,128 @@ public void testContinuesAfterSequence() {

}

@Test
public void acquireTimeoutTriggersLogMethodForActiveFlow() {
// Flag flipped by the overridden hook so we can observe that the publisher
// classified the error as an acquire timeout. The message itself is not verified.
AtomicBoolean acquireTimeoutLogged = new AtomicBoolean(false);
FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN) {
@Override
protected void logAcquireTimeoutMessage(Throwable t) {
super.logAcquireTimeoutMessage(t);
acquireTimeoutLogged.set(true);
}
};

// Captors for the subscription and flow the publisher hands to the mocked client.
ArgumentCaptor<FanOutRecordsPublisher.RecordSubscription> captor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordSubscription.class);
ArgumentCaptor<FanOutRecordsPublisher.RecordFlow> flowCaptor = ArgumentCaptor
.forClass(FanOutRecordsPublisher.RecordFlow.class);

// Stub must be installed before start() so the capture is wired up in time.
doNothing().when(publisher).subscribe(captor.capture());

source.start(ExtendedSequenceNumber.LATEST,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
RecordingSubscriber subscriber = new RecordingSubscriber();
source.subscribe(subscriber);

// Grab the RecordFlow so we can inject the failure directly into it.
verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture());

// Simulates the exception shape the AWS SDK produces for a connection-pool
// acquire timeout: CompletionException -> SdkClientException -> Throwable whose
// message starts with "Acquire operation took longer...".
Throwable exception = new CompletionException(
"software.amazon.awssdk.core.exception.SdkClientException",
SdkClientException.create(null, new Throwable(
"Acquire operation took longer than the configured maximum time. This indicates that a " +
"request cannot get a connection from the pool within the specified maximum time. " +
"This can be due to high request rate.\n" +
"Consider taking any of the following actions to mitigate the issue: increase max " +
"connections, increase acquire timeout, or slowing the request rate.\n" +
"Increasing the max connections can increase client throughput (unless the network " +
"interface is already fully utilized), but can eventually start to hit operation " +
"system limitations on the number of file descriptors used by the process. " +
"If you already are fully utilizing your network interface or cannot further " +
"increase your connection count, increasing the acquire timeout gives extra time " +
"for requests to acquire a connection before timing out. " +
"If the connections doesn't free up, the subsequent requests will still timeout.\n" +
"If the above mechanisms are not able to fix the issue, try smoothing out your " +
"requests so that large traffic bursts cannot overload the client, being more " +
"efficient with the number of times you need to call AWS, or by increasing the " +
"number of hosts sending requests.")));

flowCaptor.getValue().exceptionOccurred(exception);

// The error must still propagate to the subscriber in addition to being logged.
Optional<OnErrorEvent> onErrorEvent = subscriber.events.stream().filter(e -> e instanceof OnErrorEvent).map(e -> (OnErrorEvent)e).findFirst();

assertThat(onErrorEvent, equalTo(Optional.of(new OnErrorEvent(exception))));
assertThat(acquireTimeoutLogged.get(), equalTo(true));

}

/**
 * Asserts that the received client records match the expected matchers,
 * element by element and in order.
 */
private void verifyRecords(List<KinesisClientRecord> clientRecordsList, List<KinesisClientRecordMatcher> matchers) {
    // Sizes must agree before pairwise comparison.
    assertThat(clientRecordsList.size(), equalTo(matchers.size()));
    for (int index = 0; index < matchers.size(); index++) {
        assertThat(clientRecordsList.get(index), matchers.get(index));
    }
}

// Marker interface for the events a RecordingSubscriber logs (subscribe,
// request, onNext, onError, onComplete) so tests can inspect call order.
private interface SubscriberEvent {

}

// Records that onSubscribe was invoked with the given subscription.
@Data
private static class SubscribeEvent implements SubscriberEvent {
final Subscription subscription;
}

// Records an onNext delivery and the batch that was delivered.
@Data
private static class OnNextEvent implements SubscriberEvent {
final RecordsRetrieved recordsRetrieved;
}

// Records an onError signal; Lombok @Data equality lets tests compare by throwable.
@Data
private static class OnErrorEvent implements SubscriberEvent {
final Throwable throwable;
}

// Records that the stream completed normally.
@Data
private static class OnCompleteEvent implements SubscriberEvent {

}

// Records an upstream demand request and how many items were requested.
@Data
private static class RequestEvent implements SubscriberEvent {
final long requested;
}

/**
 * Subscriber that records every signal it receives (subscribe, request,
 * onNext, onError, onComplete) into an ordered event log, requesting one
 * item at a time so tests can assert on exact signal ordering.
 */
private static class RecordingSubscriber implements Subscriber<RecordsRetrieved> {

    // Ordered log of every signal observed; inspected directly by tests.
    final List<SubscriberEvent> events = new LinkedList<>();

    Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        events.add(new SubscribeEvent(s));
        subscription = s;
        requestOneAndRecord();
    }

    @Override
    public void onNext(RecordsRetrieved recordsRetrieved) {
        events.add(new OnNextEvent(recordsRetrieved));
        requestOneAndRecord();
    }

    @Override
    public void onError(Throwable t) {
        events.add(new OnErrorEvent(t));
    }

    @Override
    public void onComplete() {
        events.add(new OnCompleteEvent());
    }

    // Issues a demand of one, then logs it — the request happens first because
    // a synchronous publisher may deliver onNext before the call returns.
    private void requestOneAndRecord() {
        subscription.request(1);
        events.add(new RequestEvent(1));
    }
}

private static class NonFailingSubscriber implements Subscriber<RecordsRetrieved> {
final List<ProcessRecordsInput> received = new ArrayList<>();
Subscription subscription;
Expand Down

0 comments on commit 6685a92

Please sign in to comment.