Skip to content

Commit

Permalink
[#1909] Run queries for schema creation using the pool
Browse files Browse the repository at this point in the history
Before we were creating a connection and then ignoring it for each
query required to update the schema or collect metatada.

Now the method for running queries outside the "current" transaction
is in the SqlClientPool.
  • Loading branch information
DavideD authored and yrodiere committed Jun 3, 2024
1 parent 3417888 commit 3ace16f
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.List;
import java.util.concurrent.CompletionStage;


import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;

import io.vertx.sqlclient.spi.DatabaseMetadata;
Expand Down Expand Up @@ -192,11 +191,6 @@ public CompletionStage<ResultSet> selectJdbc(String sql, Object[] paramValues) {
: delegate.selectJdbc( sql, paramValues );
}

@Override
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
return delegate.selectJdbcOutsideTransaction( sql, paramValues );
}

public <T> CompletionStage<T> selectIdentifier(String sql, Object[] paramValues, Class<T> idClass) {
// Do not want to execute the batch here
// because we want to be able to select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,6 @@ interface Expectation {

CompletionStage<ResultSet> selectJdbc(String sql, Object[] paramValues);

/**
* This method is intended to be used only for queries returning
* a ResultSet that must be executed outside of any "current"
* transaction (i.e with autocommit=true).
* <p/>
* For example, it would be appropriate to use this method when
* performing queries on information_schema or system tables in
* order to obtain metadata information about catalogs, schemas,
* tables, etc.
*
* @param sql - the query to execute outside of a transaction
* @param paramValues - a non-null array of parameter values
*
* @return the CompletionStage<ResultSet> from executing the query.
*/
CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues);

<T> CompletionStage<T> insertAndSelectIdentifier(String sql, Object[] paramValues, Class<T> idClass, String idColumnName);
CompletionStage<ResultSet> insertAndSelectIdentifierAsResultSet(String sql, Object[] paramValues, Class<?> idClass, String idColumnName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder;
import org.hibernate.service.Service;

import java.sql.ResultSet;
import java.util.concurrent.CompletionStage;


/**
* A Hibernate {@link Service} that provides access to pooled
* {@link ReactiveConnection reactive connections}.
Expand Down Expand Up @@ -63,6 +65,8 @@ public interface ReactiveConnectionPool extends Service {
*/
CompletionStage<ReactiveConnection> getConnection(String tenantId, SqlExceptionHelper sqlExceptionHelper);

CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues);

/**
* The shutdown of the pool is actually asynchronous but the
* core service registry won't return the {@link CompletionStage}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,26 @@
*/
package org.hibernate.reactive.pool.impl;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Objects;
import java.util.concurrent.CompletionStage;

import org.hibernate.engine.jdbc.internal.FormatStyle;
import org.hibernate.engine.jdbc.spi.SqlExceptionHelper;
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
import org.hibernate.reactive.mutiny.Mutiny;
import org.hibernate.reactive.stage.Stage;
import org.hibernate.reactive.util.impl.CompletionStages;

import io.vertx.sqlclient.DatabaseException;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;

import static org.hibernate.reactive.util.impl.CompletionStages.rethrow;

/**
* A pool of reactive connections backed by a Vert.x {@link Pool}.
Expand Down Expand Up @@ -82,4 +93,41 @@ public SqlExceptionHelper getSqlExceptionHelper() {
public CompletionStage<Void> getCloseFuture() {
return CompletionStages.voidFuture();
}


@Override
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
return preparedQueryOutsideTransaction( sql, Tuple.wrap( paramValues ) )
.thenApply( ResultSetAdaptor::new );
}

public CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql, Tuple parameters) {
feedback( sql );
return getPool().preparedQuery( sql ).execute( parameters ).toCompletionStage()
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
}

/**
* Similar to {@link org.hibernate.exception.internal.SQLExceptionTypeDelegate#convert(SQLException, String, String)}
*/
private <T> T convertException(T rows, String sql, Throwable sqlException) {
if ( sqlException == null ) {
return rows;
}
if ( sqlException instanceof DatabaseException ) {
DatabaseException de = (DatabaseException) sqlException;
sqlException = sqlExceptionHelper
.convert( new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), "error executing SQL statement", sql );
}
return rethrow( sqlException );
}

