diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientConnectInstrumentation.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientConnectInstrumentation.java new file mode 100644 index 000000000000..089f0bedd12d --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientConnectInstrumentation.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import reactor.core.publisher.Mono; +import reactor.netty.Connection; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientConfig; +import reactor.netty.http.client.HttpClientConfigBuddy; + +public class HttpClientConnectInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("reactor.netty.http.client.HttpClientConnect"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("connect").and(returns(named("reactor.core.publisher.Mono"))), + this.getClass().getName() + "$ConnectAdvice"); + } + + @SuppressWarnings("unused") + public static class ConnectAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Return(readOnly = false) Mono connection, + @Advice.This HttpClient httpClient) { + + HttpClientConfig config = httpClient.configuration(); + // reactor-netty 1.0.x has a bug: the .mapConnect() function is not applied when deferred + // configuration is used + // we're fixing this bug here, so that our instrumentation can safely add its own + // .mapConnect() listener + if (HttpClientConfigBuddy.hasDeferredConfig(config)) { + connection = HttpClientConfigBuddy.getConnector(config).apply(connection); + } + } + } +} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/InstrumentationContexts.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/InstrumentationContexts.java index fddc5f15c3e9..7aee2f2cea93 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/InstrumentationContexts.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/InstrumentationContexts.java @@ -13,12 +13,18 @@ import io.opentelemetry.instrumentation.api.internal.Timer; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.Nullable; import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientResponse; final class InstrumentationContexts { + private static final AtomicReferenceFieldUpdater + parentContextUpdater = + AtomicReferenceFieldUpdater.newUpdater( + InstrumentationContexts.class, Context.class, "parentContext"); + private volatile Context parentContext; private volatile Timer timer; // on retries, reactor-netty starts the next resend attempt before it ends the previous one (i.e. @@ -27,8 +33,11 @@ final class InstrumentationContexts { private final Queue clientContexts = new LinkedBlockingQueue<>(); void initialize(Context parentContext) { - this.parentContext = HttpClientResendCount.initialize(parentContext); - timer = Timer.start(); + Context parentContextWithResends = HttpClientResendCount.initialize(parentContext); + // make sure initialization happens only once + if (parentContextUpdater.compareAndSet(this, null, parentContextWithResends)) { + timer = Timer.start(); + } } Context getParentContext() { diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java index 68f14dc4972f..13c762122a9e 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java @@ -36,10 +36,16 @@ public ElementMatcher.Junction classLoaderMatcher() { return hasClassesNamed("reactor.netty.transport.AddressUtils"); } + @Override + public boolean isHelperClass(String className) { + return className.startsWith("reactor.netty.http.client.HttpClientConfigBuddy"); + } + @Override public List typeInstrumentations() { return asList( new HttpClientInstrumentation(), + new HttpClientConnectInstrumentation(), new ResponseReceiverInstrumentation(), new TransportConnectorInstrumentation()); } diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/reactor/netty/http/client/HttpClientConfigBuddy.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/reactor/netty/http/client/HttpClientConfigBuddy.java new file mode 100644 index 000000000000..ade93dd4d922 --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/reactor/netty/http/client/HttpClientConfigBuddy.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package reactor.netty.http.client; + +import java.util.function.Function; +import reactor.core.publisher.Mono; +import reactor.netty.Connection; + +// class in reactor package to access package-private code +public final class HttpClientConfigBuddy { + + public static boolean hasDeferredConfig(HttpClientConfig config) { + return config.deferredConf != null; + } + + public static Function, ? extends Mono> + getConnector(HttpClientConfig config) { + return config.connector == null ? Function.identity() : config.connector; + } + + private HttpClientConfigBuddy() {} +} diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientDeferredHeadersTest.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientDeferredHeadersTest.java new file mode 100644 index 000000000000..4076df95a6e8 --- /dev/null +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientDeferredHeadersTest.java @@ -0,0 +1,56 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import io.netty.channel.ChannelOption; +import io.netty.handler.codec.http.HttpMethod; +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions; +import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames; +import java.net.URI; +import java.util.Map; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; + +class ReactorNettyHttpClientDeferredHeadersTest extends AbstractReactorNettyHttpClientTest { + + @Override + protected HttpClient createHttpClient() { + int connectionTimeoutMillis = (int) CONNECTION_TIMEOUT.toMillis(); + return HttpClient.create() + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMillis) + .resolver(getAddressResolverGroup()) + .headers(headers -> headers.set(HttpHeaderNames.USER_AGENT, USER_AGENT)); + } + + @Override + public HttpClient.ResponseReceiver buildRequest( + String method, URI uri, Map headers) { + HttpClient client = + createHttpClient() + .followRedirect(true) + .headersWhen( + h -> { + headers.forEach(h::add); + return Mono.just(h); + }) + .baseUrl(resolveAddress("").toString()); + if (uri.toString().contains("/read-timeout")) { + client = client.responseTimeout(READ_TIMEOUT); + } + return client.request(HttpMethod.valueOf(method)).uri(uri.toString()); + } + + @Override + protected void configure(HttpClientTestOptions.Builder optionsBuilder) { + super.configure(optionsBuilder); + + // these scenarios don't work because deferred config does not apply the doOnRequestError() + // callback + optionsBuilder.disableTestReadTimeout(); + optionsBuilder.disableTestConnectionFailure(); + optionsBuilder.disableTestRemoteConnection(); + } +}