Skip to content

Commit

Permalink
Escalate visibility of isQueueRunning method (spring-attic/spring-clo…
Browse files Browse the repository at this point in the history
…ud-aws#707)

Fixes spring-attic/spring-cloud-aws#350
Closes spring-attic/spring-cloud-aws#351

Co-authored-by: ilukyanovich <ilya.lukyanovich@dataart.com>
  • Loading branch information
maciejwalkowiak and L00kian authored Nov 4, 2020
1 parent 053af9d commit 1d00046
Showing 1 changed file with 13 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public boolean isRunning(String logicalQueueName) {
}

protected void startQueue(String queueName, QueueAttributes queueAttributes) {
if (this.runningStateByQueue.containsKey(queueName) && this.runningStateByQueue.get(queueName)) {
if (isQueueRunning(queueName)) {
return;
}

Expand All @@ -279,6 +279,16 @@ protected void startQueue(String queueName, QueueAttributes queueAttributes) {
this.scheduledFutureByQueue.put(queueName, future);
}

protected boolean isQueueRunning(String logicalQueueName) {
if (this.runningStateByQueue.containsKey(logicalQueueName)) {
return this.runningStateByQueue.get(logicalQueueName);
}
else {
getLogger().warn("Stopped queue '" + logicalQueueName + "' because it was not listed as running queue.");
return false;
}
}

private static final class SignalExecutingRunnable implements Runnable {

private final CountDownLatch countDownLatch;
Expand Down Expand Up @@ -315,13 +325,13 @@ private AsynchronousMessageListener(String logicalQueueName, QueueAttributes que

@Override
public void run() {
while (isQueueRunning()) {
while (isQueueRunning(this.logicalQueueName)) {
try {
ReceiveMessageResult receiveMessageResult = getAmazonSqs()
.receiveMessage(this.queueAttributes.getReceiveMessageRequest());
CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
for (Message message : receiveMessageResult.getMessages()) {
if (isQueueRunning()) {
if (isQueueRunning(this.logicalQueueName)) {
MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message,
this.queueAttributes);
getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
Expand Down Expand Up @@ -353,17 +363,6 @@ public void run() {
SimpleMessageListenerContainer.this.scheduledFutureByQueue.remove(this.logicalQueueName);
}

private boolean isQueueRunning() {
if (SimpleMessageListenerContainer.this.runningStateByQueue.containsKey(this.logicalQueueName)) {
return SimpleMessageListenerContainer.this.runningStateByQueue.get(this.logicalQueueName);
}
else {
getLogger().warn(
"Stopped queue '" + this.logicalQueueName + "' because it was not listed as running queue.");
return false;
}
}

}

private final class MessageExecutor implements Runnable {
Expand Down

0 comments on commit 1d00046

Please sign in to comment.