Skip to content

Commit

Permalink
SQL: binary communication implementation for drivers and the CLI (#48261
Browse files Browse the repository at this point in the history
)

* Introduce binary_format request parameter (binary.format for JDBC) to disable binary
communication between clients (jdbc/odbc) and server.
* for CLI - "binary" command line parameter (or -b) is introduced. Default value is "true".
* binary communication (cbor) is enabled by default
* disabling request parameter introduced for debugging purposes only

(cherry picked from commit f96a5ca)
  • Loading branch information
astefan committed Nov 1, 2019
1 parent 4be5440 commit 2c73c7d
Show file tree
Hide file tree
Showing 29 changed files with 931 additions and 139 deletions.
1 change: 1 addition & 0 deletions x-pack/plugin/sql/jdbc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {
transitive = false
}
compile project(':libs:elasticsearch-core')
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
runtime "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
testCompile project(":test:framework")
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
8b9826e16c3366764bfb7ad7362554f0471046c3
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ Cursor query(String sql, List<SqlTypedParamValue> params, RequestMeta meta) thro
null,
new RequestInfo(Mode.JDBC),
conCfg.fieldMultiValueLeniency(),
conCfg.indexIncludeFrozen());
conCfg.indexIncludeFrozen(),
conCfg.binaryCommunication());
SqlQueryResponse response = httpClient.query(sqlRequest);
return new DefaultCursor(this, response.cursor(), toJdbcColumnInfo(response.columns()), response.rows(), meta);
}
Expand All @@ -75,7 +76,7 @@ Cursor query(String sql, List<SqlTypedParamValue> params, RequestMeta meta) thro
*/
Tuple<String, List<List<Object>>> nextPage(String cursor, RequestMeta meta) throws SQLException {
SqlQueryRequest sqlRequest = new SqlQueryRequest(cursor, TimeValue.timeValueMillis(meta.timeoutInMs()),
TimeValue.timeValueMillis(meta.queryTimeoutInMs()), new RequestInfo(Mode.JDBC));
TimeValue.timeValueMillis(meta.queryTimeoutInMs()), new RequestInfo(Mode.JDBC), conCfg.binaryCommunication());
SqlQueryResponse response = httpClient.query(sqlRequest);
return new Tuple<>(response.cursor(), response.rows());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.sql.jdbc;

import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.client.ConnectionConfiguration;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ExecutorService;

public class JdbcHttpClientRequestTests extends ESTestCase {

private static RawRequestMockWebServer webServer = new RawRequestMockWebServer();
private static final Logger logger = LogManager.getLogger(JdbcHttpClientRequestTests.class);

@BeforeClass
public static void init() throws Exception {
webServer.start();
}

@AfterClass
public static void cleanup() {
webServer.close();
}

public void testBinaryRequestEnabled() throws Exception {
assertBinaryRequest(true, XContentType.CBOR);
}

public void testBinaryRequestDisabled() throws Exception {
assertBinaryRequest(false, XContentType.JSON);
}

private void assertBinaryRequest(boolean isBinary, XContentType xContentType) throws Exception {
String url = JdbcConfiguration.URL_PREFIX + webServer.getHostName() + ":" + webServer.getPort();
Properties props = new Properties();
props.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(isBinary));

JdbcHttpClient httpClient = new JdbcHttpClient(JdbcConfiguration.create(url, props, 0), false);

prepareMockResponse();
try {
httpClient.query(randomAlphaOfLength(256), null,
new RequestMeta(randomIntBetween(1, 100), randomNonNegativeLong(), randomNonNegativeLong()));
} catch (SQLException e) {
logger.info("Ignored SQLException", e);
}
assertValues(isBinary, xContentType);

prepareMockResponse();
try {
httpClient.nextPage("", new RequestMeta(randomIntBetween(1, 100), randomNonNegativeLong(), randomNonNegativeLong()));
} catch (SQLException e) {
logger.info("Ignored SQLException", e);
}
assertValues(isBinary, xContentType);
}

private void assertValues(boolean isBinary, XContentType xContentType) {
assertEquals(1, webServer.requests().size());
RawRequest recordedRequest = webServer.takeRequest();
assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type"));
assertEquals("POST", recordedRequest.getMethod());

BytesReference bytesRef = recordedRequest.getBodyAsBytes();
Map<String, Object> reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2();

assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase("jdbc"));
assertEquals(isBinary, reqContent.get("binary_format"));
}

private void prepareMockResponse() {
webServer.enqueue(new Response()
.setResponseCode(200)
.addHeader("Content-Type", "application/json")
.setBody("{\"rows\":[],\"columns\":[]}"));
}

@SuppressForbidden(reason = "use http server")
private static class RawRequestMockWebServer implements Closeable {
private HttpServer server;
private final Queue<Response> responses = ConcurrentCollections.newQueue();
private final Queue<RawRequest> requests = ConcurrentCollections.newQueue();
private String hostname;
private int port;

RawRequestMockWebServer() {
}

void start() throws IOException {
InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0);
server = MockHttpServer.createHttp(address, 0);

server.start();
this.hostname = server.getAddress().getHostString();
this.port = server.getAddress().getPort();

server.createContext("/", s -> {
try {
Response response = responses.poll();
RawRequest request = createRequest(s);
requests.add(request);
s.getResponseHeaders().putAll(response.getHeaders());

if (Strings.isEmpty(response.getBody())) {
s.sendResponseHeaders(response.getStatusCode(), 0);
} else {
byte[] responseAsBytes = response.getBody().getBytes(StandardCharsets.UTF_8);
s.sendResponseHeaders(response.getStatusCode(), responseAsBytes.length);
if ("HEAD".equals(request.getMethod()) == false) {
try (OutputStream responseBody = s.getResponseBody()) {
responseBody.write(responseAsBytes);
}
}
}
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to respond to request [{} {}]",
s.getRequestMethod(), s.getRequestURI()), e);
} finally {
s.close();
}

});
}

