Skip to content

Commit

Permalink
GH-2907: Use CF.closeTimeout for confirms wait
Browse files Browse the repository at this point in the history
Fixes: #2907

Issue link: #2907

The current hard-coded `5 seconds` is not enough in real applications under heavy load

* Fix `CachingConnectionFactory` to use `getCloseTimeout()` for `publisherCallbackChannel.waitForConfirms()`
which is `30 seconds` by default, but can be modified via `CachingConnectionFactory.setCloseTimeout()`

**Auto-cherry-pick to `3.1.x`**
  • Loading branch information
artembilan committed Nov 18, 2024
1 parent d910a8b commit 562bc77
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,9 @@ protected ExecutorService getExecutorService() {
}

/**
* How long to wait (milliseconds) for a response to a connection close operation from the broker; default 30000 (30
* seconds).
* How long to wait (milliseconds) for a response to a connection close operation from the broker;
* default 30000 (30 seconds).
* Also used for {@link com.rabbitmq.client.Channel#waitForConfirms()}.
* @param closeTimeout the closeTimeout to set.
*/
public void setCloseTimeout(int closeTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1087,8 +1087,6 @@ public String toString() {

private final class CachedChannelInvocationHandler implements InvocationHandler {

private static final int ASYNC_CLOSE_TIMEOUT = 5_000;

private final ChannelCachingConnectionProxy theConnection;

private final Deque<ChannelProxy> channelList;
Expand Down Expand Up @@ -1302,7 +1300,7 @@ private void returnToCache(ChannelProxy proxy) {
getChannelsExecutor()
.execute(() -> {
try {
publisherCallbackChannel.waitForConfirms(ASYNC_CLOSE_TIMEOUT);
publisherCallbackChannel.waitForConfirms(getCloseTimeout());
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -1426,10 +1424,10 @@ private void asyncClose() {
executorService.execute(() -> {
try {
if (ConfirmType.CORRELATED.equals(CachingConnectionFactory.this.confirmType)) {
channel.waitForConfirmsOrDie(ASYNC_CLOSE_TIMEOUT);
channel.waitForConfirmsOrDie(getCloseTimeout());
}
else {
Thread.sleep(ASYNC_CLOSE_TIMEOUT);
Thread.sleep(5_000); // NOSONAR - some time to give the channel a chance to ack
}
}
catch (@SuppressWarnings(UNUSED) InterruptedException e1) {
Expand Down

0 comments on commit 562bc77

Please sign in to comment.