Skip to content

Commit 70af3e2

Browse files
committed
Fix the failing tests due to connection profile missing STREAM type
Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
1 parent 46afedd commit 70af3e2

File tree

4 files changed

+29
-9
lines changed

4 files changed

+29
-9
lines changed

server/src/main/java/org/opensearch/transport/ConnectionProfile.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ public static ConnectionProfile buildDefaultConnectionProfile(Settings settings)
112112
// if we are not a data-node we don't need any dedicated channels for recovery
113113
builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY);
114114
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG);
115-
// TODO use different setting for connectionsPerNodeReg for stream request
116-
builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.STREAM);
115+
// we build a single channel profile with only supported type as STREAM for stream transport defined in StreamTransportService
116+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
117117
return builder.build();
118118
}
119119

server/src/main/java/org/opensearch/transport/StreamTransportService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public Transport.Connection getConnection(DiscoveryNode node) {
113113
try {
114114
return connectionManager.getConnection(node);
115115
} catch (Exception e) {
116-
logger.error("Failed to get streaming connection to node [{}]", node, e);
116+
logger.error("Failed to get streaming connection to node [{}]: {}", node, e.getMessage());
117117
throw new ConnectTransportException(node, "Failed to get streaming connection", e);
118118
}
119119
}

server/src/test/java/org/opensearch/transport/ConnectionProfileTests.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,19 +77,23 @@ public void testBuildConnectionProfile() {
7777
builder.addConnections(2, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY);
7878
builder.addConnections(3, TransportRequestOptions.Type.PING);
7979
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, builder::build);
80-
assertEquals("not all types are added for this connection profile - missing types: [REG]", illegalStateException.getMessage());
80+
assertEquals(
81+
"not all types are added for this connection profile - missing types: [REG, STREAM]",
82+
illegalStateException.getMessage()
83+
);
8184

8285
IllegalArgumentException illegalArgumentException = expectThrows(
8386
IllegalArgumentException.class,
8487
() -> builder.addConnections(4, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING)
8588
);
8689
assertEquals("type [PING] is already registered", illegalArgumentException.getMessage());
8790
builder.addConnections(4, TransportRequestOptions.Type.REG);
91+
builder.addConnections(1, TransportRequestOptions.Type.STREAM);
8892
ConnectionProfile build = builder.build();
8993
if (randomBoolean()) {
9094
build = new ConnectionProfile.Builder(build).build();
9195
}
92-
assertEquals(10, build.getNumConnections());
96+
assertEquals(11, build.getNumConnections());
9397
if (setConnectTimeout) {
9498
assertEquals(connectTimeout, build.getConnectTimeout());
9599
} else {
@@ -114,12 +118,12 @@ public void testBuildConnectionProfile() {
114118
assertNull(build.getPingInterval());
115119
}
116120

117-
List<Integer> list = new ArrayList<>(10);
118-
for (int i = 0; i < 10; i++) {
121+
List<Integer> list = new ArrayList<>(11);
122+
for (int i = 0; i < 11; i++) {
119123
list.add(i);
120124
}
121125
final int numIters = randomIntBetween(5, 10);
122-
assertEquals(4, build.getHandles().size());
126+
assertEquals(5, build.getHandles().size());
123127
assertEquals(0, build.getHandles().get(0).offset);
124128
assertEquals(1, build.getHandles().get(0).length);
125129
assertEquals(EnumSet.of(TransportRequestOptions.Type.BULK), build.getHandles().get(0).getTypes());
@@ -155,11 +159,20 @@ public void testBuildConnectionProfile() {
155159
assertThat(channel, Matchers.anyOf(Matchers.is(6), Matchers.is(7), Matchers.is(8), Matchers.is(9)));
156160
}
157161

162+
assertEquals(10, build.getHandles().get(4).offset);
163+
assertEquals(1, build.getHandles().get(4).length);
164+
assertEquals(EnumSet.of(TransportRequestOptions.Type.STREAM), build.getHandles().get(4).getTypes());
165+
channel = build.getHandles().get(4).getChannel(list);
166+
for (int i = 0; i < numIters; i++) {
167+
assertEquals(10, channel.intValue());
168+
}
169+
158170
assertEquals(3, build.getNumConnectionsPerType(TransportRequestOptions.Type.PING));
159171
assertEquals(4, build.getNumConnectionsPerType(TransportRequestOptions.Type.REG));
160172
assertEquals(2, build.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
161173
assertEquals(2, build.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
162174
assertEquals(1, build.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
175+
assertEquals(1, build.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM));
163176
}
164177

165178
public void testNoChannels() {
@@ -169,7 +182,8 @@ public void testNoChannels() {
169182
TransportRequestOptions.Type.BULK,
170183
TransportRequestOptions.Type.STATE,
171184
TransportRequestOptions.Type.RECOVERY,
172-
TransportRequestOptions.Type.REG
185+
TransportRequestOptions.Type.REG,
186+
TransportRequestOptions.Type.STREAM
173187
);
174188
builder.addConnections(0, TransportRequestOptions.Type.PING);
175189
ConnectionProfile build = builder.build();
@@ -188,6 +202,7 @@ public void testConnectionProfileResolve() {
188202
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.REG);
189203
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STATE);
190204
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.PING);
205+
builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STREAM);
191206

192207
final boolean connectionTimeoutSet = randomBoolean();
193208
if (connectionTimeoutSet) {
@@ -235,6 +250,7 @@ public void testDefaultConnectionProfile() {
235250
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
236251
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
237252
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
253+
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM));
238254
assertEquals(TransportSettings.CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getConnectTimeout());
239255
assertEquals(TransportSettings.CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getHandshakeTimeout());
240256
assertEquals(TransportSettings.TRANSPORT_COMPRESS.get(Settings.EMPTY), profile.getCompressionEnabled());
@@ -247,6 +263,7 @@ public void testDefaultConnectionProfile() {
247263
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
248264
assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
249265
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
266+
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM));
250267

251268
profile = ConnectionProfile.buildDefaultConnectionProfile(nonDataNode());
252269
assertEquals(11, profile.getNumConnections());
@@ -255,6 +272,7 @@ public void testDefaultConnectionProfile() {
255272
assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
256273
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
257274
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
275+
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM));
258276

259277
profile = ConnectionProfile.buildDefaultConnectionProfile(
260278
removeRoles(
@@ -267,5 +285,6 @@ public void testDefaultConnectionProfile() {
267285
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE));
268286
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY));
269287
assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK));
288+
assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STREAM));
270289
}
271290
}

test/framework/src/main/java/org/opensearch/transport/AbstractSimpleTransportTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2242,6 +2242,7 @@ public void testHandshakeUpdatesVersion() throws IOException {
22422242
TransportRequestOptions.Type.REG,
22432243
TransportRequestOptions.Type.STATE
22442244
);
2245+
builder.addConnections(0, TransportRequestOptions.Type.STREAM);
22452246
try (Transport.Connection connection = serviceA.openConnection(node, builder.build())) {
22462247
assertEquals(version, connection.getVersion());
22472248
}

0 commit comments

Comments
 (0)