From 1ba010cd51a62a2506c6058374aca27cd3f335f6 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Mon, 20 Aug 2018 14:51:47 -0700 Subject: [PATCH] Rename outstandingRquests to availableQueueSpace outstandingRequests was actually representing the available space in the RxJava queue. This renames it to better match reality. Also changed to only make the request if there is available queue space. We now decrement availableQueueSpace ahead of determine whether to request another item. --- .../fanout/FanOutRecordsPublisher.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 6821f749a..978cd9f97 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -63,7 +63,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private boolean isFirstConnection = true; private Subscriber subscriber; - private long outstandingRequests = 0; + private long availableQueueSpace = 0; @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, @@ -129,8 +129,8 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { shardId, flow.connectionStartedAt, flow.subscribeToShardId, category, t); flow.cancel(); } - log.debug("{}: outstandingRequests zeroing from {}", shardId, outstandingRequests); - outstandingRequests = 0; + log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace); + availableQueueSpace = 0; try { handleFlowError(t); @@ -225,13 +225,15 @@ private void recordsReceived(RecordFlow triggeringFlow, SubscribeToShardEvent re errorOccurred(triggeringFlow, t); } - if (outstandingRequests > 0) { - outstandingRequests--; - triggeringFlow.request(1); - } else { + if (availableQueueSpace <= 0) { log.debug( - "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement outstandingRequests to below 0", + "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0", shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + } else { + availableQueueSpace--; + if (availableQueueSpace > 0) { + triggeringFlow.request(1); + } } } } @@ -322,8 +324,8 @@ public void request(long n) { errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow.")); return; } - long previous = outstandingRequests; - outstandingRequests += n; + long previous = availableQueueSpace; + availableQueueSpace += n; if (previous <= 0) { flow.request(1); } @@ -349,7 +351,7 @@ public void cancel() { log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}", shardId, flow.connectionStartedAt, flow.subscribeToShardId); flow.cancel(); - outstandingRequests = 0; + availableQueueSpace = 0; } } } @@ -591,8 +593,8 @@ public void onSubscribe(Subscription s) { } log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Outstanding: {} items so requesting an item", - parent.shardId, connectionStartedAt, subscribeToShardId, parent.outstandingRequests); - if (parent.outstandingRequests > 0) { + parent.shardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace); + if (parent.availableQueueSpace > 0) { request(1); } }