From 91dac0050011a6b29efbf9030b32b86f146ba7a5 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Sun, 8 Sep 2019 11:01:12 -0700 Subject: [PATCH 1/4] Allowing consumers to drain the delivery queue on subscription end --- .../fanout/FanOutRecordsPublisher.java | 130 +++++++++++------- .../polling/PrefetchRecordsPublisherTest.java | 7 +- 2 files changed, 81 insertions(+), 56 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 8aea87351..d9b4fdf3e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -32,6 +32,7 @@ import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; +import software.amazon.awssdk.utils.Either; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.KinesisRequestsBuilder; @@ -63,8 +64,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private static final ThrowableCategory ACQUIRE_TIMEOUT_CATEGORY = new ThrowableCategory( ThrowableType.ACQUIRE_TIMEOUT); private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT); - private static final int MAX_EVENT_BURST_FROM_SERVICE = 10; - private static final long TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS = 1000; + // Max burst of 10 payload events + 1 terminal event (onError/onComplete) from the service. + private static final int MAX_EVENT_BURST_FROM_SERVICE = 10 + 1; private final KinesisAsyncClient kinesis; private final String shardId; @@ -82,9 +83,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private Subscriber subscriber; private long availableQueueSpace = 0; - private BlockingQueue recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE); - // Flag to indicate if the active subscription is being torn down. - private boolean pendingActiveSubscriptionShutdown = false; + private BlockingQueue recordsDeliveryQueue = new LinkedBlockingQueue<>( + MAX_EVENT_BURST_FROM_SERVICE); @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, @@ -135,13 +135,6 @@ public void notify(RecordsDeliveryAck recordsDeliveryAck) { triggeringFlow = evictAckedEventAndScheduleNextEvent(recordsDeliveryAck); } catch (Throwable t) { errorOccurred(triggeringFlow, t); - } finally { - // Notify all the actors who are waiting for the records ack event. - // Here, when the active subscription is being torn down, the completing thread will - // wait for the last delivered records to send back the ack, to prevent sending duplicate records. 
- if(pendingActiveSubscriptionShutdown) { - lockObject.notifyAll(); - } } if (triggeringFlow != null) { updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow); @@ -158,20 +151,26 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver // RecordFlow of the current event that needs to be returned RecordFlow flowToBeReturned = null; + final RecordsRetrieved recordsRetrieved = recordsRetrievedContext != null ? + recordsRetrievedContext.getRecordsOrShutDownEvent() + .map(recordsEvent -> recordsEvent, shutDownEvent -> null) : null; + // Check if the ack corresponds to the head of the delivery queue. - if (recordsRetrievedContext != null && recordsRetrievedContext.getRecordsRetrieved().batchUniqueIdentifier() + if (recordsRetrieved != null && recordsRetrieved.batchUniqueIdentifier() .equals(recordsDeliveryAck.batchUniqueIdentifier())) { // It is now safe to remove the element recordsDeliveryQueue.poll(); // Take action based on the time spent by the event in queue. takeDelayedDeliveryActionIfRequired(shardId, recordsRetrievedContext.getEnqueueTimestamp(), log); // Update current sequence number for the successfully delivered event. - currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrievedContext.getRecordsRetrieved()).continuationSequenceNumber(); + currentSequenceNumber = ((FanoutRecordsRetrieved)recordsRetrieved).continuationSequenceNumber(); // Update the triggering flow for post scheduling upstream request. flowToBeReturned = recordsRetrievedContext.getRecordFlow(); - // Try scheduling the next event in the queue, if available. + // Try scheduling the next event in the queue or execute the subscription shutdown action. if (!recordsDeliveryQueue.isEmpty()) { - scheduleNextEvent(recordsDeliveryQueue.peek().getRecordsRetrieved()); + recordsDeliveryQueue.peek().getRecordsOrShutDownEvent() + .apply(recordsEvent -> scheduleNextEvent(recordsEvent), + shutDownEvent -> shutDownEvent.getSubscriptionShutDownAction().run()); } } else { // Check if the mismatched event belongs to active flow. If publisher receives an ack for a @@ -197,7 +196,7 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver @VisibleForTesting void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, RecordFlow triggeringFlow) { final RecordsRetrievedContext recordsRetrievedContext = - new RecordsRetrievedContext(recordsRetrieved, triggeringFlow, Instant.now()); + new RecordsRetrievedContext(Either.left(recordsRetrieved), triggeringFlow, Instant.now()); try { // Try enqueueing the RecordsRetrieved batch to the queue, which would throw exception on failure. // Note: This does not block wait to enqueue. @@ -217,20 +216,35 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, } // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. - // Schedule the next event only when the active subscription is not pending shutdown. 
private void scheduleNextEvent(RecordsRetrieved recordsRetrieved) { - if (!pendingActiveSubscriptionShutdown) { - subscriber.onNext(recordsRetrieved); - } + subscriber.onNext(recordsRetrieved); } @Data private static final class RecordsRetrievedContext { - private final RecordsRetrieved recordsRetrieved; + private final Either recordsOrShutDownEvent; private final RecordFlow recordFlow; private final Instant enqueueTimestamp; } + @Getter + private static final class SubscriptionShutDownEvent { + private final Runnable subscriptionShutDownAction; + private final String eventIdentifier; + private final Throwable shutDownEventThrowableOptional; + + SubscriptionShutDownEvent(Runnable subscriptionShutDownAction, String eventIdentifier, Throwable shutDownEventThrowableOptional) { + this.subscriptionShutDownAction = subscriptionShutDownAction; + this.eventIdentifier = eventIdentifier; + this.shutDownEventThrowableOptional = shutDownEventThrowableOptional; + } + + SubscriptionShutDownEvent(Runnable subscriptionShutDownAction, String eventIdentifier) { + this(subscriptionShutDownAction, eventIdentifier, null); + } + + } + private boolean hasValidSubscriber() { return subscriber != null; } @@ -280,9 +294,6 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { return; } - // Clear the delivery buffer so that next subscription don't yield duplicate records. - resetRecordsDeliveryStateOnSubscriptionShutdown(); - Throwable propagationThrowable = t; ThrowableCategory category = throwableCategory(propagationThrowable); @@ -339,31 +350,6 @@ private void resetRecordsDeliveryStateOnSubscriptionOnInit() { + "previous subscription - {} ", shardId, subscribeToShardId); recordsDeliveryQueue.clear(); } - if(pendingActiveSubscriptionShutdown) { - log.warn("{}: Found current subscription to be in pendingShutdown state while initializing. This indicates unsuccessful clean up of" - + "previous subscription - {} ", shardId, subscribeToShardId); - // Set pendingActiveSubscriptionShutdown to default value. - pendingActiveSubscriptionShutdown = false; - } - } - - // This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject - private void resetRecordsDeliveryStateOnSubscriptionShutdown() { - // Wait for final event notification during the end of the subscription. - if (!recordsDeliveryQueue.isEmpty()) { - // This will prevent further events from getting scheduled, during the wait period. - pendingActiveSubscriptionShutdown = true; - try { - // Wait for the configured time to get a notification for already delivered event, if any. - lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - // Clear the queue to remove any remaining entries from the queue. - recordsDeliveryQueue.clear(); - // Set pendingActiveSubscriptionShutdown to default value. 
- pendingActiveSubscriptionShutdown = false; - } } protected void logAcquireTimeoutMessage(Throwable t) { @@ -490,13 +476,15 @@ private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFl } } + private boolean shouldShutDownSubscriptionNow() { + return recordsDeliveryQueue.isEmpty(); + } + private void onComplete(RecordFlow triggeringFlow) { synchronized (lockObject) { log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); - resetRecordsDeliveryStateOnSubscriptionShutdown(); - triggeringFlow.cancel(); if (!hasValidSubscriber()) { log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, @@ -732,6 +720,18 @@ public void responseReceived(SubscribeToShardResponse response) { @Override public void exceptionOccurred(Throwable throwable) { + synchronized (parent.lockObject) { + if (parent.shouldShutDownSubscriptionNow()) { + executeExceptionOccurred(throwable); + } else { + final SubscriptionShutDownEvent subscriptionShutDownEvent = new SubscriptionShutDownEvent( + () -> executeExceptionOccurred(throwable), "onError", throwable); + tryEnqueueSubscriptionShutDownEvent(subscriptionShutDownEvent); + } + } + } + + private void executeExceptionOccurred(Throwable throwable) { synchronized (parent.lockObject) { log.debug("{}: [SubscriptionLifetime]: (RecordFlow#exceptionOccurred) @ {} id: {} -- {}: {}", @@ -759,6 +759,32 @@ public void exceptionOccurred(Throwable throwable) { @Override public void complete() { + synchronized (parent.lockObject) { + if (parent.shouldShutDownSubscriptionNow()) { + executeComplete(); + } else { + final SubscriptionShutDownEvent subscriptionShutDownEvent = new SubscriptionShutDownEvent( + () -> executeComplete(), "onComplete"); + tryEnqueueSubscriptionShutDownEvent(subscriptionShutDownEvent); + } + } + } + + // This method is not thread safe. This needs to be executed after acquiring lock on parent.lockObject + private void tryEnqueueSubscriptionShutDownEvent(SubscriptionShutDownEvent subscriptionShutDownEvent) { + try { + parent.recordsDeliveryQueue + .add(new RecordsRetrievedContext(Either.right(subscriptionShutDownEvent), this, Instant.now())); + } catch (Exception e) { + log.warn( + "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. 
", + parent.shardId, subscriptionShutDownEvent.getEventIdentifier(), + parent.recordsDeliveryQueue.remainingCapacity(), + subscriptionShutDownEvent.getShutDownEventThrowableOptional()); + } + } + + private void executeComplete() { synchronized (parent.lockObject) { log.debug("{}: [SubscriptionLifetime]: (RecordFlow#complete) @ {} id: {} -- Connection completed", parent.shardId, connectionStartedAt, subscribeToShardId); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java index 8c1aa7438..cd7f6ba5a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java @@ -304,7 +304,7 @@ public void testNoDeadlockOnFullQueue() { log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size()); AtomicInteger receivedItems = new AtomicInteger(0); - final int expectedItems = MAX_SIZE * 1000; + final int expectedItems = MAX_SIZE * 10; Object lock = new Object(); @@ -383,7 +383,7 @@ public void testNoDeadlockOnFullQueueAndLossOfNotification() { log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size()); AtomicInteger receivedItems = new AtomicInteger(0); - final int expectedItems = MAX_SIZE * 100; + final int expectedItems = MAX_SIZE * 20; Object lock = new Object(); @@ -521,7 +521,7 @@ public GetRecordsResponse answer(InvocationOnMock invocation) throws Throwable { private static class LossyNotificationSubscriber extends ShardConsumerNotifyingSubscriber { - private static final int LOSS_EVERY_NTH_RECORD = 100; + private static final int LOSS_EVERY_NTH_RECORD = 50; private static int recordCounter = 0; private static final ScheduledExecutorService consumerHealthChecker = Executors.newScheduledThreadPool(1); @@ -531,7 +531,6 @@ public LossyNotificationSubscriber(Subscriber delegate, Record @Override public void onNext(RecordsRetrieved recordsRetrieved) { - log.info("Subscriber received onNext"); if (!(recordCounter % LOSS_EVERY_NTH_RECORD == LOSS_EVERY_NTH_RECORD - 1)) { getRecordsPublisher().notify(getRecordsDeliveryAck(recordsRetrieved)); getDelegateSubscriber().onNext(recordsRetrieved); From 6fbb4cd45669a977caa1c5fdafe88039cec7d622 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Sun, 8 Sep 2019 11:15:52 -0700 Subject: [PATCH 2/4] Test cases fix --- .../kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index 1da65f2df..a9978c4db 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -517,7 +517,7 @@ public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServi CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(1); int totalServicePublisherEvents = 1000; - int initialDemand = 10; + int initialDemand = 11; BackpressureAdheringServicePublisher 
servicePublisher = new BackpressureAdheringServicePublisher(servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, initialDemand); @@ -844,7 +844,7 @@ public void testIfBufferingRecordsOverCapacityPublishesOneEventAndThrows() { @Override public void onComplete() {} }); try { - IntStream.rangeClosed(1, 11).forEach( + IntStream.rangeClosed(1, 12).forEach( i -> fanOutRecordsPublisher.bufferCurrentEventAndScheduleIfRequired(recordsRetrieved, recordFlow)); fail("Should throw Queue full exception"); } catch (IllegalStateException e) { From 67d825fc74cdec580aba6114d0eab38ffda8899e Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 9 Sep 2019 00:27:19 -0700 Subject: [PATCH 3/4] Added test cases --- .../fanout/FanOutRecordsPublisher.java | 2 +- .../fanout/FanOutRecordsPublisherTest.java | 344 ++++++++++++++++++ 2 files changed, 345 insertions(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index d9b4fdf3e..838d54e5e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -500,7 +500,7 @@ private void onComplete(RecordFlow triggeringFlow) { } if (currentSequenceNumber != null) { - log.debug("{}: Shard hasn't ended resubscribing.", shardId); + log.debug("{}: Shard hasn't ended. Resubscribing.", shardId); subscribeToShard(currentSequenceNumber); } else { log.debug("{}: Shard has ended completing subscriber.", shardId); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java index a9978c4db..6f5cf4b2d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisherTest.java @@ -8,6 +8,7 @@ import io.reactivex.subscribers.SafeSubscriber; import lombok.Data; import lombok.RequiredArgsConstructor; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -65,6 +66,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -405,6 +407,312 @@ public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServi } + @Test + public void testIfStreamOfEventsAndOnCompleteAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + List records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList()); + + Consumer servicePublisherAction = contSeqNum -> captor.getValue().onNext( + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + 
.continuationSequenceNumber(contSeqNum + "") + .records(records) + .build()); + + CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2); + int totalServicePublisherEvents = 1000; + int initialDemand = 10; + int triggerCompleteAtNthEvent = 200; + BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher( + servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, + initialDemand); + servicePublisher.setCompleteTrigger(triggerCompleteAtNthEvent, () -> flowCaptor.getValue().complete()); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + List receivedInput = new ArrayList<>(); + + Subscriber shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber( + new Subscriber() { + private Subscription subscription; + private int lastSeenSeqNum = 0; + + @Override public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + servicePublisher.request(1); + } + + @Override public void onNext(RecordsRetrieved input) { + receivedInput.add(input.processRecordsInput()); + assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber()); + subscription.request(1); + servicePublisher.request(1); + if(receivedInput.size() == triggerCompleteAtNthEvent) { + servicePublisherTaskCompletionLatch.countDown(); + } + } + + @Override public void onError(Throwable t) { + log.error("Caught throwable in subscriber", t); + fail("Caught throwable in subscriber"); + } + + @Override public void onComplete() { + fail("OnComplete called when not expected"); + } + }, source); + + ExecutorService executorService = getTestExecutor(); + Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService))); + int bufferSize = 8; + + Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) + .subscribe(shardConsumerSubscriber); + + verify(kinesisClient, times(1)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + + flowCaptor.getValue().onEventStream(publisher); + captor.getValue().onSubscribe(subscription); + + List matchers = records.stream().map(KinesisClientRecordMatcher::new) + .collect(Collectors.toList()); + + executorService.submit(servicePublisher); + servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS); + + assertThat(receivedInput.size(), equalTo(triggerCompleteAtNthEvent)); + + receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { + assertThat(clientRecordsList.size(), equalTo(matchers.size())); + for (int i = 0; i < clientRecordsList.size(); ++i) { + assertThat(clientRecordsList.get(i), matchers.get(i)); + } + }); + + assertThat(source.getCurrentSequenceNumber(), equalTo(triggerCompleteAtNthEvent + "")); + // In non-shard end cases, upon successful completion, the publisher would re-subscribe to service. 
+ verify(kinesisClient, times(2)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + + } + + @Test + public void testIfShardEndEventAndOnCompleteAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + List records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList()); + + Consumer servicePublisherAction = contSeqNum -> captor.getValue().onNext( + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + .continuationSequenceNumber(contSeqNum + "") + .records(records) + .build()); + + Consumer servicePublisherShardEndAction = contSeqNum -> captor.getValue().onNext( + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + .continuationSequenceNumber(null) + .records(records) + .build()); + + CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2); + int totalServicePublisherEvents = 1000; + int initialDemand = 10; + int triggerCompleteAtNthEvent = 200; + BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher( + servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, + initialDemand); + + servicePublisher + .setShardEndAndCompleteTrigger(triggerCompleteAtNthEvent, () -> flowCaptor.getValue().complete(), + servicePublisherShardEndAction); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + List receivedInput = new ArrayList<>(); + + final boolean[] isOnCompleteTriggered = { false }; + + Subscriber shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber( + new Subscriber() { + private Subscription subscription; + private int lastSeenSeqNum = 0; + + @Override public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + servicePublisher.request(1); + } + + @Override public void onNext(RecordsRetrieved input) { + receivedInput.add(input.processRecordsInput()); + subscription.request(1); + servicePublisher.request(1); + if(receivedInput.size() == triggerCompleteAtNthEvent) { + servicePublisherTaskCompletionLatch.countDown(); + } + } + + @Override public void onError(Throwable t) { + log.error("Caught throwable in subscriber", t); + fail("Caught throwable in subscriber"); + } + + @Override public void onComplete() { + isOnCompleteTriggered[0] = true; + } + }, source); + + ExecutorService executorService = getTestExecutor(); + Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService))); + int bufferSize = 8; + + Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) + .subscribe(shardConsumerSubscriber); + + verify(kinesisClient, times(1)).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + + flowCaptor.getValue().onEventStream(publisher); + captor.getValue().onSubscribe(subscription); + + List matchers = records.stream().map(KinesisClientRecordMatcher::new) + .collect(Collectors.toList()); + + executorService.submit(servicePublisher); + servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS); + + 
assertThat(receivedInput.size(), equalTo(triggerCompleteAtNthEvent)); + + receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { + assertThat(clientRecordsList.size(), equalTo(matchers.size())); + for (int i = 0; i < clientRecordsList.size(); ++i) { + assertThat(clientRecordsList.get(i), matchers.get(i)); + } + }); + + assertNull(source.getCurrentSequenceNumber()); + // With shard end event, onComplete must be propagated to the subscriber. + assertTrue("OnComplete should be triggered", isOnCompleteTriggered[0]); + + } + + @Test + public void testIfStreamOfEventsAndOnErrorAreDeliveredInOrderWithBackpressureAdheringServicePublisher() throws Exception { + FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); + + ArgumentCaptor captor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordSubscription.class); + ArgumentCaptor flowCaptor = ArgumentCaptor + .forClass(FanOutRecordsPublisher.RecordFlow.class); + + List records = Stream.of(1, 2, 3).map(this::makeRecord).collect(Collectors.toList()); + + Consumer servicePublisherAction = contSeqNum -> captor.getValue().onNext( + SubscribeToShardEvent.builder() + .millisBehindLatest(100L) + .continuationSequenceNumber(contSeqNum + "") + .records(records) + .build()); + + CountDownLatch servicePublisherTaskCompletionLatch = new CountDownLatch(2); + int totalServicePublisherEvents = 1000; + int initialDemand = 10; + int triggerErrorAtNthEvent = 241; + BackpressureAdheringServicePublisher servicePublisher = new BackpressureAdheringServicePublisher( + servicePublisherAction, totalServicePublisherEvents, servicePublisherTaskCompletionLatch, + initialDemand); + servicePublisher.setErrorTrigger(triggerErrorAtNthEvent, + () -> flowCaptor.getValue().exceptionOccurred(new RuntimeException("Service Exception"))); + + doNothing().when(publisher).subscribe(captor.capture()); + + source.start(ExtendedSequenceNumber.LATEST, + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + + final boolean[] isOnErrorThrown = { false }; + + List receivedInput = new ArrayList<>(); + + Subscriber shardConsumerSubscriber = new ShardConsumerNotifyingSubscriber( + new Subscriber() { + private Subscription subscription; + private int lastSeenSeqNum = 0; + + @Override public void onSubscribe(Subscription s) { + subscription = s; + subscription.request(1); + servicePublisher.request(1); + } + + @Override public void onNext(RecordsRetrieved input) { + receivedInput.add(input.processRecordsInput()); + assertEquals("" + ++lastSeenSeqNum, ((FanOutRecordsPublisher.FanoutRecordsRetrieved)input).continuationSequenceNumber()); + subscription.request(1); + servicePublisher.request(1); + if(receivedInput.size() == triggerErrorAtNthEvent) { + servicePublisherTaskCompletionLatch.countDown(); + } + } + + @Override public void onError(Throwable t) { + log.error("Caught throwable in subscriber", t); + isOnErrorThrown[0] = true; + } + + @Override public void onComplete() { + fail("OnComplete called when not expected"); + } + }, source); + + ExecutorService executorService = getTestExecutor(); + Scheduler testScheduler = getScheduler(getInitiallyBlockingExecutor(getSpiedExecutor(executorService))); + int bufferSize = 8; + + Flowable.fromPublisher(source).subscribeOn(testScheduler).observeOn(testScheduler, true, bufferSize) + .subscribe(shardConsumerSubscriber); + + verify(kinesisClient).subscribeToShard(any(SubscribeToShardRequest.class), flowCaptor.capture()); + 
flowCaptor.getValue().onEventStream(publisher); + captor.getValue().onSubscribe(subscription); + + List matchers = records.stream().map(KinesisClientRecordMatcher::new) + .collect(Collectors.toList()); + + executorService.submit(servicePublisher); + servicePublisherTaskCompletionLatch.await(5000, TimeUnit.MILLISECONDS); + + assertThat(receivedInput.size(), equalTo(triggerErrorAtNthEvent)); + + receivedInput.stream().map(ProcessRecordsInput::records).forEach(clientRecordsList -> { + assertThat(clientRecordsList.size(), equalTo(matchers.size())); + for (int i = 0; i < clientRecordsList.size(); ++i) { + assertThat(clientRecordsList.get(i), matchers.get(i)); + } + }); + + assertThat(source.getCurrentSequenceNumber(), equalTo(triggerErrorAtNthEvent + "")); + assertTrue("OnError should have been thrown", isOnErrorThrown[0]); + + } + @Test public void testIfStreamOfEventsAreDeliveredInOrderWithBackpressureAdheringServicePublisherHavingInitialBurstWithinLimit() throws Exception { FanOutRecordsPublisher source = new FanOutRecordsPublisher(kinesisClient, SHARD_ID, CONSUMER_ARN); @@ -1131,10 +1439,17 @@ private static class BackpressureAdheringServicePublisher implements Runnable { private final Integer numOfTimes; private final CountDownLatch taskCompletionLatch; private final Semaphore demandNotifier; + private Integer sendCompletionAt; + private Runnable completeAction; + private Integer sendErrorAt; + private Runnable errorAction; + private Consumer shardEndAction; BackpressureAdheringServicePublisher(Consumer action, Integer numOfTimes, CountDownLatch taskCompletionLatch, Integer initialDemand) { this(action, numOfTimes, taskCompletionLatch, new Semaphore(initialDemand)); + sendCompletionAt = Integer.MAX_VALUE; + sendErrorAt = Integer.MAX_VALUE; } public void request(int n) { @@ -1144,10 +1459,39 @@ public void request(int n) { public void run() { for (int i = 1; i <= numOfTimes; ) { demandNotifier.acquireUninterruptibly(); + if(i == sendCompletionAt) { + if(shardEndAction != null) { + shardEndAction.accept(i++); + } else { + action.accept(i++); + } + completeAction.run(); + break; + } + if(i == sendErrorAt) { + action.accept(i++); + errorAction.run(); + break; + } action.accept(i++); } taskCompletionLatch.countDown(); } + + public void setCompleteTrigger(Integer sendCompletionAt, Runnable completeAction) { + this.sendCompletionAt = sendCompletionAt; + this.completeAction = completeAction; + } + + public void setShardEndAndCompleteTrigger(Integer sendCompletionAt, Runnable completeAction, Consumer shardEndAction) { + setCompleteTrigger(sendCompletionAt, completeAction); + this.shardEndAction = shardEndAction; + } + + public void setErrorTrigger(Integer sendErrorAt, Runnable errorAction) { + this.sendErrorAt = sendErrorAt; + this.errorAction = errorAction; + } } private Record makeRecord(String sequenceNumber) { From 9f6fde9c1b95b2db914ee2ffb271270926a160b6 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 10 Sep 2019 17:50:33 -0700 Subject: [PATCH 4/4] Made feedback changes --- .../fanout/FanOutRecordsPublisher.java | 66 ++++++++++--------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 838d54e5e..426ba6c93 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ 
b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.retrieval.fanout; import com.google.common.annotations.VisibleForTesting; +import lombok.AccessLevel; import lombok.Data; import lombok.Getter; import lombok.NonNull; @@ -152,8 +153,7 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver RecordFlow flowToBeReturned = null; final RecordsRetrieved recordsRetrieved = recordsRetrievedContext != null ? - recordsRetrievedContext.getRecordsOrShutDownEvent() - .map(recordsEvent -> recordsEvent, shutDownEvent -> null) : null; + recordsRetrievedContext.getRecordsRetrieved() : null; // Check if the ack corresponds to the head of the delivery queue. if (recordsRetrieved != null && recordsRetrieved.batchUniqueIdentifier() @@ -168,9 +168,7 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver flowToBeReturned = recordsRetrievedContext.getRecordFlow(); // Try scheduling the next event in the queue or execute the subscription shutdown action. if (!recordsDeliveryQueue.isEmpty()) { - recordsDeliveryQueue.peek().getRecordsOrShutDownEvent() - .apply(recordsEvent -> scheduleNextEvent(recordsEvent), - shutDownEvent -> shutDownEvent.getSubscriptionShutDownAction().run()); + recordsDeliveryQueue.peek().executeEventAction(subscriber); } } else { // Check if the mismatched event belongs to active flow. If publisher receives an ack for a @@ -203,7 +201,7 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, recordsDeliveryQueue.add(recordsRetrievedContext); // If the current batch is the only element in the queue, then try scheduling the event delivery. if (recordsDeliveryQueue.size() == 1) { - scheduleNextEvent(recordsRetrieved); + subscriber.onNext(recordsRetrieved); } } catch (IllegalStateException e) { log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ", @@ -215,32 +213,38 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, } } - // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. - private void scheduleNextEvent(RecordsRetrieved recordsRetrieved) { - subscriber.onNext(recordsRetrieved); - } - @Data private static final class RecordsRetrievedContext { - private final Either recordsOrShutDownEvent; + @Getter(AccessLevel.NONE) + private final Either recordsOrShutdownEvent; private final RecordFlow recordFlow; private final Instant enqueueTimestamp; + + RecordsRetrieved getRecordsRetrieved() { + return recordsOrShutdownEvent.map(recordsEvent -> recordsEvent, shutdownEvent -> null); + } + + // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. 
+ void executeEventAction(Subscriber subscriber) { + recordsOrShutdownEvent.apply(recordsEvent -> subscriber.onNext(recordsEvent), + shutdownEvent -> shutdownEvent.getSubscriptionShutdownAction().run()); + } } @Getter - private static final class SubscriptionShutDownEvent { - private final Runnable subscriptionShutDownAction; + private static final class SubscriptionShutdownEvent { + private final Runnable subscriptionShutdownAction; private final String eventIdentifier; - private final Throwable shutDownEventThrowableOptional; + private final Throwable shutdownEventThrowableOptional; - SubscriptionShutDownEvent(Runnable subscriptionShutDownAction, String eventIdentifier, Throwable shutDownEventThrowableOptional) { - this.subscriptionShutDownAction = subscriptionShutDownAction; + SubscriptionShutdownEvent(Runnable subscriptionShutdownAction, String eventIdentifier, Throwable shutdownEventThrowableOptional) { + this.subscriptionShutdownAction = subscriptionShutdownAction; this.eventIdentifier = eventIdentifier; - this.shutDownEventThrowableOptional = shutDownEventThrowableOptional; + this.shutdownEventThrowableOptional = shutdownEventThrowableOptional; } - SubscriptionShutDownEvent(Runnable subscriptionShutDownAction, String eventIdentifier) { - this(subscriptionShutDownAction, eventIdentifier, null); + SubscriptionShutdownEvent(Runnable subscriptionShutdownAction, String eventIdentifier) { + this(subscriptionShutdownAction, eventIdentifier, null); } } @@ -476,7 +480,7 @@ private void updateAvailableQueueSpaceAndRequestUpstream(RecordFlow triggeringFl } } - private boolean shouldShutDownSubscriptionNow() { + private boolean shouldShutdownSubscriptionNow() { return recordsDeliveryQueue.isEmpty(); } @@ -721,12 +725,12 @@ public void responseReceived(SubscribeToShardResponse response) { @Override public void exceptionOccurred(Throwable throwable) { synchronized (parent.lockObject) { - if (parent.shouldShutDownSubscriptionNow()) { + if (parent.shouldShutdownSubscriptionNow()) { executeExceptionOccurred(throwable); } else { - final SubscriptionShutDownEvent subscriptionShutDownEvent = new SubscriptionShutDownEvent( + final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent( () -> executeExceptionOccurred(throwable), "onError", throwable); - tryEnqueueSubscriptionShutDownEvent(subscriptionShutDownEvent); + tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent); } } } @@ -760,27 +764,27 @@ private void executeExceptionOccurred(Throwable throwable) { @Override public void complete() { synchronized (parent.lockObject) { - if (parent.shouldShutDownSubscriptionNow()) { + if (parent.shouldShutdownSubscriptionNow()) { executeComplete(); } else { - final SubscriptionShutDownEvent subscriptionShutDownEvent = new SubscriptionShutDownEvent( + final SubscriptionShutdownEvent subscriptionShutdownEvent = new SubscriptionShutdownEvent( () -> executeComplete(), "onComplete"); - tryEnqueueSubscriptionShutDownEvent(subscriptionShutDownEvent); + tryEnqueueSubscriptionShutdownEvent(subscriptionShutdownEvent); } } } // This method is not thread safe. 
This needs to be executed after acquiring lock on parent.lockObject - private void tryEnqueueSubscriptionShutDownEvent(SubscriptionShutDownEvent subscriptionShutDownEvent) { + private void tryEnqueueSubscriptionShutdownEvent(SubscriptionShutdownEvent subscriptionShutdownEvent) { try { parent.recordsDeliveryQueue - .add(new RecordsRetrievedContext(Either.right(subscriptionShutDownEvent), this, Instant.now())); + .add(new RecordsRetrievedContext(Either.right(subscriptionShutdownEvent), this, Instant.now())); } catch (Exception e) { log.warn( "{}: Unable to enqueue the {} shutdown event due to capacity restrictions in delivery queue with remaining capacity {}. Ignoring. ", - parent.shardId, subscriptionShutDownEvent.getEventIdentifier(), + parent.shardId, subscriptionShutdownEvent.getEventIdentifier(), parent.recordsDeliveryQueue.remainingCapacity(), - subscriptionShutDownEvent.getShutDownEventThrowableOptional()); + subscriptionShutdownEvent.getShutdownEventThrowableOptional()); } }
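
Reviewer note (illustration only, not part of the patches above): the ordering guarantee these changes rely on can be distilled into a small, self-contained sketch. A single bounded queue holds either a records batch or a deferred shutdown action; the ack for the head entry evicts it and then either delivers the next buffered batch or runs the deferred onComplete/onError action. The class and method names below (DeliveryQueueSketch, onPayload, onAck, and so on) are invented for illustration and are not KCL API; the real FanOutRecordsPublisher additionally tracks the triggering RecordFlow, enqueue timestamps, and continuation sequence numbers, all under parent.lockObject.

    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.function.Consumer;

    // Minimal sketch of the shared payload/terminal-event queue, assuming a
    // String batch id stands in for RecordsRetrieved and a Runnable stands in
    // for the SubscriptionShutdownEvent action.
    public class DeliveryQueueSketch {

        // A queue entry is either a payload batch or a deferred shutdown action.
        private static final class Entry {
            final String batchId;          // null for a shutdown entry
            final Runnable shutdownAction; // null for a payload entry
            Entry(String batchId, Runnable shutdownAction) {
                this.batchId = batchId;
                this.shutdownAction = shutdownAction;
            }
            boolean isPayload() { return batchId != null; }
        }

        // 10 payload events + 1 terminal event, mirroring MAX_EVENT_BURST_FROM_SERVICE.
        private final Queue<Entry> queue = new LinkedBlockingQueue<>(10 + 1);
        private final Consumer<String> deliverToSubscriber;

        DeliveryQueueSketch(Consumer<String> deliverToSubscriber) {
            this.deliverToSubscriber = deliverToSubscriber;
        }

        // Buffer a payload batch; deliver immediately only if it is the sole entry.
        synchronized void onPayload(String batchId) {
            queue.add(new Entry(batchId, null)); // throws IllegalStateException when full
            if (queue.size() == 1) {
                deliverToSubscriber.accept(batchId);
            }
        }

        // Terminal event: run it now if nothing is in flight, otherwise defer it behind the queue.
        synchronized void onTerminal(Runnable shutdownAction) {
            if (queue.isEmpty()) {
                shutdownAction.run();
            } else {
                queue.add(new Entry(null, shutdownAction));
            }
        }

        // Ack for the head batch: evict it, then deliver the next batch or run the deferred shutdown.
        synchronized void onAck(String ackedBatchId) {
            Entry head = queue.peek();
            if (head == null || !head.isPayload() || !head.batchId.equals(ackedBatchId)) {
                return; // stale or mismatched ack; the real publisher logs this and may reset the subscription
            }
            queue.poll();
            Entry next = queue.peek();
            if (next != null) {
                if (next.isPayload()) {
                    deliverToSubscriber.accept(next.batchId);
                } else {
                    next.shutdownAction.run();
                }
            }
        }
    }

Because the terminal event rides the same queue as the payload events, the subscriber cannot observe onComplete or onError before it has acknowledged every batch that was already buffered, which is what lets these patches drop the old TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS wait-and-clear path.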