Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drain delivery queue to make slow consumers consume events at their pace #607

Merged
merged 4 commits into from
Sep 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.kinesis.retrieval.fanout;

import com.google.common.annotations.VisibleForTesting;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.NonNull;
Expand All @@ -32,6 +33,7 @@
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
Expand Down Expand Up @@ -63,8 +65,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory(
ThrowableType.ACQUIRE_TIMEOUT);
private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT);
private static final int MAX_EVENT_BURST_FROM_SERVICE = 10;
private static final long TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS = 1000;
// Max burst of 10 payload events + 1 terminal event (onError/onComplete) from the service.
private static final int MAX_EVENT_BURST_FROM_SERVICE = 10 + 1;

private final KinesisAsyncClient kinesis;
private final String shardId;
Expand All @@ -82,9 +84,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private Subscriber<? super RecordsRetrieved> subscriber;
private long availableQueueSpace = 0;

private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE);
// Flag to indicate if the active subscription is being torn down.
private boolean pendingActiveSubscriptionShutdown = false;
private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(
MAX_EVENT_BURST_FROM_SERVICE);

@Override
public void start(ExtendedSequenceNumber extendedSequenceNumber,
Expand Down Expand Up @@ -135,13 +136,6 @@ public void notify(RecordsDeliveryAck recordsDeliveryAck) {
triggeringFlow = evictAckedEventAndScheduleNextEvent(recordsDeliveryAck);
} catch (Throwable t) {
errorOccurred(triggeringFlow, t);
} finally {
// Notify all the actors who are waiting for the records ack event.
// Here, when the active subscription is being torn down, the completing thread will
// wait for the last delivered records to send back the ack, to prevent sending duplicate records.
if(pendingActiveSubscriptionShutdown) {
lockObject.notifyAll();
}
}
if (triggeringFlow != null) {
updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow);
Expand All @@ -158,20 +152,23 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver
// RecordFlow of the current event that needs to be returned
RecordFlow flowToBeReturned = null;

final RecordsRetrieved recordsRetrieved = recordsRetrievedContext != null ?
recordsRetrievedContext.getRecordsRetrieved() : null;

// Check if the ack corresponds to the head of the delivery queue.
if (recordsRetrievedContext != null && recordsRetrievedContext.getRecordsRetrieved().batchUniqueIdentifier()
if (recordsRetrieved != null && recordsRetrieved.batchUniqueIdentifier()
.equals(recordsDeliveryAck.batchUniqueIdentifier())) {
// It is now safe to remove the element
recordsDeliveryQueue.poll();
// Take action based on the time spent by the event in queue.
takeDelayedDeliveryActionIfRequired(shardId, recordsRetrievedContext.getEnqueueTimestamp(), log);
// Update current sequence number for the successfully delivered event.
currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrievedContext.getRecordsRetrieved()).continuationSequenceNumber();
currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrieved).continuationSequenceNumber();
// Update the triggering flow for post scheduling upstream request.
flowToBeReturned = recordsRetrievedContext.getRecordFlow();
// Try scheduling the next event in the queue, if available.
// Try scheduling the next event in the queue or execute the subscription shutdown action.
if (!recordsDeliveryQueue.isEmpty()) {
scheduleNextEvent(recordsDeliveryQueue.peek().getRecordsRetrieved());
recordsDeliveryQueue.peek().executeEventAction(subscriber);
}
} else {
// Check if the mismatched event belongs to active flow. If publisher receives an ack for a
Expand All @@ -197,14 +194,14 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver
@VisibleForTesting
void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, RecordFlow triggeringFlow) {
final RecordsRetrievedContext recordsRetrievedContext =
new RecordsRetrievedContext(recordsRetrieved, triggeringFlow, Instant.now());
new RecordsRetrievedContext(Either.left(recordsRetrieved), triggeringFlow, Instant.now());
try {
// Try enqueueing the RecordsRetrieved batch to the queue, which would throw exception on failure.
// Note: This does not block wait to enqueue.
recordsDeliveryQueue.add(recordsRetrievedContext);
// If the current batch is the only element in the queue, then try scheduling the event delivery.
if (recordsDeliveryQueue.size() == 1) {
scheduleNextEvent(recordsRetrieved);
subscriber.onNext(recordsRetrieved);
}
} catch (IllegalStateException e) {
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ",
Expand All @@ -216,19 +213,40 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved,
}
}

