diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 6821f749a..978cd9f97 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -63,7 +63,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private boolean isFirstConnection = true; private Subscriber subscriber; - private long outstandingRequests = 0; + private long availableQueueSpace = 0; @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, @@ -129,8 +129,8 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { shardId, flow.connectionStartedAt, flow.subscribeToShardId, category, t); flow.cancel(); } - log.debug("{}: outstandingRequests zeroing from {}", shardId, outstandingRequests); - outstandingRequests = 0; + log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace); + availableQueueSpace = 0; try { handleFlowError(t); @@ -225,13 +225,15 @@ private void recordsReceived(RecordFlow triggeringFlow, SubscribeToShardEvent re errorOccurred(triggeringFlow, t); } - if (outstandingRequests > 0) { - outstandingRequests--; - triggeringFlow.request(1); - } else { + if (availableQueueSpace <= 0) { log.debug( - "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement outstandingRequests to below 0", + "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0", shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + } else { + availableQueueSpace--; + if (availableQueueSpace > 0) { + triggeringFlow.request(1); + } } } } @@ -322,8 +324,8 @@ public void request(long n) { errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow.")); return; } - long previous = outstandingRequests; - outstandingRequests += n; + long previous = availableQueueSpace; + availableQueueSpace += n; if (previous <= 0) { flow.request(1); } @@ -349,7 +351,7 @@ public void cancel() { log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}", shardId, flow.connectionStartedAt, flow.subscribeToShardId); flow.cancel(); - outstandingRequests = 0; + availableQueueSpace = 0; } } } @@ -591,8 +593,8 @@ public void onSubscribe(Subscription s) { } log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Outstanding: {} items so requesting an item", - parent.shardId, connectionStartedAt, subscribeToShardId, parent.outstandingRequests); - if (parent.outstandingRequests > 0) { + parent.shardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace); + if (parent.availableQueueSpace > 0) { request(1); } }