Skip to content

Commit

Permalink
Rocketmq 5: set context for async callback (#7238)
Browse files Browse the repository at this point in the history
Run callbacks added to the `CompletableFuture` returned from `sendAsync`
with the context that was used when `sendAsync` was called.
Add test for capturing message headers.
  • Loading branch information
laurit authored Nov 22, 2022
1 parent ae49d4f commit 910d177
Show file tree
Hide file tree
Showing 3 changed files with 347 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.CompletableFuture;

public final class CompletableFutureWrapper {

private CompletableFutureWrapper() {}

public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
CompletableFuture<T> result = new CompletableFuture<>();
Context context = Context.current();
future.whenComplete(
(T value, Throwable throwable) -> {
try (Scope ignored = context.makeCurrent()) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(value);
}
}
});

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import static io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0.RocketMqSingletons.producerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.named;
Expand All @@ -17,9 +18,11 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -52,6 +55,13 @@ public void transform(TypeTransformer transformer) {
.and(takesArgument(4, List.class))
.and(takesArgument(5, int.class)),
ProducerImplInstrumentation.class.getName() + "$SendAdvice");

transformer.applyAdviceToMethod(
isMethod()
.and(named("sendAsync"))
.and(takesArguments(1))
.and(takesArgument(0, named("org.apache.rocketmq.client.apis.message.Message"))),
ProducerImplInstrumentation.class.getName() + "$SendAsyncAdvice");
}

@SuppressWarnings("unused")
Expand All @@ -60,8 +70,7 @@ public static class SendAdvice {
public static void onEnter(
@Advice.Argument(0) SettableFuture<List<SendReceiptImpl>> future0,
@Advice.Argument(4) List<PublishingMessageImpl> messages) {
Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter =
RocketMqSingletons.producerInstrumenter();
Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter = producerInstrumenter();
int count = messages.size();
List<SettableFuture<SendReceiptImpl>> futures = FutureConverter.convert(future0, count);
for (int i = 0; i < count; i++) {
Expand Down Expand Up @@ -90,4 +99,16 @@ public static void onEnter(
}
}
}

@SuppressWarnings("unused")
public static class SendAsyncAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Return(readOnly = false) CompletableFuture<SendReceipt> future,
@Advice.Thrown Throwable throwable) {
if (throwable == null) {
future = CompletableFutureWrapper.wrap(future);
}
}
}
}
Loading

0 comments on commit 910d177

Please sign in to comment.