Skip to content

Commit 3086aa0

Browse files
committed
Remove client connections from TcpTransport (elastic#31886)
This is related to elastic#31835. This commit adds a connection manager that manages client connections to other nodes. This means that the TcpTransport no longer maintains a map of nodes that it is connected to.
1 parent b3a60e3 commit 3086aa0

File tree

43 files changed

+1610
-1194
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1610
-1194
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,12 @@ protected NettyTcpChannel bind(String name, InetSocketAddress address) {
276276
return esChannel;
277277
}
278278

279-
ScheduledPing getPing() {
280-
return scheduledPing;
279+
long successfulPingCount() {
280+
return successfulPings.count();
281+
}
282+
283+
long failedPingCount() {
284+
return failedPings.count();
281285
}
282286

283287
@Override

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,14 @@ public void testScheduledPing() throws Exception {
6363
final Netty4Transport nettyA = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
6464
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
6565
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
66-
null);
66+
null);
6767
serviceA.start();
6868
serviceA.acceptIncomingRequests();
6969

7070
final Netty4Transport nettyB = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
7171
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
7272
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
73-
null);
73+
null);
7474

7575
serviceB.start();
7676
serviceB.acceptIncomingRequests();
@@ -82,22 +82,19 @@ public void testScheduledPing() throws Exception {
8282
serviceB.connectToNode(nodeA);
8383

8484
assertBusy(() -> {
85-
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L));
86-
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L));
85+
assertThat(nettyA.successfulPingCount(), greaterThan(100L));
86+
assertThat(nettyB.successfulPingCount(), greaterThan(100L));
8787
});
88-
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
89-
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
88+
assertThat(nettyA.failedPingCount(), equalTo(0L));
89+
assertThat(nettyB.failedPingCount(), equalTo(0L));
9090

9191
serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
92-
new TransportRequestHandler<TransportRequest.Empty>() {
93-
@Override
94-
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
95-
try {
96-
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
97-
} catch (IOException e) {
98-
logger.error("Unexpected failure", e);
99-
fail(e.getMessage());
100-
}
92+
(request, channel) -> {
93+
try {
94+
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
95+
} catch (IOException e) {
96+
logger.error("Unexpected failure", e);
97+
fail(e.getMessage());
10198
}
10299
});
103100

@@ -129,11 +126,11 @@ public void handleException(TransportException exp) {
129126
}
130127

131128
assertBusy(() -> {
132-
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(200L));
133-
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(200L));
129+
assertThat(nettyA.successfulPingCount(), greaterThan(200L));
130+
assertThat(nettyB.successfulPingCount(), greaterThan(200L));
134131
});
135-
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
136-
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
132+
assertThat(nettyA.failedPingCount(), equalTo(0L));
133+
assertThat(nettyB.failedPingCount(), equalTo(0L));
137134

138135
Releasables.close(serviceA, serviceB);
139136
terminate(threadPool);

server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void onFailure(Exception e) {
101101
}
102102

103103
@Override
104-
protected void doRun() throws Exception {
104+
protected void doRun() {
105105
try (Releasable ignored = nodeLocks.acquire(node)) {
106106
validateAndConnectIfNeeded(node);
107107
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.transport;
21+
22+
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.common.concurrent.CompletableContext;
24+
25+
26+
/**
27+
* Abstract Transport.Connection that provides common close logic.
28+
*/
29+
public abstract class CloseableConnection implements Transport.Connection {
30+
31+
private final CompletableContext<Void> closeContext = new CompletableContext<>();
32+
33+
@Override
34+
public void addCloseListener(ActionListener<Void> listener) {
35+
closeContext.addListener(ActionListener.toBiConsumer(listener));
36+
}
37+
38+
@Override
39+
public boolean isClosed() {
40+
return closeContext.isDone();
41+
}
42+
43+
@Override
44+
public void close() {
45+
// This method is safe to call multiple times as the close context will provide concurrency
46+
// protection and only be completed once. The attached listeners will only be notified once.
47+
closeContext.complete(null);
48+
}
49+
}

0 commit comments

Comments
 (0)