Skip to content

Commit

Permalink
Fix redisson ClassCastException (#6054)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored May 17, 2022
1 parent 83c4418 commit 1298b46
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ public final class CompletableFutureWrapper<T> extends CompletableFuture<T>
private volatile EndOperationListener<T> endOperationListener;

private CompletableFutureWrapper(CompletableFuture<T> delegate) {
Context context = Context.current();
this.whenComplete(
(result, error) -> {
EndOperationListener<T> endOperationListener = this.endOperationListener;
if (endOperationListener != null) {
endOperationListener.accept(result, error);
}
if (error != null) {
delegate.completeExceptionally(error);
} else {
delegate.complete(result);
try (Scope ignored = context.makeCurrent()) {
if (error != null) {
delegate.completeExceptionally(error);
} else {
delegate.complete(result);
}
}
});
}
Expand All @@ -40,33 +43,6 @@ public static <T> CompletableFuture<T> wrap(CompletableFuture<T> delegate) {
return new CompletableFutureWrapper<>(delegate);
}

/**
* Wrap {@link CompletableFuture} to run callbacks with the context that was current at the time
* this method was called.
*
* <p>This method should be called on, or as close as possible to, the {@link CompletableFuture}
* that is returned to the user to ensure that the callbacks added by user are run in appropriate
* context.
*/
public static <T> CompletableFuture<T> wrapContext(CompletableFuture<T> future) {
Context context = Context.current();
// when input future is completed, complete result future with context that was current
// at the time when the future was wrapped
CompletableFuture<T> result = new CompletableFuture<>();
future.whenComplete(
(T value, Throwable throwable) -> {
try (Scope ignored = context.makeCurrent()) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(value);
}
}
});

return result;
}

@Override
public void setEndOperationListener(EndOperationListener<T> endOperationListener) {
this.endOperationListener = endOperationListener;
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ public RedissonInstrumentationModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new RedisConnectionInstrumentation(),
new RedisCommandDataInstrumentation(),
new RedisCommandAsyncServiceInstrumentation(),
new RedissonCompletableFutureWrapperInstrumentation());
return asList(new RedisConnectionInstrumentation(), new RedisCommandDataInstrumentation());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ public class RedissonPromiseWrapper<T> extends RedissonPromise<T> implements Pro
private volatile EndOperationListener<T> endOperationListener;

private RedissonPromiseWrapper(RPromise<T> delegate) {
Context context = Context.current();
this.whenComplete(
(result, error) -> {
EndOperationListener<T> endOperationListener = this.endOperationListener;
if (endOperationListener != null) {
endOperationListener.accept(result, error);
}
if (error != null) {
delegate.tryFailure(error);
} else {
delegate.trySuccess(result);
try (Scope ignored = context.makeCurrent()) {
if (error != null) {
delegate.tryFailure(error);
} else {
delegate.trySuccess(result);
}
}
});
}
Expand All @@ -40,32 +43,6 @@ public static <T> RPromise<T> wrap(RPromise<T> delegate) {
return new RedissonPromiseWrapper<>(delegate);
}

/**
* Wrap {@link RPromise} to run callbacks with the context that was current at the time this
* method was called.
*
* <p>This method should be called on, or as close as possible to, the {@link RPromise} that is
* returned to the user to ensure that the callbacks added by user are run in appropriate context.
*/
public static <T> RPromise<T> wrapContext(RPromise<T> promise) {
Context context = Context.current();
// when returned promise is completed, complete input promise with context that was current
// at the time when the promise was wrapped
RPromise<T> result = new RedissonPromise<T>();
result.whenComplete(
(value, error) -> {
try (Scope ignored = context.makeCurrent()) {
if (error != null) {
promise.tryFailure(error);
} else {
promise.trySuccess(value);
}
}
});

return result;
}

@Override
public void setEndOperationListener(EndOperationListener<T> endOperationListener) {
this.endOperationListener = endOperationListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import io.opentelemetry.api.trace.Span
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.Callable
import java.util.concurrent.CompletionStage
import org.redisson.Redisson
import org.redisson.api.RBucket
import org.redisson.api.RFuture
import org.redisson.api.RScheduledExecutorService
import org.redisson.api.RSet
import org.redisson.api.RedissonClient
import org.redisson.config.Config
Expand Down Expand Up @@ -129,5 +131,21 @@ class RedissonAsyncClientTest extends AgentInstrumentationSpecification {
}
}

// regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/6033
def "test schedule"() {
RScheduledExecutorService executorService = redisson.getExecutorService("EXECUTOR")
def taskId = executorService.schedule(new MyCallable(), 0, TimeUnit.SECONDS)
.getTaskId()
expect:
taskId != null
}

private static class MyCallable implements Callable, Serializable {

@Override
Object call() throws Exception {
return null
}
}
}

0 comments on commit 1298b46

Please sign in to comment.