diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 6789cc6ca..60197b63a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -265,6 +265,11 @@ Throwable healthCheck() { if (subscriber != null) { subscriber.cancel(); } + // + // Set the last request time to now, we specifically don't null it out since we want it to trigger a + // restart if the subscription still doesn't start producing. + // + lastRequestTime = Instant.now(); startSubscriptions(); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 8d9ede589..978cd9f97 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -63,7 +63,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private boolean isFirstConnection = true; private Subscriber subscriber; - private long outstandingRequests = 0; + private long availableQueueSpace = 0; @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, @@ -129,8 +129,8 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { shardId, flow.connectionStartedAt, flow.subscribeToShardId, category, t); flow.cancel(); } - log.debug("{}: outstandingRequests zeroing from {}", shardId, outstandingRequests); - outstandingRequests = 0; + log.debug("{}: availableQueueSpace zeroing from {}", shardId, availableQueueSpace); + availableQueueSpace = 0; try { handleFlowError(t); @@ -225,13 +225,15 @@ private void recordsReceived(RecordFlow triggeringFlow, SubscribeToShardEvent re errorOccurred(triggeringFlow, t); } - if (outstandingRequests > 0) { - outstandingRequests--; - triggeringFlow.request(1); - } else { + if (availableQueueSpace <= 0) { log.debug( - "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement outstandingRequests to below 0", + "{}: [SubscriptionLifetime] (FanOutRecordsPublisher#recordsReceived) @ {} id: {} -- Attempted to decrement availableQueueSpace to below 0", shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + } else { + availableQueueSpace--; + if (availableQueueSpace > 0) { + triggeringFlow.request(1); + } } } } @@ -305,25 +307,51 @@ public void subscribe(Subscriber s) { subscriber.onSubscribe(new Subscription() { @Override public void request(long n) { - long previous = outstandingRequests; - outstandingRequests += n; - if (previous <= 0) { - flow.request(1); + synchronized (lockObject) { + if (subscriber != s) { + log.warn( + "{}: (FanOutRecordsPublisher/Subscription#request) - Rejected an attempt to request({}), because subscribers don't match.", + shardId, n); + return; + } + if (flow == null) { + // + // Flow has been terminated, so we can't make any requests on it anymore. + // + log.debug( + "{}: (FanOutRecordsPublisher/Subscription#request) - Request called for a null flow.", + shardId); + errorOccurred(flow, new IllegalStateException("Attempted to request on a null flow.")); + return; + } + long previous = availableQueueSpace; + availableQueueSpace += n; + if (previous <= 0) { + flow.request(1); + } } } @Override public void cancel() { synchronized (lockObject) { + if (subscriber != s) { + log.warn( + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Rejected attempt to cancel subscription, because subscribers don't match.", + shardId); + return; + } if (!hasValidSubscriber()) { - log.warn("{}: Cancelled called even with an invalid subscriber", shardId); + log.warn( + "{}: (FanOutRecordsPublisher/Subscription#cancel) - Cancelled called even with an invalid subscriber", + shardId); } subscriber = null; if (flow != null) { log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher/Subscription#cancel) @ {} id: {}", shardId, flow.connectionStartedAt, flow.subscribeToShardId); flow.cancel(); - outstandingRequests = 0; + availableQueueSpace = 0; } } } @@ -565,8 +593,8 @@ public void onSubscribe(Subscription s) { } log.debug( "{}: [SubscriptionLifetime]: (RecordSubscription#onSubscribe) @ {} id: {} -- Outstanding: {} items so requesting an item", - parent.shardId, connectionStartedAt, subscribeToShardId, parent.outstandingRequests); - if (parent.outstandingRequests > 0) { + parent.shardId, connectionStartedAt, subscribeToShardId, parent.availableQueueSpace); + if (parent.availableQueueSpace > 0) { request(1); } }