// Delivers the given batch to the subscriber, unless the active subscription is pending shutdown.
// Not thread-safe: the caller must already hold the lock before invoking this.
private void scheduleNextEvent(RecordsRetrieved recordsRetrieved) {
    if (pendingActiveSubscriptionShutdown) {
        // The subscription is being torn down; nothing further is handed to the subscriber.
        return;
    }
    subscriber.onNext(recordsRetrieved);
}

// Entry type of the records delivery queue. Each entry carries either a batch of records or a
// subscription shutdown event, together with the RecordFlow it came from and the instant it was created
// for enqueueing.
@Data
private static final class RecordsRetrievedContext {
// NOTE(review): this field looks like the pre-change declaration that the Either-typed field below
// replaces (this listing is a diff view showing both) — confirm against the merged file.
private final RecordsRetrieved recordsRetrieved;
// AccessLevel.NONE suppresses the Lombok-generated getter for this field; the payload is only reachable
// through getRecordsRetrieved() and executeEventAction() below.
@Getter(AccessLevel.NONE)
private final Either<RecordsRetrieved, SubscriptionShutdownEvent> recordsOrShutdownEvent;
private final RecordFlow recordFlow;
private final Instant enqueueTimestamp;

// Returns the records batch when this entry holds one (left side), or null when it holds a
// shutdown event (right side).
RecordsRetrieved getRecordsRetrieved() {
return recordsOrShutdownEvent.map(recordsEvent -> recordsEvent, shutdownEvent -> null);
}

// Performs the action for this entry: a records batch is passed to subscriber.onNext, a shutdown event
// has its shutdown action run.
// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
void executeEventAction(Subscriber<? super RecordsRetrieved> subscriber) {
recordsOrShutdownEvent.apply(recordsEvent -> subscriber.onNext(recordsEvent),
shutdownEvent -> shutdownEvent.getSubscriptionShutdownAction().run());
}
}

// Describes a deferred subscription shutdown: the action to run, a short identifier used when logging,
// and an optional throwable associated with the shutdown.
@Getter
private static final class SubscriptionShutdownEvent {
    // Action executed when this event is processed.
    private final Runnable subscriptionShutdownAction;
    // Identifier of the event kind, used in log messages.
    private final String eventIdentifier;
    // Null when the event was created through the two-argument constructor.
    private final Throwable shutdownEventThrowableOptional;

    // Creates a shutdown event that has no associated throwable.
    SubscriptionShutdownEvent(Runnable subscriptionShutdownAction, String eventIdentifier) {
        this(subscriptionShutdownAction, eventIdentifier, null);
    }

    // Creates a shutdown event carrying the given throwable (which may be null).
    SubscriptionShutdownEvent(Runnable subscriptionShutdownAction, String eventIdentifier,
            Throwable shutdownEventThrowableOptional) {
        this.subscriptionShutdownAction = subscriptionShutdownAction;
        this.eventIdentifier = eventIdentifier;
        this.shutdownEventThrowableOptional = shutdownEventThrowableOptional;
    }
}

private boolean hasValidSubscriber() {
Expand Down Expand Up @@ -280,9 +298,6 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) {
return;
}

// Clear the delivery buffer so that next subscription don't yield duplicate records.
resetRecordsDeliveryStateOnSubscriptionShutdown();

Throwable propagationThrowable = t;
ThrowableCategory category = throwableCategory(propagationThrowable);

Expand Down Expand Up @@ -339,31 +354,6 @@ private void resetRecordsDeliveryStateOnSubscriptionOnInit() {
+ "previous subscription - {} ", shardId, subscribeToShardId);
recordsDeliveryQueue.clear();
}
if(pendingActiveSubscriptionShutdown) {
log.warn("{}: Found current subscription to be in pendingShutdown state while initializing. This indicates unsuccessful clean up of"
+ "previous subscription - {} ", shardId, subscribeToShardId);
// Set pendingActiveSubscriptionShutdown to default value.
pendingActiveSubscriptionShutdown = false;
}
}

// Clears the delivery queue at the end of a subscription, first giving an already delivered event a
// bounded amount of time to be acknowledged.
// Not thread safe: must be executed while holding the lock on this.lockObject, which the wait() call
// below also requires.
private void resetRecordsDeliveryStateOnSubscriptionShutdown() {
    if (recordsDeliveryQueue.isEmpty()) {
        // Nothing is queued, so there is no outstanding ack to wait for.
        return;
    }

    // While this flag is set, no further events get scheduled during the wait period.
    pendingActiveSubscriptionShutdown = true;
    try {
        // Wait up to the configured time for a notification about an already delivered event, if any.
        lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS);
    } catch (InterruptedException e) {
        // Preserve the interrupt status for code further up the stack.
        Thread.currentThread().interrupt();
    }

    // Drop whatever is still queued and restore the flag to its default value.
    recordsDeliveryQueue.clear();
    pendingActiveSubscriptionShutdown = false;
}

