Skip to content

Commit

Permalink
QPIDJMS-600 Ensure session and connection close await async sends
Browse files Browse the repository at this point in the history
Session and Connection close should be awaiting the outcome of async send
completions before returning. This change allows them to await up to the
close timeout value before moving on and failing any completions that are
not completed after that point. Several tests added to cover this behavior.

(cherry picked from commit 90eb60f)
  • Loading branch information
tabish121 committed Sep 20, 2024
1 parent 49242ac commit 408fec7
Show file tree
Hide file tree
Showing 11 changed files with 365 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,14 @@ void pull(JmsConsumerId consumerId, long timeout, ProviderSynchronization synchr
}
}

ProviderFuture newProviderFuture() {
return newProviderFuture(null);
}

ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
return provider.newProviderFuture(synchronization);
}

//----- Property setters and getters -------------------------------------//

@Override
Expand Down
65 changes: 49 additions & 16 deletions qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
private final ReentrantLock sendLock = new ReentrantLock();
private volatile ThreadPoolExecutor deliveryExecutor;
private volatile ThreadPoolExecutor completionExcecutor;
private volatile ProviderFuture asyncSendsCompletion;
private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
private boolean deliveryThreadCheckEnabled = true;
private AtomicReference<Thread> completionThread = new AtomicReference<Thread>();
Expand Down Expand Up @@ -351,6 +352,7 @@ protected boolean shutdown(Throwable cause) throws JMSException {
for (JmsMessageProducer producer : new ArrayList<JmsMessageProducer>(this.producers.values())) {
producer.shutdown(cause);
}

} catch (JMSException jmsEx) {
shutdownError = jmsEx;
}
Expand All @@ -367,30 +369,52 @@ protected boolean shutdown(Throwable cause) throws JMSException {
}
}

// Ensure that no asynchronous completion sends remain blocked after close.
try {
if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
}
} catch (Exception e) {
LOG.trace("Exception during session shutdown cleanup acknowledgement", e);
}

// Ensure that no asynchronous completion sends remain blocked after close but wait
// using the close timeout for the asynchronous sends to complete normally.
final ExecutorService completionExecutor = getCompletionExecutor();

synchronized (sessionInfo) {
// Producers are now quiesced and we can await completion of asynchronous sends
// that are still pending a result or timeout once we've done a quick check to
// see if any are actually pending or have completed already.
asyncSendsCompletion = connection.newProviderFuture();

completionExecutor.execute(() -> {
if (asyncSendQueue.isEmpty()) {
asyncSendsCompletion.onSuccess();
}
});
}

try {
asyncSendsCompletion.sync(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (Exception ex) {
LOG.trace("Exception during wait for asynchronous sends to complete", ex);
} finally {
if (cause == null) {
cause = new JMSException("Session closed remotely before message transfer result was notified");
}

getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
getCompletionExecutor().shutdown();
// as a last task we want to fail any stragglers in the asynchronous send queue and then
// shutdown the queue to prevent any more submissions while the cleanup goes on.
completionExecutor.execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
completionExecutor.shutdown();
}

try {
getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
completionExecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was interrupted");
}

try {
if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
}
} catch (Exception e) {
LOG.trace("Exception during session shutdown cleanup acknowledgement", e);
}

if (shutdownError != null) {
throw shutdownError;
}
Expand Down Expand Up @@ -856,11 +880,12 @@ protected void send(JmsMessageProducer producer, Destination dest, Message msg,
}

private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener listener) throws JMSException {
sendLock.lock();

JmsMessage outbound = null;
sendLock.lock();

try {
checkClosed();

original.setJMSDeliveryMode(deliveryMode);
original.setJMSPriority(priority);
original.setJMSRedelivered(false);
Expand Down Expand Up @@ -909,7 +934,7 @@ private void send(JmsMessageProducer producer, JmsDestination destination, Messa
}

outbound.getFacade().setDeliveryTime(deliveryTime, hasDelay);
if(!isJmsMessage) {
if (!isJmsMessage) {
// If the original was a foreign message, we still need to update it too.
setForeignMessageDeliveryTime(original, deliveryTime);
}
Expand Down Expand Up @@ -977,7 +1002,7 @@ public void onPendingFailure(ProviderException cause) {
}
} catch (JMSException jmsEx) {
// Ensure that on failure case the message is returned to usable state for another send attempt.
if(outbound != null) {
if (outbound != null) {
outbound.onSendComplete();
}
throw jmsEx;
Expand Down Expand Up @@ -1511,6 +1536,10 @@ public void run() {
if (producerId == null) {
asyncSendQueue.clear();
}

if (closed.get() && asyncSendsCompletion != null && asyncSendQueue.isEmpty()) {
asyncSendsCompletion.onSuccess();
}
}
}

Expand Down Expand Up @@ -1577,6 +1606,10 @@ public void run() {
}
}
}

if (closed.get() && asyncSendsCompletion != null && asyncSendQueue.isEmpty()) {
asyncSendsCompletion.onSuccess();
}
} catch (Exception ex) {
LOG.error("Async completion task encountered unexpected failure", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ public void testAsyncSendDoesNotMarksBytesMessageReadOnly() throws Exception {
@Timeout(20)
public void testAsyncCompletionSendMarksBytesMessageReadOnly() throws Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
connection.setSendTimeout(15000);

testPeer.expectBegin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ public void testAsyncSendDoesNotMarkMapMessageReadOnly() throws Exception {
@Timeout(20)
public void testAsyncCompletionSendMarksMapMessageReadOnly() throws Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
connection.setSendTimeout(15000);

testPeer.expectBegin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2231,7 +2231,7 @@ public void testAsyncSendDoesNotMarkMessageReadOnly() throws Exception {
@Timeout(20)
public void testAsyncCompletionSendMarksMessageReadOnly() throws Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
connection.setSendTimeout(15000);

testPeer.expectBegin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ public void testAsyncCompletionSendMarksObjectMessageReadOnly() throws Exception
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setSendTimeout(15000);
connection.setCloseTimeout(10);

testPeer.expectBegin();

Expand Down
Loading

0 comments on commit 408fec7

Please sign in to comment.