Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQL: binary communication implementation for drivers and the CLI #48261

Merged
merged 12 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/plugin/sql/jdbc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies {
transitive = false
}
compile project(':libs:elasticsearch-core')
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
runtime "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
testCompile project(":test:framework")
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
8b9826e16c3366764bfb7ad7362554f0471046c3
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ Cursor query(String sql, List<SqlTypedParamValue> params, RequestMeta meta) thro
null,
new RequestInfo(Mode.JDBC),
conCfg.fieldMultiValueLeniency(),
conCfg.indexIncludeFrozen());
conCfg.indexIncludeFrozen(),
conCfg.binaryCommunication());
SqlQueryResponse response = httpClient.query(sqlRequest);
return new DefaultCursor(this, response.cursor(), toJdbcColumnInfo(response.columns()), response.rows(), meta);
}
Expand All @@ -75,7 +76,7 @@ Cursor query(String sql, List<SqlTypedParamValue> params, RequestMeta meta) thro
*/
Tuple<String, List<List<Object>>> nextPage(String cursor, RequestMeta meta) throws SQLException {
SqlQueryRequest sqlRequest = new SqlQueryRequest(cursor, TimeValue.timeValueMillis(meta.timeoutInMs()),
TimeValue.timeValueMillis(meta.queryTimeoutInMs()), new RequestInfo(Mode.JDBC));
TimeValue.timeValueMillis(meta.queryTimeoutInMs()), new RequestInfo(Mode.JDBC), conCfg.binaryCommunication());
SqlQueryResponse response = httpClient.query(sqlRequest);
return new Tuple<>(response.cursor(), response.rows());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.sql.jdbc;

import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.client.ConnectionConfiguration;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ExecutorService;

public class JdbcHttpClientRequestTests extends ESTestCase {

private static RawRequestMockWebServer webServer = new RawRequestMockWebServer();
private static final Logger logger = LogManager.getLogger(JdbcHttpClientRequestTests.class);

@BeforeClass
public static void init() throws Exception {
webServer.start();
}

@AfterClass
public static void cleanup() {
webServer.close();
}

public void testBinaryRequestEnabled() throws Exception {
assertBinaryRequest(true, XContentType.CBOR);
}

public void testBinaryRequestDisabled() throws Exception {
assertBinaryRequest(false, XContentType.JSON);
}

private void assertBinaryRequest(boolean isBinary, XContentType xContentType) throws Exception {
String url = JdbcConfiguration.URL_PREFIX + webServer.getHostName() + ":" + webServer.getPort();
Properties props = new Properties();
props.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(isBinary));

JdbcHttpClient httpClient = new JdbcHttpClient(JdbcConfiguration.create(url, props, 0), false);

prepareMockResponse();
try {
httpClient.query(randomAlphaOfLength(256), null,
new RequestMeta(randomIntBetween(1, 100), randomNonNegativeLong(), randomNonNegativeLong()));
} catch (SQLException e) {
logger.info("Ignored SQLException", e);
}
assertValues(isBinary, xContentType);

prepareMockResponse();
try {
httpClient.nextPage("", new RequestMeta(randomIntBetween(1, 100), randomNonNegativeLong(), randomNonNegativeLong()));
} catch (SQLException e) {
logger.info("Ignored SQLException", e);
}
assertValues(isBinary, xContentType);
}

private void assertValues(boolean isBinary, XContentType xContentType) {
assertEquals(1, webServer.requests().size());
RawRequest recordedRequest = webServer.takeRequest();
assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type"));
assertEquals("POST", recordedRequest.getMethod());

BytesReference bytesRef = recordedRequest.getBodyAsBytes();
Map<String, Object> reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2();

assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase("jdbc"));
assertEquals(isBinary, reqContent.get("binary_format"));
}

private void prepareMockResponse() {
webServer.enqueue(new Response()
.setResponseCode(200)
.addHeader("Content-Type", "application/json")
.setBody("{\"rows\":[],\"columns\":[]}"));
}

@SuppressForbidden(reason = "use http server")
private static class RawRequestMockWebServer implements Closeable {
private HttpServer server;
private final Queue<Response> responses = ConcurrentCollections.newQueue();
private final Queue<RawRequest> requests = ConcurrentCollections.newQueue();
private String hostname;
private int port;

RawRequestMockWebServer() {
}

void start() throws IOException {
InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0);
server = MockHttpServer.createHttp(address, 0);

server.start();
this.hostname = server.getAddress().getHostString();
this.port = server.getAddress().getPort();

server.createContext("/", s -> {
try {
Response response = responses.poll();
RawRequest request = createRequest(s);
requests.add(request);
s.getResponseHeaders().putAll(response.getHeaders());

if (Strings.isEmpty(response.getBody())) {
s.sendResponseHeaders(response.getStatusCode(), 0);
} else {
byte[] responseAsBytes = response.getBody().getBytes(StandardCharsets.UTF_8);
s.sendResponseHeaders(response.getStatusCode(), responseAsBytes.length);
if ("HEAD".equals(request.getMethod()) == false) {
try (OutputStream responseBody = s.getResponseBody()) {
responseBody.write(responseAsBytes);
}
}
}
} catch (Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to respond to request [{} {}]",
s.getRequestMethod(), s.getRequestURI()), e);
} finally {
s.close();
}

});
}