private void feedback(String sql) {
Objects.requireNonNull( sql, "SQL query cannot be null" );
// DDL already gets formatted by the client, so don't reformat it
FormatStyle formatStyle = sqlStatementLogger.isFormat() && !sql.contains( System.lineSeparator() )
? FormatStyle.BASIC
: FormatStyle.NONE;
sqlStatementLogger.logStatement( sql, formatStyle.getFormatter() );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,6 @@ public CompletionStage<ResultSet> selectJdbc(String sql, Object[] paramValues) {
.thenApply( ResultSetAdaptor::new );
}

@Override
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
return preparedQueryOutsideTransaction( sql, Tuple.wrap( paramValues ) )
.thenApply( ResultSetAdaptor::new );
}

@Override
public CompletionStage<Void> execute(String sql) {
return preparedQuery( sql )
Expand Down Expand Up @@ -278,12 +272,6 @@ public CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql)
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
}

public CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql, Tuple parameters) {
feedback( sql );
return pool.preparedQuery( sql ).execute( parameters ).toCompletionStage()
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
}

private void feedback(String sql) {
Objects.requireNonNull( sql, "SQL query cannot be null" );
// DDL already gets formatted by the client, so don't reformat it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,29 @@
*/
package org.hibernate.reactive.pool.impl;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

import org.hibernate.engine.jdbc.internal.FormatStyle;
import org.hibernate.engine.jdbc.spi.SqlExceptionHelper;
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
import org.hibernate.reactive.pool.ReactiveConnection;
import org.hibernate.reactive.pool.ReactiveConnectionPool;

import io.vertx.core.Future;
import io.vertx.sqlclient.DatabaseException;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.Tuple;

import static org.hibernate.reactive.util.impl.CompletionStages.rethrow;

/**
* A pool of reactive connections backed by a supplier of
Expand Down Expand Up @@ -99,6 +110,56 @@ private CompletionStage<ReactiveConnection> getConnectionFromPool(Pool pool, Sql
);
}

/**
* This method is intended to be used only for queries returning
* a ResultSet that must be executed outside any "current"
* transaction (i.e. with autocommit=true).
* <p/>
* For example, it would be appropriate to use this method when
* performing queries on information_schema or system tables in
* order to obtain metadata information about catalogs, schemas,
* tables, etc.
*
* @param sql - the query to execute outside a transaction
* @param paramValues - a non-null array of parameter values
*
* @return the CompletionStage<ResultSet> from executing the query.
*/
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
return preparedQueryOutsideTransaction( sql, Tuple.wrap( paramValues ) )
.thenApply( ResultSetAdaptor::new );
}

private CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql, Tuple parameters) {
feedback( sql );
return getPool().preparedQuery( sql ).execute( parameters ).toCompletionStage()
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
}

/**
* Similar to {@link org.hibernate.exception.internal.SQLExceptionTypeDelegate#convert(SQLException, String, String)}
*/
private <T> T convertException(T rows, String sql, Throwable sqlException) {
if ( sqlException == null ) {
return rows;
}
if ( sqlException instanceof DatabaseException ) {
DatabaseException de = (DatabaseException) sqlException;
sqlException = getSqlExceptionHelper()
.convert( new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), "error executing SQL statement", sql );
}
return rethrow( sqlException );
}

private void feedback(String sql) {
Objects.requireNonNull( sql, "SQL query cannot be null" );
// DDL already gets formatted by the client, so don't reformat it
FormatStyle formatStyle = getSqlStatementLogger().isFormat() && !sql.contains( System.lineSeparator() )
? FormatStyle.BASIC
: FormatStyle.NONE;
getSqlStatementLogger().logStatement( sql, formatStyle.getFormatter() );
}

