From e6653cfd67a0528bb99277d918b1f1033e8782b6 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Thu, 22 Aug 2019 15:00:46 +0200 Subject: [PATCH 1/5] Rename HttpOnTransportException -> HttpRequestOnTransportException --- .../elasticsearch/ElasticsearchException.java | 4 ++-- .../elasticsearch/transport/TcpTransport.java | 20 +++++++++---------- .../ExceptionSerializationTests.java | 2 +- .../transport/TcpTransportTests.java | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index 51662414e0d07..92bfa32807f3e 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -961,8 +961,8 @@ private enum ElasticsearchExceptionHandle { RESOURCE_ALREADY_EXISTS_EXCEPTION(ResourceAlreadyExistsException.class, ResourceAlreadyExistsException::new, 123, UNKNOWN_VERSION_ADDED), // 124 used to be Script.ScriptParseException - HTTP_ON_TRANSPORT_EXCEPTION(TcpTransport.HttpOnTransportException.class, - TcpTransport.HttpOnTransportException::new, 125, UNKNOWN_VERSION_ADDED), + HTTP_REQUEST_ON_TRANSPORT_EXCEPTION(TcpTransport.HttpRequestOnTransportException.class, + TcpTransport.HttpRequestOnTransportException::new, 125, UNKNOWN_VERSION_ADDED), MAPPER_PARSING_EXCEPTION(org.elasticsearch.index.mapper.MapperParsingException.class, org.elasticsearch.index.mapper.MapperParsingException::new, 126, UNKNOWN_VERSION_ADDED), SEARCH_CONTEXT_EXCEPTION(org.elasticsearch.search.SearchContextException.class, diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 6f00d8e00d225..8b629f4a3d9eb 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -601,7 +601,7 @@ public void onException(TcpChannel channel, Exception e) { "cancelled key exception caught on transport layer [{}], disconnecting from relevant node", channel), e); // close the channel as safe measure, which will cause a node to be disconnected if relevant CloseableChannel.closeChannel(channel); - } else if (e instanceof TcpTransport.HttpOnTransportException) { + } else if (e instanceof HttpRequestOnTransportException) { // in case we are able to return data, serialize the exception content and sent it back to the client if (channel.isOpen()) { BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)); @@ -671,7 +671,7 @@ public void inboundMessage(TcpChannel channel, BytesReference message) { * @param bytesReference the bytes available to consume * @return the number of bytes consumed * @throws StreamCorruptedException if the message header format is not recognized - * @throws TcpTransport.HttpOnTransportException if the message header appears to be an HTTP message + * @throws HttpRequestOnTransportException if the message header appears to be an HTTP message * @throws IllegalArgumentException if the message length is greater that the maximum allowed frame size. * This is dependent on the available memory. */ @@ -693,7 +693,7 @@ public int consumeNetworkReads(TcpChannel channel, BytesReference bytesReference * @param networkBytes the will be read * @return the message decoded * @throws StreamCorruptedException if the message header format is not recognized - * @throws TcpTransport.HttpOnTransportException if the message header appears to be an HTTP message + * @throws HttpRequestOnTransportException if the message header appears to be an HTTP message * @throws IllegalArgumentException if the message length is greater that the maximum allowed frame size. * This is dependent on the available memory. */ @@ -720,7 +720,7 @@ static BytesReference decodeFrame(BytesReference networkBytes) throws IOExceptio * @param networkBytes the will be read * @return the length of the message * @throws StreamCorruptedException if the message header format is not recognized - * @throws TcpTransport.HttpOnTransportException if the message header appears to be an HTTP message + * @throws HttpRequestOnTransportException if the message header appears to be an HTTP message * @throws IllegalArgumentException if the message length is greater that the maximum allowed frame size. * This is dependent on the available memory. */ @@ -734,8 +734,8 @@ public static int readMessageLength(BytesReference networkBytes) throws IOExcept private static int readHeaderBuffer(BytesReference headerBuffer) throws IOException { if (headerBuffer.get(0) != 'E' || headerBuffer.get(1) != 'S') { - if (appearsToBeHTTP(headerBuffer)) { - throw new TcpTransport.HttpOnTransportException("This is not an HTTP port"); + if (appearsToBeHTTPRequest(headerBuffer)) { + throw new HttpRequestOnTransportException("This is not an HTTP port"); } throw new StreamCorruptedException("invalid internal transport message format, got (" @@ -763,7 +763,7 @@ private static int readHeaderBuffer(BytesReference headerBuffer) throws IOExcept return messageLength; } - private static boolean appearsToBeHTTP(BytesReference headerBuffer) { + private static boolean appearsToBeHTTPRequest(BytesReference headerBuffer) { return bufferStartsWith(headerBuffer, "GET") || bufferStartsWith(headerBuffer, "POST") || bufferStartsWith(headerBuffer, "PUT") || @@ -789,9 +789,9 @@ private static boolean bufferStartsWith(BytesReference buffer, String method) { * A helper exception to mark an incoming connection as potentially being HTTP * so an appropriate error code can be returned */ - public static class HttpOnTransportException extends ElasticsearchException { + public static class HttpRequestOnTransportException extends ElasticsearchException { - private HttpOnTransportException(String msg) { + private HttpRequestOnTransportException(String msg) { super(msg); } @@ -800,7 +800,7 @@ public RestStatus status() { return RestStatus.BAD_REQUEST; } - public HttpOnTransportException(StreamInput in) throws IOException { + public HttpRequestOnTransportException(StreamInput in) throws IOException { super(in); } } diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 61d8532b5652a..411dc02f8adff 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -787,7 +787,7 @@ public void testIds() { ids.put(122, null); ids.put(123, org.elasticsearch.ResourceAlreadyExistsException.class); ids.put(124, null); - ids.put(125, TcpTransport.HttpOnTransportException.class); + ids.put(125, TcpTransport.HttpRequestOnTransportException.class); ids.put(126, org.elasticsearch.index.mapper.MapperParsingException.class); ids.put(127, org.elasticsearch.search.SearchContextException.class); ids.put(128, org.elasticsearch.search.builder.SearchSourceBuilderException.class); diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 17106508ae71a..c55d16efb543d 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -304,7 +304,7 @@ public void testHTTPHeader() throws IOException { TcpTransport.decodeFrame(bytes); fail("Expected exception"); } catch (Exception ex) { - assertThat(ex, instanceOf(TcpTransport.HttpOnTransportException.class)); + assertThat(ex, instanceOf(TcpTransport.HttpRequestOnTransportException.class)); assertEquals("This is not an HTTP port", ex.getMessage()); } } From dc2778ff542685e84a5ab1069a0038b452875733 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Thu, 22 Aug 2019 15:47:54 +0200 Subject: [PATCH 2/5] Better logging for HTTP response on non-secure transport channel --- .../elasticsearch/transport/TcpTransport.java | 22 +++++++++++++++++++ .../transport/TcpTransportTests.java | 21 +++++++++++++++++- 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 8b629f4a3d9eb..cfc135938a76b 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -601,6 +601,9 @@ public void onException(TcpChannel channel, Exception e) { "cancelled key exception caught on transport layer [{}], disconnecting from relevant node", channel), e); // close the channel as safe measure, which will cause a node to be disconnected if relevant CloseableChannel.closeChannel(channel); + } else if (e instanceof HttpResponseOnTransportException) { + logger.warn(() -> new ParameterizedMessage("{} [{}], closing connection", e.getMessage(), channel)); + CloseableChannel.closeChannel(channel); } else if (e instanceof HttpRequestOnTransportException) { // in case we are able to return data, serialize the exception content and sent it back to the client if (channel.isOpen()) { @@ -738,6 +741,11 @@ private static int readHeaderBuffer(BytesReference headerBuffer) throws IOExcept throw new HttpRequestOnTransportException("This is not an HTTP port"); } + if (appearsToBeHTTPResponse(headerBuffer)) { + throw new HttpResponseOnTransportException("Received HTTP response on transport port, ensure that transport port (not " + + "HTTP port) of remote node is specified in the configuration"); + } + throw new StreamCorruptedException("invalid internal transport message format, got (" + Integer.toHexString(headerBuffer.get(0) & 0xFF) + "," + Integer.toHexString(headerBuffer.get(1) & 0xFF) + "," @@ -775,6 +783,10 @@ private static boolean appearsToBeHTTPRequest(BytesReference headerBuffer) { bufferStartsWith(headerBuffer, "TRACE"); } + private static boolean appearsToBeHTTPResponse(BytesReference headerBuffer) { + return bufferStartsWith(headerBuffer, "HTTP"); + } + private static boolean bufferStartsWith(BytesReference buffer, String method) { char[] chars = method.toCharArray(); for (int i = 0; i < chars.length; i++) { @@ -805,6 +817,16 @@ public HttpRequestOnTransportException(StreamInput in) throws IOException { } } + /** + * A helper exception to mark a remote end of the connection as HTTP + */ + public static class HttpResponseOnTransportException extends ElasticsearchException { + + private HttpResponseOnTransportException(String msg) { + super(msg); + } + } + public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, ActionListener listener) { long requestId = inboundHandler.getResponseHandlers().newRequestId(); handshaker.sendHandshake(requestId, node, channel, profile.getHandshakeTimeout(), listener); diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index c55d16efb543d..88b92d73166af 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -288,7 +288,7 @@ public void testInvalidHeader() throws IOException { } } - public void testHTTPHeader() throws IOException { + public void testHTTPRequest() throws IOException { String[] httpHeaders = {"GET", "POST", "PUT", "HEAD", "DELETE", "OPTIONS", "PATCH", "TRACE"}; for (String httpHeader : httpHeaders) { @@ -309,4 +309,23 @@ public void testHTTPHeader() throws IOException { } } } + + public void testHTTPResponse() throws IOException { + BytesStreamOutput streamOutput = new BytesStreamOutput(1 << 14); + streamOutput.write('H'); + streamOutput.write('T'); + streamOutput.write('T'); + streamOutput.write('P'); + streamOutput.write(randomByte()); + streamOutput.write(randomByte()); + + try { + TcpTransport.decodeFrame(streamOutput.bytes()); + fail("Expected exception"); + } catch (Exception ex) { + assertThat(ex, instanceOf(TcpTransport.HttpResponseOnTransportException.class)); + assertEquals("Received HTTP response on transport port, ensure that transport port " + + "(not HTTP port) of remote node is specified in the configuration", ex.getMessage()); + } + } } From e62a6ee253b9034e967b16316202879a17bba04b Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 26 Aug 2019 12:40:51 +0200 Subject: [PATCH 3/5] Get rid of HttpResponseOnTransportException --- .../elasticsearch/transport/TcpTransport.java | 18 ++++-------------- .../transport/TcpTransportTests.java | 2 +- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index cfc135938a76b..401b643365ad1 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -601,15 +601,15 @@ public void onException(TcpChannel channel, Exception e) { "cancelled key exception caught on transport layer [{}], disconnecting from relevant node", channel), e); // close the channel as safe measure, which will cause a node to be disconnected if relevant CloseableChannel.closeChannel(channel); - } else if (e instanceof HttpResponseOnTransportException) { - logger.warn(() -> new ParameterizedMessage("{} [{}], closing connection", e.getMessage(), channel)); - CloseableChannel.closeChannel(channel); } else if (e instanceof HttpRequestOnTransportException) { // in case we are able to return data, serialize the exception content and sent it back to the client if (channel.isOpen()) { BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)); outboundHandler.sendBytes(channel, message, ActionListener.wrap(() -> CloseableChannel.closeChannel(channel))); } + } else if (e instanceof StreamCorruptedException) { + logger.warn(() -> new ParameterizedMessage("{} [{}], closing connection", e.getMessage(), channel)); + CloseableChannel.closeChannel(channel); } else { logger.warn(() -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e); // close the channel, which will cause a node to be disconnected if relevant @@ -742,7 +742,7 @@ private static int readHeaderBuffer(BytesReference headerBuffer) throws IOExcept } if (appearsToBeHTTPResponse(headerBuffer)) { - throw new HttpResponseOnTransportException("Received HTTP response on transport port, ensure that transport port (not " + + throw new StreamCorruptedException("Received HTTP response on transport port, ensure that transport port (not " + "HTTP port) of remote node is specified in the configuration"); } @@ -817,16 +817,6 @@ public HttpRequestOnTransportException(StreamInput in) throws IOException { } } - /** - * A helper exception to mark a remote end of the connection as HTTP - */ - public static class HttpResponseOnTransportException extends ElasticsearchException { - - private HttpResponseOnTransportException(String msg) { - super(msg); - } - } - public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, ActionListener listener) { long requestId = inboundHandler.getResponseHandlers().newRequestId(); handshaker.sendHandshake(requestId, node, channel, profile.getHandshakeTimeout(), listener); diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 88b92d73166af..0d85834db53e7 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -323,7 +323,7 @@ public void testHTTPResponse() throws IOException { TcpTransport.decodeFrame(streamOutput.bytes()); fail("Expected exception"); } catch (Exception ex) { - assertThat(ex, instanceOf(TcpTransport.HttpResponseOnTransportException.class)); + assertThat(ex, instanceOf(StreamCorruptedException.class)); assertEquals("Received HTTP response on transport port, ensure that transport port " + "(not HTTP port) of remote node is specified in the configuration", ex.getMessage()); } From 210d146c482565721f6c89c7d16c216c63013f95 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 26 Aug 2019 12:53:42 +0200 Subject: [PATCH 4/5] Fix message --- .../main/java/org/elasticsearch/transport/TcpTransport.java | 4 ++-- .../java/org/elasticsearch/transport/TcpTransportTests.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 401b643365ad1..0244c18ef20cd 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -742,8 +742,8 @@ private static int readHeaderBuffer(BytesReference headerBuffer) throws IOExcept } if (appearsToBeHTTPResponse(headerBuffer)) { - throw new StreamCorruptedException("Received HTTP response on transport port, ensure that transport port (not " + - "HTTP port) of remote node is specified in the configuration"); + throw new StreamCorruptedException("received HTTP response on transport port, ensure that transport port (not " + + "HTTP port) of a remote node is specified in the configuration"); } throw new StreamCorruptedException("invalid internal transport message format, got (" diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 0d85834db53e7..172ee6bcd7f99 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -324,8 +324,8 @@ public void testHTTPResponse() throws IOException { fail("Expected exception"); } catch (Exception ex) { assertThat(ex, instanceOf(StreamCorruptedException.class)); - assertEquals("Received HTTP response on transport port, ensure that transport port " + - "(not HTTP port) of remote node is specified in the configuration", ex.getMessage()); + assertEquals("received HTTP response on transport port, ensure that transport port " + + "(not HTTP port) of a remote node is specified in the configuration", ex.getMessage()); } } } From b93e6c2a55fb8c230ed679b323b6d3e432b392bc Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Tue, 27 Aug 2019 17:46:20 +0200 Subject: [PATCH 5/5] JavaDoc formatting --- .../main/java/org/elasticsearch/transport/TcpTransport.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 9d9b4c3477791..8605a1ae29798 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -674,7 +674,7 @@ public void inboundMessage(TcpChannel channel, BytesReference message) { * @param bytesReference the bytes available to consume * @return the number of bytes consumed * @throws StreamCorruptedException if the message header format is not recognized - * @throws HttpRequestOnTransportException if the message header appears to be an HTTP message + * @throws HttpRequestOnTransportException if the message header appears to be an HTTP message * @throws IllegalArgumentException if the message length is greater that the maximum allowed frame size. * This is dependent on the available memory. */ @@ -696,7 +696,7 @@ public int consumeNetworkReads(TcpChannel channel, BytesReference bytesReference * @param networkBytes the will be read * @return the message decoded * @throws StreamCorruptedException if the message header format is not recognized - * @throws HttpRequestOnTransportException if the message header appears to be an HTTP message + * @throws HttpRequestOnTransportException if the message header appears to be an HTTP message * @throws IllegalArgumentException if the message length is greater that the maximum allowed frame size. * This is dependent on the available memory. */ @@ -723,7 +723,7 @@ static BytesReference decodeFrame(BytesReference networkBytes) throws IOExceptio * @param networkBytes the will be read * @return the length of the message * @throws StreamCorruptedException if the message header format is not recognized - * @throws HttpRequestOnTransportException if the message header appears to be an HTTP message + * @throws HttpRequestOnTransportException if the message header appears to be an HTTP message * @throws IllegalArgumentException if the message length is greater that the maximum allowed frame size. * This is dependent on the available memory. */