private RawRequest createRequest(HttpExchange exchange) throws IOException {
RawRequest request = new RawRequest(exchange.getRequestMethod(), exchange.getRequestHeaders());
if (exchange.getRequestBody() != null) {
BytesReference bytesRef = Streams.readFully(exchange.getRequestBody());
request.setBodyAsBytes(bytesRef);
}
return request;
}

String getHostName() {
return hostname;
}

int getPort() {
return port;
}

void enqueue(Response response) {
responses.add(response);
}

List<RawRequest> requests() {
return new ArrayList<>(requests);
}

RawRequest takeRequest() {
return requests.poll();
}

@Override
public void close() {
if (server.getExecutor() instanceof ExecutorService) {
terminate((ExecutorService) server.getExecutor());
}
server.stop(0);
}
}

@SuppressForbidden(reason = "use http server header class")
private static class RawRequest {

private final String method;
private final Headers headers;
private BytesReference bodyAsBytes = null;

RawRequest(String method, Headers headers) {
this.method = method;
this.headers = headers;
}

public String getMethod() {
return method;
}

public String getHeader(String name) {
return headers.getFirst(name);
}

public BytesReference getBodyAsBytes() {
return bodyAsBytes;
}

public void setBodyAsBytes(BytesReference bodyAsBytes) {
this.bodyAsBytes = bodyAsBytes;
}
}

@SuppressForbidden(reason = "use http server header class")
private class Response {

private String body = null;
private int statusCode = 200;
private Headers headers = new Headers();

public Response setBody(String body) {
this.body = body;
return this;
}

public Response setResponseCode(int statusCode) {
this.statusCode = statusCode;
return this;
}

public Response addHeader(String name, String value) {
headers.add(name, value);
return this;
}

String getBody() {
return body;
}

int getStatusCode() {
return statusCode;
}

Headers getHeaders() {
return headers;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ public class VersionParityTests extends WebServerTestCase {
public void testExceptionThrownOnIncompatibleVersions() throws IOException, SQLException {
Version version = VersionUtils.randomVersionBetween(random(), null, VersionUtils.getPreviousVersion());
logger.info("Checking exception is thrown for version {}", version);
prepareRequest(version);
prepareResponse(version);

String url = JdbcConfiguration.URL_PREFIX + webServer().getHostName() + ":" + webServer().getPort();
String url = JdbcConfiguration.URL_PREFIX + webServerAddress();
SQLException ex = expectThrows(SQLException.class, () -> new JdbcHttpClient(JdbcConfiguration.create(url, null, 0)));
assertEquals("This version of the JDBC driver is only compatible with Elasticsearch version "
+ org.elasticsearch.xpack.sql.client.Version.CURRENT.toString()
+ ", attempting to connect to a server version " + version.toString(), ex.getMessage());
}

public void testNoExceptionThrownForCompatibleVersions() throws IOException {
prepareRequest(null);
prepareResponse(null);

String url = JdbcConfiguration.URL_PREFIX + webServerAddress();
try {
Expand All @@ -47,7 +47,7 @@ public void testNoExceptionThrownForCompatibleVersions() throws IOException {
}
}

void prepareRequest(Version version) throws IOException {
void prepareResponse(Version version) throws IOException {
MainResponse response = version == null ? createCurrentVersionMainResponse() : createMainResponse(version);
webServer().enqueue(new MockResponse().setResponseCode(200).addHeader("Content-Type", "application/json").setBody(
XContentHelper.toXContent(response, XContentType.JSON, false).utf8ToString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.UnsupportedCharsetException;
import java.sql.JDBCType;
import java.util.HashMap;
Expand All @@ -27,6 +25,7 @@
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.mode;
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.randomMode;
import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.toMap;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.SQL_QUERY_REST_ENDPOINT;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.columnInfo;

Expand Down Expand Up @@ -101,9 +100,7 @@ private void createTestData(int documents) throws UnsupportedCharsetException, I
}

private Map<String, Object> responseToMap(Response response) throws IOException {
try (InputStream content = response.getEntity().getContent()) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
return toMap(response, "plain");
}

private void assertCount(RestClient client, int count) throws IOException {
Expand All @@ -114,7 +111,7 @@ private void assertCount(RestClient client, int count) throws IOException {

Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT);
request.setJsonEntity("{\"query\": \"SELECT COUNT(*) FROM test\"" + mode(mode) + "}");
Map<String, Object> actual = responseToMap(client.performRequest(request));
Map<String, Object> actual = toMap(client.performRequest(request), mode);

if (false == expected.equals(actual)) {
NotEqualMessageBuilder message = new NotEqualMessageBuilder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.sql.qa.multi_node;

import org.elasticsearch.xpack.sql.qa.SqlProtocolTestCase;

public class SqlProtocolIT extends SqlProtocolTestCase {
}
Loading