protected void logAcquireTimeoutMessage(Throwable t) {
Expand Down Expand Up @@ -490,13 +480,15 @@ private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFl
}
}

// Reports whether the delivery queue currently holds no entries. Callers use this to decide between
// running a subscription shutdown action immediately and enqueueing it behind pending events.
private boolean shouldShutdownSubscriptionNow() {
    final boolean deliveryQueueDrained = recordsDeliveryQueue.isEmpty();
    return deliveryQueueDrained;
}

private void onComplete(RecordFlow triggeringFlow) {
synchronized (lockObject) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId,
triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);

resetRecordsDeliveryStateOnSubscriptionShutdown();

triggeringFlow.cancel();
if (!hasValidSubscriber()) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId,
Expand All @@ -512,7 +504,7 @@ private void onComplete(RecordFlow triggeringFlow) {
}

if (currentSequenceNumber != null) {
log.debug("{}: Shard hasn't ended resubscribing.", shardId);
log.debug("{}: Shard hasn't ended. Resubscribing.", shardId);
subscribeToShard(currentSequenceNumber);
} else {
log.debug("{}: Shard has ended completing subscriber.", shardId);
Expand Down Expand Up @@ -732,6 +724,18 @@ public void responseReceived(SubscribeToShardResponse response) {

// Handles an error reported for this flow. When the delivery queue still has pending entries, the error
// handling is wrapped in an "onError" shutdown event and enqueued behind them; otherwise it runs right away.
@Override
public void exceptionOccurred(Throwable throwable) {
    synchronized (parent.lockObject) {
        if (!parent.shouldShutdownSubscriptionNow()) {
            // Pending entries exist: defer the error handling by queueing it as a shutdown event.
            final SubscriptionShutdownEvent deferredError = new SubscriptionShutdownEvent(
                    () -> executeExceptionOccurred(throwable), "onError", throwable);
            tryEnqueueSubscriptionShutdownEvent(deferredError);
            return;
        }
        executeExceptionOccurred(throwable);
    }
}

private void executeExceptionOccurred(Throwable throwable) {
synchronized (parent.lockObject) {

log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- {}: {}",
Expand Down Expand Up @@ -759,6 +763,32 @@ public void exceptionOccurred(Throwable throwable) {

// Handles completion of this flow. When the delivery queue still has pending entries, the completion
// handling is wrapped in an "onComplete" shutdown event and enqueued behind them; otherwise it runs right away.
@Override
public void complete() {
    synchronized (parent.lockObject) {
        if (!parent.shouldShutdownSubscriptionNow()) {
            // Pending entries exist: defer the completion handling by queueing it as a shutdown event.
            final SubscriptionShutdownEvent deferredCompletion =
                    new SubscriptionShutdownEvent(this::executeComplete, "onComplete");
            tryEnqueueSubscriptionShutdownEvent(deferredCompletion);
            return;
        }
        executeComplete();
    }
}

// Adds the given shutdown event to the parent's delivery queue, wrapped in a RecordsRetrievedContext tied
// to this flow. Any failure to enqueue is logged as a warning and otherwise ignored.
// Not thread safe: must be executed while holding the lock on parent.lockObject.
private void tryEnqueueSubscriptionShutdownEvent(SubscriptionShutdownEvent subscriptionShutdownEvent) {
    try {
        final RecordsRetrievedContext shutdownContext =
                new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now());
        // add() throws rather than blocks when the bounded queue cannot accept the element.
        parent.recordsDeliveryQueue.add(shutdownContext);
    } catch (Exception ignored) {
        // Best effort: the event is dropped. The throwable carried by the event (possibly null) is passed
        // as the trailing log argument.
        log.warn(
                "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. ",
                parent.shardId,
                subscriptionShutdownEvent.getEventIdentifier(),
                parent.recordsDeliveryQueue.remainingCapacity(),
                subscriptionShutdownEvent.getShutdownEventThrowableOptional());
    }
}

private void executeComplete() {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Connection completed",
parent.shardId, connectionStartedAt, subscribeToShardId);
Expand Down
Loading