Skip to content

Commit

Permalink
Update Infinispan Cache Implementation to propagate Vert.x context co…
Browse files Browse the repository at this point in the history
…rrectly (#298)
  • Loading branch information
karesti authored Jul 5, 2024
1 parent 7120304 commit dca2139
Showing 1 changed file with 50 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand All @@ -24,6 +25,10 @@
import io.quarkus.infinispan.client.runtime.InfinispanClientUtil;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

/**
* This class is an internal Quarkus cache implementation using Infinispan.
Expand Down Expand Up @@ -111,6 +116,8 @@ public <K, V> Uni<V> get(K key, Function<K, V> valueLoader) {

@Override
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
Context context = Vertx.currentContext();

return Uni.createFrom().completionStage(CompletionStages.handleAndCompose(remoteCache.getAsync(key), (v1, ex1) -> {
if (ex1 != null) {
return CompletableFuture.failedFuture(ex1);
Expand Down Expand Up @@ -145,7 +152,49 @@ public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
}
});
return resultAsync;
}));
})).emitOn(new Executor() {
// We need make sure we go back to the original context when the cache value is computed.
// Otherwise, we would always emit on the context having computed the value, which could
// break the duplicated context isolation.
@Override
public void execute(Runnable command) {
Context ctx = Vertx.currentContext();
if (context == null) {
// We didn't capture a context
if (ctx == null) {
// We are not on a context => we can execute immediately.
command.run();
} else {
// We are on a context.
// We cannot continue on the current context as we may share a duplicated context.
// We need a new one. Note that duplicate() does not duplicate the duplicated context,
// but the root context.
((ContextInternal) ctx).duplicate()
.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
} else {
// We captured a context.
if (ctx == context) {
// We are on the same context => we can execute immediately
command.run();
} else {
// 1) We are not on a context (ctx == null) => we need to switch to the captured context.
// 2) We are on a different context (ctx != null) => we need to switch to the captured context.
context.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
}
}
});
}

@Override
Expand Down

0 comments on commit dca2139

Please sign in to comment.