Skip to content

Commit

Permalink
client side support of query_label
Browse files Browse the repository at this point in the history
  • Loading branch information
alexradzin committed Dec 13, 2023
1 parent b0d23d5 commit f4bc232
Show file tree
Hide file tree
Showing 17 changed files with 327 additions and 224 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ tasks.register('integrationTest', Test) {
testLogging.exceptionFormat = 'full'
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath
maxParallelForks = (int) (Runtime.runtime.availableProcessors() / 2 + 1)
// Temporarily disabled multithreaded execution due to bug on server side.
//maxParallelForks = (int) (Runtime.runtime.availableProcessors() / 2 + 1)

testLogging {
events 'PASSED', 'FAILED', 'SKIPPED'
Expand Down
105 changes: 55 additions & 50 deletions src/integrationTest/java/integration/tests/StatementCancelTest.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
package integration.tests;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;

import com.firebolt.jdbc.exception.ExceptionType;
import com.firebolt.jdbc.exception.FireboltException;
import com.firebolt.jdbc.statement.FireboltStatement;
import integration.IntegrationTest;
import lombok.CustomLog;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import com.firebolt.jdbc.exception.ExceptionType;
import com.firebolt.jdbc.exception.FireboltException;
import com.firebolt.jdbc.statement.FireboltStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;

import integration.IntegrationTest;
import lombok.CustomLog;
import static org.junit.jupiter.api.Assertions.assertEquals;

@CustomLog
class StatementCancelTest extends IntegrationTest {
Expand All @@ -36,21 +35,18 @@ void afterEach() {

@Test
@Timeout(value = 2, unit = TimeUnit.MINUTES)
@Tag("slow")
void shouldCancelQuery() throws SQLException, InterruptedException {
try (Connection connection = createConnection()) {
long totalRecordsToInsert;
try (Statement statement = connection.createStatement()) {
ResultSet resultSet = statement.executeQuery("SELECT COUNT(*) from ex_lineitem");
resultSet.next();
totalRecordsToInsert = resultSet.getInt(1);
}
try (Connection connection = createConnection(); Statement fillStatement = connection.createStatement()) {
long now = System.currentTimeMillis();
fillStatement.execute("insert into ex_lineitem ( l_orderkey ) SELECT * FROM GENERATE_SERIES(1, 100000000)");
long insertTime = System.currentTimeMillis() - now;

try (FireboltStatement insertStatement = (FireboltStatement) connection.createStatement()) {
try (Statement insertStatement = connection.createStatement()) {

Thread thread = new Thread(() -> {
try {
insertStatement.execute(
"INSERT INTO first_statement_cancel_test SELECT * FROM ex_lineitem; INSERT INTO second_statement_cancel_test SELECT * FROM ex_lineitem;");
insertStatement.execute("INSERT INTO first_statement_cancel_test SELECT * FROM ex_lineitem; INSERT INTO second_statement_cancel_test SELECT * FROM ex_lineitem;");
} catch (FireboltException e) {
if (!e.getType().equals(ExceptionType.CANCELED)) {
throw new RuntimeException(e);
Expand All @@ -61,47 +57,56 @@ void shouldCancelQuery() throws SQLException, InterruptedException {
}
});
thread.start();
while (!insertStatement.isStatementRunning()) {
Thread.sleep(1000);
// Wait until copying started
while (!((FireboltStatement)insertStatement).isStatementRunning()) {
Thread.sleep(100);
}
insertStatement.cancel();
Thread.sleep(insertTime / 10); // wait 10% of time that was spent to fill data to give chance to the insert statement to copy data
insertStatement.cancel(); // now cancel the statement
}
Thread.sleep(5000);
verifyThatNoMoreRecordsAreAdded(connection, "first_statement_cancel_test", totalRecordsToInsert);
verifyThatNoMoreRecordsAreAdded(connection, "first_statement_cancel_test", insertTime);
verifyThatSecondStatementWasNotExecuted(connection, "second_statement_cancel_test");

}
}

private void verifyThatNoMoreRecordsAreAdded(Connection connection, String tableName, long totalRecordsToInsert)
throws SQLException, InterruptedException {
String countAddedRecordsQuery = String.format("SELECT COUNT(*) FROM %s", tableName);
try (Statement countStatement = connection.createStatement()) {
ResultSet rs = countStatement.executeQuery(countAddedRecordsQuery);
rs.next();
long count = rs.getInt(1);
log.info("{} records were added to table {} before the statement got cancelled", count, tableName);
Thread.sleep(5000); // waiting to see if more records are being added
rs = countStatement.executeQuery(countAddedRecordsQuery);
rs.next();
assertEquals(count, rs.getInt(1));
// The dataset is too small so all the data might already be ingested
// assertTrue(count <= totalRecordsToInsert, "No new records were added
// following the cancellation");
rs.close();
private void verifyThatNoMoreRecordsAreAdded(Connection connection, String tableName, long insertTime) throws SQLException, InterruptedException {
// Get number of rows in the table. Do it several times until we get something. Wait for 10% of time that spent to fill the table.
// We need several attempts because this DB does not support transactions, so sometimes it takes time until the
// data is available.
long waitForResultTime = insertTime / 2;
long waitForResultDelay = waitForResultTime / 10;
log.info("verifyThatNoMoreRecordsAreAdded insertTime={}, waitForResultTime={}", insertTime, waitForResultTime);
int count0;
int i = 0;
for (count0 = count(connection, tableName); i < 10; count0 = count(connection, tableName), i++) {
log.info("verifyThatNoMoreRecordsAreAdded count0={}", count0);
if (count0 > 0) {
break;
}
Thread.sleep(waitForResultDelay);
}

// Wait for more time that we spent to fill the table.
// We want to wait enough to give a chance to the query to fill more data.
Thread.sleep(insertTime); // waiting to see if more records are being added
int count1 = count(connection, tableName);
Thread.sleep(insertTime); // waiting to see if more records are being added
int count2 = count(connection, tableName);
log.info("verifyThatNoMoreRecordsAreAdded count1={}, count2={}", count1, count2);
assertEquals(count1, count2);
}

private void verifyThatSecondStatementWasNotExecuted(Connection connection, String tableName) throws SQLException {
private int count(Connection connection, String tableName) throws SQLException {
String countAddedRecordsQuery = String.format("SELECT COUNT(*) FROM %s", tableName);
try (Statement countStatement = connection.createStatement()) {
ResultSet rs = countStatement.executeQuery(countAddedRecordsQuery);
rs.next();
assertEquals(0, rs.getInt(1));
try (Statement countStatement = connection.createStatement(); ResultSet rs = countStatement.executeQuery(countAddedRecordsQuery)) {
return rs.next() ? rs.getInt(1) : 0;
}
}

private void verifyThatSecondStatementWasNotExecuted(Connection connection, String tableName) throws SQLException {
assertEquals(0, count(connection, tableName));
}

/**
* Extract table name when non-standard sql is used
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SystemEngineTest extends IntegrationTest {

private static final String DATABASE_NAME = "jdbc_system_engine_integration_test";
private static final String ENGINE_NAME = "jdbc_system_engine_integration_test_engine";
private static final String ENGINE_NEW_NAME = "jdbc_system_engine_integration_test_engine_2";
private static final long ID = ProcessHandle.current().pid() + System.currentTimeMillis();
private static final String DATABASE_NAME = "jdbc_system_engine_integration_test_" + ID;
private static final String ENGINE_NAME = DATABASE_NAME + "_engine";
private static final String ENGINE_NEW_NAME = ENGINE_NAME + "_2";

@BeforeAll
void beforeAll() {
Expand All @@ -56,7 +57,7 @@ void afterAll() {

@Test
void shouldSelect1() throws SQLException {
try (Connection connection = createConnection(getSystemEngineName());
try (Connection connection = createConnection();
ResultSet rs = connection.createStatement().executeQuery("SELECT 1")) {
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
DROP TABLE IF EXISTS "ex_lineitem" CASCADE;
DROP TABLE IF EXISTS "first_statement_cancel_test" CASCADE;
DROP TABLE IF EXISTS "second_statement_cancel_test" CASCADE;
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
DROP TABLE IF EXISTS "ex_lineitem" CASCADE;
DROP TABLE IF EXISTS "first_statement_cancel_test" CASCADE;
DROP TABLE IF EXISTS "second_statement_cancel_test" CASCADE;
CREATE
EXTERNAL TABLE IF NOT EXISTS ex_lineitem ( l_orderkey LONG, l_partkey LONG, l_suppkey LONG, l_linenumber INT, l_quantity LONG, l_extendedprice LONG, l_discount LONG, l_tax LONG, l_returnflag TEXT, l_linestatus TEXT, l_shipdate TEXT, l_commitdate TEXT, l_receiptdate TEXT, l_shipinstruct TEXT, l_shipmode TEXT, l_comment TEXT)URL = 's3://firebolt-publishing-public/samples/tpc-h/parquet/lineitem/'OBJECT_PATTERN = '*.parquet'TYPE = (PARQUET);
CREATE
FACT TABLE IF NOT EXISTS first_statement_cancel_test ( l_orderkey LONG, l_partkey LONG, l_suppkey LONG, l_linenumber INT, l_quantity LONG, l_extendedprice LONG, l_discount LONG, l_tax LONG, l_returnflag TEXT, l_linestatus TEXT, l_shipdate TEXT, l_commitdate TEXT, l_receiptdate TEXT, l_shipinstruct TEXT, l_shipmode TEXT, l_comment TEXT ) PRIMARY INDEX l_orderkey, l_linenumber;
CREATE
FACT TABLE IF NOT EXISTS second_statement_cancel_test ( l_orderkey LONG, l_partkey LONG, l_suppkey LONG, l_linenumber INT, l_quantity LONG, l_extendedprice LONG, l_discount LONG, l_tax LONG, l_returnflag TEXT, l_linestatus TEXT, l_shipdate TEXT, l_commitdate TEXT, l_receiptdate TEXT, l_shipinstruct TEXT, l_shipmode TEXT, l_comment TEXT ) PRIMARY INDEX l_orderkey, l_linenumber;
CREATE FACT TABLE IF NOT EXISTS ex_lineitem ( l_orderkey LONG );
CREATE FACT TABLE IF NOT EXISTS first_statement_cancel_test ( l_orderkey LONG ) PRIMARY INDEX l_orderkey;
CREATE FACT TABLE IF NOT EXISTS second_statement_cancel_test ( l_orderkey LONG ) PRIMARY INDEX l_orderkey;
27 changes: 7 additions & 20 deletions src/main/java/com/firebolt/jdbc/client/FireboltClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public abstract class FireboltClient {
private final OkHttpClient httpClient;
protected final ObjectMapper objectMapper;
private final String headerUserAgentValue;
private final FireboltConnection connection;
protected final FireboltConnection connection;

protected FireboltClient(OkHttpClient httpClient, ObjectMapper objectMapper, FireboltConnection connection, String customDrivers, String customClients) {
this.httpClient = httpClient;
Expand All @@ -68,8 +68,7 @@ protected <T> T getResource(String uri, String host, String accessToken, Class<T

private Request createGetRequest(String uri, String accessToken) {
Request.Builder requestBuilder = new Request.Builder().url(uri);
this.createHeaders(accessToken)
.forEach(header -> requestBuilder.addHeader(header.getLeft(), header.getRight()));
createHeaders(accessToken).forEach(header -> requestBuilder.addHeader(header.getLeft(), header.getRight()));
return requestBuilder.build();
}

Expand Down Expand Up @@ -104,33 +103,21 @@ private OkHttpClient getClientWithTimeouts(int connectionTimeout, int networkTim
}
}

protected Request createPostRequest(String uri, RequestBody requestBody) {
return createPostRequest(uri, requestBody, null, null);
}

protected Request createPostRequest(String uri, RequestBody body, String accessToken, String id) {
Request.Builder requestBuilder = new Request.Builder().url(uri);
this.createHeaders(accessToken)
.forEach(header -> requestBuilder.addHeader(header.getLeft(), header.getRight()));
protected Request createPostRequest(String uri, String label, RequestBody body, String accessToken) {
Request.Builder requestBuilder = new Request.Builder().url(uri).tag(label);
createHeaders(accessToken).forEach(header -> requestBuilder.addHeader(header.getLeft(), header.getRight()));
if (body != null) {
requestBuilder.post(body);
}
if (id != null) {
requestBuilder.tag(id);
}
return requestBuilder.build();
}

protected Request createPostRequest(String uri, String accessToken, String id) {
return createPostRequest(uri, (RequestBody) null, accessToken, id);
}

protected Request createPostRequest(String uri, String json, String accessToken, String id) {
protected Request createPostRequest(String uri, String label, String json, String accessToken) {
RequestBody requestBody = null;
if (json != null) {
requestBody = RequestBody.create(json, MediaType.parse("application/json"));
}
return createPostRequest(uri, requestBody, accessToken, id);
return createPostRequest(uri, label, requestBody, accessToken);
}

protected void validateResponse(String host, Response response, Boolean isCompress) throws FireboltException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public FireboltConnectionTokens postConnectionTokens(String host, String user, S
AuthenticationRequest authenticationRequest = getAuthenticationRequest(user, password, host, environment);
String uri = authenticationRequest.getUri();
log.debug("Creating connection with url {}", uri);
Request request = this.createPostRequest(uri, authenticationRequest.getRequestBody());
Request request = createPostRequest(uri, null, authenticationRequest.getRequestBody(), null);
try (Response response = this.execute(request, host)) {
String responseString = getResponseAsString(response);
FireboltAuthenticationResponse authenticationResponse = objectMapper.readValue(responseString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@ InputStream executeSqlStatement(StatementInfoWrapper statementInfoWrapper, Fireb
/**
* Call endpoint to abort a running SQL statement
*/
void abortStatement(String id, FireboltProperties fireboltProperties) throws FireboltException;
void abortStatement(String label, FireboltProperties fireboltProperties) throws FireboltException;

/**
* Abort running HTTP request of a statement
*/
void abortRunningHttpRequest(String id) throws FireboltException;

boolean isStatementRunning(String statementId);
boolean isStatementRunning(String statementLabel);
}
Loading

0 comments on commit f4bc232

Please sign in to comment.