Skip to content

Commit

Permalink
[hibernate#1436] WIP fix fail test
Browse files Browse the repository at this point in the history
  • Loading branch information
namjug-kim committed Jan 9, 2023
1 parent 1b15dfe commit 104c081
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.metamodel.Metamodel;

import io.vertx.core.Vertx;
import org.hibernate.Cache;
import org.hibernate.internal.SessionCreationOptions;
import org.hibernate.internal.SessionFactoryImpl;
Expand Down Expand Up @@ -127,6 +128,7 @@ public Uni<Mutiny.Session> openSession(String tenantId) {
*/
private <S> Uni<S> create(ReactiveConnection connection, Supplier<S> supplier) {
return Uni.createFrom().item( supplier )
.onCancellation().call( () -> Uni.createFrom().completionStage( connection.close() ) )
.onFailure().call( () -> Uni.createFrom().completionStage( connection.close() ) );
}

Expand Down Expand Up @@ -247,15 +249,29 @@ public <T> Uni<T> withStatelessSession(String tenantId, Function<Mutiny.Stateles
}
}

private void runOnVertxContext(io.vertx.core.Context vertxContext, Runnable runnable) {
io.vertx.core.Context currentContext = Vertx.currentContext();
if (currentContext != null) {
runnable.run();
} else {
vertxContext.runOnContext(x -> runnable.run());
}
}

private<S extends Mutiny.Closeable, T> Uni<T> withSession(
Uni<S> sessionUni,
Function<S, Uni<T>> work,
Context.Key<S> contextKey) {
return sessionUni.chain( session -> Uni.createFrom().voidItem()
.invoke( () -> context.put( contextKey, session ) )
.chain( () -> work.apply( session ) )
.eventually( () -> context.remove( contextKey ) )
.eventually(session::close)
return sessionUni.chain( session -> {
io.vertx.core.Context currentVertxContext = Vertx.currentContext();

return Uni.createFrom().voidItem()
.invoke(() -> context.put(contextKey, session))
.chain(() -> work.apply(session))
// when this callback is invoked following a cancellation, it will be called on the thread having called cancel().
.onTermination().invoke(() -> runOnVertxContext(currentVertxContext, () -> context.remove(contextKey)))
.onTermination().call(session::close);
}
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
*/
package org.hibernate.reactive.pool.impl;

import java.util.concurrent.CompletionStage;

import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.SqlConnection;
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
import org.hibernate.reactive.pool.ReactiveConnection;
import org.hibernate.reactive.pool.ReactiveConnectionPool;

import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.SqlConnection;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;

/**
* A pool of reactive connections backed by a supplier of
Expand Down Expand Up @@ -71,11 +71,20 @@ public CompletionStage<ReactiveConnection> getConnection(String tenantId) {
}

private CompletionStage<ReactiveConnection> getConnectionFromPool(Pool pool) {
return pool.getConnection()
.toCompletionStage().thenApply( this::newConnection );
AtomicReference<CompletionStage<ReactiveConnection>> resultReference = new AtomicReference<>();
CompletionStage<ReactiveConnection> reactiveConnectionCompletableFuture = pool.getConnection()
.map(this::newConnection)
.onComplete(handler -> {
if (handler.result() != null && resultReference.get() != null && resultReference.get().toCompletableFuture().isCancelled()) {
handler.result().close();
}
})
.toCompletionStage();
resultReference.set(reactiveConnectionCompletableFuture);
return reactiveConnectionCompletableFuture;
}

private SqlClientConnection newConnection(SqlConnection connection) {
private ReactiveConnection newConnection(SqlConnection connection) {
return new SqlClientConnection( connection, getPool(), getSqlStatementLogger() );
}

Expand Down

0 comments on commit 104c081

Please sign in to comment.