Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: FIR-43038 allow jdbc to send batch as a single multi statement #494

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions src/integrationTest/java/integration/tests/ConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,54 @@ void validatesOnSystemEngineIfParameterProvided() throws SQLException {

}

@Test
@Tag("v2")
void preparedStatementBatchesWorkIfMergeParameterProvided() throws SQLException {
String engineName = integration.ConnectionInfo.getInstance().getEngine();
String queryLabel = "test_merge_batches_" + System.currentTimeMillis();
try (Connection connection = createConnection(engineName, Map.of("merge_prepared_statement_batches", "true"))) {
try (Statement statement = connection.createStatement()) {
statement.executeUpdate("CREATE TABLE test_table (id INT)");
try (java.sql.PreparedStatement preparedStatement = connection.prepareStatement(
String.format("/*%s*/INSERT INTO test_table VALUES (?)", queryLabel))) {
for (int i = 0; i < 10; i++) {
preparedStatement.setInt(1, i);
preparedStatement.addBatch();
}
preparedStatement.executeBatch();

}
try (ResultSet rs = statement.executeQuery("SELECT COUNT(*) FROM test_table")) {
assertTrue(rs.next());
assertEquals(10, rs.getInt(1));
}
// sleep for 10s to give QH time to get populated and avoid flakiness
// it sometime takes that long
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// ignore
}

// Validate we've only executed one insert
String qhQuery = "SELECT count(*) from information_schema.engine_query_history WHERE status='ENDED_SUCCESSFULLY' " +
String.format("AND lower(query_text) like '/*%s*/insert into %%'", queryLabel);
System.out.println(qhQuery);
try (java.sql.PreparedStatement preparedStatement = connection.prepareStatement(qhQuery)) {
try (ResultSet rs = preparedStatement.executeQuery()) {
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
}
}

} finally {
try (Statement statement = connection.createStatement()) {
statement.executeUpdate("DROP TABLE IF EXISTS test_table");
}
}
}
}

