Skip to content

Commit

Permalink
Merge pull request #41647 from gwenneg/41081
Browse files Browse the repository at this point in the history
Handle duplicated Vert.x context in CaffeineCacheImpl
  • Loading branch information
geoand committed Jul 5, 2024
2 parents 1616dbe + dca2139 commit 2e007d2
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.quarkus.cache.runtime;

import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -17,10 +16,6 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.TimeoutException;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

@CacheResult(cacheName = "") // The `cacheName` attribute is @Nonbinding.
@Interceptor
Expand Down Expand Up @@ -58,7 +53,6 @@ public Object intercept(InvocationContext invocationContext) throws Throwable {
try {
ReturnType returnType = determineReturnType(invocationContext.getMethod().getReturnType());
if (returnType != ReturnType.NonAsync) {
Context context = Vertx.currentContext();
Uni<Object> cacheValue = cache.getAsync(key, new Function<Object, Uni<Object>>() {
@SuppressWarnings("unchecked")
@Override
Expand All @@ -76,48 +70,6 @@ public Uni<Object> apply(Object key) {
public Uni<?> apply(Throwable throwable) {
return cache.invalidate(key).replaceWith(throwable);
}
}).emitOn(new Executor() {
// We need make sure we go back to the original context when the cache value is computed.
// Otherwise, we would always emit on the context having computed the value, which could
// break the duplicated context isolation.
@Override
public void execute(Runnable command) {
Context ctx = Vertx.currentContext();
if (context == null) {
// We didn't capture a context
if (ctx == null) {
// We are not on a context => we can execute immediately.
command.run();
} else {
// We are on a context.
// We cannot continue on the current context as we may share a duplicated context.
// We need a new one. Note that duplicate() does not duplicate the duplicated context,
// but the root context.
((ContextInternal) ctx).duplicate()
.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
} else {
// We captured a context.
if (ctx == context) {
// We are on the same context => we can execute immediately
command.run();
} else {
// 1) We are not on a context (ctx == null) => we need to switch to the captured context.
// 2) We are on a different context (ctx != null) => we need to switch to the captured context.
context.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
}
}
});

if (binding.lockTimeout() <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand All @@ -26,6 +27,10 @@
import io.quarkus.cache.runtime.AbstractCache;
import io.quarkus.cache.runtime.NullValueConverter;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

/**
* This class is an internal Quarkus cache implementation using Caffeine. Do not use it explicitly from your Quarkus
Expand Down Expand Up @@ -99,6 +104,7 @@ public CompletionStage<V> get() {
@Override
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
Objects.requireNonNull(key, NULL_KEYS_NOT_SUPPORTED_MSG);
Context context = Vertx.currentContext();
return Uni.createFrom()
.completionStage(new Supplier<CompletionStage<V>>() {
@Override
Expand All @@ -119,7 +125,51 @@ public CompletableFuture<Object> apply(Object key) {
recorder.doRecord(key);
return result;
}
}).map(fromCacheValue());
})
.map(fromCacheValue())
.emitOn(new Executor() {
// We need make sure we go back to the original context when the cache value is computed.
// Otherwise, we would always emit on the context having computed the value, which could
// break the duplicated context isolation.
@Override
public void execute(Runnable command) {
Context ctx = Vertx.currentContext();
if (context == null) {
// We didn't capture a context
if (ctx == null) {
// We are not on a context => we can execute immediately.
command.run();
} else {
// We are on a context.
// We cannot continue on the current context as we may share a duplicated context.
// We need a new one. Note that duplicate() does not duplicate the duplicated context,
// but the root context.
((ContextInternal) ctx).duplicate()
.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
} else {
// We captured a context.
if (ctx == context) {
// We are on the same context => we can execute immediately
command.run();
} else {
// 1) We are not on a context (ctx == null) => we need to switch to the captured context.
// 2) We are on a different context (ctx != null) => we need to switch to the captured context.
context.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
}
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand All @@ -24,6 +25,10 @@
import io.quarkus.infinispan.client.runtime.InfinispanClientUtil;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

/**
* This class is an internal Quarkus cache implementation using Infinispan.
Expand Down Expand Up @@ -111,6 +116,8 @@ public <K, V> Uni<V> get(K key, Function<K, V> valueLoader) {

@Override
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
Context context = Vertx.currentContext();

return Uni.createFrom().completionStage(CompletionStages.handleAndCompose(remoteCache.getAsync(key), (v1, ex1) -> {
if (ex1 != null) {
return CompletableFuture.failedFuture(ex1);
Expand Down Expand Up @@ -145,7 +152,49 @@ public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
}
});
return resultAsync;
}));
})).emitOn(new Executor() {
// We need make sure we go back to the original context when the cache value is computed.
// Otherwise, we would always emit on the context having computed the value, which could
// break the duplicated context isolation.
@Override
public void execute(Runnable command) {
Context ctx = Vertx.currentContext();
if (context == null) {
// We didn't capture a context
if (ctx == null) {
// We are not on a context => we can execute immediately.
command.run();
} else {
// We are on a context.
// We cannot continue on the current context as we may share a duplicated context.
// We need a new one. Note that duplicate() does not duplicate the duplicated context,
// but the root context.
((ContextInternal) ctx).duplicate()
.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
} else {
// We captured a context.
if (ctx == context) {
// We are on the same context => we can execute immediately
command.run();
} else {
// 1) We are not on a context (ctx == null) => we need to switch to the captured context.
// 2) We are on a different context (ctx != null) => we need to switch to the captured context.
context.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
}
}
});
}

@Override
Expand Down

0 comments on commit 2e007d2

Please sign in to comment.