Skip to content

Commit

Permalink
Fail listener on exception in TcpTransport#openConnection (#101907) (#…
Browse files Browse the repository at this point in the history
…101955)

Today `TcpTransport#openConnection` may throw exceptions on certain
kinds of failure, but other kinds of failure are passed to the listener.
This is trappy and not all callers handle it correctly. This commit
makes sure that all exceptions are passed to the listener.

Closes #100510
  • Loading branch information
DaveCTurner authored Nov 9, 2023
1 parent 4d7f289 commit e573c1d
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 15 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/101907.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 101907
summary: Fail listener on exception in `TcpTransport#openConnection`
area: Network
type: bug
issues:
- 100510
33 changes: 18 additions & 15 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,22 +306,25 @@ protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile con

@Override
public void openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Transport.Connection> listener) {

Objects.requireNonNull(profile, "connection profile cannot be null");
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
ConnectionProfile finalProfile = maybeOverrideConnectionProfile(profile);
if (closeLock.readLock().tryLock() == false) {
ensureOpen();
assert false : "should not get here ever because close-write-lock should only be held on shutdown";
throw new ConnectTransportException(node, "failed to acquire close-read-lock");
}
try {
ensureOpen();
initiateConnection(node, finalProfile, listener);
} finally {
closeLock.readLock().unlock();
Objects.requireNonNull(profile, "connection profile cannot be null");
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
final ConnectionProfile finalProfile = maybeOverrideConnectionProfile(profile);
if (closeLock.readLock().tryLock() == false) {
ensureOpen();
assert false : "should not get here ever because close-write-lock should only be held on shutdown";
throw new ConnectTransportException(node, "failed to acquire close-read-lock");
}
try {
ensureOpen();
initiateConnection(node, finalProfile, listener);
} finally {
closeLock.readLock().unlock();
}
} catch (Exception e) {
listener.onFailure(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -850,6 +851,31 @@ public void handleException(TransportException exp) {
}
}

public void testExceptionOnConnect() {
final Transport transportA = serviceA.getOriginalTransport();

final PlainActionFuture<Transport.Connection> nullProfileFuture = new PlainActionFuture<>();
transportA.openConnection(nodeB, null, nullProfileFuture);
assertTrue(nullProfileFuture.isDone());
expectThrows(ExecutionException.class, NullPointerException.class, nullProfileFuture::get);

final ConnectionProfile profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY);
final PlainActionFuture<Transport.Connection> nullNodeFuture = new PlainActionFuture<>();
transportA.openConnection(null, profile, nullNodeFuture);
assertTrue(nullNodeFuture.isDone());
expectThrows(ExecutionException.class, ConnectTransportException.class, nullNodeFuture::get);

serviceA.stop();
assertEquals(Lifecycle.State.STOPPED, transportA.lifecycleState());
serviceA.close();
assertEquals(Lifecycle.State.CLOSED, transportA.lifecycleState());

final PlainActionFuture<Transport.Connection> closedTransportFuture = new PlainActionFuture<>();
transportA.openConnection(nodeB, profile, closedTransportFuture);
assertTrue(closedTransportFuture.isDone());
expectThrows(ExecutionException.class, IllegalStateException.class, closedTransportFuture::get);
}

public void testDisconnectListener() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
TransportConnectionListener disconnectListener = new TransportConnectionListener() {
Expand Down

0 comments on commit e573c1d

Please sign in to comment.