Skip to content

Commit

Permalink
Drain delivery queue to make slow consumers consume events at their p…
Browse files Browse the repository at this point in the history
…ace (#607)

* Allowing consumers to drain the delivery queue on subscription end

* Test cases fix

* Added test cases

* Made feedback changes
  • Loading branch information
ashwing authored and micah-jaffe committed Sep 14, 2019
1 parent db94cb6 commit a94dc7d
Show file tree
Hide file tree
Showing 3 changed files with 437 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.kinesis.retrieval.fanout;

import com.google.common.annotations.VisibleForTesting;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.NonNull;
Expand All @@ -32,6 +33,7 @@
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
import software.amazon.awssdk.utils.Either;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
Expand Down Expand Up @@ -63,8 +65,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory(
ThrowableType.ACQUIRE_TIMEOUT);
private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT);
private static final int MAX_EVENT_BURST_FROM_SERVICE = 10;
private static final long TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS = 1000;
// Max burst of 10 payload events + 1 terminal event (onError/onComplete) from the service.
private static final int MAX_EVENT_BURST_FROM_SERVICE = 10 + 1;

private final KinesisAsyncClient kinesis;
private final String shardId;
Expand All @@ -82,9 +84,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher {
private Subscriber<? super RecordsRetrieved> subscriber;
private long availableQueueSpace = 0;

private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE);
// Flag to indicate if the active subscription is being torn down.
private boolean pendingActiveSubscriptionShutdown = false;
private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(
MAX_EVENT_BURST_FROM_SERVICE);

@Override
public void start(ExtendedSequenceNumber extendedSequenceNumber,
Expand Down Expand Up @@ -135,13 +136,6 @@ public void notify(RecordsDeliveryAck recordsDeliveryAck) {
triggeringFlow = evictAckedEventAndScheduleNextEvent(recordsDeliveryAck);
} catch (Throwable t) {
errorOccurred(triggeringFlow, t);
} finally {
// Notify all the actors who are waiting for the records ack event.
// Here, when the active subscription is being torn down, the completing thread will
// wait for the last delivered records to send back the ack, to prevent sending duplicate records.
if(pendingActiveSubscriptionShutdown) {
lockObject.notifyAll();
}
}
if (triggeringFlow != null) {
updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow);
Expand All @@ -158,20 +152,23 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver
// RecordFlow of the current event that needs to be returned
RecordFlow flowToBeReturned = null;

final RecordsRetrieved recordsRetrieved = recordsRetrievedContext != null ?
recordsRetrievedContext.getRecordsRetrieved() : null;

// Check if the ack corresponds to the head of the delivery queue.
if (recordsRetrievedContext != null && recordsRetrievedContext.getRecordsRetrieved().batchUniqueIdentifier()
if (recordsRetrieved != null && recordsRetrieved.batchUniqueIdentifier()
.equals(recordsDeliveryAck.batchUniqueIdentifier())) {
// It is now safe to remove the element
recordsDeliveryQueue.poll();
// Take action based on the time spent by the event in queue.
takeDelayedDeliveryActionIfRequired(shardId, recordsRetrievedContext.getEnqueueTimestamp(), log);
// Update current sequence number for the successfully delivered event.
currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrievedContext.getRecordsRetrieved()).continuationSequenceNumber();
currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrieved).continuationSequenceNumber();
// Update the triggering flow for post scheduling upstream request.
flowToBeReturned = recordsRetrievedContext.getRecordFlow();
// Try scheduling the next event in the queue, if available.
// Try scheduling the next event in the queue or execute the subscription shutdown action.
if (!recordsDeliveryQueue.isEmpty()) {
scheduleNextEvent(recordsDeliveryQueue.peek().getRecordsRetrieved());
recordsDeliveryQueue.peek().executeEventAction(subscriber);
}
} else {
// Check if the mismatched event belongs to active flow. If publisher receives an ack for a
Expand All @@ -197,14 +194,14 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver
@VisibleForTesting
void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, RecordFlow triggeringFlow) {
final RecordsRetrievedContext recordsRetrievedContext =
new RecordsRetrievedContext(recordsRetrieved, triggeringFlow, Instant.now());
new RecordsRetrievedContext(Either.left(recordsRetrieved), triggeringFlow, Instant.now());
try {
// Try enqueueing the RecordsRetrieved batch to the queue, which would throw exception on failure.
// Note: This does not block wait to enqueue.
recordsDeliveryQueue.add(recordsRetrievedContext);
// If the current batch is the only element in the queue, then try scheduling the event delivery.
if (recordsDeliveryQueue.size() == 1) {
scheduleNextEvent(recordsRetrieved);
subscriber.onNext(recordsRetrieved);
}
} catch (IllegalStateException e) {
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ",
Expand All @@ -216,19 +213,40 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved,
}
}

// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
// Schedule the next event only when the active subscription is not pending shutdown.
private void scheduleNextEvent(RecordsRetrieved recordsRetrieved) {
if (!pendingActiveSubscriptionShutdown) {
subscriber.onNext(recordsRetrieved);
}
}

