diff --git a/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorCellInstrumentation.java b/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorCellInstrumentation.java index 161cc80a693e..6872a886a1c8 100644 --- a/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorCellInstrumentation.java +++ b/instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorCellInstrumentation.java @@ -12,6 +12,7 @@ import akka.dispatch.sysmsg.SystemMessage; import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.util.VirtualField; +import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext; import io.opentelemetry.javaagent.bootstrap.executors.TaskAdviceHelper; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; @@ -52,6 +53,11 @@ public static void exit(@Advice.Enter Scope scope) { if (scope != null) { scope.close(); } + // akka-http instrumentation can leak scopes + // reset the context to clear the leaked scopes + if (Java8BytecodeBridge.currentContext() != Java8BytecodeBridge.rootContext()) { + Java8BytecodeBridge.rootContext().makeCurrent(); + } } } diff --git a/instrumentation/akka/akka-http-10.0/javaagent/build.gradle.kts b/instrumentation/akka/akka-http-10.0/javaagent/build.gradle.kts index 94073bab6775..7bfb2d7a48ac 100644 --- a/instrumentation/akka/akka-http-10.0/javaagent/build.gradle.kts +++ b/instrumentation/akka/akka-http-10.0/javaagent/build.gradle.kts @@ -34,10 +34,9 @@ dependencies { library("com.typesafe.akka:akka-http_2.11:10.0.0") library("com.typesafe.akka:akka-stream_2.11:2.4.14") - // these instrumentations are not needed for the tests to pass - // they are here to test for context leaks testInstrumentation(project(":instrumentation:akka:akka-actor-2.3:javaagent")) testInstrumentation(project(":instrumentation:akka:akka-actor-fork-join-2.5:javaagent")) + testInstrumentation(project(":instrumentation:scala-fork-join-2.8:javaagent")) latestDepTestLibrary("com.typesafe.akka:akka-http_2.13:+") latestDepTestLibrary("com.typesafe.akka:akka-stream_2.13:+") @@ -48,6 +47,8 @@ tasks.withType().configureEach { jvmArgs("--add-exports=java.base/sun.security.util=ALL-UNNAMED") jvmArgs("-XX:+IgnoreUnrecognizedVMOptions") + jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false") + systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) } diff --git a/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaFlowWrapper.java b/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaFlowWrapper.java new file mode 100644 index 000000000000..227353cec6a7 --- /dev/null +++ b/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaFlowWrapper.java @@ -0,0 +1,178 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.akkahttp.server; + +import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; +import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.errorResponse; +import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.instrumenter; + +import akka.http.scaladsl.model.HttpRequest; +import akka.http.scaladsl.model.HttpResponse; +import akka.stream.Attributes; +import akka.stream.BidiShape; +import akka.stream.Inlet; +import akka.stream.Outlet; +import akka.stream.scaladsl.Flow; +import akka.stream.stage.AbstractInHandler; +import akka.stream.stage.AbstractOutHandler; +import akka.stream.stage.GraphStage; +import akka.stream.stage.GraphStageLogic; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import java.util.ArrayDeque; +import java.util.Deque; + +public class AkkaFlowWrapper + extends GraphStage> { + private final Inlet requestIn = Inlet.create("otel.requestIn"); + private final Outlet requestOut = Outlet.create("otel.requestOut"); + private final Inlet responseIn = Inlet.create("otel.responseIn"); + private final Outlet responseOut = Outlet.create("otel.responseOut"); + + private final BidiShape shape = + BidiShape.of(responseIn, responseOut, requestIn, requestOut); + + public static Flow wrap( + Flow handler) { + return handler.join(new AkkaFlowWrapper()); + } + + @Override + public BidiShape shape() { + return shape; + } + + @Override + public GraphStageLogic createLogic(Attributes attributes) { + return new TracingLogic(); + } + + private class TracingLogic extends GraphStageLogic { + private final Deque requests = new ArrayDeque<>(); + + public TracingLogic() { + super(shape); + + // server pulls response, pass response from user code to server + setHandler( + responseOut, + new AbstractOutHandler() { + @Override + public void onPull() { + pull(responseIn); + } + + @Override + public void onDownstreamFinish() { + cancel(responseIn); + } + }); + + // user code pulls request, pass request from server to user code + setHandler( + requestOut, + new AbstractOutHandler() { + @Override + public void onPull() { + pull(requestIn); + } + + @Override + public void onDownstreamFinish() { + // Invoked on errors. Don't complete this stage to allow error-capturing + cancel(requestIn); + } + }); + + // new request from server + setHandler( + requestIn, + new AbstractInHandler() { + @Override + public void onPush() { + HttpRequest request = grab(requestIn); + + TracingRequest tracingRequest = TracingRequest.EMPTY; + Context parentContext = currentContext(); + if (instrumenter().shouldStart(parentContext, request)) { + Context context = instrumenter().start(parentContext, request); + // scope opened here may leak, actor instrumentation will close it + Scope scope = context.makeCurrent(); + tracingRequest = new TracingRequest(context, scope, request); + } + // event if span wasn't started we need to push TracingRequest to match response + // with request + requests.push(tracingRequest); + + push(requestOut, request); + } + + @Override + public void onUpstreamFinish() { + complete(requestOut); + } + + @Override + public void onUpstreamFailure(Throwable exception) { + fail(requestOut, exception); + } + }); + + // response from user code + setHandler( + responseIn, + new AbstractInHandler() { + @Override + public void onPush() { + HttpResponse response = grab(responseIn); + + TracingRequest tracingRequest = requests.poll(); + if (tracingRequest != null && tracingRequest != TracingRequest.EMPTY) { + // this may happen on a different thread from the one that opened the scope + // actor instrumentation will take care of the leaked scopes + tracingRequest.scope.close(); + instrumenter().end(tracingRequest.context, tracingRequest.request, response, null); + } + push(responseOut, response); + } + + @Override + public void onUpstreamFailure(Throwable exception) { + TracingRequest tracingRequest; + while ((tracingRequest = requests.poll()) != null) { + if (tracingRequest == TracingRequest.EMPTY) { + continue; + } + tracingRequest.scope.close(); + instrumenter() + .end( + tracingRequest.context, tracingRequest.request, errorResponse(), exception); + } + + fail(responseOut, exception); + } + + @Override + public void onUpstreamFinish() { + completeStage(); + } + }); + } + } + + private static class TracingRequest { + static final TracingRequest EMPTY = new TracingRequest(null, null, null); + final Context context; + final Scope scope; + final HttpRequest request; + + TracingRequest(Context context, Scope scope, HttpRequest request) { + this.context = context; + this.scope = scope; + this.request = request; + } + } +} diff --git a/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaHttpServerInstrumentationModule.java b/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaHttpServerInstrumentationModule.java index d489220b65a9..dbccf8f25460 100644 --- a/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaHttpServerInstrumentationModule.java +++ b/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/AkkaHttpServerInstrumentationModule.java @@ -5,23 +5,12 @@ package io.opentelemetry.javaagent.instrumentation.akkahttp.server; -import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; -import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.errorResponse; -import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.instrumenter; import static java.util.Collections.singletonList; -import akka.http.scaladsl.model.HttpRequest; -import akka.http.scaladsl.model.HttpResponse; import com.google.auto.service.AutoService; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import java.util.List; -import scala.Function1; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.runtime.AbstractFunction1; @AutoService(InstrumentationModule.class) public class AkkaHttpServerInstrumentationModule extends InstrumentationModule { @@ -33,73 +22,4 @@ public AkkaHttpServerInstrumentationModule() { public List typeInstrumentations() { return singletonList(new HttpExtServerInstrumentation()); } - - public static class SyncWrapper extends AbstractFunction1 { - private final Function1 userHandler; - - public SyncWrapper(Function1 userHandler) { - this.userHandler = userHandler; - } - - @Override - public HttpResponse apply(HttpRequest request) { - Context parentContext = currentContext(); - if (!instrumenter().shouldStart(parentContext, request)) { - return userHandler.apply(request); - } - Context context = instrumenter().start(parentContext, request); - try (Scope ignored = context.makeCurrent()) { - HttpResponse response = userHandler.apply(request); - instrumenter().end(context, request, response, null); - return response; - } catch (Throwable t) { - instrumenter().end(context, request, errorResponse(), t); - throw t; - } - } - } - - public static class AsyncWrapper extends AbstractFunction1> { - private final Function1> userHandler; - private final ExecutionContext executionContext; - - public AsyncWrapper( - Function1> userHandler, - ExecutionContext executionContext) { - this.userHandler = userHandler; - this.executionContext = executionContext; - } - - @Override - public Future apply(HttpRequest request) { - Context parentContext = currentContext(); - if (!instrumenter().shouldStart(parentContext, request)) { - return userHandler.apply(request); - } - Context context = instrumenter().start(parentContext, request); - try (Scope ignored = context.makeCurrent()) { - return userHandler - .apply(request) - .transform( - new AbstractFunction1() { - @Override - public HttpResponse apply(HttpResponse response) { - instrumenter().end(context, request, response, null); - return response; - } - }, - new AbstractFunction1() { - @Override - public Throwable apply(Throwable t) { - instrumenter().end(context, request, errorResponse(), t); - return t; - } - }, - executionContext); - } catch (Throwable t) { - instrumenter().end(context, request, null, t); - throw t; - } - } - } } diff --git a/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/HttpExtServerInstrumentation.java b/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/HttpExtServerInstrumentation.java index 06d727306b6b..221c34ae631d 100644 --- a/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/HttpExtServerInstrumentation.java +++ b/instrumentation/akka/akka-http-10.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkahttp/server/HttpExtServerInstrumentation.java @@ -10,14 +10,12 @@ import akka.http.scaladsl.model.HttpRequest; import akka.http.scaladsl.model.HttpResponse; -import akka.stream.Materializer; +import akka.stream.scaladsl.Flow; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import scala.Function1; -import scala.concurrent.Future; public class HttpExtServerInstrumentation implements TypeInstrumentation { @Override @@ -27,42 +25,18 @@ public ElementMatcher typeMatcher() { @Override public void transform(TypeTransformer transformer) { - // Instrumenting akka-streams bindAndHandle api was previously attempted. - // This proved difficult as there was no clean way to close the async scope - // in the graph logic after the user's request handler completes. - // - // Instead, we're instrumenting the bindAndHandle function helpers by - // wrapping the scala functions with our own handlers. transformer.applyAdviceToMethod( - named("bindAndHandleSync").and(takesArgument(0, named("scala.Function1"))), - this.getClass().getName() + "$AkkaHttpSyncAdvice"); - transformer.applyAdviceToMethod( - named("bindAndHandleAsync").and(takesArgument(0, named("scala.Function1"))), - this.getClass().getName() + "$AkkaHttpAsyncAdvice"); - } - - @SuppressWarnings("unused") - public static class AkkaHttpSyncAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void wrapHandler( - @Advice.Argument(value = 0, readOnly = false) - Function1 handler) { - handler = new AkkaHttpServerInstrumentationModule.SyncWrapper(handler); - } + named("bindAndHandle").and(takesArgument(0, named("akka.stream.scaladsl.Flow"))), + this.getClass().getName() + "$AkkaBindAndHandleAdvice"); } @SuppressWarnings("unused") - public static class AkkaHttpAsyncAdvice { + public static class AkkaBindAndHandleAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void wrapHandler( - @Advice.Argument(value = 0, readOnly = false) - Function1> handler, - @Advice.Argument(7) Materializer materializer) { - handler = - new AkkaHttpServerInstrumentationModule.AsyncWrapper( - handler, materializer.executionContext()); + @Advice.Argument(value = 0, readOnly = false) Flow handler) { + handler = AkkaFlowWrapper.wrap(handler); } } } diff --git a/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpServerInstrumentationTest.scala b/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpServerInstrumentationTest.scala new file mode 100644 index 000000000000..2001f3335f9f --- /dev/null +++ b/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpServerInstrumentationTest.scala @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.akkahttp + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension +import io.opentelemetry.instrumentation.testing.junit.http.{ + HttpServerInstrumentationExtension, + HttpServerTestOptions +} +import org.junit.jupiter.api.extension.RegisterExtension + +class AkkaHttpServerInstrumentationTest + extends AbstractHttpServerInstrumentationTest { + @RegisterExtension val extension: InstrumentationExtension = + HttpServerInstrumentationExtension.forAgent() + + override protected def setupServer(): AnyRef = { + AkkaHttpTestWebServer.start(port) + null + } + + override protected def stopServer(server: Object): Unit = + AkkaHttpTestWebServer.stop() + + override protected def configure( + options: HttpServerTestOptions + ): Unit = { + super.configure(options) + // exception doesn't propagate + options.setTestException(false) + } +} diff --git a/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpTestWebServer.scala b/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpTestWebServer.scala index cbe891716cd3..69fffed25314 100644 --- a/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpTestWebServer.scala +++ b/instrumentation/akka/akka-http-10.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/akkahttp/AkkaHttpTestWebServer.scala @@ -12,11 +12,15 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.ExceptionHandler import akka.stream.ActorMaterializer +import io.opentelemetry.instrumentation.testing.junit.http.{ + AbstractHttpServerTest, + ServerEndpoint +} import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint._ +import java.util.function.Supplier import scala.concurrent.Await -// FIXME: This doesn't work because we don't support bindAndHandle. object AkkaHttpTestWebServer { implicit val system = ActorSystem("my-system") implicit val materializer = ActorMaterializer() @@ -29,21 +33,36 @@ object AkkaHttpTestWebServer { ) } - val route = { // handleExceptions(exceptionHandler) { - path(SUCCESS.rawPath) { - complete( - HttpResponse(status = SUCCESS.getStatus).withEntity(SUCCESS.getBody) - ) - } ~ path(QUERY_PARAM.rawPath) { - complete( - HttpResponse(status = QUERY_PARAM.getStatus).withEntity(SUCCESS.getBody) - ) - } ~ path(REDIRECT.rawPath) { - redirect(Uri(REDIRECT.getBody), StatusCodes.Found) - } ~ path(ERROR.rawPath) { - complete(HttpResponse(status = ERROR.getStatus).withEntity(ERROR.getBody)) - } ~ path(EXCEPTION.rawPath) { - failWith(new Exception(EXCEPTION.getBody)) + val route = handleExceptions(exceptionHandler) { + extractUri { uri => + val endpoint = ServerEndpoint.forPath(uri.path.toString()) + complete { + AbstractHttpServerTest.controller( + endpoint, + new Supplier[HttpResponse] { + def get(): HttpResponse = { + val resp = HttpResponse(status = endpoint.getStatus) + endpoint match { + case SUCCESS => resp.withEntity(endpoint.getBody) + case INDEXED_CHILD => + INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider { + override def getParameter(name: String): String = + uri.query().get(name).orNull + }) + resp.withEntity("") + case QUERY_PARAM => resp.withEntity(uri.queryString().orNull) + case REDIRECT => + resp.withHeaders(headers.Location(endpoint.getBody)) + case ERROR => resp.withEntity(endpoint.getBody) + case EXCEPTION => throw new Exception(endpoint.getBody) + case _ => + HttpResponse(status = NOT_FOUND.getStatus) + .withEntity(NOT_FOUND.getBody) + } + } + } + ) + } } } diff --git a/instrumentation/play/play-mvc/play-mvc-2.6/javaagent/build.gradle.kts b/instrumentation/play/play-mvc/play-mvc-2.6/javaagent/build.gradle.kts index facf129f50e6..85baa0fa99fd 100644 --- a/instrumentation/play/play-mvc/play-mvc-2.6/javaagent/build.gradle.kts +++ b/instrumentation/play/play-mvc/play-mvc-2.6/javaagent/build.gradle.kts @@ -74,6 +74,10 @@ tasks { dependsOn(testing.suites) } } + + withType().configureEach { + jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false") + } } // play-test depends on websocket-client