Skip to content

Commit

Permalink
Fix Ratpack server context propagation and enable its concurrency test (
Browse files Browse the repository at this point in the history
  • Loading branch information
agoallikmaa authored May 6, 2021
1 parent 5670024 commit a568daa
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,12 @@ public static class SetExecuteRunnableStateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static State enterJobSubmit(
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask)) {
task = newTask;
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task)) {
task = RunnableWrapper.wrapIfNeeded(task);
ContextStore<Runnable, State> contextStore =
InstrumentationContext.get(Runnable.class, State.class);
return ExecutorInstrumentationUtils.setupState(
contextStore, newTask, Java8BytecodeBridge.currentContext());
contextStore, task, Java8BytecodeBridge.currentContext());
}
return null;
}
Expand Down Expand Up @@ -118,13 +117,12 @@ public static class SetSubmitRunnableStateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static State enterJobSubmit(
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask)) {
task = newTask;
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task)) {
task = RunnableWrapper.wrapIfNeeded(task);
ContextStore<Runnable, State> contextStore =
InstrumentationContext.get(Runnable.class, State.class);
return ExecutorInstrumentationUtils.setupState(
contextStore, newTask, Java8BytecodeBridge.currentContext());
contextStore, task, Java8BytecodeBridge.currentContext());
}
return null;
}
Expand All @@ -148,13 +146,12 @@ public static class SetCallableStateAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static State enterJobSubmit(
@Advice.Argument(value = 0, readOnly = false) Callable task) {
Callable newTask = CallableWrapper.wrapIfNeeded(task);
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask)) {
task = newTask;
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task)) {
task = CallableWrapper.wrapIfNeeded(task);
ContextStore<Callable, State> contextStore =
InstrumentationContext.get(Callable.class, State.class);
return ExecutorInstrumentationUtils.setupState(
contextStore, newTask, Java8BytecodeBridge.currentContext());
contextStore, task, Java8BytecodeBridge.currentContext());
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.netty.v4_1.AttributeKeys;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import ratpack.handling.Context;
import ratpack.handling.Handler;

Expand All @@ -23,9 +24,14 @@ public void handle(Context ctx) {
ctx.getDirectChannelAccess().getChannel().attr(AttributeKeys.SERVER_SPAN);
io.opentelemetry.context.Context serverSpanContext = spanAttribute.get();

// Relying on executor instrumentation to assume the netty span is in context as the parent.
// Must use context from channel, as executor instrumentation is not accurate - Ratpack
// internally queues events and then drains them in batches, causing executor instrumentation to
// attach the same context to a batch of events from different requests.
io.opentelemetry.context.Context parentContext =
serverSpanContext != null ? serverSpanContext : Java8BytecodeBridge.currentContext();

io.opentelemetry.context.Context ratpackContext =
tracer().startSpan("ratpack.handler", SpanKind.INTERNAL);
tracer().startSpan(parentContext, "ratpack.handler", SpanKind.INTERNAL);
ctx.getExecution().add(ratpackContext);

ctx.getResponse()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ package server

import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS

import io.opentelemetry.api.trace.Span
import ratpack.exec.Promise
import ratpack.groovy.test.embed.GroovyEmbeddedApp
import ratpack.test.embed.EmbeddedApp
Expand Down Expand Up @@ -40,6 +42,18 @@ class RatpackAsyncHttpServerTest extends RatpackHttpServerTest {
}
}
}
prefix(INDEXED_CHILD.rawPath()) {
all {
Promise.sync {
INDEXED_CHILD
} then {
controller(INDEXED_CHILD) {
Span.current().setAttribute("test.request.id", request.queryParams.get("id") as long)
context.response.status(INDEXED_CHILD.status).send()
}
}
}
}
prefix(QUERY_PARAM.rawPath()) {
all {
Promise.sync {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ package server

import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS

import io.opentelemetry.api.trace.Span
import ratpack.exec.Promise
import ratpack.groovy.test.embed.GroovyEmbeddedApp
import ratpack.test.embed.EmbeddedApp
Expand Down Expand Up @@ -40,6 +42,18 @@ class RatpackForkedHttpServerTest extends RatpackHttpServerTest {
}
}
}
prefix(INDEXED_CHILD.rawPath()) {
all {
Promise.sync {
INDEXED_CHILD
}.fork().then {
controller(INDEXED_CHILD) {
Span.current().setAttribute("test.request.id", request.queryParams.get("id") as long)
context.response.status(INDEXED_CHILD.status).send()
}
}
}
}
prefix(QUERY_PARAM.rawPath()) {
all {
Promise.sync {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ package server
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS

import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
Expand Down Expand Up @@ -43,6 +45,14 @@ class RatpackHttpServerTest extends HttpServerTest<EmbeddedApp> implements Agent
}
}
}
prefix(INDEXED_CHILD.rawPath()) {
all {
controller(INDEXED_CHILD) {
Span.current().setAttribute("test.request.id", request.queryParams.get("id") as long)
context.response.status(INDEXED_CHILD.status).send()
}
}
}
prefix(QUERY_PARAM.rawPath()) {
all {
controller(QUERY_PARAM) {
Expand Down Expand Up @@ -108,6 +118,11 @@ class RatpackHttpServerTest extends HttpServerTest<EmbeddedApp> implements Agent
true
}

@Override
boolean testConcurrency() {
true
}

@Override
void handlerSpan(TraceAssert trace, int index, Object parent, String method = "GET", ServerEndpoint endpoint = SUCCESS) {
trace.span(index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ protected Boolean computeValue(Class<?> taskClass) {
return false;
}

if (taskClass.getName().startsWith("ratpack.exec.internal.")) {
// Context is passed through Netty channels in Ratpack as executor instrumentation is
// not suitable. As the context that would be propagated via executor would be
// incorrect, skip the propagation. Not checking for concrete class names as this covers
// anonymous classes from ratpack.exec.internal.DefaultExecution and
// ratpack.exec.internal.DefaultExecController.
return false;
}

return true;
}
};
Expand Down

0 comments on commit a568daa

Please sign in to comment.