Skip to content

Commit 0b5e77f

Browse files
committed
Fix race condition on connection closing
The broker can close a connection abruptly (in case of re-authentication fails) and the application can call Client#close() at the same time. There can be a race condition whereby the call to close() triggers a regular closing initiated by the client but the Netty channel is already inactive. The call to close() then blocks until it reaches the RPC timeout. This commit makes sure the channel is still active before trying to send the close command to the broker.
1 parent 63b0497 commit 0b5e77f

File tree

1 file changed

+16
-14
lines changed

1 file changed

+16
-14
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public long applyAsLong(Object value) {
184184
private final Map<String, String> connectionProperties;
185185
private final Duration rpcTimeout;
186186
private final List<String> saslMechanisms;
187-
private volatile ShutdownReason shutdownReason = null;
187+
private AtomicReference<ShutdownReason> shutdownReason = new AtomicReference<>();
188188
private final Runnable streamStatsCommandVersionsCheck;
189189
private final boolean filteringSupported;
190190
private final Runnable superStreamManagementCommandVersionsCheck;
@@ -1435,17 +1435,21 @@ public Response unsubscribe(byte subscriptionId) {
14351435

14361436
public void close() {
14371437
if (closing.compareAndSet(false, true)) {
1438-
LOGGER.debug("Closing client");
1439-
1440-
sendClose(RESPONSE_CODE_OK, "OK");
1441-
1442-
closingSequence(ShutdownContext.ShutdownReason.CLIENT_CLOSE);
1443-
1438+
LOGGER.debug("Closing client, channel still active? {}", this.channel.isActive());
1439+
ShutdownReason reason;
1440+
if (this.channel.isActive()) {
1441+
sendClose(RESPONSE_CODE_OK, "OK");
1442+
reason = ShutdownReason.CLIENT_CLOSE;
1443+
} else {
1444+
reason = ShutdownReason.UNKNOWN;
1445+
}
1446+
closingSequence(reason);
14441447
LOGGER.debug("Client closed");
14451448
}
14461449
}
14471450

14481451
void closingSequence(ShutdownContext.ShutdownReason reason) {
1452+
this.shutdownReason(reason);
14491453
if (reason != null) {
14501454
this.shutdownListenerCallback.accept(reason);
14511455
}
@@ -1713,7 +1717,7 @@ public void consumerUpdateResponse(
17131717
}
17141718

17151719
void shutdownReason(ShutdownReason reason) {
1716-
this.shutdownReason = reason;
1720+
this.shutdownReason.compareAndSet(null, reason);
17171721
}
17181722

17191723
public SocketAddress localAddress() {
@@ -2858,16 +2862,14 @@ public void channelInactive(ChannelHandlerContext ctx) {
28582862
// the event is actually dispatched to the listener, emitting
28592863
// an UNKNOWN reason instead of SERVER_CLOSE. So we skip the closing here
28602864
// because it will be handled later anyway.
2861-
if (shutdownReason == null) {
2865+
if (shutdownReason.get() == null) {
2866+
LOGGER.debug("No shutdown reason");
28622867
if (closing.compareAndSet(false, true)) {
2868+
LOGGER.debug("Closing with 'unknown' shutdown reason");
28632869
if (executorService == null) {
28642870
// the TCP connection is closed before the state is initialized
28652871
// we do our best the execute the closing sequence
2866-
new Thread(
2867-
() -> {
2868-
closingSequence(ShutdownReason.UNKNOWN);
2869-
})
2870-
.start();
2872+
new Thread(() -> closingSequence(ShutdownReason.UNKNOWN)).start();
28712873
} else {
28722874
executorService.submit(() -> closingSequence(ShutdownReason.UNKNOWN));
28732875
}

0 commit comments

Comments
 (0)