@@ -195,7 +195,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
195195 // this lock is here to make sure we close this transport and disconnect all the client nodes
196196 // connections while no connect operations is going on
197197 private final ReadWriteLock closeLock = new ReentrantReadWriteLock ();
198- private final boolean compressResponses ;
198+ private final boolean compressAllResponses ;
199199 private volatile BoundTransportAddress boundAddress ;
200200 private final String transportName ;
201201
@@ -220,16 +220,16 @@ public TcpTransport(String transportName, Settings settings, Version version, T
220220 this .pageCacheRecycler = pageCacheRecycler ;
221221 this .circuitBreakerService = circuitBreakerService ;
222222 this .namedWriteableRegistry = namedWriteableRegistry ;
223- this .compressResponses = Transport .TRANSPORT_TCP_COMPRESS .get (settings );
223+ this .compressAllResponses = Transport .TRANSPORT_TCP_COMPRESS .get (settings );
224224 this .networkService = networkService ;
225225 this .transportName = transportName ;
226226 this .transportLogger = new TransportLogger ();
227227 this .handshaker = new TransportHandshaker (version , threadPool ,
228228 (node , channel , requestId , v ) -> sendRequestToChannel (node , channel , requestId ,
229229 TransportHandshaker .HANDSHAKE_ACTION_NAME , new TransportHandshaker .HandshakeRequest (version ),
230- TransportRequestOptions .EMPTY , v , TransportStatus .setHandshake ((byte ) 0 )),
230+ TransportRequestOptions .EMPTY , v , false , TransportStatus .setHandshake ((byte ) 0 )),
231231 (v , features , channel , response , requestId ) -> sendResponse (v , features , channel , response , requestId ,
232- TransportHandshaker .HANDSHAKE_ACTION_NAME , TransportResponseOptions . EMPTY , TransportStatus .setHandshake ((byte ) 0 )));
232+ TransportHandshaker .HANDSHAKE_ACTION_NAME , false , TransportStatus .setHandshake ((byte ) 0 )));
233233 this .keepAlive = new TransportKeepAlive (threadPool , this ::internalSendMessage );
234234 this .nodeName = Node .NODE_NAME_SETTING .get (settings );
235235
@@ -337,11 +337,7 @@ public void sendRequest(long requestId, String action, TransportRequest request,
337337 throw new NodeNotConnectedException (node , "connection already closed" );
338338 }
339339 TcpChannel channel = channel (options .type ());
340-
341- if (compress ) {
342- options = TransportRequestOptions .builder (options ).withCompress (true ).build ();
343- }
344- sendRequestToChannel (this .node , channel , requestId , action , request , options , getVersion (), (byte ) 0 );
340+ sendRequestToChannel (this .node , channel , requestId , action , request , options , getVersion (), compress , (byte ) 0 );
345341 }
346342 }
347343
@@ -768,11 +764,11 @@ private boolean canCompress(TransportRequest request) {
768764
769765 private void sendRequestToChannel (final DiscoveryNode node , final TcpChannel channel , final long requestId , final String action ,
770766 final TransportRequest request , TransportRequestOptions options , Version channelVersion ,
771- byte status ) throws IOException , TransportException {
767+ boolean compressRequest , byte status ) throws IOException , TransportException {
772768
773769 // only compress if asked and the request is not bytes. Otherwise only
774770 // the header part is compressed, and the "body" can't be extracted as compressed
775- final boolean compressMessage = options . compress () && canCompress (request );
771+ final boolean compressMessage = compressRequest && canCompress (request );
776772
777773 status = TransportStatus .setRequest (status );
778774 ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput (bigArrays );
@@ -871,8 +867,8 @@ public void sendResponse(
871867 final TransportResponse response ,
872868 final long requestId ,
873869 final String action ,
874- final TransportResponseOptions options ) throws IOException {
875- sendResponse (nodeVersion , features , channel , response , requestId , action , options , (byte ) 0 );
870+ final boolean compress ) throws IOException {
871+ sendResponse (nodeVersion , features , channel , response , requestId , action , compress , (byte ) 0 );
876872 }
877873
878874 private void sendResponse (
@@ -882,29 +878,26 @@ private void sendResponse(
882878 final TransportResponse response ,
883879 final long requestId ,
884880 final String action ,
885- TransportResponseOptions options ,
881+ boolean compress ,
886882 byte status ) throws IOException {
887- if (compressResponses && options .compress () == false ) {
888- options = TransportResponseOptions .builder (options ).withCompress (true ).build ();
889- }
883+ boolean compressMessage = compress || compressAllResponses ;
890884
891885 status = TransportStatus .setResponse (status );
892886 ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput (bigArrays );
893- CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream (bStream , options . compress () );
887+ CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream (bStream , compressMessage );
894888 boolean addedReleaseListener = false ;
895889 try {
896- if (options . compress () ) {
890+ if (compressMessage ) {
897891 status = TransportStatus .setCompress (status );
898892 }
899893 threadPool .getThreadContext ().writeTo (stream );
900894 stream .setVersion (nodeVersion );
901895 stream .setFeatures (features );
902896 BytesReference message = buildMessage (requestId , status , nodeVersion , response , stream );
903897
904- final TransportResponseOptions finalOptions = options ;
905898 // this might be called in a different thread
906899 ReleaseListener releaseListener = new ReleaseListener (stream ,
907- () -> messageListener .onResponseSent (requestId , action , response , finalOptions ));
900+ () -> messageListener .onResponseSent (requestId , action , response ));
908901 internalSendMessage (channel , message , releaseListener );
909902 addedReleaseListener = true ;
910903 } finally {
@@ -1530,9 +1523,9 @@ public void onRequestReceived(long requestId, String action) {
15301523 }
15311524
15321525 @ Override
1533- public void onResponseSent (long requestId , String action , TransportResponse response , TransportResponseOptions finalOptions ) {
1526+ public void onResponseSent (long requestId , String action , TransportResponse response ) {
15341527 for (TransportMessageListener listener : listeners ) {
1535- listener .onResponseSent (requestId , action , response , finalOptions );
1528+ listener .onResponseSent (requestId , action , response );
15361529 }
15371530 }
15381531
0 commit comments