/**
* @param onCancellation invoke when converted {@link java.util.concurrent.CompletionStage} cancellation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,10 @@
import java.util.Calendar;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

import org.hibernate.boot.model.relational.SqlStringGenerationContext;
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment;
import org.hibernate.reactive.pool.ReactiveConnection;
import org.hibernate.reactive.pool.ReactiveConnectionPool;
import org.hibernate.reactive.pool.impl.Parameters;
import org.hibernate.resource.transaction.spi.DdlTransactionIsolator;
Expand All @@ -48,11 +46,10 @@
import org.hibernate.tool.schema.internal.exec.JdbcContext;

import static org.hibernate.reactive.util.impl.CompletionStages.logSqlException;
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

public class ReactiveImprovedExtractionContextImpl extends ImprovedExtractionContextImpl {

private final ReactiveConnectionPool service;
private final ReactiveConnectionPool connectionPool;

public ReactiveImprovedExtractionContextImpl(
ServiceRegistry registry,
Expand All @@ -65,54 +62,42 @@ public ReactiveImprovedExtractionContextImpl(
NoopDdlTransactionIsolator.INSTANCE,
databaseObjectAccess
);
service = registry.getService( ReactiveConnectionPool.class );
connectionPool = registry.getService( ReactiveConnectionPool.class );
}

@Override
public <T> T getQueryResults(
String queryString,
Object[] positionalParameters,
ResultSetProcessor<T> resultSetProcessor) throws SQLException {

final CompletionStage<ReactiveConnection> connectionStage = service.getConnection();

try (final ResultSet resultSet = getQueryResultSet( queryString, positionalParameters, connectionStage )) {
try (final ResultSet resultSet = getQueryResultSet( queryString, positionalParameters )) {
return resultSetProcessor.process( resultSet );
}
finally {
// This method doesn't return a reactive type, so we start closing the connection and ignore the result
connectionStage
.handle( ReactiveImprovedExtractionContextImpl::ignoreException )
.thenCompose( ReactiveImprovedExtractionContextImpl::closeConnection );

}
}

private static ReactiveConnection ignoreException(ReactiveConnection reactiveConnection, Throwable throwable) {
return reactiveConnection;
}

private static CompletionStage<Void> closeConnection(ReactiveConnection connection) {
// Avoid NullPointerException if we couldn't create a connection
return connection != null ? connection.close() : voidFuture();
}

private ResultSet getQueryResultSet(
String queryString,
Object[] positionalParameters,
CompletionStage<ReactiveConnection> connectionStage) {
Object[] positionalParameters) {
final Object[] parametersToUse = positionalParameters != null ? positionalParameters : new Object[0];
final Parameters parametersDialectSpecific = Parameters.instance(
getJdbcEnvironment().getDialect()
);
final Parameters parametersDialectSpecific = Parameters.instance( getJdbcEnvironment().getDialect() );
final String queryToUse = parametersDialectSpecific.process( queryString, parametersToUse.length );
return connectionStage.thenCompose( c -> c.selectJdbcOutsideTransaction( queryToUse, parametersToUse ) )
return connectionPool
// DDL needs to run outside the current transaction. For example:
// - increment on a table-based id generator should happen outside the current tx.
// - not all databases support transactional DDL
.selectJdbcOutsideTransaction( queryToUse, parametersToUse )
.whenComplete( (resultSet, err) -> logSqlException( err, () -> "could not execute query ", queryToUse ) )
.thenApply(ResultSetWorkaround::new)
.thenApply( ResultSetWorkaround::new )
// During schema migration, errors are ignored
.handle( ReactiveImprovedExtractionContextImpl::ignoreException )
.toCompletableFuture()
.join();
}

private static <T> T ignoreException(T result, Throwable throwable) {
return result;
}

private static class NoopDdlTransactionIsolator implements DdlTransactionIsolator {
static final NoopDdlTransactionIsolator INSTANCE = new NoopDdlTransactionIsolator();

Expand Down

0 comments on commit 3ace16f

Please sign in to comment.