2323import org .elasticsearch .action .ActionListener ;
2424import org .elasticsearch .action .support .PlainActionFuture ;
2525import org .elasticsearch .cluster .node .DiscoveryNode ;
26+ import org .elasticsearch .common .SuppressForbidden ;
2627import org .elasticsearch .common .bytes .BytesReference ;
2728import org .elasticsearch .common .compress .CompressorFactory ;
2829import org .elasticsearch .common .io .stream .BytesStreamOutput ;
3435import org .elasticsearch .common .transport .TransportAddress ;
3536import org .elasticsearch .common .util .PageCacheRecycler ;
3637import org .elasticsearch .indices .breaker .NoneCircuitBreakerService ;
38+ import org .elasticsearch .tasks .TaskManager ;
3739import org .elasticsearch .test .ESTestCase ;
3840import org .elasticsearch .test .VersionUtils ;
3941import org .elasticsearch .threadpool .TestThreadPool ;
4042import org .elasticsearch .threadpool .ThreadPool ;
4143
4244import java .io .IOException ;
4345import java .io .StreamCorruptedException ;
46+ import java .net .InetAddress ;
4447import java .net .InetSocketAddress ;
4548import java .util .ArrayList ;
4649import java .util .concurrent .TimeUnit ;
4750import java .util .concurrent .atomic .AtomicReference ;
4851
49- import static org .hamcrest .Matchers .equalTo ;
5052import static org .hamcrest .core .IsInstanceOf .instanceOf ;
53+ import static org .mockito .Mockito .mock ;
5154
5255/** Unit tests for {@link TcpTransport} */
5356public class TcpTransportTests extends ESTestCase {
@@ -175,11 +178,12 @@ public void testEnsureVersionCompatibility() {
175178 ise .getMessage ());
176179 }
177180
178- public void testCompressRequest () throws IOException {
181+ @ SuppressForbidden (reason = "Allow accessing localhost" )
182+ public void testCompressRequestAndResponse () throws IOException {
179183 final boolean compressed = randomBoolean ();
180184 Req request = new Req (randomRealisticUnicodeOfLengthBetween (10 , 100 ));
181185 ThreadPool threadPool = new TestThreadPool (TcpTransportTests .class .getName ());
182- AtomicReference <BytesReference > messageCaptor = new AtomicReference <>();
186+ AtomicReference <BytesReference > requestCaptor = new AtomicReference <>();
183187 try {
184188 TcpTransport transport = new TcpTransport ("test" , Settings .EMPTY , Version .CURRENT , threadPool ,
185189 PageCacheRecycler .NON_RECYCLING_INSTANCE , new NoneCircuitBreakerService (), null , null ) {
@@ -191,7 +195,7 @@ protected FakeServerChannel bind(String name, InetSocketAddress address) throws
191195
192196 @ Override
193197 protected FakeTcpChannel initiateChannel (DiscoveryNode node ) throws IOException {
194- return new FakeTcpChannel (true , messageCaptor );
198+ return new FakeTcpChannel (false , requestCaptor );
195199 }
196200
197201 @ Override
@@ -206,7 +210,7 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile,
206210 int numConnections = profile .getNumConnections ();
207211 ArrayList <TcpChannel > fakeChannels = new ArrayList <>(numConnections );
208212 for (int i = 0 ; i < numConnections ; ++i ) {
209- fakeChannels .add (new FakeTcpChannel (false , messageCaptor ));
213+ fakeChannels .add (new FakeTcpChannel (false , requestCaptor ));
210214 }
211215 listener .onResponse (new NodeChannels (node , fakeChannels , profile , Version .CURRENT ));
212216 return () -> CloseableChannel .closeChannels (fakeChannels , false );
@@ -224,11 +228,20 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile,
224228 transport .openConnection (node , profileBuilder .build (), future );
225229 Transport .Connection connection = future .actionGet ();
226230 connection .sendRequest (42 , "foobar" , request , TransportRequestOptions .EMPTY );
231+ transport .registerRequestHandler (new RequestHandlerRegistry <>("foobar" , Req ::new , mock (TaskManager .class ),
232+ (request1 , channel , task ) -> channel .sendResponse (TransportResponse .Empty .INSTANCE ), ThreadPool .Names .SAME ,
233+ true , true ));
227234
228- BytesReference reference = messageCaptor .get ();
235+ BytesReference reference = requestCaptor .get ();
229236 assertNotNull (reference );
230237
231- StreamInput streamIn = reference .streamInput ();
238+ AtomicReference <BytesReference > responseCaptor = new AtomicReference <>();
239+ InetSocketAddress address = new InetSocketAddress (InetAddress .getLocalHost (), 0 );
240+ FakeTcpChannel responseChannel = new FakeTcpChannel (true , address , address , responseCaptor );
241+ transport .messageReceived (reference .slice (6 , reference .length () - 6 ), responseChannel );
242+
243+
244+ StreamInput streamIn = responseCaptor .get ().streamInput ();
232245 streamIn .skip (TcpHeader .MARKER_BYTES_SIZE );
233246 int len = streamIn .readInt ();
234247 long requestId = streamIn .readLong ();
@@ -237,17 +250,14 @@ public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile,
237250 Version version = Version .fromId (streamIn .readInt ());
238251 assertEquals (Version .CURRENT , version );
239252 assertEquals (compressed , TransportStatus .isCompress (status ));
253+ assertFalse (TransportStatus .isRequest (status ));
240254 if (compressed ) {
241255 final int bytesConsumed = TcpHeader .HEADER_SIZE ;
242256 streamIn = CompressorFactory .compressor (reference .slice (bytesConsumed , reference .length () - bytesConsumed ))
243257 .streamInput (streamIn );
244258 }
245259 threadPool .getThreadContext ().readHeaders (streamIn );
246- assertThat (streamIn .readStringArray (), equalTo (new String [0 ])); // features
247- assertEquals ("foobar" , streamIn .readString ());
248- Req readReq = new Req ("" );
249- readReq .readFrom (streamIn );
250- assertEquals (request .value , readReq .value );
260+ TransportResponse .Empty .INSTANCE .readFrom (streamIn );
251261
252262 } finally {
253263 ThreadPool .terminate (threadPool , 10 , TimeUnit .SECONDS );
@@ -287,6 +297,10 @@ private Req(String value) {
287297 this .value = value ;
288298 }
289299
300+ private Req (StreamInput in ) throws IOException {
301+ value = in .readString ();
302+ }
303+
290304 @ Override
291305 public void readFrom (StreamInput in ) throws IOException {
292306 value = in .readString ();
0 commit comments