Skip to content

Commit

Permalink
Fix NPE happening when .headersWhen() is used (reactor-netty)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek committed Sep 20, 2023
1 parent ae25d48 commit 21140e7
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientConfigBuddy;

public class HttpClientConnectInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("reactor.netty.http.client.HttpClientConnect");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("connect").and(returns(named("reactor.core.publisher.Mono"))),
this.getClass().getName() + "$ConnectAdvice");
}

@SuppressWarnings("unused")
public static class ConnectAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.Return(readOnly = false) Mono<? extends Connection> connection,
@Advice.This HttpClient httpClient) {

HttpClientConfig config = httpClient.configuration();
if (HttpClientConfigBuddy.hasDeferredConfig(config)) {
connection = HttpClientConfigBuddy.getConnector(config).apply(connection);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@
import io.opentelemetry.instrumentation.api.internal.Timer;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;

final class InstrumentationContexts {

private static final AtomicReferenceFieldUpdater<InstrumentationContexts, Context>
parentContextUpdater =
AtomicReferenceFieldUpdater.newUpdater(
InstrumentationContexts.class, Context.class, "parentContext");

private volatile Context parentContext;
private volatile Timer timer;
// on retries, reactor-netty starts the next resend attempt before it ends the previous one (i.e.
Expand All @@ -27,8 +33,11 @@ final class InstrumentationContexts {
private final Queue<RequestAndContext> clientContexts = new LinkedBlockingQueue<>();

void initialize(Context parentContext) {
this.parentContext = HttpClientResendCount.initialize(parentContext);
timer = Timer.start();
Context parentContextWithResends = HttpClientResendCount.initialize(parentContext);
// make sure initialization happens only once
if (parentContextUpdater.compareAndSet(this, null, parentContextWithResends)) {
timer = Timer.start();
}
}

Context getParentContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,16 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
return hasClassesNamed("reactor.netty.transport.AddressUtils");
}

@Override
public boolean isHelperClass(String className) {
return className.startsWith("reactor.netty.http.client.HttpClientConfigBuddy");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new HttpClientInstrumentation(),
new HttpClientConnectInstrumentation(),
new ResponseReceiverInstrumentation(),
new TransportConnectorInstrumentation());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package reactor.netty.http.client;

import java.util.function.Function;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;

// class in reactor package to access package-private code
public final class HttpClientConfigBuddy {

public static boolean hasDeferredConfig(HttpClientConfig config) {
return config.deferredConf != null;
}

public static Function<? super Mono<? extends Connection>, ? extends Mono<? extends Connection>>
getConnector(HttpClientConfig config) {
return config.connector == null ? Function.identity() : config.connector;
}

private HttpClientConfigBuddy() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;

import io.netty.channel.ChannelOption;
import io.netty.handler.codec.http.HttpMethod;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames;
import java.net.URI;
import java.util.Map;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

class ReactorNettyHttpClientDeferredHeadersTest extends AbstractReactorNettyHttpClientTest {

@Override
protected HttpClient createHttpClient() {
int connectionTimeoutMillis = (int) CONNECTION_TIMEOUT.toMillis();
return HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMillis)
.resolver(getAddressResolverGroup())
.headers(headers -> headers.set(HttpHeaderNames.USER_AGENT, USER_AGENT));
}

@Override
public HttpClient.ResponseReceiver<?> buildRequest(
String method, URI uri, Map<String, String> headers) {
HttpClient client =
createHttpClient()
.followRedirect(true)
.headersWhen(
h -> {
headers.forEach(h::add);
return Mono.just(h);
})
.baseUrl(resolveAddress("").toString());
if (uri.toString().contains("/read-timeout")) {
client = client.responseTimeout(READ_TIMEOUT);
}
return client.request(HttpMethod.valueOf(method)).uri(uri.toString());
}

@Override
protected void configure(HttpClientTestOptions.Builder optionsBuilder) {
super.configure(optionsBuilder);

// these scenarios don't work because deferred config does not apply the doOnRequestError()
// callback
optionsBuilder.disableTestReadTimeout();
optionsBuilder.disableTestConnectionFailure();
optionsBuilder.disableTestRemoteConnection();
}
}

0 comments on commit 21140e7

Please sign in to comment.