void unsuccessfulConnect(boolean useDatabase, boolean useEngine) throws SQLException {
ConnectionInfo params = integration.ConnectionInfo.getInstance();
String url = getJdbcUrl(params, useDatabase, useEngine);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class FireboltProperties {
private final String userClients;
private final String accessToken;
private final boolean validateOnSystemEngine;
private final boolean mergePreparedStatementBatches;
@Builder.Default
private Map<String, String> initialAdditionalProperties = new HashMap<>();
@Builder.Default
Expand Down Expand Up @@ -112,6 +113,7 @@ public FireboltProperties(Properties properties) {
userDrivers = getSetting(properties, FireboltSessionProperty.USER_DRIVERS);
userClients = getSetting(properties, FireboltSessionProperty.USER_CLIENTS);
validateOnSystemEngine = getSetting(properties, FireboltSessionProperty.VALIDATE_ON_SYSTEM_ENGINE);
mergePreparedStatementBatches = getSetting(properties, FireboltSessionProperty.MERGE_PREPARED_STATEMENT_BATCHES);

environment = getEnvironment(configuredEnvironment, properties);
host = getHost(configuredEnvironment, properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public enum FireboltSessionProperty {
VALIDATE_ON_SYSTEM_ENGINE("validate_on_system_engine", false, Boolean.class,
"Whether to validate the connection on the system engine or not. By default validates on an engine currently connected.",
FireboltProperties::isValidateOnSystemEngine),
MERGE_PREPARED_STATEMENT_BATCHES("merge_prepared_statement_batches", false, Boolean.class,
"Whether to send prepared statement batches as a single statement. By default, they are sent one by one.", FireboltProperties::isMergePreparedStatementBatches),
// We keep all the deprecated properties to ensure backward compatibility - but
// they do not have any effect.
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class FireboltStatement extends JdbcBase implements Statement {

private final FireboltStatementService statementService;
private final FireboltProperties sessionProperties;
protected final FireboltProperties sessionProperties;
private final FireboltConnection connection;
private final Collection<String> statementsToExecuteLabels = new HashSet<>();
private boolean closeOnCompletion = false;
Expand Down Expand Up @@ -436,7 +436,7 @@ public int[] executeBatch() throws SQLException {
result.add(rs.map(x -> 0).orElse(SUCCESS_NO_INFO));
}
}
return result.stream().mapToInt(Integer::intValue).toArray();
return result.stream().mapToInt(Integer::intValue).toArray();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,18 +266,34 @@ public void setArray(int parameterIndex, Array x) throws SQLException {
public int[] executeBatch() throws SQLException {
validateStatementIsNotClosed();
log.debug("Executing batch for statement: {}", rawStatement);
List<StatementInfoWrapper> inserts = new ArrayList<>();
List<StatementInfoWrapper> statements = new ArrayList<>();
int[] result = new int[rows.size()];
for (Map<Integer, String> row : rows) {
inserts.addAll(prepareSQL(row));
statements.addAll(prepareSQL(row));
}
execute(inserts);
for (int i = 0; i < inserts.size(); i++) {
if (sessionProperties.isMergePreparedStatementBatches()) {
if (!statements.isEmpty()) {
execute(List.of(asSingleStatement(statements)));
}
} else {
execute(statements);
}
for (int i = 0; i < statements.size(); i++) {
result[i] = SUCCESS_NO_INFO;
}
return result;
}

private StatementInfoWrapper asSingleStatement(List<StatementInfoWrapper> queries) {
// merge all queries into a single query, separated by semicolons
StringBuilder sb = new StringBuilder();
var first = queries.get(0);
for (StatementInfoWrapper query : queries) {
sb.append(query.getSql()).append(";");
}
return new StatementInfoWrapper(sb.toString(), first.getInitialStatement().getStatementType(), first.getParam(), first.getInitialStatement());
}

@Override
@NotImplemented
public int executeUpdate(String sql) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
abstract class FireboltConnectionTest {
Expand Down Expand Up @@ -373,6 +369,28 @@ void shouldForceSystemEngineWhenValidateOnSystemEngineIsSet() throws SQLExceptio
}
}

@Test
void shouldSendBatchesInSingleQueryWhenMergeBatchesIsSet() throws SQLException {
when(fireboltStatementService.execute(any(), any(), any()))
.thenReturn(Optional.empty());
Properties propertiesCopy = new Properties(connectionProperties);
propertiesCopy.put("merge_prepared_statement_batches", "true");
try (FireboltConnection fireboltConnection = createConnection(URL, propertiesCopy)) {
fireboltConnection.createStatement().execute("SET param=value");
PreparedStatement statement = fireboltConnection.prepareStatement("INSERT INTO t VALUES (?)");
statement.setInt(1, 1);
statement.addBatch();
statement.setInt(1, 2);
statement.addBatch();
statement.executeBatch();
verify(fireboltStatementService, atLeast(2)).execute(queryInfoWrapperArgumentCaptor.capture(),
propertiesArgumentCaptor.capture(), any());
assertEquals("INSERT INTO t VALUES (1);INSERT INTO t VALUES (2);", queryInfoWrapperArgumentCaptor.getValue().getSql());
// Validate that parameters are preserved
assertEquals(Map.of("param", "value"), propertiesArgumentCaptor.getValue().getAdditionalProperties());
}
}

@Test
void shouldIgnore429WhenValidatingConnection() throws SQLException {
when(fireboltStatementService.execute(any(), any(), any()))
Expand Down Expand Up @@ -738,5 +756,16 @@ void shouldValidateOnUserEngineByDefault() throws SQLException {
}
}

@Test
void shouldSendBatchesSeparatelyByDefault() throws SQLException {
try (FireboltConnection fireboltConnection = createConnection(URL, connectionProperties)) {
assertEquals("false", fireboltConnection.getClientInfo().get("merge_prepared_statement_batches"));
}
connectionProperties.put("merge_prepared_statement_batches", "true");
try (FireboltConnection fireboltConnection = createConnection(URL, connectionProperties)) {
assertEquals("true", fireboltConnection.getClientInfo().get("merge_prepared_statement_batches"));
}
}

protected abstract FireboltConnection createConnection(String url, Properties props) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ void shouldHaveAllTheSpecifiedCustomProperties() {
properties.put("someCustomProperties", "custom_value");
properties.put("compress", "1");
properties.put("validate_on_system_engine", "true");
properties.put("merge_prepared_statement_batches", "true");

Map<String, String> customProperties = new HashMap<>();
customProperties.put("someCustomProperties", "custom_value");
Expand All @@ -59,7 +60,7 @@ void shouldHaveAllTheSpecifiedCustomProperties() {
.initialAdditionalProperties(customProperties).keepAliveTimeoutMillis(300000)
.maxConnectionsTotal(300).maxRetries(3).socketTimeoutMillis(20).connectionTimeoutMillis(60000)
.tcpKeepInterval(30).tcpKeepIdle(60).tcpKeepCount(10).environment("app").validateOnSystemEngine(true)
.build();
.mergePreparedStatementBatches(true).build();
assertEquals(expectedDefaultProperties, new FireboltProperties(properties));
}

Expand Down
Loading