@Data
private static final class RecordsRetrievedContext {
private final RecordsRetrieved recordsRetrieved;
@Getter(AccessLevel.NONE)
private final Either<RecordsRetrieved, SubscriptionShutdownEvent> recordsOrShutdownEvent;
private final RecordFlow recordFlow;
private final Instant enqueueTimestamp;

RecordsRetrieved getRecordsRetrieved() {
return recordsOrShutdownEvent.map(recordsEvent -> recordsEvent, shutdownEvent -> null);
}

// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
void executeEventAction(Subscriber<? super RecordsRetrieved> subscriber) {
recordsOrShutdownEvent.apply(recordsEvent -> subscriber.onNext(recordsEvent),
shutdownEvent -> shutdownEvent.getSubscriptionShutdownAction().run());
}
}

@Getter
private static final class SubscriptionShutdownEvent {
private final Runnable subscriptionShutdownAction;
private final String eventIdentifier;
private final Throwable shutdownEventThrowableOptional;

SubscriptionShutdownEvent(Runnable subscriptionShutdownAction, String eventIdentifier, Throwable shutdownEventThrowableOptional) {
this.subscriptionShutdownAction = subscriptionShutdownAction;
this.eventIdentifier = eventIdentifier;
this.shutdownEventThrowableOptional = shutdownEventThrowableOptional;
}

SubscriptionShutdownEvent(Runnable subscriptionShutdownAction, String eventIdentifier) {
this(subscriptionShutdownAction, eventIdentifier, null);
}

}

private boolean hasValidSubscriber() {
Expand Down Expand Up @@ -280,9 +298,6 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) {
return;
}

// Clear the delivery buffer so that next subscription don't yield duplicate records.
resetRecordsDeliveryStateOnSubscriptionShutdown();

Throwable propagationThrowable = t;
ThrowableCategory category = throwableCategory(propagationThrowable);

Expand Down Expand Up @@ -339,31 +354,6 @@ private void resetRecordsDeliveryStateOnSubscriptionOnInit() {
+ "previous subscription - {} ", shardId, subscribeToShardId);
recordsDeliveryQueue.clear();
}
if(pendingActiveSubscriptionShutdown) {
log.warn("{}: Found current subscription to be in pendingShutdown state while initializing. This indicates unsuccessful clean up of"
+ "previous subscription - {} ", shardId, subscribeToShardId);
// Set pendingActiveSubscriptionShutdown to default value.
pendingActiveSubscriptionShutdown = false;
}
}

// This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject
private void resetRecordsDeliveryStateOnSubscriptionShutdown() {
// Wait for final event notification during the end of the subscription.
if (!recordsDeliveryQueue.isEmpty()) {
// This will prevent further events from getting scheduled, during the wait period.
pendingActiveSubscriptionShutdown = true;
try {
// Wait for the configured time to get a notification for already delivered event, if any.
lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Clear the queue to remove any remaining entries from the queue.
recordsDeliveryQueue.clear();
// Set pendingActiveSubscriptionShutdown to default value.
pendingActiveSubscriptionShutdown = false;
}
}

protected void logAcquireTimeoutMessage(Throwable t) {
Expand Down Expand Up @@ -490,13 +480,15 @@ private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFl
}
}

private boolean shouldShutdownSubscriptionNow() {
return recordsDeliveryQueue.isEmpty();
}

private void onComplete(RecordFlow triggeringFlow) {
synchronized (lockObject) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId,
triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId);

resetRecordsDeliveryStateOnSubscriptionShutdown();

triggeringFlow.cancel();
if (!hasValidSubscriber()) {
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId,
Expand All @@ -512,7 +504,7 @@ private void onComplete(RecordFlow triggeringFlow) {
}

if (currentSequenceNumber != null) {
log.debug("{}: Shard hasn't ended resubscribing.", shardId);
log.debug("{}: Shard hasn't ended. Resubscribing.", shardId);
subscribeToShard(currentSequenceNumber);
} else {
log.debug("{}: Shard has ended completing subscriber.", shardId);
Expand Down Expand Up @@ -732,6 +724,18 @@ public void responseReceived(SubscribeToShardResponse response) {

@Override
public void exceptionOccurred(Throwable throwable) {
synchronized (parent.lockObject) {
if (parent.shouldShutdownSubscriptionNow()) {
executeExceptionOccurred(throwable);
} else {
final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
() -> executeExceptionOccurred(throwable), "onError", throwable);
tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
}
}
}

private void executeExceptionOccurred(Throwable throwable) {
synchronized (parent.lockObject) {

log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- {}: {}",
Expand Down Expand Up @@ -759,6 +763,32 @@ public void exceptionOccurred(Throwable throwable) {

@Override
public void complete() {
synchronized (parent.lockObject) {
if (parent.shouldShutdownSubscriptionNow()) {
executeComplete();
} else {
final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent(
() -> executeComplete(), "onComplete");
tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent);
}
}
}

// This method is not thread safe. This needs to be executed after acquiring lock on parent.lockObject
private void tryEnqueueSubscriptionShutdownEvent(SubscriptionShutdownEvent subscriptionShutdownEvent) {
try {
parent.recordsDeliveryQueue
.add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now()));
} catch (Exception e) {
log.warn(
"{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. ",
parent.shardId, subscriptionShutdownEvent.getEventIdentifier(),
parent.recordsDeliveryQueue.remainingCapacity(),
subscriptionShutdownEvent.getShutdownEventThrowableOptional());
}
}

private void executeComplete() {
synchronized (parent.lockObject) {
log.debug("{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Connection completed",
parent.shardId, connectionStartedAt, subscribeToShardId);
Expand Down
Loading

0 comments on commit a94dc7d

Please sign in to comment.