forked from hibernate/hibernate-reactive
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[hibernate#1436] Test behaviour on cancel signal
- Loading branch information
Showing
3 changed files
with
185 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
177 changes: 177 additions & 0 deletions
177
hibernate-reactive-core/src/test/java/org/hibernate/reactive/CancelSignalTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
/* Hibernate, Relational Persistence for Idiomatic Java | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* Copyright: Red Hat Inc. and Hibernate Authors | ||
*/ | ||
package org.hibernate.reactive; | ||
|
||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.Queue; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.stream.IntStream; | ||
|
||
import org.junit.jupiter.api.Test; | ||
|
||
import org.jboss.logging.Logger; | ||
|
||
import io.micrometer.core.instrument.Metrics; | ||
import io.smallrye.mutiny.subscription.Cancellable; | ||
import io.vertx.junit5.VertxTestContext; | ||
import jakarta.persistence.Entity; | ||
import jakarta.persistence.Id; | ||
import jakarta.persistence.Table; | ||
|
||
import static java.util.Arrays.stream; | ||
import static java.util.concurrent.CompletableFuture.allOf; | ||
import static java.util.concurrent.CompletableFuture.runAsync; | ||
import static java.util.stream.Stream.concat; | ||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
public class CancelSignalTest extends BaseReactiveTest { | ||
private static final Logger LOG = Logger.getLogger( CancelSignalTest.class ); | ||
|
||
@Override | ||
protected Collection<Class<?>> annotatedEntities() { | ||
return List.of( GuineaPig.class ); | ||
} | ||
|
||
@Test | ||
public void cleanupConnectionWhenCancelSignal(VertxTestContext context) { | ||
// larger than 'sql pool size' to check entering the 'pool waiting queue' | ||
int executeSize = 10; | ||
CountDownLatch firstSessionWaiter = new CountDownLatch( 1 ); | ||
Queue<Cancellable> cancellableQueue = new ConcurrentLinkedQueue<>(); | ||
|
||
ExecutorService withSessionExecutor = Executors.newFixedThreadPool( executeSize ); | ||
// Create some jobs that are going to be cancelled asynchronously | ||
CompletableFuture[] withSessionFutures = IntStream | ||
.range( 0, executeSize ) | ||
.mapToObj( i -> runAsync( | ||
() -> { | ||
CountDownLatch countDownLatch = new CountDownLatch( 1 ); | ||
Cancellable cancellable = getMutinySessionFactory() | ||
.withSession( s -> { | ||
LOG.debug( "start withSession: " + i ); | ||
sleep( 100 ); | ||
firstSessionWaiter.countDown(); | ||
return s.find( GuineaPig.class, 1 ); | ||
} ) | ||
.onTermination().invoke( () -> { | ||
countDownLatch.countDown(); | ||
LOG.debug( "future " + i + " terminated" ); | ||
} ) | ||
.subscribe().with( item -> LOG.debug( "end withSession: " + i ) ); | ||
cancellableQueue.add( cancellable ); | ||
await( countDownLatch ); | ||
}, | ||
withSessionExecutor | ||
) ) | ||
.toArray( CompletableFuture[]::new ); | ||
|
||
// Create jobs that are going to cancel the previous ones | ||
ExecutorService cancelExecutor = Executors.newFixedThreadPool( executeSize ); | ||
CompletableFuture[] cancelFutures = IntStream | ||
.range( 0, executeSize ) | ||
.mapToObj( i -> runAsync( | ||
() -> { | ||
await( firstSessionWaiter ); | ||
cancellableQueue.poll().cancel(); | ||
sleep( 500 ); | ||
}, | ||
cancelExecutor | ||
) ) | ||
.toArray( CompletableFuture[]::new ); | ||
|
||
CompletableFuture<Void> allFutures = allOf( concat( stream( withSessionFutures ), stream( cancelFutures ) ) | ||
.toArray( CompletableFuture[]::new ) | ||
); | ||
|
||
// Test that there shouldn't be any pending process | ||
test( context, allFutures.thenAccept( x -> assertThat( sqlPendingMetric() ).isEqualTo( 0.0 ) ) ); | ||
} | ||
|
||
private static double sqlPendingMetric() { | ||
return Metrics.globalRegistry.find( "vertx.sql.processing.pending" ) | ||
.gauge() | ||
.value(); | ||
} | ||
|
||
private static void await(CountDownLatch latch) { | ||
try { | ||
latch.await(); | ||
} | ||
catch (InterruptedException e) { | ||
throw new RuntimeException( e ); | ||
} | ||
} | ||
|
||
private static void sleep(int millis) { | ||
try { | ||
// Add sleep to create a test that delays processing | ||
Thread.sleep( millis ); | ||
} | ||
catch (InterruptedException e) { | ||
throw new RuntimeException( e ); | ||
} | ||
} | ||
|
||
@Entity(name = "GuineaPig") | ||
@Table(name = "Pig") | ||
public static class GuineaPig { | ||
@Id | ||
private Integer id; | ||
private String name; | ||
|
||
public GuineaPig() { | ||
} | ||
|
||
public GuineaPig(Integer id, String name) { | ||
this.id = id; | ||
this.name = name; | ||
} | ||
|
||
public Integer getId() { | ||
return id; | ||
} | ||
|
||
public void setId(Integer id) { | ||
this.id = id; | ||
} | ||
|
||
public String getName() { | ||
return name; | ||
} | ||
|
||
public void setName(String name) { | ||
this.name = name; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return id + ": " + name; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if ( this == o ) { | ||
return true; | ||
} | ||
if ( o == null || getClass() != o.getClass() ) { | ||
return false; | ||
} | ||
GuineaPig guineaPig = (GuineaPig) o; | ||
return Objects.equals( name, guineaPig.name ); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash( name ); | ||
} | ||
} | ||
} |