Skip to content

Commit

Permalink
Context propagation for undertow async dispatch (open-telemetry#4950)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored and RashmiRam committed May 23, 2022
1 parent ecad6c7 commit e7041b2
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.undertow;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.instrumentation.api.concurrent.ExecutorAdviceHelper;
import io.opentelemetry.javaagent.instrumentation.api.concurrent.PropagatedContext;
import java.util.concurrent.Executor;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class HttpServerExchangeInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.undertow.server.HttpServerExchange");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("dispatch").and(takesArguments(Executor.class, Runnable.class)),
this.getClass().getName() + "$DispatchAdvice");
}

@SuppressWarnings("unused")
public static class DispatchAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static PropagatedContext enterJobSubmit(
@Advice.Argument(value = 1, readOnly = false) Runnable task) {
Context context = Java8BytecodeBridge.currentContext();
if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
VirtualField<Runnable, PropagatedContext> virtualField =
VirtualField.find(Runnable.class, PropagatedContext.class);
return ExecutorAdviceHelper.attachContextToTask(context, virtualField, task);
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void exitJobSubmit(
@Advice.Enter PropagatedContext propagatedContext, @Advice.Thrown Throwable throwable) {
ExecutorAdviceHelper.cleanUpAfterSubmit(propagatedContext, throwable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.undertow;

import static java.util.Collections.singletonList;
import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
Expand All @@ -21,6 +21,6 @@ public UndertowInstrumentationModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new HandlerInstrumentation());
return asList(new HandlerInstrumentation(), new HttpServerExchangeInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import io.undertow.Handlers
import io.undertow.Undertow
import io.undertow.util.Headers
import io.undertow.util.HttpString
import io.undertow.util.StatusCodes

import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.CAPTURE_HEADERS
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS

class UndertowServerDispatchTest extends HttpServerTest<Undertow> implements AgentTestTrait {

@Override
boolean verifyServerSpanEndTime() {
return false
}

@Override
boolean testException() {
// throwing exception from dispatched task just makes the request time out
return false
}

@Override
Undertow startServer(int port) {
Undertow server = Undertow.builder()
.addHttpListener(port, "localhost")
.setHandler(Handlers.path()
.addExactPath(SUCCESS.rawPath()) { exchange ->
exchange.dispatch {
controller(SUCCESS) {
exchange.getResponseSender().send(SUCCESS.body)
}
}
}
.addExactPath(QUERY_PARAM.rawPath()) { exchange ->
exchange.dispatch {
controller(QUERY_PARAM) {
exchange.getResponseSender().send(exchange.getQueryString())
}
}
}
.addExactPath(REDIRECT.rawPath()) { exchange ->
exchange.dispatch {
controller(REDIRECT) {
exchange.setStatusCode(StatusCodes.FOUND)
exchange.getResponseHeaders().put(Headers.LOCATION, REDIRECT.body)
exchange.endExchange()
}
}
}
.addExactPath(CAPTURE_HEADERS.rawPath()) { exchange ->
exchange.dispatch {
controller(CAPTURE_HEADERS) {
exchange.setStatusCode(StatusCodes.OK)
exchange.getResponseHeaders().put(new HttpString("X-Test-Response"), exchange.getRequestHeaders().getFirst("X-Test-Request"))
exchange.getResponseSender().send(CAPTURE_HEADERS.body)
}
}
}
.addExactPath(ERROR.rawPath()) { exchange ->
exchange.dispatch {
controller(ERROR) {
exchange.setStatusCode(ERROR.status)
exchange.getResponseSender().send(ERROR.body)
}
}
}
.addExactPath(INDEXED_CHILD.rawPath()) { exchange ->
exchange.dispatch {
controller(INDEXED_CHILD) {
INDEXED_CHILD.collectSpanAttributes { name -> exchange.getQueryParameters().get(name).peekFirst() }
exchange.getResponseSender().send(INDEXED_CHILD.body)
}
}
}
).build()
server.start()
return server
}

@Override
void stopServer(Undertow undertow) {
undertow.stop()
}

@Override
String expectedServerSpanName(ServerEndpoint endpoint) {
return "HTTP GET"
}

@Override
List<AttributeKey<?>> extraAttributes() {
[
SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH,
SemanticAttributes.NET_PEER_NAME,
SemanticAttributes.NET_TRANSPORT
]
}
}

0 comments on commit e7041b2

Please sign in to comment.