From d87f40c9c7bbcf7bd552b9c30ac1824d8f3f21c3 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 5 Apr 2023 17:11:05 +0300 Subject: [PATCH] Instrument akka-http bindAndHandle (#8174) Resolves https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/8143 Resolves https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/6081 Resolves https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/5137 Using the same approach as in https://github.com/open-telemetry/opentelemetry-java-instrumentation/pull/6243 and as used by DataDog. Unlike in #6243 this pr does not attempt to prevent leaking scopes into actors but rather instruments the actor to reset context to get rid of the leaked scopes (DataDog does the same). --- .../AkkaActorCellInstrumentation.java | 6 + .../akka-http-10.0/javaagent/build.gradle.kts | 5 +- .../akkahttp/server/AkkaFlowWrapper.java | 178 ++++++++++++++++++ .../AkkaHttpServerInstrumentationModule.java | 80 -------- .../server/HttpExtServerInstrumentation.java | 38 +--- .../AkkaHttpServerInstrumentationTest.scala | 35 ++++ .../akkahttp/AkkaHttpTestWebServer.scala | 51 +++-- .../play-mvc-2.6/javaagent/build.gradle.kts | 4 + 8 files changed, 267 insertions(+), 130 deletions(-) create mode 100644 instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaFlowWrapper.java create mode 100644 instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpServerInstrumentationTest.scala diff --git a/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorCellInstrumentation.java b/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorCellInstrumentation.java index 161cc80a693e..6872a886a1c8 100644 --- a/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorCellInstrumentation.java +++ b/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorCellInstrumentation.java @@ -12,6 +12,7 @@ import akka.dispatch.sysmsg.SystemMessage; import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.util.VirtualField; +import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext; import io.opentelemetry.javaagent.bootstrap.executors.TaskAdviceHelper; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; @@ -52,6 +53,11 @@ public static void exit(@Advice.Enter Scope scope) { if (scope != null) { scope.close(); } + // akka-http instrumentation can leak scopes + // reset the context to clear the leaked scopes + if (Java8BytecodeBridge.currentContext() != Java8BytecodeBridge.rootContext()) { + Java8BytecodeBridge.rootContext().makeCurrent(); + } } } diff --git a/instrumentation/akka/akka-http-10.0/javaagent/build.gradle.kts b/instrumentation/akka/akka-http-10.0/javaagent/build.gradle.kts index 94073bab6775..7bfb2d7a48ac 100644 --- a/instrumentation/akka/akka-http-10.0/javaagent/build.gradle.kts +++ b/instrumentation/akka/akka-http-10.0/javaagent/build.gradle.kts @@ -34,10 +34,9 @@ dependencies { library("com.typesafe.akka:akka-http_2.11:10.0.0") library("com.typesafe.akka:akka-stream_2.11:2.4.14") - // these instrumentations are not needed for the tests to pass - // they are here to test for context leaks testInstrumentation(project(":instrumentation:akka:akka-actor-2.3:javaagent")) testInstrumentation(project(":instrumentation:akka:akka-actor-fork-join-2.5:javaagent")) + testInstrumentation(project(":instrumentation:scala-fork-join-2.8:javaagent")) latestDepTestLibrary("com.typesafe.akka:akka-http_2.13:+") latestDepTestLibrary("com.typesafe.akka:akka-stream_2.13:+") @@ -48,6 +47,8 @@ tasks.withType().configureEach { jvmArgs("--add-exports=java.base/sun.security.util=ALL-UNNAMED") jvmArgs("-XX:+IgnoreUnrecognizedVMOptions") + jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false") + systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) } diff --git a/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaFlowWrapper.java b/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaFlowWrapper.java new file mode 100644 index 000000000000..227353cec6a7 --- /dev/null +++ b/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaFlowWrapper.java @@ -0,0 +1,178 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.akkahttp.server; + +import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; +import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.errorResponse; +import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.instrumenter; + +import akka.http.scaladsl.model.HttpRequest; +import akka.http.scaladsl.model.HttpResponse; +import akka.stream.Attributes; +import akka.stream.BidiShape; +import akka.stream.Inlet; +import akka.stream.Outlet; +import akka.stream.scaladsl.Flow; +import akka.stream.stage.AbstractInHandler; +import akka.stream.stage.AbstractOutHandler; +import akka.stream.stage.GraphStage; +import akka.stream.stage.GraphStageLogic; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import java.util.ArrayDeque; +import java.util.Deque; + +public class AkkaFlowWrapper + extends GraphStage> { + private final Inlet requestIn = Inlet.create("otel.requestIn"); + private final Outlet requestOut = Outlet.create("otel.requestOut"); + private final Inlet responseIn = Inlet.create("otel.responseIn"); + private final Outlet responseOut = Outlet.create("otel.responseOut"); + + private final BidiShape shape = + BidiShape.of(responseIn, responseOut, requestIn, requestOut); + + public static Flow wrap( + Flow handler) { + return handler.join(new AkkaFlowWrapper()); + } + + @Override + public BidiShape shape() { + return shape; + } + + @Override + public GraphStageLogic createLogic(Attributes attributes) { + return new TracingLogic(); + } + + private class TracingLogic extends GraphStageLogic { + private final Deque requests = new ArrayDeque<>(); + + public TracingLogic() { + super(shape); + + // server pulls response, pass response from user code to server + setHandler( + responseOut, + new AbstractOutHandler() { + @Override + public void onPull() { + pull(responseIn); + } + + @Override + public void onDownstreamFinish() { + cancel(responseIn); + } + }); + + // user code pulls request, pass request from server to user code + setHandler( + requestOut, + new AbstractOutHandler() { + @Override + public void onPull() { + pull(requestIn); + } + + @Override + public void onDownstreamFinish() { + // Invoked on errors. Don't complete this stage to allow error-capturing + cancel(requestIn); + } + }); + + // new request from server + setHandler( + requestIn, + new AbstractInHandler() { + @Override + public void onPush() { + HttpRequest request = grab(requestIn); + + TracingRequest tracingRequest = TracingRequest.EMPTY; + Context parentContext = currentContext(); + if (instrumenter().shouldStart(parentContext, request)) { + Context context = instrumenter().start(parentContext, request); + // scope opened here may leak, actor instrumentation will close it + Scope scope = context.makeCurrent(); + tracingRequest = new TracingRequest(context, scope, request); + } + // event if span wasn't started we need to push TracingRequest to match response + // with request + requests.push(tracingRequest); + + push(requestOut, request); + } + + @Override + public void onUpstreamFinish() { + complete(requestOut); + } + + @Override + public void onUpstreamFailure(Throwable exception) { + fail(requestOut, exception); + } + }); + + // response from user code + setHandler( + responseIn, + new AbstractInHandler() { + @Override + public void onPush() { + HttpResponse response = grab(responseIn); + + TracingRequest tracingRequest = requests.poll(); + if (tracingRequest != null && tracingRequest != TracingRequest.EMPTY) { + // this may happen on a different thread from the one that opened the scope + // actor instrumentation will take care of the leaked scopes + tracingRequest.scope.close(); + instrumenter().end(tracingRequest.context, tracingRequest.request, response, null); + } + push(responseOut, response); + } + + @Override + public void onUpstreamFailure(Throwable exception) { + TracingRequest tracingRequest; + while ((tracingRequest = requests.poll()) != null) { + if (tracingRequest == TracingRequest.EMPTY) { + continue; + } + tracingRequest.scope.close(); + instrumenter() + .end( + tracingRequest.context, tracingRequest.request, errorResponse(), exception); + } + + fail(responseOut, exception); + } + + @Override + public void onUpstreamFinish() { + completeStage(); + } + }); + } + } + + private static class TracingRequest { + static final TracingRequest EMPTY = new TracingRequest(null, null, null); + final Context context; + final Scope scope; + final HttpRequest request; + + TracingRequest(Context context, Scope scope, HttpRequest request) { + this.context = context; + this.scope = scope; + this.request = request; + } + } +} diff --git a/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaHttpServerInstrumentationModule.java b/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaHttpServerInstrumentationModule.java index d489220b65a9..dbccf8f25460 100644 --- a/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaHttpServerInstrumentationModule.java +++ b/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaHttpServerInstrumentationModule.java @@ -5,23 +5,12 @@ package io.opentelemetry.javaagent.instrumentation.akkahttp.server; -import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; -import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.errorResponse; -import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.instrumenter; import static java.util.Collections.singletonList; -import akka.http.scaladsl.model.HttpRequest; -import akka.http.scaladsl.model.HttpResponse; import com.google.auto.service.AutoService; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import java.util.List; -import scala.Function1; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.runtime.AbstractFunction1; @AutoService(InstrumentationModule.class) public class AkkaHttpServerInstrumentationModule extends InstrumentationModule { @@ -33,73 +22,4 @@ public AkkaHttpServerInstrumentationModule() { public List typeInstrumentations() { return singletonList(new HttpExtServerInstrumentation()); } - - public static class SyncWrapper extends AbstractFunction1 { - private final Function1 userHandler; - - public SyncWrapper(Function1 userHandler) { - this.userHandler = userHandler; - } - - @Override - public HttpResponse apply(HttpRequest request) { - Context parentContext = currentContext(); - if (!instrumenter().shouldStart(parentContext, request)) { - return userHandler.apply(request); - } - Context context = instrumenter().start(parentContext, request); - try (Scope ignored = context.makeCurrent()) { - HttpResponse response = userHandler.apply(request); - instrumenter().end(context, request, response, null); - return response; - } catch (Throwable t) { - instrumenter().end(context, request, errorResponse(), t); - throw t; - } - } - } - - public static class AsyncWrapper extends AbstractFunction1> { - private final Function1> userHandler; - private final ExecutionContext executionContext; - - public AsyncWrapper( - Function1> userHandler, - ExecutionContext executionContext) { - this.userHandler = userHandler; - this.executionContext = executionContext; - } - - @Override - public Future apply(HttpRequest request) { - Context parentContext = currentContext(); - if (!instrumenter().shouldStart(parentContext, request)) { - return userHandler.apply(request); - } - Context context = instrumenter().start(parentContext, request); - try (Scope ignored = context.makeCurrent()) { - return userHandler - .apply(request) - .transform( - new AbstractFunction1() { - @Override - public HttpResponse apply(HttpResponse response) { - instrumenter().end(context, request, response, null); - return response; - } - }, - new AbstractFunction1() { - @Override - public Throwable apply(Throwable t) { - instrumenter().end(context, request, errorResponse(), t); - return t; - } - }, - executionContext); - } catch (Throwable t) { - instrumenter().end(context, request, null, t); - throw t; - } - } - } } diff --git a/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/HttpExtServerInstrumentation.java b/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/HttpExtServerInstrumentation.java index 06d727306b6b..221c34ae631d 100644 --- a/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/HttpExtServerInstrumentation.java +++ b/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/HttpExtServerInstrumentation.java @@ -10,14 +10,12 @@ import akka.http.scaladsl.model.HttpRequest; import akka.http.scaladsl.model.HttpResponse; -import akka.stream.Materializer; +import akka.stream.scaladsl.Flow; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import scala.Function1; -import scala.concurrent.Future; public class HttpExtServerInstrumentation implements TypeInstrumentation { @Override @@ -27,42 +25,18 @@ public ElementMatcher typeMatcher() { @Override public void transform(TypeTransformer transformer) { - // Instrumenting akka-streams bindAndHandle api was previously attempted. - // This proved difficult as there was no clean way to close the async scope - // in the graph logic after the user's request handler completes. - // - // Instead, we're instrumenting the bindAndHandle function helpers by - // wrapping the scala functions with our own handlers. transformer.applyAdviceToMethod( - named("bindAndHandleSync").and(takesArgument(0, named("scala.Function1"))), - this.getClass().getName() + "$AkkaHttpSyncAdvice"); - transformer.applyAdviceToMethod( - named("bindAndHandleAsync").and(takesArgument(0, named("scala.Function1"))), - this.getClass().getName() + "$AkkaHttpAsyncAdvice"); - } - - @SuppressWarnings("unused") - public static class AkkaHttpSyncAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void wrapHandler( - @Advice.Argument(value = 0, readOnly = false) - Function1 handler) { - handler = new AkkaHttpServerInstrumentationModule.SyncWrapper(handler); - } + named("bindAndHandle").and(takesArgument(0, named("akka.stream.scaladsl.Flow"))), + this.getClass().getName() + "$AkkaBindAndHandleAdvice"); } @SuppressWarnings("unused") - public static class AkkaHttpAsyncAdvice { + public static class AkkaBindAndHandleAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void wrapHandler( - @Advice.Argument(value = 0, readOnly = false) - Function1> handler, - @Advice.Argument(7) Materializer materializer) { - handler = - new AkkaHttpServerInstrumentationModule.AsyncWrapper( - handler, materializer.executionContext()); + @Advice.Argument(value = 0, readOnly = false) Flow handler) { + handler = AkkaFlowWrapper.wrap(handler); } } } diff --git a/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpServerInstrumentationTest.scala b/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpServerInstrumentationTest.scala new file mode 100644 index 000000000000..2001f3335f9f --- /dev/null +++ b/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpServerInstrumentationTest.scala @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.akkahttp + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension +import io.opentelemetry.instrumentation.testing.junit.http.{ + HttpServerInstrumentationExtension, + HttpServerTestOptions +} +import org.junit.jupiter.api.extension.RegisterExtension + +class AkkaHttpServerInstrumentationTest + extends AbstractHttpServerInstrumentationTest { + @RegisterExtension val extension: InstrumentationExtension = + HttpServerInstrumentationExtension.forAgent() + + override protected def setupServer(): AnyRef = { + AkkaHttpTestWebServer.start(port) + null + } + + override protected def stopServer(server: Object): Unit = + AkkaHttpTestWebServer.stop() + + override protected def configure( + options: HttpServerTestOptions + ): Unit = { + super.configure(options) + // exception doesn't propagate + options.setTestException(false) + } +} diff --git a/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpTestWebServer.scala b/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpTestWebServer.scala index cbe891716cd3..69fffed25314 100644 --- a/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpTestWebServer.scala +++ b/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpTestWebServer.scala @@ -12,11 +12,15 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.ExceptionHandler import akka.stream.ActorMaterializer +import io.opentelemetry.instrumentation.testing.junit.http.{ + AbstractHttpServerTest, + ServerEndpoint +} import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint._ +import java.util.function.Supplier import scala.concurrent.Await -// FIXME: This doesn't work because we don't support bindAndHandle. object AkkaHttpTestWebServer { implicit val system = ActorSystem("my-system") implicit val materializer = ActorMaterializer() @@ -29,21 +33,36 @@ object AkkaHttpTestWebServer { ) } - val route = { // handleExceptions(exceptionHandler) { - path(SUCCESS.rawPath) { - complete( - HttpResponse(status = SUCCESS.getStatus).withEntity(SUCCESS.getBody) - ) - } ~ path(QUERY_PARAM.rawPath) { - complete( - HttpResponse(status = QUERY_PARAM.getStatus).withEntity(SUCCESS.getBody) - ) - } ~ path(REDIRECT.rawPath) { - redirect(Uri(REDIRECT.getBody), StatusCodes.Found) - } ~ path(ERROR.rawPath) { - complete(HttpResponse(status = ERROR.getStatus).withEntity(ERROR.getBody)) - } ~ path(EXCEPTION.rawPath) { - failWith(new Exception(EXCEPTION.getBody)) + val route = handleExceptions(exceptionHandler) { + extractUri { uri => + val endpoint = ServerEndpoint.forPath(uri.path.toString()) + complete { + AbstractHttpServerTest.controller( + endpoint, + new Supplier[HttpResponse] { + def get(): HttpResponse = { + val resp = HttpResponse(status = endpoint.getStatus) + endpoint match { + case SUCCESS => resp.withEntity(endpoint.getBody) + case INDEXED_CHILD => + INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider { + override def getParameter(name: String): String = + uri.query().get(name).orNull + }) + resp.withEntity("") + case QUERY_PARAM => resp.withEntity(uri.queryString().orNull) + case REDIRECT => + resp.withHeaders(headers.Location(endpoint.getBody)) + case ERROR => resp.withEntity(endpoint.getBody) + case EXCEPTION => throw new Exception(endpoint.getBody) + case _ => + HttpResponse(status = NOT_FOUND.getStatus) + .withEntity(NOT_FOUND.getBody) + } + } + } + ) + } } } diff --git a/instrumentation/play/play-mvc/play-mvc-2.6/javaagent/build.gradle.kts b/instrumentation/play/play-mvc/play-mvc-2.6/javaagent/build.gradle.kts index facf129f50e6..85baa0fa99fd 100644 --- a/instrumentation/play/play-mvc/play-mvc-2.6/javaagent/build.gradle.kts +++ b/instrumentation/play/play-mvc/play-mvc-2.6/javaagent/build.gradle.kts @@ -74,6 +74,10 @@ tasks { dependsOn(testing.suites) } } + + withType().configureEach { + jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false") + } } // play-test depends on websocket-client