private RawRequest createRequest(HttpExchange exchange) throws IOException {
RawRequest request = new RawRequest(exchange.getRequestMethod(), exchange.getRequestHeaders());
if (exchange.getRequestBody() != null) {
BytesReference bytesRef = Streams.readFully(exchange.getRequestBody());
request.setBodyAsBytes(bytesRef);
}
return request;
}

String getHostName() {
return hostname;
}

int getPort() {
return port;
}

void enqueue(Response response) {
responses.add(response);
}

List<RawRequest> requests() {
return new ArrayList<>(requests);
}

RawRequest takeRequest() {
return requests.poll();
}

@Override
public void close() {
if (server.getExecutor() instanceof ExecutorService) {
terminate((ExecutorService) server.getExecutor());
}
server.stop(0);
}
}

@SuppressForbidden(reason = "use http server header class")
private static class RawRequest {

private final String method;
private final Headers headers;
private BytesReference bodyAsBytes = null;

RawRequest(String method, Headers headers) {
this.method = method;
this.headers = headers;
}

public String getMethod() {
return method;
}

public String getHeader(String name) {
return headers.getFirst(name);
}

public BytesReference getBodyAsBytes() {
return bodyAsBytes;
}

public void setBodyAsBytes(BytesReference bodyAsBytes) {
this.bodyAsBytes = bodyAsBytes;
}
}

@SuppressForbidden(reason = "use http server header class")
private class Response {

private String body = null;
private int statusCode = 200;
private Headers headers = new Headers();

public Response setBody(String body) {
this.body = body;
return this;
}

public Response setResponseCode(int statusCode) {
this.statusCode = statusCode;
return this;
}

public Response addHeader(String name, String value) {
headers.add(name, value);
return this;
}

String getBody() {
return body;
}

int getStatusCode() {
return statusCode;
}

Headers getHeaders() {
return headers;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ public class VersionParityTests extends WebServerTestCase {

public void testExceptionThrownOnIncompatibleVersions() throws IOException, SQLException {
Version version = VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.CURRENT));
prepareRequest(version);
prepareResponse(version);

String url = JdbcConfiguration.URL_PREFIX + webServer().getHostName() + ":" + webServer().getPort();
String url = JdbcConfiguration.URL_PREFIX + webServerAddress();
SQLException ex = expectThrows(SQLException.class, () -> new JdbcHttpClient(JdbcConfiguration.create(url, null, 0)));
assertEquals("This version of the JDBC driver is only compatible with Elasticsearch version "
+ org.elasticsearch.xpack.sql.client.Version.CURRENT.toString()
+ ", attempting to connect to a server version " + version.toString(), ex.getMessage());
}

public void testNoExceptionThrownForCompatibleVersions() throws IOException {
prepareRequest(null);
prepareResponse(null);

String url = JdbcConfiguration.URL_PREFIX + webServerAddress();
try {
Expand All @@ -46,7 +46,7 @@ public void testNoExceptionThrownForCompatibleVersions() throws IOException {
}
}

void prepareRequest(Version version) throws IOException {
void prepareResponse(Version version) throws IOException {
MainResponse response = version == null ? createCurrentVersionMainResponse() : createMainResponse(version);
webServer().enqueue(new MockResponse().setResponseCode(200).addHeader("Content-Type", "application/json").setBody(
XContentHelper.toXContent(response, XContentType.JSON, false).utf8ToString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.UnsupportedCharsetException;
import java.sql.JDBCType;
import java.util.HashMap;
Expand All @@ -27,6 +25,7 @@
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.mode;
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.randomMode;
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.toMap;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.SQL_QUERY_REST_ENDPOINT;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.columnInfo;

Expand Down Expand Up @@ -101,9 +100,7 @@ private void createTestData(int documents) throws UnsupportedCharsetException, I
}

private Map<String, Object> responseToMap(Response response) throws IOException {
try (InputStream content = response.getEntity().getContent()) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
return toMap(response, "plain");
}

private void assertCount(RestClient client, int count) throws IOException {
Expand All @@ -114,7 +111,7 @@ private void assertCount(RestClient client, int count) throws IOException {

Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
request.setJsonEntity("{\"query\": \"SELECT COUNT(*) FROM test\"" + mode(mode) + "}");
Map<String, Object> actual = responseToMap(client.performRequest(request));
Map<String, Object> actual = toMap(client.performRequest(request), mode);

if (false == expected.equals(actual)) {
NotEqualMessageBuilder message = new NotEqualMessageBuilder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.sql.qa.multi_node;

import org.elasticsearch.xpack.sql.qa.SqlProtocolTestCase;

public class SqlProtocolIT extends SqlProtocolTestCase {
}
Loading

0 comments on commit 2c73c7d

Please sign in to comment.